generate bug修复
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
package com.ai.da.common.RabbitMQ;
|
package com.ai.da.common.RabbitMQ;
|
||||||
|
|
||||||
import com.ai.da.common.config.exception.BusinessException;
|
import com.ai.da.common.config.exception.BusinessException;
|
||||||
|
import com.ai.da.common.constant.CommonConstant;
|
||||||
import com.ai.da.common.utils.RedisUtil;
|
import com.ai.da.common.utils.RedisUtil;
|
||||||
import com.ai.da.model.dto.GenerateThroughImageTextDTO;
|
import com.ai.da.model.dto.GenerateThroughImageTextDTO;
|
||||||
import com.ai.da.model.vo.GenerateResultVO;
|
import com.ai.da.model.vo.GenerateResultVO;
|
||||||
@@ -109,26 +110,37 @@ public class GenerateConsumer {
|
|||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
|
|
||||||
Map<String, String> generateResult = JSONObject.parseObject(msg.getBody(), Map.class);
|
Map<String, String> generateResult = JSONObject.parseObject(msg.getBody(), Map.class);
|
||||||
// 2.1 手动确认该消息
|
|
||||||
try {
|
try{
|
||||||
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
|
log.info("tasks_id : {} start ",generateResult.get("tasks_id"));
|
||||||
} catch (IOException e) {
|
if (generateResult.get("status").equals("SUCCESS")){
|
||||||
throw new RuntimeException(e);
|
String url = generateResult.get("data");
|
||||||
}
|
String taskId = generateResult.get("tasks_id");
|
||||||
log.info("tasks_id : {} start ",generateResult.get("tasks_id"));
|
generateService.processGenerateResult(taskId, url);
|
||||||
// log.info("tasks_id : {}, message : {}",generateResult.get("tasks_id"), generateResult.get("message") );
|
}else {
|
||||||
if (generateResult.get("status").equals("SUCCESS")){
|
// 修改redis中的数据状态为exception
|
||||||
String url = generateResult.get("data");
|
String key = generateResultKey + ":" + generateResult.get("tasks_id");
|
||||||
String taskId = generateResult.get("tasks_id");
|
redisUtil.addToString(key, new Gson().toJson(new GenerateResultVO(generateResult.get("tasks_id"), null, null, "Fail")), CommonConstant.GENERATE_RESULT_EXPIRE_TIME);
|
||||||
// generateService.processGenerateResult(taskId, url);
|
// 将异常信息存到exception中
|
||||||
}else {
|
HashMap<String, String> exceptionInfo = new HashMap<>();
|
||||||
// 修改redis中的数据状态为exception
|
exceptionInfo.put(generateResult.get("tasks_id"), generateResult.get("data"));
|
||||||
String key = generateResultKey + ":" + generateResult.get("tasks_id");
|
// 存redis
|
||||||
Long expire = redisUtil.getExpire(key);
|
redisUtil.addToMap(exceptionMapKey, exceptionInfo);
|
||||||
redisUtil.addToString(key, new Gson().toJson(new GenerateResultVO(generateResult.get("tasks_id"), null, null, "Fail")), expire);
|
}
|
||||||
// 将异常信息存到exception中
|
}catch (Exception e){
|
||||||
|
log.error(e.getMessage());
|
||||||
|
try {
|
||||||
|
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
|
||||||
|
// 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除
|
||||||
|
redisUtil.removeFromZSet(consumptionOrderKey, generateResult.get("tasks_id"));
|
||||||
|
} catch (IOException exception) {
|
||||||
|
log.error("手动确认,取消返回队列,不再重新消费");
|
||||||
|
}
|
||||||
|
// 将入参和错误信息存入数据库
|
||||||
|
String exceptionMessage = JSONObject.toJSONString(generateResult) +
|
||||||
|
" Exception message : " + e.getMessage();
|
||||||
HashMap<String, String> exceptionInfo = new HashMap<>();
|
HashMap<String, String> exceptionInfo = new HashMap<>();
|
||||||
exceptionInfo.put(generateResult.get("tasks_id"), generateResult.get("data"));
|
exceptionInfo.put(String.valueOf(generateResult.get("tasks_id")), exceptionMessage);
|
||||||
// 存redis
|
// 存redis
|
||||||
redisUtil.addToMap(exceptionMapKey, exceptionInfo);
|
redisUtil.addToMap(exceptionMapKey, exceptionInfo);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ import javax.annotation.Resource;
|
|||||||
@Component
|
@Component
|
||||||
public class MQPublisher {
|
public class MQPublisher {
|
||||||
|
|
||||||
private final String url = "http://localhost:15672/api/queues/%2f/generate-queue";
|
// private final String url = "http://localhost:15672/api/queues/%2f/generate-queue";
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private AmqpTemplate amqpTemplate;
|
private AmqpTemplate amqpTemplate;
|
||||||
|
|||||||
@@ -222,9 +222,8 @@ public class GenerateServiceImpl extends ServiceImpl<GenerateMapper, Generate> i
|
|||||||
generateDetailMapper.insert(generateDetail);
|
generateDetailMapper.insert(generateDetail);
|
||||||
|
|
||||||
String key = generateResultKey + ":" + taskId;
|
String key = generateResultKey + ":" + taskId;
|
||||||
Long expire = redisUtil.getExpire(key);
|
|
||||||
GenerateResultVO generateResultVO = new GenerateResultVO(taskId, generateDetail.getId(), url, "Success");
|
GenerateResultVO generateResultVO = new GenerateResultVO(taskId, generateDetail.getId(), url, "Success");
|
||||||
redisUtil.addToString(key, new Gson().toJson(generateResultVO), expire);
|
redisUtil.addToString(key, new Gson().toJson(generateResultVO), CommonConstant.GENERATE_RESULT_EXPIRE_TIME);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void validateGeneraType(Generate generate, String text, Long elementId, String generateType) {
|
private void validateGeneraType(Generate generate, String text, Long elementId, String generateType) {
|
||||||
@@ -527,6 +526,7 @@ public class GenerateServiceImpl extends ServiceImpl<GenerateMapper, Generate> i
|
|||||||
@Override
|
@Override
|
||||||
@Transactional(rollbackFor = Exception.class)
|
@Transactional(rollbackFor = Exception.class)
|
||||||
public void cancelGenerate(Long userId, List<String> uniqueIdList, String timeZone) {
|
public void cancelGenerate(Long userId, List<String> uniqueIdList, String timeZone) {
|
||||||
|
// todo 取消待优化
|
||||||
uniqueIdList.forEach(uniqueId -> {
|
uniqueIdList.forEach(uniqueId -> {
|
||||||
// 1、确认当前消息是否还在排队中
|
// 1、确认当前消息是否还在排队中
|
||||||
Boolean exists = redisUtil.isElementExistsInZSet(consumptionOrderKey, uniqueId);
|
Boolean exists = redisUtil.isElementExistsInZSet(consumptionOrderKey, uniqueId);
|
||||||
@@ -550,6 +550,9 @@ public class GenerateServiceImpl extends ServiceImpl<GenerateMapper, Generate> i
|
|||||||
pythonService.cancelGenerateTask(uniqueId);
|
pythonService.cancelGenerateTask(uniqueId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
String key = generateResultKey + ":" + uniqueId;
|
||||||
|
redisUtil.addToString(key, new Gson().toJson(new GenerateResultVO(uniqueId, null, null, "Cancelled")), CommonConstant.GENERATE_RESULT_EXPIRE_TIME);
|
||||||
|
|
||||||
// 3、考虑加一张表,专门用于记录哪些用户在什么时间进行了取消操作,包括已经异常的请求
|
// 3、考虑加一张表,专门用于记录哪些用户在什么时间进行了取消操作,包括已经异常的请求
|
||||||
GenerateCancel generateCancel = new GenerateCancel(userId, uniqueId, DateUtil.getByTimeZone(timeZone));
|
GenerateCancel generateCancel = new GenerateCancel(userId, uniqueId, DateUtil.getByTimeZone(timeZone));
|
||||||
generateCancelMapper.insert(generateCancel);
|
generateCancelMapper.insert(generateCancel);
|
||||||
|
|||||||
Reference in New Issue
Block a user