Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性。
Kakfa的诸多痛点如下:
- 扩展 Kafka 十分棘手,这是由于 broker 与存储数据的耦合架构结构所致。剥离一个 broker 意味着它必须复制 topic 分区和副本,这非常耗时。
- 没有与租户完全隔离的本地多租户。
- 存储会变得非常昂贵,尽管可以长时间存储数据,但是由于成本问题却很少用到它。
- 万一副本不同步,有可能丢失消息。
- 必须提前计划和计算 broker、topic、分区和副本的数量(确保计划的未来使用量增长),以避免扩展问题,这非常困难。
- 如果仅需要消息传递系统,则使用偏移量可能会很复杂。
- 集群重新平衡会影响相连的生产者和消费者的性能。
- MirrorMaker[3] Geo 复制机制存在问题。像 Uber 这样的公司已经创建了自己的解决方案来克服这些问题。
Pulsar 的特性如下:
- 内置多租户, 不同的团队可以使用相同的集群并将其隔离,解决了许多管理难题。它支持隔离、身份验证、授权和配额。
- 多层体系结构: Pulsar 将所有 topic 数据存储在由 Apache BookKeeper 支持的专业数据层中。 存储和消息传递的分离解决了扩展、重新平衡和维护集群的许多问题。它还提高了可靠性,几乎不可能丢失数据。 另外,在读取数据时可以直连 BookKeeper,且不影响实时摄取。例如,可以使用 Presto 对 topic 执行 SQL 查询,类似于 KSQL,但不会影响实时数据处理。
- 虚拟 topic: 由于采用 n 层体系结构,因此对 topic 的数量没有限制,topic 及其存储是分离的。用户还可以创建非持久性 topic。
- N 层存储: Kafka 的一个问题是,存储费用可能变高。因此,它很少用于存储"冷"数据,并且消息经常被删除,Apache Pulsar 可以借助分层存储自动将旧数据卸载到 Amazon S3 或其他数据存储系统,并且仍然向客户端展示透明视图;Pulsar 客户端可以从时间开始节点读取,就像所有消息都存在于日志中一样。
- Pulsar Function: 易于部署、轻量级计算过程、对开发人员友好的 API,无需运行自己的流处理引擎(如 Kafka)。
- 安全性: 它具有内置的代理、多租户安全性、可插拔的身份验证等特性。
- 快速重新平衡: 分区被分为易于重新平衡的分片。
- 服务器端重复数据删除和无效字段: 无需在客户端中执行此操作,也可以在压缩期间删除重复数据。
- 内置 Schema registry(架构注册表): 支持多种策略,易于操作。
- 地理复制和内置 Discovery: 易于将集群复制到多个区域。
- 集成的负载均衡器 和 Prometheus 指标。
- 多重集成: Kafka、RabbitMQ 等。
- 支持多种编程语言, 例如 GoLang、Java、Scala、Node、Python……
Pulsar 的劣势
- 相对缺乏支持、文档和案例。
- n 层体系结构导致需要更多组件:BookKeeper。
- 插件和客户端相对 Kafka 较少。
- 云中的支持较少,Confluent 具有托管云产品。
Pulsar 可用于广泛的场景:
- 分布式日志。
- 事件溯源,用于永久性事件存储。
- 微服务。
- SQL 分析。
- Serverless 功能。
什么时候应该考虑 Pulsar?
- 同时需要像 RabbitMQ 这样的队列和 Kafka 这样的流处理程序。
- 需要易用的地理复制。
- 实现多租户,并确保每个团队的访问权限。
- 需要长时间保留消息,并且不想将其卸载到另一个存储中。
- 需要高性能,基准测试表明 Pulsar 提供了更低的延迟和更高的吞吐量。
部署Pulsar集群包括以下步骤(按顺序):
部署一个 ZooKeeper 集群 (可选)
初始化集群元数据
部署一个 Bookeeper 集群
部署一个或多个 Pulsar brokers
下载并解压
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.8.1/apache-pulsar-2.8.1-bin.tar.gz $ tar xvzf apache-pulsar-2.8.1-bin.tar.gz $ cd apache-pulsar-2.8.1
解压后的文件目录包含以下子目录:
| 目录 | 内容 |
|---|---|
| bin | command-line tools of Pulsar, such as pulsar and pulsar-admin |
| conf | Pulsar的配置文件,包含broker配置,ZooKeeper 配置 等等 |
| data | Zookeeper 和 Bookeeper 使用的数据保存目录 |
| lib | Pulsar 使用的 JAR 文件 |
| logs | 日志目录 |
要启用内置的connector,你必须通过如下方式在每台 broker 上下载 connector 发行版压缩包:https://pulsar.apache.org/zh-CN/download/
例如
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.8.1/connectors/{connector}-2.8.1.nar
$ mkdir connectors
$ mv pulsar-io-aerospike-2.8.1.nar connectors
$ ls connectors
pulsar-io-aerospike-2.8.1.nar
部署zookeeper,如果已部署,可忽略
以下方式为pulsar自带zookeeper部署
将所有的 Zookeeper 服务器信息添加到conf/zookeeper.conf配置文件(文件在 Pulsar 的安装目录里面)。 如下所示:
server.1=zk1.us-west.example.com:2888:3888 server.2=zk2.us-west.example.com:2888:3888 server.3=zk3.us-west.example.com:2888:3888
在每台机器,你必须在每个节点的myid文件里面配置集群唯一的ID。默认情况下,这个文件在每台机器的data/zookeeper目录里面(你可以通过修改dataDir配置项修改这个路径)。
如,在地址为zk1.us-west.example.com的Zookeeper 服务器上,你能够使用如下命令设置myid:
$ mkdir -p data/zookeeper $ echo 1 > data/zookeeper/myid
在zk2.us-west.example.com服务器上,这命令可以是:echo 2>data/zookeeper/myid。
一旦你在每台机器增加了zookeeper.conf配置文件,并且设置了myid,你能够在所有机器上使用pulsar-daemon命令去启动Zookeeper 服务(前台运行或者后台运行)。
$ bin/pulsar-daemon start zookeeper 如果像关闭zookeeper $ bin/pulsar-daemon stop zookeeper初始化集群元数据
./pulsar initialize-cluster-metadata --cluster pulsar-cluster-1 --zookeeper vm61:2181,vm62:2181,vm63:2181 --configuration-store vm61:2181,vm62:2181,vm63:2181 --web-service-url http://vm61:8089,vm62:8089,vm63:8089 --web-service-url-tls https://vm61:8443,vm62:8443,vm63:8443 --broker-service-url pulsar://vm61:6650,vm62:6650,vm63:6650 --broker-service-url-tls pulsar+ssl://vm61:6651,vm62:6651,vm63:6651
注意:
- 集群Web服务的URL+端口,URL是一个标准的DNS名称,默认端口8080,不建议修改。(由于自建zookeeper占用了8080端口,于是更改成了8089)
| 参数 | 说明 |
|---|---|
| --cluster | 集群名字 |
| --zookeeper | A “local” ZooKeeper connection string for the cluster. This connection string only needs to include one machine in the ZooKeeper cluster. |
| --configuration-store | 整个集群实例的配置存储连接字符串。 As with the --zookeeper flag, this connection string only needs to include one machine in the ZooKeeper cluster. |
| --web-service-url | The web service URL for the cluster, plus a port. This URL should be a standard DNS name. The default port is 8080 (you had better not use a different port). |
| --web-service-url-tls | If you use TLS, you also need to specify a TLS web service URL for the cluster. The default port is 8443 (you had better not use a different port). |
| --broker-service-url | Broker服务的URL,用于与集群中的brokers进行交互。 这个 URL 不应该使用和 web 服务 URL 同样的 DNS名称,而应该是用pulsar方案。 默认端口是6650(我们不建议使用其他端口)。 |
| --broker-service-url-tls | 如果使用TLS,你必须为集群指定一个 TLS web 服务URL,以及用于集群中 broker TLS 服务的URL。 默认端口是6651(不建议使用其他端口)。 |
通过配置文件conf/bookkeeper.conf去配置 BookKeeper bookies。 配置 bookies 最重要的一步,是要确保zkServers设置为 Zookeeper 集群的连接信息。 如下所示:
zkServers=vm61:2181,vm62:2181,vm63:2181 advertisedAddress=vm61
注意:
可自选修改
- journalDirectories=/home/kafka/data/bookkeeper/journal ledgerDirectories=/home/kafka/data/bookkeeper/ledgers prometheusStatsHttpPort=8100
- prometheusStatsHttpPort默认是8000,但实际上在bookkeeper.conf中,httpServerPort默认也是8000,会导致端口被占用。
- advertisedAddress:指定当前节点的主机名或IP地址
zkServers:指定zookeeper集群,用来将bookkeeper节点的元数据存放在zookeeper集群
journalDirectories:当前bookkeeper节点的journal数据存放目录。 如果需要提高磁盘写入性能,可以指定多个目录用来存放journal数据,关键是每一个目录必须在不同的磁盘,不然反而会影响写入性能
ledgerDirectories:当前bookkeeper节点的ledger存放目录
启动和校验
后台运行 $ bin/pulsar-daemon start bookie 显式运行 $ bin/pulsar bookie 校验是否成功 $ bin/bookkeeper shell bookiesanityBroker集群部署(三台)
配置 Broker
Broker 配置中有一些非常重要的参数,这些参数可以确保每个Broker 连接到已部署的 ZooKeeper 集群。 需要确认 zookeeperServers 和 configurationStoreServers 配置项的值是正确的。 在当前情况下,由于只有一个集群,没有单独用来存储配置的 Zookeeper 集群,那么配置项configurationStoreServers和zookeeperServers 是一样的值。
zookeeperServers=vm61:2181,vm62:2181,vm63:2181 configurationStoreServers=vm61:2181,vm62:2181,vm63:2181 advertisedAddress=vm61 (其他各为vm62和vm63)
你必须配置集群的名字( 初始化集群元数据 提供的集群名字必须和这个配置项匹配):
clusterName=pulsar-cluster-1
此外,初始化集群的时候提供的 broker 和 web 服务的端口也必须和下面的配置匹配(特别是当你定义了和默认值不同的端口的时候)
brokerServicePort=6650 brokerServicePortTls=6651 webServicePort=8089 webServicePortTls=8443
启动
显式启动 $ bin/pulsar broker 后台启动 $ bin/pulsar-daemon start broker配置client
修改conf/client.conf中的webServiceUrl和brokerServiceUrl的值
webServiceUrl=http://vm61:8089,vm62:8089,vm63:8089 brokerServiceurl=pulsar://vm61:6650,vm62:6650,vm63:6650生产消息
生产10条消息
./pulsar-client produce my-topic -n 10 -m “Hello Pulsar”
./pulsar-client consume my-topic -n 100 -s “consumer-test” -t “Exclusive”



