@@ -1,24 +1,22 @@
package com.ai.da.common.RabbitMQ ;
import com.ai.da.common.config.exception.BusinessException ;
import com.ai.da.common.constant.CommonConstant ;
import com.ai.da.common.utils.RedisUtil ;
import com.ai.da.model.dto.GenerateThroughImageTextDTO ;
import com.ai.da.model.vo.GenerateResultVO ;
import com.ai.da.model.vo.PoseTransformationVO ;
import com.ai.da.service.GenerateService ;
import com.ai.da.service.UserLikeGroupService ;
import com.alibaba.fastjson.JSONObject ;
import com.google.gson.Gson ;
import com.rabbitmq.client.Channel ;
import lombok.extern.slf4j.Slf4j ;
import org.apache.tomcat.jni.Time ;
import org.springframework.amqp.core.Message ;
import org.springframework.amqp.rabbit.annotation.RabbitHandler ;
import org.springframework.amqp.rabbit.annotation.RabbitListener ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.beans.factory.annotation.Value ;
import org.springframework.stereotype.Component ;
import org.springframework.util.StringUtils ;
import javax.annotation.Resource ;
import java.io.IOException ;
@@ -258,6 +256,55 @@ public class GenerateConsumer {
log . info ( " ============ProcessRelightResult End listening========== " ) ;
}
public void processPoseTransformResult ( Message msg , Channel channel ) {
log . info ( " ============ProcessPoseTransformResult listening========== " ) ;
long start = System . currentTimeMillis ( ) ;
Map < String , String > generateResult = JSONObject . parseObject ( msg . getBody ( ) , Map . class ) ;
log . info ( " PoseTransformation response : {} " , generateResult ) ;
try {
log . info ( " tasks_id : {} start " , generateResult . get ( " tasks_id " ) ) ;
if ( generateResult . get ( " status " ) . equals ( " SUCCESS " ) ) {
String gifUrl = generateResult . get ( " gif_url " ) ;
String taskId = generateResult . get ( " tasks_id " ) ;
String videoUrl = generateResult . get ( " video_url " ) ;
String imageUrl = generateResult . get ( " image_url " ) ;
generateService . processPoseTransformResult ( taskId , gifUrl , videoUrl , imageUrl ) ;
} else {
// 修改redis中的数据状态为exception
String key = generateResultKey + " : " + generateResult . get ( " tasks_id " ) ;
redisUtil . addToString ( key , new Gson ( ) . toJson ( new PoseTransformationVO ( null , generateResult . get ( " tasks_id " ) , null , null , null , ( byte ) 0 , " Fail " ) ) , CommonConstant . GENERATE_RESULT_EXPIRE_TIME ) ;
// 将异常信息存到exception中
HashMap < String , String > exceptionInfo = new HashMap < > ( ) ;
exceptionInfo . put ( generateResult . get ( " tasks_id " ) , generateResult . get ( " message " ) ) ;
// 存redis
redisUtil . addToMap ( exceptionMapKey , exceptionInfo ) ;
}
} catch ( Exception e ) {
log . error ( e . getMessage ( ) ) ;
try {
channel . basicAck ( msg . getMessageProperties ( ) . getDeliveryTag ( ) , false ) ;
// 将消息从redis排队队列中删除,需保证被消费的消息存储到db之后再从redis删除
redisUtil . removeFromZSet ( consumptionOrderKey , generateResult . get ( " tasks_id " ) ) ;
} catch ( IOException exception ) {
log . error ( " 手动确认,取消返回队列,不再重新消费 " ) ;
}
// 将入参和错误信息存入数据库
String exceptionMessage = JSONObject . toJSONString ( generateResult ) +
" Exception message : " + e . getMessage ( ) ;
HashMap < String , String > exceptionInfo = new HashMap < > ( ) ;
exceptionInfo . put ( String . valueOf ( generateResult . get ( " tasks_id " ) ) , exceptionMessage ) ;
// 存redis
redisUtil . addToMap ( exceptionMapKey , exceptionInfo ) ;
}
long end = System . currentTimeMillis ( ) ;
log . info ( " tasks_id : {}, end , message : {}, 执行时长: {} 毫秒 " , generateResult . get ( " tasks_id " ) , generateResult . get ( " message " ) , ( end - start ) ) ;
log . info ( " ============ProcessPoseTransformResult End listening========== " ) ;
}
@RabbitListener ( queues = " #{rabbitMQProperties.queues.generate} " )
@RabbitHandler
public void generateConsumer1 ( Message msg , Channel channel ) {
@@ -329,4 +376,10 @@ public class GenerateConsumer {
public void getRelightResult ( Message msg , Channel channel ) {
processRelightResult ( msg , channel ) ;
}
@RabbitListener ( queues = " #{rabbitMQProperties.queues.poseTransform} " )
@RabbitHandler
public void getPoseTransformationResult ( Message msg , Channel channel ) {
processPoseTransformResult ( msg , channel ) ;
}
}