backend 去ai化

This commit is contained in:
2026-04-20 15:53:02 +08:00
parent 7a34fc0079
commit bba6de25ac
28 changed files with 161 additions and 228 deletions
+11 -34
View File
@@ -1,8 +1,3 @@
# app/services/fetcher_service.py
"""
抓取服务:从外部 API 拉取热搜/RSS 数据,做查重、向量聚类、入库
热搜分支:语义聚类到 UnifiedEventRSS 分支:写入 NewsArticle
"""
import os
import hashlib
from datetime import timedelta
@@ -19,6 +14,8 @@ from app.models.models import (
HeadlineRevision, RankingLog, SourceType, utcnow, UnifiedEvent
)
# AI辅助生成:deepseek-v3-22026年3月20日
# 加载环境变量
load_dotenv()
hf_token = os.getenv("HF_TOKEN")
@@ -31,6 +28,8 @@ print("正在加载模型...")
embedder_model = SentenceTransformer(EMBEDDING_MODEL_PATH, local_files_only=True)
print("模型加载完成。")
# AI生成结束
def generate_md5(text: str) -> str:
"""生成 32 位 MD5 作为 external_id,用于跨平台去重"""
@@ -88,10 +87,10 @@ class UnifiedEventClusterer:
new_unified = UnifiedEvent(
unified_title=title,
center_embedding=embedding_json,
hot_score=1 # 初始热度
hot_score=1
)
self.db.add(new_unified)
self.db.flush() # 获取自增的主键 ID
self.db.flush()
# 更新缓存
self.event_vectors.append(new_vec)
@@ -109,11 +108,8 @@ def process_hot_trend_item(db, source, item, index: int, external_id: str, exist
event_to_log = None
# 查重:已存在则可能只需更新标题/排名;不存在则需聚类并新建
if existing_event:
# 场景 A1:老熟人
if existing_event.current_headline != title:
# 标题被暗改,此时需要重新算一次 Embedding
new_embedding_json, _ = embeddings_dict[title]
revision = HeadlineRevision(
@@ -123,30 +119,25 @@ def process_hot_trend_item(db, source, item, index: int, external_id: str, exist
)
db.add(revision)
existing_event.current_headline = title
existing_event.title_embedding = new_embedding_json # 更新为新标题的语义向量
# 注:这里不改变它所属的 unified_event_id,因为大体还是同一件事
existing_event.title_embedding = new_embedding_json
existing_event.current_ranking = index
existing_event.event_url = item_url
event_to_log = existing_event
else:
# 场景 A2:这是一条彻底的全新热搜
# 1. 计算向量
new_embedding_json, new_vec = embeddings_dict[title]
# 2. 扔进聚类中枢找归宿
new_embedding_json, new_vec = embeddings_dict[title]
matched_event_id = clusterer.match_or_create(title, new_embedding_json, new_vec)
# 3. 落库
new_event = TrendingEvent(
source_id=source.id,
external_id=external_id,
current_headline=title,
event_url=item_url,
current_ranking=index,
title_embedding=new_embedding_json, # 存入向量
unified_event_id=matched_event_id # 挂载到大事件下
title_embedding=new_embedding_json,
unified_event_id=matched_event_id
)
db.add(new_event)
db.flush()
@@ -192,7 +183,6 @@ def process_source_data(db, source, items: list) -> int:
saved_count = 0
platform_id = source.home_url
# 1. 批量计算外部 ID 并聚合要计算的文本
valid_items = []
external_ids = []
for item in items:
@@ -209,7 +199,6 @@ def process_source_data(db, source, items: list) -> int:
if not valid_items:
return 0
# 批量查重:按 external_id 判断是更新还是新增
existing_events_dict = {}
existing_articles_dict = {}
@@ -226,7 +215,6 @@ def process_source_data(db, source, items: list) -> int:
).all()
existing_articles_dict = {art.external_id: art for art in existing_articles}
# 仅对需要算向量的标题做批量 embedding,避免重复计算
texts_to_embed = []
if source.source_type in (SourceType.HOT_TREND, SourceType.API):
for item, external_id in valid_items:
@@ -238,15 +226,12 @@ def process_source_data(db, source, items: list) -> int:
else:
texts_to_embed.append(title)
# 4. 批量执行大模型推理
embeddings_dict = generate_embeddings_batch(texts_to_embed)
# 初始化聚类器(只在热搜模式下需要,且只初始化一次)
clusterer = None
if source.source_type in (SourceType.HOT_TREND, SourceType.API):
clusterer = UnifiedEventClusterer(db)
# 按来源类型分流:热搜/API → TrendingEvent + 聚类;RSS → NewsArticle
for index, (item, external_id) in enumerate(valid_items, 1):
if source.source_type in (SourceType.HOT_TREND, SourceType.API):
existing_event = existing_events_dict.get(external_id)
@@ -269,14 +254,12 @@ async def fetch_and_save_trending_data():
"""
print(f"[{utcnow()}] 开始执行定时抓取任务...")
# 获取启用的信息源 - 这个只读操作用一个短连接
with SessionLocal() as db:
sources = db.query(InfoSource).filter(InfoSource.is_enabled == True).all()
if not sources:
print("没有找到启用的信息源,任务结束。")
return
# 我们把 source 的信息提前提取出来,避免在异步中长期持有 session
source_configs = [
{
"id": s.id,
@@ -287,7 +270,6 @@ async def fetch_and_save_trending_data():
for s in sources
]
# 伪装请求头,规避反爬
custom_headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36",
"Accept": "application/json, text/plain, */*",
@@ -304,13 +286,11 @@ async def fetch_and_save_trending_data():
url = f"{API_BASE_URL}?id={platform_id}&latest"
try:
# 1. 网络请求(可能耗时较长,不要包在 db session 里)
response = await client.get(url)
response.raise_for_status()
data_json = response.json()
items = data_json.get("items", [])
# 2. 数据库事务操作(尽量短,单独使用 session)
with SessionLocal() as db:
# 重新从短 session 中获取 source 实例,以免 detached
source = db.query(InfoSource).get(s_config["id"])
@@ -319,10 +299,8 @@ async def fetch_and_save_trending_data():
task_log = DataSyncTask(source_id=source.id, items_fetched=0)
try:
# 调用数据处理层
saved_count = process_source_data(db, source, items)
# 业务事务成功提交
task_log.items_fetched = saved_count
task_log.task_status = TaskStatus.SUCCESS
db.add(task_log)
@@ -330,10 +308,9 @@ async def fetch_and_save_trending_data():
print(f"[{source.source_name}] ({source.source_type}) 成功抓取并更新了 {saved_count} 条数据")
except Exception as e:
db.rollback()
raise e # 抛出给外层捕获记录日志
raise e
except Exception as e:
# 异常拦截与错误隔离,另起一个超短事务记录日志
with SessionLocal() as log_db:
try:
new_task_log = DataSyncTask(source_id=s_config["id"], items_fetched=0)