From 79258a6a439b4900bad809d62c2df2f768b41f6c Mon Sep 17 00:00:00 2001 From: zcr Date: Thu, 2 Apr 2026 11:30:34 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BC=83=E7=94=A8mq=20=E9=87=87=E7=94=A8?= =?UTF-8?q?=E5=9B=9E=E8=B0=83=E6=8E=A5=E5=8F=A3=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/routers/generate_3D.py | 125 +++++----------------- src/schemas/generate_3D.py | 7 +- src/server/canvas_generate_3D/callback.py | 34 ++++++ src/server/canvas_generate_3D/server.py | 30 +++--- src/server/canvas_generate_3D/tasks.py | 112 ++++++++++--------- 5 files changed, 145 insertions(+), 163 deletions(-) create mode 100644 src/server/canvas_generate_3D/callback.py diff --git a/src/routers/generate_3D.py b/src/routers/generate_3D.py index eb3448d..f25621c 100755 --- a/src/routers/generate_3D.py +++ b/src/routers/generate_3D.py @@ -156,7 +156,7 @@ async def to_3views(request_data: ToSVGRequest): logger.warning(f"img_to_3D Run Exception: {e}") -@router.post("/img_to_3D_mq") +@router.post("/img_to_3D_v2") async def img_to_3d_endpoint(request_data: ImageTo3DRequest): """ ### 接口说明: @@ -169,9 +169,13 @@ async def img_to_3d_endpoint(request_data: ImageTo3DRequest): ### 请求体示例: **单张图片模式:** ```json - { - "input_images": ["test/img_to_3d_data/example/character_1.png"], - "model": "single" + { + "input_images": [ + "test/img_to_3d_data/example_multi_image/character_1.png" + ], + "model": "single", + "task_id": "123", + "callback_url": "https://example.com/" } ``` **多张图片模式:** @@ -182,7 +186,9 @@ async def img_to_3d_endpoint(request_data: ImageTo3DRequest): "test/img_to_3d_data/example_multi_image/character_2.png", "test/img_to_3d_data/example_multi_image/character_3.png" ], - "model": "multi" + "model": "multi", + "task_id": "123", + "callback_url": "https://example.com/" } ``` @@ -190,11 +196,11 @@ async def img_to_3d_endpoint(request_data: ImageTo3DRequest): ```json { "code": 200, - "message": "ok", + "msg": "OK!", "data": { - "job_id": "b2c3d4e5-f6g7-8901-hijk-lm2345678901", - "status": "queued", - "message": "任务已进入后台处理" + "state": "success", + "task_id": "123", + "message": "任务已成功提交,正在后台处理..." } } ``` @@ -204,6 +210,7 @@ async def img_to_3d_endpoint(request_data: ImageTo3DRequest): "message": "ok", "data": { "status": "queue_full", + "task_id": "123", "message": "当前 3D 生成请求较多,请稍后重试。", "queue_length": 10, "max_length": 10 @@ -216,69 +223,14 @@ async def img_to_3d_endpoint(request_data: ImageTo3DRequest): "message": "ok", "data": { "status": "fail", + "task_id": "123", "message": "提交失败,请稍后重试。", "error": str(e) } } ``` - - -------------------------------------------------------------------- - RabbitMQ 结果消息说明: - 处理完成后,结果会通过 RabbitMQ 发送到以下位置: - Queue: img_to_3d_results - - 消息格式: - ```json - { - "job_id": "6d0492dd-f276-4512-913c-134de2ba4b84", - "status": "completed", - "timestamp": "2026-04-01T11:54:31.146435", - "task_type": "img_to_3d_results-prod", - "result": { - "glb_path": "test/3d_result/glb/3d2777e6ac4f48769e5bb4642807bfdb.glb", - "glb_static_img_path": "test/3d_result/png/d54f1fff6b0b48b5ba4604d334aed73b.png", - "glb_info": { - "file_format": ".glb", - "vertex_count": 5519, - "centroid": [ - -0.01005209478665426, - 0.10863836704985305, - -0.011404903160470693 - ], - "bounding_box_min": [ - -0.17623867094516754, - -0.499397873878479, - -0.16439560055732727 - ], - "bounding_box_max": [ - 0.18468153476715088, - 0.5002236366271973, - 0.1541675627231598 - ], - "size": [ - 0.3609202057123184, - 0.9996215105056763, - 0.31856316328048706 - ], - "size_ratio": [ - 0.21494798217733652, - 0.5953300015447786, - 0.189722016277885 - ], - "size_ratio_percentage": [ - 21.494798217733653, - 59.533000154477854, - 18.9722016277885 - ] - } - } - } - ``` - status 说明: - completed: 处理成功,result 中包含 3D 模型相关路径和信息 - failed: 处理失败,result 中会包含 error 字段 """ - result = submit_img_to_3d_task(input_images=request_data.input_images, model=request_data.model) + result = submit_img_to_3d_task(input_images=request_data.input_images, model=request_data.model, task_id=request_data.task_id, callback_url=request_data.callback_url) if result.get("state") == "success": state_code = 200 elif result.get("state") == "queue_full": @@ -289,7 +241,7 @@ async def img_to_3d_endpoint(request_data: ImageTo3DRequest): return ResponseModel(data=result, code=state_code) -@router.post("/3d_to_3views_mq") +@router.post("/3d_to_3views_v2") async def to_3views(request_data: ToSVGRequest): """ ### 接口说明: @@ -311,21 +263,8 @@ async def to_3views(request_data: ToSVGRequest): "code": 200, "message": "任务已提交", "data": { - "job_id": "b2c3d4e5-f6g7-8901-hijk-lm2345678901", - "status": "queued", - "message": "任务已进入后台处理" - } - } - ``` - - ### 输出示例: - ```json - { - "code": 200, - "message": "ok", - "data": { - "job_id": "b2c3d4e5-f6g7-8901-hijk-lm2345678901", - "status": "queued", + "task_id": "123", + "status": "success", "message": "任务已进入后台处理" } } @@ -336,6 +275,7 @@ async def to_3views(request_data: ToSVGRequest): "message": "ok", "data": { "status": "queue_full", + "task_id": "123", "message": "当前 3D 生成请求较多,请稍后重试。", "queue_length": 10, "max_length": 10 @@ -348,31 +288,14 @@ async def to_3views(request_data: ToSVGRequest): "message": "ok", "data": { "status": "fail", + "task_id": "123", "message": "提交失败,请稍后重试。", "error": str(e) } } ``` - -------------------------------------------------------------------- - RabbitMQ 结果消息说明: - Queue: three_d_to_3views_results - 消息格式: - ```json - { - "job_id": "88047a47-6c16-4607-a548-00034b6d56cf", - "status": "completed", - "timestamp": "2026-04-01T12:28:41.026066", - "task_type": "three_d_to_3views_results", - "result": { - "minio_svg_path": "test/3d_result/svg/dac119f93b3f46a3ad8a476b608feb71.svg" - } - } - ``` - status 说明: - completed: 处理成功,result 中包含转换后的视图图片路径 - failed: 处理失败,result 中会包含 "error" 字段 """ - result = submit_three_d_to_3views_task(minio_glb_path=request_data.minio_glb_path) + result = submit_three_d_to_3views_task(minio_glb_path=request_data.minio_glb_path, task_id=request_data.task_id, callback_url=request_data.callback_url) if result.get("state") == "success": state_code = 200 elif result.get("state") == "queue_full": diff --git a/src/schemas/generate_3D.py b/src/schemas/generate_3D.py index c23e339..2725ebb 100755 --- a/src/schemas/generate_3D.py +++ b/src/schemas/generate_3D.py @@ -1,4 +1,4 @@ -from pydantic import BaseModel, Field, confloat +from pydantic import BaseModel, Field, confloat, HttpUrl from typing import Optional, List, Dict, Any @@ -12,6 +12,10 @@ class ImageTo3DRequest(BaseModel): default="single", description="模型类型: single 或 multi" ) + task_id: str = Field( + ... + ) + callback_url: str # 必填,客户端提供的回调地址 class ToSVGRequest(BaseModel): @@ -19,3 +23,4 @@ class ToSVGRequest(BaseModel): ..., description="输入图片路径列表" ) + callback_url: str # 必填 diff --git a/src/server/canvas_generate_3D/callback.py b/src/server/canvas_generate_3D/callback.py new file mode 100644 index 0000000..5590856 --- /dev/null +++ b/src/server/canvas_generate_3D/callback.py @@ -0,0 +1,34 @@ +import httpx +import logging + +logger = logging.getLogger(__name__) + + +async def notify_callback(callback_url: str, job_id: str, status: str, result: dict, ): + """ + 调用客户端提供的回调接口 + """ + try: + payload = { + "job_id": job_id, + " ": status, + "result": result + } + logger.info(payload) + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.post( + str(callback_url), + json=payload, + headers={"Content-Type": "application/json"} + ) + + if resp.status_code >= 200 and resp.status_code < 300: + logger.info(f"回调成功 | job_id: {job_id} | status: {status} | url: {callback_url}") + return True + else: + logger.warning(f"回调返回非2xx状态码 | job_id: {job_id} | status: {resp.status_code} | url: {callback_url}") + return False + + except Exception as e: + logger.error(f"回调失败 | job_id: {job_id} | url: {callback_url} | error: {e}", exc_info=True) + return False diff --git a/src/server/canvas_generate_3D/server.py b/src/server/canvas_generate_3D/server.py index 5126d41..6303648 100755 --- a/src/server/canvas_generate_3D/server.py +++ b/src/server/canvas_generate_3D/server.py @@ -1,4 +1,6 @@ -from src.server.canvas_generate_3D.celery_app import celery_app # ← 改成这行 +from pydantic import HttpUrl + +from src.server.canvas_generate_3D.celery_app import celery_app # ← 改成这行 from src.server.canvas_generate_3D.tasks import img_to_3d_task, three_d_to_3views_task import logging @@ -22,7 +24,7 @@ def get_queue_length(queue_name: str) -> int: return 0 # 失败时默认不拒绝提交,防止误判 -def submit_img_to_3d_task(input_images: list, model: str = "single", **kwargs): +def submit_img_to_3d_task(input_images: list, model: str = "single", task_id: str = "", callback_url: str = ""): """提交 img_to_3D 任务(带队列长度限制)""" queue_name = "img_to_3d_queue" max_queue_length = 10 @@ -39,15 +41,17 @@ def submit_img_to_3d_task(input_images: list, model: str = "single", **kwargs): } # 提交任务 - task = img_to_3d_task.delay(input_images, model, **kwargs) + task = img_to_3d_task.apply_async( + args=(input_images, model, callback_url), + task_id=task_id, + queue="img_to_3d_queue") - logger.info(f"img_to_3d_task 已提交 | task_id: {task.id} | 当前队列长度: {current_length}") + logger.info(f"img_to_3d_task 已提交 | task_id: {task_id} | 当前队列长度: {current_length}") return { "state": "success", - "task_id": task.id, + "task_id": task_id, "message": "任务已成功提交,正在后台处理...", - "queue_length": current_length + 1 } except Exception as e: @@ -59,8 +63,8 @@ def submit_img_to_3d_task(input_images: list, model: str = "single", **kwargs): } -def submit_three_d_to_3views_task(minio_glb_path: str, **kwargs): - """提交 3D转3视图 任务(带队列长度限制)""" +def submit_three_d_to_3views_task(minio_glb_path: str, task_id: str = "", callback_url: str = ""): + """提交 3D转 3 视图 任务(带队列长度限制)""" queue_name = "three_d_to_3views_task" # ← 必须和 @shared_task 中的 queue 完全一致! max_queue_length = 3 @@ -75,15 +79,17 @@ def submit_three_d_to_3views_task(minio_glb_path: str, **kwargs): "max_length": max_queue_length } - task = three_d_to_3views_task.delay(minio_glb_path, **kwargs) + task = three_d_to_3views_task.apply_async( + args=(minio_glb_path, callback_url), + task_id=task_id, + queue="three_d_to_3views_queue") - logger.info(f"three_d_to_3views_task 已提交 | task_id: {task.id} | 当前队列长度: {current_length}") + logger.info(f"three_d_to_3views_task 已提交 | task_id: {task_id} | 当前队列长度: {current_length}") return { "state": "success", - "task_id": task.id, + "task_id": task_id, "message": "任务已成功提交,正在后台处理...", - "queue_length": current_length + 1 } except Exception as e: diff --git a/src/server/canvas_generate_3D/tasks.py b/src/server/canvas_generate_3D/tasks.py index a82833e..5ae5b6b 100755 --- a/src/server/canvas_generate_3D/tasks.py +++ b/src/server/canvas_generate_3D/tasks.py @@ -3,6 +3,7 @@ import asyncio from celery import shared_task import httpx from src.core.config import settings +from src.server.canvas_generate_3D.callback import notify_callback from src.server.utils.mq_util import send_to_rabbitmq import logging @@ -10,90 +11,103 @@ logger = logging.getLogger(__name__) @shared_task(bind=True, queue="img_to_3d_queue", max_retries=3, name='src.server.canvas_generate_3D.tasks.img_to_3d_task') -def img_to_3d_task(self, input_images: list, model: str = "single"): +def img_to_3d_task(self, input_images: list, model: str = "single", callback_url: str = None): """img_to_3D 主任务""" job_id = self.request.id - logger.info(f"开始处理 img_to_3D 任务 | job_id: {job_id}") - try: input_data = { "image_paths": input_images, "model": model, } - with httpx.Client(timeout=300.0) as client: resp = client.post( f"http://{settings.IMAGE_TO_3D_MODEL_URL}/canvas/img_to_3D", json=input_data ) - resp.raise_for_status() + status_code = resp.status_code result = resp.json() - - logger.info(f"img_to_3D 任务处理完成 | job_id: {job_id}") - - # 发送到对应的结果队列 - asyncio.run(send_to_rabbitmq( - result=result, - job_id=job_id, - status="completed", - routing_key=f"img_to_3d_results-{settings.SERVE_ENV}" # ← 第一个任务的结果队列 - )) - + logger.info(f"img_to_3D 任务处理完成 | job_id: {job_id} | status_code : {status_code} | result: {result}") + # 发送到对应的回调接口 + if status_code == 200: + asyncio.run( + notify_callback( + callback_url=callback_url, + job_id=job_id, + status="completed", + result=result, + ) + ) + else: + asyncio.run( + notify_callback( + callback_url=callback_url, + job_id=job_id, + status="failed", + result={}, + ) + ) return result except Exception as exc: - logger.error(f"img_to_3D 任务失败 | job_id: {job_id}", exc_info=True) - - asyncio.run(send_to_rabbitmq( - result={"error": str(exc)}, - job_id=job_id, - status="failed", - routing_key=f"img_to_3d_results-{settings.SERVE_ENV}" - )) + logger.error(f"img_to_3D 任务失败 | job_id: {job_id} | exc {exc}", exc_info=True) + asyncio.run( + notify_callback( + callback_url=callback_url, + job_id=job_id, + status="failed", + result=result, + ) + ) raise self.retry(exc=exc, countdown=60, max_retries=3) -@shared_task(bind=True, queue="three_d_to_3views_queue", max_retries=3, - name='src.server.canvas_generate_3D.tasks.three_d_to_3views_task') -def three_d_to_3views_task(self, minio_glb_path: str): +@shared_task(bind=True, queue="three_d_to_3views_queue", max_retries=3, name='src.server.canvas_generate_3D.tasks.three_d_to_3views_task') +def three_d_to_3views_task(self, minio_glb_path: str, callback_url: str): """3D to 3views 主任务""" job_id = self.request.id - logger.info(f"开始处理 three_d_to_3views_task | job_id: {job_id}") - try: input_data = { "minio_glb_path": minio_glb_path, } - with httpx.Client(timeout=300.0) as client: resp = client.post( f"http://{settings.IMAGE_TO_3D_MODEL_URL}/canvas/3d_to_3views", json=input_data ) - resp.raise_for_status() + status_code = resp.status_code result = resp.json() - - logger.info(f"three_d_to_3views_task 任务处理完成 | job_id: {job_id}") - - # 发送到对应的结果队列 - asyncio.run(send_to_rabbitmq( - result=result, - job_id=job_id, - status="completed", - routing_key="three_d_to_3views_results" # ← 第二个任务的结果队列 - )) - + logger.info(f"three_d_to_3views_task 任务处理完成 | job_id: {job_id} | status_code : {status_code} | result: {result}") + # 发送到对应的回调接口 + if status_code == 200: + asyncio.run( + notify_callback( + callback_url=callback_url, + job_id=job_id, + status="completed", + result=result, + ) + ) + else: + asyncio.run( + notify_callback( + callback_url=callback_url, + job_id=job_id, + status="failed", + result={}, + ) + ) return result except Exception as exc: logger.error(f"three_d_to_3views_task 任务失败 | job_id: {job_id}", exc_info=True) - - asyncio.run(send_to_rabbitmq( - result={"error": str(exc)}, - job_id=job_id, - status="failed", - routing_key="three_d_to_3views_results" - )) + asyncio.run( + notify_callback( + callback_url=callback_url, + job_id=job_id, + status="failed", + result={}, + ) + ) raise self.retry(exc=exc, countdown=60, max_retries=3)