From 96002eb7f238fef9a0694ad9e0686ee490e692fe Mon Sep 17 00:00:00 2001 From: zchengrong <124802516+zchengrong@users.noreply.github.com> Date: Tue, 22 Apr 2025 13:59:59 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=88=E6=96=B0=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=EF=BC=89:=20=20batch=20generate=20=E5=A2=9E=E5=8A=A0=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E8=AF=B4=E6=98=8E=EF=BC=8Cdesign=20batch=E9=98=9F?= =?UTF-8?q?=E5=88=97=E4=BF=AE=E6=94=B9=20fix=EF=BC=88=E4=BF=AE=E5=A4=8Dbug?= =?UTF-8?q?=EF=BC=89:=20docs=EF=BC=88=E6=96=87=E6=A1=A3=E5=8F=98=E6=9B=B4?= =?UTF-8?q?=EF=BC=89:=20refactor=EF=BC=88=E9=87=8D=E6=9E=84=EF=BC=89:=20te?= =?UTF-8?q?st(=E5=A2=9E=E5=8A=A0=E6=B5=8B=E8=AF=95):?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/config.py | 3 ++- .../service_batch_generate_product_image.py | 8 ++++---- .../service_batch_generate_relight_image.py | 8 ++++---- .../service_batch_pose_transform.py | 14 +++++++------- .../generate_image/utils/pose_transform_upload.py | 4 ++-- 5 files changed, 19 insertions(+), 18 deletions(-) diff --git a/app/core/config.py b/app/core/config.py index 6905e09..ab53e0f 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -25,11 +25,13 @@ if DEBUG: LOGS_PATH = "logs/" CATEGORY_PATH = "service/attribute/config/descriptor/category/category_dis.csv" SEG_CACHE_PATH = "../seg_cache/" + POSE_TRANSFORM_VIDEO_PATH = "../pose_transform_video/" RECOMMEND_PATH_PREFIX = "service/recommend/" else: LOGS_PATH = "app/logs/" CATEGORY_PATH = "app/service/attribute/config/descriptor/category/category_dis.csv" SEG_CACHE_PATH = "/seg_cache/" + POSE_TRANSFORM_VIDEO_PATH = "/pose_transform_video/" RECOMMEND_PATH_PREFIX = "app/service/recommend/" # RABBITMQ_ENV = "" # 生产环境 @@ -163,7 +165,6 @@ SEGMENTATION = { # ollama config OLLAMA_URL = "http://10.1.1.240:11434/api/embeddings" - # design batch BATCH_DESIGN_RABBITMQ_QUEUES = os.getenv("BATCH_DESIGN_RABBITMQ_QUEUES", f"DesignBatch{RABBITMQ_ENV}") diff --git a/app/service/generate_batch_image/service_batch_generate_product_image.py b/app/service/generate_batch_image/service_batch_generate_product_image.py index 438ec99..ee2a424 100644 --- a/app/service/generate_batch_image/service_batch_generate_product_image.py +++ b/app/service/generate_batch_image/service_batch_generate_product_image.py @@ -102,12 +102,12 @@ def batch_generate_product(batch_request_data): if DEBUG is False: if i + 1 < batch_size: publish_status(tasks_id, f"{i + 1}/{batch_size}", image_url) - logger.info(f" [x] {tasks_id}:tasks_id *** progress:{i + 1}/{batch_size} *** image_url:{image_url}") - print(f" [x] {tasks_id}:tasks_id *** progress:{i + 1}/{batch_size} *** image_url:{image_url}") + logger.info(f" [x]Queue : {BATCH_GPI_RABBITMQ_QUEUES} | tasks_id:{tasks_id} | progress:{i + 1}/{batch_size} | image_url:{image_url}") + # print(f" [x]Queue : {BATCH_GPI_RABBITMQ_QUEUES} | tasks_id:{tasks_id} | progress:{i + 1}/{batch_size} | image_url:{image_url}") else: publish_status(tasks_id, f"OK", image_url_list) - logger.info(f" [x] {tasks_id}:tasks_id *** progress:OK *** image_url:{image_url_list}") - print(f" [x] {tasks_id}:tasks_id *** progress:OK *** image_url:{image_url_list}") + logger.info(f" [x]Queue : {BATCH_GPI_RABBITMQ_QUEUES} | tasks_id:{tasks_id} | progress:OK | image_url:{image_url}") + # print(f" [x]Queue : {BATCH_GPI_RABBITMQ_QUEUES} | tasks_id:{tasks_id} | progress:OK | image_url:{image_url}") def pre_processing_image(image_url): diff --git a/app/service/generate_batch_image/service_batch_generate_relight_image.py b/app/service/generate_batch_image/service_batch_generate_relight_image.py index fa53f26..3ff0978 100644 --- a/app/service/generate_batch_image/service_batch_generate_relight_image.py +++ b/app/service/generate_batch_image/service_batch_generate_relight_image.py @@ -127,12 +127,12 @@ def batch_generate_relight(batch_request_data): if DEBUG is False: if i + 1 < batch_size: publish_status(tasks_id, f"{i + 1}/{batch_size}", image_url) - logger.info(f" [x] {tasks_id}:tasks_id *** progress:{i + 1}/{batch_size} *** image_url:{image_url}") - print(f" [x] {tasks_id}:tasks_id *** progress:{i + 1}/{batch_size} *** image_url:{image_url}") + logger.info(f" [x]Queue : {BATCH_GRI_RABBITMQ_QUEUES} | tasks_id:{tasks_id} | progress:{i + 1}/{batch_size} | image_url:{image_url}") + # print(f" [x]Queue : {BATCH_GRI_RABBITMQ_QUEUES} | tasks_id:{tasks_id} | progress:{i + 1}/{batch_size} | image_url:{image_url}") else: publish_status(tasks_id, f"OK", image_url_list) - logger.info(f" [x] {tasks_id}:tasks_id *** progress:OK *** image_url:{image_url_list}") - print(f" [x] {tasks_id}:tasks_id *** progress:OK *** image_url:{image_url_list}") + logger.info(f" [x]Queue : {BATCH_GRI_RABBITMQ_QUEUES} | tasks_id:{tasks_id} | progress:OK | image_url:{image_url}") + # print(f" [x]Queue : {BATCH_GRI_RABBITMQ_QUEUES} | tasks_id:{tasks_id} | progress:OK | image_url:{image_url}") def publish_status(task_id, progress, result): diff --git a/app/service/generate_batch_image/service_batch_pose_transform.py b/app/service/generate_batch_image/service_batch_pose_transform.py index 3507a43..29f127c 100644 --- a/app/service/generate_batch_image/service_batch_pose_transform.py +++ b/app/service/generate_batch_image/service_batch_pose_transform.py @@ -29,7 +29,7 @@ from app.service.utils.oss_client import oss_get_image minio_client = Minio(MINIO_URL, access_key=MINIO_ACCESS, secret_key=MINIO_SECRET, secure=MINIO_SECURE) logger = logging.getLogger() -celery_app = Celery('tasks', broker=f'amqp://rabbit:123456@18.167.251.121:5672//', backend='rpc://', BROKER_CONNECTION_RETRY_ON_STARTUP=True) +celery_app = Celery('post_transform_tasks', broker=f'amqp://rabbit:123456@18.167.251.121:5672//', backend='rpc://', BROKER_CONNECTION_RETRY_ON_STARTUP=True) celery_app.conf.task_default_queue = 'queue_post_transform' celery_app.conf.worker_log_format = '%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s' celery_app.conf.worker_hijack_root_logger = False @@ -144,21 +144,21 @@ def batch_generate_pose_transform(batch_request_data): if DEBUG is False: if i + 1 < batch_size: publish_status(tasks_id, f"{i + 1}/{batch_size}", data) - logger.info(f" [x] {tasks_id}:tasks_id *** progress:{i + 1}/{batch_size} *** image_url:{data}") - print(f" [x] {tasks_id}:tasks_id *** progress:{i + 1}/{batch_size} *** image_url:{data}") + logger.info(f" [x]Queue : {BATCH_PS_RABBITMQ_QUEUES} | tasks_id:{tasks_id} | progress:{i + 1}/{batch_size} | image_url:{image_url}") + # print(f" [x]Queue : {BATCH_GRI_RABBITMQ_QUEUES} | tasks_id:{tasks_id} | progress:{i + 1}/{batch_size} | image_url:{image_url}") else: publish_status(tasks_id, f"OK", result_url_list) - logger.info(f" [x] {tasks_id}:tasks_id *** progress:OK *** image_url:{result_url_list}") - print(f" [x] {tasks_id}:tasks_id *** progress:OK *** image_url:{result_url_list}") + logger.info(f" [x]Queue : {BATCH_PS_RABBITMQ_QUEUES} | tasks_id:{tasks_id} | progress:OK | image_url:{image_url}") + # print(f" [x]Queue : {BATCH_PS_RABBITMQ_QUEUES} | tasks_id:{tasks_id} | progress:OK | image_url:{image_url}") def publish_status(task_id, progress, result): connection = pika.BlockingConnection(pika.ConnectionParameters(**RABBITMQ_PARAMS)) channel = connection.channel() - channel.queue_declare(queue=BATCH_GRI_RABBITMQ_QUEUES, durable=True) + channel.queue_declare(queue=BATCH_PS_RABBITMQ_QUEUES, durable=True) message = {'task_id': task_id, 'progress': progress, "result": result} channel.basic_publish(exchange='', - routing_key=BATCH_GRI_RABBITMQ_QUEUES, + routing_key=BATCH_PS_RABBITMQ_QUEUES, body=json.dumps(message), properties=pika.BasicProperties( delivery_mode=2, diff --git a/app/service/generate_image/utils/pose_transform_upload.py b/app/service/generate_image/utils/pose_transform_upload.py index b6d632d..4ecb35d 100644 --- a/app/service/generate_image/utils/pose_transform_upload.py +++ b/app/service/generate_image/utils/pose_transform_upload.py @@ -1,7 +1,7 @@ import io import logging +import os.path -import cv2 import numpy as np import skvideo.io # import boto3 @@ -66,7 +66,7 @@ def upload_video(frames, user_id, category, file_name): def ndarray_to_video(images, output_path, frame_size=(512, 768), fps=9): # 初始化视频写入器 writer = skvideo.io.FFmpegWriter( - output_path, + os.path.join(POSE_TRANSFORM_VIDEO_PATH,output_path), inputdict={'-r': str(fps)}, outputdict={'-r': str(fps), '-vcodec': 'libx264'} )