From 96858c2cc33174f6c6035f5aa942f8d752b5457e Mon Sep 17 00:00:00 2001 From: xupei Date: Wed, 24 Jan 2024 11:43:56 +0800 Subject: [PATCH] =?UTF-8?q?TASK:=E5=BC=82=E6=AD=A5=E8=B0=83=E7=94=A8genera?= =?UTF-8?q?te=E5=8F=8A=E5=8F=96=E6=B6=88generate?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/ai/da/common/RabbitMQ/MQConsumer.java | 32 ++-- .../ai/da/common/utils/AsyncCallerUtil.java | 18 +-- .../ai/da/controller/GenerateController.java | 10 +- .../com/ai/da/mapper/entity/Generate.java | 4 +- .../dto/GenerateThroughImageTextDTO.java | 2 +- .../ai/da/model/dto/GenerateToPythonDTO.java | 2 + .../java/com/ai/da/python/PythonService.java | 47 +++++- .../com/ai/da/service/GenerateService.java | 8 +- .../da/service/impl/GenerateServiceImpl.java | 141 ++++++++++-------- .../resources/application-test.properties | 7 +- 10 files changed, 159 insertions(+), 112 deletions(-) diff --git a/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java b/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java index 3e9127f5..29c7e939 100644 --- a/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java +++ b/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java @@ -48,12 +48,11 @@ public class MQConsumer { log.info("============start listening=========="); GenerateThroughImageTextDTO generateThroughImageTextDTO = JSONObject.parseObject(msg.getBody(), GenerateThroughImageTextDTO.class); - Long uniqueId = generateThroughImageTextDTO.getUniqueId(); - // 1、将消息从redis排队队列中删除 - redisUtil.removeFromZSet(consumptionOrderKey, String.valueOf(uniqueId)); + String uniqueId = generateThroughImageTextDTO.getUniqueId(); + try { // 2、判断当前消息是否在取消列表中 - Boolean isMember = redisUtil.isElementExistsInSet(cancelSetKey, String.valueOf(uniqueId)); + Boolean isMember = redisUtil.isElementExistsInSet(cancelSetKey, uniqueId); if (isMember) { try { // 2.1 手动确认该消息 @@ -62,40 +61,43 @@ public class MQConsumer { log.error("手动确认,不返回队列重新消费"); } // 2.2 将该消息从取消列表中删除 - redisUtil.removeFromSet(cancelSetKey, String.valueOf(uniqueId)); +// redisUtil.removeFromSet(cancelSetKey, uniqueId); } else { GenerateCollectionVO generateCollectionVO = generateService.generateThroughImageText(generateThroughImageTextDTO); +// try { +// Thread.sleep(15000); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } + // 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除 + redisUtil.removeFromZSet(consumptionOrderKey, uniqueId); if (!Objects.isNull(generateCollectionVO)){ HashMap generateResult = new HashMap<>(); - generateResult.put(String.valueOf(uniqueId), JSONObject.toJSONString(generateCollectionVO)); + generateResult.put(uniqueId, JSONObject.toJSONString(generateCollectionVO)); // 将结果存在redis中 ,为空时不要存 redisUtil.addToMap(resultMapKey, generateResult); } } } catch (BusinessException e) { - log.error(e.getMessage()); + log.error(e.getMsg()); // channel.basicNack() 为不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue try { // 第二个参数,是否批量确认消息,当传false时,只确认当前 deliveryTag对应的消息;当传true时,会确认当前及之前所有未确认的消息。 channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); + // 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除 + redisUtil.removeFromZSet(consumptionOrderKey, uniqueId); } catch (IOException exception) { log.error("手动确认,取消返回队列,不再重新消费"); } // 将入参和错误信息存入数据库 - String exceptionMessage = JSONObject.toJSONString(generateThroughImageTextDTO) + " Exception message : " + e.getMessage(); + String exceptionMessage = JSONObject.toJSONString(generateThroughImageTextDTO) + + " Exception message : " + e.getMsg(); HashMap exceptionInfo = new HashMap<>(); exceptionInfo.put(String.valueOf(uniqueId), exceptionMessage); // 存redis redisUtil.addToMap(exceptionMapKey, exceptionInfo); } - -// log.info(JSONObject.parseObject(msg.getBody(), GenerateThroughImageTextDTO.class).toString()); -// try { -// Thread.sleep(10000); -// } catch (InterruptedException e) { -// throw new RuntimeException(e); -// } log.info("============end listening=========="); } diff --git a/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java b/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java index f8303f8f..1f80cc41 100644 --- a/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java +++ b/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java @@ -14,7 +14,7 @@ import java.util.concurrent.*; @Component public class AsyncCallerUtil { - public static Map waitingStatus = new HashMap<>(); + public static Map waitingStatus = new HashMap<>(); private static PythonService pythonService; @@ -27,9 +27,10 @@ public class AsyncCallerUtil { return CompletableFuture.supplyAsync(() -> pythonService.generateSketchOrPrint(generateToPython)); } - public List generate(GenerateToPythonDTO generateToPython, Long requestId) { - ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); - waitingStatus.put(requestId, true); + public List generate(GenerateToPythonDTO generateToPython) { + ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5); + String taskId = generateToPython.getTasks_id(); + waitingStatus.put(taskId, true); ScheduledFuture timeoutTask = null; try { @@ -37,10 +38,10 @@ public class AsyncCallerUtil { // 10秒后第一次确认,之后每隔10秒确认一次用户选择结果 timeoutTask = scheduledExecutorService.scheduleAtFixedRate(() -> { // 调用另一个接口获取用户的选择 - if (!waitingStatus.get(requestId)) { + if (!waitingStatus.get(taskId)) { // 如果用户选择取消,则取消对generate的调用,cancel判断是否成功取消 generateResult.cancel(true); - waitingStatus.remove(requestId); + waitingStatus.remove(taskId); } System.out.println("持续等待...... : " + DateUtil.getByTimeZone("Asia/Shanghai")); }, 10, 10, TimeUnit.SECONDS); @@ -54,16 +55,15 @@ public class AsyncCallerUtil { // 处理结果 System.out.println("generate 响应: " + result); System.out.println("schedule finish time : " + DateUtil.getByTimeZone("Asia/Shanghai")); - waitingStatus.remove(requestId); + waitingStatus.remove(taskId); return result; } catch (InterruptedException | ExecutionException | BusinessException e) { // 处理异常 log.error("发生错误 : " + e); - e.printStackTrace(); // 取消定时任务 assert timeoutTask != null; timeoutTask.cancel(true); - throw new BusinessException("generate.interface.error"); + throw new BusinessException(e.getMessage()); } finally { // 关闭线程池 // executorService.shutdown(); diff --git a/src/main/java/com/ai/da/controller/GenerateController.java b/src/main/java/com/ai/da/controller/GenerateController.java index 173370f2..0c1506b7 100644 --- a/src/main/java/com/ai/da/controller/GenerateController.java +++ b/src/main/java/com/ai/da/controller/GenerateController.java @@ -55,22 +55,20 @@ public class GenerateController { @ApiOperation(value = "发起生成请求,异步获取结果") @PostMapping("/prepare") - public Response prepareForGenerate(@Valid @RequestBody GenerateThroughImageTextDTO generateThroughImageTextDTO){ - Long l = generateService.prepareForGenerate(generateThroughImageTextDTO); - // 防止long精度丢失,这里转为String类型进行传输 - return Response.success(String.valueOf(l)); + public Response prepareForGenerate(@Valid @RequestBody GenerateThroughImageTextDTO generateThroughImageTextDTO) { + return Response.success(generateService.prepareForGenerate(generateThroughImageTextDTO)); } @ApiOperation(value = "取消继续生成") @GetMapping("/stopWaiting") - public Response stopWaiting(@RequestParam("uniqueId") Long uniqueId){ + public Response stopWaiting(@RequestParam("uniqueId") String uniqueId) { generateService.cancelGenerate(uniqueId); return Response.success("stop waiting successfully"); } @ApiOperation(value = "获取生成结果") @GetMapping("/result") - public Response getGenerateResult(@RequestParam("uniqueId") Long uniqueId){ + public Response getGenerateResult(@RequestParam("uniqueId") String uniqueId) { GenerateCollectionVO generateResult = generateService.getGenerateResult(uniqueId); return Response.success(generateResult); } diff --git a/src/main/java/com/ai/da/mapper/entity/Generate.java b/src/main/java/com/ai/da/mapper/entity/Generate.java index f45cb144..9fad23fb 100644 --- a/src/main/java/com/ai/da/mapper/entity/Generate.java +++ b/src/main/java/com/ai/da/mapper/entity/Generate.java @@ -28,9 +28,9 @@ public class Generate { private Long accountId; /** - * 唯一id,用于保持消息的唯一性 + * 唯一id */ - private Long uniqueId; + private String uniqueId; /** * Sketchboard Printboard diff --git a/src/main/java/com/ai/da/model/dto/GenerateThroughImageTextDTO.java b/src/main/java/com/ai/da/model/dto/GenerateThroughImageTextDTO.java index df1c9ab9..fd45533c 100644 --- a/src/main/java/com/ai/da/model/dto/GenerateThroughImageTextDTO.java +++ b/src/main/java/com/ai/da/model/dto/GenerateThroughImageTextDTO.java @@ -46,5 +46,5 @@ public class GenerateThroughImageTextDTO { String timeZone; @ApiModelProperty("唯一id,用于保持消息唯一性") - Long uniqueId; + String uniqueId; } diff --git a/src/main/java/com/ai/da/model/dto/GenerateToPythonDTO.java b/src/main/java/com/ai/da/model/dto/GenerateToPythonDTO.java index 29c77243..93553db0 100644 --- a/src/main/java/com/ai/da/model/dto/GenerateToPythonDTO.java +++ b/src/main/java/com/ai/da/model/dto/GenerateToPythonDTO.java @@ -22,4 +22,6 @@ public class GenerateToPythonDTO { private String version; private String gender; + + private String tasks_id; } diff --git a/src/main/java/com/ai/da/python/PythonService.java b/src/main/java/com/ai/da/python/PythonService.java index e12eef12..e8f37f2f 100644 --- a/src/main/java/com/ai/da/python/PythonService.java +++ b/src/main/java/com/ai/da/python/PythonService.java @@ -56,6 +56,9 @@ public class PythonService { private String accessPythonIp; @Value("${access.python.port:''}") private String accessPythonPort; + @Value("${access.generate.port:''}") + private String accessGeneratePort; + @Resource private PythonTAllInfoService pythonTAllInfoService; @@ -2251,9 +2254,10 @@ public class PythonService { Request request = new Request.Builder() // .url("http://18.167.251.121:9992") // .url("http://127.0.0.1:5000/api/diffusion") - .url(accessPythonIp + ":" + accessPythonPort + "/api/diffusion") +// .url(accessPythonIp + ":" + accessPythonPort + "/api/diffusion") + .url(accessPythonIp + ":" + accessPythonPort + "/api/generate_image") .method("POST", body) - .addHeader("Authorization", "Basic YWlkbGFiOjEyMw==") +// .addHeader("Authorization", "Basic YWlkbGFiOjEyMw==") .addHeader("Content-Type", "application/json") .build(); Response response = null; @@ -2263,23 +2267,28 @@ public class PythonService { response = client.newCall(request).execute(); } catch (IOException ioException) { log.error("PythonService##generateSketchOrPrint异常###{}", ExceptionUtil.getThrowableList(ioException)); - throw new BusinessException("generate.interface.error"); +// throw new BusinessException("generate.interface.error"); + throw new BusinessException(ioException.getMessage()); } //去除限流 AccessLimitUtils.validateOut("generateSketchOrPrint"); // 判断是否生成失败 - if (Objects.isNull(response) || Objects.isNull(response.body())) { + if (Objects.isNull(response.body())) { log.error("PythonService##generateSketchOrPrint异常###{}", "response or body is empty!"); - throw new BusinessException("generate.interface.error"); +// throw new BusinessException("generate.interface.error"); + throw new BusinessException("PythonService##generateSketchOrPrint异常###: response or body is empty!"); } else if (response.code() != HttpURLConnection.HTTP_OK){ log.error("PythonService##generateSketchOrPrint异常###{}", "Response error!Response code ## " + response.code() + " ##"); - throw new BusinessException("generate.interface.error"); +// throw new BusinessException("generate.interface.error"); + throw new BusinessException("PythonService##generateSketchOrPrint异常### Response error!Response code ## " + response.code() + " ##"); } else { try { bodyString = response.body().string(); } catch (IOException e) { - throw new BusinessException("generate.interface.error"); + log.error(e.getMessage()); +// throw new BusinessException("generate.interface.error"); + throw new BusinessException(e.getMessage()); } } JSONObject jsonObject = JSON.parseObject(bodyString); @@ -2417,4 +2426,28 @@ public class PythonService { //生成失败 throw new BusinessException("cloth-classification.interface.exception"); } + + public void cancelGenerateTask(String taskId){ + OkHttpClient client = new OkHttpClient().newBuilder() + .connectTimeout(30, TimeUnit.SECONDS) + .pingInterval(5, TimeUnit.SECONDS)//websocket轮训间隔(单位:秒) + .readTimeout(60, TimeUnit.SECONDS)//读取超时(单位:秒) + .writeTimeout(60, TimeUnit.SECONDS)//写入超时(单位:秒) + .build(); + HttpUrl.Builder builder = HttpUrl.parse(accessPythonIp + ":" + accessGeneratePort + "/cancel_task").newBuilder(); + builder.addQueryParameter("task_id",taskId); + Request request = new Request.Builder() + .url(builder.build().toString()) + .addHeader("Authorization", "Basic YWlkbGFiOjEyMw==") + .addHeader("Content-Type", "application/json") + .build(); + try { + log.info("getGenerateResult请求入参content###{}", taskId); + client.newCall(request).execute(); + } catch (IOException ioException) { + log.error("PythonService##getGenerateResult异常###{}", ExceptionUtil.getThrowableList(ioException)); + throw new BusinessException("generate.interface.error"); + } + } + } diff --git a/src/main/java/com/ai/da/service/GenerateService.java b/src/main/java/com/ai/da/service/GenerateService.java index 9f17e7ea..cb97d9f8 100644 --- a/src/main/java/com/ai/da/service/GenerateService.java +++ b/src/main/java/com/ai/da/service/GenerateService.java @@ -25,12 +25,12 @@ public interface GenerateService extends IService { List selectBatchByLibraryId(List libraryId); - GenerateCollectionVO getGenerateResult(Long uniqueId); + GenerateCollectionVO getGenerateResult(String uniqueId); - Long prepareForGenerate(GenerateThroughImageTextDTO generateThroughImageTextDTO); + String prepareForGenerate(GenerateThroughImageTextDTO generateThroughImageTextDTO); - Long getRankPosition(Long uniqueId); + Long getRankPosition(String uniqueId); - void cancelGenerate(Long uniqueId); + void cancelGenerate(String uniqueId); } diff --git a/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java b/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java index ba729e4b..5fe3c4a4 100644 --- a/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java +++ b/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java @@ -24,6 +24,7 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import io.minio.errors.MinioException; import io.netty.util.internal.StringUtil; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -35,6 +36,7 @@ import java.util.*; import static com.ai.da.common.enums.CollectionLevel1TypeEnum.*; +@Slf4j @Service public class GenerateServiceImpl extends ServiceImpl implements GenerateService { @@ -77,7 +79,7 @@ public class GenerateServiceImpl extends ServiceImpl i @Override public GenerateCaptionVO generateCaption(Long sketchElementId) { CollectionElement collectionElement = collectionElementMapper.selectById(sketchElementId); - if (Objects.isNull(collectionElement)){ + if (Objects.isNull(collectionElement)) { throw new BusinessException("the.image.does.not.exist.please.reselect"); } String url = collectionElement.getUrl(); @@ -101,14 +103,14 @@ public class GenerateServiceImpl extends ServiceImpl i generate.setLevel1Type(generateThroughImageTextDTO.getLevel1Type()); // 当level1type是sketchboard时,存数据库需要加上当前性别 generate.setGenerateType(generate.getLevel1Type().equals(SKETCH_BOARD.getRealName()) ? - generateType + " (" +generateThroughImageTextDTO.getGender() + ")": + generateType + " (" + generateThroughImageTextDTO.getGender() + ")" : generateType); generate.setModelName(StringUtil.isNullOrEmpty(generateThroughImageTextDTO.getVersion()) ? ModelNameEnum.MODEL_0.getCode() : generateThroughImageTextDTO.getVersion()); generate.setCreateDate(DateUtil.getByTimeZone(generateThroughImageTextDTO.getTimeZone())); String text = generateThroughImageTextDTO.getText(); Long elementId = generateThroughImageTextDTO.getCollectionElementId(); - validateGeneraType(generate, text, elementId,generateType); + validateGeneraType(generate, text, elementId, generateType); // 2.1 sketch或print在t_collection_element表中的信息是否需要更新 如 level2Type CollectionElement collectionElement = collectionElementService.editLevel2Type(elementId, generateThroughImageTextDTO.getLevel2Type()); @@ -120,11 +122,11 @@ public class GenerateServiceImpl extends ServiceImpl i String category = generateThroughImageTextDTO.getLevel1Type().equals(SKETCH_BOARD.getRealName()) ? "sketch" : generateThroughImageTextDTO.getLevel1Type().equals(PRINT_BOARD.getRealName()) ? "print" : "moodboard"; AsyncCallerUtil asyncCallerUtil = new AsyncCallerUtil(); - List generatedSketchUrl = asyncCallerUtil.generate(new GenerateToPythonDTO(accountId, Objects.isNull(collectionElement) ? null : collectionElement.getUrl(), - category, text, mode, "1", generateThroughImageTextDTO.getGender()),generateThroughImageTextDTO.getUniqueId()); + List generatedSketchUrl = asyncCallerUtil.generate(new GenerateToPythonDTO(accountId, Objects.isNull(collectionElement) ? "" : collectionElement.getUrl(), + category, text, mode, "1", generateThroughImageTextDTO.getGender() ,generateThroughImageTextDTO.getUniqueId())); // List generatedSketchUrl = pythonService.generateSketchOrPrint(new GenerateToPythonDTO(accountId, Objects.isNull(elementId) ? null : collectionElement.getUrl(), // category, text, mode, "1", generateThroughImageTextDTO.getGender())); - if (CollectionUtils.isEmpty(generatedSketchUrl)){ + if (CollectionUtils.isEmpty(generatedSketchUrl)) { return null; } @@ -139,8 +141,8 @@ public class GenerateServiceImpl extends ServiceImpl i GenerateCollectionItemVO generateCollectionItemVO = new GenerateCollectionItemVO(); String md5 = MD5Utils.encryptFile(minioUtil.getPresignedUrl(item, 24 * 60), Boolean.FALSE); // 通过MD5值和level1Type,判断不同level1Type下相同的图片是否被like过 - List> libraryIdList = generateDetailMapper.getLibraryIdThroughMD5(md5, generateThroughImageTextDTO.getLevel1Type()); - if (!libraryIdList.isEmpty()){ + List> libraryIdList = generateDetailMapper.getLibraryIdThroughMD5(md5, generateThroughImageTextDTO.getLevel1Type()); + if (!libraryIdList.isEmpty()) { generateDetail.setIsLike((byte) 1); generateDetail.setLibraryId(libraryIdList.get(0).get("library_id")); generateCollectionItemVO.setIsLiked(Boolean.TRUE); @@ -161,22 +163,22 @@ public class GenerateServiceImpl extends ServiceImpl i return new GenerateCollectionVO(generate.getId(), collectionId, generatedCollectionItems); } - private void validateGeneraType(Generate generate, String text, Long elementId,String generateType) { + private void validateGeneraType(Generate generate, String text, Long elementId, String generateType) { switch (generateType) { case "text": - if (StringUtil.isNullOrEmpty(text)){ + if (StringUtil.isNullOrEmpty(text)) { throw new BusinessException("please.input.the.caption"); } generate.setText(text); break; case "image": - if (Objects.isNull(elementId)){ + if (Objects.isNull(elementId)) { throw new BusinessException("please.choose.an.image"); } generate.setCollectionElementId(elementId); break; case "text-image": - if (StringUtil.isNullOrEmpty(text) || Objects.isNull(elementId)){ + if (StringUtil.isNullOrEmpty(text) || Objects.isNull(elementId)) { throw new BusinessException("please.input.the.caption.and.choose.an.image"); } generate.setText(text); @@ -191,21 +193,21 @@ public class GenerateServiceImpl extends ServiceImpl i // 1、判断参数是否正确 // 1.1 必须参数是否非空 if (SKETCH_BOARD.getRealName().equals(generateLikeDTO.getLevel1Type())) { - if (StringUtil.isNullOrEmpty(generateLikeDTO.getLevel2Type())){ + if (StringUtil.isNullOrEmpty(generateLikeDTO.getLevel2Type())) { throw new BusinessException("level2Type.cannot.be.empty"); } - if (StringUtil.isNullOrEmpty(generateLikeDTO.getGender())){ + if (StringUtil.isNullOrEmpty(generateLikeDTO.getGender())) { throw new BusinessException("gender.cannot.be.empty"); } } // 1.2 判断参数是否真实有效 Long generateDetailId = generateLikeDTO.getGenerateDetailId(); GenerateDetail generateDetail = generateDetailMapper.selectById(generateDetailId); - if (Objects.isNull(generateDetail)){ + if (Objects.isNull(generateDetail)) { throw new BusinessException("generateItem.does.not.exist"); } Generate generate = getById(generateDetail.getGenerateId()); - if (!generateLikeDTO.getLevel1Type().equals(generate.getLevel1Type())){ + if (!generateLikeDTO.getLevel1Type().equals(generate.getLevel1Type())) { throw new BusinessException("level1Type.does.not.match"); } @@ -213,8 +215,8 @@ public class GenerateServiceImpl extends ServiceImpl i // 2.1、不能重复喜欢 // 2.1.1 判断该图片是否被喜欢过 Library libraryDetail = libraryService.getById(generateDetail.getLibraryId()); - if ( (Objects.nonNull(generateDetail.getLibraryId()) && !generateDetail.getLibraryId().equals(0L)) - || Objects.nonNull(libraryDetail)){ + if ((Objects.nonNull(generateDetail.getLibraryId()) && !generateDetail.getLibraryId().equals(0L)) + || Objects.nonNull(libraryDetail)) { throw new BusinessException("duplicate.likes.are.not.allowed"); } @@ -237,7 +239,7 @@ public class GenerateServiceImpl extends ServiceImpl i public Boolean generateDislike(Long generateDetailId, String timeZone) { // 1、确定generateDetail中是否有这条记录 GenerateDetail generateDetail = generateDetailMapper.selectById(generateDetailId); - if (Objects.isNull(generateDetail)){ + if (Objects.isNull(generateDetail)) { throw new BusinessException("generateItem.does.not.exist"); } @@ -287,7 +289,7 @@ public class GenerateServiceImpl extends ServiceImpl i generateDetailMapper.update(generateDetail, queryWrapper); } - public void updateLikeStatusBatch(List generateDetailIdList, Byte hasLike, Long libraryId, String timeZone){ + public void updateLikeStatusBatch(List generateDetailIdList, Byte hasLike, Long libraryId, String timeZone) { QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.in("id", generateDetailIdList); @@ -299,93 +301,101 @@ public class GenerateServiceImpl extends ServiceImpl i generateDetailMapper.update(generateDetail, queryWrapper); } - public List selectBatchByLibraryId(List libraryId){ + public List selectBatchByLibraryId(List libraryId) { QueryWrapper qw = new QueryWrapper<>(); - qw.in("library_id",libraryId); + qw.in("library_id", libraryId); return generateDetailMapper.selectList(qw); } @Override - public Long prepareForGenerate(GenerateThroughImageTextDTO generateThroughImageTextDTO) { + public String prepareForGenerate(GenerateThroughImageTextDTO generateThroughImageTextDTO) { // 1、参数检查,判断必须参数是否为空 - if (Objects.isNull(generateThroughImageTextDTO.getUserId())){ + if (Objects.isNull(generateThroughImageTextDTO.getUserId())) { throw new BusinessException("userId cannot be empty"); } String generateType = generateThroughImageTextDTO.getGenerateType(); - if (!GenerateModeEnum.getGenerateModeList().contains(generateType)){ + if (!GenerateModeEnum.getGenerateModeList().contains(generateType)) { throw new BusinessException("unknown.generate.type"); } String text = generateThroughImageTextDTO.getText(); Long elementId = generateThroughImageTextDTO.getCollectionElementId(); - validateGeneraType(new Generate(), text, elementId,generateType); + validateGeneraType(new Generate(), text, elementId, generateType); - // 2、确定当前排队人数总数,超过15个,暂停使用当前功能 - Long zSetTotal = redisUtil.getZSetTotal(consumptionOrderKey); - if (zSetTotal.equals(15L)){ - return null; + // 2、生成唯一id 使用uuid + String uuid = UUID.randomUUID().toString(); + +// SnowflakeUtil idWorker = new SnowflakeUtil(0, 0); +// long snowflakeId = idWorker.nextId(); + + int num = 1; + // 判断与已经正常生成结果的uuid中有相同的id + while (redisUtil.isElementExistsInMap(resultMapKey, uuid) && num < 10) { + uuid = UUID.randomUUID().toString(); + num++; } - - // 3、生成唯一id - SnowflakeUtil idWorker = new SnowflakeUtil(0, 0); - long snowflakeId = idWorker.nextId(); - - if (AsyncCallerUtil.waitingStatus.containsKey(snowflakeId)){ - snowflakeId = idWorker.nextId(); + // 无依据确定的数字 + if (num > 10){ + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + uuid = UUID.randomUUID().toString(); } - generateThroughImageTextDTO.setUniqueId(snowflakeId); + generateThroughImageTextDTO.setUniqueId(uuid); String jsonString = JSON.toJSONString(generateThroughImageTextDTO); - // 4、加入redis排队,便于获取实时排队信息 + // 3、加入redis排队,便于获取实时排队信息 Double maxScore = redisUtil.getMaxScore(consumptionOrderKey); - redisUtil.addToZSet(consumptionOrderKey, String.valueOf(snowflakeId),maxScore); + redisUtil.addToZSet(consumptionOrderKey, uuid, maxScore); - // 5、将消息发布到MQ消息队列 + // 4、将消息发布到MQ消息队列 rabbitMQService.publishMessage(jsonString); - // 6、返回唯一id - return snowflakeId; + // 5、返回唯一id + return uuid; } @Override - public Long getRankPosition(Long uniqueId) { - return redisUtil.getRank(consumptionOrderKey, String.valueOf(uniqueId)); + public Long getRankPosition(String uniqueId) { + return redisUtil.getRank(consumptionOrderKey, uniqueId); } @Override - public GenerateCollectionVO getGenerateResult(Long uniqueId) { + public GenerateCollectionVO getGenerateResult(String uniqueId) { // 1、判断该请求是否已经异常 - Boolean isMember = redisUtil.isElementExistsInMap(exceptionMapKey, String.valueOf(uniqueId)); - if (isMember){ + Boolean isMember = redisUtil.isElementExistsInMap(exceptionMapKey, uniqueId); + if (isMember) { throw new BusinessException("generate.interface.error"); } // 2、判断该请求是否还在排队 - Boolean existsInZSet = redisUtil.isElementExistsInZSet(consumptionOrderKey, String.valueOf(uniqueId)); - if (existsInZSet){ + Boolean existsInZSet = redisUtil.isElementExistsInZSet(consumptionOrderKey, uniqueId); + if (existsInZSet) { // 排队中,给出当前排序位置 return new GenerateCollectionVO(getRankPosition(uniqueId) + 1L); } // 3、判断redis中有没有 - boolean hasHashKey = redisUtil.isElementExistsInMap(resultMapKey, String.valueOf(uniqueId)); - if (hasHashKey){ + boolean hasHashKey = redisUtil.isElementExistsInMap(resultMapKey, uniqueId); + if (hasHashKey) { // 3.1 有直接从redis中拿 - String resultString = redisUtil.getMapValue(resultMapKey, String.valueOf(uniqueId)); - return JSONObject.parseObject(resultString,GenerateCollectionVO.class); + String resultString = redisUtil.getMapValue(resultMapKey, uniqueId); + return JSONObject.parseObject(resultString, GenerateCollectionVO.class); } // 3.2 判断数据库中有没有 Generate generate = selectByUniqueId(uniqueId); - if (Objects.isNull(generate)){ + if (Objects.isNull(generate)) { // 3.3 还没执行完,给出当前位置 return new GenerateCollectionVO(0L); } Long generateId = generate.getId(); QueryWrapper qw = new QueryWrapper<>(); - qw.eq("generate_id",generateId); + qw.eq("generate_id", generateId); List generateDetails = generateDetailMapper.selectList(qw); - if (CollectionUtils.isEmpty(generateDetails)){ + if (CollectionUtils.isEmpty(generateDetails)) { // 会有这种情况吗?存到generate中,但是还没存到generateDetail中 return new GenerateCollectionVO(0L); } @@ -401,7 +411,7 @@ public class GenerateServiceImpl extends ServiceImpl i return new GenerateCollectionVO(generateId, null, generatedCollectionItems); } - public Generate selectByUniqueId(Long uniqueId){ + public Generate selectByUniqueId(String uniqueId){ QueryWrapper qw = new QueryWrapper<>(); qw.eq("unique_id",uniqueId); @@ -409,23 +419,24 @@ public class GenerateServiceImpl extends ServiceImpl i } @Override - public void cancelGenerate(Long uniqueId) { + public void cancelGenerate(String uniqueId) { // 1、确认当前消息是否还在排队中 - Boolean exists = redisUtil.isElementExistsInZSet(consumptionOrderKey, String.valueOf(uniqueId)); - if (exists){ + Boolean exists = redisUtil.isElementExistsInZSet(consumptionOrderKey, uniqueId); + if (exists) { // 1.1、将需要取消的唯一id加入redis,以便及时取消生成 - redisUtil.addToSet(cancelSetKey, String.valueOf(uniqueId)); + redisUtil.addToSet(cancelSetKey, uniqueId); // 1.2 将需要取消的id从redis的ConsumptionOrder中删除 - redisUtil.removeFromZSet(consumptionOrderKey, String.valueOf(uniqueId)); + redisUtil.removeFromZSet(consumptionOrderKey, uniqueId); }else { // 2、判断该消息是否异常 - boolean hasKey = redisUtil.isElementExistsInMap(exceptionMapKey, String.valueOf(uniqueId)); + boolean hasKey = redisUtil.isElementExistsInMap(exceptionMapKey, uniqueId); // 3、判断该消息是否已经消费结束 - Boolean existsInResult = redisUtil.isElementExistsInMap(resultMapKey, String.valueOf(uniqueId)); + Boolean existsInResult = redisUtil.isElementExistsInMap(resultMapKey, uniqueId); if (!hasKey && !existsInResult){ // 设置取等待状态为false AsyncCallerUtil.waitingStatus.put(uniqueId,false); // 3、直接发送取消请求到python端 + pythonService.cancelGenerateTask(uniqueId); } } } diff --git a/src/main/resources/application-test.properties b/src/main/resources/application-test.properties index 471ccb08..490d7868 100644 --- a/src/main/resources/application-test.properties +++ b/src/main/resources/application-test.properties @@ -50,8 +50,8 @@ spring.servlet.multipart.max-request-size= 5MB #access.python.ip=http://43.198.80.117 access.python.ip=http://18.167.251.121 #access.python.ip=http://18.167.251.121:9991/ -#access.python.port=9992 -access.python.port=9991 +access.python.port=9992 +#access.python.port=9991 # minIO服务配置之信息 minio.endpoint=https://www.minio.aida.com.hk:9000 @@ -70,7 +70,8 @@ spring.rabbitmq.username=rabbit spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=/ -spring.redis.host=172.31.11.32 +#spring.redis.host=172.31.11.32 +spring.redis.host=18.167.251.121 spring.redis.port=6379 spring.redis.database=1 spring.redis.password=Aidlab