TASK:模块化;

This commit is contained in:
shahaibo
2025-04-21 21:50:03 +08:00
parent 05295b1a0d
commit 98e1f37c1d

View File

@@ -8,6 +8,7 @@ import com.ai.da.model.vo.PoseTransformationVO;
import com.ai.da.service.DesignService; import com.ai.da.service.DesignService;
import com.ai.da.service.GenerateService; import com.ai.da.service.GenerateService;
import com.ai.da.service.UserLikeGroupService; import com.ai.da.service.UserLikeGroupService;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
@@ -320,26 +321,30 @@ public class GenerateConsumer {
} }
private void processToProductImageBatchResult(Message msg, Channel channel) { private void processToProductImageBatchResult(Message msg, Channel channel) {
log.info("============processToProductImageResult listening=========="); log.info("============processToProductImageResultBatch listening==========");
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
Map<String, String> generateResult = JSONObject.parseObject(msg.getBody(), Map.class); JSONObject generateResult = JSONObject.parseObject(msg.getBody(), JSONObject.class);
log.info("toProductImage response : {}", generateResult); log.info("toProductImageBatch response : {}", generateResult);
try { try {
log.info("tasks_id : {} start ", generateResult.get("tasks_id")); log.info("tasks_id : {} start ", generateResult.get("tasks_id"));
if (!StringUtils.isEmpty(generateResult.get("progress"))) { if (!StringUtils.isEmpty(generateResult.getString("progress"))) {
String progress = generateResult.get("progress"); String progress = generateResult.getString("progress");
String url = generateResult.get("result"); JSONArray result = generateResult.getJSONArray("result");
String taskId = generateResult.get("tasks_id"); String url = null;
if (!StringUtils.isEmpty(result)) {
url = result.getString(0);
}
String taskId = generateResult.getString("tasks_id");
userLikeGroupService.toProductBatch(taskId, url, progress); userLikeGroupService.toProductBatch(taskId, url, progress);
} else { } else {
// 修改redis中的数据状态为exception // 修改redis中的数据状态为exception
String key = toProductImageResultKey + ":" + generateResult.get("tasks_id"); String key = toProductImageResultKey + ":" + generateResult.get("tasks_id");
redisUtil.addToString(key, new Gson().toJson(new GenerateResultVO(generateResult.get("tasks_id"), null, null, "Fail")), CommonConstant.GENERATE_RESULT_EXPIRE_TIME); redisUtil.addToString(key, new Gson().toJson(new GenerateResultVO(generateResult.getString("tasks_id"), null, null, "Fail")), CommonConstant.GENERATE_RESULT_EXPIRE_TIME);
// 将异常信息存到exception中 // 将异常信息存到exception中
HashMap<String, String> exceptionInfo = new HashMap<>(); HashMap<String, String> exceptionInfo = new HashMap<>();
exceptionInfo.put(generateResult.get("tasks_id"), generateResult.get("data")); exceptionInfo.put(generateResult.getString("tasks_id"), generateResult.getString("data"));
// 存redis // 存redis
redisUtil.addToMap(exceptionMapKey, exceptionInfo); redisUtil.addToMap(exceptionMapKey, exceptionInfo);
} }
@@ -348,7 +353,7 @@ public class GenerateConsumer {
try { try {
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
// 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除 // 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除
redisUtil.removeFromZSet(consumptionOrderKey, generateResult.get("tasks_id")); redisUtil.removeFromZSet(consumptionOrderKey, generateResult.getString("tasks_id"));
} catch (IOException exception) { } catch (IOException exception) {
log.error("手动确认,取消返回队列,不再重新消费"); log.error("手动确认,取消返回队列,不再重新消费");
} }
@@ -363,33 +368,30 @@ public class GenerateConsumer {
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
log.info("tasks_id : {}, end , message : {}, 执行时长: {} 毫秒", generateResult.get("tasks_id"), generateResult.get("message"), (end - start)); log.info("tasks_id : {}, end , message : {}, 执行时长: {} 毫秒", generateResult.get("tasks_id"), generateResult.get("message"), (end - start));
log.info("============ProcessToProductImageResult End listening=========="); log.info("============ProcessToProductImageBatchResult End listening==========");
} }
private void processRelightBatchResult(Message msg, Channel channel) { private void processRelightBatchResult(Message msg, Channel channel) {
log.info("============processRelightResult listening=========="); log.info("============processRelightResult listening==========");
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
Map<String, String> generateResult = JSONObject.parseObject(msg.getBody(), Map.class); JSONObject generateResult = JSONObject.parseObject(msg.getBody(), JSONObject.class);
log.info("toProductImage response : {}", generateResult); log.info("relightBatch response : {}", generateResult);
try { try {
log.info("tasks_id : {} start ", generateResult.get("tasks_id")); log.info("tasks_id : {} start ", generateResult.get("tasks_id"));
if (generateResult.get("status").equals("SUCCESS")) { if (generateResult.get("status").equals("SUCCESS")) {
String url = generateResult.get("image_url"); String progress = generateResult.getString("progress");
String taskId = generateResult.get("tasks_id"); String url = generateResult.getJSONArray("result").getString(0);
String category = generateResult.get("category"); String taskId = generateResult.getString("tasks_id");
generateService.processRelightResult(taskId, url, category); userLikeGroupService.relightBatch(taskId, url, progress);
} else if (generateResult.get("status").equals("NO_FACE")) {
String taskId = generateResult.get("tasks_id");
userLikeGroupService.relight(taskId);
} else { } else {
// 修改redis中的数据状态为exception // 修改redis中的数据状态为exception
String key = relightResultKey + ":" + generateResult.get("tasks_id"); String key = relightResultKey + ":" + generateResult.get("tasks_id");
redisUtil.addToString(key, new Gson().toJson(new GenerateResultVO(generateResult.get("tasks_id"), null, null, "Fail")), CommonConstant.GENERATE_RESULT_EXPIRE_TIME); redisUtil.addToString(key, new Gson().toJson(new GenerateResultVO(generateResult.getString("tasks_id"), null, null, "Fail")), CommonConstant.GENERATE_RESULT_EXPIRE_TIME);
// 将异常信息存到exception中 // 将异常信息存到exception中
HashMap<String, String> exceptionInfo = new HashMap<>(); HashMap<String, String> exceptionInfo = new HashMap<>();
exceptionInfo.put(generateResult.get("tasks_id"), generateResult.get("data")); exceptionInfo.put(generateResult.getString("tasks_id"), generateResult.getString("data"));
// 存redis // 存redis
redisUtil.addToMap(exceptionMapKey, exceptionInfo); redisUtil.addToMap(exceptionMapKey, exceptionInfo);
} }
@@ -398,7 +400,7 @@ public class GenerateConsumer {
try { try {
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
// 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除 // 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除
redisUtil.removeFromZSet(consumptionOrderKey, generateResult.get("tasks_id")); redisUtil.removeFromZSet(consumptionOrderKey, generateResult.getString("tasks_id"));
} catch (IOException exception) { } catch (IOException exception) {
log.error("手动确认,取消返回队列,不再重新消费"); log.error("手动确认,取消返回队列,不再重新消费");
} }
@@ -413,7 +415,7 @@ public class GenerateConsumer {
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
log.info("tasks_id : {}, end , message : {}, 执行时长: {} 毫秒", generateResult.get("tasks_id"), generateResult.get("message"), (end - start)); log.info("tasks_id : {}, end , message : {}, 执行时长: {} 毫秒", generateResult.get("tasks_id"), generateResult.get("message"), (end - start));
log.info("============ProcessRelightResult End listening=========="); log.info("============ProcessRelightBatchResult End listening==========");
} }
@RabbitListener(queues = "#{rabbitMQProperties.queues.generate}") @RabbitListener(queues = "#{rabbitMQProperties.queues.generate}")
@@ -498,15 +500,15 @@ public class GenerateConsumer {
public void getDesignBatchResult(Message msg, Channel channel) { public void getDesignBatchResult(Message msg, Channel channel) {
processDesignBatchResult(msg, channel); processDesignBatchResult(msg, channel);
} }
@RabbitListener(queues = "#{rabbitMQProperties.queues.toProductImageBatch}") // @RabbitListener(queues = "#{rabbitMQProperties.queues.toProductImageBatch}")
@RabbitHandler // @RabbitHandler
public void getToProductImageBatchResult(Message msg, Channel channel) { // public void getToProductImageBatchResult(Message msg, Channel channel) {
processToProductImageBatchResult(msg, channel); // processToProductImageBatchResult(msg, channel);
} // }
//
@RabbitListener(queues = "#{rabbitMQProperties.queues.relightBatch}") // @RabbitListener(queues = "#{rabbitMQProperties.queues.relightBatch}")
@RabbitHandler // @RabbitHandler
public void getRelightBatchResult(Message msg, Channel channel) { // public void getRelightBatchResult(Message msg, Channel channel) {
processRelightBatchResult(msg, channel); // processRelightBatchResult(msg, channel);
} // }
} }