TASK:系统sketch推荐接口预缓存速度优化;

This commit is contained in:
shahaibo
2025-03-19 15:23:21 +08:00
parent 4defe25e6b
commit 6f48626005

View File

@@ -9,6 +9,7 @@ from app.core.config import DB_CONFIG, RECOMMEND_PATH_PREFIX
logger = logging.getLogger()
import pymysql
from concurrent.futures import ThreadPoolExecutor
matrix_data = {
"interaction_matrix": None,
@@ -66,7 +67,7 @@ def load_resources():
def precache_user_category():
"""预缓存用户-分类组合数据"""
"""优化后的用户分类预缓存(添加耗时统计)"""
if not all([
matrix_data["interaction_matrix"] is not None,
matrix_data["feature_matrix"] is not None,
@@ -75,61 +76,97 @@ def precache_user_category():
logger.warning("资源未加载完成,跳过预缓存")
return
start_time = time.time()
start_time = time.perf_counter()
time_stats = {
"get_all_user_categories": 0,
"process_user_category": 0,
"thread_execution": 0,
"cache_update": 0,
"total": 0,
}
# 统计用户类别获取时间
t1 = time.perf_counter()
user_categories = get_all_user_categories()
time_stats["get_all_user_categories"] = time.perf_counter() - t1
precached_count = 0
for user_id, categories in user_categories.items():
def process_user_category(user_id, categories):
"""单用户类别缓存计算(统计耗时)"""
local_cache = {}
local_valid_idxs = {}
t_start = time.perf_counter()
for category in categories:
cache_key = (user_id, category)
if cache_key in matrix_data["cached_scores"]:
continue
try:
# 获取用户索引
user_idx_inter = matrix_data["user_index_interaction"].get(user_id)
user_idx_feature = matrix_data["user_index_feature"].get(user_id)
# 获取类别对应的iid列表
# 统计获取类别 IID 耗时
t_iid = time.perf_counter()
category_iids = matrix_data["category_to_iids"].get(category, [])
valid_sketch_idxs_inter = [matrix_data["sketch_index_interaction"][iid]
for iid in category_iids if iid in matrix_data["sketch_index_interaction"]]
valid_sketch_idxs_feature = [matrix_data["sketch_index_feature"][iid]
for iid in category_iids if iid in matrix_data["sketch_index_feature"]]
time_stats["process_user_category"] += time.perf_counter() - t_iid
# 过滤有效草图索引
valid_sketch_idxs_inter = [
idx for iid, idx in matrix_data["sketch_index_interaction"].items()
if iid in category_iids
]
# 处理交互分数
# 统计矩阵计算耗时
t_matrix = time.perf_counter()
processed_inter = np.zeros(len(valid_sketch_idxs_inter))
if user_idx_inter is not None and valid_sketch_idxs_inter:
raw_inter_scores = matrix_data["interaction_matrix"][user_idx_inter, valid_sketch_idxs_inter]
processed_inter = raw_inter_scores * 0.7
else:
processed_inter = np.array([])
# 处理特征分数
valid_sketch_idxs_feature = [
idx for iid, idx in matrix_data["sketch_index_feature"].items()
if iid in category_iids
]
processed_feat = np.zeros(len(valid_sketch_idxs_feature))
if user_idx_feature is not None and valid_sketch_idxs_feature:
raw_feat_scores = matrix_data["feature_matrix"][user_idx_feature, valid_sketch_idxs_feature]
raw_feat_scores = (raw_feat_scores - np.min(raw_feat_scores)) / (
np.max(raw_feat_scores) - np.min(raw_feat_scores) + 1e-8)
processed_feat = raw_feat_scores * 0.3
else:
processed_feat = np.array([])
time_stats["process_user_category"] += time.perf_counter() - t_matrix
# 缓存结果
if len(processed_inter) == len(processed_feat):
matrix_data["cached_scores"][cache_key] = (processed_inter, processed_feat)
matrix_data["cached_valid_idxs"][cache_key] = valid_sketch_idxs_inter
precached_count += 1
local_cache[cache_key] = (processed_inter, processed_feat)
local_valid_idxs[cache_key] = valid_sketch_idxs_inter
except Exception as e:
logger.error(f"预缓存失败 (user={user_id}, category={category}): {str(e)}")
logger.info(f"预缓存完成,共缓存 {precached_count} 个组合,耗时: {time.time() - start_time:.2f}")
return local_cache, local_valid_idxs
# 统计线程执行时间
t2 = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as executor:
futures = {executor.submit(process_user_category, user_id, categories): user_id for user_id, categories in user_categories.items()}
for future in futures:
try:
t_cache = time.perf_counter()
cache_part, valid_idxs_part = future.result()
matrix_data["cached_scores"].update(cache_part)
matrix_data["cached_valid_idxs"].update(valid_idxs_part)
time_stats["cache_update"] += time.perf_counter() - t_cache
precached_count += len(cache_part)
except Exception as e:
logger.error(f"线程执行错误: {str(e)}")
time_stats["thread_execution"] = time.perf_counter() - t2
time_stats["total"] = time.perf_counter() - start_time
# 输出统计信息
logger.info(f"""
预缓存完成,共缓存 {precached_count} 组数据,耗时统计如下:
- 获取用户类别数据: {time_stats["get_all_user_categories"]:.2f}s
- 计算用户类别缓存: {time_stats["process_user_category"]:.2f}s
- 线程任务执行: {time_stats["thread_execution"]:.2f}s
- 更新缓存数据: {time_stats["cache_update"]:.2f}s
- 总耗时: {time_stats["total"]:.2f}s
""")
def get_all_user_categories():