mirror of
https://github.com/stardrophere/InsightRadar.git
synced 2026-06-05 23:32:49 +08:00
442 lines
15 KiB
Python
442 lines
15 KiB
Python
import logging
|
||
import os
|
||
from logging.handlers import TimedRotatingFileHandler
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime, time as dt_time, timedelta, timezone, tzinfo
|
||
from pathlib import Path
|
||
from typing import Any
|
||
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
|
||
|
||
from sqlalchemy.orm import Session
|
||
|
||
from app.database import SessionLocal
|
||
from app.models.models import (
|
||
AppUser,
|
||
DeliveryHistory,
|
||
ExtractedTopic,
|
||
InfoSource,
|
||
TargetType,
|
||
TaskStatus,
|
||
TrendingEvent,
|
||
UnifiedEvent,
|
||
UserDeliverySchedule,
|
||
UserPushEndpoint,
|
||
UserTopicPreference,
|
||
utcnow,
|
||
)
|
||
from app.prompts.digest_email_template import build_digest_html
|
||
from app.services.matching_service import recommend_events_for_user
|
||
from app.utils.email_utils import send_html_email
|
||
|
||
# Dedicated logger for the delivery scheduler; it writes to its own file.
logger = logging.getLogger("delivery_service")

# The log directory is "logs" two levels above the directory that contains
# this file; it is created at import time if it does not exist yet.
_delivery_log_dir = Path(__file__).resolve().parents[2].joinpath("logs")
_delivery_log_dir.mkdir(parents=True, exist_ok=True)
_delivery_log_file = _delivery_log_dir.joinpath("delivery_check.log")

# Configure the logger only when it has no handler yet, so that importing the
# module a second time does not attach a duplicate handler.
if not logger.handlers:
    # Rotate at midnight, once a day, keeping 14 backup files.
    _file_handler = TimedRotatingFileHandler(
        filename=os.fspath(_delivery_log_file),
        when="midnight",
        interval=1,
        backupCount=14,
        encoding="utf-8",
    )
    _line_format = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
    _file_handler.setFormatter(_line_format)
    logger.addHandler(_file_handler)
    logger.setLevel(logging.INFO)
    # Keep these records out of ancestor loggers' handlers.
    logger.propagate = False
|
||
|
||
# AI辅助生成:deepseek-v3-2,2026年3月20日
|
||
|
||
# Delivery window: maximum tolerated gap (minutes) between the scheduled time
# and the moment the check actually runs.
DELIVERY_WINDOW_MINUTES = int(os.environ.get("DELIVERY_WINDOW_MINUTES", "2"))

# Minimum interval (minutes) between two pushes to the same user.
MIN_PUSH_INTERVAL_MINUTES = int(os.environ.get("MIN_PUSH_INTERVAL_MINUTES", "30"))

# Maximum number of events carried by a single push.
MAX_EVENTS_PER_PUSH = int(os.environ.get("MAX_EVENTS_PER_PUSH", "12"))

# Hot-score threshold for default mode (used when the user has no keywords or
# no keyword matched).
DEFAULT_MODE_HOT_THRESHOLD = int(os.environ.get("DEFAULT_MODE_HOT_THRESHOLD", "3"))

# Look-back window (hours) for the default-mode query.
DEFAULT_MODE_HOURS = int(os.environ.get("DEFAULT_MODE_HOURS", "24"))

# Time zone used when a user's configured time zone is missing or invalid.
DEFAULT_FALLBACK_TIMEZONE = os.environ.get("DEFAULT_FALLBACK_TIMEZONE", "Asia/Shanghai")
|
||
|
||
@dataclass
class _DefaultEventItem:
    """
    Container for a trending event pushed in default (non-personalised) mode.

    Used when the user has no keyword subscription, or when no keyword
    matched. Per the original author's note, it exposes the same attributes
    as ``MatchedEventResult`` so both kinds of item can be handed to the
    digest template uniformly.
    """

    # The wrapped event.
    event: UnifiedEvent
    # Matching-related attributes; they keep their empty defaults here.
    match_score: float = 0.0
    exact_hits: list[str] = field(default_factory=list)
    semantic_hits: list[dict[str, Any]] = field(default_factory=list)
    # Topic keywords attached to the event.
    tags: list[str] = field(default_factory=list)
    # Marks the item as produced by the default hot-list path.
    is_default: bool = True
|
||
|
||
def _time_to_minutes(t: dt_time) -> int:
    """Return the whole minutes elapsed since midnight for ``t`` (seconds are ignored)."""
    hours, minutes = t.hour, t.minute
    return 60 * hours + minutes
|
||
|
||
|
||
def _is_within_window(schedule_time: dt_time, current_time: dt_time, window: int = DELIVERY_WINDOW_MINUTES) -> bool:
    """
    Tell whether ``schedule_time`` lies within ``window`` minutes of ``current_time``.

    Both times are compared at minute resolution on a 24-hour circle, so
    23:59 and 00:01 are two minutes apart.
    """
    minutes_per_day = 1440
    gap = abs(_time_to_minutes(schedule_time) - _time_to_minutes(current_time))
    # Going the other way round the clock may be the shorter distance.
    wrapped_gap = minutes_per_day - gap
    if wrapped_gap < gap:
        gap = wrapped_gap
    return gap <= window
|
||
|
||
|
||
def _resolve_user_timezone(user_timezone: str | None) -> tzinfo:
|
||
"""解析用户时区,异常时回退到默认时区。"""
|
||
tz_name = (user_timezone or "").strip() or DEFAULT_FALLBACK_TIMEZONE
|
||
try:
|
||
return ZoneInfo(tz_name)
|
||
except ZoneInfoNotFoundError:
|
||
logger.warning(
|
||
"用户时区无效,已回退默认时区。timezone=%s fallback=%s",
|
||
tz_name, DEFAULT_FALLBACK_TIMEZONE,
|
||
)
|
||
try:
|
||
return ZoneInfo(DEFAULT_FALLBACK_TIMEZONE)
|
||
except ZoneInfoNotFoundError:
|
||
logger.warning("系统缺少时区数据库,最终回退为 UTC。建议安装 tzdata 包。")
|
||
return timezone.utc
|
||
|
||
|
||
def _user_local_time(now_utc: datetime, user_timezone: str | None) -> dt_time:
    """
    Convert the current instant to the user's local wall-clock time.

    The result is truncated to minute precision (seconds and microseconds
    are zeroed) and carries no tzinfo.
    """
    user_tz = _resolve_user_timezone(user_timezone)
    localized = now_utc.astimezone(user_tz)
    return localized.replace(second=0, microsecond=0).time()
|
||
|
||
|
||
def _ensure_aware(dt: datetime) -> datetime:
    """Return ``dt`` unchanged if it has a tzinfo; otherwise label the naive value as UTC."""
    return dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)
|
||
|
||
# AI辅助生成结束
|
||
|
||
|
||
# 数据库查询辅助
|
||
def _should_skip_by_interval(db: Session, user_id: int) -> bool:
    """
    Tell whether the user is still inside the push cool-down period.

    Looks up the user's most recent successful delivery; returns True when it
    happened less than ``MIN_PUSH_INTERVAL_MINUTES`` minutes ago and False when
    there is no successful delivery at all. A stored timestamp without tzinfo
    is treated as UTC.
    """
    was_successful = DeliveryHistory.status == TaskStatus.SUCCESS
    latest = (
        db.query(DeliveryHistory.created_at)
        .filter(DeliveryHistory.user_id == user_id, was_successful)
        .order_by(DeliveryHistory.created_at.desc())
        .first()
    )
    if latest is None:
        return False

    last_sent_at = _ensure_aware(latest[0])
    minutes_since = (utcnow() - last_sent_at).total_seconds() / 60.0
    return minutes_since < MIN_PUSH_INTERVAL_MINUTES
|
||
|
||
|
||
def _get_user_email_endpoints(db: Session, user_id: int) -> list[UserPushEndpoint]:
    """Return the user's active ``"EMAIL"`` push endpoints, ordered by ascending ``priority_level``."""
    active_email_endpoints = db.query(UserPushEndpoint).filter(
        UserPushEndpoint.user_id == user_id,
        UserPushEndpoint.channel_type == "EMAIL",
        UserPushEndpoint.is_active == True,  # noqa: E712 - SQL expression, not a Python comparison
    )
    by_priority = active_email_endpoints.order_by(UserPushEndpoint.priority_level.asc())
    return by_priority.all()
|
||
|
||
|
||
def _get_already_pushed_event_ids(db: Session, user_id: int) -> set[int]:
    """Return the ids of all events already delivered successfully to the user, so they are not sent again."""
    criteria = (
        DeliveryHistory.user_id == user_id,
        DeliveryHistory.target_type == TargetType.EVENT,
        DeliveryHistory.status == TaskStatus.SUCCESS,
    )
    delivered = db.query(DeliveryHistory.target_id).filter(*criteria).all()
    return {row[0] for row in delivered}
|
||
|
||
|
||
def _load_event_platforms(db: Session, event_ids: list[int]) -> dict[int, list[dict]]:
    """
    Load, in one query, the per-platform entries behind each unified event.

    Returns a mapping ``event_id -> [{source_name, headline, url, ranking,
    icon_url}, ...]``. Entries of one event are ordered by ascending ranking
    with missing rankings last. Missing text values become ``""`` (``"未知"``
    for the source name). Events without any entry are absent from the mapping.
    """
    if not event_ids:
        return {}

    columns = (
        TrendingEvent.unified_event_id,
        TrendingEvent.current_headline,
        TrendingEvent.event_url,
        TrendingEvent.current_ranking,
        TrendingEvent.icon_url,
        InfoSource.source_name,
    )
    query = (
        db.query(*columns)
        .join(InfoSource, TrendingEvent.source_id == InfoSource.id)
        .filter(TrendingEvent.unified_event_id.in_(event_ids))
        .order_by(
            TrendingEvent.unified_event_id,
            TrendingEvent.current_ranking.asc().nulls_last(),
        )
    )

    platforms_by_event: dict[int, list[dict]] = {}
    for row in query.all():
        event_id, headline, url, ranking, icon_url, source_name = row
        entry = {
            "source_name": source_name or "未知",
            "headline": headline or "",
            "url": url or "",
            "ranking": ranking,
            "icon_url": icon_url or "",
        }
        if event_id in platforms_by_event:
            platforms_by_event[event_id].append(entry)
        else:
            platforms_by_event[event_id] = [entry]
    return platforms_by_event
|
||
|
||
|
||
def _load_event_tags(db: Session, event_ids: list[int]) -> dict[int, list[str]]:
    """
    Load the topic keywords of several events in one query.

    Returns ``event_id -> [keyword, ...]``. Empty keywords are skipped and
    events without any keyword are absent from the mapping.
    """
    if not event_ids:
        return {}

    query = db.query(ExtractedTopic.target_id, ExtractedTopic.topic_keyword).filter(
        ExtractedTopic.target_type == TargetType.EVENT,
        ExtractedTopic.target_id.in_(event_ids),
    )

    keywords_by_event: dict[int, list[str]] = {}
    for event_id, keyword in query.all():
        if not keyword:
            continue
        if event_id in keywords_by_event:
            keywords_by_event[event_id].append(keyword)
        else:
            keywords_by_event[event_id] = [keyword]
    return keywords_by_event
|
||
|
||
|
||
def _user_has_keywords(db: Session, user_id: int) -> bool:
    """Return True when at least one ``UserTopicPreference`` row exists for the user."""
    any_preference = (
        db.query(UserTopicPreference.id)
        .filter(UserTopicPreference.user_id == user_id)
        .first()
    )
    return any_preference is not None
|
||
|
||
|
||
def _get_default_hot_events(
    db: Session,
    pushed_ids: set[int],
) -> list[_DefaultEventItem]:
    """
    Default mode: pick the hottest recent events the user has not received yet.

    Candidates are events created within the last ``DEFAULT_MODE_HOURS`` hours
    whose ``hot_score`` is at least ``DEFAULT_MODE_HOT_THRESHOLD``, hottest
    first. Events whose id is in ``pushed_ids`` are skipped.

    Args:
        db: Open database session.
        pushed_ids: Ids of events already delivered to the user.

    Returns:
        At most ``MAX_EVENTS_PER_PUSH`` items, each carrying up to six
        de-duplicated tags; empty when nothing qualifies.
    """
    time_limit = utcnow() - timedelta(hours=DEFAULT_MODE_HOURS)
    ranked = (
        db.query(UnifiedEvent)
        .filter(
            UnifiedEvent.hot_score >= DEFAULT_MODE_HOT_THRESHOLD,
            UnifiedEvent.created_at >= time_limit,
        )
        # The id acts as a tie-breaker so paging through equal scores is stable.
        .order_by(UnifiedEvent.hot_score.desc(), UnifiedEvent.id.desc())
    )

    # Read the ranking page by page instead of looking at the first page only:
    # otherwise a user who has already received every event on the first page
    # would get nothing even though more qualifying events exist.
    page_size = MAX_EVENTS_PER_PUSH * 2
    fresh: list[UnifiedEvent] = []
    offset = 0
    while len(fresh) < MAX_EVENTS_PER_PUSH:
        page = ranked.offset(offset).limit(page_size).all()
        fresh.extend(ev for ev in page if ev.id not in pushed_ids)
        if len(page) < page_size:
            break  # ranking exhausted
        offset += page_size
    fresh = fresh[:MAX_EVENTS_PER_PUSH]

    # Load tags only for the events that will actually be sent.
    tags_map = _load_event_tags(db, [ev.id for ev in fresh])
    return [
        _DefaultEventItem(
            event=ev,
            # dict.fromkeys removes duplicates while keeping first-seen order.
            tags=list(dict.fromkeys(tags_map.get(ev.id, [])))[:6],
        )
        for ev in fresh
    ]
|
||
|
||
|
||
def _record_delivery(
    db: Session,
    user_id: int,
    event_ids: list[int],
    status: TaskStatus,
) -> None:
    """
    Write one ``DeliveryHistory`` row per event and commit them together.

    Args:
        db: Open database session; it is committed by this call.
        user_id: Recipient of the push.
        event_ids: Ids of the events contained in the push.
        status: Outcome recorded on every row.

    Raises:
        Exception: Whatever the commit raised, re-raised after the session
            has been rolled back.
    """
    db.add_all(
        DeliveryHistory(
            user_id=user_id,
            target_type=TargetType.EVENT,
            target_id=eid,
            status=status,
        )
        for eid in event_ids
    )
    try:
        db.commit()
    except Exception:
        # A failed commit leaves the session unusable until it is rolled back;
        # the caller shares this session across users, so restore it first.
        db.rollback()
        raise
|
||
|
||
|
||
# AI辅助生成:deepseek-v3-2,2026年3月20日
|
||
|
||
# 推送准备
|
||
@dataclass
class _PendingPush:
    """Holds everything needed to email one user, so sending can happen later in async code."""

    # Recipient user.
    user_id: int
    # Candidate addresses, in the order they should be tried.
    email_targets: list[str]
    # Mail subject line.
    subject: str
    # Rendered HTML body of the digest.
    html_body: str
    # Ids of the events included in the digest.
    event_ids: list[int]
|
||
|
||
# AI生成结束
|
||
|
||
def _prepare_user_push(db: Session, user: AppUser, schedule: UserDeliverySchedule) -> _PendingPush | None:
    """
    Synchronously assemble one user's push (database work and HTML rendering); no mail is sent here.

    Content selection, in priority order:
      1. keywords configured and fresh matches exist -> the matched events
      2. keywords configured but nothing fresh matches -> default hot events
      3. no keywords configured -> default hot events

    Returns None when the user is still in the cool-down period, has no active
    e-mail endpoint, or there is nothing to send.
    """
    user_id = user.id

    if _should_skip_by_interval(db, user_id):
        logger.info(f"用户 {user_id} 仍在 {MIN_PUSH_INTERVAL_MINUTES} 分钟冷却期内,跳过")
        return None

    email_endpoints = _get_user_email_endpoints(db, user_id)
    if not email_endpoints:
        logger.info(f"用户 {user_id} 无可用邮件渠道,跳过")
        return None

    pushed_ids = _get_already_pushed_event_ids(db, user_id)

    # Start from the default path and leave it only on a fresh keyword match.
    chosen: list = []
    use_default = True

    if not _user_has_keywords(db, user_id):
        logger.info(f"用户 {user_id} 未配置关键词,使用默认热点模式")
    else:
        candidates = recommend_events_for_user(
            db,
            user_id=user_id,
            min_hot=1,
            hours=72,
            limit=MAX_EVENTS_PER_PUSH * 2,
        )
        unseen = [c for c in candidates if c.event.id not in pushed_ids]
        if unseen:
            chosen = unseen[:MAX_EVENTS_PER_PUSH]
            use_default = False
            logger.info(f"用户 {user_id} 关键词匹配,推送 {len(chosen)} 条事件")
        else:
            logger.info(f"用户 {user_id} 关键词无匹配结果,切换为默认热点模式")

    if use_default:
        chosen = _get_default_hot_events(db, pushed_ids)
        if not chosen:
            logger.info(f"用户 {user_id} 默认热点无可推送内容,跳过")
            return None

    event_ids = [entry.event.id for entry in chosen]
    platforms_map = _load_event_platforms(db, event_ids)

    # The subject and the body both show the scheduled time, not the send time.
    time_str = schedule.delivery_time.strftime("%H:%M")
    html_body = build_digest_html(
        items=chosen,
        delivery_time_str=time_str,
        platforms_map=platforms_map,
        is_default_push=use_default,
    )

    if use_default:
        subject_suffix = "全网热点快报"
    else:
        subject_suffix = "个性化简报"

    return _PendingPush(
        user_id=user_id,
        email_targets=[endpoint.channel_account for endpoint in email_endpoints],
        subject=f"聚势智见 {subject_suffix} · {time_str}",
        html_body=html_body,
        event_ids=event_ids,
    )
|
||
|
||
|
||
async def _send_to_first_reachable(pending: _PendingPush) -> bool:
    """Try the addresses in order; return True as soon as one send succeeds, False if none does."""
    for target_email in pending.email_targets:
        try:
            success = await send_html_email(
                to_email=target_email,
                subject=pending.subject,
                html_content=pending.html_body,
            )
        except Exception as e:
            # A failing address must not prevent trying the next one.
            logger.error(f"用户 {pending.user_id} 发送至 {target_email} 异常: {e}")
            continue
        if success:
            logger.info(f"用户 {pending.user_id} 邮件发送成功 → {target_email}")
            return True
        logger.warning(f"用户 {pending.user_id} 渠道 {target_email} 发送失败,尝试下一个")
    return False


async def check_and_deliver() -> None:
    """
    Entry point of the scheduled push; per the original note, APScheduler calls it every minute.

    Steps:
      1. Take the current UTC time.
      2. Load every active delivery schedule.
      3. For each schedule, check in the user's local time zone whether the
         scheduled time falls inside the delivery window.
      4. Prepare the push synchronously, send the mail asynchronously, and
         record the outcome.

    A failure while handling one schedule is logged and the session rolled
    back, so the remaining schedules are still processed. The function itself
    does not raise for ordinary exceptions.

    NOTE(review): the database calls are synchronous and run inside this
    coroutine; confirm that blocking the event loop for their duration is
    acceptable.
    """
    now = datetime.now(timezone.utc)
    current_utc = now.time().replace(second=0, microsecond=0)
    logger.debug(f"推送调度检查 @ UTC {current_utc.strftime('%H:%M')}")

    db: Session = SessionLocal()
    try:
        active_schedules = (
            db.query(UserDeliverySchedule)
            .filter(UserDeliverySchedule.is_active == True)  # noqa: E712 - SQL expression
            .all()
        )

        for schedule in active_schedules:
            # Kept outside the ORM object so the error log below never has to
            # touch a possibly expired attribute.
            user_id = None
            try:
                user_id = schedule.user_id
                user = db.query(AppUser).filter(AppUser.id == user_id).first()
                if not user:
                    continue

                # The window check sits inside the try block: a malformed time
                # zone or delivery time on one row must not stop the others.
                user_current = _user_local_time(now, user.timezone)
                if not _is_within_window(schedule.delivery_time, user_current):
                    continue

                pending = _prepare_user_push(db, user, schedule)
                if pending is None:
                    continue

                sent = await _send_to_first_reachable(pending)
                _record_delivery(
                    db,
                    user_id=pending.user_id,
                    event_ids=pending.event_ids,
                    status=TaskStatus.SUCCESS if sent else TaskStatus.ERROR,
                )
            except Exception as e:
                logger.error(f"推送用户 {user_id} 时异常: {e}", exc_info=True)
                # The session is shared across users; clear any failed
                # transaction state before moving on.
                db.rollback()

    except Exception as e:
        logger.error(f"推送调度主循环异常: {e}", exc_info=True)
    finally:
        db.close()
|