Files
2026-04-20 15:53:02 +08:00

442 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import os
from logging.handlers import TimedRotatingFileHandler
from dataclasses import dataclass, field
from datetime import datetime, time as dt_time, timedelta, timezone, tzinfo
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from sqlalchemy.orm import Session
from app.database import SessionLocal
from app.models.models import (
AppUser,
DeliveryHistory,
ExtractedTopic,
InfoSource,
TargetType,
TaskStatus,
TrendingEvent,
UnifiedEvent,
UserDeliverySchedule,
UserPushEndpoint,
UserTopicPreference,
utcnow,
)
from app.prompts.digest_email_template import build_digest_html
from app.services.matching_service import recommend_events_for_user
from app.utils.email_utils import send_html_email
# Dedicated logger for the delivery job; it writes to its own rotating file.
logger = logging.getLogger("delivery_service")
# Log directory: "logs" under the third ancestor directory of this file.
# It is created at import time if it does not exist yet.
_delivery_log_dir = Path(__file__).resolve().parents[2] / "logs"
_delivery_log_dir.mkdir(parents=True, exist_ok=True)
_delivery_log_file = _delivery_log_dir / "delivery_check.log"
# Guard against attaching a second file handler when the module is imported
# more than once (getLogger returns the same logger object each time).
# NOTE(review): indentation was lost in this copy of the file; the statements
# below are assumed to all belong to the guard - confirm against the original.
if not logger.handlers:
    # Rotate at midnight, every day, keeping 14 rotated files.
    _file_handler = TimedRotatingFileHandler(
        filename=str(_delivery_log_file),
        when="midnight",
        interval=1,
        backupCount=14,
        encoding="utf-8",
    )
    _file_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
    logger.addHandler(_file_handler)
    # INFO level: logger.debug(...) calls in this module are not written out.
    logger.setLevel(logging.INFO)
    # Do not forward records to ancestor loggers.
    logger.propagate = False
# AI-assisted section begins (marker in the original: deepseek-v3-2, 2026-03-20)
# All values below can be overridden through environment variables of the same name.

# Delivery window: maximum tolerated gap, in minutes, between the moment the
# job runs and the configured delivery time.
DELIVERY_WINDOW_MINUTES = int(os.environ.get("DELIVERY_WINDOW_MINUTES", "2"))

# Minimum number of minutes between two pushes to the same user.
MIN_PUSH_INTERVAL_MINUTES = int(os.environ.get("MIN_PUSH_INTERVAL_MINUTES", "30"))

# Maximum number of events carried by a single push.
MAX_EVENTS_PER_PUSH = int(os.environ.get("MAX_EVENTS_PER_PUSH", "12"))

# Hot-score threshold for default mode (used when the user has no keywords
# or none of them matched).
DEFAULT_MODE_HOT_THRESHOLD = int(os.environ.get("DEFAULT_MODE_HOT_THRESHOLD", "3"))

# Look-back window, in hours, for the default-mode query.
DEFAULT_MODE_HOURS = int(os.environ.get("DEFAULT_MODE_HOURS", "24"))

# Time zone used when a user's own time zone is missing or invalid.
DEFAULT_FALLBACK_TIMEZONE = os.environ.get("DEFAULT_FALLBACK_TIMEZONE", "Asia/Shanghai")
@dataclass
class _DefaultEventItem:
    """Container for an event delivered in default (hot-topics) mode.

    Used when the user has no keyword subscription or when none of the
    keywords matched. According to the original author's note, its attributes
    mirror those of MatchedEventResult so both can be handed to the e-mail
    template in the same way.
    """

    # The event being delivered.
    event: UnifiedEvent
    # Default items are not keyword-matched, so the score and hit lists start empty.
    match_score: float = field(default=0.0)
    exact_hits: list[str] = field(default_factory=list)
    semantic_hits: list[dict[str, Any]] = field(default_factory=list)
    # Topic tags attached to the event.
    tags: list[str] = field(default_factory=list)
    # Marks the item as coming from default mode.
    is_default: bool = field(default=True)
def _time_to_minutes(t: dt_time) -> int:
    """Return the number of whole minutes between midnight and *t* (seconds are ignored)."""
    hours, minutes = t.hour, t.minute
    return 60 * hours + minutes
def _is_within_window(schedule_time: dt_time, current_time: dt_time, window: int = DELIVERY_WINDOW_MINUTES) -> bool:
    """Tell whether *schedule_time* lies within *window* minutes of *current_time*.

    The comparison is done on a 24-hour circle, so 23:59 and 00:01 are two
    minutes apart rather than 1438.
    """
    minutes_per_day = 1440
    # Forward distance on the clock face from current_time to schedule_time.
    forward = (_time_to_minutes(schedule_time) - _time_to_minutes(current_time)) % minutes_per_day
    backward = minutes_per_day - forward
    shortest = forward if forward <= backward else backward
    return shortest <= window
def _resolve_user_timezone(user_timezone: str | None) -> tzinfo:
"""解析用户时区,异常时回退到默认时区。"""
tz_name = (user_timezone or "").strip() or DEFAULT_FALLBACK_TIMEZONE
try:
return ZoneInfo(tz_name)
except ZoneInfoNotFoundError:
logger.warning(
"用户时区无效,已回退默认时区。timezone=%s fallback=%s",
tz_name, DEFAULT_FALLBACK_TIMEZONE,
)
try:
return ZoneInfo(DEFAULT_FALLBACK_TIMEZONE)
except ZoneInfoNotFoundError:
logger.warning("系统缺少时区数据库,最终回退为 UTC。建议安装 tzdata 包。")
return timezone.utc
def _user_local_time(now_utc: datetime, user_timezone: str | None) -> dt_time:
    """Convert *now_utc* to the user's wall-clock time, truncated to the minute."""
    zone = _resolve_user_timezone(user_timezone)
    wall_clock = now_utc.astimezone(zone).time()
    # Only HH:MM matters for schedule comparison.
    return wall_clock.replace(second=0, microsecond=0)
def _ensure_aware(dt: datetime) -> datetime:
    """Return *dt* unchanged if it carries a tzinfo; otherwise label it as UTC."""
    # A naive value is tagged as UTC (no clock shift); aware values pass through as-is.
    return dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)
# AI辅助生成结束
# 数据库查询辅助
def _should_skip_by_interval(db: Session, user_id: int) -> bool:
    """Tell whether the user is still inside the cool-down period.

    Looks up the most recent successful delivery for *user_id* and returns
    True when fewer than ``MIN_PUSH_INTERVAL_MINUTES`` minutes have passed
    since then. A user with no successful delivery is never skipped.
    """
    latest_success = (
        db.query(DeliveryHistory.created_at)
        .filter(DeliveryHistory.user_id == user_id)
        .filter(DeliveryHistory.status == TaskStatus.SUCCESS)
        .order_by(DeliveryHistory.created_at.desc())
        .first()
    )
    if latest_success is None:
        return False

    (sent_at,) = latest_success
    # The stored timestamp may be naive; treat it as UTC before subtracting.
    sent_at = _ensure_aware(sent_at)
    minutes_since = (utcnow() - sent_at).total_seconds() / 60.0
    return minutes_since < MIN_PUSH_INTERVAL_MINUTES
def _get_user_email_endpoints(db: Session, user_id: int) -> list[UserPushEndpoint]:
    """Return the user's active EMAIL push endpoints, ordered by ascending priority_level."""
    active_email = db.query(UserPushEndpoint).filter(
        UserPushEndpoint.user_id == user_id,
        UserPushEndpoint.channel_type == "EMAIL",
        # SQL expression, not a Python comparison - "is True" would not work here.
        UserPushEndpoint.is_active == True,
    )
    by_priority = active_email.order_by(UserPushEndpoint.priority_level.asc())
    return by_priority.all()
def _get_already_pushed_event_ids(db: Session, user_id: int) -> set[int]:
    """Collect the ids of events already delivered successfully to the user.

    The result is used to keep the same event from being sent twice.
    """
    delivered = db.query(DeliveryHistory.target_id).filter(
        DeliveryHistory.user_id == user_id,
        DeliveryHistory.target_type == TargetType.EVENT,
        DeliveryHistory.status == TaskStatus.SUCCESS,
    )
    pushed: set[int] = set()
    for (target_id,) in delivered.all():
        pushed.add(target_id)
    return pushed
def _load_event_platforms(db: Session, event_ids: list[int]) -> dict[int, list[dict]]:
    """Load, in one query, the per-platform source entries of the given events.

    Returns:
        A mapping ``event_id -> [{source_name, headline, url, ranking, icon_url}, ...]``.
        Entries of one event are ordered by ascending ranking (rank 1 first)
        with missing rankings last. Events without any entry are absent from
        the mapping; an empty *event_ids* yields an empty mapping.
    """
    platforms_by_event: dict[int, list[dict]] = {}
    if not event_ids:
        return platforms_by_event

    query = (
        db.query(
            TrendingEvent.unified_event_id,
            TrendingEvent.current_headline,
            TrendingEvent.event_url,
            TrendingEvent.current_ranking,
            TrendingEvent.icon_url,
            InfoSource.source_name,
        )
        .join(InfoSource, TrendingEvent.source_id == InfoSource.id)
        .filter(TrendingEvent.unified_event_id.in_(event_ids))
        .order_by(
            TrendingEvent.unified_event_id,
            TrendingEvent.current_ranking.asc().nulls_last(),
        )
    )
    for row in query.all():
        event_id, headline, url, ranking, icon_url, source_name = row
        # Missing text fields are replaced with placeholders; ranking stays as stored.
        entry = {
            "source_name": source_name or "未知",
            "headline": headline or "",
            "url": url or "",
            "ranking": ranking,
            "icon_url": icon_url or "",
        }
        if event_id in platforms_by_event:
            platforms_by_event[event_id].append(entry)
        else:
            platforms_by_event[event_id] = [entry]
    return platforms_by_event
def _load_event_tags(db: Session, event_ids: list[int]) -> dict[int, list[str]]:
    """Load the topic keywords of the given events in one query.

    Returns a mapping ``event_id -> [keyword, ...]``. Empty keywords are
    dropped, and events with no keyword do not appear in the mapping.
    """
    tags_by_event: dict[int, list[str]] = {}
    if not event_ids:
        return tags_by_event

    topic_rows = db.query(ExtractedTopic.target_id, ExtractedTopic.topic_keyword).filter(
        ExtractedTopic.target_type == TargetType.EVENT,
        ExtractedTopic.target_id.in_(event_ids),
    )
    for event_id, keyword in topic_rows.all():
        if not keyword:
            continue
        if event_id not in tags_by_event:
            tags_by_event[event_id] = []
        tags_by_event[event_id].append(keyword)
    return tags_by_event
def _user_has_keywords(db: Session, user_id: int) -> bool:
    """Tell whether the user has at least one topic-preference row (keyword subscription)."""
    any_preference = (
        db.query(UserTopicPreference.id)
        .filter(UserTopicPreference.user_id == user_id)
        .first()
    )
    return any_preference is not None
def _get_default_hot_events(
    db: Session,
    pushed_ids: set[int],
) -> list[_DefaultEventItem]:
    """Pick the hottest recent events the user has not received yet (default mode).

    Candidates are events created within the last ``DEFAULT_MODE_HOURS`` hours
    whose hot score is at least ``DEFAULT_MODE_HOT_THRESHOLD``, taken in
    descending hot-score order.

    Args:
        db: Open database session.
        pushed_ids: Ids of events already delivered to the user; these are excluded.

    Returns:
        At most ``MAX_EVENTS_PER_PUSH`` items, each carrying up to six
        de-duplicated tags. Empty when nothing new qualifies.
    """
    time_limit = utcnow() - timedelta(hours=DEFAULT_MODE_HOURS)
    # The exclusion of pushed events happens after the query, so the row limit
    # has to leave room for it: at most len(pushed_ids) rows can be discarded,
    # hence MAX_EVENTS_PER_PUSH + len(pushed_ids) rows always contain a full
    # batch of fresh events whenever that many exist. A fixed limit would
    # starve users whose top-ranked events were all delivered before.
    candidates = (
        db.query(UnifiedEvent)
        .filter(
            UnifiedEvent.hot_score >= DEFAULT_MODE_HOT_THRESHOLD,
            UnifiedEvent.created_at >= time_limit,
        )
        .order_by(UnifiedEvent.hot_score.desc())
        .limit(MAX_EVENTS_PER_PUSH + len(pushed_ids))
        .all()
    )

    fresh = [ev for ev in candidates if ev.id not in pushed_ids][:MAX_EVENTS_PER_PUSH]
    if not fresh:
        return []

    # Tags are fetched only for the events that will actually be sent.
    tags_map = _load_event_tags(db, [ev.id for ev in fresh])
    return [
        _DefaultEventItem(
            event=ev,
            # dict.fromkeys removes duplicate tags while keeping their order.
            tags=list(dict.fromkeys(tags_map.get(ev.id, [])))[:6],
        )
        for ev in fresh
    ]
def _record_delivery(
    db: Session,
    user_id: int,
    event_ids: list[int],
    status: TaskStatus,
) -> None:
    """Write one delivery-history row per event and commit them together.

    Args:
        db: Open database session; it is committed by this function.
        user_id: Recipient of the push.
        event_ids: Events that were part of the push.
        status: Outcome recorded on every row.

    Raises:
        Exception: Whatever the commit raised, re-raised after the session
            has been rolled back.
    """
    records = [
        DeliveryHistory(
            user_id=user_id,
            target_type=TargetType.EVENT,
            target_id=eid,
            status=status,
        )
        for eid in event_ids
    ]
    db.add_all(records)
    try:
        db.commit()
    except Exception:
        # The caller keeps using this session for other users; without a
        # rollback every later statement on it would fail as well.
        db.rollback()
        raise
# AI-assisted section begins (marker in the original: deepseek-v3-2, 2026-03-20)
# Push preparation
@dataclass
class _PendingPush:
    """Everything needed to send one user's digest e-mail.

    Built by the synchronous preparation step so that the actual sending can
    happen later in the async context.
    """

    # Recipient user.
    user_id: int
    # Candidate e-mail addresses, in the order they should be tried.
    email_targets: list[str]
    # Subject line of the message.
    subject: str
    # Rendered HTML body of the message.
    html_body: str
    # Ids of the events contained in the message.
    event_ids: list[int]
# End of AI-assisted section
def _prepare_user_push(db: Session, user: AppUser, schedule: UserDeliverySchedule) -> _PendingPush | None:
    """Gather everything needed for one user's push without sending anything.

    Content selection, in order of preference:
      1. The user has keywords and some events match -> send the matches.
      2. The user has keywords but nothing new matches -> send default hot events.
      3. The user has no keywords -> send default hot events.

    Returns:
        A ``_PendingPush`` ready to be sent, or ``None`` when the user is in
        the cool-down period, has no active e-mail endpoint, or there is
        nothing new to send.
    """
    user_id = user.id

    if _should_skip_by_interval(db, user_id):
        logger.info(f"用户 {user_id} 仍在 {MIN_PUSH_INTERVAL_MINUTES} 分钟冷却期内,跳过")
        return None

    email_endpoints = _get_user_email_endpoints(db, user_id)
    if not email_endpoints:
        logger.info(f"用户 {user_id} 无可用邮件渠道,跳过")
        return None

    pushed_ids = _get_already_pushed_event_ids(db, user_id)

    items: list = []
    if _user_has_keywords(db, user_id):
        # Ask for twice the batch size so that dropping already-pushed events
        # still leaves enough candidates.
        matched = recommend_events_for_user(
            db,
            user_id=user_id,
            min_hot=1,
            hours=72,
            limit=MAX_EVENTS_PER_PUSH * 2,
        )
        items = [m for m in matched if m.event.id not in pushed_ids][:MAX_EVENTS_PER_PUSH]
        if items:
            logger.info(f"用户 {user_id} 关键词匹配,推送 {len(items)} 条事件")
        else:
            logger.info(f"用户 {user_id} 关键词无匹配结果,切换为默认热点模式")
    else:
        logger.info(f"用户 {user_id} 未配置关键词,使用默认热点模式")

    # No personalised content at this point means default mode.
    is_default = not items
    if is_default:
        items = _get_default_hot_events(db, pushed_ids)
        if not items:
            logger.info(f"用户 {user_id} 默认热点无可推送内容,跳过")
            return None

    event_ids = [item.event.id for item in items]
    time_str = schedule.delivery_time.strftime("%H:%M")
    html_body = build_digest_html(
        items=items,
        delivery_time_str=time_str,
        platforms_map=_load_event_platforms(db, event_ids),
        is_default_push=is_default,
    )

    if is_default:
        subject_suffix = "全网热点快报"
    else:
        subject_suffix = "个性化简报"

    return _PendingPush(
        user_id=user_id,
        email_targets=[ep.channel_account for ep in email_endpoints],
        subject=f"聚势智见 {subject_suffix} · {time_str}",
        html_body=html_body,
        event_ids=event_ids,
    )
async def _send_to_first_working_target(pending: _PendingPush) -> bool:
    """Try the e-mail targets in order and stop at the first one that succeeds.

    Returns True when one target accepted the message, False when all of them
    failed or raised.
    """
    for target_email in pending.email_targets:
        try:
            success = await send_html_email(
                to_email=target_email,
                subject=pending.subject,
                html_content=pending.html_body,
            )
        except Exception as e:
            # A broken endpoint must not keep the remaining ones from being tried.
            logger.error(f"用户 {pending.user_id} 发送至 {target_email} 异常: {e}")
            continue
        if success:
            logger.info(f"用户 {pending.user_id} 邮件发送成功 → {target_email}")
            return True
        logger.warning(f"用户 {pending.user_id} 渠道 {target_email} 发送失败,尝试下一个")
    return False


async def check_and_deliver() -> None:
    """Entry point of the scheduled delivery job.

    According to the original note it is invoked once a minute by APScheduler.

    Steps:
      1. Take the current UTC time.
      2. Load every active delivery schedule.
      3. For each schedule, check in the user's own time zone whether the
         delivery time falls inside the delivery window.
      4. Prepare the push, send the e-mail, and record the outcome.

    A failure while handling one schedule is logged and rolled back; the
    remaining schedules are still processed. Nothing is raised to the caller
    for errors inside the main loop.
    """
    now = datetime.now(timezone.utc)
    current_utc = now.time().replace(second=0, microsecond=0)
    logger.debug(f"推送调度检查 @ UTC {current_utc.strftime('%H:%M')}")

    db: Session = SessionLocal()
    try:
        active_schedules = (
            db.query(UserDeliverySchedule)
            .filter(UserDeliverySchedule.is_active == True)
            .all()
        )
        for schedule in active_schedules:
            # Remembered up front so the error log below never has to touch
            # the database again to name the user.
            user_id = None
            try:
                user_id = schedule.user_id
                user = db.query(AppUser).filter(AppUser.id == user_id).first()
                if not user:
                    continue

                user_current = _user_local_time(now, user.timezone)
                if not _is_within_window(schedule.delivery_time, user_current):
                    continue

                pending = _prepare_user_push(db, user, schedule)
                if pending is None:
                    continue

                sent = await _send_to_first_working_target(pending)
                _record_delivery(
                    db,
                    user_id=pending.user_id,
                    event_ids=pending.event_ids,
                    status=TaskStatus.SUCCESS if sent else TaskStatus.ERROR,
                )
            except Exception as e:
                logger.error(f"推送用户 {user_id} 时异常: {e}", exc_info=True)
                # The session is shared by all schedules; reset it so that a
                # failed statement for this user does not break the next one.
                db.rollback()
    except Exception as e:
        logger.error(f"推送调度主循环异常: {e}", exc_info=True)
    finally:
        db.close()