功能:前端设定时间,实现指定时间发送邮件。
技术:MQ异步延迟消息
整体思想:
延迟消息config封装:(SendMailDelayConfig:包括
SEND_MAIL_DELAY_REPEAT_TRADE_QUEUE_NAME转发监听队列,负责将拥堵在拥堵队列中的消息转发到业务处理消息监听逻辑中
SEND_MAIL_DELAY_DEAD_LETTER_QUEUE_NAME消息拥堵队列,时间结束前消息会拥堵在此,消息时间结束后会将消息转发到业务逻辑处理队列中
QUEUE_SEND_MAIL_DELAY业务处理队列,及发邮件监听消息,消息拥堵结束后会将消息发送到此处
),
延迟消息消息体封装:(DLXMessage:数据传递)
延迟消息发送:(SendMailDelayService:将业务逻辑中设定时间等参数发送到拥堵队列中SEND_MAIL_DELAY_DEAD_LETTER_QUEUE_NAME)
延迟消息转发监听:(SendMailDelayTradeReceiver:拥堵队列时间结束后,会被该监听器监听,监听队列名称SEND_MAIL_DELAY_REPEAT_TRADE_QUEUE_NAME,监听后将消息中数据转发到业务处理消息队列中message.getQueueName(),通常情况下该队列名称是动态的,及我们在入口处传递的,并将此队列名称放到消息体中,该队列名称一般为消息业务处理逻辑的消息名称:发邮件)
延迟消息业务处理监听 :(QUEUE_SEND_MAIL_DELAY:监听转发监听器推过来的消息,处理业务逻辑)
1.业务层代码:生成将要发送的邮件在草稿箱,获取定时时间等基础参数
if(!StringUtils.isEmpty(mailDto.getDelayTime())){
res = sendMailByMQService.sendDelayMailByMQ(mail.getId(),mailDto.getFollowId(),sdf.parse(mailDto.getDelayTime()));
System.err.println("延迟:"+mail.getId());
System.err.println("延迟:"+mailDto.getDelayTime());
}else{
res = sendMailByMQService.sendMailByMQ(mail.getId(),mailDto.getFollowId());
System.err.println("不延迟:"+mailDto.getDelayTime());
}
2.业务逻辑:封装方法,像指定消息封装设定时间
package com.shallnew.wmallgenie.rabbitmq.sender;
import com.alibaba.fastjson.JSONObject;
import com.shallnew.wmallgenie.dto.MailDto;
import com.shallnew.wmallgenie.dto.R;
import com.shallnew.wmallgenie.rabbitmq.config.SendMailByMQConfig;
import com.shallnew.wmallgenie.rabbitmq.config.SendMailConfig;
import com.shallnew.wmallgenie.rabbitmq.config.SendMailDelayConfig;
import com.shallnew.wmallgenie.rabbitmq.message.DelayMessageStruct;
import com.shallnew.wmallgenie.rabbitmq.message.MessageStruct;
import com.shallnew.wmallgenie.utils.StringConvertUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
@Slf4j
public class SendMailByMQService {
@Autowired
private AmqpTemplate rabbitTemplate;
@Autowired
private SendMailDelayService sendMailDelayService;
public R sendMailByMQ(Long id,String followId) {
MessageStruct messageStruct = new MessageStruct();
messageStruct.setId(id);
messageStruct.setFollowId(followId);
this.rabbitTemplate.convertAndSend(SendMailByMQConfig.SEND_MAILBYMQ_EXCHANGENAME, SendMailByMQConfig.SEND_MAILBYMQ_ROUTING_KEY, messageStruct);
return R.ok();
}
public R sendDelayMailByMQ(Long id,String followId,Date delayTime) {
DelayMessageStruct delayMessageStruct = new DelayMessageStruct();
delayMessageStruct.setId(id);
delayMessageStruct.setFollowId(followId);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long time = delayTime.getTime()-new Date().getTime();
delayMessageStruct.setDelayTime(time);
String message = JSONObject.toJSonString(delayMessageStruct);
sendMailDelayService.sendMessage(SendMailDelayConfig.SEND_MAIL_DELAY_EXCHANGE,SendMailDelayConfig.QUEUE_SEND_MAIL_DELAY, message, time);
//this.rabbitTemplate.convertAndSend(SendMailByMQConfig.SEND_MAILBYMQ_EXCHANGENAME, SendMailByMQConfig.SEND_MAILBYMQ_ROUTING_KEY, delayMessageStruct);
return R.ok();
}
}
3.MQ延迟消息异步发送
package com.shallnew.wmallgenie.rabbitmq.sender;
import com.alibaba.fastjson.JSON;
import com.shallnew.wmallgenie.rabbitmq.config.MailDelayConfig;
import com.shallnew.wmallgenie.rabbitmq.config.SendMailDelayConfig;
import com.shallnew.wmallgenie.rabbitmq.message.DLXMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class SendMailDelayService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange,String queueName, String message, long times) {
//消息发送到死信队列上,当消息超时时,会发生到转发队列上,转发队列根据下面封装的queueName,把消息转发的指定队列上
//发送前,把消息进行封装,转发时应转发到指定 queueName 队列上
DLXMessage dlxMessage = new DLXMessage(SendMailDelayConfig.SEND_MAIL_DELAY_EXCHANGE,queueName,message,times);
MessagePostProcessor processor = new MessagePostProcessor(){
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration(times + "");
return message;
}
};
rabbitTemplate.convertAndSend(exchange,
SendMailDelayConfig.SEND_MAIL_DELAY_DEAD_LETTER_QUEUE_NAME, JSON.toJSonString(dlxMessage), processor);
}
}
消息体封装类:
package com.shallnew.wmallgenie.rabbitmq.message;
import java.io.Serializable;
public class DLXMessage implements Serializable {
private static final long serialVersionUID = 9956432152000L;
private String exchange;
private String queueName;
private String content;
private long times;
public DLXMessage() {
super();
}
public DLXMessage(String queueName, String content, long times) {
super();
this.queueName = queueName;
this.content = content;
this.times = times;
}
public DLXMessage(String exchange, String queueName, String content, long times) {
super();
this.exchange = exchange;
this.queueName = queueName;
this.content = content;
this.times = times;
}
public static long getSerialVersionUID() {
return serialVersionUID;
}
public String getExchange() {
return exchange;
}
public void setExchange(String exchange) {
this.exchange = exchange;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public long getTimes() {
return times;
}
public void setTimes(long times) {
this.times = times;
}
}
延迟消息config
package com.shallnew.wmallgenie.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class SendMailDelayConfig {
//exchange name
public static final String SEND_MAIL_DELAY_EXCHANGE = "exchange.send.mail.delay";
//DLX repeat QUEUE 死信接收转发队列,时间设置时用该队列接收
public static final String SEND_MAIL_DELAY_REPEAT_TRADE_QUEUE_NAME = "queue.send.mail.delay.repeat";
//TTL QUEUE 死信拥堵队列
public static final String SEND_MAIL_DELAY_DEAD_LETTER_QUEUE_NAME = "queue.send.mail.delay.dead";
//Hello :最终发消息/处理业务队列
public static final String QUEUE_SEND_MAIL_DELAY = "queue.send.mail.delay";
//信道配置
@Bean
public DirectExchange sendMailDelayExchange() {
return new DirectExchange(SEND_MAIL_DELAY_EXCHANGE, true, false);
}
@Bean
public Queue sendMailDelayQueue() {
Queue queue = new Queue(QUEUE_SEND_MAIL_DELAY,true);
return queue;
}
@Bean
public Binding sendMailDelayBinding() {
//队列绑定到exchange上,再绑定好路由键
return BindingBuilder.bind(sendMailDelayQueue()).to(sendMailDelayExchange()).with(QUEUE_SEND_MAIL_DELAY);
}
//下面是延迟队列的配置
//转发队列
@Bean
public Queue repeatTradeSendMailDelayQueue() {
Queue queue = new Queue(SEND_MAIL_DELAY_REPEAT_TRADE_QUEUE_NAME,true,false,false);
return queue;
}
//绑定转发队列
@Bean
public Binding repeatTradeSendMailDelayBinding() {
return BindingBuilder.bind(repeatTradeSendMailDelayQueue()).to(sendMailDelayExchange()).with(SEND_MAIL_DELAY_REPEAT_TRADE_QUEUE_NAME);
}
//死信队列 -- 消息在死信队列上堆积,消息超时时,会把消息转发到转发队列,转发队列根据消息内容再把转发到指定的队列上
@Bean
public Queue deadLetterSendMailDelayQueue() {
Map arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", SEND_MAIL_DELAY_EXCHANGE);
arguments.put("x-dead-letter-routing-key", SEND_MAIL_DELAY_REPEAT_TRADE_QUEUE_NAME);
//拥堵队列
Queue queue = new Queue(SEND_MAIL_DELAY_DEAD_LETTER_QUEUE_NAME,true,false,false,arguments);
return queue;
}
//绑定死信队列
@Bean
public Binding deadLetterSendMailDelayBinding() {
return BindingBuilder.bind(deadLetterSendMailDelayQueue()).to(sendMailDelayExchange()).with(SEND_MAIL_DELAY_DEAD_LETTER_QUEUE_NAME);
}
}
延迟消息监听:消息转发(时间结束后转发到业务处理类)
package com.shallnew.wmallgenie.rabbitmq.receiver;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.shallnew.wmallgenie.rabbitmq.config.MailDelayConfig;
import com.shallnew.wmallgenie.rabbitmq.config.SendMailDelayConfig;
import com.shallnew.wmallgenie.rabbitmq.message.DLXMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
@Component
@Slf4j
public class SendMailDelayTradeReceiver {
@Autowired
private RabbitTemplate rabbitTemplate;
//监听转发队列,有消息时,把消息转发到目标队列
@RabbitListener(queues = SendMailDelayConfig.SEND_MAIL_DELAY_REPEAT_TRADE_QUEUE_NAME)
public void sendMailDelayTradeMessage(String content, Channel channel, @Headers Map headers) {
try {
//此时,才把消息发送到指定队列,而实现延迟功能
DLXMessage message = JSON.parseObject(content, DLXMessage.class);
System.err.println("将消息转发给其他队列"+message.getQueueName());
rabbitTemplate.convertAndSend(SendMailDelayConfig.SEND_MAIL_DELAY_EXCHANGE,message.getQueueName(), message.getContent());
System.err.println("转发到:"+message.getQueueName());
Long deliveryTay = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTay,false);
} catch (IOException e) {
e.printStackTrace();
Long deliveryTay = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
try {
channel.basicAck(deliveryTay,false);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
延迟消息监听,业务处理(发邮件)
package com.shallnew.wmallgenie.rabbitmq.receiver;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.shallnew.wmallgenie.config.MatuConfig;
import com.shallnew.wmallgenie.contants.LocusEnum;
import com.shallnew.wmallgenie.dao.CsMsgPushMapper;
import com.shallnew.wmallgenie.dao.MailAccountMapper;
import com.shallnew.wmallgenie.dao.MailContentMapper;
import com.shallnew.wmallgenie.dao.MailFollowMapper;
import com.shallnew.wmallgenie.entity.*;
import com.shallnew.wmallgenie.rabbitmq.config.MailDelayConfig;
import com.shallnew.wmallgenie.rabbitmq.config.SendMailDelayConfig;
import com.shallnew.wmallgenie.rabbitmq.message.DelayMessageStruct;
import com.shallnew.wmallgenie.rabbitmq.sender.AnnaylizeOssDirSpaceService;
import com.shallnew.wmallgenie.rabbitmq.sender.ReceiverMailByOneAccountService;
import com.shallnew.wmallgenie.service.*;
import com.shallnew.wmallgenie.shiro.utils.DateUtils;
import com.shallnew.wmallgenie.utils.DownLoadUrlUtils;
import com.shallnew.wmallgenie.utils.RSAUtils;
import com.shallnew.wmallgenie.utils.mail.MailMQUtils;
import com.shallnew.wmallgenie.utils.mail.MailUtils;
import com.shallnew.wmallgenie.utils.xss.UUIDUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import org.thymeleaf.TemplateEngine;
import org.thymeleaf.context.Context;
import javax.mail.internet.MimeMessage;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.text.SimpleDateFormat;
import java.util.*;
@Component
@Slf4j
public class SendMailDelayReceiver {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MailContentMapper mailContentMapper;
@Autowired
private MailAccountMapper accountMapper;
@Autowired
private MailFollowMapper followMapper;
@Autowired
private MailAttachmentService attachmentService;
@Autowired
private TemplateEngine templateEngine;
@Autowired
private CustomerService customerService;
@Autowired
private MailAccountService accountService;
@Autowired
private SupplierService supplierService;
@Autowired
private CsGroupService groupService;
@Autowired
private AnnaylizeOssDirSpaceService annaylizeOssDirSpaceService;
@Autowired
private CustomLocusService customLocusService;
@Autowired
private CustomCommunityService customCommunityService;
@Autowired
private CsUserService csUserService;
@Autowired
private ReceiverMailByoneAccountService receiverMailByOneAccountService;
@Autowired
private CsMsgPushMapper msgPushMapper;
@Autowired
private SupplierContactService supplierContactService;
@RabbitListener(queues = SendMailDelayConfig.QUEUE_SEND_MAIL_DELAY)
public void delayMessage(Message msg, Channel channel, @Headers Map headers) {
DelayMessageStruct delayMessageStruct = JSON.parseObject(msg.getPayload()+"", DelayMessageStruct.class);
Long id = delayMessageStruct.getId();
MailContent mailDto = mailContentMapper.selectByPrimaryKey(id);
List attachmentList = attachmentService.queryByMail(id);
List