各位朋友大家好,今年年初有幸负责公司的一个调研系统的项目,主要背景:为了解决集团内部各种调研活动的开展,其中主要包括满意度调研,组织氛围调研,价值观践行调研及其他通用调研等。今天分享一下RabbitMQ在我们项目上的应用。
遇到的问题系统有个功能就是给集团参与调研的人员推送问卷信息,包括各种推送渠道(内部聊天软件,邮件,短信,待办等),刚开始设计是多线程并发处理信息,直到出了一次生产事故,同时博主也是承担巨大的压力下进行了系统重新架构调整。先说一下事故,有次好多个类型的问卷几乎同时发布,导致填写人员太多,系统并发太高,扛不住宕机了,当时发送问卷的线程也随即挂掉,导致有些人员没有收到问卷填写地址,一时间不得不紧急停机维护,而我也受到了领导的关爱哈哈,部门领导和我主管领导坐我身后看我处理数据及架构调整,影响太深刻了。
总结一下,就是当系统宕机之后,跑的进程随即停止导致的问题。
预解决方案1.将需要发送的数据先存储到数据库中,然后每发送一条问卷信息,记录其状态,当系统宕机之后,已发送的数据就会有记录存储,然后跑个定时任务去查询没有处理的数据,再次推送用户。
2.第二种方案就是将需要发送的数据存储到RabbitMQ中,利用消息队列的特性,手动确认问卷信息发送状态。
实现RabbitMQ解决方案本项目采用生产端点对点消息模型,消费端消息手动确认的方式实现交互。
1.添加RabbitMQ相关依赖
org.springframework.boot spring-boot-starter-amqp
2.配置文件
spring:
rabbitmq:
host: ip地址
port: 端口
username: 用户名
password: 密码
virtual-host: 虚拟主机
listener:
simple:
# 手动确认
acknowledge-mode: manual
retry:
enabled: true
3.生产者部分代码
// [2.将消息写入消息队列]
for (int i = 0; i < releaseEmpList.size(); i++) {
ReleaseEmpDTO releaseEmpDTO = releaseEmpList.get(i);
// 写入OA消息队列
CorrelationData correlationDataOA = new CorrelationData(UUID.randomUUID().toString());
RabbitmqOARelease rabbitmqOARelease = createRabbitmqOARelease(quBatch, link, releaseEmpDTO);
if (i == releaseEmpList.size() - 1) {
rabbitmqOARelease.setFlag(true);
}
rabbitTemplate.convertAndSend(CommonConstant.RABBITMQ_QUEUES_OA, rabbitmqOARelease, correlationDataOA);
// 写入HR消息队列
CorrelationData correlationDataHR = new CorrelationData(UUID.randomUUID().toString());
RabbitmqHRRelease rabbitmqHRRelease = createRabbitmqHRRelease(quBatch, link, releaseEmpDTO);
rabbitTemplate.convertAndSend(CommonConstant.RABBITMQ_QUEUES_HR, rabbitmqHRRelease, correlationDataHR);
}
4.消费者部分代码
@RabbitListener(queuesToDeclare = @Queue(value = CommonConstant.RABBITMQ_QUEUES_OA))
public void oaRelease(RabbitmqOARelease rabbitmqOARelease, Message message, Channel channel) throws IOException {
if (null == rabbitmqOARelease) {
return;
}
// 给OA发送待办或待阅
notifyAsyncTask.sendTodoOne(rabbitmqOARelease.getSendContext(), rabbitmqOARelease.getQuNotify());
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//手工ack
channel.basicAck(deliveryTag, false);
if (rabbitmqOARelease.getFlag()) {
// 期次状态改为发布
QuBatch result = quBatchMapper.selectOne(new QuBatch() {{
setId(rabbitmqOARelease.getBatchId());
}});
if (null != result && result.getStatus().intValue() == CommonConstant.STATUS_PUBLISHING.intValue()) {
// 发布状态
result.setStatus(CommonConstant.STATUS_PUBLISH);
result.setBatchStartTime(new Date());
quBatchMapper.updateByPrimaryKeySelective(result);
}
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}



