This commit is contained in:
stardrophere
2026-03-09 18:13:35 +08:00
parent 3c57dd0cce
commit 5b541bbea3
11 changed files with 464 additions and 75 deletions
View File
+45
View File
@@ -0,0 +1,45 @@
# app/api/endpoints/sources.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List
from app.database import get_db
from app.schemas.schemas import (
InfoSourceCreate, InfoSourceUpdate, InfoSourceResponse, PaginatedResponse
)
from app.crud import crud_source
router = APIRouter()
@router.post("/", response_model=InfoSourceResponse, status_code=status.HTTP_201_CREATED)
async def create_info_source(source_in: InfoSourceCreate, db: Session = Depends(get_db)):
    """Create a new info source.

    Args:
        source_in: Validated request body describing the source to create.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        Whatever ``crud_source.create`` returns, serialized through
        ``InfoSourceResponse``; the route answers with HTTP 201.
    """
    created_source = crud_source.create(db=db, obj_in=source_in)
    return created_source
@router.get("/", response_model=PaginatedResponse[InfoSourceResponse])
async def get_all_sources(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of info sources plus the overall number of sources.

    Args:
        skip: Number of rows to skip before the returned page.
        limit: Maximum number of rows in the returned page.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``total`` (count of all info sources, independent of
        ``skip``/``limit``) and ``data`` (the rows of the requested page).
    """
    # Imported locally because this module does not otherwise reference the
    # ORM model.
    from app.models.models import InfoSource

    sources = crud_source.get_multi(db=db, skip=skip, limit=limit)
    # `total` has to be the size of the whole result set, not of the current
    # page; otherwise a client can never tell whether further pages exist.
    total = db.query(InfoSource).count()
    return {"total": total, "data": sources}
@router.get("/{source_id}", response_model=InfoSourceResponse)
async def get_info_source(source_id: int, db: Session = Depends(get_db)):
    """Return the details of a single info source.

    Args:
        source_id: Primary key of the requested source.
        db: Database session supplied by the ``get_db`` dependency.

    Raises:
        HTTPException: 404 when ``crud_source.get`` finds nothing.
    """
    found = crud_source.get(db=db, source_id=source_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="该信息源不存在")
@router.patch("/{source_id}", response_model=InfoSourceResponse)
async def update_info_source(source_id: int, source_in: InfoSourceUpdate, db: Session = Depends(get_db)):
    """Partially update an existing info source.

    Args:
        source_id: Primary key of the source to update.
        source_in: Request body carrying the fields to change.
        db: Database session supplied by the ``get_db`` dependency.

    Raises:
        HTTPException: 404 when ``crud_source.get`` finds nothing.
    """
    existing = crud_source.get(db=db, source_id=source_id)
    if existing:
        # Hand the loaded ORM object and the request payload straight to the
        # CRUD layer, which applies the changes and persists them.
        return crud_source.update(db=db, db_obj=existing, obj_in=source_in)
    raise HTTPException(status_code=404, detail="该信息源不存在")
View File
+8
View File
@@ -0,0 +1,8 @@
# app/api/router.py
from fastapi import APIRouter
from app.api.endpoints import sources
api_router = APIRouter()

# Info-source management endpoints, mounted under /sources.
api_router.include_router(
    router=sources.router,
    prefix="/sources",
    tags=["信息源管理"],
)
+40
View File
@@ -0,0 +1,40 @@
# app/crud/crud_source.py
from sqlalchemy.orm import Session
from typing import List, Optional
from app.models.models import InfoSource
from app.schemas.schemas import InfoSourceCreate, InfoSourceUpdate
def get(db: Session, source_id: int) -> Optional[InfoSource]:
    """Fetch a single info source by its primary key.

    Args:
        db: Active database session.
        source_id: Value matched against ``InfoSource.id``.

    Returns:
        The first matching ``InfoSource``, or ``None`` when there is none.
    """
    by_id = db.query(InfoSource).filter(InfoSource.id == source_id)
    return by_id.first()
def get_multi(db: Session, skip: int = 0, limit: int = 100) -> List[InfoSource]:
    """Return one page of info sources.

    Args:
        db: Active database session.
        skip: Number of rows to skip (SQL OFFSET).
        limit: Maximum number of rows to return (SQL LIMIT).

    Returns:
        The requested page of ``InfoSource`` rows, ordered by primary key.
    """
    # OFFSET/LIMIT without ORDER BY leaves the row order up to the database,
    # so consecutive pages could overlap or skip rows. Ordering by the
    # primary key makes pagination stable.
    return (
        db.query(InfoSource)
        .order_by(InfoSource.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
def create(db: Session, obj_in: InfoSourceCreate) -> InfoSource:
    """Persist a new info source built from the request schema.

    Args:
        db: Active database session.
        obj_in: Schema whose dumped fields become the model's constructor
            arguments.

    Returns:
        The committed and refreshed ``InfoSource`` instance.
    """
    field_values = obj_in.model_dump()
    new_source = InfoSource(**field_values)
    db.add(new_source)
    db.commit()
    # Reload the row so the instance reflects what is stored in the database.
    db.refresh(new_source)
    return new_source
def update(db: Session, db_obj: InfoSource, obj_in: InfoSourceUpdate) -> InfoSource:
    """Apply a partial update to an existing info source.

    Args:
        db: Active database session.
        db_obj: The ORM instance to modify.
        obj_in: Update schema; only fields the client explicitly set are used.

    Returns:
        The same ``db_obj`` after commit and refresh.
    """
    # exclude_unset keeps omitted fields out of the dict, so they stay as-is.
    changes = obj_in.model_dump(exclude_unset=True)
    for attr_name in changes:
        setattr(db_obj, attr_name, changes[attr_name])

    db.add(db_obj)
    db.commit()
    db.refresh(db_obj)
    return db_obj
+44 -50
View File
@@ -1,71 +1,65 @@
# app/main.py
import os
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session
from fastapi import FastAPI
from dotenv import load_dotenv
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from app.services.fetcher_service import fetch_and_save_trending_data
from app.database import engine
from app.models.models import Base
from app.database import engine, get_db
from app.models.models import Base, InfoSource, SourceType
# 路由总线
from app.api.router import api_router
# Load environment variables (python-dotenv) before any setting is read.
load_dotenv()
# Crawl interval in minutes, read from CRAWL_INTERVAL_MINUTES; falls back to 10.
CRAWL_INTERVAL = int(os.getenv("CRAWL_INTERVAL_MINUTES", 10))
# Module-level scheduler instance used by the lifespan handler below.
scheduler = AsyncIOScheduler()
# ==========================================
# 1. 生命周期管理:App 启动时自动建表
# 1. 生命周期管理:App 启动时自动建表 & 启动调度器
# ==========================================
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan handler: set up on startup, tear down on shutdown.

    Startup creates any missing database tables, registers the periodic
    crawl job, starts the scheduler and runs one crawl immediately.
    Shutdown stops the scheduler.

    Args:
        app: The FastAPI application this lifespan is attached to.
    """
    # 1. Create database tables that do not exist yet.
    print("正在初始化数据库表...")
    Base.metadata.create_all(bind=engine)
    print("数据库表初始化完成!")

    # 2. Register and start the periodic crawl job. A fixed job id combined
    #    with replace_existing avoids piling up duplicate jobs.
    scheduler.add_job(
        fetch_and_save_trending_data,
        'interval',
        minutes=CRAWL_INTERVAL,
        id='trending_fetch_job',
        replace_existing=True,
    )
    scheduler.start()
    print(f"定时抓取任务已启动,每 {CRAWL_INTERVAL} 分钟执行一次")

    try:
        # Run one crawl right away so data is available without waiting for
        # the first interval (kept for testing convenience).
        await fetch_and_save_trending_data()

        # Exactly one yield: FastAPI serves requests while suspended here.
        yield
    finally:
        # 3. Always stop the scheduler, even if the initial crawl or the
        #    application itself raised.
        scheduler.shutdown()
        print("定时任务已安全关闭")
# Create the FastAPI application and attach the lifespan handler.
app = FastAPI(title="AI 新闻聚合引擎 API", lifespan=lifespan)
# ==========================================
# 2. Mount the aggregated API router
# ==========================================
# Every route registered on api_router is served under the /api/v1 prefix.
app.include_router(api_router, prefix="/api/v1")
@app.get("/")
# 健康检查
@app.get("/", tags=["健康检查"])
async def root():
return {"message": "Welcome to AI News Aggregator API"}
@app.get("/hello/{name}")
async def say_hello(name: str):
return {"message": f"Hello {name}"}
# --->与数据库交互的真实接口
@app.post("/sources/")
async def create_info_source(name: str, url: str, db: Session = Depends(get_db)):
"""
测试接口:向数据库中添加一个新闻信息源
"""
# ---> 核心修改 3:直接使用 InfoSource 和 SourceType <---
new_source = InfoSource(
source_name=name,
source_type=SourceType.RSS_FEED, # 默认用 RSS 测试
home_url=url
)
db.add(new_source)
db.commit()
db.refresh(new_source) # 刷新以获取自动生成的 ID
return {"message": "创建成功!", "data": new_source}
@app.get("/sources/")
async def get_all_sources(db: Session = Depends(get_db)):
"""
测试接口:查询数据库中所有的信息源
"""
# ---> 核心修改 4:直接使用 InfoSource <---
sources = db.query(InfoSource).all()
return {"total": len(sources), "data": sources}
return {"message": "Welcome to AI News Aggregator API", "status": "ok"}
+89 -24
View File
@@ -1,3 +1,4 @@
# models.py
from datetime import datetime, timezone, time
from typing import Optional, Any
import enum
@@ -10,32 +11,47 @@ from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
# ==========================================
# 0. 全局基类枚举定义
# 0. 全局基类枚举定义与动态类型
# ==========================================
class Base(DeclarativeBase):
    """SQLAlchemy 2.0 declarative base class.

    Every table model in this module inherits from this class.
    """
# Column type that is BigInteger by default but uses Integer on the "sqlite"
# dialect. Per the original author's note, this keeps autoincrement working on
# SQLite while PostgreSQL/MySQL deployments retain the larger BigInteger range.
BigIntType = BigInteger().with_variant(Integer, "sqlite")
class SourceType(str, enum.Enum):
    """How an info source is crawled."""

    # Each member is defined exactly once; enum rejects a repeated member
    # name with a TypeError at class-creation time.
    HOT_TREND = "HOT_TREND"  # trending / hot-search lists
    RSS_FEED = "RSS_FEED"  # traditional RSS subscription
    API = "API"  # fetched through an API endpoint
class TargetType(str, enum.Enum):
    """Polymorphic target type.

    Marks which kind of entity a comment or a topic tag is attached to.
    """

    # Each member is defined exactly once; enum rejects a repeated member
    # name with a TypeError at class-creation time.
    EVENT = "EVENT"  # attached to a single trending event
    TREND = "TREND"  # attached to a macro-level trend
    ARTICLE = "ARTICLE"  # attached to a specific news article
class TaskStatus(str, enum.Enum):
    """Outcome status of a background task."""

    SUCCESS = "SUCCESS"
    ERROR = "ERROR"
class GenderType(str, enum.Enum):
"""用户性别枚举"""
MALE = "MALE"
FEMALE = "FEMALE"
OTHER = "OTHER"
@@ -43,7 +59,10 @@ class GenderType(str, enum.Enum):
def utcnow():
    """Return the current time as a timezone-aware UTC ``datetime``.

    The intent is to store UTC everywhere on the server and in the database
    and to convert to the user's local time zone only for display.
    """
    return datetime.now(tz=timezone.utc)
@@ -51,6 +70,10 @@ def utcnow():
# 模块一:信息源管理
# ==========================================
class InfoSource(Base):
"""
抓取源配置表
充当爬虫的“任务清单”,后台可以随时开关特定的信息源,而不需要重启代码。
"""
__tablename__ = "info_sources"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
@@ -67,14 +90,19 @@ class InfoSource(Base):
# 模块二:AI 语义聚类中枢 (大事件池)
# ==========================================
class UnifiedEvent(Base):
"""
AI 统一事件表
核心业务逻辑:比如微博热搜叫“苹果发布会”,知乎热搜叫“iPhone 16 测评”,
它们在子表(TrendingEvent)是两条记录,但通过 AI 语义向量对比后,
会将它们统一挂载到这个表的一个 UnifiedEvent ID 下,实现跨平台事件聚合。
"""
__tablename__ = "unified_events"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
id: Mapped[int] = mapped_column(BigIntType, primary_key=True, autoincrement=True)
unified_title: Mapped[str] = mapped_column(String(255), comment="AI统一标题")
ai_comprehensive_summary: Mapped[Optional[str]] = mapped_column(Text, comment="AI全局深度总结")
# SQLite 没有原生 Vector 类型,存为用逗号分隔的字符串或JSON,Postgres可换成 PGVector
center_embedding: Mapped[Optional[str]] = mapped_column(Text, comment="中心向量")
center_embedding: Mapped[Optional[str]] = mapped_column(Text, comment="中心向量") # 用于高维空间相似度计算
hot_score: Mapped[int] = mapped_column(Integer, default=0, comment="聚合热度得分")
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow)
@@ -85,12 +113,16 @@ class UnifiedEvent(Base):
# 模块三:内容存储库 (热搜 & 新闻子节点)
# ==========================================
class TrendingEvent(Base):
"""
各平台热搜数据明细表
"""
__tablename__ = "trending_events"
__table_args__ = (
# 联合唯一索引:同一个来源(比如微博)的同一条外部ID(MD5)只能存在一条记录,防重插核心保障
UniqueConstraint("source_id", "external_id", name="idx_unique_external_trend"),
)
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
id: Mapped[int] = mapped_column(BigIntType, primary_key=True, autoincrement=True)
source_id: Mapped[int] = mapped_column(ForeignKey("info_sources.id"))
unified_event_id: Mapped[Optional[int]] = mapped_column(ForeignKey("unified_events.id"))
@@ -109,12 +141,13 @@ class TrendingEvent(Base):
class NewsArticle(Base):
"""新闻文章明细表 (与 TrendingEvent 类似,但侧重长文本阅读)"""
__tablename__ = "news_articles"
__table_args__ = (
UniqueConstraint("source_id", "external_id", name="idx_unique_external_article"),
)
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
id: Mapped[int] = mapped_column(BigIntType, primary_key=True, autoincrement=True)
source_id: Mapped[int] = mapped_column(ForeignKey("info_sources.id"))
unified_event_id: Mapped[Optional[int]] = mapped_column(ForeignKey("unified_events.id"))
@@ -136,9 +169,13 @@ class NewsArticle(Base):
# 模块四:热度与轨迹追踪
# ==========================================
class HeadlineRevision(Base):
"""
标题修订历史表
用于记录平台方暗戳戳修改热搜词条的行为(例如公关介入改标题)。
"""
__tablename__ = "headline_revisions"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
id: Mapped[int] = mapped_column(BigIntType, primary_key=True, autoincrement=True)
event_id: Mapped[int] = mapped_column(ForeignKey("trending_events.id"))
previous_headline: Mapped[str] = mapped_column(String(255))
revised_headline: Mapped[str] = mapped_column(String(255))
@@ -147,12 +184,17 @@ class HeadlineRevision(Base):
class RankingLog(Base):
"""
热搜排名时间序列化日志
每一次抓取都会生成一条记录,可以用于前端绘制热搜“排名起伏折线图”。
"""
__tablename__ = "ranking_logs"
__table_args__ = (
# 针对时间序列查询优化的复合索引,加速类似 "查询某事件在过去24小时内的排名变化" 的操作
Index("idx_event_time", "event_id", "observed_at"),
)
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
id: Mapped[int] = mapped_column(BigIntType, primary_key=True, autoincrement=True)
event_id: Mapped[int] = mapped_column(ForeignKey("trending_events.id"))
ranking_position: Mapped[int] = mapped_column(Integer)
observed_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow)
@@ -163,16 +205,22 @@ class RankingLog(Base):
# ==========================================
# 模块五:多态话题与多态评论
# ==========================================
# 【设计模式】:多态设计
# 通过 target_type (存表名/类型) + target_id (存主键ID) 的组合,
# 让这两个表既能挂载在"单一热搜"下,也能挂载在"新闻文章"下,甚至挂在"统一大事件"下,避免了建立无数个外键的冗余。
class ExtractedTopic(Base):
"""AI 提取的核心话题标签表"""
__tablename__ = "extracted_topics"
__table_args__ = (
Index("idx_topic_keyword", "topic_keyword"),
# 多态查询索引,加速 target_type + target_id 的组合查询
Index("idx_polymorphic_topics", "target_type", "target_id"),
)
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
id: Mapped[int] = mapped_column(BigIntType, primary_key=True, autoincrement=True)
target_type: Mapped[TargetType] = mapped_column(Enum(TargetType))
target_id: Mapped[int] = mapped_column(BigInteger)
target_id: Mapped[int] = mapped_column(BigIntType)
topic_keyword: Mapped[str] = mapped_column(String(100))
relevance_score: Mapped[Optional[float]] = mapped_column(Float)
@@ -180,14 +228,15 @@ class ExtractedTopic(Base):
class DiscussionComment(Base):
"""全平台统一评论表"""
__tablename__ = "discussion_comments"
__table_args__ = (
Index("idx_polymorphic_comments", "target_type", "target_id"),
)
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
id: Mapped[int] = mapped_column(BigIntType, primary_key=True, autoincrement=True)
target_type: Mapped[TargetType] = mapped_column(Enum(TargetType))
target_id: Mapped[int] = mapped_column(BigInteger)
target_id: Mapped[int] = mapped_column(BigIntType)
commenter_name: Mapped[Optional[str]] = mapped_column(String(100))
comment_content: Mapped[str] = mapped_column(Text)
@@ -202,6 +251,7 @@ class DiscussionComment(Base):
# 模块六:用户画像与多渠道高可用推送系统
# ==========================================
class AppUser(Base):
"""系统核心用户表"""
__tablename__ = "app_users"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
@@ -212,7 +262,7 @@ class AppUser(Base):
avatar_url: Mapped[Optional[str]] = mapped_column(String(500))
gender: Mapped[GenderType] = mapped_column(Enum(GenderType), default=GenderType.UNKNOWN)
# 核心:万能扩展收纳箱 (SQLite 完美支持通过 SQLAlchemy 存储 JSON)
# 预留的 JSON 字段,可以存放未来灵活变化的用户配置,避免频繁修改表结构
metadata_: Mapped[Optional[Any]] = mapped_column("metadata", JSON, comment="自定义扩展偏好")
timezone: Mapped[str] = mapped_column(String(50), default="Asia/Shanghai")
@@ -221,6 +271,10 @@ class AppUser(Base):
class UserPushEndpoint(Base):
"""
多渠道推送端点配置表
一个用户可能绑定了邮箱(EMAIL)和微信(WECHAT),支持配置降级重试(priority_level)。
"""
__tablename__ = "user_push_endpoints"
__table_args__ = (
UniqueConstraint("user_id", "channel_type", name="idx_unique_user_channel"),
@@ -238,6 +292,7 @@ class UserPushEndpoint(Base):
class UserTopicPreference(Base):
"""用户订阅的兴趣标签库"""
__tablename__ = "user_topic_preferences"
__table_args__ = (
UniqueConstraint("user_id", "interested_keyword", name="idx_unique_preference"),
@@ -251,6 +306,7 @@ class UserTopicPreference(Base):
class UserDeliverySchedule(Base):
"""用户勿扰/定时推送时间表"""
__tablename__ = "user_delivery_schedules"
__table_args__ = (
UniqueConstraint("user_id", "delivery_time", name="idx_unique_schedule"),
@@ -265,15 +321,20 @@ class UserDeliverySchedule(Base):
class DeliveryHistory(Base):
"""
推送历史防刷表
核心用途:一旦给某个用户推送过某条新闻/事件,就记录在这里。
下次再触发推荐时,检查这个表,防止给同一个用户反复发送相同的内容。
"""
__tablename__ = "delivery_history"
__table_args__ = (
UniqueConstraint("user_id", "target_type", "target_id", name="idx_prevent_duplicate_push"),
)
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
id: Mapped[int] = mapped_column(BigIntType, primary_key=True, autoincrement=True)
user_id: Mapped[int] = mapped_column(ForeignKey("app_users.id"))
target_type: Mapped[TargetType] = mapped_column(Enum(TargetType))
target_id: Mapped[int] = mapped_column(BigInteger)
target_id: Mapped[int] = mapped_column(BigIntType)
status: Mapped[TaskStatus] = mapped_column(Enum(TaskStatus))
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow)
@@ -283,9 +344,13 @@ class DeliveryHistory(Base):
# 模块七:系统任务监控
# ==========================================
class DataSyncTask(Base):
"""
数据同步健康度监控表
这就是爬虫脚本每次运行都要写入记录的地方,用于后台 Dashboard 监控爬虫健康状态和错误堆栈。
"""
__tablename__ = "data_sync_tasks"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
id: Mapped[int] = mapped_column(BigIntType, primary_key=True, autoincrement=True)
source_id: Mapped[int] = mapped_column(ForeignKey("info_sources.id"))
items_fetched: Mapped[int] = mapped_column(Integer, default=0)
task_status: Mapped[TaskStatus] = mapped_column(Enum(TaskStatus))
+69
View File
@@ -0,0 +1,69 @@
# app/schemas/schemas.py
from pydantic import BaseModel, ConfigDict, Field
from typing import Optional
from datetime import datetime
# 枚举
from app.models.models import SourceType
# ==========================================
# InfoSource (信息源) 相关的 Schemas
# ==========================================
class InfoSourceBase(BaseModel):
    """Fields shared by the info-source create and response schemas.

    The ``Field()`` metadata provides descriptions, examples and length
    limits for the generated API documentation and for validation.
    """

    source_name: str = Field(
        ...,
        max_length=100,
        description="信息源名称,例如:微博热搜、知乎热榜",
        examples=["微博热搜"],
    )
    source_type: SourceType = Field(
        default=SourceType.API,
        description="抓取方式枚举",
    )
    home_url: Optional[str] = Field(
        default=None,
        max_length=255,
        description="平台主页或接口的基础URL",
        examples=["weibo"],
    )
    is_enabled: bool = Field(
        default=True,
        description="是否启用该信息源的定时抓取",
    )
class InfoSourceCreate(InfoSourceBase):
    """Request body for creating an info source (POST).

    Currently identical to ``InfoSourceBase``; fields that may only be set
    at creation time would be added here.
    """
class InfoSourceUpdate(BaseModel):
    """Request body for updating an info source (PATCH/PUT).

    Every field is optional so that a client can send only the fields it
    wants to change.
    """

    source_name: Optional[str] = Field(None, max_length=100)
    source_type: Optional[SourceType] = Field(default=None)
    home_url: Optional[str] = Field(None, max_length=255)
    is_enabled: Optional[bool] = Field(default=None)
class InfoSourceResponse(InfoSourceBase):
    """Response body returned for info-source reads and writes.

    Extends the shared fields with the id and the timestamp fields.
    """

    # from_attributes lets Pydantic build this schema directly from an ORM
    # instance by reading its attributes.
    model_config = ConfigDict(from_attributes=True)

    id: int = Field(..., description="数据库主键ID")
    created_at: datetime = Field(..., description="创建时间")
    updated_at: datetime = Field(..., description="最后一次更新时间")
# ==========================================
# 分页响应包装器 (泛型设计,方便以后复用)
# ==========================================
from typing import Generic, TypeVar, List
T = TypeVar("T")
class PaginatedResponse(BaseModel, Generic[T]):
    """Generic list response carrying a total count alongside one page of data."""

    total: int = Field(
        ...,
        description="符合条件的数据总条数",
    )
    data: List[T] = Field(
        ...,
        description="当前页的数据列表",
    )
+167
View File
@@ -0,0 +1,167 @@
# app/services/fetcher_service.py
import os
import hashlib
import httpx
from dotenv import load_dotenv
from app.database import SessionLocal
from app.models.models import (
InfoSource, TrendingEvent, DataSyncTask, TaskStatus,
HeadlineRevision, RankingLog, utcnow
)
# Load environment variables from the .env file.
load_dotenv()
# Base URL of the upstream trending API, read from API_BASE_URL with a
# built-in default as fallback.
API_BASE_URL = os.getenv("API_BASE_URL", "https://newsnow.busiyi.world/api/s")
def generate_md5(text: str) -> str:
    """Return the 32-character hexadecimal MD5 digest of *text*.

    The text is UTF-8 encoded before hashing. The digest serves as a
    fixed-length ``external_id`` so that items from different platforms can
    be deduplicated through a unique index.
    """
    hasher = hashlib.md5()
    hasher.update(text.encode('utf-8'))
    return hasher.hexdigest()
def _upsert_trending_event(db, source_id, external_id, title, item_url, ranking):
    """Insert or refresh one trending item and return its ``TrendingEvent``.

    An existing row (matched on ``source_id`` + ``external_id``) gets its
    ranking and URL updated; a changed headline is additionally recorded as
    a ``HeadlineRevision``. A missing row is inserted and flushed.
    """
    event = db.query(TrendingEvent).filter(
        TrendingEvent.source_id == source_id,
        TrendingEvent.external_id == external_id,
    ).first()

    if event is None:
        event = TrendingEvent(
            source_id=source_id,
            external_id=external_id,
            current_headline=title,
            event_url=item_url,
            current_ranking=ranking,
        )
        db.add(event)
        # flush() sends the INSERT so that event.id is populated for the
        # RankingLog foreign key, without committing the transaction.
        db.flush()
        return event

    if event.current_headline != title:
        # The platform renamed the entry: keep the old headline in the
        # revision history before overwriting it.
        db.add(HeadlineRevision(
            event_id=event.id,
            previous_headline=event.current_headline,
            revised_headline=title,
        ))
        event.current_headline = title

    event.current_ranking = ranking
    event.event_url = item_url
    return event


async def fetch_and_save_trending_data():
    """Scheduled job: crawl every enabled info source and persist the results.

    Workflow:
        1. Load all info sources with ``is_enabled == True``.
        2. Request each source's data from the upstream API with
           browser-like headers.
        3. For every item, derive an MD5 ``external_id`` and insert or
           update the matching ``TrendingEvent``.
        4. Record headline changes (``HeadlineRevision``) and one
           ``RankingLog`` entry per item.
        5. Commit once per source; on any error roll that source back and
           store a ``DataSyncTask`` row with status ERROR instead.

    A failure in one source, including a failure to write its error log,
    does not stop the remaining sources from being processed.

    NOTE(review): the database calls use the synchronous session inside an
    async function, so they block the event loop while running.
    """
    from urllib.parse import quote

    print(f"[{utcnow()}] 开始执行定时抓取任务...")

    # The context manager closes the session when the job finishes.
    with SessionLocal() as db:
        # 1. Sources are read on every run, so enabling or disabling one
        #    takes effect without restarting the service.
        sources = db.query(InfoSource).filter(InfoSource.is_enabled == True).all()  # noqa: E712

        if not sources:
            print("没有找到启用的信息源,任务结束。")
            return

        # 2. Browser-like request headers expected by the upstream service.
        custom_headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36",
            "Accept": "application/json, text/plain, */*",
            "Referer": "https://newsnow.busiyi.world/",
            "Origin": "https://newsnow.busiyi.world"
        }

        # One client is reused for all sources.
        async with httpx.AsyncClient(timeout=15.0, headers=custom_headers) as client:
            for source in sources:
                # Platform identifier such as "weibo" or "zhihu"; it is
                # stored in the home_url column.
                platform_id = source.home_url
                if not platform_id:
                    continue

                # Read these up front: rollback()/commit() may expire ORM
                # attributes, and the error path should not need another
                # database round trip just to print the source name.
                source_id = source.id
                source_name = source.source_name

                # Percent-encode the identifier so characters such as "&" or
                # "#" cannot alter the query string.
                encoded_id = quote(str(platform_id), safe="")
                url = f"{API_BASE_URL}?id={encoded_id}&latest"

                # Log record for this crawl run of this source.
                task_log = DataSyncTask(source_id=source_id, items_fetched=0)

                try:
                    response = await client.get(url)
                    response.raise_for_status()

                    items = response.json().get("items", [])

                    saved_count = 0
                    for index, item in enumerate(items, 1):
                        title = item.get("title")
                        if not title:
                            continue

                        item_url = item.get("url", "")

                        # 3. Dedup key: prefer the item's own id, then its
                        #    URL, and fall back to the title.
                        raw_id = item.get("id") or item_url or title
                        external_id = generate_md5(f"{platform_id}_{raw_id}")

                        # 4. Insert or update the event row.
                        event = _upsert_trending_event(
                            db, source_id, external_id, title, item_url, index
                        )

                        # Every observed item gets a ranking entry, new or
                        # not, so rank-over-time curves can be drawn later.
                        db.add(RankingLog(event_id=event.id, ranking_position=index))
                        saved_count += 1

                    # The whole source was processed: commit everything at once.
                    task_log.items_fetched = saved_count
                    task_log.task_status = TaskStatus.SUCCESS
                    db.add(task_log)
                    db.commit()
                    print(f"[{source_name}] 成功抓取并更新了 {saved_count} 条数据")

                except Exception as e:
                    # Discard this source's partial changes so the database
                    # is never left half-updated.
                    db.rollback()

                    # Store the failure as its own transaction. The exception
                    # type is included because str(e) alone can be empty.
                    task_log.task_status = TaskStatus.ERROR
                    task_log.error_trace = f"{type(e).__name__}: {e}"
                    try:
                        db.add(task_log)
                        db.commit()
                    except Exception as log_error:
                        # Writing the error log must not abort the remaining
                        # sources (or application startup).
                        db.rollback()
                        print(f"[{source_name}] 错误日志写入失败: {log_error}")
                    print(f"[{source_name}] 抓取失败: {e}")
Binary file not shown.