From 1c8283334e7e48818c9fb56f24526abc45d19b68 Mon Sep 17 00:00:00 2001 From: zcr Date: Wed, 1 Apr 2026 11:57:25 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0canvas=203d=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=20,=E4=BD=BF=E7=94=A8=E5=BC=82=E6=AD=A5mq=E9=98=9F=E5=88=97?= =?UTF-8?q?=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitea/workflows/prod_build_manual.yaml | 0 .gitignore | 0 Dockerfile | 0 docker-compose.yml | 51 ++++--- gunicorn.conf.py | 0 logging_env.py | 0 pyproject.toml | 2 +- src/__init__.py | 0 src/core/__init__.py | 0 src/core/config.py | 2 + src/db/__init__.py | 0 src/db/init_mongodb.py | 0 src/db/mongo.py | 0 src/routers/__init__.py | 0 src/routers/deep_agent_chat.py | 0 src/routers/flux2_gen_img.py | 0 src/routers/generate_3D.py | 0 src/routers/seg_furniture.py | 0 src/schemas/__init__.py | 0 src/schemas/deep_agent_chat.py | 0 src/schemas/flux2_gen_img.py | 0 src/schemas/generate_3D.py | 0 src/schemas/response_template.py | 0 src/schemas/san_furniture.py | 0 src/server/__init__.py | 0 src/server/canvas_generate_3D/celery_app.py | 56 ++++---- src/server/canvas_generate_3D/server.py | 63 +++++---- src/server/canvas_generate_3D/tasks.py | 124 ++++++------------ src/server/deep_agent/__init__.py | 0 src/server/deep_agent/agents/painter.py | 0 src/server/deep_agent/agents/researcher.py | 0 src/server/deep_agent/agents/user_profile.py | 0 src/server/deep_agent/init_llm.py | 0 src/server/deep_agent/run_test.py | 0 src/server/deep_agent/tools/__init__.py | 0 .../tools/conversation_title_tool.py | 0 src/server/deep_agent/tools/crawl_tool.py | 0 .../tools/extract_suggested_questions.py | 0 .../tools/generate_furniture_sketch.py | 0 .../deep_agent/tools/report_generator_tool.py | 0 src/server/deep_agent/tools/research_tool.py | 0 .../tools/structured_retrieval_tool.py | 0 .../deep_agent/tools/user_persona_tool.py | 0 .../deep_agent/tools/vision_analyze_tool.py | 0 src/server/deep_agent/utils/mongodb_util.py | 0 src/server/utils/__init__.py | 0 src/server/utils/generate_suggestion.py | 0 src/server/utils/mq_util.py | 54 ++++++++ src/server/utils/new_oss_client.py | 0 uv.lock | 25 ++-- 50 files changed, 202 insertions(+), 175 deletions(-) mode change 100644 => 100755 .gitea/workflows/prod_build_manual.yaml mode change 100644 => 100755 .gitignore mode change 100644 => 100755 Dockerfile mode change 100644 => 100755 docker-compose.yml mode change 100644 => 100755 gunicorn.conf.py mode change 100644 => 100755 logging_env.py mode change 100644 => 100755 pyproject.toml mode change 100644 => 100755 src/__init__.py mode change 100644 => 100755 src/core/__init__.py mode change 100644 => 100755 src/core/config.py mode change 100644 => 100755 src/db/__init__.py mode change 100644 => 100755 src/db/init_mongodb.py mode change 100644 => 100755 src/db/mongo.py mode change 100644 => 100755 src/routers/__init__.py mode change 100644 => 100755 src/routers/deep_agent_chat.py mode change 100644 => 100755 src/routers/flux2_gen_img.py mode change 100644 => 100755 src/routers/generate_3D.py mode change 100644 => 100755 src/routers/seg_furniture.py mode change 100644 => 100755 src/schemas/__init__.py mode change 100644 => 100755 src/schemas/deep_agent_chat.py mode change 100644 => 100755 src/schemas/flux2_gen_img.py mode change 100644 => 100755 src/schemas/generate_3D.py mode change 100644 => 100755 src/schemas/response_template.py mode change 100644 => 100755 src/schemas/san_furniture.py mode change 100644 => 100755 src/server/__init__.py mode change 100644 => 100755 src/server/canvas_generate_3D/celery_app.py mode change 100644 => 100755 src/server/canvas_generate_3D/server.py mode change 100644 => 100755 src/server/canvas_generate_3D/tasks.py mode change 100644 => 100755 src/server/deep_agent/__init__.py mode change 100644 => 100755 src/server/deep_agent/agents/painter.py mode change 100644 => 100755 src/server/deep_agent/agents/researcher.py mode change 100644 => 100755 src/server/deep_agent/agents/user_profile.py mode change 100644 => 100755 src/server/deep_agent/init_llm.py mode change 100644 => 100755 src/server/deep_agent/run_test.py mode change 100644 => 100755 src/server/deep_agent/tools/__init__.py mode change 100644 => 100755 src/server/deep_agent/tools/conversation_title_tool.py mode change 100644 => 100755 src/server/deep_agent/tools/crawl_tool.py mode change 100644 => 100755 src/server/deep_agent/tools/extract_suggested_questions.py mode change 100644 => 100755 src/server/deep_agent/tools/generate_furniture_sketch.py mode change 100644 => 100755 src/server/deep_agent/tools/report_generator_tool.py mode change 100644 => 100755 src/server/deep_agent/tools/research_tool.py mode change 100644 => 100755 src/server/deep_agent/tools/structured_retrieval_tool.py mode change 100644 => 100755 src/server/deep_agent/tools/user_persona_tool.py mode change 100644 => 100755 src/server/deep_agent/tools/vision_analyze_tool.py mode change 100644 => 100755 src/server/deep_agent/utils/mongodb_util.py mode change 100644 => 100755 src/server/utils/__init__.py mode change 100644 => 100755 src/server/utils/generate_suggestion.py create mode 100755 src/server/utils/mq_util.py mode change 100644 => 100755 src/server/utils/new_oss_client.py mode change 100644 => 100755 uv.lock diff --git a/.gitea/workflows/prod_build_manual.yaml b/.gitea/workflows/prod_build_manual.yaml old mode 100644 new mode 100755 diff --git a/.gitignore b/.gitignore old mode 100644 new mode 100755 diff --git a/Dockerfile b/Dockerfile old mode 100644 new mode 100755 diff --git a/docker-compose.yml b/docker-compose.yml old mode 100644 new mode 100755 index 5c57ca0..f868ebf --- a/docker-compose.yml +++ b/docker-compose.yml @@ -17,17 +17,17 @@ services: - SERVE_ENV=${SERVE_ENV} restart: unless-stopped - # ==================== Worker 1: img_to_3d(重资源,建议只跑1个) ==================== - img_worker: - container_name: "FiDA_${SERVE_ENV}_ImgWorker" + # ==================== Celery Worker(单个 Worker 同时处理两个任务) ==================== + celery_worker: + container_name: "FiDA_${SERVE_ENV}_CeleryWorker" build: context: . dockerfile: Dockerfile working_dir: /app command: > celery -A src.server.canvas_generate_3D.celery_app worker - -n img_worker@%h - -Q img_to_3d_queue + -n celery_worker@%h + -Q img_to_3d_queue,three_d_to_3views_queue --concurrency=1 --prefetch-multiplier=1 --max-tasks-per-child=1 @@ -39,29 +39,22 @@ services: environment: - SERVE_ENV=${SERVE_ENV} depends_on: - - server # 可选:等 server 启动后再启动 worker + - server restart: unless-stopped - # ==================== Worker 2: 3d_to_3views ==================== - views_worker: - container_name: "FiDA_${SERVE_ENV}_ViewsWorker" - build: - context: . - dockerfile: Dockerfile - working_dir: /app - command: > - celery -A src.server.canvas_generate_3D.celery_app worker - -n views_worker@%h - -Q 3d_to_3views_queue - --concurrency=2 - --prefetch-multiplier=1 - --loglevel=INFO - volumes: - - ./:/app - - ./.env:/app/.env - - /etc/localtime:/etc/localtime:ro - environment: - - SERVE_ENV=${SERVE_ENV} - depends_on: - - server - restart: unless-stopped \ No newline at end of file + # ==================== 可选:RabbitMQ(如果你想把 RabbitMQ 也纳入 docker-compose 管理) ==================== + # rabbitmq: + # image: rabbitmq:3.13-management + # container_name: "FiDA_${SERVE_ENV}_RabbitMQ" + # ports: + # - "5672:5672" + # - "15672:15672" + # environment: + # RABBITMQ_DEFAULT_USER: guest + # RABBITMQ_DEFAULT_PASS: guest + # volumes: + # - rabbitmq_data:/var/lib/rabbitmq + # restart: unless-stopped + +# volumes: +# rabbitmq_data: \ No newline at end of file diff --git a/gunicorn.conf.py b/gunicorn.conf.py old mode 100644 new mode 100755 diff --git a/logging_env.py b/logging_env.py old mode 100644 new mode 100755 diff --git a/pyproject.toml b/pyproject.toml old mode 100644 new mode 100755 index 5ec7203..ebd654d --- a/pyproject.toml +++ b/pyproject.toml @@ -57,7 +57,7 @@ dependencies = [ "celery[redis]>=5.6.3", "python3-pika>=0.9.14", "tasks>=2.8.0", - "pika>=1.3.2", "kombu>=5.4.0", "sentence-transformers[onnx]>=5.3.0", + "celery-types>=0.26.0", ] diff --git a/src/__init__.py b/src/__init__.py old mode 100644 new mode 100755 diff --git a/src/core/__init__.py b/src/core/__init__.py old mode 100644 new mode 100755 diff --git a/src/core/config.py b/src/core/config.py old mode 100644 new mode 100755 index acb3237..ef36fae --- a/src/core/config.py +++ b/src/core/config.py @@ -50,6 +50,8 @@ class Settings(BaseSettings): LOGS_PATH: str = Field(default="/mnt/data/FiDA/logs", description="") + SERVE_ENV: str = Field(default="dev", description="") + settings = Settings() MONGO_URI = f"mongodb://{settings.MONGODB_USERNAME}:{settings.MONGODB_PASSWORD}@{settings.MONGODB_HOST}:{settings.MONGODB_PORT}" diff --git a/src/db/__init__.py b/src/db/__init__.py old mode 100644 new mode 100755 diff --git a/src/db/init_mongodb.py b/src/db/init_mongodb.py old mode 100644 new mode 100755 diff --git a/src/db/mongo.py b/src/db/mongo.py old mode 100644 new mode 100755 diff --git a/src/routers/__init__.py b/src/routers/__init__.py old mode 100644 new mode 100755 diff --git a/src/routers/deep_agent_chat.py b/src/routers/deep_agent_chat.py old mode 100644 new mode 100755 diff --git a/src/routers/flux2_gen_img.py b/src/routers/flux2_gen_img.py old mode 100644 new mode 100755 diff --git a/src/routers/generate_3D.py b/src/routers/generate_3D.py old mode 100644 new mode 100755 diff --git a/src/routers/seg_furniture.py b/src/routers/seg_furniture.py old mode 100644 new mode 100755 diff --git a/src/schemas/__init__.py b/src/schemas/__init__.py old mode 100644 new mode 100755 diff --git a/src/schemas/deep_agent_chat.py b/src/schemas/deep_agent_chat.py old mode 100644 new mode 100755 diff --git a/src/schemas/flux2_gen_img.py b/src/schemas/flux2_gen_img.py old mode 100644 new mode 100755 diff --git a/src/schemas/generate_3D.py b/src/schemas/generate_3D.py old mode 100644 new mode 100755 diff --git a/src/schemas/response_template.py b/src/schemas/response_template.py old mode 100644 new mode 100755 diff --git a/src/schemas/san_furniture.py b/src/schemas/san_furniture.py old mode 100644 new mode 100755 diff --git a/src/server/__init__.py b/src/server/__init__.py old mode 100644 new mode 100755 diff --git a/src/server/canvas_generate_3D/celery_app.py b/src/server/canvas_generate_3D/celery_app.py old mode 100644 new mode 100755 index cf66ffa..b5470c5 --- a/src/server/canvas_generate_3D/celery_app.py +++ b/src/server/canvas_generate_3D/celery_app.py @@ -1,60 +1,60 @@ +# src/server/canvas_generate_3D/celery_app.py from celery import Celery -import os - -from kombu import Queue - +from kombu import Queue, Exchange from src.core.config import settings -# RabbitMQ 连接(请改成你的真实配置) -BROKER_URL = settings.RABBITMQ_URL # 用户名:密码@主机:端口/vhost - celery_app = Celery( - "canvas_generate_3D", - broker=BROKER_URL, - backend=f"redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB}", # 推荐用 Redis 存任务结果 - include=["src.server.canvas_generate_3D.tasks"], # 明确包含任务模块 + "canvas_generate_3d", + broker=settings.RABBITMQ_URL, + backend=f"redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB}", + include=["src.server.canvas_generate_3D.tasks"], ) -# 重要配置:限制并发为 1(一次只处理一个 img_to_3D 请求) celery_app.conf.update( - imports=[ - 'src.server.canvas_generate_3D.tasks', # ← 加上这一行(或你的实际路径) - ], task_serializer="json", accept_content=["json"], result_serializer="json", timezone="Asia/Hong_Kong", enable_utc=True, - # ==================== 新增:定义多个队列 ==================== + # ==================== 修改 Exchange 名称 ==================== + task_default_exchange="canvas_3d_exchange", # ← 修改这里 + task_default_exchange_type="direct", + + # 定义队列 task_queues=( - Queue("img_to_3d_queue", durable=True), - Queue("three_d_to_3views_queue", durable=True), + Queue("img_to_3d_queue", + exchange=Exchange("canvas_3d_exchange", type="direct"), + durable=True), + Queue("three_d_to_3views_queue", + exchange=Exchange("canvas_3d_exchange", type="direct"), + durable=True), ), + # 任务路由 task_routes={ 'src.server.canvas_generate_3D.tasks.img_to_3d_task': { - 'queue': 'img_to_3d_queue' + 'queue': 'img_to_3d_queue', + 'exchange': 'canvas_3d_exchange', # ← 修改这里 }, - 'src.server.canvas_generate_3D.tasks.three_d_to_3views_task': { # 注意任务名称要一致 - 'queue': 'three_d_to_3views_queue' + 'src.server.canvas_generate_3D.tasks.three_d_to_3views_task': { + 'queue': 'three_d_to_3views_queue', + 'exchange': 'canvas_3d_exchange', # ← 修改这里 }, }, + task_default_queue="img_to_3d_queue", - # 全局或针对该队列的限制 - worker_concurrency=1, # 同时只跑 1 个 - worker_prefetch_multiplier=1, # 严格一次只预取 1 个 - worker_max_tasks_per_child=1, # 处理完一个后重启子进程(推荐用于重资源任务) - # 可选:任务 ack 策略(长任务建议晚 ack) + worker_concurrency=1, + worker_prefetch_multiplier=1, + worker_max_tasks_per_child=1, task_acks_late=True, task_reject_on_worker_lost=True, ) -# 可选:打印已注册的任务,帮助调试 @celery_app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): print("✅ Celery 已启动,以下任务已注册:") for task_name in sorted(sender.tasks.keys()): - print(f" - {task_name}") + print(f" - {task_name}") \ No newline at end of file diff --git a/src/server/canvas_generate_3D/server.py b/src/server/canvas_generate_3D/server.py old mode 100644 new mode 100755 index 8f9779b..5126d41 --- a/src/server/canvas_generate_3D/server.py +++ b/src/server/canvas_generate_3D/server.py @@ -1,80 +1,95 @@ -from celery import current_app +from src.server.canvas_generate_3D.celery_app import celery_app # ← 改成这行 from src.server.canvas_generate_3D.tasks import img_to_3d_task, three_d_to_3views_task +import logging + +logger = logging.getLogger(__name__) + + +def get_queue_length(queue_name: str) -> int: + """获取指定队列当前待处理消息数量(更可靠的方式)""" + try: + with celery_app.connection() as conn: + with conn.channel() as channel: + # passive=True:只查询,不创建队列 + queue_info = channel.queue_declare( + queue=queue_name, + passive=True, + durable=True + ) + return queue_info.message_count + except Exception as e: + logger.warning(f"获取队列长度失败 {queue_name}: {e}") + return 0 # 失败时默认不拒绝提交,防止误判 def submit_img_to_3d_task(input_images: list, model: str = "single", **kwargs): - """ - 提交 3D 生成任务 - 队列最多堆积 10 个 - """ + """提交 img_to_3D 任务(带队列长度限制)""" queue_name = "img_to_3d_queue" max_queue_length = 10 try: - with current_app.connection() as conn: # 使用 Celery 的连接(最推荐) - with conn.channel() as channel: - queue_info = channel.queue_declare(queue=queue_name, durable=True, auto_delete=False, passive=False) - current_length = queue_info.message_count + current_length = get_queue_length(queue_name) - # 队列已满 if current_length >= max_queue_length: return { "state": "queue_full", - "message": "当前 3D 生成请求较多,请等待片刻后重试。", + "message": "当前 3D 生成请求较多,请稍后重试。", "queue_length": current_length, "max_length": max_queue_length } + # 提交任务 task = img_to_3d_task.delay(input_images, model, **kwargs) + logger.info(f"img_to_3d_task 已提交 | task_id: {task.id} | 当前队列长度: {current_length}") + return { "state": "success", "task_id": task.id, - "message": "任务已成功提交,正在处理中...", + "message": "任务已成功提交,正在后台处理...", "queue_length": current_length + 1 } except Exception as e: + logger.error(f"提交 img_to_3d_task 失败: {e}", exc_info=True) return { "state": "fail", - "message": f"提交失败,请稍后重试。错误: {str(e)}", + "message": "提交失败,请稍后重试。", "error": str(e) } def submit_three_d_to_3views_task(minio_glb_path: str, **kwargs): - """ - 提交 3D 生成任务 - 队列最多堆积 10 个 - """ - queue_name = "three_d_to_3views_queue" + """提交 3D转3视图 任务(带队列长度限制)""" + queue_name = "three_d_to_3views_task" # ← 必须和 @shared_task 中的 queue 完全一致! max_queue_length = 3 try: - with current_app.connection() as conn: # 使用 Celery 的连接(最推荐) - with conn.channel() as channel: - queue_info = channel.queue_declare(queue=queue_name, durable=True, auto_delete=False, passive=False) - current_length = queue_info.message_count + current_length = get_queue_length(queue_name) - # 队列已满 if current_length >= max_queue_length: return { "state": "queue_full", - "message": "当前 3 视图 生成请求较多,请等待片刻后重试。", + "message": "当前 3视图 生成请求较多,请稍后重试。", "queue_length": current_length, "max_length": max_queue_length } task = three_d_to_3views_task.delay(minio_glb_path, **kwargs) + logger.info(f"three_d_to_3views_task 已提交 | task_id: {task.id} | 当前队列长度: {current_length}") + return { "state": "success", "task_id": task.id, - "message": "任务已成功提交,正在处理中...", + "message": "任务已成功提交,正在后台处理...", "queue_length": current_length + 1 } except Exception as e: + logger.error(f"提交 three_d_to_3views_task 失败: {e}", exc_info=True) return { "state": "fail", - "message": f"提交失败,请稍后重试。错误: {str(e)}", + "message": "提交失败,请稍后重试。", "error": str(e) } diff --git a/src/server/canvas_generate_3D/tasks.py b/src/server/canvas_generate_3D/tasks.py old mode 100644 new mode 100755 index ae46c0a..a82833e --- a/src/server/canvas_generate_3D/tasks.py +++ b/src/server/canvas_generate_3D/tasks.py @@ -1,139 +1,99 @@ # src/server/canvas_generate_3D/tasks.py -import json -import time -import httpx +import asyncio from celery import shared_task - +import httpx from src.core.config import settings -from src.server.canvas_generate_3D.celery_app import celery_app +from src.server.utils.mq_util import send_to_rabbitmq import logging logger = logging.getLogger(__name__) -def send_result_to_rabbitmq(result: dict, job_id: str, status: str = "completed"): - """发送结果到 RabbitMQ(建议后续移到 mq_util.py)""" - try: - # 你已经有 mq_util.py,可以调用那里面的函数 - # 这里先用简单实现,如果你想用 mq_util.py 的方式,后面我再帮你调整 - import pika - from pika import DeliveryMode - - connection = pika.BlockingConnection(pika.URLParameters(settings.RABBITMQ_URL)) - channel = connection.channel() - - EXCHANGE_NAME = "img_to_3d_exchange" - ROUTING_KEY = "img_to_3d_results" - QUEUE_NAME = "img_to_3d_results" - - channel.exchange_declare(exchange=EXCHANGE_NAME, exchange_type="direct", durable=True) - channel.queue_declare(queue=QUEUE_NAME, durable=True) - channel.queue_bind(exchange=EXCHANGE_NAME, queue=QUEUE_NAME, routing_key=ROUTING_KEY) - - message_body = { - "job_id": job_id, - "status": status, - "timestamp": time.time(), - "result": result - } - - channel.basic_publish( - exchange=EXCHANGE_NAME, - routing_key=ROUTING_KEY, - body=json.dumps(message_body).encode("utf-8"), - properties=pika.BasicProperties(delivery_mode=DeliveryMode.Persistent) - ) - logger.info(f"✅ RabbitMQ 发送成功 | job_id: {job_id}") - connection.close() - - except Exception as e: - logger.error(f"❌ RabbitMQ 发送失败 | job_id: {job_id} | {e}") - - @shared_task(bind=True, queue="img_to_3d_queue", max_retries=3, name='src.server.canvas_generate_3D.tasks.img_to_3d_task') def img_to_3d_task(self, input_images: list, model: str = "single"): """img_to_3D 主任务""" + job_id = self.request.id - # ====================== 处理 job_id ====================== - job_id = self.request.id # 如果没传 job_id,就使用 Celery 自带的 task id - - logger.info(f"开始处理 img_to_3D 任务 | job_id: {job_id} | celery_task_id: {self.request.id}") + logger.info(f"开始处理 img_to_3D 任务 | job_id: {job_id}") try: input_data = { - "image_paths": input_images, # 注意:后端服务用的是 image_paths,不是 input_images + "image_paths": input_images, "model": model, } - # 调用模型服务(推荐使用同步 httpx,避免 asyncio.run 在 worker 中的潜在问题) - with httpx.Client(timeout=300.0) as client: # 改成同步 Client + with httpx.Client(timeout=300.0) as client: resp = client.post( f"http://{settings.IMAGE_TO_3D_MODEL_URL}/canvas/img_to_3D", json=input_data ) - resp.raise_for_status() # 自动抛出 HTTP 错误 + resp.raise_for_status() result = resp.json() - logger.info(f"任务处理完成 | job_id: {job_id}") + logger.info(f"img_to_3D 任务处理完成 | job_id: {job_id}") - # 发送 RabbitMQ 通知 - send_result_to_rabbitmq(result=result, job_id=job_id, status="completed") + # 发送到对应的结果队列 + asyncio.run(send_to_rabbitmq( + result=result, + job_id=job_id, + status="completed", + routing_key=f"img_to_3d_results-{settings.SERVE_ENV}" # ← 第一个任务的结果队列 + )) return result except Exception as exc: - logger.error(f"任务失败 | job_id: {job_id} | error: {exc}", exc_info=True) + logger.error(f"img_to_3D 任务失败 | job_id: {job_id}", exc_info=True) - # 发送失败通知 - send_result_to_rabbitmq( + asyncio.run(send_to_rabbitmq( result={"error": str(exc)}, job_id=job_id, - status="failed" - ) - - # 重试 + status="failed", + routing_key=f"img_to_3d_results-{settings.SERVE_ENV}" + )) raise self.retry(exc=exc, countdown=60, max_retries=3) -@shared_task(bind=True, queue="three_d_to_3views_task", max_retries=3, name='src.server.canvas_generate_3D.tasks.three_d_to_3views_task') +@shared_task(bind=True, queue="three_d_to_3views_queue", max_retries=3, + name='src.server.canvas_generate_3D.tasks.three_d_to_3views_task') def three_d_to_3views_task(self, minio_glb_path: str): """3D to 3views 主任务""" + job_id = self.request.id - # ====================== 处理 job_id ====================== - job_id = self.request.id # 如果没传 job_id,就使用 Celery 自带的 task id - - logger.info(f"开始处理 three_d_to_3views_task 任务 | job_id: {job_id} | celery_task_id: {self.request.id}") + logger.info(f"开始处理 three_d_to_3views_task | job_id: {job_id}") try: input_data = { - "minio_glb_path": minio_glb_path, # 注意:后端服务用的是 image_paths,不是 input_images + "minio_glb_path": minio_glb_path, } - # 调用模型服务(推荐使用同步 httpx,避免 asyncio.run 在 worker 中的潜在问题) - with httpx.Client(timeout=300.0) as client: # 改成同步 Client + with httpx.Client(timeout=300.0) as client: resp = client.post( f"http://{settings.IMAGE_TO_3D_MODEL_URL}/canvas/3d_to_3views", json=input_data ) - resp.raise_for_status() # 自动抛出 HTTP 错误 + resp.raise_for_status() result = resp.json() - logger.info(f"任务处理完成 | job_id: {job_id}") + logger.info(f"three_d_to_3views_task 任务处理完成 | job_id: {job_id}") - # 发送 RabbitMQ 通知 - send_result_to_rabbitmq(result=result, job_id=job_id, status="completed") + # 发送到对应的结果队列 + asyncio.run(send_to_rabbitmq( + result=result, + job_id=job_id, + status="completed", + routing_key="three_d_to_3views_results" # ← 第二个任务的结果队列 + )) return result except Exception as exc: - logger.error(f"任务失败 | job_id: {job_id} | error: {exc}", exc_info=True) + logger.error(f"three_d_to_3views_task 任务失败 | job_id: {job_id}", exc_info=True) - # 发送失败通知 - send_result_to_rabbitmq( + asyncio.run(send_to_rabbitmq( result={"error": str(exc)}, job_id=job_id, - status="failed" - ) - - # 重试 - raise self.retry(exc=exc, countdown=60, max_retries=3) \ No newline at end of file + status="failed", + routing_key="three_d_to_3views_results" + )) + raise self.retry(exc=exc, countdown=60, max_retries=3) diff --git a/src/server/deep_agent/__init__.py b/src/server/deep_agent/__init__.py old mode 100644 new mode 100755 diff --git a/src/server/deep_agent/agents/painter.py b/src/server/deep_agent/agents/painter.py old mode 100644 new mode 100755 diff --git a/src/server/deep_agent/agents/researcher.py b/src/server/deep_agent/agents/researcher.py old mode 100644 new mode 100755 diff --git a/src/server/deep_agent/agents/user_profile.py b/src/server/deep_agent/agents/user_profile.py old mode 100644 new mode 100755 diff --git a/src/server/deep_agent/init_llm.py b/src/server/deep_agent/init_llm.py old mode 100644 new mode 100755 diff --git a/src/server/deep_agent/run_test.py b/src/server/deep_agent/run_test.py old mode 100644 new mode 100755 diff --git a/src/server/deep_agent/tools/__init__.py b/src/server/deep_agent/tools/__init__.py old mode 100644 new mode 100755 diff --git a/src/server/deep_agent/tools/conversation_title_tool.py b/src/server/deep_agent/tools/conversation_title_tool.py old mode 100644 new mode 100755 diff --git a/src/server/deep_agent/tools/crawl_tool.py b/src/server/deep_agent/tools/crawl_tool.py old mode 100644 new mode 100755 diff --git a/src/server/deep_agent/tools/extract_suggested_questions.py b/src/server/deep_agent/tools/extract_suggested_questions.py old mode 100644 new mode 100755 diff --git a/src/server/deep_agent/tools/generate_furniture_sketch.py b/src/server/deep_agent/tools/generate_furniture_sketch.py old mode 100644 new mode 100755 diff --git a/src/server/deep_agent/tools/report_generator_tool.py b/src/server/deep_agent/tools/report_generator_tool.py old mode 100644 new mode 100755 diff --git a/src/server/deep_agent/tools/research_tool.py b/src/server/deep_agent/tools/research_tool.py old mode 100644 new mode 100755 diff --git a/src/server/deep_agent/tools/structured_retrieval_tool.py b/src/server/deep_agent/tools/structured_retrieval_tool.py old mode 100644 new mode 100755 diff --git a/src/server/deep_agent/tools/user_persona_tool.py b/src/server/deep_agent/tools/user_persona_tool.py old mode 100644 new mode 100755 diff --git a/src/server/deep_agent/tools/vision_analyze_tool.py b/src/server/deep_agent/tools/vision_analyze_tool.py old mode 100644 new mode 100755 diff --git a/src/server/deep_agent/utils/mongodb_util.py b/src/server/deep_agent/utils/mongodb_util.py old mode 100644 new mode 100755 diff --git a/src/server/utils/__init__.py b/src/server/utils/__init__.py old mode 100644 new mode 100755 diff --git a/src/server/utils/generate_suggestion.py b/src/server/utils/generate_suggestion.py old mode 100644 new mode 100755 diff --git a/src/server/utils/mq_util.py b/src/server/utils/mq_util.py new file mode 100755 index 0000000..748787d --- /dev/null +++ b/src/server/utils/mq_util.py @@ -0,0 +1,54 @@ +import json +from datetime import datetime + +import aio_pika +from aio_pika import DeliveryMode, ExchangeType +from src.core.config import settings +import logging + +logger = logging.getLogger(__name__) + +EXCHANGE_NAME = "canvas_3d_exchange" # ← 修改这里 + + +async def send_to_rabbitmq( + result: dict, + job_id: str, + status: str = "completed", + routing_key: str = "img_to_3d_results" +): + try: + connection = await aio_pika.connect_robust(settings.RABBITMQ_URL) + + async with connection: + channel = await connection.channel() + + # 使用新的 Exchange 名称 + exchange = await channel.declare_exchange( + name=EXCHANGE_NAME, # ← 使用常量 + type=ExchangeType.DIRECT, + durable=True + ) + + queue = await channel.declare_queue(name=routing_key, durable=True) + await queue.bind(exchange, routing_key=routing_key) + + message_body = { + "job_id": job_id, + "status": status, + "timestamp": datetime.now().isoformat(), + "task_type": routing_key, # 方便区分是哪个任务的结果 + "result": result + } + + message = aio_pika.Message( + body=json.dumps(message_body).encode("utf-8"), + delivery_mode=DeliveryMode.PERSISTENT, + ) + + await exchange.publish(message, routing_key=routing_key) + + logger.info(f"✅ 发送成功 → routing_key: {routing_key} | job_id: {job_id} | status: {status}") + + except Exception as e: + logger.error(f"❌ 发送失败 → routing_key: {routing_key} | job_id: {job_id} | {e}", exc_info=True) diff --git a/src/server/utils/new_oss_client.py b/src/server/utils/new_oss_client.py old mode 100644 new mode 100755 diff --git a/uv.lock b/uv.lock old mode 100644 new mode 100755 index 83aec4a..bb8606a --- a/uv.lock +++ b/uv.lock @@ -460,6 +460,18 @@ redis = [ { name = "kombu", extra = ["redis"] }, ] +[[package]] +name = "celery-types" +version = "0.26.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/fc/38/813dd7534e41682684d3a5c2cc4a8710e3acc51b364920b9c4d747c7b18f/celery_types-0.26.0.tar.gz", hash = "sha256:fa318136fdad83f83f1531deecd9fe664b5dfffff29f3c31e9120a46b8e3908f", size = 106210, upload-time = "2026-03-12T23:06:49.941Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d9/e5/c5ec98f7fd7817d077c9a5a5e705d54f74d4ca08ee3f14dee881c93c0511/celery_types-0.26.0-py3-none-any.whl", hash = "sha256:eb9da76f461786091970df466ec647d9a27956399852542cb6cab9309970f950", size = 211260, upload-time = "2026-03-12T23:06:48.588Z" }, +] + [[package]] name = "certifi" version = "2026.2.25" @@ -1330,6 +1342,7 @@ dependencies = [ { name = "annotated" }, { name = "asyncio" }, { name = "celery", extra = ["redis"] }, + { name = "celery-types" }, { name = "chardet" }, { name = "crawl4ai" }, { name = "dashscope" }, @@ -1359,7 +1372,6 @@ dependencies = [ { name = "modality" }, { name = "motor" }, { name = "path" }, - { name = "pika" }, { name = "playwright" }, { name = "postgres" }, { name = "prompt" }, @@ -1389,6 +1401,7 @@ requires-dist = [ { name = "annotated", specifier = ">=0.0.2" }, { name = "asyncio", specifier = ">=4.0.0" }, { name = "celery", extras = ["redis"], specifier = ">=5.6.3" }, + { name = "celery-types", specifier = ">=0.26.0" }, { name = "chardet", specifier = "<6" }, { name = "crawl4ai", specifier = ">=0.8.0" }, { name = "dashscope", specifier = ">=1.25.13" }, @@ -1418,7 +1431,6 @@ requires-dist = [ { name = "modality", specifier = ">=0.1.0" }, { name = "motor", specifier = ">=3.7.1" }, { name = "path", specifier = ">=17.1.1" }, - { name = "pika", specifier = ">=1.3.2" }, { name = "playwright", specifier = ">=1.58.0" }, { name = "postgres", specifier = ">=4.0" }, { name = "prompt", specifier = ">=0.4.1" }, @@ -3572,15 +3584,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/7c/50/11c9ee1ede64b45d687fd36eb8768dafc57afc78b4d83396920cfd69ed30/path-17.1.1-py3-none-any.whl", hash = "sha256:ec7e136df29172e5030dd07e037d55f676bdb29d15bfa09b80da29d07d3b9303", size = 23936, upload-time = "2025-07-27T20:40:22.453Z" }, ] -[[package]] -name = "pika" -version = "1.3.2" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/db/db/d4102f356af18f316c67f2cead8ece307f731dd63140e2c71f170ddacf9b/pika-1.3.2.tar.gz", hash = "sha256:b2a327ddddf8570b4965b3576ac77091b850262d34ce8c1d8cb4e4146aa4145f", size = 145029, upload-time = "2023-05-05T14:25:43.368Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/f9/f3/f412836ec714d36f0f4ab581b84c491e3f42c6b5b97a6c6ed1817f3c16d0/pika-1.3.2-py3-none-any.whl", hash = "sha256:0779a7c1fafd805672796085560d290213a465e4f6f76a6fb19e378d8041a14f", size = 155415, upload-time = "2023-05-05T14:25:41.484Z" }, -] - [[package]] name = "pillow" version = "12.1.1"