Kafka是一款开源的、轻量级的、分布式、可分区和具有复制备份的、基于ZooKeeper协调管理的分布式流平台的功能强大的消息系统。
流平台特征:
-
能够允许发布和订阅数据-消息队列
-
提供相应的容错机制
-
流数据能够及时被处理
broker、topic、partition、replica、message、producer、consumer、consumer group、offset
设计动机:-
具有高吞吐量来支持大规模事件流
-
能够很好处理大量积压的数据
-
能够低延迟处理传统数据应用场景
-
能够支持分区、分布式,实时的处理消息,同时具有容错保障机制
-
消息持久化:使用文件系统和依赖于页缓存,存储到磁盘上
-
高吞吐量:zero-copy,支持百万级别每秒的吞吐量
-
扩展性:依赖于ZooKeeper对集群进行协调管理,更容易水平扩展而无需将整个集群停机
-
多客户端支持:核心模块使用Scala开发,提供了多种开发语言的接入;与当前主流的大数据框架(如Flume、Hadoop、Hbase、Hive等)都可以很好的集成
-
Kafka Streams:0.10版本之后引入Kafka Streams,一个用Java实现的用于流处理的jar
-
安全机制:通过SSL和SASL(Kerberos)验证机制;支持代理与ZooKeeper链接身份验证;通讯时数据加密;客户端读写权限认证;支持与外部其他认证授权服务的集成
-
数据备份:可以为每个主题指定副本数
-
轻量级:kafka的代理是无状态的,即代理本身不记录消息的消费情况,同时集群本身几乎不需要生产者和消费者的状态信息
-
消息压缩:支持三种压缩方式:Gzip、Snappy、LZ4,把多条message组成messageset,再把messageset放到一条message里,提高压缩比率
Kafka没有ZooKeeper是不能工作的。
ZooKeeper是一个分布式的、开源的程序协调服务,是 hadoop 项目下的一个子项目。他提供的主要功能包括:配置管理、名字服务、分布式锁、集群管理。
配置管理在我们的应用中除了代码外,还有一些就是各种配置。比如数据库连接等。一般使用配置文件的方式,在代码中引入这些配置文件。但是如果我们配置非常多,有很多服务器都需要这个配置,往往需要寻找一种集中管理配置的方法,集中修改配置,所有应用都可以获得变更。ZooKeeper使用Zab一致性协议来提供一致性。现在有很多开源项目使用 ZooKeeper 来维护配置,比如Hbase。
名字服务比如集中管理域名到ip的映射,提供一个统一的访问入口,统一维护。
分布式锁协调多个分布式进程之间的活动。比如在一个分布式环境中,为了提高可靠性,集群的每台服务器上都部署着同样的服务。通常还有一种做法就是使用分布式锁,在某个时刻只让一个服务工作,当这台服务出问题的时候锁释放,立即协调到另外的服务。这在很多分布式系统中都是这么做,这种设计叫Leader Election(leader 选举)。
集群管理在分布式的集群中,经常会由于各种原因,比如硬件故障,软件故障,网络问题,有些 节点会进进出出。有新的节点加入进来,也有老的节点退出集群。这个时候,集群中其他机器需要感知到这种变化,然后根据这种变化做出对应的决策。比如一个分布式的SOA(面向服务)架构中,服务是一个集群提供的,当消费者访问某个服务时,就需要采用某种机制发现现在有哪些节点可以提供该服务(这也称之为服务发现)。
Kafka使用ZooKeeper-
broker注册:
ZooKeeper上会有一个专门用来进行Broker服务器列表记录的节点,Kafka使用了全局唯一的数字来指代每个Broker服务器,不同的Broker必须使用不同的Broker ID进行注册,创建完节点后,每个Broker就会将自己的IP地址和端口信息记录到该节点中去。
-
topic注册:
在Kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息及与Broker的对应关系也都是由ZooKeeper在维护,由专门的节点来记录。
-
生产者负载均衡:
由于同一个Topic消息会被分区并将其分布在多个Broker上,因此,生产者需要将消息合理地发送到这些分布式的Broker上,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持ZooKeeper方式实现负载均衡。
-
消费者负载均衡:
与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的Topic下面的消息,互不干扰。
-
协调分区与消费者的关系
-
对于每个消费者组 (Consumer Group),Kafka都会为其分配一个全局唯一的Group ID,Group 内部的所有消费者共享该 ID。订阅的topic下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其他group)。同时,Kafka为每个消费者分配一个Consumer ID。
-
在Kafka中,规定了每个消息分区只能被同组的一个消费者进行消费,因此,需要在 ZooKeeper上记录“消息分区”与Consumer之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID写入到 ZooKeeper 对应消息分区的临时节点上
-
-
消息消费offset记录:
在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度Offset记录到ZooKeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。
-
消费者注册
-
消费者在初始化启动时册到消费者组。每个消费者启动时,都会到ZooKeeper的指定节点下创建一个属于自己的消费者节点,完成节点创建后,消费者就会将自己订阅的Topic信息写入该临时节点。
-
对消费者组中的消费者的变化注册监听一旦发现消费者新增或减少,就触发消费者的负载均衡。
-
对Broker变化注册监听。消费者需要对broker的节点进行监听,如果发现Broker服务器列表发生变化,那么就根据具体情况来决定是否需要进行消费者负载均衡。
-
进行消费者负载均衡。通常,对于一个消费者组,如果组内的消费者发生变更或Broker发生变更,会发出消费者负载均衡。
-
3. API maven dependencykafka_2.12-2.5.0,(2.12为scala编译器版本号,2.5.0为kafka版本号) 该文件夹包括了所有相关的运行文件及配置文件,其子文件夹binwindows 下放的是在Windows系统启动zookeeper和kafka的可执行文件,子文件夹config下放的是zookeeper和kafka的配置文件。 由于kafka依赖于ZooKeeper,所以在本机启动Kafka之前,需要先启动ZooKeeper。(可通过kafka运行文件中的启动脚本或zk的启动脚本,sh、cmd、bat)
结构 clients adminorg.apache.kafka kafka-clients2.1.0
创建Topic:createTopics(Collection
删除Topic:deleteTopics(Collection
查看所有Topic:listTopics()
查询Topic:describeTopics(Collection
查询集群信息:describeCluster()
查询ACL信息:describeAcls(AclBindingFilter filter)
创建ACL信息:createAcls(Collection acls)
删除ACL信息:deleteAcls(Collection filters)
查询配置信息:describeConfigs(Collection
修改配置信息:alterConfigs(Map
修改副本的日志目录:alterReplicaLogDirs(Map
查询节点的日志目录信息:describeLogDirs(Collection
查询副本的日志目录信息:describeReplicaLogDirs(Collection
增加分区:createPartitions(Map
-
Properties、Map
-
实例化一个KafkaConsumer
-
订阅主题 subscribe或订阅分区assign
-
管理偏移量,seek
-
开始消费
-
Properties、Map
-
bootstrap.servers
-
key.serializer
-
value.serializer
-
-
实例化一个KafkaProducer
-
实例化一个ProducerRecord
-
调用KafkaProducer的send方法
-
关闭KafkaProducer



