Files
InsightRadar/backend/app/services/matching_service.py
2026-04-02 13:48:33 +08:00

320 lines
11 KiB
Python

"""
匹配服务:根据用户兴趣关键词(精确 + 语义)推荐事件
打分融合:标签/标题匹配分 + 标签相关度 + 热度 + 新鲜度加成
"""
import os
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any
import numpy as np
from sqlalchemy.orm import Session
from app.models.models import ExtractedTopic, TargetType, UnifiedEvent, UserTopicPreference, utcnow
from app.services.fetcher_service import embedder_model
# 语义匹配阈值:用户关键词和事件标签/标题向量相似度达到该值才计入语义命中
DEFAULT_PREFERENCE_SEMANTIC_THRESHOLD = 0.78
PREFERENCE_SEMANTIC_THRESHOLD = float(
os.getenv("PREFERENCE_SEMANTIC_THRESHOLD", str(DEFAULT_PREFERENCE_SEMANTIC_THRESHOLD))
)
# 推荐列表最大返回条数
DEFAULT_PREFERENCE_RECOMMEND_MAX_LIMIT = 50
PREFERENCE_RECOMMEND_MAX_LIMIT = int(
os.getenv("PREFERENCE_RECOMMEND_MAX_LIMIT", str(DEFAULT_PREFERENCE_RECOMMEND_MAX_LIMIT))
)
@dataclass
class MatchedEventResult:
"""用户兴趣匹配后的事件结果。"""
event: UnifiedEvent
match_score: float
exact_hits: list[str]
semantic_hits: list[dict[str, Any]]
tags: list[str]
def _normalize_text(text: str) -> str:
"""统一小写与首尾空白,便于做稳定匹配。"""
return text.strip().casefold()
def _find_exact_preference_match(
target_text: str,
normalized_preferences: list[tuple[str, str]],
) -> str | None:
"""
判断目标文本是否与某个用户兴趣词形成“精确命中”。
命中条件:
1. 标准化后完全相等
2. 二者互为包含关系
返回命中的原始兴趣词,未命中则返回 None。
"""
normalized_target = _normalize_text(target_text)
if not normalized_target:
return None
for raw_pref, normalized_pref in normalized_preferences:
if not normalized_pref:
continue
if normalized_target == normalized_pref:
return raw_pref
if normalized_pref in normalized_target or normalized_target in normalized_pref:
return raw_pref
return None
_EMBEDDING_CACHE: dict[str, np.ndarray] = {}
MAX_CACHE_SIZE = 10000
def _build_keyword_embedding_map(keywords: list[str]) -> dict[str, np.ndarray]:
"""
批量生成或从缓存获取关键词向量,并返回原词到向量的映射。
结合了批量推理(Batching)的极速优势和内存缓存的 O(1) 读取优势。
"""
result: dict[str, np.ndarray] = {}
if not keywords:
return result
uncached_keywords = []
# 1. 尝试从缓存获取
for keyword in keywords:
if not keyword:
continue
if keyword in _EMBEDDING_CACHE:
result[keyword] = _EMBEDDING_CACHE[keyword]
else:
uncached_keywords.append(keyword)
# 2. 对未命中的词进行统一的批量推理
if uncached_keywords:
# 去重,避免同一个未缓存的词被计算多次
unique_uncached = list(dict.fromkeys(uncached_keywords))
vectors = embedder_model.encode(unique_uncached, normalize_embeddings=True, show_progress_bar=False)
# 防止缓存无限增长:超过阈值时清空最早存入的一半(简单粗暴的内存控制)
if len(_EMBEDDING_CACHE) > MAX_CACHE_SIZE:
keys_to_delete = list(_EMBEDDING_CACHE.keys())[: MAX_CACHE_SIZE // 2]
for k in keys_to_delete:
del _EMBEDDING_CACHE[k]
# 3. 将新计算的向量存入缓存并回填结果
for keyword, vec in zip(unique_uncached, vectors):
vec_array = np.asarray(vec, dtype=np.float32)
_EMBEDDING_CACHE[keyword] = vec_array
result[keyword] = vec_array
return result
def _find_best_semantic_match(
target_text: str,
target_vec_map: dict[str, np.ndarray],
pref_vec_map: dict[str, np.ndarray],
) -> tuple[str | None, float]:
"""返回与目标文本最接近的兴趣词及其余弦相似度。"""
target_vec = target_vec_map.get(target_text)
if target_vec is None:
return None, -1.0
best_pref = None
best_sim = -1.0
for pref_keyword, pref_vec in pref_vec_map.items():
sim = float(np.dot(target_vec, pref_vec))
if sim > best_sim:
best_sim = sim
best_pref = pref_keyword
return best_pref, best_sim
def _ensure_aware(dt: datetime) -> datetime:
"""SQLite 读出的 datetime 不带时区信息,统一补上 UTC 后才能和 utcnow() 做减法。"""
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt
def _calc_freshness_bonus(event: UnifiedEvent) -> float:
"""根据事件新鲜度给一个小额加分,避免旧热点长期占据推荐位。"""
age_hours = max((utcnow() - _ensure_aware(event.created_at)).total_seconds() / 3600.0, 0.0)
if age_hours <= 6:
return 12.0
if age_hours <= 24:
return 8.0
if age_hours <= 72:
return 4.0
return 0.0
def recommend_events_for_user(
db: Session,
*,
user_id: int,
min_hot: int = 3,
hours: int = 72,
limit: int = 20,
semantic_threshold: float | None = None,
) -> list[MatchedEventResult]:
"""
用户兴趣推荐主流程:
1) 精确匹配:用户词 vs EVENT 标签/标题
2) 语义匹配:用户词向量 vs EVENT 标签/标题向量(超过阈值)
3) 打分融合:匹配分 + 标签相关度 + 热度 + 新鲜度
"""
final_limit = max(1, min(limit, PREFERENCE_RECOMMEND_MAX_LIMIT))
similarity_threshold = (
semantic_threshold
if semantic_threshold is not None
else PREFERENCE_SEMANTIC_THRESHOLD
)
# 1. 读取用户兴趣词
preferences = (
db.query(UserTopicPreference)
.filter(UserTopicPreference.user_id == user_id)
.all()
)
if not preferences:
return []
preference_keywords = [pref.interested_keyword.strip() for pref in preferences if pref.interested_keyword.strip()]
if not preference_keywords:
return []
# 2. 读取候选事件(时间 + 热度过滤,避免全表扫描)
time_limit = utcnow() - timedelta(hours=hours)
events = (
db.query(UnifiedEvent)
.filter(
UnifiedEvent.hot_score >= min_hot,
UnifiedEvent.created_at >= time_limit,
)
.order_by(UnifiedEvent.hot_score.desc(), UnifiedEvent.created_at.desc())
.all()
)
if not events:
return []
event_id_list = [event.id for event in events]
topic_rows = (
db.query(
ExtractedTopic.target_id,
ExtractedTopic.topic_keyword,
ExtractedTopic.relevance_score,
)
.filter(
ExtractedTopic.target_type == TargetType.EVENT,
ExtractedTopic.target_id.in_(event_id_list),
)
.all()
)
# 组织事件标签映射:event_id -> [(tag, relevance_score), ...]
event_topics: dict[int, list[tuple[str, float | None]]] = {}
for event_id, topic_keyword, relevance_score in topic_rows:
if not topic_keyword:
continue
event_topics.setdefault(event_id, []).append((topic_keyword, relevance_score))
# 3. 批量编码用户词与标签词,减少模型调用次数
unique_preference_keywords = list(dict.fromkeys(preference_keywords))
unique_topic_keywords = list(dict.fromkeys([row[1] for row in topic_rows if row[1]]))
pref_vec_map = _build_keyword_embedding_map(unique_preference_keywords)
topic_vec_map = _build_keyword_embedding_map(unique_topic_keywords)
# 预先建立“标准化后用户词集合”,用于精确匹配
normalized_preference_pairs = [
(word, _normalize_text(word))
for word in unique_preference_keywords
if _normalize_text(word)
]
unique_event_titles = list(
dict.fromkeys(
[event.unified_title.strip() for event in events if event.unified_title and event.unified_title.strip()]
)
)
title_vec_map = _build_keyword_embedding_map(unique_event_titles)
scored_results: list[MatchedEventResult] = []
for event in events:
topic_list = event_topics.get(event.id, [])
exact_hits: list[str] = []
semantic_hits: list[dict[str, Any]] = []
score = 0.0
# 对每个事件标签做精确匹配或语义匹配
for topic_keyword, topic_relevance in topic_list:
topic_relevance_score = float(topic_relevance) if topic_relevance is not None else 50.0
# 1) 精确命中(包括完全相等与包含关系)
matched_pref = _find_exact_preference_match(topic_keyword, normalized_preference_pairs)
if matched_pref is not None:
exact_hits.append(topic_keyword)
# 精确命中给较高基础分,标签自身相关度作为增益
score += 45.0 + topic_relevance_score * 0.2
continue
# 2) 语义命中(未精确命中时再算)
best_pref, best_sim = _find_best_semantic_match(topic_keyword, topic_vec_map, pref_vec_map)
if best_pref is not None and best_sim >= similarity_threshold:
semantic_hits.append(
{
"preference_keyword": best_pref,
"topic_keyword": topic_keyword,
"similarity": round(best_sim, 4),
}
)
# 语义命中分略低于精确命中,并由相似度放大
score += best_sim * 35.0 + topic_relevance_score * 0.12
# 标题也参与匹配,但权重低于结构化标签,避免长标题过度主导排序。
event_title = (event.unified_title or "").strip()
if event_title:
title_exact_pref = _find_exact_preference_match(event_title, normalized_preference_pairs)
if title_exact_pref is not None:
exact_hits.append(f"标题:{title_exact_pref}")
score += 30.0
else:
best_pref, best_sim = _find_best_semantic_match(event_title, title_vec_map, pref_vec_map)
if best_pref is not None and best_sim >= similarity_threshold:
semantic_hits.append(
{
"preference_keyword": best_pref,
"topic_keyword": f"标题:{best_pref}",
"similarity": round(best_sim, 4),
}
)
score += best_sim * 24.0
# 如果精确和语义都没命中,直接跳过
if not exact_hits and not semantic_hits:
continue
# 融合事件热度和新鲜度,避免只看语义分
score += min(event.hot_score, 100) * 0.3
score += _calc_freshness_bonus(event)
# 返回标签时做去重,保证接口稳定
tags = list(dict.fromkeys([item[0] for item in topic_list]))
scored_results.append(
MatchedEventResult(
event=event,
match_score=round(score, 2),
exact_hits=list(dict.fromkeys(exact_hits)),
semantic_hits=semantic_hits,
tags=tags,
)
)
scored_results.sort(
key=lambda item: (item.match_score, item.event.hot_score, item.event.created_at),
reverse=True,
)
return scored_results[:final_limit]