"""Helpers for reading and writing objects in a MinIO bucket.

The module builds one shared ``Minio`` client (backed by a urllib3 pool with
logged retries) and exposes small helpers to fetch an image, upload bytes,
upload a local file and download an object to disk.
"""
import io
import logging
import mimetypes
import os
from io import BytesIO
from typing import Optional

import urllib3
from minio import Minio
from PIL import Image

# Defined before anything that logs, so every helper below can rely on it.
logger = logging.getLogger(__name__)

# NOTE(review): the endpoint credentials are hard-coded in source. They should
# be rotated and loaded from the environment or a secret store; the values are
# left unchanged here so existing deployments keep working.
MINIO_URL = "www.minio-api.aida.com.hk"
MINIO_ACCESS = "vXKFLSJkYeEq2DrSZvkB"
MINIO_SECRET = "uKTZT3x7C43WvPN9QTc99DiRkwddWZrG9Uh3JVlR"
MINIO_SECURE = True
MINIO_BUCKET = "test"


class CustomRetry(urllib3.Retry):
    """``urllib3.Retry`` that logs every retry attempt."""

    def increment(self, method=None, url=None, response=None, error=None, **kwargs):
        """Delegate to ``Retry.increment`` and log the attempt.

        Returns the new ``Retry`` object produced by the parent class. If the
        parent raises (retries exhausted), nothing is logged and the exception
        propagates unchanged.
        """
        new_retry = super().increment(method, url, response, error, **kwargs)
        # ``history`` gains one entry per increment, so its length is the
        # number of retries so far. The previous ``self.total - new_retry.total``
        # was always 1 and failed with TypeError when ``total`` was None.
        logger.info(
            "重试连接: %s %s,错误: %s,重试次数: %s",
            method,
            url,
            error,
            len(new_retry.history),
        )
        return new_retry


http_client = urllib3.PoolManager(
    num_pools=20,
    maxsize=50,
    timeout=urllib3.Timeout(connect=2, read=30),
    cert_reqs='CERT_REQUIRED',  # require TLS certificate verification
    retries=CustomRetry(
        total=5,
        backoff_factor=0.2,
        status_forcelist=[500, 502, 503, 504],
    ),
)

minio_client = Minio(
    MINIO_URL,
    access_key=MINIO_ACCESS,
    secret_key=MINIO_SECRET,
    secure=MINIO_SECURE,
    http_client=http_client,
)


def minio_get_image(client, bucket, object_name) -> Optional[Image.Image]:
    """Fetch an object from MinIO and decode it as an RGB image.

    :param client: MinIO client used for the request
    :param bucket: bucket name
    :param object_name: object key inside the bucket
    :return: the decoded ``PIL.Image.Image`` converted to RGB, or ``None``
        when the download or decoding fails (the error is logged)
    """
    response = None
    try:
        response = client.get_object(bucket, object_name)
        # Read the whole payload into memory before decoding.
        image_bytes = response.read()
        return Image.open(BytesIO(image_bytes)).convert("RGB")
    except Exception as e:
        # Deliberate best-effort: callers get None instead of an exception.
        logger.error(f"读取 MinIO 图片失败: {bucket}/{object_name} | {e}")
        return None
    finally:
        # Compare against None explicitly so cleanup never depends on the
        # truthiness of the response object.
        if response is not None:
            response.close()
            response.release_conn()


def upload_bytes(data_bytes, object_name, content_type) -> str:
    """Upload an in-memory bytes payload to ``MINIO_BUCKET``.

    :param data_bytes: payload to upload
    :param object_name: destination object key
    :param content_type: MIME type stored with the object
    :return: ``"<bucket>/<object_name>"``
    """
    minio_client.put_object(
        bucket_name=MINIO_BUCKET,
        object_name=object_name,
        data=BytesIO(data_bytes),
        length=len(data_bytes),
        content_type=content_type,
    )
    return f"{MINIO_BUCKET}/{object_name}"


def upload_local_file(file_path, object_name, content_type="application/octet-stream") -> str:
    """Upload a file from local disk to ``MINIO_BUCKET``.

    :param file_path: local file path (e.g. ``'output/sample.obj'``)
    :param object_name: destination object key in MinIO
    :param content_type: MIME type stored with the object
    :return: ``"<bucket>/<object_name>"``
    :raises FileNotFoundError: if ``file_path`` is not an existing file
    """
    # ``isfile`` (rather than ``exists``) also rejects directories up front,
    # instead of letting them fail later inside the upload call.
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"本地文件未找到: {file_path}")

    # fput_object takes the path and reads the file itself, so the payload is
    # not loaded into a bytes object here.
    minio_client.fput_object(
        bucket_name=MINIO_BUCKET,
        object_name=object_name,
        file_path=file_path,
        content_type=content_type,
    )
    return f"{MINIO_BUCKET}/{object_name}"


def download_from_minio(object_path, local_path) -> str:
    """Download a MinIO object to local disk.

    :param object_path: ``"<bucket>/<object key>"``
    :param local_path: destination path; missing parent directories are created
    :return: ``local_path``
    :raises ValueError: if ``object_path`` is not of the form ``bucket/object``
    :raises FileNotFoundError: if the object does not exist (``NoSuchKey``)
    :raises RuntimeError: on any other non-S3 failure, or if the file is
        missing after the download
    """
    from minio.error import S3Error

    # Validate before touching the filesystem. A path without "/" used to
    # surface as a bare IndexError after the directory had been created.
    bucket_name, sep, object_name = object_path.partition("/")
    if not (sep and bucket_name and object_name):
        raise ValueError(f"无效的 MinIO 对象路径(应为 'bucket/object'): {object_path!r}")

    local_dir = os.path.dirname(local_path)
    if local_dir:
        os.makedirs(local_dir, exist_ok=True)

    try:
        minio_client.fget_object(
            bucket_name=bucket_name,
            object_name=object_name,
            file_path=local_path,
        )
    except S3Error as err:
        if err.code == 'NoSuchKey':
            raise FileNotFoundError(
                f"MinIO 对象不存在: {bucket_name}/{object_name}"
            ) from err
        raise
    except Exception as e:
        raise RuntimeError(f"MinIO 下载失败: {str(e)}") from e

    if not os.path.exists(local_path):
        # Same message the caller saw before, when this check sat inside the
        # try block and was re-wrapped by the generic handler.
        raise RuntimeError("MinIO 下载失败: 下载文件后本地路径不存在")
    return local_path


if __name__ == '__main__':
    url = "fida-test/furniture/sketches/4449a66d-6267-43f7-86a2-1e42bd19ec61.png"
    bucket, _, object_name = url.partition("/")
    img = minio_get_image(client=minio_client, bucket=bucket, object_name=object_name)
    if img is None:
        # minio_get_image already logged the cause; stop instead of crashing
        # with AttributeError on None.
        raise SystemExit(f"无法读取图片: {url}")
    img.show()
    img.save("result.png")