Files
InsightRadar/backend/app/api/endpoints/events.py
T
stardrophere 37791c7976 缓存优化
2026-03-12 14:17:15 +08:00

258 lines
8.6 KiB
Python

# app/api/endpoints/events.py
import time
from datetime import timedelta
from typing import Dict, List, Tuple
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app.api.dependencies import get_db
from app.models.models import (
ExtractedTopic,
InfoSource,
RankingLog,
TargetType,
TrendingEvent,
UnifiedEvent,
utcnow,
)
from app.schemas.event_schema import (
PaginatedUnifiedEventResponse,
PlatformTrendResponse,
UnifiedEventResponse,
)
router = APIRouter()
# 排名轨迹最多返回多少个点,避免长时间跨度下数据过大
MAX_RANKING_POINTS = 30
# --- 轻量级接口缓存配置 ---
_UNIFIED_EVENTS_CACHE: Dict[str, Tuple[float, PaginatedUnifiedEventResponse]] = {}
CACHE_TTL_SECONDS = 60
# ---------------------------
@router.get("/unified", response_model=PaginatedUnifiedEventResponse)
def list_unified_events(
min_hot: int = Query(5, ge=0, description="热度阈值,仅返回 hot_score >= 此值的事件"),
hours: int = Query(48, ge=1, le=720, description="查询最近多少小时的数据"),
sort_by: str = Query("hot_score", description="排序字段: hot_score | created_at"),
skip: int = Query(0, ge=0, description="分页偏移量"),
limit: int = Query(10, ge=1, le=50, description="每页返回条数"),
db: Session = Depends(get_db),
):
"""分页返回统一事件,附带各平台热搜、排名轨迹和标签。"""
# --- 1. 尝试从缓存读取 ---
cache_key = f"{min_hot}:{hours}:{sort_by}:{skip}:{limit}"
current_time = time.time()
if cache_key in _UNIFIED_EVENTS_CACHE:
expire_time, cached_data = _UNIFIED_EVENTS_CACHE[cache_key]
if current_time < expire_time:
return cached_data
# -----------------------
time_limit = utcnow() - timedelta(hours=hours)
# 先查总数,用于前端判断是否还有更多
base_query = db.query(UnifiedEvent).filter(
UnifiedEvent.hot_score >= min_hot,
UnifiedEvent.created_at >= time_limit,
)
total = base_query.count()
# 分页查询
if sort_by == "created_at":
base_query = base_query.order_by(UnifiedEvent.created_at.desc())
else:
base_query = base_query.order_by(UnifiedEvent.hot_score.desc(), UnifiedEvent.created_at.desc())
events = base_query.offset(skip).limit(limit).all()
if not events:
return PaginatedUnifiedEventResponse(total=total, has_more=False, data=[])
event_ids = [ev.id for ev in events]
# 批量查询所有相关的热搜条目(避免 N+1)
trend_rows = (
db.query(TrendingEvent, InfoSource.source_name)
.join(InfoSource, TrendingEvent.source_id == InfoSource.id)
.filter(TrendingEvent.unified_event_id.in_(event_ids))
.all()
)
# 按 unified_event_id 分组
trend_map: dict[int, list[tuple]] = {}
trend_ids: list[int] = []
for trend, source_name in trend_rows:
trend_map.setdefault(trend.unified_event_id, []).append((trend, source_name))
trend_ids.append(trend.id)
# 批量查询排名日志(避免逐条查询)
ranking_map: dict[int, list[int]] = {}
if trend_ids:
ranking_rows = (
db.query(
RankingLog.event_id,
RankingLog.ranking_position,
)
.filter(
RankingLog.event_id.in_(trend_ids),
RankingLog.observed_at >= time_limit,
)
.order_by(RankingLog.event_id, RankingLog.observed_at.asc())
.all()
)
for event_id, position in ranking_rows:
ranking_map.setdefault(event_id, []).append(position)
# 批量查询标签
tag_map: dict[int, list[str]] = {}
tag_rows = (
db.query(ExtractedTopic.target_id, ExtractedTopic.topic_keyword)
.filter(
ExtractedTopic.target_type == TargetType.EVENT,
ExtractedTopic.target_id.in_(event_ids),
)
.order_by(ExtractedTopic.relevance_score.desc(), ExtractedTopic.created_at.desc())
.all()
)
for target_id, keyword in tag_rows:
tag_map.setdefault(target_id, []).append(keyword)
# 组装响应
results: list[UnifiedEventResponse] = []
for ev in events:
platform_list: list[PlatformTrendResponse] = []
trends_for_ev = trend_map.get(ev.id, [])
for trend, source_name in trends_for_ev:
history = ranking_map.get(trend.id, [])
# 截取尾部,只保留最近的点
if len(history) > MAX_RANKING_POINTS:
history = history[-MAX_RANKING_POINTS:]
platform_list.append(
PlatformTrendResponse(
source_id=trend.source_id,
platform_name=source_name,
headline=trend.current_headline,
url=trend.event_url,
current_ranking=trend.current_ranking,
ranking_history=history,
)
)
# 取所有关联热搜条目中最新的 updated_at,代表"最后一次在平台热搜榜看到"的时间
last_active_at = (
max(t.updated_at for t, _ in trends_for_ev)
if trends_for_ev
else ev.updated_at
)
results.append(
UnifiedEventResponse(
event_id=ev.id,
unified_title=ev.unified_title if ev.unified_title else "暂无标题",
summary=ev.ai_comprehensive_summary,
hot_score=ev.hot_score,
created_at=ev.created_at,
last_active_at=last_active_at,
platforms=platform_list,
tags=tag_map.get(ev.id, []),
)
)
has_more = (skip + limit) < total
response = PaginatedUnifiedEventResponse(total=total, has_more=has_more, data=results)
# --- 2. 写入缓存 ---
if len(_UNIFIED_EVENTS_CACHE) > 1000:
# 防止内存无限增长
_UNIFIED_EVENTS_CACHE.clear()
_UNIFIED_EVENTS_CACHE[cache_key] = (current_time + CACHE_TTL_SECONDS, response)
# ------------------
return response
@router.get("/unified/{event_id}", response_model=UnifiedEventResponse)
def get_unified_event(
event_id: int,
db: Session = Depends(get_db),
):
"""按 ID 查询单个统一事件,用于推荐跳转时的聚光灯展示。"""
ev = db.query(UnifiedEvent).filter(UnifiedEvent.id == event_id).first()
if not ev:
raise HTTPException(status_code=404, detail="Event not found")
time_limit = utcnow() - timedelta(hours=720)
trend_rows = (
db.query(TrendingEvent, InfoSource.source_name)
.join(InfoSource, TrendingEvent.source_id == InfoSource.id)
.filter(TrendingEvent.unified_event_id == event_id)
.all()
)
trend_ids = [t.id for t, _ in trend_rows]
ranking_map: dict[int, list[int]] = {}
if trend_ids:
ranking_rows = (
db.query(RankingLog.event_id, RankingLog.ranking_position)
.filter(
RankingLog.event_id.in_(trend_ids),
RankingLog.observed_at >= time_limit,
)
.order_by(RankingLog.event_id, RankingLog.observed_at.asc())
.all()
)
for eid, pos in ranking_rows:
ranking_map.setdefault(eid, []).append(pos)
tag_rows = (
db.query(ExtractedTopic.topic_keyword)
.filter(
ExtractedTopic.target_type == TargetType.EVENT,
ExtractedTopic.target_id == event_id,
)
.order_by(ExtractedTopic.relevance_score.desc())
.all()
)
tags = [row[0] for row in tag_rows]
platform_list: list[PlatformTrendResponse] = []
for trend, source_name in trend_rows:
history = ranking_map.get(trend.id, [])
if len(history) > MAX_RANKING_POINTS:
history = history[-MAX_RANKING_POINTS:]
platform_list.append(
PlatformTrendResponse(
source_id=trend.source_id,
platform_name=source_name,
headline=trend.current_headline,
url=trend.event_url,
current_ranking=trend.current_ranking,
ranking_history=history,
)
)
last_active_at = (
max(t.updated_at for t, _ in trend_rows)
if trend_rows
else ev.updated_at
)
return UnifiedEventResponse(
event_id=ev.id,
unified_title=ev.unified_title if ev.unified_title else "暂无标题",
summary=ev.ai_comprehensive_summary,
hot_score=ev.hot_score,
created_at=ev.created_at,
last_active_at=last_active_at,
platforms=platform_list,
tags=tags,
)