"""MinIO helpers: a shared client, a logging retry policy, and image get/put."""
import io
import json
import logging
import time
from io import BytesIO

import cv2
import numpy as np
import urllib3
from PIL import Image
from minio import Minio

from app.config import settings

logger = logging.getLogger()

minio_client = Minio(
    settings.MINIO_URL,
    access_key=settings.MINIO_ACCESS,
    secret_key=settings.MINIO_SECRET,
    secure=settings.MINIO_SECURE,
)


# Custom Retry class
class CustomRetry(urllib3.Retry):
    """urllib3 ``Retry`` policy that logs every retry attempt."""

    def increment(self, method=None, url=None, response=None, error=None, **kwargs):
        """Register one retry, log it, and return the updated policy.

        Arguments are forwarded unchanged to ``urllib3.Retry.increment``,
        which may raise ``MaxRetryError`` once the retries are exhausted
        (in that case nothing is logged here).
        """
        # Delegate to the parent implementation.
        new_retry = super().increment(method, url, response, error, **kwargs)
        # Every increment() appends one entry to ``history``, so its length is
        # the number of retries so far. (``self.total - new_retry.total`` is
        # always 1, because each increment returns a fresh policy object.)
        attempts = len(new_retry.history)
        logger.info(f"重试连接: {method} {url},错误: {error},重试次数: {attempts}")
        return new_retry


timeout = urllib3.Timeout(connect=1, read=10.0)  # connect timeout 1 s, read timeout 10 s
# NOTE(review): this pool is not passed to the Minio(...) call above; confirm
# whether minio_client was meant to be created with http_client=http_client.
http_client = urllib3.PoolManager(
    num_pools=10,  # number of connection pools
    maxsize=10,
    timeout=timeout,
    cert_reqs='CERT_REQUIRED',  # require certificate verification
    retries=CustomRetry(
        total=5,
        backoff_factor=0.2,
        status_forcelist=[500, 502, 503, 504],
    ),
)


def _release_response(response):
    """Best-effort close of an object-store response and its connection."""
    for method_name in ("close", "release_conn"):
        release = getattr(response, method_name, None)
        if not callable(release):
            continue
        try:
            release()
        except Exception as e:
            # Cleanup must never mask the result of the actual download.
            logger.warning(f"释放连接出现异常 ######: {e}")


# Fetch an image
def oss_get_image(oss_client, path, data_type):
    """Download an image from object storage and decode it.

    :param oss_client: client exposing ``get_object(bucket_name=, object_name=)``.
    :param path: ``"<bucket>/<object name>"``; split on the first ``/``.
    :param data_type: ``"cv2"`` decodes with OpenCV (all channels unchanged,
        16-bit images scaled down to 8-bit); any other value opens the data
        as a PIL image.
    :return: the decoded image, or ``None`` if anything fails (the failure is
        logged as a warning, never raised).
    """
    image_object = None
    response = None
    try:
        # Splitting inside the try keeps a malformed path on the same
        # "log and return None" route as every other failure.
        bucket, object_name = path.split("/", 1)
        response = oss_client.get_object(bucket_name=bucket, object_name=object_name)
        image_bytes = response.read()
        if data_type == "cv2":
            image_array = np.frombuffer(image_bytes, np.uint8)  # view as unsigned 8-bit
            decoded = cv2.imdecode(image_array, cv2.IMREAD_UNCHANGED)
            if decoded is None:
                # imdecode signals failure by returning None, not by raising.
                raise ValueError(f"cv2.imdecode could not decode '{path}'")
            if decoded.dtype == np.uint16:
                decoded = (decoded / 256).astype('uint8')
            image_object = decoded
        else:
            image_object = Image.open(BytesIO(image_bytes))
    except Exception as e:
        logger.warning(f"获取图片出现异常 ######: {e}")
    finally:
        # The payload is fully buffered above, so the connection can be
        # returned to the pool before the image is used.
        if response is not None:
            _release_response(response)
    return image_object


def oss_upload_image(oss_client, bucket, object_name, image_bytes, content_type='image/png'):
    """Upload raw image bytes to object storage.

    :param oss_client: client exposing ``put_object``.
    :param bucket: target bucket name.
    :param object_name: target object name inside the bucket.
    :param image_bytes: encoded image content as ``bytes``.
    :param content_type: MIME type stored with the object (default PNG).
    :return: whatever ``put_object`` returns, or ``None`` if the upload fails
        (the failure is logged as a warning, never raised).
    """
    req = None
    try:
        req = oss_client.put_object(
            bucket_name=bucket,
            object_name=object_name,
            data=io.BytesIO(image_bytes),
            length=len(image_bytes),
            content_type=content_type,
        )
    except Exception as e:
        logger.warning(f"上传图片出现异常 ######: {e}")
    return req


# def upload_json_to_minio_sync(
#         minio_client: Minio,
#         bucket_name: str,
#         object_name: str,
#         data: list
# ) -> str:
#     """
#     Serialize Python data to a JSON string and upload it to MinIO.
#
#     :param minio_client: an initialized MinIO client instance.
#     :param bucket_name: target bucket name.
#     :param object_name: target object path/name (e.g., 'data/report.json').
#     :param data: the Python data to upload.
#     :return: 'bucket_name/object_name' on success, False on failure.
#     """
#     try:
#         # 1. Serialize the Python data to a JSON string
#         json_string = json.dumps(data, ensure_ascii=False, indent=2)
#         # 2. Encode the JSON string as bytes
#         json_bytes = json_string.encode('utf-8')
#
#         # 3. Create a BytesIO object to read the data from memory
#         data_stream = io.BytesIO(json_bytes)
#
#         # 4. Upload the data stream with put_object
#         minio_client.put_object(
#             bucket_name,
#             object_name,
#             data_stream,
#             length=len(json_bytes),
#             content_type='application/json; charset=utf-8'  # set the correct MIME type
#         )
#         logger.info(f"✅ JSON file uploaded successfully to {bucket_name}/{object_name}")
#         return f'{bucket_name}/{object_name}'
#
#     except Exception as e:
#         logger.error(f"❌ An unexpected error occurred: {e}")
#         return False


if __name__ == '__main__':
    # url = "lanecarford/lc_stylist_agent_outfit_items/string/7fed1c7b-9efd-41fa-a335-182c310ea611.jpg"
    # url = "lanecarford/lc_stylist_agent_outfit_items/string/5de155d0-56a6-43e8-a2f1-7538fce86220.jpg"
    # url = "lanecarford/lc_stylist_agent_outfit_items/string/1cd1803c-5f51-4961-a4f2-2acd3e0d8294.jpg"
    url = [
        'lanecarford/lc_stylist_agent_outfit_items/zhhtest20251121/4b595d3b-5d3d-4617-ae09-5fca92d935f7.jpg',
        'lanecarford/lc_stylist_agent_outfit_items/zhhtest20251121/6d0d7540-5b61-45f2-a1fa-5cb1c7a3d0fa.jpg',
        'lanecarford/lc_stylist_agent_outfit_items/zhhtest20251121/a4e51ccb-9b95-4718-8153-92ee0a39d0c8.jpg',
        'lanecarford/lc_stylist_agent_outfit_items/zhhtest20251121/cbebbcf6-cca2-4460-9f9f-d0b1000dc2cd.jpg',
    ]
    read_type = "1"
    for image_path in url:
        # One download per image is enough; a failed download returns None.
        img = oss_get_image(oss_client=minio_client, path=image_path, data_type=read_type)
        if img is None:
            continue
        if read_type == "cv2":
            cv2.imshow("", img)
            cv2.waitKey(0)
        else:
            img.show()
            img.save(f"{time.time()}.png")