From c31ecfb1e9a76bf72d7c5548e8315c7abc5d53db Mon Sep 17 00:00:00 2001 From: shahaibo <1023316923@qq.com> Date: Thu, 18 Jan 2024 13:17:16 +0800 Subject: [PATCH 01/15] =?UTF-8?q?TASK:docker=E5=9B=9E=E9=80=80;?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docker-compose.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index c1843a76..b9bca48c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,10 +1,10 @@ version: "3" services: aida_back: - container_name: stable-version-aida-back + container_name: develop-version-aida-back build: . volumes: # 数据挂载 - /workspace/home/aida/file/:/workspace/home/aida/file/ ports: - - "10086:5567" \ No newline at end of file + - "10090:5567" \ No newline at end of file From 577dc948504df5dd4975ec7cb4a0896aabd053cc Mon Sep 17 00:00:00 2001 From: xupei Date: Sun, 21 Jan 2024 14:14:55 +0800 Subject: [PATCH 02/15] =?UTF-8?q?1=E3=80=81generate=20=E5=BC=82=E6=AD=A5?= =?UTF-8?q?=E7=94=9F=E6=88=90=E5=8F=8A=E8=8E=B7=E5=8F=96=E6=8E=92=E9=98=9F?= =?UTF-8?q?=E6=83=85=E5=86=B5=202=E3=80=81generate=20=E5=8F=96=E6=B6=88?= =?UTF-8?q?=E7=94=9F=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 13 ++ .../com/ai/da/common/RabbitMQ/MQConfig.java | 45 +++++ .../com/ai/da/common/RabbitMQ/MQConsumer.java | 108 +++++++++++ .../ai/da/common/RabbitMQ/MQPublisher.java | 39 ++++ .../com/ai/da/common/config/RedisConfig.java | 42 +++++ .../ai/da/common/utils/AsyncCallerUtil.java | 73 ++++++++ .../com/ai/da/common/utils/RedisUtil.java | 126 +++++++++++++ .../com/ai/da/common/utils/SnowflakeUtil.java | 137 ++++++++++++++ .../ai/da/controller/GenerateController.java | 18 ++ .../com/ai/da/mapper/entity/Generate.java | 5 + .../dto/GenerateThroughImageTextDTO.java | 7 + .../ai/da/model/dto/GenerateToPythonDTO.java | 25 +++ .../ai/da/model/vo/GenerateCollectionVO.java | 7 + .../java/com/ai/da/python/PythonService.java | 18 +- .../com/ai/da/service/GenerateService.java | 9 + .../com/ai/da/service/RabbitMQService.java | 11 ++ .../impl/CollectionElementServiceImpl.java | 8 +- .../da/service/impl/GenerateServiceImpl.java | 176 ++++++++++++++++-- .../da/service/impl/RabbitMQServiceImpl.java | 76 ++++++++ .../resources/application-test.properties | 18 ++ src/main/resources/application.properties | 4 +- 21 files changed, 929 insertions(+), 36 deletions(-) create mode 100644 src/main/java/com/ai/da/common/RabbitMQ/MQConfig.java create mode 100644 src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java create mode 100644 src/main/java/com/ai/da/common/RabbitMQ/MQPublisher.java create mode 100644 src/main/java/com/ai/da/common/config/RedisConfig.java create mode 100644 src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java create mode 100644 src/main/java/com/ai/da/common/utils/RedisUtil.java create mode 100644 src/main/java/com/ai/da/common/utils/SnowflakeUtil.java create mode 100644 src/main/java/com/ai/da/model/dto/GenerateToPythonDTO.java create mode 100644 src/main/java/com/ai/da/service/RabbitMQService.java create mode 100644 src/main/java/com/ai/da/service/impl/RabbitMQServiceImpl.java diff --git a/pom.xml b/pom.xml index 50c4b965..14913240 100644 --- a/pom.xml +++ b/pom.xml @@ -157,6 +157,19 @@ commons-fileupload 1.4 + + + + org.springframework.boot + spring-boot-starter-amqp + + + + + org.apache.commons + commons-pool2 + + diff --git a/src/main/java/com/ai/da/common/RabbitMQ/MQConfig.java b/src/main/java/com/ai/da/common/RabbitMQ/MQConfig.java new file mode 100644 index 00000000..880def93 --- /dev/null +++ b/src/main/java/com/ai/da/common/RabbitMQ/MQConfig.java @@ -0,0 +1,45 @@ +package com.ai.da.common.RabbitMQ; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.beans.factory.annotation.Value; + +@Configuration +public class MQConfig { + + public static final String GENERATE_EXCHANGE_FANOUT = "generate-exchange"; + public static final String GENERATE_QUEUE = "generate-queue"; + + public MQConfig() { + } + + @Bean + FanoutExchange fanoutRasaExchange() { + return new FanoutExchange(GENERATE_EXCHANGE_FANOUT); + } + + /** + * 创建队列,使用工作模式,不用定义交换机 + */ + @Bean + public Queue queueRasa() { + return new Queue(GENERATE_QUEUE); + } + + /** + * 将队列绑定到交换机上【队列订阅交换机】 + */ + @Bean + Binding bindingExchangeRasa() { + return BindingBuilder.bind(queueRasa()).to(fanoutRasaExchange()); + } + +} diff --git a/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java b/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java new file mode 100644 index 00000000..bce55fcc --- /dev/null +++ b/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java @@ -0,0 +1,108 @@ +package com.ai.da.common.RabbitMQ; + +import com.ai.da.common.config.exception.BusinessException; +import com.ai.da.common.utils.RedisUtil; +import com.ai.da.model.dto.GenerateThroughImageTextDTO; +import com.ai.da.model.vo.GenerateCollectionVO; +import com.ai.da.service.GenerateService; +import com.alibaba.fastjson.JSONObject; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.io.IOException; +import java.util.HashMap; +import java.util.Objects; + + +@Slf4j +@Component +public class MQConsumer { + + @Resource + private GenerateService generateService; + + @Resource + private RedisUtil redisUtil; + + @Value("${redis.key.consumptionOrder}") + private String consumptionOrderKey; + + @Value("${redis.key.cancelSet}") + private String cancelSetKey; + + @Value("${redis.key.exceptionMap}") + private String exceptionMapKey; + + @Value("${redis.key.resultMap}") + private String resultMapKey; + + @RabbitListener(queues = MQConfig.GENERATE_QUEUE) + @RabbitHandler + public void generate(Message msg, Channel channel) { + log.info("============start listening=========="); + + GenerateThroughImageTextDTO generateThroughImageTextDTO = JSONObject.parseObject(msg.getBody(), GenerateThroughImageTextDTO.class); + Long uniqueId = generateThroughImageTextDTO.getUniqueId(); + // 1、将消息从redis排队队列中删除 + redisUtil.removeFromZSet(consumptionOrderKey, String.valueOf(uniqueId)); + try { + // 2、判断当前消息是否在取消列表中 + Boolean isMember = redisUtil.isElementExistsInSet(cancelSetKey, String.valueOf(uniqueId)); + if (isMember) { + try { + // 2.1 手动确认该消息 + channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); + } catch (IOException ex) { + log.error("手动确认,不返回队列重新消费"); + } + // 2.2 将该消息从取消列表中删除 + redisUtil.removeFromSet(cancelSetKey, String.valueOf(uniqueId)); + } else { + try { + // 模拟耗时 + Thread.sleep(40000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + GenerateCollectionVO generateCollectionVO = generateService.generateThroughImageText(generateThroughImageTextDTO); + if (!Objects.isNull(generateCollectionVO)){ + HashMap generateResult = new HashMap<>(); + generateResult.put(String.valueOf(uniqueId), JSONObject.toJSONString(generateCollectionVO)); + // 将结果存在redis中 ,为空时不要存 + redisUtil.addToMap(resultMapKey, generateResult); + } + + } + } catch (BusinessException e) { + log.error(e.getMessage()); + // channel.basicNack() 为不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue + try { + // 第二个参数,是否批量确认消息,当传false时,只确认当前 deliveryTag对应的消息;当传true时,会确认当前及之前所有未确认的消息。 + channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); + } catch (IOException exception) { + log.error("手动确认,取消返回队列,不再重新消费"); + } + // 将入参和错误信息存入数据库 + String exceptionMessage = JSONObject.toJSONString(generateThroughImageTextDTO) + " Exception message : " + e.getMessage(); + HashMap exceptionInfo = new HashMap<>(); + exceptionInfo.put(String.valueOf(uniqueId), exceptionMessage); + // 存redis + redisUtil.addToMap(exceptionMapKey, exceptionInfo); + } + +// log.info(JSONObject.parseObject(msg.getBody(), GenerateThroughImageTextDTO.class).toString()); +// try { +// Thread.sleep(10000); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } + log.info("============end listening=========="); + } + +} diff --git a/src/main/java/com/ai/da/common/RabbitMQ/MQPublisher.java b/src/main/java/com/ai/da/common/RabbitMQ/MQPublisher.java new file mode 100644 index 00000000..40b7baea --- /dev/null +++ b/src/main/java/com/ai/da/common/RabbitMQ/MQPublisher.java @@ -0,0 +1,39 @@ +package com.ai.da.common.RabbitMQ; + +import com.ai.da.service.RabbitMQService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.AmqpTemplate; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +@Slf4j +@Component +public class MQPublisher { + + private final String url = "http://localhost:15672/api/queues/%2f/generate-queue"; + + @Resource + private AmqpTemplate amqpTemplate; + @Resource + private RabbitMQService rabbitMQService; + + public void sendGenerateMessage(String mm) { + log.info("send message:" + mm); + amqpTemplate.convertAndSend(MQConfig.GENERATE_QUEUE, mm); + + } + + public void getMsgCount() { +//// AMQP.Queue.DeclareOk declareOk = rabbitTemplate.execute(channel -> channel.queueDeclarePassive(MQConfig.GENERATE_QUEUE)); +// +// QueueInformation queueInfo = rabbitAdmin.getQueueInfo(MQConfig.GENERATE_QUEUE); +//// assert queueInfo != null; +// +//// System.out.println(declareOk.getMessageCount()); +// System.out.println(queueInfo.getMessageCount()); +//// return declareOk.getMessageCount(); +// return queueInfo.getMessageCount(); + } +} diff --git a/src/main/java/com/ai/da/common/config/RedisConfig.java b/src/main/java/com/ai/da/common/config/RedisConfig.java new file mode 100644 index 00000000..5f01753c --- /dev/null +++ b/src/main/java/com/ai/da/common/config/RedisConfig.java @@ -0,0 +1,42 @@ +package com.ai.da.common.config; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; + +@Configuration +public class RedisConfig { + @Bean(name = "redisTemplate") + public RedisTemplate getRedisTemplate(RedisConnectionFactory factory) { + RedisTemplate redisTemplate = new RedisTemplate<>(); + redisTemplate.setConnectionFactory(factory); + + StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); + + redisTemplate.setKeySerializer(stringRedisSerializer); // key的序列化类型 + + Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); + // 方法过期,改为下面代码 +// objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); + objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, + ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY); + jackson2JsonRedisSerializer.setObjectMapper(objectMapper); + jackson2JsonRedisSerializer.setObjectMapper(objectMapper); + + redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); // value的序列化类型 + redisTemplate.setHashKeySerializer(stringRedisSerializer); + redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); + redisTemplate.afterPropertiesSet(); + return redisTemplate; + } +} diff --git a/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java b/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java new file mode 100644 index 00000000..8f5dac91 --- /dev/null +++ b/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java @@ -0,0 +1,73 @@ +package com.ai.da.common.utils; + +import com.ai.da.common.config.exception.BusinessException; +import com.ai.da.model.dto.GenerateToPythonDTO; +import com.ai.da.python.PythonService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.*; +import java.util.concurrent.*; + +@Slf4j +@Component +public class AsyncCallerUtil { + + public static Map waitingStatus = new HashMap<>(); + + private static PythonService pythonService; + + @Autowired + public void setPythonService(PythonService pythonService) { + AsyncCallerUtil.pythonService = pythonService; + } + + public CompletableFuture> callGenerateAsync(GenerateToPythonDTO generateToPython) { + return CompletableFuture.supplyAsync(() -> pythonService.generateSketchOrPrint(generateToPython)); + } + + public List generate(GenerateToPythonDTO generateToPython, Long requestId) { + ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + waitingStatus.put(requestId, true); + ScheduledFuture timeoutTask = null; + + try { + CompletableFuture> generateResult = callGenerateAsync(generateToPython); + // 10秒后第一次确认,之后每隔10秒确认一次用户选择结果 + timeoutTask = scheduledExecutorService.scheduleAtFixedRate(() -> { + // 调用另一个接口获取用户的选择 + if (!waitingStatus.get(requestId)) { + // 如果用户选择取消,则取消对generate的调用,cancel判断是否成功取消 + generateResult.cancel(true); + waitingStatus.remove(requestId); + } + System.out.println("持续等待...... : " + DateUtil.getByTimeZone("Asia/Shanghai")); + }, 10, 10, TimeUnit.SECONDS); + + System.out.println("开始阻塞 : " + DateUtil.getByTimeZone("Asia/Shanghai")); + // 阻塞,等待结果 + List result = generateResult.get(); + // 取消定时任务 + timeoutTask.cancel(true); + + // 处理结果 + System.out.println("generate 响应: " + result); + System.out.println("schedule finish time : " + DateUtil.getByTimeZone("Asia/Shanghai")); + waitingStatus.remove(requestId); + return result; + } catch (InterruptedException | ExecutionException | BusinessException e) { + // 处理异常 + log.error("发生错误 : " + e); + // 取消定时任务 + assert timeoutTask != null; + timeoutTask.cancel(true); + throw new BusinessException("generate.interface.error"); + } finally { + // 关闭线程池 +// executorService.shutdown(); +// scheduledExecutorService.shutdown(); + } + } + +} diff --git a/src/main/java/com/ai/da/common/utils/RedisUtil.java b/src/main/java/com/ai/da/common/utils/RedisUtil.java new file mode 100644 index 00000000..5121e883 --- /dev/null +++ b/src/main/java/com/ai/da/common/utils/RedisUtil.java @@ -0,0 +1,126 @@ +package com.ai.da.common.utils; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.ZSetOperations; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import javax.annotation.Resource; +import java.util.Map; +import java.util.Set; + +@Slf4j +@Component +public class RedisUtil { + + @Resource + private RedisTemplate redisTemplate; + + //- - - - - - - - - - - - - - - - - - - - - ZSet类型 - - - - - - - - - - - - - - - - - - - - + + /** + * 向ZSet中添加元素 + */ + public void addToZSet(String key, String value, Double score) { + redisTemplate.opsForZSet().add(key, value, score); + } + + /** + * 从ZSet中删除元素 + */ + public void removeFromZSet(String key, String value) { + redisTemplate.opsForZSet().remove(key, value); + } + + /** + * 获取指定元素的当前排列顺序 + */ + public Long getRank(String key, String value) { + return redisTemplate.opsForZSet().rank(key, value); + } + + /** + * 获取当前ZSet中的最大score + */ + public Double getMaxScore(String key) { + Set> set = redisTemplate.opsForZSet().reverseRangeWithScores(key, 0, 0); + + if (!CollectionUtils.isEmpty(set)) { + Double score = set.iterator().next().getScore(); + return score + 1.0; + } else { + return 1.0; + } + } + + /** + * 判断元素是否存在 + */ + public Boolean isElementExistsInZSet(String key, String value) { + return redisTemplate.opsForZSet().score(key, value) != null; + } + + /** + * 获取当前ZSet中数据量的总和 + */ + public Long getZSetTotal(String key) { + return redisTemplate.opsForZSet().zCard(key); + } + + //- - - - - - - - - - - - - - - - - - - - - set类型 - - - - - - - - - - - - - - - - - - - - + + /** + * 将数据放入set缓存 + */ + public void addToSet(String key, String value) { + redisTemplate.opsForSet().add(key, value); + } + + /** + * 弹出变量中的元素 + */ + public void removeFromSet(String key, String value) { + redisTemplate.opsForSet().remove(key, value); + } + + /** + * 检查给定的元素是否在变量中。 + */ + public Boolean isElementExistsInSet(String key, String obj) { + return redisTemplate.opsForSet().isMember(key, obj); + } + + + //- - - - - - - - - - - - - - - - - - - - - hash类型 - - - - - - - - - - - - - - - - - - - - + + /** + * 加入缓存 + */ + public void addToMap(String key, Map map) { + redisTemplate.opsForHash().putAll(key, map); + } + + /** + * 验证指定 key 下 有没有指定的 hashkey + */ + public Boolean isElementExistsInMap(String key, String hashKey) { + return redisTemplate.opsForHash().hasKey(key, hashKey); + } + + /** + * 获取指定key的值string + */ + public String getMapValue(String key1, String key2) { + return String.valueOf(redisTemplate.opsForHash().get(key1, key2)); + } + + /** + * 删除指定 hash 的 HashKey + * + * @return 删除成功的 数量 + */ + public Long removeFromMap(String key, String hashKeys) { + return redisTemplate.opsForHash().delete(key, hashKeys); + } +} diff --git a/src/main/java/com/ai/da/common/utils/SnowflakeUtil.java b/src/main/java/com/ai/da/common/utils/SnowflakeUtil.java new file mode 100644 index 00000000..03298fb4 --- /dev/null +++ b/src/main/java/com/ai/da/common/utils/SnowflakeUtil.java @@ -0,0 +1,137 @@ +package com.ai.da.common.utils; + + +public class SnowflakeUtil { + + // ==============================Fields=========================================== + /** 开始时间截 (2015-01-01) */ + private final long twepoch = 1420041600000L; + + /** 机器id所占的位数 */ + private final long workerIdBits = 5L; + + /** 数据标识id所占的位数 */ + private final long datacenterIdBits = 5L; + + /** 支持的最大机器id,结果是31 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) */ + private final long maxWorkerId = -1L ^ (-1L << workerIdBits); + + /** 支持的最大数据标识id,结果是31 */ + private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits); + + /** 序列在id中占的位数 */ + private final long sequenceBits = 12L; + + /** 机器ID向左移12位 */ + private final long workerIdShift = sequenceBits; + + /** 数据标识id向左移17位(12+5) */ + private final long datacenterIdShift = sequenceBits + workerIdBits; + + /** 时间截向左移22位(5+5+12) */ + private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits; + + /** 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095) */ + private final long sequenceMask = -1L ^ (-1L << sequenceBits); + + /** 工作机器ID(0~31) */ + private long workerId; + + /** 数据中心ID(0~31) */ + private long datacenterId; + + /** 毫秒内序列(0~4095) */ + private long sequence = 0L; + + /** 上次生成ID的时间截 */ + private long lastTimestamp = -1L; + + //==============================Constructors===================================== + /** + * 构造函数 + * @param workerId 工作ID (0~31) + * @param datacenterId 数据中心ID (0~31) + */ + public SnowflakeUtil(long workerId, long datacenterId) { + if (workerId > maxWorkerId || workerId < 0) { + throw new IllegalArgumentException(String.format + ("worker Id can't be greater than %d or less than 0", maxWorkerId)); + } + if (datacenterId > maxDatacenterId || datacenterId < 0) { + throw new IllegalArgumentException(String.format + ("datacenter Id can't be greater than %d or less than 0", maxDatacenterId)); + } + this.workerId = workerId; + this.datacenterId = datacenterId; + } + + // ==============================Methods========================================== + /** + * 获得下一个ID (该方法是线程安全的) + * @return SnowflakeId + */ + public synchronized long nextId() { + long timestamp = timeGen(); + + //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常 + if (timestamp < lastTimestamp) { + throw new RuntimeException( + String.format + ("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp)); + } + + //如果是同一时间生成的,则进行毫秒内序列 + if (lastTimestamp == timestamp) { + sequence = (sequence + 1) & sequenceMask; + //毫秒内序列溢出 + if (sequence == 0) { + //阻塞到下一个毫秒,获得新的时间戳 + timestamp = tilNextMillis(lastTimestamp); + } + } + //时间戳改变,毫秒内序列重置 + else { + sequence = 0L; + } + + //上次生成ID的时间截 + lastTimestamp = timestamp; + + //移位并通过或运算拼到一起组成64位的ID + return ((timestamp - twepoch) << timestampLeftShift) // + | (datacenterId << datacenterIdShift) // + | (workerId << workerIdShift) // + | sequence; + } + + /** + * 阻塞到下一个毫秒,直到获得新的时间戳 + * @param lastTimestamp 上次生成ID的时间截 + * @return 当前时间戳 + */ + protected long tilNextMillis(long lastTimestamp) { + long timestamp = timeGen(); + while (timestamp <= lastTimestamp) { + timestamp = timeGen(); + } + return timestamp; + } + + /** + * 返回以毫秒为单位的当前时间 + * @return 当前时间(毫秒) + */ + protected long timeGen() { + return System.currentTimeMillis(); + } + + //==============================Test============================================= + /** 测试 */ + public static void main(String[] args) { + SnowflakeUtil idWorker = new SnowflakeUtil(0, 0); + long id = idWorker.nextId(); + System.out.println("id:"+id); + //id:768842202204864512 + } +} + diff --git a/src/main/java/com/ai/da/controller/GenerateController.java b/src/main/java/com/ai/da/controller/GenerateController.java index 5fa712cc..27a20693 100644 --- a/src/main/java/com/ai/da/controller/GenerateController.java +++ b/src/main/java/com/ai/da/controller/GenerateController.java @@ -53,4 +53,22 @@ public class GenerateController { return Response.success(generateService.generateDislike(generateDetailId, timeZone)); } + @PostMapping("/prepare") + public Response prepareForGenerate(@Valid @RequestBody GenerateThroughImageTextDTO generateThroughImageTextDTO){ + return Response.success(generateService.prepareForGenerate(generateThroughImageTextDTO)); + } + + @ApiOperation(value = "取消继续生成") + @PostMapping("/stopWaiting") + public Response stopWaiting(@RequestParam("uniqueId") Long uniqueId){ + generateService.cancelGenerate(uniqueId); + return Response.success("stop waiting successfully"); + } + + @ApiOperation(value = "获取生成结果") + @PostMapping("/result") + public Response getGenerateResult(@RequestParam("uniqueId") Long uniqueId){ + GenerateCollectionVO generateResult = generateService.getGenerateResult(uniqueId); + return Response.success(generateResult); + } } diff --git a/src/main/java/com/ai/da/mapper/entity/Generate.java b/src/main/java/com/ai/da/mapper/entity/Generate.java index fc1f4e33..f45cb144 100644 --- a/src/main/java/com/ai/da/mapper/entity/Generate.java +++ b/src/main/java/com/ai/da/mapper/entity/Generate.java @@ -27,6 +27,11 @@ public class Generate { */ private Long accountId; + /** + * 唯一id,用于保持消息的唯一性 + */ + private Long uniqueId; + /** * Sketchboard Printboard */ diff --git a/src/main/java/com/ai/da/model/dto/GenerateThroughImageTextDTO.java b/src/main/java/com/ai/da/model/dto/GenerateThroughImageTextDTO.java index ca12d04a..df1c9ab9 100644 --- a/src/main/java/com/ai/da/model/dto/GenerateThroughImageTextDTO.java +++ b/src/main/java/com/ai/da/model/dto/GenerateThroughImageTextDTO.java @@ -5,10 +5,14 @@ import io.swagger.annotations.ApiModelProperty; import lombok.Data; import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; @Data @ApiModel("GenerateThroughImageTextDTO") public class GenerateThroughImageTextDTO { + @NotNull(message = "userId cannot be empty") + @ApiModelProperty("用户id") + Long userId; @ApiModelProperty("caption") String text; @@ -40,4 +44,7 @@ public class GenerateThroughImageTextDTO { @NotBlank(message = "timeZone cannot be empty!") @ApiModelProperty("本地时区,比如 'Asia/Tokyo' 东京时间 , 'Asia/Shanghai' 北京时间 由js本地获取") String timeZone; + + @ApiModelProperty("唯一id,用于保持消息唯一性") + Long uniqueId; } diff --git a/src/main/java/com/ai/da/model/dto/GenerateToPythonDTO.java b/src/main/java/com/ai/da/model/dto/GenerateToPythonDTO.java new file mode 100644 index 00000000..29c77243 --- /dev/null +++ b/src/main/java/com/ai/da/model/dto/GenerateToPythonDTO.java @@ -0,0 +1,25 @@ +package com.ai.da.model.dto; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class GenerateToPythonDTO { + + private Long user_id; + + private String image_url; + + private String category; + + private String str; + + private Integer mode; + + private String version; + + private String gender; +} diff --git a/src/main/java/com/ai/da/model/vo/GenerateCollectionVO.java b/src/main/java/com/ai/da/model/vo/GenerateCollectionVO.java index 433e86c8..aa8fb7c6 100644 --- a/src/main/java/com/ai/da/model/vo/GenerateCollectionVO.java +++ b/src/main/java/com/ai/da/model/vo/GenerateCollectionVO.java @@ -19,6 +19,13 @@ public class GenerateCollectionVO { @ApiModelProperty("生成的图片信息") private List generatedCollectionItems; + @ApiModelProperty("在当前队列中的排序") + private Long rankPosition; + + public GenerateCollectionVO(Long rankPosition) { + this.rankPosition = rankPosition; + } + public GenerateCollectionVO(Long generateId, Long collectionId, List generatedCollectionItems) { this.generateId = generateId; this.collectionId = collectionId; diff --git a/src/main/java/com/ai/da/python/PythonService.java b/src/main/java/com/ai/da/python/PythonService.java index 578de9ef..e12eef12 100644 --- a/src/main/java/com/ai/da/python/PythonService.java +++ b/src/main/java/com/ai/da/python/PythonService.java @@ -23,7 +23,6 @@ import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import io.netty.util.internal.StringUtil; import lombok.extern.slf4j.Slf4j; import okhttp3.*; import org.springframework.beans.factory.annotation.Value; @@ -2238,7 +2237,7 @@ public class PythonService { throw new BusinessException("system error!"); } - public List generateSketchOrPrint(Long userId, String url, String category, String text, int mode, String modelName, String gender) { + public List generateSketchOrPrint(GenerateToPythonDTO generateToPythonDTO) { //限流校验 AccessLimitUtils.validate("generateSketchOrPrint", 5); OkHttpClient client = new OkHttpClient().newBuilder() @@ -2248,18 +2247,10 @@ public class PythonService { .writeTimeout(60, TimeUnit.SECONDS)//写入超时(单位:秒) .build(); MediaType mediaType = MediaType.parse("application/json"); - Map content = Maps.newHashMap(); - content.put("user_id", userId); - content.put("image_url", url); - content.put("category", category); - content.put("mode", mode); - content.put("str", text); - content.put("version", "1"); - content.put("gender", gender); - RequestBody body = RequestBody.create(mediaType, JSON.toJSONString(content, SerializerFeature.WriteMapNullValue)); + RequestBody body = RequestBody.create(mediaType, JSON.toJSONString(generateToPythonDTO, SerializerFeature.WriteMapNullValue)); Request request = new Request.Builder() -// .url(accessPythonIp + ":2828/aida/diffusion") // .url("http://18.167.251.121:9992") +// .url("http://127.0.0.1:5000/api/diffusion") .url(accessPythonIp + ":" + accessPythonPort + "/api/diffusion") .method("POST", body) .addHeader("Authorization", "Basic YWlkbGFiOjEyMw==") @@ -2268,10 +2259,11 @@ public class PythonService { Response response = null; String bodyString ; try { - log.info("generateSketchOrPrint请求入参content###{}", JSON.toJSONString(content, SerializerFeature.WriteMapNullValue)); + log.info("generateSketchOrPrint请求入参content###{}", JSON.toJSONString(generateToPythonDTO, SerializerFeature.WriteMapNullValue)); response = client.newCall(request).execute(); } catch (IOException ioException) { log.error("PythonService##generateSketchOrPrint异常###{}", ExceptionUtil.getThrowableList(ioException)); + throw new BusinessException("generate.interface.error"); } //去除限流 AccessLimitUtils.validateOut("generateSketchOrPrint"); diff --git a/src/main/java/com/ai/da/service/GenerateService.java b/src/main/java/com/ai/da/service/GenerateService.java index 0d92a07f..9f17e7ea 100644 --- a/src/main/java/com/ai/da/service/GenerateService.java +++ b/src/main/java/com/ai/da/service/GenerateService.java @@ -24,4 +24,13 @@ public interface GenerateService extends IService { void updateLikeStatusBatch(List generateDetailIdList, Byte hasLike, Long libraryId, String timeZone); List selectBatchByLibraryId(List libraryId); + + GenerateCollectionVO getGenerateResult(Long uniqueId); + + Long prepareForGenerate(GenerateThroughImageTextDTO generateThroughImageTextDTO); + + Long getRankPosition(Long uniqueId); + + void cancelGenerate(Long uniqueId); + } diff --git a/src/main/java/com/ai/da/service/RabbitMQService.java b/src/main/java/com/ai/da/service/RabbitMQService.java new file mode 100644 index 00000000..b03a578d --- /dev/null +++ b/src/main/java/com/ai/da/service/RabbitMQService.java @@ -0,0 +1,11 @@ +package com.ai.da.service; + +import org.springframework.stereotype.Service; + +@Service +public interface RabbitMQService { + + void publishMessage(String message); + + Integer getMessageCount(String queueUrl); +} diff --git a/src/main/java/com/ai/da/service/impl/CollectionElementServiceImpl.java b/src/main/java/com/ai/da/service/impl/CollectionElementServiceImpl.java index 1c6307d9..a44defca 100644 --- a/src/main/java/com/ai/da/service/impl/CollectionElementServiceImpl.java +++ b/src/main/java/com/ai/da/service/impl/CollectionElementServiceImpl.java @@ -810,9 +810,11 @@ public class CollectionElementServiceImpl extends ServiceImpl i @Resource private MinioUtil minioUtil; + @Resource + private RabbitMQService rabbitMQService; + + @Resource + private RedisUtil redisUtil; + + @Value("${redis.key.consumptionOrder}") + private String consumptionOrderKey; + + @Value("${redis.key.cancelSet}") + private String cancelSetKey; + + @Value("${redis.key.exceptionMap}") + private String exceptionMapKey; + + @Value("${redis.key.resultMap}") + private String resultMapKey; + @Override public GenerateCaptionVO generateCaption(Long sketchElementId) { CollectionElement collectionElement = collectionElementMapper.selectById(sketchElementId); @@ -69,16 +91,13 @@ public class GenerateServiceImpl extends ServiceImpl i @Transactional(rollbackFor = Exception.class) public GenerateCollectionVO generateThroughImageText(GenerateThroughImageTextDTO generateThroughImageTextDTO) { // 1、获取用户信息 - AuthPrincipalVo userHolder = UserContext.getUserHolder(); + Long accountId = generateThroughImageTextDTO.getUserId(); String generateType = generateThroughImageTextDTO.getGenerateType(); - Long accountId = userHolder.getId(); - if (!GenerateModeEnum.getGenerateModeList().contains(generateType)){ - throw new BusinessException("unknown.generate.type"); - } // 2、判断必须入参是否为非空 Generate generate = new Generate(); generate.setAccountId(accountId); + generate.setUniqueId(generateThroughImageTextDTO.getUniqueId()); generate.setLevel1Type(generateThroughImageTextDTO.getLevel1Type()); // 当level1type是sketchboard时,存数据库需要加上当前性别 generate.setGenerateType(generate.getLevel1Type().equals(SKETCH_BOARD.getRealName()) ? @@ -87,27 +106,30 @@ public class GenerateServiceImpl extends ServiceImpl i generate.setModelName(StringUtil.isNullOrEmpty(generateThroughImageTextDTO.getVersion()) ? ModelNameEnum.MODEL_0.getCode() : generateThroughImageTextDTO.getVersion()); generate.setCreateDate(DateUtil.getByTimeZone(generateThroughImageTextDTO.getTimeZone())); - String text = generateThroughImageTextDTO.getText(); Long elementId = generateThroughImageTextDTO.getCollectionElementId(); validateGeneraType(generate, text, elementId,generateType); - // 3、将请求信息落库 - // 3.1 sketch或print在t_collection_element表中的信息是否需要更新 如 level2Type + // 2.1 sketch或print在t_collection_element表中的信息是否需要更新 如 level2Type CollectionElement collectionElement = collectionElementService.editLevel2Type(elementId, generateThroughImageTextDTO.getLevel2Type()); - // 3.2 将本次generate的请求信息添加到t_generate表中 - save(generate); - - // 4、向模型发起请求 + // 3、向模型发起请求 int mode = GenerateModeEnum.TEXT.getValue().equals(generateType) ? GenerateModeEnum.TEXT.getCode() : GenerateModeEnum.TEXT_IMAGE.getCode(); String category = generateThroughImageTextDTO.getLevel1Type().equals(SKETCH_BOARD.getRealName()) ? "sketch" : generateThroughImageTextDTO.getLevel1Type().equals(PRINT_BOARD.getRealName()) ? "print" : "moodboard"; -// text = !StringUtil.isNullOrEmpty(text) && generateThroughImageTextDTO.getVersion().equals("1") ? "painting style, " + text : text; - List generatedSketchUrl = pythonService.generateSketchOrPrint(accountId, Objects.isNull(elementId) ? null : collectionElement.getUrl(), - category, text, mode, generateThroughImageTextDTO.getVersion(), generateThroughImageTextDTO.getGender()); + AsyncCallerUtil asyncCallerUtil = new AsyncCallerUtil(); + List generatedSketchUrl = asyncCallerUtil.generate(new GenerateToPythonDTO(accountId, Objects.isNull(collectionElement) ? null : collectionElement.getUrl(), + category, text, mode, "1", generateThroughImageTextDTO.getGender()),0L); +// List generatedSketchUrl = pythonService.generateSketchOrPrint(new GenerateToPythonDTO(accountId, Objects.isNull(elementId) ? null : collectionElement.getUrl(), +// category, text, mode, "1", generateThroughImageTextDTO.getGender())); + if (CollectionUtils.isEmpty(generatedSketchUrl)){ + return null; + } + + // 4、将请求信息落库,将本次generate的请求信息添加到t_generate表中 + save(generate); // 5、处理模型返回的数据 // 5.1 将相应的url保存到数据库 @@ -283,4 +305,122 @@ public class GenerateServiceImpl extends ServiceImpl i return generateDetailMapper.selectList(qw); } + + @Override + public Long prepareForGenerate(GenerateThroughImageTextDTO generateThroughImageTextDTO) { + // 1、参数检查,判断必须参数是否为空 + if (Objects.isNull(generateThroughImageTextDTO.getUserId())){ + throw new BusinessException("userId cannot be empty"); + } + String generateType = generateThroughImageTextDTO.getGenerateType(); + if (!GenerateModeEnum.getGenerateModeList().contains(generateType)){ + throw new BusinessException("unknown.generate.type"); + } + String text = generateThroughImageTextDTO.getText(); + Long elementId = generateThroughImageTextDTO.getCollectionElementId(); + validateGeneraType(new Generate(), text, elementId,generateType); + + // 2、生成唯一id + SnowflakeUtil idWorker = new SnowflakeUtil(0, 0); + long snowflakeId = idWorker.nextId(); + + if (AsyncCallerUtil.waitingStatus.containsKey(snowflakeId)){ + snowflakeId = idWorker.nextId(); + } + generateThroughImageTextDTO.setUniqueId(snowflakeId); + String jsonString = JSON.toJSONString(generateThroughImageTextDTO); + + // 3、加入redis排队,便于获取实时排队信息 + Double maxScore = redisUtil.getMaxScore(consumptionOrderKey); + redisUtil.addToZSet(consumptionOrderKey, String.valueOf(snowflakeId),maxScore); + + // 4、将消息发布到MQ消息队列 + rabbitMQService.publishMessage(jsonString); + + // 5、返回唯一id + return snowflakeId; + } + + @Override + public Long getRankPosition(Long uniqueId) { + return redisUtil.getRank(consumptionOrderKey, String.valueOf(uniqueId)); + } + + @Override + public GenerateCollectionVO getGenerateResult(Long uniqueId) { + // 1、判断该请求是否已经异常 + Boolean isMember = redisUtil.isElementExistsInMap(exceptionMapKey, String.valueOf(uniqueId)); + if (isMember){ + throw new BusinessException("generate.interface.error"); + } + + // 2、判断该请求是否还在排队 + Boolean existsInZSet = redisUtil.isElementExistsInZSet(consumptionOrderKey, String.valueOf(uniqueId)); + if (existsInZSet){ + // 排队中,给出当前排序位置 + return new GenerateCollectionVO(getRankPosition(uniqueId) + 1L); + } + + // 3、判断redis中有没有 + boolean hasHashKey = redisUtil.isElementExistsInMap(resultMapKey, String.valueOf(uniqueId)); + if (hasHashKey){ + // 3.1 有直接从redis中拿 + String resultString = redisUtil.getMapValue(resultMapKey, String.valueOf(uniqueId)); + return JSONObject.parseObject(resultString,GenerateCollectionVO.class); + } + + // 3.2 判断数据库中有没有 + Generate generate = selectByUniqueId(uniqueId); + if (Objects.isNull(generate)){ + // 3.3 还没执行完,给出当前位置 + return new GenerateCollectionVO(0L); + } + Long generateId = generate.getId(); + QueryWrapper qw = new QueryWrapper<>(); + qw.eq("generate_id",generateId); + List generateDetails = generateDetailMapper.selectList(qw); + if (CollectionUtils.isEmpty(generateDetails)){ + // 会有这种情况吗?存到generate中,但是还没存到generateDetail中 + return new GenerateCollectionVO(0L); + } + + List generatedCollectionItems = new ArrayList<>(); + generateDetails.forEach(item -> { + GenerateCollectionItemVO generateCollectionItemVO = new GenerateCollectionItemVO(); + generateCollectionItemVO.setGenerateItemId(item.getId()); + generateCollectionItemVO.setGenerateItemUrl(minioUtil.getPresignedUrl(item.getUrl(), 24 * 60)); + generatedCollectionItems.add(generateCollectionItemVO); + }); + + return new GenerateCollectionVO(generateId, null, generatedCollectionItems); + } + + public Generate selectByUniqueId(Long uniqueId){ + QueryWrapper qw = new QueryWrapper<>(); + qw.eq("unique_id",uniqueId); + + return getOne(qw); + } + + @Override + public void cancelGenerate(Long uniqueId) { + // 1、确认当前消息是否还在排队中 + Boolean exists = redisUtil.isElementExistsInZSet(consumptionOrderKey, String.valueOf(uniqueId)); + if (exists){ + // 1.1、将需要取消的唯一id加入redis,以便及时取消生成 + redisUtil.addToSet(cancelSetKey, String.valueOf(uniqueId)); + // 1.2 将需要取消的id从redis的ConsumptionOrder中删除 + redisUtil.removeFromZSet(consumptionOrderKey, String.valueOf(uniqueId)); + }else { + // 2、判断该消息是否异常 + boolean hasKey = redisUtil.isElementExistsInMap(exceptionMapKey, String.valueOf(uniqueId)); + // 3、判断该消息是否已经消费结束 + Boolean existsInResult = redisUtil.isElementExistsInMap(resultMapKey, String.valueOf(uniqueId)); + if (!hasKey && !existsInResult){ + // 设置取等待状态为false + AsyncCallerUtil.waitingStatus.put(uniqueId,false); + // 3、直接发送取消请求到python端 + } + } + } } diff --git a/src/main/java/com/ai/da/service/impl/RabbitMQServiceImpl.java b/src/main/java/com/ai/da/service/impl/RabbitMQServiceImpl.java new file mode 100644 index 00000000..41743964 --- /dev/null +++ b/src/main/java/com/ai/da/service/impl/RabbitMQServiceImpl.java @@ -0,0 +1,76 @@ +package com.ai.da.service.impl; + +import cn.hutool.core.exceptions.ExceptionUtil; +import com.ai.da.common.RabbitMQ.MQPublisher; +import com.ai.da.common.config.exception.BusinessException; +import com.ai.da.service.RabbitMQService; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Service +public class RabbitMQServiceImpl implements RabbitMQService { + + @Resource + private MQPublisher mqPublisher; + + @Override + public void publishMessage(String message) { + mqPublisher.sendGenerateMessage(message); + } + + @Override + public Integer getMessageCount(String queueUrl) { + + String url = "http://localhost:15672/api/queues/%2f/generate-queue"; + + OkHttpClient client = new OkHttpClient().newBuilder() + .connectTimeout(30, TimeUnit.SECONDS) + .pingInterval(5, TimeUnit.SECONDS)//websocket轮训间隔(单位:秒) + .readTimeout(60, TimeUnit.SECONDS)//读取超时(单位:秒) + .writeTimeout(60, TimeUnit.SECONDS)//写入超时(单位:秒) + .build(); + Request request = new Request.Builder() + .url(queueUrl) + .method("GET",null) + .addHeader("Authorization", "Basic Z3Vlc3Q6Z3Vlc3Q=") + .build(); + Response response = null; + try { + response = client.newCall(request).execute(); + } catch (IOException ioException) { + log.error("RabbitMQService##" + "getMessage异常###{}", ExceptionUtil.getThrowableList(ioException)); + } + + String bodyString; + // 生成失败 + if (Objects.isNull(response) || Objects.isNull(response.body())) { + log.error("RabbitMQService##getMessageCount异常###{}", "response or body is empty!"); + throw new BusinessException("compose-layer.interface.exception"); + }else if (response.code() != HttpURLConnection.HTTP_OK){ + log.error("RabbitMQService##getMessageCount异常###{}", "Response error!Response code ## " + response.code() + " ##"); + throw new BusinessException("compose-layer.interface.exception"); + } else { + try { + bodyString = response.body().string(); + } catch (IOException e) { + throw new BusinessException("compose-layer.interface.exception"); + } + } + JSONObject jsonObject = JSON.parseObject(bodyString); + String messageCount = jsonObject.get("messages").toString(); + return Integer.parseInt(messageCount); + } +} diff --git a/src/main/resources/application-test.properties b/src/main/resources/application-test.properties index 9027b77f..da9009b5 100644 --- a/src/main/resources/application-test.properties +++ b/src/main/resources/application-test.properties @@ -62,6 +62,24 @@ minio.bucketName.results=aida-results minio.bucketName.sysImage=aida-sys-image minio.bucketName.users=aida-users minio.bucketName.collectionElement=aida-collection-element +redirect_url=http://18.167.251.121:7788 +spring.rabbitmq.host=18.167.251.121 +spring.rabbitmq.port=5672 +spring.rabbitmq.username=rabbit +spring.rabbitmq.password=123456 +spring.rabbitmq.virtual-host=/ +spring.redis.host=127.0.0.1 +spring.redis.port=6379 +spring.redis.database=1 +spring.redis.lettuce.pool.max-active=8 +spring.redis.lettuce.pool.max-idle=8 +spring.redis.lettuce.pool.min-idle=0 +spring.redis.lettuce.pool.max-wait=5 + +redis.key.consumptionOrder=ConsumptionOrder +redis.key.cancelSet=CancelSet +redis.key.exceptionMap=ExceptionMap +redis.key.resultMap=ResultMap diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 8550475d..ec94939b 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,8 +1,8 @@ #����application-test�ļ�(���Ի���) -#spring.profiles.active=test +spring.profiles.active=test #����application-prod�ļ�(��������) -spring.profiles.active=prod +#spring.profiles.active=prod #����application-dev�ļ�(��������) #spring.profiles.active=dev From 16f590dea5bbaec9439e2780e8a46135ff33de69 Mon Sep 17 00:00:00 2001 From: xupei Date: Mon, 22 Jan 2024 11:17:53 +0800 Subject: [PATCH 03/15] =?UTF-8?q?=E4=BF=AE=E6=94=B9redis=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/resources/application-test.properties | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/resources/application-test.properties b/src/main/resources/application-test.properties index da9009b5..4f31f098 100644 --- a/src/main/resources/application-test.properties +++ b/src/main/resources/application-test.properties @@ -70,9 +70,10 @@ spring.rabbitmq.username=rabbit spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=/ -spring.redis.host=127.0.0.1 +spring.redis.host=18.167.251.121 spring.redis.port=6379 spring.redis.database=1 +spring.redis.password=Aidlab spring.redis.lettuce.pool.max-active=8 spring.redis.lettuce.pool.max-idle=8 spring.redis.lettuce.pool.min-idle=0 From ec39bf9a9716b7ac71522b9e6ba408275711214f Mon Sep 17 00:00:00 2001 From: xupei Date: Mon, 22 Jan 2024 12:03:36 +0800 Subject: [PATCH 04/15] =?UTF-8?q?=E4=BF=AE=E6=94=B9redis=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/resources/application-test.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/resources/application-test.properties b/src/main/resources/application-test.properties index 4f31f098..914d2ca9 100644 --- a/src/main/resources/application-test.properties +++ b/src/main/resources/application-test.properties @@ -70,7 +70,7 @@ spring.rabbitmq.username=rabbit spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=/ -spring.redis.host=18.167.251.121 +spring.redis.host=172.31.11.32 spring.redis.port=6379 spring.redis.database=1 spring.redis.password=Aidlab From 404172e5f2bb9df982cce324ac3a688e936d1ccc Mon Sep 17 00:00:00 2001 From: xupei Date: Mon, 22 Jan 2024 14:00:05 +0800 Subject: [PATCH 05/15] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E8=8E=B7=E5=8F=96?= =?UTF-8?q?=E7=BB=93=E6=9E=9C=E7=9A=84=E8=AF=B7=E6=B1=82=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/com/ai/da/controller/GenerateController.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/ai/da/controller/GenerateController.java b/src/main/java/com/ai/da/controller/GenerateController.java index 27a20693..7b0b0a30 100644 --- a/src/main/java/com/ai/da/controller/GenerateController.java +++ b/src/main/java/com/ai/da/controller/GenerateController.java @@ -53,6 +53,7 @@ public class GenerateController { return Response.success(generateService.generateDislike(generateDetailId, timeZone)); } + @ApiOperation(value = "发起生成请求,异步获取结果") @PostMapping("/prepare") public Response prepareForGenerate(@Valid @RequestBody GenerateThroughImageTextDTO generateThroughImageTextDTO){ return Response.success(generateService.prepareForGenerate(generateThroughImageTextDTO)); @@ -66,7 +67,7 @@ public class GenerateController { } @ApiOperation(value = "获取生成结果") - @PostMapping("/result") + @GetMapping("/result") public Response getGenerateResult(@RequestParam("uniqueId") Long uniqueId){ GenerateCollectionVO generateResult = generateService.getGenerateResult(uniqueId); return Response.success(generateResult); From 164454b5b2b7404c2514037b4e71b511074a9270 Mon Sep 17 00:00:00 2001 From: xupei Date: Mon, 22 Jan 2024 16:21:32 +0800 Subject: [PATCH 06/15] =?UTF-8?q?BUGFIX=EF=BC=9A=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E4=BC=A0=E8=BE=93=E8=BF=87=E7=A8=8B=E4=B8=ADlong=E7=B2=BE?= =?UTF-8?q?=E5=BA=A6=E5=8F=91=E7=94=9F=E4=B8=A2=E5=A4=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/ai/da/common/RabbitMQ/MQConsumer.java | 6 ------ .../com/ai/da/controller/GenerateController.java | 8 +++++--- .../ai/da/service/impl/GenerateServiceImpl.java | 16 +++++++++++----- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java b/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java index bce55fcc..3e9127f5 100644 --- a/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java +++ b/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java @@ -64,12 +64,6 @@ public class MQConsumer { // 2.2 将该消息从取消列表中删除 redisUtil.removeFromSet(cancelSetKey, String.valueOf(uniqueId)); } else { - try { - // 模拟耗时 - Thread.sleep(40000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } GenerateCollectionVO generateCollectionVO = generateService.generateThroughImageText(generateThroughImageTextDTO); if (!Objects.isNull(generateCollectionVO)){ HashMap generateResult = new HashMap<>(); diff --git a/src/main/java/com/ai/da/controller/GenerateController.java b/src/main/java/com/ai/da/controller/GenerateController.java index 7b0b0a30..173370f2 100644 --- a/src/main/java/com/ai/da/controller/GenerateController.java +++ b/src/main/java/com/ai/da/controller/GenerateController.java @@ -55,12 +55,14 @@ public class GenerateController { @ApiOperation(value = "发起生成请求,异步获取结果") @PostMapping("/prepare") - public Response prepareForGenerate(@Valid @RequestBody GenerateThroughImageTextDTO generateThroughImageTextDTO){ - return Response.success(generateService.prepareForGenerate(generateThroughImageTextDTO)); + public Response prepareForGenerate(@Valid @RequestBody GenerateThroughImageTextDTO generateThroughImageTextDTO){ + Long l = generateService.prepareForGenerate(generateThroughImageTextDTO); + // 防止long精度丢失,这里转为String类型进行传输 + return Response.success(String.valueOf(l)); } @ApiOperation(value = "取消继续生成") - @PostMapping("/stopWaiting") + @GetMapping("/stopWaiting") public Response stopWaiting(@RequestParam("uniqueId") Long uniqueId){ generateService.cancelGenerate(uniqueId); return Response.success("stop waiting successfully"); diff --git a/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java b/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java index 8203d379..ba729e4b 100644 --- a/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java +++ b/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java @@ -121,7 +121,7 @@ public class GenerateServiceImpl extends ServiceImpl i generateThroughImageTextDTO.getLevel1Type().equals(PRINT_BOARD.getRealName()) ? "print" : "moodboard"; AsyncCallerUtil asyncCallerUtil = new AsyncCallerUtil(); List generatedSketchUrl = asyncCallerUtil.generate(new GenerateToPythonDTO(accountId, Objects.isNull(collectionElement) ? null : collectionElement.getUrl(), - category, text, mode, "1", generateThroughImageTextDTO.getGender()),0L); + category, text, mode, "1", generateThroughImageTextDTO.getGender()),generateThroughImageTextDTO.getUniqueId()); // List generatedSketchUrl = pythonService.generateSketchOrPrint(new GenerateToPythonDTO(accountId, Objects.isNull(elementId) ? null : collectionElement.getUrl(), // category, text, mode, "1", generateThroughImageTextDTO.getGender())); if (CollectionUtils.isEmpty(generatedSketchUrl)){ @@ -320,7 +320,13 @@ public class GenerateServiceImpl extends ServiceImpl i Long elementId = generateThroughImageTextDTO.getCollectionElementId(); validateGeneraType(new Generate(), text, elementId,generateType); - // 2、生成唯一id + // 2、确定当前排队人数总数,超过15个,暂停使用当前功能 + Long zSetTotal = redisUtil.getZSetTotal(consumptionOrderKey); + if (zSetTotal.equals(15L)){ + return null; + } + + // 3、生成唯一id SnowflakeUtil idWorker = new SnowflakeUtil(0, 0); long snowflakeId = idWorker.nextId(); @@ -330,14 +336,14 @@ public class GenerateServiceImpl extends ServiceImpl i generateThroughImageTextDTO.setUniqueId(snowflakeId); String jsonString = JSON.toJSONString(generateThroughImageTextDTO); - // 3、加入redis排队,便于获取实时排队信息 + // 4、加入redis排队,便于获取实时排队信息 Double maxScore = redisUtil.getMaxScore(consumptionOrderKey); redisUtil.addToZSet(consumptionOrderKey, String.valueOf(snowflakeId),maxScore); - // 4、将消息发布到MQ消息队列 + // 5、将消息发布到MQ消息队列 rabbitMQService.publishMessage(jsonString); - // 5、返回唯一id + // 6、返回唯一id return snowflakeId; } From a9ce35200c543b6f2e445d56cf55ff4f2160f8bf Mon Sep 17 00:00:00 2001 From: xupei Date: Mon, 22 Jan 2024 16:27:46 +0800 Subject: [PATCH 07/15] =?UTF-8?q?BUGFIX=EF=BC=9A=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E7=8E=AF=E5=A2=83=E9=85=8D=E7=BD=AE=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java | 1 + src/main/resources/application-test.properties | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java b/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java index 8f5dac91..f8303f8f 100644 --- a/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java +++ b/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java @@ -59,6 +59,7 @@ public class AsyncCallerUtil { } catch (InterruptedException | ExecutionException | BusinessException e) { // 处理异常 log.error("发生错误 : " + e); + e.printStackTrace(); // 取消定时任务 assert timeoutTask != null; timeoutTask.cancel(true); diff --git a/src/main/resources/application-test.properties b/src/main/resources/application-test.properties index 914d2ca9..471ccb08 100644 --- a/src/main/resources/application-test.properties +++ b/src/main/resources/application-test.properties @@ -51,7 +51,7 @@ spring.servlet.multipart.max-request-size= 5MB access.python.ip=http://18.167.251.121 #access.python.ip=http://18.167.251.121:9991/ #access.python.port=9992 -access.python.port=9990 +access.python.port=9991 # minIO服务配置之信息 minio.endpoint=https://www.minio.aida.com.hk:9000 From 96858c2cc33174f6c6035f5aa942f8d752b5457e Mon Sep 17 00:00:00 2001 From: xupei Date: Wed, 24 Jan 2024 11:43:56 +0800 Subject: [PATCH 08/15] =?UTF-8?q?TASK:=E5=BC=82=E6=AD=A5=E8=B0=83=E7=94=A8?= =?UTF-8?q?generate=E5=8F=8A=E5=8F=96=E6=B6=88generate?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/ai/da/common/RabbitMQ/MQConsumer.java | 32 ++-- .../ai/da/common/utils/AsyncCallerUtil.java | 18 +-- .../ai/da/controller/GenerateController.java | 10 +- .../com/ai/da/mapper/entity/Generate.java | 4 +- .../dto/GenerateThroughImageTextDTO.java | 2 +- .../ai/da/model/dto/GenerateToPythonDTO.java | 2 + .../java/com/ai/da/python/PythonService.java | 47 +++++- .../com/ai/da/service/GenerateService.java | 8 +- .../da/service/impl/GenerateServiceImpl.java | 141 ++++++++++-------- .../resources/application-test.properties | 7 +- 10 files changed, 159 insertions(+), 112 deletions(-) diff --git a/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java b/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java index 3e9127f5..29c7e939 100644 --- a/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java +++ b/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java @@ -48,12 +48,11 @@ public class MQConsumer { log.info("============start listening=========="); GenerateThroughImageTextDTO generateThroughImageTextDTO = JSONObject.parseObject(msg.getBody(), GenerateThroughImageTextDTO.class); - Long uniqueId = generateThroughImageTextDTO.getUniqueId(); - // 1、将消息从redis排队队列中删除 - redisUtil.removeFromZSet(consumptionOrderKey, String.valueOf(uniqueId)); + String uniqueId = generateThroughImageTextDTO.getUniqueId(); + try { // 2、判断当前消息是否在取消列表中 - Boolean isMember = redisUtil.isElementExistsInSet(cancelSetKey, String.valueOf(uniqueId)); + Boolean isMember = redisUtil.isElementExistsInSet(cancelSetKey, uniqueId); if (isMember) { try { // 2.1 手动确认该消息 @@ -62,40 +61,43 @@ public class MQConsumer { log.error("手动确认,不返回队列重新消费"); } // 2.2 将该消息从取消列表中删除 - redisUtil.removeFromSet(cancelSetKey, String.valueOf(uniqueId)); +// redisUtil.removeFromSet(cancelSetKey, uniqueId); } else { GenerateCollectionVO generateCollectionVO = generateService.generateThroughImageText(generateThroughImageTextDTO); +// try { +// Thread.sleep(15000); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } + // 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除 + redisUtil.removeFromZSet(consumptionOrderKey, uniqueId); if (!Objects.isNull(generateCollectionVO)){ HashMap generateResult = new HashMap<>(); - generateResult.put(String.valueOf(uniqueId), JSONObject.toJSONString(generateCollectionVO)); + generateResult.put(uniqueId, JSONObject.toJSONString(generateCollectionVO)); // 将结果存在redis中 ,为空时不要存 redisUtil.addToMap(resultMapKey, generateResult); } } } catch (BusinessException e) { - log.error(e.getMessage()); + log.error(e.getMsg()); // channel.basicNack() 为不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue try { // 第二个参数,是否批量确认消息,当传false时,只确认当前 deliveryTag对应的消息;当传true时,会确认当前及之前所有未确认的消息。 channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); + // 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除 + redisUtil.removeFromZSet(consumptionOrderKey, uniqueId); } catch (IOException exception) { log.error("手动确认,取消返回队列,不再重新消费"); } // 将入参和错误信息存入数据库 - String exceptionMessage = JSONObject.toJSONString(generateThroughImageTextDTO) + " Exception message : " + e.getMessage(); + String exceptionMessage = JSONObject.toJSONString(generateThroughImageTextDTO) + + " Exception message : " + e.getMsg(); HashMap exceptionInfo = new HashMap<>(); exceptionInfo.put(String.valueOf(uniqueId), exceptionMessage); // 存redis redisUtil.addToMap(exceptionMapKey, exceptionInfo); } - -// log.info(JSONObject.parseObject(msg.getBody(), GenerateThroughImageTextDTO.class).toString()); -// try { -// Thread.sleep(10000); -// } catch (InterruptedException e) { -// throw new RuntimeException(e); -// } log.info("============end listening=========="); } diff --git a/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java b/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java index f8303f8f..1f80cc41 100644 --- a/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java +++ b/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java @@ -14,7 +14,7 @@ import java.util.concurrent.*; @Component public class AsyncCallerUtil { - public static Map waitingStatus = new HashMap<>(); + public static Map waitingStatus = new HashMap<>(); private static PythonService pythonService; @@ -27,9 +27,10 @@ public class AsyncCallerUtil { return CompletableFuture.supplyAsync(() -> pythonService.generateSketchOrPrint(generateToPython)); } - public List generate(GenerateToPythonDTO generateToPython, Long requestId) { - ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); - waitingStatus.put(requestId, true); + public List generate(GenerateToPythonDTO generateToPython) { + ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5); + String taskId = generateToPython.getTasks_id(); + waitingStatus.put(taskId, true); ScheduledFuture timeoutTask = null; try { @@ -37,10 +38,10 @@ public class AsyncCallerUtil { // 10秒后第一次确认,之后每隔10秒确认一次用户选择结果 timeoutTask = scheduledExecutorService.scheduleAtFixedRate(() -> { // 调用另一个接口获取用户的选择 - if (!waitingStatus.get(requestId)) { + if (!waitingStatus.get(taskId)) { // 如果用户选择取消,则取消对generate的调用,cancel判断是否成功取消 generateResult.cancel(true); - waitingStatus.remove(requestId); + waitingStatus.remove(taskId); } System.out.println("持续等待...... : " + DateUtil.getByTimeZone("Asia/Shanghai")); }, 10, 10, TimeUnit.SECONDS); @@ -54,16 +55,15 @@ public class AsyncCallerUtil { // 处理结果 System.out.println("generate 响应: " + result); System.out.println("schedule finish time : " + DateUtil.getByTimeZone("Asia/Shanghai")); - waitingStatus.remove(requestId); + waitingStatus.remove(taskId); return result; } catch (InterruptedException | ExecutionException | BusinessException e) { // 处理异常 log.error("发生错误 : " + e); - e.printStackTrace(); // 取消定时任务 assert timeoutTask != null; timeoutTask.cancel(true); - throw new BusinessException("generate.interface.error"); + throw new BusinessException(e.getMessage()); } finally { // 关闭线程池 // executorService.shutdown(); diff --git a/src/main/java/com/ai/da/controller/GenerateController.java b/src/main/java/com/ai/da/controller/GenerateController.java index 173370f2..0c1506b7 100644 --- a/src/main/java/com/ai/da/controller/GenerateController.java +++ b/src/main/java/com/ai/da/controller/GenerateController.java @@ -55,22 +55,20 @@ public class GenerateController { @ApiOperation(value = "发起生成请求,异步获取结果") @PostMapping("/prepare") - public Response prepareForGenerate(@Valid @RequestBody GenerateThroughImageTextDTO generateThroughImageTextDTO){ - Long l = generateService.prepareForGenerate(generateThroughImageTextDTO); - // 防止long精度丢失,这里转为String类型进行传输 - return Response.success(String.valueOf(l)); + public Response prepareForGenerate(@Valid @RequestBody GenerateThroughImageTextDTO generateThroughImageTextDTO) { + return Response.success(generateService.prepareForGenerate(generateThroughImageTextDTO)); } @ApiOperation(value = "取消继续生成") @GetMapping("/stopWaiting") - public Response stopWaiting(@RequestParam("uniqueId") Long uniqueId){ + public Response stopWaiting(@RequestParam("uniqueId") String uniqueId) { generateService.cancelGenerate(uniqueId); return Response.success("stop waiting successfully"); } @ApiOperation(value = "获取生成结果") @GetMapping("/result") - public Response getGenerateResult(@RequestParam("uniqueId") Long uniqueId){ + public Response getGenerateResult(@RequestParam("uniqueId") String uniqueId) { GenerateCollectionVO generateResult = generateService.getGenerateResult(uniqueId); return Response.success(generateResult); } diff --git a/src/main/java/com/ai/da/mapper/entity/Generate.java b/src/main/java/com/ai/da/mapper/entity/Generate.java index f45cb144..9fad23fb 100644 --- a/src/main/java/com/ai/da/mapper/entity/Generate.java +++ b/src/main/java/com/ai/da/mapper/entity/Generate.java @@ -28,9 +28,9 @@ public class Generate { private Long accountId; /** - * 唯一id,用于保持消息的唯一性 + * 唯一id */ - private Long uniqueId; + private String uniqueId; /** * Sketchboard Printboard diff --git a/src/main/java/com/ai/da/model/dto/GenerateThroughImageTextDTO.java b/src/main/java/com/ai/da/model/dto/GenerateThroughImageTextDTO.java index df1c9ab9..fd45533c 100644 --- a/src/main/java/com/ai/da/model/dto/GenerateThroughImageTextDTO.java +++ b/src/main/java/com/ai/da/model/dto/GenerateThroughImageTextDTO.java @@ -46,5 +46,5 @@ public class GenerateThroughImageTextDTO { String timeZone; @ApiModelProperty("唯一id,用于保持消息唯一性") - Long uniqueId; + String uniqueId; } diff --git a/src/main/java/com/ai/da/model/dto/GenerateToPythonDTO.java b/src/main/java/com/ai/da/model/dto/GenerateToPythonDTO.java index 29c77243..93553db0 100644 --- a/src/main/java/com/ai/da/model/dto/GenerateToPythonDTO.java +++ b/src/main/java/com/ai/da/model/dto/GenerateToPythonDTO.java @@ -22,4 +22,6 @@ public class GenerateToPythonDTO { private String version; private String gender; + + private String tasks_id; } diff --git a/src/main/java/com/ai/da/python/PythonService.java b/src/main/java/com/ai/da/python/PythonService.java index e12eef12..e8f37f2f 100644 --- a/src/main/java/com/ai/da/python/PythonService.java +++ b/src/main/java/com/ai/da/python/PythonService.java @@ -56,6 +56,9 @@ public class PythonService { private String accessPythonIp; @Value("${access.python.port:''}") private String accessPythonPort; + @Value("${access.generate.port:''}") + private String accessGeneratePort; + @Resource private PythonTAllInfoService pythonTAllInfoService; @@ -2251,9 +2254,10 @@ public class PythonService { Request request = new Request.Builder() // .url("http://18.167.251.121:9992") // .url("http://127.0.0.1:5000/api/diffusion") - .url(accessPythonIp + ":" + accessPythonPort + "/api/diffusion") +// .url(accessPythonIp + ":" + accessPythonPort + "/api/diffusion") + .url(accessPythonIp + ":" + accessPythonPort + "/api/generate_image") .method("POST", body) - .addHeader("Authorization", "Basic YWlkbGFiOjEyMw==") +// .addHeader("Authorization", "Basic YWlkbGFiOjEyMw==") .addHeader("Content-Type", "application/json") .build(); Response response = null; @@ -2263,23 +2267,28 @@ public class PythonService { response = client.newCall(request).execute(); } catch (IOException ioException) { log.error("PythonService##generateSketchOrPrint异常###{}", ExceptionUtil.getThrowableList(ioException)); - throw new BusinessException("generate.interface.error"); +// throw new BusinessException("generate.interface.error"); + throw new BusinessException(ioException.getMessage()); } //去除限流 AccessLimitUtils.validateOut("generateSketchOrPrint"); // 判断是否生成失败 - if (Objects.isNull(response) || Objects.isNull(response.body())) { + if (Objects.isNull(response.body())) { log.error("PythonService##generateSketchOrPrint异常###{}", "response or body is empty!"); - throw new BusinessException("generate.interface.error"); +// throw new BusinessException("generate.interface.error"); + throw new BusinessException("PythonService##generateSketchOrPrint异常###: response or body is empty!"); } else if (response.code() != HttpURLConnection.HTTP_OK){ log.error("PythonService##generateSketchOrPrint异常###{}", "Response error!Response code ## " + response.code() + " ##"); - throw new BusinessException("generate.interface.error"); +// throw new BusinessException("generate.interface.error"); + throw new BusinessException("PythonService##generateSketchOrPrint异常### Response error!Response code ## " + response.code() + " ##"); } else { try { bodyString = response.body().string(); } catch (IOException e) { - throw new BusinessException("generate.interface.error"); + log.error(e.getMessage()); +// throw new BusinessException("generate.interface.error"); + throw new BusinessException(e.getMessage()); } } JSONObject jsonObject = JSON.parseObject(bodyString); @@ -2417,4 +2426,28 @@ public class PythonService { //生成失败 throw new BusinessException("cloth-classification.interface.exception"); } + + public void cancelGenerateTask(String taskId){ + OkHttpClient client = new OkHttpClient().newBuilder() + .connectTimeout(30, TimeUnit.SECONDS) + .pingInterval(5, TimeUnit.SECONDS)//websocket轮训间隔(单位:秒) + .readTimeout(60, TimeUnit.SECONDS)//读取超时(单位:秒) + .writeTimeout(60, TimeUnit.SECONDS)//写入超时(单位:秒) + .build(); + HttpUrl.Builder builder = HttpUrl.parse(accessPythonIp + ":" + accessGeneratePort + "/cancel_task").newBuilder(); + builder.addQueryParameter("task_id",taskId); + Request request = new Request.Builder() + .url(builder.build().toString()) + .addHeader("Authorization", "Basic YWlkbGFiOjEyMw==") + .addHeader("Content-Type", "application/json") + .build(); + try { + log.info("getGenerateResult请求入参content###{}", taskId); + client.newCall(request).execute(); + } catch (IOException ioException) { + log.error("PythonService##getGenerateResult异常###{}", ExceptionUtil.getThrowableList(ioException)); + throw new BusinessException("generate.interface.error"); + } + } + } diff --git a/src/main/java/com/ai/da/service/GenerateService.java b/src/main/java/com/ai/da/service/GenerateService.java index 9f17e7ea..cb97d9f8 100644 --- a/src/main/java/com/ai/da/service/GenerateService.java +++ b/src/main/java/com/ai/da/service/GenerateService.java @@ -25,12 +25,12 @@ public interface GenerateService extends IService { List selectBatchByLibraryId(List libraryId); - GenerateCollectionVO getGenerateResult(Long uniqueId); + GenerateCollectionVO getGenerateResult(String uniqueId); - Long prepareForGenerate(GenerateThroughImageTextDTO generateThroughImageTextDTO); + String prepareForGenerate(GenerateThroughImageTextDTO generateThroughImageTextDTO); - Long getRankPosition(Long uniqueId); + Long getRankPosition(String uniqueId); - void cancelGenerate(Long uniqueId); + void cancelGenerate(String uniqueId); } diff --git a/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java b/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java index ba729e4b..5fe3c4a4 100644 --- a/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java +++ b/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java @@ -24,6 +24,7 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import io.minio.errors.MinioException; import io.netty.util.internal.StringUtil; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -35,6 +36,7 @@ import java.util.*; import static com.ai.da.common.enums.CollectionLevel1TypeEnum.*; +@Slf4j @Service public class GenerateServiceImpl extends ServiceImpl implements GenerateService { @@ -77,7 +79,7 @@ public class GenerateServiceImpl extends ServiceImpl i @Override public GenerateCaptionVO generateCaption(Long sketchElementId) { CollectionElement collectionElement = collectionElementMapper.selectById(sketchElementId); - if (Objects.isNull(collectionElement)){ + if (Objects.isNull(collectionElement)) { throw new BusinessException("the.image.does.not.exist.please.reselect"); } String url = collectionElement.getUrl(); @@ -101,14 +103,14 @@ public class GenerateServiceImpl extends ServiceImpl i generate.setLevel1Type(generateThroughImageTextDTO.getLevel1Type()); // 当level1type是sketchboard时,存数据库需要加上当前性别 generate.setGenerateType(generate.getLevel1Type().equals(SKETCH_BOARD.getRealName()) ? - generateType + " (" +generateThroughImageTextDTO.getGender() + ")": + generateType + " (" + generateThroughImageTextDTO.getGender() + ")" : generateType); generate.setModelName(StringUtil.isNullOrEmpty(generateThroughImageTextDTO.getVersion()) ? ModelNameEnum.MODEL_0.getCode() : generateThroughImageTextDTO.getVersion()); generate.setCreateDate(DateUtil.getByTimeZone(generateThroughImageTextDTO.getTimeZone())); String text = generateThroughImageTextDTO.getText(); Long elementId = generateThroughImageTextDTO.getCollectionElementId(); - validateGeneraType(generate, text, elementId,generateType); + validateGeneraType(generate, text, elementId, generateType); // 2.1 sketch或print在t_collection_element表中的信息是否需要更新 如 level2Type CollectionElement collectionElement = collectionElementService.editLevel2Type(elementId, generateThroughImageTextDTO.getLevel2Type()); @@ -120,11 +122,11 @@ public class GenerateServiceImpl extends ServiceImpl i String category = generateThroughImageTextDTO.getLevel1Type().equals(SKETCH_BOARD.getRealName()) ? "sketch" : generateThroughImageTextDTO.getLevel1Type().equals(PRINT_BOARD.getRealName()) ? "print" : "moodboard"; AsyncCallerUtil asyncCallerUtil = new AsyncCallerUtil(); - List generatedSketchUrl = asyncCallerUtil.generate(new GenerateToPythonDTO(accountId, Objects.isNull(collectionElement) ? null : collectionElement.getUrl(), - category, text, mode, "1", generateThroughImageTextDTO.getGender()),generateThroughImageTextDTO.getUniqueId()); + List generatedSketchUrl = asyncCallerUtil.generate(new GenerateToPythonDTO(accountId, Objects.isNull(collectionElement) ? "" : collectionElement.getUrl(), + category, text, mode, "1", generateThroughImageTextDTO.getGender() ,generateThroughImageTextDTO.getUniqueId())); // List generatedSketchUrl = pythonService.generateSketchOrPrint(new GenerateToPythonDTO(accountId, Objects.isNull(elementId) ? null : collectionElement.getUrl(), // category, text, mode, "1", generateThroughImageTextDTO.getGender())); - if (CollectionUtils.isEmpty(generatedSketchUrl)){ + if (CollectionUtils.isEmpty(generatedSketchUrl)) { return null; } @@ -139,8 +141,8 @@ public class GenerateServiceImpl extends ServiceImpl i GenerateCollectionItemVO generateCollectionItemVO = new GenerateCollectionItemVO(); String md5 = MD5Utils.encryptFile(minioUtil.getPresignedUrl(item, 24 * 60), Boolean.FALSE); // 通过MD5值和level1Type,判断不同level1Type下相同的图片是否被like过 - List> libraryIdList = generateDetailMapper.getLibraryIdThroughMD5(md5, generateThroughImageTextDTO.getLevel1Type()); - if (!libraryIdList.isEmpty()){ + List> libraryIdList = generateDetailMapper.getLibraryIdThroughMD5(md5, generateThroughImageTextDTO.getLevel1Type()); + if (!libraryIdList.isEmpty()) { generateDetail.setIsLike((byte) 1); generateDetail.setLibraryId(libraryIdList.get(0).get("library_id")); generateCollectionItemVO.setIsLiked(Boolean.TRUE); @@ -161,22 +163,22 @@ public class GenerateServiceImpl extends ServiceImpl i return new GenerateCollectionVO(generate.getId(), collectionId, generatedCollectionItems); } - private void validateGeneraType(Generate generate, String text, Long elementId,String generateType) { + private void validateGeneraType(Generate generate, String text, Long elementId, String generateType) { switch (generateType) { case "text": - if (StringUtil.isNullOrEmpty(text)){ + if (StringUtil.isNullOrEmpty(text)) { throw new BusinessException("please.input.the.caption"); } generate.setText(text); break; case "image": - if (Objects.isNull(elementId)){ + if (Objects.isNull(elementId)) { throw new BusinessException("please.choose.an.image"); } generate.setCollectionElementId(elementId); break; case "text-image": - if (StringUtil.isNullOrEmpty(text) || Objects.isNull(elementId)){ + if (StringUtil.isNullOrEmpty(text) || Objects.isNull(elementId)) { throw new BusinessException("please.input.the.caption.and.choose.an.image"); } generate.setText(text); @@ -191,21 +193,21 @@ public class GenerateServiceImpl extends ServiceImpl i // 1、判断参数是否正确 // 1.1 必须参数是否非空 if (SKETCH_BOARD.getRealName().equals(generateLikeDTO.getLevel1Type())) { - if (StringUtil.isNullOrEmpty(generateLikeDTO.getLevel2Type())){ + if (StringUtil.isNullOrEmpty(generateLikeDTO.getLevel2Type())) { throw new BusinessException("level2Type.cannot.be.empty"); } - if (StringUtil.isNullOrEmpty(generateLikeDTO.getGender())){ + if (StringUtil.isNullOrEmpty(generateLikeDTO.getGender())) { throw new BusinessException("gender.cannot.be.empty"); } } // 1.2 判断参数是否真实有效 Long generateDetailId = generateLikeDTO.getGenerateDetailId(); GenerateDetail generateDetail = generateDetailMapper.selectById(generateDetailId); - if (Objects.isNull(generateDetail)){ + if (Objects.isNull(generateDetail)) { throw new BusinessException("generateItem.does.not.exist"); } Generate generate = getById(generateDetail.getGenerateId()); - if (!generateLikeDTO.getLevel1Type().equals(generate.getLevel1Type())){ + if (!generateLikeDTO.getLevel1Type().equals(generate.getLevel1Type())) { throw new BusinessException("level1Type.does.not.match"); } @@ -213,8 +215,8 @@ public class GenerateServiceImpl extends ServiceImpl i // 2.1、不能重复喜欢 // 2.1.1 判断该图片是否被喜欢过 Library libraryDetail = libraryService.getById(generateDetail.getLibraryId()); - if ( (Objects.nonNull(generateDetail.getLibraryId()) && !generateDetail.getLibraryId().equals(0L)) - || Objects.nonNull(libraryDetail)){ + if ((Objects.nonNull(generateDetail.getLibraryId()) && !generateDetail.getLibraryId().equals(0L)) + || Objects.nonNull(libraryDetail)) { throw new BusinessException("duplicate.likes.are.not.allowed"); } @@ -237,7 +239,7 @@ public class GenerateServiceImpl extends ServiceImpl i public Boolean generateDislike(Long generateDetailId, String timeZone) { // 1、确定generateDetail中是否有这条记录 GenerateDetail generateDetail = generateDetailMapper.selectById(generateDetailId); - if (Objects.isNull(generateDetail)){ + if (Objects.isNull(generateDetail)) { throw new BusinessException("generateItem.does.not.exist"); } @@ -287,7 +289,7 @@ public class GenerateServiceImpl extends ServiceImpl i generateDetailMapper.update(generateDetail, queryWrapper); } - public void updateLikeStatusBatch(List generateDetailIdList, Byte hasLike, Long libraryId, String timeZone){ + public void updateLikeStatusBatch(List generateDetailIdList, Byte hasLike, Long libraryId, String timeZone) { QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.in("id", generateDetailIdList); @@ -299,93 +301,101 @@ public class GenerateServiceImpl extends ServiceImpl i generateDetailMapper.update(generateDetail, queryWrapper); } - public List selectBatchByLibraryId(List libraryId){ + public List selectBatchByLibraryId(List libraryId) { QueryWrapper qw = new QueryWrapper<>(); - qw.in("library_id",libraryId); + qw.in("library_id", libraryId); return generateDetailMapper.selectList(qw); } @Override - public Long prepareForGenerate(GenerateThroughImageTextDTO generateThroughImageTextDTO) { + public String prepareForGenerate(GenerateThroughImageTextDTO generateThroughImageTextDTO) { // 1、参数检查,判断必须参数是否为空 - if (Objects.isNull(generateThroughImageTextDTO.getUserId())){ + if (Objects.isNull(generateThroughImageTextDTO.getUserId())) { throw new BusinessException("userId cannot be empty"); } String generateType = generateThroughImageTextDTO.getGenerateType(); - if (!GenerateModeEnum.getGenerateModeList().contains(generateType)){ + if (!GenerateModeEnum.getGenerateModeList().contains(generateType)) { throw new BusinessException("unknown.generate.type"); } String text = generateThroughImageTextDTO.getText(); Long elementId = generateThroughImageTextDTO.getCollectionElementId(); - validateGeneraType(new Generate(), text, elementId,generateType); + validateGeneraType(new Generate(), text, elementId, generateType); - // 2、确定当前排队人数总数,超过15个,暂停使用当前功能 - Long zSetTotal = redisUtil.getZSetTotal(consumptionOrderKey); - if (zSetTotal.equals(15L)){ - return null; + // 2、生成唯一id 使用uuid + String uuid = UUID.randomUUID().toString(); + +// SnowflakeUtil idWorker = new SnowflakeUtil(0, 0); +// long snowflakeId = idWorker.nextId(); + + int num = 1; + // 判断与已经正常生成结果的uuid中有相同的id + while (redisUtil.isElementExistsInMap(resultMapKey, uuid) && num < 10) { + uuid = UUID.randomUUID().toString(); + num++; } - - // 3、生成唯一id - SnowflakeUtil idWorker = new SnowflakeUtil(0, 0); - long snowflakeId = idWorker.nextId(); - - if (AsyncCallerUtil.waitingStatus.containsKey(snowflakeId)){ - snowflakeId = idWorker.nextId(); + // 无依据确定的数字 + if (num > 10){ + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + uuid = UUID.randomUUID().toString(); } - generateThroughImageTextDTO.setUniqueId(snowflakeId); + generateThroughImageTextDTO.setUniqueId(uuid); String jsonString = JSON.toJSONString(generateThroughImageTextDTO); - // 4、加入redis排队,便于获取实时排队信息 + // 3、加入redis排队,便于获取实时排队信息 Double maxScore = redisUtil.getMaxScore(consumptionOrderKey); - redisUtil.addToZSet(consumptionOrderKey, String.valueOf(snowflakeId),maxScore); + redisUtil.addToZSet(consumptionOrderKey, uuid, maxScore); - // 5、将消息发布到MQ消息队列 + // 4、将消息发布到MQ消息队列 rabbitMQService.publishMessage(jsonString); - // 6、返回唯一id - return snowflakeId; + // 5、返回唯一id + return uuid; } @Override - public Long getRankPosition(Long uniqueId) { - return redisUtil.getRank(consumptionOrderKey, String.valueOf(uniqueId)); + public Long getRankPosition(String uniqueId) { + return redisUtil.getRank(consumptionOrderKey, uniqueId); } @Override - public GenerateCollectionVO getGenerateResult(Long uniqueId) { + public GenerateCollectionVO getGenerateResult(String uniqueId) { // 1、判断该请求是否已经异常 - Boolean isMember = redisUtil.isElementExistsInMap(exceptionMapKey, String.valueOf(uniqueId)); - if (isMember){ + Boolean isMember = redisUtil.isElementExistsInMap(exceptionMapKey, uniqueId); + if (isMember) { throw new BusinessException("generate.interface.error"); } // 2、判断该请求是否还在排队 - Boolean existsInZSet = redisUtil.isElementExistsInZSet(consumptionOrderKey, String.valueOf(uniqueId)); - if (existsInZSet){ + Boolean existsInZSet = redisUtil.isElementExistsInZSet(consumptionOrderKey, uniqueId); + if (existsInZSet) { // 排队中,给出当前排序位置 return new GenerateCollectionVO(getRankPosition(uniqueId) + 1L); } // 3、判断redis中有没有 - boolean hasHashKey = redisUtil.isElementExistsInMap(resultMapKey, String.valueOf(uniqueId)); - if (hasHashKey){ + boolean hasHashKey = redisUtil.isElementExistsInMap(resultMapKey, uniqueId); + if (hasHashKey) { // 3.1 有直接从redis中拿 - String resultString = redisUtil.getMapValue(resultMapKey, String.valueOf(uniqueId)); - return JSONObject.parseObject(resultString,GenerateCollectionVO.class); + String resultString = redisUtil.getMapValue(resultMapKey, uniqueId); + return JSONObject.parseObject(resultString, GenerateCollectionVO.class); } // 3.2 判断数据库中有没有 Generate generate = selectByUniqueId(uniqueId); - if (Objects.isNull(generate)){ + if (Objects.isNull(generate)) { // 3.3 还没执行完,给出当前位置 return new GenerateCollectionVO(0L); } Long generateId = generate.getId(); QueryWrapper qw = new QueryWrapper<>(); - qw.eq("generate_id",generateId); + qw.eq("generate_id", generateId); List generateDetails = generateDetailMapper.selectList(qw); - if (CollectionUtils.isEmpty(generateDetails)){ + if (CollectionUtils.isEmpty(generateDetails)) { // 会有这种情况吗?存到generate中,但是还没存到generateDetail中 return new GenerateCollectionVO(0L); } @@ -401,7 +411,7 @@ public class GenerateServiceImpl extends ServiceImpl i return new GenerateCollectionVO(generateId, null, generatedCollectionItems); } - public Generate selectByUniqueId(Long uniqueId){ + public Generate selectByUniqueId(String uniqueId){ QueryWrapper qw = new QueryWrapper<>(); qw.eq("unique_id",uniqueId); @@ -409,23 +419,24 @@ public class GenerateServiceImpl extends ServiceImpl i } @Override - public void cancelGenerate(Long uniqueId) { + public void cancelGenerate(String uniqueId) { // 1、确认当前消息是否还在排队中 - Boolean exists = redisUtil.isElementExistsInZSet(consumptionOrderKey, String.valueOf(uniqueId)); - if (exists){ + Boolean exists = redisUtil.isElementExistsInZSet(consumptionOrderKey, uniqueId); + if (exists) { // 1.1、将需要取消的唯一id加入redis,以便及时取消生成 - redisUtil.addToSet(cancelSetKey, String.valueOf(uniqueId)); + redisUtil.addToSet(cancelSetKey, uniqueId); // 1.2 将需要取消的id从redis的ConsumptionOrder中删除 - redisUtil.removeFromZSet(consumptionOrderKey, String.valueOf(uniqueId)); + redisUtil.removeFromZSet(consumptionOrderKey, uniqueId); }else { // 2、判断该消息是否异常 - boolean hasKey = redisUtil.isElementExistsInMap(exceptionMapKey, String.valueOf(uniqueId)); + boolean hasKey = redisUtil.isElementExistsInMap(exceptionMapKey, uniqueId); // 3、判断该消息是否已经消费结束 - Boolean existsInResult = redisUtil.isElementExistsInMap(resultMapKey, String.valueOf(uniqueId)); + Boolean existsInResult = redisUtil.isElementExistsInMap(resultMapKey, uniqueId); if (!hasKey && !existsInResult){ // 设置取等待状态为false AsyncCallerUtil.waitingStatus.put(uniqueId,false); // 3、直接发送取消请求到python端 + pythonService.cancelGenerateTask(uniqueId); } } } diff --git a/src/main/resources/application-test.properties b/src/main/resources/application-test.properties index 471ccb08..490d7868 100644 --- a/src/main/resources/application-test.properties +++ b/src/main/resources/application-test.properties @@ -50,8 +50,8 @@ spring.servlet.multipart.max-request-size= 5MB #access.python.ip=http://43.198.80.117 access.python.ip=http://18.167.251.121 #access.python.ip=http://18.167.251.121:9991/ -#access.python.port=9992 -access.python.port=9991 +access.python.port=9992 +#access.python.port=9991 # minIO服务配置之信息 minio.endpoint=https://www.minio.aida.com.hk:9000 @@ -70,7 +70,8 @@ spring.rabbitmq.username=rabbit spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=/ -spring.redis.host=172.31.11.32 +#spring.redis.host=172.31.11.32 +spring.redis.host=18.167.251.121 spring.redis.port=6379 spring.redis.database=1 spring.redis.password=Aidlab From d4568ad7400328350bd1e5600b153733ef76ad5e Mon Sep 17 00:00:00 2001 From: xupei Date: Wed, 24 Jan 2024 11:47:19 +0800 Subject: [PATCH 09/15] =?UTF-8?q?redis=20=E9=85=8D=E7=BD=AE=E4=BF=AE?= =?UTF-8?q?=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/resources/application-test.properties | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/resources/application-test.properties b/src/main/resources/application-test.properties index 490d7868..01678779 100644 --- a/src/main/resources/application-test.properties +++ b/src/main/resources/application-test.properties @@ -70,8 +70,8 @@ spring.rabbitmq.username=rabbit spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=/ -#spring.redis.host=172.31.11.32 -spring.redis.host=18.167.251.121 +spring.redis.host=172.31.11.32 +#spring.redis.host=18.167.251.121 spring.redis.port=6379 spring.redis.database=1 spring.redis.password=Aidlab From 1d2cb17d54053fc2cc2e3f53eaa574fa6acc1d73 Mon Sep 17 00:00:00 2001 From: xupei Date: Wed, 24 Jan 2024 14:44:31 +0800 Subject: [PATCH 10/15] =?UTF-8?q?TASK:generate=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E9=98=9F=E5=88=97=E6=B6=88=E8=B4=B9=E8=80=85=EF=BC=8C=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E5=B9=B6=E5=8F=91=E9=87=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/ai/da/common/RabbitMQ/MQConfig.java | 16 ++--- .../com/ai/da/common/RabbitMQ/MQConsumer.java | 65 +++++++++++++++++-- .../ai/da/common/RabbitMQ/MQPublisher.java | 15 ----- .../ai/da/common/utils/AsyncCallerUtil.java | 7 +- .../java/com/ai/da/python/PythonService.java | 4 +- .../da/service/impl/GenerateServiceImpl.java | 1 + 6 files changed, 73 insertions(+), 35 deletions(-) diff --git a/src/main/java/com/ai/da/common/RabbitMQ/MQConfig.java b/src/main/java/com/ai/da/common/RabbitMQ/MQConfig.java index 880def93..bf0a360a 100644 --- a/src/main/java/com/ai/da/common/RabbitMQ/MQConfig.java +++ b/src/main/java/com/ai/da/common/RabbitMQ/MQConfig.java @@ -21,10 +21,10 @@ public class MQConfig { public MQConfig() { } - @Bean - FanoutExchange fanoutRasaExchange() { - return new FanoutExchange(GENERATE_EXCHANGE_FANOUT); - } +// @Bean +// FanoutExchange fanoutRasaExchange() { +// return new FanoutExchange(GENERATE_EXCHANGE_FANOUT); +// } /** * 创建队列,使用工作模式,不用定义交换机 @@ -37,9 +37,9 @@ public class MQConfig { /** * 将队列绑定到交换机上【队列订阅交换机】 */ - @Bean - Binding bindingExchangeRasa() { - return BindingBuilder.bind(queueRasa()).to(fanoutRasaExchange()); - } +// @Bean +// Binding bindingExchangeRasa() { +// return BindingBuilder.bind(queueRasa()).to(fanoutRasaExchange()); +// } } diff --git a/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java b/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java index 29c7e939..eb30d23a 100644 --- a/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java +++ b/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java @@ -42,13 +42,13 @@ public class MQConsumer { @Value("${redis.key.resultMap}") private String resultMapKey; - @RabbitListener(queues = MQConfig.GENERATE_QUEUE) - @RabbitHandler - public void generate(Message msg, Channel channel) { + public void generate(Message msg, Channel channel, String consumerName) { log.info("============start listening=========="); + long start = System.currentTimeMillis(); GenerateThroughImageTextDTO generateThroughImageTextDTO = JSONObject.parseObject(msg.getBody(), GenerateThroughImageTextDTO.class); String uniqueId = generateThroughImageTextDTO.getUniqueId(); + log.info("From " + consumerName + " : " + uniqueId); try { // 2、判断当前消息是否在取消列表中 @@ -71,7 +71,7 @@ public class MQConsumer { // } // 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除 redisUtil.removeFromZSet(consumptionOrderKey, uniqueId); - if (!Objects.isNull(generateCollectionVO)){ + if (!Objects.isNull(generateCollectionVO)) { HashMap generateResult = new HashMap<>(); generateResult.put(uniqueId, JSONObject.toJSONString(generateCollectionVO)); // 将结果存在redis中 ,为空时不要存 @@ -98,7 +98,64 @@ public class MQConsumer { // 存redis redisUtil.addToMap(exceptionMapKey, exceptionInfo); } + long end = System.currentTimeMillis(); + + log.info(" task_id: " + uniqueId + "----------" + consumerName + " 执行时长:" + (end - start) + "毫秒"); log.info("============end listening=========="); } + @RabbitListener(queues = MQConfig.GENERATE_QUEUE) + @RabbitHandler + public void generateConsumer1(Message msg, Channel channel) { + generate(msg, channel, "consumer 1"); + } + + @RabbitListener(queues = MQConfig.GENERATE_QUEUE) + @RabbitHandler + public void generateConsumer2(Message msg, Channel channel) { + generate(msg, channel, "consumer 2"); + } + + @RabbitListener(queues = MQConfig.GENERATE_QUEUE) + @RabbitHandler + public void generateConsumer3(Message msg, Channel channel) { + generate(msg, channel, "consumer 3"); + } + + @RabbitListener(queues = MQConfig.GENERATE_QUEUE) + @RabbitHandler + public void generateConsumer4(Message msg, Channel channel) { + generate(msg, channel, "consumer 4"); + } + + @RabbitListener(queues = MQConfig.GENERATE_QUEUE) + @RabbitHandler + public void generateConsumer5(Message msg, Channel channel) { + generate(msg, channel, "consumer 5"); + } + + @RabbitListener(queues = MQConfig.GENERATE_QUEUE) + @RabbitHandler + public void generateConsumer6(Message msg, Channel channel) { + generate(msg, channel, "consumer 6"); + } + + @RabbitListener(queues = MQConfig.GENERATE_QUEUE) + @RabbitHandler + public void generateConsumer7(Message msg, Channel channel) { + generate(msg, channel, "consumer 7"); + } + + @RabbitListener(queues = MQConfig.GENERATE_QUEUE) + @RabbitHandler + public void generateConsumer8(Message msg, Channel channel) { + generate(msg, channel, "consumer 8"); + } + + @RabbitListener(queues = MQConfig.GENERATE_QUEUE) + @RabbitHandler + public void generateConsumer9(Message msg, Channel channel) { + generate(msg, channel, "consumer 9"); + } + } diff --git a/src/main/java/com/ai/da/common/RabbitMQ/MQPublisher.java b/src/main/java/com/ai/da/common/RabbitMQ/MQPublisher.java index 40b7baea..b0429110 100644 --- a/src/main/java/com/ai/da/common/RabbitMQ/MQPublisher.java +++ b/src/main/java/com/ai/da/common/RabbitMQ/MQPublisher.java @@ -1,9 +1,7 @@ package com.ai.da.common.RabbitMQ; -import com.ai.da.service.RabbitMQService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AmqpTemplate; -import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -16,8 +14,6 @@ public class MQPublisher { @Resource private AmqpTemplate amqpTemplate; - @Resource - private RabbitMQService rabbitMQService; public void sendGenerateMessage(String mm) { log.info("send message:" + mm); @@ -25,15 +21,4 @@ public class MQPublisher { } - public void getMsgCount() { -//// AMQP.Queue.DeclareOk declareOk = rabbitTemplate.execute(channel -> channel.queueDeclarePassive(MQConfig.GENERATE_QUEUE)); -// -// QueueInformation queueInfo = rabbitAdmin.getQueueInfo(MQConfig.GENERATE_QUEUE); -//// assert queueInfo != null; -// -//// System.out.println(declareOk.getMessageCount()); -// System.out.println(queueInfo.getMessageCount()); -//// return declareOk.getMessageCount(); -// return queueInfo.getMessageCount(); - } } diff --git a/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java b/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java index 1f80cc41..640880d5 100644 --- a/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java +++ b/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java @@ -43,18 +43,13 @@ public class AsyncCallerUtil { generateResult.cancel(true); waitingStatus.remove(taskId); } - System.out.println("持续等待...... : " + DateUtil.getByTimeZone("Asia/Shanghai")); + log.info("===============持续等待==============="); }, 10, 10, TimeUnit.SECONDS); - System.out.println("开始阻塞 : " + DateUtil.getByTimeZone("Asia/Shanghai")); // 阻塞,等待结果 List result = generateResult.get(); // 取消定时任务 timeoutTask.cancel(true); - - // 处理结果 - System.out.println("generate 响应: " + result); - System.out.println("schedule finish time : " + DateUtil.getByTimeZone("Asia/Shanghai")); waitingStatus.remove(taskId); return result; } catch (InterruptedException | ExecutionException | BusinessException e) { diff --git a/src/main/java/com/ai/da/python/PythonService.java b/src/main/java/com/ai/da/python/PythonService.java index e8f37f2f..292c6243 100644 --- a/src/main/java/com/ai/da/python/PythonService.java +++ b/src/main/java/com/ai/da/python/PythonService.java @@ -2242,7 +2242,7 @@ public class PythonService { public List generateSketchOrPrint(GenerateToPythonDTO generateToPythonDTO) { //限流校验 - AccessLimitUtils.validate("generateSketchOrPrint", 5); +// AccessLimitUtils.validate("generateSketchOrPrint", 5); OkHttpClient client = new OkHttpClient().newBuilder() .connectTimeout(30, TimeUnit.SECONDS) .pingInterval(5, TimeUnit.SECONDS)//websocket轮训间隔(单位:秒) @@ -2271,7 +2271,7 @@ public class PythonService { throw new BusinessException(ioException.getMessage()); } //去除限流 - AccessLimitUtils.validateOut("generateSketchOrPrint"); +// AccessLimitUtils.validateOut("generateSketchOrPrint"); // 判断是否生成失败 if (Objects.isNull(response.body())) { diff --git a/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java b/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java index 5fe3c4a4..5ff34fb5 100644 --- a/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java +++ b/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java @@ -126,6 +126,7 @@ public class GenerateServiceImpl extends ServiceImpl i category, text, mode, "1", generateThroughImageTextDTO.getGender() ,generateThroughImageTextDTO.getUniqueId())); // List generatedSketchUrl = pythonService.generateSketchOrPrint(new GenerateToPythonDTO(accountId, Objects.isNull(elementId) ? null : collectionElement.getUrl(), // category, text, mode, "1", generateThroughImageTextDTO.getGender())); + log.info("generate 响应 : " + generatedSketchUrl); if (CollectionUtils.isEmpty(generatedSketchUrl)) { return null; } From 65bed9dada9b6d89b3fb47e4e093b869d3a433dd Mon Sep 17 00:00:00 2001 From: xupei Date: Wed, 24 Jan 2024 16:57:53 +0800 Subject: [PATCH 11/15] =?UTF-8?q?TASK:generate=20=E5=8F=96=E6=B6=88generat?= =?UTF-8?q?e?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ai/da/common/utils/AsyncCallerUtil.java | 1 + .../java/com/ai/da/python/PythonService.java | 25 +++++++++++-------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java b/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java index 640880d5..7ef9e3a0 100644 --- a/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java +++ b/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java @@ -46,6 +46,7 @@ public class AsyncCallerUtil { log.info("===============持续等待==============="); }, 10, 10, TimeUnit.SECONDS); + log.info("阻塞等待结果..."); // 阻塞,等待结果 List result = generateResult.get(); // 取消定时任务 diff --git a/src/main/java/com/ai/da/python/PythonService.java b/src/main/java/com/ai/da/python/PythonService.java index 292c6243..b8ecb4e2 100644 --- a/src/main/java/com/ai/da/python/PythonService.java +++ b/src/main/java/com/ai/da/python/PythonService.java @@ -56,8 +56,6 @@ public class PythonService { private String accessPythonIp; @Value("${access.python.port:''}") private String accessPythonPort; - @Value("${access.generate.port:''}") - private String accessGeneratePort; @Resource private PythonTAllInfoService pythonTAllInfoService; @@ -2427,27 +2425,32 @@ public class PythonService { throw new BusinessException("cloth-classification.interface.exception"); } - public void cancelGenerateTask(String taskId){ + public Boolean cancelGenerateTask(String taskId){ OkHttpClient client = new OkHttpClient().newBuilder() .connectTimeout(30, TimeUnit.SECONDS) .pingInterval(5, TimeUnit.SECONDS)//websocket轮训间隔(单位:秒) .readTimeout(60, TimeUnit.SECONDS)//读取超时(单位:秒) .writeTimeout(60, TimeUnit.SECONDS)//写入超时(单位:秒) .build(); - HttpUrl.Builder builder = HttpUrl.parse(accessPythonIp + ":" + accessGeneratePort + "/cancel_task").newBuilder(); - builder.addQueryParameter("task_id",taskId); + String url = accessPythonIp + ":" + accessPythonPort + "/api/generate_cancel/" + taskId + "/"; Request request = new Request.Builder() - .url(builder.build().toString()) - .addHeader("Authorization", "Basic YWlkbGFiOjEyMw==") - .addHeader("Content-Type", "application/json") + .url(url) +// .addHeader("Authorization", "Basic YWlkbGFiOjEyMw==") +// .addHeader("Content-Type", "application/json") .build(); + Response response; try { - log.info("getGenerateResult请求入参content###{}", taskId); - client.newCall(request).execute(); + log.info("cancelGenerateTask请求入参content###{}", taskId); + response = client.newCall(request).execute(); } catch (IOException ioException) { - log.error("PythonService##getGenerateResult异常###{}", ExceptionUtil.getThrowableList(ioException)); + log.error("PythonService##cancelGenerateTask异常###{}", ExceptionUtil.getThrowableList(ioException)); throw new BusinessException("generate.interface.error"); } + + if (response.code() != HttpURLConnection.HTTP_OK){ + return Boolean.FALSE; + } + return Boolean.TRUE; } } From c35a188abb285088d5e33fda56de00a6f08a0cea Mon Sep 17 00:00:00 2001 From: xupei Date: Thu, 25 Jan 2024 13:37:17 +0800 Subject: [PATCH 12/15] =?UTF-8?q?TASK:generate=20=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E9=80=BB=E8=BE=91Bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/ai/da/common/RabbitMQ/MQConfig.java | 2 +- .../com/ai/da/common/RabbitMQ/MQConsumer.java | 14 +++--- .../ai/da/common/utils/AsyncCallerUtil.java | 15 ++++--- .../java/com/ai/da/python/PythonService.java | 44 ++++++++++--------- .../da/service/impl/GenerateServiceImpl.java | 19 ++++---- .../resources/application-prod.properties | 21 +++++++++ src/main/resources/application.properties | 4 +- 7 files changed, 75 insertions(+), 44 deletions(-) diff --git a/src/main/java/com/ai/da/common/RabbitMQ/MQConfig.java b/src/main/java/com/ai/da/common/RabbitMQ/MQConfig.java index bf0a360a..d4e24508 100644 --- a/src/main/java/com/ai/da/common/RabbitMQ/MQConfig.java +++ b/src/main/java/com/ai/da/common/RabbitMQ/MQConfig.java @@ -16,7 +16,7 @@ import org.springframework.beans.factory.annotation.Value; public class MQConfig { public static final String GENERATE_EXCHANGE_FANOUT = "generate-exchange"; - public static final String GENERATE_QUEUE = "generate-queue"; + public static final String GENERATE_QUEUE = "generate-queue-prod"; public MQConfig() { } diff --git a/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java b/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java index eb30d23a..77f46dbd 100644 --- a/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java +++ b/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java @@ -63,12 +63,12 @@ public class MQConsumer { // 2.2 将该消息从取消列表中删除 // redisUtil.removeFromSet(cancelSetKey, uniqueId); } else { + /*try { + Thread.sleep(15000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + }*/ GenerateCollectionVO generateCollectionVO = generateService.generateThroughImageText(generateThroughImageTextDTO); -// try { -// Thread.sleep(15000); -// } catch (InterruptedException e) { -// throw new RuntimeException(e); -// } // 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除 redisUtil.removeFromZSet(consumptionOrderKey, uniqueId); if (!Objects.isNull(generateCollectionVO)) { @@ -100,8 +100,8 @@ public class MQConsumer { } long end = System.currentTimeMillis(); - log.info(" task_id: " + uniqueId + "----------" + consumerName + " 执行时长:" + (end - start) + "毫秒"); - log.info("============end listening=========="); + log.info(" task_id: " + uniqueId + "----------" + consumerName + " 执行时长:" + (end - start) + "毫秒"); + log.info("=============end listening==========="); } @RabbitListener(queues = MQConfig.GENERATE_QUEUE) diff --git a/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java b/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java index 7ef9e3a0..35039431 100644 --- a/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java +++ b/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java @@ -30,21 +30,20 @@ public class AsyncCallerUtil { public List generate(GenerateToPythonDTO generateToPython) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5); String taskId = generateToPython.getTasks_id(); - waitingStatus.put(taskId, true); ScheduledFuture timeoutTask = null; + if (!waitingStatus.containsKey(taskId)) waitingStatus.put(taskId, true); try { CompletableFuture> generateResult = callGenerateAsync(generateToPython); - // 10秒后第一次确认,之后每隔10秒确认一次用户选择结果 + // 5秒后第一次确认,之后每隔10秒确认一次用户选择结果 timeoutTask = scheduledExecutorService.scheduleAtFixedRate(() -> { // 调用另一个接口获取用户的选择 if (!waitingStatus.get(taskId)) { - // 如果用户选择取消,则取消对generate的调用,cancel判断是否成功取消 + // 如果用户选择取消,则取消对generate的调用 generateResult.cancel(true); waitingStatus.remove(taskId); - } - log.info("===============持续等待==============="); - }, 10, 10, TimeUnit.SECONDS); + } else log.info("===============持续等待==============="); + }, 5, 10, TimeUnit.SECONDS); log.info("阻塞等待结果..."); // 阻塞,等待结果 @@ -53,6 +52,10 @@ public class AsyncCallerUtil { timeoutTask.cancel(true); waitingStatus.remove(taskId); return result; + } catch (CancellationException e) { + // generateResult.cancel(true);通过抛出异常取消该任务 + log.info("==========成功取消generate任务=========="); + return null; } catch (InterruptedException | ExecutionException | BusinessException e) { // 处理异常 log.error("发生错误 : " + e); diff --git a/src/main/java/com/ai/da/python/PythonService.java b/src/main/java/com/ai/da/python/PythonService.java index b8ecb4e2..5a37b3b7 100644 --- a/src/main/java/com/ai/da/python/PythonService.java +++ b/src/main/java/com/ai/da/python/PythonService.java @@ -107,7 +107,7 @@ public class PythonService { if (Objects.nonNull(response.body())) { responseBody = response.body().string(); JSONObject responseObj = JSON.parseObject(responseBody); - log.info("moodboard与printboard图片合成 python返回###{}",responseObj); + log.info("moodboard与printboard图片合成 python返回###{}", responseObj); return responseObj.get("data").toString(); } } catch (IOException | JSONException e) { @@ -389,7 +389,7 @@ public class PythonService { all.addAll(new ArrayList<>(DesignPythonItem.SKIRT_TROUSERS)); return all; } - }else if (modelSex.equals(Sex.MALE.getValue())) { + } else if (modelSex.equals(Sex.MALE.getValue())) { Long randomIndex = RandomsUtil.randomSysFile(0L, 3L); if (randomIndex == 0) { return DesignPythonItem.TOPS; @@ -422,11 +422,11 @@ public class PythonService { long noPinNum = printBoardElements.stream().filter(f -> f.getHasPin() == 0).count(); if (noPinNum == 0L) { return 0; - }else { + } else { long pinNum = printBoardElements.stream().filter(f -> f.getHasPin() == 1).count(); if (8 - pinNum < 4) { return RandomsUtil.randomSysFile(0L, 8 - pinNum + 1); - }else { + } else { return RandomsUtil.randomSysFile(0L, 4L + 1); } } @@ -1600,7 +1600,7 @@ public class PythonService { printBoardElements = elementVO.getPrintBoardElements() .stream() .filter(f -> !elementVO.getHasUseMd5List().contains(f.getMd5())).collect(Collectors.toList()); - }else { + } else { printBoardElements = elementVO.getPrintBoardElements(); } if (CollectionUtil.isEmpty(printBoardElements)) { @@ -2081,7 +2081,7 @@ public class PythonService { skirt.setPrint(designPythonItemPrint); skirt.setPath("aida-sys-image/images/female/trousers/trousers_974.jpg"); response.add(skirt); - }else { + } else { DesignPythonItem top = new DesignPythonItem(); top.setType(MalePosition.TOPS.getValue()); top.setColor("none"); @@ -2195,7 +2195,9 @@ public class PythonService { throw new BusinessException("design.interface.exception"); } - /** 暂时未用 */ + /** + * 暂时未用 + */ public String generateSketchCaption(String url) { //限流校验 AccessLimitUtils.validate("generateSketchCaption", 5); @@ -2259,7 +2261,7 @@ public class PythonService { .addHeader("Content-Type", "application/json") .build(); Response response = null; - String bodyString ; + String bodyString; try { log.info("generateSketchOrPrint请求入参content###{}", JSON.toJSONString(generateToPythonDTO, SerializerFeature.WriteMapNullValue)); response = client.newCall(request).execute(); @@ -2276,7 +2278,7 @@ public class PythonService { log.error("PythonService##generateSketchOrPrint异常###{}", "response or body is empty!"); // throw new BusinessException("generate.interface.error"); throw new BusinessException("PythonService##generateSketchOrPrint异常###: response or body is empty!"); - } else if (response.code() != HttpURLConnection.HTTP_OK){ + } else if (response.code() != HttpURLConnection.HTTP_OK) { log.error("PythonService##generateSketchOrPrint异常###{}", "Response error!Response code ## " + response.code() + " ##"); // throw new BusinessException("generate.interface.error"); throw new BusinessException("PythonService##generateSketchOrPrint异常### Response error!Response code ## " + response.code() + " ##"); @@ -2356,7 +2358,7 @@ public class PythonService { if (Objects.isNull(response) || Objects.isNull(response.body())) { log.error("PythonService##composeLayers异常###{}", "response or body is empty!"); throw new BusinessException("compose-layer.interface.exception"); - }else if (response.code() != HttpURLConnection.HTTP_OK){ + } else if (response.code() != HttpURLConnection.HTTP_OK) { log.error("PythonService##composeLayers异常###{}", "Response error!Response code ## " + response.code() + " ##"); throw new BusinessException("compose-layer.interface.exception"); } else { @@ -2381,10 +2383,10 @@ public class PythonService { return item0.getString("synthesis_url"); } - public String getClothCategory(String path,String gender){ + public String getClothCategory(String path, String gender) { HashMap content = new HashMap<>(); - content.put("sketch_img_url",path); - content.put("colony",gender); + content.put("sketch_img_url", path); + content.put("colony", gender); List> contents = Collections.singletonList(content); String jsonString = JSON.toJSONString(contents, SerializerFeature.WriteNullStringAsEmpty); @@ -2397,7 +2399,7 @@ public class PythonService { if (Objects.isNull(response) || Objects.isNull(response.body())) { log.error("PythonService##GetClothCategory###{}", "response or body is empty!"); throw new BusinessException("cloth-classification.interface.exception"); - } else if (response.code() != HttpURLConnection.HTTP_OK){ + } else if (response.code() != HttpURLConnection.HTTP_OK) { log.error("PythonService##GetClothCategory###{}", "Response error!Response code ## " + response.code() + " ##"); throw new BusinessException("cloth-classification.interface.exception"); } else { @@ -2408,7 +2410,7 @@ public class PythonService { } } JSONObject jsonObject = JSON.parseObject(bodyString); - try{ + try { Boolean result = JSON.parseObject(JSON.toJSONString(response)).getBoolean("successful"); if (result && jsonObject.get("msg").equals("OK!")) { JSONObject data = jsonObject.getJSONObject("data"); @@ -2416,7 +2418,7 @@ public class PythonService { JSONObject map = (JSONObject) list.get(0); return map.get("category").toString(); } - }catch (NullPointerException e){ + } catch (NullPointerException e) { log.info("getClothCategory 失败###{},未返回category", jsonObject); throw new BusinessException("cloth-classification.interface.exception"); } @@ -2425,14 +2427,14 @@ public class PythonService { throw new BusinessException("cloth-classification.interface.exception"); } - public Boolean cancelGenerateTask(String taskId){ + public Boolean cancelGenerateTask(String taskId) { OkHttpClient client = new OkHttpClient().newBuilder() .connectTimeout(30, TimeUnit.SECONDS) .pingInterval(5, TimeUnit.SECONDS)//websocket轮训间隔(单位:秒) .readTimeout(60, TimeUnit.SECONDS)//读取超时(单位:秒) .writeTimeout(60, TimeUnit.SECONDS)//写入超时(单位:秒) .build(); - String url = accessPythonIp + ":" + accessPythonPort + "/api/generate_cancel/" + taskId + "/"; + String url = accessPythonIp + ":" + accessPythonPort + "/api/generate_cancel/" + taskId; Request request = new Request.Builder() .url(url) // .addHeader("Authorization", "Basic YWlkbGFiOjEyMw==") @@ -2444,12 +2446,14 @@ public class PythonService { response = client.newCall(request).execute(); } catch (IOException ioException) { log.error("PythonService##cancelGenerateTask异常###{}", ExceptionUtil.getThrowableList(ioException)); - throw new BusinessException("generate.interface.error"); + return null; } - if (response.code() != HttpURLConnection.HTTP_OK){ + if (response.code() != HttpURLConnection.HTTP_OK) { + log.info("generate-python 取消请求失败"); return Boolean.FALSE; } + log.info("generate-python 取消请求成功"); return Boolean.TRUE; } diff --git a/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java b/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java index 5ff34fb5..81d48ddc 100644 --- a/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java +++ b/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java @@ -123,7 +123,7 @@ public class GenerateServiceImpl extends ServiceImpl i generateThroughImageTextDTO.getLevel1Type().equals(PRINT_BOARD.getRealName()) ? "print" : "moodboard"; AsyncCallerUtil asyncCallerUtil = new AsyncCallerUtil(); List generatedSketchUrl = asyncCallerUtil.generate(new GenerateToPythonDTO(accountId, Objects.isNull(collectionElement) ? "" : collectionElement.getUrl(), - category, text, mode, "1", generateThroughImageTextDTO.getGender() ,generateThroughImageTextDTO.getUniqueId())); + category, text, mode, "1", generateThroughImageTextDTO.getGender(), generateThroughImageTextDTO.getUniqueId())); // List generatedSketchUrl = pythonService.generateSketchOrPrint(new GenerateToPythonDTO(accountId, Objects.isNull(elementId) ? null : collectionElement.getUrl(), // category, text, mode, "1", generateThroughImageTextDTO.getGender())); log.info("generate 响应 : " + generatedSketchUrl); @@ -336,7 +336,7 @@ public class GenerateServiceImpl extends ServiceImpl i num++; } // 无依据确定的数字 - if (num > 10){ + if (num > 10) { try { Thread.sleep(1000); } catch (InterruptedException e) { @@ -412,9 +412,9 @@ public class GenerateServiceImpl extends ServiceImpl i return new GenerateCollectionVO(generateId, null, generatedCollectionItems); } - public Generate selectByUniqueId(String uniqueId){ + public Generate selectByUniqueId(String uniqueId) { QueryWrapper qw = new QueryWrapper<>(); - qw.eq("unique_id",uniqueId); + qw.eq("unique_id", uniqueId); return getOne(qw); } @@ -423,19 +423,22 @@ public class GenerateServiceImpl extends ServiceImpl i public void cancelGenerate(String uniqueId) { // 1、确认当前消息是否还在排队中 Boolean exists = redisUtil.isElementExistsInZSet(consumptionOrderKey, uniqueId); - if (exists) { + Boolean flag = Boolean.FALSE; + if (exists) flag = redisUtil.getRank(consumptionOrderKey, uniqueId) > 1L ? Boolean.TRUE : Boolean.FALSE; + // 不管flag的默认值是true还是false,只要exists为false,&& 将短路 + if (exists && flag) { // 1.1、将需要取消的唯一id加入redis,以便及时取消生成 redisUtil.addToSet(cancelSetKey, uniqueId); // 1.2 将需要取消的id从redis的ConsumptionOrder中删除 redisUtil.removeFromZSet(consumptionOrderKey, uniqueId); - }else { + } else { // 2、判断该消息是否异常 boolean hasKey = redisUtil.isElementExistsInMap(exceptionMapKey, uniqueId); // 3、判断该消息是否已经消费结束 Boolean existsInResult = redisUtil.isElementExistsInMap(resultMapKey, uniqueId); - if (!hasKey && !existsInResult){ + if (!hasKey && !existsInResult) { // 设置取等待状态为false - AsyncCallerUtil.waitingStatus.put(uniqueId,false); + AsyncCallerUtil.waitingStatus.put(uniqueId, false); // 3、直接发送取消请求到python端 pythonService.cancelGenerateTask(uniqueId); } diff --git a/src/main/resources/application-prod.properties b/src/main/resources/application-prod.properties index 3abdcf3b..a9510207 100644 --- a/src/main/resources/application-prod.properties +++ b/src/main/resources/application-prod.properties @@ -54,3 +54,24 @@ minio.bucketName.sysImage=aida-sys-image minio.bucketName.users=aida-users minio.bucketName.collectionElement=aida-collection-element redirect_url=http://18.167.251.121:7788 + +spring.rabbitmq.host=18.167.251.121 +spring.rabbitmq.port=5672 +spring.rabbitmq.username=rabbit +spring.rabbitmq.password=123456 +spring.rabbitmq.virtual-host=/ + +spring.redis.host=172.31.11.32 +#spring.redis.host=18.167.251.121 +spring.redis.port=6379 +spring.redis.database=1 +spring.redis.password=Aidlab +spring.redis.lettuce.pool.max-active=8 +spring.redis.lettuce.pool.max-idle=8 +spring.redis.lettuce.pool.min-idle=0 +spring.redis.lettuce.pool.max-wait=5 + +redis.key.consumptionOrder=ConsumptionOrder +redis.key.cancelSet=CancelSet +redis.key.exceptionMap=ExceptionMap +redis.key.resultMap=ResultMap \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index ec94939b..8550475d 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,8 +1,8 @@ #����application-test�ļ�(���Ի���) -spring.profiles.active=test +#spring.profiles.active=test #����application-prod�ļ�(��������) -#spring.profiles.active=prod +spring.profiles.active=prod #����application-dev�ļ�(��������) #spring.profiles.active=dev From e90c4f72394efcb5f8caf3fc109b0244cc057bfd Mon Sep 17 00:00:00 2001 From: xupei Date: Thu, 25 Jan 2024 13:44:28 +0800 Subject: [PATCH 13/15] =?UTF-8?q?=E6=81=A2=E5=A4=8D=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docker-compose.yml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index b9bca48c..a37c5319 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,10 +1,10 @@ version: "3" services: aida_back: - container_name: develop-version-aida-back + container_name: stable-version-aida-back build: . - volumes: - # 数据挂载 - - /workspace/home/aida/file/:/workspace/home/aida/file/ +# volumes: +# # 数据挂载 +# - /workspace/home/aida/file/:/workspace/home/aida/file/ ports: - - "10090:5567" \ No newline at end of file + - "10086:5567" \ No newline at end of file From aa270b5f7d5415dcb5ac2a8d7e5e73ddc283a3cc Mon Sep 17 00:00:00 2001 From: xupei Date: Thu, 25 Jan 2024 14:11:12 +0800 Subject: [PATCH 14/15] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=8E=92=E9=98=9F?= =?UTF-8?q?=E4=BD=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/ai/da/service/impl/GenerateServiceImpl.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java b/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java index 81d48ddc..8039fbcd 100644 --- a/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java +++ b/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java @@ -360,6 +360,7 @@ public class GenerateServiceImpl extends ServiceImpl i @Override public Long getRankPosition(String uniqueId) { + // rank 从0开始 return redisUtil.getRank(consumptionOrderKey, uniqueId); } @@ -374,8 +375,10 @@ public class GenerateServiceImpl extends ServiceImpl i // 2、判断该请求是否还在排队 Boolean existsInZSet = redisUtil.isElementExistsInZSet(consumptionOrderKey, uniqueId); if (existsInZSet) { - // 排队中,给出当前排序位置 - return new GenerateCollectionVO(getRankPosition(uniqueId) + 1L); + // 排队中,给出当前排序位置,rank从0开始 + Long rankPosition = getRankPosition(uniqueId); + // 有9个消费者,所以当rank>8即当前请求至少排在第九位时,其实际排队位置为9-8+1,当rank <=8,请求均在处理中 + return new GenerateCollectionVO( rankPosition > 8L ? rankPosition - 8 + 1 : 1L); } // 3、判断redis中有没有 From 4c0dd27332fd8c4fadec2b270b3d4cb5d8a3ec3a Mon Sep 17 00:00:00 2001 From: xupei Date: Fri, 26 Jan 2024 13:12:04 +0800 Subject: [PATCH 15/15] =?UTF-8?q?generate=20=E8=AE=B0=E5=BD=95=E5=8F=96?= =?UTF-8?q?=E6=B6=88=E6=93=8D=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ai/da/controller/GenerateController.java | 6 ++- .../ai/da/mapper/GenerateCancelMapper.java | 7 +++ .../ai/da/mapper/entity/GenerateCancel.java | 44 +++++++++++++++++++ .../com/ai/da/service/GenerateService.java | 2 +- .../da/service/impl/GenerateServiceImpl.java | 23 +++++++--- 5 files changed, 73 insertions(+), 9 deletions(-) create mode 100644 src/main/java/com/ai/da/mapper/GenerateCancelMapper.java create mode 100644 src/main/java/com/ai/da/mapper/entity/GenerateCancel.java diff --git a/src/main/java/com/ai/da/controller/GenerateController.java b/src/main/java/com/ai/da/controller/GenerateController.java index 0c1506b7..b3f6fb5b 100644 --- a/src/main/java/com/ai/da/controller/GenerateController.java +++ b/src/main/java/com/ai/da/controller/GenerateController.java @@ -61,8 +61,10 @@ public class GenerateController { @ApiOperation(value = "取消继续生成") @GetMapping("/stopWaiting") - public Response stopWaiting(@RequestParam("uniqueId") String uniqueId) { - generateService.cancelGenerate(uniqueId); + public Response stopWaiting(@RequestParam("userId") Long userId, + @RequestParam("uniqueId") String uniqueId, + @RequestParam("timeZone") String timeZone) { + generateService.cancelGenerate(userId, uniqueId, timeZone); return Response.success("stop waiting successfully"); } diff --git a/src/main/java/com/ai/da/mapper/GenerateCancelMapper.java b/src/main/java/com/ai/da/mapper/GenerateCancelMapper.java new file mode 100644 index 00000000..3e24ea09 --- /dev/null +++ b/src/main/java/com/ai/da/mapper/GenerateCancelMapper.java @@ -0,0 +1,7 @@ +package com.ai.da.mapper; + +import com.ai.da.common.config.mybatis.plus.CommonMapper; +import com.ai.da.mapper.entity.GenerateCancel; + +public interface GenerateCancelMapper extends CommonMapper { +} diff --git a/src/main/java/com/ai/da/mapper/entity/GenerateCancel.java b/src/main/java/com/ai/da/mapper/entity/GenerateCancel.java new file mode 100644 index 00000000..8726b9ed --- /dev/null +++ b/src/main/java/com/ai/da/mapper/entity/GenerateCancel.java @@ -0,0 +1,44 @@ +package com.ai.da.mapper.entity; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.experimental.Accessors; + +import java.util.Date; + +@Data +@EqualsAndHashCode(callSuper = false) +@Accessors(chain = true) +@TableName("t_generate_cancel") +public class GenerateCancel { + + /** + * ID + */ + @TableId(value = "id", type = IdType.AUTO) + private Long id; + + /** + * 用户ID + */ + private Long accountId; + + /** + * 唯一id(任务id) + */ + private String uniqueId; + + /** + * 创建时间 + */ + private Date createDate; + + public GenerateCancel(Long accountId, String uniqueId, Date createDate) { + this.accountId = accountId; + this.uniqueId = uniqueId; + this.createDate = createDate; + } +} diff --git a/src/main/java/com/ai/da/service/GenerateService.java b/src/main/java/com/ai/da/service/GenerateService.java index cb97d9f8..d1a327bd 100644 --- a/src/main/java/com/ai/da/service/GenerateService.java +++ b/src/main/java/com/ai/da/service/GenerateService.java @@ -31,6 +31,6 @@ public interface GenerateService extends IService { Long getRankPosition(String uniqueId); - void cancelGenerate(String uniqueId); + void cancelGenerate(Long userId, String uniqueId, String timeZone); } diff --git a/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java b/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java index 8039fbcd..095db937 100644 --- a/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java +++ b/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java @@ -6,6 +6,7 @@ import com.ai.da.common.enums.GenerateModeEnum; import com.ai.da.common.enums.ModelNameEnum; import com.ai.da.common.utils.*; import com.ai.da.mapper.CollectionElementMapper; +import com.ai.da.mapper.GenerateCancelMapper; import com.ai.da.mapper.GenerateDetailMapper; import com.ai.da.mapper.GenerateMapper; import com.ai.da.mapper.entity.*; @@ -64,6 +65,9 @@ public class GenerateServiceImpl extends ServiceImpl i @Resource private RedisUtil redisUtil; + @Resource + private GenerateCancelMapper generateCancelMapper; + @Value("${redis.key.consumptionOrder}") private String consumptionOrderKey; @@ -330,8 +334,10 @@ public class GenerateServiceImpl extends ServiceImpl i // long snowflakeId = idWorker.nextId(); int num = 1; - // 判断与已经正常生成结果的uuid中有相同的id - while (redisUtil.isElementExistsInMap(resultMapKey, uuid) && num < 10) { + // 判断已经正常生成结果的uuid或正在排队的uuid中是否有相同的id + while ((redisUtil.isElementExistsInMap(resultMapKey, uuid) || + redisUtil.isElementExistsInZSet(consumptionOrderKey, uuid)) + && num < 10) { uuid = UUID.randomUUID().toString(); num++; } @@ -378,7 +384,7 @@ public class GenerateServiceImpl extends ServiceImpl i // 排队中,给出当前排序位置,rank从0开始 Long rankPosition = getRankPosition(uniqueId); // 有9个消费者,所以当rank>8即当前请求至少排在第九位时,其实际排队位置为9-8+1,当rank <=8,请求均在处理中 - return new GenerateCollectionVO( rankPosition > 8L ? rankPosition - 8 + 1 : 1L); + return new GenerateCollectionVO(rankPosition > 8L ? rankPosition - 8 + 1 : 1L); } // 3、判断redis中有没有 @@ -393,7 +399,7 @@ public class GenerateServiceImpl extends ServiceImpl i Generate generate = selectByUniqueId(uniqueId); if (Objects.isNull(generate)) { // 3.3 还没执行完,给出当前位置 - return new GenerateCollectionVO(0L); + return new GenerateCollectionVO(1L); } Long generateId = generate.getId(); QueryWrapper qw = new QueryWrapper<>(); @@ -401,7 +407,7 @@ public class GenerateServiceImpl extends ServiceImpl i List generateDetails = generateDetailMapper.selectList(qw); if (CollectionUtils.isEmpty(generateDetails)) { // 会有这种情况吗?存到generate中,但是还没存到generateDetail中 - return new GenerateCollectionVO(0L); + return new GenerateCollectionVO(1L); } List generatedCollectionItems = new ArrayList<>(); @@ -423,7 +429,8 @@ public class GenerateServiceImpl extends ServiceImpl i } @Override - public void cancelGenerate(String uniqueId) { + @Transactional(rollbackFor = Exception.class) + public void cancelGenerate(Long userId, String uniqueId, String timeZone) { // 1、确认当前消息是否还在排队中 Boolean exists = redisUtil.isElementExistsInZSet(consumptionOrderKey, uniqueId); Boolean flag = Boolean.FALSE; @@ -446,5 +453,9 @@ public class GenerateServiceImpl extends ServiceImpl i pythonService.cancelGenerateTask(uniqueId); } } + + // 3、考虑加一张表,专门用于记录哪些用户在什么时间进行了取消操作 + GenerateCancel generateCancel = new GenerateCancel(userId, uniqueId, DateUtil.getByTimeZone(timeZone)); + generateCancelMapper.insert(generateCancel); } }