# app/services/fetcher_service.py import os import hashlib import httpx from dotenv import load_dotenv from app.database import SessionLocal from app.models.models import ( InfoSource, TrendingEvent, NewsArticle, DataSyncTask, TaskStatus, HeadlineRevision, RankingLog, SourceType, utcnow ) # ========================================== # 环境变量与全局配置 # ========================================== # 加载环境变量 load_dotenv() # 从环境变量获取 API 基础地址,如果没有配置则提供默认回退地址 API_BASE_URL = os.getenv("API_BASE_URL", "https://newsnow.busiyi.world/api/s") def generate_md5(text: str) -> str: """ 生成32位MD5哈希值 维护说明: 各个平台(微博、知乎、微信等)返回的原始 ID 格式千奇百怪(有长数字、有UUID、有URL)。 为了方便数据库建立统一的高性能唯一索引(UniqueConstraint), 我们统一将其转为长度固定的 32 位 MD5 字符串作为 external_id。 """ return hashlib.md5(text.encode('utf-8')).hexdigest() async def fetch_and_save_trending_data(): """ 核心定时任务:从数据库读取信息源 -> 抓取API -> 解析 -> 根据业务类型分流存入对应的数据库表 执行流程: 1. 查询所有配置为“已启用”的信息源 (is_enabled == True)。 2. 伪装 HTTP 请求头,规避目标服务器的反爬机制。 3. 遍历解析数据,生成 MD5 唯一指纹进行全局去重。 4. 核心路由分流: - 若源为 HOT_TREND/API,按热搜逻辑处理(记录名次轨迹、标题变更)。 - 若源为 RSS_FEED,按长文章逻辑处理(忽略名次,直接落库)。 5. 严格的事务管理:成功则统一提交,报错则回滚业务数据并独立提交错误日志。 """ print(f"[{utcnow()}] 开始执行定时抓取任务...") # 使用上下文管理器确保数据库连接池正确获取和归还连接 with SessionLocal() as db: # 1. 动态获取抓取源。 # 优势:在后台修改数据库的信息源开关,下一次定时任务立刻生效,无需重启服务。 sources = db.query(InfoSource).filter(InfoSource.is_enabled == True).all() if not sources: print("没有找到启用的信息源,任务结束。") return # 2. 伪装成真实的浏览器 HTTP 请求头 # 维护注意:如果抓取接口返回 403 Forbidden,通常是这里的反爬策略失效了,需要更新 User-Agent 或 Cookie custom_headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36", "Accept": "application/json, text/plain, */*", "Referer": "https://newsnow.busiyi.world/", "Origin": "https://newsnow.busiyi.world" } # 复用异步 HTTP 客户端,比每次循环新建 Client 性能更高 async with httpx.AsyncClient(timeout=15.0, headers=custom_headers) as client: for source in sources: # platform_id 对应第三方接口的入参标识,如 "weibo", "zhihu" 等 platform_id = source.home_url if not platform_id: continue # ========================================== # 【技术债预警 / TODO】 # 目前无论 source_type 是什么,都统一请求了这个 JSON API。 # 未来如果加入了真正的外部 RSS 订阅源(返回的是 XML 格式), # 这里需要增加判断逻辑:如果是 RSS_FEED,应当使用 feedparser 库去解析 XML,而不是用 httpx 获取 JSON。 # ========================================== url = f"{API_BASE_URL}?id={platform_id}&latest" # 初始化本次特定信息源抓取任务的系统监控日志 task_log = DataSyncTask(source_id=source.id, items_fetched=0) try: # 发起请求并校验 HTTP 状态码 (非 2xx 会抛出异常进入 except 块) response = await client.get(url) response.raise_for_status() data_json = response.json() items = data_json.get("items", []) saved_count = 0 for index, item in enumerate(items, 1): title = item.get("title") if not title: continue item_url = item.get("url", "") # 3. ID 兜底与去重策略 # 优先用接口自带的 ID -> 没有则用 URL 代替 -> 最差情况用 title 兜底 raw_id = item.get("id") or item_url or title # 组合“平台标识+原始ID”算出全局唯一的 MD5 外部标识 external_id = generate_md5(f"{platform_id}_{raw_id}") # ========================================== # 4. 核心数据分流路由 # 根据信息源的业务类型,将数据推入不同的物理表 # ========================================== if source.source_type in (SourceType.HOT_TREND, SourceType.API): # -------------------------------------------------- # 分支 A:热搜/短新闻逻辑 -> 写入 TrendingEvent 表 # -------------------------------------------------- existing_event = db.query(TrendingEvent).filter( TrendingEvent.source_id == source.id, TrendingEvent.external_id == external_id ).first() event_to_log = None # 临时指针,用于后续绑定排名轨迹 if existing_event: # 场景 A1:该热搜已经在数据库中 # 监控并记录“标题暗改”(常见于热搜公关介入) if existing_event.current_headline != title: revision = HeadlineRevision( event_id=existing_event.id, previous_headline=existing_event.current_headline, revised_headline=title ) db.add(revision) existing_event.current_headline = title # 覆盖为主表最新标题 # 更新当前的实时排名和 URL existing_event.current_ranking = index existing_event.event_url = item_url event_to_log = existing_event else: # 场景 A2:发现全新热搜 new_event = TrendingEvent( source_id=source.id, external_id=external_id, current_headline=title, event_url=item_url, current_ranking=index, ) db.add(new_event) # db.flush() 是关键:它将数据推给数据库生成了自增的主键 ID,但尚未最终 commit。 # 拿到合法的 event_to_log.id db.flush() event_to_log = new_event # 排名轨迹强制记录 # 只要抓到了热搜(无论新旧),必须打点记录当前名次,用于前端绘制排名趋势图 rank_log = RankingLog( event_id=event_to_log.id, ranking_position=index ) db.add(rank_log) elif source.source_type == SourceType.RSS_FEED: # -------------------------------------------------- # 分支 B:长文章/传统订阅逻辑 -> 写入 NewsArticle 表 # -------------------------------------------------- existing_article = db.query(NewsArticle).filter( NewsArticle.source_id == source.id, NewsArticle.external_id == external_id ).first() if existing_article: # 文章如果已存在,通常只需要更新基础字段(文章一般不涉及排名起伏) existing_article.article_title = title existing_article.article_url = item_url # 预留位置:如果以后接口返回了摘要,可以在这里 update existing_article.original_summary else: # 全新文章入库 new_article = NewsArticle( source_id=source.id, external_id=external_id, article_title=title, article_url=item_url, # original_summary=item.get("desc", ""), # author_name=item.get("author", "") ) db.add(new_article) saved_count += 1 # -------------------------------------------------- # 5. 业务事务成功提交 # -------------------------------------------------- # 只有当前平台(source)的所有 item 都顺畅走完,才标记成功 task_log.items_fetched = saved_count task_log.task_status = TaskStatus.SUCCESS db.add(task_log) # 统一将当前信息源爬取的所有业务数据持久化到硬盘 db.commit() print(f"[{source.source_name}] ({source.source_type}) 成功抓取并更新了 {saved_count} 条数据") except Exception as e: # -------------------------------------------------- # 6. 异常拦截与错误隔离机制 # -------------------------------------------------- # 回滚本次抓取的全部脏数据, db.rollback() # 错误日志记下来 task_log.task_status = TaskStatus.ERROR task_log.error_trace = str(e) db.add(task_log) db.commit() print(f"[{source.source_name}] 抓取失败: {e}")