TASK:异步调用generate及取消generate
This commit is contained in:
@@ -48,12 +48,11 @@ public class MQConsumer {
|
||||
log.info("============start listening==========");
|
||||
|
||||
GenerateThroughImageTextDTO generateThroughImageTextDTO = JSONObject.parseObject(msg.getBody(), GenerateThroughImageTextDTO.class);
|
||||
Long uniqueId = generateThroughImageTextDTO.getUniqueId();
|
||||
// 1、将消息从redis排队队列中删除
|
||||
redisUtil.removeFromZSet(consumptionOrderKey, String.valueOf(uniqueId));
|
||||
String uniqueId = generateThroughImageTextDTO.getUniqueId();
|
||||
|
||||
try {
|
||||
// 2、判断当前消息是否在取消列表中
|
||||
Boolean isMember = redisUtil.isElementExistsInSet(cancelSetKey, String.valueOf(uniqueId));
|
||||
Boolean isMember = redisUtil.isElementExistsInSet(cancelSetKey, uniqueId);
|
||||
if (isMember) {
|
||||
try {
|
||||
// 2.1 手动确认该消息
|
||||
@@ -62,40 +61,43 @@ public class MQConsumer {
|
||||
log.error("手动确认,不返回队列重新消费");
|
||||
}
|
||||
// 2.2 将该消息从取消列表中删除
|
||||
redisUtil.removeFromSet(cancelSetKey, String.valueOf(uniqueId));
|
||||
// redisUtil.removeFromSet(cancelSetKey, uniqueId);
|
||||
} else {
|
||||
GenerateCollectionVO generateCollectionVO = generateService.generateThroughImageText(generateThroughImageTextDTO);
|
||||
// try {
|
||||
// Thread.sleep(15000);
|
||||
// } catch (InterruptedException e) {
|
||||
// throw new RuntimeException(e);
|
||||
// }
|
||||
// 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除
|
||||
redisUtil.removeFromZSet(consumptionOrderKey, uniqueId);
|
||||
if (!Objects.isNull(generateCollectionVO)){
|
||||
HashMap<String, String> generateResult = new HashMap<>();
|
||||
generateResult.put(String.valueOf(uniqueId), JSONObject.toJSONString(generateCollectionVO));
|
||||
generateResult.put(uniqueId, JSONObject.toJSONString(generateCollectionVO));
|
||||
// 将结果存在redis中 ,为空时不要存
|
||||
redisUtil.addToMap(resultMapKey, generateResult);
|
||||
}
|
||||
|
||||
}
|
||||
} catch (BusinessException e) {
|
||||
log.error(e.getMessage());
|
||||
log.error(e.getMsg());
|
||||
// channel.basicNack() 为不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue
|
||||
try {
|
||||
// 第二个参数,是否批量确认消息,当传false时,只确认当前 deliveryTag对应的消息;当传true时,会确认当前及之前所有未确认的消息。
|
||||
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
|
||||
// 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除
|
||||
redisUtil.removeFromZSet(consumptionOrderKey, uniqueId);
|
||||
} catch (IOException exception) {
|
||||
log.error("手动确认,取消返回队列,不再重新消费");
|
||||
}
|
||||
// 将入参和错误信息存入数据库
|
||||
String exceptionMessage = JSONObject.toJSONString(generateThroughImageTextDTO) + " Exception message : " + e.getMessage();
|
||||
String exceptionMessage = JSONObject.toJSONString(generateThroughImageTextDTO) +
|
||||
" Exception message : " + e.getMsg();
|
||||
HashMap<String, String> exceptionInfo = new HashMap<>();
|
||||
exceptionInfo.put(String.valueOf(uniqueId), exceptionMessage);
|
||||
// 存redis
|
||||
redisUtil.addToMap(exceptionMapKey, exceptionInfo);
|
||||
}
|
||||
|
||||
// log.info(JSONObject.parseObject(msg.getBody(), GenerateThroughImageTextDTO.class).toString());
|
||||
// try {
|
||||
// Thread.sleep(10000);
|
||||
// } catch (InterruptedException e) {
|
||||
// throw new RuntimeException(e);
|
||||
// }
|
||||
log.info("============end listening==========");
|
||||
}
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ import java.util.concurrent.*;
|
||||
@Component
|
||||
public class AsyncCallerUtil {
|
||||
|
||||
public static Map<Long, Boolean> waitingStatus = new HashMap<>();
|
||||
public static Map<String, Boolean> waitingStatus = new HashMap<>();
|
||||
|
||||
private static PythonService pythonService;
|
||||
|
||||
@@ -27,9 +27,10 @@ public class AsyncCallerUtil {
|
||||
return CompletableFuture.supplyAsync(() -> pythonService.generateSketchOrPrint(generateToPython));
|
||||
}
|
||||
|
||||
public List<String> generate(GenerateToPythonDTO generateToPython, Long requestId) {
|
||||
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
|
||||
waitingStatus.put(requestId, true);
|
||||
public List<String> generate(GenerateToPythonDTO generateToPython) {
|
||||
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
|
||||
String taskId = generateToPython.getTasks_id();
|
||||
waitingStatus.put(taskId, true);
|
||||
ScheduledFuture<?> timeoutTask = null;
|
||||
|
||||
try {
|
||||
@@ -37,10 +38,10 @@ public class AsyncCallerUtil {
|
||||
// 10秒后第一次确认,之后每隔10秒确认一次用户选择结果
|
||||
timeoutTask = scheduledExecutorService.scheduleAtFixedRate(() -> {
|
||||
// 调用另一个接口获取用户的选择
|
||||
if (!waitingStatus.get(requestId)) {
|
||||
if (!waitingStatus.get(taskId)) {
|
||||
// 如果用户选择取消,则取消对generate的调用,cancel判断是否成功取消
|
||||
generateResult.cancel(true);
|
||||
waitingStatus.remove(requestId);
|
||||
waitingStatus.remove(taskId);
|
||||
}
|
||||
System.out.println("持续等待...... : " + DateUtil.getByTimeZone("Asia/Shanghai"));
|
||||
}, 10, 10, TimeUnit.SECONDS);
|
||||
@@ -54,16 +55,15 @@ public class AsyncCallerUtil {
|
||||
// 处理结果
|
||||
System.out.println("generate 响应: " + result);
|
||||
System.out.println("schedule finish time : " + DateUtil.getByTimeZone("Asia/Shanghai"));
|
||||
waitingStatus.remove(requestId);
|
||||
waitingStatus.remove(taskId);
|
||||
return result;
|
||||
} catch (InterruptedException | ExecutionException | BusinessException e) {
|
||||
// 处理异常
|
||||
log.error("发生错误 : " + e);
|
||||
e.printStackTrace();
|
||||
// 取消定时任务
|
||||
assert timeoutTask != null;
|
||||
timeoutTask.cancel(true);
|
||||
throw new BusinessException("generate.interface.error");
|
||||
throw new BusinessException(e.getMessage());
|
||||
} finally {
|
||||
// 关闭线程池
|
||||
// executorService.shutdown();
|
||||
|
||||
Reference in New Issue
Block a user