BUGFIX:消息监听,消息解析失败,重复消费异常消息导致日志文件内容骤增

This commit is contained in:
2025-06-24 16:11:53 +08:00
parent a809905c52
commit 6f44489c6e

View File

@@ -71,9 +71,9 @@ public class GenerateConsumer {
public void generate(Message msg, Channel channel, String consumerName) { public void generate(Message msg, Channel channel, String consumerName) {
log.info("============start listening=========="); log.info("============start listening==========");
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
Map<String, String> resp = jsonBytesToMap(msg, channel);
GenerateThroughImageTextDTO generateThroughImageTextDTO = JSONObject.parseObject(msg.getBody(), GenerateThroughImageTextDTO.class); String uniqueId = resp.get("tasks_id");
String uniqueId = generateThroughImageTextDTO.getUniqueId(); // String uniqueId = generateThroughImageTextDTO.getUniqueId();
log.info("From " + consumerName + " : " + uniqueId); log.info("From " + consumerName + " : " + uniqueId);
try { try {
@@ -87,6 +87,7 @@ public class GenerateConsumer {
log.error("手动确认,不返回队列重新消费"); log.error("手动确认,不返回队列重新消费");
} }
} else { } else {
GenerateThroughImageTextDTO generateThroughImageTextDTO = JSONObject.parseObject(msg.getBody(), GenerateThroughImageTextDTO.class);
generateService.generateThroughImageText(generateThroughImageTextDTO); generateService.generateThroughImageText(generateThroughImageTextDTO);
// 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除 // 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除
redisUtil.removeFromZSet(consumptionOrderKey, uniqueId); redisUtil.removeFromZSet(consumptionOrderKey, uniqueId);
@@ -101,13 +102,13 @@ public class GenerateConsumer {
// 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除 // 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除
redisUtil.removeFromZSet(consumptionOrderKey, uniqueId); redisUtil.removeFromZSet(consumptionOrderKey, uniqueId);
String key = generateResultKey + ":" + uniqueId; String key = generateResultKey + ":" + uniqueId;
GenerateResultVO generateResultVO = new GenerateResultVO(generateThroughImageTextDTO.getUniqueId(), null, null, "Fail"); GenerateResultVO generateResultVO = new GenerateResultVO(uniqueId, null, null, "Fail");
redisUtil.addToString(key, new Gson().toJson(generateResultVO), CommonConstant.GENERATE_RESULT_EXPIRE_TIME); redisUtil.addToString(key, new Gson().toJson(generateResultVO), CommonConstant.GENERATE_RESULT_EXPIRE_TIME);
} catch (IOException exception) { } catch (IOException exception) {
log.error("手动确认,取消返回队列,不再重新消费"); log.error("手动确认,取消返回队列,不再重新消费");
} }
// 将入参和错误信息存入数据库 // 将入参和错误信息存入数据库
String exceptionMessage = JSONObject.toJSONString(generateThroughImageTextDTO) + String exceptionMessage = JSONObject.toJSONString(resp) +
" Exception message " + e.getMessage(); " Exception message " + e.getMessage();
HashMap<String, String> exceptionInfo = new HashMap<>(); HashMap<String, String> exceptionInfo = new HashMap<>();
exceptionInfo.put(String.valueOf(uniqueId), exceptionMessage); exceptionInfo.put(String.valueOf(uniqueId), exceptionMessage);
@@ -124,7 +125,7 @@ public class GenerateConsumer {
log.info("============ProcessGenerateResult listening=========="); log.info("============ProcessGenerateResult listening==========");
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
Map<String, String> generateResult = JSONObject.parseObject(msg.getBody(), Map.class); Map<String, String> generateResult = jsonBytesToMap(msg, channel);
log.info("generate response : {}", generateResult); log.info("generate response : {}", generateResult);
try { try {
@@ -172,7 +173,7 @@ public class GenerateConsumer {
log.info("============processToProductImageResult listening=========="); log.info("============processToProductImageResult listening==========");
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
Map<String, String> generateResult = JSONObject.parseObject(msg.getBody(), Map.class); Map<String, String> generateResult = jsonBytesToMap(msg, channel);
log.info("toProductImage response : {}", generateResult); log.info("toProductImage response : {}", generateResult);
try { try {
@@ -222,7 +223,7 @@ public class GenerateConsumer {
log.info("============processRelightResult listening=========="); log.info("============processRelightResult listening==========");
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
Map<String, String> generateResult = JSONObject.parseObject(msg.getBody(), Map.class); Map<String, String> generateResult = jsonBytesToMap(msg, channel);
log.info("toProductImage response : {}", generateResult); log.info("toProductImage response : {}", generateResult);
try { try {