From 540d2a4e2804f62e33ecaa31e53135a8cc0ca254 Mon Sep 17 00:00:00 2001 From: xupei Date: Thu, 18 Apr 2024 18:34:44 +0800 Subject: [PATCH] =?UTF-8?q?generate=20bug=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../da/common/RabbitMQ/GenerateConsumer.java | 50 ++++++++++++------- .../ai/da/common/RabbitMQ/MQPublisher.java | 2 +- .../da/service/impl/GenerateServiceImpl.java | 7 ++- 3 files changed, 37 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/ai/da/common/RabbitMQ/GenerateConsumer.java b/src/main/java/com/ai/da/common/RabbitMQ/GenerateConsumer.java index 9e522949..ca1f512b 100644 --- a/src/main/java/com/ai/da/common/RabbitMQ/GenerateConsumer.java +++ b/src/main/java/com/ai/da/common/RabbitMQ/GenerateConsumer.java @@ -1,6 +1,7 @@ package com.ai.da.common.RabbitMQ; import com.ai.da.common.config.exception.BusinessException; +import com.ai.da.common.constant.CommonConstant; import com.ai.da.common.utils.RedisUtil; import com.ai.da.model.dto.GenerateThroughImageTextDTO; import com.ai.da.model.vo.GenerateResultVO; @@ -109,26 +110,37 @@ public class GenerateConsumer { long start = System.currentTimeMillis(); Map generateResult = JSONObject.parseObject(msg.getBody(), Map.class); - // 2.1 手动确认该消息 - try { - channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); - } catch (IOException e) { - throw new RuntimeException(e); - } - log.info("tasks_id : {} start ",generateResult.get("tasks_id")); -// log.info("tasks_id : {}, message : {}",generateResult.get("tasks_id"), generateResult.get("message") ); - if (generateResult.get("status").equals("SUCCESS")){ - String url = generateResult.get("data"); - String taskId = generateResult.get("tasks_id"); -// generateService.processGenerateResult(taskId, url); - }else { - // 修改redis中的数据状态为exception - String key = generateResultKey + ":" + generateResult.get("tasks_id"); - Long expire = redisUtil.getExpire(key); - redisUtil.addToString(key, new Gson().toJson(new GenerateResultVO(generateResult.get("tasks_id"), null, null, "Fail")), expire); - // 将异常信息存到exception中 + + try{ + log.info("tasks_id : {} start ",generateResult.get("tasks_id")); + if (generateResult.get("status").equals("SUCCESS")){ + String url = generateResult.get("data"); + String taskId = generateResult.get("tasks_id"); + generateService.processGenerateResult(taskId, url); + }else { + // 修改redis中的数据状态为exception + String key = generateResultKey + ":" + generateResult.get("tasks_id"); + redisUtil.addToString(key, new Gson().toJson(new GenerateResultVO(generateResult.get("tasks_id"), null, null, "Fail")), CommonConstant.GENERATE_RESULT_EXPIRE_TIME); + // 将异常信息存到exception中 + HashMap exceptionInfo = new HashMap<>(); + exceptionInfo.put(generateResult.get("tasks_id"), generateResult.get("data")); + // 存redis + redisUtil.addToMap(exceptionMapKey, exceptionInfo); + } + }catch (Exception e){ + log.error(e.getMessage()); + try { + channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); + // 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除 + redisUtil.removeFromZSet(consumptionOrderKey, generateResult.get("tasks_id")); + } catch (IOException exception) { + log.error("手动确认,取消返回队列,不再重新消费"); + } + // 将入参和错误信息存入数据库 + String exceptionMessage = JSONObject.toJSONString(generateResult) + + " Exception message : " + e.getMessage(); HashMap exceptionInfo = new HashMap<>(); - exceptionInfo.put(generateResult.get("tasks_id"), generateResult.get("data")); + exceptionInfo.put(String.valueOf(generateResult.get("tasks_id")), exceptionMessage); // 存redis redisUtil.addToMap(exceptionMapKey, exceptionInfo); } diff --git a/src/main/java/com/ai/da/common/RabbitMQ/MQPublisher.java b/src/main/java/com/ai/da/common/RabbitMQ/MQPublisher.java index 8afffa29..5cffa019 100644 --- a/src/main/java/com/ai/da/common/RabbitMQ/MQPublisher.java +++ b/src/main/java/com/ai/da/common/RabbitMQ/MQPublisher.java @@ -10,7 +10,7 @@ import javax.annotation.Resource; @Component public class MQPublisher { - private final String url = "http://localhost:15672/api/queues/%2f/generate-queue"; +// private final String url = "http://localhost:15672/api/queues/%2f/generate-queue"; @Resource private AmqpTemplate amqpTemplate; diff --git a/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java b/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java index 68daf1bb..6b26bea4 100644 --- a/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java +++ b/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java @@ -222,9 +222,8 @@ public class GenerateServiceImpl extends ServiceImpl i generateDetailMapper.insert(generateDetail); String key = generateResultKey + ":" + taskId; - Long expire = redisUtil.getExpire(key); GenerateResultVO generateResultVO = new GenerateResultVO(taskId, generateDetail.getId(), url, "Success"); - redisUtil.addToString(key, new Gson().toJson(generateResultVO), expire); + redisUtil.addToString(key, new Gson().toJson(generateResultVO), CommonConstant.GENERATE_RESULT_EXPIRE_TIME); } private void validateGeneraType(Generate generate, String text, Long elementId, String generateType) { @@ -527,6 +526,7 @@ public class GenerateServiceImpl extends ServiceImpl i @Override @Transactional(rollbackFor = Exception.class) public void cancelGenerate(Long userId, List uniqueIdList, String timeZone) { + // todo 取消待优化 uniqueIdList.forEach(uniqueId -> { // 1、确认当前消息是否还在排队中 Boolean exists = redisUtil.isElementExistsInZSet(consumptionOrderKey, uniqueId); @@ -550,6 +550,9 @@ public class GenerateServiceImpl extends ServiceImpl i pythonService.cancelGenerateTask(uniqueId); } } + String key = generateResultKey + ":" + uniqueId; + redisUtil.addToString(key, new Gson().toJson(new GenerateResultVO(uniqueId, null, null, "Cancelled")), CommonConstant.GENERATE_RESULT_EXPIRE_TIME); + // 3、考虑加一张表,专门用于记录哪些用户在什么时间进行了取消操作,包括已经异常的请求 GenerateCancel generateCancel = new GenerateCancel(userId, uniqueId, DateUtil.getByTimeZone(timeZone)); generateCancelMapper.insert(generateCancel);