import io import logging from io import BytesIO import urllib3 from PIL import Image from minio import Minio MINIO_URL = "www.minio-api.aida.com.hk" MINIO_ACCESS = "vXKFLSJkYeEq2DrSZvkB" MINIO_SECRET = "uKTZT3x7C43WvPN9QTc99DiRkwddWZrG9Uh3JVlR" MINIO_SECURE = True minio_client = Minio(MINIO_URL, access_key=MINIO_ACCESS, secret_key=MINIO_SECRET, secure=MINIO_SECURE) # 自定义 Retry 类 class CustomRetry(urllib3.Retry): def increment(self, method=None, url=None, response=None, error=None, **kwargs): # 调用父类的 increment 方法 new_retry = super(CustomRetry, self).increment(method, url, response, error, **kwargs) # 打印重试信息 logger.info(f"重试连接: {method} {url},错误: {error},重试次数: {self.total - new_retry.total}") return new_retry logger = logging.getLogger() timeout = urllib3.Timeout(connect=1, read=10.0) # 连接超时 5 秒,读取超时 10 秒 http_client = urllib3.PoolManager( num_pools=10, # 设置连接池大小 maxsize=10, timeout=timeout, cert_reqs='CERT_REQUIRED', # 需要证书验证 retries=CustomRetry( total=5, backoff_factor=0.2, status_forcelist=[500, 502, 503, 504], ), ) # 获取图片 def oss_get_image(oss_client, bucket, object_name): # cv2 默认全通道读取 image_object = None try: image_data = oss_client.get_object(bucket_name=bucket, object_name=object_name) data_bytes = BytesIO(image_data.read()) image_object = Image.open(data_bytes) except Exception as e: logger.warning(f" | 获取图片出现异常 ######: {e}") return image_object def oss_upload_image(oss_client, bucket, object_name, image_bytes): req = None try: req = oss_client.put_object(bucket_name=bucket, object_name=object_name, data=io.BytesIO(image_bytes), length=len(image_bytes), content_type='image/png') except Exception as e: logger.warning(f" | 上传图片出现异常 ######: {e}") return req if __name__ == '__main__': url = "test/fida_generate_image/9fea0a597d884c81a5a4c8381b7bae2c.png" img = oss_get_image(oss_client=minio_client, bucket=url.split('/')[0], object_name=url[url.find('/') + 1:]) img.show() img.save("result.png")