import io
import json
import logging
import time
from io import BytesIO

import cv2
import numpy as np
import urllib3
from PIL import Image
from minio import Minio

from app.server.utils.minio_config import MINIO_ACCESS, MINIO_SECRET, MINIO_URL, MINIO_SECURE

# NOTE(review): within this file `http_client` (defined below) is never passed
# to Minio(...) — confirm whether the client was meant to be created with
# `http_client=http_client`.
minio_client = Minio(MINIO_URL, access_key=MINIO_ACCESS, secret_key=MINIO_SECRET, secure=MINIO_SECURE)

logger = logging.getLogger()


# Custom Retry class
class CustomRetry(urllib3.Retry):
    """urllib3 retry policy that logs every retry attempt."""

    def increment(self, method=None, url=None, response=None, error=None, **kwargs):
        """Delegate to ``urllib3.Retry.increment`` and log the retry.

        :param method: HTTP method of the request being retried.
        :param url: URL of the request being retried.
        :param response: Response that triggered the retry, if any.
        :param error: Error that triggered the retry, if any.
        :param kwargs: Extra arguments forwarded unchanged to the parent.
        :return: The new Retry object produced by the parent class.
        """
        # Call the parent increment (it raises once retries are exhausted).
        new_retry = super().increment(method, url, response, error, **kwargs)
        # Log the retry. Each increment lowers ``total`` by exactly one, so
        # ``self.total - new_retry.total`` would always print 1; ``history``
        # holds one entry per attempt made so far and gives the real count.
        logger.info(f"重试连接: {method} {url},错误: {error},重试次数: {len(new_retry.history)}")
        return new_retry


timeout = urllib3.Timeout(connect=1, read=10.0)  # connect timeout 1 s, read timeout 10 s
http_client = urllib3.PoolManager(
    num_pools=10,  # number of connection pools
    maxsize=10,
    timeout=timeout,
    cert_reqs='CERT_REQUIRED',  # require certificate verification
    retries=CustomRetry(
        total=5,
        backoff_factor=0.2,
        status_forcelist=[500, 502, 503, 504],
    ),
)


# Fetch an image
def oss_get_image(oss_client, path, data_type):
    """Download an image object and decode it.

    :param oss_client: Client exposing ``get_object(bucket_name=..., object_name=...)``.
    :param path: ``"<bucket>/<object name>"``; split on the first ``/``.
    :param data_type: ``"cv2"`` decodes with OpenCV (all channels kept, 16-bit
        data scaled down to 8-bit); any other value opens the bytes with PIL.
    :return: The decoded image (OpenCV array or PIL image), or ``None`` when
        anything fails; failures are logged as warnings, never raised.
    """
    image_object = None
    response = None
    try:
        # Splitting inside the try keeps a path without "/" on the
        # best-effort path (warning + None) instead of raising to the caller.
        bucket, object_name = path.split("/", 1)
        response = oss_client.get_object(bucket_name=bucket, object_name=object_name)
        image_bytes = response.read()
        if data_type == "cv2":
            image_array = np.frombuffer(image_bytes, np.uint8)  # view as 8-bit unsigned integers
            # IMREAD_UNCHANGED keeps every channel and the original bit depth.
            decoded = cv2.imdecode(image_array, cv2.IMREAD_UNCHANGED)
            if decoded is None:
                # Surface a clear message rather than an AttributeError on None.
                raise ValueError(f"cv2.imdecode could not decode {path}")
            if decoded.dtype == np.uint16:
                decoded = (decoded / 256).astype('uint8')
            image_object = decoded
        else:
            image_object = Image.open(BytesIO(image_bytes))
    except Exception as e:
        logger.warning(f"获取图片出现异常 ######: {e}")
    finally:
        # The payload is fully read above, so the HTTP response can be closed
        # and its connection handed back to the pool.
        if response is not None:
            response.close()
            response.release_conn()
    return image_object


def oss_upload_image(oss_client, bucket, object_name, image_bytes, content_type='image/png'):
    """Upload raw image bytes as an object.

    :param oss_client: Client exposing ``put_object``.
    :param bucket: Target bucket name.
    :param object_name: Target object name inside the bucket.
    :param image_bytes: Encoded image content to upload.
    :param content_type: MIME type stored with the object (defaults to PNG).
    :return: Whatever ``put_object`` returns, or ``None`` when the upload
        fails; failures are logged as warnings, never raised.
    """
    req = None
    try:
        req = oss_client.put_object(
            bucket_name=bucket,
            object_name=object_name,
            data=io.BytesIO(image_bytes),
            length=len(image_bytes),
            content_type=content_type,
        )
    except Exception as e:
        logger.warning(f"上传图片出现异常 ######: {e}")
    return req


# def upload_json_to_minio_sync(
#         minio_client: Minio,
#         bucket_name: str,
#         object_name: str,
#         data: list
# ) -> str:
#     """
#     Convert a Python dict to a JSON string and upload it to MinIO.
#
#     :param minio_client: An initialised MinIO client instance.
#     :param bucket_name: Target bucket name.
#     :param object_name: Target file path/name (e.g., 'data/report.json').
#     :param data: The Python dict data to upload.
#     :return: True on success, False on failure.
#     """
#     try:
#         # 1. Serialise the Python dict to a JSON string
#         json_string = json.dumps(data, ensure_ascii=False, indent=2)
#         # 2. Encode the JSON string to bytes
#         json_bytes = json_string.encode('utf-8')
#
#         # 3. Create a BytesIO object so the data can be read from memory
#         data_stream = io.BytesIO(json_bytes)
#
#         # 4. Upload the data stream with put_object
#         minio_client.put_object(
#             bucket_name,
#             object_name,
#             data_stream,
#             length=len(json_bytes),
#             content_type='application/json; charset=utf-8'  # set the correct MIME type
#         )
#         logger.info(f"✅ JSON file uploaded successfully to {bucket_name}/{object_name}")
#         return f'{bucket_name}/{object_name}'
#
#     except Exception as e:
#         logger.error(f"❌ An unexpected error occurred: {e}")
#         return False


if __name__ == '__main__':
    # Manual smoke test: fetch a few known objects and display them.
    # url = "lanecarford/lc_stylist_agent_outfit_items/string/7fed1c7b-9efd-41fa-a335-182c310ea611.jpg"
    # url = "lanecarford/lc_stylist_agent_outfit_items/string/5de155d0-56a6-43e8-a2f1-7538fce86220.jpg"
    # url = "lanecarford/lc_stylist_agent_outfit_items/string/1cd1803c-5f51-4961-a4f2-2acd3e0d8294.jpg"
    url = [
        'lanecarford/lc_stylist_agent_outfit_items/zhhtest20251121/4b595d3b-5d3d-4617-ae09-5fca92d935f7.jpg',
        'lanecarford/lc_stylist_agent_outfit_items/zhhtest20251121/6d0d7540-5b61-45f2-a1fa-5cb1c7a3d0fa.jpg',
        'lanecarford/lc_stylist_agent_outfit_items/zhhtest20251121/a4e51ccb-9b95-4718-8153-92ee0a39d0c8.jpg',
        'lanecarford/lc_stylist_agent_outfit_items/zhhtest20251121/cbebbcf6-cca2-4460-9f9f-d0b1000dc2cd.jpg'
    ]
    read_type = "1"
    for i in url:
        # Fetch once per object (the image used to be downloaded twice).
        img = oss_get_image(oss_client=minio_client, path=i, data_type=read_type)
        if img is None:
            # oss_get_image already logged the failure; move on to the next one.
            continue
        if read_type == "cv2":
            cv2.imshow("", img)
            cv2.waitKey(0)
        else:
            img.show()
            img.save(f"{time.time()}.png")