import io
import logging

import cv2
import numpy as np
from PIL import Image
from cv2 import cvtColor, COLOR_BGR2RGBA

from app.core.config import AIDA_CLOTHING
from app.service.design_fast.utils.conversion_image import rgb_to_rgba
from app.service.design_fast.utils.transparent import sketch_to_transparent
from app.service.design_fast.utils.upload_image import upload_png_mask
from app.service.utils.generate_uuid import generate_uuid
from app.service.utils.new_oss_client import oss_upload_image, oss_get_image


class Split(object):
    """Pipeline step that splits a rendered garment into uploadable layers.

    For the supported garment names the step reads these keys of ``result``:
    ``name``, ``front_mask``, ``back_mask``, ``resize_scale``, ``final_image``,
    ``scale``, ``no_seg_sketch``, ``mask`` and, optionally, ``transparent``
    (a mapping with ``mask_url`` and ``scale``). It writes ``front_image``,
    ``front_image_url``, ``back_image``, ``back_image_url``, ``mask_url``,
    ``pattern_image`` and ``pattern_image_url``.
    """

    # Garment names for which the layers are produced.
    _SPLIT_NAMES = ('outwear', 'dress', 'blouse', 'skirt', 'trousers',
                    'tops', 'bottoms', 'accessories')

    def __init__(self, minio_client):
        """Store the object-storage client used for every download/upload."""
        self.minio_client = minio_client

    def __call__(self, result):
        """Split ``result`` into layers and upload them.

        Args:
            result: mutable mapping describing one garment (see class docs).

        Returns:
            The same ``result`` object, updated in place, or ``None`` when any
            exception was raised (the failure is logged, not re-raised).
        """
        try:
            if result['name'] in self._SPLIT_NAMES:
                self._split_layers(result)
            # NOTE(review): the original indentation of this return was lost;
            # it is assumed to sit at `try` level so unsupported names pass
            # through unchanged - confirm against the caller.
            return result
        except Exception as e:
            # Look the id up defensively: a missing 'image_id' must not raise
            # from inside the handler and hide the original error.
            try:
                image_id = result['image_id']
            except Exception:
                image_id = None
            logging.warning(f"split runtime exception : {e} image_id : {image_id}", exc_info=True)

    def _split_layers(self, result):
        """Build and upload the front, back, mask and pattern layers."""
        # Untouched copies: the combined mask is exported at the original size.
        ori_front_mask = result['front_mask'].copy()
        ori_back_mask = result['back_mask'].copy()

        scale_x = result['resize_scale'][0]
        scale_y = result['resize_scale'][1]
        if scale_x == 1.0 and scale_y == 1.0:
            front_mask = result['front_mask']
            back_mask = result['back_mask']
        else:
            height, width = result['front_mask'].shape[:2]
            resized = (int(width * scale_x), int(height * scale_y))
            # NOTE(review): default interpolation is used on the masks here;
            # interpolated edge values still count as "inside" for `!= 0`.
            front_mask = cv2.resize(result['front_mask'], resized)
            back_mask = cv2.resize(result['back_mask'], resized)

        # NOTE(review): if the masks are uint8, `+` wraps on overflow where
        # front and back overlap - confirm the masks never sum to 256.
        rgba_image = rgb_to_rgba(result['final_image'], front_mask + back_mask)
        new_size = (int(rgba_image.shape[1] * result["scale"]),
                    int(rgba_image.shape[0] * result["scale"]))
        rgba_image = cv2.resize(rgba_image, new_size)

        # Front layer.
        front_mask = cv2.resize(front_mask, new_size)
        front_layer_pil = self._cut_layer(rgba_image, front_mask)
        if 'transparent' in result:
            transparent = result['transparent']
            mask_url = transparent['mask_url']
            if mask_url is not None and mask_url != "":
                # A user-selected region was supplied: use it as the mask.
                region_mask = self._user_transparent_mask(mask_url, new_size)
            else:
                # No user region: apply the transparency to the whole front.
                region_mask = front_mask
            front_layer_pil = sketch_to_transparent(front_layer_pil, region_mask, transparent["scale"])
        result['front_image'], result["front_image_url"], _ = upload_png_mask(
            self.minio_client, front_layer_pil, f'{generate_uuid()}', mask=None)

        # Colour-coded mask at the ORIGINAL image size; the front piece is
        # [0, 0, 255] (red, given the BGR -> RGBA conversion applied on export).
        height, width = ori_front_mask.shape
        mask_image = np.zeros((height, width, 3))
        mask_image[ori_front_mask != 0] = [0, 0, 255]

        # Back layer.
        back_mask = cv2.resize(back_mask, new_size)
        back_layer_pil = self._cut_layer(rgba_image, back_mask)
        result['back_image'], result["back_image_url"], _ = upload_png_mask(
            self.minio_client, back_layer_pil, f'{generate_uuid()}', mask=None)

        # The back piece is [0, 255, 0] (green) in the colour-coded mask.
        mask_image[ori_back_mask != 0] = [0, 255, 0]
        rgba_mask = rgb_to_rgba(mask_image, ori_front_mask + ori_back_mask)
        result['mask_url'] = self._upload_mask_png(rgba_mask)

        # Intermediate (pattern) layer.
        pattern_rgba = rgb_to_rgba(result['no_seg_sketch'], result['mask'])
        pattern_pil = Image.fromarray(cvtColor(pattern_rgba, COLOR_BGR2RGBA))
        result['pattern_image'], result['pattern_image_url'], _ = upload_png_mask(
            self.minio_client, pattern_pil, f'{generate_uuid()}')

    @staticmethod
    def _cut_layer(rgba_image, mask):
        """Return a PIL image keeping only pixels where ``mask`` is non-zero."""
        layer = np.zeros_like(rgba_image)
        selected = mask != 0
        layer[selected] = rgba_image[selected]
        return Image.fromarray(cvtColor(layer, COLOR_BGR2RGBA))

    def _user_transparent_mask(self, mask_url, new_size):
        """Download the user-selected region and return it as a 0/255 mask."""
        # `mask_url` is "<bucket>/<object name>"; split on the first slash.
        seg_mask = oss_get_image(oss_client=self.minio_client,
                                 bucket=mask_url.split('/')[0],
                                 object_name=mask_url[mask_url.find('/') + 1:],
                                 data_type="cv2")
        # Nearest-neighbour keeps the colour labels from being blended.
        seg_mask = cv2.resize(seg_mask, new_size, interpolation=cv2.INTER_NEAREST)
        # OpenCV loads BGR by default; convert to RGB before splitting.
        image_rgb = cv2.cvtColor(seg_mask, cv2.COLOR_BGR2RGB)
        r, _, b = cv2.split(image_rgb)
        # A pixel is selected when its blue channel exceeds its red channel.
        blue_mask = b > r
        return np.array(blue_mask, dtype=np.uint8) * 255

    def _upload_mask_png(self, rgba_mask):
        """Encode ``rgba_mask`` as PNG, upload it, return "<bucket>/<object>"."""
        mask_pil = Image.fromarray(cvtColor(rgba_mask.astype(np.uint8), COLOR_BGR2RGBA))
        with io.BytesIO() as buffer:
            mask_pil.save(buffer, format='PNG')
            image_bytes = buffer.getvalue()
        req = oss_upload_image(oss_client=self.minio_client, bucket=AIDA_CLOTHING,
                               object_name=f"mask/mask_{generate_uuid()}.png",
                               image_bytes=image_bytes)
        return req.bucket_name + "/" + req.object_name