TASK:generate 修复逻辑Bug

This commit is contained in:
2024-01-25 13:37:17 +08:00
parent 65bed9dada
commit c35a188abb
7 changed files with 75 additions and 44 deletions

View File

@@ -16,7 +16,7 @@ import org.springframework.beans.factory.annotation.Value;
public class MQConfig { public class MQConfig {
public static final String GENERATE_EXCHANGE_FANOUT = "generate-exchange"; public static final String GENERATE_EXCHANGE_FANOUT = "generate-exchange";
public static final String GENERATE_QUEUE = "generate-queue"; public static final String GENERATE_QUEUE = "generate-queue-prod";
public MQConfig() { public MQConfig() {
} }

View File

@@ -63,12 +63,12 @@ public class MQConsumer {
// 2.2 将该消息从取消列表中删除 // 2.2 将该消息从取消列表中删除
// redisUtil.removeFromSet(cancelSetKey, uniqueId); // redisUtil.removeFromSet(cancelSetKey, uniqueId);
} else { } else {
/*try {
Thread.sleep(15000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}*/
GenerateCollectionVO generateCollectionVO = generateService.generateThroughImageText(generateThroughImageTextDTO); GenerateCollectionVO generateCollectionVO = generateService.generateThroughImageText(generateThroughImageTextDTO);
// try {
// Thread.sleep(15000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除 // 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除
redisUtil.removeFromZSet(consumptionOrderKey, uniqueId); redisUtil.removeFromZSet(consumptionOrderKey, uniqueId);
if (!Objects.isNull(generateCollectionVO)) { if (!Objects.isNull(generateCollectionVO)) {
@@ -101,7 +101,7 @@ public class MQConsumer {
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
log.info(" task_id " + uniqueId + "----------" + consumerName + " 执行时长:" + (end - start) + "毫秒"); log.info(" task_id " + uniqueId + "----------" + consumerName + " 执行时长:" + (end - start) + "毫秒");
log.info("============end listening=========="); log.info("=============end listening===========");
} }
@RabbitListener(queues = MQConfig.GENERATE_QUEUE) @RabbitListener(queues = MQConfig.GENERATE_QUEUE)

View File

@@ -30,21 +30,20 @@ public class AsyncCallerUtil {
public List<String> generate(GenerateToPythonDTO generateToPython) { public List<String> generate(GenerateToPythonDTO generateToPython) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5); ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
String taskId = generateToPython.getTasks_id(); String taskId = generateToPython.getTasks_id();
waitingStatus.put(taskId, true);
ScheduledFuture<?> timeoutTask = null; ScheduledFuture<?> timeoutTask = null;
if (!waitingStatus.containsKey(taskId)) waitingStatus.put(taskId, true);
try { try {
CompletableFuture<List<String>> generateResult = callGenerateAsync(generateToPython); CompletableFuture<List<String>> generateResult = callGenerateAsync(generateToPython);
// 10秒后第一次确认之后每隔10秒确认一次用户选择结果 // 5秒后第一次确认之后每隔10秒确认一次用户选择结果
timeoutTask = scheduledExecutorService.scheduleAtFixedRate(() -> { timeoutTask = scheduledExecutorService.scheduleAtFixedRate(() -> {
// 调用另一个接口获取用户的选择 // 调用另一个接口获取用户的选择
if (!waitingStatus.get(taskId)) { if (!waitingStatus.get(taskId)) {
// 如果用户选择取消则取消对generate的调用cancel判断是否成功取消 // 如果用户选择取消则取消对generate的调用
generateResult.cancel(true); generateResult.cancel(true);
waitingStatus.remove(taskId); waitingStatus.remove(taskId);
} } else log.info("===============持续等待===============");
log.info("===============持续等待==============="); }, 5, 10, TimeUnit.SECONDS);
}, 10, 10, TimeUnit.SECONDS);
log.info("阻塞等待结果..."); log.info("阻塞等待结果...");
// 阻塞,等待结果 // 阻塞,等待结果
@@ -53,6 +52,10 @@ public class AsyncCallerUtil {
timeoutTask.cancel(true); timeoutTask.cancel(true);
waitingStatus.remove(taskId); waitingStatus.remove(taskId);
return result; return result;
} catch (CancellationException e) {
// generateResult.cancel(true);通过抛出异常取消该任务
log.info("==========成功取消generate任务==========");
return null;
} catch (InterruptedException | ExecutionException | BusinessException e) { } catch (InterruptedException | ExecutionException | BusinessException e) {
// 处理异常 // 处理异常
log.error("发生错误 " + e); log.error("发生错误 " + e);

View File

@@ -2195,7 +2195,9 @@ public class PythonService {
throw new BusinessException("design.interface.exception"); throw new BusinessException("design.interface.exception");
} }
/** 暂时未用 */ /**
* 暂时未用
*/
public String generateSketchCaption(String url) { public String generateSketchCaption(String url) {
//限流校验 //限流校验
AccessLimitUtils.validate("generateSketchCaption", 5); AccessLimitUtils.validate("generateSketchCaption", 5);
@@ -2432,7 +2434,7 @@ public class PythonService {
.readTimeout(60, TimeUnit.SECONDS)//读取超时(单位:秒) .readTimeout(60, TimeUnit.SECONDS)//读取超时(单位:秒)
.writeTimeout(60, TimeUnit.SECONDS)//写入超时(单位:秒) .writeTimeout(60, TimeUnit.SECONDS)//写入超时(单位:秒)
.build(); .build();
String url = accessPythonIp + ":" + accessPythonPort + "/api/generate_cancel/" + taskId + "/"; String url = accessPythonIp + ":" + accessPythonPort + "/api/generate_cancel/" + taskId;
Request request = new Request.Builder() Request request = new Request.Builder()
.url(url) .url(url)
// .addHeader("Authorization", "Basic YWlkbGFiOjEyMw==") // .addHeader("Authorization", "Basic YWlkbGFiOjEyMw==")
@@ -2444,12 +2446,14 @@ public class PythonService {
response = client.newCall(request).execute(); response = client.newCall(request).execute();
} catch (IOException ioException) { } catch (IOException ioException) {
log.error("PythonService##cancelGenerateTask异常###{}", ExceptionUtil.getThrowableList(ioException)); log.error("PythonService##cancelGenerateTask异常###{}", ExceptionUtil.getThrowableList(ioException));
throw new BusinessException("generate.interface.error"); return null;
} }
if (response.code() != HttpURLConnection.HTTP_OK) { if (response.code() != HttpURLConnection.HTTP_OK) {
log.info("generate-python 取消请求失败");
return Boolean.FALSE; return Boolean.FALSE;
} }
log.info("generate-python 取消请求成功");
return Boolean.TRUE; return Boolean.TRUE;
} }

View File

@@ -423,7 +423,10 @@ public class GenerateServiceImpl extends ServiceImpl<GenerateMapper, Generate> i
public void cancelGenerate(String uniqueId) { public void cancelGenerate(String uniqueId) {
// 1、确认当前消息是否还在排队中 // 1、确认当前消息是否还在排队中
Boolean exists = redisUtil.isElementExistsInZSet(consumptionOrderKey, uniqueId); Boolean exists = redisUtil.isElementExistsInZSet(consumptionOrderKey, uniqueId);
if (exists) { Boolean flag = Boolean.FALSE;
if (exists) flag = redisUtil.getRank(consumptionOrderKey, uniqueId) > 1L ? Boolean.TRUE : Boolean.FALSE;
// 不管flag的默认值是true还是false只要exists为false&& 将短路
if (exists && flag) {
// 1.1、将需要取消的唯一id加入redis以便及时取消生成 // 1.1、将需要取消的唯一id加入redis以便及时取消生成
redisUtil.addToSet(cancelSetKey, uniqueId); redisUtil.addToSet(cancelSetKey, uniqueId);
// 1.2 将需要取消的id从redis的ConsumptionOrder中删除 // 1.2 将需要取消的id从redis的ConsumptionOrder中删除

View File

@@ -54,3 +54,24 @@ minio.bucketName.sysImage=aida-sys-image
minio.bucketName.users=aida-users minio.bucketName.users=aida-users
minio.bucketName.collectionElement=aida-collection-element minio.bucketName.collectionElement=aida-collection-element
redirect_url=http://18.167.251.121:7788 redirect_url=http://18.167.251.121:7788
spring.rabbitmq.host=18.167.251.121
spring.rabbitmq.port=5672
spring.rabbitmq.username=rabbit
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
spring.redis.host=172.31.11.32
#spring.redis.host=18.167.251.121
spring.redis.port=6379
spring.redis.database=1
spring.redis.password=Aidlab
spring.redis.lettuce.pool.max-active=8
spring.redis.lettuce.pool.max-idle=8
spring.redis.lettuce.pool.min-idle=0
spring.redis.lettuce.pool.max-wait=5
redis.key.consumptionOrder=ConsumptionOrder
redis.key.cancelSet=CancelSet
redis.key.exceptionMap=ExceptionMap
redis.key.resultMap=ResultMap

View File

@@ -1,8 +1,8 @@
#<23><><EFBFBD><EFBFBD>application-test<73>ļ<EFBFBD>(<28><><EFBFBD>Ի<EFBFBD><D4BB><EFBFBD>) #<23><><EFBFBD><EFBFBD>application-test<73>ļ<EFBFBD>(<28><><EFBFBD>Ի<EFBFBD><D4BB><EFBFBD>)
spring.profiles.active=test #spring.profiles.active=test
#<23><><EFBFBD><EFBFBD>application-prod<6F>ļ<EFBFBD>(<28><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>) #<23><><EFBFBD><EFBFBD>application-prod<6F>ļ<EFBFBD>(<28><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>)
#spring.profiles.active=prod spring.profiles.active=prod
#<23><><EFBFBD><EFBFBD>application-dev<65>ļ<EFBFBD>(<28><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>) #<23><><EFBFBD><EFBFBD>application-dev<65>ļ<EFBFBD>(<28><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>)
#spring.profiles.active=dev #spring.profiles.active=dev