diff --git a/app/service/recommend/service.py b/app/service/recommend/service.py index bbdc6c3..1ff9336 100644 --- a/app/service/recommend/service.py +++ b/app/service/recommend/service.py @@ -9,6 +9,7 @@ from app.core.config import DB_CONFIG, RECOMMEND_PATH_PREFIX logger = logging.getLogger() import pymysql +from concurrent.futures import ThreadPoolExecutor matrix_data = { "interaction_matrix": None, @@ -66,7 +67,7 @@ def load_resources(): def precache_user_category(): - """预缓存用户-分类组合数据""" + """优化后的用户分类预缓存(添加耗时统计)""" if not all([ matrix_data["interaction_matrix"] is not None, matrix_data["feature_matrix"] is not None, @@ -75,61 +76,97 @@ def precache_user_category(): logger.warning("资源未加载完成,跳过预缓存") return - start_time = time.time() + start_time = time.perf_counter() + time_stats = { + "get_all_user_categories": 0, + "process_user_category": 0, + "thread_execution": 0, + "cache_update": 0, + "total": 0, + } + + # 统计用户类别获取时间 + t1 = time.perf_counter() user_categories = get_all_user_categories() + time_stats["get_all_user_categories"] = time.perf_counter() - t1 precached_count = 0 - for user_id, categories in user_categories.items(): + + def process_user_category(user_id, categories): + """单用户类别缓存计算(统计耗时)""" + local_cache = {} + local_valid_idxs = {} + t_start = time.perf_counter() + for category in categories: cache_key = (user_id, category) if cache_key in matrix_data["cached_scores"]: continue try: - # 获取用户索引 user_idx_inter = matrix_data["user_index_interaction"].get(user_id) user_idx_feature = matrix_data["user_index_feature"].get(user_id) - # 获取类别对应的iid列表 + # 统计获取类别 IID 耗时 + t_iid = time.perf_counter() category_iids = matrix_data["category_to_iids"].get(category, []) + valid_sketch_idxs_inter = [matrix_data["sketch_index_interaction"][iid] + for iid in category_iids if iid in matrix_data["sketch_index_interaction"]] + valid_sketch_idxs_feature = [matrix_data["sketch_index_feature"][iid] + for iid in category_iids if iid in matrix_data["sketch_index_feature"]] + time_stats["process_user_category"] += time.perf_counter() - t_iid - # 过滤有效草图索引 - valid_sketch_idxs_inter = [ - idx for iid, idx in matrix_data["sketch_index_interaction"].items() - if iid in category_iids - ] - - # 处理交互分数 + # 统计矩阵计算耗时 + t_matrix = time.perf_counter() + processed_inter = np.zeros(len(valid_sketch_idxs_inter)) if user_idx_inter is not None and valid_sketch_idxs_inter: raw_inter_scores = matrix_data["interaction_matrix"][user_idx_inter, valid_sketch_idxs_inter] processed_inter = raw_inter_scores * 0.7 - else: - processed_inter = np.array([]) - - # 处理特征分数 - valid_sketch_idxs_feature = [ - idx for iid, idx in matrix_data["sketch_index_feature"].items() - if iid in category_iids - ] + processed_feat = np.zeros(len(valid_sketch_idxs_feature)) if user_idx_feature is not None and valid_sketch_idxs_feature: raw_feat_scores = matrix_data["feature_matrix"][user_idx_feature, valid_sketch_idxs_feature] raw_feat_scores = (raw_feat_scores - np.min(raw_feat_scores)) / ( np.max(raw_feat_scores) - np.min(raw_feat_scores) + 1e-8) processed_feat = raw_feat_scores * 0.3 - else: - processed_feat = np.array([]) + time_stats["process_user_category"] += time.perf_counter() - t_matrix - # 缓存结果 if len(processed_inter) == len(processed_feat): - matrix_data["cached_scores"][cache_key] = (processed_inter, processed_feat) - matrix_data["cached_valid_idxs"][cache_key] = valid_sketch_idxs_inter - precached_count += 1 + local_cache[cache_key] = (processed_inter, processed_feat) + local_valid_idxs[cache_key] = valid_sketch_idxs_inter except Exception as e: logger.error(f"预缓存失败 (user={user_id}, category={category}): {str(e)}") - logger.info(f"预缓存完成,共缓存 {precached_count} 个组合,耗时: {time.time() - start_time:.2f}秒") + return local_cache, local_valid_idxs + + # 统计线程执行时间 + t2 = time.perf_counter() + with ThreadPoolExecutor(max_workers=8) as executor: + futures = {executor.submit(process_user_category, user_id, categories): user_id for user_id, categories in user_categories.items()} + for future in futures: + try: + t_cache = time.perf_counter() + cache_part, valid_idxs_part = future.result() + matrix_data["cached_scores"].update(cache_part) + matrix_data["cached_valid_idxs"].update(valid_idxs_part) + time_stats["cache_update"] += time.perf_counter() - t_cache + precached_count += len(cache_part) + except Exception as e: + logger.error(f"线程执行错误: {str(e)}") + time_stats["thread_execution"] = time.perf_counter() - t2 + + time_stats["total"] = time.perf_counter() - start_time + + # 输出统计信息 + logger.info(f""" + 预缓存完成,共缓存 {precached_count} 组数据,耗时统计如下: + - 获取用户类别数据: {time_stats["get_all_user_categories"]:.2f}s + - 计算用户类别缓存: {time_stats["process_user_category"]:.2f}s + - 线程任务执行: {time_stats["thread_execution"]:.2f}s + - 更新缓存数据: {time_stats["cache_update"]:.2f}s + - 总耗时: {time_stats["total"]:.2f}s + """) def get_all_user_categories():