更新 canvas 3D 接口,使用异步 MQ 队列处理

This commit is contained in:
zcr
2026-03-31 18:16:28 +08:00
parent 5bc27d4d52
commit 35ad8f69e8
9 changed files with 1559 additions and 734 deletions

View File

@@ -24,8 +24,6 @@ RUN playwright install
#CMD ["tail","-f","/dev/null"] #CMD ["tail","-f","/dev/null"]
# Run the application. # Run the application.
CMD ["gunicorn", "main:app_server", \ CMD ["gunicorn", "main:app_server", \
"-w", "4", \ "-c", "/app/gunicorn.conf.py", \
"-k", "uvicorn.workers.UvicornWorker", \
"--bind", "0.0.0.0:80", \
"--access-logfile", "-", \ "--access-logfile", "-", \
"--error-logfile", "-"] "--error-logfile", "-"]

View File

@@ -1,4 +1,5 @@
name: fida-python-prod name: fida-python-prod
services: services:
server: server:
container_name: "FiDA_${SERVE_ENV}_Server" container_name: "FiDA_${SERVE_ENV}_Server"
@@ -12,3 +13,55 @@ services:
- /etc/localtime:/etc/localtime:ro - /etc/localtime:/etc/localtime:ro
ports: ports:
- "${SERVE_PORT}:80" - "${SERVE_PORT}:80"
environment:
- SERVE_ENV=${SERVE_ENV}
restart: unless-stopped
# ==================== Worker 1: img_to_3d重资源建议只跑1个 ====================
img_worker:
container_name: "FiDA_${SERVE_ENV}_ImgWorker"
build:
context: .
dockerfile: Dockerfile
working_dir: /app
command: >
celery -A src.server.canvas_generate_3D.celery_app worker
-n img_worker@%h
-Q img_to_3d_queue
--concurrency=1
--prefetch-multiplier=1
--max-tasks-per-child=1
--loglevel=INFO
volumes:
- ./:/app
- ./.env:/app/.env
- /etc/localtime:/etc/localtime:ro
environment:
- SERVE_ENV=${SERVE_ENV}
depends_on:
- server # 可选:等 server 启动后再启动 worker
restart: unless-stopped
# ==================== Worker 2: 3d_to_3views ====================
views_worker:
container_name: "FiDA_${SERVE_ENV}_ViewsWorker"
build:
context: .
dockerfile: Dockerfile
working_dir: /app
command: >
celery -A src.server.canvas_generate_3D.celery_app worker
-n views_worker@%h
-Q 3d_to_3views_queue
--concurrency=2
--prefetch-multiplier=1
--loglevel=INFO
volumes:
- ./:/app
- ./.env:/app/.env
- /etc/localtime:/etc/localtime:ro
environment:
- SERVE_ENV=${SERVE_ENV}
depends_on:
- server
restart: unless-stopped

View File

@@ -29,9 +29,7 @@ dependencies = [
"psycopg[binary]>=3.3.3", "psycopg[binary]>=3.3.3",
"postgres>=4.0", "postgres>=4.0",
"langchain-huggingface>=1.2.0", "langchain-huggingface>=1.2.0",
"sentence-transformers>=5.2.3",
"rank-bm25>=0.2.2", "rank-bm25>=0.2.2",
"torch>=2.10.0",
"faiss-cpu>=1.13.2", "faiss-cpu>=1.13.2",
"terminate>=0.0.9", "terminate>=0.0.9",
"report-generator>=0.1.10", "report-generator>=0.1.10",
@@ -51,4 +49,15 @@ dependencies = [
"tool>=0.8.0", "tool>=0.8.0",
"langchain-daytona>=0.0.3", "langchain-daytona>=0.0.3",
"langgraph-cli[inmem]>=0.4.19", "langgraph-cli[inmem]>=0.4.19",
"start>=0.2",
"end>=1.3.1",
"annotated>=0.0.2",
"field>=0.2.0",
"aio-pika>=9.6.2",
"celery[redis]>=5.6.3",
"python3-pika>=0.9.14",
"tasks>=2.8.0",
"pika>=1.3.0",
"kombu>=5.4.0",
"sentence-transformers[onnx]>=5.3.0",
] ]

View File

@@ -28,6 +28,11 @@ class Settings(BaseSettings):
MINIO_SECRET: str = Field(default='', description="") MINIO_SECRET: str = Field(default='', description="")
MINIO_SECURE: bool = Field(default=True, description="") MINIO_SECURE: bool = Field(default=True, description="")
# --- redis 配置信息 ---
REDIS_HOST: str = Field(default='', description="")
REDIS_PORT: str = Field(default='', description="")
REDIS_DB: int = Field(default=0, description="")
# --- mongodb配置信息 --- # --- mongodb配置信息 ---
MONGODB_USERNAME: str = Field(default="", description="") MONGODB_USERNAME: str = Field(default="", description="")
MONGODB_PASSWORD: str = Field(default="", description="") MONGODB_PASSWORD: str = Field(default="", description="")
@@ -38,6 +43,7 @@ class Settings(BaseSettings):
IMAGE_TO_3D_MODEL_URL: str = Field(default='', description="") IMAGE_TO_3D_MODEL_URL: str = Field(default='', description="")
FLUX2_GEN_IMG_MODEL_URL: str = Field(default='', description="") FLUX2_GEN_IMG_MODEL_URL: str = Field(default='', description="")
SEG_ANYTHING: str = Field(default='', description="") SEG_ANYTHING: str = Field(default='', description="")
RABBITMQ_URL: str = Field(default='', description="")
# --- 外部工具api配置信息 --- # --- 外部工具api配置信息 ---
TAVILY_API_KEY: str = Field(default="", description="") TAVILY_API_KEY: str = Field(default="", description="")

View File

@@ -1,17 +1,23 @@
import asyncio
import json import json
import logging import logging
import httpx import httpx
import requests import requests
import uuid
from fastapi import APIRouter from fastapi import APIRouter
from src.core.config import settings from src.core.config import settings
from src.schemas.generate_3D import ImageTo3DRequest, ToSVGRequest from src.schemas.generate_3D import ImageTo3DRequest, ToSVGRequest
from src.schemas.response_template import ResponseModel from src.schemas.response_template import ResponseModel
from src.server.canvas_generate_3D.server import submit_img_to_3d_task, submit_three_d_to_3views_task
from src.server.canvas_generate_3D.tasks import img_to_3d_task
router = APIRouter(prefix="/canvas", tags=["Furniture Canvas"]) router = APIRouter(prefix="/canvas", tags=["Furniture Canvas"])
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
img_to_3d_semaphore = asyncio.Semaphore(1)
@router.post("/img_to_3D") @router.post("/img_to_3D")
async def img_to_3D(request_data: ImageTo3DRequest): async def img_to_3D(request_data: ImageTo3DRequest):
@@ -106,6 +112,15 @@ async def img_to_3D(request_data: ImageTo3DRequest):
logger.warning(f"img_to_3D Run Exception: {e}") logger.warning(f"img_to_3D Run Exception: {e}")
@router.post("/img_to_3D_mq")
async def img_to_3d_endpoint(request_data: ImageTo3DRequest):
result = submit_img_to_3d_task(
input_images=request_data.input_images,
model=request_data.model
)
return result
@router.post("/3d_to_3views") @router.post("/3d_to_3views")
async def to_3views(request_data: ToSVGRequest): async def to_3views(request_data: ToSVGRequest):
@@ -149,3 +164,9 @@ async def to_3views(request_data: ToSVGRequest):
except Exception as e: except Exception as e:
logger.warning(f"img_to_3D Run Exception: {e}") logger.warning(f"img_to_3D Run Exception: {e}")
@router.post("/3d_to_3views_mq")
async def to_3views(request_data: ToSVGRequest):
result = submit_three_d_to_3views_task(minio_glb_path=request_data.minio_glb_path)
return result

View File

@@ -0,0 +1,60 @@
from celery import Celery
import os
from kombu import Queue
from src.core.config import settings
# RabbitMQ 连接(请改成你的真实配置)
BROKER_URL = settings.RABBITMQ_URL # 用户名:密码@主机:端口/vhost
celery_app = Celery(
"canvas_generate_3D",
broker=BROKER_URL,
backend=f"redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB}", # 推荐用 Redis 存任务结果
include=["src.server.canvas_generate_3D.tasks"], # 明确包含任务模块
)
# 重要配置:限制并发为 1一次只处理一个 img_to_3D 请求)
celery_app.conf.update(
imports=[
'src.server.canvas_generate_3D.tasks', # ← 加上这一行(或你的实际路径)
],
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="Asia/Hong_Kong",
enable_utc=True,
# ==================== 新增:定义多个队列 ====================
task_queues=(
Queue("img_to_3d_queue", durable=True),
Queue("three_d_to_3views_queue", durable=True),
),
task_routes={
'src.server.canvas_generate_3D.tasks.img_to_3d_task': {
'queue': 'img_to_3d_queue'
},
'src.server.canvas_generate_3D.tasks.three_d_to_3views_task': { # 注意任务名称要一致
'queue': 'three_d_to_3views_queue'
},
},
task_default_queue="img_to_3d_queue",
# 全局或针对该队列的限制
worker_concurrency=1, # 同时只跑 1 个
worker_prefetch_multiplier=1, # 严格一次只预取 1 个
worker_max_tasks_per_child=1, # 处理完一个后重启子进程(推荐用于重资源任务)
# 可选:任务 ack 策略(长任务建议晚 ack
task_acks_late=True,
task_reject_on_worker_lost=True,
)
# 可选:打印已注册的任务,帮助调试
@celery_app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
print("✅ Celery 已启动,以下任务已注册:")
for task_name in sorted(sender.tasks.keys()):
print(f" - {task_name}")

View File

@@ -0,0 +1,80 @@
from celery import current_app
from src.server.canvas_generate_3D.tasks import img_to_3d_task, three_d_to_3views_task
def submit_img_to_3d_task(input_images: list, model: str = "single", **kwargs):
"""
提交 3D 生成任务 - 队列最多堆积 10 个
"""
queue_name = "img_to_3d_queue"
max_queue_length = 10
try:
with current_app.connection() as conn: # 使用 Celery 的连接(最推荐)
with conn.channel() as channel:
queue_info = channel.queue_declare(queue=queue_name, durable=True, auto_delete=False, passive=False)
current_length = queue_info.message_count
# 队列已满
if current_length >= max_queue_length:
return {
"state": "queue_full",
"message": "当前 3D 生成请求较多,请等待片刻后重试。",
"queue_length": current_length,
"max_length": max_queue_length
}
task = img_to_3d_task.delay(input_images, model, **kwargs)
return {
"state": "success",
"task_id": task.id,
"message": "任务已成功提交,正在处理中...",
"queue_length": current_length + 1
}
except Exception as e:
return {
"state": "fail",
"message": f"提交失败,请稍后重试。错误: {str(e)}",
"error": str(e)
}
def submit_three_d_to_3views_task(minio_glb_path: str, **kwargs):
"""
提交 3D 生成任务 - 队列最多堆积 10 个
"""
queue_name = "three_d_to_3views_queue"
max_queue_length = 3
try:
with current_app.connection() as conn: # 使用 Celery 的连接(最推荐)
with conn.channel() as channel:
queue_info = channel.queue_declare(queue=queue_name, durable=True, auto_delete=False, passive=False)
current_length = queue_info.message_count
# 队列已满
if current_length >= max_queue_length:
return {
"state": "queue_full",
"message": "当前 3 视图 生成请求较多,请等待片刻后重试。",
"queue_length": current_length,
"max_length": max_queue_length
}
task = three_d_to_3views_task.delay(minio_glb_path, **kwargs)
return {
"state": "success",
"task_id": task.id,
"message": "任务已成功提交,正在处理中...",
"queue_length": current_length + 1
}
except Exception as e:
return {
"state": "fail",
"message": f"提交失败,请稍后重试。错误: {str(e)}",
"error": str(e)
}

View File

@@ -0,0 +1,139 @@
# src/server/canvas_generate_3D/tasks.py
import json
import time
import httpx
from celery import shared_task
from src.core.config import settings
from src.server.canvas_generate_3D.celery_app import celery_app
import logging
logger = logging.getLogger(__name__)
def send_result_to_rabbitmq(result: dict, job_id: str, status: str = "completed"):
"""发送结果到 RabbitMQ建议后续移到 mq_util.py"""
try:
# 你已经有 mq_util.py可以调用那里面的函数
# 这里先用简单实现,如果你想用 mq_util.py 的方式,后面我再帮你调整
import pika
from pika import DeliveryMode
connection = pika.BlockingConnection(pika.URLParameters(settings.RABBITMQ_URL))
channel = connection.channel()
EXCHANGE_NAME = "img_to_3d_exchange"
ROUTING_KEY = "img_to_3d_results"
QUEUE_NAME = "img_to_3d_results"
channel.exchange_declare(exchange=EXCHANGE_NAME, exchange_type="direct", durable=True)
channel.queue_declare(queue=QUEUE_NAME, durable=True)
channel.queue_bind(exchange=EXCHANGE_NAME, queue=QUEUE_NAME, routing_key=ROUTING_KEY)
message_body = {
"job_id": job_id,
"status": status,
"timestamp": time.time(),
"result": result
}
channel.basic_publish(
exchange=EXCHANGE_NAME,
routing_key=ROUTING_KEY,
body=json.dumps(message_body).encode("utf-8"),
properties=pika.BasicProperties(delivery_mode=DeliveryMode.Persistent)
)
logger.info(f"✅ RabbitMQ 发送成功 | job_id: {job_id}")
connection.close()
except Exception as e:
logger.error(f"❌ RabbitMQ 发送失败 | job_id: {job_id} | {e}")
@shared_task(bind=True, queue="img_to_3d_queue", max_retries=3, name='src.server.canvas_generate_3D.tasks.img_to_3d_task')
def img_to_3d_task(self, input_images: list, model: str = "single"):
"""img_to_3D 主任务"""
# ====================== 处理 job_id ======================
job_id = self.request.id # 如果没传 job_id就使用 Celery 自带的 task id
logger.info(f"开始处理 img_to_3D 任务 | job_id: {job_id} | celery_task_id: {self.request.id}")
try:
input_data = {
"image_paths": input_images, # 注意:后端服务用的是 image_paths不是 input_images
"model": model,
}
# 调用模型服务(推荐使用同步 httpx避免 asyncio.run 在 worker 中的潜在问题)
with httpx.Client(timeout=300.0) as client: # 改成同步 Client
resp = client.post(
f"http://{settings.IMAGE_TO_3D_MODEL_URL}/canvas/img_to_3D",
json=input_data
)
resp.raise_for_status() # 自动抛出 HTTP 错误
result = resp.json()
logger.info(f"任务处理完成 | job_id: {job_id}")
# 发送 RabbitMQ 通知
send_result_to_rabbitmq(result=result, job_id=job_id, status="completed")
return result
except Exception as exc:
logger.error(f"任务失败 | job_id: {job_id} | error: {exc}", exc_info=True)
# 发送失败通知
send_result_to_rabbitmq(
result={"error": str(exc)},
job_id=job_id,
status="failed"
)
# 重试
raise self.retry(exc=exc, countdown=60, max_retries=3)
@shared_task(bind=True, queue="three_d_to_3views_task", max_retries=3, name='src.server.canvas_generate_3D.tasks.three_d_to_3views_task')
def three_d_to_3views_task(self, minio_glb_path: str):
"""3D to 3views 主任务"""
# ====================== 处理 job_id ======================
job_id = self.request.id # 如果没传 job_id就使用 Celery 自带的 task id
logger.info(f"开始处理 three_d_to_3views_task 任务 | job_id: {job_id} | celery_task_id: {self.request.id}")
try:
input_data = {
"minio_glb_path": minio_glb_path, # 注意:后端服务用的是 image_paths不是 input_images
}
# 调用模型服务(推荐使用同步 httpx避免 asyncio.run 在 worker 中的潜在问题)
with httpx.Client(timeout=300.0) as client: # 改成同步 Client
resp = client.post(
f"http://{settings.IMAGE_TO_3D_MODEL_URL}/canvas/3d_to_3views",
json=input_data
)
resp.raise_for_status() # 自动抛出 HTTP 错误
result = resp.json()
logger.info(f"任务处理完成 | job_id: {job_id}")
# 发送 RabbitMQ 通知
send_result_to_rabbitmq(result=result, job_id=job_id, status="completed")
return result
except Exception as exc:
logger.error(f"任务失败 | job_id: {job_id} | error: {exc}", exc_info=True)
# 发送失败通知
send_result_to_rabbitmq(
result={"error": str(exc)},
job_id=job_id,
status="failed"
)
# 重试
raise self.retry(exc=exc, countdown=60, max_retries=3)

1915
uv.lock generated

File diff suppressed because it is too large Load Diff