"""Brand-DNA initialisation API (app/api/api_brand_dna_initialize.py).

Builds, per brand, a similarity row against every system sketch using
ResNet50 features, persisted next to the recommendation matrices.
"""
import io
import logging
import os
import sys
import time
from typing import List
from collections import defaultdict

import numpy as np
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from fastapi import HTTPException, APIRouter
from fastapi.responses import JSONResponse

from app.service.recommend.service import load_resources, matrix_data
import pymysql
from app.core.config import DB_CONFIG, TABLE_CATEGORIES, RECOMMEND_PATH_PREFIX
from minio import Minio
import torch
from torchvision import models, transforms
from PIL import Image
# NOTE(review): several imports above (time, List, apscheduler, TABLE_CATEGORIES,
# load_resources) look unused in this module — likely copied from
# api_recommendation.py; prune once confirmed against the full file.

# Force UTF-8 stdout so Chinese log text survives redirection.
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
logger = logging.getLogger()
router = APIRouter()

# MinIO client.
# SECURITY(review): credentials are hard-coded in source — move them to
# app.core.config / environment variables and rotate the exposed secret.
minio_client = Minio(
    "www.minio.aida.com.hk:12024",
    access_key="admin",
    secret_key="Aidlab123123!",
    secure=True
)

# Standard ImageNet preprocessing expected by the ResNet50 backbone.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])

# ResNet50 with the final FC layer stripped -> 2048-d feature extractor.
# NOTE(review): weights download at import time; `pretrained=` is deprecated
# in newer torchvision (use `weights=ResNet50_Weights.IMAGENET1K_V1`).
resnet_model = models.resnet50(pretrained=True)
resnet_model = torch.nn.Sequential(*list(resnet_model.children())[:-1])
resnet_model.eval()


def get_sketch_image_from_minio(sketch_path: str):
    """Fetch ``bucket/object`` from MinIO; return a [1,3,224,224] tensor or None.

    ``sketch_path`` must look like ``bucket/path/to/file``; anything else
    (or any fetch/decode failure) yields None so callers can fall back.
    """
    path_parts = sketch_path.split('/', 1)
    if len(path_parts) != 2:
        return None
    bucket_name, file_name = path_parts
    try:
        resp = minio_client.get_object(bucket_name, file_name)
        try:
            # convert("RGB"): grayscale/RGBA sketches would otherwise break
            # the 3-channel Normalize above.
            img = Image.open(io.BytesIO(resp.read())).convert("RGB")
        finally:
            # FIX(review): release the pooled HTTP connection — the response
            # object was previously leaked (minio SDK requires close+release).
            resp.close()
            resp.release_conn()
        return transform(img).unsqueeze(0)
    except Exception as e:
        logger.warning(f"Fetch image failed [{sketch_path}]: {e}")
        return None


def extract_feature_vector_from_resnet(sketch_path: str) -> np.ndarray:
    """Return the 2048-d ResNet feature for a sketch; zeros when unavailable."""
    img_tensor = get_sketch_image_from_minio(sketch_path)
    if img_tensor is None:
        return np.zeros(2048, dtype=np.float32)
    with torch.no_grad():
        vec = resnet_model(img_tensor)  # [1, 2048, 1, 1]
    return vec.squeeze().cpu().numpy()


# Pre-loaded feature dictionaries (brand image id -> vector; sketch_path -> vector).
BRAND_FEATURES = np.load(f'{RECOMMEND_PATH_PREFIX}brand_feature.npy', allow_pickle=True).item()
SYSTEM_FEATURES = np.load(f'{RECOMMEND_PATH_PREFIX}sketch_feature_dict.npy', allow_pickle=True).item()


def save_sketch_to_iid():
    """Persist a stable sketch_path -> 1-based iid map derived from SYSTEM_FEATURES."""
    sketch_to_iid = {
        sketch_path: iid
        for iid, sketch_path in enumerate(SYSTEM_FEATURES.keys(), start=1)
    }
    np.save(f"{RECOMMEND_PATH_PREFIX}sketch_to_iid.npy", sketch_to_iid)


def load_sketch_to_iid():
    """Load the sketch_path -> iid map, creating it on first use."""
    path = f"{RECOMMEND_PATH_PREFIX}sketch_to_iid.npy"
    if not os.path.exists(path):
        save_sketch_to_iid()
    return np.load(path, allow_pickle=True).item()


sketch_to_iid = load_sketch_to_iid()


def getNewCategory(gender: str, sketch_category: str) -> str:
    """Combine gender and category into the canonical ``gender_category`` key."""
    return f"{gender.lower()}_{sketch_category.lower()}"


def get_category_from_path(path: str) -> str:
    """Derive ``gender_category`` from a sketch path (uses path parts 2 and 3)."""
    parts = path.split('/')
    if len(parts) >= 4:
        return f"{parts[2].lower()}_{parts[3].lower()}"
    return "unknown_unknown"


def load_brand_matrix():
    """Load (brand_matrix, brand_index_map); empty versions when not yet persisted.

    FIX(review): read the same file that ``calculate_brand_matrix`` writes
    (``brand_feature_matrix.npy``).  It previously read ``brand_matrix.npy``,
    which the writer never produces, so every rebuild started from an empty
    matrix and silently discarded all previously computed brand rows.
    """
    mat_path = f"{RECOMMEND_PATH_PREFIX}brand_feature_matrix.npy"
    idx_path = f"{RECOMMEND_PATH_PREFIX}brand_index_map.npy"
    try:
        matrix = np.load(mat_path)
        index_map = np.load(idx_path, allow_pickle=True).item()
    except FileNotFoundError:
        matrix = np.zeros((0, len(sketch_to_iid)), dtype=np.float32)
        index_map = {}
    return matrix, index_map
f"{RECOMMEND_PATH_PREFIX}brand_index_map.npy" + try: + matrix = np.load(mat_path) + index_map = np.load(idx_path, allow_pickle=True).item() + except FileNotFoundError: + matrix = np.zeros((0, len(sketch_to_iid)), dtype=np.float32) + index_map = {} + return matrix, index_map + +def cosine_similarity(vec1, vec2): + """计算余弦相似度(增加零值处理)""" + norm = np.linalg.norm(vec1) * np.linalg.norm(vec2) + return np.dot(vec1, vec2) / (norm + 1e-10) if norm != 0 else 0.0 + +def calculate_brand_matrix(sketch_data, brand_id: int) -> np.ndarray: + # 1. 收集品牌-分类-特征 + brand_feature = defaultdict(lambda: defaultdict(list)) + for _id, sketch_path, gender, sketch_category in sketch_data: + cat = getNewCategory(gender, sketch_category) + feat = BRAND_FEATURES.get(_id) or extract_feature_vector_from_resnet(sketch_path) + brand_feature[(brand_id, cat)][_id].append(feat) + + # 2. 构建 sketch 索引 + sketch_list = sorted(sketch_to_iid.values()) + sketch_index = {iid: idx for idx, iid in enumerate(sketch_list)} + n_sketch = len(sketch_list) + + # 3. 加载或初始化矩阵 + brand_matrix, brand_index_map = load_brand_matrix() + + # 4. 增加/更新 行 + if brand_id in brand_index_map: + row_idx = brand_index_map[brand_id] + else: + row_idx = brand_matrix.shape[0] + brand_index_map[brand_id] = row_idx + brand_matrix = np.vstack([ + brand_matrix, + np.zeros((1, n_sketch), dtype=np.float32) + ]) + + # 5. 计算品牌-分类平均向量 + brand_avg = {} + for key, id_dict in brand_feature.items(): + all_feats = [v for feats in id_dict.values() for v in feats] + if all_feats: + brand_avg[key] = np.mean(all_feats, axis=0) + + # 6. 填充相似度 + for sketch_path, sys_vec in SYSTEM_FEATURES.items(): + iid = sketch_to_iid.get(sketch_path) + if not iid or iid not in sketch_index: + continue + cat_key = (brand_id, get_category_from_path(sketch_path)) + avg_vec = brand_avg.get(cat_key) + if avg_vec is not None: + cos_sim = cosine_similarity(avg_vec, sys_vec) + brand_matrix[row_idx, sketch_index[iid]] = cos_sim + + # 7. 
持久化 + np.save(f"{RECOMMEND_PATH_PREFIX}brand_feature_matrix.npy", brand_matrix) + np.save(f"{RECOMMEND_PATH_PREFIX}brand_index_map.npy", brand_index_map) + + # 返回该品牌对应行 + return brand_matrix[row_idx:row_idx+1] + + +@router.get("/brand_dna_initialize/{brand_id}") +async def brand_dna_initialize(brand_id: int): + conn = None + try: + conn = pymysql.connect(**DB_CONFIG) + cursor = conn.cursor() + cursor.execute(""" + SELECT id, img_url, gender, category + FROM product_image_attribute + WHERE library_id IN ( + SELECT library_id + FROM brand_rel_library + WHERE brand_id = %s + ) + """, (brand_id,)) + sketch_data = cursor.fetchall() + + # 触发计算并持久化,若内部出错会抛异常 + _ = calculate_brand_matrix(sketch_data, brand_id) + + # 返回成功 + return {"success": True} + + except HTTPException: + # 已经是明确的 HTTPException,直接抛出 + raise + + except Exception as e: + logger.error(f"品牌初始化失败 [{brand_id}]: {e}", exc_info=True) + # 返回失败的 JSON,同时设置 500 状态码 + return JSONResponse( + status_code=500, + content={"success": False, "message": "品牌初始化失败"} + ) + + finally: + if conn: + conn.close() diff --git a/app/api/api_recommendation.py b/app/api/api_recommendation.py index 93fb251..5f71d38 100644 --- a/app/api/api_recommendation.py +++ b/app/api/api_recommendation.py @@ -3,7 +3,10 @@ import logging import sys import time from typing import List - +import os +import json +import math +import random import numpy as np from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger @@ -31,18 +34,44 @@ async def startup_event(): scheduler.start() logger.info("定时任务已启动") -def get_random_recommendations(category: str, num: int) -> List[str]: - """全品类随机推荐""" - all_iids = list(matrix_data["iid_to_sketch"].keys()) - # 优先从当前品类选择 - category_iids = matrix_data["category_to_iids"].get(category, all_iids) - # 确保不超出实际数量 - sample_size = min(num, len(category_iids)) - sampled = np.random.choice(category_iids, size=sample_size, replace=False) - return 
def softmax(scores):
    """Numerically-stable softmax over a list of scores.

    FIX(review): returns [] for empty input — ``max([])`` previously raised
    ValueError before the caller's fallback could engage cleanly.
    """
    if not scores:
        return []
    max_score = max(scores)
    exp_scores = [math.exp(s - max_score) for s in scores]
    sum_exp = sum(exp_scores)
    return [s / sum_exp for s in exp_scores]


def get_random_recommendations(category: str, num: int) -> List[str]:
    """Cold-start recommendation driven by pre-loaded heat vectors.

    Samples up to ``num`` sketch URLs for ``category`` with probability
    proportional to softmax(heat).  Falls back to uniform random sampling
    within the category when heat data is missing or malformed.
    """
    try:
        heat_data = matrix_data.get("heat_data", {})
        if category not in heat_data:
            raise ValueError(f"热度数据缺少类别 {category},使用随机推荐")

        heat_dict = heat_data[category]  # {url: score}
        urls = list(heat_dict.keys())
        scores = list(heat_dict.values())
        if not urls:
            raise ValueError("该类别下无热度记录,使用随机推荐")

        probs = softmax(scores)
        sample_size = min(num, len(urls))
        # FIX(review): sample WITHOUT replacement — `random.choices` could
        # return the same sketch several times in one recommendation list
        # (the fallback below already used replace=False).
        chosen = np.random.choice(len(urls), size=sample_size, replace=False, p=probs)
        return [urls[i] for i in chosen]

    except Exception:
        # Deliberate best-effort fallback: uniform sampling within the
        # category (all iids when the category is unknown).
        all_iids = list(matrix_data["iid_to_sketch"].keys())
        category_iids = matrix_data["category_to_iids"].get(category, all_iids)
        sample_size = min(num, len(category_iids))
        sampled = np.random.choice(category_iids, size=sample_size, replace=False)
        return [matrix_data["iid_to_sketch"][iid] for iid in sampled]

# NOTE(review): the reworked `get_recommendations` route in this patch is only
# partially visible (diff hunks).  Two issues to fix there: the nested
# `if brand_id is not None:` test is duplicated verbatim, and adding
# `/{brand_id}/{brand_scale}` to the path is a breaking change for existing
# callers of /recommend/{user_id}/{category}/{num_recommendations}.
# Behaviour weights and exponential time-decay coefficients for heat scoring.
BEHAVIOR_CONFIG = {
    'portfolioClick': {'weight': 1, 'decay': 0.3},
    'portfolioLike': {'weight': 2, 'decay': 0.2},
    'secondCreation': {'weight': 3, 'decay': 0.1},
    'sketchLike': {'weight': 4, 'decay': 0}  # never decays
}


def fetch_user_behavior_data(days=30):
    """Load the last ``days`` of user-behaviour rows from MySQL as a DataFrame.

    Returns an empty DataFrame on any database error (best-effort job).
    """
    conn = None
    try:
        conn = pymysql.connect(**DB_CONFIG)

        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)

        # FIX(review): parameterised query — the dates were previously
        # interpolated into the SQL text with an f-string.
        query = """
            SELECT
                account_id,
                behavior_type,
                gender,
                category,
                url,
                create_time
            FROM user_behavior
            WHERE create_time BETWEEN %s AND %s
        """
        df = pd.read_sql(query, conn, params=(start_date, end_date))
        logging.info(f"成功读取{len(df)}条用户行为记录")
        return df

    except Exception as e:
        logging.error(f"数据库查询失败: {str(e)}")
        return pd.DataFrame()

    finally:
        if conn:
            conn.close()


def calculate_heat(row, current_date):
    """Heat of ONE behaviour event: weight * exp(-decay * age_in_days).

    Each event is scored independently (no aggregation here); unknown
    behaviour types score 0.
    """
    days_passed = (current_date - row['create_time']).days
    config = BEHAVIOR_CONFIG.get(row['behavior_type'], {'weight': 0, 'decay': 0})
    return config['weight'] * np.exp(-config['decay'] * days_passed)


def load_heat_matrix_as_array(file_path):
    """Load a persisted heat matrix as (data_array, row_labels, col_labels)."""
    # FIX(review): explicit encoding — the writer uses UTF-8.
    with open(file_path, encoding='utf-8') as f:
        saved = json.load(f)
    return (
        np.array(saved['data']),   # 2-D matrix
        saved['row_labels'],       # row label list
        saved['col_labels']        # column label list
    )


def update_heat_matrices():
    """Daily job: aggregate behaviour heat into {gender_category: {url: heat}}.

    Persists the result as JSON for service.py's cold-start loader and
    returns the dict, or None when there is no data.
    """
    current_date = datetime.now()

    df = fetch_user_behavior_data(30)
    if df.empty:
        logging.warning("无有效数据,跳过今日计算")
        return None

    # Per-event heat, then summed per (gender_category, url).
    df['heat'] = df.apply(calculate_heat, axis=1, current_date=current_date)
    df['gender_category'] = df['gender'] + '_' + df['category']

    heat_vectors = {}
    grouped = df.groupby(['gender_category', 'url'])['heat'].sum()
    for (gender_category, url), heat in grouped.items():
        heat_vectors.setdefault(gender_category, {})[url] = heat

    # NOTE(review): relative path — resolution depends on process CWD;
    # service.py reads the same relative file.  Prefer a configured path.
    save_path = 'heat_vectors_data'
    os.makedirs(save_path, exist_ok=True)
    date_str = current_date.strftime('%Y%m%d')

    vectors_file = f"{save_path}/heat_vectors.json"
    with open(vectors_file, 'w', encoding='utf-8') as f:
        json.dump({
            'update_time': current_date.strftime('%Y-%m-%d %H:%M:%S'),
            'data': heat_vectors
        }, f, ensure_ascii=False, indent=2)

    logging.info(f"成功存储热度向量,共{len(heat_vectors)}个分组,日期: {date_str}")
    return heat_vectors


if __name__ == "__main__":
    try:
        # update_user_matrices()  # full matrix rebuild — run separately
        update_heat_matrices()
    except Exception:
        # NOTE(review): the original except-clause is not visible in this
        # chunk of the patch; generic logging used — confirm against HEAD.
        logging.exception("update_heat_matrices failed")
np.load(f"{RECOMMEND_PATH_PREFIX}brand_feature_matrix.npy", allow_pickle=True) + + matrix_data["brand_index_map"] = np.load(f"{RECOMMEND_PATH_PREFIX}brand_index_map.npy",allow_pickle=True).item() + matrix_data["user_index_feature"] = np.load(f"{RECOMMEND_PATH_PREFIX}user_index_feature_matrix.npy", allow_pickle=True).item() + matrix_data["sketch_index_feature"] = np.load(f"{RECOMMEND_PATH_PREFIX}sketch_index_feature_matrix.npy", allow_pickle=True).item() category_to_iid_map = np.load(f"{RECOMMEND_PATH_PREFIX}iid_to_category_interaction_matrix.npy", allow_pickle=True).item() @@ -61,6 +73,14 @@ def load_resources(): # 触发预缓存 precache_user_category() + if os.path.exists(HEAT_VECTOR_FILE): + with open(HEAT_VECTOR_FILE, 'r', encoding='utf-8') as f: + heat_json = json.load(f) + matrix_data["heat_data"] = heat_json.get("data", {}) + logger.info(f"热度向量数据加载完成,共加载 {len(matrix_data['heat_data'])} 个类别") + else: + matrix_data["heat_data"] = {} + except Exception as e: logger.error(f"资源加载失败: {str(e)}") raise RuntimeError("初始化失败")