Files

190 lines
6.1 KiB
Python
Raw Permalink Normal View History

2025-12-29 10:52:33 +08:00
"""
向量计算工具类
包含 ResNet50 特征提取向量归一化等功能
"""
import io
import logging
import numpy as np
import torch
from torchvision import models, transforms
from PIL import Image
from minio import Minio
2025-12-30 17:35:32 +08:00
from app.core.config import settings
2025-12-29 10:52:33 +08:00
from app.service.recommendation_system.config import RECOMMENDATION_CONFIG
logger = logging.getLogger(__name__)
# 图像预处理与ResNet训练时的预处理一致
transform = transforms.Compose([
transforms.Resize((224, 224)), # ResNet 要求 224x224 的输入
transforms.ToTensor(), # 转换为 Tensor
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), # 标准化
])
# 加载预训练的 ResNet50 模型(去掉最后全连接层)
_resnet_model = None
def get_resnet_model():
"""获取 ResNet50 模型(单例模式)"""
global _resnet_model
if _resnet_model is None:
logger.info("加载 ResNet50 模型...")
_resnet_model = models.resnet50(pretrained=True)
modules = list(_resnet_model.children())[:-1] # 移除最后的全连接层
_resnet_model = torch.nn.Sequential(*modules)
_resnet_model.eval() # 设置为评估模式
logger.info("ResNet50 模型加载完成")
return _resnet_model
# MinIO 客户端(单例)
_minio_client = None
def get_minio_client():
"""获取 MinIO 客户端(单例模式)"""
global _minio_client
if _minio_client is None:
_minio_client = Minio(
2025-12-30 17:35:32 +08:00
settings.MINIO_URL,
access_key=settings.MINIO_ACCESS,
secret_key=settings.MINIO_SECRET,
secure=settings.MINIO_SECURE
2025-12-29 10:52:33 +08:00
)
return _minio_client
def get_image_from_minio(path: str) -> Image.Image:
"""
MinIO 获取图片
Args:
path: MinIO 逻辑 URL格式如 "bucket_name/object_name"
Returns:
PIL Image 对象失败返回 None
"""
try:
# 分割路径,获取桶名和文件路径
path_parts = path.split('/', 1)
if len(path_parts) != 2:
logger.error(f"路径格式错误: {path}")
return None
bucket_name, file_name = path_parts
minio_client = get_minio_client()
# 获取文件
obj = minio_client.get_object(bucket_name, file_name)
img_data = obj.read() # 读取图像数据
img = Image.open(io.BytesIO(img_data)) # 将数据转为图像对象
return img
except Exception as e:
logger.error(f"从 MinIO 获取图片失败 [{path}]: {e}")
return None
def extract_feature_vector(path: str) -> np.ndarray:
"""
使用 ResNet50 提取图片特征向量2048
Args:
path: MinIO 逻辑 URL
Returns:
2048维特征向量numpy array失败返回零向量
"""
try:
# 从 MinIO 获取图像
img = get_image_from_minio(path)
if img is None:
logger.warning(f"无法获取图片,返回零向量: {path}")
return np.zeros(RECOMMENDATION_CONFIG["vector_dim"], dtype=np.float32)
# 预处理
# 部分 MinIO 图片可能是 RGBA/CMYK转换成 RGB 以匹配 3 通道标准化参数
if img.mode != "RGB":
try:
img = img.convert("RGB")
except Exception:
logger.warning(f"无法转换图片为RGB返回零向量: {path}")
return np.zeros(RECOMMENDATION_CONFIG["vector_dim"], dtype=np.float32)
img_tensor = transform(img).unsqueeze(0) # 扩展维度以适应批量处理
# 提取特征
resnet_model = get_resnet_model()
with torch.no_grad(): # 在不需要计算梯度的情况下进行推断
feature_vector = resnet_model(img_tensor) # 获取 ResNet 的输出
feature_vector = feature_vector.squeeze().cpu().numpy() # 转换为 NumPy 数组并去掉 batch 维度
# 确保是 2048 维
if feature_vector.ndim > 1:
feature_vector = feature_vector.flatten()
# 确保维度正确
if len(feature_vector) != RECOMMENDATION_CONFIG["vector_dim"]:
logger.warning(f"向量维度不正确: {len(feature_vector)}, 期望: {RECOMMENDATION_CONFIG['vector_dim']}")
# 如果维度不对,尝试调整
if len(feature_vector) > RECOMMENDATION_CONFIG["vector_dim"]:
feature_vector = feature_vector[:RECOMMENDATION_CONFIG["vector_dim"]]
else:
padded = np.zeros(RECOMMENDATION_CONFIG["vector_dim"], dtype=np.float32)
padded[:len(feature_vector)] = feature_vector
feature_vector = padded
return feature_vector.astype(np.float32)
except Exception as e:
logger.error(f"提取特征向量失败 [{path}]: {e}", exc_info=True)
return np.zeros(RECOMMENDATION_CONFIG["vector_dim"], dtype=np.float32)
def normalize_vector(vector: np.ndarray) -> np.ndarray:
"""
L2 归一化向量
Args:
vector: 输入向量
Returns:
归一化后的向量
"""
norm = np.linalg.norm(vector)
if norm == 0:
return vector
return vector / norm
def compute_weighted_average(vectors: list, weights: list) -> np.ndarray:
"""
计算加权平均向量
Args:
vectors: 向量列表
weights: 权重列表
Returns:
加权平均向量不做归一化模长为加权平均后的尺度
"""
if not vectors or not weights:
return np.zeros(RECOMMENDATION_CONFIG["vector_dim"], dtype=np.float32)
# 确保所有向量都是 numpy array
vectors = [np.array(v) for v in vectors]
weights = np.array(weights)
# 计算加权和
weighted_sum = np.zeros_like(vectors[0])
for v, w in zip(vectors, weights):
weighted_sum += v * w
# 返回加权平均(除以权重和,不做 L2 归一化,模长不会随条数线性暴涨)
weight_total = weights.sum()
if weight_total == 0:
return weighted_sum
return weighted_sum / weight_total