mirror of
https://github.com/stardrophere/InsightRadar.git
synced 2026-06-06 01:57:51 +08:00
256 lines
8.6 KiB
Python
256 lines
8.6 KiB
Python
import json
|
||
import os
|
||
from datetime import timedelta
|
||
from typing import Any
|
||
|
||
import numpy as np
|
||
from openai import AsyncOpenAI
|
||
|
||
from app.database import SessionLocal
|
||
from app.models.models import (
|
||
ExtractedTopic,
|
||
InfoSource,
|
||
TargetType,
|
||
TrendingEvent,
|
||
UnifiedEvent,
|
||
utcnow,
|
||
)
|
||
from app.prompts.summary_prompts import (
|
||
SUMMARY_SYSTEM_PROMPT,
|
||
SUMMARY_USER_PROMPT_TEMPLATE,
|
||
)
|
||
from app.services.fetcher_service import embedder_model
|
||
|
||
# AI-assisted generation: deepseek-v3-2, 2026-03-20

# Tunables, each read from the environment once at import time; the literal
# default applies when the variable is unset.

# Minimum hot_score an event needs before it is selected for summarization.
HOT_SCORE_THRESHOLD = int(os.environ.get("HOT_SCORE_THRESHOLD", 3))
# Minimum hot_score before topic tags are (re)generated; defaults to the
# summarization threshold above.
TOPIC_TAG_MIN_HOT_SCORE = int(
    os.environ.get("TOPIC_TAG_MIN_HOT_SCORE", HOT_SCORE_THRESHOLD)
)
# Similarity (dot product of normalized embeddings) at or above which two
# topic keywords are merged into one tag.
TOPIC_SIMILARITY_THRESHOLD = float(
    os.environ.get("TOPIC_SIMILARITY_THRESHOLD", 0.82)
)
# Maximum number of topic tags kept per event.
TOPIC_TAG_MAX_COUNT = int(os.environ.get("TOPIC_TAG_MAX_COUNT", 8))
# API key for the LLM endpoint; empty string when unset.
AI_API_KEY = os.environ.get("AI_API_KEY", "")

# End of AI-generated section
|
||
|
||
|
||
# Shared async client for the DeepSeek chat API, accessed through the
# OpenAI-compatible SDK. The endpoint may be overridden with AI_BASE_URL
# (e.g. to route through a proxy or gateway); it defaults to the public
# DeepSeek API, exactly as before.
deepseek_client = AsyncOpenAI(
    api_key=AI_API_KEY,
    base_url=os.getenv("AI_BASE_URL", "https://api.deepseek.com"),
)
|
||
|
||
|
||
async def call_llm_for_summary(platform_data_text: str) -> dict:
    """Ask the LLM for a unified title, a combined summary and topic candidates.

    Args:
        platform_data_text: Per-platform headline lines, substituted into
            SUMMARY_USER_PROMPT_TEMPLATE as ``platform_data_text``.

    Returns:
        The model's reply decoded from JSON into a dict.

    Raises:
        ValueError: if the response has no choices, the message content is
            empty, the content is not valid JSON (``json.JSONDecodeError`` is a
            ``ValueError`` subclass), or the JSON is not an object.
    """
    prompt = SUMMARY_USER_PROMPT_TEMPLATE.format(platform_data_text=platform_data_text)

    response = await deepseek_client.chat.completions.create(
        model="deepseek-chat",
        messages=[
            {"role": "system", "content": SUMMARY_SYSTEM_PROMPT},
            {"role": "user", "content": prompt},
        ],
        # Request JSON output mode so the reply can be parsed directly.
        response_format={"type": "json_object"},
        temperature=1,
    )

    if not response.choices:
        raise ValueError("LLM response contained no choices")

    result_text = response.choices[0].message.content
    if not result_text:
        # json.loads(None) would raise an opaque TypeError; make the cause explicit.
        raise ValueError("LLM returned empty content")

    result = json.loads(result_text)
    if not isinstance(result, dict):
        # Callers use dict methods (.get, ``in``) on the result.
        raise ValueError(
            f"LLM returned a JSON {type(result).__name__}, expected an object"
        )
    return result
|
||
|
||
|
||
def _normalize_score(raw_score: Any) -> float | None:
|
||
try:
|
||
score = float(raw_score)
|
||
except (TypeError, ValueError):
|
||
return None
|
||
|
||
if score <= 1:
|
||
score *= 100
|
||
|
||
return max(0.0, min(100.0, score))
|
||
|
||
|
||
def parse_topic_keywords(llm_result: dict) -> list[dict[str, Any]]:
    """Parse the LLM's ``topic_keywords`` field into ``{"keyword", "score"}`` dicts.

    Each entry may be a bare string or an object. For objects, the keyword is
    taken from ``keyword``, ``topic_keyword``, ``name`` or ``topic`` (first
    truthy wins), and the score from ``relevance_score`` or, if that key is
    absent/None, ``score``. A lone string or object in place of a list is
    treated as a one-element list; any other non-list value yields ``[]``.

    Keywords are stripped, cut to 100 characters and de-duplicated
    case-insensitively (first occurrence wins). Object scores go through
    ``_normalize_score``; string entries get ``score=None``.

    Args:
        llm_result: Decoded JSON object returned by the LLM.

    Returns:
        Parsed candidates in their original order.
    """
    raw_topics = llm_result.get("topic_keywords") or []
    # Iterating a bare string would yield one "keyword" per character, and a
    # dict would yield its keys; wrap such values instead.
    if isinstance(raw_topics, (str, dict)):
        raw_topics = [raw_topics]
    elif not isinstance(raw_topics, (list, tuple)):
        return []

    parsed: list[dict[str, Any]] = []
    seen: set[str] = set()

    for item in raw_topics:
        score = None
        if isinstance(item, str):
            keyword = item.strip()
        elif isinstance(item, dict):
            raw_keyword = (
                item.get("keyword")
                or item.get("topic_keyword")
                or item.get("name")
                or item.get("topic")
                or ""
            )
            keyword = str(raw_keyword).strip()
            # Test for None explicitly: a legitimate score of 0 is falsy and
            # must not be discarded in favour of the fallback key.
            raw_score = item.get("relevance_score")
            if raw_score is None:
                raw_score = item.get("score")
            score = _normalize_score(raw_score)
        else:
            continue

        # Cap the length (presumably the topic_keyword column width — TODO
        # confirm) and drop whitespace the cut may have exposed.
        keyword = keyword[:100].rstrip()
        if not keyword:
            continue

        normalized_key = keyword.casefold()
        if normalized_key in seen:
            continue
        seen.add(normalized_key)
        parsed.append({"keyword": keyword, "score": score})

    return parsed
|
||
|
||
|
||
def normalize_topic_keywords(topic_candidates: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Merge near-synonymous topic keywords using embedding similarity.

    Candidates are processed in order. Each keyword is embedded with
    ``embedder_model`` (requested with ``normalize_embeddings=True``, so the dot
    product acts as cosine similarity) and compared with the running centroid
    of every existing cluster. If the most similar cluster reaches
    TOPIC_SIMILARITY_THRESHOLD the candidate joins it; otherwise it starts a
    new cluster.

    A cluster keeps the first of its shortest keywords and the highest non-None
    score. If any cluster has a score, clusters are ordered by score
    descending (None ranked last, ties keep insertion order). At most
    TOPIC_TAG_MAX_COUNT ``{"keyword", "score"}`` dicts are returned.
    """
    if not topic_candidates:
        return []

    embeddings = embedder_model.encode(
        [candidate["keyword"] for candidate in topic_candidates],
        normalize_embeddings=True,
        show_progress_bar=False,
    )

    groups: list[dict[str, Any]] = []
    for candidate, raw_embedding in zip(topic_candidates, embeddings):
        embedding = np.asarray(raw_embedding, dtype=np.float32)

        # Linear scan for the most similar centroid; strict ">" means the
        # earliest cluster wins a tie.
        nearest = -1
        nearest_sim = -1.0
        for position, group in enumerate(groups):
            similarity = float(np.dot(embedding, group["vector"]))
            if similarity > nearest_sim:
                nearest, nearest_sim = position, similarity

        joins_existing = nearest >= 0 and nearest_sim >= TOPIC_SIMILARITY_THRESHOLD
        if not joins_existing:
            groups.append(
                {
                    "keyword": candidate["keyword"],
                    "score": candidate["score"],
                    "vector": embedding,
                    "count": 1,
                }
            )
            continue

        group = groups[nearest]

        # Count-weighted running centroid, re-normalized to unit length.
        combined = group["vector"] * group["count"] + embedding
        magnitude = float(np.linalg.norm(combined))
        if magnitude > 0:
            group["vector"] = combined / magnitude
        group["count"] += 1

        candidate_score = candidate["score"]
        if candidate_score is not None and (
            group["score"] is None or candidate_score > group["score"]
        ):
            group["score"] = candidate_score

        # Prefer the shorter label as the canonical keyword so long/short
        # synonyms do not split into separate tags.
        if len(candidate["keyword"]) < len(group["keyword"]):
            group["keyword"] = candidate["keyword"]

    if any(group["score"] is not None for group in groups):
        groups = sorted(
            groups,
            key=lambda g: -1.0 if g["score"] is None else g["score"],
            reverse=True,
        )

    return [
        {"keyword": group["keyword"], "score": group["score"]}
        for group in groups[:TOPIC_TAG_MAX_COUNT]
    ]
|
||
|
||
|
||
def replace_event_topics(db, event_id: int, normalized_topics: list[dict[str, Any]]) -> None:
    """Swap the EVENT-scoped ExtractedTopic rows of ``event_id`` for new ones.

    Bulk-deletes the existing rows (``synchronize_session=False``, so objects
    already loaded in ``db`` are not updated) and stages one new row per entry
    of ``normalized_topics``. Nothing is committed here: the replacement is
    only atomic and visible once the caller commits ``db``.

    Args:
        db: Open ORM session.
        event_id: Id of the unified event whose tags are replaced.
        normalized_topics: ``{"keyword", "score"}`` dicts to store.
    """
    stale_rows = db.query(ExtractedTopic).filter(
        ExtractedTopic.target_type == TargetType.EVENT,
        ExtractedTopic.target_id == event_id,
    )
    stale_rows.delete(synchronize_session=False)

    for topic in normalized_topics:
        fresh_row = ExtractedTopic(
            target_type=TargetType.EVENT,
            target_id=event_id,
            topic_keyword=topic["keyword"],
            relevance_score=topic["score"],
        )
        db.add(fresh_row)
|
||
|
||
|
||
def _load_platform_data_text(event_id: int) -> str:
    """Build the per-platform headline text for one event's LLM prompt.

    Produces one line per source, ``[source_name] headline1, headline2, ...``,
    with that source's distinct, non-empty headlines sorted. Returns ``""``
    when the event has no usable trends.
    """
    platform_dict: dict[str, set[str]] = {}
    with SessionLocal() as db:
        trends = (
            db.query(TrendingEvent, InfoSource.source_name)
            .join(InfoSource, TrendingEvent.source_id == InfoSource.id)
            .filter(TrendingEvent.unified_event_id == event_id)
            .all()
        )
        for trend_record, source_name in trends:
            headline = trend_record.current_headline
            # A missing headline would make ", ".join(...) raise TypeError.
            if not headline:
                continue
            platform_dict.setdefault(source_name, set()).add(headline)

    return "\n".join(
        f"[{platform}] {', '.join(sorted(headlines))}"
        for platform, headlines in platform_dict.items()
    )


async def generate_unified_summaries():
    """Scheduled job: refresh title, summary and topic tags of hot events.

    Selects events created in the last 3 days whose ``hot_score`` is at least
    HOT_SCORE_THRESHOLD and greater than ``last_summarized_trends_count``. For
    each one, the linked trend headlines are sent to the LLM and the result is
    written back in a short, separate transaction. A failure is printed and
    skips only that event; the remaining events are still processed.
    """
    print(f"[{utcnow()}] Start unified summary generation task...")

    with SessionLocal() as db:
        recent_threshold = utcnow() - timedelta(days=3)
        # NOTE(review): if last_summarized_trends_count can be NULL, the ">"
        # comparison is NULL in SQL and the event is never selected — confirm
        # the column has a non-NULL default.
        events = (
            db.query(UnifiedEvent)
            .filter(
                UnifiedEvent.hot_score >= HOT_SCORE_THRESHOLD,
                UnifiedEvent.hot_score > UnifiedEvent.last_summarized_trends_count,
                UnifiedEvent.created_at >= recent_threshold,
            )
            .all()
        )

        if not events:
            print("No events require summary update in this round.")
            return

        # Copy plain values out so no ORM instance is used after this session closes.
        event_hot_scores = {e.id: e.hot_score for e in events}

    for event_id, snapshot_hot_score in event_hot_scores.items():
        # Everything per-event lives inside the try so one bad event (DB error,
        # malformed LLM reply, ...) cannot abort the rest of the batch.
        try:
            platform_data_text = _load_platform_data_text(event_id)
            if not platform_data_text:
                continue

            # The LLM call can take tens of seconds; it must never run inside a
            # database transaction.
            llm_result = await call_llm_for_summary(platform_data_text)

            # Tag parsing + embedding-based dedup is also done up front so the
            # write transaction below stays short.
            normalized_topics: list[dict[str, Any]] = []
            if snapshot_hot_score >= TOPIC_TAG_MIN_HOT_SCORE:
                normalized_topics = normalize_topic_keywords(
                    parse_topic_keywords(llm_result)
                )

            # Open a fresh, short-lived session just for the write-back.
            with SessionLocal() as write_db:
                # NOTE: Query.get is legacy in SQLAlchemy 2.0;
                # write_db.get(UnifiedEvent, event_id) is the modern form.
                event = write_db.query(UnifiedEvent).get(event_id)
                if not event:
                    continue

                if llm_result.get("unified_title"):
                    event.unified_title = llm_result["unified_title"]
                if llm_result.get("ai_comprehensive_summary"):
                    event.ai_comprehensive_summary = llm_result["ai_comprehensive_summary"]

                if normalized_topics:
                    replace_event_topics(write_db, event_id, normalized_topics)

                # Record progress with the hot_score observed before the trends
                # were loaded, not the value re-read now. Anything that raised
                # hot_score during the slow LLM call was not part of this
                # summary and must still trigger a refresh next round.
                current_hot_score = event.hot_score
                event.last_summarized_trends_count = min(snapshot_hot_score, current_hot_score)
                write_db.commit()

            print(
                f"Updated event {event_id} summary"
                f" (hot_score={current_hot_score})."
            )

        except Exception as exc:
            # Deliberate best-effort: report and move on to the next event.
            print(f"Event {event_id} summary generation failed: {exc}")
|