TASK:aida;

This commit is contained in:
shahaibo
2024-06-12 09:47:55 +08:00
parent b1b4b4147c
commit 3a29d25060
51 changed files with 1114 additions and 193 deletions

View File

@@ -44,6 +44,8 @@ public class GenerateConsumer {
@Value("${redis.key.generateResult}")
private String generateResultKey;
@Value("${redis.key.toProductImageResultKey}")
private String toProductImageResultKey;
public void generate(Message msg, Channel channel, String consumerName) {
log.info("============start listening==========");
@@ -154,6 +156,54 @@ public class GenerateConsumer {
}
public void processToProductImageResult(Message msg, Channel channel) {
log.info("============processToProductImageResult listening==========");
long start = System.currentTimeMillis();
Map<String, String> generateResult = JSONObject.parseObject(msg.getBody(), Map.class);
log.info("toProductImage response : {}", generateResult);
try {
log.info("tasks_id : {} start ", generateResult.get("tasks_id"));
if (generateResult.get("status").equals("SUCCESS")) {
String url = generateResult.get("image_url");
String taskId = generateResult.get("tasks_id");
String category = generateResult.get("category");
generateService.processToProductImageResult(taskId, url, category);
} else {
// 修改redis中的数据状态为exception
String key = toProductImageResultKey + ":" + generateResult.get("tasks_id");
redisUtil.addToString(key, new Gson().toJson(new GenerateResultVO(generateResult.get("tasks_id"), null, null, "Fail")), CommonConstant.GENERATE_RESULT_EXPIRE_TIME);
// 将异常信息存到exception中
HashMap<String, String> exceptionInfo = new HashMap<>();
exceptionInfo.put(generateResult.get("tasks_id"), generateResult.get("data"));
// 存redis
redisUtil.addToMap(exceptionMapKey, exceptionInfo);
}
} catch (Exception e) {
log.error(e.getMessage());
try {
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
// 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除
redisUtil.removeFromZSet(consumptionOrderKey, generateResult.get("tasks_id"));
} catch (IOException exception) {
log.error("手动确认,取消返回队列,不再重新消费");
}
// 将入参和错误信息存入数据库
String exceptionMessage = JSONObject.toJSONString(generateResult) +
" Exception message " + e.getMessage();
HashMap<String, String> exceptionInfo = new HashMap<>();
exceptionInfo.put(String.valueOf(generateResult.get("tasks_id")), exceptionMessage);
// 存redis
redisUtil.addToMap(exceptionMapKey, exceptionInfo);
}
long end = System.currentTimeMillis();
log.info("tasks_id : {}, end , message : {}, 执行时长: {} 毫秒", generateResult.get("tasks_id"), generateResult.get("message"), (end - start));
log.info("============ProcessGenerateResult End listening==========");
}
// @RabbitListener(queues = MQConfig.GENERATE_QUEUE)
// @RabbitHandler
// public void generateConsumer1(Message msg, Channel channel) {
@@ -213,4 +263,10 @@ public class GenerateConsumer {
// public void getGenerateResult(Message msg, Channel channel) {
// processGenerateResult(msg, channel);
// }
@RabbitListener(queues = MQConfig.TO_PRODUCT_IMAGE_RESULT_QUEUE)
@RabbitHandler
public void getToProductImageResult(Message msg, Channel channel) {
processToProductImageResult(msg, channel);
}
}

View File

@@ -23,6 +23,7 @@ public class MQConfig {
// public static final String GENERATE_RESULT_QUEUE = "GenerateImage-local";
public static final String GENERATE_RESULT_QUEUE = "GenerateImage-prod";
public static final String TO_PRODUCT_IMAGE_RESULT_QUEUE = "ToProductImage-local";
public MQConfig() {
}