feat(新功能): 新增wan2.2 pose-transform模型接口,comfyui-api形式

fix(修复bug):
docs(文档变更):
refactor(重构):
test(增加测试):
This commit is contained in:
zhh
2025-11-03 16:37:41 +08:00
parent 7459583377
commit a9d9bdcb71

View File

@@ -15,6 +15,7 @@ from moviepy.video.io.VideoFileClip import VideoFileClip
from app.core.config import REDIS_HOST, REDIS_PORT, REDIS_DB, MINIO_URL, MINIO_ACCESS, MINIO_SECRET, MINIO_SECURE, COMFYUI_SERVER_ADDRESS, PS_RABBITMQ_QUEUES, DEBUG
from app.schemas.pose_transform import PoseTransformModel
from app.service.generate_image.utils.mq import publish_status
logger = logging.getLogger()
@@ -331,15 +332,13 @@ class ComfyUIServerI2V:
self.tasks_id = request_data.tasks_id
self.user_id = self.tasks_id[self.tasks_id.rfind('-') + 1:]
self.redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, decode_responses=True)
self.pose_transform_data = {'tasks_id': self.tasks_id, 'status': 'PENDING', 'message': "pending", 'gif_url': '',
'video_url': '', 'image_url': ''}
self.pose_transform_data = {'tasks_id': self.tasks_id, 'status': 'PENDING', 'message': "pending", 'gif_url': '', 'video_url': '', 'image_url': ''}
self.redis_client.set(self.tasks_id, json.dumps(self.pose_transform_data))
self.redis_client.expire(self.tasks_id, 600)
self.minio_client = Minio(MINIO_URL, access_key=MINIO_ACCESS, secret_key=MINIO_SECRET, secure=MINIO_SECURE)
def get_result(self):
logger.info("11111111111111")
workflow_json['174']['inputs']['file'] = pose_video_map[self.pose_num ]
workflow_json['174']['inputs']['file'] = pose_video_map[self.pose_num]
workflow_json['169']['inputs']['noise_seed'] = random.randint(0, 10 ** 18)
# 下载图片 上传 comfyui server
@@ -353,7 +352,7 @@ class ComfyUIServerI2V:
return
prompt_id = prompt_response.get("prompt_id")
print(f" 任务已提交Prompt ID: {prompt_id}")
logger.info(f" 任务已提交Prompt ID: {prompt_id}")
outputs = self.poll_history(prompt_id)
file_list = {}
@@ -370,7 +369,7 @@ class ComfyUIServerI2V:
'subfolder': file_info['subfolder'],
'type': file_info['type']
}
print(file_list)
logger.info(file_list)
return self.process_and_upload_comfyui_video(filename=file_list['filename'], subfolder=file_list['subfolder'], prompt_id=prompt_response['prompt_id']), prompt_id
def read_tasks_status(self):
@@ -380,7 +379,7 @@ class ComfyUIServerI2V:
def download_from_minio_in_memory(self):
bucket = self.image_url.split('/')[0]
object_name = self.image_url[self.image_url.find('/') + 1:]
print("🚀 正在连接 MinIO 客户端...")
# print("🚀 正在连接 MinIO 客户端...")
try:
# get_object 返回一个 ResponseStream 对象
@@ -397,14 +396,14 @@ class ComfyUIServerI2V:
in_memory_file = io.BytesIO(image_bytes)
print(f"✅ 图片已下载到内存 ({len(image_bytes)} 字节)。")
# print(f"✅ 图片已下载到内存 ({len(image_bytes)} 字节)。")
return in_memory_file, object_name.rsplit('/')[-1]
except S3Error as e:
print(f"❌ MinIO S3 错误 (例如,对象不存在): {e}")
logger.error(f"❌ MinIO S3 错误 (例如,对象不存在): {e}")
return None, None
except Exception as e:
print(f"❌ MinIO 下载过程中发生未知错误: {e}")
logger.error(f"❌ MinIO 下载过程中发生未知错误: {e}")
return None, None
def upload_video_to_minio(self, BUCKET_NAME, OBJECT_NAME, LOCAL_FILE_PATH):
@@ -419,16 +418,16 @@ class ComfyUIServerI2V:
content_type="video/mp4" # 设置正确的内容类型
)
print(f"✅ 文件 '{LOCAL_FILE_PATH}' 已成功上传至 MinIO:")
print(f" 对象名: {result.object_name}")
print(f" Etag: {result.etag}")
# print(f"✅ 文件 '{LOCAL_FILE_PATH}' 已成功上传至 MinIO:")
# print(f" 对象名: {result.object_name}")
# print(f" Etag: {result.etag}")
except S3Error as e:
print(f"❌ MinIO 操作失败: {e}")
logger.error(f"❌ MinIO 操作失败: {e}")
except FileNotFoundError:
print(f"❌ 找不到本地文件: {LOCAL_FILE_PATH}")
logger.error(f"❌ 找不到本地文件: {LOCAL_FILE_PATH}")
except Exception as e:
print(f"❌ 发生未知错误: {e}")
logger.error(f"❌ 发生未知错误: {e}")
def upload_gif_to_minio(self, BUCKET_NAME, OBJECT_NAME, LOCAL_FILE_PATH):
"""使用 fput_object 从本地路径上传 MP4 文件"""
@@ -442,16 +441,16 @@ class ComfyUIServerI2V:
content_type="video/mp4" # 设置正确的内容类型
)
print(f"✅ 文件 '{LOCAL_FILE_PATH}' 已成功上传至 MinIO:")
print(f" 对象名: {result.object_name}")
print(f" Etag: {result.etag}")
# print(f"✅ 文件 '{LOCAL_FILE_PATH}' 已成功上传至 MinIO:")
# print(f" 对象名: {result.object_name}")
# print(f" Etag: {result.etag}")
except S3Error as e:
print(f"❌ MinIO 操作失败: {e}")
logger.error(f"❌ MinIO 操作失败: {e}")
except FileNotFoundError:
print(f"❌ 找不到本地文件: {LOCAL_FILE_PATH}")
logger.error(f"❌ 找不到本地文件: {LOCAL_FILE_PATH}")
except Exception as e:
print(f"❌ 发生未知错误: {e}")
logger.error(f"❌ 发生未知错误: {e}")
def upload_in_memory_file_to_comfyui(self, in_memory_file, filename):
upload_url = f"http://{COMFYUI_SERVER_ADDRESS}/upload/image"
@@ -469,7 +468,7 @@ class ComfyUIServerI2V:
'image': (filename, in_memory_file, mime_type)
}
print(f"⬆️ 正在上传图片 ({filename}) 到 ComfyUI...")
# print(f"⬆️ 正在上传图片 ({filename}) 到 ComfyUI...")
try:
comfyui_response = requests.post(upload_url, data=data, files=files)
comfyui_response.raise_for_status()
@@ -477,20 +476,15 @@ class ComfyUIServerI2V:
result = comfyui_response.json()
uploaded_name = result.get('name')
print(f"🎉 ComfyUI 上传成功! 服务器文件名: {uploaded_name}")
# print(f"🎉 ComfyUI 上传成功! 服务器文件名: {uploaded_name}")
return uploaded_name
except requests.exceptions.RequestException as e:
print(f"❌ ComfyUI 上传失败: {e}")
print(f" 响应内容: {comfyui_response.text}")
logger.error(f"❌ ComfyUI 上传失败: {e}")
logger.error(f" 响应内容: {comfyui_response.text}")
return None
def process_and_upload_comfyui_video(
self,
filename: str,
subfolder: str,
prompt_id: str,
):
def process_and_upload_comfyui_video(self, filename: str, subfolder: str, prompt_id: str, ):
"""
完整的自动化流程:获取 ComfyUI 视频 -> 转换 GIF 并提取帧 -> 上传所有结果到 MinIO。
"""
@@ -521,7 +515,7 @@ class ComfyUIServerI2V:
tmp_file.write(mp4_bytes) # 将内存数据写入磁盘
temp_mp4_path = tmp_file.name # 记录文件路径
print(f"临时文件已写入: {temp_mp4_path}")
# print(f"临时文件已写入: {temp_mp4_path}")
# 2. 使用 moviepy 打开临时文件 (传入文件路径字符串)
clip = VideoFileClip(temp_mp4_path)
@@ -536,7 +530,7 @@ class ComfyUIServerI2V:
image.save(frame_stream, 'JPEG')
frame_bytes = frame_stream.getvalue()
print("✅ 成功提取第一帧图片。")
logger.info("✅ 成功提取第一帧图片。")
# 视频转 GIF (使用另一个临时文件来保存 GIF)
temp_gif_path = ""
@@ -549,7 +543,7 @@ class ComfyUIServerI2V:
with open(temp_gif_path, 'rb') as f:
gif_bytes = f.read()
print("✅ 成功生成 GIF。")
logger.info("✅ 成功生成 GIF。")
# 返回结果 (例如: 上传到 MinIO)
# return mp4_bytes, gif_bytes, frame_bytes
@@ -557,31 +551,44 @@ class ComfyUIServerI2V:
# -----------------------------------------------
except Exception as e:
print(f"❌ 视频处理或文件操作失败: {e}")
logger.error(f"❌ 视频处理或文件操作失败: {e}")
# 在失败时,也尝试清理文件
finally:
# 3. 清理临时文件 (非常重要!)
if os.path.exists(temp_mp4_path):
os.remove(temp_mp4_path)
print(f"🗑️ 已删除临时 MP4 文件: {temp_mp4_path}")
logger.info(f"🗑️ 已删除临时 MP4 文件: {temp_mp4_path}")
if 'temp_gif_path' in locals() and os.path.exists(temp_gif_path):
os.remove(temp_gif_path)
print(f"🗑️ 已删除临时 GIF 文件: {temp_gif_path}")
logger.info(f"🗑️ 已删除临时 GIF 文件: {temp_gif_path}")
# 3. 上传所有结果到 MinIO
# 上传原始 MP4
self.upload_stream_to_minio(mp4_bytes, MP4_OBJECT, "video/mp4")
try:
# 上传原始 MP4
self.upload_stream_to_minio(mp4_bytes, MP4_OBJECT, "video/mp4")
# 上传生成的 GIF
self.upload_stream_to_minio(gif_bytes, GIF_OBJECT, "image/gif")
# 上传生成的 GIF
self.upload_stream_to_minio(gif_bytes, GIF_OBJECT, "image/gif")
# 上传第一帧图片
self.upload_stream_to_minio(frame_bytes, FRAME_OBJECT, "image/jpeg")
# 上传第一帧图片
self.upload_stream_to_minio(frame_bytes, FRAME_OBJECT, "image/jpeg")
return "\n🎉 所有任务完成!"
self.pose_transform_data = {'tasks_id': self.tasks_id, 'status': 'SUCCESS', 'message': "success", 'gif_url': f'aida-users/{GIF_OBJECT}', 'video_url': f'aida-users/{MP4_OBJECT}', 'image_url': f'aida-users/{FRAME_OBJECT}'}
# 推送消息
# if not DEBUG:
publish_status(json.dumps(self.pose_transform_data), PS_RABBITMQ_QUEUES)
logger.info(
f" [x] Sent to {PS_RABBITMQ_QUEUES} data@@@@ {json.dumps(self.pose_transform_data, indent=4)}")
return "\n🎉 所有任务完成!"
except Exception as e:
logger.error(e)
return None
# --- 辅助函数:提交任务到队列 ---
def queue_prompt(self, prompt, client_id):
@@ -591,32 +598,21 @@ class ComfyUIServerI2V:
# 提交任务到 /prompt 端点
response = requests.post(f"http://{COMFYUI_SERVER_ADDRESS}/prompt", data=data)
print(f"-------------{response.text}")
print(f"------------{client_id}")
# print(f"-------------{response.text}")
# print(f"------------{client_id}")
if response.status_code == 200:
return response.json()
else:
print(f"提交任务失败,状态码: {response.status_code}")
print(response.text)
return None
def get_history(self, prompt_id):
"""通过 prompt_id 获取任务的历史记录和输出。"""
# 查询 /history/{prompt_id} 端点
response = requests.get(f"http://{COMFYUI_SERVER_ADDRESS}/history/{prompt_id}")
if response.status_code == 200:
return response.json()
else:
print(f"获取历史记录失败,状态码: {response.status_code}")
logger.warning(f"提交任务失败,状态码: {response.status_code}")
logger.warning(response.text)
return None
def poll_history(self, prompt_id, interval_seconds=5):
"""步骤 2: 轮询 /history/{prompt_id} 检查任务是否完成"""
url = f"http://{COMFYUI_SERVER_ADDRESS}/history/{prompt_id}"
print(f"⏳ 开始轮询状态 (间隔 {interval_seconds} 秒)...")
logger.info(f"⏳ 开始轮询状态 (间隔 {interval_seconds} 秒)...")
while True:
time.sleep(interval_seconds)
@@ -629,14 +625,14 @@ class ComfyUIServerI2V:
# ComfyUI 返回的历史记录结构是 {prompt_id: {outputs: ...}}
if prompt_id in history_data:
print("🎉 任务已完成!")
logger.info("🎉 任务已完成!")
return history_data[prompt_id]['outputs']
print("⏳ 任务仍在执行或等待中...")
logger.info("⏳ 任务仍在执行或等待中...")
except requests.exceptions.RequestException as e:
# 处理可能的连接错误,但通常不会在内部轮询中发生
print(f"⚠️ 轮询时发生错误: {e}")
logger.info(f"⚠️ 轮询时发生错误: {e}")
pass
def get_comfyui_video_bytes(self, filename: str, subfolder: str, file_type: str = "output"):
@@ -658,7 +654,7 @@ class ComfyUIServerI2V:
"type": file_type
}
print(f"📡 正在从 ComfyUI 下载视频: {filename}")
logger.info(f"📡 正在从 ComfyUI 下载视频: {filename}")
try:
# 使用 requests.get 下载文件
response = requests.get(url, params=params, stream=True)
@@ -668,12 +664,12 @@ class ComfyUIServerI2V:
return response.content
except requests.exceptions.RequestException as e:
print(f"❌ 从 ComfyUI 获取视频失败: {e}")
logger.error(f"❌ 从 ComfyUI 获取视频失败: {e}")
return None
def upload_stream_to_minio(self, video_bytes: bytes, object_name: str, content_type: str):
"""从内存流上传数据到 MinIO。"""
print(f"☁️ 正在上传对象到 MinIO: {object_name}")
logger.info(f"☁️ 正在上传对象到 MinIO: {object_name}")
try:
data_stream = io.BytesIO(video_bytes)
@@ -685,10 +681,10 @@ class ComfyUIServerI2V:
length=len(video_bytes),
content_type=content_type
)
print(f"✅ MinIO 上传成功: {result.object_name}")
logger.info(f"✅ MinIO 上传成功: {result.object_name}")
return True
except S3Error as e:
print(f"❌ MinIO 上传失败: {e}")
logger.error(f"❌ MinIO 上传失败: {e}")
return False