TASK:模块化;

This commit is contained in:
shahaibo
2025-04-22 15:59:08 +08:00
parent aca21b9d98
commit 604c57c208
8 changed files with 162 additions and 76 deletions

View File

@@ -386,9 +386,9 @@ public class GenerateConsumer {
String url = null;
if (!StringUtils.isEmpty(result)) {
url = result.getString(0);
String taskId = generateResult.getString("task_id");
userLikeGroupService.relightBatch(taskId, url, progress);
}
String taskId = generateResult.getString("task_id");
userLikeGroupService.relightBatch(taskId, url, progress);
} else {
// 修改redis中的数据状态为exception
String key = relightResultKey + ":" + generateResult.get("task_id");
@@ -422,6 +422,59 @@ public class GenerateConsumer {
log.info("============ProcessRelightBatchResult End listening==========");
}
private void processPoseTransformBatchResult(Message msg, Channel channel) {
log.info("============ProcessPoseTransformBatchResult listening==========");
long start = System.currentTimeMillis();
JSONObject generateResult = JSONObject.parseObject(msg.getBody(), JSONObject.class);
log.info("PoseTransformationBatch response : {}", generateResult);
try {
log.info("task_id : {} start ", generateResult.get("task_id"));
if (!StringUtils.isEmpty(generateResult.getString("progress"))) {
JSONArray result = generateResult.getJSONArray("result");
if (!StringUtils.isEmpty(result)) {
JSONObject jsonObject = result.getJSONObject(0);
String gifUrl = jsonObject.getString("gif_url");
String taskId = generateResult.getString("task_id");
String videoUrl = jsonObject.getString("video_url");
String imageUrl = jsonObject.getString("first_image_url");
String progress = generateResult.getString("progress");
generateService.processPoseTransformResultBatch(taskId, gifUrl, videoUrl, imageUrl, progress);
}
} else {
// 修改redis中的数据状态为exception
String key = generateResultKey + ":" + generateResult.getString("task_id");
redisUtil.addToString(key, new Gson().toJson(new PoseTransformationVO(null, generateResult.getString("task_id"),null, null, null, (byte)0, "Fail")), CommonConstant.GENERATE_RESULT_EXPIRE_TIME);
// 将异常信息存到exception中
HashMap<String, String> exceptionInfo = new HashMap<>();
exceptionInfo.put(generateResult.getString("task_id"), generateResult.getString("message"));
// 存redis
redisUtil.addToMap(exceptionMapKey, exceptionInfo);
}
} catch (Exception e) {
log.error(e.getMessage());
try {
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
// 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除
redisUtil.removeFromZSet(consumptionOrderKey, generateResult.getString("task_id"));
} catch (IOException exception) {
log.error("手动确认,取消返回队列,不再重新消费");
}
// 将入参和错误信息存入数据库
String exceptionMessage = JSONObject.toJSONString(generateResult) +
" Exception message " + e.getMessage();
HashMap<String, String> exceptionInfo = new HashMap<>();
exceptionInfo.put(generateResult.getString("task_id"), exceptionMessage);
// 存redis
redisUtil.addToMap(exceptionMapKey, exceptionInfo);
}
long end = System.currentTimeMillis();
log.info("tasks_id : {}, end , message : {}, 执行时长: {} 毫秒", generateResult.get("tasks_id"), generateResult.get("message"), (end - start));
log.info("============ProcessPoseTransformResult End listening==========");
}
@RabbitListener(queues = "#{rabbitMQProperties.queues.generate}")
@RabbitHandler
public void generateConsumer1(Message msg, Channel channel) {
@@ -515,4 +568,10 @@ public class GenerateConsumer {
public void getRelightBatchResult(Message msg, Channel channel) {
processRelightBatchResult(msg, channel);
}
@RabbitListener(queues = "#{rabbitMQProperties.queues.poseTransformBatch}")
@RabbitHandler
public void getPoseTransformBatchResult(Message msg, Channel channel) {
processPoseTransformBatchResult(msg, channel);
}
}

View File

@@ -24,6 +24,7 @@ public class RabbitMQProperties {
private String designBatch;
private String relightBatch;
private String toProductImageBatch;
private String poseTransformBatch;
}
@Data