栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

MongoDb + RabbitMQ 搭建系统通知模块

MongoDb + RabbitMQ 搭建系统通知模块

MongoDb + RabbitMQ 搭建系统通知模块

使用springboot实现

一、选择MongoDB和RabbitMQ理由
    对于公告消息,本设计是设计为 为每个用户创建一条公告信息(原因是:方便记录用户对于消息的已读和未读状态,这样设计会更符合用户需求),因此设计两个表,message(存储消息及其发送者消息) 和 message_ref(存储接收者,及已读和新接收状态)数据库选择为MongoDB,原因是MongoDB对于海量数据以及高并发情况下的读写数据很有优势,同时他的数据存储是以文档结构存储,简单来说就是用JSON格式,他会更加类似关系型数据库的存储,同时MongoDB从3.X开始支持集合的连接查询,就可以实现message和message_ref连接查询,查询出某个用户拥有的信息。由于存在海量数据,高并发情况下,MongoDB也支持不了瞬时写入百万数据,因此引入消息队列MQ来进行削峰填谷。选择RabbitMQ的原因是,它的可靠性和稳定性比较好,而且不仅支持消息的异步收发,还支持消息的同步收发
二、简单逻辑

1.发送系统通知时,先把消息数据插入message表中,然后把消息推送到相关的消息队列(以用户id作为routing-key)中去

3.消费者消费消息时,取消自动应答,每次从消息队列中取数据的时候,把数据插入message_ref(lastFlag为true【新信息】,readFlag为false【未读】)中,插入成功后再发送消息的Ack应答,让消息队列删除消息

3.前端设计定时器轮训:每次读取消息的时候,先从消息队列中接收消息(步骤3;然后把message_ref中的lastFlag改为false,更新的数量即为新消息的数量;查询readFlag为false的数据(查询未读数据),然后把这些数据传给前端

三、实现

有些地方引用了hutool工具,可自行引入依赖

        
        
            cn.hutool
            hutool-all
            5.4.0
        
3.1、MongoDB数据库表的设计 3.1.1.message集合

集合相当于MySQL中的数据表,但是没有固定的表结构。集合有什么字段,取决于保存在其中的数据。下面这张表格是Message集合中JSON数据的结构要求。

字段类型备注
_idUUID自动生成的主键值
uuidUUIDUUID值,并且设置有唯一性索引,防止消息被重复消费
senderIdInteger发送者ID,就是用户ID。如果是系统自动发出,这个ID值是0
senderPhotoString发送者的头像URL。在消息页面要显示发送人的头像
senderNameString发送者名称,也就是用户姓名。在消息页面要显示发送人的名字
msgString消息正文
sendTimeDate发送时间

uuid : 防止重复消费

消息积压过多的情况下,如果第一次轮询还没结束,第二次轮询就开始了,那就可能出现把重复的数据写入数据库中,因此如果每条MQ消息都有唯一的UUID,第一个消费者把消息保存到数据库,那么第二个消费者就无法再把这条消息保存到数据库,解决了消息的重复消费问题。

3.1.2 message_ref集合

虽然message集合记录的是消息内容及其发送者,message_ref集合来记录接收人和已读状态。

字段类型备注
_idUUID主键
messageIdUUIDmessage记录的_id
receiverIdString接收人ID
readFlagBoolean是否已读
lastFlagBoolean是否为新接收的消息
3.1.3 连接查询

执行两个集合的联合查询,根据接收人来查询消息,并且按照消息发送时间降序排列,查询前50条记录

db.message.aggregate([
        {
                $set: {
                        "id": { $toString: "$_id" }
                }
        },
        {
                $lookup:{
                        from:"message_ref",
                        localField:"id",
                        foreignField:"messageId",
                        as:"ref"
                },
        },
        { $match:{"ref.receiverId": 1} },
        { $sort: {sendTime : -1} },
        { $skip: 0 },
        { $limit: 50 }
])


解析1.$set: {          "id": { $toString: "$_id" }       }添加一个字段,字段名为id,值为"_id"的值,格式为string类型3.$lookup:{             from:"message_ref",             localField:"id",             foreignField:"messageId",             as:"ref"          },连接另一个表from:连接哪个表localField :以自己的那个字段与另一个表连接foreignField:另一个表的连接字段as:给连接的那个表起别名3.$match:{"ref.receiverId": 1}查询条件(相当于where),该例子表示where ref.receiverId = 14.$sort: {sendTime : -1}排序,该例子为根据sendTime按降序排5.$skip: 0 从那条数据开始(数据是从0开始算的,即0为第一条数据),相当于分页查询的start ,即sql语句limit start,length中的start6.$limit: 50查询几条数据,相当于分页查询的length,即sql语句limit start,length中的length
3.2 程序中实现数据表实体设计

依赖导入

        
        
            org.springframework.boot
            spring-boot-starter-data-mongodb
        

mongoDB配置

spring:  #MongoDB  data:    mongodb:      #ip      host: localhost           #端口      port: 37017      #数据库      database: pbms      authentication-database: admin      username: admin      password: abc133456
3.2.1 pojo实体类
@Data
@document(collection = "message")
public class MessageEntity implements Serializable {
    @Id
    private String _id;

    @Indexed(unique = true)
    private String uuid;

    @Indexed
    private Integer senderId;

    private String senderPhoto=""; //默认系统图片

    private String senderName;

    @Indexed
    private Date sendTime;

    private String msg;

}
@document(collection = "message_ref")
@Data
public class MessageRefEntity implements Serializable {
    @Id
    private String _id;

    @Indexed
    private String messageId;

    @Indexed
    private Integer receiverId;

    @Indexed
    private Boolean readFlag;

    @Indexed
    private Boolean lastFlag;
}

解析1.@document(collection = "message") 表示映射到Mongodb文档上的领域对象3.@Id 表示某个域为ID域3.@Indexed 表示某个字段为Mongodb的索引字段
3.2.2 DAO

messageDao

@Repository
public class MessageDao {
    @Autowired
    private MongoTemplate mongoTemplate;

    //插入消息数据
    public String insert(MessageEntity entity) {
        //把北京时间转换成格林尼治时间
        Date sendTime = entity.getSendTime();
        sendTime = DateUtil.offset(sendTime, DateField.HOUR, -8);

        entity.setSendTime(sendTime);
        entity = mongoTemplate.save(entity);
        return entity.get_id();
    }

    //分页查询某个人的信息
    public List searchMessageByPage(int userId,long start,long length){
        JSonObject json = new JSonObject();
        json.set("$toString","$_id");
        //类比上面所说的连接查询,以那个为逻辑
        Aggregation aggregation = Aggregation.newAggregation(
                Aggregation.addFields().addField("id").withValue(json).build(),
                Aggregation.lookup("message_ref","id","messageId","ref"),
                Aggregation.match(Criteria.where("ref.receiverId").is(userId)),
                Aggregation.sort(Sort.by(Sort.Direction.DESC,"sendTime")),
                Aggregation.skip(start),
                Aggregation.limit(length)
        );
        AggregationResults message = mongoTemplate.aggregate(aggregation, "message", HashMap.class);
        List list = message.getMappedResults();
        list.forEach(one -> {
            List refList = (List) one.get("ref");
            MessageRefEntity entity = refList.get(0);
            Boolean readFlag = entity.getReadFlag();
            String id = entity.get_id();
            one.remove("ref");
            one.put("readFlag",readFlag);
            one.put("refId",id);
            one.remove("_id");
            //把格林尼治时间转换成北京时间
            Date sendTime = (Date) one.get("sendTime");
            sendTime = DateUtil.offset(sendTime, DateField.HOUR, 8);

            String today = DateUtil.today();
            //如果是今天的消息,只显示发送时间,不需要显示日期
            if (today.equals(DateUtil.date(sendTime).toDateStr())) {
                one.put("sendTime", DateUtil.format(sendTime, "HH:mm"));
            }
            //如果是以往的消息,只显示日期,不显示发送时间
            else {
                one.put("sendTime", DateUtil.format(sendTime, "yyyy/MM/dd"));
            }
        });
        return list;
    }

    //根据id查询某条消息详细内容
    public HashMap searchMessageById(String id){
        HashMap message = mongoTemplate.findById(id, HashMap.class, "message");
        Date sendTime = (Date) message.get("sendTime");
        //把格林尼治时间转换成北京时间
        sendTime = DateUtil.date(sendTime).offset(DateField.HOUR, 8);
        message.replace("sendTime", DateUtil.format(sendTime, "yyyy-MM-dd HH:mm"));
        return message;

    }
}

MessageRefDao

@Repository
public class MessageRefDao {
    @Autowired
    private MongoTemplate mongoTemplate;

    public String insert(MessageRefEntity entity){
        entity = mongoTemplate.save(entity);
        return entity.get_id();
    }

    
    public long searchUnreadCount(int userId){
        Query query = new Query();
        query.addCriteria(Criteria.where("readFlag").is(false).and("receiverId").is(userId));
        long count = mongoTemplate.count(query, MessageRefEntity.class);
        return count;
    }

    
    public long searchLastCount(int userId){
        Query query = new Query();
        query.addCriteria(Criteria.where("lastFlag").is(true).and("receiverId").is(userId));
        Update update = new Update();
        update.set("lastFlag",false);
        UpdateResult result = mongoTemplate.updateMulti(query, update, "message_ref");
        long count = result.getModifiedCount();
        return count;
    }

    
    public long updateUnreadMessage(String id){
        Query query = new Query();
        query.addCriteria(Criteria.where("_id").is(id));
        Update update = new Update();
        update.set("readFlag",true);
        UpdateResult result = mongoTemplate.updateFirst(query, update, "message_ref");
        long count = result.getModifiedCount();
        return count;
    }

    
    public long deleteMessageRefById(String id){
        Query query = new Query();
        query.addCriteria(Criteria.where("_id").is(id));
        DeleteResult messageRef = mongoTemplate.remove(query, "message_ref");
        long deletedCount = messageRef.getDeletedCount();
        return deletedCount;
    }

    
    public long deleteUserMessageRef(int userId){
        Query query = new Query();
        query.addCriteria(Criteria.where("receiverId").is(userId));
        DeleteResult ref = mongoTemplate.remove(query, "message_ref");
        long deletedCount = ref.getDeletedCount();
        return deletedCount;
    }
}

3.3 业务层设计

基本上是引用DAO层

public interface MessageService {
    public String insertMessage(MessageEntity entity);

    public String insertRef(MessageRefEntity entity);

    public long searchUnreadCount(int userId);

    public long searchLastCount(int userId);

    public List searchMessageByPage(int userId, long start, int length) ;

    public HashMap searchMessageById(String id);

    public long updateUnreadMessage(String id) ;

    public long deleteMessageRefById(String id);

    public long deleteUserMessageRef(int userId);

}

@Service
public class MessageServiceImpl implements MessageService {
    @Autowired
    private MessageDao messageDao;

    @Autowired
    private MessageRefDao messageRefDao;

    @Override
    public String insertMessage(MessageEntity entity) {
        String id = messageDao.insert(entity);
        return id;
    }

    @Override
    public String insertRef(MessageRefEntity entity) {
        String id = messageRefDao.insert(entity);
        return id;
    }

    @Override
    public long searchUnreadCount(int userId) {
        long count = messageRefDao.searchUnreadCount(userId);
        return count;
    }

    @Override
    public long searchLastCount(int userId) {
        long count = messageRefDao.searchLastCount(userId);
        return count;
    }

    @Override
    public List searchMessageByPage(int userId, long start, int length) {
        List list = messageDao.searchMessageByPage(userId, start, length);
        return list;
    }

    @Override
    public HashMap searchMessageById(String id) {
        HashMap map = messageDao.searchMessageById(id);
        return map;
    }

    @Override
    public long updateUnreadMessage(String id) {
        long count = messageRefDao.updateUnreadMessage(id);
        return count;
    }

    @Override
    public long deleteMessageRefById(String id) {
        long count = messageRefDao.deleteMessageRefById(id);
        return count;
    }

    @Override
    public long deleteUserMessageRef(int userId) {
        long count = messageRefDao.deleteUserMessageRef(userId);
        return count;
    }
}

3.4 web层 1.获取分页消息列表
@ApiModel
@Data
public class SearchMessageByPageForm {
    @NotNull
    @Min(1)
    private Integer page;

    @NotNull
    @Range(min = 1,max = 40)
    private Integer length;
}

@RestController
@RequestMapping("/message")
@Api("消息模块网络接口")
public class MessageController {
    @Autowired
    private JwtUtil jwtUtil;

    @Autowired
    private MessageService messageService;

    @PostMapping("/searchMessageByPage")
    @ApiOperation("获取分页消息列表")
    public R searchMessageByPage(@Valid @RequestBody SearchMessageByPageForm form, @RequestHeader("token") String token) {
        int userId = jwtUtil.getUserId(token);
        int page = form.getPage();
        int length = form.getLength();
        long start = (page - 1) * length;
        List list = messageService.searchMessageByPage(userId, start, length);
        return R.ok().put("result", list);
    }
}

3.根据ID查询消息
@ApiModel
@Data
public class SearchMessageByIdForm {
    @NotBlank
    private String id;
}

public class MessageController {
        ……
    @PostMapping("/searchMessageById")
    @ApiOperation("根据ID查询消息")
    public R searchMessageById(@Valid @RequestBody SearchMessageByIdForm form) {
        HashMap map = messageService.searchMessageById(form.getId());
        return R.ok().put("result", map);
    }
}

3.把未读消息更新成已读消息
@ApiModel
@Data
public class UpdateUnreadMessageForm {
    @NotBlank
    private String id;
}

public class MessageController {
        ……
    @PostMapping("/updateUnreadMessage")
    @ApiOperation("未读消息更新成已读消息")
    public R updateUnreadMessage(@Valid @RequestBody UpdateUnreadMessageForm form) {
        long rows = messageService.updateUnreadMessage(form.getId());
        return R.ok().put("result", rows == 1 ? true : false);
    }
}

4.删除消息
@Data
@ApiModel
public class DeleteMessageRefByIdForm {
    @NotBlank
    private String id;
}

public class MessageController {
        ……
    @PostMapping("/deleteMessageRefById")
    @ApiOperation("删除消息")
    public R deleteMessageRefById(@Valid @RequestBody DeleteMessageRefByIdForm form){
        long rows=messageService.deleteMessageRefById(form.getId());
        return R.ok().put("result", rows == 1 ? true : false);
    }
}

5.轮询接收系统消息
public class MessageController {
    ……

    @GetMapping("/refreshMessage")
    @ApiOperation("刷新用户的消息")
    public R refreshMessage(@RequestHeader("token") String token) {
        int userId = jwtUtil.getUserId(token);
        //异步接收消息
        messageTask.receiveAysnc(userId + "");
        //查询接收了多少条消息
        long lastRows=messageService.searchLastCount(userId);
        //查询未读数据
        long unreadRows = messageService.searchUnreadCount(userId);
        return R.ok().put("lastRows", lastRows).put("unreadRows", unreadRows);
    }
}

3.5 使用RabbitMQ实现削峰填谷 3.5.1 导入依赖

        com.rabbitmq
    amqp-client
    5.9.0


        org.springframework.boot
    spring-boot-starter-amqp


3.5.2 配置类

注意这里可能存在rabbitMQ连接不上的问题:ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.

原因是不支持使用默认用户进行非本地连接,应新建用户

如何新建用户

@Configuration
public class RabbitMQConfig {
    @Bean
    public ConnectionFactory getFactory(){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");  //rabbitMQ所处服务器ip
        factory.setPort(5672); //端口
        factory.setUsername("admin"); //用户名
        factory.setPassword("admin"); //密码
        return factory;
    }
}

3.5.3 收发消息

这里只是使用的rabbitMQ的简单模式,即一个消费者对应一个队列

如有其他需求可自行学习rabbitMQ的其他模式

rabbitMQ相关笔记

@Slf4j
@Component
public class MessageTask {
    @Autowired
    private ConnectionFactory factory;
    @Autowired
    private MessageService messageService;

    
    public void send(String topic, MessageEntity entity) {
        String id = messageService.insertMessage(entity); //向MongoDB保存消息数据,返回消息ID
        //向RabbitMQ发送消息
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //连接到某个Topic
            
            channel.queueDeclare(topic, true, false, false, null);
            HashMap header = new HashMap(); //存放属性数据
            header.put("messageId", id);
            //创建AMQP协议参数对象,添加附加属性
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(header).build();
            
            channel.basicPublish("", topic, properties, entity.getMsg().getBytes());
            log.debug("消息发送成功");
        } catch (Exception e) {
            log.error("执行异常", e);
            throw new PBMSException("向MQ发送消息失败");
        }
    }
    
    @Async
    public void sendAsync(String topic, MessageEntity entity) {
        send(topic, entity);

    }
    
    public int receive(String topic) {
        int i = 0;
        try (//接收消息数据
             Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 从队列中获取消息,不自动确认
            channel.queueDeclare(topic, true, false, false, null);
            //Topic中有多少条数据未知,所以使用死循环接收数据,直到接收不到消息,退出死循环
            while (true) {
                //创建响应接收数据,禁止自动发送Ack应答
                GetResponse response = channel.basicGet(topic, false);
                if (response != null) {
                    AMQP.BasicProperties properties = response.getProps();
                    Map header = properties.getHeaders(); //获取附加属性对象
                    String messageId = header.get("messageId").toString();
                    byte[] body = response.getBody();//获取消息正文
                    String message = new String(body);
                    log.debug("从RabbitMQ接收的消息:" + message);
                    MessageRefEntity entity = new MessageRefEntity();
                    entity.setMessageId(messageId);
                    entity.setReceiverId(Integer.parseInt(topic));
                    entity.setReadFlag(false);
                    entity.setLastFlag(true);
                    messageService.insertRef(entity); //把消息存储在MongoDB中
                    //数据保存到MongoDB后,才发送Ack应答,让Topic删除这条消息
                    long deliveryTag = response.getEnvelope().getDeliveryTag();
                    channel.basicAck(deliveryTag, false);
                    i++;
                } else {
                    break; //接收不到消息,则退出死循环
                }
            }
        } catch (Exception e) {
            log.error("执行异常", e);
        }
        return i;
    }

    
    @Async
    public int receiveAysnc(String topic) {
        return receive(topic);
    }

    
    public void deleteQueue(String topic) {
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDelete(topic);
            log.debug("消息队列成功删除");
        } catch (Exception e) {
            log.error("删除队列失败", e);
            throw new PBMSException("删除队列失败");
        }
    }
    
    @Async
    public void deleteQueueAsync(String topic) {
        deleteQueue(topic);
    }
}


3.6 使用

1.发送消息
在要发送信息的地方:创建MessageEntity,调用messageService.insertMessage,插入message表,然后调用messageTask.send发送消息到消息队列

2.接收消息
调用messageController.refreshMessage()接收消息,同时可获取新消息数量和未读消息数量

附录

测试数据

	@Test
    void contextLoads() {
        for (int i = 1; i <= 100; i++) {
            MessageEntity message = new MessageEntity();
            message.setUuid(IdUtil.simpleUUID());
            message.setSenderId(0);
            message.setSenderName("系统消息");
            message.setMsg("这是第" + i + "条测试消息");
            message.setSendTime(new Date());
            String id=messageService.insertMessage(message);

            MessageRefEntity ref=new MessageRefEntity();
            ref.setMessageId(id);
            ref.setReceiverId(11); //注意:这是接收人ID
            ref.setLastFlag(true);
            ref.setReadFlag(false);
            messageService.insertRef(ref);
        }
    }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745168.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号