栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Kafka系列-3、kafka高级

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Kafka系列-3、kafka高级

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

大数据系列文章目录

kafka的中文网站

目录
  • kafka的分片副本机制
    • 分片机制
    • 副本
  • kafka如何保证数据不丢失
    • 如何保证生产端数据不丢失
    • 如果保证broker端和消费者端数据不丢失
      • broker
      • 消费者端
  • 消息存储及查询机制
  • 生产者数据分发策略
  • 消费者负载均衡机制
  • kafka的监控工具:kafka-eagle
  • Kafka中数据积压问题
  • Kafka配额限速机制
    • 限制producer端的速率
    • 限制consumer端的速率
    • 取消kafka的Quota配置
  • 结束语

kafka的分片副本机制 分片机制

主要解决了单台服务器存储容量有限的问题。
当数据量非常大的时候,一个服务器存放不了,就将数据分成两个或者多个部分,存放在多台服务器上。每个服务器上的数据,叫做一个分片。

副本

副本备份机制解决了数据存储的高可用问题。
当数据只保存一份的时候,有丢失的风险。为了更好的容错和容灾,将数据拷贝几份,保存到不同的机器上。

kafka如何保证数据不丢失 如何保证生产端数据不丢失


1、 消息生产分为同步模式和异步模式
2、 消息确认分为三个状态

  • 0:生产者只负责发送数据
  • 1:某个partition的leader收到数据给出响应
  • -1或all:某个partition的所有副本都收到数据后给出响应

3、在同步模式下

  • 生产者等待10S,如果broker没有给出ack响应,就认为失败。
  • 生产者重试3次,如果还没有响应,就报错。

4、在异步模式下

  • 先将数据保存在生产者端的buffer中。Buffer大小是2万条。 32M
  • 满足数据阈值或者时间阈值其中的一个条件就可以发送数据。
  • 发送一批数据的大小是500条。16Kb

如果broker迟迟不给ack,而buffer又满了。开发者可以设置是否直接清空buffer中的数据。

如果保证broker端和消费者端数据不丢失 broker
  • broker端的消息不丢失,其实就是用partition副本机制来保证。
  • Producer ack -1(all). 能够保证所有的副本都同步好了数据。其中一台机器挂了,并不影响数据的完整性。
消费者端

通过offset commit 来保证数据的不丢失,kafka自己记录了每次消费的offset数值,下次继续消费的时候,会接着上次的offset进行消费。

而offset的信息在kafka0.8版本之前保存在zookeeper中,在0.8版本之后保存到topic中,即使消费者在运行过程中挂掉了,再次启动的时候会找到offset的值,找到之前消费消息的位置,接着消费,由于offset的信息写入的时候并不是每条消息消费完成后写入的,所以这种情况有可能会造成重复消费,但是不会丢失消息。

消息存储及查询机制


segment段中有两个核心的文件一个是log,一个是index。 当log文件等于1G时,新的会写入到下一个segment 中。通过下图中的数据,可以看到一个segment段差不多会存储70万条数据。

需求: 读取 offset=368776 的message消息数据, 数据集如下

第一步: 确定segment段


第二步: 通过segment file 查找 message

生产者数据分发策略

kafka在数据生产的时候,有一个数据分发策略。默认的情况使用DefaultPartitioner.class类。这个类中就是定义数据分发的策略

  • 策略一:用户指定了partition

生产就不会调用DefaultPartitioner.partition()方法,数据分发策略的时候,可以指定数据发往哪个partition。当ProducerRecord的构造参数中有partition的时候,就可以发送到对应partition上

  • 策略二:用户发生数据的时候指定了key没有指定partition ,采用hash算法

注意: 如果key一直不变,同一个key算出来的hash值是个固定值。如果是固定值,这种hash取模就没有意义。Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions

- 策略三: 当用户既没有指定partition也没有key。采用粘性的划分策略(Sticky Partitioning Strategy)方案(2.4以上版本新特性,老版本为轮询)

Sticky Partitioning Strategy会随机地选择一个分区并会尽可能地坚持使用该分区——即所谓的粘住这个分区。
原因:
kafka在发送消息的时候, 采用批处理方案, 当达到一批后进行分送, 但是如果一批数据中有不同分区的数据, 就无法放置到一个批处理中, 而老版本中轮询方案, 就会导致一批数据被分到多个小的批次中,从而影响效率, 故在新版本中, 采用这种粘性的划分策略

消费者负载均衡机制


一个partition只能被一个组中的成员消费。
所以如果消费组中有多于partition数量的消费者,那么一定会有消费者无法消费数据。

kafka的topic中一条消息只能被一个消费者组中一个消费者所消费,但是可以被不同消费者组的一个消费者所消费。

kafka的监控工具:kafka-eagle

在开发工作中,当业务前提不复杂时,可以使用Kafka命令来进行一些集群的管理工作。但如果业务变得复杂,例如:我们需要增加group、topic分区,此时,我们再使用命令行就感觉很不方便,此时,如果使用一个可视化的工具帮助我们完成日常的管理工作,将会大大提高对于Kafka集群管理的效率,而且我们使用工具来监控消费者在Kafka中消费情况。

早期,要监控Kafka集群我们可以使用Kafka Monitor以及Kafka Manager,但随着我们对监控的功能要求、性能要求的提高,这些工具已经无法满足。

Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等。

官网地址:https://www.kafka-eagle.org/

安装kafka-eagle可直接参考: 官网

Kafka中数据积压问题

Kafka消费者消费数据的速度是非常快的,但如果由于处理Kafka消息时,由于有一些外部IO、或者是产生网络拥堵,就会造成Kafka中的数据积压(或称为数据堆积)。如果数据一直积压,会导致数据出来的实时性受到较大影响。

第一步: 使用kafka-eagle查看数据积压情况




第二步: 解决数据积压问题

出现积压的原因:

  • 因为数据写入目的容器失败,从而导致消费失败
  • 因为网络延迟消息消费失败
  • 消费逻辑过于复杂, 导致消费过慢,出现积压问题

解决方案:

  • 对于第一种, 我们常规解决方案, 处理目的容器,保证目的容器是一直可用状态
  • 对于第二种, 如果之前一直没问题, 只是某一天出现, 可以调整消费的超时时间
  • 对于第三种, 一般解决方案,调整消费代码, 消费更快即可, 利于消费者的负载均衡策略,提升消费者数量
Kafka配额限速机制

生产者和消费者以极高的速度生产/消费大量数据或产生请求,从而占用broker上的全部资源,造成网络IO饱和。有了配额(Quotas)就可以避免这些问题。Kafka支持配额管理,从而可以对Producer和Consumer的produce&fetch 操作进行流量限制,防止个别业务压爆服务器。

限制producer端的速率

为所有client id设置默认值,以下为所有producer程序设置其TPS不超过1MB/s,即1048576/s,命令如下:

bin/kafka-configs.sh --zookeeper node1.itcast.cn:2181 --alter --add-config 'producer_byte_rate=1048576' --entity-type clients --entity-default

运行基准测试,观察生产消息的速率

bin/kafka-producer-perf-test.sh --topic test --num-records 500000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092 acks=1

结果:

50000 records sent, 1108.156028 records/sec (1.06 MB/sec)

限制consumer端的速率

对consumer限速与producer类似,只不过参数名不一样。
为指定的topic进行限速,以下为所有consumer程序设置topic速率不超过1MB/s,即1048576/s。命令如下:

bin/kafka-configs.sh --zookeeper node1.itcast.cn:2181 --alter --add-config 'consumer_byte_rate=1048576' --entity-type clients --entity-default

运行基准测试,观察消息消费的速率

bin/kafka-consumer-perf-test.sh --broker-list node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092 --topic test
--fetch-size 1048576 --messages 500000

结果为:
MB.sec:1.0743

取消kafka的Quota配置
bin/kafka-configs.sh --zookeeper node1.itcast.cn:2181 --alter --delete-config 'producer_byte_rate' --entity-type clients --
entity-default
bin/kafka-configs.sh --zookeeper node1.itcast.cn:2181 --alter --delete-config 'consumer_byte_rate' --entity-type clients -- entity-default
结束语

kafka系列的文章到这就结束了,kafka系列的文章一共3篇,有小伙伴需要的可以去看另外两篇。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/684264.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号