"""Scheduled digest delivery service.

Invoked by APScheduler once per minute. For every active delivery schedule it
checks whether the user's local time falls inside the delivery window; when it
does, a digest email is built and sent, and the pushed events are written to
``DeliveryHistory`` so they are not delivered twice.

Content priority: keywords configured and matched -> personalised briefing;
no keywords, or no matches -> default trending-news digest.
"""

import logging
import os
from dataclasses import dataclass, field
from datetime import datetime, time as dt_time, timedelta, timezone, tzinfo
from logging.handlers import TimedRotatingFileHandler
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError

from sqlalchemy.orm import Session

from app.database import SessionLocal
from app.models.models import (
    AppUser,
    DeliveryHistory,
    ExtractedTopic,
    InfoSource,
    TargetType,
    TaskStatus,
    TrendingEvent,
    UnifiedEvent,
    UserDeliverySchedule,
    UserPushEndpoint,
    UserTopicPreference,
    utcnow,
)
from app.prompts.digest_email_template import build_digest_html
from app.services.matching_service import recommend_events_for_user
from app.utils.email_utils import send_html_email

logger = logging.getLogger("delivery_service")

# The delivery checker logs to its own file under <project>/logs rather than
# through the root logger.
_delivery_log_dir = Path(__file__).resolve().parents[2] / "logs"
_delivery_log_dir.mkdir(parents=True, exist_ok=True)
_delivery_log_file = _delivery_log_dir / "delivery_check.log"


def _attach_delivery_file_handler(target: logging.Logger, log_file: Path) -> None:
    """Attach a midnight-rotating file handler (14 backups) to *target*.

    Also pins the level to INFO and stops propagation to ancestor loggers.
    """
    handler = TimedRotatingFileHandler(
        filename=str(log_file),
        when="midnight",
        interval=1,
        backupCount=14,
        encoding="utf-8",
    )
    handler.setFormatter(
        logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
    )
    target.addHandler(handler)
    target.setLevel(logging.INFO)
    target.propagate = False


# Only configure once, so re-importing the module does not stack handlers.
if not logger.handlers:
    _attach_delivery_file_handler(logger, _delivery_log_file)


def _env_int(name: str, default: int) -> int:
    """Return environment variable *name* parsed as int, or *default* if unset."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    return int(raw)


# Max tolerance (minutes) between the actual run time and the scheduled time.
DELIVERY_WINDOW_MINUTES = _env_int("DELIVERY_WINDOW_MINUTES", 2)
# Minimum gap (minutes) between two pushes to the same user.
MIN_PUSH_INTERVAL_MINUTES = _env_int("MIN_PUSH_INTERVAL_MINUTES", 30)
# Upper bound on the number of events carried by a single push.
MAX_EVENTS_PER_PUSH = _env_int("MAX_EVENTS_PER_PUSH", 12)
# Hot-score threshold for the default digest (no keywords, or no matches).
DEFAULT_MODE_HOT_THRESHOLD = _env_int("DEFAULT_MODE_HOT_THRESHOLD", 3)
# Look-back window (hours) for the default digest query.
DEFAULT_MODE_HOURS = _env_int("DEFAULT_MODE_HOURS", 24)
# Fallback timezone used when a user's timezone is missing or invalid.
DEFAULT_FALLBACK_TIMEZONE = os.getenv("DEFAULT_FALLBACK_TIMEZONE", "Asia/Shanghai")

# ==========================================
# Default trending-event container (used when there is no keyword match)
# ==========================================


@dataclass
class _DefaultEventItem:
    """Wrapper for a default trending event.

    Used when the user has no keyword subscriptions or none of them matched.
    Its attributes mirror ``MatchedEventResult`` so both kinds of item can be
    passed to the email template uniformly.
    """

    event: UnifiedEvent
    match_score: float = 0.0
    exact_hits: list[str] = field(default_factory=list)
    semantic_hits: list[dict[str, Any]] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)
    is_default: bool = True


# ==========================================
# Timezone helpers
# ==========================================

# ZoneInfo raises ZoneInfoNotFoundError for unknown keys, ValueError for
# malformed keys (absolute paths, ".." components) and may raise OSError for
# keys that resolve to a directory. All of these mean "unusable timezone".
_INVALID_TZ_ERRORS = (ZoneInfoNotFoundError, ValueError, OSError)


def _time_to_minutes(t: dt_time) -> int:
    """Return minutes since midnight for *t* (seconds are ignored)."""
    return t.hour * 60 + t.minute


def _is_within_window(
    schedule_time: dt_time,
    current_time: dt_time,
    window: int = DELIVERY_WINDOW_MINUTES,
) -> bool:
    """Return True if *schedule_time* lies within ±*window* minutes of *current_time*.

    Distance is measured around a 24h clock, so 23:59 and 00:01 are 2 minutes
    apart (safe across midnight).
    """
    diff = abs(_time_to_minutes(schedule_time) - _time_to_minutes(current_time))
    return min(diff, 1440 - diff) <= window


def _resolve_user_timezone(user_timezone: str | None) -> tzinfo:
    """Resolve a user's timezone name to a tzinfo.

    Falls back to DEFAULT_FALLBACK_TIMEZONE when the name is empty or invalid,
    and finally to UTC if even the fallback cannot be loaded (e.g. no tz
    database installed). Never raises for a bad timezone string.
    """
    tz_name = (user_timezone or "").strip() or DEFAULT_FALLBACK_TIMEZONE
    try:
        return ZoneInfo(tz_name)
    except _INVALID_TZ_ERRORS:
        logger.warning(
            "用户时区无效,已回退默认时区。timezone=%s fallback=%s",
            tz_name,
            DEFAULT_FALLBACK_TIMEZONE,
        )
    try:
        return ZoneInfo(DEFAULT_FALLBACK_TIMEZONE)
    except _INVALID_TZ_ERRORS:
        logger.warning("系统缺少时区数据库,最终回退为 UTC。建议安装 tzdata 包。")
        return timezone.utc


def _user_local_time(now_utc: datetime, user_timezone: str | None) -> dt_time:
    """Convert the current UTC instant to the user's local wall time (HH:MM only)."""
    local_dt = now_utc.astimezone(_resolve_user_timezone(user_timezone))
    return local_dt.time().replace(second=0, microsecond=0)


def _ensure_aware(dt: datetime) -> datetime:
    """Treat a naive datetime as UTC; return aware datetimes unchanged."""
    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt


# ==========================================
# Database query helpers
# ==========================================


def _should_skip_by_interval(db: Session, user_id: int) -> bool:
    """Return True while the user is still inside the cool-down period.

    Compares the newest SUCCESS delivery record against
    MIN_PUSH_INTERVAL_MINUTES, preventing repeat pushes in quick succession.
    """
    row = (
        db.query(DeliveryHistory.created_at)
        .filter(
            DeliveryHistory.user_id == user_id,
            DeliveryHistory.status == TaskStatus.SUCCESS,
        )
        .order_by(DeliveryHistory.created_at.desc())
        .first()
    )
    if row is None:
        return False
    last_time = _ensure_aware(row[0])
    elapsed = (utcnow() - last_time).total_seconds() / 60.0
    return elapsed < MIN_PUSH_INTERVAL_MINUTES


def _get_user_email_endpoints(db: Session, user_id: int) -> list[UserPushEndpoint]:
    """Return the user's active EMAIL endpoints ordered by priority_level ascending."""
    return (
        db.query(UserPushEndpoint)
        .filter(
            UserPushEndpoint.user_id == user_id,
            UserPushEndpoint.channel_type == "EMAIL",
            UserPushEndpoint.is_active == True,  # noqa: E712 - SQL expression
        )
        .order_by(UserPushEndpoint.priority_level.asc())
        .all()
    )


def _get_already_pushed_event_ids(db: Session, user_id: int) -> set[int]:
    """Return IDs of events already delivered successfully to this user."""
    rows = (
        db.query(DeliveryHistory.target_id)
        .filter(
            DeliveryHistory.user_id == user_id,
            DeliveryHistory.target_type == TargetType.EVENT,
            DeliveryHistory.status == TaskStatus.SUCCESS,
        )
        .all()
    )
    return {r[0] for r in rows}


def _load_event_platforms(db: Session, event_ids: list[int]) -> dict[int, list[dict]]:
    """Batch-load per-platform source data for the given events.

    Returns ``event_id -> [{source_name, headline, url, ranking, icon_url}, ...]``.
    Each list is ordered by ranking ascending (rank 1 first, NULL ranks last).
    """
    if not event_ids:
        return {}
    rows = (
        db.query(
            TrendingEvent.unified_event_id,
            TrendingEvent.current_headline,
            TrendingEvent.event_url,
            TrendingEvent.current_ranking,
            TrendingEvent.icon_url,
            InfoSource.source_name,
        )
        .join(InfoSource, TrendingEvent.source_id == InfoSource.id)
        .filter(TrendingEvent.unified_event_id.in_(event_ids))
        .order_by(
            TrendingEvent.unified_event_id,
            TrendingEvent.current_ranking.asc().nulls_last(),
        )
        .all()
    )
    result: dict[int, list[dict]] = {}
    for event_id, headline, url, ranking, icon_url, source_name in rows:
        result.setdefault(event_id, []).append({
            "source_name": source_name or "未知",
            "headline": headline or "",
            "url": url or "",
            "ranking": ranking,
            "icon_url": icon_url or "",
        })
    return result


def _load_event_tags(db: Session, event_ids: list[int]) -> dict[int, list[str]]:
    """Batch-load extracted topic keywords: ``event_id -> [tag, ...]`` (empty tags skipped)."""
    if not event_ids:
        return {}
    rows = (
        db.query(ExtractedTopic.target_id, ExtractedTopic.topic_keyword)
        .filter(
            ExtractedTopic.target_type == TargetType.EVENT,
            ExtractedTopic.target_id.in_(event_ids),
        )
        .all()
    )
    tags_map: dict[int, list[str]] = {}
    for eid, kw in rows:
        if kw:
            tags_map.setdefault(eid, []).append(kw)
    return tags_map


def _user_has_keywords(db: Session, user_id: int) -> bool:
    """Return True if the user has at least one keyword subscription."""
    return (
        db.query(UserTopicPreference.id)
        .filter(UserTopicPreference.user_id == user_id)
        .first()
    ) is not None


def _get_default_hot_events(
    db: Session,
    pushed_ids: set[int],
) -> list[_DefaultEventItem]:
    """Default mode: recent events with hot_score >= DEFAULT_MODE_HOT_THRESHOLD.

    Only events created within DEFAULT_MODE_HOURS are considered. Events
    already pushed are excluded, and at most MAX_EVENTS_PER_PUSH items are
    returned (hottest first), each wrapped in a _DefaultEventItem.
    """
    time_limit = utcnow() - timedelta(hours=DEFAULT_MODE_HOURS)
    events = (
        db.query(UnifiedEvent)
        .filter(
            UnifiedEvent.hot_score >= DEFAULT_MODE_HOT_THRESHOLD,
            UnifiedEvent.created_at >= time_limit,
        )
        .order_by(UnifiedEvent.hot_score.desc())
        # Over-fetch so there is room left after dropping already-pushed events.
        .limit(MAX_EVENTS_PER_PUSH * 2)
        .all()
    )
    candidates = [ev for ev in events if ev.id not in pushed_ids][:MAX_EVENTS_PER_PUSH]
    # Load tags only for the events that will actually be returned.
    tags_map = _load_event_tags(db, [ev.id for ev in candidates])
    return [
        _DefaultEventItem(
            event=ev,
            # De-duplicate while keeping first-seen order; show at most 6 tags.
            tags=list(dict.fromkeys(tags_map.get(ev.id, [])))[:6],
        )
        for ev in candidates
    ]


def _record_delivery(
    db: Session,
    user_id: int,
    event_ids: list[int],
    status: TaskStatus,
) -> None:
    """Insert one DeliveryHistory row per event with *status*, then commit."""
    db.add_all(
        DeliveryHistory(
            user_id=user_id,
            target_type=TargetType.EVENT,
            target_id=eid,
            status=status,
        )
        for eid in event_ids
    )
    db.commit()


# ==========================================
# Push preparation
# ==========================================


@dataclass
class _PendingPush:
    """Everything needed to send one digest email, prepared ahead of the async send."""

    user_id: int
    email_targets: list[str]  # in endpoint priority order
    subject: str
    html_body: str
    event_ids: list[int]  # events included in the digest, recorded after sending


def _prepare_user_push(db: Session, user: AppUser, schedule: UserDeliverySchedule) -> _PendingPush | None:
    """Build (but do not send) the digest for one user; DB reads only.

    Content selection:
      1. keywords configured and fresh matches found -> matched events;
      2. keywords configured but nothing fresh matched -> default hot events
         (hot_score >= DEFAULT_MODE_HOT_THRESHOLD);
      3. no keywords -> default hot events, same threshold.

    Returns None when the user is in cool-down, has no active email endpoint,
    or there is nothing new to push.
    """
    user_id = user.id

    if _should_skip_by_interval(db, user_id):
        logger.info(f"用户 {user_id} 仍在 {MIN_PUSH_INTERVAL_MINUTES} 分钟冷却期内,跳过")
        return None

    email_endpoints = _get_user_email_endpoints(db, user_id)
    if not email_endpoints:
        logger.info(f"用户 {user_id} 无可用邮件渠道,跳过")
        return None

    pushed_ids = _get_already_pushed_event_ids(db, user_id)

    # Decide: keywords with fresh matches -> matched mode; otherwise default mode.
    items: list = []
    is_default = False
    if _user_has_keywords(db, user_id):
        matched = recommend_events_for_user(
            db,
            user_id=user_id,
            min_hot=1,
            hours=72,
            limit=MAX_EVENTS_PER_PUSH * 2,
        )
        fresh_matched = [m for m in matched if m.event.id not in pushed_ids]
        if fresh_matched:
            items = fresh_matched[:MAX_EVENTS_PER_PUSH]
            logger.info(f"用户 {user_id} 关键词匹配,推送 {len(items)} 条事件")
        else:
            logger.info(f"用户 {user_id} 关键词无匹配结果,切换为默认热点模式")
            is_default = True
    else:
        logger.info(f"用户 {user_id} 未配置关键词,使用默认热点模式")
        is_default = True

    if is_default:
        items = _get_default_hot_events(db, pushed_ids)
        if not items:
            logger.info(f"用户 {user_id} 默认热点无可推送内容,跳过")
            return None

    # Batch-load platform data (source name, headline, URL, ranking).
    event_ids = [item.event.id for item in items]
    platforms_map = _load_event_platforms(db, event_ids)

    time_str = schedule.delivery_time.strftime("%H:%M")
    html_body = build_digest_html(
        items=items,
        delivery_time_str=time_str,
        platforms_map=platforms_map,
        is_default_push=is_default,
    )

    subject_suffix = "全网热点快报" if is_default else "个性化简报"
    return _PendingPush(
        user_id=user_id,
        email_targets=[ep.channel_account for ep in email_endpoints],
        subject=f"InsightRadar {subject_suffix} · {time_str}",
        html_body=html_body,
        event_ids=event_ids,
    )


# ==========================================
# Scheduler entry point
# ==========================================


async def _send_to_first_available(pending: _PendingPush) -> bool:
    """Try each email target in priority order; stop at the first success.

    Returns True if any target accepted the email. A target that raises is
    logged and skipped so the next one is still attempted.
    """
    for target_email in pending.email_targets:
        try:
            success = await send_html_email(
                to_email=target_email,
                subject=pending.subject,
                html_content=pending.html_body,
            )
        except Exception as e:
            logger.error(f"用户 {pending.user_id} 发送至 {target_email} 异常: {e}", exc_info=True)
            continue
        if success:
            logger.info(f"用户 {pending.user_id} 邮件发送成功 → {target_email}")
            return True
        logger.warning(f"用户 {pending.user_id} 渠道 {target_email} 发送失败,尝试下一个")
    return False


async def _process_schedule(db: Session, schedule: UserDeliverySchedule, now: datetime) -> None:
    """Handle one schedule: window check -> prepare -> send -> record outcome."""
    user = db.query(AppUser).filter(AppUser.id == schedule.user_id).first()
    if not user:
        return

    # Convert UTC to the user's local time, then test the delivery window.
    user_current = _user_local_time(now, user.timezone)
    if not _is_within_window(schedule.delivery_time, user_current):
        return

    pending = _prepare_user_push(db, user, schedule)
    if pending is None:
        return

    sent = await _send_to_first_available(pending)
    # NOTE(review): if this write fails after a successful send, nothing is
    # recorded and the user may be emailed again on the next run in the window.
    _record_delivery(
        db,
        user_id=pending.user_id,
        event_ids=pending.event_ids,
        status=TaskStatus.SUCCESS if sent else TaskStatus.ERROR,
    )


async def check_and_deliver() -> None:
    """Scheduled delivery entry point, called by APScheduler every minute.

    Steps:
      1. take the current UTC time;
      2. load all active delivery schedules;
      3. for each, check the window in the user's local timezone;
      4. prepare content (sync DB work), send email (async), record the result.

    Each schedule is isolated: any failure is logged, the session is rolled
    back so it is usable again, and processing continues with the next one.
    NOTE(review): the synchronous Session runs on the event-loop thread, so DB
    calls block the loop while they execute.
    """
    now = datetime.now(timezone.utc)
    current_utc = now.time().replace(second=0, microsecond=0)
    logger.debug(f"推送调度检查 @ UTC {current_utc.strftime('%H:%M')}")

    db: Session = SessionLocal()
    try:
        active_schedules = (
            db.query(UserDeliverySchedule)
            .filter(UserDeliverySchedule.is_active == True)  # noqa: E712 - SQL expression
            .all()
        )

        for schedule in active_schedules:
            # Captured before any work so the error log does not have to touch
            # ORM state that may be unusable after a failed transaction.
            user_id = None
            try:
                user_id = schedule.user_id
                await _process_schedule(db, schedule, now)
            except Exception as e:
                # Without a rollback the session stays in a failed transaction
                # and every remaining schedule in this run would fail too.
                db.rollback()
                logger.error(f"推送用户 {user_id} 时异常: {e}", exc_info=True)
    except Exception as e:
        logger.error(f"推送调度主循环异常: {e}", exc_info=True)
    finally:
        db.close()