2024-09-19 14:20:56 +08:00
|
|
|
|
#!/usr/bin/env python
|
|
|
|
|
|
# -*- coding: UTF-8 -*-
|
|
|
|
|
|
"""
|
|
|
|
|
|
@Project :trinity_client
|
|
|
|
|
|
@File :synthesis_item.py
|
|
|
|
|
|
@Author :周成融
|
|
|
|
|
|
@Date :2023/8/26 14:13:04
|
|
|
|
|
|
@detail :
|
|
|
|
|
|
"""
|
|
|
|
|
|
import io
|
|
|
|
|
|
import logging
|
|
|
|
|
|
|
|
|
|
|
|
import cv2
|
|
|
|
|
|
import numpy as np
|
|
|
|
|
|
from PIL import Image
|
2025-12-30 16:49:08 +08:00
|
|
|
|
from minio import Minio
|
|
|
|
|
|
from app.core.config import settings
|
2024-09-19 14:20:56 +08:00
|
|
|
|
from app.service.utils.generate_uuid import generate_uuid
|
2025-12-30 16:49:08 +08:00
|
|
|
|
from app.service.utils.new_oss_client import oss_upload_image
|
|
|
|
|
|
|
|
|
|
|
|
minio_client = Minio(settings.MINIO_URL, access_key=settings.MINIO_ACCESS, secret_key=settings.MINIO_SECRET, secure=settings.MINIO_SECURE)
|
2024-09-19 14:20:56 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def positioning(all_mask_shape, mask_shape, offset):
|
|
|
|
|
|
all_start = 0
|
|
|
|
|
|
all_end = 0
|
|
|
|
|
|
mask_start = 0
|
|
|
|
|
|
mask_end = 0
|
|
|
|
|
|
if offset == 0:
|
|
|
|
|
|
all_start = 0
|
|
|
|
|
|
all_end = min(all_mask_shape, mask_shape)
|
|
|
|
|
|
|
|
|
|
|
|
mask_start = 0
|
|
|
|
|
|
mask_end = min(all_mask_shape, mask_shape)
|
|
|
|
|
|
elif offset > 0:
|
|
|
|
|
|
all_start = min(offset, all_mask_shape)
|
|
|
|
|
|
all_end = min(offset + mask_shape, all_mask_shape)
|
|
|
|
|
|
|
|
|
|
|
|
mask_start = 0
|
|
|
|
|
|
mask_end = 0 if offset > all_mask_shape else min(all_mask_shape - offset, mask_shape)
|
|
|
|
|
|
elif offset < 0:
|
|
|
|
|
|
if abs(offset) > mask_shape:
|
|
|
|
|
|
all_start = 0
|
|
|
|
|
|
all_end = 0
|
|
|
|
|
|
else:
|
|
|
|
|
|
all_start = 0
|
|
|
|
|
|
if mask_shape - abs(offset) > all_mask_shape:
|
|
|
|
|
|
all_end = min(mask_shape - abs(offset), all_mask_shape)
|
|
|
|
|
|
else:
|
|
|
|
|
|
all_end = mask_shape - abs(offset)
|
|
|
|
|
|
|
|
|
|
|
|
if abs(offset) > mask_shape:
|
|
|
|
|
|
mask_start = mask_shape
|
|
|
|
|
|
mask_end = mask_shape
|
|
|
|
|
|
else:
|
|
|
|
|
|
mask_start = abs(offset)
|
|
|
|
|
|
if mask_shape - abs(offset) >= all_mask_shape:
|
|
|
|
|
|
mask_end = all_mask_shape + abs(offset)
|
|
|
|
|
|
else:
|
|
|
|
|
|
mask_end = mask_shape
|
|
|
|
|
|
return all_start, all_end, mask_start, mask_end
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# @RunTime
|
|
|
|
|
|
def synthesis(data, size, basic_info):
|
2025-11-17 16:14:22 +08:00
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
|
data:
|
|
|
|
|
|
size:
|
|
|
|
|
|
basic_info:
|
|
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
# out_of_bounds_control: 是否允许服装越界 True 允许 False 不允许 默认情况允许
|
|
|
|
|
|
out_of_bounds_control = basic_info.get('out_of_bounds_control', True)
|
2024-09-19 14:20:56 +08:00
|
|
|
|
# 创建底图
|
|
|
|
|
|
base_image = Image.new('RGBA', size, (0, 0, 0, 0))
|
|
|
|
|
|
try:
|
|
|
|
|
|
all_mask_shape = (size[1], size[0])
|
|
|
|
|
|
body_mask = None
|
|
|
|
|
|
for d in data:
|
|
|
|
|
|
if d['name'] == 'body' or d['name'] == 'mannequin':
|
|
|
|
|
|
# 创建一个新的宽高透明图像, 把模特贴上去获取mask
|
|
|
|
|
|
transparent_image = Image.new("RGBA", size, (0, 0, 0, 0))
|
|
|
|
|
|
transparent_image.paste(d['image'], (d['adaptive_position'][1], d['adaptive_position'][0]), d['image']) # 此处可变数组会被paste篡改值,所以使用下标获取position
|
|
|
|
|
|
body_mask = np.array(transparent_image.split()[3])
|
|
|
|
|
|
|
|
|
|
|
|
# 根据新的坐标获取新的肩点
|
|
|
|
|
|
left_shoulder = [x + y for x, y in zip(basic_info['body_point_test']['shoulder_left'], [d['adaptive_position'][1], d['adaptive_position'][0]])]
|
|
|
|
|
|
right_shoulder = [x + y for x, y in zip(basic_info['body_point_test']['shoulder_right'], [d['adaptive_position'][1], d['adaptive_position'][0]])]
|
|
|
|
|
|
body_mask[:min(left_shoulder[1], right_shoulder[1]), left_shoulder[0]:right_shoulder[0]] = 255
|
|
|
|
|
|
_, binary_body_mask = cv2.threshold(body_mask, 127, 255, cv2.THRESH_BINARY)
|
|
|
|
|
|
top_outer_mask = np.array(binary_body_mask)
|
|
|
|
|
|
bottom_outer_mask = np.array(binary_body_mask)
|
2025-11-07 10:56:34 +08:00
|
|
|
|
others_outer_mask = np.array(binary_body_mask)
|
2024-09-19 14:20:56 +08:00
|
|
|
|
|
|
|
|
|
|
top = True
|
|
|
|
|
|
bottom = True
|
2025-11-07 10:56:34 +08:00
|
|
|
|
others = True
|
2024-09-19 14:20:56 +08:00
|
|
|
|
i = len(data)
|
|
|
|
|
|
while i:
|
|
|
|
|
|
i -= 1
|
|
|
|
|
|
if top and data[i]['name'] in ["blouse_front", "outwear_front", "dress_front", "tops_front"]:
|
2025-11-17 16:14:22 +08:00
|
|
|
|
if out_of_bounds_control:
|
|
|
|
|
|
top = True
|
|
|
|
|
|
else:
|
|
|
|
|
|
top = False
|
2024-09-19 14:20:56 +08:00
|
|
|
|
mask_shape = data[i]['mask'].shape
|
|
|
|
|
|
y_offset, x_offset = data[i]['adaptive_position']
|
|
|
|
|
|
# 初始化叠加区域的起始和结束位置
|
|
|
|
|
|
all_y_start, all_y_end, mask_y_start, mask_y_end = positioning(all_mask_shape=all_mask_shape[0], mask_shape=mask_shape[0], offset=y_offset)
|
|
|
|
|
|
all_x_start, all_x_end, mask_x_start, mask_x_end = positioning(all_mask_shape=all_mask_shape[1], mask_shape=mask_shape[1], offset=x_offset)
|
|
|
|
|
|
# 将叠加区域赋值为相应的像素值
|
|
|
|
|
|
_, sketch_mask = cv2.threshold(data[i]['mask'], 127, 255, cv2.THRESH_BINARY)
|
|
|
|
|
|
background = np.zeros_like(top_outer_mask)
|
|
|
|
|
|
background[all_y_start:all_y_end, all_x_start:all_x_end] = sketch_mask[mask_y_start:mask_y_end, mask_x_start:mask_x_end]
|
|
|
|
|
|
top_outer_mask = background + top_outer_mask
|
|
|
|
|
|
elif bottom and data[i]['name'] in ["trousers_front", "skirt_front", "bottoms_front", "dress_front"]:
|
2024-10-23 14:32:23 +08:00
|
|
|
|
# bottom = False
|
2024-09-19 14:20:56 +08:00
|
|
|
|
mask_shape = data[i]['mask'].shape
|
|
|
|
|
|
y_offset, x_offset = data[i]['adaptive_position']
|
|
|
|
|
|
# 初始化叠加区域的起始和结束位置
|
|
|
|
|
|
all_y_start, all_y_end, mask_y_start, mask_y_end = positioning(all_mask_shape=all_mask_shape[0], mask_shape=mask_shape[0], offset=y_offset)
|
|
|
|
|
|
all_x_start, all_x_end, mask_x_start, mask_x_end = positioning(all_mask_shape=all_mask_shape[1], mask_shape=mask_shape[1], offset=x_offset)
|
|
|
|
|
|
# 将叠加区域赋值为相应的像素值
|
|
|
|
|
|
_, sketch_mask = cv2.threshold(data[i]['mask'], 127, 255, cv2.THRESH_BINARY)
|
|
|
|
|
|
background = np.zeros_like(top_outer_mask)
|
|
|
|
|
|
background[all_y_start:all_y_end, all_x_start:all_x_end] = sketch_mask[mask_y_start:mask_y_end, mask_x_start:mask_x_end]
|
|
|
|
|
|
bottom_outer_mask = background + bottom_outer_mask
|
2025-11-07 10:56:34 +08:00
|
|
|
|
elif others and data[i]['name'] in ['others_front']:
|
2024-11-26 16:08:10 +08:00
|
|
|
|
mask_shape = data[i]['mask'].shape
|
|
|
|
|
|
y_offset, x_offset = data[i]['adaptive_position']
|
|
|
|
|
|
# 初始化叠加区域的起始和结束位置
|
|
|
|
|
|
all_y_start, all_y_end, mask_y_start, mask_y_end = positioning(all_mask_shape=all_mask_shape[0], mask_shape=mask_shape[0], offset=y_offset)
|
|
|
|
|
|
all_x_start, all_x_end, mask_x_start, mask_x_end = positioning(all_mask_shape=all_mask_shape[1], mask_shape=mask_shape[1], offset=x_offset)
|
|
|
|
|
|
# 将叠加区域赋值为相应的像素值
|
|
|
|
|
|
_, sketch_mask = cv2.threshold(data[i]['mask'], 127, 255, cv2.THRESH_BINARY)
|
|
|
|
|
|
background = np.zeros_like(top_outer_mask)
|
|
|
|
|
|
background[all_y_start:all_y_end, all_x_start:all_x_end] = sketch_mask[mask_y_start:mask_y_end, mask_x_start:mask_x_end]
|
2025-11-07 10:56:34 +08:00
|
|
|
|
others_outer_mask = background + others_outer_mask
|
2024-11-26 16:08:10 +08:00
|
|
|
|
pass
|
2024-09-19 14:20:56 +08:00
|
|
|
|
elif bottom is False and top is False:
|
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
|
|
all_mask = cv2.bitwise_or(top_outer_mask, bottom_outer_mask)
|
2025-11-07 10:56:34 +08:00
|
|
|
|
all_mask = cv2.bitwise_or(all_mask, others_outer_mask)
|
2024-09-19 14:20:56 +08:00
|
|
|
|
|
|
|
|
|
|
for layer in data:
|
|
|
|
|
|
if layer['image'] is not None:
|
|
|
|
|
|
if layer['name'] != "body":
|
|
|
|
|
|
test_image = Image.new('RGBA', size, (0, 0, 0, 0))
|
|
|
|
|
|
test_image.paste(layer['image'], (layer['adaptive_position'][1], layer['adaptive_position'][0]), layer['image'])
|
|
|
|
|
|
mask_data = np.where(all_mask > 0, 255, 0).astype(np.uint8)
|
|
|
|
|
|
mask_alpha = Image.fromarray(mask_data)
|
|
|
|
|
|
cropped_image = Image.composite(test_image, Image.new("RGBA", test_image.size, (255, 255, 255, 0)), mask_alpha)
|
|
|
|
|
|
base_image.paste(test_image, (0, 0), cropped_image) # test_image 已经按照坐标贴到最大宽值的图片上 坐着这里坐标为00
|
|
|
|
|
|
else:
|
|
|
|
|
|
base_image.paste(layer['image'], (layer['adaptive_position'][1], layer['adaptive_position'][0]), layer['image'])
|
|
|
|
|
|
|
|
|
|
|
|
result_image = base_image
|
|
|
|
|
|
|
|
|
|
|
|
image_data = io.BytesIO()
|
|
|
|
|
|
result_image.save(image_data, format='PNG')
|
|
|
|
|
|
image_data.seek(0)
|
|
|
|
|
|
|
|
|
|
|
|
# oss upload
|
|
|
|
|
|
image_bytes = image_data.read()
|
|
|
|
|
|
bucket_name = "aida-results"
|
|
|
|
|
|
object_name = f'result_{generate_uuid()}.png'
|
2025-12-30 16:49:08 +08:00
|
|
|
|
oss_upload_image(oss_client=minio_client, bucket=bucket_name, object_name=object_name, image_bytes=image_bytes)
|
2024-09-19 14:20:56 +08:00
|
|
|
|
return f"{bucket_name}/{object_name}"
|
|
|
|
|
|
# return f"aida-results/{minio_client.put_object('aida-results', f'result_{generate_uuid()}.png', io.BytesIO(image_bytes), len(image_bytes), content_type='image/png').object_name}"
|
|
|
|
|
|
|
|
|
|
|
|
# object_name = f'result_{generate_uuid()}.png'
|
|
|
|
|
|
# response = s3.put_object(Bucket="aida-results", Key=object_name, Body=data, ContentType='image/png')
|
|
|
|
|
|
# object_url = f"aida-results/{object_name}"
|
|
|
|
|
|
# if response['ResponseMetadata']['HTTPStatusCode'] == 200:
|
|
|
|
|
|
# return object_url
|
|
|
|
|
|
# else:
|
|
|
|
|
|
# return ""
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logging.warning(f"synthesis runtime exception : {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def synthesis_single(front_image, back_image):
|
|
|
|
|
|
result_image = None
|
|
|
|
|
|
if front_image:
|
|
|
|
|
|
result_image = front_image
|
|
|
|
|
|
if back_image:
|
|
|
|
|
|
result_image.paste(back_image, (0, 0), back_image)
|
|
|
|
|
|
|
|
|
|
|
|
# with io.BytesIO() as output:
|
|
|
|
|
|
# result_image.save(output, format='PNG')
|
|
|
|
|
|
# data = output.getvalue()
|
|
|
|
|
|
# object_name = f'result_{generate_uuid()}.png'
|
|
|
|
|
|
# response = s3.put_object(Bucket="aida-results", Key=object_name, Body=data, ContentType='image/png')
|
|
|
|
|
|
# object_url = f"aida-results/{object_name}"
|
|
|
|
|
|
# if response['ResponseMetadata']['HTTPStatusCode'] == 200:
|
|
|
|
|
|
# return object_url
|
|
|
|
|
|
# else:
|
|
|
|
|
|
# return ""
|
|
|
|
|
|
image_data = io.BytesIO()
|
|
|
|
|
|
result_image.save(image_data, format='PNG')
|
|
|
|
|
|
image_data.seek(0)
|
|
|
|
|
|
image_bytes = image_data.read()
|
|
|
|
|
|
# return f"aida-results/{minio_client.put_object('aida-results', f'result_{generate_uuid()}.png', io.BytesIO(image_bytes), len(image_bytes), content_type='image/png').object_name}"
|
|
|
|
|
|
# oss upload
|
|
|
|
|
|
bucket_name = 'aida-results'
|
|
|
|
|
|
object_name = f'result_{generate_uuid()}.png'
|
2025-12-30 16:49:08 +08:00
|
|
|
|
oss_upload_image(oss_client=minio_client, bucket=bucket_name, object_name=object_name, image_bytes=image_bytes)
|
2024-09-19 14:20:56 +08:00
|
|
|
|
return f"{bucket_name}/{object_name}"
|
2024-09-25 11:40:11 +08:00
|
|
|
|
|
|
|
|
|
|
|
2025-12-30 16:49:08 +08:00
|
|
|
|
def update_base_size_priority(layers):
|
2024-09-25 11:40:11 +08:00
|
|
|
|
# 计算透明背景图片的宽度
|
|
|
|
|
|
min_x = min(info['position'][1] for info in layers)
|
|
|
|
|
|
x_list = []
|
2024-10-18 15:53:40 +08:00
|
|
|
|
new_height = 700
|
2024-09-25 11:40:11 +08:00
|
|
|
|
for info in layers:
|
|
|
|
|
|
if info['image'] is not None:
|
|
|
|
|
|
x_list.append(info['position'][1] + info['image'].width)
|
2024-10-18 15:53:40 +08:00
|
|
|
|
if info['name'] == 'mannequin':
|
|
|
|
|
|
new_height = info['image'].height
|
2024-09-25 11:40:11 +08:00
|
|
|
|
max_x = max(x_list)
|
2025-09-26 13:20:26 +08:00
|
|
|
|
|
|
|
|
|
|
# x坐标中最小偏移量的绝对值 + 最大偏移量
|
|
|
|
|
|
new_width = max_x + abs(min_x)
|
2024-09-25 11:40:11 +08:00
|
|
|
|
# 更新坐标
|
|
|
|
|
|
for info in layers:
|
|
|
|
|
|
info['adaptive_position'] = (info['position'][0], info['position'][1] - min_x)
|
|
|
|
|
|
return layers, (new_width, new_height)
|