# src/server/canvas_generate_3D/tasks.py
"""Celery tasks that call the 3D model service and publish results to RabbitMQ."""
import json
import logging
import time

import httpx
from celery import shared_task

from src.core.config import settings
from src.server.canvas_generate_3D.celery_app import celery_app

logger = logging.getLogger(__name__)

# RabbitMQ topology used for result notifications.
_RESULT_EXCHANGE = "img_to_3d_exchange"
_RESULT_ROUTING_KEY = "img_to_3d_results"
_RESULT_QUEUE = "img_to_3d_results"

# Timeout (seconds) for a single HTTP call to the model service.
_MODEL_HTTP_TIMEOUT = 300.0
# Delay (seconds) before Celery re-runs a failed task, and the retry cap.
_RETRY_COUNTDOWN = 60
_MAX_RETRIES = 3


def send_result_to_rabbitmq(result: dict, job_id: str, status: str = "completed") -> None:
    """Publish a job result notification to RabbitMQ (best effort).

    Declares the durable direct exchange and durable queue, binds them, and
    publishes one persistent JSON message of the form
    ``{"job_id", "status", "timestamp", "result"}``.

    This function never raises: any failure (import, connection, declaration,
    serialization, publish) is logged and swallowed so that a notification
    problem cannot fail the calling task.

    Args:
        result: Payload to embed under the ``"result"`` key; must be
            JSON-serializable.
        job_id: Identifier of the job the notification refers to.
        status: Status string placed in the message (callers in this module
            use ``"completed"`` and ``"failed"``).

    TODO: consider moving this into mq_util.py (suggested by the original
    author).
    """
    connection = None
    try:
        # Imported lazily, inside the try, so a missing/broken pika install is
        # logged like any other publish failure instead of breaking import of
        # this module.
        import pika
        from pika import DeliveryMode

        connection = pika.BlockingConnection(pika.URLParameters(settings.RABBITMQ_URL))
        channel = connection.channel()

        channel.exchange_declare(exchange=_RESULT_EXCHANGE, exchange_type="direct", durable=True)
        channel.queue_declare(queue=_RESULT_QUEUE, durable=True)
        channel.queue_bind(
            exchange=_RESULT_EXCHANGE,
            queue=_RESULT_QUEUE,
            routing_key=_RESULT_ROUTING_KEY,
        )

        message_body = {
            "job_id": job_id,
            "status": status,
            "timestamp": time.time(),
            "result": result,
        }

        channel.basic_publish(
            exchange=_RESULT_EXCHANGE,
            routing_key=_RESULT_ROUTING_KEY,
            body=json.dumps(message_body).encode("utf-8"),
            properties=pika.BasicProperties(delivery_mode=DeliveryMode.Persistent),
        )
        logger.info(f"✅ RabbitMQ 发送成功 | job_id: {job_id}")
    except Exception as e:
        logger.error(f"❌ RabbitMQ 发送失败 | job_id: {job_id} | {e}")
    finally:
        # Always release the connection, including when declare/publish fails;
        # previously it was only closed on the success path and leaked on error.
        if connection is not None and connection.is_open:
            try:
                connection.close()
            except Exception as close_err:
                # Keep the never-raise contract even if the close itself fails.
                logger.warning(f"⚠️ RabbitMQ 连接关闭失败 | job_id: {job_id} | {close_err}")


def _run_model_task(task, endpoint: str, payload: dict):
    """POST ``payload`` to the model service, publish the outcome, and return it.

    Shared implementation for the Celery tasks in this module.

    Args:
        task: The bound Celery task instance (``self`` of the calling task);
            its ``request.id`` is used as the job id and its ``retry`` method
            is used on failure.
        endpoint: Path segment appended after ``/canvas/`` on the model
            service URL.
        payload: JSON body sent to the model service.

    Returns:
        The decoded JSON response of the model service.

    Raises:
        Whatever ``task.retry`` raises: a retry signal while attempts remain,
        or the original exception once retries are exhausted.
    """
    job_id = task.request.id
    try:
        # A synchronous httpx client is used on purpose, to avoid the pitfalls
        # of running asyncio.run inside a Celery worker.
        with httpx.Client(timeout=_MODEL_HTTP_TIMEOUT) as client:
            resp = client.post(
                f"http://{settings.IMAGE_TO_3D_MODEL_URL}/canvas/{endpoint}",
                json=payload,
            )
            resp.raise_for_status()  # turn HTTP 4xx/5xx into an exception
            result = resp.json()

        logger.info(f"任务处理完成 | job_id: {job_id}")

        send_result_to_rabbitmq(result=result, job_id=job_id, status="completed")
        return result

    except Exception as exc:
        logger.error(f"任务失败 | job_id: {job_id} | error: {exc}", exc_info=True)

        # NOTE(review): a "failed" notification is published on every failed
        # attempt, not only the final one, so consumers may see "failed"
        # followed later by "completed" for the same job_id. Confirm this is
        # what downstream consumers expect.
        send_result_to_rabbitmq(
            result={"error": str(exc)},
            job_id=job_id,
            status="failed",
        )

        raise task.retry(exc=exc, countdown=_RETRY_COUNTDOWN, max_retries=_MAX_RETRIES)


@shared_task(bind=True, queue="img_to_3d_queue", max_retries=3, name='src.server.canvas_generate_3D.tasks.img_to_3d_task')
def img_to_3d_task(self, input_images: list, model: str = "single"):
    """Generate a 3D result from images via the model service.

    The Celery task id is used as the job id in logs and notifications.

    Args:
        input_images: Image references forwarded to the model service.
        model: Model selector forwarded to the model service.

    Returns:
        The decoded JSON response of the ``/canvas/img_to_3D`` endpoint.
    """
    job_id = self.request.id
    logger.info(f"开始处理 img_to_3D 任务 | job_id: {job_id} | celery_task_id: {self.request.id}")

    input_data = {
        # Per the original author's note, the model service expects the key
        # "image_paths" rather than "input_images".
        "image_paths": input_images,
        "model": model,
    }
    return _run_model_task(self, "img_to_3D", input_data)


@shared_task(bind=True, queue="three_d_to_3views_task", max_retries=3, name='src.server.canvas_generate_3D.tasks.three_d_to_3views_task')
def three_d_to_3views_task(self, minio_glb_path: str):
    """Render three views from a 3D asset via the model service.

    The Celery task id is used as the job id in logs and notifications.

    Args:
        minio_glb_path: Path of the GLB asset, forwarded to the model service
            under the ``"minio_glb_path"`` key.

    Returns:
        The decoded JSON response of the ``/canvas/3d_to_3views`` endpoint.
    """
    job_id = self.request.id
    logger.info(f"开始处理 three_d_to_3views_task 任务 | job_id: {job_id} | celery_task_id: {self.request.id}")

    input_data = {
        "minio_glb_path": minio_glb_path,
    }
    return _run_model_task(self, "3d_to_3views", input_data)