TASK: 发送邮件功能及发送失败后的重试机制
This commit is contained in:
@@ -0,0 +1,96 @@
|
||||
package com.ai.da.common.RabbitMQ;
|
||||
|
||||
|
||||
import com.ai.da.common.utils.MailUtil;
|
||||
import com.ai.da.model.dto.BasicEmailParamDTO;
|
||||
import com.ai.da.service.EmailService;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.core.io.InputStreamSource;
|
||||
import org.springframework.stereotype.Component;
|
||||
import software.amazon.awssdk.core.exception.RetryableException;
|
||||
import org.springframework.amqp.core.Message;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
import javax.mail.MessagingException;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class EmailRetryConsumer {
|
||||
|
||||
@Resource
|
||||
private MailUtil mailUtil;
|
||||
@Resource
|
||||
private MQPublisher mqPublisher;
|
||||
@Resource
|
||||
private EmailService emailService;
|
||||
|
||||
// @RabbitListener(queues = "#{rabbitMQProperties.deadLetter.queue}")
|
||||
public void handleRetry(Map<String, String> mailParams, Message message, Channel channel) throws IOException {
|
||||
long tag = message.getMessageProperties().getDeliveryTag();
|
||||
try {
|
||||
log.info("死信队列收到消息:{}", message);
|
||||
// 处理邮件发送参数
|
||||
BasicEmailParamDTO basicEmailParamDTO = JSONObject.parseObject(mailParams.get("dto"), BasicEmailParamDTO.class);
|
||||
String fileName = mailParams.get("filename");
|
||||
InputStreamSource inputStreamSource = Objects.isNull(mailParams.get("source")) ?
|
||||
null : JSONObject.parseObject(mailParams.get("source"), InputStreamSource.class);
|
||||
JSONObject templateParams = JSONObject.parseObject(mailParams.get("templateParams"), JSONObject.class);
|
||||
String templateName = mailParams.get("templatePath");
|
||||
long logId = Long.parseLong(mailParams.get("logId"));
|
||||
basicEmailParamDTO.setContent(mailUtil.setContent(templateParams, templateName));
|
||||
// 发邮件
|
||||
int lastReturnCode = mailUtil.sendMail(basicEmailParamDTO, fileName, inputStreamSource);
|
||||
if (lastReturnCode == 250) {
|
||||
log.info("邮件发送成功!Subject : {}", basicEmailParamDTO.getSubject());
|
||||
emailService.updateStatus(logId, EmailService.DELIVERED);
|
||||
} else if (lastReturnCode == 450) {
|
||||
log.info("目标邮箱 {} 暂时不可用,请稍后重试", (Object) basicEmailParamDTO.getMailTo());
|
||||
// 重试
|
||||
retry(mailParams, message, channel, tag, logId);
|
||||
} else if (lastReturnCode == 550) {
|
||||
log.info("目标邮箱 {} 不可用,邮件发送失败", (Object) basicEmailParamDTO.getMailTo());
|
||||
emailService.updateStatus(logId, EmailService.FAILED);
|
||||
} else {
|
||||
log.info("邮件发送失败,Subject : {}, 状态码: {}", basicEmailParamDTO.getSubject(), lastReturnCode);
|
||||
retry(mailParams, message, channel, tag, logId);
|
||||
emailService.updateStatus(logId, EmailService.FAILED);
|
||||
}
|
||||
channel.basicAck(tag, false);
|
||||
} catch (RetryableException e) {
|
||||
log.info("邮件重试发生异常:RetryableException -> {}", e.getMessage());
|
||||
channel.basicAck(tag, false); // 确认原消息
|
||||
} catch (Exception e) {
|
||||
log.info("邮件重试发生异常:Exception -> {}", e.getMessage());
|
||||
channel.basicAck(tag, false); // 确认原消息
|
||||
}
|
||||
}
|
||||
|
||||
private int getRetryAttempt(Message message) {
|
||||
Integer attempt = message.getMessageProperties()
|
||||
.getHeader("x-retry-attempt");
|
||||
return attempt != null ? attempt : 1;
|
||||
}
|
||||
|
||||
private void retry(Map<String, String> mailParams, Message message, Channel channel, long tag, long logId) throws IOException{
|
||||
int attempt = getRetryAttempt(message);
|
||||
if (attempt >= 3) { // 最大重试次数
|
||||
channel.basicReject(tag, false);
|
||||
emailService.updateStatus(logId, EmailService.FAILED);
|
||||
log.error("重试结束,邮件最终发送失败: {}", mailParams);
|
||||
} else {
|
||||
log.info("重新将邮件信息发送到重试队列");
|
||||
mqPublisher.sendEmailMsg(mailParams, attempt);
|
||||
channel.basicAck(tag, false); // 确认原消息
|
||||
// 更新数据库
|
||||
emailService.updateRetryCount(logId, attempt + 1);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,43 +1,82 @@
|
||||
package com.ai.da.common.RabbitMQ;
|
||||
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
public class MQConfig {
|
||||
|
||||
@Autowired
|
||||
private RabbitMQProperties rabbitMQProperties;
|
||||
|
||||
@Bean
|
||||
public Queue generateQueue() {
|
||||
return new Queue(rabbitMQProperties.getQueues().getGenerate());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue SRQueue() {
|
||||
return new Queue(rabbitMQProperties.getQueues().getSr());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue SRResultQueue() {
|
||||
return new Queue(rabbitMQProperties.getQueues().getSrResult());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue generateResultQueue() {
|
||||
return new Queue(rabbitMQProperties.getQueues().getGenerateResult());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue toProductImageResultQueue() {
|
||||
return new Queue(rabbitMQProperties.getQueues().getToProductImageResult());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue relightResultQueue() {
|
||||
return new Queue(rabbitMQProperties.getQueues().getRelightResult());
|
||||
}
|
||||
}
|
||||
package com.ai.da.common.RabbitMQ;
|
||||
|
||||
import org.springframework.amqp.core.*;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
public class MQConfig {
|
||||
|
||||
@Autowired
|
||||
private RabbitMQProperties rabbitMQProperties;
|
||||
|
||||
@Bean
|
||||
public Queue generateQueue() {
|
||||
return new Queue(rabbitMQProperties.getQueues().getGenerate());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue SRQueue() {
|
||||
return new Queue(rabbitMQProperties.getQueues().getSr());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue SRResultQueue() {
|
||||
return new Queue(rabbitMQProperties.getQueues().getSrResult());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue generateResultQueue() {
|
||||
return new Queue(rabbitMQProperties.getQueues().getGenerateResult());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue toProductImageResultQueue() {
|
||||
return new Queue(rabbitMQProperties.getQueues().getToProductImageResult());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue relightResultQueue() {
|
||||
return new Queue(rabbitMQProperties.getQueues().getRelightResult());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue poseTransformQueue() {
|
||||
return new Queue(rabbitMQProperties.getQueues().getPoseTransform());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue mailRetryQueue() {
|
||||
// 普通队列,不绑定DLX(首次失败后才进入MQ)
|
||||
// durable 持久化队列
|
||||
return QueueBuilder.durable(rabbitMQProperties.getQueues().getEmailRetry())
|
||||
// 关键参数:绑定死信交换机
|
||||
.withArgument("x-dead-letter-exchange", rabbitMQProperties.getDeadLetter().getExchange())
|
||||
// 可选:指定死信路由键(默认使用原消息的路由键)
|
||||
.withArgument("x-dead-letter-routing-key", rabbitMQProperties.getDeadLetter().getRoutingKey())
|
||||
.build();
|
||||
}
|
||||
|
||||
// 新增死信交换机
|
||||
@Bean
|
||||
public DirectExchange deadLetterExchange() {
|
||||
return new DirectExchange(rabbitMQProperties.getDeadLetter().getExchange());
|
||||
}
|
||||
|
||||
// 新增死信队列
|
||||
@Bean
|
||||
public Queue deadLetterQueue() {
|
||||
return QueueBuilder.durable(rabbitMQProperties.getDeadLetter().getQueue()).build();
|
||||
}
|
||||
|
||||
// 绑定死信队列
|
||||
@Bean
|
||||
public Binding deadLetterBinding() {
|
||||
return BindingBuilder.bind(deadLetterQueue())
|
||||
.to(deadLetterExchange())
|
||||
.with(rabbitMQProperties.getDeadLetter().getRoutingKey());
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -1,29 +1,56 @@
|
||||
package com.ai.da.common.RabbitMQ;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.AmqpTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class MQPublisher {
|
||||
|
||||
@Autowired
|
||||
private RabbitMQProperties rabbitMQProperties;
|
||||
|
||||
@Autowired
|
||||
private AmqpTemplate amqpTemplate;
|
||||
|
||||
public void sendGenerateMessage(String mm) {
|
||||
log.info("send message: " + mm);
|
||||
amqpTemplate.convertAndSend(rabbitMQProperties.getQueues().getGenerate(), mm);
|
||||
}
|
||||
|
||||
public void sendSRMessage(String mm) {
|
||||
log.info("send message: " + mm);
|
||||
amqpTemplate.convertAndSend(rabbitMQProperties.getQueues().getSr(), mm);
|
||||
}
|
||||
}
|
||||
package com.ai.da.common.RabbitMQ;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.AmqpTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Map;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class MQPublisher {
|
||||
|
||||
@Autowired
|
||||
private RabbitMQProperties rabbitMQProperties;
|
||||
|
||||
@Autowired
|
||||
private AmqpTemplate amqpTemplate;
|
||||
|
||||
public void sendGenerateMessage(String mm) {
|
||||
log.info("send generate message: {}", mm);
|
||||
amqpTemplate.convertAndSend(rabbitMQProperties.getQueues().getGenerate(), mm);
|
||||
}
|
||||
|
||||
public void sendSRMessage(String mm) {
|
||||
log.info("send message: {}", mm);
|
||||
amqpTemplate.convertAndSend(rabbitMQProperties.getQueues().getSr(), mm);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param mailParams 含有的字段
|
||||
* {"dto": basicEmailParamDTO, "filename": fileName, "source": inputStreamSource,
|
||||
* "templateParams": jsonObject, "templatePath": path}
|
||||
* 邮件发送参数,附件文件名,附件数据
|
||||
* @param retryTimes 重试次数(初始为0)
|
||||
*/
|
||||
public void sendEmailMsg(Map<String, String> mailParams, int retryTimes){
|
||||
log.info("send email MQ message: {} ", mailParams);
|
||||
// // 重新入队(指数退避) 时间单位:毫秒
|
||||
long newDelay = (long) (5000 * Math.pow(2, retryTimes + 1));
|
||||
log.info("send email MQ delay: {} ms, retry attempt: {}", newDelay, retryTimes + 1);
|
||||
amqpTemplate.convertAndSend(
|
||||
rabbitMQProperties.getQueues().getEmailRetry(),
|
||||
mailParams,
|
||||
m -> {
|
||||
m.getMessageProperties().setExpiration(String.valueOf(newDelay));
|
||||
m.getMessageProperties().setHeader("x-retry-attempt", retryTimes + 1);
|
||||
return m;
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -1,35 +1,45 @@
|
||||
package com.ai.da.common.RabbitMQ;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
@ConfigurationProperties(prefix = "rabbitmq")
|
||||
@Data
|
||||
public class RabbitMQProperties {
|
||||
|
||||
private Queues queues;
|
||||
private Exchange exchange;
|
||||
|
||||
@Data
|
||||
public static class Queues {
|
||||
private String generate;
|
||||
private String sr;
|
||||
private String srResult;
|
||||
private String generateResult;
|
||||
private String toProductImageResult;
|
||||
private String relightResult;
|
||||
private String poseTransform;
|
||||
private String designBatch;
|
||||
private String relightBatch;
|
||||
private String toProductImageBatch;
|
||||
private String poseTransformBatch;
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class Exchange {
|
||||
private String generate;
|
||||
}
|
||||
}
|
||||
|
||||
package com.ai.da.common.RabbitMQ;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
@ConfigurationProperties(prefix = "rabbitmq")
|
||||
@Data
|
||||
public class RabbitMQProperties {
|
||||
|
||||
private Queues queues;
|
||||
private Exchange exchange;
|
||||
private DeadLetter deadLetter; // 新增死信配置
|
||||
|
||||
@Data
|
||||
public static class Queues {
|
||||
private String generate;
|
||||
private String sr;
|
||||
private String srResult;
|
||||
private String generateResult;
|
||||
private String toProductImageResult;
|
||||
private String relightResult;
|
||||
private String poseTransform;
|
||||
private String emailRetry;
|
||||
private String designBatch;
|
||||
private String relightBatch;
|
||||
private String toProductImageBatch;
|
||||
private String poseTransformBatch;
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class Exchange {
|
||||
private String generate;
|
||||
}
|
||||
|
||||
// 新增死信配置内部类
|
||||
@Data
|
||||
public static class DeadLetter {
|
||||
private String exchange;
|
||||
private String queue;
|
||||
private String routingKey;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user