diff --git a/app/server/ChatbotAgent/agent_server.py b/app/server/ChatbotAgent/agent_server.py index c5e7042..14fe18a 100644 --- a/app/server/ChatbotAgent/agent_server.py +++ b/app/server/ChatbotAgent/agent_server.py @@ -20,6 +20,7 @@ class AgentRequestModel(BaseModel): num_outfits: int stylist_path: str callback_url: str + gender: str class LCAgent(ls.LitAPI): @@ -78,13 +79,13 @@ class LCAgent(ls.LitAPI): start_outfit=[], num_outfits=request.num_outfits, user_id=request.user_id, + gender=request.gender, callback_url=request.callback_url) - logger.info("\n--- Final Recommendation Results ---") for i, path in enumerate(recommendation_results.get("successful_outfits", [])): logger.info(f"✅ Outfit {i + 1} saved to: {path}") - for error in recommendation_results.get("failed_outfits", []): - logger.error(f"❌ {error}") + for failed in recommendation_results.get("failed_outfits", []): + logger.error(f"❌ {failed}") async def get_conversation_summary(self, user_id: str) -> str: """ @@ -100,7 +101,7 @@ class LCAgent(ls.LitAPI): return summary async def recommend_outfit(self, request_summary: str, stylist_name: str, start_outfit=None, num_outfits: int = 1, - user_id: str = "test", callback_url: str = None): + user_id: str = "test", gender: str = "male", callback_url: str = None): """ 基于用户的对话历史和需求,推荐一套搭配。 @@ -111,6 +112,7 @@ class LCAgent(ls.LitAPI): if start_outfit is None: start_outfit = [] tasks = [] + task_map = {} for i in range(num_outfits): self.stylist_agent_kwages['outfit_id'] = self.outfit_ids[i] agent = AsyncStylistAgent(**self.stylist_agent_kwages) @@ -119,29 +121,81 @@ class LCAgent(ls.LitAPI): stylist_path=stylist_name, start_outfit=start_outfit, user_id=user_id, - callback_url=callback_url + callback_url=callback_url, + gender=gender, ) tasks.append(task) - print(f"--- Starting {num_outfits} concurrent outfit generation tasks. ---") + task_map[task] = {"outfit_id": self.outfit_ids[i], "retries": 0} + logger.info(f"--- Starting {num_outfits} concurrent outfit generation tasks. ---") - try: - results = await asyncio.gather(*tasks, return_exceptions=True) + # 2. 任务执行与重试循环 + tasks_to_run = tasks - successful_outfits = [] - failed_outfits = [] - for result in results: - if isinstance(result, Exception): - # 任务执行中发生异常 - failed_outfits.append(f"Failed: {result}") - else: - # 任务成功,result 是 run_styling_process 返回的图片路径 - successful_outfits.append(result) + successful_outfits = [] + failed_outfits = [] + retry_limit = 1 # 允许重试一次 + while tasks_to_run: + try: + results = await asyncio.gather(*tasks, return_exceptions=True) + next_tasks_to_run = [] + for task, result in zip(tasks_to_run, results): + task_info = task_map[task] + outfit_id = task_info["outfit_id"] - return { - "successful_outfits": successful_outfits, - "failed_outfits": failed_outfits - } + if isinstance(result, Exception): + # 任务执行中发生异常 + current_retries = task_info["retries"] + logger.error(f"Outfit {outfit_id} failed with error: {result}. Current retries: {current_retries}.") - except Exception as e: - print(f"An unexpected error occurred during concurrent recommendation: {e}") - return {"error": str(e)} + if current_retries < retry_limit: + # 尚未达到重试上限,准备重试 + task_info["retries"] += 1 + logger.info(f"--- Retrying outfit {outfit_id} (Attempt {current_retries + 1}/{retry_limit}). ---") + + # 重新创建任务 (可能需要短暂延迟,例如 time.sleep(1),但在此异步环境中,我们会通过重新创建 agent/task 来实现) + self.stylist_agent_kwages['outfit_id'] = outfit_id + agent = AsyncStylistAgent(**self.stylist_agent_kwages) + new_task = agent.run_styling_process( + request_summary=request_summary, + stylist_path=stylist_name, + start_outfit=start_outfit, + user_id=user_id, + callback_url=callback_url + ) + # 将新任务添加到下一轮运行列表,并更新任务映射 + next_tasks_to_run.append(new_task) + task_map[new_task] = task_info # 新任务继承旧任务的重试信息 + + # 清理旧任务(可选,但推荐,以防内存泄漏或混淆) + del task_map[task] + else: + # 达到重试上限,最终失败 + failed_outfits.append(f"Outfit {outfit_id} ultimately failed after {retry_limit} retries: {result}") + del task_map[task] + + else: + # 任务成功 + successful_outfits.append(result) + del task_map[task] + + # 更新任务列表,进入下一轮循环 + tasks_to_run = next_tasks_to_run + + # 如果有任务需要重试,可以在这里添加一个短暂的等待,避免瞬时重试造成服务器压力 + if tasks_to_run: + logger.info(f"--- Starting {len(tasks_to_run)} retry tasks. ---") + except Exception as e: + # 外部异常 (例如 asyncio.gather 自身的创建或调度问题,或更低级别的系统错误) + print(f"An unexpected error occurred during concurrent recommendation or retry: {e}") + # 返回当前已成功和已失败的列表,并标记外部错误 + return { + "successful_outfits": successful_outfits, + "failed_outfits": failed_outfits, + "error": str(e) + } + + return { + "successful_outfits": successful_outfits, + "failed_outfits": failed_outfits, + "error": "" + } \ No newline at end of file diff --git a/app/server/ChatbotAgent/core/stylist_agent_server.py b/app/server/ChatbotAgent/core/stylist_agent_server.py index baaf1c2..8485394 100644 --- a/app/server/ChatbotAgent/core/stylist_agent_server.py +++ b/app/server/ChatbotAgent/core/stylist_agent_server.py @@ -34,7 +34,6 @@ class AsyncStylistAgent: # 存储桶配置 try: - # TODO 目前写死路径 生产环境切换路径 self.credentials = service_account.Credentials.from_service_account_file(os.getenv("GOOGLE_APPLICATION_CREDENTIALS")) except Exception as e: # 这里的异常处理应根据实际情况调整 @@ -71,11 +70,18 @@ class AsyncStylistAgent: except Exception as e: raise Exception(f"Failed to load style guide from {path}: {e}") - def _build_system_prompt(self, request_summary: str = "") -> str: + def _build_system_prompt(self, request_summary: str = "", gender: str = "male") -> str: """Constructs the complete System Prompt.""" + + clothing_gender = "women's clothing" + if gender == "male": + clothing_gender = "men's clothing" + elif gender == "female": + clothing_gender = "women's clothing" + # Insert the style_guide content into the template template = f""" - You are a professional fashion stylist Agent, specialized in creating complete outfits for the user. + You are a professional fashion stylist Agent, specialized in creating complete, tailored outfits exclusively for {clothing_gender}. Your task is to **create a cohesive and complete outfit**, strictly adhering to **BOTH** the user's explicit **Request Summary** and the **Outfit Style Guide**. You must decide the next logical item to add to the outfit based on the currently selected items (if any). @@ -258,7 +264,7 @@ class AsyncStylistAgent: context += "\nPlease recommend the next single item based on the selected items, user's request, and style guide." return context - async def run_styling_process(self, request_summary, stylist_path, start_outfit=None, user_id="test", callback_url=""): + async def run_styling_process(self, request_summary, stylist_path, start_outfit=None, user_id="test", callback_url="", gender: str = "male"): if start_outfit is None: start_outfit = [] self.outfit_items = start_outfit if start_outfit else [] @@ -266,7 +272,7 @@ class AsyncStylistAgent: print(f"--- Starting Agent (Outfit ID: {self.outfit_id}) ---") self.style_guide = self._load_style_guide(stylist_path) - self.system_prompt = self._build_system_prompt(request_summary) + self.system_prompt = self._build_system_prompt(request_summary, gender) response_data = {"status": "", "message": "", "path": "", @@ -287,6 +293,7 @@ class AsyncStylistAgent: if item_id: response_data['items'].append({"item_id": item_id, "category": item_category}) if not gemini_data: + # if gemini_data: print("🚨 Agent 返回无效响应,终止流程。") self.stop_reason = "Agent failed to return response" response_data['status'] = "failed" @@ -299,6 +306,7 @@ class AsyncStylistAgent: self.stop_reason = "Finish reason: " + gemini_data.get('reason', 'No reason provided') response_data['status'] = "stop" response_data['message'] = self.stop_reason + break # 4. 处理推荐单品 if gemini_data.get('action') == 'recommend_item': @@ -355,6 +363,7 @@ class AsyncStylistAgent: self.stop_reason = "Finish reason: Reached max outfit length." response_data['status'] = "stop" response_data['message'] = self.stop_reason + break logger.info(f"request data :{response_data}") headers = { @@ -366,25 +375,9 @@ class AsyncStylistAgent: } url = f'{callback_url}/api/style/callback' response = post_request(url=url, data=json.dumps(response_data), headers=headers) - logger.info(response.text) + logger.info(f"JAVA callback info -> status:{response.status_code} | message:{response.text}") return response_data - # def _save_outfit_results(self, user_id): - # """保存最终的 JSON 列表和图片到指定文件夹。""" - # if not self.outfit_items: - # raise ValueError("No outfit items to save.") - # - # # 1. 保存 JSON 文件 - # results_list = [{'item_id': item['item_id'], 'category': item['category'], 'description': item['description'], 'gpt_description': item['gpt_description']} for item in self.outfit_items] - # results_list.append({'stop_reason': self.stop_reason}) - # - # return upload_json_to_minio_sync( - # minio_client=minio_client, - # bucket_name=f"lanecarford", - # object_name=f"lc_stylist_agent_outfit_items/{user_id}/{uuid.uuid4()}.json", - # data=results_list - # ) - def _upload_to_gcs(self, bucket_name: str, blob_name: str, mime_type, image_bytes) -> str: """同步方法:将文件上传到 GCS 并返回 GCS URI。""" bucket = self.gcs_client.bucket(bucket_name) @@ -396,40 +389,3 @@ class AsyncStylistAgent: gcs_uri = f"gs://{bucket_name}/{blob_name}" return gcs_uri - - async def recommend_outfit(self, request_summary: str, stylist_name: str, start_outfit: List[Dict[str, str]] = [], num_outfits: int = 1): - """ - 基于用户的对话历史和需求,推荐一套搭配。 - - Args: - request_summary: 用户的request - start_outfit: 可选的初始搭配列表,每个元素包含 'item_id' 和 'category'。 - """ - tasks = [] - for _ in range(num_outfits): - agent = AsyncStylistAgent(**self.stylist_agent_kwages) - task = agent.run_styling_process(request_summary, stylist_name, start_outfit) - tasks.append(task) - print(f"--- Starting {num_outfits} concurrent outfit generation tasks. ---") - - try: - results = await asyncio.gather(*tasks, return_exceptions=True) - - successful_outfits = [] - failed_outfits = [] - for result in results: - if isinstance(result, Exception): - # 任务执行中发生异常 - failed_outfits.append(f"Failed: {result}") - else: - # 任务成功,result 是 run_styling_process 返回的图片路径 - successful_outfits.append(result) - - return { - "successful_outfits": successful_outfits, - "failed_outfits": failed_outfits - } - - except Exception as e: - print(f"An unexpected error occurred during concurrent recommendation: {e}") - return {"error": str(e)}