1)若干Producer,可以是服务器日志,页面生产的page view等.Producer使用push模式将消息发布到Broker.
2)若干Broker
3)若干Consumer,Consumer使用pull模式从Broker订阅并消费消息.
4)一个zookeeper集群:kafka通过zookeeper管理集群配置,选举Leader,以及在Consumer Group发生变化时进行rebalance.
就比如说有4个partitione,consumergroup中有两个consumer消费,此时1个consumer消费两个partition,现在又来了两个consumer,那就调整成一个consumer消费一个parition.
2.组件1)Broker:一个kafka节点就是一个Broker,一个或多个Broker组成一个kafka集群.
2)Topic:kafka根据topic对消息进行分类,发布到kafka集群的每条消息都要指定一个topic.
3)ConsumerGroup:每个Consumer都属于一个ConsumerGroup,一条消息可以发送到多个不同的ConsumerGroup,但是一个ConsumerGoup只能有一个Consumer消费该消息.
4)Partition:topic物理上的概念,一个topic可以分为多个partition,每个partition内部是有序的.
3.kafka的文件存储机制kafka中的消息是以topic进行分类的,生产者通过topic向Broker发送消息,消费者通过topic消费消息.一个topic可以分为若干partition,一个parition可以分为若干segment(段),topic是逻辑上的概念,partition,segment是物理上的概念.
为了方便说明,假设这个kafka集群只有一个Broker,即只有一个kafka节点,在这个Broker中配置server.properties.log.dirs=/opt/kafka/logs,设置kafka消息文件存储目录.
注:什么时候使用server.properties:启动kafka集群时,即在kafka的每个节点上
kafka-server-start.sh /opt/kafka-1.1.1/conf/server.properties &
这时创建一个topic:
kafka-topics.sh --zookeeper cdh:2181 --create --replication-factor 1 --partitions 4 --topic topic_test
此时在/opt/kafka/logs目录下可以看到生成了4个目录
(kafka本身是分布式的,它的目录存储在服务器本地,不是存储在hdfs上的)
即topic:topic_test的4个parition,每个partition一个目录.
每个partition相当于一个巨型文件被平均分配到segment文件中,每个segment文件的大小相等,但是每个segment文件中的消息数量不一定相等.
segment文件由两部分组成,分别为".index"索引文件和".log"数据文件,两个文件的命名规则为:partition全局第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值(消息的个数),如下:
下图以上面的segment为例,展示出segment00000000000000170410的".index"文件和".log"文件的对应关系.
".index"索引文件存储元数据,".log文件存储消息",以"00000000000000170410.index"索引文件的元数据中[3,348]为例,3表示第三个消息.即全局的第170410+3=170413个消息.该消息的物理偏移地址为348.如何通过offset查找message呢?
查找offset=170418的消息,首先查找segment文件
第一个segment文件为00000000000000000000.index
第二个segment文件为00000000000000170410.index
第三个segment文件为00000000000000239430.index
所以这个offset=170418落在了第二segment文件中,然后根据00000000000000170410.index文件中[8,1325]定位到1325物理偏移量位置进行读取.
4.replicas副本
如上图topic的每个parition有3个副本,p1表示Partiiton1的副本,其中一个replica为Leader,其余的为Follower.这里Broker1的p1为Leader.Broker2和Broker3上的p1为Follower.Leader处理对该Partition的所有读写请求.与此同时,Follower会被动定期地去复制Leader上的数据.如果Leader发生故障或者挂掉,一个新的Leader会被选举,副本数对kafka的吞吐率由一定的影响,但是极大的增强了可用性.
5.ISR HW LEO 1)ISR=in sync replicas同步副本列表一个partition所有的副本统称为AR(all replicas),ISR是AR的子集,Leader维护ISR,Follower从Leader同步数据有一些延迟.超过指定的
replica.log.max.messges(延迟条数)--已经被去掉了
replica.log.time.max.ms(延迟时间)
把Follower剔除ISR,存入OSR(out of sync replicas不同步副本列表).新加入的Follower也会先存到OSR.AR=ISR+OSR
2)LEO=log end offset日志结束偏移量,即Leader的最后一条消息的位置HW=high water高水位,指Consumer能消费的位置,受ISR中最慢Follower的限制.下图说明了当Producer生产消息到Broker后,ISR.HW,LEO的流转过程:
6.kafka分区在集群中的分配策略 1.producer如何把消息发送给对应分区1.当key为空时,消息随机发送到各个分区(各个版本会有不同,有的是采用轮询方式,有的是随机,有的是一定时间内只发给固定的partition,隔一段时间后随机换一个)
2.用key的hash值对partition的个数取模,决定要把消息发送到哪个partition上.
2.消费者的分区分配策略在kafka内部存在两种分区的分配策略,range(范围分区,默认)和roundrobin(轮询分区).
过程range范围分区(默认的)
假如有10个分区,3个消费者,把分区按照序号排列0,1,2,3,4,5,6,7,8,9;消费者为C1,C2,C3,那么用分区数除以消费者数来决定每个Consumer消费几个Partition,除不尽的前面几个消费者将会多消费1个.
最终分配结果如下:
C1:0,1,2,3
C2:4,5,6
C3:7,8,9
假如有11个分区将是
C1:0,1,2,3
C2:4,5,6,7
C3:8,9,10
假如我们有两个topicT1,T2,分别有10个分区,最后的分配结果将会是这样:
C1:T1(0,1,2,3) T2(0,1,2,3)
C2:T1(4,5,6) T2(4,5,6)
C3:T1(7,8,9) T2(7,8,9)



