All checks were successful
git commit AiDA python develop 分支构建部署 / scheduled_deploy (push) Has been skipped
132 lines
8.4 KiB
Python
132 lines
8.4 KiB
Python
import io
|
||
import logging
|
||
|
||
import cv2
|
||
import numpy as np
|
||
from PIL import Image
|
||
|
||
from app.service.design_fast.utils.conversion_image import rgb_to_rgba
|
||
from app.service.design_fast.utils.transparent import sketch_to_transparent
|
||
from app.service.design_fast.utils.upload_image import upload_png_mask
|
||
from app.service.utils.generate_uuid import generate_uuid
|
||
from app.service.utils.new_oss_client import oss_upload_image, oss_get_image
|
||
|
||
|
||
class Split(object):
|
||
def __init__(self, minio_client):
|
||
self.minio_client = minio_client
|
||
|
||
def __call__(self, result):
|
||
try:
|
||
if result['name'] in ('outwear', 'dress', 'blouse', 'skirt', 'trousers', 'tops', 'bottoms', 'others'):
|
||
ori_front_mask = result['front_mask'].copy()
|
||
ori_back_mask = result['back_mask'].copy()
|
||
|
||
if result['resize_scale'][0] == 1.0 and result['resize_scale'][1] == 1.0:
|
||
front_mask = result['front_mask']
|
||
back_mask = result['back_mask']
|
||
else:
|
||
height, width = result['front_mask'].shape[:2]
|
||
new_width = int(width * result['resize_scale'][0])
|
||
new_height = int(height * result['resize_scale'][1])
|
||
|
||
front_mask = cv2.resize(result['front_mask'], (new_width, new_height), interpolation=cv2.INTER_AREA)
|
||
back_mask = cv2.resize(result['back_mask'], (new_width, new_height), interpolation=cv2.INTER_AREA)
|
||
|
||
rgba_image = rgb_to_rgba(result['final_image'], front_mask + back_mask)
|
||
new_size = (int(rgba_image.shape[1] * result["scale"]), int(rgba_image.shape[0] * result["scale"]))
|
||
rgba_image = cv2.resize(rgba_image, new_size, interpolation=cv2.INTER_AREA)
|
||
result_front_image = np.zeros_like(rgba_image)
|
||
front_mask = cv2.resize(front_mask, new_size, interpolation=cv2.INTER_AREA)
|
||
result_front_image[front_mask != 0] = rgba_image[front_mask != 0]
|
||
result_front_image_pil = Image.fromarray(cv2.cvtColor(result_front_image, cv2.COLOR_BGR2RGBA))
|
||
if 'transparent' in result.keys():
|
||
# 用户自选区域transparent
|
||
transparent = result['transparent']
|
||
if transparent['mask_url'] is not None and transparent['mask_url'] != "":
|
||
# 预处理用户自选区mask
|
||
seg_mask = oss_get_image(oss_client=self.minio_client, bucket=transparent['mask_url'].split('/')[0], object_name=transparent['mask_url'][transparent['mask_url'].find('/') + 1:], data_type="cv2")
|
||
seg_mask = cv2.resize(seg_mask, new_size, interpolation=cv2.INTER_AREA)
|
||
# 转换颜色空间为 RGB(OpenCV 默认是 BGR)
|
||
image_rgb = cv2.cvtColor(seg_mask, cv2.COLOR_BGR2RGB)
|
||
|
||
r, g, b = cv2.split(image_rgb)
|
||
blue_mask = b > r
|
||
|
||
# 创建红色和绿色掩码
|
||
transparent_mask = np.array(blue_mask, dtype=np.uint8) * 255
|
||
result_front_image_pil = sketch_to_transparent(result_front_image_pil, transparent_mask, transparent["scale"])
|
||
else:
|
||
result_front_image_pil = sketch_to_transparent(result_front_image_pil, front_mask, transparent["scale"])
|
||
result['front_image'], result["front_image_url"], _ = upload_png_mask(self.minio_client, result_front_image_pil, f'{generate_uuid()}', mask=None)
|
||
|
||
# 前片部分 (红图部分)
|
||
# height, width = front_mask.shape
|
||
# mask_image = np.zeros((height, width, 3))
|
||
# mask_image[front_mask != 0] = [0, 0, 255]
|
||
|
||
# 切换为原始图片尺寸-------------------------------
|
||
height, width = ori_front_mask.shape
|
||
mask_image = np.zeros((height, width, 3))
|
||
mask_image[ori_front_mask != 0] = [0, 0, 255]
|
||
# -----------------------------------------------
|
||
|
||
# if result["name"] in ('blouse', 'dress', 'outwear', 'tops'):
|
||
# result_back_image = np.zeros_like(rgba_image)
|
||
# back_mask = cv2.resize(back_mask, new_size, interpolation=cv2.INTER_AREA)
|
||
# result_back_image[back_mask != 0] = rgba_image[back_mask != 0]
|
||
# result_back_image_pil = Image.fromarray(cvtColor(result_back_image, COLOR_BGR2RGBA))
|
||
# result['back_image'], result["back_image_url"], _ = upload_png_mask(self.minio_client, result_back_image_pil, f'{generate_uuid()}', mask=None)
|
||
# mask_image[back_mask != 0] = [0, 255, 0]
|
||
#
|
||
# rbga_mask = rgb_to_rgba(mask_image, front_mask + back_mask)
|
||
# mask_pil = Image.fromarray(cvtColor(rbga_mask.astype(np.uint8), COLOR_BGR2RGBA))
|
||
# image_data = io.BytesIO()
|
||
# mask_pil.save(image_data, format='PNG')
|
||
# image_data.seek(0)
|
||
# image_bytes = image_data.read()
|
||
# req = oss_upload_image(oss_client=self.minio_client, bucket=AIDA_CLOTHING, object_name=f"mask/mask_{generate_uuid()}.png", image_bytes=image_bytes)
|
||
# result['mask_url'] = req.bucket_name + "/" + req.object_name
|
||
# else:
|
||
# rbga_mask = rgb_to_rgba(mask_image, front_mask)
|
||
# mask_pil = Image.fromarray(cvtColor(rbga_mask.astype(np.uint8), COLOR_BGR2RGBA))
|
||
# image_data = io.BytesIO()
|
||
# mask_pil.save(image_data, format='PNG')
|
||
# image_data.seek(0)
|
||
# image_bytes = image_data.read()
|
||
# req = oss_upload_image(oss_client=self.minio_client, bucket=AIDA_CLOTHING, object_name=f"mask/mask_{generate_uuid()}.png", image_bytes=image_bytes)
|
||
# result['mask_url'] = req.bucket_name + "/" + req.object_name
|
||
# result['back_image'] = None
|
||
# result["back_image_url"] = None
|
||
# # result["back_mask_url"] = None
|
||
# # result['back_mask_image'] = None
|
||
|
||
result_back_image = np.zeros_like(rgba_image)
|
||
back_mask = cv2.resize(back_mask, new_size, interpolation=cv2.INTER_AREA)
|
||
result_back_image[back_mask != 0] = rgba_image[back_mask != 0]
|
||
result_back_image_pil = Image.fromarray(cv2.cvtColor(result_back_image, cv2.COLOR_BGR2RGBA))
|
||
result['back_image'], result["back_image_url"], _ = upload_png_mask(self.minio_client, result_back_image_pil, f'{generate_uuid()}', mask=None)
|
||
|
||
# mask_image[back_mask != 0] = [0, 255, 0]
|
||
mask_image[ori_back_mask != 0] = [0, 255, 0]
|
||
|
||
rbga_mask = rgb_to_rgba(mask_image, ori_front_mask + ori_back_mask)
|
||
mask_pil = Image.fromarray(cv2.cvtColor(rbga_mask.astype(np.uint8), cv2.COLOR_BGR2RGBA))
|
||
image_data = io.BytesIO()
|
||
mask_pil.save(image_data, format='PNG')
|
||
image_data.seek(0)
|
||
image_bytes = image_data.read()
|
||
req = oss_upload_image(oss_client=self.minio_client, bucket="aida-clothing", object_name=f"mask/mask_{generate_uuid()}.png", image_bytes=image_bytes)
|
||
result['mask_url'] = req.bucket_name + "/" + req.object_name
|
||
else:
|
||
ori_front_mask, ori_back_mask = None, None
|
||
# 创建中间图层(未分割图层) 1.color + overall_print 2.color + overall_print + print
|
||
result_pattern_overall_image_pil = Image.fromarray(cv2.cvtColor(rgb_to_rgba(result['no_seg_sketch_overall'], ori_front_mask + ori_back_mask), cv2.COLOR_BGR2RGBA))
|
||
result['pattern_overall_image'], result['pattern_overall_image_url'], _ = upload_png_mask(self.minio_client, result_pattern_overall_image_pil, f'{generate_uuid()}')
|
||
|
||
result_pattern_print_image_pil = Image.fromarray(cv2.cvtColor(rgb_to_rgba(result['no_seg_sketch_print'], ori_front_mask + ori_back_mask), cv2.COLOR_BGR2RGBA))
|
||
result['pattern_print_image'], result['pattern_print_image_url'], _ = upload_png_mask(self.minio_client, result_pattern_print_image_pil, f'{generate_uuid()}')
|
||
return result
|
||
except Exception as e:
|
||
logging.warning(f"split runtime exception : {e} image_id : {result['image_id']}")
|