feat 代码整理

fix
This commit is contained in:
zhouchengrong
2024-09-26 14:12:17 +08:00
parent 04b15aa200
commit dbc90ea350
7 changed files with 622 additions and 108 deletions

View File

@@ -0,0 +1,613 @@
import io
import json
import logging.config
import threading
import uuid
import cv2
import numpy as np
import urllib3
from PIL import Image
from celery import Celery
from minio import Minio
from app.core.config import *
from app.service.design_batch.item import BodyItem, TopItem, BottomItem
id_lock = threading.Lock()
celery_app = Celery('tasks', broker='amqp://guest:guest@10.1.2.213:5672//', backend='rpc://')
celery_app.conf.worker_log_format = '%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s'
celery_app.conf.worker_hijack_root_logger = False
logging.getLogger('pika').setLevel(logging.WARNING)
logger = logging.getLogger()
timeout = urllib3.Timeout(connect=1, read=10.0) # 连接超时 5 秒,读取超时 10 秒
# 自定义 Retry 类
class CustomRetry(urllib3.Retry):
def increment(self, method=None, url=None, response=None, error=None, **kwargs):
# 调用父类的 increment 方法
new_retry = super(CustomRetry, self).increment(method, url, response, error, **kwargs)
# 打印重试信息
logger.info(f"重试连接: {method} {url},错误: {error},重试次数: {self.total - new_retry.total}")
return new_retry
http_client = urllib3.PoolManager(
num_pools=50, # 设置连接池大小
maxsize=50,
timeout=timeout,
cert_reqs='CERT_REQUIRED', # 需要证书验证
retries=CustomRetry(
total=5,
backoff_factor=0.2,
status_forcelist=[500, 502, 503, 504],
),
)
minio_client = Minio(MINIO_URL, access_key=MINIO_ACCESS, secret_key=MINIO_SECRET, secure=MINIO_SECURE, http_client=http_client)
def oss_upload_image(bucket, object_name, image_bytes):
req = None
try:
oss_client = Minio(MINIO_URL, access_key=MINIO_ACCESS, secret_key=MINIO_SECRET, secure=MINIO_SECURE)
req = oss_client.put_object(bucket_name=bucket, object_name=object_name, data=io.BytesIO(image_bytes), length=len(image_bytes), content_type='image/png')
except Exception as e:
logger.warning(f" 上传图片出现异常 ######: {e}")
return req
# 优先级
priority_dict = {
'earring_front': 99,
'bag_front': 98,
'hairstyle_front': 97,
'outwear_front': 20,
'tops_front': 19,
'dress_front': 18,
'blouse_front': 17,
'skirt_front': 16,
'trousers_front': 15,
'bottoms_front': 14,
'shoes_right': 1,
'shoes_left': 1,
'body': 0,
'bottoms_back': -14,
'trousers_back': -15,
'skirt_back': -16,
'blouse_back': -17,
'dress_back': -18,
'tops_back': -19,
'outwear_back': -20,
'hairstyle_back': -97,
'bag_back': -98,
'earring_back': -99,
}
def process_item(item, basic):
if item['type'] == "Body":
body_server = BodyItem(data=item, basic=basic, minio_client=minio_client)
item_data = body_server.process()
elif item['type'].lower() in ['blouse', 'outwear', 'dress', 'tops']:
top_server = TopItem(data=item, basic=basic, minio_client=minio_client)
item_data = top_server.process()
else:
bottom_server = BottomItem(data=item, basic=basic, minio_client=minio_client)
item_data = bottom_server.process()
return item_data
def process_layer(item, layers):
if item['name'] == "mannequin":
body_layer = organize_body(item)
layers.append(body_layer)
return item['body_image'].size
else:
front_layer, back_layer = organize_clothing(item)
layers.append(front_layer)
layers.append(back_layer)
def organize_body(layer):
body_layer = dict(priority=0,
name=layer["name"].lower(),
image=layer['body_image'],
image_url=layer['body_path'],
mask_image=None,
mask_url=None,
sacle=1,
# mask=layer['body_mask'],
position=(0, 0))
return body_layer
def organize_clothing(layer):
# 起始坐标
start_point = calculate_start_point(layer['keypoint'], layer['scale'], layer['clothes_keypoint'], layer['body_point_test'], layer["offset"], layer["resize_scale"])
# 前片数据
front_layer = dict(priority=layer['priority'] if layer.get("layer_order", False) else priority_dict.get(f'{layer["name"].lower()}_front', None),
name=f'{layer["name"].lower()}_front',
image=layer["front_image"],
# mask_image=layer['front_mask_image'],
image_url=layer['front_image_url'],
mask_url=layer['mask_url'],
sacle=layer['scale'],
clothes_keypoint=layer['clothes_keypoint'],
position=start_point,
resize_scale=layer["resize_scale"],
mask=cv2.resize(layer['mask'], layer["front_image"].size),
gradient_string=layer['gradient_string'] if 'gradient_string' in layer.keys() else "",
pattern_image_url=layer['pattern_image_url'],
pattern_image=layer['pattern_image']
)
# 后片数据
back_layer = dict(priority=-layer.get("priority", 0) if layer.get("layer_order", False) else priority_dict.get(f'{layer["name"].lower()}_back', None),
name=f'{layer["name"].lower()}_back',
image=layer["back_image"],
# mask_image=layer['back_mask_image'],
image_url=layer['back_image_url'],
mask_url=layer['mask_url'],
sacle=layer['scale'],
clothes_keypoint=layer['clothes_keypoint'],
position=start_point,
resize_scale=layer["resize_scale"],
mask=cv2.resize(layer['mask'], layer["front_image"].size),
gradient_string=layer['gradient_string'] if 'gradient_string' in layer.keys() else "",
pattern_image_url=layer['pattern_image_url'],
)
return front_layer, back_layer
def calculate_start_point(keypoint_type, scale, clothes_point, body_point, offset, resize_scale):
"""
Align left
Args:
keypoint_type: string, "waistband" | "shoulder" | "ear_point"
scale: float
clothes_point: dict{'left': [x1, y1, z1], 'right': [x2, y2, z2]}
body_point: dict, containing keypoint data of body figure
Returns:
start_point: tuple (x', y')
x' = y_body - y1 * scale + offset
y' = x_body - x1 * scale + offset
"""
side_indicator = f'{keypoint_type}_left'
start_point = (
int(body_point[side_indicator][1] + offset[1] - int(clothes_point[side_indicator][0]) * scale), # y
int(body_point[side_indicator][0] + offset[0] - int(clothes_point[side_indicator][1]) * scale) # x
)
return start_point
def update_base_size_priority(layers, size):
# 计算透明背景图片的宽度
min_x = min(info['position'][1] for info in layers)
x_list = []
for info in layers:
if info['image'] is not None:
x_list.append(info['position'][1] + info['image'].width)
max_x = max(x_list)
new_width = max_x - min_x
new_height = 700
# 更新坐标
for info in layers:
info['adaptive_position'] = (info['position'][0], info['position'][1] - min_x)
return layers, (new_width, new_height)
def synthesis_single(front_image, back_image):
result_image = None
if front_image:
result_image = front_image
if back_image:
result_image.paste(back_image, (0, 0), back_image)
image_data = io.BytesIO()
result_image.save(image_data, format='PNG')
image_data.seek(0)
image_bytes = image_data.read()
bucket_name = 'aida-results'
object_name = f'result_{generate_uuid()}.png'
oss_upload_image(bucket=bucket_name, object_name=object_name, image_bytes=image_bytes)
return f"{bucket_name}/{object_name}"
def oss_upload_json(json_data, object_name):
try:
with open(f"app/service/design/design_batch/response_json/{object_name}", 'w') as file:
json.dump(json_data, file, indent=4)
oss_client = Minio(MINIO_URL, access_key=MINIO_ACCESS, secret_key=MINIO_SECRET, secure=MINIO_SECURE)
oss_client.fput_object("test", object_name, f"app/service/design/design_batch/response_json/{object_name}")
except Exception as e:
logger.warning(str(e))
def generate_uuid():
with id_lock:
unique_id = str(uuid.uuid1())
return unique_id
def positioning(all_mask_shape, mask_shape, offset):
all_start = 0
all_end = 0
mask_start = 0
mask_end = 0
if offset == 0:
all_start = 0
all_end = min(all_mask_shape, mask_shape)
mask_start = 0
mask_end = min(all_mask_shape, mask_shape)
elif offset > 0:
all_start = min(offset, all_mask_shape)
all_end = min(offset + mask_shape, all_mask_shape)
mask_start = 0
mask_end = 0 if offset > all_mask_shape else min(all_mask_shape - offset, mask_shape)
elif offset < 0:
if abs(offset) > mask_shape:
all_start = 0
all_end = 0
else:
all_start = 0
if mask_shape - abs(offset) > all_mask_shape:
all_end = min(mask_shape - abs(offset), all_mask_shape)
else:
all_end = mask_shape - abs(offset)
if abs(offset) > mask_shape:
mask_start = mask_shape
mask_end = mask_shape
else:
mask_start = abs(offset)
if mask_shape - abs(offset) >= all_mask_shape:
mask_end = all_mask_shape + abs(offset)
else:
mask_end = mask_shape
return all_start, all_end, mask_start, mask_end
def synthesis(data, size, basic_info):
# 创建底图
base_image = Image.new('RGBA', size, (0, 0, 0, 0))
try:
all_mask_shape = (size[1], size[0])
body_mask = None
for d in data:
if d['name'] == 'body' or d['name'] == 'mannequin':
# 创建一个新的宽高透明图像, 把模特贴上去获取mask
transparent_image = Image.new("RGBA", size, (0, 0, 0, 0))
transparent_image.paste(d['image'], (d['adaptive_position'][1], d['adaptive_position'][0]), d['image']) # 此处可变数组会被paste篡改值所以使用下标获取position
body_mask = np.array(transparent_image.split()[3])
# 根据新的坐标获取新的肩点
left_shoulder = [x + y for x, y in zip(basic_info['body_point_test']['shoulder_left'], [d['adaptive_position'][1], d['adaptive_position'][0]])]
right_shoulder = [x + y for x, y in zip(basic_info['body_point_test']['shoulder_right'], [d['adaptive_position'][1], d['adaptive_position'][0]])]
body_mask[:min(left_shoulder[1], right_shoulder[1]), left_shoulder[0]:right_shoulder[0]] = 255
_, binary_body_mask = cv2.threshold(body_mask, 127, 255, cv2.THRESH_BINARY)
top_outer_mask = np.array(binary_body_mask)
bottom_outer_mask = np.array(binary_body_mask)
top = True
bottom = True
i = len(data)
while i:
i -= 1
if top and data[i]['name'] in ["blouse_front", "outwear_front", "dress_front", "tops_front"]:
top = False
mask_shape = data[i]['mask'].shape
y_offset, x_offset = data[i]['adaptive_position']
# 初始化叠加区域的起始和结束位置
all_y_start, all_y_end, mask_y_start, mask_y_end = positioning(all_mask_shape=all_mask_shape[0], mask_shape=mask_shape[0], offset=y_offset)
all_x_start, all_x_end, mask_x_start, mask_x_end = positioning(all_mask_shape=all_mask_shape[1], mask_shape=mask_shape[1], offset=x_offset)
# 将叠加区域赋值为相应的像素值
_, sketch_mask = cv2.threshold(data[i]['mask'], 127, 255, cv2.THRESH_BINARY)
background = np.zeros_like(top_outer_mask)
background[all_y_start:all_y_end, all_x_start:all_x_end] = sketch_mask[mask_y_start:mask_y_end, mask_x_start:mask_x_end]
top_outer_mask = background + top_outer_mask
elif bottom and data[i]['name'] in ["trousers_front", "skirt_front", "bottoms_front", "dress_front"]:
bottom = False
mask_shape = data[i]['mask'].shape
y_offset, x_offset = data[i]['adaptive_position']
# 初始化叠加区域的起始和结束位置
all_y_start, all_y_end, mask_y_start, mask_y_end = positioning(all_mask_shape=all_mask_shape[0], mask_shape=mask_shape[0], offset=y_offset)
all_x_start, all_x_end, mask_x_start, mask_x_end = positioning(all_mask_shape=all_mask_shape[1], mask_shape=mask_shape[1], offset=x_offset)
# 将叠加区域赋值为相应的像素值
_, sketch_mask = cv2.threshold(data[i]['mask'], 127, 255, cv2.THRESH_BINARY)
background = np.zeros_like(top_outer_mask)
background[all_y_start:all_y_end, all_x_start:all_x_end] = sketch_mask[mask_y_start:mask_y_end, mask_x_start:mask_x_end]
bottom_outer_mask = background + bottom_outer_mask
elif bottom is False and top is False:
break
all_mask = cv2.bitwise_or(top_outer_mask, bottom_outer_mask)
for layer in data:
if layer['image'] is not None:
if layer['name'] != "body":
test_image = Image.new('RGBA', size, (0, 0, 0, 0))
test_image.paste(layer['image'], (layer['adaptive_position'][1], layer['adaptive_position'][0]), layer['image'])
mask_data = np.where(all_mask > 0, 255, 0).astype(np.uint8)
mask_alpha = Image.fromarray(mask_data)
cropped_image = Image.composite(test_image, Image.new("RGBA", test_image.size, (255, 255, 255, 0)), mask_alpha)
base_image.paste(test_image, (0, 0), cropped_image) # test_image 已经按照坐标贴到最大宽值的图片上 坐着这里坐标为00
else:
base_image.paste(layer['image'], (layer['adaptive_position'][1], layer['adaptive_position'][0]), layer['image'])
result_image = base_image
image_data = io.BytesIO()
result_image.save(image_data, format='PNG')
image_data.seek(0)
# oss upload
image_bytes = image_data.read()
bucket_name = "aida-results"
object_name = f'result_{generate_uuid()}.png'
oss_upload_image(bucket=bucket_name, object_name=object_name, image_bytes=image_bytes)
return f"{bucket_name}/{object_name}"
# return f"aida-results/{minio_client.put_object('aida-results', f'result_{generate_uuid()}.png', io.BytesIO(image_bytes), len(image_bytes), content_type='image/png').object_name}"
# object_name = f'result_{generate_uuid()}.png'
# response = s3.put_object(Bucket="aida-results", Key=object_name, Body=data, ContentType='image/png')
# object_url = f"aida-results/{object_name}"
# if response['ResponseMetadata']['HTTPStatusCode'] == 200:
# return object_url
# else:
# return ""
except Exception as e:
logging.warning(f"synthesis runtime exception : {e}")
def publish_status(task_id, progress, result):
connection = pika.BlockingConnection(pika.ConnectionParameters('10.1.2.213'))
channel = connection.channel()
channel.queue_declare(queue='DesignBatch', durable=True)
message = {'task_id': task_id, 'progress': progress, "result": result}
channel.basic_publish(exchange='',
routing_key='DesignBatch',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2,
))
connection.close()
@celery_app.task
def batch_design(objects_data, tasks_id, json_name):
object_response = []
threads = []
active_threads = 0
lock = threading.Lock()
def process_object(step, object):
nonlocal active_threads
basic = object['basic']
items_response = {'layers': []}
if basic['single_overall'] == "overall":
item_results = []
for item in object['items']:
item_results.append(process_item(item, basic))
layers = []
body_size = None
for item in item_results:
body_size = process_layer(item, layers)
layers = sorted(layers, key=lambda s: s.get("priority", float('inf')))
layers, new_size = update_base_size_priority(layers, body_size)
for lay in layers:
items_response['layers'].append({
'image_category': lay['name'],
'position': lay['position'],
'priority': lay.get("priority", None),
'resize_scale': lay['resize_scale'] if "resize_scale" in lay.keys() else None,
'image_size': lay['image'] if lay['image'] is None else lay['image'].size,
'gradient_string': lay['gradient_string'] if 'gradient_string' in lay.keys() else "",
'mask_url': lay['mask_url'],
'image_url': lay['image_url'] if 'image_url' in lay.keys() else None,
'pattern_image_url': lay['pattern_image_url'] if 'pattern_image_url' in lay.keys() else None,
})
items_response['synthesis_url'] = synthesis(layers, new_size, basic)
else:
item_result = process_item(object['items'][0], basic)
items_response['layers'].append({
'image_category': f"{item_result['name']}_front",
'image_size': item_result['back_image'].size if item_result['back_image'] else None,
'position': None,
'priority': 0,
'image_url': item_result['front_image_url'],
'mask_url': item_result['mask_url'],
"gradient_string": item_result['gradient_string'] if 'gradient_string' in item_result.keys() else "",
'pattern_image_url': item_result['pattern_image_url'] if 'pattern_image_url' in item_result.keys() else None,
})
items_response['layers'].append({
'image_category': f"{item_result['name']}_back",
'image_size': item_result['front_image'].size if item_result['front_image'] else None,
'position': None,
'priority': 0,
'image_url': item_result['back_image_url'],
'mask_url': item_result['mask_url'],
"gradient_string": item_result['gradient_string'] if 'gradient_string' in item_result.keys() else "",
'pattern_image_url': item_result['pattern_image_url'] if 'pattern_image_url' in item_result.keys() else None,
})
items_response['synthesis_url'] = synthesis_single(item_result['front_image'], item_result['back_image'])
with lock:
object_response.append(items_response)
publish_status(tasks_id, step + 1, items_response)
active_threads -= 1
for step, object in enumerate(objects_data):
t = threading.Thread(target=process_object, args=(step, object))
threads.append(t)
t.start()
with lock:
active_threads += 1
for t in threads:
t.join()
oss_upload_json(object_response, json_name)
publish_status(tasks_id, "ok", json_name)
return object_response
if __name__ == '__main__':
object_data = {
"objects": [
{
"basic": {
"body_point_test": {
"waistband_right": [
199,
239
],
"hand_point_right": [
220,
308
],
"waistband_left": [
113,
239
],
"hand_point_left": [
92,
310
],
"shoulder_left": [
99,
111
],
"shoulder_right": [
214,
111
]
},
"layer_order": False,
"scale_bag": 0.7,
"scale_earrings": 0.16,
"self_template": True,
"single_overall": "overall",
"switch_category": ""
},
"items": [
{
"color": "195 195 196",
"icon": "none",
"image_id": 116207,
"offset": [
1,
1
],
"path": "aida-sys-image/images/female/trousers/trousers_973.jpg",
"print": {
"element": {
"element_angle_list": [],
"element_path_list": [],
"element_scale_list": [],
"location": []
},
"overall": {
"location": [
[
0.0,
0.0
]
],
"print_angle_list": [
0.0,
0.0
],
"print_path_list": [],
"print_scale_list": [
0.0,
0.0
]
},
"single": {
"location": [],
"print_angle_list": [],
"print_path_list": [],
"print_scale_list": []
}
},
"resize_scale": [
1.0,
1.0
],
"type": "Trousers"
},
{
"color": "203 204 202",
"icon": "none",
"image_id": 95825,
"offset": [
1,
1
],
"path": "aida-sys-image/images/female/blouse/0902003606.jpg",
"print": {
"element": {
"element_angle_list": [],
"element_path_list": [],
"element_scale_list": [],
"location": []
},
"overall": {
"location": [
[
0.0,
0.0
]
],
"print_angle_list": [
0.0,
0.0
],
"print_path_list": [],
"print_scale_list": [
0.0,
0.0
]
},
"single": {
"location": [],
"print_angle_list": [],
"print_path_list": [],
"print_scale_list": []
}
},
"resize_scale": [
1.0,
1.0
],
"type": "Blouse"
},
{
"body_path": "aida-sys-image/models/female/23ecb158-7b70-4468-a9d1-bac3ded9da62.png",
"image_id": 116612,
"offset": [
1,
1
],
"resize_scale": [
1.0,
1.0
],
"type": "Body"
}
]
}
],
"process_id": "9062885798571902"
}
X = batch_design(object_data['objects'], "123", "test.json")
print(X)