1、generate 异步生成及获取排队情况
2、generate 取消生成
This commit is contained in:
45
src/main/java/com/ai/da/common/RabbitMQ/MQConfig.java
Normal file
45
src/main/java/com/ai/da/common/RabbitMQ/MQConfig.java
Normal file
@@ -0,0 +1,45 @@
|
||||
package com.ai.da.common.RabbitMQ;
|
||||
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
import org.springframework.amqp.core.FanoutExchange;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
|
||||
@Configuration
|
||||
public class MQConfig {
|
||||
|
||||
public static final String GENERATE_EXCHANGE_FANOUT = "generate-exchange";
|
||||
public static final String GENERATE_QUEUE = "generate-queue";
|
||||
|
||||
public MQConfig() {
|
||||
}
|
||||
|
||||
@Bean
|
||||
FanoutExchange fanoutRasaExchange() {
|
||||
return new FanoutExchange(GENERATE_EXCHANGE_FANOUT);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建队列,使用工作模式,不用定义交换机
|
||||
*/
|
||||
@Bean
|
||||
public Queue queueRasa() {
|
||||
return new Queue(GENERATE_QUEUE);
|
||||
}
|
||||
|
||||
/**
|
||||
* 将队列绑定到交换机上【队列订阅交换机】
|
||||
*/
|
||||
@Bean
|
||||
Binding bindingExchangeRasa() {
|
||||
return BindingBuilder.bind(queueRasa()).to(fanoutRasaExchange());
|
||||
}
|
||||
|
||||
}
|
||||
108
src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java
Normal file
108
src/main/java/com/ai/da/common/RabbitMQ/MQConsumer.java
Normal file
@@ -0,0 +1,108 @@
|
||||
package com.ai.da.common.RabbitMQ;
|
||||
|
||||
import com.ai.da.common.config.exception.BusinessException;
|
||||
import com.ai.da.common.utils.RedisUtil;
|
||||
import com.ai.da.model.dto.GenerateThroughImageTextDTO;
|
||||
import com.ai.da.model.vo.GenerateCollectionVO;
|
||||
import com.ai.da.service.GenerateService;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Objects;
|
||||
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class MQConsumer {
|
||||
|
||||
@Resource
|
||||
private GenerateService generateService;
|
||||
|
||||
@Resource
|
||||
private RedisUtil redisUtil;
|
||||
|
||||
@Value("${redis.key.consumptionOrder}")
|
||||
private String consumptionOrderKey;
|
||||
|
||||
@Value("${redis.key.cancelSet}")
|
||||
private String cancelSetKey;
|
||||
|
||||
@Value("${redis.key.exceptionMap}")
|
||||
private String exceptionMapKey;
|
||||
|
||||
@Value("${redis.key.resultMap}")
|
||||
private String resultMapKey;
|
||||
|
||||
@RabbitListener(queues = MQConfig.GENERATE_QUEUE)
|
||||
@RabbitHandler
|
||||
public void generate(Message msg, Channel channel) {
|
||||
log.info("============start listening==========");
|
||||
|
||||
GenerateThroughImageTextDTO generateThroughImageTextDTO = JSONObject.parseObject(msg.getBody(), GenerateThroughImageTextDTO.class);
|
||||
Long uniqueId = generateThroughImageTextDTO.getUniqueId();
|
||||
// 1、将消息从redis排队队列中删除
|
||||
redisUtil.removeFromZSet(consumptionOrderKey, String.valueOf(uniqueId));
|
||||
try {
|
||||
// 2、判断当前消息是否在取消列表中
|
||||
Boolean isMember = redisUtil.isElementExistsInSet(cancelSetKey, String.valueOf(uniqueId));
|
||||
if (isMember) {
|
||||
try {
|
||||
// 2.1 手动确认该消息
|
||||
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
|
||||
} catch (IOException ex) {
|
||||
log.error("手动确认,不返回队列重新消费");
|
||||
}
|
||||
// 2.2 将该消息从取消列表中删除
|
||||
redisUtil.removeFromSet(cancelSetKey, String.valueOf(uniqueId));
|
||||
} else {
|
||||
try {
|
||||
// 模拟耗时
|
||||
Thread.sleep(40000);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
GenerateCollectionVO generateCollectionVO = generateService.generateThroughImageText(generateThroughImageTextDTO);
|
||||
if (!Objects.isNull(generateCollectionVO)){
|
||||
HashMap<String, String> generateResult = new HashMap<>();
|
||||
generateResult.put(String.valueOf(uniqueId), JSONObject.toJSONString(generateCollectionVO));
|
||||
// 将结果存在redis中 ,为空时不要存
|
||||
redisUtil.addToMap(resultMapKey, generateResult);
|
||||
}
|
||||
|
||||
}
|
||||
} catch (BusinessException e) {
|
||||
log.error(e.getMessage());
|
||||
// channel.basicNack() 为不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue
|
||||
try {
|
||||
// 第二个参数,是否批量确认消息,当传false时,只确认当前 deliveryTag对应的消息;当传true时,会确认当前及之前所有未确认的消息。
|
||||
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
|
||||
} catch (IOException exception) {
|
||||
log.error("手动确认,取消返回队列,不再重新消费");
|
||||
}
|
||||
// 将入参和错误信息存入数据库
|
||||
String exceptionMessage = JSONObject.toJSONString(generateThroughImageTextDTO) + " Exception message : " + e.getMessage();
|
||||
HashMap<String, String> exceptionInfo = new HashMap<>();
|
||||
exceptionInfo.put(String.valueOf(uniqueId), exceptionMessage);
|
||||
// 存redis
|
||||
redisUtil.addToMap(exceptionMapKey, exceptionInfo);
|
||||
}
|
||||
|
||||
// log.info(JSONObject.parseObject(msg.getBody(), GenerateThroughImageTextDTO.class).toString());
|
||||
// try {
|
||||
// Thread.sleep(10000);
|
||||
// } catch (InterruptedException e) {
|
||||
// throw new RuntimeException(e);
|
||||
// }
|
||||
log.info("============end listening==========");
|
||||
}
|
||||
|
||||
}
|
||||
39
src/main/java/com/ai/da/common/RabbitMQ/MQPublisher.java
Normal file
39
src/main/java/com/ai/da/common/RabbitMQ/MQPublisher.java
Normal file
@@ -0,0 +1,39 @@
|
||||
package com.ai.da.common.RabbitMQ;
|
||||
|
||||
import com.ai.da.service.RabbitMQService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.AmqpTemplate;
|
||||
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class MQPublisher {
|
||||
|
||||
private final String url = "http://localhost:15672/api/queues/%2f/generate-queue";
|
||||
|
||||
@Resource
|
||||
private AmqpTemplate amqpTemplate;
|
||||
@Resource
|
||||
private RabbitMQService rabbitMQService;
|
||||
|
||||
public void sendGenerateMessage(String mm) {
|
||||
log.info("send message:" + mm);
|
||||
amqpTemplate.convertAndSend(MQConfig.GENERATE_QUEUE, mm);
|
||||
|
||||
}
|
||||
|
||||
public void getMsgCount() {
|
||||
//// AMQP.Queue.DeclareOk declareOk = rabbitTemplate.execute(channel -> channel.queueDeclarePassive(MQConfig.GENERATE_QUEUE));
|
||||
//
|
||||
// QueueInformation queueInfo = rabbitAdmin.getQueueInfo(MQConfig.GENERATE_QUEUE);
|
||||
//// assert queueInfo != null;
|
||||
//
|
||||
//// System.out.println(declareOk.getMessageCount());
|
||||
// System.out.println(queueInfo.getMessageCount());
|
||||
//// return declareOk.getMessageCount();
|
||||
// return queueInfo.getMessageCount();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user