TASK:generate 添加队列消费者,增加并发量

This commit is contained in:
2024-01-24 14:44:31 +08:00
parent d4568ad740
commit 1d2cb17d54
6 changed files with 73 additions and 35 deletions

View File

@@ -21,10 +21,10 @@ public class MQConfig {
public MQConfig() { public MQConfig() {
} }
@Bean // @Bean
FanoutExchange fanoutRasaExchange() { // FanoutExchange fanoutRasaExchange() {
return new FanoutExchange(GENERATE_EXCHANGE_FANOUT); // return new FanoutExchange(GENERATE_EXCHANGE_FANOUT);
} // }
/** /**
* 创建队列,使用工作模式,不用定义交换机 * 创建队列,使用工作模式,不用定义交换机
@@ -37,9 +37,9 @@ public class MQConfig {
/** /**
* 将队列绑定到交换机上【队列订阅交换机】 * 将队列绑定到交换机上【队列订阅交换机】
*/ */
@Bean // @Bean
Binding bindingExchangeRasa() { // Binding bindingExchangeRasa() {
return BindingBuilder.bind(queueRasa()).to(fanoutRasaExchange()); // return BindingBuilder.bind(queueRasa()).to(fanoutRasaExchange());
} // }
} }

View File

@@ -42,13 +42,13 @@ public class MQConsumer {
@Value("${redis.key.resultMap}") @Value("${redis.key.resultMap}")
private String resultMapKey; private String resultMapKey;
@RabbitListener(queues = MQConfig.GENERATE_QUEUE) public void generate(Message msg, Channel channel, String consumerName) {
@RabbitHandler
public void generate(Message msg, Channel channel) {
log.info("============start listening=========="); log.info("============start listening==========");
long start = System.currentTimeMillis();
GenerateThroughImageTextDTO generateThroughImageTextDTO = JSONObject.parseObject(msg.getBody(), GenerateThroughImageTextDTO.class); GenerateThroughImageTextDTO generateThroughImageTextDTO = JSONObject.parseObject(msg.getBody(), GenerateThroughImageTextDTO.class);
String uniqueId = generateThroughImageTextDTO.getUniqueId(); String uniqueId = generateThroughImageTextDTO.getUniqueId();
log.info("From " + consumerName + " : " + uniqueId);
try { try {
// 2、判断当前消息是否在取消列表中 // 2、判断当前消息是否在取消列表中
@@ -71,7 +71,7 @@ public class MQConsumer {
// } // }
// 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除 // 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除
redisUtil.removeFromZSet(consumptionOrderKey, uniqueId); redisUtil.removeFromZSet(consumptionOrderKey, uniqueId);
if (!Objects.isNull(generateCollectionVO)){ if (!Objects.isNull(generateCollectionVO)) {
HashMap<String, String> generateResult = new HashMap<>(); HashMap<String, String> generateResult = new HashMap<>();
generateResult.put(uniqueId, JSONObject.toJSONString(generateCollectionVO)); generateResult.put(uniqueId, JSONObject.toJSONString(generateCollectionVO));
// 将结果存在redis中 ,为空时不要存 // 将结果存在redis中 ,为空时不要存
@@ -98,7 +98,64 @@ public class MQConsumer {
// 存redis // 存redis
redisUtil.addToMap(exceptionMapKey, exceptionInfo); redisUtil.addToMap(exceptionMapKey, exceptionInfo);
} }
long end = System.currentTimeMillis();
log.info(" task_id " + uniqueId + "----------" + consumerName + " 执行时长:" + (end - start) + "毫秒");
log.info("============end listening=========="); log.info("============end listening==========");
} }
@RabbitListener(queues = MQConfig.GENERATE_QUEUE)
@RabbitHandler
public void generateConsumer1(Message msg, Channel channel) {
generate(msg, channel, "consumer 1");
}
@RabbitListener(queues = MQConfig.GENERATE_QUEUE)
@RabbitHandler
public void generateConsumer2(Message msg, Channel channel) {
generate(msg, channel, "consumer 2");
}
@RabbitListener(queues = MQConfig.GENERATE_QUEUE)
@RabbitHandler
public void generateConsumer3(Message msg, Channel channel) {
generate(msg, channel, "consumer 3");
}
@RabbitListener(queues = MQConfig.GENERATE_QUEUE)
@RabbitHandler
public void generateConsumer4(Message msg, Channel channel) {
generate(msg, channel, "consumer 4");
}
@RabbitListener(queues = MQConfig.GENERATE_QUEUE)
@RabbitHandler
public void generateConsumer5(Message msg, Channel channel) {
generate(msg, channel, "consumer 5");
}
@RabbitListener(queues = MQConfig.GENERATE_QUEUE)
@RabbitHandler
public void generateConsumer6(Message msg, Channel channel) {
generate(msg, channel, "consumer 6");
}
@RabbitListener(queues = MQConfig.GENERATE_QUEUE)
@RabbitHandler
public void generateConsumer7(Message msg, Channel channel) {
generate(msg, channel, "consumer 7");
}
@RabbitListener(queues = MQConfig.GENERATE_QUEUE)
@RabbitHandler
public void generateConsumer8(Message msg, Channel channel) {
generate(msg, channel, "consumer 8");
}
@RabbitListener(queues = MQConfig.GENERATE_QUEUE)
@RabbitHandler
public void generateConsumer9(Message msg, Channel channel) {
generate(msg, channel, "consumer 9");
}
} }

View File

@@ -1,9 +1,7 @@
package com.ai.da.common.RabbitMQ; package com.ai.da.common.RabbitMQ;
import com.ai.da.service.RabbitMQService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
@@ -16,8 +14,6 @@ public class MQPublisher {
@Resource @Resource
private AmqpTemplate amqpTemplate; private AmqpTemplate amqpTemplate;
@Resource
private RabbitMQService rabbitMQService;
public void sendGenerateMessage(String mm) { public void sendGenerateMessage(String mm) {
log.info("send message:" + mm); log.info("send message:" + mm);
@@ -25,15 +21,4 @@ public class MQPublisher {
} }
public void getMsgCount() {
//// AMQP.Queue.DeclareOk declareOk = rabbitTemplate.execute(channel -> channel.queueDeclarePassive(MQConfig.GENERATE_QUEUE));
//
// QueueInformation queueInfo = rabbitAdmin.getQueueInfo(MQConfig.GENERATE_QUEUE);
//// assert queueInfo != null;
//
//// System.out.println(declareOk.getMessageCount());
// System.out.println(queueInfo.getMessageCount());
//// return declareOk.getMessageCount();
// return queueInfo.getMessageCount();
}
} }

View File

@@ -43,18 +43,13 @@ public class AsyncCallerUtil {
generateResult.cancel(true); generateResult.cancel(true);
waitingStatus.remove(taskId); waitingStatus.remove(taskId);
} }
System.out.println("持续等待...... : " + DateUtil.getByTimeZone("Asia/Shanghai")); log.info("===============持续等待===============");
}, 10, 10, TimeUnit.SECONDS); }, 10, 10, TimeUnit.SECONDS);
System.out.println("开始阻塞 : " + DateUtil.getByTimeZone("Asia/Shanghai"));
// 阻塞,等待结果 // 阻塞,等待结果
List<String> result = generateResult.get(); List<String> result = generateResult.get();
// 取消定时任务 // 取消定时任务
timeoutTask.cancel(true); timeoutTask.cancel(true);
// 处理结果
System.out.println("generate 响应: " + result);
System.out.println("schedule finish time : " + DateUtil.getByTimeZone("Asia/Shanghai"));
waitingStatus.remove(taskId); waitingStatus.remove(taskId);
return result; return result;
} catch (InterruptedException | ExecutionException | BusinessException e) { } catch (InterruptedException | ExecutionException | BusinessException e) {

View File

@@ -2242,7 +2242,7 @@ public class PythonService {
public List<String> generateSketchOrPrint(GenerateToPythonDTO generateToPythonDTO) { public List<String> generateSketchOrPrint(GenerateToPythonDTO generateToPythonDTO) {
//限流校验 //限流校验
AccessLimitUtils.validate("generateSketchOrPrint", 5); // AccessLimitUtils.validate("generateSketchOrPrint", 5);
OkHttpClient client = new OkHttpClient().newBuilder() OkHttpClient client = new OkHttpClient().newBuilder()
.connectTimeout(30, TimeUnit.SECONDS) .connectTimeout(30, TimeUnit.SECONDS)
.pingInterval(5, TimeUnit.SECONDS)//websocket轮训间隔(单位:秒) .pingInterval(5, TimeUnit.SECONDS)//websocket轮训间隔(单位:秒)
@@ -2271,7 +2271,7 @@ public class PythonService {
throw new BusinessException(ioException.getMessage()); throw new BusinessException(ioException.getMessage());
} }
//去除限流 //去除限流
AccessLimitUtils.validateOut("generateSketchOrPrint"); // AccessLimitUtils.validateOut("generateSketchOrPrint");
// 判断是否生成失败 // 判断是否生成失败
if (Objects.isNull(response.body())) { if (Objects.isNull(response.body())) {

View File

@@ -126,6 +126,7 @@ public class GenerateServiceImpl extends ServiceImpl<GenerateMapper, Generate> i
category, text, mode, "1", generateThroughImageTextDTO.getGender() ,generateThroughImageTextDTO.getUniqueId())); category, text, mode, "1", generateThroughImageTextDTO.getGender() ,generateThroughImageTextDTO.getUniqueId()));
// List<String> generatedSketchUrl = pythonService.generateSketchOrPrint(new GenerateToPythonDTO(accountId, Objects.isNull(elementId) ? null : collectionElement.getUrl(), // List<String> generatedSketchUrl = pythonService.generateSketchOrPrint(new GenerateToPythonDTO(accountId, Objects.isNull(elementId) ? null : collectionElement.getUrl(),
// category, text, mode, "1", generateThroughImageTextDTO.getGender())); // category, text, mode, "1", generateThroughImageTextDTO.getGender()));
log.info("generate 响应 " + generatedSketchUrl);
if (CollectionUtils.isEmpty(generatedSketchUrl)) { if (CollectionUtils.isEmpty(generatedSketchUrl)) {
return null; return null;
} }