# AiDA_Python/app/service/recommend/scheduled_task.py
import pymysql
import numpy as np
from apscheduler.schedulers.blocking import BlockingScheduler
import os
import logging
from collections import defaultdict
import torch
from torchvision import models, transforms
from minio import Minio
from PIL import Image
import io
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.sparse import csr_matrix
import matplotlib.font_manager as fm
from scipy import sparse
import pandas as pd
from datetime import datetime, timedelta
import json
from app.core.config import DB_CONFIG, TABLE_CATEGORIES, RECOMMEND_PATH_PREFIX
# Configure matplotlib with a font that can render CJK glyphs; fall back to
# the bundled Latin-only font when none is installed.
# Fixes: the original clobbered the try/except fallback with an unconditional
# 'Microsoft YaHei' override, used a bare except, and assigned a string
# (instead of a list) to font.sans-serif.
try:
    # Probe for a common Chinese-capable font on this system.
    _cjk_font_path = fm.findfont(
        fm.FontProperties(family=['Microsoft YaHei', 'SimHei', 'WenQuanYi Zen Hei'])
    )
    plt.rcParams['font.sans-serif'] = [fm.FontProperties(fname=_cjk_font_path).get_name()]
except Exception:
    # No CJK-capable font available: use matplotlib's bundled default.
    plt.rcParams['font.sans-serif'] = ['DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False  # render minus signs correctly with non-default fonts
# Logging configuration: INFO-level messages are appended to scheduler.log.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='scheduler.log'
)
# MinIO client used to download sketch images.
# SECURITY NOTE(review): credentials are hard-coded in source; they should be
# moved to configuration / environment variables and rotated.
minio_client = Minio(
    "www.minio.aida.com.hk:12024",  # MinIO endpoint
    access_key="admin",  # access key
    secret_key="Aidlab123123!",  # secret key
    secure=True  # use HTTPS
)
# Preloaded feature vectors for every system sketch:
# {sketch_path: numpy vector} — presumably 2048-d to match the ResNet50
# features produced below; verify against the file's contents.
SYSTEM_FEATURES = np.load(f'{RECOMMEND_PATH_PREFIX}sketch_feature_dict.npy', allow_pickle=True).item()
# Per-behavior scoring used by calculate_heat():
# heat = weight * exp(-decay * age_in_days).
BEHAVIOR_CONFIG = {
    'portfolioClick': {'weight': 1, 'decay': 0.3},
    'portfolioLike': {'weight': 2, 'decay': 0.2},
    'secondCreation': {'weight': 3, 'decay': 0.1},
    'sketchLike': {'weight': 4, 'decay': 0}  # no time decay
}
# Cache helpers for the sketch-path -> iid mapping.
def save_sketch_to_iid():
    """Build the sketch-path -> iid mapping from SYSTEM_FEATURES and cache it.

    iids are 1-based and follow SYSTEM_FEATURES insertion order.

    Returns:
        dict: the mapping that was written to sketch_to_iid.npy (new; callers
        that ignored the previous None return are unaffected).
    """
    sketch_to_iid = {sketch_path: iid for iid, sketch_path in enumerate(SYSTEM_FEATURES.keys(), start=1)}
    np.save('sketch_to_iid.npy', sketch_to_iid)
    # Use the configured logger instead of print so the message lands in scheduler.log.
    logging.info("sketch_to_iid 已保存")
    return sketch_to_iid
# Load the cached sketch-path -> iid mapping, generating it on first use.
def load_sketch_to_iid():
    """Return the cached sketch-path -> iid mapping, creating it if absent.

    Returns:
        dict: maps a sketch's MinIO path to its 1-based integer id.
    """
    cache_file = 'sketch_to_iid.npy'
    if not os.path.exists(cache_file):
        # First run: build and persist the mapping before loading it.
        # (Single load path below replaces the original's duplicated np.load.)
        logging.info("sketch_to_iid 文件不存在,正在生成并保存...")
        save_sketch_to_iid()
    sketch_to_iid = np.load(cache_file, allow_pickle=True).item()
    logging.info("sketch_to_iid 已加载")
    return sketch_to_iid
# Path -> iid mapping shared by the matrix-building functions below.
sketch_to_iid = load_sketch_to_iid()
# print(f"Total sketches: {len(sketch_to_iid)}")
# Image preprocessing matching the transforms ResNet was trained with.
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # ResNet expects 224x224 input
    transforms.ToTensor(),  # PIL image -> float tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # ImageNet statistics
])
# Pre-trained ResNet50 with the final fully-connected layer removed, used as
# a frozen feature extractor (eval mode, no fine-tuning).
# NOTE(review): `pretrained=True` is deprecated in newer torchvision in favor
# of the weights-enum API — confirm against the installed version.
resnet_model = models.resnet50(pretrained=True)
modules = list(resnet_model.children())[:-1]  # drop the classification head
resnet_model = torch.nn.Sequential(*modules)
resnet_model.eval()  # inference mode (disables dropout/batch-norm updates)
def get_sketch_image_from_minio(sketch_path):
    """Fetch a sketch image from MinIO and preprocess it for the ResNet model.

    Args:
        sketch_path: "<bucket>/<object key>" style path; the first path
            segment is the bucket name.

    Returns:
        A (1, 3, 224, 224) float tensor ready for batched inference, or
        None when the path is malformed or the download/decoding fails.
    """
    # Split into bucket and object key on the first slash only.
    bucket_name, _, file_name = sketch_path.partition('/')
    if not file_name:
        # Guard against paths without an object key (original raised IndexError here).
        print(f"Error fetching image for {sketch_path}: missing object key")
        return None
    obj = None
    try:
        obj = minio_client.get_object(bucket_name, file_name)
        img_data = obj.read()
        # Force RGB: grayscale or RGBA sketches would otherwise break the
        # 3-channel Normalize in `transform`.
        img = Image.open(io.BytesIO(img_data)).convert('RGB')
        return transform(img).unsqueeze(0)  # add batch dimension
    except Exception as e:
        print(f"Error fetching image for {sketch_path}: {e}")
        return None
    finally:
        if obj is not None:
            # Release the underlying HTTP connection back to the pool
            # (per MinIO SDK usage guidelines); original leaked it.
            obj.close()
            obj.release_conn()
def extract_feature_vector_from_resnet(sketch_path):
    """Return the ResNet feature vector for a sketch image.

    Args:
        sketch_path: "<bucket>/<object key>" path of the image in MinIO.

    Returns:
        numpy array with the extracted features, or a 2048-d zero vector
        when the image could not be fetched.
    """
    batch = get_sketch_image_from_minio(sketch_path)
    if batch is None:
        # Download/decoding failed: fall back to an all-zero feature vector.
        return np.zeros(2048)
    # Inference only — no gradients needed.
    with torch.no_grad():
        features = resnet_model(batch)
    # Drop the batch dimension and move to host memory as a numpy array.
    return features.squeeze().cpu().numpy()
def update_user_matrices():
    """Daily job: rebuild the user interaction and feature matrices.

    Reads aggregated like counts from user_preference_log_test, builds the
    interaction-score and feature-similarity matrices, and persists them
    (plus their row/column index mappings) as .npy files under
    RECOMMEND_PATH_PREFIX. Errors are logged, never raised.
    """
    conn = None
    try:
        conn = pymysql.connect(**DB_CONFIG)
        cursor = conn.cursor()
        # Aggregate likes per (user, sketch path); the category filter was
        # removed from SQL — filtering happens in the matrix builders.
        cursor.execute("""
            SELECT account_id, path, COUNT(*) as like_count
            FROM user_preference_log_test
            GROUP BY account_id, path
        """)
        user_data = cursor.fetchall()
        logging.info(f"成功读取{len(user_data)}条用户偏好记录")
        # Build both matrix families from the same rows.
        interaction_matrix, raw_counts_sparse, user_index_interaction_matrix, sketch_index_interaction_matrix, iid_to_category_interaction_matrix = calculate_interaction_matrix(user_data)
        # visualize_sparse_matrix(raw_counts_sparse,'交互次数矩阵', 'interaction_frequency_matrix.png')
        # visualize_sparse_matrix(interaction_matrix, '交互次数得分矩阵', 'interaction_score_matrix.png')
        # plot_interaction_count_matrix(raw_counts_sparse)
        # feature_matrix, iid_to_category_feature_matrix, user_index_feature_matrix, sketch_index_feature_matrix = calculate_feature_matrix(user_data)
        feature_matrix, user_index_feature_matrix, sketch_index_feature_matrix, iid_to_category_feature_matrix = calculate_feature_matrix(user_data)
        # visualize_sparse_matrix(feature_matrix, '系统sketch与用户category平均特征向量关联度矩阵', 'correlation_matrix.png')
        # Persist matrices for the online recommendation service to load.
        np.save(f"{RECOMMEND_PATH_PREFIX}interaction_matrix.npy", interaction_matrix)
        np.save(f"{RECOMMEND_PATH_PREFIX}feature_matrix.npy", feature_matrix)
        # Interaction-matrix index/category mappings.
        np.save(f"{RECOMMEND_PATH_PREFIX}iid_to_category_interaction_matrix.npy", iid_to_category_interaction_matrix)
        np.save(f"{RECOMMEND_PATH_PREFIX}user_index_interaction_matrix.npy", user_index_interaction_matrix)
        # Feature-matrix index/category mappings.
        np.save(f"{RECOMMEND_PATH_PREFIX}iid_to_category_feature_matrix.npy", iid_to_category_feature_matrix)
        np.save(f"{RECOMMEND_PATH_PREFIX}user_index_feature_matrix.npy", user_index_feature_matrix)
        # Column (sketch) index mappings for both matrices.
        np.save(f"{RECOMMEND_PATH_PREFIX}sketch_index_interaction_matrix.npy", sketch_index_interaction_matrix)
        np.save(f"{RECOMMEND_PATH_PREFIX}sketch_index_feature_matrix.npy", sketch_index_feature_matrix)
        # logging.info("矩阵更新完成")
    except Exception as e:
        logging.error(f"定时任务执行失败: {str(e)}", exc_info=True)
    finally:
        if conn:
            conn.close()
def plot_interaction_count_matrix(interaction_count_matrix):
    """Render the interaction-count matrix as a heatmap, zeros included.

    Saves the figure to interaction_heatmap_with_zeros.png. Matrices larger
    than 1000x1000 are truncated to their first 1000 rows/columns.

    Args:
        interaction_count_matrix: dense array or scipy sparse matrix.
    """
    try:
        # Normalize the input to CSR before densifying.
        if not isinstance(interaction_count_matrix, csr_matrix):
            sparse_matrix = csr_matrix(interaction_count_matrix)
        else:
            sparse_matrix = interaction_count_matrix
        try:
            dense_matrix = sparse_matrix.toarray()
        except MemoryError:
            logging.error("内存不足,无法转换为密集矩阵")
            return
        # Pick a usable font; fall back to matplotlib's bundled font.
        # Fixes: bare `except:` narrowed, and font.sans-serif is a list now.
        try:
            font_path = fm.findfont(fm.FontProperties(family='sans-serif', style='normal'))
            plt.rcParams['font.sans-serif'] = [fm.FontProperties(fname=font_path).get_name()]
        except Exception:
            plt.rcParams['font.sans-serif'] = ['DejaVu Sans']
        plt.rcParams['axes.unicode_minus'] = False
        # Cap very large matrices so rendering stays feasible.
        if dense_matrix.shape[0] > 1000 or dense_matrix.shape[1] > 1000:
            dense_matrix = dense_matrix[:1000, :1000]
        plt.figure(figsize=(15, 10))
        sns.heatmap(
            dense_matrix,
            cmap="Blues",
            annot=False,  # no per-cell labels
            cbar_kws={"label": "Interaction Count"},
            linewidths=0.5,
            vmin=0,  # anchor the scale at zero so empty cells stand out
            vmax=np.max(dense_matrix)
        )
        plt.title('User-Sketch Interaction Matrix (With Zero Entries)')
        plt.xlabel('Sketch Index')
        plt.ylabel('User Index')
        plt.savefig('interaction_heatmap_with_zeros.png', dpi=150, bbox_inches='tight')
        plt.close()
        logging.info("热图已保存为 interaction_heatmap_with_zeros.png")
    except Exception as e:
        logging.error(f"绘图失败: {str(e)}", exc_info=True)
def visualize_sparse_matrix(matrix, title='Non-zero Interactions (Scatter Plot)', filename="scatter_figure_interaction.png"):
    """Scatter-plot the non-zero entries of a (possibly dense) matrix.

    Args:
        matrix: dense array or scipy sparse matrix; converted to CSR if dense.
        title: plot title.
        filename: output image path.
    """
    if not sparse.issparse(matrix):
        matrix = sparse.csr_matrix(matrix)
    # Coordinates and values of the non-zero entries.
    rows, cols = matrix.nonzero()
    values = matrix.data
    plt.figure(figsize=(24, 20))
    plt.scatter(cols, rows, c=values, cmap='coolwarm', alpha=0.7, s=1)
    plt.colorbar(label='Interaction Count')
    plt.title(title)
    plt.xlabel('Item Index')
    # Rows are users at every call site; the original mislabeled this axis
    # as 'Item Index' too.
    plt.ylabel('User Index')
    plt.savefig(filename)
    plt.close()  # release the figure; repeated calls otherwise leak memory
def calculate_interaction_matrix(user_data):
    """Build per-user interaction matrices over the system sketches only.

    Args:
        user_data: iterable of (account_id, path, like_count) rows.

    Returns:
        Tuple of (interaction_matrix, interaction_count_matrix, user_index,
        sketch_index, iid_to_category): a dense log-normalized score matrix,
        a CSR matrix of raw like counts, row/column index mappings, and an
        iid -> category mapping over all known sketches.
    """
    # Users that interacted with at least one path in a known category.
    all_users = set()
    for account_id, path, like_count in user_data:
        category = get_category_from_path(path)
        if category not in TABLE_CATEGORIES.keys():
            continue
        all_users.add(account_id)
    # iids of system sketches — these become the matrix columns.
    system_sketch_iids = {sketch_to_iid[path] for path in SYSTEM_FEATURES.keys() if path in sketch_to_iid}
    # Row/column index mappings (sorted for deterministic ordering).
    user_index = {uid: idx for idx, uid in enumerate(sorted(all_users))}
    sketch_index = {iid: idx for idx, iid in enumerate(sorted(system_sketch_iids))}
    # Two outputs: dense normalized scores + raw counts in COO form.
    interaction_matrix = np.zeros((len(all_users), len(sketch_index)))
    data, rows, cols = [], [], []
    # Per-user maximum like count, the normalization denominator below.
    user_max_likes = defaultdict(int)
    for account_id, path, like_count in user_data:
        if sketch_to_iid.get(path) in system_sketch_iids:
            user_max_likes[account_id] = max(user_max_likes[account_id], like_count)
    # Fill both matrices in one pass.
    for account_id, path, like_count in user_data:
        sketch_iid = sketch_to_iid.get(path)
        if sketch_iid not in system_sketch_iids:
            continue
        user_idx = user_index[account_id]
        sketch_idx = sketch_index[sketch_iid]
        # Raw count entry for the sparse matrix.
        data.append(like_count)
        rows.append(user_idx)
        cols.append(sketch_idx)
        # Log-normalized score in (0, 1], equal to 1 at the user's max count.
        # NOTE(review): np.log1p(1 + x) == log(2 + x); if plain log1p scaling
        # was intended, the inner "1 +" is redundant — confirm.
        max_like = user_max_likes.get(account_id, 1)
        interaction_matrix[user_idx, sketch_idx] = np.log1p(1 + like_count) / np.log1p(1 + max_like)
    # CSR layout: fast row slicing for per-user lookups.
    interaction_count_matrix = csr_matrix((data, (rows, cols)), shape=(len(all_users), len(sketch_index)))
    return interaction_matrix, interaction_count_matrix, user_index, sketch_index, {iid: get_category_from_path(path) for path, iid in sketch_to_iid.items()}
def calculate_feature_matrix(user_data):
    """Build a user x system-sketch cosine-similarity matrix.

    For each (user, category) pair, the sketches the user interacted with are
    combined into a like-count-weighted average feature vector; that vector
    is compared (cosine similarity) against every system sketch of the same
    category. Cells without data stay 0.

    Args:
        user_data: iterable of (account_id, path, like_count) rows.

    Returns:
        Tuple of (feature_matrix, user_index, sketch_index, iid_to_category).
    """
    # {(account_id, category): {sketch_iid: [(feature_vector, weight), ...]}}
    user_feature_weights = defaultdict(lambda: defaultdict(list))
    all_users = set()
    all_system_sketches = set(SYSTEM_FEATURES.keys())
    # ==== Pass 1: collect feature vectors and like-count weights ====
    for account_id, path, like_count in user_data:
        category = get_category_from_path(path)
        if category not in TABLE_CATEGORIES.keys():
            continue
        sketch_iid = sketch_to_iid.get(path)
        if not sketch_iid:
            continue
        all_users.add(account_id)
        # like_count acts as this vector's weight in the average.
        if path in SYSTEM_FEATURES:  # system sketch: features precomputed
            feature = SYSTEM_FEATURES[path]
            weight = like_count
            user_feature_weights[(account_id, category)][sketch_iid].append((feature, weight))
        else:  # user sketch: extract features on the fly (MinIO + ResNet)
            feature = extract_feature_vector_from_resnet(path)
            weight = like_count
            user_feature_weights[(account_id, category)][sketch_iid].append((feature, weight))
    # ==== Pass 2: collect every system sketch iid (matrix columns) ====
    system_sketch_iids = set()
    for sketch_path in SYSTEM_FEATURES:
        if iid := sketch_to_iid.get(sketch_path):
            system_sketch_iids.add(iid)
    # ==== Index mappings (sorted for deterministic ordering) ====
    user_list = sorted(all_users)
    sketch_list = sorted(system_sketch_iids)
    user_index = {uid: idx for idx, uid in enumerate(user_list)}
    sketch_index = {iid: idx for idx, iid in enumerate(sketch_list)}
    # ==== Similarity matrix; zero where no data ====
    feature_matrix = np.zeros((len(user_list), len(sketch_list)))
    # ==== Weighted-average feature vector per (user, category) ====
    user_avg_features = {}
    for (account_id, category), sketches in user_feature_weights.items():
        try:
            # Flatten to a single list of (vector, weight) pairs.
            all_features_weights = [(vec, weight) for vec_list in sketches.values() for vec, weight in vec_list]
            if len(all_features_weights) == 0:
                continue
            total_weight = sum(weight for _, weight in all_features_weights)
            if total_weight <= 0:  # guard against division by zero
                total_weight = 1.0
            # Weighted average: sum(vec * w) / sum(w).
            weighted_sum = np.zeros_like(all_features_weights[0][0])
            for vec, weight in all_features_weights:
                weighted_sum += vec * weight
            avg_vec = weighted_sum / total_weight
            user_avg_features[(account_id, category)] = avg_vec
        except Exception as e:
            logging.warning(f"用户({account_id},{category})加权特征计算失败: {str(e)}")
            continue
    # ==== Fill the matrix with cosine similarities ====
    for sketch_path, sys_vector in SYSTEM_FEATURES.items():
        sketch_iid = sketch_to_iid.get(sketch_path)
        system_sketch_category = get_category_from_path(sketch_path)
        if not sketch_iid or sketch_iid not in sketch_index:
            continue
        sketch_col = sketch_index[sketch_iid]
        for account_id in all_users:
            user_row = user_index.get(account_id)
            if user_row is None:
                continue
            try:
                # The user's average vector for this sketch's category.
                user_vec = user_avg_features[(account_id, system_sketch_category)]
            except KeyError:
                # No data for this user in this category: cell stays 0.
                continue
            cos_sim = cosine_similarity(user_vec, sys_vector)
            feature_matrix[user_row, sketch_col] = cos_sim
    return feature_matrix, user_index, sketch_index, {iid: get_category_from_path(path) for path, iid in sketch_to_iid.items()}
def get_category_from_path(path):
    """Parse a sketch path into its "<part2>_<part3>" category string.

    Fixes: the original checked ``len(parts) >= 2`` but indexed ``parts[2]``
    and ``parts[3]``, so short paths raised IndexError (masked by a bare
    except). The length check now matches the indices used, and only
    non-string input falls through the except.

    Args:
        path: slash-separated path, e.g. "bucket/x/cat/sub/file.png".

    Returns:
        "<parts[2]>_<parts[3]>", or "unknown" for paths with fewer than four
        segments or non-string input.
    """
    try:
        parts = path.split('/')
    except AttributeError:
        # Not a string (e.g. None).
        return "unknown"
    if len(parts) >= 4:
        return f"{parts[2]}_{parts[3]}"
    return "unknown"
def cosine_similarity(vec1, vec2):
    """Cosine similarity between two vectors; 0.0 when either has zero norm."""
    magnitude = np.linalg.norm(vec1) * np.linalg.norm(vec2)
    if magnitude == 0:
        return 0.0
    # Tiny epsilon guards against floating-point underflow in the product.
    return np.dot(vec1, vec2) / (magnitude + 1e-10)
def fetch_user_behavior_data(days=30):
    """Load the last *days* of user-behavior rows from MySQL.

    Args:
        days: size of the look-back window, in days.

    Returns:
        DataFrame with columns (account_id, behavior_type, gender, category,
        url, create_time); an empty DataFrame on any database error.
    """
    conn = None
    try:
        conn = pymysql.connect(**DB_CONFIG)
        # Look-back window [now - days, now].
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)
        # Parameterized query — fixes the original f-string interpolation of
        # values into SQL (quoting/injection hazard).
        query = """
            SELECT
                account_id,
                behavior_type,
                gender,
                category,
                url,
                create_time
            FROM user_behavior
            WHERE create_time BETWEEN %s AND %s
        """
        df = pd.read_sql(query, conn, params=(start_date, end_date))
        logging.info(f"成功读取{len(df)}条用户行为记录")
        return df
    except Exception as e:
        logging.error(f"数据库查询失败: {str(e)}")
        return pd.DataFrame()
    finally:
        if conn:
            conn.close()
def calculate_heat(row, current_date):
    """Heat of one behavior row: weight * e^(-decay * age_in_days).

    Each row is scored independently; no aggregation happens here.

    Args:
        row: mapping with 'create_time' (datetime) and 'behavior_type' keys.
        current_date: reference datetime for computing the age of the event.

    Returns:
        float heat value; unknown behavior types score 0.
    """
    # Age of the event in whole days.
    age_days = (current_date - row['create_time']).days
    # Unknown behavior types get weight 0 (and thus zero heat).
    cfg = BEHAVIOR_CONFIG.get(row['behavior_type'], {'weight': 0, 'decay': 0})
    return cfg['weight'] * np.exp(-cfg['decay'] * age_days)
def load_heat_matrix_as_array(file_path):
    """Read a saved heat-matrix JSON file into numpy form.

    Args:
        file_path: path to a JSON file with 'data', 'row_labels' and
            'col_labels' keys.

    Returns:
        Tuple (data_array, row_labels, col_labels): a 2-D numpy array plus
        the row and column label lists.
    """
    with open(file_path) as fh:
        payload = json.load(fh)
    data_array = np.array(payload['data'])
    return data_array, payload['row_labels'], payload['col_labels']
def update_heat_matrices():
    """Daily job: compute per-(gender_category, url) heat and store it as JSON.

    Reads the last 30 days of behavior, scores each row with calculate_heat,
    sums per group/url, and writes heat_vectors_data/heat_vectors.json.

    Returns:
        The {gender_category: {url: heat}} dict, or None when no data exists.
    """
    current_date = datetime.now()
    # Pull the raw behavior rows.
    df = fetch_user_behavior_data(30)
    if df.empty:
        logging.warning("无有效数据,跳过今日计算")
        return None
    # Per-row heat, then a "<gender>_<category>" grouping key.
    # NOTE(review): assumes gender/category are never NULL — a NaN in either
    # column would break the string concatenation; confirm the schema.
    df['heat'] = df.apply(calculate_heat, axis=1, current_date=current_date)
    df['gender_category'] = df['gender'] + '_' + df['category']
    # Sum heat per (group, url) into nested dicts.
    heat_vectors = {}
    grouped = df.groupby(['gender_category', 'url'])['heat'].sum()
    for (gender_category, url), heat in grouped.items():
        heat_vectors.setdefault(gender_category, {})[url] = heat
    # Persist as a single rolling file (dated variant kept below for reference).
    save_path = 'heat_vectors_data'
    os.makedirs(save_path, exist_ok=True)
    date_str = current_date.strftime('%Y%m%d')
    # vectors_file = f"{save_path}/heat_vectors_{date_str}.json"
    vectors_file = f"{save_path}/heat_vectors.json"
    with open(vectors_file, 'w', encoding='utf-8') as f:
        json.dump({
            'update_time': current_date.strftime('%Y-%m-%d %H:%M:%S'),
            'data': heat_vectors
        }, f, ensure_ascii=False, indent=2)
    logging.info(f"成功存储热度向量,共{len(heat_vectors)}个分组,日期: {date_str}")
    return heat_vectors
if __name__ == "__main__":
    # Fixes: a stray VCS timestamp artifact inside the try body broke the
    # script's syntax; removed. The APScheduler setup below is intentionally
    # kept commented out — the job currently runs once per invocation.
    try:
        # update_user_matrices()
        update_heat_matrices()
        # scheduler = BlockingScheduler()
        # scheduler.add_job(update_user_matrices, 'cron', hour=12, timezone='Asia/Shanghai')
        # logging.info("定时任务已启动每天12:00执行")
        # scheduler.start()
    except KeyboardInterrupt:
        logging.info("定时任务已停止")
    except Exception as e:
        logging.error(f"调度器启动失败: {str(e)}", exc_info=True)