@@ -1,613 +0,0 @@
import io
import json
import logging . config
import threading
import uuid
import cv2
import numpy as np
import urllib3
from PIL import Image
from celery import Celery
from minio import Minio
from app . core . config import *
from app . service . design_batch . item import BodyItem , TopItem , BottomItem
id_lock = threading . Lock ( )
celery_app = Celery ( ' tasks ' , broker = ' amqp://guest:guest@10.1.2.213:5672// ' , backend = ' rpc:// ' )
celery_app . conf . worker_log_format = ' %(asctime)s %(filename)s [line: %(lineno)d ] %(levelname)s %(message)s '
celery_app . conf . worker_hijack_root_logger = False
logging . getLogger ( ' pika ' ) . setLevel ( logging . WARNING )
logger = logging . getLogger ( )
timeout = urllib3 . Timeout ( connect = 1 , read = 10.0 ) # 连接超时 5 秒,读取超时 10 秒
# 自定义 Retry 类
class CustomRetry ( urllib3 . Retry ) :
def increment ( self , method = None , url = None , response = None , error = None , * * kwargs ) :
# 调用父类的 increment 方法
new_retry = super ( CustomRetry , self ) . increment ( method , url , response , error , * * kwargs )
# 打印重试信息
logger . info ( f " 重试连接: { method } { url } ,错误: { error } ,重试次数: { self . total - new_retry . total } " )
return new_retry
http_client = urllib3 . PoolManager (
num_pools = 50 , # 设置连接池大小
maxsize = 50 ,
timeout = timeout ,
cert_reqs = ' CERT_REQUIRED ' , # 需要证书验证
retries = CustomRetry (
total = 5 ,
backoff_factor = 0.2 ,
status_forcelist = [ 500 , 502 , 503 , 504 ] ,
) ,
)
minio_client = Minio ( MINIO_URL , access_key = MINIO_ACCESS , secret_key = MINIO_SECRET , secure = MINIO_SECURE , http_client = http_client )
def oss_upload_image ( bucket , object_name , image_bytes ) :
req = None
try :
oss_client = Minio ( MINIO_URL , access_key = MINIO_ACCESS , secret_key = MINIO_SECRET , secure = MINIO_SECURE )
req = oss_client . put_object ( bucket_name = bucket , object_name = object_name , data = io . BytesIO ( image_bytes ) , length = len ( image_bytes ) , content_type = ' image/png ' )
except Exception as e :
logger . warning ( f " 上传图片出现异常 ######: { e } " )
return req
# 优先级
priority_dict = {
' earring_front ' : 99 ,
' bag_front ' : 98 ,
' hairstyle_front ' : 97 ,
' outwear_front ' : 20 ,
' tops_front ' : 19 ,
' dress_front ' : 18 ,
' blouse_front ' : 17 ,
' skirt_front ' : 16 ,
' trousers_front ' : 15 ,
' bottoms_front ' : 14 ,
' shoes_right ' : 1 ,
' shoes_left ' : 1 ,
' body ' : 0 ,
' bottoms_back ' : - 14 ,
' trousers_back ' : - 15 ,
' skirt_back ' : - 16 ,
' blouse_back ' : - 17 ,
' dress_back ' : - 18 ,
' tops_back ' : - 19 ,
' outwear_back ' : - 20 ,
' hairstyle_back ' : - 97 ,
' bag_back ' : - 98 ,
' earring_back ' : - 99 ,
}
def process_item ( item , basic ) :
if item [ ' type ' ] == " Body " :
body_server = BodyItem ( data = item , basic = basic , minio_client = minio_client )
item_data = body_server . process ( )
elif item [ ' type ' ] . lower ( ) in [ ' blouse ' , ' outwear ' , ' dress ' , ' tops ' ] :
top_server = TopItem ( data = item , basic = basic , minio_client = minio_client )
item_data = top_server . process ( )
else :
bottom_server = BottomItem ( data = item , basic = basic , minio_client = minio_client )
item_data = bottom_server . process ( )
return item_data
def process_layer ( item , layers ) :
if item [ ' name ' ] == " mannequin " :
body_layer = organize_body ( item )
layers . append ( body_layer )
return item [ ' body_image ' ] . size
else :
front_layer , back_layer = organize_clothing ( item )
layers . append ( front_layer )
layers . append ( back_layer )
def organize_body ( layer ) :
body_layer = dict ( priority = 0 ,
name = layer [ " name " ] . lower ( ) ,
image = layer [ ' body_image ' ] ,
image_url = layer [ ' body_path ' ] ,
mask_image = None ,
mask_url = None ,
sacle = 1 ,
# mask=layer['body_mask'],
position = ( 0 , 0 ) )
return body_layer
def organize_clothing ( layer ) :
# 起始坐标
start_point = calculate_start_point ( layer [ ' keypoint ' ] , layer [ ' scale ' ] , layer [ ' clothes_keypoint ' ] , layer [ ' body_point_test ' ] , layer [ " offset " ] , layer [ " resize_scale " ] )
# 前片数据
front_layer = dict ( priority = layer [ ' priority ' ] if layer . get ( " layer_order " , False ) else priority_dict . get ( f ' { layer [ " name " ] . lower ( ) } _front ' , None ) ,
name = f ' { layer [ " name " ] . lower ( ) } _front ' ,
image = layer [ " front_image " ] ,
# mask_image=layer['front_mask_image'],
image_url = layer [ ' front_image_url ' ] ,
mask_url = layer [ ' mask_url ' ] ,
sacle = layer [ ' scale ' ] ,
clothes_keypoint = layer [ ' clothes_keypoint ' ] ,
position = start_point ,
resize_scale = layer [ " resize_scale " ] ,
mask = cv2 . resize ( layer [ ' mask ' ] , layer [ " front_image " ] . size ) ,
gradient_string = layer [ ' gradient_string ' ] if ' gradient_string ' in layer . keys ( ) else " " ,
pattern_image_url = layer [ ' pattern_image_url ' ] ,
pattern_image = layer [ ' pattern_image ' ]
)
# 后片数据
back_layer = dict ( priority = - layer . get ( " priority " , 0 ) if layer . get ( " layer_order " , False ) else priority_dict . get ( f ' { layer [ " name " ] . lower ( ) } _back ' , None ) ,
name = f ' { layer [ " name " ] . lower ( ) } _back ' ,
image = layer [ " back_image " ] ,
# mask_image=layer['back_mask_image'],
image_url = layer [ ' back_image_url ' ] ,
mask_url = layer [ ' mask_url ' ] ,
sacle = layer [ ' scale ' ] ,
clothes_keypoint = layer [ ' clothes_keypoint ' ] ,
position = start_point ,
resize_scale = layer [ " resize_scale " ] ,
mask = cv2 . resize ( layer [ ' mask ' ] , layer [ " front_image " ] . size ) ,
gradient_string = layer [ ' gradient_string ' ] if ' gradient_string ' in layer . keys ( ) else " " ,
pattern_image_url = layer [ ' pattern_image_url ' ] ,
)
return front_layer , back_layer
def calculate_start_point ( keypoint_type , scale , clothes_point , body_point , offset , resize_scale ) :
"""
Align left
Args:
keypoint_type: string, " waistband " | " shoulder " | " ear_point "
scale: float
clothes_point: dict { ' left ' : [x1, y1, z1], ' right ' : [x2, y2, z2]}
body_point: dict, containing keypoint data of body figure
Returns:
start_point: tuple (x ' , y ' )
x ' = y_body - y1 * scale + offset
y ' = x_body - x1 * scale + offset
"""
side_indicator = f ' { keypoint_type } _left '
start_point = (
int ( body_point [ side_indicator ] [ 1 ] + offset [ 1 ] - int ( clothes_point [ side_indicator ] [ 0 ] ) * scale ) , # y
int ( body_point [ side_indicator ] [ 0 ] + offset [ 0 ] - int ( clothes_point [ side_indicator ] [ 1 ] ) * scale ) # x
)
return start_point
def update_base_size_priority ( layers , size ) :
# 计算透明背景图片的宽度
min_x = min ( info [ ' position ' ] [ 1 ] for info in layers )
x_list = [ ]
for info in layers :
if info [ ' image ' ] is not None :
x_list . append ( info [ ' position ' ] [ 1 ] + info [ ' image ' ] . width )
max_x = max ( x_list )
new_width = max_x - min_x
new_height = 700
# 更新坐标
for info in layers :
info [ ' adaptive_position ' ] = ( info [ ' position ' ] [ 0 ] , info [ ' position ' ] [ 1 ] - min_x )
return layers , ( new_width , new_height )
def synthesis_single ( front_image , back_image ) :
result_image = None
if front_image :
result_image = front_image
if back_image :
result_image . paste ( back_image , ( 0 , 0 ) , back_image )
image_data = io . BytesIO ( )
result_image . save ( image_data , format = ' PNG ' )
image_data . seek ( 0 )
image_bytes = image_data . read ( )
bucket_name = ' aida-results '
object_name = f ' result_ { generate_uuid ( ) } .png '
oss_upload_image ( bucket = bucket_name , object_name = object_name , image_bytes = image_bytes )
return f " { bucket_name } / { object_name } "
def oss_upload_json ( json_data , object_name ) :
try :
with open ( f " app/service/design/design_batch/response_json/ { object_name } " , ' w ' ) as file :
json . dump ( json_data , file , indent = 4 )
oss_client = Minio ( MINIO_URL , access_key = MINIO_ACCESS , secret_key = MINIO_SECRET , secure = MINIO_SECURE )
oss_client . fput_object ( " test " , object_name , f " app/service/design/design_batch/response_json/ { object_name } " )
except Exception as e :
logger . warning ( str ( e ) )
def generate_uuid ( ) :
with id_lock :
unique_id = str ( uuid . uuid1 ( ) )
return unique_id
def positioning ( all_mask_shape , mask_shape , offset ) :
all_start = 0
all_end = 0
mask_start = 0
mask_end = 0
if offset == 0 :
all_start = 0
all_end = min ( all_mask_shape , mask_shape )
mask_start = 0
mask_end = min ( all_mask_shape , mask_shape )
elif offset > 0 :
all_start = min ( offset , all_mask_shape )
all_end = min ( offset + mask_shape , all_mask_shape )
mask_start = 0
mask_end = 0 if offset > all_mask_shape else min ( all_mask_shape - offset , mask_shape )
elif offset < 0 :
if abs ( offset ) > mask_shape :
all_start = 0
all_end = 0
else :
all_start = 0
if mask_shape - abs ( offset ) > all_mask_shape :
all_end = min ( mask_shape - abs ( offset ) , all_mask_shape )
else :
all_end = mask_shape - abs ( offset )
if abs ( offset ) > mask_shape :
mask_start = mask_shape
mask_end = mask_shape
else :
mask_start = abs ( offset )
if mask_shape - abs ( offset ) > = all_mask_shape :
mask_end = all_mask_shape + abs ( offset )
else :
mask_end = mask_shape
return all_start , all_end , mask_start , mask_end
def synthesis ( data , size , basic_info ) :
# 创建底图
base_image = Image . new ( ' RGBA ' , size , ( 0 , 0 , 0 , 0 ) )
try :
all_mask_shape = ( size [ 1 ] , size [ 0 ] )
body_mask = None
for d in data :
if d [ ' name ' ] == ' body ' or d [ ' name ' ] == ' mannequin ' :
# 创建一个新的宽高透明图像, 把模特贴上去获取mask
transparent_image = Image . new ( " RGBA " , size , ( 0 , 0 , 0 , 0 ) )
transparent_image . paste ( d [ ' image ' ] , ( d [ ' adaptive_position ' ] [ 1 ] , d [ ' adaptive_position ' ] [ 0 ] ) , d [ ' image ' ] ) # 此处可变数组会被paste篡改值, 所以使用下标获取position
body_mask = np . array ( transparent_image . split ( ) [ 3 ] )
# 根据新的坐标获取新的肩点
left_shoulder = [ x + y for x , y in zip ( basic_info [ ' body_point_test ' ] [ ' shoulder_left ' ] , [ d [ ' adaptive_position ' ] [ 1 ] , d [ ' adaptive_position ' ] [ 0 ] ] ) ]
right_shoulder = [ x + y for x , y in zip ( basic_info [ ' body_point_test ' ] [ ' shoulder_right ' ] , [ d [ ' adaptive_position ' ] [ 1 ] , d [ ' adaptive_position ' ] [ 0 ] ] ) ]
body_mask [ : min ( left_shoulder [ 1 ] , right_shoulder [ 1 ] ) , left_shoulder [ 0 ] : right_shoulder [ 0 ] ] = 255
_ , binary_body_mask = cv2 . threshold ( body_mask , 127 , 255 , cv2 . THRESH_BINARY )
top_outer_mask = np . array ( binary_body_mask )
bottom_outer_mask = np . array ( binary_body_mask )
top = True
bottom = True
i = len ( data )
while i :
i - = 1
if top and data [ i ] [ ' name ' ] in [ " blouse_front " , " outwear_front " , " dress_front " , " tops_front " ] :
top = False
mask_shape = data [ i ] [ ' mask ' ] . shape
y_offset , x_offset = data [ i ] [ ' adaptive_position ' ]
# 初始化叠加区域的起始和结束位置
all_y_start , all_y_end , mask_y_start , mask_y_end = positioning ( all_mask_shape = all_mask_shape [ 0 ] , mask_shape = mask_shape [ 0 ] , offset = y_offset )
all_x_start , all_x_end , mask_x_start , mask_x_end = positioning ( all_mask_shape = all_mask_shape [ 1 ] , mask_shape = mask_shape [ 1 ] , offset = x_offset )
# 将叠加区域赋值为相应的像素值
_ , sketch_mask = cv2 . threshold ( data [ i ] [ ' mask ' ] , 127 , 255 , cv2 . THRESH_BINARY )
background = np . zeros_like ( top_outer_mask )
background [ all_y_start : all_y_end , all_x_start : all_x_end ] = sketch_mask [ mask_y_start : mask_y_end , mask_x_start : mask_x_end ]
top_outer_mask = background + top_outer_mask
elif bottom and data [ i ] [ ' name ' ] in [ " trousers_front " , " skirt_front " , " bottoms_front " , " dress_front " ] :
bottom = False
mask_shape = data [ i ] [ ' mask ' ] . shape
y_offset , x_offset = data [ i ] [ ' adaptive_position ' ]
# 初始化叠加区域的起始和结束位置
all_y_start , all_y_end , mask_y_start , mask_y_end = positioning ( all_mask_shape = all_mask_shape [ 0 ] , mask_shape = mask_shape [ 0 ] , offset = y_offset )
all_x_start , all_x_end , mask_x_start , mask_x_end = positioning ( all_mask_shape = all_mask_shape [ 1 ] , mask_shape = mask_shape [ 1 ] , offset = x_offset )
# 将叠加区域赋值为相应的像素值
_ , sketch_mask = cv2 . threshold ( data [ i ] [ ' mask ' ] , 127 , 255 , cv2 . THRESH_BINARY )
background = np . zeros_like ( top_outer_mask )
background [ all_y_start : all_y_end , all_x_start : all_x_end ] = sketch_mask [ mask_y_start : mask_y_end , mask_x_start : mask_x_end ]
bottom_outer_mask = background + bottom_outer_mask
elif bottom is False and top is False :
break
all_mask = cv2 . bitwise_or ( top_outer_mask , bottom_outer_mask )
for layer in data :
if layer [ ' image ' ] is not None :
if layer [ ' name ' ] != " body " :
test_image = Image . new ( ' RGBA ' , size , ( 0 , 0 , 0 , 0 ) )
test_image . paste ( layer [ ' image ' ] , ( layer [ ' adaptive_position ' ] [ 1 ] , layer [ ' adaptive_position ' ] [ 0 ] ) , layer [ ' image ' ] )
mask_data = np . where ( all_mask > 0 , 255 , 0 ) . astype ( np . uint8 )
mask_alpha = Image . fromarray ( mask_data )
cropped_image = Image . composite ( test_image , Image . new ( " RGBA " , test_image . size , ( 255 , 255 , 255 , 0 ) ) , mask_alpha )
base_image . paste ( test_image , ( 0 , 0 ) , cropped_image ) # test_image 已经按照坐标贴到最大宽值的图片上 坐着这里坐标为00
else :
base_image . paste ( layer [ ' image ' ] , ( layer [ ' adaptive_position ' ] [ 1 ] , layer [ ' adaptive_position ' ] [ 0 ] ) , layer [ ' image ' ] )
result_image = base_image
image_data = io . BytesIO ( )
result_image . save ( image_data , format = ' PNG ' )
image_data . seek ( 0 )
# oss upload
image_bytes = image_data . read ( )
bucket_name = " aida-results "
object_name = f ' result_ { generate_uuid ( ) } .png '
oss_upload_image ( bucket = bucket_name , object_name = object_name , image_bytes = image_bytes )
return f " { bucket_name } / { object_name } "
# return f"aida-results/{minio_client.put_object('aida-results', f'result_{generate_uuid()}.png', io.BytesIO(image_bytes), len(image_bytes), content_type='image/png').object_name}"
# object_name = f'result_{generate_uuid()}.png'
# response = s3.put_object(Bucket="aida-results", Key=object_name, Body=data, ContentType='image/png')
# object_url = f"aida-results/{object_name}"
# if response['ResponseMetadata']['HTTPStatusCode'] == 200:
# return object_url
# else:
# return ""
except Exception as e :
logging . warning ( f " synthesis runtime exception : { e } " )
def publish_status ( task_id , progress , result ) :
connection = pika . BlockingConnection ( pika . ConnectionParameters ( ' 10.1.2.213 ' ) )
channel = connection . channel ( )
channel . queue_declare ( queue = ' DesignBatch ' , durable = True )
message = { ' task_id ' : task_id , ' progress ' : progress , " result " : result }
channel . basic_publish ( exchange = ' ' ,
routing_key = ' DesignBatch ' ,
body = json . dumps ( message ) ,
properties = pika . BasicProperties (
delivery_mode = 2 ,
) )
connection . close ( )
@celery_app.task
def batch_design ( objects_data , tasks_id , json_name ) :
object_response = [ ]
threads = [ ]
active_threads = 0
lock = threading . Lock ( )
def process_object ( step , object ) :
nonlocal active_threads
basic = object [ ' basic ' ]
items_response = { ' layers ' : [ ] }
if basic [ ' single_overall ' ] == " overall " :
item_results = [ ]
for item in object [ ' items ' ] :
item_results . append ( process_item ( item , basic ) )
layers = [ ]
body_size = None
for item in item_results :
body_size = process_layer ( item , layers )
layers = sorted ( layers , key = lambda s : s . get ( " priority " , float ( ' inf ' ) ) )
layers , new_size = update_base_size_priority ( layers , body_size )
for lay in layers :
items_response [ ' layers ' ] . append ( {
' image_category ' : lay [ ' name ' ] ,
' position ' : lay [ ' position ' ] ,
' priority ' : lay . get ( " priority " , None ) ,
' resize_scale ' : lay [ ' resize_scale ' ] if " resize_scale " in lay . keys ( ) else None ,
' image_size ' : lay [ ' image ' ] if lay [ ' image ' ] is None else lay [ ' image ' ] . size ,
' gradient_string ' : lay [ ' gradient_string ' ] if ' gradient_string ' in lay . keys ( ) else " " ,
' mask_url ' : lay [ ' mask_url ' ] ,
' image_url ' : lay [ ' image_url ' ] if ' image_url ' in lay . keys ( ) else None ,
' pattern_image_url ' : lay [ ' pattern_image_url ' ] if ' pattern_image_url ' in lay . keys ( ) else None ,
} )
items_response [ ' synthesis_url ' ] = synthesis ( layers , new_size , basic )
else :
item_result = process_item ( object [ ' items ' ] [ 0 ] , basic )
items_response [ ' layers ' ] . append ( {
' image_category ' : f " { item_result [ ' name ' ] } _front " ,
' image_size ' : item_result [ ' back_image ' ] . size if item_result [ ' back_image ' ] else None ,
' position ' : None ,
' priority ' : 0 ,
' image_url ' : item_result [ ' front_image_url ' ] ,
' mask_url ' : item_result [ ' mask_url ' ] ,
" gradient_string " : item_result [ ' gradient_string ' ] if ' gradient_string ' in item_result . keys ( ) else " " ,
' pattern_image_url ' : item_result [ ' pattern_image_url ' ] if ' pattern_image_url ' in item_result . keys ( ) else None ,
} )
items_response [ ' layers ' ] . append ( {
' image_category ' : f " { item_result [ ' name ' ] } _back " ,
' image_size ' : item_result [ ' front_image ' ] . size if item_result [ ' front_image ' ] else None ,
' position ' : None ,
' priority ' : 0 ,
' image_url ' : item_result [ ' back_image_url ' ] ,
' mask_url ' : item_result [ ' mask_url ' ] ,
" gradient_string " : item_result [ ' gradient_string ' ] if ' gradient_string ' in item_result . keys ( ) else " " ,
' pattern_image_url ' : item_result [ ' pattern_image_url ' ] if ' pattern_image_url ' in item_result . keys ( ) else None ,
} )
items_response [ ' synthesis_url ' ] = synthesis_single ( item_result [ ' front_image ' ] , item_result [ ' back_image ' ] )
with lock :
object_response . append ( items_response )
publish_status ( tasks_id , step + 1 , items_response )
active_threads - = 1
for step , object in enumerate ( objects_data ) :
t = threading . Thread ( target = process_object , args = ( step , object ) )
threads . append ( t )
t . start ( )
with lock :
active_threads + = 1
for t in threads :
t . join ( )
oss_upload_json ( object_response , json_name )
publish_status ( tasks_id , " ok " , json_name )
return object_response
if __name__ == ' __main__ ' :
object_data = {
" objects " : [
{
" basic " : {
" body_point_test " : {
" waistband_right " : [
199 ,
239
] ,
" hand_point_right " : [
220 ,
308
] ,
" waistband_left " : [
113 ,
239
] ,
" hand_point_left " : [
92 ,
310
] ,
" shoulder_left " : [
99 ,
111
] ,
" shoulder_right " : [
214 ,
111
]
} ,
" layer_order " : False ,
" scale_bag " : 0.7 ,
" scale_earrings " : 0.16 ,
" self_template " : True ,
" single_overall " : " overall " ,
" switch_category " : " "
} ,
" items " : [
{
" color " : " 195 195 196 " ,
" icon " : " none " ,
" image_id " : 116207 ,
" offset " : [
1 ,
1
] ,
" path " : " aida-sys-image/images/female/trousers/trousers_973.jpg " ,
" print " : {
" element " : {
" element_angle_list " : [ ] ,
" element_path_list " : [ ] ,
" element_scale_list " : [ ] ,
" location " : [ ]
} ,
" overall " : {
" location " : [
[
0.0 ,
0.0
]
] ,
" print_angle_list " : [
0.0 ,
0.0
] ,
" print_path_list " : [ ] ,
" print_scale_list " : [
0.0 ,
0.0
]
} ,
" single " : {
" location " : [ ] ,
" print_angle_list " : [ ] ,
" print_path_list " : [ ] ,
" print_scale_list " : [ ]
}
} ,
" resize_scale " : [
1.0 ,
1.0
] ,
" type " : " Trousers "
} ,
{
" color " : " 203 204 202 " ,
" icon " : " none " ,
" image_id " : 95825 ,
" offset " : [
1 ,
1
] ,
" path " : " aida-sys-image/images/female/blouse/0902003606.jpg " ,
" print " : {
" element " : {
" element_angle_list " : [ ] ,
" element_path_list " : [ ] ,
" element_scale_list " : [ ] ,
" location " : [ ]
} ,
" overall " : {
" location " : [
[
0.0 ,
0.0
]
] ,
" print_angle_list " : [
0.0 ,
0.0
] ,
" print_path_list " : [ ] ,
" print_scale_list " : [
0.0 ,
0.0
]
} ,
" single " : {
" location " : [ ] ,
" print_angle_list " : [ ] ,
" print_path_list " : [ ] ,
" print_scale_list " : [ ]
}
} ,
" resize_scale " : [
1.0 ,
1.0
] ,
" type " : " Blouse "
} ,
{
" body_path " : " aida-sys-image/models/female/23ecb158-7b70-4468-a9d1-bac3ded9da62.png " ,
" image_id " : 116612 ,
" offset " : [
1 ,
1
] ,
" resize_scale " : [
1.0 ,
1.0
] ,
" type " : " Body "
}
]
}
] ,
" process_id " : " 9062885798571902 "
}
X = batch_design ( object_data [ ' objects ' ] , " 123 " , " test.json " )
print ( X )