Merge branch 'dev/dev' into dev/dev_shb

# Conflicts:
#	src/main/java/com/ai/da/controller/AccountController.java
#	src/main/java/com/ai/da/mapper/primary/entity/GenerateCancel.java
#	src/main/java/com/ai/da/mapper/primary/entity/LibraryCopy.java
#	src/main/java/com/ai/da/mapper/primary/entity/LibraryModelPointCopy.java
#	src/main/java/com/ai/da/service/AccountService.java
#	src/main/java/com/ai/da/service/LibraryService.java
#	src/main/java/com/ai/da/service/impl/AccountServiceImpl.java
#	src/main/java/com/ai/da/service/impl/GenerateServiceImpl.java
#	src/main/java/com/ai/da/service/impl/LibraryServiceImpl.java
#	src/main/resources/application-prod.properties
#	src/main/resources/application-test.properties
This commit is contained in:
shahaibo
2024-02-19 13:02:58 +08:00
38 changed files with 1537 additions and 183 deletions

View File

@@ -0,0 +1,45 @@
package com.ai.da.common.RabbitMQ;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.beans.factory.annotation.Value;
/**
 * RabbitMQ configuration: declares the work queue used for generation tasks.
 *
 * The queue is used in work-queue (competing consumers) mode, so no exchange
 * or binding declaration is required; publishers send via the default exchange
 * with the queue name as the routing key.
 */
@Configuration
public class MQConfig {

    /** Fanout exchange name — declared here but no exchange bean is registered for it. */
    public static final String GENERATE_EXCHANGE_FANOUT = "generate-exchange";

    /** Name of the work queue generation tasks are published to and consumed from. */
    public static final String GENERATE_QUEUE = "generate-queue-prod";

    /**
     * Declares the generation work queue.
     *
     * @return the queue bean for {@link #GENERATE_QUEUE}
     */
    @Bean
    public Queue queueRasa() {
        return new Queue(GENERATE_QUEUE);
    }
}

View File

@@ -0,0 +1,161 @@
package com.ai.da.common.RabbitMQ;
import com.ai.da.common.config.exception.BusinessException;
import com.ai.da.common.utils.RedisUtil;
import com.ai.da.model.dto.GenerateThroughImageTextDTO;
import com.ai.da.model.vo.GenerateCollectionVO;
import com.ai.da.service.GenerateService;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.HashMap;
import java.util.Objects;
/**
 * RabbitMQ consumer for generation tasks.
 *
 * Nine identical listener methods are registered on the same queue —
 * presumably to obtain nine concurrent consumers; a listener-container
 * concurrency setting would achieve the same (verify before consolidating).
 *
 * NOTE(review): the success path never calls {@code channel.basicAck}; this
 * assumes the listener container acknowledges automatically — confirm the
 * configured acknowledge-mode, otherwise successfully processed messages are
 * never acked and will be redelivered.
 */
@Slf4j
@Component
public class MQConsumer {

    @Resource
    private GenerateService generateService;

    @Resource
    private RedisUtil redisUtil;

    // Redis ZSet key holding the ordered queue of pending task ids.
    @Value("${redis.key.consumptionOrder}")
    private String consumptionOrderKey;

    // Redis Set key holding ids of tasks the user has cancelled.
    @Value("${redis.key.cancelSet}")
    private String cancelSetKey;

    // Redis Hash key storing per-task error messages.
    @Value("${redis.key.exceptionMap}")
    private String exceptionMapKey;

    // Redis Hash key storing per-task generation results (JSON).
    @Value("${redis.key.resultMap}")
    private String resultMapKey;

    /**
     * Shared worker routine invoked by every listener method.
     *
     * Flow: deserialize the task; if its id is in the cancel set, manually ack
     * and drop it; otherwise run the generation, remove the id from the
     * pending ZSet, and store a non-null result in the result hash. On
     * {@link BusinessException}, the message is acked (not requeued), removed
     * from the pending ZSet, and the input plus error message is stored in the
     * exception hash.
     *
     * @param msg          the raw AMQP message (body is JSON of GenerateThroughImageTextDTO)
     * @param channel      the AMQP channel, used for manual acknowledgements
     * @param consumerName label of the invoking listener, for logging only
     */
    public void generate(Message msg, Channel channel, String consumerName) {
        log.info("============start listening==========");
        long start = System.currentTimeMillis();
        GenerateThroughImageTextDTO generateThroughImageTextDTO = JSONObject.parseObject(msg.getBody(), GenerateThroughImageTextDTO.class);
        String uniqueId = generateThroughImageTextDTO.getUniqueId();
        log.info("From " + consumerName + " : " + uniqueId);
        try {
            // 2. Check whether this message has been cancelled by the user.
            // NOTE(review): unboxing a Boolean — isElementExistsInSet could in
            // principle return null (e.g. inside a Redis pipeline); confirm.
            Boolean isMember = redisUtil.isElementExistsInSet(cancelSetKey, uniqueId);
            if (isMember) {
                try {
                    // 2.1 Manually ack the cancelled message so it is dropped.
                    channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
                } catch (IOException ex) {
                    log.error("手动确认,不返回队列重新消费");
                }
                // 2.2 Removing the id from the cancel set is intentionally disabled.
                // redisUtil.removeFromSet(cancelSetKey, uniqueId);
            } else {
                /*try {
                    Thread.sleep(15000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }*/
                GenerateCollectionVO generateCollectionVO = generateService.generateThroughImageText(generateThroughImageTextDTO);
                // Remove the id from the pending ZSet only after the result has
                // been persisted to the DB by the service call above.
                redisUtil.removeFromZSet(consumptionOrderKey, uniqueId);
                if (!Objects.isNull(generateCollectionVO)) {
                    HashMap<String, String> generateResult = new HashMap<>();
                    generateResult.put(uniqueId, JSONObject.toJSONString(generateCollectionVO));
                    // Store the result in Redis; null results are deliberately skipped.
                    redisUtil.addToMap(resultMapKey, generateResult);
                }
            }
        } catch (BusinessException e) {
            log.error(e.getMsg());
            // basicNack would reject the message (multiple / requeue flags);
            // here we ack instead so the failed message is NOT redelivered.
            try {
                // Second argument false = ack only this deliveryTag, not all
                // earlier unacknowledged deliveries.
                channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
                // Remove the id from the pending ZSet after the failure is recorded.
                redisUtil.removeFromZSet(consumptionOrderKey, uniqueId);
            } catch (IOException exception) {
                log.error("手动确认,取消返回队列,不再重新消费");
            }
            // Persist the input payload and the error message for diagnostics.
            String exceptionMessage = JSONObject.toJSONString(generateThroughImageTextDTO) +
                    " Exception message " + e.getMsg();
            HashMap<String, String> exceptionInfo = new HashMap<>();
            exceptionInfo.put(String.valueOf(uniqueId), exceptionMessage);
            // Store in the Redis exception hash.
            redisUtil.addToMap(exceptionMapKey, exceptionInfo);
        }
        long end = System.currentTimeMillis();
        log.info(" task_id " + uniqueId + "----------" + consumerName + " 执行时长:" + (end - start) + "毫秒");
        log.info("=============end listening===========");
    }

    /** Concurrent consumer #1 on the generation queue. */
    @RabbitListener(queues = MQConfig.GENERATE_QUEUE)
    @RabbitHandler
    public void generateConsumer1(Message msg, Channel channel) {
        generate(msg, channel, "consumer 1");
    }

    /** Concurrent consumer #2 on the generation queue. */
    @RabbitListener(queues = MQConfig.GENERATE_QUEUE)
    @RabbitHandler
    public void generateConsumer2(Message msg, Channel channel) {
        generate(msg, channel, "consumer 2");
    }

    /** Concurrent consumer #3 on the generation queue. */
    @RabbitListener(queues = MQConfig.GENERATE_QUEUE)
    @RabbitHandler
    public void generateConsumer3(Message msg, Channel channel) {
        generate(msg, channel, "consumer 3");
    }

    /** Concurrent consumer #4 on the generation queue. */
    @RabbitListener(queues = MQConfig.GENERATE_QUEUE)
    @RabbitHandler
    public void generateConsumer4(Message msg, Channel channel) {
        generate(msg, channel, "consumer 4");
    }

    /** Concurrent consumer #5 on the generation queue. */
    @RabbitListener(queues = MQConfig.GENERATE_QUEUE)
    @RabbitHandler
    public void generateConsumer5(Message msg, Channel channel) {
        generate(msg, channel, "consumer 5");
    }

    /** Concurrent consumer #6 on the generation queue. */
    @RabbitListener(queues = MQConfig.GENERATE_QUEUE)
    @RabbitHandler
    public void generateConsumer6(Message msg, Channel channel) {
        generate(msg, channel, "consumer 6");
    }

    /** Concurrent consumer #7 on the generation queue. */
    @RabbitListener(queues = MQConfig.GENERATE_QUEUE)
    @RabbitHandler
    public void generateConsumer7(Message msg, Channel channel) {
        generate(msg, channel, "consumer 7");
    }

    /** Concurrent consumer #8 on the generation queue. */
    @RabbitListener(queues = MQConfig.GENERATE_QUEUE)
    @RabbitHandler
    public void generateConsumer8(Message msg, Channel channel) {
        generate(msg, channel, "consumer 8");
    }

    /** Concurrent consumer #9 on the generation queue. */
    @RabbitListener(queues = MQConfig.GENERATE_QUEUE)
    @RabbitHandler
    public void generateConsumer9(Message msg, Channel channel) {
        generate(msg, channel, "consumer 9");
    }
}

View File

@@ -0,0 +1,24 @@
package com.ai.da.common.RabbitMQ;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * Publishes generation task messages onto the RabbitMQ work queue.
 */
@Slf4j
@Component
public class MQPublisher {

    @Resource
    private AmqpTemplate amqpTemplate;

    /**
     * Sends a message to the generation work queue via the default exchange,
     * using the queue name as routing key.
     *
     * @param mm the message body to publish
     */
    public void sendGenerateMessage(String mm) {
        log.info("send message:" + mm);
        amqpTemplate.convertAndSend(MQConfig.GENERATE_QUEUE, mm);
    }
}

View File

@@ -29,6 +29,9 @@ public class MyTaskScheduler {
// 用户到期时间戳
Long timestamp = account.getValidEndTime(); // 替换为你的时间戳
if (null == timestamp) {
continue;
}
// 获取当前时间戳
Long currentTimestamp = System.currentTimeMillis();

View File

@@ -0,0 +1,42 @@
package com.ai.da.common.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
 * Redis serialization configuration: String-serialized keys and hash keys,
 * Jackson-JSON-serialized values and hash values.
 */
@Configuration
public class RedisConfig {

    /**
     * Builds the shared RedisTemplate. Default typing is activated on the
     * ObjectMapper so JSON values carry type information and deserialize back
     * to their original classes.
     *
     * @param factory the connection factory provided by Spring Boot
     * @return the configured template bean named "redisTemplate"
     */
    @Bean(name = "redisTemplate")
    public RedisTemplate<String, Object> getRedisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(factory);

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringRedisSerializer); // key serializer

        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // enableDefaultTyping is deprecated; activateDefaultTyping is its replacement.
        objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,
                ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);

        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); // value serializer
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}

View File

@@ -0,0 +1,73 @@
package com.ai.da.common.utils;
import com.ai.da.common.config.exception.BusinessException;
import com.ai.da.model.dto.GenerateToPythonDTO;
import com.ai.da.python.PythonService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.*;
/**
 * Runs the Python "generate" call asynchronously while periodically polling a
 * shared cancellation flag, so a user can abort a long-running generation.
 */
@Slf4j
@Component
public class AsyncCallerUtil {

    // taskId -> "still wanted" flag: true = keep waiting, false = user cancelled.
    // NOTE(review): plain HashMap mutated from multiple threads — consider
    // ConcurrentHashMap if concurrent access is confirmed.
    public static Map<String, Boolean> waitingStatus = new HashMap<>();

    private static PythonService pythonService;

    /** Setter injection into a static field so the static map's users share one service. */
    @Autowired
    public void setPythonService(PythonService pythonService) {
        AsyncCallerUtil.pythonService = pythonService;
    }

    /**
     * Starts the Python generation asynchronously.
     * Runs on the common ForkJoinPool (default executor of supplyAsync).
     */
    public CompletableFuture<List<String>> callGenerateAsync(GenerateToPythonDTO generateToPython) {
        return CompletableFuture.supplyAsync(() -> pythonService.generateSketchOrPrint(generateToPython));
    }

    /**
     * Blocks until the Python generation completes, checking every 10 seconds
     * (first check after 5 seconds) whether the user has cancelled the task.
     *
     * @param generateToPython the generation request; its tasks_id keys the cancellation flag
     * @return the generation result, or {@code null} when the task was cancelled
     * @throws BusinessException when the asynchronous call fails
     */
    public List<String> generate(GenerateToPythonDTO generateToPython) {
        // One thread suffices: only a single periodic watchdog task is scheduled.
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        String taskId = generateToPython.getTasks_id();
        waitingStatus.putIfAbsent(taskId, true);
        try {
            CompletableFuture<List<String>> generateResult = callGenerateAsync(generateToPython);
            // First check after 5s, then every 10s: has the user cancelled?
            scheduledExecutorService.scheduleAtFixedRate(() -> {
                // Boolean.FALSE.equals avoids an NPE if the entry was already removed.
                if (Boolean.FALSE.equals(waitingStatus.get(taskId))) {
                    // User cancelled: abort the pending generation.
                    generateResult.cancel(true);
                    waitingStatus.remove(taskId);
                } else log.info("===============持续等待===============");
            }, 5, 10, TimeUnit.SECONDS);
            log.info("阻塞等待结果...");
            // Block until the generation finishes (or is cancelled).
            List<String> result = generateResult.get();
            waitingStatus.remove(taskId);
            return result;
        } catch (CancellationException e) {
            // generateResult.cancel(true) surfaces here from get().
            log.info("==========成功取消generate任务==========");
            return null;
        } catch (InterruptedException | ExecutionException | BusinessException e) {
            log.error("发生错误 " + e);
            // Restore the interrupt flag if we were interrupted while waiting.
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new BusinessException(e.getMessage());
        } finally {
            // Always release the scheduler; without this every call leaks a
            // thread pool (shutdownNow also cancels the periodic watchdog).
            scheduledExecutorService.shutdownNow();
        }
    }
}

View File

@@ -188,7 +188,7 @@ public class MinioUtil {
}
}
}
return bucketName + path;
return bucketName + "/" + path;
}
// public String upload(String bucketName, String path, File file) {

View File

@@ -0,0 +1,126 @@
package com.ai.da.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.Map;
import java.util.Set;
/**
 * Thin wrapper around RedisTemplate for the ZSet, Set and Hash operations
 * the application needs.
 */
@Slf4j
@Component
public class RedisUtil {

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    //- - - - - - - - - - - - - - - - - - - - - ZSet operations - - - - - - - - - - - - - - - - - - - -

    /**
     * Adds {@code value} to the sorted set at {@code key} with the given score.
     */
    public void addToZSet(String key, String value, Double score) {
        redisTemplate.opsForZSet().add(key, value, score);
    }

    /**
     * Removes {@code value} from the sorted set at {@code key}.
     */
    public void removeFromZSet(String key, String value) {
        redisTemplate.opsForZSet().remove(key, value);
    }

    /**
     * Returns the 0-based rank of {@code value} in the sorted set (ascending
     * score order), or null when the element is absent.
     */
    public Long getRank(String key, String value) {
        return redisTemplate.opsForZSet().rank(key, value);
    }

    /**
     * Returns the highest score currently in the sorted set PLUS one — i.e.
     * the score to assign to the next enqueued element — or 1.0 when the set
     * is empty.
     */
    public Double getMaxScore(String key) {
        Set<ZSetOperations.TypedTuple<String>> top =
                redisTemplate.opsForZSet().reverseRangeWithScores(key, 0, 0);
        if (CollectionUtils.isEmpty(top)) {
            return 1.0;
        }
        return top.iterator().next().getScore() + 1.0;
    }

    /**
     * Tells whether {@code value} is a member of the sorted set at {@code key}.
     */
    public Boolean isElementExistsInZSet(String key, String value) {
        Double score = redisTemplate.opsForZSet().score(key, value);
        return score != null;
    }

    /**
     * Returns the number of elements in the sorted set at {@code key}.
     */
    public Long getZSetTotal(String key) {
        return redisTemplate.opsForZSet().zCard(key);
    }

    //- - - - - - - - - - - - - - - - - - - - - Set operations - - - - - - - - - - - - - - - - - - - -

    /**
     * Adds {@code value} to the set at {@code key}.
     */
    public void addToSet(String key, String value) {
        redisTemplate.opsForSet().add(key, value);
    }

    /**
     * Removes {@code value} from the set at {@code key}.
     */
    public void removeFromSet(String key, String value) {
        redisTemplate.opsForSet().remove(key, value);
    }

    /**
     * Tells whether {@code obj} is a member of the set at {@code key}.
     */
    public Boolean isElementExistsInSet(String key, String obj) {
        return redisTemplate.opsForSet().isMember(key, obj);
    }

    //- - - - - - - - - - - - - - - - - - - - - Hash operations - - - - - - - - - - - - - - - - - - - -

    /**
     * Stores every entry of {@code map} into the hash at {@code key}.
     */
    public void addToMap(String key, Map<String, String> map) {
        redisTemplate.opsForHash().putAll(key, map);
    }

    /**
     * Tells whether {@code hashKey} exists inside the hash at {@code key}.
     */
    public Boolean isElementExistsInMap(String key, String hashKey) {
        return redisTemplate.opsForHash().hasKey(key, hashKey);
    }

    /**
     * Returns the value of {@code key2} inside the hash at {@code key1} as a
     * String. Note: yields the literal string "null" when the field is absent
     * (String.valueOf of a null object).
     */
    public String getMapValue(String key1, String key2) {
        Object raw = redisTemplate.opsForHash().get(key1, key2);
        return String.valueOf(raw);
    }

    /**
     * Deletes {@code hashKeys} from the hash at {@code key}.
     *
     * @return the number of fields actually removed
     */
    public Long removeFromMap(String key, String hashKeys) {
        return redisTemplate.opsForHash().delete(key, hashKeys);
    }
}

View File

@@ -34,6 +34,7 @@ public class SendEmailUtil {
* 发信地址
*/
private static String SEND_ADDRESS = "info@aida.com.hk";
private final static String CODE_CREATE_SEND_ADDRESS = "info@code-create.com.hk";
/**
* 登入主题
*/
@@ -219,18 +220,20 @@ public class SendEmailUtil {
JSONObject jsonObject = new JSONObject();
// 设置试用订单相关数据
jsonObject.put("userName", account.getUserName());
// 用户到期时间戳
Long timestamp = account.getValidEndTime(); // 替换为你的时间戳
if (null != timestamp) {
// 获取当前时间戳
Long currentTimestamp = System.currentTimeMillis();
// 获取当前时间戳
Long currentTimestamp = System.currentTimeMillis();
// 计算时间差(毫秒)
long timeDifference = currentTimestamp - timestamp;
// 计算时间差(毫秒)
long timeDifference = currentTimestamp - timestamp;
// 向上取整计算天数
long days = (timeDifference + 24 * 60 * 60 * 1000 - 1) / (24 * 60 * 60 * 1000);
jsonObject.put("days", days);
// 向上取整计算天数
long days = (timeDifference + 24 * 60 * 60 * 1000 - 1) / (24 * 60 * 60 * 1000);
jsonObject.put("days", days);
}
return jsonObject.toJSONString();
}
@@ -269,4 +272,40 @@ public class SendEmailUtil {
jsonObject.put("email", trialOrder.getEmail());
return jsonObject.toJSONString();
}
/** Tencent SES template id for the AiDA 3.0 upgrade notification mail. */
private final static Long UPGRADE_NOTIFICATION_ID = 118855L;

/**
 * Sends the "Upcoming AiDA 3.0 Launch and Scheduled Maintenance" notification
 * email to the account's address via Tencent Cloud SES.
 *
 * @param account       recipient; its userEmail is the destination address
 * @param senderAddress from-address; falls back to CODE_CREATE_SEND_ADDRESS when empty
 * @throws BusinessException with key "failed.to.send.mail" when the SES call fails
 */
public static void sendUpgradeNotification(Account account, String senderAddress) {
    try {
        // Authenticate against Tencent Cloud.
        // NOTE(review): SECRET_KEy spelling matches the field declared elsewhere
        // in this class — do not "fix" it here alone.
        Credential cred = new Credential(SECRET_ID, SECRET_KEy);
        HttpProfile httpProfile = new HttpProfile();
        httpProfile.setEndpoint("ses.tencentcloudapi.com");
        ClientProfile clientProfile = new ClientProfile();
        clientProfile.setHttpProfile(httpProfile);
        SesClient client = new SesClient(cred, "ap-hongkong", clientProfile);

        SendEmailRequest req = new SendEmailRequest();
        if (StringUtils.isEmpty(senderAddress)) {
            senderAddress = CODE_CREATE_SEND_ADDRESS;
        }
        req.setFromEmailAddress(senderAddress);
        req.setDestination(new String[]{account.getUserEmail()});

        // This method sends exactly one notification type, so subject and
        // template are fixed (no mutable placeholder needed).
        req.setSubject("Upcoming AiDA 3.0 Launch and Scheduled Maintenance");
        Template template = new Template();
        template.setTemplateID(UPGRADE_NOTIFICATION_ID);
        template.setTemplateData(buildAccountData(account));
        req.setTemplate(template);

        // Send the email.
        SendEmailResponse resp = client.SendEmail(req);
        // NOTE(review): log text says "短信" (SMS) but this is an email send — confirm wording.
        log.info("短信发送结果res###{}", SendEmailResponse.toJsonString(resp));
    } catch (TencentCloudSDKException e) {
        // NOTE(review): a failure is logged at info level — consider log.error.
        log.info("邮件发送失败###{}", e.toString());
        throw new BusinessException("failed.to.send.mail");
    }
}
}

View File

@@ -0,0 +1,137 @@
package com.ai.da.common.utils;
/**
 * Twitter-Snowflake style generator of 64-bit, time-ordered unique IDs.
 *
 * Bit layout, high to low: sign bit (always 0) | 41-bit millisecond delta
 * from the custom epoch | 5-bit datacenter id | 5-bit worker id |
 * 12-bit per-millisecond sequence.
 */
public class SnowflakeUtil {

    // ==============================Fields===========================================

    /** Custom epoch the timestamp delta is measured from (2015-01-01). */
    private final long twepoch = 1420041600000L;

    /** Number of bits reserved for the worker id. */
    private final long workerIdBits = 5L;

    /** Number of bits reserved for the datacenter id. */
    private final long datacenterIdBits = 5L;

    /** Largest worker id that fits in workerIdBits: 31. */
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);

    /** Largest datacenter id that fits in datacenterIdBits: 31. */
    private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);

    /** Number of bits reserved for the per-millisecond sequence. */
    private final long sequenceBits = 12L;

    /** Left shift applied to the worker id (12 bits). */
    private final long workerIdShift = sequenceBits;

    /** Left shift applied to the datacenter id (12 + 5 = 17 bits). */
    private final long datacenterIdShift = sequenceBits + workerIdBits;

    /** Left shift applied to the timestamp delta (12 + 5 + 5 = 22 bits). */
    private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;

    /** Mask clamping the sequence to 12 bits (0b111111111111 = 0xfff = 4095). */
    private final long sequenceMask = -1L ^ (-1L << sequenceBits);

    /** Worker id of this instance (0~31). */
    private long workerId;

    /** Datacenter id of this instance (0~31). */
    private long datacenterId;

    /** Sequence counter within the current millisecond (0~4095). */
    private long sequence = 0L;

    /** Millisecond timestamp of the most recently issued id. */
    private long lastTimestamp = -1L;

    //==============================Constructors=====================================

    /**
     * Creates a generator bound to one worker/datacenter pair.
     *
     * @param workerId     worker id (0~31)
     * @param datacenterId datacenter id (0~31)
     * @throws IllegalArgumentException when either id is out of range
     */
    public SnowflakeUtil(long workerId, long datacenterId) {
        if (workerId < 0 || workerId > maxWorkerId) {
            throw new IllegalArgumentException(String.format
                    ("worker Id can't be greater than %d or less than 0", maxWorkerId));
        }
        if (datacenterId < 0 || datacenterId > maxDatacenterId) {
            throw new IllegalArgumentException(String.format
                    ("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
        }
        this.workerId = workerId;
        this.datacenterId = datacenterId;
    }

    // ==============================Methods==========================================

    /**
     * Issues the next id. Thread-safe (synchronized).
     *
     * @return a new SnowflakeId
     * @throws RuntimeException when the system clock has moved backwards
     */
    public synchronized long nextId() {
        long now = timeGen();

        // A clock that runs backwards would let us issue duplicate ids: refuse.
        if (now < lastTimestamp) {
            throw new RuntimeException(
                    String.format
                            ("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - now));
        }

        if (now == lastTimestamp) {
            // Same millisecond as the previous id: bump the sequence.
            sequence = (sequence + 1) & sequenceMask;
            if (sequence == 0) {
                // Sequence overflowed within this millisecond: spin to the next one.
                now = tilNextMillis(lastTimestamp);
            }
        } else {
            // Fresh millisecond: restart the sequence.
            sequence = 0L;
        }

        lastTimestamp = now;

        // Assemble the 64-bit id from its four fields.
        return ((now - twepoch) << timestampLeftShift)
                | (datacenterId << datacenterIdShift)
                | (workerId << workerIdShift)
                | sequence;
    }

    /**
     * Busy-waits until the clock moves past {@code lastTimestamp}.
     *
     * @param lastTimestamp timestamp of the most recently issued id
     * @return the first millisecond strictly after lastTimestamp
     */
    protected long tilNextMillis(long lastTimestamp) {
        long candidate = timeGen();
        while (candidate <= lastTimestamp) {
            candidate = timeGen();
        }
        return candidate;
    }

    /**
     * Returns the current time in milliseconds.
     */
    protected long timeGen() {
        return System.currentTimeMillis();
    }

    //==============================Test=============================================

    /** Manual smoke test. */
    public static void main(String[] args) {
        SnowflakeUtil idWorker = new SnowflakeUtil(0, 0);
        long id = idWorker.nextId();
        System.out.println("id:" + id);
        //id:768842202204864512
    }
}