栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【rabbitmq怎样实现消费分组?rabbitmq怎么实现类似kafka的comsumerGroup分组机制?】

【rabbitmq怎样实现消费分组?rabbitmq怎么实现类似kafka的comsumerGroup分组机制?】

说明

最近使用rabbitmq作为消息队列,发现由生产者产生的一条消息要被集群A消费,同时一个月后可能会接入集群B也要消费该消息,在过去我们使用kafka的时候,kafka有consumer.group-id的概念,不同服务定义不同的consumer.group-id就可以做到集群A都是A服务属于同一个consumer.group-id,消费的时候集群A中只有一个节点会消费,同时接入集群B也是一样只要消费者Id不一样即可正常消费,很容易就实现了消费分组,同样的道理我们现在使用rabbitmq怎么去做到类似的消费分组功能呢?

利用rabbitmq的广播交换机FanoutExchange来实现消息分组消费

为什么能实现?这里的交换机FanoutExchange其实就类似于我们的定义的通道,每来一组消息我们就定义一个新的名称的FanoutExchange交换机,比如login事件消息要发送业务服务A和大数据服务B,我们就定义一个Login_FanoutExchange交换机,然后业务服务A需要使用该Login_FanoutExchange交换机过来的消息那么由业务服务A自行去定义自己的Queue并和该Login_FanoutExchange交换机进行绑定,绑定方式这里不介绍,有配置文件绑定,也有注解的绑定方式,自行选择你喜欢的,然后这个QueueA就可以消费该Login_FanoutExchange交换机过来的消息了,同样的,一个月后大数据服务B要使用该交换机Login_FanoutExchange的消息了,那么服务B也定义自己的QueueB去和该交换机绑定就行了,后面再来一个C服务也无所谓,同样定义自己的QueueC做绑定即可,这里要注意的就是这里面的Queue的name值设置每个服务应该不一样,Queue的name其实就相当于consumer.group-id以便rabbitmq来确定是不是同一个消费的标识;
后面如果有其他的一组消息事件定义不同的FanoutExchange即可,如果类比kafka,每一个这个FanoutExchange就相当于kafka的topic,每一个Queue的name值就相当于kafka的consumer.group-id,这样就实现了消息分组;
Queue的定义记得设置持久化就行,嗯,就是new Queue(“name”,true)第二个参数为true就行,毕竟交换机是不存储消息的,只有Queue可以持久化消息;

大概的图就是这样子:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/687612.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号