前面我们使用基于console的生产者和消费者对topic实现了数据的生产和消费,,这个基于控制台的生产者和消费者主要是让我们做测试用的。
在实际工作中,我们有时候需要将生产者和消费者功能集成到我们已有的系统中,此时就需要写代码实现生产者和消费者的逻辑了。
在这我们使用java代码来实现生产者和消费者的功能。
一、Java代码实现生产者代码 1、创建maven项目先创建maven项目,db_kafka
添加kafka的maven依赖。
3、开发生产者代码org.apache.kafka kafka-clients2.4.1
package com.imooc.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class ProducerDemo {
public static void main(String[] args) {
Properties prop = new Properties();
//指定kafka的broker地址
prop.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");
//指定key-value数据的序列化格式
prop.put("key.serializer", StringSerializer.class.getName());
prop.put("value.serializer", StringSerializer.class.getName());
//指定topic
String topic = "hello";
//创建kafka生产者
KafkaProducer producer = new KafkaProducer(prop);
//向topic中生产数据
producer.send(new ProducerRecord(topic, "hello kafka"));
//关闭链接
producer.close();
}
}
等一会我们把消费者代码实现好了以后一起验证。
二、Java代码实现消费者代码 1、开发消费者代码package com.imooc.kafka;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class ConsumerDemo {
public static void main(String[] args) {
Properties prop = new Properties();
//指定kafka的broker地址
prop.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");
//指定key-value的反序列化类型
prop.put("key.deserializer", StringDeserializer.class.getName());
prop.put("value.deserializer", StringDeserializer.class.getName());
//指定消费者组
prop.put("group.id", "con-1");
//创建消费者
KafkaConsumer consumer = new KafkaConsumer(prop);
Collection topics = new ArrayList();
topics.add("hello");
//订阅指定的topic
consumer.subscribe(topics);
while(true) {
//消费数据【注意:需要修改jdk编译级别为1.8,否则Duration.ofSeconds(1)会语法报错】
ConsumerRecords poll = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord consumerRecord : poll) {
System.out.println(consumerRecord);
}
}
}
}
2、注意:
1、关闭kafka服务器的防火墙
2、配置windows的hosts文件 添加kafka节点的hostname和ip的映射关系。
[如果我们的hosts文件中没有对kafka节点的 hostnam和ip的映射关系做配置,在这经过多次尝试连接不上就会报错]
发现没有消费到数据,这个topic中是有数据的,为什么之前的数据没有消费出来呢?不要着急,先带着这个问题往下面看
此时回到kafka的消费者端就可以看到消费出来的数据了。
ConsumerRecord(topic = hello, partition = 3, leaderEpoch = 3, offset = 0, CreateTime = 1591687499753, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadonly = false), key = null, value = hello kafka)
所以这个时候我们发现,新产生的数据我们是可以消费到的,但是之前的数据我们就无法消费了,那下面我们来分析一下这个问题。
三、消费者代码扩展//==================================================
//开启自动提交offset功能,默认就是开启的
prop.put("enable.auto.commit","true");
//自动提交offset的时间间隔,单位是毫秒
prop.put("auto.commit.interval.ms","5000");
prop.put("auto.offset.reset","latest");
//==================================================
此时我们来验证一下,
先启动一次生产者
再启动一次消费者,看看消费者能不能消费到这条数据,如果能消费到,就说明此时是根据上次保存的offset信息进行消费了。
结果发现是可以消费到的。
注意:消费者消费到数据之后,不要立刻关闭程序,要至少等5秒,因为自动提交offset的时机是5秒提交一次。
ConsumerRecord(topic = hello, partition = 4, leaderEpoch = 5, offset = 0, CreateTime = 1591687894952, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadonly = false), key = null, value = hello kafka)
将auto.offset.reset置为earliest,修改一下group.id的值,相当于使用一个新的消费者,验证一下,看是否能把这个topic中的所有数据都取出来,因为新的消费者第一次肯定是获取不到offset信息的,所以就会根据auto.offset.reset的值来消费数据。
prop.put("group.id", "con-2");
prop.put("auto.offset.reset","earliest");
结果发现确实把之前的所有数据都消费过来了.
ConsumerRecord(topic = hello, partition = 2, leaderEpoch = 0, offset = 0, CreateTime = 1591672800863, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadonly = false), key = null, value = hehe) ConsumerRecord(topic = hello, partition = 3, leaderEpoch = 3, offset = 0, CreateTime = 1591687499753, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadonly = false), key = null, value = hello kafka) ConsumerRecord(topic = hello, partition = 4, leaderEpoch = 5, offset = 0, CreateTime = 1591687864482, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadonly = false), key = null, value = hello kafka)
此时,关闭消费者(需要等待5秒,这样才会提交offset),再重新启动,发现没有消费到数据,说明此时就根据上次保存的offset来消费数据了,因为没有新数据产生,所以就消费不到了。
最后来处理一下程序输出的日志警告信息,这里其实示因为缺少依赖日志依赖
在pom文件中添加log4j的依赖,然后将log4j.properties添加到resources目录中。
四、Consumer消费offset查询org.slf4j slf4j-api1.7.10 org.slf4j slf4j-log4j121.7.10
kafka0.9版本以前,消费者的offset信息保存在zookeeper中。
从kafka0.9开始,使用了新的消费API,消费者的信息会保存在kafka里面的__consumer_offsets这个topic中。
因为频繁操作zookeeper性能不高,所以kafka在自己的topic中负责维护消费者的offset信息。
如何查询保存在kafka中的Consumer的offset信息呢?
使用kafka-consumer-groups.sh这个脚本可以查看
查看目前所有的consumer group
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092 con-1 con-2
具体查看某一个consumer group的信息。
GROUP:当前消费者组,通过group.id指定的值
TOPIC:当前消费的topic
PARTITION:消费的分区
CURRENT-OFFSET:消费者消费到这个分区的offset
LOG-END-OFFSET:当前分区中数据的最大offset
LAG:当前分区未消费数据量
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group con-1 GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID con-1 hello 4 1 1 0 - - - con-1 hello 2 1 1 0 - - - con-1 hello 3 1 1 0 - - - con-1 hello 0 0 0 0 - - - con-1 hello 1 0 0 0 - - -
此时再执行一次生产者代码,生产一条数据,重新查看一下这个消费者的offset情况。
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group con-1 GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID con-1 hello 4 1 2 1 - - - con-1 hello 2 1 1 0 - - - con-1 hello 3 1 1 0 - - - con-1 hello 0 0 0 0 - - - con-1 hello 1 0 0 0 - - -五、Consumer消费顺序
当一个消费者消费一个partition时候,消费的数据顺序和此partition数据的生产顺序是一致的。
当一个消费者消费多个partition时候,消费者按照partition的顺序,首先消费一个partition,当消费完一个partition最新的数据后再消费其它partition中的数据。
总之:如果一个消费者消费多个partiton,只能保证消费的数据顺序在一个partition内是有序的
也就是说消费kafka中的数据只能保证消费partition内的数据是有序的,多个partition之间是无序的。
六、Kafka的三种语义kafka可以实现以下三种语义,这三种语义是针对消费者而言的:
至少一次:at-least-once
这种语义有可能会对数据重复处理
实现至少一次消费语义的消费者也很简单。
1: 设置enable.auto.commit为false,禁用自动提交offset
2: 消息处理完之后手动调用consumer.commitSync()提交offset
这种方式是在消费数据之后,手动调用函数consumer.commitSync()异步提交offset,
有可能处理多次的场景是消费者的消息处理完并输出到结果库,但是offset还没提交,这个时候消费者挂掉了,再重启的时候会重新消费并处理消息,所以至少会处理一次。
至多一次:at-most-once
这种语义有可能会丢失数据
至多一次消费语义是kafka消费者的默认实现。配置这种消费者最简单的方式是
1: enable.auto.commit设置为true。
2: auto.commit.interval.ms设置为一个较低的时间范围。
由于上面的配置,此时kafka会有一个独立的线程负责按照指定间隔提交offset。
消费者的offset已经提交,但是消息还在处理中(还没有处理完),这个时候程序挂了,导致数据没有被成功处理,再重启的时候会从上次提交的offset处消费,导致上次没有被成功处理的消息就丢失了。
仅一次:exactly-once
这种语义可以保证数据只被消费处理一次。
实现仅一次语义的思路如下:
1: 将enable.auto.commit设置为false,禁用自动提交offset
2: 使用consumer.seek(topicPartition,offset)来指定offset
3: 在处理消息的时候,要同时保存住每个消息的offset。以原子事务的方式保存offset和处理的消息结果,这个时候相当于自己保存offset信息了,把offset和具体的数据绑定到一块,数据真正处理成功的时候才会保存offset信息。
这样就可以保证数据仅被处理一次了。



