栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 其他

SparkStreaming-预热kafka

其他 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SparkStreaming-预热kafka

Kafka: 1.kafka中的数据有序

Q:如何保证kafka广播接收过程中的数据有序?

A:多分区的情况做到:分区有序,全局无序;

      要保证全局有序:将所有数据发到同一个分区(partition)中。

2.kafka中如何保证数据不丢失

Q:如何保证数据不丢失?这是三个问题。

        ①.producer端如何保证数据不丢失;

        ②.Broker端如何保证数据不丢失;

        ③.consumer端如何保证数据不丢失。

问题分析:

①.producer端生产数据提交有两种方式:一种同步模式,另一种异步模式。同步模式:生产一条,发送一条,保存完成后再发送下一条,确保数据不丢失,但是效率太低;异步模式:在producer端开启一个buffer缓冲区,保存了多条一次性发送,但是一但出现保存失败,全部丢失,这时候ack机制出现:0:producer一次发送一批数据,不管成功与否,继续发送;1:producer一次发送一批数据,leader保存成功,就继续发送;-1:producer 一次发送一批数据,leader和follower都保存成功再发送。

②.Broker端中有ISR列表:维护broker中partition中的副本(partition副本),作为leader的主partition一旦发生宕机,则会在ISR列表中选择partition众多副本中最接近partition数据的副本作为partition也就是leader(上位,替换),这样就确保数据不丢失。

③.consumer端:变自动提交偏移量为手动提交偏移量(我们在消费数据准备保存在数据库时,有个消费偏移量提升,其中有两个配置文件进行设置,一个消费者,一个生产者。就是数据达到多少数量的时候提交一次,或者多少秒提交一次,这就是偏移量)(问题场景:我们在kafka消费数据,消费到了,提交了偏移量,准备保存到数据库的时候失败了,造成了数据丢失。场景二:消费数据,保存到数据库,还没等提交偏移量,宕机了,重启后,从上次未消费的地方重新消费,这样会消费两次,造成数据重复),所以我们需要将自动提交偏移量变为手动提交偏移量,消费完,保存完成后,再提交偏移量,这样就保证数据不丢失也可以重复消费。

 3.kafka为什么快

Q:kafka为什么快?(参考留声机机制)

A:①.采用的是pageCache页缓存技术;②.顺序读写;③.寻址机制。

4.kafka消费者consumer配置文件

        kafka消费者配置文件:

Properties props = new Properties();
//连接kafka集群
     props.put("bootstrap.servers", "localhost:9092");
//任何一个消费者都隶属于一个消费者组,这是消费者组id
     props.put("group.id", "test");
//自动提交偏移量,默认是true
     props.put("enable.auto.commit", "true");
//隔多久自动提交一次
     props.put("auto.commit.interval.ms", "1000");
//key的反序列化
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//value的反序列化
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));

         任意时刻,一个partition中的数据只能被消费者组下面的一个消费者所消费。

 5.kafka生产者producer配置文件

        kafka生产者配置文件:

Properties props = new Properties();
//连接kafka集群
 props.put("bootstrap.servers", "localhost:9092");
//ack机制:all就是-1,也就是生产的数据,需要leader和follower都保存好再发送
 props.put("acks", "all");
//发送数据保存失败重试次数
 props.put("retries", 0);
//发送一个多大的批次
 props.put("batch.size", 16384);
//隔多长时间发送一次
 props.put("linger.ms", 1);
//producer端的buffer缓冲区大小
 props.put("buffer.memory", 33554432);
//key的序列化
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//value的序列话
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer producer = new KafkaProducer<>(props);

        以上配置参数按最短的来,木桶短板效应,批次、间隔时间..

  1. 任意事刻,一个partition中的数据只能被一个消费者组下面的一个消费者所消费。
  2. 副本因子不能大于Broker数
  3. 寻址机制

        一个.segment文件段里面有两个文件,一个.index文件保存的是稀疏索引,一个.log文件保存的是数据

        第一个segment文件段:

-rw-r--r-- 1 root root 10485760 May 17 16:00 00000000000000000000.index

-rw-r--r-- 1 root root        0 May 16 22:14 00000000000000000000.log

        第二个segment文件段:

-rw-r--r-- 1 root root 10485760 May 17 16:00 00000000000005798453.index

-rw-r--r-- 1 root root        0 May 16 22:14 00000000000005798453.log

        第三个segment文件段:

-rw-r--r-- 1 root root 10485760 May 17 16:00 000000000000009867432.index

-rw-r--r-- 1 root root        0 May 16 22:14 000000000000009867432.log

如何命名.log和.index文件

        当前segment文件段的名字是上一个segment文件段中.log文件最后一条数据的偏移量。

查找数据的方式:折半查找,也叫二分查找

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/279559.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号