215 lines
6.7 KiB
Python
215 lines
6.7 KiB
Python
|
|
"""
|
|||
|
|
推荐接口实现
|
|||
|
|
实现探索/利用分支、向量检索、Softmax抽样等功能
|
|||
|
|
"""
|
|||
|
|
import logging
|
|||
|
|
import math
|
|||
|
|
import random
|
|||
|
|
import numpy as np
|
|||
|
|
from typing import List, Dict, Optional
|
|||
|
|
|
|||
|
|
from app.service.recommendation_system.config import RECOMMENDATION_CONFIG, REDIS_KEY_USER_PREF_PREFIX
|
|||
|
|
from app.service.recommendation_system.milvus_client import search_similar_vectors, query_random_candidates
|
|||
|
|
from app.service.recommendation_system.precompute import compute_user_preference_vector
|
|||
|
|
from app.service.recommendation_system.vector_utils import normalize_vector
|
|||
|
|
from app.service.utils.redis_utils import Redis
|
|||
|
|
import json
|
|||
|
|
|
|||
|
|
logger = logging.getLogger(__name__)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_user_preference_vector(user_id: int, category: str) -> Optional[np.ndarray]:
    """Return the preference vector of a user for one category.

    The vector is looked up in Redis first. When the entry is missing (or
    cannot be decoded) it is recomputed with
    ``compute_user_preference_vector`` and, if that yields a vector, written
    back to Redis with the configured expiry.

    Args:
        user_id: User identifier; part of the Redis key.
        category: Category name; part of the Redis key.

    Returns:
        A float32 array decoded from the cached JSON, or the freshly computed
        vector. ``None`` when the computation yields ``None``.
        NOTE(review): the original notes describe the vector as
        2048-dimensional; nothing in this function enforces that.
    """
    cache_key = f"{REDIS_KEY_USER_PREF_PREFIX}:{user_id}:{category}"
    cached_payload = Redis.read(cache_key)

    # Step 1: serve from the cache when a decodable payload is present.
    if cached_payload:
        try:
            return np.array(json.loads(cached_payload), dtype=np.float32)
        except Exception as e:
            # A corrupt entry is only logged; we fall through and recompute.
            logger.warning(f"解析 Redis 向量失败 [user={user_id}, category={category}]: {e}")

    # Step 2: nothing usable in the cache, compute on the fly.
    logger.info(f"Redis 中不存在用户偏好向量,实时计算 [user={user_id}, category={category}]")
    computed = compute_user_preference_vector(user_id, category)
    if computed is None:
        return computed

    # Persist the fresh vector as a JSON list so later calls hit the cache.
    Redis.write(
        key=cache_key,
        value=json.dumps(computed.tolist()),
        expire=RECOMMENDATION_CONFIG["redis_expire_seconds"]
    )
    return computed
|
|||
|
|
|
|||
|
|
|
|||
|
|
def explore_branch(category: str, style: Optional[str] = None) -> List[str]:
    """Exploration branch: recommend one randomly chosen candidate.

    Args:
        category: Category passed through to ``query_random_candidates``.
        style: Optional style, also passed through to the candidate query.

    Returns:
        A list with at most one element: the ``"path"`` value of the chosen
        candidate (an empty string when the candidate has no ``"path"`` key).
        An empty list when the query returns no candidates.
    """
    # A fixed-size pool is fetched, then a single entry is drawn from it.
    pool_size = 10
    pool = query_random_candidates(category, style, limit=pool_size)

    if not pool:
        logger.warning(f"探索分支:类别 {category} 没有候选数据")
        return []

    # With a single candidate there is nothing to draw; otherwise sample one.
    if len(pool) == 1:
        chosen = pool[0]
    else:
        chosen = random.sample(pool, 1)[0]

    return [chosen.get("path", "")]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def exploit_branch(
    user_id: int,
    category: str,
    style: Optional[str] = None
) -> List[str]:
    """Exploitation branch: recommend one item by vector similarity.

    Steps: fetch the user's preference vector, retrieve the top-k similar
    items, optionally boost items whose style matches ``style``, turn the
    scores into probabilities with a temperature softmax and draw one item.
    Falls back to ``explore_branch`` when no preference vector is available
    or the similarity search returns nothing.

    Args:
        user_id: User identifier.
        category: Category to search in.
        style: Optional style. When given, results whose ``"style"`` equals
            it get their score multiplied by ``1 + style_bonus``.

    Returns:
        A single-element list with the ``"path"`` of the sampled result (an
        empty string when the result has no ``"path"`` key), or whatever
        ``explore_branch`` returns on fallback.
    """
    # 1. User preference vector.
    embedding = get_user_preference_vector(user_id, category)
    if embedding is None:
        logger.warning(f"利用分支:无法获取用户偏好向量,回退到探索分支 [user={user_id}, category={category}]")
        return explore_branch(category, style)

    # 2. Similarity search (the original comment states the metric is inner
    #    product; that is configured outside this function).
    topk = RECOMMENDATION_CONFIG["topk"]
    results = search_similar_vectors(embedding, category, topk)
    if not results:
        logger.warning(f"利用分支:向量检索无结果,回退到探索分支 [user={user_id}, category={category}]")
        return explore_branch(category, style)

    # 3. Optional style bonus. The config value is read unconditionally so a
    #    missing key surfaces regardless of whether a style was passed.
    # NOTE(review): the bonus is multiplicative, so if scores can be negative
    # a matching style lowers the score instead of raising it - confirm the
    # score range returned by the search.
    style_bonus = RECOMMENDATION_CONFIG["style_bonus"]
    for result in results:
        final_score = result["score"]
        if style and result.get("style") == style:
            final_score = final_score * (1 + style_bonus)
        result["final_score"] = final_score

    # 4. Softmax sampling of exactly one result. `results` is known to be
    #    non-empty here, so no further emptiness check is needed.
    probabilities = softmax_with_temperature(
        [result["final_score"] for result in results],
        RECOMMENDATION_CONFIG["softmax_temperature"],
    )
    selected_index = np.random.choice(len(results), size=1, p=probabilities, replace=False)
    selected = results[int(selected_index[0])]

    # 5. Only the path is exposed to the caller.
    return [selected.get("path", "")]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def softmax_with_temperature(scores: List[float], temperature: float = 1.0) -> List[float]:
    """Convert scores into a probability distribution with a temperature softmax.

    Args:
        scores: Raw scores; an empty list yields an empty list.
        temperature: Divisor applied to every score before exponentiation.
            Values closer to zero sharpen the distribution. A temperature of
            exactly ``0`` is treated as the greedy limit: all probability
            mass is split evenly among the maximal scores.

    Returns:
        A list of probabilities with the same length as ``scores``. When the
        scores do not produce a usable normaliser (for example all scores are
        ``-inf`` or a score is NaN) a uniform distribution is returned.
    """
    if not scores:
        return []

    count = len(scores)
    uniform = [1.0 / count] * count

    if temperature == 0:
        # Dividing by zero is undefined; use the T -> 0+ limit (arg-max).
        best = max(scores)
        hits = [score == best for score in scores]
        hit_count = sum(hits)
        if hit_count == 0:
            # Only possible when comparisons fail (NaN scores).
            return uniform
        share = 1.0 / hit_count
        return [share if hit else 0.0 for hit in hits]

    scaled = [score / temperature for score in scores]

    # Subtract the maximum so the largest exponent is exp(0) (numerical stability).
    peak = max(scaled)
    weights = [math.exp(value - peak) for value in scaled]

    total = sum(weights)
    if total == 0 or not math.isfinite(total):
        # inf - inf yields NaN weights; fall back instead of returning NaNs.
        return uniform

    return [weight / total for weight in weights]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_recommendations(
    user_id: int,
    category: str,
    style: Optional[str] = None
) -> List[str]:
    """Entry point: pick the explore or exploit branch and return its result.

    A random draw is compared with the configured ``explore_ratio``; draws
    below it take the exploration branch, all others the exploitation branch.
    Any exception raised along the way is logged and answered with one more
    call to the exploration branch.

    Args:
        user_id: User identifier.
        category: Category name (e.g. ``female_skirt``).
        style: Optional style, forwarded to whichever branch is taken.

    Returns:
        The list returned by the chosen branch.
    """
    try:
        # Config is read before the draw so a missing key fails first.
        explore_ratio = RECOMMENDATION_CONFIG["explore_ratio"]
        take_exploit = not (random.random() < explore_ratio)

        if take_exploit:
            logger.debug(f"利用分支 [user={user_id}, category={category}]")
            return exploit_branch(user_id, category, style)

        logger.debug(f"探索分支 [user={user_id}, category={category}]")
        return explore_branch(category, style)

    except Exception as e:
        logger.error(f"获取推荐结果失败 [user={user_id}, category={category}]: {e}", exc_info=True)
        # Fault tolerance: fall back to the exploration branch.
        return explore_branch(category, style)
|
|||
|
|
|