Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。kafka是基于发布订阅模式的主动拉取策略的。
kafka接触架构总结
1) Kafka集群
Kafka集群是由多个Broker组成的。 每个Broker拥有唯一的id.
Kafka集群中有多个Topic.每个Topic可有多个分区(partition),每个分区可有多个副本(replication).
一个Topic的多个分区可以存在到一个Broker中。 一个分区的多个副本只能在不同的broker存在.
一个分区的多个副本由一个leader和多个follower组成.
生产者和消费者读写数据面向leader. follower主要同步leader的数据。以及当leader故障后,follower代替leader工作.
2) 生产者
生成者的功能就是往topic中发布消息.
3) 消费者
消费者的功能就是从topic中消费消息.
消费者消费消息是以消费者组为单位进行的.
一个消费者组内的一个消费者可以同时消费一个topic中多个分区的消息.
一个Topic中的一个分区的消息同时只能被一个消费者组中的一个消费者消费.
4) Zookeeper
Kafka集群的工作需要依赖zookeeper,例如每个broker启动后需要向zookeeper注册.
Broker中大哥(controller)的选举(争抢策略)
Kafka 0.9版本之前消费者组的offset维护在zookeeper中. 0.9版本之后维护在kafka内部.
kafka快速入门
安装部署
1.集群规划
hadoop102 hadoop103 hadoop104 zk zk zk kafka kafka kafka
2.kafka下载
http://kafka.apache.org/downloads.html
我这里用的版本是2.4.1
3.集群部署
1)解压安装包 [atguigu@hadoop102 software]$ tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/ 3)在/opt/module/kafka目录下创建datas文件夹:用来存放kafka消息的 [atguigu@hadoop102 kafka_2.11-2.4.1]$ mkdir datas 4)修改配置文件 [atguigu@hadoop102 kafka_2.11-2.4.1]$ cd config/ [atguigu@hadoop102 config]$ vim server.properties
server.properties配置,写了“改”字改一下,其他的先别动 #(改)broker的全局唯一编号,不能重复 broker.id=0 #删除topic功能使能,当前版本此配置默认为true,已从配置文件移除 delete.topic.enable=true #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘IO的线程数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的缓冲区大小 socket.request.max.bytes=104857600 #(改)kafka运行日志存放的路径 log.dirs=/opt/module/kafka_2.11-2.4.1/datas #topic在当前broker上的分区个数 num.partitions=1 #用来恢复和清理data下数据的线程数量 num.recovery.threads.per.data.dir=1 #segment文件保留的最长时间,超时将被删除 log.retention.hours=168 #(改)配置连接Zookeeper集群地址 zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
4.配置环境变量
[atguigu@hadoop102 zkData]$ vim /etc/profile.d/my_env.sh
如图所示
5.分发安装包
将kafka安装目录与环境变量同步到hadoop103,hadoop104
# 同步kafka [atguigu@hadoop102 module]$ pwd /opt/module [atguigu@hadoop102 module]$ my_rsync.sh kafka_2.11-2.4.1/ # 同步环境变量 [atguigu@hadoop102module]$ scp -r /etc/profile.d/my_env.sh root@hadoop103:/etc/profile.d/ [atguigu@hadoop102module]$ scp -r /etc/profile.d/my_env.sh root@hadoop104:/etc/profile.d/ # 注意:103,104记得测试环境变量是否生效
6.分别再修改103,104的kafka配置文件
分别在hadoop103和hadoop104上修改配置文件/opt/module/kafka/config/server.properties中的broker.id=1、broker.id=2 # 注:broker.id不得重复,我们规划的是: hadoop102 0 hadoop103 1 hadoop104 2
7.启动kafka集群
注意:启动kafka集群之前先启动zookeeper
zk_cluster.sh start
# 依次在hadoop102、hadoop103、hadoop104节点上启动kafka [atguigu@hadoop102 kafka_2.11-2.4.1]$ pwd /opt/module/kafka_2.11-2.4.1 [atguigu@hadoop102 kafka]$ kafka-server-start.sh -daemon config/server.properties [atguigu@hadoop103 kafka]$ kafka-server-start.sh -daemon config/server.properties [atguigu@hadoop104 kafka]$ kafka-server-start.sh -daemon config/server.properties
8.关闭集群
[atguigu@hadoop102 kafka]$ kafka-server-stop.sh stop [atguigu@hadoop103 kafka]$ kafka-server-stop.sh stop [atguigu@hadoop104 kafka]$ kafka-server-stop.sh stop
9.封装kafka的群启群停脚本my_kafka.sh
/home/atguigu/bin 目录下创建该脚本,记得chmod 744 my_kafka.sh
#!/bin/bash if [ $# -lt 1 ] then echo "缺少参数:start|stop" exit fi case $1 in start) for host in hadoop102 hadoop103 hadoop104 do echo "======================$host start kafka======================" ssh $host /opt/module/kafka_2.11-2.4.1/bin/kafka-server-start.sh -daemon /opt/module/kafka_2.11-2.4.1/config/server.properties done ;; stop) for host in hadoop102 hadoop103 hadoop104 do echo "======================$host stop kafka======================" ssh $host /opt/module/kafka_2.11-2.4.1/bin/kafka-server-stop.sh done ;; *) echo "请输入合适的参数:start | stop" ;; esac
kafka命令操作ok,群启群停测试成功
对topic的基本操作
1) 查看topic 列表
kafka-topics.sh --list --bootstrap-server hadoop102:9092
2) 创建topic
kafka-topics.sh --create --bootstrap-server hadoop102:9092 --topic first
kafka-topics.sh --create --bootstrap-server hadoop102:9092 --topic second --partitions 2 --replication-factor 3
3) 查看Topic详情
kafka-topics.sh --describe --bootstrap-server hadoop102:9092 --topic first
# 详情
Topic: first PartitionCount: 2 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: first Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
4) 修改Topic的分区数(只能改大)
kafka-topics.sh --alter --bootstrap-server hadoop102:9092 --topic first --partitions 2
5) 删除Topic
kafka-topics.sh --delete --bootstrap-server hadoop102:9092 --topic first
生产者
kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
消费者
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first # 消费者消费数据offset重置问题: 新启动的消费者组中的消费者为何消费不到topic中的数据??? kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --from-beginning
消费者组
# 指定消费者的配置文件consumer.properties,里面有个group.id的配置,就是消费者组的名字 kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --consumer.config /opt/module/kafka_2.11-2.4.1/config/consumer.properties # 指定消费者的名字 kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --group aa
kafka架构深入 kafka工作流程及文件存储机制观察到的现象:当前的top分区为2,消费者为2,正好一个消费者消费一个分区的数据,当再次增加一个消费者的时候,这个时候两个分区,三个消费者,那么就会重新分配,有一个消费者闲着。
工作流程
Kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。 topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费(GTP策略: group,topc,partition)。
文件存储机制
由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。每个segment对应两个文件——“.index”文件和“.log”文件。 这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,first这个topic有三个分区,则其对应的文件夹为first-0,first-1,first-2。
[atguigu@hadoop102 datas]$ ll -rw-rw-r--. 1 atguigu atguigu 4 10月 12 23:31 cleaner-offset-checkpoint -rw-rw-r--. 1 atguigu atguigu 4 10月 13 00:24 log-start-offset-checkpoint -rw-rw-r--. 1 atguigu atguigu 88 10月 12 23:14 meta.properties -rw-rw-r--. 1 atguigu atguigu 431 10月 13 00:24 recovery-point-offset-checkpoint -rw-rw-r--. 1 atguigu atguigu 431 10月 13 00:25 replication-offset-checkpoint drwxrwxr-x. 2 atguigu atguigu 4096 10月 12 23:46 second-0 # 分区文件 drwxrwxr-x. 2 atguigu atguigu 4096 10月 12 23:22 second-1 # 分区文件 [atguigu@hadoop102 datas]$ cd second-0 [atguigu@hadoop102 second-0]$ ll -rw-rw-r--. 1 atguigu atguigu 10485760 10月 12 23:22 00000000000000000000.index # -rw-rw-r--. 1 atguigu atguigu 293 10月 12 23:49 00000000000000000000.log # -rw-rw-r--. 1 atguigu atguigu 10485756 10月 12 23:22 00000000000000000000.timeindex -rw-rw-r--. 1 atguigu atguigu 8 10月 12 23:46 leader-epoch-checkpoint [atguigu@hadoop102 second-0]$
index和log文件以当前segment的第一条消息的offset命名。下图为index文件和log文件的结构示意图。
kafka生产者 分区策略每个segment都有一个log和index文件,log文件中记录的是消息和offset,index文件中记录的是每个offset和对应的偏移量。通过这种机制就能快速定位消息的位置。当一个segment的大小达到一个G(可配置)之后,就会新创建一个segment文件,这个segment的log的命名为00000+最新的offset.log,00000+最新的offset.index,这个index文件记录的是相对的offset从0开始(最新的offset减去文件名的那个数字就能找到)。数据就是通过这种机制维护的,当然。kafka并不可能为每条消息都维护一个offset。这样也太浪费了。
分区的原因
(1)可以提高并发,因为可以以Partition为单位读写了。如果只有一个分区,那么磁盘写的速度就确定了。 (2)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;
分区的原则
(1) 指明 partition 的情况下,直接将指明的值直接作为 partiton 值; (2) 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值; (3) 既没有 partition 值又没有 key 值的情况下, kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,kafka再随机一个分区进行使用.数据可靠性保证
生产者发送数据到topic partition的可靠性保证
为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,说明消息发送成功,否则重新发送数据。
Topic partition存储数据的可靠性保证
1.副本数据同步策略
| 方案 | 优点 | 缺点 |
|---|---|---|
| 半数以上完成同步,就发送ack | 延迟低 | 选举新的leader时,容忍n台节点的故障,需要2n+1个副本 |
| 全部完成同步,才发送ack | 选举新的leader时,容忍n台节点的故障,需要n+1个副本 | 延迟高 |
Kafka选择了第二种方案,原因如下: 1. 同样为了容忍n台节点的故障,第一种方案需要2n+1个副本,而第二种方案只需要n+1个副本,而Kafka的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。 2. 虽然第二种方案的网络延迟会比较高,但网络延迟对Kafka的影响较小。
2.ISR
采用第二种方案之后,设想以下情景:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢? Leader维护了一个动态的in-sync replica set (ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给producer发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定(默认为10000ms)。Leader发生故障之后,就会从ISR中选举新的leader。
ack应答级别
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置:0,1,-1(all)
0:这一操作提供了一个最低的延迟,partition的leader接收到消息还没有写入磁盘就已经返回ack,当leader故障时有可能丢失数据;
1: partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据;
-1(all): partition的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复。
1丢数据案例
-1数据重复案例
3.leader和 follower故障处理细节
Log文件中的HW和LEO :
LEO:指的是每个副本最大的offset;
HW:指的是消费者能见到的最大的offset,ISR队列中最小的LEO。
(1)follower故障
follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。
(2)leader故障
leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
Exactly once语义
了解一下!
将服务器的ACK级别设置为-1,可以保证Producer到Server之间不会丢失数据,即At Least Once语义。相对的,将服务器ACK级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once语义。 At Least Once可以保证数据不丢失,但是不能保证数据不重复;相对的,At Least Once可以保证数据不重复,但是不能保证数据不丢失。但是,对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即Exactly Once语义。在0.11版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。 0.11版本的Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条。幂等性结合At Least Once语义,就构成了Kafka的Exactly Once语义。即:At Least once + 幂等性 = Exactly once 要启用幂等性,只需要将Producer的参数中enable.idempotence设置为true即可。Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence Number。而Broker端会对kafka消费者 消费方式做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。 但是PID重启就会变化,同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的Exactly Once。
consumer采用pull(拉)模式从broker中读取数据
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。 pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。分区分配策略
kafak消费者默认使用的是范围分区
一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费。 Kafka有三种分配策略,RoundRobin,Range , Sticky。 # RoundRobin : 轮询 # Range : 范围分区(kafak消费者默认) # Stick : 粘性分区
RoundRobin : 轮询
Range : 范围分区(kafak消费者默认)
offset维护由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。
Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets # 该主题:_consumer_offsets默认存在50个分区,一个副本消费者组案例
(0)思想: __consumer_offsets 为kafka中的topic, 那就可以通过消费者进行消费. (1)修改配置文件consumer.properties # 不排除内部的topic exclude.internal.topics=false group.id=pihao-group (2)创建一个topic [atguigu@hadoop102 config]$ kafka-topics.sh --create --bootstrap-server hadoop102:9092 --topic pihao --partitions 2 --replication-factor 3 [atguigu@hadoop102 config]$ (3)启动生产者和消费者,分别往atguigu生产数据和消费数据 # 启动生产者 [atguigu@hadoop102 config]$ kafka-console-producer.sh --broker-list hadoop102:9092 --topic pihao # 启动消费者(指定consumer.properties的方式) [atguigu@hadoop102 config]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic pihao --consumer.config /opt/module/kafka_2.11-2.4.1/config/consumer.properties (4)消费offset [atguigu@hadoop102 config]$ kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop102:9092 --formatter "kafka.coordinator.group.GroupmetadataManager$OffsetsMessageFormatter" --consumer.config /opt/module/kafka_2.11-2.4.1/config/consumer.properties --from-beginning (5)消费到的数据 # 从这么也观察到消费者是以组为单位去消费数据的,GTP策略 [pihao-group,pihao,0]::OffsetAndmetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1634133543104, expireTimestamp=None) [pihao-group,pihao,1]::OffsetAndmetadata(offset=1, leaderEpoch=Optional[0], metadata=, commitTimestamp=1634133543104, expireTimestamp=None)zookeeper在kafka中的作用
kafka事务Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。Controller的管理工作都是依赖于Zookeeper的。
了解下。
Producer事务
为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID。 为了管理Transaction,Kafka引入了一个新的组件Transaction Coordinator。Producer就是通过和Transaction Coordinator交互获得Transaction ID对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
Consumer事务
上述事务机制主要是从Producer方面考虑,对于Consumer而言,事务的保证就会相对较弱,尤其时无法保证Commit的信息被精确消费。这是由于Consumer可以通过offset访问任意信息,而且不同的Segment File生命周期不同,同一事务的消息可能会出现重启后被删除的情况。kafkaAPI
Producer APIhttps://gitee.com/pihao/sample-project-ph-01.git
发消息的种类:
异步发送,同步发送,带回调函数发送,自定义分区器发送
消息发送流程
Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。
相关参数:
batch.size只有数据积累到batch.size之后,sender才会发送数据。
linger.ms: 如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。
Consumer API自动提交offsetConsumer消费数据时的可靠性是很容易保证的,因为数据在Kafka中是持久化的,故不用担心数据丢失问题。由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。
所以offset的维护是Consumer消费数据是必须考虑的问题。
//开启自动提交offset consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); //每次offset自动提交的间隔 consumerProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);重置offset
//这个属性生效的两种情况: // 1:新开一个消费者,这个时候随机开一个消费者组,此时没有初始化offset,那么它会生效 // 2:原来就存在offset,但是时间大于7天被删除了。就是消费者挂了,等到它恢复的时候原来的数据超过7天已经被删除了. //该属性相当于--from-beginning 它的值有两个:latest(默认),earliest; consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");手动提交offset
虽然自动提交offset十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是,commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。
同步提交offset
while (true) {
//消费者拉取数据
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
//同步提交,当前线程会阻塞直到offset提交成功
consumer.commitSync();
}
同步提交offset
虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交offset的方式。
while (true) {
//消费者拉取数据
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
//异步提交
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map offsets, Exception exception) {
if (exception != null) {
System.err.println("Commit failed for" + offsets);
}
}
});
}
数据漏消费和重复消费分析
kafka配置类汇总无论是同步提交还是异步提交offset,都有可能会造成数据的漏消费或者重复消费。
先提交offset后消费,有可能造成数据的漏消费;
而先消费后提交offset,有可能会造成数据的重复消费。
所以解决方案是借助三方工具比如mysql:将消费数据和提交offset作为一个事务来进行。
package com.pihao.main.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
@Configuration
public class Config {
private static Properties producerProperties;
private static Properties consumerProperties;
static {
producerProperties=new Properties();
consumerProperties=new Properties();
//生产者的配置信息start
producerProperties.put("bootstrap.servers","hadoop102:9092"); //指定连接的kafka集群
producerProperties.put("acks","all"); //ack应答机制
producerProperties.put("retries",3); //重试次数
producerProperties.put("batch.size",16384); //批次大小 16k
producerProperties.put("linger.ms",1); //等待时间
producerProperties.put("buffer.memory",33554432); //RecordAccumulator缓冲区大小 32M
producerProperties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); //指定key序列化规则
producerProperties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); //指定value序列化规则
producerProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.pihao.main.producer.partitioner.MyPartitioner"); //配置自己定义的分区策略
// List interceptors=new ArrayList<>();
// interceptors.add("com.pihao.main.producer.interceptor.TimeInterceptor");
// interceptors.add("com.pihao.main.producer.interceptor.CounterInterceptor");
// producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors); //配置拦截器 ,这里的拦截器可以有多个,传一个list的集合
//生产者的配置信息end
//消费者的配置信息start
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); //指定连接的kafka集群
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); //开启自动提交offset
consumerProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); //每次offset自动提交的间隔
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); //key的反序列化类
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");//value的发序列化
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG,"pihao-group"); //指定消费者组
//这个属性生效的两种情况:
// 1:新开一个消费者,这个时候随机开一个消费者组,此时没有初始化offset,那么它会生效
// 2:原来就存在offset,但是时间大于7天被删除了。就是消费者挂了,等到它恢复的时候原来的数据超过7天已经被删除了.
// consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); //该属性相当于--from-beginning 它的值有两个:latest(默认),earliest;
//消费者的配置信息end
}
//注入KafkaProducer生产者
@Bean
public KafkaProducer kafkaProducer(){
return new KafkaProducer(producerProperties);
}
//注入kafkaConsumer消费者
@Bean
public KafkaConsumer kafkaConsumer(){
KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerProperties);
//订阅主题
kafkaConsumer.subscribe(Arrays.asList("pihao"));
return kafkaConsumer;
}
}



