diff --git a/app/server/ChatbotAgent/agent_server.py b/app/server/ChatbotAgent/agent_server.py index fabd136..b98962f 100644 --- a/app/server/ChatbotAgent/agent_server.py +++ b/app/server/ChatbotAgent/agent_server.py @@ -195,53 +195,45 @@ class LCAgent(ls.LitAPI): task_map = {} stylist_agent_kwages = self.stylist_agent_kwages.copy() + tasks_mapping = {} if num_outfits == 1: - logger.info(f"fast request outfit_id is : {outfit_ids[0]}") + tasks_mapping[outfit_ids[0]] = "fast" + else: + for i in range(num_outfits): + if i == 0: + tasks_mapping[outfit_ids[i]] = "fast" + else: + tasks_mapping[outfit_ids[i]] = "slow" + + for k, v in tasks_mapping.items(): + logger.info(f"fast request outfit_id is : {k}") # 通过请求数量判断 num == 1 单个outfit刷新 - stylist_agent_kwages['outfit_id'] = outfit_ids[0] + stylist_agent_kwages['outfit_id'] = k stylist_agent_kwages['stylist_name'] = stylist_name stylist_agent_kwages['gender'] = gender + stylist_agent_kwages['callback_url'] = callback_url + agent = AsyncStylistAgent(**stylist_agent_kwages) - task = agent.run_quick_batch_styling( - request_summary=request_summary, - occasions=occasions, - start_outfit=start_outfit, - batch_sources=batch_sources, - user_id=user_id, - callback_url=callback_url, - ) + if v == "fast": + task = agent.run_quick_batch_styling( + request_summary=request_summary, + occasions=occasions, + start_outfit=start_outfit, + batch_sources=batch_sources, + user_id=user_id, + callback_url=callback_url, + ) + else: + task = agent.run_iterative_styling( + request_summary=request_summary, + occasions=occasions, + start_outfit=start_outfit, + batch_sources=batch_sources, + user_id=user_id, + callback_url=callback_url, + ) tasks.append(task) - task_map[task] = {"outfit_id": outfit_ids[0], "retries": 0} - elif num_outfits > 1: - # 通过请求数量判断 num > 1 四套搭配推荐 (1快 , num-1慢) - for i in range(num_outfits): - stylist_agent_kwages['outfit_id'] = outfit_ids[i] - stylist_agent_kwages['stylist_name'] = stylist_name - stylist_agent_kwages['gender'] = gender - agent = AsyncStylistAgent(**stylist_agent_kwages) - if i == 0: - # 第一套搭配使用快速方法 一次跑出所有单品 - logger.info(f"fast request outfit_id is : {outfit_ids[i]}") - task = agent.run_quick_batch_styling( - request_summary=request_summary, - occasions=occasions, - start_outfit=start_outfit, - batch_sources=batch_sources, - user_id=user_id, - callback_url=callback_url, - ) - else: - # 后续 - task = agent.run_iterative_styling( - request_summary=request_summary, - occasions=occasions, - start_outfit=start_outfit, - batch_sources=batch_sources, - user_id=user_id, - callback_url=callback_url, - ) - tasks.append(task) - task_map[task] = {"outfit_id": outfit_ids[i], "retries": 0} + task_map[task] = {"outfit_id": k, "retries": 0} logger.info(f"--- Starting {num_outfits} concurrent outfit generation tasks. ---") # 2. 任务执行与重试循环 @@ -252,7 +244,8 @@ class LCAgent(ls.LitAPI): retry_limit = 1 # 允许重试一次 while tasks_to_run: try: - results = await asyncio.gather(*tasks, return_exceptions=True) + results = await asyncio.gather(*tasks_to_run, return_exceptions=True) + next_tasks_to_run = [] for task, result in zip(tasks_to_run, results): task_info = task_map[task] @@ -279,15 +272,27 @@ class LCAgent(ls.LitAPI): stylist_agent_kwages['outfit_id'] = outfit_id stylist_agent_kwages['stylist_name'] = stylist_name stylist_agent_kwages['gender'] = gender + stylist_agent_kwages['callback_url'] = callback_url + agent = AsyncStylistAgent(**stylist_agent_kwages) - new_task = agent.run_quick_batch_styling( - request_summary=request_summary, - occasions=occasions, - start_outfit=start_outfit, - batch_sources=batch_sources, - user_id=user_id, - callback_url=callback_url - ) + if tasks_mapping[outfit_id] == "fast": + new_task = agent.run_quick_batch_styling( + request_summary=request_summary, + occasions=occasions, + start_outfit=start_outfit, + batch_sources=batch_sources, + user_id=user_id, + callback_url=callback_url, + ) + else: + new_task = agent.run_iterative_styling( + request_summary=request_summary, + occasions=occasions, + start_outfit=start_outfit, + batch_sources=batch_sources, + user_id=user_id, + callback_url=callback_url + ) # 将新任务添加到下一轮运行列表,并更新任务映射 next_tasks_to_run.append(new_task) task_map[new_task] = task_info # 新任务继承旧任务的重试信息 @@ -298,11 +303,12 @@ class LCAgent(ls.LitAPI): # 达到重试上限,最终失败 , 并通知前端 object_data = { 'outfit_id': outfit_id, - "status": "failed", + "status": "retry_failed", "path": "", } - post_request(url=f'{callback_url}/api/style/callback', data=json.dumps(object_data)) + response = post_request(url=f'{callback_url}/api/style/callback', data=json.dumps(object_data)) failed_outfits.append(f"Outfit {outfit_id} ultimately failed after {retry_limit} retries: {result}") + logger.info(f"request data :{json.dumps(object_data, ensure_ascii=False, indent=2)} | JAVA callback info -> status:{response.status_code} | message:{response.text}") del task_map[task] else: diff --git a/app/server/ChatbotAgent/core/stylist_agent_server.py b/app/server/ChatbotAgent/core/stylist_agent_server.py index 84e0fe9..6a5afac 100644 --- a/app/server/ChatbotAgent/core/stylist_agent_server.py +++ b/app/server/ChatbotAgent/core/stylist_agent_server.py @@ -27,7 +27,7 @@ logger = logging.getLogger(__name__) class AsyncStylistAgent: - def __init__(self, local_db: str, gemini_model_name: str, outfit_id: str, stylist_name: str, gender: str): + def __init__(self, local_db: str, gemini_model_name: str, outfit_id: str, stylist_name: str, gender: str, callback_url: str): # self.outfit_items: List[Dict[str, str]] = [] self.outfit_id = outfit_id self.stylist_name = stylist_name @@ -52,6 +52,7 @@ class AsyncStylistAgent: ) self.gcs_bucket = "lc_stylist_agent_outfit_items" self.minio_bucket = "lanecarford" + self.callback_url = f'{callback_url}/api/style/callback' def _load_style_guide(self, stylist_name: str): """加载 markdown 风格指南内容。""" @@ -182,8 +183,13 @@ class AsyncStylistAgent: results = [] if not results: - print(f"数据库中未找到符合 '{category}' 和描述的单品。") - return None + self.post_operation( + status="failed", + message=f"数据库中未找到符合 '{category}' 和描述的单品。", + callback_url=self.callback_url, + img_path="", + ) + raise Exception(f"数据库中未找到符合 '{category}' 和描述的单品。") # 3. 模拟 Agent 审核(实际应用中,你需要将图片发回给 Agent进行审核) best_meta = results[0] # 第一个 batch 的第一个 metadata @@ -295,14 +301,13 @@ class AsyncStylistAgent: gemini_data = self._parse_gemini_response(gemini_response_text) if not gemini_data: - print("Agent 返回无效响应,终止流程。") self.post_operation( status="failed", message="Agent returned invalid response, terminating process.", callback_url=url, img_path=merged_image_path, ) - break + raise Exception("Agent 返回无效响应,终止流程。") # 处理推荐单品 if gemini_data.get('action') == 'recommend_item': @@ -385,13 +390,14 @@ class AsyncStylistAgent: failed_found_item_count = 0 if not recommended_items or not isinstance(recommended_items, List): - print("No recommended item from Gemini, terminating process.") self.post_operation( status="failed", message="Agent returned invalid response, terminating process.", callback_url=url, img_path=merged_image_path ) + raise Exception("No recommended item from Gemini, terminating process.") + else: for idx, rec_item in enumerate(recommended_items): subcategory = rec_item.get('subcategory') @@ -428,7 +434,7 @@ class AsyncStylistAgent: callback_url=url, img_path=merged_image_path ) - print(f"There are {failed_found_item_count} items (total {len(recommended_items)}) are not found in the database") + raise Exception(f"There are {failed_found_item_count} items (total {len(recommended_items)}) are not found in the database") return reason async def run_iterative_styling(self, request_summary, occasions, start_outfit: Optional[List] = None, batch_sources: List = [], user_id="test", callback_url=""):