sr MQ异常捕捉 手动ack

generate 全部失败 抛异常
This commit is contained in:
2024-04-23 16:44:55 +08:00
parent c31a4bcd8f
commit 2882b06128
3 changed files with 32 additions and 11 deletions

View File

@@ -105,19 +105,20 @@ public class GenerateConsumer {
log.info("=============end listening===========");
}
public void processGenerateResult(Message msg, Channel channel){
public void processGenerateResult(Message msg, Channel channel) {
log.info("============ProcessGenerateResult listening==========");
long start = System.currentTimeMillis();
Map<String, String> generateResult = JSONObject.parseObject(msg.getBody(), Map.class);
log.info("generate response : {}", generateResult);
try{
log.info("tasks_id : {} start ",generateResult.get("tasks_id"));
if (generateResult.get("status").equals("SUCCESS")){
try {
log.info("tasks_id : {} start ", generateResult.get("tasks_id"));
if (generateResult.get("status").equals("SUCCESS")) {
String url = generateResult.get("data");
String taskId = generateResult.get("tasks_id");
generateService.processGenerateResult(taskId, url);
}else {
generateService.processGenerateResult(taskId, url);
} else {
// 修改redis中的数据状态为exception
String key = generateResultKey + ":" + generateResult.get("tasks_id");
redisUtil.addToString(key, new Gson().toJson(new GenerateResultVO(generateResult.get("tasks_id"), null, null, "Fail")), CommonConstant.GENERATE_RESULT_EXPIRE_TIME);
@@ -127,7 +128,7 @@ public class GenerateConsumer {
// 存redis
redisUtil.addToMap(exceptionMapKey, exceptionInfo);
}
}catch (Exception e){
} catch (Exception e) {
log.error(e.getMessage());
try {
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
@@ -146,7 +147,7 @@ public class GenerateConsumer {
}
long end = System.currentTimeMillis();
log.info("tasks_id : {}, message : {}, 执行时长: {} 毫秒",generateResult.get("tasks_id"), generateResult.get("message"), (end - start));
log.info("tasks_id : {}, end , message : {}, 执行时长: {} 毫秒", generateResult.get("tasks_id"), generateResult.get("message"), (end - start));
log.info("============ProcessGenerateResult End listening==========");
}
@@ -207,7 +208,7 @@ public class GenerateConsumer {
@RabbitListener(queues = MQConfig.GENERATE_RESULT_QUEUE)
@RabbitHandler
public void getGenerateResult(Message msg, Channel channel){
public void getGenerateResult(Message msg, Channel channel) {
processGenerateResult(msg, channel);
}
}

View File

@@ -132,17 +132,31 @@ public class SRConsumer {
try {
result = JSONObject.parseObject(msg.getBody(), JSONObject.class);
log.info("SR response : {}", result);
taskId = result.get("tasks_id").toString();
} catch (JSONException e) {
log.error("SRResult 返回数据格式不合规范");
log.error(e.getMessage());
setErrorMessage(msg, channel, e.getMessage(), null);
try {
// 第二个参数是否批量确认消息当传false时只确认当前 deliveryTag对应的消息;当传true时会确认当前及之前所有未确认的消息。
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (IOException exception) {
log.error("手动确认,取消返回队列,不再重新消费");
}
} catch (Exception e){
log.error(e.getMessage());
setErrorMessage(msg, channel, e.getMessage(), null);
try {
// 第二个参数是否批量确认消息当传false时只确认当前 deliveryTag对应的消息;当传true时会确认当前及之前所有未确认的消息。
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (IOException exception) {
log.error("手动确认,取消返回队列,不再重新消费");
}
}
try {
// channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
// 2、判断状态是否成功
if ("SUCCESS".equals(result.get("status").toString())) {
String output = result.get("data").toString();