依赖 | sql表 | application.yml配置文件 | 实体类MailLog | 公用类MailConstants | RabbitMq配置类RabbitMqConfig | Service层、Mapper层代码 | controller层发送消息到rabbitmq | 消息接收,发送邮件MailReceiver | 定时任务检查消息是否发送(记得在启动类上添加@EnableScheduling) | 前端邮件模板mail.html
依赖
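<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.3.1.tmp</version>
</dependency>
<dependency>
    <groupId>io.springfox</groupId>
    <artifactId>springfox-swagger2</artifactId>
    <version>2.7.0</version>
</dependency>
<dependency>
    <groupId>com.github.xiaoymin</groupId>
    <artifactId>swagger-bootstrap-ui</artifactId>
    <version>1.9.6</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>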
sql表
-- Delivery log for mail messages: one row per message id.
DROP TABLE IF EXISTS `t_mail_log`;

CREATE TABLE `t_mail_log` (
    `msgId`      VARCHAR(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '消息id',
    `eid`        INT(11) DEFAULT NULL COMMENT '接收员工id',
    `status`     INT(1) DEFAULT NULL COMMENT '状态(0:消息投递中 1:投递成功 2:投递失败)',
    `routeKey`   VARCHAR(20) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '路由键',
    `exchange`   VARCHAR(20) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '交换机',
    `count`      INT(1) DEFAULT NULL COMMENT '重试次数',
    `tryTime`    DATETIME DEFAULT NULL COMMENT '重试时间',
    `createTime` DATETIME DEFAULT NULL COMMENT '创建时间',
    `updateTime` DATETIME DEFAULT NULL COMMENT '更新时间',
    -- msgId is the only key on the table; it is declared unique rather than primary.
    UNIQUE KEY `msgId` (`msgId`) USING BTREE
) ENGINE=INNODB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;

-- Sample row: a message with status 1 and a retry count of 0.
INSERT INTO `t_mail_log`
    (`msgId`, `eid`, `status`, `routeKey`, `exchange`, `count`, `tryTime`, `createTime`, `updateTime`)
VALUES
    ('123', 538, 1, 'mail.routing.key', 'mail.exchange', 0, '2021-09-04 21:11:56', '2021-09-04 21:10:56', '2021-09-04 21:10:56');
application.yml配置文件
# Application configuration: Redis, SMTP mail, RabbitMQ, Thymeleaf and MyBatis-Plus.
spring:
  redis:
    # Timeout
    timeout: 10000ms
    # Server address
    host: 106.14.223.42
    # Server port
    port: 6379
    # Database index
    database: 0
  mail:
    host: smtp.163.com
    # Sender mailbox (a 163.com account, matching the SMTP host above)
    username: zsjia8@163.com
    # Authorization token issued by the mail provider
    # NOTE(review): credentials are hard-coded here; consider moving them out of version control.
    password: CVSXFAKKHTSAWCZY
    # Encoding
    default-encoding: utf-8
    # Protocol
    protocol: smtp
    port: 25
  # RabbitMQ settings
  rabbitmq:
    username: admin
    password: admin
    host: 106.14.223.42
    port: 5672
    # Publisher confirm callback
    publisher-confirm-type: correlated
    # Callback for messages that could not be routed
    publisher-returns: true
    # Listeners acknowledge messages manually
    listener:
      simple:
        acknowledge-mode: manual
  thymeleaf:
    cache: false
    prefix: classpath:/templates/
    suffix: .html
  profiles:
    active: pro
comment.avatar: /images/avatar.jpg
server:
  port: 8099
mybatis-plus:
  # Location of the mapper XML files
  mapper-locations: classpath*:/mapper/*Mapper.xml
  configuration:
    # t_mail_log uses camelCase column names (msgId, routeKey, ...), so the
    # default camelCase -> snake_case column mapping must be switched off.
    # NOTE(review): this is application-wide — confirm no other table relies on snake_case mapping.
    map-underscore-to-camel-case: false
/**
 * Entity mapped to the {@code t_mail_log} table.
 *
 * <p>Accessors are generated by Lombok; setters are chainable because of
 * {@code @Accessors(chain = true)}.
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("t_mail_log")
@ApiModel(value = "MailLog对象", description = "")
public class MailLog implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Message id. */
    @ApiModelProperty("消息id")
    private String msgId;

    /** Id of the receiving employee. */
    @ApiModelProperty("接收员工id")
    private Integer eid;

    /** Delivery status (0: delivering, 1: delivered, 2: delivery failed). */
    @ApiModelProperty("状态(0:消息投递中 1:投递成功 2:投递失败)")
    private Integer status;

    /** Routing key. */
    @ApiModelProperty("路由键")
    private String routeKey;

    /** Exchange name. */
    @ApiModelProperty("交换机")
    private String exchange;

    /** Number of retries. */
    @ApiModelProperty("重试次数")
    private Integer count;

    /** Retry time. */
    @ApiModelProperty("重试时间")
    private LocalDateTime tryTime;

    /** Creation time. */
    @ApiModelProperty("创建时间")
    private LocalDateTime createTime;

    /** Last update time. */
    @ApiModelProperty("更新时间")
    private LocalDateTime updateTime;
}
公用类MailConstants
package com.ljh.po;
/**
 * Constants shared by the mail-delivery code: message status codes, retry
 * limits and the RabbitMQ queue, exchange and routing-key names.
 */
public class MailConstants {

    /** Status code: the message is being delivered. */
    public static final Integer DELIVERING = 0;

    /** Status code: the message was delivered successfully. */
    public static final Integer SUCCESS = 1;

    /** Status code: delivery of the message failed. */
    public static final Integer FAILURE = 2;

    /** Maximum number of retries. */
    public static final Integer MAX_TRY_COUNT = 3;

    /** Message timeout; callers add it to the current time as minutes. */
    public static final Integer MSG_TIMEOUT = 1;

    /** Queue name. */
    public static final String MAIL_QUEUE_NAME = "mail.queue";

    /** Exchange name. */
    public static final String MAIL_EXCHANGE_NAME = "mail.exchange";

    /** Routing key. */
    public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";
}
RabbitMq配置类RabbitMqConfig
@Configuration
public class RabbitMqConfig {
@Autowired
private CachingConnectionFactory cachingConnectionFactory;
@Autowired
private IMailLogService mailLogService;
//日志
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMqConfig.class);
@Bean
public RabbitTemplate rabbitTemplate(){
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
rabbitTemplate.set/confirm/iCallback((correlationData,ack,cause)->{
String msgId = correlationData.getId();
System.out.println("msgId:"+msgId);
if (ack){
LOGGER.info("消息发送成功========》{}",msgId);
mailLogService.update(new UpdateWrapper().set("status",1).eq("msgId",msgId));
}
else {
LOGGER.info("消息发送失败==========>{}",msgId);
}
});
rabbitTemplate.setReturnCallback((msg,repCode,repText,exchange,routingkey)->{
LOGGER.info("{}============>消息发送到queue时失败"+msg.getBody());
});
return rabbitTemplate;
}
@Bean
public Queue queue(){
return new Queue(MailConstants.MAIL_QUEUE_NAME);
}
@Bean
public DirectExchange exchange(){
return new DirectExchange(MailConstants.MAIL_EXCHANGE_NAME);
}
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(exchange()).with(MailConstants.MAIL_ROUTING_KEY_NAME);
}
}
Service层、Mapper层代码
public interface IMailLogService extends IServicecontroller层发送消息到rabbitmq{ } @Service public class MailLogServiceImpl extends ServiceImpl implements IMailLogService { } @Repository @Mapper public interface MailLogMapper extends baseMapper { }
// Store a delivery-log row for the message, then publish the user to RabbitMQ
// with the same id attached as correlation data.
final String msgId = UUID.randomUUID().toString();

// MailLog setters are chainable (@Accessors(chain = true)).
MailLog mailLog = new MailLog()
        .setMsgId(msgId)
        .setEid(user1.getId().intValue())
        .setStatus(MailConstants.DELIVERING)
        .setRouteKey(MailConstants.MAIL_ROUTING_KEY_NAME)
        .setExchange(MailConstants.MAIL_EXCHANGE_NAME)
        .setCount(0)
        .setTryTime(LocalDateTime.now().plusMinutes(MailConstants.MSG_TIMEOUT))
        .setCreateTime(LocalDateTime.now())
        .setUpdateTime(LocalDateTime.now());
mailLogMapper.insert(mailLog);

CorrelationData correlation = new CorrelationData(msgId);
rabbitTemplate.convertAndSend(
        MailConstants.MAIL_EXCHANGE_NAME,
        MailConstants.MAIL_ROUTING_KEY_NAME,
        user1,
        correlation);
消息接收,发送邮件MailReceiver
/**
 * Consumes messages from the mail queue and sends the welcome mail.
 *
 * <p>Message ids of mails that were sent are stored in a Redis hash so that a
 * redelivered message is acknowledged without sending the mail again.
 */
@Component
public class MailReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class);

    /** Redis hash holding the ids of messages whose mail has been sent. */
    private static final String MAIL_LOG_KEY = "mail_log";

    @Autowired
    private JavaMailSender javaMailSender;

    @Autowired
    private MailProperties mailProperties;

    @Autowired
    private TemplateEngine templateEngine;

    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * Handles one message from the mail queue.
     *
     * <p>On success the message is acknowledged. On failure it is put back on
     * the queue once; if it fails again after redelivery it is rejected
     * without requeue so that a message that can never succeed does not loop
     * forever.
     *
     * @param message message whose payload is the {@code User} to mail
     * @param channel channel used for the manual ack/nack
     * @throws IOException if the nack in the failure path cannot be sent
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // redisTemplate is injected as a raw type
    @RabbitListener(queues = MailConstants.MAIL_QUEUE_NAME)
    public void handler(Message message, Channel channel) throws IOException {
        MessageHeaders headers = message.getHeaders();
        // Delivery tag used for the manual ack/nack.
        long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // Message id (msgId) set by the publisher as correlation data.
        String msgId = (String) headers.get("spring_returned_message_correlation");
        HashOperations hashOperations = redisTemplate.opsForHash();
        try {
            // Already consumed: acknowledge and stop. hasKey checks the single
            // field instead of loading the whole hash.
            if (msgId != null && Boolean.TRUE.equals(hashOperations.hasKey(MAIL_LOG_KEY, msgId))) {
                LOGGER.warn("消息已经消费过了============》{}", msgId);
                channel.basicAck(tag, false);
                return;
            }

            // Cast inside the try so an unexpected payload is nacked like any other failure.
            User user = (User) message.getPayload();

            MimeMessage msg = javaMailSender.createMimeMessage();
            MimeMessageHelper helper = new MimeMessageHelper(msg);
            // Sender
            helper.setFrom(mailProperties.getUsername());
            // Recipient address
            helper.setTo(user.getEmail());
            // Sent date
            helper.setSentDate(new Date());
            // Subject
            helper.setSubject("入职欢迎邮件");
            // Body rendered from the "mail" template
            Context context = new Context();
            context.setVariable("name", user.getNickname());
            String mail = templateEngine.process("mail", context);
            helper.setText(mail, true);

            javaMailSender.send(msg);

            // Remember the id so a redelivery is not mailed twice.
            if (msgId != null) {
                hashOperations.put(MAIL_LOG_KEY, msgId, "ok");
            }
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // Log first (with the stack trace) so the cause survives a failing nack.
            LOGGER.error("邮件发送失败======={}", e.getMessage(), e);
            boolean redelivered = Boolean.TRUE.equals(headers.get(AmqpHeaders.REDELIVERED));
            channel.basicNack(tag, false, !redelivered);
        }
    }
}
定时任务检查消息是否发送(记得在启动类上添加@EnableScheduling)
/**
 * Scheduled job that re-publishes mail messages still marked as "delivering"
 * after their retry time has passed.
 */
@Component
public class MailTask {

    @Autowired
    private IMailLogService mailLogService;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private loginRepository loginRepository;

    /**
     * Runs every 10 seconds. Messages that have used up their retries are
     * marked as failed and not sent again; every other overdue message gets
     * its retry count and retry time updated and is published again.
     */
    @Scheduled(cron = "0/10 * * * * ?")
    public void mailtask() {
        // Messages still in "delivering" status whose retry time is in the past.
        List<MailLog> list = mailLogService.list(new QueryWrapper<MailLog>()
                .eq("status", MailConstants.DELIVERING)
                .lt("tryTime", LocalDateTime.now()));

        for (MailLog mailLog : list) {
            // The count column is nullable; treat a missing value as "never retried".
            int count = mailLog.getCount() == null ? 0 : mailLog.getCount();

            // Retries exhausted: mark as failed and do NOT send again.
            if (count >= MailConstants.MAX_TRY_COUNT) {
                mailLogService.update(new UpdateWrapper<MailLog>()
                        .set("status", MailConstants.FAILURE)
                        .eq("msgId", mailLog.getMsgId()));
                continue;
            }

            // Update retry count, update time and next retry time.
            LocalDateTime now = LocalDateTime.now();
            mailLogService.update(new UpdateWrapper<MailLog>()
                    .set("count", count + 1)
                    .set("updateTime", now)
                    .set("tryTime", now.plusMinutes(MailConstants.MSG_TIMEOUT))
                    .eq("msgId", mailLog.getMsgId()));

            // Load the recipient and publish the message again under the same id.
            User user = loginRepository.getOne(mailLog.getEid().longValue());
            rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME,
                    MailConstants.MAIL_ROUTING_KEY_NAME,
                    user, new CorrelationData(mailLog.getMsgId()));
        }
    }
}
前端邮件模板mail.html
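<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8">
    <title>欢迎邮件</title>
</head>
<body>
欢迎 <span th:text="${name}"></span>,成功注册在线笔记系统
</body>
</html>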



