弃用mq 采用回调接口方式
This commit is contained in:
34
src/server/canvas_generate_3D/callback.py
Normal file
34
src/server/canvas_generate_3D/callback.py
Normal file
@@ -0,0 +1,34 @@
|
||||
import httpx
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def notify_callback(callback_url: str, job_id: str, status: str, result: dict, ):
|
||||
"""
|
||||
调用客户端提供的回调接口
|
||||
"""
|
||||
try:
|
||||
payload = {
|
||||
"job_id": job_id,
|
||||
" ": status,
|
||||
"result": result
|
||||
}
|
||||
logger.info(payload)
|
||||
async with httpx.AsyncClient(timeout=30.0) as client:
|
||||
resp = await client.post(
|
||||
str(callback_url),
|
||||
json=payload,
|
||||
headers={"Content-Type": "application/json"}
|
||||
)
|
||||
|
||||
if resp.status_code >= 200 and resp.status_code < 300:
|
||||
logger.info(f"回调成功 | job_id: {job_id} | status: {status} | url: {callback_url}")
|
||||
return True
|
||||
else:
|
||||
logger.warning(f"回调返回非2xx状态码 | job_id: {job_id} | status: {resp.status_code} | url: {callback_url}")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"回调失败 | job_id: {job_id} | url: {callback_url} | error: {e}", exc_info=True)
|
||||
return False
|
||||
@@ -1,4 +1,6 @@
|
||||
from src.server.canvas_generate_3D.celery_app import celery_app # ← 改成这行
|
||||
from pydantic import HttpUrl
|
||||
|
||||
from src.server.canvas_generate_3D.celery_app import celery_app # ← 改成这行
|
||||
from src.server.canvas_generate_3D.tasks import img_to_3d_task, three_d_to_3views_task
|
||||
import logging
|
||||
|
||||
@@ -22,7 +24,7 @@ def get_queue_length(queue_name: str) -> int:
|
||||
return 0 # 失败时默认不拒绝提交,防止误判
|
||||
|
||||
|
||||
def submit_img_to_3d_task(input_images: list, model: str = "single", **kwargs):
|
||||
def submit_img_to_3d_task(input_images: list, model: str = "single", task_id: str = "", callback_url: str = ""):
|
||||
"""提交 img_to_3D 任务(带队列长度限制)"""
|
||||
queue_name = "img_to_3d_queue"
|
||||
max_queue_length = 10
|
||||
@@ -39,15 +41,17 @@ def submit_img_to_3d_task(input_images: list, model: str = "single", **kwargs):
|
||||
}
|
||||
|
||||
# 提交任务
|
||||
task = img_to_3d_task.delay(input_images, model, **kwargs)
|
||||
task = img_to_3d_task.apply_async(
|
||||
args=(input_images, model, callback_url),
|
||||
task_id=task_id,
|
||||
queue="img_to_3d_queue")
|
||||
|
||||
logger.info(f"img_to_3d_task 已提交 | task_id: {task.id} | 当前队列长度: {current_length}")
|
||||
logger.info(f"img_to_3d_task 已提交 | task_id: {task_id} | 当前队列长度: {current_length}")
|
||||
|
||||
return {
|
||||
"state": "success",
|
||||
"task_id": task.id,
|
||||
"task_id": task_id,
|
||||
"message": "任务已成功提交,正在后台处理...",
|
||||
"queue_length": current_length + 1
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
@@ -59,8 +63,8 @@ def submit_img_to_3d_task(input_images: list, model: str = "single", **kwargs):
|
||||
}
|
||||
|
||||
|
||||
def submit_three_d_to_3views_task(minio_glb_path: str, **kwargs):
|
||||
"""提交 3D转3视图 任务(带队列长度限制)"""
|
||||
def submit_three_d_to_3views_task(minio_glb_path: str, task_id: str = "", callback_url: str = ""):
|
||||
"""提交 3D转 3 视图 任务(带队列长度限制)"""
|
||||
queue_name = "three_d_to_3views_task" # ← 必须和 @shared_task 中的 queue 完全一致!
|
||||
max_queue_length = 3
|
||||
|
||||
@@ -75,15 +79,17 @@ def submit_three_d_to_3views_task(minio_glb_path: str, **kwargs):
|
||||
"max_length": max_queue_length
|
||||
}
|
||||
|
||||
task = three_d_to_3views_task.delay(minio_glb_path, **kwargs)
|
||||
task = three_d_to_3views_task.apply_async(
|
||||
args=(minio_glb_path, callback_url),
|
||||
task_id=task_id,
|
||||
queue="three_d_to_3views_queue")
|
||||
|
||||
logger.info(f"three_d_to_3views_task 已提交 | task_id: {task.id} | 当前队列长度: {current_length}")
|
||||
logger.info(f"three_d_to_3views_task 已提交 | task_id: {task_id} | 当前队列长度: {current_length}")
|
||||
|
||||
return {
|
||||
"state": "success",
|
||||
"task_id": task.id,
|
||||
"task_id": task_id,
|
||||
"message": "任务已成功提交,正在后台处理...",
|
||||
"queue_length": current_length + 1
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
|
||||
@@ -3,6 +3,7 @@ import asyncio
|
||||
from celery import shared_task
|
||||
import httpx
|
||||
from src.core.config import settings
|
||||
from src.server.canvas_generate_3D.callback import notify_callback
|
||||
from src.server.utils.mq_util import send_to_rabbitmq
|
||||
import logging
|
||||
|
||||
@@ -10,90 +11,103 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@shared_task(bind=True, queue="img_to_3d_queue", max_retries=3, name='src.server.canvas_generate_3D.tasks.img_to_3d_task')
|
||||
def img_to_3d_task(self, input_images: list, model: str = "single"):
|
||||
def img_to_3d_task(self, input_images: list, model: str = "single", callback_url: str = None):
|
||||
"""img_to_3D 主任务"""
|
||||
job_id = self.request.id
|
||||
|
||||
logger.info(f"开始处理 img_to_3D 任务 | job_id: {job_id}")
|
||||
|
||||
try:
|
||||
input_data = {
|
||||
"image_paths": input_images,
|
||||
"model": model,
|
||||
}
|
||||
|
||||
with httpx.Client(timeout=300.0) as client:
|
||||
resp = client.post(
|
||||
f"http://{settings.IMAGE_TO_3D_MODEL_URL}/canvas/img_to_3D",
|
||||
json=input_data
|
||||
)
|
||||
resp.raise_for_status()
|
||||
status_code = resp.status_code
|
||||
result = resp.json()
|
||||
|
||||
logger.info(f"img_to_3D 任务处理完成 | job_id: {job_id}")
|
||||
|
||||
# 发送到对应的结果队列
|
||||
asyncio.run(send_to_rabbitmq(
|
||||
result=result,
|
||||
job_id=job_id,
|
||||
status="completed",
|
||||
routing_key=f"img_to_3d_results-{settings.SERVE_ENV}" # ← 第一个任务的结果队列
|
||||
))
|
||||
|
||||
logger.info(f"img_to_3D 任务处理完成 | job_id: {job_id} | status_code : {status_code} | result: {result}")
|
||||
# 发送到对应的回调接口
|
||||
if status_code == 200:
|
||||
asyncio.run(
|
||||
notify_callback(
|
||||
callback_url=callback_url,
|
||||
job_id=job_id,
|
||||
status="completed",
|
||||
result=result,
|
||||
)
|
||||
)
|
||||
else:
|
||||
asyncio.run(
|
||||
notify_callback(
|
||||
callback_url=callback_url,
|
||||
job_id=job_id,
|
||||
status="failed",
|
||||
result={},
|
||||
)
|
||||
)
|
||||
return result
|
||||
|
||||
except Exception as exc:
|
||||
logger.error(f"img_to_3D 任务失败 | job_id: {job_id}", exc_info=True)
|
||||
|
||||
asyncio.run(send_to_rabbitmq(
|
||||
result={"error": str(exc)},
|
||||
job_id=job_id,
|
||||
status="failed",
|
||||
routing_key=f"img_to_3d_results-{settings.SERVE_ENV}"
|
||||
))
|
||||
logger.error(f"img_to_3D 任务失败 | job_id: {job_id} | exc {exc}", exc_info=True)
|
||||
asyncio.run(
|
||||
notify_callback(
|
||||
callback_url=callback_url,
|
||||
job_id=job_id,
|
||||
status="failed",
|
||||
result=result,
|
||||
)
|
||||
)
|
||||
raise self.retry(exc=exc, countdown=60, max_retries=3)
|
||||
|
||||
|
||||
@shared_task(bind=True, queue="three_d_to_3views_queue", max_retries=3,
|
||||
name='src.server.canvas_generate_3D.tasks.three_d_to_3views_task')
|
||||
def three_d_to_3views_task(self, minio_glb_path: str):
|
||||
@shared_task(bind=True, queue="three_d_to_3views_queue", max_retries=3, name='src.server.canvas_generate_3D.tasks.three_d_to_3views_task')
|
||||
def three_d_to_3views_task(self, minio_glb_path: str, callback_url: str):
|
||||
"""3D to 3views 主任务"""
|
||||
job_id = self.request.id
|
||||
|
||||
logger.info(f"开始处理 three_d_to_3views_task | job_id: {job_id}")
|
||||
|
||||
try:
|
||||
input_data = {
|
||||
"minio_glb_path": minio_glb_path,
|
||||
}
|
||||
|
||||
with httpx.Client(timeout=300.0) as client:
|
||||
resp = client.post(
|
||||
f"http://{settings.IMAGE_TO_3D_MODEL_URL}/canvas/3d_to_3views",
|
||||
json=input_data
|
||||
)
|
||||
resp.raise_for_status()
|
||||
status_code = resp.status_code
|
||||
result = resp.json()
|
||||
|
||||
logger.info(f"three_d_to_3views_task 任务处理完成 | job_id: {job_id}")
|
||||
|
||||
# 发送到对应的结果队列
|
||||
asyncio.run(send_to_rabbitmq(
|
||||
result=result,
|
||||
job_id=job_id,
|
||||
status="completed",
|
||||
routing_key="three_d_to_3views_results" # ← 第二个任务的结果队列
|
||||
))
|
||||
|
||||
logger.info(f"three_d_to_3views_task 任务处理完成 | job_id: {job_id} | status_code : {status_code} | result: {result}")
|
||||
# 发送到对应的回调接口
|
||||
if status_code == 200:
|
||||
asyncio.run(
|
||||
notify_callback(
|
||||
callback_url=callback_url,
|
||||
job_id=job_id,
|
||||
status="completed",
|
||||
result=result,
|
||||
)
|
||||
)
|
||||
else:
|
||||
asyncio.run(
|
||||
notify_callback(
|
||||
callback_url=callback_url,
|
||||
job_id=job_id,
|
||||
status="failed",
|
||||
result={},
|
||||
)
|
||||
)
|
||||
return result
|
||||
|
||||
except Exception as exc:
|
||||
logger.error(f"three_d_to_3views_task 任务失败 | job_id: {job_id}", exc_info=True)
|
||||
|
||||
asyncio.run(send_to_rabbitmq(
|
||||
result={"error": str(exc)},
|
||||
job_id=job_id,
|
||||
status="failed",
|
||||
routing_key="three_d_to_3views_results"
|
||||
))
|
||||
asyncio.run(
|
||||
notify_callback(
|
||||
callback_url=callback_url,
|
||||
job_id=job_id,
|
||||
status="failed",
|
||||
result={},
|
||||
)
|
||||
)
|
||||
raise self.retry(exc=exc, countdown=60, max_retries=3)
|
||||
|
||||
Reference in New Issue
Block a user