diff --git a/src/main/java/com/ai/da/common/RabbitMQ/MQConfig.java b/src/main/java/com/ai/da/common/RabbitMQ/MQConfig.java index 880def93..bf0a360a 100644 --- a/src/main/java/com/ai/da/common/RabbitMQ/MQConfig.java +++ b/src/main/java/com/ai/da/common/RabbitMQ/MQConfig.java @@ -21,10 +21,10 @@ public class MQConfig { public MQConfig() { } - @Bean - FanoutExchange fanoutRasaExchange() { - return new FanoutExchange(GENERATE_EXCHANGE_FANOUT); - } +// @Bean +// FanoutExchange fanoutRasaExchange() { +// return new FanoutExchange(GENERATE_EXCHANGE_FANOUT); +// } /** * 创建队列,使用工作模式,不用定义交换机 @@ -37,9 +37,9 @@ public class MQConfig { /** * 将队列绑定到交换机上【队列订阅交换机】 */ - @Bean - Binding bindingExchangeRasa() { - return BindingBuilder.bind(queueRasa()).to(fanoutRasaExchange()); - } +// @Bean +// Binding bindingExchangeRasa() { +// return BindingBuilder.bind(queueRasa()).to(fanoutRasaExchange()); +// } } diff --git a/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java b/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java index 29c7e939..eb30d23a 100644 --- a/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java +++ b/src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java @@ -42,13 +42,13 @@ public class MQConsumer { @Value("${redis.key.resultMap}") private String resultMapKey; - @RabbitListener(queues = MQConfig.GENERATE_QUEUE) - @RabbitHandler - public void generate(Message msg, Channel channel) { + public void generate(Message msg, Channel channel, String consumerName) { log.info("============start listening=========="); + long start = System.currentTimeMillis(); GenerateThroughImageTextDTO generateThroughImageTextDTO = JSONObject.parseObject(msg.getBody(), GenerateThroughImageTextDTO.class); String uniqueId = generateThroughImageTextDTO.getUniqueId(); + log.info("From " + consumerName + " : " + uniqueId); try { // 2、判断当前消息是否在取消列表中 @@ -71,7 +71,7 @@ public class MQConsumer { // } // 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除 redisUtil.removeFromZSet(consumptionOrderKey, uniqueId); - if (!Objects.isNull(generateCollectionVO)){ + if (!Objects.isNull(generateCollectionVO)) { HashMap generateResult = new HashMap<>(); generateResult.put(uniqueId, JSONObject.toJSONString(generateCollectionVO)); // 将结果存在redis中 ,为空时不要存 @@ -98,7 +98,64 @@ public class MQConsumer { // 存redis redisUtil.addToMap(exceptionMapKey, exceptionInfo); } + long end = System.currentTimeMillis(); + + log.info(" task_id: " + uniqueId + "----------" + consumerName + " 执行时长:" + (end - start) + "毫秒"); log.info("============end listening=========="); } + @RabbitListener(queues = MQConfig.GENERATE_QUEUE) + @RabbitHandler + public void generateConsumer1(Message msg, Channel channel) { + generate(msg, channel, "consumer 1"); + } + + @RabbitListener(queues = MQConfig.GENERATE_QUEUE) + @RabbitHandler + public void generateConsumer2(Message msg, Channel channel) { + generate(msg, channel, "consumer 2"); + } + + @RabbitListener(queues = MQConfig.GENERATE_QUEUE) + @RabbitHandler + public void generateConsumer3(Message msg, Channel channel) { + generate(msg, channel, "consumer 3"); + } + + @RabbitListener(queues = MQConfig.GENERATE_QUEUE) + @RabbitHandler + public void generateConsumer4(Message msg, Channel channel) { + generate(msg, channel, "consumer 4"); + } + + @RabbitListener(queues = MQConfig.GENERATE_QUEUE) + @RabbitHandler + public void generateConsumer5(Message msg, Channel channel) { + generate(msg, channel, "consumer 5"); + } + + @RabbitListener(queues = MQConfig.GENERATE_QUEUE) + @RabbitHandler + public void generateConsumer6(Message msg, Channel channel) { + generate(msg, channel, "consumer 6"); + } + + @RabbitListener(queues = MQConfig.GENERATE_QUEUE) + @RabbitHandler + public void generateConsumer7(Message msg, Channel channel) { + generate(msg, channel, "consumer 7"); + } + + @RabbitListener(queues = MQConfig.GENERATE_QUEUE) + @RabbitHandler + public void generateConsumer8(Message msg, Channel channel) { + generate(msg, channel, "consumer 8"); + } + + @RabbitListener(queues = MQConfig.GENERATE_QUEUE) + @RabbitHandler + public void generateConsumer9(Message msg, Channel channel) { + generate(msg, channel, "consumer 9"); + } + } diff --git a/src/main/java/com/ai/da/common/RabbitMQ/MQPublisher.java b/src/main/java/com/ai/da/common/RabbitMQ/MQPublisher.java index 40b7baea..b0429110 100644 --- a/src/main/java/com/ai/da/common/RabbitMQ/MQPublisher.java +++ b/src/main/java/com/ai/da/common/RabbitMQ/MQPublisher.java @@ -1,9 +1,7 @@ package com.ai.da.common.RabbitMQ; -import com.ai.da.service.RabbitMQService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AmqpTemplate; -import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -16,8 +14,6 @@ public class MQPublisher { @Resource private AmqpTemplate amqpTemplate; - @Resource - private RabbitMQService rabbitMQService; public void sendGenerateMessage(String mm) { log.info("send message:" + mm); @@ -25,15 +21,4 @@ public class MQPublisher { } - public void getMsgCount() { -//// AMQP.Queue.DeclareOk declareOk = rabbitTemplate.execute(channel -> channel.queueDeclarePassive(MQConfig.GENERATE_QUEUE)); -// -// QueueInformation queueInfo = rabbitAdmin.getQueueInfo(MQConfig.GENERATE_QUEUE); -//// assert queueInfo != null; -// -//// System.out.println(declareOk.getMessageCount()); -// System.out.println(queueInfo.getMessageCount()); -//// return declareOk.getMessageCount(); -// return queueInfo.getMessageCount(); - } } diff --git a/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java b/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java index 1f80cc41..640880d5 100644 --- a/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java +++ b/src/main/java/com/ai/da/common/utils/AsyncCallerUtil.java @@ -43,18 +43,13 @@ public class AsyncCallerUtil { generateResult.cancel(true); waitingStatus.remove(taskId); } - System.out.println("持续等待...... : " + DateUtil.getByTimeZone("Asia/Shanghai")); + log.info("===============持续等待==============="); }, 10, 10, TimeUnit.SECONDS); - System.out.println("开始阻塞 : " + DateUtil.getByTimeZone("Asia/Shanghai")); // 阻塞,等待结果 List result = generateResult.get(); // 取消定时任务 timeoutTask.cancel(true); - - // 处理结果 - System.out.println("generate 响应: " + result); - System.out.println("schedule finish time : " + DateUtil.getByTimeZone("Asia/Shanghai")); waitingStatus.remove(taskId); return result; } catch (InterruptedException | ExecutionException | BusinessException e) { diff --git a/src/main/java/com/ai/da/python/PythonService.java b/src/main/java/com/ai/da/python/PythonService.java index e8f37f2f..292c6243 100644 --- a/src/main/java/com/ai/da/python/PythonService.java +++ b/src/main/java/com/ai/da/python/PythonService.java @@ -2242,7 +2242,7 @@ public class PythonService { public List generateSketchOrPrint(GenerateToPythonDTO generateToPythonDTO) { //限流校验 - AccessLimitUtils.validate("generateSketchOrPrint", 5); +// AccessLimitUtils.validate("generateSketchOrPrint", 5); OkHttpClient client = new OkHttpClient().newBuilder() .connectTimeout(30, TimeUnit.SECONDS) .pingInterval(5, TimeUnit.SECONDS)//websocket轮训间隔(单位:秒) @@ -2271,7 +2271,7 @@ public class PythonService { throw new BusinessException(ioException.getMessage()); } //去除限流 - AccessLimitUtils.validateOut("generateSketchOrPrint"); +// AccessLimitUtils.validateOut("generateSketchOrPrint"); // 判断是否生成失败 if (Objects.isNull(response.body())) { diff --git a/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java b/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java index 5fe3c4a4..5ff34fb5 100644 --- a/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java +++ b/src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java @@ -126,6 +126,7 @@ public class GenerateServiceImpl extends ServiceImpl i category, text, mode, "1", generateThroughImageTextDTO.getGender() ,generateThroughImageTextDTO.getUniqueId())); // List generatedSketchUrl = pythonService.generateSketchOrPrint(new GenerateToPythonDTO(accountId, Objects.isNull(elementId) ? null : collectionElement.getUrl(), // category, text, mode, "1", generateThroughImageTextDTO.getGender())); + log.info("generate 响应 : " + generatedSketchUrl); if (CollectionUtils.isEmpty(generatedSketchUrl)) { return null; }