#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Project :trinity_client
@File    :synthesis_item.py
@Author  :周成融
@Date    :2023/8/26 14:13:04
@detail  :Composite garment/body layers into one RGBA image and upload it.
"""
import io
import logging

import cv2
import numpy as np
from PIL import Image
from minio import Minio

from app.core.config import settings
from app.service.utils.generate_uuid import generate_uuid
from app.service.utils.new_oss_client import oss_upload_image

minio_client = Minio(settings.MINIO_URL, access_key=settings.MINIO_ACCESS, secret_key=settings.MINIO_SECRET,
                     secure=settings.MINIO_SECURE)

# Bucket every composited result is uploaded to.
_RESULT_BUCKET = "aida-results"

# Layer names that provide the body silhouette used as the base clipping mask.
_BODY_LAYER_NAMES = ("body", "mannequin")
# Layer names whose masks extend the clipping mask, grouped as in the
# original branch order. "dress_front" is in both groups on purpose: it is
# treated as a top while tops are still accepted, otherwise as a bottom.
_TOP_LAYER_NAMES = ("blouse_front", "outwear_front", "dress_front", "tops_front")
_BOTTOM_LAYER_NAMES = ("trousers_front", "skirt_front", "bottoms_front", "dress_front")
_OTHER_LAYER_NAMES = ("others_front",)


def positioning(all_mask_shape, mask_shape, offset):
    """Compute the 1-D overlap between a canvas axis and a mask placed on it.

    The mask of length ``mask_shape`` is placed at ``offset`` on a canvas axis
    of length ``all_mask_shape``. The result tells which slice of the canvas
    receives which slice of the mask.

    Args:
        all_mask_shape: Length of the canvas along this axis.
        mask_shape: Length of the mask along this axis.
        offset: Position of the mask's first element on the canvas axis;
            may be negative (mask starts before the canvas).

    Returns:
        ``(all_start, all_end, mask_start, mask_end)`` such that
        ``canvas[all_start:all_end]`` and ``mask[mask_start:mask_end]`` have
        the same length. Both slices are empty when there is no overlap.
    """
    if offset >= 0:
        all_start = min(offset, all_mask_shape)
        all_end = min(offset + mask_shape, all_mask_shape)
        # Mask starts inside (or past) the canvas, so it is read from index 0.
        mask_end = 0 if offset > all_mask_shape else min(all_mask_shape - offset, mask_shape)
        return all_start, all_end, 0, mask_end

    shift = -offset
    if shift > mask_shape:
        # The whole mask lies before the canvas: nothing overlaps.
        return 0, 0, mask_shape, mask_shape
    # The first `shift` mask elements fall before the canvas; the remainder is
    # clipped to the canvas length.
    visible = min(mask_shape - shift, all_mask_shape)
    return 0, visible, shift, shift + visible


def _upload_png(image):
    """Encode ``image`` as PNG, upload it to the result bucket, return ``bucket/object``."""
    buffer = io.BytesIO()
    image.save(buffer, format='PNG')
    object_name = f'result_{generate_uuid()}.png'
    oss_upload_image(oss_client=minio_client, bucket=_RESULT_BUCKET, object_name=object_name,
                     image_bytes=buffer.getvalue())
    return f"{_RESULT_BUCKET}/{object_name}"


def _build_body_mask(layer, size, body_points):
    """Return the body layer's alpha channel on a ``size`` canvas, with the area above the shoulders filled."""
    # Read the position by index and build a fresh tuple for paste().
    y_offset, x_offset = layer['adaptive_position'][0], layer['adaptive_position'][1]
    # Paste the body onto a transparent canvas of the final size so its alpha
    # channel is already in canvas coordinates.
    canvas = Image.new("RGBA", size, (0, 0, 0, 0))
    canvas.paste(layer['image'], (x_offset, y_offset), layer['image'])
    body_mask = np.array(canvas.getchannel('A'))
    # Shoulder points are given relative to the body image; shift them into
    # canvas coordinates. Points are indexed as (x, y).
    left_x = body_points['shoulder_left'][0] + x_offset
    left_y = body_points['shoulder_left'][1] + y_offset
    right_x = body_points['shoulder_right'][0] + x_offset
    right_y = body_points['shoulder_right'][1] + y_offset
    # Mark everything above the higher shoulder, between the two shoulders,
    # as part of the mask.
    body_mask[:min(left_y, right_y), left_x:right_x] = 255
    return body_mask


def _place_mask(template, layer):
    """Return an array shaped like ``template`` holding the layer's binarized mask at its adaptive position."""
    mask = layer['mask']
    y_offset, x_offset = layer['adaptive_position']
    all_y_start, all_y_end, mask_y_start, mask_y_end = positioning(
        all_mask_shape=template.shape[0], mask_shape=mask.shape[0], offset=y_offset)
    all_x_start, all_x_end, mask_x_start, mask_x_end = positioning(
        all_mask_shape=template.shape[1], mask_shape=mask.shape[1], offset=x_offset)
    _, sketch_mask = cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY)
    background = np.zeros_like(template)
    background[all_y_start:all_y_end, all_x_start:all_x_end] = \
        sketch_mask[mask_y_start:mask_y_end, mask_x_start:mask_x_end]
    return background


def synthesis(data, size, basic_info):
    """Composite all layers onto a transparent canvas and upload the result.

    A clipping mask is built from the body/mannequin silhouette plus the masks
    of the front garment layers; every non-"body" layer is then pasted onto the
    canvas, clipped to the union of that mask and the layer's own alpha.

    Args:
        data: Sequence of layer dicts. The code reads ``name``, ``image``
            (RGBA PIL image or None), ``adaptive_position`` as ``(y, x)``,
            ``mask`` (2-D array, for garment front layers) and, through
            ``transpose_rotate``, optional ``transpose`` / ``rotate``.
        size: ``(width, height)`` of the output canvas.
        basic_info: Dict providing ``body_point_test`` with ``shoulder_left``
            and ``shoulder_right`` points as ``(x, y)``, and optionally
            ``out_of_bounds_control``.

    Returns:
        ``"<bucket>/<object_name>"`` of the uploaded PNG, or ``None`` when any
        step fails (the exception is logged as a warning, not raised).
    """
    # out_of_bounds_control: whether garments may extend past the body mask.
    # True (the default) lets every top layer contribute to the mask; False
    # only lets the top-most one (last in `data`) contribute.
    out_of_bounds_control = basic_info.get('out_of_bounds_control', True)
    # Transparent base canvas.
    base_image = Image.new('RGBA', size, (0, 0, 0, 0))
    try:
        body_mask = None
        for layer in data:
            if layer['name'] in _BODY_LAYER_NAMES:
                body_mask = _build_body_mask(layer, size, basic_info['body_point_test'])
        if body_mask is None:
            raise ValueError("no 'body' or 'mannequin' layer found in data")
        _, all_mask = cv2.threshold(body_mask, 127, 255, cv2.THRESH_BINARY)

        # Walk layers from last to first and OR the garment masks into the
        # clipping mask. bitwise_or is used instead of `+` so overlapping
        # 255-valued pixels cannot wrap around in uint8.
        accept_tops = True
        for layer in reversed(data):
            name = layer['name']
            if accept_tops and name in _TOP_LAYER_NAMES:
                accept_tops = bool(out_of_bounds_control)
            elif name not in _BOTTOM_LAYER_NAMES and name not in _OTHER_LAYER_NAMES:
                continue
            all_mask = cv2.bitwise_or(all_mask, _place_mask(all_mask, layer))

        mask_data = np.where(all_mask > 0, 255, 0).astype(np.uint8)
        for layer in data:
            image = layer['image']
            if image is None:
                continue
            if layer['name'] == "body":
                base_image.paste(image, (layer['adaptive_position'][1], layer['adaptive_position'][0]), image)
                continue
            layer_canvas = Image.new('RGBA', size, (0, 0, 0, 0))
            paste_img, position = transpose_rotate(layer, image)
            layer_canvas.paste(paste_img, position, paste_img)
            # Clipping alpha = shared mask, with the layer's own alpha pasted
            # on top at the layer position. Copy so the shared array is never
            # written through the PIL image.
            layer_alpha = paste_img.getchannel('A')
            mask_alpha = Image.fromarray(mask_data).copy()
            mask_alpha.paste(layer_alpha, position, layer_alpha)
            cropped_image = Image.composite(
                layer_canvas, Image.new("RGBA", layer_canvas.size, (255, 255, 255, 0)), mask_alpha)
            # layer_canvas is already canvas-sized, so it is pasted at (0, 0).
            base_image.paste(layer_canvas, (0, 0), cropped_image)

        return _upload_png(base_image)
    except Exception as e:
        logging.warning(f"synthesis runtime exception : {e}", exc_info=True)
        return None


def synthesis_single(front_image, back_image):
    """Merge a front and a back image and upload the result as PNG.

    When both images are given, ``back_image`` is pasted onto ``front_image``
    at (0, 0) using its own alpha; ``front_image`` is modified in place.
    When only one is given, that image is uploaded unchanged.

    Args:
        front_image: PIL image or a falsy value when absent.
        back_image: PIL image or a falsy value when absent.

    Returns:
        ``"<bucket>/<object_name>"`` of the uploaded PNG.

    Raises:
        ValueError: If neither image is provided.
    """
    if front_image:
        result_image = front_image
        if back_image:
            result_image.paste(back_image, (0, 0), back_image)
    elif back_image:
        # Previously this path crashed on `None.paste`; upload the back image alone.
        result_image = back_image
    else:
        raise ValueError("synthesis_single requires at least one of front_image or back_image")
    return _upload_png(result_image)


def update_base_size_priority(layers):
    """Compute the canvas size for ``layers`` and shift them to non-negative x.

    Each layer dict gets an ``adaptive_position`` entry ``(y, x - min_x)``
    where ``min_x`` is the smallest x among all layers; the dicts are updated
    in place.

    Args:
        layers: Sequence of layer dicts with ``position`` as ``(y, x)``,
            ``image`` (object with ``width`` / ``height``, or None) and
            ``name``.

    Returns:
        ``(layers, (new_width, new_height))``. The width spans from the
        left-most layer position to the right-most image edge; the height is
        the mannequin image height, or 700 when there is no mannequin image.
    """
    min_x = min((info['position'][1] for info in layers), default=0)
    new_height = 700
    right_edges = []
    for info in layers:
        image = info['image']
        if image is None:
            continue
        right_edges.append(info['position'][1] + image.width)
        if info['name'] == 'mannequin':
            new_height = image.height
    max_x = max(right_edges, default=min_x)
    # Width of the bounding span after shifting by min_x. Equal to
    # max_x + abs(min_x) whenever min_x <= 0, and no longer over-sized when
    # every layer has a positive x.
    new_width = max_x - min_x
    for info in layers:
        info['adaptive_position'] = (info['position'][0], info['position'][1] - min_x)
    return layers, (new_width, new_height)


def transpose_rotate(layer, image):
    """Apply the layer's flip/rotation to ``image`` and compute its paste position.

    Args:
        layer: Layer dict with ``adaptive_position`` as ``(y, x)`` and
            optional ``transpose`` (``[left_right, up_down]``) and ``rotate``
            (passed to ``Image.rotate``) entries.
        image: PIL image to transform.

    Returns:
        ``(image, (paste_x, paste_y))``. Without rotation the position is the
        layer's adaptive position. With rotation the image is expanded and
        the position is moved so the image's centre stays where the centre
        of the unrotated image was.
    """
    # transpose[0] controls the left-right flip, transpose[1] the up-down flip.
    # A flip is applied whenever the entry is not 1.
    # NOTE(review): with the default [0, 0] a layer without a 'transpose' key
    # is flipped on both axes — confirm callers always supply the key, or
    # whether the default should be [1, 1].
    transpose = layer.get('transpose', [0, 0])
    rotate = layer.get('rotate', 0)
    paste_y, paste_x = layer['adaptive_position'][0], layer['adaptive_position'][1]

    if transpose[0] != 1:
        image = image.transpose(Image.FLIP_LEFT_RIGHT)
    if transpose[1] != 1:
        image = image.transpose(Image.FLIP_TOP_BOTTOM)

    if rotate:
        old_w, old_h = image.size
        image = image.rotate(rotate, expand=True)
        # expand=True changes the image size; re-anchor on the original
        # centre instead of the former hard-coded (15, 36) / 288x288 values.
        new_w, new_h = image.size
        paste_x = paste_x + old_w // 2 - new_w // 2
        paste_y = paste_y + old_h // 2 - new_h // 2
    return image, (paste_x, paste_y)