Files

647 lines
22 KiB
Python
Raw Permalink Normal View History

import copy
import io
import json
import logging
import os
import random
import tempfile
import time
import uuid

import requests
from minio import Minio, S3Error
from moviepy.video.io.VideoFileClip import VideoFileClip
from PIL import Image

from app.core.config import PS_RABBITMQ_QUEUES
from app.core.config import settings
from app.schemas.comfyui_i2v import ComfyuiFLF2VModel
from app.service.generate_image.utils.mq import publish_status
# Module-scoped logger: records still propagate to the root logger's handlers,
# but carry this module's name so they can be filtered/configured separately.
logger = logging.getLogger(__name__)
# Workflow template: first frame + last frame + text prompt -> video.
# Keys are ComfyUI node ids; a two-element list such as ["38", 0] is a link
# to output slot 0 of node "38". The prompt text, seed, frame size and the two
# LoadImage filenames are placeholders that get overwritten per request.
workflow_json = {
    # Positive prompt encoder.
    "6": {
        "inputs": {
            "text": "A bearded man with red facial hair wearing a yellow straw hat and dark coat in Van Gogh's self-portrait style, slowly and continuously transforms into a space astronaut. The transformation flows like liquid paint - his beard fades away strand by strand, the yellow hat melts and reforms smoothly into a silver space helmet, dark coat gradually lightens and restructures into a white spacesuit. The background swirling brushstrokes slowly organize and clarify into realistic stars and space, with Earth appearing gradually in the distance. Every change happens in seamless waves, maintaining visual continuity throughout the metamorphosis.\n\nConsistent soft lighting throughout, medium close-up maintaining same framing, central composition stays fixed, gentle color temperature shift from warm to cool, gradual contrast increase, smooth style transition from painterly to photorealistic. Static camera with subtle slow zoom, emphasizing the flowing transformation process without abrupt changes.",
            "clip": ["38", 0],
        },
        "class_type": "CLIPTextEncode",
        "_meta": {"title": "CLIP Text Encode (Positive Prompt)"},
    },
    # Negative prompt encoder.
    "7": {
        "inputs": {
            "text": "色调艳丽过曝静态细节模糊不清字幕风格作品画作画面静止整体发灰最差质量低质量JPEG压缩残留丑陋的残缺的多余的手指画得不好的手部画得不好的脸部畸形的毁容的形态畸形的肢体手指融合静止不动的画面杂乱的背景三条腿背景人很多倒着走",
            "clip": ["38", 0],
        },
        "class_type": "CLIPTextEncode",
        "_meta": {"title": "CLIP Text Encode (Negative Prompt)"},
    },
    # Decode the final latents (node 58) into frames.
    "8": {
        "inputs": {
            "samples": ["58", 0],
            "vae": ["39", 0],
        },
        "class_type": "VAEDecode",
        "_meta": {"title": "VAE解码"},
    },
    # High-noise UNet.
    "37": {
        "inputs": {
            "unet_name": "wan2.2_i2v_high_noise_14B_fp8_scaled.safetensors",
            "weight_dtype": "default",
        },
        "class_type": "UNETLoader",
        "_meta": {"title": "UNet加载器"},
    },
    # Text encoder.
    "38": {
        "inputs": {
            "clip_name": "umt5_xxl_fp8_e4m3fn_scaled.safetensors",
            "type": "wan",
            "device": "default",
        },
        "class_type": "CLIPLoader",
        "_meta": {"title": "加载CLIP"},
    },
    # VAE.
    "39": {
        "inputs": {"vae_name": "wan_2.1_vae.safetensors"},
        "class_type": "VAELoader",
        "_meta": {"title": "加载VAE"},
    },
    # Sampling patch applied to the high-noise model (after LoRA 91).
    "54": {
        "inputs": {
            "shift": 5,
            "model": ["91", 0],
        },
        "class_type": "ModelSamplingSD3",
        "_meta": {"title": "采样算法SD3"},
    },
    # Sampling patch applied to the low-noise model (after LoRA 92).
    "55": {
        "inputs": {
            "shift": 5,
            "model": ["92", 0],
        },
        "class_type": "ModelSamplingSD3",
        "_meta": {"title": "采样算法SD3"},
    },
    # Low-noise UNet.
    "56": {
        "inputs": {
            "unet_name": "wan2.2_i2v_low_noise_14B_fp8_scaled.safetensors",
            "weight_dtype": "default",
        },
        "class_type": "UNETLoader",
        "_meta": {"title": "UNet加载器"},
    },
    # First sampler pass: steps 0-2 on the high-noise model, noise added here.
    "57": {
        "inputs": {
            "add_noise": "enable",
            "noise_seed": 984937593540091,
            "steps": 4,
            "cfg": 1,
            "sampler_name": "euler",
            "scheduler": "simple",
            "start_at_step": 0,
            "end_at_step": 2,
            "return_with_leftover_noise": "enable",
            "model": ["54", 0],
            "positive": ["67", 0],
            "negative": ["67", 1],
            "latent_image": ["67", 2],
        },
        "class_type": "KSamplerAdvanced",
        "_meta": {"title": "K采样器高级"},
    },
    # Second sampler pass: from step 2 onwards on the low-noise model,
    # continuing from the latents produced by node 57.
    "58": {
        "inputs": {
            "add_noise": "disable",
            "noise_seed": 0,
            "steps": 4,
            "cfg": 1,
            "sampler_name": "euler",
            "scheduler": "simple",
            "start_at_step": 2,
            "end_at_step": 10000,
            "return_with_leftover_noise": "disable",
            "model": ["55", 0],
            "positive": ["67", 0],
            "negative": ["67", 1],
            "latent_image": ["57", 0],
        },
        "class_type": "KSamplerAdvanced",
        "_meta": {"title": "K采样器高级"},
    },
    # Assemble decoded frames into a 16 fps video.
    "60": {
        "inputs": {
            "fps": 16,
            "images": ["8", 0],
        },
        "class_type": "CreateVideo",
        "_meta": {"title": "创建视频"},
    },
    # Persist the video on the ComfyUI server.
    "61": {
        "inputs": {
            "filename_prefix": "video/ComfyUI",
            "format": "auto",
            "codec": "auto",
            "video": ["60", 0],
        },
        "class_type": "SaveVideo",
        "_meta": {"title": "保存视频"},
    },
    # End-frame image loader (feeds end_image of node 67).
    # NOTE(review): the placeholder filename says "start" although this node
    # is wired as the end image - looks swapped with node 68; confirm.
    "62": {
        "inputs": {"image": "video_wan2_2_14B_flf2v_start_image.png"},
        "class_type": "LoadImage",
        "_meta": {"title": "加载end图像"},
    },
    # First/last-frame conditioning and empty latent of the requested size.
    "67": {
        "inputs": {
            "width": 640,
            "height": 640,
            "length": 81,
            "batch_size": 1,
            "positive": ["6", 0],
            "negative": ["7", 0],
            "vae": ["39", 0],
            "start_image": ["68", 0],
            "end_image": ["62", 0],
        },
        "class_type": "WanFirstLastFrameToVideo",
        "_meta": {"title": "WanFirstLastFrameToVideo"},
    },
    # Start-frame image loader (feeds start_image of node 67).
    # NOTE(review): the placeholder filename says "end" although this node
    # is wired as the start image - looks swapped with node 62; confirm.
    "68": {
        "inputs": {"image": "video_wan2_2_14B_flf2v_end_image.png"},
        "class_type": "LoadImage",
        "_meta": {"title": "加载start图像"},
    },
    # 4-step LoRA for the high-noise model.
    "91": {
        "inputs": {
            "lora_name": "wan2.2_i2v_lightx2v_4steps_lora_v1_high_noise.safetensors",
            "strength_model": 1,
            "model": ["37", 0],
        },
        "class_type": "LoraLoaderModelOnly",
        "_meta": {"title": "LoRA加载器仅模型"},
    },
    # 4-step LoRA for the low-noise model.
    "92": {
        "inputs": {
            "lora_name": "wan2.2_i2v_lightx2v_4steps_lora_v1_low_noise.safetensors",
            "strength_model": 1,
            "model": ["56", 0],
        },
        "class_type": "LoraLoaderModelOnly",
        "_meta": {"title": "LoRA加载器仅模型"},
    },
}
class ComfyUIServerFLF2V:
    """Run the first/last-frame + prompt -> video ComfyUI workflow for one task.

    The instance downloads the two key frames from MinIO, uploads them to the
    ComfyUI server, queues the workflow, polls until it finishes, converts the
    resulting MP4 into a GIF plus a first-frame JPEG, uploads all three
    artifacts to MinIO and publishes a status message.
    """

    # Bucket that receives every generated artifact.
    OUTPUT_BUCKET = 'aida-users'
    # (connect, read) timeout in seconds applied to every ComfyUI HTTP call so
    # that an unreachable server cannot block a worker forever.
    HTTP_TIMEOUT = (10, 300)

    def __init__(self, request_data):
        """Store the request fields and create the MinIO client.

        Args:
            request_data: object exposing ``start_image_url``,
                ``end_image_url``, ``prompt`` and ``tasks_id`` attributes.
        """
        self.pose_transform_data = None
        self.start_image_url = request_data.start_image_url
        self.end_image_url = request_data.end_image_url
        self.prompt = request_data.prompt
        self.tasks_id = request_data.tasks_id
        # The user id is whatever follows the last '-' in the task id
        # (the whole task id when it contains no '-').
        self.user_id = self.tasks_id.rsplit('-', 1)[-1]
        self.server_status_data = {'tasks_id': self.tasks_id, 'status': 'PENDING', 'message': "pending", 'gif_url': '', 'video_url': '', 'image_url': ''}
        self.minio_client = Minio(settings.MINIO_URL, access_key=settings.MINIO_ACCESS, secret_key=settings.MINIO_SECRET, secure=settings.MINIO_SECURE)

    def get_result(self):
        """Execute the whole pipeline for this task.

        Returns:
            ``(result, prompt_id)`` where ``result`` is the value returned by
            :meth:`process_and_upload_comfyui_video` (a message string, or
            ``None`` when post-processing failed), or plain ``None`` when the
            task could not be prepared, queued, or produced no output file.
        """
        # Both key frames are mandatory for this workflow.
        if not self.start_image_url:
            logger.error("start_image_url is None")
            return None
        if not self.end_image_url:
            logger.error("end_image_url is None")
            return None

        # Work on a private copy: the template is module-level state shared by
        # every request, so mutating it in place would leak prompts, seeds and
        # image names between concurrent tasks.
        workflow = copy.deepcopy(workflow_json)
        workflow['6']['inputs']['text'] = self.prompt
        workflow['57']['inputs']["noise_seed"] = random.randint(0, 10 ** 18)
        # TODO: set the video width to 480 and derive the height from the
        # aspect ratio instead of hard-coding it.
        workflow['67']['inputs']["width"] = 480
        workflow['67']['inputs']["height"] = 848

        # Node 68 feeds start_image and node 62 feeds end_image of node 67.
        for node_id, image_url in (('68', self.start_image_url), ('62', self.end_image_url)):
            uploaded_name = self._stage_image(image_url)
            if not uploaded_name:
                logger.error(f"Failed to stage image for node {node_id}: {image_url}")
                return None
            workflow[node_id]['inputs']['image'] = uploaded_name

        # 1. Queue the task.
        prompt_response = self.queue_prompt(workflow, self.tasks_id)
        if not prompt_response:
            return None
        prompt_id = prompt_response.get("prompt_id")
        logger.info(f" 任务已提交Prompt ID: {prompt_id}")

        # 2. Wait for completion and locate the produced file.
        outputs = self.poll_history(prompt_id)
        file_info = self._find_output_file(outputs)
        if file_info is None:
            logger.error(f"No output file reported for prompt {prompt_id}")
            return None
        logger.info(file_info)

        # 3. Post-process and upload.
        result = self.process_and_upload_comfyui_video(
            filename=file_info['filename'],
            subfolder=file_info['subfolder'],
            prompt_id=prompt_id,
            file_type=file_info['type'],
        )
        return result, prompt_id

    def _stage_image(self, image_url):
        """Copy one MinIO image to the ComfyUI server; return its server-side name or None."""
        in_memory_file, object_name = self.download_from_minio_in_memory(image_url)
        if in_memory_file is None:
            return None
        return self.upload_in_memory_file_to_comfyui(in_memory_file, object_name)

    @staticmethod
    def _split_object_url(image_url):
        """Split ``"bucket/path/to/object"`` into ``(bucket, object_name)``.

        Either part is an empty string when it is missing from the input.
        """
        bucket, _, object_name = (image_url or '').partition('/')
        return bucket, object_name

    @staticmethod
    def _find_output_file(outputs):
        """Return the last complete file entry found in a history ``outputs`` mapping.

        An entry is complete when it has ``filename``, ``subfolder`` and
        ``type``. Only those three keys are returned. ``None`` is returned when
        ``outputs`` is empty or contains no complete entry.
        """
        required_keys = ('filename', 'subfolder', 'type')
        found = None
        for node_output in (outputs or {}).values():
            # Only node outputs carrying an 'images' list describe files.
            files = node_output.get('images') if isinstance(node_output, dict) else None
            if not isinstance(files, list):
                continue
            for file_info in files:
                if all(key in file_info for key in required_keys):
                    found = {key: file_info[key] for key in required_keys}
        return found

    def download_from_minio_in_memory(self, image_url):
        """Download ``bucket/object`` from MinIO into memory.

        Returns:
            ``(BytesIO, basename)`` on success, ``(None, None)`` on any failure.
        """
        bucket, object_name = self._split_object_url(image_url)
        if not bucket or not object_name:
            logger.error(f"Invalid MinIO image url (expected 'bucket/object'): {image_url}")
            return None, None
        try:
            response_stream = self.minio_client.get_object(bucket, object_name)
            try:
                # Read the whole object into memory to avoid touching disk.
                image_bytes = response_stream.read()
            finally:
                # Always hand the connection back, even if the read failed.
                response_stream.close()
                response_stream.release_conn()
        except S3Error as e:
            logger.error(f"❌ MinIO S3 错误 (例如,对象不存在): {e}")
            return None, None
        except Exception as e:
            logger.error(f"❌ MinIO 下载过程中发生未知错误: {e}")
            return None, None
        return io.BytesIO(image_bytes), object_name.rsplit('/', 1)[-1]

    @staticmethod
    def upload_in_memory_file_to_comfyui(in_memory_file, filename):
        """Upload an in-memory image to the ComfyUI ``/upload/image`` endpoint.

        Returns:
            The ``name`` field of the server's JSON response, or ``None`` when
            the input is missing or the request fails.
        """
        if in_memory_file is None or not filename:
            logger.error("Nothing to upload to ComfyUI: missing file data or filename")
            return None
        upload_url = f"http://{settings.COMFYUI_SERVER_ADDRESS}/upload/image"
        data = {
            "overwrite": "true",
            "type": "input"
        }
        # multipart/form-data part: (filename, file object, MIME type).
        # Anything that is not a .png is sent as JPEG.
        mime_type = 'image/png' if filename.lower().endswith('.png') else 'image/jpeg'
        files = {
            'image': (filename, in_memory_file, mime_type)
        }
        try:
            comfyui_response = requests.post(
                upload_url, data=data, files=files,
                timeout=ComfyUIServerFLF2V.HTTP_TIMEOUT,
            )
            comfyui_response.raise_for_status()
            return comfyui_response.json().get('name')
        except requests.exceptions.RequestException as e:
            logger.error(f"❌ ComfyUI 上传失败: {e}")
            # The response only exists when the server actually answered;
            # connection errors and timeouts carry none.
            if e.response is not None:
                logger.error(f" 响应内容: {e.response.text}")
            return None

    @staticmethod
    def _extract_gif_and_first_frame(mp4_bytes):
        """Convert MP4 bytes to ``(gif_bytes, first_frame_jpeg_bytes)``; None on failure.

        The video reader is given a file path, so the data goes through
        temporary files that are always removed afterwards.
        """
        temp_mp4_path = None
        temp_gif_path = None
        clip = None
        try:
            # delete=False keeps the file after the handle closes so it can be
            # opened by path; it is removed in the finally block.
            with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_file:
                temp_mp4_path = tmp_file.name
                tmp_file.write(mp4_bytes)
            clip = VideoFileClip(temp_mp4_path)

            # First frame at the original resolution, encoded as JPEG.
            frame_stream = io.BytesIO()
            Image.fromarray(clip.get_frame(t=0.0)).save(frame_stream, 'JPEG')
            frame_bytes = frame_stream.getvalue()
            logger.info("✅ 成功提取第一帧图片。")

            # GIF rendered at the clip's own frame rate (24 when unknown).
            with tempfile.NamedTemporaryFile(suffix=".gif", delete=False) as tmp_file:
                temp_gif_path = tmp_file.name
            target_fps = int(round(clip.fps)) if clip.fps else 24
            clip.write_gif(temp_gif_path, fps=target_fps)
            with open(temp_gif_path, 'rb') as f:
                gif_bytes = f.read()
            logger.info("✅ 成功生成 GIF。")
            return gif_bytes, frame_bytes
        except Exception as e:
            logger.error(f"❌ 视频处理或文件操作失败: {e}")
            return None
        finally:
            # Close the reader first so the temp file is no longer in use.
            if clip is not None:
                try:
                    clip.close()
                except Exception as e:
                    logger.warning(f"Failed to close video clip: {e}")
            for label, path in (("MP4", temp_mp4_path), ("GIF", temp_gif_path)):
                if path and os.path.exists(path):
                    os.remove(path)
                    logger.info(f"🗑️ 已删除临时 {label} 文件: {path}")

    def process_and_upload_comfyui_video(self, filename: str, subfolder: str, prompt_id: str, file_type: str = "output"):
        """Fetch the ComfyUI video, derive GIF + first frame, upload all to MinIO.

        Args:
            filename: video file name reported by ComfyUI.
            subfolder: subfolder reported by ComfyUI.
            prompt_id: id used to namespace the uploaded objects.
            file_type: ComfyUI storage type forwarded to ``/view``.

        Returns:
            A completion message string on success, ``None`` if any step
            (download, conversion, upload, publish) failed.
        """
        # 1. Fetch the video bytes from ComfyUI.
        mp4_bytes = self.get_comfyui_video_bytes(filename, subfolder, file_type)
        if not mp4_bytes:
            return None

        # 2. Derive the GIF and the first-frame image; stop before uploading
        #    anything if the conversion failed.
        assets = self._extract_gif_and_first_frame(mp4_bytes)
        if assets is None:
            return None
        gif_bytes, frame_bytes = assets

        output_base_name = uuid.uuid4().hex
        mp4_object = f"{self.user_id}/pose_transform_video/{prompt_id}/{output_base_name}.mp4"
        gif_object = f"{self.user_id}/pose_transform_gif/{prompt_id}/{output_base_name}.gif"
        frame_object = f"{self.user_id}/pose_transform_first_img/{prompt_id}/{output_base_name}_frame.jpg"
        uploads = (
            (mp4_bytes, mp4_object, "video/mp4"),
            (gif_bytes, gif_object, "image/gif"),
            (frame_bytes, frame_object, "image/jpeg"),
        )

        # 3. Upload everything, then announce success.
        try:
            for payload, object_name, content_type in uploads:
                # Never report SUCCESS for an artifact that was not stored.
                if not self.upload_stream_to_minio(payload, object_name, content_type):
                    return None
            self.pose_transform_data = {
                'tasks_id': self.tasks_id,
                'status': 'SUCCESS',
                'message': "success",
                'gif_url': f'{self.OUTPUT_BUCKET}/{gif_object}',
                'video_url': f'{self.OUTPUT_BUCKET}/{mp4_object}',
                'image_url': f'{self.OUTPUT_BUCKET}/{frame_object}',
            }
            # The status message is only published outside DEBUG mode.
            if not settings.DEBUG:
                publish_status(json.dumps(self.pose_transform_data), PS_RABBITMQ_QUEUES)
                logger.info(
                    f" [x] Sent to {PS_RABBITMQ_QUEUES} data@@@@ {json.dumps(self.pose_transform_data, indent=4)}")
            return "\n🎉 所有任务完成!"
        except Exception as e:
            logger.error(e)
            return None

    # --- Helper: submit a task to the queue ---
    @staticmethod
    def queue_prompt(prompt, client_id):
        """Submit a workflow to the ComfyUI ``/prompt`` endpoint.

        ``client_id`` is sent both as the client id and as the prompt id.

        Returns:
            The decoded JSON response on HTTP 200, otherwise ``None``.
        """
        p = {"prompt": prompt, "client_id": client_id, "prompt_id": client_id}
        data = json.dumps(p).encode('utf-8')
        response = requests.post(
            f"http://{settings.COMFYUI_SERVER_ADDRESS}/prompt",
            data=data,
            timeout=ComfyUIServerFLF2V.HTTP_TIMEOUT,
        )
        if response.status_code == 200:
            return response.json()
        logger.warning(f"提交任务失败,状态码: {response.status_code}")
        logger.warning(response.text)
        return None

    @staticmethod
    def poll_history(prompt_id, interval_seconds=5, max_wait_seconds=None):
        """Poll ``/history/{prompt_id}`` until the task shows up in the history.

        Args:
            prompt_id: id of the queued prompt.
            interval_seconds: pause before each poll.
            max_wait_seconds: optional overall limit; ``None`` (default) polls
                indefinitely.

        Returns:
            The ``outputs`` mapping of the finished prompt (empty dict when the
            history entry has none), or ``None`` if ``max_wait_seconds``
            elapsed first.
        """
        url = f"http://{settings.COMFYUI_SERVER_ADDRESS}/history/{prompt_id}"
        logger.info(f"⏳ 开始轮询状态 (间隔 {interval_seconds} 秒)...")
        deadline = None if max_wait_seconds is None else time.monotonic() + max_wait_seconds
        while deadline is None or time.monotonic() < deadline:
            time.sleep(interval_seconds)
            try:
                response = requests.get(url, timeout=ComfyUIServerFLF2V.HTTP_TIMEOUT)
                # Only a 200 whose body contains the prompt id counts as done;
                # anything else means "still running or queued".
                if response.status_code == 200:
                    history_data = response.json()
                    # History layout: {prompt_id: {"outputs": ...}}
                    if prompt_id in history_data:
                        logger.info("🎉 任务已完成!")
                        return history_data[prompt_id].get('outputs', {})
                logger.info("⏳ 任务仍在执行或等待中...")
            except (requests.exceptions.RequestException, ValueError) as e:
                # Transient connection problems or a malformed body: retry.
                logger.warning(f"⚠️ 轮询时发生错误: {e}")
        logger.error(f"Timed out after {max_wait_seconds}s waiting for prompt {prompt_id}")
        return None

    @staticmethod
    def get_comfyui_video_bytes(filename: str, subfolder: str, file_type: str = "output"):
        """Download a file from the ComfyUI ``/view`` endpoint.

        Args:
            filename: video file name (e.g. ``'ComfyUI_00002_.mp4'``).
            subfolder: storage subfolder.
            file_type: storage type, usually ``'output'``.

        Returns:
            The file content as ``bytes``, or ``None`` if the request failed.
        """
        url = f"http://{settings.COMFYUI_SERVER_ADDRESS}/view"
        params = {
            "filename": filename,
            "subfolder": subfolder,
            "type": file_type
        }
        logger.info(f"📡 正在从 ComfyUI 下载视频: {filename}")
        try:
            response = requests.get(url, params=params, timeout=ComfyUIServerFLF2V.HTTP_TIMEOUT)
            response.raise_for_status()
            return response.content
        except requests.exceptions.RequestException as e:
            logger.error(f"❌ 从 ComfyUI 获取视频失败: {e}")
            return None

    def upload_stream_to_minio(self, video_bytes: bytes, object_name: str, content_type: str):
        """Upload in-memory bytes to the output bucket.

        Returns:
            ``True`` on success, ``False`` when MinIO raised an ``S3Error``.
        """
        logger.info(f"☁️ 正在上传对象到 MinIO: {object_name}")
        try:
            result = self.minio_client.put_object(
                bucket_name=self.OUTPUT_BUCKET,
                object_name=object_name,
                data=io.BytesIO(video_bytes),
                length=len(video_bytes),
                content_type=content_type
            )
            logger.info(f"✅ MinIO 上传成功: {result.object_name}")
            return True
        except S3Error as e:
            logger.error(f"❌ MinIO 上传失败: {e}")
            return False
if __name__ == '__main__':
    # Manual smoke run: build a sample request and print the pipeline result.
    demo_request = ComfyuiFLF2VModel(
        prompt="Model executing a series of poses, dynamic camera movement alternating between detailed close-ups and full shots.",
        end_image_url="test/end.png",
        start_image_url="test/start.png",
        tasks_id="202511051619-89111",
    )
    print(ComfyUIServerFLF2V(demo_request).get_result())