"""Celery worker that batch-processes design projects: each object's items are
rendered (body / top / bottom), organized into layers, synthesized into images
uploaded via MinIO, with per-step status published over MQ."""
import logging
import threading

from celery import Celery
from minio import Minio

from app.core.config import *
from app.service.design_batch.item import BodyItem, TopItem, BottomItem
from app.service.design_batch.utils.MQ import publish_status
from app.service.design_batch.utils.organize import organize_body, organize_clothing
from app.service.design_batch.utils.save_json import oss_upload_json
from app.service.design_batch.utils.synthesis_item import update_base_size_priority, synthesis, synthesis_single

id_lock = threading.Lock()

celery_app = Celery('tasks', broker='amqp://guest:guest@10.1.2.190:5672//', backend='rpc://')
celery_app.conf.worker_log_format = '%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s'
celery_app.conf.worker_hijack_root_logger = False
logging.getLogger('pika').setLevel(logging.WARNING)
logger = logging.getLogger()

minio_client = Minio(MINIO_URL, access_key=MINIO_ACCESS, secret_key=MINIO_SECRET, secure=MINIO_SECURE)


def process_item(item, basic):
    """Process a single project item, dispatching on ``item['type']``.

    "Body" items go to BodyItem; top-like garments (blouse/outwear/dress/tops,
    case-insensitive) go to TopItem; everything else is treated as a bottom.
    Returns whatever the item server's ``process()`` produces.
    """
    item_type = item['type']
    if item_type == "Body":
        server = BodyItem(data=item, basic=basic, minio_client=minio_client)
    elif item_type.lower() in ('blouse', 'outwear', 'dress', 'tops'):
        server = TopItem(data=item, basic=basic, minio_client=minio_client)
    else:
        server = BottomItem(data=item, basic=basic, minio_client=minio_client)
    return server.process()


def process_layer(item, layers):
    """Assemble layer data for a processed item, appending into ``layers``.

    The mannequin contributes one body layer and returns the body image size;
    clothing contributes a front and a back layer and returns None.
    """
    if item['name'] == "mannequin":
        layers.append(organize_body(item))
        return item['body_image'].size
    front_layer, back_layer = organize_clothing(item)
    layers.append(front_layer)
    layers.append(back_layer)
    return None


def _overall_layer(lay):
    """Build the response dict for one layer of an 'overall' composition."""
    image = lay['image']
    return {
        'image_category': lay['name'],
        'position': lay['position'],
        'priority': lay.get("priority"),
        'resize_scale': lay.get('resize_scale'),
        'image_size': None if image is None else image.size,
        'gradient_string': lay.get('gradient_string', ""),
        'mask_url': lay['mask_url'],
        'image_url': lay.get('image_url'),
        'pattern_image_url': lay.get('pattern_image_url'),
    }


def _clothing_layer(item_result, side):
    """Build the front/back response dict for a single-item composition.

    ``side`` is "front" or "back"; the image size is taken from the SAME
    side's image (the original crossed front/back sizes — bug fix).
    """
    image = item_result[f'{side}_image']
    return {
        'image_category': f"{item_result['name']}_{side}",
        'image_size': image.size if image else None,
        'position': None,
        'priority': 0,
        'image_url': item_result[f'{side}_image_url'],
        'mask_url': item_result['mask_url'],
        "gradient_string": item_result.get('gradient_string', ""),
        'pattern_image_url': item_result.get('pattern_image_url'),
    }


# @celery_app.task
def batch_design(objects_data, tasks_id, json_name):
    """Process every object in ``objects_data`` concurrently (one thread each),
    publish a status message per finished object, upload the collected results
    as JSON to OSS, and return the list of per-object responses.

    NOTE(review): one thread per object is unbounded fan-out; a
    ThreadPoolExecutor would cap concurrency — confirm expected batch sizes.
    """
    object_response = []
    threads = []
    active_threads = 0
    lock = threading.Lock()

    def process_object(step, obj):
        nonlocal active_threads
        basic = obj['basic']
        items_response = {'layers': []}
        try:
            if basic['single_overall'] == "overall":
                item_results = [process_item(item, basic) for item in obj['items']]
                layers = []
                body_size = None
                for result in item_results:
                    size = process_layer(result, layers)
                    # Bug fix: only the mannequin returns a size; do not let the
                    # None returned by clothing items clobber it.
                    if size is not None:
                        body_size = size
                layers = sorted(layers, key=lambda s: s.get("priority", float('inf')))
                layers, new_size = update_base_size_priority(layers, body_size)
                for lay in layers:
                    items_response['layers'].append(_overall_layer(lay))
                items_response['synthesis_url'] = synthesis(layers, new_size, basic)
            else:
                # Single-item mode: only the first item is composed.
                result = process_item(obj['items'][0], basic)
                items_response['layers'].append(_clothing_layer(result, 'front'))
                items_response['layers'].append(_clothing_layer(result, 'back'))
                items_response['synthesis_url'] = synthesis_single(result['front_image'], result['back_image'])
            with lock:
                object_response.append(items_response)
            logger.info(items_response)
            publish_status(tasks_id, step + 1, items_response)
        finally:
            # Bug fix: decrement under the lock (+= / -= are not atomic) and in
            # a finally so a failing object cannot leak the counter.
            with lock:
                active_threads -= 1

    for step, obj in enumerate(objects_data):
        t = threading.Thread(target=process_object, args=(step, obj))
        threads.append(t)
        # Bug fix: count the thread BEFORE starting it — previously a fast
        # worker could decrement before the main loop incremented.
        with lock:
            active_threads += 1
        t.start()
    for t in threads:
        t.join()

    oss_upload_json(minio_client, object_response, json_name)
    publish_status(tasks_id, "ok", json_name)
    return object_response