import io import logging import sys import time from typing import List import numpy as np from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger from fastapi import HTTPException, APIRouter from app.service.recommend.service import load_resources, matrix_data sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') logger = logging.getLogger() router = APIRouter() @router.on_event("startup") async def startup_event(): # 初始加载 load_resources() # 配置定时任务 scheduler = BackgroundScheduler() scheduler.add_job( load_resources, trigger=CronTrigger(hour=0, minute=30), name="每日资源刷新" ) scheduler.start() logger.info("定时任务已启动") @router.get("/recommend/{user_id}/{category}/{num_recommendations}", response_model=List[str]) async def get_recommendations(user_id: int, category: str, num_recommendations: int = 10): """ :param user_id: 4 :param category: female_skirt :param num_recommendations: 1 :return: [ "aida-sys-image/images/female/skirt/903000017.jpg" ] """ try: start_time = time.time() cache_key = (user_id, category) # 检查缓存 if cache_key in matrix_data["cached_scores"]: processed_inter, processed_feat = matrix_data["cached_scores"][cache_key] valid_sketch_idxs_inter = matrix_data["cached_valid_idxs"][cache_key] else: # 实时计算逻辑(同原代码) user_idx_inter = matrix_data["user_index_interaction"].get(user_id) user_idx_feature = matrix_data["user_index_feature"].get(user_id) category_iids = matrix_data["category_to_iids"].get(category, []) valid_sketch_idxs_inter = [ idx for iid, idx in matrix_data["sketch_index_interaction"].items() if iid in category_iids ] # 处理交互分数 raw_inter_scores = [] if user_idx_inter is not None and valid_sketch_idxs_inter: raw_inter_scores = matrix_data["interaction_matrix"][user_idx_inter, valid_sketch_idxs_inter] processed_inter = raw_inter_scores * 0.7 # 处理特征分数 valid_sketch_idxs_feature = [ idx for iid, idx in matrix_data["sketch_index_feature"].items() if iid in category_iids ] raw_feat_scores = [] if user_idx_feature is not None and valid_sketch_idxs_feature: raw_feat_scores = matrix_data["feature_matrix"][user_idx_feature, valid_sketch_idxs_feature] raw_feat_scores = (raw_feat_scores - np.min(raw_feat_scores)) / ( np.max(raw_feat_scores) - np.min(raw_feat_scores) + 1e-8) processed_feat = raw_feat_scores * 0.3 else: processed_feat = np.array([]) # 更新缓存 matrix_data["cached_scores"][cache_key] = (processed_inter, processed_feat) matrix_data["cached_valid_idxs"][cache_key] = valid_sketch_idxs_inter # 合并分数 final_scores = processed_inter + processed_feat valid_sketch_idxs = matrix_data["cached_valid_idxs"][cache_key] # 概率采样 scores = np.array(final_scores) # 调整后的概率转换(带温度控制的softmax) def calibrated_softmax(scores, temperature=1.0): scores = scores / temperature scale = scores - max(scores) exps = np.exp(scale) return exps / np.sum(exps) probs = calibrated_softmax(scores, 0.07) chosen_indices = np.random.choice( len(valid_sketch_idxs), size=min(num_recommendations, len(valid_sketch_idxs)), p=probs, replace=False ) recommendations = [matrix_data["iid_to_sketch"][valid_sketch_idxs[idx]] for idx in chosen_indices] logger.info(f"推荐生成完成,耗时: {time.time() - start_time:.2f}秒") return recommendations except Exception as e: logger.error(f"推荐失败: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=str(e))