diff --git a/src/main/java/com/ai/da/common/RabbitMQ/GenerateConsumer.java b/src/main/java/com/ai/da/common/RabbitMQ/GenerateConsumer.java index ca1f512b..0420b6ff 100644 --- a/src/main/java/com/ai/da/common/RabbitMQ/GenerateConsumer.java +++ b/src/main/java/com/ai/da/common/RabbitMQ/GenerateConsumer.java @@ -105,19 +105,20 @@ public class GenerateConsumer { log.info("=============end listening==========="); } - public void processGenerateResult(Message msg, Channel channel){ + public void processGenerateResult(Message msg, Channel channel) { log.info("============ProcessGenerateResult listening=========="); long start = System.currentTimeMillis(); Map generateResult = JSONObject.parseObject(msg.getBody(), Map.class); + log.info("generate response : {}", generateResult); - try{ - log.info("tasks_id : {} start ",generateResult.get("tasks_id")); - if (generateResult.get("status").equals("SUCCESS")){ + try { + log.info("tasks_id : {} start ", generateResult.get("tasks_id")); + if (generateResult.get("status").equals("SUCCESS")) { String url = generateResult.get("data"); String taskId = generateResult.get("tasks_id"); - generateService.processGenerateResult(taskId, url); - }else { + generateService.processGenerateResult(taskId, url); + } else { // 修改redis中的数据状态为exception String key = generateResultKey + ":" + generateResult.get("tasks_id"); redisUtil.addToString(key, new Gson().toJson(new GenerateResultVO(generateResult.get("tasks_id"), null, null, "Fail")), CommonConstant.GENERATE_RESULT_EXPIRE_TIME); @@ -127,7 +128,7 @@ public class GenerateConsumer { // 存redis redisUtil.addToMap(exceptionMapKey, exceptionInfo); } - }catch (Exception e){ + } catch (Exception e) { log.error(e.getMessage()); try { channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); @@ -146,7 +147,7 @@ public class GenerateConsumer { } long end = System.currentTimeMillis(); - log.info("tasks_id : {}, message : {}, 执行时长: {} 毫秒",generateResult.get("tasks_id"), generateResult.get("message"), (end - start)); + log.info("tasks_id : {}, end , message : {}, 执行时长: {} 毫秒", generateResult.get("tasks_id"), generateResult.get("message"), (end - start)); log.info("============ProcessGenerateResult End listening=========="); } @@ -207,7 +208,7 @@ public class GenerateConsumer { @RabbitListener(queues = MQConfig.GENERATE_RESULT_QUEUE) @RabbitHandler - public void getGenerateResult(Message msg, Channel channel){ + public void getGenerateResult(Message msg, Channel channel) { processGenerateResult(msg, channel); } } diff --git a/src/main/java/com/ai/da/common/RabbitMQ/SRConsumer.java b/src/main/java/com/ai/da/common/RabbitMQ/SRConsumer.java index 341b58a1..6ad261bd 100644 --- a/src/main/java/com/ai/da/common/RabbitMQ/SRConsumer.java +++ b/src/main/java/com/ai/da/common/RabbitMQ/SRConsumer.java @@ -132,17 +132,31 @@ public class SRConsumer { try { result = JSONObject.parseObject(msg.getBody(), JSONObject.class); + log.info("SR response : {}", result); taskId = result.get("tasks_id").toString(); } catch (JSONException e) { log.error("SRResult 返回数据格式不合规范"); log.error(e.getMessage()); setErrorMessage(msg, channel, e.getMessage(), null); + try { + // 第二个参数,是否批量确认消息,当传false时,只确认当前 deliveryTag对应的消息;当传true时,会确认当前及之前所有未确认的消息。 + channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); + } catch (IOException exception) { + log.error("手动确认,取消返回队列,不再重新消费"); + } + } catch (Exception e){ + log.error(e.getMessage()); + setErrorMessage(msg, channel, e.getMessage(), null); + try { + // 第二个参数,是否批量确认消息,当传false时,只确认当前 deliveryTag对应的消息;当传true时,会确认当前及之前所有未确认的消息。 + channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); + } catch (IOException exception) { + log.error("手动确认,取消返回队列,不再重新消费"); + } } try { -// channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); - // 2、判断状态是否成功 if ("SUCCESS".equals(result.get("status").toString())) { String output = result.get("data").toString(); diff --git a/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java b/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java index ccbd397d..a64a02ae 100644 --- a/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java +++ b/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java @@ -38,6 +38,7 @@ import javax.annotation.Resource; import java.io.IOException; import java.time.LocalDateTime; import java.util.*; +import java.util.stream.Collectors; import static com.ai.da.common.enums.CollectionLevel1TypeEnum.*; @@ -527,6 +528,11 @@ public class GenerateServiceImpl extends ServiceImpl i } results.add(generateResultVO); }); + // todo + Set collect = results.stream().map(GenerateResultVO::getStatus).collect(Collectors.toSet()); + if (taskIdList.size() == 4 && collect.size() == 1 && collect.contains("Fail")){ + throw new BusinessException("generate.interface.error"); + } return results; }