Files
2026-04-20 15:53:02 +08:00

62 lines
1.8 KiB
Python

from datetime import datetime, timedelta
from typing import Optional
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.api.dependencies import get_db
from app.models.models import DataSyncTask, InfoSource, TaskStatus, utcnow
router = APIRouter()
class SystemStatsResponse(BaseModel):
"""系统运行状态汇总"""
active_sources: int
total_sources: int
items_today: int
success_tasks_today: int
error_tasks_today: int
last_sync_at: Optional[datetime] = None
@router.get("/system/stats", response_model=SystemStatsResponse)
def get_system_stats(db: Session = Depends(get_db)):
"""获取爬虫集群的当日运行状态。"""
today_start = utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
total_sources = db.query(func.count(InfoSource.id)).scalar() or 0
active_sources = (
db.query(func.count(InfoSource.id))
.filter(InfoSource.is_enabled.is_(True))
.scalar() or 0
)
today_tasks = (
db.query(DataSyncTask)
.filter(DataSyncTask.created_at >= today_start)
.all()
)
items_today = sum(t.items_fetched for t in today_tasks)
success_count = sum(1 for t in today_tasks if t.task_status == TaskStatus.SUCCESS)
error_count = sum(1 for t in today_tasks if t.task_status == TaskStatus.ERROR)
last_task = (
db.query(DataSyncTask)
.filter(DataSyncTask.task_status == TaskStatus.SUCCESS)
.order_by(DataSyncTask.created_at.desc())
.first()
)
return SystemStatsResponse(
active_sources=active_sources,
total_sources=total_sources,
items_today=items_today,
success_tasks_today=success_count,
error_tasks_today=error_count,
last_sync_at=last_task.created_at if last_task else None,
)