栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【Kafka技术栈实用总结版】

【Kafka技术栈实用总结版】

Kafka技术栈实用版
  • Kafka简介
    • 一、Kafka基础
      • 1、kafka的安装部署
      • 2、Kafka的基本概念
      • 3、创建topic
      • 4、发送与消费消息
      • 5、消息的细节
      • 6、查看消费组的详细信息
    • 二、Kafka主题与分区
      • 1、Topic
      • 2、Partition
      • 3、Kafka日志文件
    • 三、Kafka集群
      • 1、搭建Kafka集群(3个broker)
      • 2、副本(Replication)
      • 3、集群消息发送与消费
      • 4、分区消费组的消费者细节
    • 四、Kafka的生产者
      • 1、java方式生产者实现
      • 2、生产者的ack参数
      • 3)消息发送的缓存区
    • 五、Kafka的消费者
      • 1、java方式消费者实现
      • 2、消费者手动与自动提交offset
      • 3、消费者Poll消息过程
      • 4、其他的一些配置
    • 六、Kafka集群机制
      • 1、Controller
      • 2、Reblance机制
    • 七、Kafka-eagle监控平台
      • 1、搭建
      • 八、资源链接

Kafka简介

Kafka是最初由linkedin公司开发,是⼀个分布式、支持分区的(partition)、多副本的 (replica),基于zookeeper 协调的分布式消息系统,它的最⼤的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、 Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

一、Kafka基础 1、kafka的安装部署

1)JDK环境和Zookeeper环境的搭建。
JDK downloads:https://www.oracle.com/java/technologies/downloads/
Zookeeper downloads:http://zookeeper.apache.org/releases.html
2)下载Kafka的安装包:http://kafka.apache.org/downloads
3)上传到kafka目录上: /usr/local/kafka,解压。
4)进⼊到config⽬录内,修改server.properties

#broker.id属性在kafka集群中必须要是唯⼀ 
broker.id=0 
#kafka部署的机器ip和提供服务的端⼝号 
listeners=PLAINTEXT://x.x.x.x:9092  
#kafka的消息存储⽂件 
log.dir=/usr/local/data/kafka-logs 
#kafka连接zookeeper的地址 
zookeeper.connect=x.x.x.x:2181

5)进⼊到bin⽬录内,执⾏以下命令来启动kafka服务器(带着配置⽂件

./kafka-server-start.sh -daemon ../config/server.properties

6)检测Kafka是否启动成功:
进⼊到zk内查看是否有kafka的节点: /brokers/ids/0

2、Kafka的基本概念
名称解释
Broker消息中间件处理节点(kafka服务器),⼀个Kafka节点就是⼀个broker,⼀个或者多个Broker可以组成⼀个Kafka集群。
TopicTopic 是逻辑上的概念,Kafka根据topic对消息进⾏归类,发布到Kafka集群的每条消息都需要指定⼀个topic。
Producer消息⽣产者,向Broker发送消息的客户端。
Consumer消息消费者,从Broker读取消息的客户端。
3、创建topic

通过kafka命令向zk中创建⼀个主题:

./kafka-topics.sh --create --zookeeper x.x.x.x:2181 --replication-factor 1 --partitions 1 --topic test

查看当前zk中所有的主题:

./kafka-topics.sh --list --zookeeper x.x.x.x:2181 test

删除Topic:

bin/kafka-topics.sh --zookeeper x.x.x.x:2181 --delete --topic test

注:需要server.properties中设置delete.topic.enable=true否则只是标记删除(如果当前Topic没有使用过,即没有传输过信息,则可以彻底删除。)。

4、发送与消费消息

发送消息:
把消息发送给broker中的某个topic,打开⼀个kafka发送消息的客户端,然后开始⽤客户端向
kafka服务器发送消息。

./kafka-console-producer.sh --bootstrap-server x.x.x.x:9092 --topic test

./kafka-console-producer.sh --broker-list x.x.x.x:9092 --topic test

–bootstrap-server:指定了连接kafka集群的地址。
–broker-list:指定了连接kafka集群的地址。

消费消息:
打开⼀个消费消息的客户端,向kafka服务器的某个主题消费消息。
⽅式⼀:从当前主题中的最后⼀条消息的offset(偏移量位置)+1开始消费。

./kafka-console-consumer.sh --bootstrap-server x.x.x.x:9092 --topic test

⽅式⼆:从当前主题中的第⼀条消息开始消费。

./kafka-console-consumer.sh --bootstrap-server x.x.x.x:9092 --from-beginning --topic test
5、消息的细节

1)生产者将消息发送给broker,broker会将消息保存在本地的日志文件中。
/usr/local/kafka/data/kafka-logs/主题-分区/00000000.log
2)消息的保存是有序的,通过offset偏移量来描述消息的有序性。
3)消费者消费消息时也是通过offset来描述当前要消费的那条消息的位置。

6、查看消费组的详细信息

通过以下命令可以查看到消费组的相关信息:

./kafka-consumer-groups.sh --bootstrap-server x.x.x.x:9092 --describe --group testGroup


重点关注以下信息:
current-offset:最后被消费的消息的偏移量。
Log-end-offset:消息总量(最后⼀条消息的偏移量)。
Lag:积压了多少条消息。

二、Kafka主题与分区 1、Topic

主题(topic)在kafka中是⼀个逻辑的概念,kafka通过topic将消息进行分类。不同的topic会被
订阅该topic的消费者消费。
但是有⼀个问题,如果说这个topic中的消息非常非常多,多到需要几T来存,因为消息是会被
保存到log日志文件中的。为了解决这个文件过大的问题,kafka提出了Partition分区的概念。

2、Partition

1)分区的概念
通过partition将⼀个topic中的消息分区来存储。这样的好处有多个:
a)分区存储,可以解决统⼀存储文件过大的问题(分布式存储)。
b)提供了读写的吞吐量:读和写可以同时在多个分区中进行(可以并行写)。

1.1)分区的策略

  • 轮询分配策略(默认)
    如果在生产消息时,key为null,则使用轮询算法均衡地分配分区。
  • 随机策略(早版本默认,现已不是)
    也是为了均匀的写入到每个分区,但后续轮询策略表现更佳,故很少使用随机策略了。
  • 按Key分配策略

按key分配策略,有可能出现“数据倾斜”。例如,某个key包含了大量的数据,因为key值一样,所以所有的数据将都分配到一个分区中,造成该分区的数量远大于其他的分区。

2)创建多分区的主题

./kafka-topics.sh --create --zookeeper x.x.x.x:2181 --replication-factor 1 --partitions 2 --topic test1

查看topic的分区信息

./kafka-topics.sh --describe --zookeeper x.x.x.x:2181 --topic test1

修改分区数量

bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6 

注:修改分区数时,仅能增加分区个数。若是使其减少partition个数,则会报错。

3、Kafka日志文件

1)000000000.log: 这个文件中保存的就是消息。
2)__consumer_offsets-49:
kafka内部自己创建了__consumer_offsets主题包含了50个分区(通过offsets.topic.num.partitions设置)。这个主题用来存放消费者消费某个主题的偏移量(offset)。因为每个消费者都会自己维护着消费的主题的偏移量,也就是说每个消费者会把消费的主题的偏移量自主上报给kafka中的默认主题 consumer_offsets。因此kafka为了提升这个主题的并发性,默认设置了50个分区。

  • 提交到哪个分区:通过hash函数:hash(consumerGroupId) % __consumer_offsets主题的分区数
  • 提交到该主题中的内容是:key是consumerGroupId+topic+分区号,value就是当前offset的值。
    3)文件中保存的消息,默认保存7天。七天到后消息会被删除。
三、Kafka集群 1、搭建Kafka集群(3个broker)

准备3个server.properties文件,每个文件中的这些内容要调整:
server.properties

broker.id=0 
listeners=PLAINTEXT://x.x.x.x:9092 
log.dir=/usr/local/data/kafka-logs

server1.properties

broker.id=1 
listeners=PLAINTEXT://x.x.x.x:9093 
log.dir=/usr/local/data/kafka-logs-1

server2.properties

broker.id=2 
listeners=PLAINTEXT://x.x.x.x:9094 
log.dir=/usr/local/data/kafka-logs-2

然后分别启动三个brokers:

./kafka-server-start.sh -daemon ../config/server.properties

搭建完后通过查看zk中的/brokers/ids 看是否启动成功。

2、副本(Replication)

副本是对分区的备份。在集群中,不同的副本会被部署在不同的broker上。下面例子:创建1
个主题,2个分区、3个副本。

./kafka-topics.sh --create --zookeeper x.x.x.x:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic


通过查看主题信息,其中的关键数据:

  • replicas:
    当前副本存在的broker节点 。
  • leader:副本里的概念
    每个partition都有⼀个broker作为leader。
    消息发送方要把消息发给哪个broker?就看副本的leader是在哪个broker上面。副本里的leader专门用来接收消息。接收到消息,其他follower通过poll的方式来同步数据。
  • follower:leader处理所有针对这个partition的读写请求,而follower被动复制leader,不提供读写(主要是为了保证多副本数据与消费的⼀致性),如果leader所在的broker挂掉,那么就会进行新leader的选举,至于怎么选,在之后的controller的概念中介绍。
  • ISR:
    可以同步的broker节点和已同步的broker节点,存放在ISR集合中。

    kafka集群中由多个broker组成。
    ⼀个broker中存放⼀个topic的不同partition——副本。
3、集群消息发送与消费

Kafka集群消息的发送:

./kafka-console-producer.sh --broker-list x.x.x.x:9092, x.x.x.x:9093, x.x.x.x:9094 --topic my-replicated-topic

Kafka集群消息的消费:

./kafka-console-consumer.sh --bootstrap-server x.x.x.x:9092, x.x.x.x:9093, x.x.x.x:9094 --from-beginning --topic my-relicated-topic

指定消费组来消费消息:

./kafka-console-consumer.sh --bootstrap-server x.x.x.x:9092, x.x.x.x:9093, x.x.x.x:9094 --from-beginning --consumer-property group.id=testGroup1 --topic my-replicated-topic
4、分区消费组的消费者细节

  • 图中Kafka集群有两个broker,每个broker中有多个partition。⼀个partition只能被⼀个消费组里的某⼀个消费者消费,从而保证消费顺序。Kafka只在partition的范围内保证消息消费的局部顺序性,不能在同⼀个topic中的多个partition中保证总的消费顺序性。⼀个消费者可以消费多个partition。
  • 消费组中消费者的数量不能比⼀个topic中的partition数量多,否则多出来的消费者消费不到消息。
  • 如果消费者挂了,那么会触发rebalance机制(后面介绍),会让其他消费者来消费该分区。
四、Kafka的生产者 1、java方式生产者实现

引入依赖

 
 org.apache.kafka 
 kafka-clients 
 2.4.1 
 

生产者发送消息的基本实现:
同步发消息

public class MySimpleProducer { 
 private final static String TOPIC_NAME = "my-replicated-topic"; 
 public static void main(String[] args) throws ExecutionException, 
InterruptedException { 
 //1.设置参数 
 Properties props = new Properties(); 
 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"x.x.x.x:9092, x.x.x.x:9093, x.x.x.x:9094"); 

 //把发送的key从字符串序列化为字节数组 
 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName()); 
 //把发送消息value从字符串序列化为字节数组 
 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName()); 

 //2.创建⽣产消息的客户端,传⼊参数 
 Producer producer = new KafkaProducer(props); 
 //3.创建消息 
 //key:作⽤是决定了往哪个分区上发,value:具体要发送的消息内容 
 //0:代表发送消息到哪个分区上
 ProducerRecord producerRecord = new ProducerRecord<> 
(TOPIC_NAME, 0,"mykeyvalue","hellokafka"); 
 
 //4.发送消息,得到消息发送的元数据并输出 
 Recordmetadata metadata = producer.send(producerRecord).get();
  
 System.out.println("同步⽅式发送消息结果:" + "topic-" +metadata.topic() 
 + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset()); 
 } 
}

异步发消息
生产者发消息,发送完后不用等待broker给回复,直接执行下面的业务逻辑。可以提供callback,让broker异步的调用callback,告知生产者,消息发送的结果。

  producer.send(producerRecord, new Callback() {
        public void onCompletion(Recordmetadata metadata, Exception
        exception) {
         if (exception != null) {
             System.err.println("发送消息失败:"+ exception.getStackTrace());
         }
         if (metadata != null) {
             System.out.println("异步⽅式发送消息结果:" + "topic-"+ metadata
             .topic() + "|partition-"+ metadata.partition() + "|offset-" +
             metadata.offset());
          }
        }
     });
2、生产者的ack参数

在同步发消息的场景下:生产者发到broker上后,ack会有3种不同的选择:
1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下⼀条消息。性能最高,但是最容易丢消息。
2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下⼀条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
3)acks=-1或all: 需要等待min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,这种策略会保证只要有⼀个备份存活就不会丢失数据。这是最强的数据保证。⼀般除非是金融级别,或跟钱打交道的场景才会使用这种配置。
代码配置:

props.put(ProducerConfig.ACKS_CONFIG, "1");

props.put(ProducerConfig.RETRIES_CONFIG, 3); 
//重试间隔设置 
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);

3)消息发送的缓存区

kafka默认会创建⼀个消息缓冲区,用来存放要发送的消息,缓冲区是32m。

props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

kafka本地线程会去缓冲区中⼀次拉16k的数据,发送到broker。

props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

如果线程拉不到16k的数据,间隔10ms也会将已拉到的数据发到broker。

props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
五、Kafka的消费者 1、java方式消费者实现
public class MySimpleConsumer {
  private final static String TOPIC_NAME = "my-replicated-topic";
  private final static String CONSUMER_GROUP_NAME = "testGroup";
  public static void main(String[] args) {
     Properties props = new Properties();
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
    "x.x.x.x:9092, x.x.x.x:9093, x.x.x.x:9094");
     // 消费分组名
     props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
     StringDeserializer.class.getName());
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    StringDeserializer.class.getName());
     
     //1.创建⼀个消费者的客户端
     KafkaConsumer consumer = new KafkaConsumer(props);
     //2. 消费者订阅主题列表
     consumer.subscribe(Arrays.asList(TOPIC_NAME));
     while (true) {
        
        ConsumerRecords records =
        consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord record : records) {
        //4.打印消息
            System.out.printf("收到消息:partition = %d,offset = %d,
            key =%s, value = %s%n", record.partition(),record.offset(),
            record.key(), record.value());
        }
    }
 }
}
2、消费者手动与自动提交offset

消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的某个主题+消费的某个分区及消费的偏移量,这样的信息提交到集群的_consumer_offsets主题里面。
1)自动提交offset
消费者poll到消息后默认情况下,会⾃动向broker的_consumer_offsets主题提交当前主题-分
区消费的偏移量。
设置自动提交参数 - 默认

// 是否自动提交offset,默认就是true 
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); 
// 自动提交offset的间隔时间 
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

自动提交会丢消息:因为如果消费者还没消费完poll下来的消息就自动提交了偏移量,那么此
时消费者挂了,于是下⼀个消费者会从已提交的offset的下⼀个位置开始消费消息。之前未被
消费的消息就丢失掉了。
2)手动提交offset
需要把⾃动提交的配置改成false

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

手动提交分手动同步提交和手动异步提交:

  • 手动同步提交:
    在消费完消息后调用同步提交的方法,当集群返回ack前⼀直阻塞,返回ack后表示提交成功,执行之后的逻辑。
if (records.count() > 0) { 
 // ⼿动同步提交offset,当前线程会阻塞直到offset提交成功 
 // ⼀般使⽤同步提交,因为提交之后⼀般也没有什么逻辑代码了 
 consumer.commitSync(); 
}
  • 手动异步提交:
    在消息消费完后提交,不需要等到集群ack,直接执行之后的逻辑,可以设置⼀个回调方法,供集群调用。
if (records.count() > 0) { 
    // ⼿动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后⾯的程序逻辑 
    consumer.commitAsync(new OffsetCommitCallback() { 
    @Override 
    public void onComplete(Map 
 offsets, Exception exception) { 
    if (exception != null) { 
        System.err.println("Commit failed for " + offsets); 
        System.err.println("Commit failed exception: " + 
        exception.getStackTrace()); 
     } 
    } 
 }); 
}
3、消费者Poll消息过程

消费者建立了与broker之间的长连接,开始poll消息,默认情况下,消费者⼀次会poll500条消息。

props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

如果两次poll的间隔超过30s,集群会认为该消费者的消费能力过弱,该消费者被踢出消费组,触发rebalance机制,rebalance机制会造成性能开销。可以通过设置这个参数,让⼀次poll的消息条数少⼀点可以通过这个值进行设置:

props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

如果每隔1s内没有poll到任何消息,则继续去poll消息,循环往复,直到poll到消息。如果超出了1s,则此次长轮询结束。

ConsumerRecords records = consumer.poll(Duration.ofMillis(
1000));

消费者发送⼼跳的时间间隔

props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000); 

kafka如果超过10秒没有收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者。

props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000); 
4、其他的一些配置

1)指定分区消费

consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

2)消息回溯消费

consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); 
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

3)指定offset消费

consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); 
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);

4)新消费组的消费offset规则
新消费组中的消费者在启动以后,默认会从当前分区的最后⼀条消息的offset+1开始消费(消费新消息)。可以通过以下的设置,让新的消费者第⼀次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1)。

  • latest:默认的,消费新消息
  • earliest:第⼀次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
六、Kafka集群机制 1、Controller
  • 集群中谁来充当controller
    每个broker启动时会向zk创建⼀个临时序号节点,获得的序号最小的那个broker将会作为集群中的controller。
  • controller负责管理整个集群中的所有分区和副本的状态:
    1)当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。
    2)当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。
    3)当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责让新分区被其他节点感知到。
2、Reblance机制

前提:消费组中的消费者没有指明分区来消费 。
触发的条件:当消费组中的消费者和分区的关系发生变化的时候 。
分区分配的策略:在rebalance之前,分区怎么分配会有这么三种策略 :

  • range:根据公示计算得到每个消费消费哪几个分区:前面的消费者是分区总数/消费
    者数量+1,之后的消费者是分区总数/消费者数量 。

  • 轮询:大家轮着来 。

  • sticky:粘合策略,如果需要rebalance,会在之前已分配的基础上调整,不会改变之
    前的分配情况。如果这个策略没有开,那么就要进行全部的重新分配。建议开启。
    III)HW和LEO
    HW俗称高水位,HighWatermark的缩写,取⼀个partition对应的ISR中最小的LEO(log-end-offset)作为HW,consumer最多只能消费到HW所在的位置。
    另外每个replica都有HW,leader和follower各自负责更新自己的HW的状态。对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。
    LEO是某个副本最后消息的消息位置(log-end-offset)。

七、Kafka-eagle监控平台 1、搭建

1)去Kafka-eagle官网下载压缩包
EFAK (kafka-eagle.org)
然后前置环境JDK按照好。
2)放到Linux指定目录中,解压

tar -zxvf kafka-eagle-bin.x.x.x.tar.gz

3)给kafka-eagle配置环境变量

export KE_HOME=/usr/local/kafkaeagle/kafka-eagle
export PATH=$PATH:$KE_HOME/bin

4)修改kafka-eagle里面conf里面system-config.properties文件

5)运行bin目录下的ke.sh

./ke.sh start

运行成功!
6)在浏览器访问10.0.50.245:8048/ke

7)成功进入

基础完,待续…

八、资源链接

zookeeper官网
JDK Download
Kafka官网下载

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/583987.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号