From 5b541bbea39233935e5fdab1ebe3976a11b6476b Mon Sep 17 00:00:00 2001 From: stardrophere <1925008984@qq.com> Date: Mon, 9 Mar 2026 18:13:35 +0800 Subject: [PATCH] refresh --- .gitignore | 3 +- backend/app/api/dependencies.py | 0 backend/app/api/endpoints/sources.py | 45 +++++++ backend/app/api/endpoints/trends.py | 0 backend/app/api/router.py | 8 ++ backend/app/crud/crud_source.py | 40 ++++++ backend/app/main.py | 94 +++++++------ backend/app/models/models.py | 113 ++++++++++++---- backend/app/schemas/schemas.py | 69 ++++++++++ backend/app/services/fetcher_service.py | 167 ++++++++++++++++++++++++ backend/data/demo.db | Bin 151552 -> 0 bytes 11 files changed, 464 insertions(+), 75 deletions(-) create mode 100644 backend/app/api/dependencies.py create mode 100644 backend/app/api/endpoints/sources.py create mode 100644 backend/app/api/endpoints/trends.py create mode 100644 backend/app/api/router.py create mode 100644 backend/app/crud/crud_source.py create mode 100644 backend/app/schemas/schemas.py create mode 100644 backend/app/services/fetcher_service.py delete mode 100644 backend/data/demo.db diff --git a/.gitignore b/.gitignore index 31e7f15..2f616d9 100644 --- a/.gitignore +++ b/.gitignore @@ -3,7 +3,8 @@ __pycache__/ *.py[cod] *$py.class - +*.db +.env .idea # C extensions *.so diff --git a/backend/app/api/dependencies.py b/backend/app/api/dependencies.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/api/endpoints/sources.py b/backend/app/api/endpoints/sources.py new file mode 100644 index 0000000..eda2606 --- /dev/null +++ b/backend/app/api/endpoints/sources.py @@ -0,0 +1,45 @@ +# app/api/endpoints/sources.py +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy.orm import Session +from typing import List + +from app.database import get_db +from app.schemas.schemas import ( + InfoSourceCreate, InfoSourceUpdate, InfoSourceResponse, PaginatedResponse +) +from app.crud import crud_source + +router = APIRouter() + + +@router.post("/", response_model=InfoSourceResponse, status_code=status.HTTP_201_CREATED) +async def create_info_source(source_in: InfoSourceCreate, db: Session = Depends(get_db)): + """新建一个信息源""" + return crud_source.create(db=db, obj_in=source_in) + + +@router.get("/", response_model=PaginatedResponse[InfoSourceResponse]) +async def get_all_sources(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): + """获取所有信息源列表(加入了分页参数)""" + sources = crud_source.get_multi(db=db, skip=skip, limit=limit) + return {"total": len(sources), "data": sources} + + +@router.get("/{source_id}", response_model=InfoSourceResponse) +async def get_info_source(source_id: int, db: Session = Depends(get_db)): + """获取单个信息源详情""" + source = crud_source.get(db=db, source_id=source_id) + if not source: + raise HTTPException(status_code=404, detail="该信息源不存在") + return source + + +@router.patch("/{source_id}", response_model=InfoSourceResponse) +async def update_info_source(source_id: int, source_in: InfoSourceUpdate, db: Session = Depends(get_db)): + """更新信息源""" + source = crud_source.get(db=db, source_id=source_id) + if not source: + raise HTTPException(status_code=404, detail="该信息源不存在") + + # 直接把查出来的数据库对象和前端传来的 Pydantic 对象丢给 CRUD 处理 + return crud_source.update(db=db, db_obj=source, obj_in=source_in) diff --git a/backend/app/api/endpoints/trends.py b/backend/app/api/endpoints/trends.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/api/router.py b/backend/app/api/router.py new file mode 100644 index 0000000..000e9bd --- /dev/null +++ b/backend/app/api/router.py @@ -0,0 +1,8 @@ +# app/api/router.py +from fastapi import APIRouter +from app.api.endpoints import sources + +api_router = APIRouter() + +# 信息源管理 +api_router.include_router(sources.router, prefix="/sources", tags=["信息源管理"]) \ No newline at end of file diff --git a/backend/app/crud/crud_source.py b/backend/app/crud/crud_source.py new file mode 100644 index 0000000..6bcaa54 --- /dev/null +++ b/backend/app/crud/crud_source.py @@ -0,0 +1,40 @@ +# app/crud/crud_source.py +from sqlalchemy.orm import Session +from typing import List, Optional + +from app.models.models import InfoSource +from app.schemas.schemas import InfoSourceCreate, InfoSourceUpdate + + +def get(db: Session, source_id: int) -> Optional[InfoSource]: + """通过 ID 获取单条信息源""" + return db.query(InfoSource).filter(InfoSource.id == source_id).first() + + +def get_multi(db: Session, skip: int = 0, limit: int = 100) -> List[InfoSource]: + """获取所有信息源列表(支持分页)""" + return db.query(InfoSource).offset(skip).limit(limit).all() + + +def create(db: Session, obj_in: InfoSourceCreate) -> InfoSource: + """创建新的信息源""" + db_obj = InfoSource(**obj_in.model_dump()) + db.add(db_obj) + db.commit() + db.refresh(db_obj) + return db_obj + + +def update(db: Session, db_obj: InfoSource, obj_in: InfoSourceUpdate) -> InfoSource: + """更新信息源""" + # 提取前端真正要求更新的字段 + update_data = obj_in.model_dump(exclude_unset=True) + + # 遍历更新模型对象的属性 + for field, value in update_data.items(): + setattr(db_obj, field, value) + + db.add(db_obj) + db.commit() + db.refresh(db_obj) + return db_obj diff --git a/backend/app/main.py b/backend/app/main.py index 161d781..221589f 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -1,71 +1,65 @@ # app/main.py +import os from contextlib import asynccontextmanager -from fastapi import FastAPI, Depends -from sqlalchemy.orm import Session +from fastapi import FastAPI +from dotenv import load_dotenv +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from app.services.fetcher_service import fetch_and_save_trending_data +from app.database import engine +from app.models.models import Base -from app.database import engine, get_db -from app.models.models import Base, InfoSource, SourceType +# 路由总线 +from app.api.router import api_router + +load_dotenv() +CRAWL_INTERVAL = int(os.getenv("CRAWL_INTERVAL_MINUTES", 10)) + +scheduler = AsyncIOScheduler() # ========================================== -# 1. 生命周期管理:App 启动时自动建表 +# 1. 生命周期管理:App 启动时自动建表 & 启动调度器 # ========================================== @asynccontextmanager async def lifespan(app: FastAPI): - # 启动时执行:检查模型,如果表不存在,自动在 SQLite 中建表! + # 1. 数据库建表 print("正在初始化数据库表...") - # ---> 核心修改 2:直接使用 Base,而不是 models.Base <--- Base.metadata.create_all(bind=engine) print("数据库表初始化完成!") - yield + + # 2. 配置并启动定时任务 + scheduler.add_job( + fetch_and_save_trending_data, + 'interval', + minutes=CRAWL_INTERVAL, + id='trending_fetch_job', + replace_existing=True + ) + scheduler.start() + print(f"定时抓取任务已启动,每 {CRAWL_INTERVAL} 分钟执行一次") + + # 为了测试方便,启动时立即执行一次 + await fetch_and_save_trending_data() + + yield # 此时 FastAPI 开始接受请求 + + # 3. 优雅关闭 + scheduler.shutdown() + print("定时任务已安全关闭") - -# 初始化 FastAPI,挂载生命周期 +# 初始化 FastAPI app = FastAPI(title="AI 新闻聚合引擎 API", lifespan=lifespan) - # ========================================== -# 2. 路由 API 定义 +# 2. 挂载路由总线 # ========================================== +# 版本控制 +app.include_router(api_router, prefix="/api/v1") -@app.get("/") + +# 健康检查 +@app.get("/", tags=["健康检查"]) async def root(): - return {"message": "Welcome to AI News Aggregator API"} - - -@app.get("/hello/{name}") -async def say_hello(name: str): - return {"message": f"Hello {name}"} - - -# --->与数据库交互的真实接口 - -@app.post("/sources/") -async def create_info_source(name: str, url: str, db: Session = Depends(get_db)): - """ - 测试接口:向数据库中添加一个新闻信息源 - """ - # ---> 核心修改 3:直接使用 InfoSource 和 SourceType <--- - new_source = InfoSource( - source_name=name, - source_type=SourceType.RSS_FEED, # 默认用 RSS 测试 - home_url=url - ) - - db.add(new_source) - db.commit() - db.refresh(new_source) # 刷新以获取自动生成的 ID - - return {"message": "创建成功!", "data": new_source} - - -@app.get("/sources/") -async def get_all_sources(db: Session = Depends(get_db)): - """ - 测试接口:查询数据库中所有的信息源 - """ - # ---> 核心修改 4:直接使用 InfoSource <--- - sources = db.query(InfoSource).all() - return {"total": len(sources), "data": sources} \ No newline at end of file + return {"message": "Welcome to AI News Aggregator API", "status": "ok"} diff --git a/backend/app/models/models.py b/backend/app/models/models.py index 7108e82..69736ba 100644 --- a/backend/app/models/models.py +++ b/backend/app/models/models.py @@ -1,3 +1,4 @@ +# models.py from datetime import datetime, timezone, time from typing import Optional, Any import enum @@ -10,32 +11,47 @@ from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship # ========================================== -# 0. 全局基类与枚举定义 +# 0. 全局基类、枚举定义与动态类型 # ========================================== class Base(DeclarativeBase): - """SQLAlchemy 2.0 声明式基类""" + """ + SQLAlchemy 2.0 声明式基类 + 所有的表模型都必须继承这个基类。 + """ pass +# 让代码在 SQLite 环境下自动降级为 Integer 以保证自增正常工作, +# 而在生产环境部署到 PostgreSQL 或 MySQL 时,依然会使用容量更大的 BigInteger。 +BigIntType = BigInteger().with_variant(Integer, "sqlite") + + class SourceType(str, enum.Enum): - HOT_TREND = "HOT_TREND" - RSS_FEED = "RSS_FEED" - API = "API" + """信息源的抓取方式""" + HOT_TREND = "HOT_TREND" # 热搜榜单类 + RSS_FEED = "RSS_FEED" # 传统RSS订阅 + API = "API" # 接口抓取类 class TargetType(str, enum.Enum): - EVENT = "EVENT" - TREND = "TREND" - ARTICLE = "ARTICLE" + """ + 多态目标类型 (Polymorphic Target) + 用于标记一条评论或一个标签到底是挂载在哪个实体下的。 + """ + EVENT = "EVENT" # 挂载在单个热搜事件下 + TREND = "TREND" # 挂载在宏观趋势下 + ARTICLE = "ARTICLE" # 挂载在具体新闻文章下 class TaskStatus(str, enum.Enum): + """后台任务状态""" SUCCESS = "SUCCESS" ERROR = "ERROR" class GenderType(str, enum.Enum): + """用户性别枚举""" MALE = "MALE" FEMALE = "FEMALE" OTHER = "OTHER" @@ -43,7 +59,10 @@ class GenderType(str, enum.Enum): def utcnow(): - """获取带UTC时区的当前时间 (推荐实践)""" + """ + 获取带UTC时区的当前时间 (最佳实践) + 服务器内部和数据库统一存储 UTC 时间,只在前端展示时转为用户本地时区,避免时区错乱。 + """ return datetime.now(timezone.utc) @@ -51,6 +70,10 @@ def utcnow(): # 模块一:信息源管理 # ========================================== class InfoSource(Base): + """ + 抓取源配置表 + 充当爬虫的“任务清单”,后台可以随时开关特定的信息源,而不需要重启代码。 + """ __tablename__ = "info_sources" id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) @@ -67,14 +90,19 @@ class InfoSource(Base): # 模块二:AI 语义聚类中枢 (大事件池) # ========================================== class UnifiedEvent(Base): + """ + AI 统一事件表 + 核心业务逻辑:比如微博热搜叫“苹果发布会”,知乎热搜叫“iPhone 16 测评”, + 它们在子表(TrendingEvent)是两条记录,但通过 AI 语义向量对比后, + 会将它们统一挂载到这个表的一个 UnifiedEvent ID 下,实现跨平台事件聚合。 + """ __tablename__ = "unified_events" - id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + id: Mapped[int] = mapped_column(BigIntType, primary_key=True, autoincrement=True) unified_title: Mapped[str] = mapped_column(String(255), comment="AI统一标题") ai_comprehensive_summary: Mapped[Optional[str]] = mapped_column(Text, comment="AI全局深度总结") - # SQLite 没有原生 Vector 类型,存为用逗号分隔的字符串或JSON,Postgres可换成 PGVector - center_embedding: Mapped[Optional[str]] = mapped_column(Text, comment="中心向量") + center_embedding: Mapped[Optional[str]] = mapped_column(Text, comment="中心向量") # 用于高维空间相似度计算 hot_score: Mapped[int] = mapped_column(Integer, default=0, comment="聚合热度得分") created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow) @@ -85,12 +113,16 @@ class UnifiedEvent(Base): # 模块三:内容存储库 (热搜 & 新闻子节点) # ========================================== class TrendingEvent(Base): + """ + 各平台热搜数据明细表 + """ __tablename__ = "trending_events" __table_args__ = ( + # 联合唯一索引:同一个来源(比如微博)的同一条外部ID(MD5)只能存在一条记录,防重插核心保障 UniqueConstraint("source_id", "external_id", name="idx_unique_external_trend"), ) - id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + id: Mapped[int] = mapped_column(BigIntType, primary_key=True, autoincrement=True) source_id: Mapped[int] = mapped_column(ForeignKey("info_sources.id")) unified_event_id: Mapped[Optional[int]] = mapped_column(ForeignKey("unified_events.id")) @@ -109,12 +141,13 @@ class TrendingEvent(Base): class NewsArticle(Base): + """新闻文章明细表 (与 TrendingEvent 类似,但侧重长文本阅读)""" __tablename__ = "news_articles" __table_args__ = ( UniqueConstraint("source_id", "external_id", name="idx_unique_external_article"), ) - id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + id: Mapped[int] = mapped_column(BigIntType, primary_key=True, autoincrement=True) source_id: Mapped[int] = mapped_column(ForeignKey("info_sources.id")) unified_event_id: Mapped[Optional[int]] = mapped_column(ForeignKey("unified_events.id")) @@ -136,9 +169,13 @@ class NewsArticle(Base): # 模块四:热度与轨迹追踪 # ========================================== class HeadlineRevision(Base): + """ + 标题修订历史表 + 用于记录平台方暗戳戳修改热搜词条的行为(例如公关介入改标题)。 + """ __tablename__ = "headline_revisions" - id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + id: Mapped[int] = mapped_column(BigIntType, primary_key=True, autoincrement=True) event_id: Mapped[int] = mapped_column(ForeignKey("trending_events.id")) previous_headline: Mapped[str] = mapped_column(String(255)) revised_headline: Mapped[str] = mapped_column(String(255)) @@ -147,12 +184,17 @@ class HeadlineRevision(Base): class RankingLog(Base): + """ + 热搜排名时间序列化日志 + 每一次抓取都会生成一条记录,可以用于前端绘制热搜“排名起伏折线图”。 + """ __tablename__ = "ranking_logs" __table_args__ = ( + # 针对时间序列查询优化的复合索引,加速类似 "查询某事件在过去24小时内的排名变化" 的操作 Index("idx_event_time", "event_id", "observed_at"), ) - id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + id: Mapped[int] = mapped_column(BigIntType, primary_key=True, autoincrement=True) event_id: Mapped[int] = mapped_column(ForeignKey("trending_events.id")) ranking_position: Mapped[int] = mapped_column(Integer) observed_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow) @@ -163,16 +205,22 @@ class RankingLog(Base): # ========================================== # 模块五:多态话题与多态评论 # ========================================== +# 【设计模式】:多态设计 +# 通过 target_type (存表名/类型) + target_id (存主键ID) 的组合, +# 让这两个表既能挂载在"单一热搜"下,也能挂载在"新闻文章"下,甚至挂在"统一大事件"下,避免了建立无数个外键的冗余。 + class ExtractedTopic(Base): + """AI 提取的核心话题标签表""" __tablename__ = "extracted_topics" __table_args__ = ( Index("idx_topic_keyword", "topic_keyword"), + # 多态查询索引,加速 target_type + target_id 的组合查询 Index("idx_polymorphic_topics", "target_type", "target_id"), ) - id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + id: Mapped[int] = mapped_column(BigIntType, primary_key=True, autoincrement=True) target_type: Mapped[TargetType] = mapped_column(Enum(TargetType)) - target_id: Mapped[int] = mapped_column(BigInteger) + target_id: Mapped[int] = mapped_column(BigIntType) topic_keyword: Mapped[str] = mapped_column(String(100)) relevance_score: Mapped[Optional[float]] = mapped_column(Float) @@ -180,14 +228,15 @@ class ExtractedTopic(Base): class DiscussionComment(Base): + """全平台统一评论表""" __tablename__ = "discussion_comments" __table_args__ = ( Index("idx_polymorphic_comments", "target_type", "target_id"), ) - id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + id: Mapped[int] = mapped_column(BigIntType, primary_key=True, autoincrement=True) target_type: Mapped[TargetType] = mapped_column(Enum(TargetType)) - target_id: Mapped[int] = mapped_column(BigInteger) + target_id: Mapped[int] = mapped_column(BigIntType) commenter_name: Mapped[Optional[str]] = mapped_column(String(100)) comment_content: Mapped[str] = mapped_column(Text) @@ -202,6 +251,7 @@ class DiscussionComment(Base): # 模块六:用户画像与多渠道高可用推送系统 # ========================================== class AppUser(Base): + """系统核心用户表""" __tablename__ = "app_users" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) @@ -212,7 +262,7 @@ class AppUser(Base): avatar_url: Mapped[Optional[str]] = mapped_column(String(500)) gender: Mapped[GenderType] = mapped_column(Enum(GenderType), default=GenderType.UNKNOWN) - # 核心:万能扩展收纳箱 (SQLite 完美支持通过 SQLAlchemy 存储 JSON) + # 预留的 JSON 字段,可以存放未来灵活变化的用户配置,避免频繁修改表结构 metadata_: Mapped[Optional[Any]] = mapped_column("metadata", JSON, comment="自定义扩展偏好") timezone: Mapped[str] = mapped_column(String(50), default="Asia/Shanghai") @@ -221,6 +271,10 @@ class AppUser(Base): class UserPushEndpoint(Base): + """ + 多渠道推送端点配置表 + 一个用户可能绑定了邮箱(EMAIL)和微信(WECHAT),支持配置降级重试(priority_level)。 + """ __tablename__ = "user_push_endpoints" __table_args__ = ( UniqueConstraint("user_id", "channel_type", name="idx_unique_user_channel"), @@ -238,6 +292,7 @@ class UserPushEndpoint(Base): class UserTopicPreference(Base): + """用户订阅的兴趣标签库""" __tablename__ = "user_topic_preferences" __table_args__ = ( UniqueConstraint("user_id", "interested_keyword", name="idx_unique_preference"), @@ -251,6 +306,7 @@ class UserTopicPreference(Base): class UserDeliverySchedule(Base): + """用户勿扰/定时推送时间表""" __tablename__ = "user_delivery_schedules" __table_args__ = ( UniqueConstraint("user_id", "delivery_time", name="idx_unique_schedule"), @@ -265,15 +321,20 @@ class UserDeliverySchedule(Base): class DeliveryHistory(Base): + """ + 推送历史防刷表 + 核心用途:一旦给某个用户推送过某条新闻/事件,就记录在这里。 + 下次再触发推荐时,检查这个表,防止给同一个用户反复发送相同的内容。 + """ __tablename__ = "delivery_history" __table_args__ = ( UniqueConstraint("user_id", "target_type", "target_id", name="idx_prevent_duplicate_push"), ) - id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + id: Mapped[int] = mapped_column(BigIntType, primary_key=True, autoincrement=True) user_id: Mapped[int] = mapped_column(ForeignKey("app_users.id")) target_type: Mapped[TargetType] = mapped_column(Enum(TargetType)) - target_id: Mapped[int] = mapped_column(BigInteger) + target_id: Mapped[int] = mapped_column(BigIntType) status: Mapped[TaskStatus] = mapped_column(Enum(TaskStatus)) created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow) @@ -283,9 +344,13 @@ class DeliveryHistory(Base): # 模块七:系统任务监控 # ========================================== class DataSyncTask(Base): + """ + 数据同步健康度监控表 + 这就是爬虫脚本每次运行都要写入记录的地方,用于后台 Dashboard 监控爬虫健康状态和错误堆栈。 + """ __tablename__ = "data_sync_tasks" - id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + id: Mapped[int] = mapped_column(BigIntType, primary_key=True, autoincrement=True) source_id: Mapped[int] = mapped_column(ForeignKey("info_sources.id")) items_fetched: Mapped[int] = mapped_column(Integer, default=0) task_status: Mapped[TaskStatus] = mapped_column(Enum(TaskStatus)) diff --git a/backend/app/schemas/schemas.py b/backend/app/schemas/schemas.py new file mode 100644 index 0000000..98e9115 --- /dev/null +++ b/backend/app/schemas/schemas.py @@ -0,0 +1,69 @@ +# app/schemas/schemas.py +from pydantic import BaseModel, ConfigDict, Field +from typing import Optional +from datetime import datetime + +# 枚举 +from app.models.models import SourceType + + +# ========================================== +# InfoSource (信息源) 相关的 Schemas +# ========================================== + +class InfoSourceBase(BaseModel): + """ + 信息源的基础 Schema,包含各个操作通用的字段。 + 使用 Field() 可以为 Swagger UI 接口文档提供明确的注释和校验规则。 + """ + source_name: str = Field(..., max_length=100, description="信息源名称,例如:微博热搜、知乎热榜", examples=["微博热搜"]) + source_type: SourceType = Field(default=SourceType.API, description="抓取方式枚举") + home_url: Optional[str] = Field(default=None, max_length=255, description="平台主页或接口的基础URL", examples=["weibo"]) + is_enabled: bool = Field(default=True, description="是否启用该信息源的定时抓取") + + +class InfoSourceCreate(InfoSourceBase): + """ + 创建信息源 (POST) 时接收的请求体。 + 目前直接继承 Base 即可,如果以后有“只有创建时才能传的字段”,可以写在这里。 + """ + pass + + +class InfoSourceUpdate(BaseModel): + """ + 更新信息源 (PATCH/PUT) 时接收的请求体。 + 更新时,所有的字段都应该是可选的 (Optional),因为前端可能只想改其中一个字段。 + """ + source_name: Optional[str] = Field(default=None, max_length=100) + source_type: Optional[SourceType] = None + home_url: Optional[str] = Field(default=None, max_length=255) + is_enabled: Optional[bool] = None + + +class InfoSourceResponse(InfoSourceBase): + """ + 返回给前端的响应体格式 (GET 或 POST/PATCH 成功后的返回)。 + 包含了数据库自动生成的 ID 和时间戳字段。 + """ + id: int = Field(..., description="数据库主键ID") + created_at: datetime = Field(..., description="创建时间") + updated_at: datetime = Field(..., description="最后一次更新时间") + + # 核心配置:告诉 Pydantic,允许传入 SQLAlchemy 的 ORM 类实例并自动提取属性 + model_config = ConfigDict(from_attributes=True) + + +# ========================================== +# 分页响应包装器 (泛型设计,方便以后复用) +# ========================================== +from typing import Generic, TypeVar, List + +T = TypeVar("T") + +class PaginatedResponse(BaseModel, Generic[T]): + """ + 通用的列表响应格式,带总数统计,方便前端分页。 + """ + total: int = Field(..., description="符合条件的数据总条数") + data: List[T] = Field(..., description="当前页的数据列表") \ No newline at end of file diff --git a/backend/app/services/fetcher_service.py b/backend/app/services/fetcher_service.py new file mode 100644 index 0000000..a6f016a --- /dev/null +++ b/backend/app/services/fetcher_service.py @@ -0,0 +1,167 @@ +# app/services/fetcher_service.py +import os +import hashlib +import httpx +from dotenv import load_dotenv + +from app.database import SessionLocal +from app.models.models import ( + InfoSource, TrendingEvent, DataSyncTask, TaskStatus, + HeadlineRevision, RankingLog, utcnow +) + +# 加载 .env 文件中的环境变量 +load_dotenv() + +# 从环境变量获取 API 基础地址,提供默认回退地址 +API_BASE_URL = os.getenv("API_BASE_URL", "https://newsnow.busiyi.world/api/s") + + +def generate_md5(text: str) -> str: + """ + 生成32位MD5哈希 + 用途:为不同平台的数据生成统一的、长度固定的外部唯一标识(external_id), + 方便建立数据库的唯一索引,防止同一条热搜重复插入。 + """ + return hashlib.md5(text.encode('utf-8')).hexdigest() + + +async def fetch_and_save_trending_data(): + """ + 核心定时任务:从数据库读取信息源 -> 抓取API -> 解析并存入数据库 + + 业务流程: + 1. 查询所有已启用的信息源 (is_enabled == True) + 2. 伪装成浏览器发起异步 HTTP 请求 + 3. 遍历解析数据,进行去重判断 (MD5) + 4. 记录标题变更轨迹 (HeadlineRevision) 和 热搜排名轨迹 (RankingLog) + 5. 统一提交或在发生异常时回滚脏数据 + """ + print(f"[{utcnow()}] 开始执行定时抓取任务...") + + # 使用上下文管理器确保数据库连接池正确归还连接 + with SessionLocal() as db: + # 1. 动态获取抓取源,这样在后台开关信息源不需要重启服务 + sources = db.query(InfoSource).filter(InfoSource.is_enabled == True).all() + if not sources: + print("没有找到启用的信息源,任务结束。") + return + + # 2. 伪装 HTTP 请求头,绕过目标服务器的反爬/防盗链机制 + custom_headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36", + "Accept": "application/json, text/plain, */*", + "Referer": "https://newsnow.busiyi.world/", + "Origin": "https://newsnow.busiyi.world" + } + + # 复用异步客户端,提高并发抓取效率 + async with httpx.AsyncClient(timeout=15.0, headers=custom_headers) as client: + for source in sources: + # 平台标识,如 "weibo", "zhihu" 等,这里复用了 home_url 字段存储 + platform_id = source.home_url + if not platform_id: + continue + + # 拼装最终的抓取 URL + url = f"{API_BASE_URL}?id={platform_id}&latest" + # 初始化本次抓取任务的日志记录 + task_log = DataSyncTask(source_id=source.id, items_fetched=0) + + try: + # 发起请求并校验 HTTP 状态码 + response = await client.get(url) + response.raise_for_status() + data_json = response.json() + + items = data_json.get("items", []) + saved_count = 0 + + for index, item in enumerate(items, 1): + title = item.get("title") + if not title: + continue + + item_url = item.get("url", "") + + # 3. ID 去重策略:优先用接口自带 ID,没有则用 URL,最差情况用标题兜底 + raw_id = item.get("id") or item_url or title + # 组合平台标识和原始 ID 算出全局唯一的 MD5 + external_id = generate_md5(f"{platform_id}_{raw_id}") + + # 4. 在数据库中查询是否已经存在这条热搜 + existing_event = db.query(TrendingEvent).filter( + TrendingEvent.source_id == source.id, + TrendingEvent.external_id == external_id + ).first() + + event_to_log = None # 留个指针,用来后续记录名次历史 + + if existing_event: + # -------- 分支 A:热搜已存在,执行更新逻辑 -------- + + # 监控标题变化(例如:微博热搜经常会改词条名字) + if existing_event.current_headline != title: + # 标题发生改变!立刻记录到修订历史表 + revision = HeadlineRevision( + event_id=existing_event.id, + previous_headline=existing_event.current_headline, + revised_headline=title + ) + db.add(revision) + existing_event.current_headline = title # 更新为主表最新标题 + + # 更新当前最新的排名和链接 + existing_event.current_ranking = index + existing_event.event_url = item_url + + event_to_log = existing_event + else: + # -------- 分支 B:全新热搜,执行插入逻辑 -------- + new_event = TrendingEvent( + source_id=source.id, + external_id=external_id, + current_headline=title, + event_url=item_url, + current_ranking=index, + ) + db.add(new_event) + # 核心操作!flush 会将数据推入数据库生成自增的 ID,但不提交事务 (commit)。 + # 这样接下来的 RankingLog 就能立刻拿到 `new_event.id` 作为外键。 + db.flush() + event_to_log = new_event + + # -------- 无论新旧,统一记录排名轨迹 -------- + # 只要抓取到,不管新旧,必须记一笔当前的排名! <--- + # 借助这个表,后续可以画出某条热搜随时间变化的“排名上升/下降曲线” + rank_log = RankingLog( + event_id=event_to_log.id, + ranking_position=index + ) + db.add(rank_log) + + saved_count += 1 + + # 如果这一个平台的数据全部处理顺利,标记成功并整体提交 + task_log.items_fetched = saved_count + task_log.task_status = TaskStatus.SUCCESS + db.add(task_log) + db.commit() # ✅ 只在 try 顺利走完整个平台的数据时,才统一提交业务数据到硬盘 + print(f"[{source.source_name}] 成功抓取并更新了 {saved_count} 条数据") + + except Exception as e: + # -------- 异常处理机制 -------- + # 独立日志记录 + + # 第一步:遇到报错(如网络中断、解析错误),立刻回滚。 + # 丢弃这批脏数据,防止数据库出现一半更新一半没更新的“不一致状态”。 + db.rollback() + + # 第二步:记录错误日志并独立提交。 + # 因为上面执行了 rollback,之前 add 的 task_log 也被清空了状态, + # 此时重新设置状态,并作为一次新的独立事务提交到数据库,方便后台监控报错。 + task_log.task_status = TaskStatus.ERROR + task_log.error_trace = str(e) + db.add(task_log) + db.commit() # 提交错误日志本身 + print(f"[{source.source_name}] 抓取失败: {e}") diff --git a/backend/data/demo.db b/backend/data/demo.db deleted file mode 100644 index 8711511bd381e3a48c967a39ffe93bde23bc79f3..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 151552 zcmeI(%WvGq9S88G*WR^O*27Q6PFyi*1#2Oz)=Cfz!7$vlOgoC@-FPLZHgX^+?npb) zF1aSTvXmSO$yNZP2y!Y4^pc)?O-?=L7_^rj^B?37D9|F8&TvU;hvdqS%4te`T{smv zoFV5kzj=}J^8F?15@pS@Te_>vW=1oTl=+RKWHOo4?C&`H8~owNM}q_Q|5DG-{eC>1 z`Sln7KE^K|`H^2EkNkK-8oDs}<>0`;t-`m*zsXl}KV-kp{Ux`N`Fr-OY<>8M>G`^=uQrKEo;aFryVPhBM>sgSP*ck*skpmpWu;+@6H zic+bvzYms{E-Qrr$LiPy(O4o&QSYcVA+=-DHA?Cl*(9dRFY1=SGm} z7Tzz{CT3@fLUxzBO`?(3Dyi40xvs3J4_A0z!`dXaMqBzi(K>cB%s)3hUF7M0^)xSk zm^w2zCl()EmRLl`ZCJKu>Mar$Q1XknY`RYQqdT2eOSiYf=W2IWo78Er`?6fOnB~gi zJ=J?)o9HfcV9^wu7Ao3l*AGnpu(o)wTzjP4RUau6wC+`Mp;}p9sg-$y(E66fs2_J2 zR>104PI*vSy#GK|Cc=igtc2J2CEc#p)Wtg$Ui$uJMWv?RR%>cyL0wj;xn^nI5}fb> zdb33vTF5OVcJmjC^QUt1y-QMtnsu_}JZ`cMr|BKn@{Tnj`m`D0@bvu2m(N-fvYy%- z9?Hv;lhX5LuQRwdF?nQl1JsEe4htP3lK3<064bEF#J*s3Y*tBEYY@HOq~?A!$GSte zlO}eCc3We4H^QC}bhMywTQ@g&2ldAQ9`dU;C2N{v(srA;VGl8W`i|hW+t0QeTp!8F z_b2;xgGkJ2Gm#Uc*N0yvdnuBA#5j?cOJmYb*6Ve2OVe3PcN}6nY}D0hGdvvfyMjNO zELLu*50&`!O0}Ydd6WsSjN+~_n3u=Kq`k9Vp9|B&A5IAUEIjS^u%vm#D>#hC(%iH- zu(x%`d1Bdht)V-O@G>6oJTgscY$T5B`ljycc49ADXX6C1!=kQ=_iGVXX9Kvde6U=t z@LTW+;8Uxovqhiqu=_!wxHDA9%hzY6ovYopWK%-hYHqhIyWOCMW>~Ej8wQ*@b&QVV zP&Qfwrv`m~zft>S%`UE?KZir_y=ra+5PNzUY?zm_AYv@o{%-+)PN8P z2^aJqb$cG8Uo(Xdm$Z=aHQ2f$F*f{UhGx3V(Hqc&DZEKHh{KXQEHN=;gNZpPNnH1M ztAuVFz54PYx;!Wp)q$M+Xtv+5k^rlf5>6I&viZC`KPBx6JHk$h8VpgnIu!`o(R3S8N(j=R@$(HGkVcA5vy;LoC*U>MSjpK23 z<5*5!p6VA@(VALZronYDTKiKz{^tCB! z1OW&@00Izz00bZa0SG_<0uYD_;P?NdA~=Qs1Rwwb2tWV=5P$##AOHafq)!0f|I^o| zs1XDp009U<00Izz00bZa0SG`KDuD0*Q4t(N00Izz00bZa0SG_<0uX=z1kxvf@Bis* zQ`86o5P$##AOHafKmY;|fB*y_5Ea1wKPrM_2tWV=5P$##AOHafKmY;|fI#{L@caMi zYg5z+0uX=z1Rwwb2tWV=5P$##AP^P6_y4E}jv)X62tWV=5P$##AOHafKmY>i6TtpI zeQk;wK>z{}fB*y_009U<00Izz00g1}`2GK=2#z5D0SG_<0uX=z1Rwwb2tWV==@Y>B z|MayfY6JlYKmY;|fB*y_009U<00Iz*3JeW@ml>57GQZ?s+k|&O)+b%Vl#4*(~ zIr-77l%ZywY&nmc)FqnUaV_sy>o~;L+8w8%5wqTACtXJ?C7e8a;5~Y8cyghpmRD3| zrF?TqRfK0zCJFS_(5*PGPbkLp;r=yuk`PSI{_Ebm6xBZ3YW6mIM02Jfi;2*g8w)uv=kb4=Q96F2NB z#!ufNymtH9b_e4`UM`JEJ6Uf|p<9~HtobD3u<=i)&G2x@4@SRVFIH};50&`!O0}Yd zd6WsSjN+~_n3u=Kq`k9V)Q9Qe4=0334^R6sF3v06F!l-#PRq=i;;_7TD)s+vHs}+6=J`{gy^@Mx$ z2@g9G3&owGLSDW;EA3qEwj~>!+g5YCW!dcpH8jI&wOB`V>eMkhjzd|04NeXE{C=bM z$;g{FA+s{U$kvIgx!Y}WS@93p5WoA`@w_}cEA3tMT0J3a!l?lv5)v-xKkoHBM!#kX zA1-Mj<7==PJ~1}@WQJzC%+VWCgvqo?H;BWMJ1j9VWP>R(C`nv_c=L6)jb42sk51Hu zqB?NU+A#rED}2zKd45XTnMw>Q*J@M4VH3Np8+@VBJwAS5Jaw<((G<yh5}Mvdw#Ww48kG{=(&bZq2I_lvvz7cQh|gO-WBRFLmo0UY_c=eu~yyD@D)b zcCQ@C%f+Jf?7Sx{F4DxKte`5U^*0T!d(kSK^6{^2lLj#zy2)rFt7?PwayA&mw*d{y z^&OMC`QZ=j*RnaeTI@$2RM%TR@f~3A-p4u0Hm1k<=5%*b%FE;90ekPty7%WJ0(o!B z0h6)6#*Yn~LJayB@mV4qp6A6dZCEXFV--x;l&wZgzCUI=xoUMuEmwr151)K!;GdaO znd!{HUk84d|5JYX*dOz`?B9;PFMXZ;efERQwB!zdGV=A1G+aFK2T!;Bj<>FVHs77= z*it}qwoQYL9?pitR|Y+YV}fxcv432LJE_Fkg}obSIocX=jRsRn3QYben&awj#|axC zvWy|N%{HHWFefj)WR4CId>-rFqJ@tw?9If;VygwVGrM@u)+)lpW-Ad@d3xr&H)8L7 z+-(BVWS!dH)*94tEql8jIetfMgh*=tR`6c1#9nrE7p4F03gqC%c~O)nHAG~y&PF@$ zjYz%IZc>A7{k```;pW7zTeygeuIQpo>t8Q4Y|rA|Ytq8 z!2>#P?>5h})#$wRjP(>Y8+cF9T>`PlUL)*9gW_I8z@Z|sMF z_!8WU3h~sLc;kx~yk^TA4c#9UkwuU{ezJ{e_e0SC%@>PL-eVi*acSpt_!gwQn&Zm{{|&g- zwAP(*f3(|wowt7-fA`UQEqg`leA!*wZkY5AE-0aGIn-sZC=-Xdp65*dto5RCFllYnyRr$MP*~W;zQ1_O+pF*9Z~ub5fiFqVu6oeHXA+IA)G2>S>%`BzE1<{IUyq*e4XD*| zUgArQ_iQXqf?J1EUu;VI30q=eEGPfA)UPFiN8NYTO7U~=1#e66@BjH*4g7%s1Rwwb z2tWV=5P$##AOHafq+0<0{y*LAi`qc|0uX=z1Rwwb2tWV=5P$##xB$NYBMu+{0SG_< z0uX=z1Rwwb2tWV==@-EF|Ma&pY6t-cKmY;|fB*y_009U<00I!e_kY9y1Rwwb2tWV= z5P$##AOHafKp_1B`2L^%HbxC0009U<00Izz00bZa0SG_<0@(i}1|R?d2tWV=5P$## zAOHafKmY>i7r^iTr@xI+LkK_s0uX=z1Rwwb2tWV=5P$%_|04z<009U<00Izz00bZa z0SG_<0_hjP_y6>_F=_|_2tWV=5P$##AOHafKmY;|!1sT|00bZa0SG_<0uX=z1Rwwb z2tXkH0{H%){x(JpApijgKmY;|fB*y_009U<00Q{_j~IXe1Rwwb2tWV=5P$##AOHaf gq+bC0|Ma&pY6t-cKmY;|fB*y_009U<00I#B4{#a-M*si-