栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka学习之安装与入门

kafka学习之安装与入门

kafka学习之安装与入门 kafka概述

Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。kafka是基于发布订阅模式的主动拉取策略的。

kafka接触架构总结

  1) Kafka集群
     Kafka集群是由多个Broker组成的。 每个Broker拥有唯一的id.
     Kafka集群中有多个Topic.每个Topic可有多个分区(partition),每个分区可有多个副本(replication).
     一个Topic的多个分区可以存在到一个Broker中。 一个分区的多个副本只能在不同的broker存在.
     一个分区的多个副本由一个leader和多个follower组成.
     生产者和消费者读写数据面向leader. follower主要同步leader的数据。以及当leader故障后,follower代替leader工作.

  2) 生产者
     生成者的功能就是往topic中发布消息.

  3) 消费者
     消费者的功能就是从topic中消费消息.
     消费者消费消息是以消费者组为单位进行的.
     一个消费者组内的一个消费者可以同时消费一个topic中多个分区的消息. 
     一个Topic中的一个分区的消息同时只能被一个消费者组中的一个消费者消费.

  4) Zookeeper
     Kafka集群的工作需要依赖zookeeper,例如每个broker启动后需要向zookeeper注册. 
     Broker中大哥(controller)的选举(争抢策略)
     Kafka 0.9版本之前消费者组的offset维护在zookeeper中. 0.9版本之后维护在kafka内部.

kafka快速入门 安装部署

1.集群规划

hadoop102					hadoop103				hadoop104
zk							zk						zk
kafka						kafka					kafka

2.kafka下载

http://kafka.apache.org/downloads.html

我这里用的版本是2.4.1

3.集群部署

1)解压安装包
[atguigu@hadoop102 software]$ tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/
3)在/opt/module/kafka目录下创建datas文件夹:用来存放kafka消息的
[atguigu@hadoop102 kafka_2.11-2.4.1]$ mkdir datas
4)修改配置文件
[atguigu@hadoop102 kafka_2.11-2.4.1]$ cd config/
[atguigu@hadoop102 config]$ vim server.properties
server.properties配置,写了“改”字改一下,其他的先别动
#(改)broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能,当前版本此配置默认为true,已从配置文件移除
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#(改)kafka运行日志存放的路径
log.dirs=/opt/module/kafka_2.11-2.4.1/datas
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#(改)配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

4.配置环境变量

[atguigu@hadoop102 zkData]$ vim /etc/profile.d/my_env.sh

如图所示

5.分发安装包

将kafka安装目录与环境变量同步到hadoop103,hadoop104

# 同步kafka
[atguigu@hadoop102 module]$ pwd
/opt/module
[atguigu@hadoop102 module]$ my_rsync.sh kafka_2.11-2.4.1/
# 同步环境变量
[atguigu@hadoop102module]$ scp -r /etc/profile.d/my_env.sh root@hadoop103:/etc/profile.d/
[atguigu@hadoop102module]$ scp -r /etc/profile.d/my_env.sh root@hadoop104:/etc/profile.d/

# 注意:103,104记得测试环境变量是否生效

6.分别再修改103,104的kafka配置文件

分别在hadoop103和hadoop104上修改配置文件/opt/module/kafka/config/server.properties中的broker.id=1、broker.id=2

# 注:broker.id不得重复,我们规划的是:
hadoop102	0
hadoop103	1
hadoop104	2

7.启动kafka集群

注意:启动kafka集群之前先启动zookeeper

zk_cluster.sh start

# 依次在hadoop102、hadoop103、hadoop104节点上启动kafka
[atguigu@hadoop102 kafka_2.11-2.4.1]$ pwd
/opt/module/kafka_2.11-2.4.1
[atguigu@hadoop102 kafka]$ kafka-server-start.sh -daemon config/server.properties
[atguigu@hadoop103 kafka]$ kafka-server-start.sh -daemon  config/server.properties
[atguigu@hadoop104 kafka]$ kafka-server-start.sh -daemon  config/server.properties

8.关闭集群

[atguigu@hadoop102 kafka]$ kafka-server-stop.sh stop
[atguigu@hadoop103 kafka]$ kafka-server-stop.sh stop
[atguigu@hadoop104 kafka]$ kafka-server-stop.sh stop

9.封装kafka的群启群停脚本my_kafka.sh

/home/atguigu/bin 目录下创建该脚本,记得chmod 744 my_kafka.sh

#!/bin/bash
if [ $# -lt 1 ]
then 
	echo "缺少参数:start|stop"
	exit
fi
case $1 in
start)
	for host in hadoop102 hadoop103 hadoop104
	do
		echo "======================$host start kafka======================"
		ssh $host /opt/module/kafka_2.11-2.4.1/bin/kafka-server-start.sh -daemon /opt/module/kafka_2.11-2.4.1/config/server.properties
	done
;;
stop)
	for host in hadoop102 hadoop103 hadoop104
	do
		echo "======================$host stop kafka======================"
		ssh $host /opt/module/kafka_2.11-2.4.1/bin/kafka-server-stop.sh
	done
;;
*)
	echo "请输入合适的参数:start | stop"
;;
esac

ok,群启群停测试成功

kafka命令操作

对topic的基本操作

 1) 查看topic 列表
     kafka-topics.sh  --list --bootstrap-server hadoop102:9092
 2) 创建topic
     kafka-topics.sh --create --bootstrap-server hadoop102:9092 --topic first

     kafka-topics.sh --create --bootstrap-server hadoop102:9092 --topic second --partitions 2 --replication-factor 3
 3) 查看Topic详情
     kafka-topics.sh --describe --bootstrap-server hadoop102:9092 --topic first 
    
    # 详情
     Topic: first	PartitionCount: 2	ReplicationFactor: 3	Configs: segment.bytes=1073741824
	Topic: first	Partition: 0	Leader: 1	Replicas: 1,0,2	Isr: 1,0,2
	Topic: first	Partition: 1	Leader: 0	Replicas: 0,2,1	Isr: 0,2,1
     
     
		
 4) 修改Topic的分区数(只能改大)
	 kafka-topics.sh --alter --bootstrap-server hadoop102:9092 --topic first --partitions 		2
		
 5) 删除Topic
     kafka-topics.sh --delete --bootstrap-server hadoop102:9092 --topic first 

生产者

kafka-console-producer.sh  --broker-list hadoop102:9092 --topic first 

消费者

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

# 消费者消费数据offset重置问题: 新启动的消费者组中的消费者为何消费不到topic中的数据???
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --from-beginning 

消费者组

# 指定消费者的配置文件consumer.properties,里面有个group.id的配置,就是消费者组的名字
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --consumer.config /opt/module/kafka_2.11-2.4.1/config/consumer.properties     

# 指定消费者的名字
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --group aa  

观察到的现象:当前的top分区为2,消费者为2,正好一个消费者消费一个分区的数据,当再次增加一个消费者的时候,这个时候两个分区,三个消费者,那么就会重新分配,有一个消费者闲着。

kafka架构深入 kafka工作流程及文件存储机制

工作流程

Kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。

topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费(GTP策略: group,topc,partition)。

文件存储机制

由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。每个segment对应两个文件——“.index”文件和“.log”文件。
这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,first这个topic有三个分区,则其对应的文件夹为first-0,first-1,first-2。
[atguigu@hadoop102 datas]$ ll
-rw-rw-r--. 1 atguigu atguigu    4 10月 12 23:31 cleaner-offset-checkpoint
-rw-rw-r--. 1 atguigu atguigu    4 10月 13 00:24 log-start-offset-checkpoint
-rw-rw-r--. 1 atguigu atguigu   88 10月 12 23:14 meta.properties
-rw-rw-r--. 1 atguigu atguigu  431 10月 13 00:24 recovery-point-offset-checkpoint
-rw-rw-r--. 1 atguigu atguigu  431 10月 13 00:25 replication-offset-checkpoint
drwxrwxr-x. 2 atguigu atguigu 4096 10月 12 23:46 second-0 # 分区文件
drwxrwxr-x. 2 atguigu atguigu 4096 10月 12 23:22 second-1 # 分区文件
[atguigu@hadoop102 datas]$ cd second-0
[atguigu@hadoop102 second-0]$ ll
-rw-rw-r--. 1 atguigu atguigu 10485760 10月 12 23:22 00000000000000000000.index #
-rw-rw-r--. 1 atguigu atguigu      293 10月 12 23:49 00000000000000000000.log #
-rw-rw-r--. 1 atguigu atguigu 10485756 10月 12 23:22 00000000000000000000.timeindex
-rw-rw-r--. 1 atguigu atguigu        8 10月 12 23:46 leader-epoch-checkpoint
[atguigu@hadoop102 second-0]$ 

index和log文件以当前segment的第一条消息的offset命名。下图为index文件和log文件的结构示意图。

每个segment都有一个log和index文件,log文件中记录的是消息和offset,index文件中记录的是每个offset和对应的偏移量。通过这种机制就能快速定位消息的位置。当一个segment的大小达到一个G(可配置)之后,就会新创建一个segment文件,这个segment的log的命名为00000+最新的offset.log,00000+最新的offset.index,这个index文件记录的是相对的offset从0开始(最新的offset减去文件名的那个数字就能找到)。数据就是通过这种机制维护的,当然。kafka并不可能为每条消息都维护一个offset。这样也太浪费了。

kafka生产者 分区策略

分区的原因

(1)可以提高并发,因为可以以Partition为单位读写了。如果只有一个分区,那么磁盘写的速度就确定了。

(2)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;

分区的原则

(1)  指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
(2) 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
(3)  既没有 partition 值又没有 key 值的情况下, kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,kafka再随机一个分区进行使用.
数据可靠性保证

生产者发送数据到topic partition的可靠性保证

为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,说明消息发送成功,否则重新发送数据。

Topic partition存储数据的可靠性保证

1.副本数据同步策略

方案优点缺点
半数以上完成同步,就发送ack延迟低选举新的leader时,容忍n台节点的故障,需要2n+1个副本
全部完成同步,才发送ack选举新的leader时,容忍n台节点的故障,需要n+1个副本延迟高
Kafka选择了第二种方案,原因如下:
1. 同样为了容忍n台节点的故障,第一种方案需要2n+1个副本,而第二种方案只需要n+1个副本,而Kafka的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
2. 虽然第二种方案的网络延迟会比较高,但网络延迟对Kafka的影响较小。

2.ISR

采用第二种方案之后,设想以下情景:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢?
	Leader维护了一个动态的in-sync replica set (ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给producer发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定(默认为10000ms)。Leader发生故障之后,就会从ISR中选举新的leader。

ack应答级别

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置:0,1,-1(all)

0:这一操作提供了一个最低的延迟,partition的leader接收到消息还没有写入磁盘就已经返回ack,当leader故障时有可能丢失数据;
1: partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据;
-1(all): partition的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复。

1丢数据案例

-1数据重复案例

3.leader和 follower故障处理细节

Log文件中的HW和LEO :

LEO:指的是每个副本最大的offset;

HW:指的是消费者能见到的最大的offset,ISR队列中最小的LEO。

(1)follower故障
	follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。
    
(2)leader故障
	leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。
    
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
Exactly once语义

了解一下!

将服务器的ACK级别设置为-1,可以保证Producer到Server之间不会丢失数据,即At Least Once语义。相对的,将服务器ACK级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once语义。

	At Least Once可以保证数据不丢失,但是不能保证数据不重复;相对的,At Least Once可以保证数据不重复,但是不能保证数据不丢失。但是,对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即Exactly Once语义。在0.11版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。
0.11版本的Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条。幂等性结合At Least Once语义,就构成了Kafka的Exactly Once语义。即:At Least once + 幂等性 = Exactly once

	要启用幂等性,只需要将Producer的参数中enable.idempotence设置为true即可。Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence Number。而Broker端会对做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。
	
但是PID重启就会变化,同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的Exactly Once。
kafka消费者 消费方式

consumer采用pull(拉)模式从broker中读取数据

push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。

pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。
分区分配策略

kafak消费者默认使用的是范围分区

一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费。
Kafka有三种分配策略,RoundRobin,Range , Sticky。
# RoundRobin : 轮询
# Range	: 范围分区(kafak消费者默认)
# Stick	: 粘性分区

RoundRobin : 轮询

Range : 范围分区(kafak消费者默认)

offset维护

由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。

Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets
# 该主题:_consumer_offsets默认存在50个分区,一个副本
消费者组案例
(0)思想: __consumer_offsets 为kafka中的topic, 那就可以通过消费者进行消费.

(1)修改配置文件consumer.properties
# 不排除内部的topic
exclude.internal.topics=false
group.id=pihao-group

(2)创建一个topic
[atguigu@hadoop102 config]$ kafka-topics.sh --create --bootstrap-server hadoop102:9092 --topic pihao --partitions 2 --replication-factor 3
[atguigu@hadoop102 config]$ 
 
(3)启动生产者和消费者,分别往atguigu生产数据和消费数据
# 启动生产者
[atguigu@hadoop102 config]$ kafka-console-producer.sh --broker-list hadoop102:9092 --topic pihao
# 启动消费者(指定consumer.properties的方式)
[atguigu@hadoop102 config]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic pihao --consumer.config /opt/module/kafka_2.11-2.4.1/config/consumer.properties

(4)消费offset
[atguigu@hadoop102 config]$ kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server  hadoop102:9092  --formatter "kafka.coordinator.group.GroupmetadataManager$OffsetsMessageFormatter" --consumer.config /opt/module/kafka_2.11-2.4.1/config/consumer.properties --from-beginning

(5)消费到的数据
# 从这么也观察到消费者是以组为单位去消费数据的,GTP策略
[pihao-group,pihao,0]::OffsetAndmetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1634133543104, expireTimestamp=None)
[pihao-group,pihao,1]::OffsetAndmetadata(offset=1, leaderEpoch=Optional[0], metadata=, commitTimestamp=1634133543104, expireTimestamp=None)
zookeeper在kafka中的作用

Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。Controller的管理工作都是依赖于Zookeeper的。

kafka事务

了解下。

Producer事务

为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID。

为了管理Transaction,Kafka引入了一个新的组件Transaction Coordinator。Producer就是通过和Transaction Coordinator交互获得Transaction ID对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

Consumer事务

上述事务机制主要是从Producer方面考虑,对于Consumer而言,事务的保证就会相对较弱,尤其时无法保证Commit的信息被精确消费。这是由于Consumer可以通过offset访问任意信息,而且不同的Segment File生命周期不同,同一事务的消息可能会出现重启后被删除的情况。
kafkaAPI

https://gitee.com/pihao/sample-project-ph-01.git

Producer API

发消息的种类:

异步发送,同步发送,带回调函数发送,自定义分区器发送

消息发送流程

Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。

相关参数:

batch.size只有数据积累到batch.size之后,sender才会发送数据。

linger.ms: 如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。

Consumer API

Consumer消费数据时的可靠性是很容易保证的,因为数据在Kafka中是持久化的,故不用担心数据丢失问题。由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。

所以offset的维护是Consumer消费数据是必须考虑的问题。

自动提交offset
//开启自动提交offset
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); 
//每次offset自动提交的间隔
consumerProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); 
重置offset
//这个属性生效的两种情况:
// 1:新开一个消费者,这个时候随机开一个消费者组,此时没有初始化offset,那么它会生效
// 2:原来就存在offset,但是时间大于7天被删除了。就是消费者挂了,等到它恢复的时候原来的数据超过7天已经被删除了.
   
//该属性相当于--from-beginning  它的值有两个:latest(默认),earliest;
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); 
手动提交offset

虽然自动提交offset十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。

手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是,commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。

同步提交offset

while (true) {
    //消费者拉取数据
    ConsumerRecords records = consumer.poll(100); 
    for (ConsumerRecord record : records) {

        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), 				record.key(), record.value());

    }
    //同步提交,当前线程会阻塞直到offset提交成功
    consumer.commitSync();
}

同步提交offset

虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交offset的方式。

while (true) {
    //消费者拉取数据
    ConsumerRecords records = consumer.poll(100);
    for (ConsumerRecord record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), 				record.key(), record.value());
    }

    //异步提交
    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map offsets, Exception exception) {
            if (exception != null) {
                System.err.println("Commit failed for" + offsets);
            }
        }
    }); 
}

数据漏消费和重复消费分析

无论是同步提交还是异步提交offset,都有可能会造成数据的漏消费或者重复消费。

先提交offset后消费,有可能造成数据的漏消费;

而先消费后提交offset,有可能会造成数据的重复消费。

所以解决方案是借助三方工具比如mysql:将消费数据和提交offset作为一个事务来进行。

kafka配置类汇总
package com.pihao.main.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;



@Configuration
public class Config {
    private static Properties producerProperties;
    private static Properties consumerProperties;
    static {
        producerProperties=new Properties();
        consumerProperties=new Properties();

        //生产者的配置信息start
        producerProperties.put("bootstrap.servers","hadoop102:9092"); //指定连接的kafka集群
        producerProperties.put("acks","all"); //ack应答机制
        producerProperties.put("retries",3);  //重试次数
        producerProperties.put("batch.size",16384);   //批次大小   16k
        producerProperties.put("linger.ms",1);  //等待时间
        producerProperties.put("buffer.memory",33554432);  //RecordAccumulator缓冲区大小  32M
        producerProperties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); //指定key序列化规则
        producerProperties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");  //指定value序列化规则

        producerProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.pihao.main.producer.partitioner.MyPartitioner");   //配置自己定义的分区策略

//        List interceptors=new ArrayList<>();
//        interceptors.add("com.pihao.main.producer.interceptor.TimeInterceptor");
//        interceptors.add("com.pihao.main.producer.interceptor.CounterInterceptor");
//        producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors); //配置拦截器  ,这里的拦截器可以有多个,传一个list的集合
        //生产者的配置信息end


        //消费者的配置信息start
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); //指定连接的kafka集群
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); //开启自动提交offset
        consumerProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); //每次offset自动提交的间隔
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); //key的反序列化类
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");//value的发序列化
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG,"pihao-group");  //指定消费者组

        //这个属性生效的两种情况:
        // 1:新开一个消费者,这个时候随机开一个消费者组,此时没有初始化offset,那么它会生效
        // 2:原来就存在offset,但是时间大于7天被删除了。就是消费者挂了,等到它恢复的时候原来的数据超过7天已经被删除了.
//        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); //该属性相当于--from-beginning  它的值有两个:latest(默认),earliest;
        //消费者的配置信息end

    }

    //注入KafkaProducer生产者
    @Bean
    public KafkaProducer kafkaProducer(){
        return new KafkaProducer(producerProperties);
    }

    //注入kafkaConsumer消费者
    @Bean
    public KafkaConsumer kafkaConsumer(){

        KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerProperties);
        //订阅主题
        kafkaConsumer.subscribe(Arrays.asList("pihao"));
        return kafkaConsumer;
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/327110.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号