2025-12-29 10:52:33 +08:00
|
|
|
|
"""
|
|
|
|
|
|
Vector computation utilities.

Provides ResNet50 feature extraction, vector normalization, and related helpers.
|
|
|
|
|
|
"""
|
|
|
|
|
|
import io
|
|
|
|
|
|
import logging
|
|
|
|
|
|
import numpy as np
|
|
|
|
|
|
import torch
|
|
|
|
|
|
from torchvision import models, transforms
|
|
|
|
|
|
from PIL import Image
|
|
|
|
|
|
from minio import Minio
|
|
|
|
|
|
|
2025-12-30 17:35:32 +08:00
|
|
|
|
from app.core.config import settings
|
2025-12-29 10:52:33 +08:00
|
|
|
|
from app.service.recommendation_system.config import RECOMMENDATION_CONFIG
|
|
|
|
|
|
|
|
|
|
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Image preprocessing pipeline (kept consistent with the preprocessing used when ResNet was trained).
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # ResNet expects 224x224 input
    transforms.ToTensor(),  # convert the image to a Tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # per-channel normalization
])

# Cache slot for the pretrained ResNet50 model (final fully connected layer removed);
# it stays None until get_resnet_model() fills it in.
_resnet_model = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_resnet_model():
    """Return the shared ResNet50 feature extractor, building it on first use.

    The pretrained torchvision ResNet50 is loaded, its final fully connected
    layer is dropped, and the remaining layers are wrapped in a
    ``torch.nn.Sequential`` that is switched to evaluation mode. The result is
    cached in the module-level ``_resnet_model`` and reused by later calls.

    Returns:
        The cached ``torch.nn.Sequential`` feature extractor.
    """
    global _resnet_model
    if _resnet_model is None:
        logger.info("加载 ResNet50 模型...")
        # Build the extractor in local variables and publish it to the module
        # global only once it is complete. Assigning the intermediate model to
        # the global first would let another caller observe the full
        # classifier (FC layer still attached, not yet in eval mode).
        # NOTE(review): newer torchvision releases deprecate ``pretrained=`` in
        # favour of ``weights=``; confirm the pinned version before switching.
        backbone = models.resnet50(pretrained=True)
        layers = list(backbone.children())[:-1]  # drop the final fully connected layer
        extractor = torch.nn.Sequential(*layers)
        extractor.eval()  # evaluation mode
        _resnet_model = extractor
        logger.info("ResNet50 模型加载完成")
    return _resnet_model
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# MinIO client (singleton); stays None until get_minio_client() creates it.
_minio_client = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_minio_client():
    """Return the shared MinIO client, creating it from ``settings`` on first use.

    Returns:
        The cached ``Minio`` client instance.
    """
    global _minio_client
    # Fast path: a client was already created by an earlier call.
    if _minio_client is not None:
        return _minio_client

    _minio_client = Minio(
        settings.MINIO_URL,
        access_key=settings.MINIO_ACCESS,
        secret_key=settings.MINIO_SECRET,
        secure=settings.MINIO_SECURE,
    )
    return _minio_client
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_image_from_minio(path: str) -> "Image.Image | None":
    """Download an object from MinIO and open it as a PIL image.

    Args:
        path: MinIO logical URL of the form "bucket_name/object_name".

    Returns:
        The opened PIL image, or None when the path is malformed or the
        download/decoding fails (the failure is logged, never raised).
    """
    try:
        # Split on the first "/" only: the bucket name comes first and
        # everything after it is the object key (which may contain "/").
        path_parts = path.split('/', 1)
        if len(path_parts) != 2:
            logger.error(f"路径格式错误: {path}")
            return None

        bucket_name, file_name = path_parts
        minio_client = get_minio_client()

        # Fetch the object and read it fully into memory.
        obj = minio_client.get_object(bucket_name, file_name)
        try:
            img_data = obj.read()
        finally:
            # Always close the response and hand its connection back to the
            # pool; otherwise every call leaks a pooled HTTP connection.
            obj.close()
            obj.release_conn()

        # Decode from an in-memory buffer so the image does not depend on the
        # (now closed) network response.
        return Image.open(io.BytesIO(img_data))
    except Exception as e:
        logger.error(f"从 MinIO 获取图片失败 [{path}]: {e}")
        return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_feature_vector(path: str) -> np.ndarray:
    """Extract a ResNet50 feature vector for an image stored in MinIO.

    Args:
        path: MinIO logical URL of the image.

    Returns:
        A float32 numpy vector of length ``RECOMMENDATION_CONFIG["vector_dim"]``
        (described as 2048-dimensional by the original documentation). A zero
        vector of that length is returned when the image cannot be fetched,
        cannot be converted to RGB, or any other step fails.
    """

    def _zero_vector() -> np.ndarray:
        # Fallback result; the dimension is read from the config at call time.
        return np.zeros(RECOMMENDATION_CONFIG["vector_dim"], dtype=np.float32)

    try:
        # Fetch the image from MinIO.
        image = get_image_from_minio(path)
        if image is None:
            logger.warning(f"无法获取图片,返回零向量: {path}")
            return _zero_vector()

        # Some stored images may be RGBA/CMYK; convert to RGB so they match
        # the 3-channel normalization parameters.
        if image.mode != "RGB":
            try:
                image = image.convert("RGB")
            except Exception:
                logger.warning(f"无法转换图片为RGB,返回零向量: {path}")
                return _zero_vector()

        # Preprocess and add a leading batch dimension.
        batch = transform(image).unsqueeze(0)

        # Run inference without tracking gradients.
        model = get_resnet_model()
        with torch.no_grad():
            output = model(batch)
            # Drop singleton dimensions and convert to a NumPy array.
            features = output.squeeze().cpu().numpy()

        # Collapse any remaining extra dimensions into a flat vector.
        if features.ndim > 1:
            features = features.flatten()

        # Truncate or zero-pad when the length differs from the configured one.
        actual_dim = len(features)
        expected_dim = RECOMMENDATION_CONFIG["vector_dim"]
        if actual_dim != expected_dim:
            logger.warning(f"向量维度不正确: {actual_dim}, 期望: {expected_dim}")
            if actual_dim > expected_dim:
                features = features[:expected_dim]
            else:
                resized = np.zeros(expected_dim, dtype=np.float32)
                resized[:actual_dim] = features
                features = resized

        return features.astype(np.float32)
    except Exception as e:
        logger.error(f"提取特征向量失败 [{path}]: {e}", exc_info=True)
        return _zero_vector()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def normalize_vector(vector: np.ndarray) -> np.ndarray:
    """Scale a vector to unit L2 length.

    Args:
        vector: Input vector.

    Returns:
        ``vector`` divided by its L2 norm, or ``vector`` itself (the same
        object, unchanged) when the norm is zero.
    """
    magnitude = np.linalg.norm(vector)
    # A zero-length vector has no direction; hand it back untouched rather
    # than dividing by zero.
    return vector if magnitude == 0 else vector / magnitude
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def compute_weighted_average(vectors: list, weights: list) -> np.ndarray:
    """Compute the weighted average of a list of vectors.

    Vectors and weights are paired positionally. If the two sequences differ
    in length, only the leading pairs that have both a vector and a weight
    are used, for the weighted sum and for the divisor alike.

    Args:
        vectors: Sequence of vectors (array-likes of equal shape).
        weights: Sequence of numeric weights, one per vector.

    Returns:
        The weighted sum divided by the sum of the weights. The result is not
        L2-normalized, so its magnitude stays on the scale of the inputs
        instead of growing with the number of vectors. If the weights sum to
        zero, the undivided weighted sum is returned. If either input is
        empty, a float32 zero vector of length
        ``RECOMMENDATION_CONFIG["vector_dim"]`` is returned.
    """
    # len() instead of truthiness so ndarray inputs do not raise on the check.
    if len(vectors) == 0 or len(weights) == 0:
        return np.zeros(RECOMMENDATION_CONFIG["vector_dim"], dtype=np.float32)

    # Use only complete (vector, weight) pairs so the divisor matches the
    # weights that actually contributed to the sum.
    pair_count = min(len(vectors), len(weights))
    arrays = [np.asarray(v) for v in vectors[:pair_count]]
    weight_arr = np.asarray(weights[:pair_count], dtype=np.float64)

    # Accumulate in a floating dtype: keep the input's own float dtype, but
    # promote integer/bool inputs so in-place addition of float products works.
    first = arrays[0]
    acc_dtype = first.dtype if np.issubdtype(first.dtype, np.floating) else np.float64
    weighted_sum = np.zeros(first.shape, dtype=acc_dtype)

    # Plain Python floats keep the accumulator dtype stable across NumPy versions.
    for vec, w in zip(arrays, weight_arr.tolist()):
        weighted_sum += vec * w

    weight_total = float(weight_arr.sum())
    if weight_total == 0:
        return weighted_sum
    return weighted_sum / weight_total
|
|
|
|
|
|
|