Files
AiDA_Python/app/service/recommendation_system/recommendation_api.py
2025-12-30 17:18:12 +08:00

215 lines
6.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
推荐接口实现
实现探索/利用分支、向量检索、Softmax抽样等功能
"""
import logging
import math
import random
import numpy as np
from typing import List, Dict, Optional
from app.service.recommendation_system.config import RECOMMENDATION_CONFIG, REDIS_KEY_USER_PREF_PREFIX
from app.service.recommendation_system.milvus_client import search_similar_vectors, query_random_candidates
from app.service.recommendation_system.precompute import compute_user_preference_vector
from app.service.recommendation_system.vector_utils import normalize_vector
from app.service.utils.redis_utils import Redis
import json
logger = logging.getLogger(__name__)
def get_user_preference_vector(user_id: int, category: str) -> Optional[np.ndarray]:
    """Return the preference vector for a user within a category.

    The vector is first looked up in Redis. When nothing usable is cached
    (missing entry, or the cached payload cannot be decoded), it is computed
    via ``compute_user_preference_vector`` and, if that succeeds, written
    back to Redis with the configured expiry.

    Args:
        user_id: User ID.
        category: Category name.

    Returns:
        The preference vector, or ``None`` when it cannot be computed. A
        vector restored from Redis is a ``float32`` array. (The original
        notes describe the vector as 2048-dimensional; that is not checked
        here.)
    """
    key = f"{REDIS_KEY_USER_PREF_PREFIX}:{user_id}:{category}"

    # Step 1: try the cached copy.
    cached_payload = Redis.read(key)
    if cached_payload:
        try:
            return np.array(json.loads(cached_payload), dtype=np.float32)
        except Exception as e:
            # An undecodable cache entry is not fatal; fall through and recompute.
            logger.warning(f"解析 Redis 向量失败 [user={user_id}, category={category}]: {e}")

    # Step 2: nothing usable in the cache, compute on the fly.
    logger.info(f"Redis 中不存在用户偏好向量,实时计算 [user={user_id}, category={category}]")
    computed = compute_user_preference_vector(user_id, category)
    if computed is None:
        return None

    # Store the freshly computed vector for later requests.
    serialized = json.dumps(computed.tolist())
    Redis.write(
        key=key,
        value=serialized,
        expire=RECOMMENDATION_CONFIG["redis_expire_seconds"],
    )
    return computed
def explore_branch(category: str, style: Optional[str] = None) -> List[str]:
    """Exploration branch: recommend one candidate chosen at random.

    Args:
        category: Category to draw candidates from.
        style: Optional style, forwarded to the candidate query.

    Returns:
        A one-element list holding the ``path`` of the chosen candidate
        (``""`` when the candidate has no ``path`` key), or an empty list
        when the query yields no candidates.
    """
    # Ask for a small fixed-size random pool, then pick a single entry from it.
    pool_size = 10
    candidates = query_random_candidates(category, style, limit=pool_size)
    if not candidates:
        logger.warning(f"探索分支:类别 {category} 没有候选数据")
        return []

    # Uses the module-level ``random`` import; no function-local import needed.
    chosen = random.choice(candidates)
    return [chosen.get("path", "")]
def exploit_branch(
    user_id: int,
    category: str,
    style: Optional[str] = None
) -> List[str]:
    """Exploitation branch: recommend by vector similarity.

    Retrieves the user's preference vector, searches for the most similar
    candidates, optionally boosts candidates whose style matches ``style``,
    and samples one candidate with softmax probabilities over the scores.
    Falls back to ``explore_branch`` when no preference vector is available
    or the similarity search returns nothing.

    Args:
        user_id: User ID.
        category: Category name.
        style: Optional style; candidates with the same style get a score
            bonus before sampling.

    Returns:
        A one-element list holding the ``path`` of the sampled candidate
        (``""`` when it has no ``path`` key), or whatever ``explore_branch``
        returns on fallback.
    """
    # 1. Fetch the user's preference vector.
    embedding = get_user_preference_vector(user_id, category)
    if embedding is None:
        logger.warning(f"利用分支:无法获取用户偏好向量,回退到探索分支 [user={user_id}, category={category}]")
        return explore_branch(category, style)

    # 2. Similarity search (the original notes say inner product, IP).
    topk = RECOMMENDATION_CONFIG["topk"]
    results = search_similar_vectors(embedding, category, topk)
    if not results:
        logger.warning(f"利用分支:向量检索无结果,回退到探索分支 [user={user_id}, category={category}]")
        return explore_branch(category, style)

    # 3. Optional style bonus. The boost is added as |score| * bonus so that a
    #    matching candidate is always moved upwards: multiplying a negative
    #    score by (1 + bonus) would push it further down instead. For
    #    non-negative scores this equals score * (1 + bonus).
    style_bonus = RECOMMENDATION_CONFIG["style_bonus"]
    scores = []
    for result in results:
        score = result["score"]
        if style and result.get("style") == style:
            score = score + abs(score) * style_bonus
        scores.append(score)

    # 4. Softmax sampling of a single candidate.
    probabilities = softmax_with_temperature(scores, RECOMMENDATION_CONFIG["softmax_temperature"])
    selected_index = int(np.random.choice(len(results), p=probabilities))

    # 5. Return the path of the sampled candidate.
    return [results[selected_index].get("path", "")]
def softmax_with_temperature(scores: List[float], temperature: float = 1.0) -> List[float]:
    """Convert scores into probabilities with a temperature-scaled softmax.

    Args:
        scores: Scores to convert.
        temperature: Softmax temperature. Smaller values sharpen the
            distribution. ``0`` yields the limiting distribution, which puts
            all probability on the maximum score(s), split evenly on ties.

    Returns:
        A list of probabilities of the same length as ``scores`` (empty for
        empty input). When the scores cannot be normalised (for example all
        are ``-inf``, or NaN is involved), a uniform distribution is returned.

    Raises:
        ValueError: If ``temperature`` is negative.
    """
    if not scores:
        return []

    count = len(scores)
    uniform = [1.0 / count] * count

    if temperature < 0:
        raise ValueError(f"temperature must be non-negative, got {temperature}")

    if temperature == 0:
        # Dividing by zero is undefined; use the limit as temperature -> 0.
        top = max(scores)
        is_winner = [s == top for s in scores]
        winners = sum(is_winner)
        if winners == 0:  # only possible when NaN is involved
            return uniform
        share = 1.0 / winners
        return [share if hit else 0.0 for hit in is_winner]

    # Scale by temperature, then subtract the maximum for numerical stability.
    scaled = [s / temperature for s in scores]
    peak = max(scaled)
    weights = [math.exp(s - peak) for s in scaled]

    # Normalise. Infinite or NaN inputs turn the total into NaN rather than 0,
    # so test for any non-finite or non-positive total before dividing.
    total = sum(weights)
    if not math.isfinite(total) or total <= 0:
        return uniform
    return [w / total for w in weights]
def get_recommendations(
    user_id: int,
    category: str,
    style: Optional[str] = None
) -> List[str]:
    """Main entry point: pick the explore or exploit branch and run it.

    A uniform random draw is compared with the configured ``explore_ratio``:
    below the ratio the exploration branch runs, otherwise the exploitation
    branch. If anything raises along the way, the error is logged and the
    exploration branch is used as a fallback.

    Args:
        user_id: User ID.
        category: Category name (e.g. ``female_skirt``).
        style: Optional style, forwarded to whichever branch runs.

    Returns:
        The list returned by the branch that ran.
    """
    try:
        # Read the configured threshold, then draw a number in [0, 1).
        threshold = RECOMMENDATION_CONFIG["explore_ratio"]
        draw = random.random()

        if draw < threshold:
            logger.debug(f"探索分支 [user={user_id}, category={category}]")
            picks = explore_branch(category, style)
        else:
            logger.debug(f"利用分支 [user={user_id}, category={category}]")
            picks = exploit_branch(user_id, category, style)
        return picks
    except Exception as e:
        logger.error(f"获取推荐结果失败 [user={user_id}, category={category}]: {e}", exc_info=True)
        # Fault tolerance: fall back to the exploration branch.
        return explore_branch(category, style)