# Preload recommendation resources (matrices, index maps, heat vectors).
import json
import logging
import os
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed

import numpy as np
import pymysql

from app.core.config import DB_CONFIG, RECOMMEND_PATH_PREFIX

logger = logging.getLogger()

# Path to the precomputed heat-vector JSON; could be loaded dynamically or made configurable.
HEAT_VECTOR_FILE = 'heat_vectors_data/heat_vectors.json'

# Global in-memory store shared by the recommender: matrices, user/sketch index
# maps, and per-(user, category) score caches. Populated by load_resources().
matrix_data = {
    "interaction_matrix": None,
    "feature_matrix": None,
    "user_index_interaction": None,
    "sketch_index_interaction": None,
    "user_index_feature": None,
    "sketch_index_feature": None,
    "iid_to_sketch": None,
    "category_to_iids": None,
    "cached_scores": {},
    "cached_valid_idxs": {},
    "category_sketch_idxs_inter": None,
    "category_sketch_idxs_feature": None,
    "user_inter_full": dict(),
    "user_feat_full": dict(),
    "brand_feature_matrix": None,
    "brand_index_map": None,
    "heat_data": {},
}


def load_resources():
    """Load all matrices and index maps from disk and trigger pre-caching.

    Side effects: mutates the module-level ``matrix_data`` dict in place.

    Raises:
        RuntimeError: if any resource fails to load (original cause chained).
    """
    try:
        start_time = time.time()

        # Drop any stale cached scores before reloading resources.
        matrix_data["cached_scores"].clear()
        matrix_data["cached_valid_idxs"].clear()

        # sketch -> iid map; invert it so we can look up a sketch by iid.
        sketch_to_iid = np.load(f'{RECOMMEND_PATH_PREFIX}sketch_to_iid.npy', allow_pickle=True).item()
        matrix_data["iid_to_sketch"] = {v: k for k, v in sketch_to_iid.items()}

        matrix_data["interaction_matrix"] = np.load(f"{RECOMMEND_PATH_PREFIX}interaction_matrix.npy", allow_pickle=True)
        matrix_data["user_index_interaction"] = np.load(f"{RECOMMEND_PATH_PREFIX}user_index_interaction_matrix.npy", allow_pickle=True).item()
        matrix_data["sketch_index_interaction"] = np.load(f"{RECOMMEND_PATH_PREFIX}sketch_index_interaction_matrix.npy",
                                                          allow_pickle=True).item()

        matrix_data["feature_matrix"] = np.load(f"{RECOMMEND_PATH_PREFIX}feature_matrix.npy", allow_pickle=True)

        # Brand feature matrix is optional: fall back to an empty array.
        brand_feature_path = f"{RECOMMEND_PATH_PREFIX}brand_feature_matrix.npy"
        if os.path.exists(brand_feature_path):
            matrix_data["brand_feature_matrix"] = np.load(brand_feature_path, allow_pickle=True)
        else:
            logger.warning("brand_feature_matrix 文件不存在,使用空数组")
            matrix_data["brand_feature_matrix"] = np.array([])

        # brand_index_map is optional as well: fall back to an empty dict.
        brand_index_path = f"{RECOMMEND_PATH_PREFIX}brand_index_map.npy"
        if os.path.exists(brand_index_path):
            matrix_data["brand_index_map"] = np.load(brand_index_path, allow_pickle=True).item()
        else:
            logger.warning("brand_index_map 文件不存在,使用空字典")
            matrix_data["brand_index_map"] = {}

        matrix_data["user_index_feature"] = np.load(f"{RECOMMEND_PATH_PREFIX}user_index_feature_matrix.npy", allow_pickle=True).item()

        matrix_data["sketch_index_feature"] = np.load(f"{RECOMMEND_PATH_PREFIX}sketch_index_feature_matrix.npy", allow_pickle=True).item()

        # iid -> category map on disk; invert it into category -> [iids].
        category_to_iid_map = np.load(f"{RECOMMEND_PATH_PREFIX}iid_to_category_interaction_matrix.npy", allow_pickle=True).item()
        matrix_data["category_to_iids"] = defaultdict(list)
        for iid, cat in category_to_iid_map.items():
            matrix_data["category_to_iids"][cat].append(iid)

        logger.info(f"资源加载完成,耗时: {time.time() - start_time:.2f}秒")

        # Warm the per-(user, category) score cache.
        precache_user_category()

        # Heat-vector data is optional; missing file leaves an empty dict.
        if os.path.exists(HEAT_VECTOR_FILE):
            with open(HEAT_VECTOR_FILE, 'r', encoding='utf-8') as f:
                heat_json = json.load(f)
            matrix_data["heat_data"] = heat_json.get("data", {})
            logger.info(f"热度向量数据加载完成,共加载 {len(matrix_data['heat_data'])} 个类别")
        else:
            matrix_data["heat_data"] = {}

    except Exception as e:
        logger.error(f"资源加载失败: {str(e)}")
        # Chain the original cause so the failing load is not hidden.
        raise RuntimeError("初始化失败") from e


def precache_user_category():
    """Pre-compute per-(user, category) score vectors, with timing statistics.

    Fans out one task per user to a thread pool; each worker computes the
    weighted interaction (0.7) and min-max-normalized feature (0.3) score
    vectors for every category the user touched. All writes to the shared
    ``matrix_data`` caches and to ``time_stats`` happen in the main thread —
    workers only return their local results plus their elapsed compute time
    (the original version mutated ``time_stats`` concurrently, a data race).
    """
    if not all([
        matrix_data["interaction_matrix"] is not None,
        matrix_data["feature_matrix"] is not None,
        matrix_data["user_index_interaction"] is not None
    ]):
        logger.warning("资源未加载完成,跳过预缓存")
        return

    start_time = time.perf_counter()
    time_stats = {
        "get_all_user_categories": 0,
        "process_user_category": 0,
        "thread_execution": 0,
        "cache_update": 0,
        "total": 0,
    }

    # Time the DB round-trip that lists every user's categories.
    t1 = time.perf_counter()
    user_categories = get_all_user_categories()
    time_stats["get_all_user_categories"] = time.perf_counter() - t1

    precached_count = 0

    def process_user_category(user_id, categories):
        """Compute cache entries for one user. Returns (cache, valid_idxs, elapsed).

        ``elapsed`` is this worker's compute time; it is summed into
        ``time_stats`` by the main thread to stay thread-safe.
        """
        local_cache = {}
        local_valid_idxs = {}
        elapsed = 0.0

        for category in categories:
            cache_key = (user_id, category)
            if cache_key in matrix_data["cached_scores"]:
                continue  # already cached from a previous run

            try:
                user_idx_inter = matrix_data["user_index_interaction"].get(user_id)
                user_idx_feature = matrix_data["user_index_feature"].get(user_id)

                # Resolve the category's iids to row indices in both matrices.
                t_step = time.perf_counter()
                category_iids = matrix_data["category_to_iids"].get(category, [])
                valid_sketch_idxs_inter = [matrix_data["sketch_index_interaction"][iid]
                                           for iid in category_iids if iid in matrix_data["sketch_index_interaction"]]
                valid_sketch_idxs_feature = [matrix_data["sketch_index_feature"][iid]
                                             for iid in category_iids if iid in matrix_data["sketch_index_feature"]]
                elapsed += time.perf_counter() - t_step

                # Score computation: interaction weighted 0.7, features
                # min-max normalized then weighted 0.3.
                t_step = time.perf_counter()
                processed_inter = np.zeros(len(valid_sketch_idxs_inter))
                if user_idx_inter is not None and valid_sketch_idxs_inter:
                    raw_inter_scores = matrix_data["interaction_matrix"][user_idx_inter, valid_sketch_idxs_inter]
                    processed_inter = raw_inter_scores * 0.7

                processed_feat = np.zeros(len(valid_sketch_idxs_feature))
                if user_idx_feature is not None and valid_sketch_idxs_feature:
                    raw_feat_scores = matrix_data["feature_matrix"][user_idx_feature, valid_sketch_idxs_feature]
                    # 1e-8 guards against division by zero on constant rows.
                    raw_feat_scores = (raw_feat_scores - np.min(raw_feat_scores)) / (
                        np.max(raw_feat_scores) - np.min(raw_feat_scores) + 1e-8)
                    processed_feat = raw_feat_scores * 0.3
                elapsed += time.perf_counter() - t_step

                # Only cache when both vectors align element-for-element.
                if len(processed_inter) == len(processed_feat):
                    local_cache[cache_key] = (processed_inter, processed_feat)
                    local_valid_idxs[cache_key] = valid_sketch_idxs_inter

            except Exception as e:
                logger.error(f"预缓存失败 (user={user_id}, category={category}): {str(e)}")

        return local_cache, local_valid_idxs, elapsed

    # Fan out one task per user; merge results in the main thread only.
    t2 = time.perf_counter()
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = {executor.submit(process_user_category, user_id, categories): user_id
                   for user_id, categories in user_categories.items()}
        for future in as_completed(futures):
            try:
                cache_part, valid_idxs_part, worker_elapsed = future.result()
                # Start the cache-update timer AFTER result() so thread wait
                # time is not misattributed to "cache_update".
                t_cache = time.perf_counter()
                matrix_data["cached_scores"].update(cache_part)
                matrix_data["cached_valid_idxs"].update(valid_idxs_part)
                time_stats["cache_update"] += time.perf_counter() - t_cache
                time_stats["process_user_category"] += worker_elapsed
                precached_count += len(cache_part)
            except Exception as e:
                logger.error(f"线程执行错误: {str(e)}")
    time_stats["thread_execution"] = time.perf_counter() - t2

    time_stats["total"] = time.perf_counter() - start_time

    logger.info(f"""
预缓存完成,共缓存 {precached_count} 组数据,耗时统计如下:
- 获取用户类别数据: {time_stats["get_all_user_categories"]:.2f}s
- 计算用户类别缓存: {time_stats["process_user_category"]:.2f}s
- 线程任务执行: {time_stats["thread_execution"]:.2f}s
- 更新缓存数据: {time_stats["cache_update"]:.2f}s
- 总耗时: {time_stats["total"]:.2f}s
""")


def get_all_user_categories():
    """Return {account_id: set(categories)} from the preference-log table.

    Returns an empty dict on any DB failure (best effort — callers treat a
    missing result as "nothing to precache").
    """
    conn = None
    try:
        conn = pymysql.connect(**DB_CONFIG)
        # Context manager guarantees the cursor is closed.
        with conn.cursor() as cursor:
            query = """
                SELECT DISTINCT account_id, path
                FROM user_preference_log_prediction
            """
            cursor.execute(query)
            results = cursor.fetchall()

        user_categories = defaultdict(set)
        for account_id, path in results:
            category = get_category_from_path(path)
            user_categories[account_id].add(category)

        return dict(user_categories)

    except Exception as e:
        logger.error(f"数据库查询失败: {str(e)}")
        return {}
    finally:
        if conn:
            conn.close()


def get_category_from_path(path: str) -> str:
    """Parse a category ("{segment2}_{segment3}") from a '/'-separated path.

    Returns "unknown" for paths with fewer than four segments or non-string
    input (e.g. None).
    """
    try:
        parts = path.split('/')
        if len(parts) >= 4:
            return f"{parts[2]}_{parts[3]}"
        return "unknown"
    except (AttributeError, TypeError):
        # path was not a string — narrowed from the original bare except.
        return "unknown"