Files
lc_stylist_agent/app/server/utils/minio_client.py

133 lines
5.0 KiB
Python
Raw Normal View History

2025-10-24 10:37:19 +08:00
import io
import json
import logging
from io import BytesIO
import cv2
import numpy as np
import urllib3
from PIL import Image
from minio import Minio
from app.server.utils.minio_config import MINIO_ACCESS, MINIO_SECRET, MINIO_URL, MINIO_SECURE
# Shared module-level MinIO client, configured from app.server.utils.minio_config.
# NOTE(review): the `http_client` pool built further down in this module is not
# passed to this constructor -- confirm whether that is intentional.
minio_client = Minio(
    MINIO_URL,
    access_key=MINIO_ACCESS,
    secret_key=MINIO_SECRET,
    secure=MINIO_SECURE,
)
# Custom Retry class
class CustomRetry(urllib3.Retry):
    """urllib3 retry policy that logs every retry it schedules."""

    def increment(self, method=None, url=None, response=None, error=None, **kwargs):
        """Return the next retry state and log the retry that produced it.

        Args:
            method: HTTP method of the request being retried.
            url: URL of the request being retried.
            response: Response that triggered the retry, if any.
            error: Exception that triggered the retry, if any.
            **kwargs: Forwarded unchanged to ``urllib3.Retry.increment``.

        Returns:
            The new retry object produced by the parent implementation.

        Any exception raised by the parent ``increment`` propagates before
        anything is logged.
        """
        # Delegate the actual bookkeeping to the parent class.
        new_retry = super().increment(method, url, response, error, **kwargs)
        # Every increment() lowers ``total`` by exactly one, so the former
        # ``self.total - new_retry.total`` always logged 1 (and raised
        # TypeError when ``total`` was None).  ``history`` gains one entry per
        # attempt, so its length is the real number of retries so far.
        attempt = len(new_retry.history)
        logger.info(f"重试连接: {method} {url},错误: {error},重试次数: {attempt}")
        return new_retry
# Root logger, used by the helpers in this module.
logger = logging.getLogger()

# Connect timeout: 1 second; read timeout: 10 seconds.
timeout = urllib3.Timeout(read=10.0, connect=1)

# Pooled HTTP client that verifies certificates and logs retries via CustomRetry.
http_client = urllib3.PoolManager(
    timeout=timeout,
    num_pools=10,  # connection pool size setting
    maxsize=10,
    cert_reqs='CERT_REQUIRED',  # certificate verification is required
    retries=CustomRetry(
        status_forcelist=[500, 502, 503, 504],
        backoff_factor=0.2,
        total=5,
    ),
)
# Fetch an image
def oss_get_image(oss_client, path, data_type):
    """Download an image from object storage and decode it.

    Args:
        oss_client: Client exposing
            ``get_object(bucket_name=..., object_name=...)``.
        path: Object location in the form ``"<bucket>/<object name>"``; it is
            split at the first ``/``.
        data_type: ``"cv2"`` decodes with OpenCV, reading all channels
            unchanged and scaling 16-bit images down to 8-bit.  Any other
            value opens the bytes with PIL.

    Returns:
        The decoded image (OpenCV array or PIL image), or ``None`` when the
        download or decoding fails; the failure is logged as a warning.

    Raises:
        IndexError: If ``path`` contains no ``/``.
    """
    parts = path.split("/", 1)
    bucket, object_name = parts[0], parts[1]
    image_object = None
    response = None
    try:
        response = oss_client.get_object(bucket_name=bucket, object_name=object_name)
        # Read everything up front so the connection can be released below
        # without affecting the decoded image.
        payload = response.read()
        if data_type == "cv2":
            image_array = np.frombuffer(payload, np.uint8)  # view as unsigned 8-bit
            image_object = cv2.imdecode(image_array, cv2.IMREAD_UNCHANGED)
            if image_object is None:
                # imdecode signals failure by returning None instead of raising.
                raise ValueError(f"cv2.imdecode could not decode {path}")
            if image_object.dtype == np.uint16:
                image_object = (image_object / 256).astype('uint8')
        else:
            image_object = Image.open(BytesIO(payload))
    except Exception as e:
        logger.warning(f"获取图片出现异常 ######: {e}")
    finally:
        # Always hand the HTTP connection back to the pool; otherwise every
        # call leaks one pooled connection.
        if response is not None:
            response.close()
            response.release_conn()
    return image_object
def oss_upload_image(oss_client, bucket, object_name, image_bytes, content_type='image/png'):
    """Upload encoded image bytes to object storage.

    Args:
        oss_client: Client exposing ``put_object(bucket_name=..., object_name=...,
            data=..., length=..., content_type=...)``.
        bucket: Name of the destination bucket.
        object_name: Object key inside the bucket.
        image_bytes: Encoded image content as ``bytes``.
        content_type: MIME type stored with the object.  Defaults to
            ``'image/png'`` (the previously hard-coded value) so existing
            callers are unaffected; pass e.g. ``'image/jpeg'`` for JPEG data.

    Returns:
        Whatever ``put_object`` returns, or ``None`` when the upload fails;
        the failure is logged as a warning.
    """
    result = None
    try:
        result = oss_client.put_object(
            bucket_name=bucket,
            object_name=object_name,
            data=io.BytesIO(image_bytes),
            length=len(image_bytes),
            content_type=content_type,
        )
    except Exception as e:
        logger.warning(f"上传图片出现异常 ######: {e}")
    return result
# def upload_json_to_minio_sync(
# minio_client: Minio,
# bucket_name: str,
# object_name: str,
# data: list
# ) -> str:
# """
# 将 Python 字典转换为 JSON 字符串,并上传到 MinIO。
#
# :param minio_client: 已初始化的 MinIO 客户端实例。
# :param bucket_name: 目标 Bucket 名称。
# :param object_name: 目标文件路径/名称 (e.g., 'data/report.json')。
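# :param data: JSON-serializable Python data to upload (annotated as ``list`` in the signature).
# :return: the ``'<bucket_name>/<object_name>'`` path string on success, ``False`` on failure.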
# """
# try:
# # 1. 将 Python 字典序列化为 JSON 字符串
# json_string = json.dumps(data, ensure_ascii=False, indent=2)
# # 2. 将 JSON 字符串编码为字节流 (bytes)
# json_bytes = json_string.encode('utf-8')
#
# # 3. 创建 BytesIO 对象,用于从内存读取数据
# data_stream = io.BytesIO(json_bytes)
#
# # 4. 使用 put_object 上传数据流
# minio_client.put_object(
# bucket_name,
# object_name,
# data_stream,
# length=len(json_bytes),
# content_type='application/json; charset=utf-8' # 设置正确的 MIME 类型
# )
# logger.info(f"✅ JSON file uploaded successfully to {bucket_name}/{object_name}")
# return f'{bucket_name}/{object_name}'
#
# except Exception as e:
# logger.error(f"❌ An unexpected error occurred: {e}")
# return False
if __name__ == '__main__':
    # Manual smoke test: fetch a few sample objects and display them.
    # Local import: `time` is not among this module's top-level imports.
    import time

    # url = "lanecarford/lc_stylist_agent_outfit_items/string/7fed1c7b-9efd-41fa-a335-182c310ea611.jpg"
    # url = "lanecarford/lc_stylist_agent_outfit_items/string/5de155d0-56a6-43e8-a2f1-7538fce86220.jpg"
    # url = "lanecarford/lc_stylist_agent_outfit_items/string/1cd1803c-5f51-4961-a4f2-2acd3e0d8294.jpg"
    url = [
        'lanecarford/lc_stylist_agent_outfit_items/zhhtest20251120/d9df7c48-c7e5-47f9-be67-07f0d175d202.jpg',
        'lanecarford/lc_stylist_agent_outfit_items/zhhtest20251120/ddf39b9c-69f0-4b28-95ed-9d823fa82e35.jpg',
        'lanecarford/lc_stylist_agent_outfit_items/zhhtest20251120/112194a0-dc1d-4151-8c58-82642142a553.jpg',
        'lanecarford/lc_stylist_agent_outfit_items/zhhtest20251120/788007f1-e44b-4390-ad9e-a2d4ba406379.jpg',
    ]
    # Any value other than "cv2" makes oss_get_image return a PIL image.
    read_type = "1"
    for image_path in url:
        # Fetch once per object (the duplicated call downloaded each image twice).
        img = oss_get_image(oss_client=minio_client, path=image_path, data_type=read_type)
        if img is None:
            # oss_get_image returns None (and logs a warning) on failure.
            continue
        if read_type == "cv2":
            cv2.imshow("", img)
            cv2.waitKey(0)
        else:
            img.show()
            # Only PIL images have .save(); name the file by the current timestamp.
            img.save(f"{time.time()}.png")