From a9d9bdcb7137649bfee5d290a1a480521f1c67e2 Mon Sep 17 00:00:00 2001 From: zhh Date: Mon, 3 Nov 2025 16:37:41 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=88=E6=96=B0=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=EF=BC=89:=20=20=E6=96=B0=E5=A2=9Ewan2.2=20pose-transform?= =?UTF-8?q?=E6=A8=A1=E5=9E=8B=E6=8E=A5=E5=8F=A3=EF=BC=8Ccomfyui-api?= =?UTF-8?q?=E5=BD=A2=E5=BC=8F=20fix=EF=BC=88=E4=BF=AE=E5=A4=8Dbug=EF=BC=89?= =?UTF-8?q?:=20docs=EF=BC=88=E6=96=87=E6=A1=A3=E5=8F=98=E6=9B=B4=EF=BC=89:?= =?UTF-8?q?=20refactor=EF=BC=88=E9=87=8D=E6=9E=84=EF=BC=89:=20test(?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=B5=8B=E8=AF=95):?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/service/comfyui_I2V/server.py | 134 +++++++++++++++--------------- 1 file changed, 65 insertions(+), 69 deletions(-) diff --git a/app/service/comfyui_I2V/server.py b/app/service/comfyui_I2V/server.py index 1ea7e29..7f98038 100644 --- a/app/service/comfyui_I2V/server.py +++ b/app/service/comfyui_I2V/server.py @@ -15,6 +15,7 @@ from moviepy.video.io.VideoFileClip import VideoFileClip from app.core.config import REDIS_HOST, REDIS_PORT, REDIS_DB, MINIO_URL, MINIO_ACCESS, MINIO_SECRET, MINIO_SECURE, COMFYUI_SERVER_ADDRESS, PS_RABBITMQ_QUEUES, DEBUG from app.schemas.pose_transform import PoseTransformModel +from app.service.generate_image.utils.mq import publish_status logger = logging.getLogger() @@ -331,15 +332,13 @@ class ComfyUIServerI2V: self.tasks_id = request_data.tasks_id self.user_id = self.tasks_id[self.tasks_id.rfind('-') + 1:] self.redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, decode_responses=True) - self.pose_transform_data = {'tasks_id': self.tasks_id, 'status': 'PENDING', 'message': "pending", 'gif_url': '', - 'video_url': '', 'image_url': ''} + self.pose_transform_data = {'tasks_id': self.tasks_id, 'status': 'PENDING', 'message': "pending", 'gif_url': '', 'video_url': '', 'image_url': ''} self.redis_client.set(self.tasks_id, json.dumps(self.pose_transform_data)) self.redis_client.expire(self.tasks_id, 600) self.minio_client = Minio(MINIO_URL, access_key=MINIO_ACCESS, secret_key=MINIO_SECRET, secure=MINIO_SECURE) def get_result(self): - logger.info("11111111111111") - workflow_json['174']['inputs']['file'] = pose_video_map[self.pose_num ] + workflow_json['174']['inputs']['file'] = pose_video_map[self.pose_num] workflow_json['169']['inputs']['noise_seed'] = random.randint(0, 10 ** 18) # 下载图片 上传 comfyui server @@ -353,7 +352,7 @@ class ComfyUIServerI2V: return prompt_id = prompt_response.get("prompt_id") - print(f" 任务已提交,Prompt ID: {prompt_id}") + logger.info(f" 任务已提交,Prompt ID: {prompt_id}") outputs = self.poll_history(prompt_id) file_list = {} @@ -370,7 +369,7 @@ class ComfyUIServerI2V: 'subfolder': file_info['subfolder'], 'type': file_info['type'] } - print(file_list) + logger.info(file_list) return self.process_and_upload_comfyui_video(filename=file_list['filename'], subfolder=file_list['subfolder'], prompt_id=prompt_response['prompt_id']), prompt_id def read_tasks_status(self): @@ -380,7 +379,7 @@ class ComfyUIServerI2V: def download_from_minio_in_memory(self): bucket = self.image_url.split('/')[0] object_name = self.image_url[self.image_url.find('/') + 1:] - print("🚀 正在连接 MinIO 客户端...") + # print("🚀 正在连接 MinIO 客户端...") try: # get_object 返回一个 ResponseStream 对象 @@ -397,14 +396,14 @@ class ComfyUIServerI2V: in_memory_file = io.BytesIO(image_bytes) - print(f"✅ 图片已下载到内存 ({len(image_bytes)} 字节)。") + # print(f"✅ 图片已下载到内存 ({len(image_bytes)} 字节)。") return in_memory_file, object_name.rsplit('/')[-1] except S3Error as e: - print(f"❌ MinIO S3 错误 (例如,对象不存在): {e}") + logger.error(f"❌ MinIO S3 错误 (例如,对象不存在): {e}") return None, None except Exception as e: - print(f"❌ MinIO 下载过程中发生未知错误: {e}") + logger.error(f"❌ MinIO 下载过程中发生未知错误: {e}") return None, None def upload_video_to_minio(self, BUCKET_NAME, OBJECT_NAME, LOCAL_FILE_PATH): @@ -419,16 +418,16 @@ class ComfyUIServerI2V: content_type="video/mp4" # 设置正确的内容类型 ) - print(f"✅ 文件 '{LOCAL_FILE_PATH}' 已成功上传至 MinIO:") - print(f" 对象名: {result.object_name}") - print(f" Etag: {result.etag}") + # print(f"✅ 文件 '{LOCAL_FILE_PATH}' 已成功上传至 MinIO:") + # print(f" 对象名: {result.object_name}") + # print(f" Etag: {result.etag}") except S3Error as e: - print(f"❌ MinIO 操作失败: {e}") + logger.error(f"❌ MinIO 操作失败: {e}") except FileNotFoundError: - print(f"❌ 找不到本地文件: {LOCAL_FILE_PATH}") + logger.error(f"❌ 找不到本地文件: {LOCAL_FILE_PATH}") except Exception as e: - print(f"❌ 发生未知错误: {e}") + logger.error(f"❌ 发生未知错误: {e}") def upload_gif_to_minio(self, BUCKET_NAME, OBJECT_NAME, LOCAL_FILE_PATH): """使用 fput_object 从本地路径上传 MP4 文件""" @@ -442,16 +441,16 @@ class ComfyUIServerI2V: content_type="video/mp4" # 设置正确的内容类型 ) - print(f"✅ 文件 '{LOCAL_FILE_PATH}' 已成功上传至 MinIO:") - print(f" 对象名: {result.object_name}") - print(f" Etag: {result.etag}") + # print(f"✅ 文件 '{LOCAL_FILE_PATH}' 已成功上传至 MinIO:") + # print(f" 对象名: {result.object_name}") + # print(f" Etag: {result.etag}") except S3Error as e: - print(f"❌ MinIO 操作失败: {e}") + logger.error(f"❌ MinIO 操作失败: {e}") except FileNotFoundError: - print(f"❌ 找不到本地文件: {LOCAL_FILE_PATH}") + logger.error(f"❌ 找不到本地文件: {LOCAL_FILE_PATH}") except Exception as e: - print(f"❌ 发生未知错误: {e}") + logger.error(f"❌ 发生未知错误: {e}") def upload_in_memory_file_to_comfyui(self, in_memory_file, filename): upload_url = f"http://{COMFYUI_SERVER_ADDRESS}/upload/image" @@ -469,7 +468,7 @@ class ComfyUIServerI2V: 'image': (filename, in_memory_file, mime_type) } - print(f"⬆️ 正在上传图片 ({filename}) 到 ComfyUI...") + # print(f"⬆️ 正在上传图片 ({filename}) 到 ComfyUI...") try: comfyui_response = requests.post(upload_url, data=data, files=files) comfyui_response.raise_for_status() @@ -477,20 +476,15 @@ class ComfyUIServerI2V: result = comfyui_response.json() uploaded_name = result.get('name') - print(f"🎉 ComfyUI 上传成功! 服务器文件名: {uploaded_name}") + # print(f"🎉 ComfyUI 上传成功! 服务器文件名: {uploaded_name}") return uploaded_name except requests.exceptions.RequestException as e: - print(f"❌ ComfyUI 上传失败: {e}") - print(f" 响应内容: {comfyui_response.text}") + logger.error(f"❌ ComfyUI 上传失败: {e}") + logger.error(f" 响应内容: {comfyui_response.text}") return None - def process_and_upload_comfyui_video( - self, - filename: str, - subfolder: str, - prompt_id: str, - ): + def process_and_upload_comfyui_video(self, filename: str, subfolder: str, prompt_id: str, ): """ 完整的自动化流程:获取 ComfyUI 视频 -> 转换 GIF 并提取帧 -> 上传所有结果到 MinIO。 """ @@ -521,7 +515,7 @@ class ComfyUIServerI2V: tmp_file.write(mp4_bytes) # 将内存数据写入磁盘 temp_mp4_path = tmp_file.name # 记录文件路径 - print(f"临时文件已写入: {temp_mp4_path}") + # print(f"临时文件已写入: {temp_mp4_path}") # 2. 使用 moviepy 打开临时文件 (传入文件路径字符串) clip = VideoFileClip(temp_mp4_path) @@ -536,7 +530,7 @@ class ComfyUIServerI2V: image.save(frame_stream, 'JPEG') frame_bytes = frame_stream.getvalue() - print("✅ 成功提取第一帧图片。") + logger.info("✅ 成功提取第一帧图片。") # 视频转 GIF (使用另一个临时文件来保存 GIF) temp_gif_path = "" @@ -549,7 +543,7 @@ class ComfyUIServerI2V: with open(temp_gif_path, 'rb') as f: gif_bytes = f.read() - print("✅ 成功生成 GIF。") + logger.info("✅ 成功生成 GIF。") # 返回结果 (例如: 上传到 MinIO) # return mp4_bytes, gif_bytes, frame_bytes @@ -557,31 +551,44 @@ class ComfyUIServerI2V: # ----------------------------------------------- except Exception as e: - print(f"❌ 视频处理或文件操作失败: {e}") + logger.error(f"❌ 视频处理或文件操作失败: {e}") # 在失败时,也尝试清理文件 finally: # 3. 清理临时文件 (非常重要!) if os.path.exists(temp_mp4_path): os.remove(temp_mp4_path) - print(f"🗑️ 已删除临时 MP4 文件: {temp_mp4_path}") + logger.info(f"🗑️ 已删除临时 MP4 文件: {temp_mp4_path}") if 'temp_gif_path' in locals() and os.path.exists(temp_gif_path): os.remove(temp_gif_path) - print(f"🗑️ 已删除临时 GIF 文件: {temp_gif_path}") + logger.info(f"🗑️ 已删除临时 GIF 文件: {temp_gif_path}") # 3. 上传所有结果到 MinIO - # 上传原始 MP4 - self.upload_stream_to_minio(mp4_bytes, MP4_OBJECT, "video/mp4") + try: + # 上传原始 MP4 + self.upload_stream_to_minio(mp4_bytes, MP4_OBJECT, "video/mp4") - # 上传生成的 GIF - self.upload_stream_to_minio(gif_bytes, GIF_OBJECT, "image/gif") + # 上传生成的 GIF + self.upload_stream_to_minio(gif_bytes, GIF_OBJECT, "image/gif") - # 上传第一帧图片 - self.upload_stream_to_minio(frame_bytes, FRAME_OBJECT, "image/jpeg") + # 上传第一帧图片 + self.upload_stream_to_minio(frame_bytes, FRAME_OBJECT, "image/jpeg") - return "\n🎉 所有任务完成!" + self.pose_transform_data = {'tasks_id': self.tasks_id, 'status': 'SUCCESS', 'message': "success", 'gif_url': f'aida-users/{GIF_OBJECT}', 'video_url': f'aida-users/{MP4_OBJECT}', 'image_url': f'aida-users/{FRAME_OBJECT}'} + + # 推送消息 + # if not DEBUG: + publish_status(json.dumps(self.pose_transform_data), PS_RABBITMQ_QUEUES) + logger.info( + f" [x] Sent to: {PS_RABBITMQ_QUEUES} data:@@@@ {json.dumps(self.pose_transform_data, indent=4)}") + + return "\n🎉 所有任务完成!" + + except Exception as e: + logger.error(e) + return None # --- 辅助函数:提交任务到队列 --- def queue_prompt(self, prompt, client_id): @@ -591,32 +598,21 @@ class ComfyUIServerI2V: # 提交任务到 /prompt 端点 response = requests.post(f"http://{COMFYUI_SERVER_ADDRESS}/prompt", data=data) - print(f"-------------{response.text}") - print(f"------------{client_id}") + # print(f"-------------{response.text}") + # print(f"------------{client_id}") if response.status_code == 200: return response.json() else: - print(f"提交任务失败,状态码: {response.status_code}") - print(response.text) - return None - - def get_history(self, prompt_id): - """通过 prompt_id 获取任务的历史记录和输出。""" - # 查询 /history/{prompt_id} 端点 - response = requests.get(f"http://{COMFYUI_SERVER_ADDRESS}/history/{prompt_id}") - - if response.status_code == 200: - return response.json() - else: - print(f"获取历史记录失败,状态码: {response.status_code}") + logger.warning(f"提交任务失败,状态码: {response.status_code}") + logger.warning(response.text) return None def poll_history(self, prompt_id, interval_seconds=5): """步骤 2: 轮询 /history/{prompt_id} 检查任务是否完成""" url = f"http://{COMFYUI_SERVER_ADDRESS}/history/{prompt_id}" - print(f"⏳ 开始轮询状态 (间隔 {interval_seconds} 秒)...") + logger.info(f"⏳ 开始轮询状态 (间隔 {interval_seconds} 秒)...") while True: time.sleep(interval_seconds) @@ -629,14 +625,14 @@ class ComfyUIServerI2V: # ComfyUI 返回的历史记录结构是 {prompt_id: {outputs: ...}} if prompt_id in history_data: - print("🎉 任务已完成!") + logger.info("🎉 任务已完成!") return history_data[prompt_id]['outputs'] - print("⏳ 任务仍在执行或等待中...") + logger.info("⏳ 任务仍在执行或等待中...") except requests.exceptions.RequestException as e: # 处理可能的连接错误,但通常不会在内部轮询中发生 - print(f"⚠️ 轮询时发生错误: {e}") + logger.info(f"⚠️ 轮询时发生错误: {e}") pass def get_comfyui_video_bytes(self, filename: str, subfolder: str, file_type: str = "output"): @@ -658,7 +654,7 @@ class ComfyUIServerI2V: "type": file_type } - print(f"📡 正在从 ComfyUI 下载视频: {filename}") + logger.info(f"📡 正在从 ComfyUI 下载视频: {filename}") try: # 使用 requests.get 下载文件 response = requests.get(url, params=params, stream=True) @@ -668,12 +664,12 @@ class ComfyUIServerI2V: return response.content except requests.exceptions.RequestException as e: - print(f"❌ 从 ComfyUI 获取视频失败: {e}") + logger.error(f"❌ 从 ComfyUI 获取视频失败: {e}") return None def upload_stream_to_minio(self, video_bytes: bytes, object_name: str, content_type: str): """从内存流上传数据到 MinIO。""" - print(f"☁️ 正在上传对象到 MinIO: {object_name}") + logger.info(f"☁️ 正在上传对象到 MinIO: {object_name}") try: data_stream = io.BytesIO(video_bytes) @@ -685,10 +681,10 @@ class ComfyUIServerI2V: length=len(video_bytes), content_type=content_type ) - print(f"✅ MinIO 上传成功: {result.object_name}") + logger.info(f"✅ MinIO 上传成功: {result.object_name}") return True except S3Error as e: - print(f"❌ MinIO 上传失败: {e}") + logger.error(f"❌ MinIO 上传失败: {e}") return False