更新canvas 3d接口 ,使用异步mq队列处理

This commit is contained in:
zcr
2026-04-01 11:57:25 +08:00
parent f35753954e
commit 1c8283334e
50 changed files with 202 additions and 175 deletions

0
src/server/__init__.py Normal file → Executable file
View File

56
src/server/canvas_generate_3D/celery_app.py Normal file → Executable file
View File

@@ -1,60 +1,60 @@
# src/server/canvas_generate_3D/celery_app.py
from celery import Celery
import os
from kombu import Queue
from kombu import Queue, Exchange
from src.core.config import settings
# RabbitMQ 连接(请改成你的真实配置)
BROKER_URL = settings.RABBITMQ_URL # 用户名:密码@主机:端口/vhost
celery_app = Celery(
"canvas_generate_3D",
broker=BROKER_URL,
backend=f"redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB}", # 推荐用 Redis 存任务结果
include=["src.server.canvas_generate_3D.tasks"], # 明确包含任务模块
"canvas_generate_3d",
broker=settings.RABBITMQ_URL,
backend=f"redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB}",
include=["src.server.canvas_generate_3D.tasks"],
)
# 重要配置:限制并发为 1一次只处理一个 img_to_3D 请求)
celery_app.conf.update(
imports=[
'src.server.canvas_generate_3D.tasks', # ← 加上这一行(或你的实际路径)
],
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="Asia/Hong_Kong",
enable_utc=True,
# ==================== 新增:定义多个队列 ====================
# ==================== 修改 Exchange 名称 ====================
task_default_exchange="canvas_3d_exchange", # ← 修改这里
task_default_exchange_type="direct",
# 定义队列
task_queues=(
Queue("img_to_3d_queue", durable=True),
Queue("three_d_to_3views_queue", durable=True),
Queue("img_to_3d_queue",
exchange=Exchange("canvas_3d_exchange", type="direct"),
durable=True),
Queue("three_d_to_3views_queue",
exchange=Exchange("canvas_3d_exchange", type="direct"),
durable=True),
),
# 任务路由
task_routes={
'src.server.canvas_generate_3D.tasks.img_to_3d_task': {
'queue': 'img_to_3d_queue'
'queue': 'img_to_3d_queue',
'exchange': 'canvas_3d_exchange', # ← 修改这里
},
'src.server.canvas_generate_3D.tasks.three_d_to_3views_task': { # 注意任务名称要一致
'queue': 'three_d_to_3views_queue'
'src.server.canvas_generate_3D.tasks.three_d_to_3views_task': {
'queue': 'three_d_to_3views_queue',
'exchange': 'canvas_3d_exchange', # ← 修改这里
},
},
task_default_queue="img_to_3d_queue",
# 全局或针对该队列的限制
worker_concurrency=1, # 同时只跑 1 个
worker_prefetch_multiplier=1, # 严格一次只预取 1 个
worker_max_tasks_per_child=1, # 处理完一个后重启子进程(推荐用于重资源任务)
# 可选:任务 ack 策略(长任务建议晚 ack
worker_concurrency=1,
worker_prefetch_multiplier=1,
worker_max_tasks_per_child=1,
task_acks_late=True,
task_reject_on_worker_lost=True,
)
# 可选:打印已注册的任务,帮助调试
@celery_app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
print("✅ Celery 已启动,以下任务已注册:")
for task_name in sorted(sender.tasks.keys()):
print(f" - {task_name}")
print(f" - {task_name}")

63
src/server/canvas_generate_3D/server.py Normal file → Executable file
View File

@@ -1,80 +1,95 @@
from celery import current_app
from src.server.canvas_generate_3D.celery_app import celery_app # ← 改成这行
from src.server.canvas_generate_3D.tasks import img_to_3d_task, three_d_to_3views_task
import logging
logger = logging.getLogger(__name__)
def get_queue_length(queue_name: str) -> int:
"""获取指定队列当前待处理消息数量(更可靠的方式)"""
try:
with celery_app.connection() as conn:
with conn.channel() as channel:
# passive=True只查询不创建队列
queue_info = channel.queue_declare(
queue=queue_name,
passive=True,
durable=True
)
return queue_info.message_count
except Exception as e:
logger.warning(f"获取队列长度失败 {queue_name}: {e}")
return 0 # 失败时默认不拒绝提交,防止误判
def submit_img_to_3d_task(input_images: list, model: str = "single", **kwargs):
"""
提交 3D 生成任务 - 队列最多堆积 10 个
"""
"""提交 img_to_3D 任务(带队列长度限制)"""
queue_name = "img_to_3d_queue"
max_queue_length = 10
try:
with current_app.connection() as conn: # 使用 Celery 的连接(最推荐)
with conn.channel() as channel:
queue_info = channel.queue_declare(queue=queue_name, durable=True, auto_delete=False, passive=False)
current_length = queue_info.message_count
current_length = get_queue_length(queue_name)
# 队列已满
if current_length >= max_queue_length:
return {
"state": "queue_full",
"message": "当前 3D 生成请求较多,请等待片刻后重试。",
"message": "当前 3D 生成请求较多,请后重试。",
"queue_length": current_length,
"max_length": max_queue_length
}
# 提交任务
task = img_to_3d_task.delay(input_images, model, **kwargs)
logger.info(f"img_to_3d_task 已提交 | task_id: {task.id} | 当前队列长度: {current_length}")
return {
"state": "success",
"task_id": task.id,
"message": "任务已成功提交,正在处理...",
"message": "任务已成功提交,正在后台处理...",
"queue_length": current_length + 1
}
except Exception as e:
logger.error(f"提交 img_to_3d_task 失败: {e}", exc_info=True)
return {
"state": "fail",
"message": f"提交失败,请稍后重试。错误: {str(e)}",
"message": "提交失败,请稍后重试。",
"error": str(e)
}
def submit_three_d_to_3views_task(minio_glb_path: str, **kwargs):
"""
提交 3D 生成任务 - 队列最多堆积 10 个
"""
queue_name = "three_d_to_3views_queue"
"""提交 3D转3视图 任务(带队列长度限制)"""
queue_name = "three_d_to_3views_task" # ← 必须和 @shared_task 中的 queue 完全一致!
max_queue_length = 3
try:
with current_app.connection() as conn: # 使用 Celery 的连接(最推荐)
with conn.channel() as channel:
queue_info = channel.queue_declare(queue=queue_name, durable=True, auto_delete=False, passive=False)
current_length = queue_info.message_count
current_length = get_queue_length(queue_name)
# 队列已满
if current_length >= max_queue_length:
return {
"state": "queue_full",
"message": "当前 3 视图 生成请求较多,请等待片刻后重试。",
"message": "当前 3视图 生成请求较多,请后重试。",
"queue_length": current_length,
"max_length": max_queue_length
}
task = three_d_to_3views_task.delay(minio_glb_path, **kwargs)
logger.info(f"three_d_to_3views_task 已提交 | task_id: {task.id} | 当前队列长度: {current_length}")
return {
"state": "success",
"task_id": task.id,
"message": "任务已成功提交,正在处理...",
"message": "任务已成功提交,正在后台处理...",
"queue_length": current_length + 1
}
except Exception as e:
logger.error(f"提交 three_d_to_3views_task 失败: {e}", exc_info=True)
return {
"state": "fail",
"message": f"提交失败,请稍后重试。错误: {str(e)}",
"message": "提交失败,请稍后重试。",
"error": str(e)
}

124
src/server/canvas_generate_3D/tasks.py Normal file → Executable file
View File

@@ -1,139 +1,99 @@
# src/server/canvas_generate_3D/tasks.py
import json
import time
import httpx
import asyncio
from celery import shared_task
import httpx
from src.core.config import settings
from src.server.canvas_generate_3D.celery_app import celery_app
from src.server.utils.mq_util import send_to_rabbitmq
import logging
logger = logging.getLogger(__name__)
def send_result_to_rabbitmq(result: dict, job_id: str, status: str = "completed"):
"""发送结果到 RabbitMQ建议后续移到 mq_util.py"""
try:
# 你已经有 mq_util.py可以调用那里面的函数
# 这里先用简单实现,如果你想用 mq_util.py 的方式,后面我再帮你调整
import pika
from pika import DeliveryMode
connection = pika.BlockingConnection(pika.URLParameters(settings.RABBITMQ_URL))
channel = connection.channel()
EXCHANGE_NAME = "img_to_3d_exchange"
ROUTING_KEY = "img_to_3d_results"
QUEUE_NAME = "img_to_3d_results"
channel.exchange_declare(exchange=EXCHANGE_NAME, exchange_type="direct", durable=True)
channel.queue_declare(queue=QUEUE_NAME, durable=True)
channel.queue_bind(exchange=EXCHANGE_NAME, queue=QUEUE_NAME, routing_key=ROUTING_KEY)
message_body = {
"job_id": job_id,
"status": status,
"timestamp": time.time(),
"result": result
}
channel.basic_publish(
exchange=EXCHANGE_NAME,
routing_key=ROUTING_KEY,
body=json.dumps(message_body).encode("utf-8"),
properties=pika.BasicProperties(delivery_mode=DeliveryMode.Persistent)
)
logger.info(f"✅ RabbitMQ 发送成功 | job_id: {job_id}")
connection.close()
except Exception as e:
logger.error(f"❌ RabbitMQ 发送失败 | job_id: {job_id} | {e}")
@shared_task(bind=True, queue="img_to_3d_queue", max_retries=3, name='src.server.canvas_generate_3D.tasks.img_to_3d_task')
def img_to_3d_task(self, input_images: list, model: str = "single"):
"""img_to_3D 主任务"""
job_id = self.request.id
# ====================== 处理 job_id ======================
job_id = self.request.id # 如果没传 job_id就使用 Celery 自带的 task id
logger.info(f"开始处理 img_to_3D 任务 | job_id: {job_id} | celery_task_id: {self.request.id}")
logger.info(f"开始处理 img_to_3D 任务 | job_id: {job_id}")
try:
input_data = {
"image_paths": input_images, # 注意:后端服务用的是 image_paths不是 input_images
"image_paths": input_images,
"model": model,
}
# 调用模型服务(推荐使用同步 httpx避免 asyncio.run 在 worker 中的潜在问题)
with httpx.Client(timeout=300.0) as client: # 改成同步 Client
with httpx.Client(timeout=300.0) as client:
resp = client.post(
f"http://{settings.IMAGE_TO_3D_MODEL_URL}/canvas/img_to_3D",
json=input_data
)
resp.raise_for_status() # 自动抛出 HTTP 错误
resp.raise_for_status()
result = resp.json()
logger.info(f"任务处理完成 | job_id: {job_id}")
logger.info(f"img_to_3D 任务处理完成 | job_id: {job_id}")
# 发送 RabbitMQ 通知
send_result_to_rabbitmq(result=result, job_id=job_id, status="completed")
# 发送到对应的结果队列
asyncio.run(send_to_rabbitmq(
result=result,
job_id=job_id,
status="completed",
routing_key=f"img_to_3d_results-{settings.SERVE_ENV}" # ← 第一个任务的结果队列
))
return result
except Exception as exc:
logger.error(f"任务失败 | job_id: {job_id} | error: {exc}", exc_info=True)
logger.error(f"img_to_3D 任务失败 | job_id: {job_id}", exc_info=True)
# 发送失败通知
send_result_to_rabbitmq(
asyncio.run(send_to_rabbitmq(
result={"error": str(exc)},
job_id=job_id,
status="failed"
)
# 重试
status="failed",
routing_key=f"img_to_3d_results-{settings.SERVE_ENV}"
))
raise self.retry(exc=exc, countdown=60, max_retries=3)
@shared_task(bind=True, queue="three_d_to_3views_task", max_retries=3, name='src.server.canvas_generate_3D.tasks.three_d_to_3views_task')
@shared_task(bind=True, queue="three_d_to_3views_queue", max_retries=3,
name='src.server.canvas_generate_3D.tasks.three_d_to_3views_task')
def three_d_to_3views_task(self, minio_glb_path: str):
"""3D to 3views 主任务"""
job_id = self.request.id
# ====================== 处理 job_id ======================
job_id = self.request.id # 如果没传 job_id就使用 Celery 自带的 task id
logger.info(f"开始处理 three_d_to_3views_task 任务 | job_id: {job_id} | celery_task_id: {self.request.id}")
logger.info(f"开始处理 three_d_to_3views_task | job_id: {job_id}")
try:
input_data = {
"minio_glb_path": minio_glb_path, # 注意:后端服务用的是 image_paths不是 input_images
"minio_glb_path": minio_glb_path,
}
# 调用模型服务(推荐使用同步 httpx避免 asyncio.run 在 worker 中的潜在问题)
with httpx.Client(timeout=300.0) as client: # 改成同步 Client
with httpx.Client(timeout=300.0) as client:
resp = client.post(
f"http://{settings.IMAGE_TO_3D_MODEL_URL}/canvas/3d_to_3views",
json=input_data
)
resp.raise_for_status() # 自动抛出 HTTP 错误
resp.raise_for_status()
result = resp.json()
logger.info(f"任务处理完成 | job_id: {job_id}")
logger.info(f"three_d_to_3views_task 任务处理完成 | job_id: {job_id}")
# 发送 RabbitMQ 通知
send_result_to_rabbitmq(result=result, job_id=job_id, status="completed")
# 发送到对应的结果队列
asyncio.run(send_to_rabbitmq(
result=result,
job_id=job_id,
status="completed",
routing_key="three_d_to_3views_results" # ← 第二个任务的结果队列
))
return result
except Exception as exc:
logger.error(f"任务失败 | job_id: {job_id} | error: {exc}", exc_info=True)
logger.error(f"three_d_to_3views_task 任务失败 | job_id: {job_id}", exc_info=True)
# 发送失败通知
send_result_to_rabbitmq(
asyncio.run(send_to_rabbitmq(
result={"error": str(exc)},
job_id=job_id,
status="failed"
)
# 重试
raise self.retry(exc=exc, countdown=60, max_retries=3)
status="failed",
routing_key="three_d_to_3views_results"
))
raise self.retry(exc=exc, countdown=60, max_retries=3)

0
src/server/deep_agent/__init__.py Normal file → Executable file
View File

0
src/server/deep_agent/agents/painter.py Normal file → Executable file
View File

0
src/server/deep_agent/agents/researcher.py Normal file → Executable file
View File

0
src/server/deep_agent/agents/user_profile.py Normal file → Executable file
View File

0
src/server/deep_agent/init_llm.py Normal file → Executable file
View File

0
src/server/deep_agent/run_test.py Normal file → Executable file
View File

0
src/server/deep_agent/tools/__init__.py Normal file → Executable file
View File

0
src/server/deep_agent/tools/conversation_title_tool.py Normal file → Executable file
View File

0
src/server/deep_agent/tools/crawl_tool.py Normal file → Executable file
View File

View File

View File

0
src/server/deep_agent/tools/report_generator_tool.py Normal file → Executable file
View File

0
src/server/deep_agent/tools/research_tool.py Normal file → Executable file
View File

View File

0
src/server/deep_agent/tools/user_persona_tool.py Normal file → Executable file
View File

0
src/server/deep_agent/tools/vision_analyze_tool.py Normal file → Executable file
View File

0
src/server/deep_agent/utils/mongodb_util.py Normal file → Executable file
View File

0
src/server/utils/__init__.py Normal file → Executable file
View File

0
src/server/utils/generate_suggestion.py Normal file → Executable file
View File

54
src/server/utils/mq_util.py Executable file
View File

@@ -0,0 +1,54 @@
import json
from datetime import datetime
import aio_pika
from aio_pika import DeliveryMode, ExchangeType
from src.core.config import settings
import logging
logger = logging.getLogger(__name__)
EXCHANGE_NAME = "canvas_3d_exchange" # ← 修改这里
async def send_to_rabbitmq(
result: dict,
job_id: str,
status: str = "completed",
routing_key: str = "img_to_3d_results"
):
try:
connection = await aio_pika.connect_robust(settings.RABBITMQ_URL)
async with connection:
channel = await connection.channel()
# 使用新的 Exchange 名称
exchange = await channel.declare_exchange(
name=EXCHANGE_NAME, # ← 使用常量
type=ExchangeType.DIRECT,
durable=True
)
queue = await channel.declare_queue(name=routing_key, durable=True)
await queue.bind(exchange, routing_key=routing_key)
message_body = {
"job_id": job_id,
"status": status,
"timestamp": datetime.now().isoformat(),
"task_type": routing_key, # 方便区分是哪个任务的结果
"result": result
}
message = aio_pika.Message(
body=json.dumps(message_body).encode("utf-8"),
delivery_mode=DeliveryMode.PERSISTENT,
)
await exchange.publish(message, routing_key=routing_key)
logger.info(f"✅ 发送成功 → routing_key: {routing_key} | job_id: {job_id} | status: {status}")
except Exception as e:
logger.error(f"❌ 发送失败 → routing_key: {routing_key} | job_id: {job_id} | {e}", exc_info=True)

0
src/server/utils/new_oss_client.py Normal file → Executable file
View File