**
Apache Pulsar—比 kafka 更加优秀的消息队列**
文章目录- Apache Pulsar---比 kafka 更加优秀的消息队列
- 前言
- 一、Pulsar是什么?
- 二、订阅模型
- 三、路由策略与持久化
- 四、整体架构
- 应用优势
- 应用案例
前言
众所周知,消息队列有两种形式,一种是点对点的队列模式,一种是发布订阅模式。而Pulsar是发布订阅模式的分布式消息平台,拥有灵活的消息模型和直观的客户端API。
Pulsar由雅虎开发并开源的下一代消息系统,目前是Apache软件基金会的孵化器项目。
提示:以下是本篇文章正文内容,下面案例可供参考
一、Pulsar是什么?Pulsar是发布订阅模式的分布式消息队列,生产者生产消息,发到Pulsar topic中,最终被消费者取用。
其中:
Topic是Pulsar的核心概念,表示一个“channel”,Producer可以写入数据,Consumer从中消费数据(Kafka、RocketMQ都是这样)。
Topic名称的URL类似如下的结构:
{persistent|non-persistent}://tenant/namespace/topic
其中:
persistent|non-persistent表示数据是否持久化(Pulsar支持消息持久化和非持久化两种模式)
Tenant为租户,Namespace一般聚合一系列相关的Topic,一个租户下可以有多个Namespace。
在cluster情况下,结构如下所示:
其中:
图中Property即为租户,每个租户下可以有多个Namespace,每个Namespace下有多个Topic。Namespace是Pulsar中的操作单元,包括Topic是配置在Namespace级别的,包括多地域复制,消息过期策略等都是配置在Namespace上的。
Pulsar提供了灵活的消息模型,支持四种订阅类型:
- Exclusive subscription:排他的,只能有一个Consumer,接收一个Topic所有的消息
- Shared subscription:共享的,可以同时存在多个Consumer,每个Consumer处理Topic中一部消息(Shared模型是不保证消息顺序的,Consumer数量可以超过分区的数量)
- Failover subscription:Failover模式,同一时刻只有一个有效的Consumer,其余的Consumer作为备用节点,在Master Consumer不可用后进行替代(看起来适用于数据量小,且解决单点故障的场景)
- Key_Shared subscription:支持多个并发消费者,因此你可以通过增加消费者的数量来轻松降低处理延迟。在此方面,它提供了消息队列类型的语义,因为每个消息都可以独立于其他消息进行处理。这种订阅类型与传统的 Shared 订阅类型的不同之处就在于它在消费者之间分发数据的方式。与任何消费者都可以处理任意消息的传统消息传递不同,在 Pulsar 的 Key_Shared 订阅中,消息被分配到消费者中,并保证具有相同 key 的消息被发送到同一个消费者。
Pulsar提供了一些策略来处理消息到Partition的路由(MessageRouter):
- Single partitioning:Producer随机选择一个Partition并将所有消息写入到这个分区
- Round robin partitioning :采用Round robin的方式,轮训所有分区进行消息写入
- Hash partitioning:这种模式每条消息有一个Key,Producer根据消息的Key的哈希值进行分区的选择(Key相同的消息可以保证顺序)。
- Custom partitioning:用户自定义路由策略
不同于别的MQ系统,Pulsar允许Consumer的数量超过分区的数量(对于RocketMQ,超过分区数的Consumer会分配不到分区而“空跑”)。在Shared subscription的订阅模式下,Consumer数量可以大于分区的数量,每个Consumer处理每个Partition中的一部分消息,不保证消息的顺序。
在持久化方面。Pulsar通过BookKeeper来存储消息,保证消息不会丢失。
四、整体架构
Pulsar采用“存储和服务分离”的两层架构(这是Pulsar区别于其他MQ系统最重要的一点,也是所谓的“下一代消息系统”的核心):
Broker:提供发布和订阅的服务(Pulsar的组件)
Bookie:提供存储能力(BookKeeper的存储组件)
优势是Broker成为了stateless的组件,可以水平扩容(RocketMQ的Broker是包含存储的,是有状态的,Broker的扩容更像是“拆分”)。高可靠,一致性等通过BookKeeper去保证。
应用优势 应用案例
1.使用 Key_Shared 订阅进行身份拼接
public class Click {
static final String PULSAR_SERVICE_URL = "pulsar://localhost:6650";
static final String MY_TOPIC = "persistent://tracking/web-activity/tags"";
static final String SUBscriptION = "aggregator-sub";
public static void main() throws PulsarClientException {
PulsarClient client = PulsarClient.builder()
.serviceUrl(PULSAR_SERVICE_URL)
.build();
ConsumerBuilder consumerBuilder =
//身份拼接
client.newConsumer(JSONSchema.of(TrackingTag.class))
.topic(MY_TOPIC)
.subscriptionName(SUBscriptION)
.subscriptionType(SubscriptionType.Key_Shared)
.messageListener(new TagMessageListener());
//lambda
IntStream.range(0, 4).forEach(i -> {
String name = String.format("mq-consumer-%d", i);
try {
consumerBuilder
.consumerName(name)
.subscribe();
} catch (PulsarClientException e) {
e.printStackTrace();
}
});
}
}



