Files
2026-04-20 15:53:02 +08:00

299 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any
import numpy as np
from sqlalchemy.orm import Session
from app.models.models import ExtractedTopic, TargetType, UnifiedEvent, UserTopicPreference, utcnow
from app.services.fetcher_service import embedder_model
# AI-assisted generation: deepseek-v3-2, 2026-03-20
# Semantic-match threshold: the vector similarity between a user keyword and an
# event tag/title must reach this value to count as a semantic hit.
# Overridable through the PREFERENCE_SEMANTIC_THRESHOLD environment variable.
DEFAULT_PREFERENCE_SEMANTIC_THRESHOLD = 0.78
PREFERENCE_SEMANTIC_THRESHOLD = float(
    os.environ.get(
        "PREFERENCE_SEMANTIC_THRESHOLD",
        DEFAULT_PREFERENCE_SEMANTIC_THRESHOLD,
    )
)

# Upper bound on the number of items a recommendation list may return.
# Overridable through the PREFERENCE_RECOMMEND_MAX_LIMIT environment variable.
DEFAULT_PREFERENCE_RECOMMEND_MAX_LIMIT = 50
PREFERENCE_RECOMMEND_MAX_LIMIT = int(
    os.environ.get(
        "PREFERENCE_RECOMMEND_MAX_LIMIT",
        DEFAULT_PREFERENCE_RECOMMEND_MAX_LIMIT,
    )
)
@dataclass
class MatchedEventResult:
    """A single event that matched a user's interests, with its match details."""

    # The event record that matched.
    event: UnifiedEvent
    # Final ranking score; recommend_events_for_user stores it rounded to 2 decimals.
    match_score: float
    # Labels of exact hits: matched topic keywords, plus a "标题:<preference>"
    # entry when the title matched a preference keyword.
    exact_hits: list[str]
    # One dict per semantic hit, with the keys "preference_keyword",
    # "topic_keyword" and "similarity".
    semantic_hits: list[dict[str, Any]]
    # De-duplicated topic keywords attached to the event.
    tags: list[str]
# End of AI-generated section
def _normalize_text(text: str) -> str:
"""统一小写与首尾空白,便于做稳定匹配。"""
return text.strip().casefold()
def _find_exact_preference_match(
    target_text: str,
    normalized_preferences: list[tuple[str, str]],
) -> str | None:
    """Find the first preference keyword that "exactly" matches ``target_text``.

    A preference matches when, after normalization, it equals the target or
    either string contains the other.

    Args:
        target_text: Raw text to test (a topic keyword or an event title).
        normalized_preferences: ``(raw_keyword, normalized_keyword)`` pairs,
            checked in order.

    Returns:
        The raw form of the first matching preference keyword, or ``None`` when
        the normalized target is empty or nothing matches.
    """
    needle = _normalize_text(target_text)
    if not needle:
        return None

    # Equality is a special case of containment, so one substring test per
    # direction covers both matching rules. Empty preferences are skipped.
    return next(
        (
            original
            for original, folded in normalized_preferences
            if folded and (folded in needle or needle in folded)
        ),
        None,
    )
_EMBEDDING_CACHE: dict[str, np.ndarray] = {}
MAX_CACHE_SIZE = 10000
def _build_keyword_embedding_map(keywords: list[str]) -> dict[str, np.ndarray]:
"""
批量生成或从缓存获取关键词向量,并返回原词到向量的映射。
结合了批量推理(Batching)的极速优势和内存缓存的 O(1) 读取优势。
"""
result: dict[str, np.ndarray] = {}
if not keywords:
return result
uncached_keywords = []
for keyword in keywords:
if not keyword:
continue
if keyword in _EMBEDDING_CACHE:
result[keyword] = _EMBEDDING_CACHE[keyword]
else:
uncached_keywords.append(keyword)
if uncached_keywords:
unique_uncached = list(dict.fromkeys(uncached_keywords))
vectors = embedder_model.encode(unique_uncached, normalize_embeddings=True, show_progress_bar=False)
# 防止缓存无限增长:超过阈值时清空最早存入的一半(简单粗暴的内存控制)
if len(_EMBEDDING_CACHE) > MAX_CACHE_SIZE:
keys_to_delete = list(_EMBEDDING_CACHE.keys())[: MAX_CACHE_SIZE // 2]
for k in keys_to_delete:
del _EMBEDDING_CACHE[k]
for keyword, vec in zip(unique_uncached, vectors):
vec_array = np.asarray(vec, dtype=np.float32)
_EMBEDDING_CACHE[keyword] = vec_array
result[keyword] = vec_array
return result
def _find_best_semantic_match(
target_text: str,
target_vec_map: dict[str, np.ndarray],
pref_vec_map: dict[str, np.ndarray],
) -> tuple[str | None, float]:
"""返回与目标文本最接近的兴趣词及其余弦相似度。"""
target_vec = target_vec_map.get(target_text)
if target_vec is None:
return None, -1.0
best_pref = None
best_sim = -1.0
for pref_keyword, pref_vec in pref_vec_map.items():
sim = float(np.dot(target_vec, pref_vec))
if sim > best_sim:
best_sim = sim
best_pref = pref_keyword
return best_pref, best_sim
def _ensure_aware(dt: datetime) -> datetime:
"""SQLite 读出的 datetime 不带时区信息,统一补上 UTC 后才能和 utcnow() 做减法。"""
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt
def _calc_freshness_bonus(event: UnifiedEvent) -> float:
    """Return a small score bonus that shrinks as the event gets older.

    Keeps stale hot events from occupying recommendation slots indefinitely.
    The age is measured from ``event.created_at`` to ``utcnow()``; a negative
    age is treated as zero.

    Returns:
        12.0 up to 6 hours old, 8.0 up to 24 hours, 4.0 up to 72 hours,
        otherwise 0.0.
    """
    elapsed = utcnow() - _ensure_aware(event.created_at)
    age_hours = max(elapsed.total_seconds() / 3600.0, 0.0)

    # (inclusive age ceiling in hours, bonus), checked from freshest to oldest.
    tiers = ((6, 12.0), (24, 8.0), (72, 4.0))
    for ceiling_hours, bonus in tiers:
        if age_hours <= ceiling_hours:
            return bonus
    return 0.0
def _score_event_topics(
    topic_list: list[tuple[str, float | None]],
    normalized_preference_pairs: list[tuple[str, str]],
    topic_vec_map: dict[str, np.ndarray],
    pref_vec_map: dict[str, np.ndarray],
    similarity_threshold: float,
) -> tuple[float, list[str], list[dict[str, Any]]]:
    """Score one event's topic tags; return (score, exact hits, semantic hits)."""
    score = 0.0
    exact_hits: list[str] = []
    semantic_hits: list[dict[str, Any]] = []
    for topic_keyword, topic_relevance in topic_list:
        # A tag with no stored relevance is scored as 50.
        relevance = float(topic_relevance) if topic_relevance is not None else 50.0

        if _find_exact_preference_match(topic_keyword, normalized_preference_pairs) is not None:
            exact_hits.append(topic_keyword)
            score += 45.0 + relevance * 0.2
            # An exact hit wins; no semantic comparison is made for this tag.
            continue

        best_pref, best_sim = _find_best_semantic_match(topic_keyword, topic_vec_map, pref_vec_map)
        if best_pref is not None and best_sim >= similarity_threshold:
            semantic_hits.append(
                {
                    "preference_keyword": best_pref,
                    "topic_keyword": topic_keyword,
                    "similarity": round(best_sim, 4),
                }
            )
            score += best_sim * 35.0 + relevance * 0.12
    return score, exact_hits, semantic_hits


def _score_event_title(
    event_title: str,
    normalized_preference_pairs: list[tuple[str, str]],
    title_vec_map: dict[str, np.ndarray],
    pref_vec_map: dict[str, np.ndarray],
    similarity_threshold: float,
) -> tuple[float, list[str], list[dict[str, Any]]]:
    """Score one (stripped) event title; return (score, exact hits, semantic hits)."""
    if not event_title:
        return 0.0, [], []

    exact_pref = _find_exact_preference_match(event_title, normalized_preference_pairs)
    if exact_pref is not None:
        return 30.0, [f"标题:{exact_pref}"], []

    best_pref, best_sim = _find_best_semantic_match(event_title, title_vec_map, pref_vec_map)
    if best_pref is not None and best_sim >= similarity_threshold:
        # NOTE(review): the label carries the matched preference keyword, not
        # the title text; this mirrors the exact-hit label. Confirm intended.
        hit = {
            "preference_keyword": best_pref,
            "topic_keyword": f"标题:{best_pref}",
            "similarity": round(best_sim, 4),
        }
        return best_sim * 24.0, [], [hit]
    return 0.0, [], []


def recommend_events_for_user(
    db: Session,
    *,
    user_id: int,
    min_hot: int = 3,
    hours: int = 72,
    limit: int = 20,
    semantic_threshold: float | None = None,
) -> list[MatchedEventResult]:
    """Recommend recent events that match a user's interest keywords.

    Pipeline:
      1. Exact match: user keywords vs. each event's topic tags and title.
      2. Semantic match: keyword vectors vs. tag/title vectors, counted only
         when the similarity reaches the threshold.
      3. Score fusion: match score (weighted by tag relevance) + capped hot
         score + freshness bonus.

    Args:
        db: SQLAlchemy session used for all queries.
        user_id: Owner of the ``UserTopicPreference`` rows to match against.
        min_hot: Minimum ``hot_score`` an event needs to be considered.
        hours: Look-back window on ``created_at``, in hours.
        limit: Requested number of results; raised to at least 1 and capped at
            ``PREFERENCE_RECOMMEND_MAX_LIMIT``.
        semantic_threshold: Overrides ``PREFERENCE_SEMANTIC_THRESHOLD`` when
            not ``None``.

    Returns:
        Matched events ordered by match score, then hot score, then creation
        time (all descending). Events with no exact or semantic hit are
        omitted. Empty when the user has no usable keywords or no event
        qualifies.
    """
    final_limit = max(1, min(limit, PREFERENCE_RECOMMEND_MAX_LIMIT))
    similarity_threshold = (
        PREFERENCE_SEMANTIC_THRESHOLD if semantic_threshold is None else semantic_threshold
    )

    preferences = (
        db.query(UserTopicPreference)
        .filter(UserTopicPreference.user_id == user_id)
        .all()
    )
    # ``or ""`` guards against a NULL keyword, matching how titles and topic
    # keywords are guarded below; each keyword is stripped only once.
    preference_keywords = [
        keyword
        for keyword in ((pref.interested_keyword or "").strip() for pref in preferences)
        if keyword
    ]
    if not preference_keywords:
        return []

    time_limit = utcnow() - timedelta(hours=hours)
    events = (
        db.query(UnifiedEvent)
        .filter(
            UnifiedEvent.hot_score >= min_hot,
            UnifiedEvent.created_at >= time_limit,
        )
        .order_by(UnifiedEvent.hot_score.desc(), UnifiedEvent.created_at.desc())
        .all()
    )
    if not events:
        return []

    topic_rows = (
        db.query(
            ExtractedTopic.target_id,
            ExtractedTopic.topic_keyword,
            ExtractedTopic.relevance_score,
        )
        .filter(
            ExtractedTopic.target_type == TargetType.EVENT,
            ExtractedTopic.target_id.in_([event.id for event in events]),
        )
        .all()
    )

    # Group the (keyword, relevance) pairs by the event they belong to.
    event_topics: dict[int, list[tuple[str, float | None]]] = {}
    for event_id, topic_keyword, relevance_score in topic_rows:
        if not topic_keyword:
            continue
        event_topics.setdefault(event_id, []).append((topic_keyword, relevance_score))

    unique_preference_keywords = list(dict.fromkeys(preference_keywords))
    unique_topic_keywords = list(
        dict.fromkeys(keyword for _, keyword, _ in topic_rows if keyword)
    )
    pref_vec_map = _build_keyword_embedding_map(unique_preference_keywords)
    topic_vec_map = _build_keyword_embedding_map(unique_topic_keywords)

    normalized_preference_pairs = [
        (word, normalized)
        for word in unique_preference_keywords
        if (normalized := _normalize_text(word))
    ]

    # Strip each title once; the same value is the embedding key and the text
    # matched in the scoring loop.
    stripped_titles = [(event.unified_title or "").strip() for event in events]
    unique_event_titles = list(dict.fromkeys(title for title in stripped_titles if title))
    title_vec_map = _build_keyword_embedding_map(unique_event_titles)

    scored_results: list[MatchedEventResult] = []
    for event, event_title in zip(events, stripped_titles):
        topic_list = event_topics.get(event.id, [])

        score, exact_hits, semantic_hits = _score_event_topics(
            topic_list,
            normalized_preference_pairs,
            topic_vec_map,
            pref_vec_map,
            similarity_threshold,
        )
        title_score, title_exact_hits, title_semantic_hits = _score_event_title(
            event_title,
            normalized_preference_pairs,
            title_vec_map,
            pref_vec_map,
            similarity_threshold,
        )
        score += title_score
        exact_hits.extend(title_exact_hits)
        semantic_hits.extend(title_semantic_hits)

        # Hot score and freshness only rank events that matched an interest.
        if not exact_hits and not semantic_hits:
            continue

        score += min(event.hot_score, 100) * 0.3  # hot-score contribution is capped
        score += _calc_freshness_bonus(event)

        scored_results.append(
            MatchedEventResult(
                event=event,
                match_score=round(score, 2),
                exact_hits=list(dict.fromkeys(exact_hits)),
                semantic_hits=semantic_hits,
                tags=list(dict.fromkeys(keyword for keyword, _ in topic_list)),
            )
        )

    scored_results.sort(
        key=lambda item: (item.match_score, item.event.hot_score, item.event.created_at),
        reverse=True,
    )
    return scored_results[:final_limit]