Files
AiDA_Python/app/service/design_fast/pipeline/split.py

135 lines
9.3 KiB
Python
Raw Normal View History

2024-09-19 14:20:56 +08:00
import io
import logging
import cv2
import numpy as np
from PIL import Image
from celery.bin.result import result
2024-09-19 14:20:56 +08:00
2024-09-26 06:09:05 +00:00
from app.service.design_fast.utils.conversion_image import rgb_to_rgba
2024-11-19 10:14:52 +08:00
from app.service.design_fast.utils.transparent import sketch_to_transparent
2024-09-25 11:40:11 +08:00
from app.service.design_fast.utils.upload_image import upload_png_mask
2024-09-19 14:20:56 +08:00
from app.service.utils.generate_uuid import generate_uuid
2024-11-19 10:14:52 +08:00
from app.service.utils.new_oss_client import oss_upload_image, oss_get_image
2024-09-19 14:20:56 +08:00
class Split(object):
def __init__(self, minio_client):
self.minio_client = minio_client
def __call__(self, result):
try:
if result['name'] in ('outwear', 'dress', 'blouse', 'skirt', 'trousers', 'tops', 'bottoms', 'others'):
if result.get('design_type', None) == 'merge':
# merge 不需要返回mask (红绿图)
if result['resize_scale'][0] == 1.0 and result['resize_scale'][1] == 1.0:
front_mask = result['front_mask']
back_mask = result['back_mask']
else:
height, width = result['front_mask'].shape[:2]
new_width = int(width * result['resize_scale'][0])
new_height = int(height * result['resize_scale'][1])
front_mask = cv2.resize(result['front_mask'], (new_width, new_height), interpolation=cv2.INTER_AREA)
back_mask = cv2.resize(result['back_mask'], (new_width, new_height), interpolation=cv2.INTER_AREA)
result['merge_image'] = cv2.resize(result['merge_image'], (new_width, new_height), interpolation=cv2.INTER_AREA)
rgba_image = rgb_to_rgba(result['merge_image'], front_mask + back_mask)
new_size = (int(rgba_image.shape[1] * result["scale"]), int(rgba_image.shape[0] * result["scale"]))
rgba_image = cv2.resize(rgba_image, new_size, interpolation=cv2.INTER_AREA)
result_front_image = np.zeros_like(rgba_image)
front_mask = cv2.resize(front_mask, new_size, interpolation=cv2.INTER_AREA)
result_front_image[front_mask != 0] = rgba_image[front_mask != 0]
result_front_image_pil = Image.fromarray(cv2.cvtColor(result_front_image, cv2.COLOR_BGR2RGBA))
result['front_image'], result["front_image_url"], _ = upload_png_mask(self.minio_client, result_front_image_pil, f'{generate_uuid()}', mask=None)
result_back_image = np.zeros_like(rgba_image)
back_mask = cv2.resize(back_mask, new_size, interpolation=cv2.INTER_AREA)
result_back_image[back_mask != 0] = rgba_image[back_mask != 0]
result_back_image_pil = Image.fromarray(cv2.cvtColor(result_back_image, cv2.COLOR_BGR2RGBA))
result['back_image'], result["back_image_url"], _ = upload_png_mask(self.minio_client, result_back_image_pil, f'{generate_uuid()}', mask=None)
return result
else:
ori_front_mask = result['front_mask'].copy()
ori_back_mask = result['back_mask'].copy()
if result['resize_scale'][0] == 1.0 and result['resize_scale'][1] == 1.0:
front_mask = result['front_mask']
back_mask = result['back_mask']
2024-11-19 10:14:52 +08:00
else:
height, width = result['front_mask'].shape[:2]
new_width = int(width * result['resize_scale'][0])
new_height = int(height * result['resize_scale'][1])
front_mask = cv2.resize(result['front_mask'], (new_width, new_height), interpolation=cv2.INTER_AREA)
back_mask = cv2.resize(result['back_mask'], (new_width, new_height), interpolation=cv2.INTER_AREA)
rgba_image = rgb_to_rgba(result['final_image'], front_mask + back_mask)
new_size = (int(rgba_image.shape[1] * result["scale"]), int(rgba_image.shape[0] * result["scale"]))
rgba_image = cv2.resize(rgba_image, new_size, interpolation=cv2.INTER_AREA)
result_front_image = np.zeros_like(rgba_image)
front_mask = cv2.resize(front_mask, new_size, interpolation=cv2.INTER_AREA)
result_front_image[front_mask != 0] = rgba_image[front_mask != 0]
result_front_image_pil = Image.fromarray(cv2.cvtColor(result_front_image, cv2.COLOR_BGR2RGBA))
if 'transparent' in result.keys():
# 用户自选区域transparent
transparent = result['transparent']
if transparent['mask_url'] is not None and transparent['mask_url'] != "":
# 预处理用户自选区mask
seg_mask = oss_get_image(oss_client=self.minio_client, bucket=transparent['mask_url'].split('/')[0], object_name=transparent['mask_url'][transparent['mask_url'].find('/') + 1:], data_type="cv2")
seg_mask = cv2.resize(seg_mask, new_size, interpolation=cv2.INTER_AREA)
# 转换颜色空间为 RGBOpenCV 默认是 BGR
image_rgb = cv2.cvtColor(seg_mask, cv2.COLOR_BGR2RGB)
r, g, b = cv2.split(image_rgb)
blue_mask = b > r
# 创建红色和绿色掩码
transparent_mask = np.array(blue_mask, dtype=np.uint8) * 255
result_front_image_pil = sketch_to_transparent(result_front_image_pil, transparent_mask, transparent["scale"])
else:
result_front_image_pil = sketch_to_transparent(result_front_image_pil, front_mask, transparent["scale"])
result['front_image'], result["front_image_url"], _ = upload_png_mask(self.minio_client, result_front_image_pil, f'{generate_uuid()}', mask=None)
height, width = ori_front_mask.shape
mask_image = np.zeros((height, width, 3))
mask_image[ori_front_mask != 0] = [0, 0, 255]
result_back_image = np.zeros_like(rgba_image)
back_mask = cv2.resize(back_mask, new_size, interpolation=cv2.INTER_AREA)
result_back_image[back_mask != 0] = rgba_image[back_mask != 0]
result_back_image_pil = Image.fromarray(cv2.cvtColor(result_back_image, cv2.COLOR_BGR2RGBA))
result['back_image'], result["back_image_url"], _ = upload_png_mask(self.minio_client, result_back_image_pil, f'{generate_uuid()}', mask=None)
# mask_image[back_mask != 0] = [0, 255, 0]
mask_image[ori_back_mask != 0] = [0, 255, 0]
rbga_mask = rgb_to_rgba(mask_image, ori_front_mask + ori_back_mask)
mask_pil = Image.fromarray(cv2.cvtColor(rbga_mask.astype(np.uint8), cv2.COLOR_BGR2RGBA))
image_data = io.BytesIO()
mask_pil.save(image_data, format='PNG')
image_data.seek(0)
image_bytes = image_data.read()
req = oss_upload_image(oss_client=self.minio_client, bucket="aida-clothing", object_name=f"mask/mask_{generate_uuid()}.png", image_bytes=image_bytes)
result['mask_url'] = req.bucket_name + "/" + req.object_name
# 创建中间图层(未分割图层) 1.color + overall_print 2.color + overall_print + print
result_pattern_overall_image_pil = Image.fromarray(cv2.cvtColor(rgb_to_rgba(result['no_seg_sketch_overall'], ori_front_mask + ori_back_mask), cv2.COLOR_BGR2RGBA))
result['pattern_overall_image'], result['pattern_overall_image_url'], _ = upload_png_mask(self.minio_client, result_pattern_overall_image_pil, f'{generate_uuid()}')
result_pattern_print_image_pil = Image.fromarray(cv2.cvtColor(rgb_to_rgba(result['no_seg_sketch_print'], ori_front_mask + ori_back_mask), cv2.COLOR_BGR2RGBA))
result['pattern_print_image'], result['pattern_print_image_url'], _ = upload_png_mask(self.minio_client, result_pattern_print_image_pil, f'{generate_uuid()}')
return result
else:
ori_front_mask, ori_back_mask = None, None
# 创建中间图层(未分割图层) 1.color + overall_print 2.color + overall_print + print
result_pattern_overall_image_pil = Image.fromarray(cv2.cvtColor(rgb_to_rgba(result['no_seg_sketch_overall'], ori_front_mask + ori_back_mask), cv2.COLOR_BGR2RGBA))
result['pattern_overall_image'], result['pattern_overall_image_url'], _ = upload_png_mask(self.minio_client, result_pattern_overall_image_pil, f'{generate_uuid()}')
result_pattern_print_image_pil = Image.fromarray(cv2.cvtColor(rgb_to_rgba(result['no_seg_sketch_print'], ori_front_mask + ori_back_mask), cv2.COLOR_BGR2RGBA))
result['pattern_print_image'], result['pattern_print_image_url'], _ = upload_png_mask(self.minio_client, result_pattern_print_image_pil, f'{generate_uuid()}')
2024-09-19 14:20:56 +08:00
return result
2024-09-19 14:20:56 +08:00
except Exception as e:
logging.warning(f"split runtime exception : {e} image_id : {result['image_id']}")