kafka目前与很多大数据框架比如spark、flink进行对接,另外在很多业务系统中通过存放上游日志信息供下流拉取的作用。之前在实习的资金中台部门中,部门的的计费系统通过拉取kafka存放applog中的计费事件,通过spark streaming的流式处理对计费事件按广告主进行聚合,再进行接下来的实时计费的流程。
kafka在业界的使用非常广泛,之前一直没有深入了解其中的原理,于是目前打算写一系列的博客来对kafka进行学习,同时也希望能够和大家一起学习交流。
Kafka简介Kafka 是一个分布式的流处理平台。它具有以下特点:
- 支持消息的发布和订阅,类似于 RabbitMQ、ActiveMQ 等消息队列。
- 支持数据实时处理。
- 能保证消息的可靠性投递。
- 支持消息的持久化存储,并通过多副本分布式的存储方案来保证消息的容错。
- 高吞吐率,单 Broker 可以轻松处理数千个分区以及每秒百万级的消息量。
上面都是一些概念性质的东西,其实最本质Kafka就是一个消息中间件,类似于rocketmq,rabbitmq等,能够起到异步、解耦、削峰的作用。
- 异步
通过消息队列可以让数据在多个系统更加之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中异步获取数据即可。
- 解耦
系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。
使用消息队列解耦合,系统的耦合性就会提高了。比如物流系统发生故障,需要几分钟才能来修复,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统回复后,补充处理存在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。
- 削峰
应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验。
一般情况,为了保证系统的稳定性,如果系统负载超过阈值,就会阻止用户请求,这会影响用户体验,而如果使用消息队列将请求缓存起来,等待系统处理完毕后通知用户下单完毕,这样总不能下单体验要好。
mac环境下Kafka的安装与测试 1.安装zookeeper因为kafka要依赖zookeeper所以我们需要提前安装zookeeper(通过brew方式安装)
brew install zookeeper
默认安装位置
启动文件:/usr/local/Cellar/zookeeper/3.6.2/bin
配置文件: /usr/local/etc/zookeeper/
2. 启动zookeeper 服务nohup zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
这里也可以通过brew services的启动方式brew services start zookeeper来进行启动。
3. 安装Kafkabrew install kafka
4. 修改Kafka服务配置文件#进入到kafka软件包的 config 目录 cd /usr/local/etc/kafka #列出当前 config 文件下的文件,可以看到有个 server.properties 文件 ll #复制 server.properties 文件为 server-1.properties cp server.properties server-1.properties #复制 server.properties 文件为 server-2.properties cp server.properties server-2.properties
到目前为止,我们下载了 Kafka ,复制了 Kafka 配置文件,下一步需要修改这三个配置文件,使其组成集群。
下面在新的命令行终端中编辑 Kafka 的三个配置文件(server.properties、 server-1.properties、server-2.properties),修改文件名和修改内容说明如下:
| 配置文件名称 | 修改内容 |
|---|---|
| server.properties | 此文件不修改,保持默认值 |
| server-1.properties | 修改 broker.id=1,log.dirs=/tmp/kafka-logs-1,新增 listeners=PLAINTEXT://:9093 三个参数值 |
| server-2.properties | 修改 broker.id=2,log.dirs=/tmp/kafka-logs-2 和 listeners=PLAINTEXT://:9094 三个参数值 |
至此,参数修改完成。下一步,启动 Kafka 集群。上面我们看到的三个 Kafka 配置文件,每个配置文件对应 Kafka 集群中一个节点(称为 Broker)。
5. 启动集群依次运行如下命令,启动 Kafka 集群:
#切换到启动kafka命令所在目录 cd /usr/local/Cellar/kafka/2.6.0/bin #使用配置文件 server.properties 启动第一个 Kafka Broker,注意:命令最后的 & 符号表示以后台进程启动,启动完成后,按回车键,回到命令行,启动另一个 Kafka Broker。 ./kafka-server-start /usr/local/etc/kafka/server.properties & #使用配置文件 server-1.properties 启动第二个 Kafka Broker 。启动完成后,按回车键,回到命令行,启动另一个 Kafka Broker 。 ./kafka-server-start /usr/local/etc/kafka/server-1.properties & #使用配置文件 server-2.properties 启动第三个 Kafka Broker。启动完成后,按回车键,回到命令行。 ./kafka-server-start /usr/local/etc/kafka/server-2.properties & #查看当前运行的java进程。如下图,出现三个 kafka 进程,说明三个 Broker 的 Kafka 集群启动成功。 jps
到目前为止,单机版三个 Broker 的 Kafka 集群已经安装成功。
我们简要说一下Kafka中有关消息的整体流程。生产者生产消息,将消息发送到 Kafka 服务器(实际上消息存储到了 Kafka 中的 Topic 里面)。消费者消费消息,从 Kafka 服务器读取消息。这里的 Kafka 服务器相当于一个中间人,用于存储生产者和消费者交互的数据(消息),下一步我们要在 Kafka 集群上创建一个 Topic ,用于存储消息,创建一个消息生产者,向 Topic 发送消息。创建一个消息消费者,从 Topic 读取消息。
6. 在集群中创建Topic进行测试使用 Shell 命令在 Kafka 集群中创建一个 Topic ,名称为 myFirstTopic。
在 /opt/kafka/bin 目录下运行命令创建一个名为 myFirstTopic 的 Topic。
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic myFirstTopic
在 /usr/local/Cellar/kafka/2.6.0/bin 目录下运行命令,查看 Topic 创建是否成功:
./kafka-topics --zookeeper localhost:2181 --list
在 /usr/local/Cellar/kafka/2.6.0/bin 目录下运行命令,启动消息生产者,用于向 Topic 发送消息:
./kafka-console-producer --broker-list localhost:9092 --topic myFirstTopic
重新打开一个新的命令行终端,启动消息消费者,用于从 Topic 中读取消息:
./kafka-console-consumer --bootstrap-server localhost:9092 --topic myFirstTopic
在消息生产者所在命令行终端中输入 hello kafka ,然后按回车键,消息发送到 Topic 。此时在消息消费者所在的命令行中,可以看到 hello kafka 消息已经收到了。



