# app/services/summary_service.py
"""Refresh unified titles, summaries and topic tags for hot unified events via an LLM."""
import json
import math
import os
from datetime import timedelta
from typing import Any

import numpy as np
from openai import AsyncOpenAI

from app.database import SessionLocal
from app.models.models import (
    ExtractedTopic,
    InfoSource,
    TargetType,
    TrendingEvent,
    UnifiedEvent,
    utcnow,
)
from app.prompts.summary_prompts import (
    SUMMARY_SYSTEM_PROMPT,
    SUMMARY_USER_PROMPT_TEMPLATE,
)
from app.services.fetcher_service import embedder_model

# Minimum hot_score an event needs to be selected for summarization.
HOT_SCORE_THRESHOLD = int(os.getenv("HOT_SCORE_THRESHOLD", 3))
# Minimum hot_score (as seen at selection time) for topic tags to be replaced.
TOPIC_TAG_MIN_HOT_SCORE = int(os.getenv("TOPIC_TAG_MIN_HOT_SCORE", HOT_SCORE_THRESHOLD))
# Dot-product similarity at or above which a candidate tag joins an existing cluster.
TOPIC_SIMILARITY_THRESHOLD = float(os.getenv("TOPIC_SIMILARITY_THRESHOLD", 0.82))
# Maximum number of tags kept per event after deduplication.
TOPIC_TAG_MAX_COUNT = int(os.getenv("TOPIC_TAG_MAX_COUNT", 8))

AI_API_KEY = os.getenv("AI_API_KEY", "")

deepseek_client = AsyncOpenAI(
    api_key=AI_API_KEY,
    base_url="https://api.deepseek.com",
)


async def call_llm_for_summary(platform_data_text: str) -> dict:
    """Call the LLM for a unified title, summary and topic candidates.

    Args:
        platform_data_text: Per-platform headline lines inserted into the user prompt.

    Returns:
        The decoded JSON object returned by the model.

    Raises:
        ValueError: If the model returns no content, invalid JSON
            (``json.JSONDecodeError`` is a ``ValueError``), or JSON that is
            not an object.
    """
    prompt = SUMMARY_USER_PROMPT_TEMPLATE.format(platform_data_text=platform_data_text)

    response = await deepseek_client.chat.completions.create(
        model="deepseek-chat",
        messages=[
            {"role": "system", "content": SUMMARY_SYSTEM_PROMPT},
            {"role": "user", "content": prompt},
        ],
        response_format={"type": "json_object"},
        temperature=1,
    )

    result_text = response.choices[0].message.content
    # The content can be None/empty; fail with a clear message instead of
    # letting json.loads raise an opaque TypeError.
    if not result_text:
        raise ValueError("LLM returned an empty response body")

    result = json.loads(result_text)
    if not isinstance(result, dict):
        raise ValueError(
            f"LLM returned JSON of type {type(result).__name__}, expected an object"
        )
    return result


def _first_present(mapping: dict, keys: tuple[str, ...]) -> Any:
    """Return the first value in *mapping* under *keys* that is neither None nor ""."""
    for key in keys:
        value = mapping.get(key)
        if value is not None and value != "":
            return value
    return None


def _normalize_score(raw_score: Any) -> float | None:
    """Convert a raw relevance score to the 0-100 range.

    Values <= 1 are treated as fractions and scaled by 100; the result is
    clamped to [0, 100]. Returns None when the value is not numeric or is NaN.
    """
    try:
        score = float(raw_score)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: float() on an integer too large for a double.
        return None
    # NaN would otherwise fall through the clamp below and come out as 100.0.
    if math.isnan(score):
        return None
    if score <= 1:
        score *= 100
    return max(0.0, min(100.0, score))


def parse_topic_keywords(llm_result: dict) -> list[dict[str, Any]]:
    """Parse topic keywords from an LLM response; supports list[str] and list[object].

    Keywords are stripped, truncated to 100 characters and deduplicated
    case-insensitively (first occurrence wins).

    Args:
        llm_result: Decoded LLM response; candidates are read from "topic_keywords".

    Returns:
        A list of ``{"keyword": str, "score": float | None}`` dicts in input order.
    """
    if not isinstance(llm_result, dict):
        return []

    raw_topics = llm_result.get("topic_keywords") or []
    if isinstance(raw_topics, str):
        # A bare string is one candidate; iterating it would yield single characters.
        raw_topics = [raw_topics]
    elif not isinstance(raw_topics, (list, tuple)):
        return []

    parsed: list[dict[str, Any]] = []
    seen: set[str] = set()

    for item in raw_topics:
        keyword = ""
        score = None

        if isinstance(item, str):
            keyword = item.strip()
        elif isinstance(item, dict):
            raw_keyword = (
                item.get("keyword")
                or item.get("topic_keyword")
                or item.get("name")
                or item.get("topic")
                or ""
            )
            keyword = str(raw_keyword).strip()
            # Explicit None/"" check so that a legitimate score of 0 is kept.
            score = _normalize_score(_first_present(item, ("relevance_score", "score")))

        if not keyword:
            continue

        keyword = keyword[:100]
        normalized_key = keyword.casefold()
        if normalized_key in seen:
            continue
        seen.add(normalized_key)
        parsed.append({"keyword": keyword, "score": score})

    return parsed


def normalize_topic_keywords(topic_candidates: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Deduplicate semantically similar tags using embedding similarity.

    Each candidate is compared with the existing clusters and merged into the
    most similar one when the similarity reaches TOPIC_SIMILARITY_THRESHOLD;
    otherwise it starts a new cluster. A cluster keeps its highest score and
    its shortest keyword.

    Args:
        topic_candidates: Items shaped like the output of ``parse_topic_keywords``.

    Returns:
        At most TOPIC_TAG_MAX_COUNT ``{"keyword", "score"}`` dicts, sorted by
        score descending when at least one cluster has a score.
    """
    if not topic_candidates:
        return []

    keywords = [item["keyword"] for item in topic_candidates]
    vectors = embedder_model.encode(keywords, normalize_embeddings=True, show_progress_bar=False)

    clusters: list[dict[str, Any]] = []
    for item, vector in zip(topic_candidates, vectors):
        vec = np.asarray(vector, dtype=np.float32)

        # Find the closest existing cluster (first one wins on ties).
        best_idx = -1
        best_sim = -1.0
        for idx, cluster in enumerate(clusters):
            sim = float(np.dot(vec, cluster["vector"]))
            if sim > best_sim:
                best_sim = sim
                best_idx = idx

        if best_idx < 0 or best_sim < TOPIC_SIMILARITY_THRESHOLD:
            clusters.append(
                {
                    "keyword": item["keyword"],
                    "score": item["score"],
                    "vector": vec,
                    "count": 1,
                }
            )
            continue

        cluster = clusters[best_idx]
        # Count-weighted update of the cluster direction, re-normalized to unit length.
        merged = cluster["vector"] * cluster["count"] + vec
        norm = float(np.linalg.norm(merged))
        if norm > 0:
            cluster["vector"] = merged / norm
        cluster["count"] += 1

        if item["score"] is not None and (
            cluster["score"] is None or item["score"] > cluster["score"]
        ):
            cluster["score"] = item["score"]

        # Prefer the shorter tag as the canonical keyword.
        if len(item["keyword"]) < len(cluster["keyword"]):
            cluster["keyword"] = item["keyword"]

    if any(cluster["score"] is not None for cluster in clusters):
        # Unscored clusters sort after every scored one.
        clusters.sort(
            key=lambda x: x["score"] if x["score"] is not None else -1.0,
            reverse=True,
        )

    return [
        {"keyword": cluster["keyword"], "score": cluster["score"]}
        for cluster in clusters[:TOPIC_TAG_MAX_COUNT]
    ]


def replace_event_topics(db, event_id: int, normalized_topics: list[dict[str, Any]]) -> None:
    """Replace the EVENT tags of one unified event within the current transaction.

    Deletes the existing EVENT-type ExtractedTopic rows for *event_id* and adds
    one row per item in *normalized_topics*. Does not commit; the caller owns
    the transaction.
    """
    db.query(ExtractedTopic).filter(
        ExtractedTopic.target_type == TargetType.EVENT,
        ExtractedTopic.target_id == event_id,
    ).delete(synchronize_session=False)

    for item in normalized_topics:
        db.add(
            ExtractedTopic(
                target_type=TargetType.EVENT,
                target_id=event_id,
                topic_keyword=item["keyword"],
                relevance_score=item["score"],
            )
        )


def _select_events_needing_summary() -> dict[int, int]:
    """Return ``{event_id: hot_score}`` for recent hot events whose summary is stale."""
    recent_threshold = utcnow() - timedelta(days=3)
    # Copy out plain values so the session can be released right away.
    with SessionLocal() as db:
        rows = (
            db.query(UnifiedEvent.id, UnifiedEvent.hot_score)
            .filter(
                UnifiedEvent.hot_score >= HOT_SCORE_THRESHOLD,
                UnifiedEvent.hot_score > UnifiedEvent.last_summarized_trends_count,
                UnifiedEvent.created_at >= recent_threshold,
            )
            .all()
        )
    return {event_id: hot_score for event_id, hot_score in rows}


def _load_platform_headlines(event_id: int) -> dict[str, set[str]]:
    """Return the distinct non-empty headlines of one event, grouped by source name."""
    platform_headlines: dict[str, set[str]] = {}
    with SessionLocal() as db:
        trends = (
            db.query(TrendingEvent, InfoSource.source_name)
            .join(InfoSource, TrendingEvent.source_id == InfoSource.id)
            .filter(TrendingEvent.unified_event_id == event_id)
            .all()
        )
        for trend_record, source_name in trends:
            headline = trend_record.current_headline
            # A missing headline carries no information and would break sorted()/join().
            if headline:
                platform_headlines.setdefault(source_name, set()).add(headline)
    return platform_headlines


def _build_platform_text(platform_headlines: dict[str, set[str]]) -> str:
    """Render one ``[platform] headline, headline`` line per platform."""
    return "\n".join(
        f"[{platform}] {', '.join(sorted(headlines))}"
        for platform, headlines in platform_headlines.items()
    )


def _persist_summary(
    event_id: int,
    llm_result: dict,
    hot_score_snapshot: int,
    normalized_topics: list[dict[str, Any]],
) -> None:
    """Write the LLM output for one event back in a single short transaction."""
    with SessionLocal() as write_db:
        event = write_db.get(UnifiedEvent, event_id)
        if event is None:
            return

        unified_title = llm_result.get("unified_title")
        if unified_title:
            event.unified_title = unified_title

        comprehensive_summary = llm_result.get("ai_comprehensive_summary")
        if comprehensive_summary:
            event.ai_comprehensive_summary = comprehensive_summary

        if normalized_topics:
            replace_event_topics(write_db, event.id, normalized_topics)

        current_hot_score = event.hot_score
        # Record the lower of the selection-time and current hot_score: if the
        # score rose while the LLM call was in flight, the newer trends were
        # not part of the prompt, so the event must stay eligible next round.
        event.last_summarized_trends_count = min(hot_score_snapshot, current_hot_score)
        write_db.commit()

    print(f"Updated event {event_id} summary (hot_score={current_hot_score}).")


async def generate_unified_summaries() -> None:
    """Scheduled task: refresh summaries and topic tags for hot unified events.

    Events are processed one at a time. Database sessions are opened only for
    the short read and write phases and are never held across the LLM call.
    A failure while processing one event is printed and does not stop the
    remaining events.
    """
    print(f"[{utcnow()}] Start unified summary generation task...")

    event_hot_scores = _select_events_needing_summary()
    if not event_hot_scores:
        print("No events require summary update in this round.")
        return

    for event_id, hot_score in event_hot_scores.items():
        try:
            platform_data_text = _build_platform_text(_load_platform_headlines(event_id))
            if not platform_data_text:
                continue

            # The LLM call may take tens of seconds, so it must not run inside
            # a database transaction.
            llm_result = await call_llm_for_summary(platform_data_text)

            # Embedding work is done before the write session opens to keep
            # that transaction short.
            # NOTE(review): encode() is synchronous and blocks the event loop
            # while it runs — consider offloading if that becomes a problem.
            normalized_topics: list[dict[str, Any]] = []
            if hot_score >= TOPIC_TAG_MIN_HOT_SCORE:
                normalized_topics = normalize_topic_keywords(parse_topic_keywords(llm_result))

            _persist_summary(event_id, llm_result, hot_score, normalized_topics)
        except Exception as exc:
            # Top-level per-event boundary: report and move on to the next event.
            print(f"Event {event_id} summary generation failed: {exc}")
            continue