BUGFIX: 消息被双重转义时无法被正常解析,导致消息一直被重复消费
This commit is contained in:
7
pom.xml
7
pom.xml
@@ -332,7 +332,12 @@
|
||||
<artifactId>easyexcel</artifactId>
|
||||
<version>3.3.2</version>
|
||||
</dependency>
|
||||
|
||||
<!-- JSON 转义恢复 -->
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-text</artifactId>
|
||||
<version>1.10.0</version> <!-- 使用最新版本 -->
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
|
||||
@@ -10,9 +10,13 @@ import com.ai.da.service.GenerateService;
|
||||
import com.ai.da.service.UserLikeGroupService;
|
||||
import com.alibaba.fastjson.JSONArray;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.fasterxml.jackson.core.JsonParseException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.gson.Gson;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.text.StringEscapeUtils;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
@@ -23,6 +27,7 @@ import org.springframework.util.StringUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@@ -266,7 +271,7 @@ public class GenerateConsumer {
|
||||
log.info("============ProcessPoseTransformResult listening==========");
|
||||
long start = System.currentTimeMillis();
|
||||
|
||||
Map<String, String> generateResult = JSONObject.parseObject(msg.getBody(), Map.class);
|
||||
Map<String, String> generateResult = jsonBytesToMap(msg, channel);
|
||||
log.info("PoseTransformation response : {}", generateResult);
|
||||
|
||||
try {
|
||||
@@ -311,6 +316,48 @@ public class GenerateConsumer {
|
||||
|
||||
}
|
||||
|
||||
public static Map<String, String> jsonBytesToMap(Message msg, Channel channel) {
|
||||
try {
|
||||
// 1. byte[] -> String
|
||||
String jsonString = new String(msg.getBody(), StandardCharsets.UTF_8).trim();
|
||||
// 2. 处理可能的双重转义
|
||||
if (jsonString.startsWith("\"") && jsonString.endsWith("\"")) {
|
||||
jsonString = jsonString.substring(1, jsonString.length() - 1);
|
||||
// 使用 Apache Commons Text, 将 JSON 字符串中的转义字符还原为原始字符
|
||||
jsonString = StringEscapeUtils.unescapeJson(jsonString);
|
||||
}
|
||||
// 3. 验证 JSON 格式
|
||||
if (!isValidJson(jsonString)) {
|
||||
throw new IllegalArgumentException("Invalid JSON format");
|
||||
}
|
||||
// 4. 解析为 Map
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
return mapper.readValue(jsonString, new TypeReference<Map<String, String>>() {});
|
||||
} catch (Exception e) {
|
||||
log.error("消息解析失败: {}", e.getMessage(), e);
|
||||
try {
|
||||
// 仅对不可恢复错误(如非 JSON 数据)进行 ACK
|
||||
if (e instanceof JsonParseException || e instanceof IllegalArgumentException) {
|
||||
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
|
||||
log.warn("因消息格式错误,已确认并丢弃消息。原始消息为:{}", msg);
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
log.error("消息确认失败: {}", ex.getMessage(), ex);
|
||||
}
|
||||
throw new RuntimeException("Failed to parse JSON to Map", e);
|
||||
}
|
||||
}
|
||||
|
||||
// 辅助方法:验证字符串是否为合法 JSON
|
||||
private static boolean isValidJson(String json) {
|
||||
try {
|
||||
new ObjectMapper().readTree(json);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private void processDesignBatchResult(Message msg, Channel channel) {
|
||||
log.info("============processDesignBatchResult listening==========");
|
||||
long start = System.currentTimeMillis();
|
||||
|
||||
Reference in New Issue
Block a user