Files
InsightRadar/backend/app/api/endpoints/stats.py
T
2026-03-13 23:48:49 +08:00

66 lines
2.1 KiB
Python

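# System status monitoring API: reports an overview of the crawler cluster (number of info sources, items fetched today, time of the most recent sync, etc.)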
from datetime import datetime, timedelta
from typing import Optional
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.api.dependencies import get_db
from app.models.models import DataSyncTask, InfoSource, TaskStatus, utcnow
# Router on which this module's system-status endpoint is registered.
router = APIRouter()
class SystemStatsResponse(BaseModel):
    """Summary of the system's current running status.

    Response schema for the ``/system/stats`` endpoint.
    """

    # Number of info sources that are currently enabled.
    active_sources: int
    # Number of info sources in total, enabled or not.
    total_sources: int
    # Items fetched by sync tasks created today.
    items_today: int
    # Sync tasks created today whose status is SUCCESS.
    success_tasks_today: int
    # Sync tasks created today whose status is ERROR.
    error_tasks_today: int
    # Creation time of the most recent successful sync task; None if there is none.
    last_sync_at: Optional[datetime] = None
@router.get("/system/stats", response_model=SystemStatsResponse)
def get_system_stats(db: Session = Depends(get_db)) -> SystemStatsResponse:
    """Return the crawler cluster's running status for the current day.

    Args:
        db: SQLAlchemy session injected through the ``get_db`` dependency.

    Returns:
        A ``SystemStatsResponse`` holding the total and enabled info-source
        counts, the number of items fetched by tasks created today, today's
        successful and failed task counts, and the creation time of the most
        recent successful task (``None`` when no task has succeeded yet).
    """
    # "Today" starts at midnight of the clock returned by the project's
    # utcnow() helper.
    today_start = utcnow().replace(hour=0, minute=0, second=0, microsecond=0)

    # Info-source statistics: total count and enabled count.
    total_sources = db.query(func.count(InfoSource.id)).scalar() or 0
    active_sources = (
        db.query(func.count(InfoSource.id))
        .filter(InfoSource.is_enabled.is_(True))
        .scalar()
        or 0
    )

    # Today's task statistics, aggregated in the database: one row per task
    # status instead of loading every task created today into memory.
    # COALESCE keeps the per-status sum numeric even when items_fetched is
    # NULL on every row of a group.
    per_status = (
        db.query(
            DataSyncTask.task_status,
            func.count(),
            func.coalesce(func.sum(DataSyncTask.items_fetched), 0),
        )
        .filter(DataSyncTask.created_at >= today_start)
        .group_by(DataSyncTask.task_status)
        .all()
    )

    items_today = 0
    success_count = 0
    error_count = 0
    for status, task_count, fetched in per_status:
        # Items are totalled across every status, not only successful tasks.
        # int() normalises backends that return SUM() as a Decimal.
        items_today += int(fetched or 0)
        if status == TaskStatus.SUCCESS:
            success_count += task_count
        elif status == TaskStatus.ERROR:
            error_count += task_count

    # Time of the last sync: creation time of the newest successful task.
    # MAX() over zero rows yields NULL, i.e. None, when nothing has succeeded.
    last_sync_at = (
        db.query(func.max(DataSyncTask.created_at))
        .filter(DataSyncTask.task_status == TaskStatus.SUCCESS)
        .scalar()
    )

    return SystemStatsResponse(
        active_sources=active_sources,
        total_sources=total_sources,
        items_today=items_today,
        success_tasks_today=success_count,
        error_tasks_today=error_count,
        last_sync_at=last_sync_at,
    )