"""MinIO object-storage helpers: image download/upload, presigned URLs,
existence checks, and validation of MinIO image URLs."""

import io
import logging
from io import BytesIO
from typing import Any, Optional
from urllib.parse import urlparse

import urllib3
from PIL import Image
from minio import Minio, S3Error

from src.core.config import settings

# Root logger, as in the original module; defined before anything that logs.
logger = logging.getLogger()

minio_client = Minio(
    settings.MINIO_URL,
    access_key=settings.MINIO_ACCESS,
    secret_key=settings.MINIO_SECRET,
    secure=settings.MINIO_SECURE,
)

# Hosts accepted by check_and_extract_minio_image.
_ALLOWED_MINIO_DOMAINS = frozenset(
    {"www.minio-api.aida.com.hk", "minio-api.aida.com.hk"}
)

# File extensions (lower-case) treated as images.
_IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp", ".tiff")

# S3 error codes that are taken to mean "the object simply is not there".
# NOTE(review): list taken from the minio-py 404 mapping - confirm against
# the installed client version.
_NOT_FOUND_CODES = frozenset({"NoSuchKey", "NoSuchBucket", "ResourceNotFound"})


class CustomRetry(urllib3.Retry):
    """``urllib3.Retry`` that logs every retry attempt."""

    def increment(self, method=None, url=None, response=None, error=None, **kwargs):
        """Delegate to ``Retry.increment`` and log the attempt.

        Returns the new ``Retry`` object produced by the parent class. Any
        exception raised by the parent (e.g. retries exhausted) propagates
        unchanged and nothing is logged in that case.
        """
        new_retry = super().increment(method, url, response, error, **kwargs)
        # ``self.total - new_retry.total`` is always 1 (one step per call), so
        # report the number of attempts recorded in the retry history instead.
        logger.info(
            f"重试连接: {method} {url},错误: {error},重试次数: {len(new_retry.history)}"
        )
        return new_retry


# Connect timeout 1 second, read timeout 10 seconds.
timeout = urllib3.Timeout(connect=1, read=10.0)

# NOTE(review): this pool manager is not passed to ``Minio(...)`` above, so
# ``minio_client`` does not use these timeout/retry settings - confirm whether
# ``http_client=http_client`` was intended.
http_client = urllib3.PoolManager(
    num_pools=10,  # number of connection pools to cache
    maxsize=10,
    timeout=timeout,
    cert_reqs='CERT_REQUIRED',  # require certificate verification
    retries=CustomRetry(
        total=5,
        backoff_factor=0.2,
        status_forcelist=[500, 502, 503, 504],
    ),
)


def _release_response(response) -> None:
    """Close a ``get_object`` response and return its connection to the pool."""
    if response is None:
        return
    try:
        response.close()
        response.release_conn()
    except Exception as exc:  # cleanup is best-effort; never mask the real error
        logger.debug(f"failed to release MinIO response: {exc}")


def oss_get_image(oss_client, bucket, object_name):
    """Download an object and open it as a PIL image.

    Args:
        oss_client: client exposing ``get_object(bucket_name=, object_name=)``.
        bucket: bucket name.
        object_name: object key inside the bucket.

    Returns:
        The ``PIL.Image.Image`` opened from the downloaded bytes, or ``None``
        if anything fails (the failure is logged as a warning).
    """
    response = None
    try:
        response = oss_client.get_object(bucket_name=bucket, object_name=object_name)
        # The payload is fully read into memory first, so the image stays
        # usable after the HTTP response is released below.
        return Image.open(BytesIO(response.read()))
    except Exception as e:
        logger.warning(f" | 获取图片出现异常 ######: {e}")
        return None
    finally:
        _release_response(response)


def oss_upload_image(oss_client, bucket, object_name, image_bytes, content_type='image/png'):
    """Upload in-memory image bytes.

    Args:
        oss_client: client exposing ``put_object``.
        bucket: bucket name.
        object_name: object key to write.
        image_bytes: raw bytes to upload.
        content_type: MIME type stored with the object (defaults to PNG).

    Returns:
        Whatever ``put_object`` returns, or ``None`` if the upload failed
        (the failure is logged as a warning).
    """
    try:
        return oss_client.put_object(
            bucket_name=bucket,
            object_name=object_name,
            data=io.BytesIO(image_bytes),
            length=len(image_bytes),
            content_type=content_type,
        )
    except Exception as e:
        logger.warning(f" | 上传图片出现异常 ######: {e}")
        return None


def oss_upload_image_file(oss_client, bucket, object_name, file_path):
    """Upload a local file.

    Returns:
        Whatever ``fput_object`` returns, or ``None`` if the upload failed
        (the failure is logged as a warning).
    """
    try:
        return oss_client.fput_object(
            bucket_name=bucket,
            object_name=object_name,
            file_path=file_path,
        )
    except Exception as e:
        logger.warning(f" | 上传图片出现异常 ######: {e}")
        return None


def get_presigned_url(oss_client, bucket, object_name):
    """Return a presigned GET URL for the object, or ``None`` on failure."""
    try:
        return oss_client.get_presigned_url(
            "GET",
            bucket_name=bucket,
            object_name=object_name,
        )
    except Exception as e:
        # Logged (not printed) for consistency with the other helpers.
        logger.warning(f"get_presigned_url exception :{e}")
        return None


def is_minio_file_exist(oss_client: Minio, bucket_name: str, object_name: str) -> bool:
    """Report whether ``stat_object`` succeeds for the given bucket/object.

    Returns ``False`` on any error, as before; errors that are not a plain
    "not found" are additionally logged so outages are not mistaken for
    missing files.
    """
    try:
        oss_client.stat_object(bucket_name, object_name)
    except S3Error as err:
        if getattr(err, "code", None) not in _NOT_FOUND_CODES:
            logger.warning(f"is_minio_file_exist S3 error: {err}")
        return False
    except Exception as err:
        logger.warning(f"is_minio_file_exist exception: {err}")
        return False
    return True


def load_minio_file_to_state(oss_client, bucket: str, object_name: str,
                             display_filename: Optional[str] = None):
    """Download an object and return it as ``{filename: bytes}``.

    Args:
        oss_client: client exposing ``get_object``.
        bucket: bucket name.
        object_name: object key inside the bucket.
        display_filename: optional name to use as the dict key instead of the
            last path segment of ``object_name`` (lets callers avoid exposing
            the real object name).

    Returns:
        A single-entry dict mapping the chosen filename to the object's bytes,
        in the shape the caller stores under ``state["files"]``.

    Raises:
        ValueError: if the client raises ``S3Error``.
    """
    response = None
    try:
        response = oss_client.get_object(
            bucket_name=bucket,
            object_name=object_name,
        )
        data_bytes = response.read()
    except S3Error as err:
        raise ValueError(f"MinIO 下載失敗: {err}") from err
    finally:
        # Always hand the connection back, even when read() fails.
        _release_response(response)

    filename = display_filename or object_name.split("/")[-1]
    return {filename: data_bytes}


def _check_result(state: bool, message: str, data: str = "") -> dict[str, Any]:
    """Build the uniform result dict returned by check_and_extract_minio_image."""
    return {"state": state, "message": message, "data": data}


def check_and_extract_minio_image(url: str) -> dict[str, Any]:
    """Validate a MinIO image URL and extract its object path.

    Presigned URLs are supported: the query string is ignored.

    Returns:
        ``{"state": bool, "message": str, "data": str}`` where ``data`` is the
        URL path without its leading slashes on success and ``""`` otherwise.
    """
    # 1. Reject empty or non-string input.
    if not url or not isinstance(url, str):
        return _check_result(False, "URL cannot be empty or invalid format")

    # 2. Parse the URL; both a scheme and a network location are required.
    try:
        parsed = urlparse(url)
    except ValueError:
        return _check_result(False, "Failed to parse URL")
    if not (parsed.scheme and parsed.netloc):
        return _check_result(False, "Invalid URL format")

    # 3. Only the known MinIO hosts are accepted.
    # NOTE(review): the comparison uses the raw netloc, so a URL carrying an
    # explicit port or upper-case host is rejected - confirm this is intended.
    if parsed.netloc not in _ALLOWED_MINIO_DOMAINS:
        return _check_result(False, f"Invalid domain: {parsed.netloc}")

    # 4. Take the path only (query parameters of presigned URLs are ignored).
    file_path = parsed.path.strip()
    if not file_path:
        return _check_result(False, "No file path found in URL")

    # 5. The path must end with a known image extension.
    if not file_path.lower().endswith(_IMAGE_EXTENSIONS):
        return _check_result(False, "Not a valid image file")

    # 6. Strip the leading slash(es) to get the final path.
    return _check_result(True, "Success, path extracted", file_path.lstrip("/"))


if __name__ == '__main__':
    urls = [
        "fida-public-bucket/furniture/sketches/0193c9b2-d8dd-40fc-b715-3ce0daab7abf.png-0.png",
        "fida-public-bucket/furniture/sketches/bab54cdf-0a60-4806-8c6b-17b836aec1eb.png-1.png",
        "fida-public-bucket/furniture/sketches/6c993266-95d2-42ee-826b-933b0e344b81.png-2.png",
    ]
    for url in urls:
        # First path segment is the bucket, the remainder is the object key.
        bucket, _, object_name = url.partition('/')
        img = oss_get_image(oss_client=minio_client, bucket=bucket, object_name=object_name)
        if img is None:
            # oss_get_image already logged the cause; skip instead of crashing.
            logger.warning(f"skipping {url}: image could not be loaded")
            continue
        img.show()
        # img.save("result.png")
    # get_presigned_url(oss_client=minio_client, bucket="fida-test", object_name="furniture/sketches/07bf4cfe-4502-4821-b78f-7727bf409498.png")