下载,官网地址https://kafka.apache.org/downloads
安装
解压缩: tar -zxvf kafka_2.12-3.0.0.tgz 安装步骤非常简单,直接解压即可,kafka 依赖的 Zookeeper 已经在此文件中包含
配置 Zookeeper
开启: bin/zookeeper-server-start.sh -daemon config/zookeeper.properties 关闭: bin/zookeeper-server-stop.sh -daemon config/zookeeper.properties
配置 Kafka
bin/kafka-server-start.sh -daemon config/server.properties bin/kafka-server-stop.sh -daemon config/server.properties
配置主题https://www.orchome.com/6
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka 高版本: bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic Hello-Kafka --partitions 1 --replication-factor 1 查看所有topic: bin/kafka-topics.sh --list --bootstrap-server localhost:9092
发送消息
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message
消费消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
单播消息
问:生产者发送消息是否可以被多个消费者同时消费?
答:同一个消费组只能有一个消费者收到topic中的消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test --from-beginning 消费组: --consumer-property group.id=testGroup
多播消息
在某些业务场景下需要一条消息被多个消费者消费,使用多播模式
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup1 --topic test bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup2 --topic test
查看消费组信息
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup
集群搭建
配置文件id不同,服务器ip一般也不同
· server.properties:
broker.id= 0(1,2)
listeners=PLAINTEXT://192.168.65.60:
log.dir=/usr/local/data/kafka-logs
· 启动命令
/kafka-server-start.sh -daemon ../config/server0.properties
/kafka-server-start.sh -daemon ../config/server1.properties
/kafka-server-start.sh -daemon ../config/server2.properties
分区(partition)
一个主题中的消息量是非常大的,因此可以通过分区的设置,来分布式存储这些消息。比如一个topic创建了 3 个分区。那么topic中的消息就会分别存放在这三个分区中。
副本
副本是对分区的备份。在集群中,不同的副本会被部署在不同的broker上。下面例子:创建 1个主题, 2 个分区、 3 个副本。
./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic
通过查看主题信息,其中的关键数据:
leader:副本里的概念
每个partition都有一个broker作为leader,消息发送方要把消息发给哪个broker?就看副本的leader是在哪个broker上面。副本里的leader专⻔用来接收消息。
接收到消息后,其他follower通过poll的方式来同步数据
follower:eader处理所有针对这个partition的读写请求,而follower被动复制leader,不提供读写(主要是为了保证多副本数据与消费的一致性),如果leader所在的broker挂掉,那么就会进行新leader的选举,至于怎么选,在之后的controller的概念中介绍。
isr:可以同步或者已同步的节点会被出入isr集合。注意:如果isr中节点性能较差会被踢出isr集合
kill掉leader后再查看
# kill掉leader ps -aux | grep server.properties kill 17631 # 查看topic情况 ./kafka-topics.sh --describe --zookeeper 172.16.253.35:2181 --topic my-replicated-topic
Java客户端实现生产者
依赖:
org.apache.kafka kafka-clients 3.0.0
实现:
public class MyProducer {
private final static String TOPIC_NAME = "my-test-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 把Key序列化为
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 把Value序列化
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// kafka生产者客户端
KafkaProducer kafkaProducer = new KafkaProducer(props);
Order order = new Order();
order.setOrderId(1002L);
order.setCount(3);
order.setMessage("这是第3次测试发送kafka的消息");
// 发送消息的客户端
// key决定了往哪个分区上发送
ProducerRecord record = new ProducerRecord(TOPIC_NAME, "test2", JSON.toJSONString(order));
// 发送消息
Recordmetadata metadata = kafkaProducer.send(record).get();
System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
}
}
异步发送:
// 异步发送消息
kafkaProducer.send(record,(recordmetadata,exception)-> {
if (exception!=null) {
System.err.println("发送消息失败:" + exception.getStackTrace());
}
if (recordmetadata!=null) {
System.out.println("异步方式发送消息结果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset());
}
});
// 主线程停止太快无法回调
Thread.sleep(10000L);
生产者ACK配置
( 1 )acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。
( 2 )acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
( 3 )acks=-1或all: 需要等待 min.insync.replicas(默认为 1 ,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。
props.put(ProducerConfig.ACKS_CONFIG, "1");
消费者简单实现
public class MyConsumer {
private static final String CONSUMER_GROUP_NAME = "test-group";
private final static String TOPIC_NAME = "my-test-topic";
public static void main(String[] args) {
// 1.配置
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 创建消费者客户端
KafkaConsumer consumer = new KafkaConsumer<>(props);
// 订阅topic
consumer.subscribe(Collections.singleton(TOPIC_NAME));
// 长轮询接受消息(poll轮询)
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(1000L));
for (ConsumerRecord record : records) {
System.out.printf("收到消息:partition = %d,offset=%d,key=%s,value=%s%n",record.partition(),record.offset(),record.key(),record.value());
}
}
}
}
自动提交offset
// 自动提交offset(默认) props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); // 自动提交offset时间间隔 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
消费者poll到消息后会自动向broker的_consumer_foosets主题提交当前主题-分区消费的偏移量。
自动提交存在消息丢失的情况:因为如果消费者poll到消息后就自动提交了偏移量,如果此时挂掉就会误以为已经消费掉了这条消息,造成消息丢失。
手动提交
// 手动提交offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
if(records.count()>0){
// 同步提交,会阻塞线程,因为这里没什么逻辑,所以阻塞也没关系
consumer.commitSync();
//异步提交
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map map, Exception e) {
if(e!=null){
System.out.println("commit exception offect:"+map);
System.out.println("commit exception exception:"+e.getStackTrace());
}
}
});
}
长轮询poll消息
默认情况下,消费者一次可以poll500条消息
// 一次poll消息的最大条数,可以根据消费速度来设置 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,500);
如果两次poll的时间超过了30s,kafka会认为其消费能力过低,将其踢出消费组。并将分区交给其他消费者,触发rebalance机制
// 一次poll消息的最大条数,可以根据消费速度来设置 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,500); // 如果两次poll的时间超过了30s,kafka会认为其消费能力过低,将其踢出消费组。并将分区交给其他消费者 props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,30*1000);
回溯消费
// 指定消费分区 consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME,0))); // 消息回溯消费 consumer.seekToBeginning(Collections.singleton(new TopicPartition(TOPIC_NAME,0)));
指定offset消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0 ))); consumer.seek(new TopicPartition(TOPIC_NAME, 0 ), 10 );
从指定时间消费
ListtopicPartitions =consumer.partitionsFor(TOPIC_NAME); //从 1 小时前开始消费 long fetchDataTime = new Date().getTime() - 1000 * 60 * 60 ; Map map = new HashMap<>(); for (PartitionInfo par : topicPartitions) { map.put(new TopicPartition(TOPIC_NAME, par.partition()),fetchDataTime); } Map parMap =consumer.offsetsForTimes(map); for (Map.Entry entry :parMap.entrySet()) { TopicPartition key = entry.getKey(); OffsetAndTimestamp value = entry.getValue(); if (key == null || value == null) continue; Long offset = value.offset(); System.out.println("partition-" + key.partition() +"|offset-" + offset); System.out.println(); //根据消费里的timestamp确定offset if (value != null) { consumer.assign(Arrays.asList(key)); consumer.seek(key, offset); } }
新消费组的消费偏移量
当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费?
latest(默认) :只消费自己启动之后发送到主题的消息earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBeginning(每次都从头开始消费)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
引入maven依赖:
org.springframework.kafka spring-kafka
配置文件
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
# 发生错误后,消息重发的次数。
retries: 1
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: 1
consumer:
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 在侦听器容器中运行的线程数。
concurrency: 5
#listner负责ack,每调用一次,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false
生产消费者代码
@Component
public class KafkaProducer {
private static final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
@Resource
private KafkaTemplate kafkaTemplate;
public static final String TOPIC_TEST = "Hello-Kafka";
public static final String TOPIC_GROUP = "test-consumer-group";
public void send(Object obj) {
Object jsonStr = JSON.toJSONString(obj);
logger.info("准备发送消息为:{}", jsonStr);
ListenableFuture> future = kafkaTemplate.send(TOPIC_TEST, jsonStr);
future.addCallback(new ListenableFutureCallback>() {
@Override
public void onFailure(Throwable throwable) {
logger.error(TOPIC_TEST + "-生产者 消息发送失败" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult stringObjectSendResult) {
logger.info(TOPIC_TEST + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
}
});
}
}
@Component
public class KafkaConsumer {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
@KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP)
public void kafkaTest(ConsumerRecord, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional> message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
logger.info("topic_test 消费了: Topic:" + topic + ",Message:" + msg);
// 手动提交offset
ack.acknowledge();
}
}
}
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerTest {
private static final Logger logger = LoggerFactory.getLogger(KafkaProducerTest.class);
@Resource
private KafkaProducer kafkaProducer;
@Test
public void testSendMsg() throws InterruptedException {
Map map = new HashMap<>();
map.put("name", "三国演义");
map.put("author", "罗贯中");
map.put("price", "25");
kafkaProducer.send(map);
Thread.sleep(9999999);
}
}
2022-01-25 11:12:17.622 INFO 3105 --- [ad | producer-1] c.i.l.application.mq.KafkaProducer : Hello-Kafka - 生产者 发送消息成功:SendResult [producerRecord=ProducerRecord(topic=Hello-Kafka, partition=null, headers=RecordHeaders(headers = [], isReadonly = true), key=null, value={"author":"罗贯中","price":"25","name":"三国演义"}, timestamp=null), recordmetadata=Hello-Kafka-0@1]
2022-01-25 11:12:17.697 INFO 3105 --- [ntainer#0-0-C-1] c.i.l.application.mq.KafkaConsumer : topic_test 消费了: Topic:Hello-Kafka,Message:{"author":"罗贯中","price":"25","name":"三国演义"}



