# src/server/canvas_generate_3D/tasks.py
import asyncio
from celery import shared_task
import httpx
from src.core.config import settings
from src.server.canvas_generate_3D.callback import notify_callback
from src.server.utils.mq_util import send_to_rabbitmq
import logging

logger = logging.getLogger(__name__)


def _send_callback(callback_url, task_id, status, result):
    """Run the async ``notify_callback`` to completion from synchronous task code."""
    asyncio.run(
        notify_callback(
            callback_url=callback_url,
            task_id=task_id,
            status=status,
            result=result,
        )
    )


def _notify_failure(callback_url, task_id):
    """Report a "failed" status with an empty result, without ever raising.

    Used from the tasks' ``except`` handlers: a callback delivery error must
    not replace the original exception or prevent the retry from being
    scheduled, so it is logged and swallowed here.
    """
    try:
        _send_callback(callback_url, task_id, "failed", {})
    except Exception:
        logger.exception(
            "failure callback could not be delivered | task_id: %s", task_id
        )


@shared_task(
    bind=True,
    queue="img_to_3d_queue",
    max_retries=3,
    name='src.server.canvas_generate_3D.tasks.img_to_3d_task',
)
def img_to_3d_task(self, input_images: list, model: str = "single", callback_url: str = None):
    """Main img_to_3D task.

    POSTs the image paths and model name to the ``/canvas/img_to_3D``
    endpoint of ``settings.IMAGE_TO_3D_MODEL_URL`` and reports the outcome
    through ``notify_callback``.

    Args:
        input_images: Sent to the service as ``image_paths``.
        model: Sent to the service as ``model``.
        callback_url: Forwarded unchanged to ``notify_callback``.

    Returns:
        The decoded JSON body of the service response (returned for non-200
        responses as well).

    Raises:
        Whatever ``self.retry`` raises: any exception in the body triggers a
        "failed" callback and a retry after 60 seconds (at most 3 retries).
    """
    task_id = self.request.id
    logger.info(f"开始处理 img_to_3D 任务 | task_id: {task_id}")

    try:
        input_data = {
            "image_paths": input_images,
            "model": model,
        }
        with httpx.Client(timeout=300.0) as client:
            resp = client.post(
                f"http://{settings.IMAGE_TO_3D_MODEL_URL}/canvas/img_to_3D",
                json=input_data,
            )
        status_code = resp.status_code
        result = resp.json()
        logger.info(f"img_to_3D 任务处理完成 | task_id: {task_id} | status_code : {status_code} | result: {result}")

        # Forward the outcome to the callback endpoint.
        if status_code == 200:
            _send_callback(callback_url, task_id, "completed", result)
        else:
            _send_callback(callback_url, task_id, "failed", {})

        return result

    except Exception as exc:
        logger.error(f"img_to_3D 任务失败 | task_id: {task_id} | exc {exc}", exc_info=True)
        # `result` may be unbound here (the request or JSON decoding can fail
        # before it is assigned), so the failure callback always carries {}.
        _notify_failure(callback_url, task_id)
        raise self.retry(exc=exc, countdown=60, max_retries=3)


@shared_task(
    bind=True,
    queue="three_d_to_3views_queue",
    max_retries=3,
    name='src.server.canvas_generate_3D.tasks.three_d_to_3views_task',
)
def three_d_to_3views_task(self, minio_glb_path: str, callback_url: str):
    """Main 3D-to-3views task.

    POSTs the GLB path to the ``/canvas/3d_to_3views`` endpoint of
    ``settings.IMAGE_TO_3D_MODEL_URL`` and reports the outcome through
    ``notify_callback``.

    Args:
        minio_glb_path: Sent to the service as ``minio_glb_path``.
        callback_url: Forwarded unchanged to ``notify_callback``.

    Returns:
        The decoded JSON body of the service response (returned for non-200
        responses as well).

    Raises:
        Whatever ``self.retry`` raises: any exception in the body triggers a
        "failed" callback and a retry after 60 seconds (at most 3 retries).
    """
    task_id = self.request.id
    logger.info(f"开始处理 three_d_to_3views_task | task_id: {task_id}")

    try:
        input_data = {
            "minio_glb_path": minio_glb_path,
        }
        with httpx.Client(timeout=1200) as client:
            resp = client.post(
                f"http://{settings.IMAGE_TO_3D_MODEL_URL}/canvas/3d_to_3views",
                json=input_data,
            )
        status_code = resp.status_code
        result = resp.json()
        logger.info(f"three_d_to_3views_task 任务处理完成 | task_id: {task_id} | status_code : {status_code} | result: {result}")

        # Forward the outcome to the callback endpoint.
        if status_code == 200:
            _send_callback(callback_url, task_id, "completed", result)
        else:
            _send_callback(callback_url, task_id, "failed", {})

        return result

    except Exception as exc:
        logger.error(f"three_d_to_3views_task 任务失败 | task_id: {task_id}", exc_info=True)
        # Best-effort notification so the retry below is always scheduled.
        _notify_failure(callback_url, task_id)
        raise self.retry(exc=exc, countdown=60, max_retries=3)