All checks were successful
git commit AiDA python develop 分支构建部署 / scheduled_deploy (push) Has been skipped
190 lines
6.1 KiB
Python
190 lines
6.1 KiB
Python
"""
Vector computation utilities.

Includes ResNet50 feature extraction, vector normalisation and related helpers.
"""
|
||
import io
|
||
import logging
|
||
import numpy as np
|
||
import torch
|
||
from torchvision import models, transforms
|
||
from PIL import Image
|
||
from minio import Minio
|
||
|
||
from app.core.config import settings
|
||
from app.service.recommendation_system.config import RECOMMENDATION_CONFIG
|
||
|
||
logger = logging.getLogger(__name__)

# Image preprocessing pipeline (kept consistent with the preprocessing used
# when ResNet was trained, per the original author's note).
_INPUT_SIZE = (224, 224)  # ResNet expects 224x224 inputs
_CHANNEL_MEAN = [0.485, 0.456, 0.406]
_CHANNEL_STD = [0.229, 0.224, 0.225]

transform = transforms.Compose(
    [
        transforms.Resize(_INPUT_SIZE),  # resize to the model input resolution
        transforms.ToTensor(),  # convert the PIL image to a tensor
        transforms.Normalize(mean=_CHANNEL_MEAN, std=_CHANNEL_STD),  # per-channel standardisation
    ]
)
|
||
|
||
# Lazily-created pretrained ResNet50 with its final fully connected layer removed.
_resnet_model = None


def get_resnet_model():
    """Return the shared ResNet50 feature extractor, creating it on first use.

    The network is built from ``models.resnet50(pretrained=True)`` with its
    last child module (the fully connected layer) dropped, wrapped in a
    ``torch.nn.Sequential`` and switched to evaluation mode.

    The model is assembled in local variables and stored in the module-level
    cache only once it is complete. If construction fails part-way through,
    or another caller reads the cache while it is being built, the cache
    never holds the un-truncated, non-eval classification network.

    Returns:
        The cached ``torch.nn.Sequential`` feature extractor.
    """
    global _resnet_model
    if _resnet_model is None:
        logger.info("加载 ResNet50 模型...")
        # NOTE(review): newer torchvision releases deprecate ``pretrained=`` in
        # favour of ``weights=``; confirm the installed version before switching.
        backbone = models.resnet50(pretrained=True)
        layers = list(backbone.children())[:-1]  # drop the final fully connected layer
        extractor = torch.nn.Sequential(*layers)
        extractor.eval()  # evaluation mode
        _resnet_model = extractor  # publish only the fully built model
        logger.info("ResNet50 模型加载完成")
    return _resnet_model
|
||
|
||
|
||
# Shared MinIO client (singleton).
_minio_client = None


def get_minio_client():
    """Return the shared MinIO client, constructing it on the first call.

    The client is created from ``settings.MINIO_URL``, ``settings.MINIO_ACCESS``,
    ``settings.MINIO_SECRET`` and ``settings.MINIO_SECURE`` and then cached in
    the module-level ``_minio_client`` variable for later calls.
    """
    global _minio_client
    # Fast path: reuse the client created by an earlier call.
    if _minio_client is not None:
        return _minio_client

    _minio_client = Minio(
        settings.MINIO_URL,
        access_key=settings.MINIO_ACCESS,
        secret_key=settings.MINIO_SECRET,
        secure=settings.MINIO_SECURE,
    )
    return _minio_client
|
||
|
||
|
||
def get_image_from_minio(path: str) -> "Image.Image | None":
    """Fetch an image from MinIO and open it with PIL.

    Args:
        path: MinIO logical URL in the form ``"bucket_name/object_name"``.
            Only the first ``/`` separates the bucket from the object name,
            so the object name may itself contain slashes.

    Returns:
        The opened PIL image, or ``None`` when the path is malformed or any
        step fails. Failures are logged rather than raised.
    """
    try:
        # Split on the first slash only: bucket name, then object name.
        bucket_name, separator, file_name = path.partition('/')
        if not (separator and bucket_name and file_name):
            logger.error(f"路径格式错误: {path}")
            return None

        response = get_minio_client().get_object(bucket_name, file_name)
        try:
            img_data = response.read()  # read the whole object into memory
        finally:
            # Always close the response and hand the connection back to the
            # pool; otherwise every fetched image leaks an HTTP connection.
            response.close()
            response.release_conn()

        # The image is backed by an in-memory buffer, so it stays usable
        # after the network response has been released.
        return Image.open(io.BytesIO(img_data))
    except Exception as e:
        logger.error(f"从 MinIO 获取图片失败 [{path}]: {e}")
        return None
|
||
|
||
|
||
def extract_feature_vector(path: str) -> np.ndarray:
    """Extract a ResNet50 feature vector for an image stored in MinIO.

    Args:
        path: MinIO logical URL of the image.

    Returns:
        A float32 numpy array of length ``RECOMMENDATION_CONFIG["vector_dim"]``
        (2048 according to the original documentation). A zero vector of that
        length is returned when the image cannot be fetched, cannot be
        converted to RGB, or any other step raises.
    """
    try:
        image = get_image_from_minio(path)
        if image is None:
            logger.warning(f"无法获取图片,返回零向量: {path}")
            return np.zeros(RECOMMENDATION_CONFIG["vector_dim"], dtype=np.float32)

        # Some stored images may be RGBA/CMYK; convert to RGB so they match
        # the 3-channel normalisation parameters.
        if image.mode != "RGB":
            try:
                image = image.convert("RGB")
            except Exception:
                logger.warning(f"无法转换图片为RGB,返回零向量: {path}")
                return np.zeros(RECOMMENDATION_CONFIG["vector_dim"], dtype=np.float32)

        # Add a leading batch dimension for the model.
        batch = transform(image).unsqueeze(0)

        model = get_resnet_model()
        with torch.no_grad():  # inference only, no gradients needed
            raw_output = model(batch)
            # Drop singleton dimensions (including the batch axis) and convert to numpy.
            feature_vector = raw_output.squeeze().cpu().numpy()

        # Collapse any remaining extra dimensions into a flat vector.
        if feature_vector.ndim > 1:
            feature_vector = feature_vector.flatten()

        # Force the configured length: truncate if too long, zero-pad if too short.
        actual_dim = len(feature_vector)
        expected_dim = RECOMMENDATION_CONFIG["vector_dim"]
        if actual_dim != expected_dim:
            logger.warning(f"向量维度不正确: {len(feature_vector)}, 期望: {RECOMMENDATION_CONFIG['vector_dim']}")
            if actual_dim > expected_dim:
                feature_vector = feature_vector[:expected_dim]
            else:
                resized = np.zeros(expected_dim, dtype=np.float32)
                resized[:actual_dim] = feature_vector
                feature_vector = resized

        return feature_vector.astype(np.float32)
    except Exception as e:
        logger.error(f"提取特征向量失败 [{path}]: {e}", exc_info=True)
        return np.zeros(RECOMMENDATION_CONFIG["vector_dim"], dtype=np.float32)
|
||
|
||
|
||
def normalize_vector(vector: np.ndarray) -> np.ndarray:
    """Scale a vector to unit L2 norm.

    Args:
        vector: Input vector.

    Returns:
        ``vector`` divided by its L2 norm. If the norm is zero the input
        object itself is returned unchanged, which avoids dividing by zero.
    """
    magnitude = np.linalg.norm(vector)
    return vector / magnitude if magnitude != 0 else vector
|
||
|
||
|
||
def compute_weighted_average(vectors: list, weights: list) -> np.ndarray:
    """Compute the weighted average of a collection of vectors.

    Vectors and weights are paired by position. If the two sequences differ
    in length, only the leading pairs are used, and the divisor is the sum of
    the weights that were actually applied. The result is then a true
    weighted average of the paired items.

    Args:
        vectors: Sequence of vectors (lists or numpy arrays), expected to
            share one shape.
        weights: Sequence of scalar weights, one per vector.

    Returns:
        The weighted sum divided by the total weight. No L2 normalisation is
        applied, so the magnitude stays on the scale of the inputs instead of
        growing with the number of items. Special cases:

        * either input empty or ``None``: a float32 zero vector of length
          ``RECOMMENDATION_CONFIG["vector_dim"]``;
        * weights summing to zero: the un-divided weighted sum.
    """
    # Explicit length checks so numpy arrays are accepted as well as lists
    # (``not array`` raises for arrays with more than one element).
    if vectors is None or weights is None or len(vectors) == 0 or len(weights) == 0:
        return np.zeros(RECOMMENDATION_CONFIG["vector_dim"], dtype=np.float32)

    # Use only complete (vector, weight) pairs so numerator and divisor agree.
    pair_count = min(len(vectors), len(weights))
    arrays = [np.asarray(v) for v in vectors[:pair_count]]
    weight_array = np.asarray(weights[:pair_count])

    # Accumulate in a floating dtype: an integer accumulator cannot receive
    # float products in place. Floating inputs keep their own dtype.
    first = arrays[0]
    if np.issubdtype(first.dtype, np.inexact):
        accumulator_dtype = first.dtype
    else:
        accumulator_dtype = np.float64
    weighted_sum = np.zeros(first.shape, dtype=accumulator_dtype)
    for vector, weight in zip(arrays, weight_array):
        weighted_sum += vector * weight

    weight_total = weight_array.sum()
    if weight_total == 0:
        # Nothing sensible to divide by; return the raw weighted sum.
        return weighted_sum
    return weighted_sum / weight_total
|
||
|