feat(新功能): batch generate 增加接口说明, design batch 队列修改

fix(修复bug):
docs(文档变更):
refactor(重构):
test(增加测试):
This commit is contained in:
zchengrong
2025-04-22 13:59:59 +08:00
parent af8ed730cc
commit 96002eb7f2
5 changed files with 19 additions and 18 deletions

View File

@@ -25,11 +25,13 @@ if DEBUG:
LOGS_PATH = "logs/"
CATEGORY_PATH = "service/attribute/config/descriptor/category/category_dis.csv"
SEG_CACHE_PATH = "../seg_cache/"
POSE_TRANSFORM_VIDEO_PATH = "../pose_transform_video/"
RECOMMEND_PATH_PREFIX = "service/recommend/"
else:
LOGS_PATH = "app/logs/"
CATEGORY_PATH = "app/service/attribute/config/descriptor/category/category_dis.csv"
SEG_CACHE_PATH = "/seg_cache/"
POSE_TRANSFORM_VIDEO_PATH = "/pose_transform_video/"
RECOMMEND_PATH_PREFIX = "app/service/recommend/"
# RABBITMQ_ENV = "" # 生产环境
@@ -163,7 +165,6 @@ SEGMENTATION = {
# ollama config
OLLAMA_URL = "http://10.1.1.240:11434/api/embeddings"
# design batch
BATCH_DESIGN_RABBITMQ_QUEUES = os.getenv("BATCH_DESIGN_RABBITMQ_QUEUES", f"DesignBatch{RABBITMQ_ENV}")

View File

@@ -102,12 +102,12 @@ def batch_generate_product(batch_request_data):
if DEBUG is False:
if i + 1 < batch_size:
publish_status(tasks_id, f"{i + 1}/{batch_size}", image_url)
logger.info(f" [x] {tasks_id}tasks_id *** progress{i + 1}/{batch_size} *** image_url{image_url}")
print(f" [x] {tasks_id}tasks_id *** progress{i + 1}/{batch_size} *** image_url{image_url}")
logger.info(f" [x]Queue : {BATCH_GPI_RABBITMQ_QUEUES} | tasks_id{tasks_id} | progress{i + 1}/{batch_size} | image_url{image_url}")
# print(f" [x]Queue : {BATCH_GPI_RABBITMQ_QUEUES} | tasks_id{tasks_id} | progress{i + 1}/{batch_size} | image_url{image_url}")
else:
publish_status(tasks_id, f"OK", image_url_list)
logger.info(f" [x] {tasks_id}tasks_id *** progressOK *** image_url{image_url_list}")
print(f" [x] {tasks_id}tasks_id *** progressOK *** image_url{image_url_list}")
logger.info(f" [x]Queue : {BATCH_GPI_RABBITMQ_QUEUES} | tasks_id{tasks_id} | progressOK | image_url{image_url}")
# print(f" [x]Queue : {BATCH_GPI_RABBITMQ_QUEUES} | tasks_id{tasks_id} | progressOK | image_url{image_url}")
def pre_processing_image(image_url):

View File

@@ -127,12 +127,12 @@ def batch_generate_relight(batch_request_data):
if DEBUG is False:
if i + 1 < batch_size:
publish_status(tasks_id, f"{i + 1}/{batch_size}", image_url)
logger.info(f" [x] {tasks_id}tasks_id *** progress{i + 1}/{batch_size} *** image_url{image_url}")
print(f" [x] {tasks_id}tasks_id *** progress{i + 1}/{batch_size} *** image_url{image_url}")
logger.info(f" [x]Queue : {BATCH_GRI_RABBITMQ_QUEUES} | tasks_id{tasks_id} | progress{i + 1}/{batch_size} | image_url{image_url}")
# print(f" [x]Queue : {BATCH_GRI_RABBITMQ_QUEUES} | tasks_id{tasks_id} | progress{i + 1}/{batch_size} | image_url{image_url}")
else:
publish_status(tasks_id, f"OK", image_url_list)
logger.info(f" [x] {tasks_id}tasks_id *** progressOK *** image_url{image_url_list}")
print(f" [x] {tasks_id}tasks_id *** progressOK *** image_url{image_url_list}")
logger.info(f" [x]Queue : {BATCH_GRI_RABBITMQ_QUEUES} | tasks_id{tasks_id} | progressOK | image_url{image_url}")
# print(f" [x]Queue : {BATCH_GRI_RABBITMQ_QUEUES} | tasks_id{tasks_id} | progressOK | image_url{image_url}")
def publish_status(task_id, progress, result):

View File

@@ -29,7 +29,7 @@ from app.service.utils.oss_client import oss_get_image
minio_client = Minio(MINIO_URL, access_key=MINIO_ACCESS, secret_key=MINIO_SECRET, secure=MINIO_SECURE)
logger = logging.getLogger()
celery_app = Celery('tasks', broker=f'amqp://rabbit:123456@18.167.251.121:5672//', backend='rpc://', BROKER_CONNECTION_RETRY_ON_STARTUP=True)
celery_app = Celery('post_transform_tasks', broker=f'amqp://rabbit:123456@18.167.251.121:5672//', backend='rpc://', BROKER_CONNECTION_RETRY_ON_STARTUP=True)
celery_app.conf.task_default_queue = 'queue_post_transform'
celery_app.conf.worker_log_format = '%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s'
celery_app.conf.worker_hijack_root_logger = False
@@ -144,21 +144,21 @@ def batch_generate_pose_transform(batch_request_data):
if DEBUG is False:
if i + 1 < batch_size:
publish_status(tasks_id, f"{i + 1}/{batch_size}", data)
logger.info(f" [x] {tasks_id}tasks_id *** progress{i + 1}/{batch_size} *** image_url{data}")
print(f" [x] {tasks_id}tasks_id *** progress{i + 1}/{batch_size} *** image_url{data}")
logger.info(f" [x]Queue : {BATCH_PS_RABBITMQ_QUEUES} | tasks_id{tasks_id} | progress{i + 1}/{batch_size} | image_url{image_url}")
# print(f" [x]Queue : {BATCH_GRI_RABBITMQ_QUEUES} | tasks_id{tasks_id} | progress{i + 1}/{batch_size} | image_url{image_url}")
else:
publish_status(tasks_id, f"OK", result_url_list)
logger.info(f" [x] {tasks_id}tasks_id *** progressOK *** image_url{result_url_list}")
print(f" [x] {tasks_id}tasks_id *** progressOK *** image_url{result_url_list}")
logger.info(f" [x]Queue : {BATCH_PS_RABBITMQ_QUEUES} | tasks_id{tasks_id} | progressOK | image_url{image_url}")
# print(f" [x]Queue : {BATCH_PS_RABBITMQ_QUEUES} | tasks_id{tasks_id} | progressOK | image_url{image_url}")
def publish_status(task_id, progress, result):
connection = pika.BlockingConnection(pika.ConnectionParameters(**RABBITMQ_PARAMS))
channel = connection.channel()
channel.queue_declare(queue=BATCH_GRI_RABBITMQ_QUEUES, durable=True)
channel.queue_declare(queue=BATCH_PS_RABBITMQ_QUEUES, durable=True)
message = {'task_id': task_id, 'progress': progress, "result": result}
channel.basic_publish(exchange='',
routing_key=BATCH_GRI_RABBITMQ_QUEUES,
routing_key=BATCH_PS_RABBITMQ_QUEUES,
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2,

View File

@@ -1,7 +1,7 @@
import io
import logging
import os.path
import cv2
import numpy as np
import skvideo.io
# import boto3
@@ -66,7 +66,7 @@ def upload_video(frames, user_id, category, file_name):
def ndarray_to_video(images, output_path, frame_size=(512, 768), fps=9):
# 初始化视频写入器
writer = skvideo.io.FFmpegWriter(
output_path,
os.path.join(POSE_TRANSFORM_VIDEO_PATH,output_path),
inputdict={'-r': str(fps)},
outputdict={'-r': str(fps), '-vcodec': 'libx264'}
)