mirror of
https://github.com/stardrophere/InsightRadar.git
synced 2026-06-06 00:57:51 +08:00
55 lines
1.6 KiB
Python
55 lines
1.6 KiB
Python
# app/crud/crud_source.py
|
||
"""
|
||
信息源 CRUD:对 InfoSource 的增删改查,供 API 与爬虫使用
|
||
"""
|
||
from sqlite3 import IntegrityError
|
||
|
||
from sqlalchemy.orm import Session
|
||
from typing import List, Optional
|
||
|
||
from app.models.models import InfoSource
|
||
from app.schemas.source_schema import InfoSourceCreate, InfoSourceUpdate
|
||
|
||
|
||
def get(db: Session, source_id: int) -> Optional[InfoSource]:
    """Fetch a single info source by its ID.

    Args:
        db: SQLAlchemy session used to run the query.
        source_id: Value matched against ``InfoSource.id``.

    Returns:
        The first matching ``InfoSource`` row, or ``None`` if no row matches.
    """
    by_id = InfoSource.id == source_id
    lookup = db.query(InfoSource).filter(by_id)
    return lookup.first()
|
||
|
||
|
||
def get_multi(db: Session, skip: int = 0, limit: int = 100) -> List[InfoSource]:
    """List info sources one page at a time.

    Args:
        db: SQLAlchemy session used to run the query.
        skip: Number of rows to skip (applied as the query offset).
        limit: Maximum number of rows to return.

    Returns:
        The list of ``InfoSource`` rows in the requested page. The query
        applies no explicit ordering.
    """
    page = db.query(InfoSource)
    page = page.offset(skip)
    page = page.limit(limit)
    return page.all()
|
||
|
||
|
||
def create(db: Session, obj_in: InfoSourceCreate) -> InfoSource:
    """Create a new info source, or return the one that already exists.

    A row whose ``source_name`` equals the incoming one is treated as a
    duplicate: that persisted row is returned and nothing is inserted.

    Args:
        db: SQLAlchemy session. NOTE: this function closes the session before
            returning (on every path), so the returned object is detached;
            only attributes that were already loaded remain readable.
        obj_in: Validated creation payload; its dumped fields are passed as
            keyword arguments to ``InfoSource``.

    Returns:
        The newly inserted ``InfoSource`` (refreshed from the database), or
        the existing row with the same ``source_name``.

    Raises:
        sqlalchemy.exc.IntegrityError: If the insert violates a constraint
            and no row with the same ``source_name`` can be found afterwards
            (i.e. the failure is not a duplicate-name conflict).
    """
    # Imported locally on purpose: the module-level ``IntegrityError`` comes
    # from ``sqlite3``, but SQLAlchemy wraps driver errors in its own
    # ``sqlalchemy.exc.IntegrityError``, so the sqlite3 class never matches.
    from sqlalchemy.exc import IntegrityError as SAIntegrityError

    db_obj = InfoSource(**obj_in.model_dump())
    same_name = InfoSource.source_name == db_obj.source_name

    try:
        existing = db.query(InfoSource).filter(same_name).first()
        if existing is not None:
            # Return the persisted row, not the unsaved object built above.
            return existing

        try:
            db.add(db_obj)
            db.commit()
            db.refresh(db_obj)
            return db_obj
        except SAIntegrityError:
            db.rollback()
            # Presumably a concurrent insert of the same name won the race
            # (assumes a unique constraint on source_name — TODO confirm).
            existing = db.query(InfoSource).filter(same_name).first()
            if existing is None:
                # Some other constraint failed; do not pretend it succeeded.
                raise
            return existing
    finally:
        # Kept from the original implementation: callers may rely on this
        # function releasing the session.
        db.close()
|
||
|
||
|
||
def update(db: Session, db_obj: InfoSource, obj_in: InfoSourceUpdate) -> InfoSource:
    """Apply a partial update to an info source and persist it.

    Only the fields explicitly set on ``obj_in`` are copied onto ``db_obj``
    (the payload is dumped with ``exclude_unset=True``); all other attributes
    are left untouched.

    Args:
        db: SQLAlchemy session used to commit the change.
        db_obj: The existing ``InfoSource`` instance to modify in place.
        obj_in: Update payload carrying the fields to change.

    Returns:
        The same ``db_obj``, committed and refreshed from the database.
    """
    changes = obj_in.model_dump(exclude_unset=True)

    # Copy each supplied field onto the model instance.
    for attr_name in changes:
        setattr(db_obj, attr_name, changes[attr_name])

    db.add(db_obj)
    db.commit()
    db.refresh(db_obj)
    return db_obj
|