From 9fa07cfb0789b534a0eb9b483827db1b6f1cf85f Mon Sep 17 00:00:00 2001 From: stardrophere <1925008984@qq.com> Date: Tue, 10 Mar 2026 09:20:23 +0800 Subject: [PATCH] small update --- .gitignore | 2 + backend/app/database.py | 25 ++- backend/app/services/fetcher_service.py | 213 +++++++++++++++--------- 3 files changed, 158 insertions(+), 82 deletions(-) diff --git a/.gitignore b/.gitignore index 2f616d9..3f485da 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,8 @@ __pycache__/ *.py[cod] *$py.class +*.db-shm +*.db-wal *.db .env .idea diff --git a/backend/app/database.py b/backend/app/database.py index 984662a..30c2c4d 100644 --- a/backend/app/database.py +++ b/backend/app/database.py @@ -1,5 +1,5 @@ # database.py -from sqlalchemy import create_engine +from sqlalchemy import create_engine, event from sqlalchemy.orm import sessionmaker # SQLite 数据库文件位置 @@ -10,13 +10,34 @@ engine = create_engine( SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False} ) + +# ========================================== +# 监听 SQLite 连接,注入 PRAGMA 指令 +# ========================================== +@event.listens_for(engine, "connect") +def set_sqlite_pragma(dbapi_connection, connection_record): + cursor = dbapi_connection.cursor() + + # 1. 开启 WAL 模式:读写分离 + cursor.execute("PRAGMA journal_mode=WAL;") + + # 2. 优化同步模式:在 WAL 模式下,NORMAL 既能保证不丢数据,又能大幅提升写入速度 + cursor.execute("PRAGMA synchronous=NORMAL;") + + # 3. 强制开启外键约束:极其重要!SQLite 默认不检查外键,这行能保护你的多态关联不乱套 + cursor.execute("PRAGMA foreign_keys=ON;") + + cursor.close() + + # 创建数据库会话工厂 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + # 依赖注入函数:每个请求过来时,给它发一个数据库连接,请求结束时自动关闭 def get_db(): db = SessionLocal() try: yield db finally: - db.close() \ No newline at end of file + db.close() diff --git a/backend/app/services/fetcher_service.py b/backend/app/services/fetcher_service.py index a6f016a..a6b7423 100644 --- a/backend/app/services/fetcher_service.py +++ b/backend/app/services/fetcher_service.py @@ -6,48 +6,57 @@ from dotenv import load_dotenv from app.database import SessionLocal from app.models.models import ( - InfoSource, TrendingEvent, DataSyncTask, TaskStatus, - HeadlineRevision, RankingLog, utcnow + InfoSource, TrendingEvent, NewsArticle, DataSyncTask, TaskStatus, + HeadlineRevision, RankingLog, SourceType, utcnow ) -# 加载 .env 文件中的环境变量 +# ========================================== +# 环境变量与全局配置 +# ========================================== +# 加载环境变量 load_dotenv() -# 从环境变量获取 API 基础地址,提供默认回退地址 +# 从环境变量获取 API 基础地址,如果没有配置则提供默认回退地址 API_BASE_URL = os.getenv("API_BASE_URL", "https://newsnow.busiyi.world/api/s") def generate_md5(text: str) -> str: """ - 生成32位MD5哈希 - 用途:为不同平台的数据生成统一的、长度固定的外部唯一标识(external_id), - 方便建立数据库的唯一索引,防止同一条热搜重复插入。 + 生成32位MD5哈希值 + 维护说明: + 各个平台(微博、知乎、微信等)返回的原始 ID 格式千奇百怪(有长数字、有UUID、有URL)。 + 为了方便数据库建立统一的高性能唯一索引(UniqueConstraint), + 我们统一将其转为长度固定的 32 位 MD5 字符串作为 external_id。 """ return hashlib.md5(text.encode('utf-8')).hexdigest() async def fetch_and_save_trending_data(): """ - 核心定时任务:从数据库读取信息源 -> 抓取API -> 解析并存入数据库 + 核心定时任务:从数据库读取信息源 -> 抓取API -> 解析 -> 根据业务类型分流存入对应的数据库表 - 业务流程: - 1. 查询所有已启用的信息源 (is_enabled == True) - 2. 伪装成浏览器发起异步 HTTP 请求 - 3. 遍历解析数据,进行去重判断 (MD5) - 4. 记录标题变更轨迹 (HeadlineRevision) 和 热搜排名轨迹 (RankingLog) - 5. 统一提交或在发生异常时回滚脏数据 + 执行流程: + 1. 查询所有配置为“已启用”的信息源 (is_enabled == True)。 + 2. 伪装 HTTP 请求头,规避目标服务器的反爬机制。 + 3. 遍历解析数据,生成 MD5 唯一指纹进行全局去重。 + 4. 核心路由分流: + - 若源为 HOT_TREND/API,按热搜逻辑处理(记录名次轨迹、标题变更)。 + - 若源为 RSS_FEED,按长文章逻辑处理(忽略名次,直接落库)。 + 5. 严格的事务管理:成功则统一提交,报错则回滚业务数据并独立提交错误日志。 """ print(f"[{utcnow()}] 开始执行定时抓取任务...") - # 使用上下文管理器确保数据库连接池正确归还连接 + # 使用上下文管理器确保数据库连接池正确获取和归还连接 with SessionLocal() as db: - # 1. 动态获取抓取源,这样在后台开关信息源不需要重启服务 + # 1. 动态获取抓取源。 + # 优势:在后台修改数据库的信息源开关,下一次定时任务立刻生效,无需重启服务。 sources = db.query(InfoSource).filter(InfoSource.is_enabled == True).all() if not sources: print("没有找到启用的信息源,任务结束。") return - # 2. 伪装 HTTP 请求头,绕过目标服务器的反爬/防盗链机制 + # 2. 伪装成真实的浏览器 HTTP 请求头 + # 维护注意:如果抓取接口返回 403 Forbidden,通常是这里的反爬策略失效了,需要更新 User-Agent 或 Cookie custom_headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36", "Accept": "application/json, text/plain, */*", @@ -55,21 +64,27 @@ async def fetch_and_save_trending_data(): "Origin": "https://newsnow.busiyi.world" } - # 复用异步客户端,提高并发抓取效率 + # 复用异步 HTTP 客户端,比每次循环新建 Client 性能更高 async with httpx.AsyncClient(timeout=15.0, headers=custom_headers) as client: for source in sources: - # 平台标识,如 "weibo", "zhihu" 等,这里复用了 home_url 字段存储 + # platform_id 对应第三方接口的入参标识,如 "weibo", "zhihu" 等 platform_id = source.home_url if not platform_id: continue - # 拼装最终的抓取 URL + # ========================================== + # 【技术债预警 / TODO】 + # 目前无论 source_type 是什么,都统一请求了这个 JSON API。 + # 未来如果加入了真正的外部 RSS 订阅源(返回的是 XML 格式), + # 这里需要增加判断逻辑:如果是 RSS_FEED,应当使用 feedparser 库去解析 XML,而不是用 httpx 获取 JSON。 + # ========================================== url = f"{API_BASE_URL}?id={platform_id}&latest" - # 初始化本次抓取任务的日志记录 + + # 初始化本次特定信息源抓取任务的系统监控日志 task_log = DataSyncTask(source_id=source.id, items_fetched=0) try: - # 发起请求并校验 HTTP 状态码 + # 发起请求并校验 HTTP 状态码 (非 2xx 会抛出异常进入 except 块) response = await client.get(url) response.raise_for_status() data_json = response.json() @@ -84,84 +99,122 @@ async def fetch_and_save_trending_data(): item_url = item.get("url", "") - # 3. ID 去重策略:优先用接口自带 ID,没有则用 URL,最差情况用标题兜底 + # 3. ID 兜底与去重策略 + # 优先用接口自带的 ID -> 没有则用 URL 代替 -> 最差情况用 title 兜底 raw_id = item.get("id") or item_url or title - # 组合平台标识和原始 ID 算出全局唯一的 MD5 + # 组合“平台标识+原始ID”算出全局唯一的 MD5 外部标识 external_id = generate_md5(f"{platform_id}_{raw_id}") - # 4. 在数据库中查询是否已经存在这条热搜 - existing_event = db.query(TrendingEvent).filter( - TrendingEvent.source_id == source.id, - TrendingEvent.external_id == external_id - ).first() + # ========================================== + # 4. 核心数据分流路由 + # 根据信息源的业务类型,将数据推入不同的物理表 + # ========================================== - event_to_log = None # 留个指针,用来后续记录名次历史 + if source.source_type in (SourceType.HOT_TREND, SourceType.API): + # -------------------------------------------------- + # 分支 A:热搜/短新闻逻辑 -> 写入 TrendingEvent 表 + # -------------------------------------------------- + existing_event = db.query(TrendingEvent).filter( + TrendingEvent.source_id == source.id, + TrendingEvent.external_id == external_id + ).first() - if existing_event: - # -------- 分支 A:热搜已存在,执行更新逻辑 -------- + event_to_log = None # 临时指针,用于后续绑定排名轨迹 - # 监控标题变化(例如:微博热搜经常会改词条名字) - if existing_event.current_headline != title: - # 标题发生改变!立刻记录到修订历史表 - revision = HeadlineRevision( - event_id=existing_event.id, - previous_headline=existing_event.current_headline, - revised_headline=title + if existing_event: + # 场景 A1:该热搜已经在数据库中 + + # 监控并记录“标题暗改”(常见于热搜公关介入) + if existing_event.current_headline != title: + revision = HeadlineRevision( + event_id=existing_event.id, + previous_headline=existing_event.current_headline, + revised_headline=title + ) + db.add(revision) + existing_event.current_headline = title # 覆盖为主表最新标题 + + # 更新当前的实时排名和 URL + existing_event.current_ranking = index + existing_event.event_url = item_url + event_to_log = existing_event + else: + # 场景 A2:发现全新热搜 + new_event = TrendingEvent( + source_id=source.id, + external_id=external_id, + current_headline=title, + event_url=item_url, + current_ranking=index, ) - db.add(revision) - existing_event.current_headline = title # 更新为主表最新标题 + db.add(new_event) + # db.flush() 是关键:它将数据推给数据库生成了自增的主键 ID,但尚未最终 commit。 + # 拿到合法的 event_to_log.id + db.flush() + event_to_log = new_event - # 更新当前最新的排名和链接 - existing_event.current_ranking = index - existing_event.event_url = item_url - - event_to_log = existing_event - else: - # -------- 分支 B:全新热搜,执行插入逻辑 -------- - new_event = TrendingEvent( - source_id=source.id, - external_id=external_id, - current_headline=title, - event_url=item_url, - current_ranking=index, + # 排名轨迹强制记录 + # 只要抓到了热搜(无论新旧),必须打点记录当前名次,用于前端绘制排名趋势图 + rank_log = RankingLog( + event_id=event_to_log.id, + ranking_position=index ) - db.add(new_event) - # 核心操作!flush 会将数据推入数据库生成自增的 ID,但不提交事务 (commit)。 - # 这样接下来的 RankingLog 就能立刻拿到 `new_event.id` 作为外键。 - db.flush() - event_to_log = new_event + db.add(rank_log) + + elif source.source_type == SourceType.RSS_FEED: + # -------------------------------------------------- + # 分支 B:长文章/传统订阅逻辑 -> 写入 NewsArticle 表 + # -------------------------------------------------- + existing_article = db.query(NewsArticle).filter( + NewsArticle.source_id == source.id, + NewsArticle.external_id == external_id + ).first() + + if existing_article: + # 文章如果已存在,通常只需要更新基础字段(文章一般不涉及排名起伏) + existing_article.article_title = title + existing_article.article_url = item_url + # 预留位置:如果以后接口返回了摘要,可以在这里 update existing_article.original_summary + else: + # 全新文章入库 + new_article = NewsArticle( + source_id=source.id, + external_id=external_id, + article_title=title, + article_url=item_url, + # original_summary=item.get("desc", ""), + # author_name=item.get("author", "") + ) + db.add(new_article) + - # -------- 无论新旧,统一记录排名轨迹 -------- - # 只要抓取到,不管新旧,必须记一笔当前的排名! <--- - # 借助这个表,后续可以画出某条热搜随时间变化的“排名上升/下降曲线” - rank_log = RankingLog( - event_id=event_to_log.id, - ranking_position=index - ) - db.add(rank_log) saved_count += 1 - # 如果这一个平台的数据全部处理顺利,标记成功并整体提交 + # -------------------------------------------------- + # 5. 业务事务成功提交 + # -------------------------------------------------- + # 只有当前平台(source)的所有 item 都顺畅走完,才标记成功 task_log.items_fetched = saved_count task_log.task_status = TaskStatus.SUCCESS db.add(task_log) - db.commit() # ✅ 只在 try 顺利走完整个平台的数据时,才统一提交业务数据到硬盘 - print(f"[{source.source_name}] 成功抓取并更新了 {saved_count} 条数据") + + # 统一将当前信息源爬取的所有业务数据持久化到硬盘 + db.commit() + print(f"[{source.source_name}] ({source.source_type}) 成功抓取并更新了 {saved_count} 条数据") except Exception as e: - # -------- 异常处理机制 -------- - # 独立日志记录 - - # 第一步:遇到报错(如网络中断、解析错误),立刻回滚。 - # 丢弃这批脏数据,防止数据库出现一半更新一半没更新的“不一致状态”。 + # -------------------------------------------------- + # 6. 异常拦截与错误隔离机制 + # -------------------------------------------------- + # 回滚本次抓取的全部脏数据, db.rollback() - # 第二步:记录错误日志并独立提交。 - # 因为上面执行了 rollback,之前 add 的 task_log 也被清空了状态, - # 此时重新设置状态,并作为一次新的独立事务提交到数据库,方便后台监控报错。 + # 错误日志记下来 task_log.task_status = TaskStatus.ERROR task_log.error_trace = str(e) db.add(task_log) - db.commit() # 提交错误日志本身 - print(f"[{source.source_name}] 抓取失败: {e}") + + + db.commit() + print(f"[{source.source_name}] 抓取失败: {e}") \ No newline at end of file