# AiDA_Python/app/service/design_batch/design_batch_celery.py
import logging
import threading
from celery import Celery
from minio import Minio
from app.core.config import *
from app.service.design_batch.item import BodyItem, TopItem, BottomItem, AccessoriesItem
from app.service.design_batch.utils.MQ import publish_status
from app.service.design_batch.utils.organize import organize_body, organize_clothing, organize_accessories
from app.service.design_batch.utils.save_json import oss_upload_json
from app.service.design_batch.utils.synthesis_item import update_base_size_priority, synthesis, synthesis_single
# Guards shared id generation across worker threads.
id_lock = threading.Lock()

# SECURITY NOTE(review): broker host and credentials are hard-coded in source;
# they should come from configuration/environment, not version control.
celery_app = Celery('tasks', broker='amqp://rabbit:123456@18.167.251.121:5672//', backend='rpc://', BROKER_CONNECTION_RETRY_ON_STARTUP=True)
celery_app.conf.worker_log_format = '%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s'
celery_app.conf.worker_hijack_root_logger = False
# pika is chatty at INFO level; keep only warnings and above.
logging.getLogger('pika').setLevel(logging.WARNING)
logger = logging.getLogger()
# Shared MinIO client used by every item processor in this module.
minio_client = Minio(MINIO_URL, access_key=MINIO_ACCESS, secret_key=MINIO_SECRET, secure=MINIO_SECURE)
print("start")
def process_item(item, basic):
    """Process a single item of a project and return its processed data.

    Dispatches on ``item['type']`` to the matching item processor class
    (garment types are matched case-insensitively; ``"Body"`` is matched
    exactly, preserving the original contract).

    Args:
        item: item dict; at minimum carries a ``'type'`` key.
        basic: the project-level basic settings passed through to processors.

    Returns:
        Whatever the selected processor's ``process()`` returns.

    Raises:
        NotImplementedError: if ``item['type']`` is not a known type.
    """
    item_type = item['type']
    kind = item_type.lower()
    if item_type == "Body":
        server = BodyItem(data=item, basic=basic, minio_client=minio_client)
    elif kind in ('blouse', 'outwear', 'dress', 'tops'):
        server = TopItem(data=item, basic=basic, minio_client=minio_client)
    elif kind in ('skirt', 'trousers', 'bottoms'):
        server = BottomItem(data=item, basic=basic, minio_client=minio_client)
    elif kind == 'accessories':
        # NOTE: was misleadingly named ``bottom_server`` in the original.
        server = AccessoriesItem(data=item, basic=basic, minio_client=minio_client)
    else:
        raise NotImplementedError(f"Item type {item_type} not implemented")
    return server.process()
def process_layer(item, layers):
    """Assemble layer data for one processed item, appending into *layers*.

    The mannequin contributes a single body layer and reports the body image
    size; every other item contributes a front and a back layer and reports
    nothing (implicitly ``None``).
    """
    name = item['name']
    if name == "mannequin":
        layers.append(organize_body(item))
        return item['body_image'].size
    # Accessories and clothing both yield a (front, back) pair; only the
    # organizing function differs.
    organize = organize_accessories if name == 'accessories' else organize_clothing
    front_layer, back_layer = organize(item)
    layers.extend((front_layer, back_layer))
@celery_app.task
def batch_design(objects_data, tasks_id, json_name):
    """Celery task: process every design object concurrently, publish
    per-object progress, upload the combined result JSON to object storage,
    and return the aggregated response.

    Args:
        objects_data: list of design objects, each with ``'basic'`` and
            ``'items'`` keys.
        tasks_id: task identifier used when publishing status updates.
        json_name: object-storage key for the final result JSON.

    Returns:
        list of per-object response dicts (order follows completion, not
        input order, since objects run on separate threads).
    """
    logger.info("batch_design tasks_id=%s json_name=%s", tasks_id, json_name)
    logger.debug(objects_data)
    object_response = []
    # Guards object_response and keeps status publishing serialized.
    lock = threading.Lock()

    def _overall_layer(lay):
        # Flatten one organized layer into the response schema.
        return {
            'image_category': lay['name'],
            'position': lay['position'],
            'priority': lay.get('priority'),
            'resize_scale': lay.get('resize_scale'),
            # lay['image'] is an image object with .size, or None.
            'image_size': lay['image'].size if lay['image'] is not None else None,
            'gradient_string': lay.get('gradient_string', ""),
            'mask_url': lay['mask_url'],
            'image_url': lay.get('image_url'),
            'pattern_image_url': lay.get('pattern_image_url'),
        }

    def _single_layer(item_result, side):
        # Build the front/back layer entry for a single-garment response.
        # BUGFIX: the original reported the back image size on the front
        # layer and vice versa; each side now reports its own image's size.
        image = item_result[f'{side}_image']
        return {
            'image_category': f"{item_result['name']}_{side}",
            'image_size': image.size if image else None,
            'position': None,
            'priority': 0,
            'image_url': item_result[f'{side}_image_url'],
            'mask_url': item_result['mask_url'],
            'gradient_string': item_result.get('gradient_string', ""),
            'pattern_image_url': item_result.get('pattern_image_url'),
        }

    def process_object(step, obj):
        # Runs on its own thread: process one design object end to end.
        basic = obj['basic']
        items_response = {'layers': []}
        if basic['single_overall'] == "overall":
            item_results = [process_item(item, basic) for item in obj['items']]
            layers = []
            body_size = None
            for result in item_results:
                size = process_layer(result, layers)
                # BUGFIX: only the mannequin returns a size; the original
                # overwrote body_size with None for every later item, losing
                # it unless the mannequin happened to be processed last.
                if size is not None:
                    body_size = size
            layers.sort(key=lambda lay: lay.get("priority", float('inf')))
            layers, new_size = update_base_size_priority(layers, body_size)
            items_response['layers'] = [_overall_layer(lay) for lay in layers]
            items_response['synthesis_url'] = synthesis(layers, new_size, basic)
        else:
            item_result = process_item(obj['items'][0], basic)
            items_response['layers'].append(_single_layer(item_result, 'front'))
            items_response['layers'].append(_single_layer(item_result, 'back'))
            items_response['synthesis_url'] = synthesis_single(item_result['front_image'], item_result['back_image'])
        with lock:
            object_response.append(items_response)
            # step is 0-based; progress is published 1-based.
            publish_status(tasks_id, step + 1, items_response)

    # NOTE: the original also maintained an ``active_threads`` counter that
    # was incremented after t.start() (racy) and never read; removed.
    threads = []
    for step, obj in enumerate(objects_data):
        t = threading.Thread(target=process_object, args=(step, obj))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    logger.debug(object_response)
    oss_upload_json(minio_client, object_response, json_name)
    publish_status(tasks_id, "ok", json_name)
    return object_response