1.搭配超出最大值退出逻辑修复

2.新增异常重试机制
This commit is contained in:
zhh
2025-10-31 10:47:02 +08:00
parent 9cde3cb920
commit e195372bef
2 changed files with 93 additions and 83 deletions

View File

@@ -20,6 +20,7 @@ class AgentRequestModel(BaseModel):
num_outfits: int
stylist_path: str
callback_url: str
gender: str
class LCAgent(ls.LitAPI):
@@ -78,13 +79,13 @@ class LCAgent(ls.LitAPI):
start_outfit=[],
num_outfits=request.num_outfits,
user_id=request.user_id,
gender=request.gender,
callback_url=request.callback_url)
logger.info("\n--- Final Recommendation Results ---")
for i, path in enumerate(recommendation_results.get("successful_outfits", [])):
logger.info(f"✅ Outfit {i + 1} saved to: {path}")
for error in recommendation_results.get("failed_outfits", []):
logger.error(f"{error}")
for failed in recommendation_results.get("failed_outfits", []):
logger.error(f"{failed}")
async def get_conversation_summary(self, user_id: str) -> str:
"""
@@ -100,7 +101,7 @@ class LCAgent(ls.LitAPI):
return summary
async def recommend_outfit(self, request_summary: str, stylist_name: str, start_outfit=None, num_outfits: int = 1,
user_id: str = "test", callback_url: str = None):
user_id: str = "test", gender: str = "male", callback_url: str = None):
"""
基于用户的对话历史和需求,推荐一套搭配。
@@ -111,6 +112,7 @@ class LCAgent(ls.LitAPI):
if start_outfit is None:
start_outfit = []
tasks = []
task_map = {}
for i in range(num_outfits):
self.stylist_agent_kwages['outfit_id'] = self.outfit_ids[i]
agent = AsyncStylistAgent(**self.stylist_agent_kwages)
@@ -119,29 +121,81 @@ class LCAgent(ls.LitAPI):
stylist_path=stylist_name,
start_outfit=start_outfit,
user_id=user_id,
callback_url=callback_url
callback_url=callback_url,
gender=gender,
)
tasks.append(task)
print(f"--- Starting {num_outfits} concurrent outfit generation tasks. ---")
task_map[task] = {"outfit_id": self.outfit_ids[i], "retries": 0}
logger.info(f"--- Starting {num_outfits} concurrent outfit generation tasks. ---")
try:
results = await asyncio.gather(*tasks, return_exceptions=True)
# 2. 任务执行与重试循环
tasks_to_run = tasks
successful_outfits = []
failed_outfits = []
for result in results:
retry_limit = 1 # 允许重试一次
while tasks_to_run:
try:
results = await asyncio.gather(*tasks, return_exceptions=True)
next_tasks_to_run = []
for task, result in zip(tasks_to_run, results):
task_info = task_map[task]
outfit_id = task_info["outfit_id"]
if isinstance(result, Exception):
# 任务执行中发生异常
failed_outfits.append(f"Failed: {result}")
current_retries = task_info["retries"]
logger.error(f"Outfit {outfit_id} failed with error: {result}. Current retries: {current_retries}.")
if current_retries < retry_limit:
# 尚未达到重试上限,准备重试
task_info["retries"] += 1
logger.info(f"--- Retrying outfit {outfit_id} (Attempt {current_retries + 1}/{retry_limit}). ---")
# 重新创建任务 (可能需要短暂延迟,例如 time.sleep(1),但在此异步环境中,我们会通过重新创建 agent/task 来实现)
self.stylist_agent_kwages['outfit_id'] = outfit_id
agent = AsyncStylistAgent(**self.stylist_agent_kwages)
new_task = agent.run_styling_process(
request_summary=request_summary,
stylist_path=stylist_name,
start_outfit=start_outfit,
user_id=user_id,
callback_url=callback_url
)
# 将新任务添加到下一轮运行列表,并更新任务映射
next_tasks_to_run.append(new_task)
task_map[new_task] = task_info # 新任务继承旧任务的重试信息
# 清理旧任务(可选,但推荐,以防内存泄漏或混淆)
del task_map[task]
else:
# 任务成功result 是 run_styling_process 返回的图片路径
# 达到重试上限,最终失败
failed_outfits.append(f"Outfit {outfit_id} ultimately failed after {retry_limit} retries: {result}")
del task_map[task]
else:
# 任务成功
successful_outfits.append(result)
del task_map[task]
# 更新任务列表,进入下一轮循环
tasks_to_run = next_tasks_to_run
# 如果有任务需要重试,可以在这里添加一个短暂的等待,避免瞬时重试造成服务器压力
if tasks_to_run:
logger.info(f"--- Starting {len(tasks_to_run)} retry tasks. ---")
except Exception as e:
# 外部异常 (例如 asyncio.gather 自身的创建或调度问题,或更低级别的系统错误)
print(f"An unexpected error occurred during concurrent recommendation or retry: {e}")
# 返回当前已成功和已失败的列表,并标记外部错误
return {
"successful_outfits": successful_outfits,
"failed_outfits": failed_outfits,
"error": str(e)
}
return {
"successful_outfits": successful_outfits,
"failed_outfits": failed_outfits
"failed_outfits": failed_outfits,
"error": ""
}
except Exception as e:
print(f"An unexpected error occurred during concurrent recommendation: {e}")
return {"error": str(e)}

View File

@@ -34,7 +34,6 @@ class AsyncStylistAgent:
# 存储桶配置
try:
# TODO 目前写死路径 生产环境切换路径
self.credentials = service_account.Credentials.from_service_account_file(os.getenv("GOOGLE_APPLICATION_CREDENTIALS"))
except Exception as e:
# 这里的异常处理应根据实际情况调整
@@ -71,11 +70,18 @@ class AsyncStylistAgent:
except Exception as e:
raise Exception(f"Failed to load style guide from {path}: {e}")
def _build_system_prompt(self, request_summary: str = "") -> str:
def _build_system_prompt(self, request_summary: str = "", gender: str = "male") -> str:
"""Constructs the complete System Prompt."""
clothing_gender = "women's clothing"
if gender == "male":
clothing_gender = "men's clothing"
elif gender == "female":
clothing_gender = "women's clothing"
# Insert the style_guide content into the template
template = f"""
You are a professional fashion stylist Agent, specialized in creating complete outfits for the user.
You are a professional fashion stylist Agent, specialized in creating complete, tailored outfits exclusively for {clothing_gender}.
Your task is to **create a cohesive and complete outfit**, strictly adhering to **BOTH** the user's explicit **Request Summary** and the **Outfit Style Guide**. You must decide the next logical item to add to the outfit based on the currently selected items (if any).
@@ -258,7 +264,7 @@ class AsyncStylistAgent:
context += "\nPlease recommend the next single item based on the selected items, user's request, and style guide."
return context
async def run_styling_process(self, request_summary, stylist_path, start_outfit=None, user_id="test", callback_url=""):
async def run_styling_process(self, request_summary, stylist_path, start_outfit=None, user_id="test", callback_url="", gender: str = "male"):
if start_outfit is None:
start_outfit = []
self.outfit_items = start_outfit if start_outfit else []
@@ -266,7 +272,7 @@ class AsyncStylistAgent:
print(f"--- Starting Agent (Outfit ID: {self.outfit_id}) ---")
self.style_guide = self._load_style_guide(stylist_path)
self.system_prompt = self._build_system_prompt(request_summary)
self.system_prompt = self._build_system_prompt(request_summary, gender)
response_data = {"status": "",
"message": "",
"path": "",
@@ -287,6 +293,7 @@ class AsyncStylistAgent:
if item_id:
response_data['items'].append({"item_id": item_id, "category": item_category})
if not gemini_data:
# if gemini_data:
print("🚨 Agent 返回无效响应,终止流程。")
self.stop_reason = "Agent failed to return response"
response_data['status'] = "failed"
@@ -299,6 +306,7 @@ class AsyncStylistAgent:
self.stop_reason = "Finish reason: " + gemini_data.get('reason', 'No reason provided')
response_data['status'] = "stop"
response_data['message'] = self.stop_reason
break
# 4. 处理推荐单品
if gemini_data.get('action') == 'recommend_item':
@@ -355,6 +363,7 @@ class AsyncStylistAgent:
self.stop_reason = "Finish reason: Reached max outfit length."
response_data['status'] = "stop"
response_data['message'] = self.stop_reason
break
logger.info(f"request data {response_data}")
headers = {
@@ -366,25 +375,9 @@ class AsyncStylistAgent:
}
url = f'{callback_url}/api/style/callback'
response = post_request(url=url, data=json.dumps(response_data), headers=headers)
logger.info(response.text)
logger.info(f"JAVA callback info -> status:{response.status_code} | message:{response.text}")
return response_data
# def _save_outfit_results(self, user_id):
# """保存最终的 JSON 列表和图片到指定文件夹。"""
# if not self.outfit_items:
# raise ValueError("No outfit items to save.")
#
# # 1. 保存 JSON 文件
# results_list = [{'item_id': item['item_id'], 'category': item['category'], 'description': item['description'], 'gpt_description': item['gpt_description']} for item in self.outfit_items]
# results_list.append({'stop_reason': self.stop_reason})
#
# return upload_json_to_minio_sync(
# minio_client=minio_client,
# bucket_name=f"lanecarford",
# object_name=f"lc_stylist_agent_outfit_items/{user_id}/{uuid.uuid4()}.json",
# data=results_list
# )
def _upload_to_gcs(self, bucket_name: str, blob_name: str, mime_type, image_bytes) -> str:
"""同步方法:将文件上传到 GCS 并返回 GCS URI。"""
bucket = self.gcs_client.bucket(bucket_name)
@@ -396,40 +389,3 @@ class AsyncStylistAgent:
gcs_uri = f"gs://{bucket_name}/{blob_name}"
return gcs_uri
async def recommend_outfit(self, request_summary: str, stylist_name: str, start_outfit: List[Dict[str, str]] = [], num_outfits: int = 1):
"""
基于用户的对话历史和需求,推荐一套搭配。
Args:
request_summary: 用户的request
start_outfit: 可选的初始搭配列表,每个元素包含 'item_id''category'
"""
tasks = []
for _ in range(num_outfits):
agent = AsyncStylistAgent(**self.stylist_agent_kwages)
task = agent.run_styling_process(request_summary, stylist_name, start_outfit)
tasks.append(task)
print(f"--- Starting {num_outfits} concurrent outfit generation tasks. ---")
try:
results = await asyncio.gather(*tasks, return_exceptions=True)
successful_outfits = []
failed_outfits = []
for result in results:
if isinstance(result, Exception):
# 任务执行中发生异常
failed_outfits.append(f"Failed: {result}")
else:
# 任务成功result 是 run_styling_process 返回的图片路径
successful_outfits.append(result)
return {
"successful_outfits": successful_outfits,
"failed_outfits": failed_outfits
}
except Exception as e:
print(f"An unexpected error occurred during concurrent recommendation: {e}")
return {"error": str(e)}