mirror of
https://github.com/stardrophere/InsightRadar.git
synced 2026-06-06 01:57:51 +08:00
348 lines
13 KiB
Python
348 lines
13 KiB
Python
# app/services/fetcher_service.py
|
||
"""
|
||
抓取服务:从外部 API 拉取热搜/RSS 数据,做查重、向量聚类、入库
|
||
热搜分支:语义聚类到 UnifiedEvent;RSS 分支:写入 NewsArticle
|
||
"""
|
||
import os
|
||
import hashlib
|
||
from datetime import timedelta
|
||
|
||
import httpx
|
||
import json
|
||
import numpy as np
|
||
from dotenv import load_dotenv
|
||
from sklearn.metrics.pairwise import cosine_similarity
|
||
from sentence_transformers import SentenceTransformer
|
||
from app.database import SessionLocal
|
||
from app.models.models import (
|
||
InfoSource, TrendingEvent, NewsArticle, DataSyncTask, TaskStatus,
|
||
HeadlineRevision, RankingLog, SourceType, utcnow, UnifiedEvent
|
||
)
|
||
|
||
# 加载环境变量
|
||
load_dotenv()
|
||
hf_token = os.getenv("HF_TOKEN")
|
||
SIMILARITY_THRESHOLD = float(os.getenv("SIMILARITY_THRESHOLD", 0.72))
|
||
API_BASE_URL = os.getenv("API_BASE_URL", "https://newsnow.busiyi.world/api/s")
|
||
EMBEDDING_MODEL_PATH = os.getenv("EMBEDDING_MODEL_PATH", "")
|
||
|
||
print("正在加载模型...")
|
||
# 全局单例
|
||
embedder_model = SentenceTransformer(EMBEDDING_MODEL_PATH, local_files_only=True)
|
||
print("模型加载完成。")
|
||
|
||
|
||
def generate_md5(text: str) -> str:
|
||
"""生成 32 位 MD5 作为 external_id,用于跨平台去重"""
|
||
return hashlib.md5(text.encode('utf-8')).hexdigest()
|
||
|
||
|
||
def generate_embeddings_batch(texts: list[str]) -> dict:
|
||
"""批量生成向量,返回 {text: (embedding_json, numpy_array)}"""
|
||
if not texts:
|
||
return {}
|
||
|
||
unique_texts = list(set(texts))
|
||
raw_vecs = embedder_model.encode(unique_texts, normalize_embeddings=True, show_progress_bar=False)
|
||
|
||
result = {}
|
||
for text, raw_vec in zip(unique_texts, raw_vecs):
|
||
truncated_vec = [round(float(x), 5) for x in raw_vec]
|
||
emb_json = json.dumps(truncated_vec, separators=(',', ':'))
|
||
result[text] = (emb_json, raw_vec)
|
||
|
||
return result
|
||
|
||
|
||
class UnifiedEventClusterer:
|
||
def __init__(self, db):
|
||
self.db = db
|
||
three_days_ago = utcnow() - timedelta(days=3)
|
||
recent_events = db.query(UnifiedEvent).filter(
|
||
UnifiedEvent.created_at >= three_days_ago
|
||
).order_by(UnifiedEvent.created_at.desc()).limit(300).all()
|
||
|
||
self.event_vectors = []
|
||
self.event_ids = []
|
||
for ev in recent_events:
|
||
if ev.center_embedding:
|
||
self.event_vectors.append(np.array(json.loads(ev.center_embedding)))
|
||
self.event_ids.append(ev.id)
|
||
|
||
def match_or_create(self, title: str, embedding_json: str, new_vec: np.ndarray) -> int:
|
||
"""语义相似则归入已有事件并累加热度,否则创建新 UnifiedEvent"""
|
||
if self.event_vectors:
|
||
# 批量矩阵计算相似度
|
||
sim_scores = cosine_similarity([new_vec], self.event_vectors)[0]
|
||
max_idx = np.argmax(sim_scores)
|
||
|
||
if sim_scores[max_idx] >= SIMILARITY_THRESHOLD:
|
||
matched_event_id = self.event_ids[max_idx]
|
||
# 更新热度
|
||
matched_event = self.db.query(UnifiedEvent).get(matched_event_id)
|
||
if matched_event:
|
||
matched_event.hot_score += 1
|
||
return matched_event_id
|
||
|
||
# 没匹配到,创建一个新的统一大事件
|
||
new_unified = UnifiedEvent(
|
||
unified_title=title,
|
||
center_embedding=embedding_json,
|
||
hot_score=1 # 初始热度
|
||
)
|
||
self.db.add(new_unified)
|
||
self.db.flush() # 获取自增的主键 ID
|
||
|
||
# 更新缓存
|
||
self.event_vectors.append(new_vec)
|
||
self.event_ids.append(new_unified.id)
|
||
|
||
return new_unified.id
|
||
|
||
|
||
def process_hot_trend_item(db, source, item, index: int, external_id: str, existing_event, embeddings_dict: dict, clusterer: UnifiedEventClusterer):
|
||
"""
|
||
处理【热搜/短新闻】的业务逻辑,现已加入 AI 聚类功能
|
||
"""
|
||
title = item.get("title")
|
||
item_url = item.get("url", "")
|
||
|
||
event_to_log = None
|
||
|
||
# 查重:已存在则可能只需更新标题/排名;不存在则需聚类并新建
|
||
if existing_event:
|
||
# 场景 A1:老熟人
|
||
if existing_event.current_headline != title:
|
||
# 标题被暗改,此时需要重新算一次 Embedding
|
||
new_embedding_json, _ = embeddings_dict[title]
|
||
|
||
revision = HeadlineRevision(
|
||
event_id=existing_event.id,
|
||
previous_headline=existing_event.current_headline,
|
||
revised_headline=title
|
||
)
|
||
db.add(revision)
|
||
existing_event.current_headline = title
|
||
existing_event.title_embedding = new_embedding_json # 更新为新标题的语义向量
|
||
# 注:这里不改变它所属的 unified_event_id,因为大体还是同一件事
|
||
|
||
existing_event.current_ranking = index
|
||
existing_event.event_url = item_url
|
||
event_to_log = existing_event
|
||
|
||
else:
|
||
# 场景 A2:这是一条彻底的全新热搜
|
||
# 1. 计算向量
|
||
new_embedding_json, new_vec = embeddings_dict[title]
|
||
|
||
# 2. 扔进聚类中枢找归宿
|
||
matched_event_id = clusterer.match_or_create(title, new_embedding_json, new_vec)
|
||
|
||
# 3. 落库
|
||
new_event = TrendingEvent(
|
||
source_id=source.id,
|
||
external_id=external_id,
|
||
current_headline=title,
|
||
event_url=item_url,
|
||
current_ranking=index,
|
||
title_embedding=new_embedding_json, # 存入向量
|
||
unified_event_id=matched_event_id # 挂载到大事件下
|
||
)
|
||
db.add(new_event)
|
||
db.flush()
|
||
event_to_log = new_event
|
||
|
||
# 强制记录排名轨迹
|
||
rank_log = RankingLog(
|
||
event_id=event_to_log.id,
|
||
ranking_position=index
|
||
)
|
||
db.add(rank_log)
|
||
|
||
|
||
def process_rss_feed_item(db, source, item, external_id: str, existing_article):
|
||
"""
|
||
处理【长文章/传统订阅】分支的核心业务逻辑 (写入 NewsArticle 表)
|
||
"""
|
||
title = item.get("title")
|
||
item_url = item.get("url", "")
|
||
|
||
if existing_article:
|
||
# 文章若存在,仅更新基础字段
|
||
existing_article.article_title = title
|
||
existing_article.article_url = item_url
|
||
else:
|
||
# 全新文章入库
|
||
new_article = NewsArticle(
|
||
source_id=source.id,
|
||
external_id=external_id,
|
||
article_title=title,
|
||
article_url=item_url,
|
||
)
|
||
db.add(new_article)
|
||
|
||
|
||
def process_source_data(db, source, items: list) -> int:
|
||
"""
|
||
数据清洗与路由分发层:
|
||
遍历 API 返回的 items,生成唯一指纹,并路由到不同的处理模块。
|
||
采用批量查重和批量向量计算优化性能,避免数据库锁死。
|
||
返回成功处理的条目数量。
|
||
"""
|
||
saved_count = 0
|
||
platform_id = source.home_url
|
||
|
||
# 1. 批量计算外部 ID 并聚合要计算的文本
|
||
valid_items = []
|
||
external_ids = []
|
||
for item in items:
|
||
title = item.get("title")
|
||
if not title:
|
||
continue
|
||
item_url = item.get("url", "")
|
||
raw_id = item.get("id") or item_url or title
|
||
external_id = generate_md5(f"{platform_id}_{raw_id}")
|
||
|
||
valid_items.append((item, external_id))
|
||
external_ids.append(external_id)
|
||
|
||
if not valid_items:
|
||
return 0
|
||
|
||
# 批量查重:按 external_id 判断是更新还是新增
|
||
existing_events_dict = {}
|
||
existing_articles_dict = {}
|
||
|
||
if source.source_type in (SourceType.HOT_TREND, SourceType.API):
|
||
existing_events = db.query(TrendingEvent).filter(
|
||
TrendingEvent.source_id == source.id,
|
||
TrendingEvent.external_id.in_(external_ids)
|
||
).all()
|
||
existing_events_dict = {ev.external_id: ev for ev in existing_events}
|
||
elif source.source_type == SourceType.RSS_FEED:
|
||
existing_articles = db.query(NewsArticle).filter(
|
||
NewsArticle.source_id == source.id,
|
||
NewsArticle.external_id.in_(external_ids)
|
||
).all()
|
||
existing_articles_dict = {art.external_id: art for art in existing_articles}
|
||
|
||
# 仅对需要算向量的标题做批量 embedding,避免重复计算
|
||
texts_to_embed = []
|
||
if source.source_type in (SourceType.HOT_TREND, SourceType.API):
|
||
for item, external_id in valid_items:
|
||
title = item.get("title")
|
||
existing_event = existing_events_dict.get(external_id)
|
||
if existing_event:
|
||
if existing_event.current_headline != title:
|
||
texts_to_embed.append(title)
|
||
else:
|
||
texts_to_embed.append(title)
|
||
|
||
# 4. 批量执行大模型推理
|
||
embeddings_dict = generate_embeddings_batch(texts_to_embed)
|
||
|
||
# 初始化聚类器(只在热搜模式下需要,且只初始化一次)
|
||
clusterer = None
|
||
if source.source_type in (SourceType.HOT_TREND, SourceType.API):
|
||
clusterer = UnifiedEventClusterer(db)
|
||
|
||
# 按来源类型分流:热搜/API → TrendingEvent + 聚类;RSS → NewsArticle
|
||
for index, (item, external_id) in enumerate(valid_items, 1):
|
||
if source.source_type in (SourceType.HOT_TREND, SourceType.API):
|
||
existing_event = existing_events_dict.get(external_id)
|
||
process_hot_trend_item(
|
||
db, source, item, index, external_id,
|
||
existing_event, embeddings_dict, clusterer
|
||
)
|
||
elif source.source_type == SourceType.RSS_FEED:
|
||
existing_article = existing_articles_dict.get(external_id)
|
||
process_rss_feed_item(db, source, item, external_id, existing_article)
|
||
|
||
saved_count += 1
|
||
|
||
return saved_count
|
||
|
||
|
||
async def fetch_and_save_trending_data():
|
||
"""
|
||
调度层:负责网络请求、数据库事务管理和异常监控隔离。
|
||
"""
|
||
print(f"[{utcnow()}] 开始执行定时抓取任务...")
|
||
|
||
# 获取启用的信息源 - 这个只读操作用一个短连接
|
||
with SessionLocal() as db:
|
||
sources = db.query(InfoSource).filter(InfoSource.is_enabled == True).all()
|
||
if not sources:
|
||
print("没有找到启用的信息源,任务结束。")
|
||
return
|
||
|
||
# 我们把 source 的信息提前提取出来,避免在异步中长期持有 session
|
||
source_configs = [
|
||
{
|
||
"id": s.id,
|
||
"home_url": s.home_url,
|
||
"source_name": s.source_name,
|
||
"source_type": s.source_type
|
||
}
|
||
for s in sources
|
||
]
|
||
|
||
# 伪装请求头,规避反爬
|
||
custom_headers = {
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36",
|
||
"Accept": "application/json, text/plain, */*",
|
||
"Referer": "https://newsnow.busiyi.world/",
|
||
"Origin": "https://newsnow.busiyi.world"
|
||
}
|
||
|
||
async with httpx.AsyncClient(timeout=15.0, headers=custom_headers) as client:
|
||
for s_config in source_configs:
|
||
platform_id = s_config["home_url"]
|
||
if not platform_id:
|
||
continue
|
||
|
||
url = f"{API_BASE_URL}?id={platform_id}&latest"
|
||
|
||
try:
|
||
# 1. 网络请求(可能耗时较长,不要包在 db session 里)
|
||
response = await client.get(url)
|
||
response.raise_for_status()
|
||
data_json = response.json()
|
||
items = data_json.get("items", [])
|
||
|
||
# 2. 数据库事务操作(尽量短,单独使用 session)
|
||
with SessionLocal() as db:
|
||
# 重新从短 session 中获取 source 实例,以免 detached
|
||
source = db.query(InfoSource).get(s_config["id"])
|
||
if not source:
|
||
continue
|
||
|
||
task_log = DataSyncTask(source_id=source.id, items_fetched=0)
|
||
try:
|
||
# 调用数据处理层
|
||
saved_count = process_source_data(db, source, items)
|
||
|
||
# 业务事务成功提交
|
||
task_log.items_fetched = saved_count
|
||
task_log.task_status = TaskStatus.SUCCESS
|
||
db.add(task_log)
|
||
db.commit()
|
||
print(f"[{source.source_name}] ({source.source_type}) 成功抓取并更新了 {saved_count} 条数据")
|
||
except Exception as e:
|
||
db.rollback()
|
||
raise e # 抛出给外层捕获记录日志
|
||
|
||
except Exception as e:
|
||
# 异常拦截与错误隔离,另起一个超短事务记录日志
|
||
with SessionLocal() as log_db:
|
||
try:
|
||
new_task_log = DataSyncTask(source_id=s_config["id"], items_fetched=0)
|
||
new_task_log.task_status = TaskStatus.ERROR
|
||
new_task_log.error_trace = str(e)
|
||
log_db.add(new_task_log)
|
||
log_db.commit()
|
||
print(f"[{s_config['source_name']}] 抓取失败: {e}")
|
||
except Exception as inner_e:
|
||
log_db.rollback()
|
||
print(f"[{s_config['source_name']}] 抓取失败,且日志写入失败: {e}, {inner_e}")
|