fix:修复任务失败不进入重试的逻辑。feat:优化快慢任务创建逻辑

This commit is contained in:
zcr
2025-12-29 17:45:53 +08:00
parent 042e6015f0
commit feb431e9c1
2 changed files with 71 additions and 59 deletions

View File

@@ -195,33 +195,26 @@ class LCAgent(ls.LitAPI):
task_map = {} task_map = {}
stylist_agent_kwages = self.stylist_agent_kwages.copy() stylist_agent_kwages = self.stylist_agent_kwages.copy()
tasks_mapping = {}
if num_outfits == 1: if num_outfits == 1:
logger.info(f"fast request outfit_id is : {outfit_ids[0]}") tasks_mapping[outfit_ids[0]] = "fast"
# 通过请求数量判断 num == 1 单个outfit刷新 else:
stylist_agent_kwages['outfit_id'] = outfit_ids[0]
stylist_agent_kwages['stylist_name'] = stylist_name
stylist_agent_kwages['gender'] = gender
agent = AsyncStylistAgent(**stylist_agent_kwages)
task = agent.run_quick_batch_styling(
request_summary=request_summary,
occasions=occasions,
start_outfit=start_outfit,
batch_sources=batch_sources,
user_id=user_id,
callback_url=callback_url,
)
tasks.append(task)
task_map[task] = {"outfit_id": outfit_ids[0], "retries": 0}
elif num_outfits > 1:
# 通过请求数量判断 num > 1 四套搭配推荐 (1快 , num-1慢)
for i in range(num_outfits): for i in range(num_outfits):
stylist_agent_kwages['outfit_id'] = outfit_ids[i] if i == 0:
tasks_mapping[outfit_ids[i]] = "fast"
else:
tasks_mapping[outfit_ids[i]] = "slow"
for k, v in tasks_mapping.items():
logger.info(f"fast request outfit_id is : {k}")
# 通过请求数量判断 num == 1 单个outfit刷新
stylist_agent_kwages['outfit_id'] = k
stylist_agent_kwages['stylist_name'] = stylist_name stylist_agent_kwages['stylist_name'] = stylist_name
stylist_agent_kwages['gender'] = gender stylist_agent_kwages['gender'] = gender
stylist_agent_kwages['callback_url'] = callback_url
agent = AsyncStylistAgent(**stylist_agent_kwages) agent = AsyncStylistAgent(**stylist_agent_kwages)
if i == 0: if v == "fast":
# 第一套搭配使用快速方法 一次跑出所有单品
logger.info(f"fast request outfit_id is : {outfit_ids[i]}")
task = agent.run_quick_batch_styling( task = agent.run_quick_batch_styling(
request_summary=request_summary, request_summary=request_summary,
occasions=occasions, occasions=occasions,
@@ -231,7 +224,6 @@ class LCAgent(ls.LitAPI):
callback_url=callback_url, callback_url=callback_url,
) )
else: else:
# 后续
task = agent.run_iterative_styling( task = agent.run_iterative_styling(
request_summary=request_summary, request_summary=request_summary,
occasions=occasions, occasions=occasions,
@@ -241,7 +233,7 @@ class LCAgent(ls.LitAPI):
callback_url=callback_url, callback_url=callback_url,
) )
tasks.append(task) tasks.append(task)
task_map[task] = {"outfit_id": outfit_ids[i], "retries": 0} task_map[task] = {"outfit_id": k, "retries": 0}
logger.info(f"--- Starting {num_outfits} concurrent outfit generation tasks. ---") logger.info(f"--- Starting {num_outfits} concurrent outfit generation tasks. ---")
# 2. 任务执行与重试循环 # 2. 任务执行与重试循环
@@ -252,7 +244,8 @@ class LCAgent(ls.LitAPI):
retry_limit = 1 # 允许重试一次 retry_limit = 1 # 允许重试一次
while tasks_to_run: while tasks_to_run:
try: try:
results = await asyncio.gather(*tasks, return_exceptions=True) results = await asyncio.gather(*tasks_to_run, return_exceptions=True)
next_tasks_to_run = [] next_tasks_to_run = []
for task, result in zip(tasks_to_run, results): for task, result in zip(tasks_to_run, results):
task_info = task_map[task] task_info = task_map[task]
@@ -279,8 +272,20 @@ class LCAgent(ls.LitAPI):
stylist_agent_kwages['outfit_id'] = outfit_id stylist_agent_kwages['outfit_id'] = outfit_id
stylist_agent_kwages['stylist_name'] = stylist_name stylist_agent_kwages['stylist_name'] = stylist_name
stylist_agent_kwages['gender'] = gender stylist_agent_kwages['gender'] = gender
stylist_agent_kwages['callback_url'] = callback_url
agent = AsyncStylistAgent(**stylist_agent_kwages) agent = AsyncStylistAgent(**stylist_agent_kwages)
if tasks_mapping[outfit_id] == "fast":
new_task = agent.run_quick_batch_styling( new_task = agent.run_quick_batch_styling(
request_summary=request_summary,
occasions=occasions,
start_outfit=start_outfit,
batch_sources=batch_sources,
user_id=user_id,
callback_url=callback_url,
)
else:
new_task = agent.run_iterative_styling(
request_summary=request_summary, request_summary=request_summary,
occasions=occasions, occasions=occasions,
start_outfit=start_outfit, start_outfit=start_outfit,
@@ -298,11 +303,12 @@ class LCAgent(ls.LitAPI):
# 达到重试上限,最终失败 并通知前端 # 达到重试上限,最终失败 并通知前端
object_data = { object_data = {
'outfit_id': outfit_id, 'outfit_id': outfit_id,
"status": "failed", "status": "retry_failed",
"path": "", "path": "",
} }
post_request(url=f'{callback_url}/api/style/callback', data=json.dumps(object_data)) response = post_request(url=f'{callback_url}/api/style/callback', data=json.dumps(object_data))
failed_outfits.append(f"Outfit {outfit_id} ultimately failed after {retry_limit} retries: {result}") failed_outfits.append(f"Outfit {outfit_id} ultimately failed after {retry_limit} retries: {result}")
logger.info(f"request data {json.dumps(object_data, ensure_ascii=False, indent=2)} | JAVA callback info -> status:{response.status_code} | message:{response.text}")
del task_map[task] del task_map[task]
else: else:

View File

@@ -27,7 +27,7 @@ logger = logging.getLogger(__name__)
class AsyncStylistAgent: class AsyncStylistAgent:
def __init__(self, local_db: str, gemini_model_name: str, outfit_id: str, stylist_name: str, gender: str): def __init__(self, local_db: str, gemini_model_name: str, outfit_id: str, stylist_name: str, gender: str, callback_url: str):
# self.outfit_items: List[Dict[str, str]] = [] # self.outfit_items: List[Dict[str, str]] = []
self.outfit_id = outfit_id self.outfit_id = outfit_id
self.stylist_name = stylist_name self.stylist_name = stylist_name
@@ -52,6 +52,7 @@ class AsyncStylistAgent:
) )
self.gcs_bucket = "lc_stylist_agent_outfit_items" self.gcs_bucket = "lc_stylist_agent_outfit_items"
self.minio_bucket = "lanecarford" self.minio_bucket = "lanecarford"
self.callback_url = f'{callback_url}/api/style/callback'
def _load_style_guide(self, stylist_name: str): def _load_style_guide(self, stylist_name: str):
"""加载 markdown 风格指南内容。""" """加载 markdown 风格指南内容。"""
@@ -182,8 +183,13 @@ class AsyncStylistAgent:
results = [] results = []
if not results: if not results:
print(f"数据库中未找到符合 '{category}' 和描述的单品。") self.post_operation(
return None status="failed",
message=f"数据库中未找到符合 '{category}' 和描述的单品。",
callback_url=self.callback_url,
img_path="",
)
raise Exception(f"数据库中未找到符合 '{category}' 和描述的单品。")
# 3. 模拟 Agent 审核(实际应用中,你需要将图片发回给 Agent进行审核) # 3. 模拟 Agent 审核(实际应用中,你需要将图片发回给 Agent进行审核)
best_meta = results[0] # 第一个 batch 的第一个 metadata best_meta = results[0] # 第一个 batch 的第一个 metadata
@@ -295,14 +301,13 @@ class AsyncStylistAgent:
gemini_data = self._parse_gemini_response(gemini_response_text) gemini_data = self._parse_gemini_response(gemini_response_text)
if not gemini_data: if not gemini_data:
print("Agent 返回无效响应,终止流程。")
self.post_operation( self.post_operation(
status="failed", status="failed",
message="Agent returned invalid response, terminating process.", message="Agent returned invalid response, terminating process.",
callback_url=url, callback_url=url,
img_path=merged_image_path, img_path=merged_image_path,
) )
break raise Exception("Agent 返回无效响应,终止流程。")
# 处理推荐单品 # 处理推荐单品
if gemini_data.get('action') == 'recommend_item': if gemini_data.get('action') == 'recommend_item':
@@ -385,13 +390,14 @@ class AsyncStylistAgent:
failed_found_item_count = 0 failed_found_item_count = 0
if not recommended_items or not isinstance(recommended_items, List): if not recommended_items or not isinstance(recommended_items, List):
print("No recommended item from Gemini, terminating process.")
self.post_operation( self.post_operation(
status="failed", status="failed",
message="Agent returned invalid response, terminating process.", message="Agent returned invalid response, terminating process.",
callback_url=url, callback_url=url,
img_path=merged_image_path img_path=merged_image_path
) )
raise Exception("No recommended item from Gemini, terminating process.")
else: else:
for idx, rec_item in enumerate(recommended_items): for idx, rec_item in enumerate(recommended_items):
subcategory = rec_item.get('subcategory') subcategory = rec_item.get('subcategory')
@@ -428,7 +434,7 @@ class AsyncStylistAgent:
callback_url=url, callback_url=url,
img_path=merged_image_path img_path=merged_image_path
) )
print(f"There are {failed_found_item_count} items (total {len(recommended_items)}) are not found in the database") raise Exception(f"There are {failed_found_item_count} items (total {len(recommended_items)}) are not found in the database")
return reason return reason
async def run_iterative_styling(self, request_summary, occasions, start_outfit: Optional[List] = None, batch_sources: List = [], user_id="test", callback_url=""): async def run_iterative_styling(self, request_summary, occasions, start_outfit: Optional[List] = None, batch_sources: List = [], user_id="test", callback_url=""):