mirror of
https://github.com/stardrophere/InsightRadar.git
synced 2026-06-06 00:57:51 +08:00
320 lines
11 KiB
Python
320 lines
11 KiB
Python
"""
|
|
匹配服务:根据用户兴趣关键词(精确 + 语义)推荐事件
|
|
打分融合:标签/标题匹配分 + 标签相关度 + 热度 + 新鲜度加成
|
|
"""
|
|
import os
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any
|
|
|
|
import numpy as np
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.models import ExtractedTopic, TargetType, UnifiedEvent, UserTopicPreference, utcnow
|
|
from app.services.fetcher_service import embedder_model
|
|
|
|
|
|
# Semantic-match threshold: the similarity between a user keyword vector and an
# event tag/title vector must reach this value to count as a semantic hit.
# Overridable through the PREFERENCE_SEMANTIC_THRESHOLD environment variable.
DEFAULT_PREFERENCE_SEMANTIC_THRESHOLD = 0.78
PREFERENCE_SEMANTIC_THRESHOLD = float(
    os.environ.get(
        "PREFERENCE_SEMANTIC_THRESHOLD",
        str(DEFAULT_PREFERENCE_SEMANTIC_THRESHOLD),
    )
)

# Maximum number of entries a recommendation list may return.
# Overridable through the PREFERENCE_RECOMMEND_MAX_LIMIT environment variable.
DEFAULT_PREFERENCE_RECOMMEND_MAX_LIMIT = 50
PREFERENCE_RECOMMEND_MAX_LIMIT = int(
    os.environ.get(
        "PREFERENCE_RECOMMEND_MAX_LIMIT",
        str(DEFAULT_PREFERENCE_RECOMMEND_MAX_LIMIT),
    )
)
|
|
|
|
|
|
@dataclass
class MatchedEventResult:
    """A single event that matched a user's interest keywords, plus match details."""

    # The matched event record.
    event: UnifiedEvent
    # Fused ranking score (match + tag relevance + hotness + freshness),
    # rounded to two decimals by recommend_events_for_user.
    match_score: float
    # De-duplicated exact hits: matched tag keywords, and "标题:<keyword>"
    # entries when the title matched a preference keyword.
    exact_hits: list[str]
    # One dict per semantic hit with the keys "preference_keyword",
    # "topic_keyword" and "similarity".
    semantic_hits: list[dict[str, Any]]
    # De-duplicated tag keywords attached to the event.
    tags: list[str]
|
|
|
|
|
|
def _normalize_text(text: str) -> str:
|
|
"""统一小写与首尾空白,便于做稳定匹配。"""
|
|
return text.strip().casefold()
|
|
|
|
|
|
def _find_exact_preference_match(
    target_text: str,
    normalized_preferences: list[tuple[str, str]],
) -> str | None:
    """Return the first preference keyword that is an "exact" hit for *target_text*.

    A preference counts as a hit when, after normalization, it either equals
    the target or one of the two strings contains the other.

    Args:
        target_text: Raw tag keyword or title to test.
        normalized_preferences: ``(raw_keyword, normalized_keyword)`` pairs,
            checked in order.

    Returns:
        The raw form of the first matching preference keyword, or ``None``
        when the target is blank or nothing matches.
    """
    needle = _normalize_text(target_text)
    if needle:
        for original_word, folded_word in normalized_preferences:
            # Equality is the degenerate case of containment, so the two-way
            # substring test covers both matching rules. Blank preferences
            # are skipped because "" is a substring of everything.
            if folded_word and (folded_word in needle or needle in folded_word):
                return original_word
    return None
|
|
|
|
|
|
_EMBEDDING_CACHE: dict[str, np.ndarray] = {}
|
|
MAX_CACHE_SIZE = 10000
|
|
|
|
def _build_keyword_embedding_map(keywords: list[str]) -> dict[str, np.ndarray]:
|
|
"""
|
|
批量生成或从缓存获取关键词向量,并返回原词到向量的映射。
|
|
结合了批量推理(Batching)的极速优势和内存缓存的 O(1) 读取优势。
|
|
"""
|
|
result: dict[str, np.ndarray] = {}
|
|
if not keywords:
|
|
return result
|
|
|
|
uncached_keywords = []
|
|
|
|
# 1. 尝试从缓存获取
|
|
for keyword in keywords:
|
|
if not keyword:
|
|
continue
|
|
if keyword in _EMBEDDING_CACHE:
|
|
result[keyword] = _EMBEDDING_CACHE[keyword]
|
|
else:
|
|
uncached_keywords.append(keyword)
|
|
|
|
# 2. 对未命中的词进行统一的批量推理
|
|
if uncached_keywords:
|
|
# 去重,避免同一个未缓存的词被计算多次
|
|
unique_uncached = list(dict.fromkeys(uncached_keywords))
|
|
|
|
vectors = embedder_model.encode(unique_uncached, normalize_embeddings=True, show_progress_bar=False)
|
|
|
|
# 防止缓存无限增长:超过阈值时清空最早存入的一半(简单粗暴的内存控制)
|
|
if len(_EMBEDDING_CACHE) > MAX_CACHE_SIZE:
|
|
keys_to_delete = list(_EMBEDDING_CACHE.keys())[: MAX_CACHE_SIZE // 2]
|
|
for k in keys_to_delete:
|
|
del _EMBEDDING_CACHE[k]
|
|
|
|
# 3. 将新计算的向量存入缓存并回填结果
|
|
for keyword, vec in zip(unique_uncached, vectors):
|
|
vec_array = np.asarray(vec, dtype=np.float32)
|
|
_EMBEDDING_CACHE[keyword] = vec_array
|
|
result[keyword] = vec_array
|
|
|
|
return result
|
|
|
|
|
|
def _find_best_semantic_match(
|
|
target_text: str,
|
|
target_vec_map: dict[str, np.ndarray],
|
|
pref_vec_map: dict[str, np.ndarray],
|
|
) -> tuple[str | None, float]:
|
|
"""返回与目标文本最接近的兴趣词及其余弦相似度。"""
|
|
target_vec = target_vec_map.get(target_text)
|
|
if target_vec is None:
|
|
return None, -1.0
|
|
|
|
best_pref = None
|
|
best_sim = -1.0
|
|
for pref_keyword, pref_vec in pref_vec_map.items():
|
|
sim = float(np.dot(target_vec, pref_vec))
|
|
if sim > best_sim:
|
|
best_sim = sim
|
|
best_pref = pref_keyword
|
|
return best_pref, best_sim
|
|
|
|
|
|
def _ensure_aware(dt: datetime) -> datetime:
|
|
"""SQLite 读出的 datetime 不带时区信息,统一补上 UTC 后才能和 utcnow() 做减法。"""
|
|
if dt.tzinfo is None:
|
|
return dt.replace(tzinfo=timezone.utc)
|
|
return dt
|
|
|
|
|
|
def _calc_freshness_bonus(event: UnifiedEvent) -> float:
    """Return a small score bonus that decays with the event's age.

    Keeps old hot topics from occupying the recommendation slots forever.
    Tiers by age since ``event.created_at``: up to 6h -> 12.0, up to 24h ->
    8.0, up to 72h -> 4.0, older -> 0.0. A negative age is treated as zero.
    """
    elapsed = utcnow() - _ensure_aware(event.created_at)
    age_hours = max(elapsed.total_seconds() / 3600.0, 0.0)

    # (inclusive upper age bound in hours, bonus), checked youngest first.
    for max_age_hours, bonus in ((6, 12.0), (24, 8.0), (72, 4.0)):
        if age_hours <= max_age_hours:
            return bonus
    return 0.0
|
|
|
|
|
|
def _load_preference_keywords(db: Session, user_id: int) -> list[str]:
    """Return the user's non-blank interest keywords, stripped and de-duplicated in query order."""
    preferences = (
        db.query(UserTopicPreference)
        .filter(UserTopicPreference.user_id == user_id)
        .all()
    )
    # `or ""` keeps a NULL keyword from raising on .strip(), the same way
    # event titles are guarded in the scoring code.
    stripped = ((pref.interested_keyword or "").strip() for pref in preferences)
    return list(dict.fromkeys(word for word in stripped if word))


def _load_candidate_events(db: Session, min_hot: int, hours: int) -> list[UnifiedEvent]:
    """Return events created within the last *hours* with ``hot_score >= min_hot``, hottest and newest first."""
    time_limit = utcnow() - timedelta(hours=hours)
    return (
        db.query(UnifiedEvent)
        .filter(
            UnifiedEvent.hot_score >= min_hot,
            UnifiedEvent.created_at >= time_limit,
        )
        .order_by(UnifiedEvent.hot_score.desc(), UnifiedEvent.created_at.desc())
        .all()
    )


def _load_event_topic_rows(db: Session, event_ids: list[int]) -> list[Any]:
    """Return ``(target_id, topic_keyword, relevance_score)`` rows for EVENT-type topics of *event_ids*."""
    return (
        db.query(
            ExtractedTopic.target_id,
            ExtractedTopic.topic_keyword,
            ExtractedTopic.relevance_score,
        )
        .filter(
            ExtractedTopic.target_type == TargetType.EVENT,
            ExtractedTopic.target_id.in_(event_ids),
        )
        .all()
    )


def _score_event_matches(
    event: UnifiedEvent,
    topic_list: list[tuple[str, float | None]],
    normalized_preference_pairs: list[tuple[str, str]],
    pref_vec_map: dict[str, np.ndarray],
    topic_vec_map: dict[str, np.ndarray],
    title_vec_map: dict[str, np.ndarray],
    similarity_threshold: float,
) -> tuple[float, list[str], list[dict[str, Any]]]:
    """Score one event's tags and title against the user's preferences.

    Returns ``(match_score, exact_hits, semantic_hits)``. Hotness and
    freshness are not included; both hit lists are empty when nothing matched.
    """
    exact_hits: list[str] = []
    semantic_hits: list[dict[str, Any]] = []
    score = 0.0

    for topic_keyword, topic_relevance in topic_list:
        # Tags without a stored relevance fall back to a mid-range 50.0.
        topic_relevance_score = float(topic_relevance) if topic_relevance is not None else 50.0

        # 1) Exact hit (equality or containment): high base score, boosted
        #    by the tag's own relevance.
        matched_pref = _find_exact_preference_match(topic_keyword, normalized_preference_pairs)
        if matched_pref is not None:
            exact_hits.append(topic_keyword)
            score += 45.0 + topic_relevance_score * 0.2
            continue

        # 2) Semantic hit, only tried when there was no exact hit. Scores
        #    slightly lower than an exact hit and scales with similarity.
        best_pref, best_sim = _find_best_semantic_match(topic_keyword, topic_vec_map, pref_vec_map)
        if best_pref is not None and best_sim >= similarity_threshold:
            semantic_hits.append(
                {
                    "preference_keyword": best_pref,
                    "topic_keyword": topic_keyword,
                    "similarity": round(best_sim, 4),
                }
            )
            score += best_sim * 35.0 + topic_relevance_score * 0.12

    # The title also takes part, but with lower weights than structured tags
    # so that long titles do not dominate the ranking.
    event_title = (event.unified_title or "").strip()
    if event_title:
        title_exact_pref = _find_exact_preference_match(event_title, normalized_preference_pairs)
        if title_exact_pref is not None:
            exact_hits.append(f"标题:{title_exact_pref}")
            score += 30.0
        else:
            best_pref, best_sim = _find_best_semantic_match(event_title, title_vec_map, pref_vec_map)
            if best_pref is not None and best_sim >= similarity_threshold:
                semantic_hits.append(
                    {
                        "preference_keyword": best_pref,
                        # Title hits report the preference keyword here,
                        # not the title text itself.
                        "topic_keyword": f"标题:{best_pref}",
                        "similarity": round(best_sim, 4),
                    }
                )
                score += best_sim * 24.0

    return score, exact_hits, semantic_hits


def recommend_events_for_user(
    db: Session,
    *,
    user_id: int,
    min_hot: int = 3,
    hours: int = 72,
    limit: int = 20,
    semantic_threshold: float | None = None,
) -> list[MatchedEventResult]:
    """Recommend recent hot events that match a user's interest keywords.

    Pipeline:
        1) Exact match: user keywords vs. EVENT tags / title.
        2) Semantic match: user keyword vectors vs. EVENT tag / title vectors
           (similarity at or above the threshold).
        3) Score fusion: match score + tag relevance + hotness + freshness.

    Args:
        db: SQLAlchemy session used for all queries.
        user_id: User whose ``UserTopicPreference`` rows drive the matching.
        min_hot: Minimum ``hot_score`` an event needs to be a candidate.
        hours: Only events created within this many hours are considered.
        limit: Requested result count, clamped to
            ``[1, PREFERENCE_RECOMMEND_MAX_LIMIT]``.
        semantic_threshold: Similarity needed for a semantic hit; ``None``
            falls back to ``PREFERENCE_SEMANTIC_THRESHOLD``.

    Returns:
        Matched events sorted by ``(match_score, hot_score, created_at)``
        descending. Empty when the user has no usable keywords, there are no
        candidate events, or nothing matches.
    """
    final_limit = max(1, min(limit, PREFERENCE_RECOMMEND_MAX_LIMIT))
    similarity_threshold = (
        semantic_threshold
        if semantic_threshold is not None
        else PREFERENCE_SEMANTIC_THRESHOLD
    )

    # 1. Load the user's interest keywords.
    preference_keywords = _load_preference_keywords(db, user_id)
    if not preference_keywords:
        return []

    # 2. Load candidate events (time + hotness filter) and their tags.
    events = _load_candidate_events(db, min_hot, hours)
    if not events:
        return []

    topic_rows = _load_event_topic_rows(db, [event.id for event in events])

    # event_id -> [(tag, relevance_score), ...], plus the distinct tags in
    # row order for batch encoding.
    event_topics: dict[int, list[tuple[str, float | None]]] = {}
    distinct_topic_keywords: dict[str, None] = {}
    for event_id, topic_keyword, relevance_score in topic_rows:
        if not topic_keyword:
            continue
        event_topics.setdefault(event_id, []).append((topic_keyword, relevance_score))
        distinct_topic_keywords[topic_keyword] = None

    # 3. Batch-encode preferences, tags and titles to keep model calls low.
    pref_vec_map = _build_keyword_embedding_map(preference_keywords)
    topic_vec_map = _build_keyword_embedding_map(list(distinct_topic_keywords))
    unique_event_titles = list(
        dict.fromkeys(
            title
            for event in events
            if (title := (event.unified_title or "").strip())
        )
    )
    title_vec_map = _build_keyword_embedding_map(unique_event_titles)

    # (raw, normalized) preference pairs used by the exact matcher.
    normalized_preference_pairs = [
        (word, normalized)
        for word in preference_keywords
        if (normalized := _normalize_text(word))
    ]

    # 4. Score every candidate and keep those with at least one hit.
    scored_results: list[MatchedEventResult] = []
    for event in events:
        topic_list = event_topics.get(event.id, [])
        score, exact_hits, semantic_hits = _score_event_matches(
            event,
            topic_list,
            normalized_preference_pairs,
            pref_vec_map,
            topic_vec_map,
            title_vec_map,
            similarity_threshold,
        )
        if not exact_hits and not semantic_hits:
            continue

        # Blend in hotness (capped at 100) and freshness so the ranking does
        # not depend on the match score alone.
        score += min(event.hot_score, 100) * 0.3
        score += _calc_freshness_bonus(event)

        scored_results.append(
            MatchedEventResult(
                event=event,
                match_score=round(score, 2),
                exact_hits=list(dict.fromkeys(exact_hits)),
                semantic_hits=semantic_hits,
                # De-duplicate returned tags to keep the API output stable.
                tags=list(dict.fromkeys(tag for tag, _ in topic_list)),
            )
        )

    scored_results.sort(
        key=lambda item: (item.match_score, item.event.hot_score, item.event.created_at),
        reverse=True,
    )
    return scored_results[:final_limit]
|