1.Reciever方式。
Kafka与Streaming有Reciever和Direct两种方式,Reciever的问题是偏移量保存在zookeeper中,容易造成zk压力过大,而且Reciever获取数据和处理数据的线程不是同一批,可能会导致数据的积压,receiver从kafka拉取数据存储到Spark executor的内存中,大量数据积压容易导致OOM的情况,为了数据不丢失,还需要启动预写日志机制,把Kafka数据同步写入到HDFS中。虽然可以保证数据零丢失但是无法实现exactly-once语义,因为Spark和Zk之间可能不同步。
来源:https://blog.csdn.net/li17610380561/article/details/80131246 要配置预写日志机制,首先要调用StreamingContext的checkpoint()方法设置一个checkpoint目录, 然后需要将spark.streaming.receiver.writeAheadLog.enable参数设置为true 然而这种极强的可靠性机制,会导致Receiver的吞吐量大幅度下降, 因为单位时间内有相当一部分时间需要将数据写入预写日志, 如果又希望开启预写日志机制,确保数据零损失,又不希望影响系统的吞吐量, 那么可以创建多个输入DStream启动多个Reciver, 然后将这些receiver接收到的数据使用ssc.union()方法将这些dstream中的数据进行合并 此外在启用了预写日志机制之后,推荐将复制持久化机制禁用掉, 因为所有数据已经保存在容错的文件系统中了,不需要再用复制机制进行持久化, 保存一份副本了,只要将输入的DStream的持久化机制设置一下即可
2.Direct方式
周期性地读取topic分区的最新offset的数据,生成batch运算,从而定义每个batch的offset范围,也就是需要spark自己维护offset。当处理数据的job启动时,会使用Kafka的简单consumer api 来获取Kafka指定offset范围的数据,可以保证输入端数据exactly-once 语义,要保证数据的不丢失,不需要再启动预写日志机制,因为Kafka如果本身做了副本,就可以通过Kafka副本进行恢复。降低资源,direct不需要receiver,因此申请的executor可以全部用于运算。但是不会保证输出系统时的exactly-once,除非输出到外部系统时有幂等性。
因为: 当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。自动提交的话会有一个问题,试想一下, 当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被自动提交了。 解决办法也比较粗暴,我们手动关闭闭自动提交 offset,每次在真正消费完消息之后之后再自己手动提交 offset 。 但是,细心的朋友一定会发现, 这样会带来消息被重新消费的问题。比如你刚刚消费完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两次。 作者:JavaGuide 链接:https://juejin.cn/post/6844904094021189639 来源:稀土掘金 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。二、Kafka Producer的Ack机制
来源:https://blog.csdn.net/weixin_45079319/article/details/104055782?spm=1001.2014.3001.5502
Kafka的Ack机制指producer的消息发送确认机制,其影响kafka集群的吞吐量和消息可靠性。
Ack=0,相当于异步发送,意味着producer不等待broker同步完成,消息发送完毕继续发送下一批信息。提供了最低延迟,但持久性最弱,当服务器发生故障时很可能发生数据丢失。如果leader死亡,producer继续发送消息,broker接收不到数据就会造成数据丢失。
Ack=1,producer要等待leader成功收到消息并确认,才发送下一条message。提供较低的延迟性以及较好的持久性。如果leader刚接收到数据,还没有同步到follower时,假如leader节点挂掉。
Ack=-1,leader收到所有消息,且follower同步完数据,才发送下一条数据。延迟性最差,持久性最好(即可靠性最好)。
三种参数设置性能递减,可靠性递增。为了保证数据的可靠性,需要将发送方式设置为同步(sync)
同时,Ack默认值为1,此时吞吐量与可靠性折中。实际生产中可以根据实际需求进行调整。
三、Kafka消息积压来源:https://www.cnblogs.com/bigdatalearnshare/p/14278093.html
典型场景- 实时/消费任务挂掉
比如,我们写的实时应用因为某种原因挂掉了,并且这个任务没有被监控程序监控发现通知相关负责人,负责人又没有写自动拉起任务的脚本进行重启。
那么在我们重新启动这个实时应用进行消费之前,这段时间的消息就会被滞后处理,如果数据量很大,可就不是简单重启应用直接消费就能解决的。Kafka分区数设置的不合理(太少)和消费者"消费能力"不足
Kafka单分区生产消息的速度qps通常很高,如果消费者因为某些原因(比如受业务逻辑复杂度影响,消费时间会有所不同),就会出现消费滞后的情况。
此外,Kafka分区数是Kafka并行度调优的最小单元,如果Kafka分区数设置的太少,会影响Kafka consumer消费的吞吐量。Kafka消息的key不均匀,导致分区间数据不均衡
在使用Kafka producer消息时,可以为消息指定key,但是要求key要均匀,否则会出现Kafka分区间数据不均衡。
- 实时/消费任务挂掉导致的消费滞后
A. 任务重新启动后直接消费最新的消息,对于"滞后"的历史数据采用离线程序进行"补漏"。
此外,建议将任务纳入监控体系,当任务出现问题时,及时通知相关负责人处理。当然任务重启脚本也是要有的,还要求实时框架异常处理能力要强,避免数据不规范导致的不能重新拉起任务。
B. 任务启动从上次提交offset处开始消费处理
如果积压的数据量很大,需要增加任务的处理能力,比如增加资源,让任务能尽可能的快速消费处理,并赶上消费最新的消息Kafka分区少了
如果数据量很大,合理的增加Topic分区数是关键。如果利用的是Spark流和Kafka direct approach方式,也可以对KafkaRDD进行repartition重分区,增加并行度处理。由于Kafka消息key设置的不合理,导致分区数据不均衡
可以在Kafka producer处,给key加随机后缀,使其均衡。
下面内容都是根据这两篇博客拼凑而来的:
1.利用 Partition 实现并行处理https://blog.csdn.net/kzadmxz/article/details/101576401
https://zhuanlan.zhihu.com/p/183808742?utm_source=wechat_timeline
每个 Topic 都包含一个或多个 Partition,不同 Partition 可位于不同节点。
一方面,由于不同 Partition 可位于不同机器,因此可以充分利用集群优势,实现机器间的并行处理。另一方面,由于 Partition 在物理上对应一个文件夹,即使多个 Partition 文件夹位于同一台计算机上,也可通过配置让同一节点上的不同 Partition 置于不同的磁盘上,从而实现磁盘间的并行处理,充分发挥多磁盘的优势。能并行处理,速度肯定会有提升,多个工人肯定比一个工人干的快。
Kafka是将消息记录持久化到本地磁盘中的,磁盘的顺序读写是磁盘使用模式中最有规律的,并且操作系统也对这种模式做了大量优化,Kafka就是使用了磁盘顺序读写来提升的性能。Kafka的message是不断追加到本地磁盘文件末尾的,而不是随机的写入,这使得Kafka写入吞吐量得到了显著提升。
磁盘完成一个I/O请求所花费的时间,它由寻道时间、旋转延迟和数据传输时间三部分构成。 寻道时间是红色箭头找到粉红色环形那块区域的时间,假如说我想要的数据就在3,4,5这三个区域, 那旋转延时就是说我现在的红色箭头处于6这个位置要找到数据开始的地方3这个时间 数据传输就是读取3,4,5这三块区域的时间了3.Page Cache
Kafka利用了操作系统本身的Page Cache,就是利用操作系统自身的内存而不是JVM空间内存。这样做的好处有:
避免Object消耗:如果是使用 Java 堆,Java对象的内存消耗比较大,通常是所存储数据的两倍甚至更多。避免GC问题:随着JVM中数据不断增多,垃圾回收将会变得复杂与缓慢,使用系统缓存就不会存在GC问题
相比于使用JVM或in-memory cache等数据结构,利用操作系统的Page Cache更加简单可靠。首先,操作系统层面的缓存利用率会更高,因为存储的都是紧凑的字节结构而不是独立的对象。其次,操作系统本身也对于Page Cache做了大量优化,提供了 write-behind、read-ahead以及flush等多种机制。再者,即使服务进程重启,系统缓存依然不会消失,避免了in-process cache重建缓存的过程。通过操作系统的Page Cache,Kafka的读写操作基本上是基于内存的,读写速度得到了极大的提升。
Broker 收到数据后,写磁盘时只是将数据写入 Page Cache,并不保证数据一定完全写入磁盘。从这一点看,可能会造成机器宕机时,Page Cache 内的数据未写入磁盘从而造成数据丢失。但是这种丢失只发生在机器断电等造成操作系统不工作的场景,而这种场景完全可以由 Kafka 层面的 Replication 机制去解决。如果为了保证这种情况下数据不丢失而强制将 Page Cache 中的数据 Flush 到磁盘,反而会降低性能。也正因如此,Kafka 虽然提供了 flush.messages 和 flush.ms 两个参数,但是 Kafka 并不建议将 Page Cache 中的数据强制 Flush 到磁盘
4.零拷贝技术零拷贝(Zero-copy)技术指在计算机执行操作时, CPU 不需要先将数据从一个内存区域复制到另一个内存区域, 从而可以减少上下文切换以及 CPU 的拷贝时间。 它的作用是在数据报从网络设备到用户程序空间传递的过程中, 减少数据拷贝次数,减少系统调用,实现 CPU 的零参与,彻底消除 CPU 在这方面的负载。
Kafka 中存在大量的网络数据持久化到磁盘(Producer 到 Broker)和磁盘文件通过网络发送(Broker 到 Consumer)的过程。这一过程的性能直接影响 Kafka 的整体吞吐量。
操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间, 也有访问底层硬件设备的权限。为了避免用户进程直接操作内核,保证内核安全, 操作系统将虚拟内存划分为两部分, 一部分是内核空间(Kernel-space),一部分是用户空间(User-space)。 https://www.icode9.com/content-3-860919.html 使用文件I/O:文件I/O不提供缓冲机制,每次使用都会引起系统调用,文件I/O的操作是在用户地址空间与IO设备之间传递。 避免内核与用户空间之间的数据拷贝:通过避免内核与用户空间之间的数据拷贝,来实现零拷贝。 写时复制技术:数据不会立即拷贝,而是等需要修改的时候再进行部分拷贝,感觉这一部分的技术逻辑与Make相似。
传统的 Linux 系统中,标准的 I/O 接口(例如read,write)都是基于数据拷贝操作的,即 I/O 操作会导致数据在内核地址空间的缓冲区和用户地址空间的缓冲区之间进行拷贝,所以标准 I/O 也被称作缓存 I/O。这样做的好处是,如果所请求的数据已经存放在内核的高速缓冲存储器中,那么就可以减少实际的 I/O 操作,但坏处就是数据拷贝的过程,会导致 CPU 开销
数据从网络传输到文件需要 4 次数据拷贝、4 次上下文切换和两次系统调用(上图左边)。
首先通过 DMA copy 将网络数据拷贝到内核态 Socket Buffer
然后应用程序将内核态 Buffer 数据读入用户态(CPU copy)
接着用户程序将用户态 Buffer 再拷贝到内核态(CPU copy)
最后通过 DMA copy 将数据拷贝到磁盘文件
DMA(Direct Memory Access):直接存储器访问。 DMA 是一种无需 CPU 的参与,让外设和系统内存之间进行双向数据传输的硬件机制。 使用 DMA 可以使系统 CPU 从实际的 I/O 数据传输过程中摆脱出来,从而大大提高系统的吞吐率。
数据落盘通常都是非实时的,kafka 生产者数据持久化也是如此。Kafka 的数据并不是实时的写入硬盘,它充分利用了现代操作系统分页存储来利用内存提高 I/O 效率,就是上一节提到的 Page Cache。
对于 kafka 来说,Producer 生产的数据存到 broker,这个过程读取到 socket buffer 的网络数据,其实可以直接在内核空间完成落盘。并没有必要将 socket buffer 的网络数据,读取到应用进程缓冲区;在这里应用进程缓冲区其实就是 broker,broker 收到生产者的数据,就是为了持久化。
在此特殊场景下:接收来自 socket buffer 的网络数据,应用进程不需要中间处理、直接进行持久化时。可以使用 mmap 内存文件映射。
Memory Mapped Files:简称 mmap,也有叫 MMFile 的, 使用 mmap 的目的是将内核中读缓冲区(read buffer)的地址与用户空间的缓冲区(user buffer)进行映射。 从而实现内核缓冲区与应用程序内存的共享,省去了将数据从内核读缓冲区(read buffer)拷贝到用户缓冲区(user buffer)的过程。 它的工作原理是直接利用操作系统的 Page 来实现文件到物理内存的直接映射。 完成映射之后你对物理内存的操作会被同步到硬盘上。使用这种方式可以获取很大的 I/O 提升,省去了用户空间到内核空间复制的开销。
mmap 也有一个很明显的缺陷——不可靠,写到 mmap 中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用 flush 的时候才把数据真正的写到硬盘。Kafka 提供了一个参数——producer.type来控制是不是主动flush;如果 Kafka 写入到 mmap 之后就立即 flush 然后再返回 Producer 叫同步(sync);写入 mmap 之后立即返回 Producer 不调用 flush 就叫异步(async),默认是 sync。
类比上边的生产消息也是经过四次 copy
首先通过系统调用将文件数据读入到内核态 Buffer(DMA 拷贝)
然后应用程序将内存态 Buffer 数据读入到用户态 Buffer(CPU 拷贝)
接着用户程序通过 Socket 发送数据时将用户态 Buffer 数据拷贝到内核态 Buffer(CPU 拷贝)
最后通过 DMA 拷贝将数据拷贝到 NIC Buffer
Linux 2.4+ 内核通过sendfile系统调用,提供了零拷贝。数据通过 DMA 拷贝到内核态 Buffer 后,直接通过 DMA 拷贝到 NIC Buffer,无需 CPU 拷贝。这也是零拷贝这一说法的来源。除了减少数据拷贝外,因为整个读文件 - 网络发送由一个 sendfile 调用完成,整个过程只有两次上下文切换,因此大大提高了性能。
Kafka 在这里采用的方案是通过NIO的 transferTo/transferFrom 调用操作系统的 sendfile 实现零拷贝。总共发生 2 次内核数据拷贝、2 次上下文切换和一次系统调用,消除了 CPU 数据拷贝
Kafka数据读写也是批量的而不是单条的。
因此,除了操作系统提供的低级批处理之外,Kafka 的客户端和 broker 还会在通过网络发送数据之前,在一个批处理中累积多条记录 (包括读和写)。记录的批处理分摊了网络往返的开销,使用了更大的数据包从而提高了带宽利用率。在向Kafka写入数据时,可以启用批次写入,这样可以避免在网络上频繁传输单个消息带来的延迟和带宽开销。假设网络带宽为10MB/S,一次性传输10MB的消息比传输1KB的消息10000万次显然要快得多。
6.数据压缩在很多情况下,系统的瓶颈不是 CPU 或磁盘,而是网络IO。进行数据压缩会消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑。
Producer 可将数据压缩后发送给 broker,从而减少网络传输代价,目前支持的压缩算法有:Snappy、Gzip、LZ4。数据压缩一般都是和批处理配套使用来作为优化手段的。
五、消息分区策略来源: https://www.cnblogs.com/listenfwind/p/12465409.html https://www.cnblogs.com/snidget/p/12757698.html
所谓分区写入策略,即是生产者将数据写入到kafka主题后,kafka如何将数据分配到不同分区中的策略。
常见的有三种策略,轮询策略,随机策略,和按键保存策略。其中轮询策略是默认的分区策略,而随机策略则是较老版本的分区策略,不过由于其分配的均衡性不如轮询策略,故而后来改成了轮询策略为默认策略。
所谓轮询策略,即按顺序轮流将每条数据分配到每个分区中。能最大限度保证所有消息都平均分配到每一个分区。
随机策略,也就是每次都随机地将消息分配到每个分区。其实大概就是先得出分区的数量,然后每次获取一个随机数,用该随机数确定消息发送到哪个分区。
按键保存策略,就是当生产者发送数据的时候,可以指定一个key,计算这个key的hashCode值,按照hashCode的值对不同消息进行存储。
至于要如何实现,那也简单,只要让生产者发送的时候指定key就行。欸刚刚不是说默认的是轮询策略吗?其实啊,kafka默认是实现了两个策略,没指定key的时候就是轮询策略,有的话那激素按键保存策略了。
上面有说到一个场景,那就是要顺序发送消息到kafka。前面提到的方案是让所有数据存储到一个分区中,但其实更好的做法,就是使用这种按键保存策略。让需要顺序存储的数据都指定相同的键,而不需要顺序存储的数据指定不同的键,这样一来,即实现了顺序存储的需求,又能够享受到kafka多分区的优势,岂不美哉。
来源: https://www.cnblogs.com/listenfwind/p/12465409.html https://blog.csdn.net/weixin_43888806/article/details/1000636501.基本概念
在kafka中,每个主题可以有多个分区,每个分区又可以有多个副本。这多个副本中,只有一个是leader,而其他的都是follower副本。仅有leader副本可以对外提供服务。可以在设置主题的时候可以通过replication-factor参数来设置,也可以在broker级别中设置defalut.replication-factor来指定,一般我们都设置为3。
多个follower副本通常存放在和leader副本不同的broker中。通过这样的机制实现了高可用,三个副本中有一个副本是leader,两个副本是follower,leader负责消息的读写,follower负责定期从leader中复制最新的消息,保证follower和leader的消息一致性,当leader宕机后,会从follower中选举出新的leader负责读写消息,通过分区副本的架构,虽然引入了数据冗余,但是保证了kafka的高可靠。
在每个分区的leader都会维护一个ISR列表,ISR里面就是follower在broker的编号,只有跟得上leader的follower副本才能加入到ISR列表,只有这个列表里面的follower才能被选举为leader,所以在leader挂了的时候,并且unclean.leader.election.enable=false(关闭不完全的leader选举)的情况下,会从ISR列表中选取第一个follower作为新的leader,来保证消息的高可靠性。
综上所述,要保证kafka消息的可靠性,至少需要配置一下参数: topic级别:replication-factor>=3; producer级别:acks=-1;同时发送模式设置producer.type=sync; broker级别:关闭不完全的leader选举,即unclean.leader.election.enable=false;2.数据一致性
一致性指的是不管是老的leader还是新的leader,consumer都能读到一样的数据。
假设分区副本为3,副本0为leader,副本1和2位follower,在ISR列表里面副本0已经写入了message4,但是consumer只能读取message2,这是因为所有副本都同步了message2,只有High water mark以上的message才能被consumer读取,而High water mark取决于ISR列表里偏移量最小的分区,对应上图中的副本2;
所以在message还没有被follower同步完成时会被认为是"不安全的",如果consumer读取了副本0中的message4,这时候leader挂了,选举了副本1为新的leader,别的消费者去消费的时候就没有message4,就会造成不同的consumer消费的数据不一致,破坏了数据的一致性。在引入了High water mark机制后,会导致broker之间的消息复制因为某些原因变慢,消息到达消费者的时间也会延长(需要等消息复制完了才能消费),延迟的时间可以通过参数来设置:replica.lag.time.max.ms(它指定了副本在复制消息时可被允许的最大延迟时间)
3.follower副本为什么不对外提供服务类似数据库事务中的幻读,脏读
比如你现在写入一条数据到kafka主题topic,消费者comsumerA从主题topic消费数据,却发现消费不到,因为消费者comsumerA去读取的那个分区副本中,最新消息还没写入。而这个时候,另一个消费者comsumerB却可以消费到最新那条数据,因为它消费了leader副本。
如果你对zookeeper选举机制有所了解,就知道zookeeper每次leader节点挂掉时,都会通过内置id,来选举处理了最新事务的那个follower节点。
从结果上来说,kafka分区副本的选举也是类似的,都是选择最新的那个follower副本,但它是通过一个In-sync(ISR)副本集合实现。
kafka会将与leader副本保持同步的副本放到ISR副本集合中。当然,leader副本是一直存在于ISR副本集合中的,在某些特殊情况下,ISR副本中甚至只有leader一个副本。
当leader挂掉时,kakfa通过zookeeper感知到这一情况,在ISR副本中选取新的副本成为leader,对外提供服务。
但这样还有一个问题,前面提到过,有可能ISR副本集合中,只有leader,当leader副本挂掉后,ISR集合就为空,这时候怎么办呢?这时候如果设置unclean.leader.election.enable参数为true,那么kafka会在非同步,也就是不在ISR副本集合中的副本中,选取出副本成为leader,但这样意味这消息会丢失,这又是可用性和一致性的一个取舍了
5.ISR副本集合保存的副本的条件是什么答案其实跟一个参数有关:replica.lag.time.max.ms。
前面说到follower副本的任务,就是从leader副本拉取消息,如果持续拉取速度慢于leader副本写入速度,慢于时间超过replica.lag.time.max.ms后,它就变成“非同步”副本,就会被踢出ISR副本集合中。但后面如何follower副本的速度慢慢提上来,那就又可能会重新加入ISR副本集合中了。如果一个追随者在replica.lag.time.max.ms周期内与领导者保持一致,那么就可以任务是同步的(in-sync)或者最新的,同步或者最新意味着当前副本与领导者消息内容完全一致。
七、为什么使用Kafka来源:公众号:五分钟学大数据
- 缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足
够多的机器来保证冗余,kafka 在中间可以起到一个缓冲的作用,把消息
暂存在kafka 中,下游服务就可以按照自己的节奏进行慢慢处理。解耦和扩展性:项目开始的时候,并不能确定具体需求。消息队列可以作
为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即
可获取扩展能力。冗余:可以采用一对多的方式,一个生产者发布消息,可以被多个订阅
topic 的服务消费到,供多个毫无关联的业务使用。健壮性:消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不
会影响主要业务的正常进行。异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了
异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向
队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
来源:公众号:五分钟学大数据
kafka 消费消息的offset 是定义在zookeeper 中的, 如果想重复消费kafka 的
消息,可以在redis 中自己记录offset 的checkpoint 点(n 个),当想重复消
费消息时,通过读取redis 中的checkpoint 点进行zookeeper 的offset 重设,
这样就可以达到重复消费消息的目的了
图片来源: https://www.cnblogs.com/cjsblog/p/9664536.html https://www.jianshu.com/p/dbbca800f607
每个分区只能由同一个消费组内的一个消费者(consumer)来消费,可以由不同的消费组的消费者来消费,同组的消费者则起到并发的效果。
来源:公众号:五分钟学大数据
- 连接ZK 集群,从ZK 中拿到对应topic 的partition 信息和partition的Leader 的相关信息连接到对应Leader 对应的brokerconsumer 将自己保存的offset 发送给LeaderLeader 根据offset 等信息定位到segment(索引文件和日志文件)根据索引文件中的内容,定位到日志文件中该偏移量量对应的开始位置读取相应长度的数据并返回给consumer



