新增img 转 3D api接口

This commit is contained in:
zcr
2026-04-13 12:11:34 +08:00
parent 85e75cd43f
commit 6f4a0cc80c
8 changed files with 1383 additions and 176 deletions

View File

@@ -28,13 +28,7 @@ services:
dockerfile: Dockerfile
working_dir: /app
command: >
celery -A src.server.canvas_generate_3D.celery_app worker
-n celery_worker@%h
-Q img_to_3d_queue,three_d_to_3views_queue
--concurrency=1
--prefetch-multiplier=1
--max-tasks-per-child=1
--loglevel=INFO
celery -A src.server.canvas_generate_3D.celery_app worker -n celery_worker@%h -Q img_to_3d_queue,three_d_to_3views_queue --concurrency=1 --prefetch-multiplier=1 --max-tasks-per-child=1 --loglevel=INFO
volumes:
- ./:/app
- ./.env:/app/.env
@@ -67,4 +61,6 @@ networks:
# restart: unless-stopped
# volumes:
# rabbitmq_data:
# rabbitmq_data:

View File

@@ -47,6 +47,7 @@ class Settings(BaseSettings):
# --- 外部工具api配置信息 ---
TAVILY_API_KEY: str = Field(default="", description="")
TRIPO_API_KEY: str = Field(default="", description="")
LOGS_PATH: str = Field(default="/mnt/data/FiDA/logs", description="")

View File

@@ -3,15 +3,13 @@ import json
import logging
import httpx
import requests
import uuid
from fastapi import APIRouter
from fastapi import APIRouter, BackgroundTasks
from src.core.config import settings
from src.schemas.generate_3D import ImageTo3DRequest, ToSVGRequest
from src.schemas.generate_3D import ImageTo3DRequest, ToSVGRequest, Tripo3dApiModel
from src.schemas.response_template import ResponseModel
from src.server.canvas_generate_3D.server import submit_img_to_3d_task, submit_three_d_to_3views_task
from src.server.canvas_generate_3D.tasks import img_to_3d_task
from src.server.canvas_generate_3D.triop3d_api_server import create_single_task, create_multi_task, get_task_result_async, single_img_to_model_async
router = APIRouter(prefix="/canvas", tags=["Furniture Canvas"])
logger = logging.getLogger(__name__)
@@ -19,145 +17,8 @@ logger = logging.getLogger(__name__)
img_to_3d_semaphore = asyncio.Semaphore(1)
@router.post("/img_to_3D")
async def img_to_3D(request_data: ImageTo3DRequest):
"""
### 参数说明:
- **input_images**:输入图片list,单张或多张
- **model**: 推理模式,单张或多张
### 请求体示例:
```json
单张
{
"input_images": ["test/img_to_3d_data/example_multi_image/character_1.png"],
"model": "single"
}
多张
{
"input_imaes": [
"test/img_to_3d_data/example_multi_image/character_1.png",
"test/img_to_3d_data/example_multi_image/character_2.png",
"test/img_to_3d_data/example_multi_image/character_3.png"
],
"model": "multi"
}
```
### 输出示例:
```json
{
"glb_path": "test/3d_result/glb/5ebe2fe118c94946bdc379e4d44799d2.glb",
"glb_static_img_path": "test/3d_result/png/19c4b60ab7594e3f84e58d0169739bd1.png",
"glb_info": {
"file_format": ".glb",
"vertex_count": 7312,
"centroid": [
0.0010040254158151611,
-0.10831894948487081,
0.07473365460649548
],
"bounding_box_min": [
-0.23948338627815247,
-0.38543057441711426,
-0.5015472769737244
],
"bounding_box_max": [
0.228701651096344,
0.37523990869522095,
0.49702101945877075
],
"size": [
0.46818503737449646,
0.7606704831123352,
0.9985682964324951
],
"size_ratio": [
0.21019126841430072,
0.34150235681882596,
0.4483063747668733
],
"size_ratio_percentage": [
21.019126841430072,
34.1502356818826,
44.83063747668733
]
}
}
```
"""
try:
logger.info(
f"img_to_3D request: {json.dumps(request_data.dict(), indent=4)}"
)
input_data = {
"image_paths": request_data.input_images,
"model": request_data.model,
}
async with httpx.AsyncClient(timeout=120) as client:
resp = await client.post(
f"http://{settings.IMAGE_TO_3D_MODEL_URL}/canvas/img_to_3D",
json=input_data
)
result = resp.json()
logger.info(f"img_to_3D response: {json.dumps(result, indent=4)}")
return ResponseModel(data=result)
except Exception as e:
logger.warning(f"img_to_3D Run Exception: {e}")
@router.post("/3d_to_3views")
async def to_3views(request_data: ToSVGRequest):
"""
### 参数说明:
- **minio_glb_path**:glb文件路径
### 请求体示例:
```json
{
"minio_glb_path": "test/3d_result/glb/543570111d344552b080ff6f875e4e83.glb"
}
```
### 输出示例:
```json
{
"minio_svg_path": "test/3d_result/svg/bbcd534cffa143bba418148a0db80ad0.svg"
}
```
"""
try:
logger.info(
f"img_to_3D request: {json.dumps(request_data.dict(), indent=4)}"
)
input_data = {
"minio_glb_path": request_data.minio_glb_path,
}
async with httpx.AsyncClient(timeout=120) as client:
resp = await client.post(
f"http://{settings.IMAGE_TO_3D_MODEL_URL}/canvas/3d_to_3views",
json=input_data
)
result = resp.json()
logger.info(f"img_to_3D response: {json.dumps(result, indent=4)}")
return ResponseModel(data=result)
except Exception as e:
logger.warning(f"img_to_3D Run Exception: {e}")
@router.post("/img_to_3D_v2")
async def img_to_3d_endpoint(request_data: ImageTo3DRequest):
async def img_to_3D_v2(request_data: ImageTo3DRequest):
"""
### 接口说明:
将图片转换为3D模型异步处理。接口接收请求后立即返回任务ID后台通过 Celery 处理,处理完成后结果会通过 RabbitMQ 发送。
@@ -173,6 +34,8 @@ async def img_to_3d_endpoint(request_data: ImageTo3DRequest):
"input_images": [
"test/img_to_3d_data/example_multi_image/character_1.png"
],
"bucket_name": "test",
"user_id": "123",
"model": "single",
"task_id": "123",
"callback_url": "https://example.com/"
@@ -186,6 +49,8 @@ async def img_to_3d_endpoint(request_data: ImageTo3DRequest):
"test/img_to_3d_data/example_multi_image/character_2.png",
"test/img_to_3d_data/example_multi_image/character_3.png"
],
"bucket_name": "test",
"user_id": "123",
"model": "multi",
"task_id": "123",
"callback_url": "https://example.com/"
@@ -231,7 +96,11 @@ async def img_to_3d_endpoint(request_data: ImageTo3DRequest):
```
"""
logger.info(f"img_to_3D_v2 request: {json.dumps(request_data.model_dump(), indent=4)}")
result = submit_img_to_3d_task(input_images=request_data.input_images, model=request_data.model, task_id=request_data.task_id, callback_url=request_data.callback_url)
result = submit_img_to_3d_task(
input_images=request_data.input_images, model=request_data.model,
task_id=request_data.task_id, callback_url=request_data.callback_url,
bucket_name=request_data.bucket_name, user_id=request_data.user_id
)
if result.get("state") == "success":
state_code = 200
elif result.get("state") == "queue_full":
@@ -243,7 +112,7 @@ async def img_to_3d_endpoint(request_data: ImageTo3DRequest):
@router.post("/3d_to_3views_v2")
async def to_3views(request_data: ToSVGRequest):
async def model_to_3views_v2(request_data: ToSVGRequest):
"""
### 接口说明:
将 GLB 3D 模型文件转换为 3 个视图图片3-views异步处理。
@@ -254,7 +123,9 @@ async def to_3views(request_data: ToSVGRequest):
### 请求体示例:
```json
{
"minio_glb_path": "test/3d_result/glb/543570111d344552b080ff6f875e4e83.glb"
"minio_glb_path": "test/3d_result/glb/543570111d344552b080ff6f875e4e83.glb",
"bucket_name": "test",
"user_id": "123"
}
```
@@ -306,3 +177,209 @@ async def to_3views(request_data: ToSVGRequest):
state_code = 500
return ResponseModel(data=result, code=state_code)
@router.post("/triop_api_img_to_3D")
async def triop_api_img_to_3D(request_data: Tripo3dApiModel, background_tasks: BackgroundTasks):
"""
### 接口说明:
将图片转换为3D模型异步处理。接口接收请求后立即返回任务ID后台通过 Celery 处理,处理完成后结果会通过 RabbitMQ 发送。
### 参数说明:
- **input_images**: 输入图片路径列表(支持单张或多张)
- **model**: 推理模式,`single` 表示单张图片,`multi` 表示多张图片融合
### 请求体示例:
**单张图片模式:**
```json
{
"input_images": [
"test/img_to_3d_data/example_multi_image/character_1.png"
],
"bucket_name": "test",
"user_id": "123",
"model": "single",
"user_id": "123",
"model": "single",
"callback_url": "http://18.167.251.121:10015/api/image/webhook/img-to-3d"
}
```
**多张图片模式:**
```json
{
"input_images": [
"test/img_to_3d_data/example_multi_image/character_1.png",
"test/img_to_3d_data/example_multi_image/character_2.png",
"test/img_to_3d_data/example_multi_image/character_3.png"
],
"bucket_name": "test",
"user_id": "123",
"model": "multi",
"task_id": "123",
"callback_url": "http://18.167.251.121:10015/api/image/webhook/img-to-3d"
}
```
### 输出示例:
```json
{
"code": 200,
"msg": "OK!",
"data": {
"state": "success",
"task_id": "8cb65855-93de-496f-95a0-d667826ad129",
"message": "任务已成功提交,正在后台处理..."
}
}
```
"""
logger.info(f"img_to_3D_v2 request: {json.dumps(request_data.model_dump(), indent=4)}")
if request_data.model == "single":
api_task_id = await create_single_task(input_data=request_data)
else:
api_task_id = await create_multi_task(input_data=request_data)
background_tasks.add_task(get_task_result_async, request_data, request_data.task_id, api_task_id, request_data.callback_url)
result = {
"state": "success",
"task_id": request_data.task_id,
"message": "任务已成功提交,正在后台处理...",
}
state_code = 200
return ResponseModel(data=result, code=state_code)
# @router.post("/img_to_3D")
# async def img_to_3D(request_data: ImageTo3DRequest):
# """
# ### 参数说明:
# - **input_images**:输入图片list,单张或多张
# - **model**: 推理模式,单张或多张
# ### 请求体示例:
# ```json
# 单张
# {
# "input_images": ["test/img_to_3d_data/example_multi_image/character_1.png"],
# "model": "single"
# }
#
# 多张
# {
# "input_imaes": [
# "test/img_to_3d_data/example_multi_image/character_1.png",
# "test/img_to_3d_data/example_multi_image/character_2.png",
# "test/img_to_3d_data/example_multi_image/character_3.png"
#
# ],
# "model": "multi"
# }
# ```
# ### 输出示例:
# ```json
# {
# "glb_path": "test/3d_result/glb/5ebe2fe118c94946bdc379e4d44799d2.glb",
# "glb_static_img_path": "test/3d_result/png/19c4b60ab7594e3f84e58d0169739bd1.png",
# "glb_info": {
# "file_format": ".glb",
# "vertex_count": 7312,
# "centroid": [
# 0.0010040254158151611,
# -0.10831894948487081,
# 0.07473365460649548
# ],
# "bounding_box_min": [
# -0.23948338627815247,
# -0.38543057441711426,
# -0.5015472769737244
# ],
# "bounding_box_max": [
# 0.228701651096344,
# 0.37523990869522095,
# 0.49702101945877075
# ],
# "size": [
# 0.46818503737449646,
# 0.7606704831123352,
# 0.9985682964324951
# ],
# "size_ratio": [
# 0.21019126841430072,
# 0.34150235681882596,
# 0.4483063747668733
# ],
# "size_ratio_percentage": [
# 21.019126841430072,
# 34.1502356818826,
# 44.83063747668733
# ]
# }
# }
# ```
# """
# try:
# logger.info(
# f"img_to_3D request: {json.dumps(request_data.dict(), indent=4)}"
# )
#
# input_data = {
# "image_paths": request_data.input_images,
# "model": request_data.model,
# }
#
# async with httpx.AsyncClient(timeout=120) as client:
# resp = await client.post(
# f"http://{settings.IMAGE_TO_3D_MODEL_URL}/canvas/img_to_3D",
# json=input_data
# )
#
# result = resp.json()
#
# logger.info(f"img_to_3D response: {json.dumps(result, indent=4)}")
#
# return ResponseModel(data=result)
#
# except Exception as e:
# logger.warning(f"img_to_3D Run Exception: {e}")
#
#
# @router.post("/3d_to_3views")
# async def to_3views(request_data: ToSVGRequest):
# """
# ### 参数说明:
# - **minio_glb_path**:glb文件路径
#
# ### 请求体示例:
# ```json
# {
# "minio_glb_path": "test/3d_result/glb/543570111d344552b080ff6f875e4e83.glb"
# }
# ```
# ### 输出示例:
# ```json
# {
# "minio_svg_path": "test/3d_result/svg/bbcd534cffa143bba418148a0db80ad0.svg"
# }
# ```
# """
# try:
# logger.info(
# f"img_to_3D request: {json.dumps(request_data.dict(), indent=4)}"
# )
#
# input_data = {
# "minio_glb_path": request_data.minio_glb_path,
# }
#
# async with httpx.AsyncClient(timeout=120) as client:
# resp = await client.post(
# f"http://{settings.IMAGE_TO_3D_MODEL_URL}/canvas/3d_to_3views",
# json=input_data
# )
#
# result = resp.json()
#
# logger.info(f"img_to_3D response: {json.dumps(result, indent=4)}")
#
# return ResponseModel(data=result)
#
# except Exception as e:
# logger.warning(f"img_to_3D Run Exception: {e}")

View File

@@ -3,25 +3,48 @@ from typing import Optional, List, Dict, Any
class ImageTo3DRequest(BaseModel):
input_images: List[str] = Field(
...,
description="输入图片路径列表"
)
model: str = Field(
default="single",
description="模型类型: single 或 multi"
)
task_id: str = Field(
...
)
input_images: List[str] = Field(..., description="输入图片路径列表")
model: str = Field(default="single", description="模型类型: single 或 multi")
bucket_name: str = Field(..., description="输入图片路径列表")
user_id: str = Field(..., description="用户id")
task_id: str = Field(...)
callback_url: str # 必填,客户端提供的回调地址
class ToSVGRequest(BaseModel):
minio_glb_path: str = Field(
...,
description="输入图片路径列表"
)
minio_glb_path: str = Field(..., description="输入图片路径列表")
bucket_name: str = Field(..., description="输入图片路径列表")
user_id: str = Field(..., description="用户id")
task_id: str = Field(...)
callback_url: str # 必填
class Tripo3dApiModel(BaseModel):
input_images: List[str] = Field(..., description="输入图片路径列表")
bucket_name: str = Field(..., description="输入图片路径列表")
user_id: str = Field(..., description="用户id")
callback_url: str # 必填,客户端提供的回调地址
task_id: str = Field()
model: str = Field(default="single", description="模型类型: single 或 multi")
model_version: Optional[str] = Field(default="v3.1-20260211", description="Model version, e.g. v3.1-20260211 / v3.0-20250812 / v2.5-20250123")
poll_interval: Optional[float] = Field(default=2.0, description="Polling interval (seconds)")
poll_timeout: Optional[float] = Field(default=1800.0, description="Max polling time (seconds)")
request_timeout: Optional[float] = Field(default=120.0, description="HTTP request timeout (seconds)")
texture: Optional[bool] = Field(default=True, description="是否生成纹理")
pbr: Optional[bool] = Field(default=True, description="是否生成 PBR 材质")
texture_quality: Optional[str] = Field(default="standard", description="Texture quality: standard / detailed")
texture_alignment: Optional[str] = Field(default="original_image", description="Texture alignment mode: original_image / geometry")
orientation: Optional[str] = Field(default="default", description="Orientation mode: default / align_image")
face_limit: Optional[int] = Field(default=None, description="限制输出模型的面数")
model_seed: Optional[int] = Field(default=None, description="模型生成随机种子")
texture_seed: Optional[int] = Field(default=None, description="纹理生成随机种子")
auto_size: Optional[str] = Field(default=None, description="Auto size option")
quad: Optional[str] = Field(default=None, description="Enable quad remeshing")
compress: Optional[str] = Field(default=None, description="Compress option")
generate_parts: Optional[str] = Field(default=None, description="Generate segmented parts")
smart_low_poly: Optional[str] = Field(default=None, description="Smart low poly optimization")
download_outputs: Optional[bool] = Field(default=True, description="是否下载输出文件(现在改为上传到 MinIO")
save_task_json: Optional[bool] = Field(default=True, description="是否保存 task JSON")
print_payload: Optional[bool] = Field(default=False, description="是否打印请求 payload")
print_output: Optional[bool] = Field(default=True, description="是否打印输出结果")

View File

@@ -24,7 +24,7 @@ def get_queue_length(queue_name: str) -> int:
return 0 # 失败时默认不拒绝提交,防止误判
def submit_img_to_3d_task(input_images: list, model: str = "single", task_id: str = "", callback_url: str = ""):
def submit_img_to_3d_task(input_images: list, model: str = "single", task_id: str = "", callback_url: str = "", bucket_name: str = "test", user_id: str = "123"):
"""提交 img_to_3D 任务(带队列长度限制)"""
queue_name = "img_to_3d_queue"
max_queue_length = 10
@@ -42,7 +42,7 @@ def submit_img_to_3d_task(input_images: list, model: str = "single", task_id: st
# 提交任务
task = img_to_3d_task.apply_async(
args=(input_images, model, callback_url),
args=(input_images, model, callback_url, bucket_name, user_id),
task_id=task_id,
queue="img_to_3d_queue")
@@ -63,7 +63,7 @@ def submit_img_to_3d_task(input_images: list, model: str = "single", task_id: st
}
def submit_three_d_to_3views_task(minio_glb_path: str, task_id: str = "", callback_url: str = ""):
def submit_three_d_to_3views_task(minio_glb_path: str, task_id: str = "", callback_url: str = "", bucket_name: str = "test", user_id: str = "123"):
"""提交 3D转 3 视图 任务(带队列长度限制)"""
queue_name = "three_d_to_3views_task" # ← 必须和 @shared_task 中的 queue 完全一致!
max_queue_length = 3
@@ -80,7 +80,7 @@ def submit_three_d_to_3views_task(minio_glb_path: str, task_id: str = "", callba
}
task = three_d_to_3views_task.apply_async(
args=(minio_glb_path, callback_url),
args=(minio_glb_path, callback_url, bucket_name, user_id),
task_id=task_id,
queue="three_d_to_3views_queue")

View File

@@ -11,7 +11,7 @@ logger = logging.getLogger(__name__)
@shared_task(bind=True, queue="img_to_3d_queue", max_retries=3, name='src.server.canvas_generate_3D.tasks.img_to_3d_task')
def img_to_3d_task(self, input_images: list, model: str = "single", callback_url: str = None):
def img_to_3d_task(self, input_images: list, model: str = "single", callback_url: str = None, bucket_name: str = "test", user_id: str = "123"):
"""img_to_3D 主任务"""
task_id = self.request.id
logger.info(f"开始处理 img_to_3D 任务 | task_id: {task_id}")
@@ -19,6 +19,8 @@ def img_to_3d_task(self, input_images: list, model: str = "single", callback_url
input_data = {
"image_paths": input_images,
"model": model,
"bucket_name": bucket_name,
"user_id": user_id
}
with httpx.Client(timeout=300.0) as client:
resp = client.post(
@@ -63,13 +65,14 @@ def img_to_3d_task(self, input_images: list, model: str = "single", callback_url
@shared_task(bind=True, queue="three_d_to_3views_queue", max_retries=3, name='src.server.canvas_generate_3D.tasks.three_d_to_3views_task')
def three_d_to_3views_task(self, minio_glb_path: str, callback_url: str):
def three_d_to_3views_task(self, minio_glb_path: str, callback_url: str, bucket_name: str = "test", user_id: str = "123"):
"""3D to 3views 主任务"""
task_id = self.request.id
logger.info(f"开始处理 three_d_to_3views_task | task_id: {task_id}")
try:
input_data = {
"minio_glb_path": minio_glb_path,
"bucket_name": bucket_name, "user_id": user_id
}
with httpx.Client(timeout=1200) as client:
resp = client.post(

View File

@@ -0,0 +1,474 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import re
import sys
import json
import time
import argparse
import mimetypes
from pathlib import Path
from typing import Any, Dict, Iterator, Tuple
from urllib.parse import urlparse
import requests
BASE_URL = "https://api.tripo3d.ai/v2/openapi"
class TripoAPIError(RuntimeError):
pass
def build_parser():
p = argparse.ArgumentParser("Tripo3D CLI: single image -> 3D")
# I/O
p.add_argument("-i", "--image", required=True, help="Input image path")
p.add_argument("-o", "--out_dir", default="tripo_outputs", help="Output directory")
# Auth
p.add_argument(
"--api_key",
default=os.getenv("TRIPO_API_KEY", "tcli_50ecbff125084d4db958b1863ec082e6"),
help="Tripo API key, or set env TRIPO_API_KEY",
)
# Model
p.add_argument(
"--model_version",
type=str,
default="v3.1-20260211",
help="Model version, e.g. P1-20260311 / v3.1-20260211 / v3.0-20250812 / v2.5-20250123",
)
# Network / polling
p.add_argument("--poll_interval", type=float, default=2.0, help="Polling interval (seconds)")
p.add_argument("--poll_timeout", type=float, default=1800.0, help="Max polling time (seconds)")
p.add_argument("--request_timeout", type=float, default=120.0, help="HTTP request timeout (seconds)")
# Generation options
p.add_argument("--texture", dest="texture", action="store_true", default=True)
p.add_argument("--no-texture", dest="texture", action="store_false")
p.add_argument("--pbr", dest="pbr", action="store_true", default=True)
p.add_argument("--no-pbr", dest="pbr", action="store_false")
p.add_argument(
"--texture_quality",
type=str,
default="standard",
choices=["standard", "detailed"],
help="Texture quality",
)
p.add_argument(
"--texture_alignment",
type=str,
default="original_image",
choices=["original_image", "geometry"],
help="Texture alignment mode",
)
p.add_argument(
"--orientation",
type=str,
default="default",
choices=["default", "align_image"],
help="Orientation mode",
)
# Optional params
p.add_argument("--face_limit", type=int, default=None)
p.add_argument("--model_seed", type=int, default=None)
p.add_argument("--texture_seed", type=int, default=None)
p.add_argument("--auto_size", type=str, default=None)
p.add_argument("--quad", type=str, default=None)
p.add_argument("--compress", type=str, default=None)
p.add_argument("--generate_parts", type=str, default=None)
p.add_argument("--smart_low_poly", type=str, default=None)
# Save / download toggles
p.add_argument("--download_outputs", dest="download_outputs", action="store_true", default=True)
p.add_argument("--no-download_outputs", dest="download_outputs", action="store_false")
p.add_argument("--save_task_json", dest="save_task_json", action="store_true", default=True)
p.add_argument("--no-save_task_json", dest="save_task_json", action="store_false")
p.add_argument("--print_payload", dest="print_payload", action="store_true", default=False)
p.add_argument("--print_output", dest="print_output", action="store_true", default=True)
p.add_argument("--no-print_output", dest="print_output", action="store_false")
return p
def guess_mime_type(file_path: Path) -> str:
mime, _ = mimetypes.guess_type(str(file_path))
return mime or "application/octet-stream"
def safe_filename(name: str) -> str:
name = re.sub(r'[\\/:*?"<>|]+', "_", name)
name = re.sub(r"\s+", "_", name).strip("._")
return name or "file"
def extract_error_message(payload: Any) -> str:
if isinstance(payload, dict):
for key in ("message", "error", "error_message", "detail"):
if payload.get(key):
return str(payload[key])
data = payload.get("data")
if isinstance(data, dict):
for key in ("message", "error", "error_message", "detail"):
if data.get(key):
return str(data[key])
return json.dumps(payload, ensure_ascii=False)[:800]
return str(payload)[:800]
def request_json(
session: requests.Session,
method: str,
endpoint: str,
request_timeout: float,
**kwargs,
) -> Dict[str, Any]:
url = f"{BASE_URL}{endpoint}"
try:
resp = session.request(method=method, url=url, timeout=request_timeout, **kwargs)
except requests.RequestException as e:
raise TripoAPIError(f"请求失败: {method} {url} | {e}") from e
if not resp.ok:
try:
err_payload = resp.json()
except Exception:
err_payload = resp.text
raise TripoAPIError(
f"HTTP {resp.status_code} | {method} {url} | {extract_error_message(err_payload)}"
)
try:
payload = resp.json()
except Exception as e:
raise TripoAPIError(
f"响应不是合法 JSON: {method} {url}\n原始响应前 500 字符:\n{resp.text[:500]}"
) from e
return payload
def create_session(api_key: str) -> requests.Session:
session = requests.Session()
session.headers.update({
"Authorization": f"Bearer {api_key}",
"Accept": "application/json",
})
return session
def upload_image(session: requests.Session, image_path: Path, request_timeout: float) -> str:
if not image_path.exists():
raise FileNotFoundError(f"找不到图片: {image_path}")
with image_path.open("rb") as f:
files = {
"file": (image_path.name, f, guess_mime_type(image_path))
}
payload = request_json(
session,
"POST",
"/upload",
request_timeout=request_timeout,
files=files,
)
data = payload.get("data") or {}
file_token = data.get("image_token")
if not file_token:
raise TripoAPIError(f"上传成功但未返回 image_token: {json.dumps(payload, ensure_ascii=False)}")
return file_token
def build_generation_payload(args, file_token: str, image_path: Path) -> Dict[str, Any]:
file_ext = image_path.suffix.lower().lstrip(".") or "png"
payload: Dict[str, Any] = {
"type": "image_to_model",
"model_version": args.model_version,
"file": {
"type": file_ext,
"file_token": file_token,
},
"texture": args.texture,
"pbr": args.pbr,
"texture_quality": args.texture_quality,
"texture_alignment": args.texture_alignment,
"orientation": args.orientation,
}
optional_fields = [
"face_limit",
"model_seed",
"texture_seed",
"auto_size",
"quad",
"compress",
"generate_parts",
"smart_low_poly",
]
for key in optional_fields:
value = getattr(args, key)
if value is not None:
payload[key] = value
return payload
def create_task(session: requests.Session, payload: Dict[str, Any], request_timeout: float) -> str:
resp = request_json(
session,
"POST",
"/task",
request_timeout=request_timeout,
json=payload,
)
data = resp.get("data") or {}
task_id = data.get("task_id")
if not task_id:
raise TripoAPIError(f"提交任务成功但未返回 task_id: {json.dumps(resp, ensure_ascii=False)}")
return task_id
def get_task(session: requests.Session, task_id: str, request_timeout: float) -> Dict[str, Any]:
return request_json(
session,
"GET",
f"/task/{task_id}",
request_timeout=request_timeout,
)
def poll_task(
session: requests.Session,
task_id: str,
poll_interval: float,
poll_timeout: float,
request_timeout: float,
) -> Dict[str, Any]:
start = time.perf_counter()
last_line = ""
while True:
resp = get_task(session, task_id, request_timeout=request_timeout)
data = resp.get("data") or {}
status = str(data.get("status", "unknown")).lower()
progress = data.get("progress", 0)
elapsed = time.perf_counter() - start
line = f"\r[状态] {status:<10} | [进度] {progress:>3}% | [已等待] {elapsed:>7.1f}s"
if line != last_line:
sys.stdout.write(line)
sys.stdout.flush()
last_line = line
if status == "success":
sys.stdout.write("\n")
sys.stdout.flush()
return resp
if status == "failed":
sys.stdout.write("\n")
sys.stdout.flush()
error_message = data.get("error_message") or extract_error_message(resp)
raise TripoAPIError(f"任务失败 | task_id={task_id} | {error_message}")
if elapsed > poll_timeout:
sys.stdout.write("\n")
sys.stdout.flush()
raise TimeoutError(f"轮询超时: 已等待 {elapsed:.1f}stask_id={task_id}")
time.sleep(poll_interval)
def iter_urls(obj: Any, prefix: str = "output") -> Iterator[Tuple[str, str]]:
if isinstance(obj, dict):
for k, v in obj.items():
yield from iter_urls(v, f"{prefix}.{k}")
elif isinstance(obj, list):
for i, v in enumerate(obj):
yield from iter_urls(v, f"{prefix}[{i}]")
elif isinstance(obj, str) and obj.startswith(("http://", "https://")):
yield prefix, obj
def infer_extension_from_url(url: str) -> str:
path = urlparse(url).path
ext = Path(path).suffix
return ext if ext else ".bin"
def unique_path(path: Path) -> Path:
if not path.exists():
return path
stem = path.stem
suffix = path.suffix
parent = path.parent
i = 1
while True:
candidate = parent / f"{stem}_{i}{suffix}"
if not candidate.exists():
return candidate
i += 1
def download_file(session: requests.Session, url: str, save_path: Path, request_timeout: float) -> None:
try:
with session.get(url, stream=True, timeout=request_timeout) as resp:
resp.raise_for_status()
with save_path.open("wb") as f:
for chunk in resp.iter_content(chunk_size=1024 * 1024):
if chunk:
f.write(chunk)
except requests.RequestException as e:
raise TripoAPIError(f"下载失败: {url} | {e}") from e
def save_outputs(
session: requests.Session,
task_resp: Dict[str, Any],
out_dir: Path,
request_timeout: float,
save_task_json: bool = True,
download_outputs: bool = True,
) -> None:
out_dir.mkdir(parents=True, exist_ok=True)
data = task_resp.get("data") or {}
task_id = data.get("task_id", "unknown_task")
output = data.get("output") or {}
if save_task_json:
meta_path = out_dir / f"{safe_filename(task_id)}.json"
with meta_path.open("w", encoding="utf-8") as f:
json.dump(task_resp, f, ensure_ascii=False, indent=2)
if not output:
print("⚠️ 任务成功,但 output 为空。")
return
if not download_outputs:
print(" 已跳过下载,仅保存任务响应。")
return
url_items = list(iter_urls(output))
if not url_items:
print("⚠️ output 中没有找到可下载 URL。")
return
print("\n📥 开始下载输出文件...")
for logical_key, url in url_items:
short_key = logical_key.replace("output.", "")
ext = infer_extension_from_url(url)
filename = safe_filename(short_key) + ext
save_path = unique_path(out_dir / filename)
print(f" - {short_key} -> {save_path}")
download_file(session, url, save_path, request_timeout=request_timeout)
def main():
args = build_parser().parse_args()
if not args.api_key:
raise ValueError("请提供 --api_key 或设置环境变量 TRIPO_API_KEY")
image_path = Path(args.image)
out_dir = Path(args.out_dir)
out_dir.mkdir(parents=True, exist_ok=True)
session = create_session(args.api_key)
print(f"🚀 启动测试 | 模型: {args.model_version}")
print(f"🖼️ 输入图片: {image_path}")
print(f"📁 输出目录: {out_dir.resolve()}")
start_wall_time = time.perf_counter()
# 1) 上传
print("\n[1/4] 上传图片...")
upload_start = time.perf_counter()
file_token = upload_image(session, image_path, request_timeout=args.request_timeout)
upload_end = time.perf_counter()
print(f"✅ 上传完成 | file_token: {file_token}")
print(f"⏱️ 上传耗时: {upload_end - upload_start:.2f}s")
# 2) 提交任务
print("\n[2/4] 提交 image_to_model 任务...")
payload = build_generation_payload(args, file_token, image_path)
if args.print_payload:
print(json.dumps(payload, ensure_ascii=False, indent=2))
task_id = create_task(session, payload, request_timeout=args.request_timeout)
print(f"✅ 任务提交成功 | task_id: {task_id}")
# 3) 轮询任务
print("\n[3/4] 轮询任务状态...")
gen_start = time.perf_counter()
task_resp = poll_task(
session,
task_id,
poll_interval=args.poll_interval,
poll_timeout=args.poll_timeout,
request_timeout=args.request_timeout,
)
gen_end = time.perf_counter()
data = task_resp.get("data") or {}
output = data.get("output") or {}
total_end = time.perf_counter()
print("\n🎉 生成成功")
print("=" * 60)
print(f"task_id : {task_id}")
print(f"纯生成耗时 : {gen_end - gen_start:.2f}s")
print(f"总流程耗时 : {total_end - start_wall_time:.2f}s")
print(f"最终 status : {data.get('status')}")
print(f"output keys : {list(output.keys()) if isinstance(output, dict) else type(output)}")
print("=" * 60)
if args.print_output:
print(json.dumps(output, ensure_ascii=False, indent=2))
# 4) 下载输出
print("\n[4/4] 保存结果...")
save_outputs(
session,
task_resp,
out_dir=out_dir,
request_timeout=args.request_timeout,
save_task_json=args.save_task_json,
download_outputs=args.download_outputs,
)
print("\n✅ 全部完成。")
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"\n❌ 程序终止: {e}")
sys.exit(1)

View File

@@ -0,0 +1,633 @@
import asyncio
import io
import json
import logging
import mimetypes
import os
import time
from pathlib import Path
from typing import Any, Dict, Iterator, Tuple, List
from urllib.parse import urlparse
import httpx
import numpy as np
import requests
import trimesh
from minio import Minio, S3Error
from src.core.config import settings
from src.schemas.generate_3D import Tripo3dApiModel
from src.server.canvas_generate_3D.callback import notify_callback
logger = logging.getLogger(__name__)
minio_client = Minio(settings.MINIO_URL, access_key=settings.MINIO_ACCESS, secret_key=settings.MINIO_SECRET, secure=settings.MINIO_SECURE)
class TripoAPIError(RuntimeError):
pass
class Triop3dApiServer:
def __init__(self):
self.base_url = "https://api.tripo3d.ai/v2/openapi"
async def _get_client(self) -> httpx.AsyncClient:
"""获取或创建异步客户端(懒加载)"""
self.async_client = httpx.AsyncClient(
timeout=httpx.Timeout(120.0), # 可根据需要调整
headers={
"Authorization": f"Bearer {settings.TRIPO_API_KEY}",
"Accept": "application/json"
},
limits=httpx.Limits(max_connections=20, max_keepalive_connections=10)
)
return self.async_client
async def request_json(self, method: str, endpoint: str, request_timeout: float, **kwargs) -> Dict[str, Any]:
"""异步请求核心方法"""
url = f"{self.base_url}{endpoint}"
client = await self._get_client()
try:
resp = await client.request(method=method, url=url, timeout=request_timeout, **kwargs)
except httpx.RequestError as e:
raise TripoAPIError(f"请求失败: {method} {url} | {e}") from e
if not resp.is_success:
try:
err_payload = resp.json()
except Exception:
err_payload = resp.text
raise TripoAPIError(f"HTTP {resp.status_code} | {method} {url} | {extract_error_message(err_payload)}")
try:
return resp.json()
except Exception as e:
raise TripoAPIError(f"响应不是合法 JSON: {method} {url}") from e
async def upload_image(self, image_path: str, request_timeout: float) -> str:
"""
从 MinIO 读取图片 → 直接上传到 Tripo3D
Args:
image_path: MinIO 中的完整路径,例如 "fida-public-bucket/furniture/sketches/xxx.png"
"user_123/images/test.png"
request_timeout: 请求超时时间
Returns:
str: Tripo3D 返回的 image_token
"""
try:
# 解析 bucket 和 object_name
bucket_name, object_name = image_path.split('/', 1)
print(f"从 MinIO 下载图片: {bucket_name}/{object_name}")
logger.info(f"从 MinIO 下载图片: {bucket_name}/{object_name}")
# 1. 从 MinIO 获取文件
response = minio_client.get_object(bucket_name=bucket_name, object_name=object_name)
# 2. 读取为 bytes关键修复点
data = response.read()
file_name = Path(object_name).name
content_type = get_mime_type(file_name)
# 3. 用 BytesIO 包装httpx 处理更稳定)
file_obj = io.BytesIO(data)
files = {
"file": (
file_name, # 文件名
file_obj, # BytesIO 对象
content_type
)
}
# 4. 异步上传
payload = await self.request_json(
"POST",
"/upload",
request_timeout=request_timeout,
files=files
)
data = payload.get("data") or {}
file_token = data.get("image_token")
if not file_token:
raise TripoAPIError(f"上传成功但未返回 image_token: {json.dumps(payload, ensure_ascii=False)}")
print(f"✅ 图片上传成功 | image_token: {file_token} | 文件: {file_name}")
logger.info(f"✅ 图片上传成功 | image_token: {file_token} | 文件: {file_name}")
return file_token
except Exception as e:
logger.error(f"上传图片失败 {image_path}: {e}")
raise
# ====================== 异步上传多张图片 ======================
async def upload_images(self, image_paths: List[str], request_timeout: float) -> List[str]:
"""
批量从 MinIO 上传多张图片到 Tripo3D
Args:
image_paths: MinIO 对象路径列表
request_timeout: 请求超时时间
Returns:
List[str]: Tripo3D 返回的 image_token 列表
"""
file_tokens = []
for idx, image_path in enumerate(image_paths, 1):
print(f" - 上传第 {idx}/{len(image_paths)} 张图片: {image_path}")
logger.info(f" - 上传第 {idx}/{len(image_paths)} 张图片: {image_path}")
token = await self.upload_image(
image_path=image_path,
request_timeout=request_timeout
)
file_tokens.append(token)
print(f"✅ 所有图片上传完成,共 {len(file_tokens)}")
logger.info(f"✅ 所有图片上传完成,共 {len(file_tokens)}")
return file_tokens
async def create_task(self, payload: Dict[str, Any], request_timeout: float) -> str:
resp = await self.request_json("POST", "/task", request_timeout=request_timeout, json=payload)
data = resp.get("data") or {}
task_id = data.get("task_id")
if not task_id:
raise TripoAPIError(f"未返回 task_id: {json.dumps(resp, ensure_ascii=False)}")
return task_id
# step 3 查询任务状态
async def poll_task(self, task_id: str, poll_interval: float, poll_timeout: float, request_timeout: float, callback_url: str) -> Dict[str, Any]:
start = asyncio.get_running_loop().time()
last_line = ""
while True:
resp = await self.request_json("GET", f"/task/{task_id}", request_timeout=request_timeout)
data = resp.get("data") or {}
status = str(data.get("status", "unknown")).lower()
progress = data.get("progress", 0)
elapsed = asyncio.get_running_loop().time() - start
line = f"[状态] {status:<10} | [进度] {progress:>3}% | [已等待] {elapsed:>7.1f}s"
if line != last_line:
logger.info(line)
print(line)
last_line = line
if status == "success":
return resp
if status == "failed":
await notify_callback(callback_url=callback_url, task_id=task_id, status="failed", result={})
error_message = data.get("error_message") or extract_error_message(resp)
raise TripoAPIError(f"任务失败 | task_id={task_id} | {error_message}")
if elapsed > poll_timeout:
await notify_callback(callback_url=callback_url, task_id=task_id, status="failed", result={})
raise TimeoutError(f"轮询超时: 已等待 {elapsed:.1f}stask_id={task_id}")
await asyncio.sleep(poll_interval)
# step 4 上传结果
async def save_outputs(self, task_resp: Dict[str, Any], request_timeout: float, bucket_name: str, user_id: str):
data = task_resp.get("data") or {}
task_id = data.get("task_id", "unknown_task")
result = data.get("result") or {}
print("\n📥 开始异步处理并上传输出文件...")
logger.info("\n📥 开始异步处理并上传输出文件...")
outputs = {}
for key, value in result.items():
if not isinstance(value, dict) or 'url' not in value:
continue
url = value['url']
parsed = urlparse(url)
path = Path(parsed.path.split('?')[0])
ext = path.suffix.lower() or ".bin"
object_name = f"{user_id}/3d_result/{task_id}{ext}"
# 异步上传到 MinIO
await upload_file_to_minio_from_url_async(
url=url,
bucket_name=bucket_name,
object_name=object_name,
request_timeout=request_timeout
)
if value.get('type') == "glb":
outputs['glb_path'] = f"{bucket_name}/{object_name}"
elif value.get('type') == "webp":
outputs['glb_static_img_path'] = f"{bucket_name}/{object_name}"
else:
outputs[value.get('type', key)] = f"{bucket_name}/{object_name}"
# 异步分析 GLB 模型CPU密集型任务
if 'glb_path' in outputs:
glb_info = await analyze_mesh_async(outputs['glb_path'])
outputs['glb_info'] = glb_info
# outputs = {
# 'glb_path': 'test/3d_result/glb/aea689fd4ee14f53ac9ab0922f9fe5b3.glb',
# 'glb_static_img_path': 'test/3d_result/png/26a7fa7ca48641348847c1f4bca353db.png',
# 'glb_info': {'file_format': '.glb', 'vertex_count': 5275, 'centroid': [0.0044253334706297175, -0.01139796154609474, -0.06385942913980143], 'bounding_box_min': [-0.500163733959198, -0.18078294396400452, -0.29821905493736267], 'bounding_box_max': [0.49963313341140747, 0.17052923142910004, 0.3003925383090973], 'size': [0.9997968673706055, 0.35131217539310455, 0.59861159324646], 'size_ratio': [0.5127898063471029, 0.1801859040236737, 0.30702428962922335],
# 'size_ratio_percentage': [51.278980634710294, 18.01859040236737, 30.702428962922333]}}
return outputs
async def call_back_result(self, callback_url: str, result: Dict, task_id: str):
await notify_callback(
callback_url=callback_url,
task_id=task_id,
status="completed",
result=result,
)
return "ok"
async def upload_file_to_minio_from_url_async(url: str, bucket_name: str, object_name: str, request_timeout: float = 60.0, content_type: str = None):
"""
异步从 Tripo URL 下载文件并上传到 MinIO最终修复版
"""
try:
async with httpx.AsyncClient(timeout=request_timeout) as client:
async with client.stream("GET", url) as resp:
resp.raise_for_status()
# 正确方式:先读取所有内容为 bytes
data_bytes = await resp.aread()
if content_type is None:
content_type = get_mime_type(object_name)
# 关键修复:用 BytesIO 包装 bytes让它拥有 .read() 方法
file_obj = io.BytesIO(data_bytes)
logger.info(f"开始上传到 MinIO → {bucket_name}/{object_name} | 大小: {len(data_bytes):,} bytes")
# 上传到 MinIO
result = minio_client.put_object(
bucket_name=bucket_name,
object_name=object_name,
data=file_obj, # ← 必须传 BytesIO 或有 .read() 的对象
length=len(data_bytes),
content_type=content_type,
part_size=0 # 自动分片
)
logger.info(f"✅ 成功上传到 MinIO: {bucket_name}/{object_name}")
return result
except httpx.HTTPError as e:
raise TripoAPIError(f"下载 Tripo 文件失败: {url} | {e}") from e
except S3Error as e:
raise TripoAPIError(f"上传到 MinIO 失败: {bucket_name}/{object_name} | {e}") from e
except Exception as e:
raise TripoAPIError(f"上传过程异常 {url}: {e}") from e
async def analyze_mesh_async(image_path: str) -> Dict:
"""异步包装 analyze_meshCPU密集型"""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, analyze_mesh_sync, image_path)
def analyze_mesh_sync(image_path: str):
"""同步版本(供 executor 调用)"""
bucket_name, object_name = image_path.split('/', 1)
vertices = load_mesh_from_minio(bucket_name=bucket_name, object_name=object_name)
min_coords = vertices.min(axis=0)
max_coords = vertices.max(axis=0)
centroid = vertices.mean(axis=0)
size = max_coords - min_coords
total_size = np.sum(size)
size_ratio = size / total_size if total_size != 0 else np.zeros(3)
return {
"file_format": os.path.splitext(image_path)[1].lower(),
"vertex_count": len(vertices),
"centroid": centroid.tolist(),
"bounding_box_min": min_coords.tolist(),
"bounding_box_max": max_coords.tolist(),
"size": size.tolist(),
"size_ratio": size_ratio.tolist(),
"size_ratio_percentage": (size_ratio * 100).tolist()
}
def get_mime_type(path):
mime, _ = mimetypes.guess_type(str(path))
return mime or "application/octet-stream"
def extract_error_message(payload: Any) -> str:
if isinstance(payload, dict):
for key in ("message", "error", "error_message", "detail", "suggestion"):
if payload.get(key):
return str(payload[key])
data = payload.get("data")
if isinstance(data, dict):
for key in ("message", "error", "error_message", "detail", "suggestion"):
if data.get(key):
return str(data[key])
return json.dumps(payload, ensure_ascii=False)[:800]
return str(payload)[:800]
def iter_urls(obj: Any, prefix: str = "output") -> Iterator[Tuple[str, str]]:
if isinstance(obj, dict):
for k, v in obj.items():
yield from iter_urls(v, f"{prefix}.{k}")
elif isinstance(obj, list):
for i, v in enumerate(obj):
yield from iter_urls(v, f"{prefix}[{i}]")
elif isinstance(obj, str) and obj.startswith(("http://", "https://")):
yield prefix, obj
def upload_file_to_minio_from_url(session: requests.Session, url: str, bucket_name: str, object_name: str, request_timeout: float = 30.0, content_type: str = "application/octet-stream"):
"""
从 URL 下载文件流,直接上传到 MinIO不落地本地
"""
try:
with session.get(url, stream=True, timeout=request_timeout) as resp:
resp.raise_for_status()
# 获取文件大小(如果服务器返回 Content-Length
content_length = int(resp.headers.get('Content-Length', 0))
# 如果无法获取长度,可以设为 -1MinIO 会自动处理分块上传)
length = content_length if content_length > 0 else -1
# 直接把 response.raw 传给 put_object最推荐的流式方式
result = minio_client.put_object( # 假设你的 MinIO 客户端是 self.minio_client
bucket_name=bucket_name,
object_name=object_name,
data=resp.raw, # 关键:直接传 raw stream
length=length,
content_type=content_type,
part_size=0 # 0 表示让 MinIO 自动选择合适的分片大小
)
except requests.RequestException as e:
raise TripoAPIError(f"下载失败: {url} | {e}") from e
except S3Error as e:
raise TripoAPIError(f"上传到 MinIO 失败: {bucket_name}/{object_name} | {e}") from e
return result
def analyze_mesh(image_path: str):
# 加载模型顶点(直接从 MinIO
bucket_name, object_name = image_path.split('/', 1)
vertices = load_mesh_from_minio(bucket_name=bucket_name, object_name=object_name)
# 计算各项指标
min_coords = vertices.min(axis=0)
max_coords = vertices.max(axis=0)
centroid = vertices.mean(axis=0)
size = max_coords - min_coords
total_size = np.sum(size)
size_ratio = size / total_size if total_size != 0 else np.zeros(3)
info = {
"file_format": os.path.splitext(image_path)[1].lower(),
"vertex_count": len(vertices),
"centroid": centroid.tolist(),
"bounding_box_min": min_coords.tolist(),
"bounding_box_max": max_coords.tolist(),
"size": size.tolist(),
"size_ratio": size_ratio.tolist(),
"size_ratio_percentage": (size_ratio * 100).tolist()
}
return info
def load_mesh_from_minio(object_name: str, bucket_name: str = "fida-user"):
"""
从 MinIO 直接加载 .glb / .gltf / .obj 文件,返回顶点数组
"""
try:
# 从 MinIO 获取文件流
response = minio_client.get_object(bucket_name, object_name)
# 读取为 bytes 并包装成 BytesIO
data = response.read()
file_obj = io.BytesIO(data)
file_ext = os.path.splitext(object_name)[1].lower()
# 根据后缀加载模型
if file_ext in ('.glb', '.gltf'):
mesh = trimesh.load(file_obj, file_type='glb')
elif file_ext == '.obj':
mesh = trimesh.load(file_obj, file_type='obj')
else:
raise ValueError(f"不支持的文件格式: {file_ext},仅支持 .obj 和 .glb/.gltf")
except S3Error as e:
raise RuntimeError(f"从 MinIO 获取模型失败: {object_name} | {e}") from e
except Exception as e:
raise RuntimeError(f"加载模型失败: {object_name} | {e}") from e
# 处理 Scene 或单个 Mesh
if isinstance(mesh, trimesh.Scene):
vertices = np.vstack([geom.vertices for geom in mesh.geometry.values()])
else:
vertices = mesh.vertices
if len(vertices) == 0:
raise ValueError(f"模型中未找到顶点数据: {object_name}")
return vertices
async def create_single_task(input_data: Tripo3dApiModel) -> str:
"""
异步版本:创建单个图片转 3D 的任务
"""
server = Triop3dApiServer()
# Step 1: 上传图片(异步)
print(f"开始上传图片: {input_data.input_images[0]}")
logger.info(f"开始上传图片: {input_data.input_images[0]}")
file_token = await server.upload_image(
image_path=input_data.input_images[0],
request_timeout=input_data.request_timeout
)
print(f"✅ 图片上传成功file_token: {file_token}")
logger.info(f"✅ 图片上传成功file_token: {file_token}")
# Step 2: 构建请求 payload
file_ext = Path(input_data.input_images[0]).suffix.lower().lstrip('.') or "png"
if file_ext == "jpeg":
file_ext = "jpg"
input_payload = {
"type": "image_to_model",
"file": {
"type": file_ext,
"file_token": file_token,
}
}
# 合并用户传入的参数Pydantic Model 转 dict
payload = input_payload | input_data.model_dump(exclude_unset=True)
# Step 3: 提交任务(异步)
print("正在提交 Tripo3D 任务...")
logger.info("正在提交 Tripo3D 任务...")
task_id = await server.create_task(
payload=payload,
request_timeout=input_data.request_timeout
)
print(f"✅ 任务创建成功 | task_id: {task_id}")
logger.info(f"✅ 任务创建成功 | task_id: {task_id}")
return task_id
async def create_multi_task(input_data: Tripo3dApiModel) -> str:
"""
异步版本:创建多图转 3D 的任务
"""
server = Triop3dApiServer()
# Step 1: 上传多张图片(异步)
logger.info(f"开始上传 {len(input_data.input_images)} 张图片...")
print(f"开始上传 {len(input_data.input_images)} 张图片...")
file_tokens = await server.upload_images(
image_paths=input_data.input_images,
request_timeout=input_data.request_timeout
)
logger.info(f"✅ 图片上传完成,共 {len(file_tokens)} 个 token")
print(f"✅ 图片上传完成,共 {len(file_tokens)} 个 token")
# Step 2: 构建多图 payload
files = []
for image_path, file_token in zip(input_data.input_images, file_tokens):
file_ext = Path(image_path).suffix.lower().lstrip('.') or "png"
if file_ext == "jpeg":
file_ext = "jpg"
files.append({
"type": file_ext,
"file_token": file_token,
})
while len(files) < 4:
files.append({})
if len(files) > 4:
files = files[:4]
payload: Dict[str, Any] = {
"type": "multiview_to_model",
"model_version": input_data.model_version,
"files": files,
"face_limit": 2000,
"texture": input_data.texture,
"pbr": input_data.pbr,
}
# Step 3: 提交任务(异步)
logger.info(f"正在提交多图 Tripo3D 任务...{payload}")
print(f"正在提交多图 Tripo3D 任务...{payload}")
task_id = await server.create_task(payload=payload, request_timeout=input_data.request_timeout)
logger.info(f"✅ 多图任务创建成功 | task_id: {task_id}")
print(f"✅ 多图任务创建成功 | task_id: {task_id}")
return task_id
async def get_task_result_async(input_data: Tripo3dApiModel, task_id: str, api_task_id: str, callback_url: str):
server = Triop3dApiServer()
task_resp = await server.poll_task(
task_id=api_task_id,
poll_interval=input_data.poll_interval,
poll_timeout=input_data.poll_timeout,
request_timeout=input_data.request_timeout,
callback_url=callback_url
)
outputs = await server.save_outputs(
task_resp=task_resp,
request_timeout=input_data.request_timeout,
bucket_name=input_data.bucket_name,
user_id=input_data.user_id
)
print(f"tripo3d 任务处理完成 | api_task_id: {api_task_id} | status: success")
logger.info(f"tripo3d 任务处理完成 | api_task_id: {api_task_id} | status: success")
await server.call_back_result(callback_url, outputs, task_id)
async def single_img_to_model_async(input_data: Tripo3dApiModel):
"""
完整的单图转 3D 异步流程
"""
try:
# Step 1: 创建任务
task_id = await create_single_task(input_data)
# Step 2: 轮询任务状态 + 处理输出 + 回调
await get_task_result_async(input_data, task_id, input_data.callback_url)
return task_id
except Exception as e:
logger.error(f"单图转 3D 任务失败 | error: {e}", exc_info=True)
# 可在此处调用失败回调
await notify_callback(
callback_url=input_data.callback_url,
task_id="unknown",
status="failed",
result={"error": str(e)}
)
raise
async def multi_img_to_model_async(input_data: Tripo3dApiModel):
"""
完整的多图转 3D 异步流程
"""
try:
# Step 1: 创建多图任务
task_id = await create_multi_task(input_data)
# Step 2: 轮询任务 + 处理输出 + 回调
await get_task_result_async(input_data, task_id, input_data.callback_url)
except Exception as e:
logger.error(f"多图转 3D 任务失败 | error: {e}", exc_info=True)
# 失败回调
await notify_callback(
callback_url=input_data.callback_url,
task_id="unknown",
status="failed",
result={"error": str(e)}
)
raise
if __name__ == '__main__':
# input_data = Tripo3dApiModel(input_images=['test/img_to_3d_data/example_multi_image/mushroom_1.png'], bucket_name='test', user_id='test', callback_url="http://18.167.251.121:10015/api/image/webhook/img-to-3d")
# asyncio.run(single_img_to_model_async(input_data))
input_data = Tripo3dApiModel(
input_images=['test/img_to_3d_data/example_multi_image/mushroom_3.png', 'test/img_to_3d_data/example_multi_image/mushroom_2.png', 'test/img_to_3d_data/example_multi_image/mushroom_1.png'],
bucket_name='test', user_id='test', callback_url="http://18.167.251.121:10015/api/image/webhook/img-to-3d",
face_limit=4000
)
asyncio.run(multi_img_to_model_async(input_data))