- 三 高吞吐消息中间件Kafka
- 1.Kafka 简介
- 2.Kafka 基本架构
- 3.Kafka 主要术语
- 4.kafka 工作流程
- 5.Kafka 实践
- 5.1 安装配置JDK
- 5.2 安装配置Zookeeper
- 5.3 安装配置Kafka
- 5.4 SpringBoot整合Kafka
三 高吞吐消息中间件Kafka 1.Kafka 简介
Kafka是最初由linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
主要应用场景是:日志收集系统和消息系统。
Kafka主要设计目标如下:
- 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
- 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
- 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
- 支持离线数据处理和实时数据处理。
- 支持在线水平扩展。
- Producers(生产者)
生产者将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。 - Consumers(消费者)
消费者订阅一个或多个主题,并按照消息生成的顺序读取它们。 - Topics(主题)
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。物理上不同Topic的消息分开存储。主题就好比数据库的表,尤其是分库分表之后的逻辑表。 - Partition(分区)
主题可以被分为若干个分区,一个分区就是一个提交日志。消息以追加的方式写入分区,然后以先进先出的顺序读取。无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序。 - Brokers(经纪人)
一个独立的Kafka 服务器被称为broker。broker 为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。
(1) 如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
(2) 如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
(3) 如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。 - Replicas(副本)
Kafka 使用主题来组织数据,每个主题被分为若干个分区,每个分区有多个副本。那些副本被保存在broker 上,每个broker 可以保存成百上千个属于不同主题和分区的副本。 - Offset(偏移量)
消息写入的时候,每个分区都有一个offset,这个offset就是生产者的offset,同时也是这个分区的最新最大的offset。有些时候没有指定某一个分区的offset,这个工作kafka帮我们完成。
-生产过程分析
- 写入方式。
producer采用推(push)模式将消息发布到broker,每条消息都被追加(append)到分区(patition)。 - 分区。
Kafka集群为每个主题维护了分布式的分区(partition)日志文件,物理意义上可以把主题(topic)看作进行了分区的日志文件(partition log)。主题的每个分区都是一个有序的、不可变的记录序列,新的消息会不断追加到日志中。
-
副本。
副本有以下两种类型:
(1)首领副本。每个分区都有一个首领副本。为了保证数据一致性,所有生产者请求和消费者请求都会经过这个副本。
(2)跟随者副本。首领以外的副本都是跟随者副本。跟随者副本不处理来自客户端的请求,它们唯一的任务就是从首领那里复制消息,保持与首领一致的状态。如果首领发生崩溃,其中的一个跟随者会被提升为新首领。 -
写入流程。
producer写入消息流程如下:
-消费过程分析
- 消费模型。
消息由生产者发布到Kafka集群后,会被消费者消费。消息的消费模型有两种:推送模型(push)和拉取模型(pull)。Kafka采用拉取模型,由消费者自己记录消费状态,每个消费者互相独立地顺序读取每个分区的消息。 - 消费者组。
消费者是以consumer group消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个topic。每个分区在同一时间只能由group中的一个消费者读取,但是多个group可以同时消费这个partition。
- 消费方式。
对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式,可批量消费也可逐条消费。
pull模式也有不足之处,当kafka没有数据,消费者可能会陷入循环中,一直等待数据到达。为避免这种情况,可在拉请求中设置参数,允许消费者请求在等待数据中进行阻塞。
- 上传jdk压缩包到服务器并解压。
- 配置环境变量
(1)添加配置:vim /etc/profile
export JAVA_HOME=/usr/java/jdk1.8.0 (本地解压的路径)
export PATH=$PATH:$JAVA_HOME/bin
(2)生效:source /etc/profile
(3)验证:java -version
5.2 安装配置Zookeeper
- 上传zookeeper-3.4.14.tar.gz到服务器并解压。
# 解压 tar -zxf zookeeper-3.4.14.tar.gz -C /zookeeper
- 修改Zookeeper保存数据的目录(dataDir)
# 进入目录 cd /zookeeper/zookeeper-3.4.14/conf # 复制zoo_sample.cfg命名为zoo.cfg cp zoo_sample.cfg zoo.cfg # 编辑zoo.cfg文件 vim zoo.cfg # 修改dataDir dataDir=/var/lagou/zookeeper/data
- 编辑/etc/profile
# 设置环境变量ZOO_LOG_DIR,指定Zookeeper保存日志的位置 export ZOO_LOG_DIR=/var/test/zookkeeper/log # ZOOKEEPER_PREFIX指向Zookeeper的解压目录 export ZOOKEEPER_PREFIX=/zookeeper-3.4.14 # 将Zookeeper的bin目录添加到PATH中 export PATH=$PATH:$ZOOKEEPER_PREFIX/bin # 使配置生效 source /etc/profile5.3 安装配置Kafka
- 上传kafka_2.12-1.0.2.tgz到服务器并解压。
- 配置环境变量并生效。
(1)添加配置:vim /etc/profile export KAFKA_HOME=/kafka/kafka-2.12 (本地解压的路径) export PATH=$PATH:$KAFKA_HOME/bin (2)生效:source /etc/profile
-
配置/kafka_2.12config中的server.properties文件。
修改Kafka连接Zookeeper的地址,连接地址是localhost:2181,myKafka 是Kafka在Zookeeper中的根节点路径。
添加内容:zookeeper.connect=localhost:2181/myKafka -
启动zookeeper并查看状态
zkServer.sh start
zkServer.sh status -
启动kafka 或后台启动kafka
kafka-server-start.sh config/server.properties
kafka-server-start.sh -daemon config/server.properties
已经有很多优秀作者总结了相关内容,这里不做详细阐述了。
这里进行文章推荐(这篇文章作者给出实践源码):SpringBoot整合Kafka



