Files
2026-04-20 15:53:02 +08:00

190 lines
5.9 KiB
Python

import time
from typing import Any, Dict, List, Tuple
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from app.api.dependencies import get_current_user, get_db
from app.models.models import AppUser, UserTopicPreference
from app.schemas.preference_schema import (
MatchedEventResponse,
UserPreferenceRecommendationResponse,
UserTopicPreferenceCreate,
UserTopicPreferenceResponse,
)
from app.services.matching_service import recommend_events_for_user
router = APIRouter()
_RECOMMEND_CACHE: Dict[str, Tuple[float, Any]] = {}
CACHE_TTL_SECONDS = 60
def _invalidate_user_cache(user_id: int):
"""清除某个用户的推荐结果缓存(当用户新增或删除关键词时调用)"""
keys_to_delete = [k for k in _RECOMMEND_CACHE.keys() if k.startswith(f"{user_id}:")]
for k in keys_to_delete:
_RECOMMEND_CACHE.pop(k, None)
def _ensure_self_access(path_user_id: int, current_user: AppUser) -> None:
"""校验路径 user_id 是否为当前登录用户本人。"""
if path_user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You can only operate your own resources",
)
@router.get(
"/users/{user_id}/preferences",
response_model=List[UserTopicPreferenceResponse],
)
def list_user_preferences(
user_id: int,
db: Session = Depends(get_db),
current_user: AppUser = Depends(get_current_user),
):
"""获取用户已设置的兴趣关键词。"""
_ensure_self_access(user_id, current_user)
preferences = (
db.query(UserTopicPreference)
.filter(UserTopicPreference.user_id == user_id)
.order_by(UserTopicPreference.created_at.desc())
.all()
)
return preferences
@router.post(
"/users/{user_id}/preferences",
response_model=UserTopicPreferenceResponse,
status_code=status.HTTP_201_CREATED,
)
def create_user_preference(
user_id: int,
payload: UserTopicPreferenceCreate,
db: Session = Depends(get_db),
current_user: AppUser = Depends(get_current_user),
):
"""新增一个用户兴趣关键词。"""
_ensure_self_access(user_id, current_user)
keyword = payload.interested_keyword.strip()
if not keyword:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Keyword cannot be empty")
db_obj = UserTopicPreference(
user_id=user_id,
interested_keyword=keyword,
)
db.add(db_obj)
try:
db.commit()
except IntegrityError:
db.rollback()
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Preference keyword already exists for this user",
)
db.refresh(db_obj)
_invalidate_user_cache(user_id)
return db_obj
@router.delete(
"/users/{user_id}/preferences/{preference_id}",
status_code=status.HTTP_204_NO_CONTENT,
)
def delete_user_preference(
user_id: int,
preference_id: int,
db: Session = Depends(get_db),
current_user: AppUser = Depends(get_current_user),
):
"""删除一个用户兴趣关键词。"""
_ensure_self_access(user_id, current_user)
preference = (
db.query(UserTopicPreference)
.filter(
UserTopicPreference.id == preference_id,
UserTopicPreference.user_id == user_id,
)
.first()
)
if not preference:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Preference not found")
db.delete(preference)
db.commit()
_invalidate_user_cache(user_id)
return None
@router.get(
"/users/{user_id}/recommended-events",
response_model=UserPreferenceRecommendationResponse,
)
def recommend_events(
user_id: int,
min_hot: int = Query(3, ge=1, description="最小热度阈值"),
hours: int = Query(72, ge=1, le=24 * 30, description="仅匹配最近多少小时的事件"),
limit: int = Query(20, ge=1, le=50, description="最多返回多少条推荐"),
semantic_threshold: float = Query(0.78, ge=0.0, le=1.0, description="语义匹配相似度阈值"),
sort_by: str = Query("match_score", description="排序方式: match_score | created_at"),
db: Session = Depends(get_db),
current_user: AppUser = Depends(get_current_user),
):
"""基于用户兴趣词推荐事件(精确匹配 + 语义匹配)。"""
_ensure_self_access(user_id, current_user)
cache_key = f"{user_id}:{min_hot}:{hours}:{limit}:{semantic_threshold}:{sort_by}"
current_time = time.time()
if cache_key in _RECOMMEND_CACHE:
expire_time, cached_data = _RECOMMEND_CACHE[cache_key]
if current_time < expire_time:
return cached_data
matched = recommend_events_for_user(
db,
user_id=user_id,
min_hot=min_hot,
hours=hours,
limit=limit,
semantic_threshold=semantic_threshold,
)
if sort_by == "created_at":
matched.sort(key=lambda x: x.event.created_at, reverse=True)
result_data: list[MatchedEventResponse] = []
for item in matched:
result_data.append(
MatchedEventResponse(
event_id=item.event.id,
unified_title=item.event.unified_title,
summary=item.event.ai_comprehensive_summary,
hot_score=item.event.hot_score,
created_at=item.event.created_at,
tags=item.tags,
match_score=item.match_score,
exact_hits=item.exact_hits,
semantic_hits=item.semantic_hits,
)
)
response = UserPreferenceRecommendationResponse(
user_id=user_id,
total=len(result_data),
data=result_data,
)
# 写入缓存,超过 2000 条时清空防止内存膨胀
if len(_RECOMMEND_CACHE) > 2000:
_RECOMMEND_CACHE.clear()
_RECOMMEND_CACHE[cache_key] = (current_time + CACHE_TTL_SECONDS, response)
return response