1、接入超分功能

2、添加积分系统
3、新增订单查询,积分详细查询
This commit is contained in:
2024-03-15 15:38:56 +08:00
parent bf05f88c00
commit 305324fe1a
35 changed files with 798 additions and 55 deletions

View File

@@ -22,7 +22,7 @@ import java.util.Objects;
@Slf4j
@Component
public class MQConsumer {
public class GenerateConsumer {
@Resource
private GenerateService generateService;
@@ -30,13 +30,13 @@ public class MQConsumer {
@Resource
private RedisUtil redisUtil;
@Value("${redis.key.consumptionOrder}")
@Value("${redis.key.orderForGenerate}")
private String consumptionOrderKey;
@Value("${redis.key.cancelSet}")
@Value("${redis.key.generateCancelSet}")
private String cancelSetKey;
@Value("${redis.key.exceptionMap}")
@Value("${redis.key.generateExceptionMap}")
private String exceptionMapKey;
@Value("${redis.key.resultMap}")

View File

@@ -1,16 +1,8 @@
package com.ai.da.common.RabbitMQ;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.beans.factory.annotation.Value;
@Configuration
public class MQConfig {
@@ -18,7 +10,10 @@ public class MQConfig {
public static final String GENERATE_EXCHANGE_FANOUT = "generate-exchange";
// public static final String GENERATE_QUEUE = "generate-queue-prod";
// public static final String GENERATE_QUEUE = "generate-queue-test";
public static final String GENERATE_QUEUE = "generate-queue-dev";
// public static final String GENERATE_QUEUE = "generate-queue-dev";
public static final String GENERATE_QUEUE = "generate-queue";
public static final String SR_QUEUE = "SR-queue-dev";
public MQConfig() {
}
@@ -32,10 +27,15 @@ public class MQConfig {
* 创建队列,使用工作模式,不用定义交换机
*/
@Bean
public Queue queueRasa() {
public Queue generateQueue() {
return new Queue(GENERATE_QUEUE);
}
@Bean
public Queue SRQueue() {
return new Queue(SR_QUEUE);
}
/**
* 将队列绑定到交换机上【队列订阅交换机】
*/

View File

@@ -18,7 +18,11 @@ public class MQPublisher {
public void sendGenerateMessage(String mm) {
log.info("send message:" + mm);
amqpTemplate.convertAndSend(MQConfig.GENERATE_QUEUE, mm);
}
public void sendSRMessage(String mm) {
log.info("send message:" + mm);
amqpTemplate.convertAndSend(MQConfig.SR_QUEUE, mm);
}
}

View File

@@ -0,0 +1,111 @@
package com.ai.da.common.RabbitMQ;
import com.ai.da.common.config.exception.BusinessException;
import com.ai.da.common.utils.RedisUtil;
import com.ai.da.model.dto.GenerateThroughImageTextDTO;
import com.ai.da.model.dto.SuperResolutionDTO;
import com.ai.da.model.vo.GenerateCollectionVO;
import com.ai.da.service.SuperResolutionService;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import io.netty.util.internal.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.HashMap;
@Slf4j
@Component
public class SRConsumer {
@Resource
private RedisUtil redisUtil;
@Value("${redis.key.orderForSR}")
private String consumptionOrderKey;
@Value("${redis.key.SRCancelSet}")
private String cancelSetKey;
@Value("${redis.key.SRExceptionMap}")
private String exceptionMapKey;
@Value("${redis.key.resultMap}")
private String resultMapKey;
@Resource
private SuperResolutionService superResolutionService;
public void superResolution(Message msg, Channel channel, String consumerName){
log.info("============start listening==========");
long start = System.currentTimeMillis();
SuperResolutionDTO superResolutionDTO = JSONObject.parseObject(msg.getBody(), SuperResolutionDTO.class);
String uniqueId = superResolutionDTO.getUniqueId();
log.info("From " + consumerName + " : " + uniqueId);
try {
// 2、判断当前消息是否在取消列表中
Boolean isMember = redisUtil.isElementExistsInSet(cancelSetKey, uniqueId);
if (isMember) {
try {
// 2.1 手动确认该消息
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (IOException ex) {
log.error("手动确认,不返回队列重新消费");
}
} else {
/*try {
Thread.sleep(15000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}*/
String srOutput = superResolutionService.SR(superResolutionDTO);
// 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除
redisUtil.removeFromZSet(consumptionOrderKey, uniqueId);
if (!StringUtil.isNullOrEmpty(srOutput)) {
HashMap<String, String> generateResult = new HashMap<>();
generateResult.put(uniqueId, srOutput);
// 将结果存在redis中 ,为空时不要存
redisUtil.addToMap(resultMapKey, generateResult);
}
}
} catch (BusinessException e) {
log.error(e.getMsg());
// channel.basicNack() 为不确认deliveryTag对应的消息第二个参数是否应用于多消息第三个参数是否requeue
try {
// 第二个参数是否批量确认消息当传false时只确认当前 deliveryTag对应的消息;当传true时会确认当前及之前所有未确认的消息。
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
// 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除
redisUtil.removeFromZSet(consumptionOrderKey, uniqueId);
} catch (IOException exception) {
log.error("手动确认,取消返回队列,不再重新消费");
}
// 将入参和错误信息存入数据库
String exceptionMessage = JSONObject.toJSONString(superResolutionDTO) +
" Exception message " + e.getMsg();
HashMap<String, String> exceptionInfo = new HashMap<>();
exceptionInfo.put(String.valueOf(uniqueId), exceptionMessage);
// 存redis
redisUtil.addToMap(exceptionMapKey, exceptionInfo);
}
long end = System.currentTimeMillis();
log.info(" task_id " + uniqueId + "----------" + consumerName + " 执行时长:" + (end - start) + "毫秒");
log.info("=============end listening===========");
}
@RabbitListener(queues = MQConfig.SR_QUEUE)
@RabbitHandler
public void SRConsumer1(Message msg, Channel channel) {
superResolution(msg, channel, "consumer 1");
}
}

View File

@@ -14,8 +14,8 @@ public enum CreditsEventsEnum {
DAILY_CHECKIN("Daily Check-In", "50"),
SOCIAL_MEDIA_SHARING("Social Media Sharing","50"),
BUY_CREDITS("Buy Credits","2000"),
// 6USD -> 1000 points ==> 10HKD -> 215 points ==> 2HKD -> 43points
BUY_CREDITS("Buy Credits","43"),
SUPER_RESOLUTION("Super Resolution","300"),

View File

@@ -2,6 +2,7 @@ package com.ai.da.common.utils;
import com.ai.da.common.config.exception.BusinessException;
import com.ai.da.model.dto.GenerateToPythonDTO;
import com.ai.da.model.dto.SuperResolutionDTO;
import com.ai.da.python.PythonService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -14,6 +15,7 @@ import java.util.concurrent.*;
@Component
public class AsyncCallerUtil {
// 存放状态 表示当前任务是否需要继续等待,默认持续等待
public static Map<String, Boolean> waitingStatus = new HashMap<>();
private static PythonService pythonService;
@@ -70,4 +72,52 @@ public class AsyncCallerUtil {
}
}
public CompletableFuture<String> callSRAsync(SuperResolutionDTO superResolutionDTO) {
return CompletableFuture.supplyAsync(() -> pythonService.superResolution(superResolutionDTO));
}
public String SR(SuperResolutionDTO superResolutionDTO) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
String taskId = superResolutionDTO.getUniqueId();
ScheduledFuture<?> timeoutTask = null;
if (!waitingStatus.containsKey(taskId)) waitingStatus.put(taskId, true);
try {
CompletableFuture<String> generateResult = callSRAsync(superResolutionDTO);
// 5秒后第一次确认之后每隔10秒确认一次用户选择结果
timeoutTask = scheduledExecutorService.scheduleAtFixedRate(() -> {
// 调用另一个接口获取用户的选择
if (!waitingStatus.get(taskId)) {
// 如果用户选择取消则取消对generate的调用
generateResult.cancel(true);
waitingStatus.remove(taskId);
} else log.info("===============持续等待===============");
}, 5, 10, TimeUnit.SECONDS);
log.info("阻塞等待结果...");
// 阻塞,等待结果
String result = generateResult.get();
// 取消定时任务
timeoutTask.cancel(true);
waitingStatus.remove(taskId);
return result;
} catch (CancellationException e) {
// generateResult.cancel(true);通过抛出异常取消该任务
log.info("==========成功取消generate任务==========");
return null;
} catch (InterruptedException | ExecutionException | BusinessException e) {
// 处理异常
log.error("发生错误 " + e, e);
// 取消定时任务
assert timeoutTask != null;
timeoutTask.cancel(true);
throw new BusinessException(e.getMessage());
} finally {
// 关闭线程池
// executorService.shutdown();
// scheduledExecutorService.shutdown();
}
}
}