1、完善超分功能

2、完善积分系统
3、新增任务列表
This commit is contained in:
2024-03-26 14:58:43 +08:00
parent 305324fe1a
commit d411b428f8
26 changed files with 807 additions and 328 deletions

View File

@@ -8,12 +8,16 @@ import org.springframework.context.annotation.Configuration;
public class MQConfig {
public static final String GENERATE_EXCHANGE_FANOUT = "generate-exchange";
// public static final String GENERATE_QUEUE = "generate-queue-prod";
// public static final String GENERATE_QUEUE = "generate-queue-prod";
// public static final String GENERATE_QUEUE = "generate-queue-test";
// public static final String GENERATE_QUEUE = "generate-queue-dev";
public static final String GENERATE_QUEUE = "generate-queue";
public static final String GENERATE_QUEUE = "generate-queue-dev";
// public static final String GENERATE_QUEUE = "generate-queue";
public static final String SR_QUEUE = "SR-queue-dev";
public static final String SR_QUEUE = "SR-queue-dev";
// public static final String SR_QUEUE = "SR-queue-local";
// public static final String SR_RESULT_QUEUE = "SuperResolution-local";
public static final String SR_RESULT_QUEUE = "SuperResolution-dev";
public MQConfig() {
}
@@ -36,6 +40,11 @@ public class MQConfig {
return new Queue(SR_QUEUE);
}
@Bean
public Queue SRResultQueue() {
return new Queue(SR_RESULT_QUEUE);
}
/**
* 将队列绑定到交换机上【队列订阅交换机】
*/

View File

@@ -2,13 +2,15 @@ package com.ai.da.common.RabbitMQ;
import com.ai.da.common.config.exception.BusinessException;
import com.ai.da.common.utils.RedisUtil;
import com.ai.da.model.dto.GenerateThroughImageTextDTO;
import com.ai.da.model.dto.SuperResolutionDTO;
import com.ai.da.model.vo.GenerateCollectionVO;
import com.ai.da.model.dto.TaskDTO;
import com.ai.da.service.SuperResolutionService;
import com.ai.da.service.TaskListService;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.rabbitmq.client.Channel;
import io.netty.util.internal.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
@@ -18,6 +20,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.HashMap;
@Slf4j
@@ -27,6 +30,9 @@ public class SRConsumer {
@Resource
private RedisUtil redisUtil;
@Resource
private TaskListService taskListService;
@Value("${redis.key.orderForSR}")
private String consumptionOrderKey;
@@ -36,21 +42,33 @@ public class SRConsumer {
@Value("${redis.key.SRExceptionMap}")
private String exceptionMapKey;
@Value("${redis.key.resultMap}")
private String resultMapKey;
@Value("${redis.key.taskList}")
private String taskListKey;
@Resource
private SuperResolutionService superResolutionService;
public void superResolution(Message msg, Channel channel, String consumerName){
log.info("============start listening==========");
/**
* 请求超分处理
*/
public void superResolution(Message msg, Channel channel, String consumerName) {
log.info("============SR start listening==========");
long start = System.currentTimeMillis();
SuperResolutionDTO superResolutionDTO = JSONObject.parseObject(msg.getBody(), SuperResolutionDTO.class);
String uniqueId = superResolutionDTO.getUniqueId();
log.info("From " + consumerName + " : " + uniqueId);
SuperResolutionDTO superResolutionDTO;
String uniqueId = null;
try {
superResolutionDTO = JSONObject.parseObject(msg.getBody(), SuperResolutionDTO.class);
uniqueId = superResolutionDTO.getUniqueId();
log.info("From " + consumerName + " : " + uniqueId);
superResolutionService.updateSROutput(uniqueId, "Executing", null);
taskListService.updateTaskStatusOrOutputRedis(uniqueId, "Executing", null);
/*try {
Thread.sleep(2 * 60 * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}*/
// 2、判断当前消息是否在取消列表中
Boolean isMember = redisUtil.isElementExistsInSet(cancelSetKey, uniqueId);
if (isMember) {
@@ -61,45 +79,112 @@ public class SRConsumer {
log.error("手动确认,不返回队列重新消费");
}
} else {
/*try {
Thread.sleep(15000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}*/
String srOutput = superResolutionService.SR(superResolutionDTO);
// 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除
redisUtil.removeFromZSet(consumptionOrderKey, uniqueId);
if (!StringUtil.isNullOrEmpty(srOutput)) {
HashMap<String, String> generateResult = new HashMap<>();
generateResult.put(uniqueId, srOutput);
// 将结果存在redis中 ,为空时不要存
redisUtil.addToMap(resultMapKey, generateResult);
}
// 请求python端进行超分
superResolutionService.SR(superResolutionDTO);
}
} catch (BusinessException e) {
log.error(e.getMsg());
superResolutionDTO = JSONObject.parseObject(msg.getBody(), SuperResolutionDTO.class);
// channel.basicNack() 为不确认deliveryTag对应的消息第二个参数是否应用于多消息第三个参数是否requeue
setErrorMessage(msg, channel, e.getMsg(), superResolutionDTO);
} catch (JSONException e) {
log.error(e.getMessage());
setErrorMessage(msg, channel, e.getMessage(), null);
} catch (Exception e) {
log.error(e.getMessage());
superResolutionDTO = JSONObject.parseObject(msg.getBody(), SuperResolutionDTO.class);
setErrorMessage(msg, channel, e.getMessage(), superResolutionDTO);
}
long end = System.currentTimeMillis();
log.info(" task_id " + uniqueId + "----------" + consumerName + " 执行时长:" + (end - start) + "毫秒");
log.info("=============SR end listening===========");
}
/**
* 获取超分结果
*/
public void getSRResult(Message msg, Channel channel, String consumerName) {
log.info("============SRResult start listening==========");
long start = System.currentTimeMillis();
JSONObject result = null;
String taskId = null;
try {
result = JSONObject.parseObject(msg.getBody(), JSONObject.class);
taskId = result.get("tasks_id").toString();
} catch (JSONException e) {
log.error("SRResult 返回数据格式不合规范");
log.error(e.getMessage());
setErrorMessage(msg, channel, e.getMessage(), null);
}
try {
// channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
// 2、判断状态是否成功
if ("SUCCESS".equals(result.get("status").toString())) {
String output = result.get("data").toString();
superResolutionService.setSRResult(taskId, output, "success");
taskListService.updateTaskStatusOrOutputRedis(taskId, "success", output);
} else {
superResolutionService.setSRResult(taskId, null, "fail");
taskListService.updateTaskStatusOrOutputRedis(taskId, "fail", null);
HashMap<String, String> exceptionInfo = new HashMap<>();
// 获取输入信息
String task = redisUtil.getFromString(taskListKey + taskId + taskId.substring(taskId.lastIndexOf("-") + 1));
Gson gson = new Gson();
Type type = new TypeToken<TaskDTO<SuperResolutionDTO>>() {
}.getType();
TaskDTO<SuperResolutionDTO> taskDTO = gson.fromJson(task, type);
// 将输入信息和报错信息均存入redis todo 加判空
exceptionInfo.put(taskId, "Input ==> " + taskDTO.getInputParam() + "Fail Message ==> " + result.get("message").toString());
// 将报错信息存入redis
redisUtil.addToMap(exceptionMapKey, exceptionInfo);
}
} catch (Exception e) {
log.error(e.getMessage());
// channel.basicNack() 为不确认deliveryTag对应的消息第二个参数是否应用于多消息第三个参数是否requeue
try {
// 第二个参数是否批量确认消息当传false时只确认当前 deliveryTag对应的消息;当传true时会确认当前及之前所有未确认的消息。
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
// 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除
redisUtil.removeFromZSet(consumptionOrderKey, uniqueId);
} catch (IOException exception) {
log.error("手动确认,取消返回队列,不再重新消费");
}
// 将入参和错误信息存入数据库
String exceptionMessage = JSONObject.toJSONString(superResolutionDTO) +
" Exception message " + e.getMsg();
HashMap<String, String> exceptionInfo = new HashMap<>();
exceptionInfo.put(String.valueOf(uniqueId), exceptionMessage);
// 存redis
redisUtil.addToMap(exceptionMapKey, exceptionInfo);
}
long end = System.currentTimeMillis();
log.info(" task_id " + uniqueId + "----------" + consumerName + " 执行时长:" + (end - start) + "毫秒");
log.info("=============end listening===========");
log.info(" task_id " + taskId + "----------" + consumerName + " 执行时长:" + (end - start) + "毫秒");
log.info("=============SRResult end listening===========");
}
private void setErrorMessage(Message msg, Channel channel, String message, SuperResolutionDTO superResolutionDTO) {
String uniqueId;
try {
// 第二个参数是否批量确认消息当传false时只确认当前 deliveryTag对应的消息;当传true时会确认当前及之前所有未确认的消息。
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
uniqueId = superResolutionDTO.getUniqueId();
// 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除
redisUtil.removeFromZSet(consumptionOrderKey, uniqueId);
} catch (Exception exception) {
log.error("手动确认,取消返回队列,不再重新消费");
throw new BusinessException("发生错误,手动确认消息");
}
// 将入参和错误信息存入redis
String exceptionMessage = JSONObject.toJSONString(superResolutionDTO) +
" Exception message " + message;
// " Exception message " + e.getMessage();
HashMap<String, String> exceptionInfo = new HashMap<>();
uniqueId = superResolutionDTO.getUniqueId();
exceptionInfo.put(String.valueOf(uniqueId), exceptionMessage);
// 存redis
redisUtil.addToMap(exceptionMapKey, exceptionInfo);
taskListService.updateTaskStatusOrOutputRedis(uniqueId, "fail", null);
}
@RabbitListener(queues = MQConfig.SR_QUEUE)
@@ -108,4 +193,11 @@ public class SRConsumer {
superResolution(msg, channel, "consumer 1");
}
@RabbitListener(queues = MQConfig.SR_RESULT_QUEUE)
@RabbitHandler
public void SRResultConsumer1(Message msg, Channel channel) {
getSRResult(msg, channel, "consumer 1");
}
}