栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

kafka介绍

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

kafka介绍

文章目录
      • 1. Kafka概述
        • 1.1 为什么有消息系统
        • 1.2 Kafka核心概念
        • 1.3 Kafka集群架构
      • 2. kafka集群安装部署
      • 3. kafka集群启动和停止
        • 3.1 启动
        • 3.2 停止
        • 3.3 一键启动和停止脚本
      • 4. kafka的命令行的管理使用
      • 5. kafka的生产者和消费者api代码开发
        • 5.1 生产者代码开发
        • 5.2 消费者代码开发
      • 1. kafka分区策略
      • 2. kafka的文件存储机制
        • 2.1 概述
        • 2.2 数据消费问题讨论
        • 2.3 Segment文件
        • 2.4 kafka如何快速查询数据
        • 2.5 kafka高效文件存储设计特点
      • 3. 为什么Kafka速度那么快
        • 3.1 顺序读写
        • 3.2 Page Cache
        • 3.3 零拷贝(sendfile)
      • 4. kafka整合flume
      • 5. kafka监控工具安装和使用
        • 5.1. Kafka Manager
        • 5.2. KafkaOffsetMonitor
        • 5.3. Kafka Eagle
      • 1. kafka内核原理
        • 1.1 ISR机制
        • 1.2 HW&LEO原理
      • 2. producer消息发送原理
        • 2.1 producer核心流程概览
      • 3. producer核心参数
        • 3.1 常见异常处理
        • 3.2 提升消息吞吐量
        • 3.3 请求超时
        • 3.4 ACK参数
        • 3.5 重试乱序
      • 4. broker核心参数
      • 5. consumer消费原理
        • 5.1 Offset管理
        • 5.2 Coordinator
      • 6. consumer消费者Rebalance策略
        • 6.1 range策略
        • 6.2 round-robin策略
        • 6.3 sticky策略
      • 7. consumer核心参数
    • 五、优秀书籍推荐
    • kafka知识要点
        • 4.1 如何提升生产者的吞吐量?
        • 4.2 如何保证Kafka内部数据不丢失?
        • 4.3 积压了百万消息如何处理?
        • 4.4 生产者遇到了异常如何处理?
        • 4.5 说一下Kafka的HW,LEO的更新机制
        • 4.6 Zookeeper对于Kafka的作用是什么?
        • 4.7 讲一讲Kafka的ack的三种机制
        • 4.8 Kafka如何不重复消费数据
        • 4.9 如何保证同一分区一定有序

1. Kafka概述 1.1 为什么有消息系统

解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

冗余
消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。

灵活性 & 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)

缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

1.2 Kafka核心概念
	Kafka是最初由linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
	
	kafka是一个分布式消息队列。具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。Kafka就是一种发布-订阅模式。将消息保存在磁盘中,以顺序读写方式访问磁盘,避免随机读写导致性能瓶颈。

1.3 Kafka集群架构

  • producer

     消息生产者,发布消息到Kafka集群的终端或服务
    
  • broker

    Kafka集群中包含的服务器,一个borker表示kafka集群中的一个节点
    
  • topic

    每条发布到Kafka集群的消息属于的类别,即Kafka是面向 topic 的。
    更通俗的说Topic就像一个消息队列,生产者可以向其写入消息,消费者可以从中读取消息,一个Topic支持多个生产者或消费者同时订阅它,所以其扩展性很好。
    
  • partition

每个 topic 包含一个或多个partition。Kafka分配的单位是partition
  • replica

    partition 的副本,保障 partition 的高可用。
    
  • consumer

    从Kafka集群中消费消息的终端或服务
    
  • consumer group

    每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
    
  • leader

    每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。 producer 和 consumer 只跟 leader 交互
    
  • follower

    Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。
    
  • controller

    	知道大家有没有思考过一个问题,就是Kafka集群中某个broker宕机之后,是谁负责感知到他的宕机,以及负责进行Leader Partition的选举?如果你在Kafka集群里新加入了一些机器,此时谁来负责把集群里的数据进行负载均衡的迁移?包括你的Kafka集群的各种元数据,比如说每台机器上有哪些partition,谁是leader,谁是follower,是谁来管理的?如果你要删除一个topic,那么背后的各种partition如何删除,是谁来控制?还有就是比如Kafka集群扩容加入一个新的broker,是谁负责监听这个broker的加入?如果某个broker崩溃了,是谁负责监听这个broker崩溃?这里就需要一个Kafka集群的总控组件,Controller。他负责管理整个Kafka集群范围内的各种东西。
    
    
  • zookeeper

    (1)	Kafka 通过 zookeeper 来存储集群的meta元数据信息
    (2)一旦controller所在broker宕机了,此时临时节点消失,集群里其他broker会一直监听这个临时节点,发现临时节点消失了,就争抢再次创建临时节点,保证有一台新的broker会成为controller角色。
    
  • offset

    消费者在对应分区上已经消费的消息数(位置),offset保存的地方跟kafka版本有一定的关系。
    kafka0.8 版本之前offset保存在zookeeper上。
    kafka0.8 版本之后offset保存在kafka集群上。
    
    	它是把消费者在topic的位置通过kafka集群内部有一个默认的topic,
    	名称叫_consumer_offsets,他默认有50各分区
    
  • ISR机制

    	光是依靠多副本机制能保证Kafka的高可用性,但是能保证数据不丢失吗?不行,因为如果leader宕机,但是leader的数据还没同步到follower上去,此时即使选举了follower作为新的leader,当时刚才的数据已经丢失了。
    
    	ISR是:in-sync replica,就是跟leader partition保持同步的follower partition的数量,只有处于ISR列表中的follower才可以在leader宕机之后被选举为新的leader,因为在这个ISR列表里代表他的数据跟leader是同步的。
    
2. kafka集群安装部署
  • 1、下载安装包(http://kafka.apache.org)

    kafka_2.11-1.0.1.tgz
    
  • 2、规划安装目录

    /kkb/install
    
  • 3、上传安装包到服务器中

    通过FTP工具上传安装包到node01服务器上
    
  • 4、解压安装包到指定规划目录

    tar -zxvf kafka_2.11-1.0.1.tgz -C /kkb/install
    
  • 5、重命名解压目录

    mv kafka_2.11-1.0.1 kafka
    
  • 6、修改配置文件

    • 在node01上修改

      • 进入到kafka安装目录下有一个config目录

        • vi server.properties

          #指定kafka对应的broker id ,唯一
          broker.id=0
          #指定数据存放的目录
          log.dirs=/kkb/install/kafka/kafka-logs
          #指定zk地址
          zookeeper.connect=node01:2181,node02:2181,node03:2181
          #指定是否可以删除topic ,默认是false 表示不可以删除
          delete.topic.enable=true
          #指定broker主机名
          host.name=node01
          
      • 配置kafka环境变量

        • vi /etc/profile

          export KAFKA_HOME=/kkb/install/kafka
          export PATH=$PATH:$KAFKA_HOME/bin
          
  • 6、分发kafka安装目录到其他节点

    scp -r kafka node02:/kkb/install
    scp -r kafka node03:/kkb/install
    scp /etc/profile node02:/etc
    scp /etc/profile node03:/etc
    
  • 7、修改node02和node03上的配置

    • node02

      • vi server.properties

        #指定kafka对应的broker id ,唯一
        broker.id=1
        #指定数据存放的目录
        log.dirs=/kkb/install/kafka/kafka-logs
        #指定zk地址
        zookeeper.connect=node01:2181,node02:2181,node03:2181
        #指定是否可以删除topic ,默认是false 表示不可以删除
        delete.topic.enable=true
        #指定broker主机名
        host.name=node02
        
    • node03

      • vi server.properties

        #指定kafka对应的broker id ,唯一
        broker.id=2
        #指定数据存放的目录
        log.dirs=/kkb/install/kafka/kafka-logs
        #指定zk地址
        zookeeper.connect=node01:2181,node02:2181,node03:2181
        #指定是否可以删除topic ,默认是false 表示不可以删除
        delete.topic.enable=true
        #指定broker主机名
        host.name=node03
        
3. kafka集群启动和停止 3.1 启动
  • 先启动zk集群

  • 然后在所有节点执行脚本

    nohup kafka-server-start.sh /kkb/install/kafka/config/server.properties >/dev/null 2>&1 &
    
  • 一键启动kafka

    • start_kafka.sh

      #!/bin/sh
      for host in node01 node02 node03
      do
              ssh $host "source /etc/profile;nohup kafka-server-start.sh /kkb/install/kafka/config/server.properties >/dev/null 2>&1 &" 
              echo "$host kafka is running"
      
      done
      
3.2 停止
  • 所有节点执行关闭kafka脚本

    kafka-server-stop.sh
    
  • 一键停止kafka

    • stop_kafka.sh

      #!/bin/sh
      for host in node01 node02 node03
      do
        ssh $host "source /etc/profile;nohup /kkb/install/kafka/bin/kafka-server-stop.sh &" 
        echo "$host kafka is stopping"
      done
      
3.3 一键启动和停止脚本
  • kafkaCluster.sh

    #!/bin/sh
    case $1 in 
    "start"){
    for host in node01 node02 node03 
    do
      ssh $host "source /etc/profile; nohup /kkb/install/kafka/bin/kafka-server-start.sh /kkb/install/kafka/config/server.properties > /dev/null 2>&1 &"   
      echo "$host kafka is running..."  
    done
    };;
    
    "stop"){
    for host in node01 node02 node03 
    do
      ssh $host "source /etc/profile; nohup /kkb/install/kafka/bin/kafka-server-stop.sh >/dev/null  2>&1 &"   
      echo "$host kafka is stopping..."  
    done
    };;
    esac
    
  • 启动

    sh kafkaCluster.sh start
    
  • 停止

    sh kafkaCluster.sh stop
    
4. kafka的命令行的管理使用
  • 1、创建topic

    • kafka-topics.sh

      kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test --zookeeper node01:2181,node02:2181,node03:2181
      
  • 2、查询所有的topic

    • kafka-topics.sh

      kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181 
      
  • 3、查看topic的描述信息

    • kafka-topics.sh

      kafka-topics.sh --describe --topic test --zookeeper node01:2181,node02:2181,node03:2181  
      
  • 4、删除topic

    • kafka-topics.sh

      kafka-topics.sh --delete --topic test --zookeeper node01:2181,node02:2181,node03:2181 
      
  • 5、模拟生产者写入数据到topic中

    • kafka-console-producer.sh

      kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test 
      
  • 6、模拟消费者拉取topic中的数据

    • kafka-console-consumer.sh

      kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic test --from-beginning
      
5. kafka的生产者和消费者api代码开发 5.1 生产者代码开发
  • 创建maven工程引入依赖

            
                org.apache.kafka
                kafka-clients
                1.0.1
            
    
  • 代码开发

    package com.kaikeba.producer;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    //todo:需求:开发kafka生产者代码
    public class KafkaProducerStudy {
        public static void main(String[] args) {
            //准备配置属性
            Properties props = new Properties();
            //kafka集群地址
            props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
            //acks它代表消息确认机制
            props.put("acks", "all");
            //重试的次数
            props.put("retries", 0);
            //批处理数据的大小,每次写入多少数据到topic
            props.put("batch.size", 16384);
            //可以延长多久发送数据
            props.put("linger.ms", 1);
            //缓冲区的大小
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer producer = new KafkaProducer(props);
            for (int i = 0; i < 100; i++)
                //这里需要三个参数,第一个:topic的名称,第二个参数:表示消息的key,第三个参数:消息具体内容
                producer.send(new ProducerRecord("test", Integer.toString(i), "hello-kafka-"+i));
    
            producer.close();
        }
    }
    
5.2 消费者代码开发
  • 自动提交偏移量代码开发

    package com.kaikeba.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    //todo:需求:开发kafka消费者代码(自动提交偏移量)
    public class KafkaConsumerStudy {
        public static void main(String[] args) {
            //准备配置属性
            Properties props = new Properties();
            //kafka集群地址
            props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
            //消费者组id
            props.put("group.id", "test");
            //自动提交偏移量
            props.put("enable.auto.commit", "true");
            //自动提交偏移量的时间间隔
            props.put("auto.commit.interval.ms", "1000");
            //默认是latest
            //earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
            //latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
            //none : topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
            props.put("auto.offset.reset","earliest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer consumer = new KafkaConsumer(props);
            //指定消费哪些topic
            consumer.subscribe(Arrays.asList("test"));
            while (true) {
                //指定每个多久拉取一次数据
                ConsumerRecords records = consumer.poll(100);
                for (ConsumerRecord record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
    
    
  • 手动提交偏移量代码开发

    package com.kaikeba.consumer;
    
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    
    //todo:需求:开发kafka消费者代码(手动提交偏移量)
    public class KafkaConsumerControllerOffset {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
            props.put("group.id", "controllerOffset");
            //关闭自动提交,改为手动提交偏移量
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer consumer = new KafkaConsumer(props);
            //指定消费者要消费的topic
            consumer.subscribe(Arrays.asList("test"));
    
            //定义一个数字,表示消息达到多少后手动提交偏移量
            final int minBatchSize = 20;
    
            //定义一个数组,缓冲一批数据
            List> buffer = new ArrayList>();
            while (true) {
                ConsumerRecords records = consumer.poll(100);
                for (ConsumerRecord record : records) {
                    buffer.add(record);
                }
                if (buffer.size() >= minBatchSize) {
                    //insertIntoDb(buffer);  拿到数据之后,进行消费
                    System.out.println("缓冲区的数据条数:"+buffer.size());
                    System.out.println("我已经处理完这一批数据了...");
                    consumer.commitSync();
                    buffer.clear();
                }
            }
        }
    }
    
    
1. kafka分区策略
kafka的分区策略决定了producer生产者产生的一条消息最后会写入到topic的哪一个分区中
  • 1、指定具体的分区号
//1、给定具体的分区号,数据就会写入到指定的分区中
producer.send(new ProducerRecord("test", 0,Integer.toString(i), "hello-kafka-"+i));

  • 2、不给定具体的分区号,给定key的值(key不断变化)
//2、不给定具体的分区号,给定一个key值, 这里使用key的 hashcode%分区数=分区号
producer.send(new ProducerRecord("test", Integer.toString(i), "hello-kafka-"+i));
  • 3、不给定具体的分区号,也不给对应的key
//3、不给定具体的分区号,也不给定对应的key ,这个它会进行轮训的方式把数据写入到不同分区中
producer.send(new ProducerRecord("test", "hello-kafka-"+i));
  • 4、自定义分区

    • 定义一个类实现接口Partitioner
    package com.kaikeba.partitioner;
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    
    import java.util.Map;
    
    //todo:需求:自定义kafka的分区函数
    public class MyPartitioner implements Partitioner{
        
        public int partition(String topic, Object key, byte[] bytes, Object value, byte[] bytes1, Cluster cluster) {
            //获取topic分区数
            int partitions = cluster.partitionsForTopic(topic).size();
            
            //key.hashCode()可能会出现负数 -1 -2 0 1 2
            //Math.abs 取绝对值
            return Math.abs(key.hashCode()% partitions);
    
        }
    
        public void close() {
            
        }
    
        public void configure(Map map) {
    
        }
    }
    
    
    • 配置自定义分区类
    //在Properties对象中添加自定义分区类
    props.put("partitioner.class","com.kaikeba.partitioner.MyPartitioner");
    
2. kafka的文件存储机制 2.1 概述
	同一个topic下有多个不同的partition,每个partition为一个目录,partition命名的规则是topic的名称加上一个序号,序号从0开始。

	每一个partition目录下的文件被平均切割成大小相等(默认一个文件是1G,可以手动去设置)的数据文件,每一个数据文件都被称为一个段(segment file),但每个段消息数量不一定相等,这种特性能够使得老的segment可以被快速清除。默认保留7天的数据。
	每次满1G后,在写入到一个新的文件中。

	另外每个partition只需要支持顺序读写就可以。如上图所示:
首先00000000000000000000.log是最早产生的文件,该文件达到1G后又产生了新的00000000000002025849.log文件,新的数据会写入到这个新的文件里面。
	这个文件到达1G后,数据又会写入到下一个文件中。也就是说它只会往文件的末尾追加数据,这就是顺序写的过程,生产者只会对每一个partition做数据的追加(写操作)。
2.2 数据消费问题讨论
问题:如何保证消息消费的有序性呢?比如说生产者生产了0到100个商品,那么消费者在消费的时候按照0到100这个从小到大的顺序消费?

*** 那么kafka如何保证这种有序性呢?***
难度就在于,生产者生产出0到100这100条数据之后,通过一定的分组策略存储到broker的partition中的时候,
比如0到10这10条消息被存到了这个partition中,10到20这10条消息被存到了那个partition中,这样的话,消息在分组存到partition中的时候就已经被分组策略搞得无序了。

那么能否做到消费者在消费消息的时候全局有序呢?
遇到这个问题,我们可以回答,在大多数情况下是做不到全局有序的。但在某些情况下是可以做到的。比如我的partition只有一个,这种情况下是可以全局有序的。

那么可能有人又要问了,只有一个partition的话,哪里来的分布式呢?哪里来的负载均衡呢?
所以说,全局有序是一个伪命题!全局有序根本没有办法在kafka要实现的大数据的场景来做到。但是我们只能保证当前这个partition内部消息消费的有序性。

结论:一个partition中的数据是有序的吗?回答:间隔有序,不连续。

针对一个topic里面的数据,只能做到partition内部有序,不能做到全局有序。特别是加入消费者的场景后,如何保证消费者的消费的消息的全局有序性,
这是一个伪命题,只有在一种情况下才能保证消费的消息的全局有序性,那就是只有一个partition。
2.3 Segment文件
  • Segment file是什么
	生产者生产的消息按照一定的分区策略被发送到topic中partition中,partition在磁盘上就是一个目录,该目录名是topic的名称加上一个序号,在这个partition目录下,有两类文件,一类是以log为后缀的文件,一类是以index为后缀的文件,每一个log文件和一个index文件相对应,这一对文件就是一个segment file,也就是一个段。
	其中的log文件就是数据文件,里面存放的就是消息,而index文件是索引文件,索引文件记录了元数据信息。log文件达到1个G后滚动重新生成新的log文件
  • Segment文件特点
    segment文件命名的规则:partition全局的第一个segment从0(20个0)开始,后续的每一个segment文件名是上一个segment文件中最后一条消息的offset值。

那么这样命令有什么好处呢?
假如我们有一个消费者已经消费到了368776(offset值为368776),那么现在我们要继续消费的话,怎么做呢?

看下图,分2个步骤;
	第1步是从所有文件log文件的的文件名中找到对应的log文件,第368776条数据位于上图中的“00000000000000368769.log”这个文件中,
这一步涉及到一个常用的算法叫做“二分查找法”(假如我现在给你一个offset值让你去找,你首先是将所有的log的文件名进行排序,然后通过二分查找法进行查找,
很快就能定位到某一个文件,紧接着拿着这个offset值到其索引文件中找这条数据究竟存在哪里);
第2步是到index文件中去找第368776条数据所在的位置。

索引文件(index文件)中存储这大量的元数据,而数据文件(log文件)中存储这大量的消息。

索引文件(index文件)中的元数据指向对应的数据文件(log文件)中消息的物理偏移地址。
2.4 kafka如何快速查询数据

	上图的左半部分是索引文件,里面存储的是一对一对的key-value,其中key是消息在数据文件(对应的log文件)中的编号,比如“1,3,6,8……”,
	分别表示在log文件中的第1条消息、第3条消息、第6条消息、第8条消息……,那么为什么在index文件中这些编号不是连续的呢?
	这是因为index文件中并没有为数据文件中的每条消息都建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。
	这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。
但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。

	其中以索引文件中元数据8,1686为例,其中8代表在右边log数据文件中从上到下第8个消息(在全局partiton表示第368777个消息),其中1686表示该消息的物理偏移地址(位置)为1686。
	
	要是读取offset=368777的消息,从00000000000000368769.log文件中的1325的位置进行读取,那么怎么知道何时读完本条消息,否则就读到下一条消息的内容了?
	

参数说明:

关键字解释说明
8 byte offset在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message
4 byte message sizemessage大小
4 byte CRC32用crc32校验message
1 byte “magic"表示本次发布Kafka服务程序协议版本号
1 byte “attributes"表示为独立版本、或标识压缩类型、或编码类型。
4 byte key length表示key的长度,当key为-1时,K byte key字段不填
K byte key可选
value bytes payload表示实际消息数据。
	这个就需要涉及到消息的物理结构了,消息都具有固定的物理结构,包括:offset(8 Bytes)、消息体的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等字段,可以确定一条消息的大小,即读取到哪里截止。
2.5 kafka高效文件存储设计特点
  • Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
  • 通过索引信息可以快速定位message
  • 通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
  • 通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。
3. 为什么Kafka速度那么快
	Kafka是大数据领域无处不在的消息中间件,目前广泛使用在企业内部的实时数据管道,并帮助企业构建自己的流计算应用程序。
	Kafka虽然是基于磁盘做的数据存储,但却具有高性能、高吞吐、低延时的特点,其吞吐量动辄几万、几十上百万,这其中的原由值得我们一探究竟。
3.1 顺序读写
  • 磁盘顺序读写性能要高于内存的随机读写
	众所周知Kafka是将消息记录持久化到本地磁盘中的,一般人会认为磁盘读写性能差,可能会对Kafka性能如何保证提出质疑。实际上不管是内存还是磁盘,快或慢关键在于寻址的方式,磁盘分为顺序读写与随机读写,内存也一样分为顺序读写与随机读写。基于磁盘的随机读写确实很慢,但磁盘的顺序读写性能却很高,一般而言要高出磁盘随机读写三个数量级,一些情况下磁盘顺序读写性能甚至要高于内存随机读写。
	磁盘的顺序读写是磁盘使用模式中最有规律的,并且操作系统也对这种模式做了大量优化,Kafka就是使用了磁盘顺序读写来提升的性能。Kafka的message是不断追加到本地磁盘文件末尾的,而不是随机的写入,这使得Kafka写入吞吐量得到了显著提升
3.2 Page Cache
	为了优化读写性能,Kafka利用了操作系统本身的Page Cache,就是利用操作系统自身的内存而不是JVM空间内存。这样做的好处有:

(1)避免Object消耗:如果是使用Java堆,Java对象的内存消耗比较大,通常是所存储数据的两倍甚至更多。
(2)避免GC问题:随着JVM中数据不断增多,垃圾回收将会变得复杂与缓慢,使用系统缓存就不会存在GC问题。
3.3 零拷贝(sendfile)
  • 零拷贝并不是不需要拷贝,而是减少不必要的拷贝次数。通常是说在IO读写过程中。

     Kafka利用linux操作系统的 "零拷贝(zero-copy)" 机制在消费端做的优化。
    
    • 首先来了解下数据从文件发送到socket网络连接中的常规传输路径

      比如:读取文件,再用socket发送出去
      传统方式实现:
      先读取、再发送,实际经过1~4四次copy。
      buffer = File.read 
      Socket.send(buffer)
      
      • 第一步:操作系统从磁盘读取数据到内核空间(kernel space)的Page Cache缓冲区
      • 第二步:应用程序读取内核缓冲区的数据copy到用户空间(user space)的缓冲区
      • 第三步:应用程序将用户空间缓冲区的数据copy回内核空间到socket缓冲区
      • 第四步:操作系统将数据从socket缓冲区copy到网卡,由网卡进行网络传输

传统方式,读取磁盘文件并进行网络发送,经过的四次数据copy是非常繁琐的。实际IO读写,需要进行IO中断,需要CPU响应中断(带来上下文切换),尽管后来引入DMA来接管CPU的中断请求,但四次copy是存在“不必要的拷贝”的。

重新思考传统IO方式,会注意到实际上并不需要第二个和第三个数据副本。应用程序除了缓存数据并将其传输回套接字缓冲区之外什么都不做。相反,数据可以直接从读缓冲区传输到套接字缓冲区。

显然,第二次和第三次数据copy 其实在这种场景下没有什么帮助反而带来开销,这也正是零拷贝出现的意义。

这种场景:是指读取磁盘文件后,不需要做其他处理,直接用网络发送出去。试想,如果读取磁盘的数据需要用程序进一步处理的话,必须要经过第二次和第三次数据copy,让应用程序在内存缓冲区处理。

	此时我们会发现用户态“空空如也”。数据没有来到用户态,而是直接在核心态就进行了传输,但这样依然还是有多次复制。首先数据被读取到read buffer中,然后发到socket buffer,最后才发到网卡。虽然减少了用户态和核心态的切换,但依然存在多次数据复制。

如果可以进一步减少数据复制的次数,甚至没有数据复制是不是就会做到最快呢?
  • DMA

    • DMA,全称叫Direct Memory Access,一种可让某些硬件子系统去直接访问系统主内存,而不用依赖CPU的计算机系统的功能。听着是不是很厉害,跳过CPU,直接访问主内存。传统的内存访问都需要通过CPU的调度来完成。如下图:

    • DMA,则可以绕过CPU,硬件自己去直接访问系统主内存。如下图

    • 回到本文中的文件传输,有了DMA后,就可以实现绝对的零拷贝了,因为网卡是直接去访问系统主内存的。如下图:

  • 总结

    	Kafka采用顺序读写、Page Cache、零拷贝以及分区分段等这些设计,再加上在索引方面做的优化,另外Kafka数据读写也是批量的而不是单条的,使得Kafka具有了高性能、高吞吐、低延时的特点。这样Kafka提供大容量的磁盘存储也变成了一种优点
    
    Java的NIO提供了FileChannle,它的transferTo、transferFrom方法就是Zero Copy。
    
4. kafka整合flume
  • 1、安装flume

  • 2、添加flume的配置

    • vi flume-kafka.conf
    #为我们的source channel  sink起名
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    #指定我们的source数据收集策略
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /kkb/install/flumeData/files
    a1.sources.r1.inputCharset = utf-8
    
    #指定我们的source收集到的数据发送到哪个管道
    a1.sources.r1.channels = c1
    
    #指定我们的channel为memory,即表示所有的数据都装进memory当中
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    
    #指定我们的sink为kafka sink,并指定我们的sink从哪个channel当中读取数据
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = kaikeba
    a1.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    
  • 3、创建topic

    kafka-topics.sh --create --topic kaikeba --partitions 3 --replication-factor 2  --zookeeper node01:2181,node02:2181,node03:2181
    
  • 4、启动flume

    bin/flume-ng agent -n a1 -c myconf -f myconf/flume-kafka.conf -Dflume.root.logger=info,console
    
  • 5、启动kafka控制台消费者,验证数据写入成功

    kafka-console-consumer.sh --topic kaikeba --bootstrap-server node01:9092,node02:9092,node03:9092  --from-beginning
    
5. kafka监控工具安装和使用 5.1. Kafka Manager
kafkaManager它是由雅虎开源的可以监控整个kafka集群相关信息的一个工具。
(1)可以管理几个不同的集群
(2)监控集群的状态(topics, brokers, 副本分布, 分区分布)
(3)创建topic、修改topic相关配置
  • 1、上传安装包

    kafka-manager-1.3.0.4.zip
    
  • 2、解压安装包

    • unzip kafka-manager-1.3.0.4.zip -d /kkb/install
  • 3、修改配置文件

    • 进入到conf

      • vim application.conf

        #修改kafka-manager.zkhosts的值,指定kafka集群地址
        kafka-manager.zkhosts="node01:2181,node02:2181,node03:2181"
        
  • 4、启动kafka-manager

    • 启动zk集群,kafka集群,再使用root用户启动kafka-manager服务。
    • bin/kafka-manager 默认的端口是9000,可通过 -Dhttp.port,指定端口
    • -Dconfig.file=conf/application.conf指定配置文件
    nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8080 &
    
    nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8080 -Dpidfile.path=kafkamanager.pid  &
    
  • 5、访问地址

    • kafka-manager所在的主机名:8080

5.2. KafkaOffsetMonitor
该监控是基于一个jar包的形式运行,部署较为方便。只有监控功能,使用起来也较为安全

(1)消费者组列表
(2)查看topic的历史消费信息.
(3)每个topic的所有parition列表(topic,pid,offset,logSize,lag,owner)
(4)对consumer消费情况进行监控,并能列出每个consumer offset,滞后数据。
  • 1、下载安装包

    KafkaOffsetMonitor-assembly-0.2.0.jar
    
  • 2、在服务器上新建一个目录kafka_moitor,把jar包上传到该目录中

  • 3、在kafka_moitor目录下新建一个脚本

    • vim start_kafka_web.sh
    #!/bin/sh
    java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk node01:2181,node02:2181,node03:2181 --port 8089 --refresh 10.seconds --retain 1.days
    
  • 4、启动脚本

    nohup sh start_kafka_web.sh &
    
  • 5、访问地址

    在浏览器中即可使用ip:8089访问kafka的监控页面。
    

5.3. Kafka Eagle
  • 1、下载Kafka Eagle安装包

    • http://download.smartloli.org/
      • kafka-eagle-bin-1.2.3.tar.gz
  • 2、解压

    • tar -zxvf kafka-eagle-bin-1.2.3.tar.gz -C /kkb/install
    • 解压之后进入到kafka-eagle-bin-1.2.3目录中
      • 得到kafka-eagle-web-1.2.3-bin.tar.gz
      • 然后解压 tar -zxvf kafka-eagle-web-1.2.3-bin.tar.gz
      • 重命名 mv kafka-eagle-web-1.2.3 kafka-eagle-web
  • 3、修改配置文件

    • 进入到conf目录

      • 修改system-config.properties

        # 填上你的kafka集群信息
        kafka.eagle.zk.cluster.alias=cluster1
        cluster1.zk.list=node01:2181,node02:2181,node03:2181
        
        # kafka eagle页面访问端口
        kafka.eagle.webui.port=8048
        
        # kafka sasl authenticate
        kafka.eagle.sasl.enable=false
        kafka.eagle.sasl.protocol=SASL_PLAINTEXT
        kafka.eagle.sasl.mechanism=PLAIN
        kafka.eagle.sasl.client=/kkb/install/kafka-eagle-bin-1.2.3/kafka-eagle-web/conf/kafka_client_jaas.conf
        
        #  添加刚刚导入的ke数据库配置,我这里使用的是mysql
        kafka.eagle.driver=com.mysql.jdbc.Driver
        kafka.eagle.url=jdbc:mysql://node03:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
        kafka.eagle.username=root
        kafka.eagle.password=123456
        
  • 4、配置环境变量

    • vi /etc/profile

      export KE_HOME=/kkb/install/kafka-eagle-bin-1.2.3/kafka-eagle-web
      export PATH=$PATH:$KE_HOME/bin
      
  • 5、启动kafka-eagle

    • 进入到$KE_HOME/bin目录
      • 执行脚本sh ke.sh start
  • 6、访问地址

    • 启动成功后在浏览器中输入http://node01:8048/ke就可以访问kafka eagle 了。

      • 用户名:admin
      • password:123456
      • 登录首页

      • 仪表盘信息

      • kafka集群信息

      • zookeeper集群

      • topic信息

      • consumer消费者信息

      • zk客户端命令

1. kafka内核原理 1.1 ISR机制
	光是依靠多副本机制能保证Kafka的高可用性,但是能保证数据不丢失吗?
	不行,因为如果leader宕机,但是leader的数据还没同步到follower上去,此时即使选举了follower作为新的leader,当时刚才的数据已经丢失了。

	ISR是:in-sync replica,就是跟leader partition保持同步的follower partition的数量,只有处于ISR列表中的follower才可以在leader宕机之后被选举为新的leader,因为在这个ISR列表里代表他的数据跟leader是同步的。

	如果要保证写入kafka的数据不丢失,首先需要保证ISR中至少有一个follower,其次就是在一条数据写入了leader partition之后,要求必须复制给ISR中所有的follower partition,才能说代表这条数据已提交,绝对不会丢失,这是Kafka给出的承诺
1.2 HW&LEO原理
  • LEO

    last end offset,日志末端偏移量,标识当前日志文件中下一条待写入的消息的offset。举一个例子,若LEO=10,那么表示在该副本日志上已经保存了10条消息,位移范围是[0,9]。
    
  • HW

    	Highwatermark,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。任何一个副本对象的HW值一定不大于其LEO值。
    	小于或等于HW值的所有消息被认为是“已提交的”或“已备份的”。HW它的作用主要是用来判断副本的备份进度.
    	
    	下图表示一个日志文件,这个日志文件中只有9条消息,第一条消息的offset(LogStartOffset)为0,最有一条消息的offset为8,offset为9的消息使用虚线表示的,代表下一条待写入的消息。日志文件的 HW 为6,表示消费者只能拉取offset在 0 到 5 之间的消息,offset为6的消息对消费者而言是不可见的。
    

    leader持有的HW即为分区的HW,同时leader所在broker还保存了所有follower副本的leo
    
    (1)关系:leader的leo >= follower的leo >= leader保存的follower的leo >= leader的hw >= follower的hw
    (2)原理:上面关系反应出各个值的更新逻辑的先后
    
    • 更新LEO的机制

      • 注意
        • follower副本的LEO保存在2个地方
      (1)follower副本所在的broker缓存里。
      (2)leader所在broker的缓存里,也就是leader所在broker的缓存上保存了该分区所有副本的LEO。
      
      • 更新LEO的时机

        • follower更新LEO
        (1)follower的leo更新时间
        	每当follower副本写入一条消息时,leo值会被更新
        	
        (2)leader端的follower副本的leo更新时间
        	当follower从leader处fetch消息时,leader获取follower的fetch请求中offset参数,更新保存在leader端follower的leo。
        
        • leader更新LEO
        (1)leader本身的leo的更新时间:leader向log写消息时
        
    • 更新HW的机制

      • follower更新HW

        follower更新HW发生在其更新完LEO后,即follower向log写完数据,它就会尝试更新HW值。具体算法就是比较当前LEO(已更新)与fetch响应中leader的HW值,取两者的小者作为新的HW值。
        
      • leader更新HW

        • leader更新HW的时机
        (1)producer 向 leader 写消息时
        (2)leader 处理 follower 的 fetch 请求时
        (3)某副本成为leader时
        (4)broker 崩溃导致副本被踢出ISR时
        
        • leader更新HW的方式
          当尝试确定分区HW时,它会选出所有满足条件的副本,比较它们的LEO(当然也包括leader自己的LEO),并选择最小的LEO值作为HW值。
          这里的满足条件主要是指副本要满足以下两个条件之一:
          (1)处于ISR中
          (2)副本LEO落后于leader LEO的时长不大于replica.lag.time.max.ms参数值(默认值是10秒)
        

2. producer消息发送原理 2.1 producer核心流程概览

  • 1、ProducerInterceptors是一个拦截器,对发送的数据进行拦截

    ps:说实话这个功能其实没啥用,我们即使真的要过滤,拦截一些消息,也不考虑使用它,我们直接发送数据之前自己用代码过滤即可
    
  • 2、Serializer 对消息的key和value进行序列化

  • 3、通过使用分区器作用在每一条消息上,实现数据分发进行入到topic不同的分区中

  • 4、RecordAccumulator收集消息,实现批量发送

    它是一个缓冲区,可以缓存一批数据,把topic的每一个分区数据存在一个队列中,然后封装消息成一个一个的batch批次,最后实现数据分批次批量发送。
    
  • 5、Sender线程从RecordAccumulator获取消息

  • 6、构建ClientRequest对象

  • 7、将ClientRequest交给 NetWorkClient准备发送

  • 8、NetWorkClient 将请求放入到KafkaChannel的缓存

  • 9、发送请求到kafka集群

  • 10、调用回调函数,接受到响应

3. producer核心参数
  • 回顾之前的producer生产者代码
package com.kaikeba.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;


public class KafkaProducerStudyDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //准备配置属性
        Properties props = new Properties();
        //kafka集群地址
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        //acks它代表消息确认机制   // 1 0 -1 all
        props.put("acks", "all");
        //重试的次数
        props.put("retries", 0);
        //批处理数据的大小,每次写入多少数据到topic
        props.put("batch.size", 16384);
        //可以延长多久发送数据
        props.put("linger.ms", 1);
        //缓冲区的大小
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //添加自定义分区函数
        props.put("partitioner.class","com.kaikeba.partitioner.MyPartitioner");

        Producer producer = new KafkaProducer(props);
        for (int i = 0; i < 100; i++) {

            // 这是异步发送的模式
            producer.send(new ProducerRecord("test", Integer.toString(i), "hello-kafka-"+i), new Callback() {
                public void onCompletion(Recordmetadata metadata, Exception exception) {
                    if(exception == null) {
                        // 消息发送成功
                        System.out.println("消息发送成功");
                    } else {
                        // 消息发送失败,需要重新发送
                    }
                }

            });

            // 这是同步发送的模式
            //producer.send(record).get();
            // 你要一直等待人家后续一系列的步骤都做完,发送消息之后
            // 有了消息的回应返回给你,你这个方法才会退出来
        }
        producer.close();
    }

}
3.1 常见异常处理
  • 不管是异步还是同步,都可能让你处理异常,常见的异常如下:

    1)LeaderNotAvailableException:这个就是如果某台机器挂了,此时leader副本不可用,会导致你写入失败,要等待其他follower副本切换为leader副本之后,才能继续写入,此时可以重试发送即可。如果说你平时重启kafka的broker进程,肯定会导致leader切换,一定会导致你写入报错,是LeaderNotAvailableException
    
    2)NotControllerException:这个也是同理,如果说Controller所在Broker挂了,那么此时会有问题,需要等待Controller重新选举,此时也是一样就是重试即可
    
    3)NetworkException:网络异常,重试即可
    我们之前配置了一个参数,retries,他会自动重试的,但是如果重试几次之后还是不行,就会提供Exception给我们来处理了。
    
  • retries

    • 重新发送数据的次数
  • retry.backoff.ms

    • 两次重试之间的时间间隔
3.2 提升消息吞吐量
  • buffer.memory

    • 设置发送消息的缓冲区,默认值是33554432,就是32MB
    如果发送消息出去的速度小于写入消息进去的速度,就会导致缓冲区写满,此时生产消息就会阻塞住,所以说这里就应该多做一些压测,尽可能保证说这块缓冲区不会被写满导致生产行为被阻塞住
    
  • compression.type

    • producer用于压缩数据的压缩类型。默认是none表示无压缩。可以指定gzip、snappy
    • 压缩最好用于批量处理,批量处理消息越多,压缩性能越好。
  • batch.size

    • producer将试图批处理消息记录,以减少请求次数。这将改善client与server之间的性能。
    • 默认是16384Bytes,即16kB,也就是一个batch满了16kB就发送出去
    如果batch太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里。
    
  • linger.ms

    • 这个值默认是0,就是消息必须立即被发送
    	一般设置一个100毫秒之类的,这样的话就是说,这个消息被发送出去后进入一个batch,如果100毫秒内,这个batch满了16kB,自然就会发送出去。
    	但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟时间太长,也避免给内存造成过大的一个压力。
    
3.3 请求超时
  • max.request.size
    • 这个参数用来控制发送出去的消息的大小,默认是1048576字节,也就1mb
    • 这个一般太小了,很多消息可能都会超过1mb的大小,所以需要自己优化调整,把他设置更大一些(企业一般设置成10M)
  • request.timeout.ms
    • 这个就是说发送一个请求出去之后,他有一个超时的时间限制,默认是30秒
    • 如果30秒都收不到响应,那么就会认为异常,会抛出一个TimeoutException来让我们进行处理
3.4 ACK参数

acks参数,其实是控制发送出去的消息的持久化机制的。

  • acks=0

    • 生产者只管发数据,不管消息是否写入成功到broker中,数据丢失的风险最高
    	producer根本不管写入broker的消息到底成功没有,发送一条消息出去,立马就可以发送下一条消息,这是吞吐量最高的方式,但是可能消息都丢失了。
    你也不知道的,但是说实话,你如果真是那种实时数据流分析的业务和场景,就是仅仅分析一些数据报表,丢几条数据影响不大的。会让你的发送吞吐量会提升很多,你发送弄一个batch出去,不需要等待人家leader写成功,直接就可以发送下一个batch了,吞吐量很大的,哪怕是偶尔丢一点点数据,实时报表,折线图,饼图。
    
  • acks=1

    • 只要leader写入成功,就认为消息成功了.
    	默认给这个其实就比较合适的,还是可能会导致数据丢失的,如果刚写入leader,leader就挂了,此时数据必然丢了,其他的follower没收到数据副本,变成leader.
    
  • acks=all,或者 acks=-1

    • 这个leader写入成功以后,必须等待其他ISR中的副本都写入成功,才可以返回响应说这条消息写入成功了,此时你会收到一个回调通知.
    这种方式数据最安全,但是性能最差。
    
  • 如果要想保证数据不丢失,得如下设置

    (1)min.insync.replicas = 2
    	ISR里必须有2个副本,一个leader和一个follower,最最起码的一个,不能只有一个leader存活,连一个follower都没有了。
    
    (2)acks = -1
    	每次写成功一定是leader和follower都成功才可以算做成功,这样leader挂了,follower上是一定有这条数据,不会丢失。
    	
    (3)retries = Integer.MAX_VALUE
    	无限重试,如果上述两个条件不满足,写入一直失败,就会无限次重试,保证说数据必须成功的发送给两个副本,如果做不到,就不停的重试。
    	除非是面向金融级的场景,面向企业大客户,或者是广告计费,跟钱的计算相关的场景下,才会通过严格配置保证数据绝对不丢失
    
3.5 重试乱序
  • max.in.flight.requests.per.connection
    • 每个网络连接可以忍受 producer端发送给broker 消息然后消息没有响应的个数
消息重试是可能导致消息的乱序的,因为可能排在你后面的消息都发送出去了,你现在收到回调失败了才在重试,此时消息就会乱序,所以可以使用“max.in.flight.requests.per.connection”参数设置为1,这样可以保证producer同一时间只能发送一条消息
4. broker核心参数
  • server.properties配置文件核心参数

    【broker.id】
    每个broker都必须自己设置的一个唯一id
    
    【log.dirs】
    这个极为重要,kafka的所有数据就是写入这个目录下的磁盘文件中的,如果说机器上有多块物理硬盘,那么可以把多个目录挂载到不同的物理硬盘上,然后这里可以设置多个目录,这样kafka可以数据分散到多块物理硬盘,多个硬盘的磁头可以并行写,这样可以提升吞吐量。
    
    【zookeeper.connect】
    连接kafka底层的zookeeper集群的
    
    【Listeners】
    broker监听客户端发起请求的端口号,默认是9092
    
    【unclean.leader.election.enable】
    默认是false,意思就是只能选举ISR列表里的follower成为新的leader,1.0版本后才设为false,之前都是true,允许非ISR列表的follower选举为新的leader
    
    【delete.topic.enable】
    默认true,允许删除topic
    
    【log.retention.hours】
    可以设置一下,要保留数据多少个小时(默认168小时),这个就是底层的磁盘文件,默认保留7天的数据,根据自己的需求来就行了
    
5. consumer消费原理 5.1 Offset管理

​ 每个consumer内存里数据结构保存对每个topic的每个分区的消费offset,定期会提交offset,老版本是写入zk,但是那样高并发请求zk是不合理的架构设计,zk是做分布式系统的协调的,轻量级的元数据存储,不能负责高并发读写,作为数据存储。所以后来就是提交offset发送给内部topic:__consumer_offsets,提交过去的时候,key是group.id+topic+分区号,value就是当前offset的值,每隔一段时间,kafka内部会对这个topic进行compact。也就是每个group.id+topic+分区号就保留最新的那条数据即可。而且因为这个 __consumer_offsets可能会接收高并发的请求,所以默认分区50个,这样如果你的kafka部署了一个大的集群,比如有50台机器,就可以用50台机器来抗offset提交的请求压力,就好很多。

5.2 Coordinator
  • Coordinator的作用

    	每个consumer group都会选择一个broker作为自己的coordinator,他是负责监控这个消费组里的各个消费者的心跳,以及判断是否宕机,然后开启rebalance.
    	根据内部的一个选择机制,会挑选一个对应的Broker,Kafka总会把你的各个消费组均匀分配给各个Broker作为coordinator来进行管理的.
    	consumer group中的每个consumer刚刚启动就会跟选举出来的这个consumer group对应的coordinator所在的broker进行通信,然后由coordinator分配分区给你的这个consumer来进行消费。coordinator会尽可能均匀的分配分区给各个consumer来消费。
    
  • 如何选择哪台是coordinator

    	首先对消费组的groupId进行hash,接着对consumer_offsets的分区数量取模,默认是50,可以通过offsets.topic.num.partitions来设置,找到你的这个consumer group的offset要提交到consumer_offsets的哪个分区。
    	比如说:groupId,"membership-consumer-group" -> hash值(数字)-> 对50取模 -> 就知道这个consumer group下的所有的消费者提交offset的时候是往哪个分区去提交offset,找到consumer_offsets的一个分区,consumer_offset的分区的副本数量默认来说1,只有一个leader,然后对这个分区找到对应的leader所在的broker,这个broker就是这个consumer group的coordinator了,consumer接着就会维护一个Socket连接跟这个Broker进行通信。
    

6. consumer消费者Rebalance策略
比如我们消费的一个topic主题有12个分区:p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11
假设我们的消费者组里面有三个消费者。
6.1 range策略
range策略就是按照partiton的序号范围
	p0~3             consumer1
	p4~7             consumer2
	p8~11            consumer3
默认就是这个策略
6.2 round-robin策略
consumer1:	0,3,6,9
consumer2:	1,4,7,10
consumer3:	2,5,8,11

但是前面的这两个方案有个问题:
	假设consuemr1挂了:p0-5分配给consumer2,p6-11分配给consumer3
	这样的话,原本在consumer2上的的p6,p7分区就被分配到了 consumer3上

6.3 sticky策略
	最新的一个sticky策略,就是说尽可能保证在rebalance的时候,让原本属于这个consumer
的分区还是属于他们,然后把多余的分区再均匀分配过去,这样尽可能维持原来的分区分配的策略

consumer1: 0-3
consumer2:  4-7
consumer3:  8-11 

假设consumer3挂了
consumer1:0-3,+8,9
consumer2: 4-7,+10,11

7. consumer核心参数
【heartbeat.interval.ms】
默认值:3000
consumer心跳时间,必须得保持心跳才能知道consumer是否故障了,然后如果故障之后,就会通过心跳下发rebalance的指令给其他的consumer通知他们进行rebalance的操作

【session.timeout.ms】
默认值:10000	
kafka多长时间感知不到一个consumer就认为他故障了,默认是10秒

【max.poll.interval.ms】
默认值:300000
如果在两次poll操作之间,超过了这个时间,那么就会认为这个consume处理能力太弱了,会被踢出消费组,分区分配给别人去消费,一遍来说结合你自己的业务处理的性能来设置就可以了

【fetch.max.bytes】
默认值:1048576
获取一条消息最大的字节数,一般建议设置大一些

【max.poll.records】
默认值:500条
一次poll返回消息的最大条数,

【connections.max.idle.ms】
默认值:540000
consumer跟broker的socket连接如果空闲超过了一定的时间,此时就会自动回收连接,但是下次消费就要重新建立socket连接,这个建议设置为-1,不要去回收

【auto.offset.reset】
  earliest
		当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费		  
	latest
		当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从当前位置开始消费
	none
		topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
注:我们生产里面一般设置的是latest

【enable.auto.commit】
默认值:true
设置为自动提交offset

【auto.commit.interval.ms】
默认值:60 * 1000
每隔多久更新一下偏移量

官网查看kafka参数http://kafka.apache.org/10/documentation.html

五、优秀书籍推荐

kafka知识要点 4.1 如何提升生产者的吞吐量?

1)buffer.memory:设置发送消息的缓冲区,默认值是33554432,就是32MB
如果发送消息出去的速度小于写入消息进去的速度,就会导致缓冲区写满,此时生产消息就会阻塞住,所以说这里就应该多做一些压测,尽可能保证说这块缓冲区不会被写满导致生产行为被阻塞住

	  Long startTime=System.currentTime();
	  producer.send(record, new Callback() {
	  @Override
		public void onCompletion(Recordmetadata metadata, Exception exception) {
			if(exception == null) {
				// 消息发送成功
				System.out.println("消息发送成功");  
			} else {
				// 消息发送失败,需要重新发送
			}
		}
	});
    Long endTime=System.currentTime();
    If(endTime - startTime > 100){//说明内存被压满了
     说明有问题
     }		

2)compression.type,默认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大producer端的cpu开销。
3)batch.size,设置meigebatch的大小,如果batch太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里
默认值是:16384,就是16kb,也就是一个batch满了16kb就发送出去,一般在实际生产环境,这个batch的值可以增大一些来提升吞吐量,可以自己压测一下。
4)linger.ms,这个值默认是0,意思就是消息必须立即被发送,但是这是不对的,一般设置一个100毫秒之类的,这样的话就是说,这个消息被发送出去后进入一个batch,如果100毫秒内,这个batch满了16kb,自然就会发送出去。但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟时间太长,也避免给内存造成过大的一个压力。

4.2 如何保证Kafka内部数据不丢失?

如果要回答这个问题的话,要从三个角度去回答:Producer,consumer,broker。

producer

acks参数:
acks = 0
	生产者发送消息之后 不需要等待服务端的任何响应,它不管消息有没有发送成功,如果发送过程中遇到了异常,导致broker端没有收到消息,消息也就丢失了。实际上它只是把消息发送到了socketBuffer(缓存)中,而socketBuffer什么时候被提交到broker端并不关心,它不担保broker端是否收到了消息,但是这样的配置对retry是不起作用的,因为producer端都不知道是否发生了错误,而且对于offset的获取永远都是-1,因为broker端可能还没有开始写数据。这样不保险的操作为什么还有这样的配置?kafka对于收集海量数据,如果在收集某一项日志时是允许数据量有一定丢失的话,是可以用这种配置来收集日志。
acks = 1(默认值)
	生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。其实就是消息只发给了leader leader收到消息后会返回ack到producer端。如果消息无法写入leader时(选举、宕机等情况时),生产都会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息,如果消息成功写入,在被其它副本同步数据时leader  崩溃,那么此条数据还是会丢失,因为新选举的leader是没有收到这条消息,ack设置为1是消息可靠性和吞吐量折中的方案。
acks = all (或-1)
	生产者在发送消息之后,需要等待ISR中所有的副本都成功写入消息之后才能够收到来自服务端的成功响应,在配置环境相同的情况下此种配置可以达到最强的可靠性。即:在发送消息时,需要leader 向fllow 同步完数据之后,也就是ISR队列中所有的broker全部保存完这条消息后,才会向ack发送消息,表示发送成功。

retry参数:
在kafka中错误分为2种,一种是可恢复的,另一种是不可恢复的。
  可恢复性的错误:
      如遇到在leader的选举、网络的抖动等这些异常时,如果我们在这个时候配置的retries大于0的,也就是可以进行重试操作,那么等到leader选举完成后、网络稳定后,这些异常就会消息,错误也就可以恢复,数据再次重发时就会正常发送到broker端。需要注意retries(重试)之间的时间间隔,以确保在重试时可恢复性错误都已恢复。
  不可恢复性的错误:
      如:超过了发送消息的最大值(max.request.size)时,这种错误是不可恢复的,如果不做处理,那么数据就会丢失,因此我们需要注意在发生异常时把这些消息写入到DB、缓存本地文件中等等,把这些不成功的数据记录下来,等错误修复后,再把这些数据发送到broker端。
          
配置方案
1.高可用型
  配置:acks = all,retries > 0 retry.backoff.ms=100(毫秒) (并根据实际情况设置retry可能恢复的间隔时间)
  优点:这样保证了producer端每发送一条消息都要成功,如果不成功并将消息缓存起来,等异常恢复后再次发送。
  缺点:这样保证了高可用,但是这会导致集群的吞吐量不是很高,因为数据发送到broker之后,leader要将数据同步到fllower上,如果网络带宽、不稳定等情况时,ack响应时间会更长
  
2.折中型
  配置:acks = 1  retries > 0 retries 时间间隔设置 (并根据实际情况设置retries可能恢复的间隔时间)
  优点:保证了消息的可靠性和吞吐量,是个折中的方案
  缺点:性能处于2者中间
  
3.高吞吐型 
  配置:acks = 0
  优点:可以相对容忍一些数据的丢失,吞吐量大,可以接收大量请求
  缺点:不知道发送的消息是 否成功

Consumer

group.id:
	consumer group分组的一个id,消费者隶属的消费组名称。在kafka中只允许消息只能被某个组里面的一个consumer端消费,如果为空,则会报异常。对于一个新的consumer加入到消费时,肯定会隶属于哪个组,只有这样才能消费数据
auto.offset.reset = earliest(最早) /latest(最晚)
	从何处开始进行消费  当一个新加入的consumer要进行消费数据,如果这个consumer是做数据分析工作的,是需要以前的历史数据那就需要从最早的位置消费数据,如果仅仅是查看消费情况,那可以从最晚位置开始消费数据
enable.auto.commit = true/false(默认true) 
	是否开启自动提交消费位移的功能,默认开启.  当设置为true时,意味着由kafka的consumer端自己间隔一定的时间会自动提交offset,如果设置成了fasle,也就是由客户端(自己写代码)来提交,那就还得控制提交的时间间隔auto.commit.interval.msauto.commit.interval.ms
	当enable.auto.commit设置为true时才生效,表示开启自动提交消费位移功能时自动提交消费位移的时间间隔。

配置方案

在consumer消费阶段,对offset的处理,关系到是否丢失数据,是否重复消费数据,因此,我们把处理好offset就可以做到exactly-once && at-least-once(只消费一次)数据。当enable.auto.commit=true时    表示由kafka的consumer端自动提交offset,当你在pull(拉取)30条数据,在处理到第20条时自动提交了offset,但是在处理21条的时候出现了异常,当你再次pull数据时,由于之前是自动提交的offset,所以是从30条之后开始拉取数据,这也就意味着21-30条的数据发生了丢失。

当enable.auto.commit=false时,由于上面的情况可知自动提交offset时,如果处理数据失败就会发生数据丢失的情况。那我们设置成手动提交。当设置成false时,由于是手动提交的,可以处理一条提交一条,也可以处理一批,提交一批,由于consumer在消费数据时是按一个batch来的,当pull了30条数据时,如果我们处理一条,提交一个offset,这样会严重影响消费的能力,那就需要我们来按一批来处理,或者设置一个累加器,处理一条加1,如果在处理数据时发生了异常,那就把当前处理失败的offset进行提交(放在finally代码块中)注意一定要确保offset的正确性,当下次再次消费的时候就可以从提交的offset处进行再次消费。

Broker

1.replication-factor >=2
在创建topic时会通过replication-factor来创建副本的个数,它提高了kafka的高可用性,同时,它允许n-1台broker挂掉,设置好合理的副本因子对kafka整体性能是非常有帮助的,通常是3个,极限是5个,如果多了也会影响开销。

2.min.insync.replicas = 2
分区ISR队列集合中最少有多少个副本,默认值是1

3.unclean.leander.election.enable = false     
是否允许从ISR队列中选举leader副本,默认值是false不允许,如果设置成true,则可能会造成数据丢失。
4.3 积压了百万消息如何处理?

据我了解,在使用消息队列遇到的问题中,消息积压这个问题,应该是最常遇到的问题了,并且,这个问题还不太好解决。我们都知道,消息积压的直接原因,一定是系统中的某个部分出现了性能问题,来不及处理上游发送的消息,才会导致消息积压。所以,我们先来分析下,在使用消息队列时,如何来优化代码的性能,避免出现消息积压。然后再来看看,如果你的线上系统出现了消息积压,该如何进行紧急处理,最大程度地避免消息积压对业务的影响。

  1. 最大程度避免消息积压

    生产者

    ​ 提升吞吐量

    消费者

    ​ 扩容,扩分区

    ​ 增加consumer

  2. 如何处理消息积压

    日常系统正常运转的时候,没有积压或者只有少量积压很快就消费掉了,但是某一个时刻,突然就开始积压消息并且积压持续上涨。这种情况下需要你在短时间内找到消息积压的原因,迅速解决问题才不至于影响业务。导致突然积压的原因肯定是多种多样的,不同的系统、不同的情况有不同的原因,不能一概而论。但是,我们排查消息积压原因,是有一些相对固定而且比较有效的方法的。能导致积压突然增加,最粗粒度的原因,只有两种:要么是发送变快了,要么是消费变慢了。大部分消息队列都内置了监控的功能,只要通过监控数据,很容易确定是哪种原因。如果是单位时间发送的消息增多,比如说是赶上大促或者抢购,短时间内不太可能优化消费端的代码来提升消费性能,唯一的方法是通过扩容消费端的实例数来提升总体的消费能力。如果短时间内没有足够的服务器资源进行扩容,没办法的办法是,将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务。还有一种不太常见的情况,你通过监控发现,无论是发送消息的速度还是消费消息的速度和原来都没什么变化,这时候你需要检查一下你的消费端,是不是消费失败导致的一条消息反复消费这种情况比较多,这种情况也会拖慢整个系统的消费速度。如果监控到消费变慢了,你需要检查你的消费实例,分析一下是什么原因导致消费变慢。优先检查一下日志是否有大量的消费错误,如果没有错误的话,可以通过打印堆栈信息,看一下你的消费线程是不是卡在什么地方不动了,比如触发了死锁或者卡在等待某些资源上了。

4.4 生产者遇到了异常如何处理?
  1. 添加重试功能和重试时间间隔
  2. 对于重试也失败了任务进行特殊处理
4.5 说一下Kafka的HW,LEO的更新机制

4.6 Zookeeper对于Kafka的作用是什么?

Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。Controller的管理工作都是依赖于Zookeeper的

注:参考Kafka核心原理的Kafka的集群管理机制

4.7 讲一讲Kafka的ack的三种机制

acks参数,其实是控制发送出去的消息的持久化机制的
1)如果acks=0,那么producer根本不管写入broker的消息到底成功没有,发送一条消息出去,立马就可以发送下一条消息,这是吞吐量最高的方式,但是可能消息都丢失了,你也不知道的,但是说实话,你如果真是那种实时数据流分析的业务和场景,就是仅仅分析一些数据报表,丢几条数据影响不大的。会让你的发送吞吐量会提升很多,你发送弄一个batch出,不需要等待人家leader写成功,直接就可以发送下一个batch了,吞吐量很大的,哪怕是偶尔丢一点点数据,实时报表,折线图,饼图。

2)acks=all,或者acks=-1:这个leader写入成功以后,必须等待其他ISR中的副本都写入成功,才可以返回响应说这条消息写入成功了,此时你会收到一个回调通知

3)acks=1:只要leader写入成功,就认为消息成功了,默认给这个其实就比较合适的,还是可能会导致数据丢失的,如果刚写入leader,leader就挂了,此时数据必然丢了,其他的follower没收到数据副本,变成leader

4.8 Kafka如何不重复消费数据
  1. 保存并查询

    给每个消息都设置一个独一无二的key,消费的时候把这些key记录下来,然后每次消费的时候都查询一下,看这个key是否消费过,如果没有消费过才消费。

  2. 幂等

    幂等(Idempotence) 本来是一个数学上的概念,它是这样定义的:如果一个函数 f(x) 满足:f(f(x)) = f(x),则函数 f(x) 满足幂等性。这个概念被拓展到计算机领域,被用来描述一个操作、方法或者服务。一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。一个幂等的方法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。所以,对于幂等的方法,不用担心重复执行会对系统造成任何改变。我们举个例子来说明一下。在不考虑并发的情况下,“将账户 X 的余额设置为 100 元”,执行一次后对系统的影响是,账户 X 的余额变成了 100 元。只要提供的参数 100 元不变,那即使再执行多少次,账户 X 的余额始终都是 100 元,不会变化,这个操作就是一个幂等的操作。再举一个例子,“将账户 X 的余额加 100 元”,这个操作它就不是幂等的,每执行一次,账户余额就会增加 100 元,执行多次和执行一次对系统的影响(也就是账户的余额)是不一样的。如果我们系统消费消息的业务逻辑具备幂等性,那就不用担心消息重复的问题了,因为同一条消息,消费一次和消费多次对系统的影响是完全一样的。也就可以认为,消费多次等于消费一次。从对系统的影响结果来说:At least once + 幂等消费 = Exactly once

    那么如何实现幂等操作呢?最好的方式就是,从业务逻辑设计上入手,将消费的业务逻辑设计成具备幂等性的操作。但是,不是所有的业务都能设计成天然幂等的,这里就需要一些方法和技巧来实现幂等。下面我给你介绍几种常用的设计幂等操作的方法:1. 利用数据库的唯一约束实现幂等例如我们刚刚提到的那个不具备幂等特性的转账的例子:将账户 X 的余额加 100 元。在这个例子中,我们可以通过改造业务逻辑,让它具备幂等性。首先,我们可以限定,对于每个转账单每个账户只可以执行一次变更操作,在分布式系统中,这个限制实现的方法非常多,最简单的是我们在数据库中建一张转账流水表,这个表有三个字段:转账单 ID、账户 ID 和变更金额,然后给转账单 ID 和账户 ID 这两个字段联合起来创建一个唯一约束,这样对于相同的转账单 ID 和账户 ID,表里至多只能存在一条记录。这样,我们消费消息的逻辑可以变为:“在转账流水表中增加一条转账记录,然后再根据转账记录,异步操作更新用户余额即可。”在转账流水表增加一条转账记录这个操作中,由于我们在这个表中预先定义了“账户 ID 转账单 ID”的唯一约束,对于同一个转账单同一个账户只能插入一条记录,后续重复的插入操作都会失败,这样就实现了一个幂等的操作。我们只要写一个 SQL,正确地实现它就可以了。基于这个思路,不光是可以使用关系型数据库,只要是支持类似“INSERT IF NOT EXIST”语义的存储类系统都可以用于实现幂等,比如,你可以用 Redis 的 SETNX 命令来替代数据库中的唯一约束,来实现幂等消费。

    1. 为更新的数据设置前置条件

    为更新的数据设置前置条件另外一种实现幂等的思路是,给数据变更设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。比如,刚刚我们说过,“将账户 X 的余额增加 100 元”这个操作并不满足幂等性,我们可以把这个操作加上一个前置条件,变为:“如果账户 X 当前的余额为 500 元,将余额加 100 元”,这个操作就具备了幂等性。对应到消息队列中的使用时,可以在发消息时在消息体中带上当前的余额,在消费的时候进行判断数据库中,当前余额是否与消息中的余额相等,只有相等才执行变更操作。但是,如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?更加通用的方法是,给你的数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新

4.9 如何保证同一分区一定有序

两种方案:
方案一,kafka topic 只设置一个partition分区
方案二,producer将消息发送到指定partition分区
解析:
方案一:kafka默认保证同一个partition分区内的消息是有序的,则可以设置topic只使用一个分区,这样消息就是全局有序,缺点是只能被consumer group里的一个消费者消费,降低了性能,不适用高并发的情况
方案二:既然kafka默认保证同一个partition分区内的消息是有序的,则producer可以在发送消息时可以指定需要保证顺序的几条消息发送到同一个分区,这样消费者消费时,消息就是有序。

但是个时候还有个问题就是消息重试的时候会让消息顺序打乱,所以还需要设置这个参数:
max.in.flight.requests.per.connection 默认值5,设置为1

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/301553.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号