Files
InsightRadar/backend/app/services/summary_service.py
2026-03-13 23:48:49 +08:00

260 lines
9.0 KiB
Python

# app/services/summary_service.py
"""
Summary service: calls an LLM to generate a unified title, a comprehensive summary, and topic tags.
Scheduled task: batch-processes events whose hot score meets the threshold and that have not been summarized yet.
"""
import json
import os
from datetime import timedelta
from typing import Any
import numpy as np
from openai import AsyncOpenAI
from app.database import SessionLocal
from app.models.models import (
ExtractedTopic,
InfoSource,
TargetType,
TrendingEvent,
UnifiedEvent,
utcnow,
)
from app.prompts.summary_prompts import (
SUMMARY_SYSTEM_PROMPT,
SUMMARY_USER_PROMPT_TEMPLATE,
)
from app.services.fetcher_service import embedder_model
# Tunables, each overridable through an environment variable of the same name.

# Minimum hot score an event must reach before the summary task selects it.
HOT_SCORE_THRESHOLD = int(os.environ.get("HOT_SCORE_THRESHOLD", 3))

# Minimum hot score for refreshing topic tags; defaults to HOT_SCORE_THRESHOLD.
TOPIC_TAG_MIN_HOT_SCORE = int(
    os.environ.get("TOPIC_TAG_MIN_HOT_SCORE", HOT_SCORE_THRESHOLD)
)

# Similarity (dot product of embeddings) at or above which two keywords are
# merged into the same tag cluster.
TOPIC_SIMILARITY_THRESHOLD = float(os.environ.get("TOPIC_SIMILARITY_THRESHOLD", 0.82))

# Upper bound on the number of tags kept per event after de-duplication.
TOPIC_TAG_MAX_COUNT = int(os.environ.get("TOPIC_TAG_MAX_COUNT", 8))

# API key for the LLM endpoint; an empty string is used when it is not set.
AI_API_KEY = os.environ.get("AI_API_KEY", "")

# Shared async client used by call_llm_for_summary.
deepseek_client = AsyncOpenAI(
    base_url="https://api.deepseek.com",
    api_key=AI_API_KEY,
)
async def call_llm_for_summary(platform_data_text: str) -> dict:
    """Ask the LLM for a unified title, a comprehensive summary and topic candidates.

    Args:
        platform_data_text: Text inserted into SUMMARY_USER_PROMPT_TEMPLATE as
            the ``platform_data_text`` placeholder.

    Returns:
        The JSON object returned by the model, decoded into a dict.

    Raises:
        ValueError: If the response has no choices, the message content is
            missing or blank, the content is not valid JSON
            (``json.JSONDecodeError`` is a ``ValueError`` subclass), or the
            decoded JSON is not an object.
    """
    prompt = SUMMARY_USER_PROMPT_TEMPLATE.format(platform_data_text=platform_data_text)
    response = await deepseek_client.chat.completions.create(
        model="deepseek-chat",
        messages=[
            {"role": "system", "content": SUMMARY_SYSTEM_PROMPT},
            {"role": "user", "content": prompt},
        ],
        response_format={"type": "json_object"},
        temperature=1,
    )

    if not response.choices:
        raise ValueError("LLM response contained no choices")

    # The content can be None or empty; fail with a clear message instead of
    # letting json.loads raise an opaque TypeError.
    result_text = response.choices[0].message.content
    if not result_text or not result_text.strip():
        raise ValueError("LLM returned empty content")

    result = json.loads(result_text)
    # Callers index the result by key, so anything but a JSON object is unusable.
    if not isinstance(result, dict):
        raise ValueError(
            f"LLM returned a JSON {type(result).__name__}, expected an object"
        )
    return result
def _normalize_score(raw_score: Any) -> float | None:
try:
score = float(raw_score)
except (TypeError, ValueError):
return None
if score <= 1:
score *= 100
return max(0.0, min(100.0, score))
def parse_topic_keywords(llm_result: dict) -> list[dict[str, Any]]:
    """Extract topic keyword candidates from the LLM result.

    ``llm_result["topic_keywords"]`` may be a list/tuple whose items are plain
    strings or objects. A lone string or a lone object is treated as a
    one-element list; any other shape yields no candidates.

    For object items the keyword is read from the first non-empty key among
    ``keyword``, ``topic_keyword``, ``name`` and ``topic``. The score is the
    first of ``relevance_score`` / ``score`` that ``_normalize_score`` can
    convert.

    Args:
        llm_result: Decoded JSON object returned by the LLM.

    Returns:
        A list of ``{"keyword": str, "score": float | None}`` dicts in input
        order. Keywords are stripped, truncated to 100 characters and
        de-duplicated case-insensitively (first occurrence wins).
    """
    raw_topics = llm_result.get("topic_keywords") or []
    if isinstance(raw_topics, (str, dict)):
        # A single keyword or object: iterating it directly would yield
        # characters or dict keys instead of one candidate.
        raw_topics = [raw_topics]
    elif not isinstance(raw_topics, (list, tuple)):
        return []

    parsed: list[dict[str, Any]] = []
    seen: set[str] = set()
    for item in raw_topics:
        score = None
        if isinstance(item, str):
            keyword = item.strip()
        elif isinstance(item, dict):
            raw_keyword = (
                item.get("keyword")
                or item.get("topic_keyword")
                or item.get("name")
                or item.get("topic")
                or ""
            )
            keyword = str(raw_keyword).strip()
            # Do not chain the two keys with `or`: a legitimate score of 0 is
            # falsy and would be discarded.
            for score_key in ("relevance_score", "score"):
                score = _normalize_score(item.get(score_key))
                if score is not None:
                    break
        else:
            continue

        if not keyword:
            continue
        keyword = keyword[:100]
        normalized_key = keyword.casefold()
        if normalized_key in seen:
            continue
        seen.add(normalized_key)
        parsed.append({"keyword": keyword, "score": score})
    return parsed
def normalize_topic_keywords(topic_candidates: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Merge near-synonymous keywords by embedding similarity.

    Candidates are embedded with ``embedder_model`` and assigned greedily, in
    input order, to the most similar existing cluster when the dot product
    with that cluster's vector reaches TOPIC_SIMILARITY_THRESHOLD; otherwise
    they open a new cluster. A cluster keeps the highest score seen and the
    shortest keyword seen.

    Args:
        topic_candidates: Dicts with ``keyword`` and ``score`` entries.

    Returns:
        At most TOPIC_TAG_MAX_COUNT ``{"keyword", "score"}`` dicts. They are
        sorted by score descending when any cluster has a score (clusters
        without one rank last); otherwise they stay in creation order.
    """
    if not topic_candidates:
        return []

    # Embeddings are requested normalised, so the dot product below presumably
    # equals cosine similarity (depends on embedder_model honouring the flag).
    embeddings = embedder_model.encode(
        [candidate["keyword"] for candidate in topic_candidates],
        normalize_embeddings=True,
        show_progress_bar=False,
    )

    groups: list[dict[str, Any]] = []
    for candidate, embedding in zip(topic_candidates, embeddings):
        current = np.asarray(embedding, dtype=np.float32)

        # Find the closest existing group; the first one wins on ties.
        nearest_pos, nearest_sim = -1, -1.0
        for pos, group in enumerate(groups):
            similarity = float(np.dot(current, group["vector"]))
            if similarity > nearest_sim:
                nearest_pos, nearest_sim = pos, similarity

        is_match = nearest_pos >= 0 and nearest_sim >= TOPIC_SIMILARITY_THRESHOLD
        if not is_match:
            groups.append(
                {
                    "keyword": candidate["keyword"],
                    "score": candidate["score"],
                    "vector": current,
                    "count": 1,
                }
            )
            continue

        target = groups[nearest_pos]

        # Fold the new vector into the group's count-weighted direction.
        combined = target["vector"] * target["count"] + current
        length = float(np.linalg.norm(combined))
        if length > 0:
            target["vector"] = combined / length
        target["count"] += 1

        candidate_score = candidate["score"]
        if candidate_score is not None:
            if target["score"] is None or candidate_score > target["score"]:
                target["score"] = candidate_score

        # Prefer the shorter label as the canonical keyword so that long and
        # short synonyms do not split into separate tags.
        if len(candidate["keyword"]) < len(target["keyword"]):
            target["keyword"] = candidate["keyword"]

    has_any_score = any(group["score"] is not None for group in groups)
    if has_any_score:
        groups = sorted(
            groups,
            key=lambda g: -1.0 if g["score"] is None else g["score"],
            reverse=True,
        )

    return [
        {"keyword": group["keyword"], "score": group["score"]}
        for group in groups[:TOPIC_TAG_MAX_COUNT]
    ]
def replace_event_topics(db, event_id: int, normalized_topics: list[dict[str, Any]]) -> None:
    """Replace the stored topic tags of one event: delete the old rows, add the new ones.

    Nothing is committed here. The delete and the inserts only take effect
    together if the caller commits them in a single transaction.

    Args:
        db: Session used for the delete query and the inserts.
        event_id: Id of the event whose tags are replaced.
        normalized_topics: Dicts with ``keyword`` and ``score`` entries, one
            per tag to store.
    """
    stale_rows = db.query(ExtractedTopic).filter(
        ExtractedTopic.target_type == TargetType.EVENT,
        ExtractedTopic.target_id == event_id,
    )
    stale_rows.delete(synchronize_session=False)

    for topic in normalized_topics:
        fresh_row = ExtractedTopic(
            target_type=TargetType.EVENT,
            target_id=event_id,
            topic_keyword=topic["keyword"],
            relevance_score=topic["score"],
        )
        db.add(fresh_row)
def _load_platform_headlines(event_id: int) -> dict[str, set[str]]:
    """Return the distinct headlines of one unified event, grouped by source name."""
    grouped: dict[str, set[str]] = {}
    # Short-lived session: only held for this single read.
    with SessionLocal() as db:
        trends = (
            db.query(TrendingEvent, InfoSource.source_name)
            .join(InfoSource, TrendingEvent.source_id == InfoSource.id)
            .filter(TrendingEvent.unified_event_id == event_id)
            .all()
        )
        for trend_record, source_name in trends:
            headline = trend_record.current_headline
            # Skip missing/empty headlines (presumably the column is nullable):
            # a None here would make sorted()/join() raise later on.
            if headline:
                grouped.setdefault(source_name, set()).add(headline)
    return grouped


async def _summarize_event(event_id: int, hot_score: int) -> None:
    """Refresh the title, summary and topic tags of a single event."""
    platform_headlines = _load_platform_headlines(event_id)
    if not platform_headlines:
        return

    platform_data_text = "\n".join(
        f"[{platform}] {', '.join(sorted(headlines))}"
        for platform, headlines in platform_headlines.items()
    )

    # The LLM call may take tens of seconds; it must never run inside a
    # database transaction.
    llm_result = await call_llm_for_summary(platform_data_text)

    # Topic normalisation runs the embedding model, so it is also done before
    # the write transaction is opened.
    normalized_topics: list[dict[str, Any]] = []
    if hot_score >= TOPIC_TAG_MIN_HOT_SCORE:
        normalized_topics = normalize_topic_keywords(parse_topic_keywords(llm_result))

    # Open a fresh, very short transaction just for the write-back.
    with SessionLocal() as write_db:
        event = write_db.get(UnifiedEvent, event_id)
        if event is None:
            return

        unified_title = llm_result.get("unified_title")
        if unified_title:
            event.unified_title = unified_title
        comprehensive_summary = llm_result.get("ai_comprehensive_summary")
        if comprehensive_summary:
            event.ai_comprehensive_summary = comprehensive_summary

        if normalized_topics:
            replace_event_topics(write_db, event.id, normalized_topics)

        # NOTE(review): this records the hot score at write time, which may
        # include trends that arrived while the LLM call was in flight.
        # Read it before commit so logging below needs no extra refresh query.
        summarized_hot_score = event.hot_score
        event.last_summarized_trends_count = summarized_hot_score
        write_db.commit()

    print(
        f"Updated event {event_id} summary"
        f" (hot_score={summarized_hot_score})."
    )


async def generate_unified_summaries():
    """Scheduled task: refresh title, summary and tags of hot, not-yet-summarized events.

    Selects events created within the last 3 days whose hot score is at least
    HOT_SCORE_THRESHOLD and greater than ``last_summarized_trends_count``,
    then processes them one by one. A failure on one event is logged and does
    not stop the remaining events.
    """
    print(f"[{utcnow()}] Start unified summary generation task...")

    # Collect the ids to process first and release the session early instead
    # of holding it for the whole run.
    with SessionLocal() as db:
        recent_threshold = utcnow() - timedelta(days=3)
        events = db.query(UnifiedEvent).filter(
            UnifiedEvent.hot_score >= HOT_SCORE_THRESHOLD,
            UnifiedEvent.hot_score > UnifiedEvent.last_summarized_trends_count,
            UnifiedEvent.created_at >= recent_threshold,
        ).all()
        if not events:
            print("No events require summary update in this round.")
            return
        # Copy out plain values so nothing depends on the session afterwards.
        event_hot_scores = {e.id: e.hot_score for e in events}

    for event_id, hot_score in event_hot_scores.items():
        try:
            await _summarize_event(event_id, hot_score)
        except Exception as exc:
            # Task-level boundary: the read, the LLM call and the write-back
            # are all covered, so one bad event cannot abort the batch.
            print(f"Event {event_id} summary generation failed: {exc}")