- 1、简介
- 2、Kafka基本概念
- 3、Kafka的使用场景
- 4、Kafka环境搭建
- 4.1、下载Kafka安装包
- 4.2、解压下载好的安装包
- 4.3、修改配置文件
- 4.3.1、修改zookeeper.properties
- 4.3.2、修改server.properties
- 4.3.3、server.properties核心配置
- 4.4、启动服务
- 4.4.1、启动zookeeper
- 4.4.2、启动Kafka
- 5、Kafka具体命令使用
- 5.1、创建主题
- 5.2、查看所有主题
- 5.3、查看单个主题信息
- 5.4、查看所有主题信息
- 5.5、生产消息
- 5.6、消费消息
- 5.7、单播消费
- 5.8、多播消费
- 5.9、多主题消费
- 5.10、查看消费组名称
- 5.11、查看消费组的消费偏移量
- 5.12、删除主题
- 5.13、修改主题分区数量
- 5.14、关闭zookeeper与kafka
- 6、Kafka集群搭建
- 6.1、搭建zookeeper集群
- 6.1.2、复制三份zookeeper.properties
- 6.1.3、修改配置文件
- 6.1.4、启动
- 6.2、搭建Kafka集群
- 6.2.1、复制配置文件
- 6.2.2、修改配置信息
- 6.2.3、启动
- 6.3 使用集群节点
- 6.3.1、生产消息
- 6.3.2、消费消息
- 7、API
- 7.1、异步生产消息-没有回调函数
- 7.2、异步生产消息-有回调函数
- 7.3、 消费消息
- 8、SpringBoot集成Kafka
Kafk是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景
消息队列的好处
- 解耦合:
- 异步调用
- 流量削峰
- Producer:消息生产者,向Broker发送消息的客户端
- Consumer:消息消费者,向Broker读取消息的客户端。
- Consumer Group:消费组,每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个Consumer Group中只能有一个Consumer能够消费该消息
- Broker:消息中间件处理节点,一台Kafka服务器就是一个Broker,一个集群由多个Broker组成,一个Broker可以容纳多个Topic。
- Topic:消息主题,可以理解为一个队列,生产者和消费者都是面向一个Topic
- Partition:分区,为了实现扩展性,一个非常大的Topic可以分布到多个Broker上,一个Topic可以分为多个Partition,每个Partition是一个有序的队列(分区有序,不能保证全局有序)
- Replica:副本Replication,为保证集群中某个节点发生故障,节点上的Partition数据不丢失,Kafka可以正常的工作,Kafka提供了副本机制,一个Topic的每个分区有若干个副本,一个Leader和多Follower
- Leader:每个分区多个副本的主角色,生产者发送数据的对象,以及消费者消费数据的对象都是
Leader。 - Follower:每个分区多个副本的从角色,实时的从Leader中同步数据,保持和Leader数据的同步,当
Leader挂了之后,会选举一个新的Leader出来
- 日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
- 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
在安装Kafka之前需要先安装jdk,这里就不说jdk的安装了
4.1、下载Kafka安装包 4.2、解压下载好的安装包Kafka是依赖zookeeper的,我下载的安装包是携带有zookeeper。如果下载的是旧版的Kafka那么就需要先安装zookeeper
tar -vzxf kafka_2.12-3.0.0.tgz4.3、修改配置文件 4.3.1、修改zookeeper.properties
# zookeeper数据保存地址 dataDir=/tmp/zookeeper # zookeeper端口号 clientPort=21814.3.2、修改server.properties
配置Kafka的访问地址及端口号,这个配置是必须的,不然无法在外部访问Kafka
#broker.id属性在kafka集群中必须要是唯一 broker.id=0 #kafka部署的机器ip和提供服务的端口号 listeners=PLAINTEXT://192.168.3.128:9092 #kafka的消息存储文件 log.dirs=/tmp/kafka-logs #kafka连接zookeeper的地址-单机 zookeeper.connect=192.168.3.128:2181 #如果搭建了zookeeper集群那么使用逗号分割ip地址 #zookeeper.connect=192.168.3.128:2181,192.168.3.128:2182,192.168.3.128:21834.3.3、server.properties核心配置
| Property | Default | Description |
|---|---|---|
| broker.id | 0 | 每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯一的即可。 |
| log.dirs | /tmp/kafka-logs | kafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行。 |
| listeners | listeners=PLAINTEXT://192.168.3.128:9092 | server接受客户端连接的端口,ip配置kafka本机ip即可 |
| zookeeper.connect | localhost:2181 | zooKeeper连接字符串的格式为:hostname:port,此处hostname和port分别是ZooKeeper集群中某个节点的host和port;zookeeper如果是集群,连接方式为 hostname1:port1, hostname2:port2, hostname3:port3 |
| log.retention.hours | 168 | 每个日志文件删除之前保存的时间。默认数据保存时间对所有topic都一样。 |
| default.replication.factor | 1 | 自动创建topic的默认副本数量,建议设置为大于等于2 |
| min.insync.replicas | 1 | 当producer设置acks为-1时,min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息 |
| elete.topic.enable | false | 是否允许删除主题 |
| num.partitions | 1 | 创建topic的默认分区数 |
-daemon 表示以后台进程运行
sudo bin/zookeeper-server-start.sh -daemon config/zookeeper.properties sudo ps -ef |grep zookeeper4.4.2、启动Kafka
sudo bin/kafka-server-start.sh -daemon config/server.properties sudo ps -ef |grep kafka5、Kafka具体命令使用 5.1、创建主题
// test_topic 主题名称 // replication-factor 1 副本个数 // partitions 3 分区个数 sudo bin/kafka-topics.sh --bootstrap-server 192.168.3.128:9092 --create --topic test_topic --replication-factor 1 --partitions 35.2、查看所有主题
sudo bin/kafka-topics.sh --bootstrap-server 192.168.3.128:9092 --list5.3、查看单个主题信息
sudo bin/kafka-topics.sh --bootstrap-server 192.168.3.128:9092 --describe --topic test_topic5.4、查看所有主题信息
sudo bin/kafka-topics.sh --bootstrap-server 192.168.3.128:9092 --describe
第一行是所有分区的概要信息,之后的每一行表示每一个partition的信息。
leader: 节点负责给定partition的所有读写请求。
replicas: 表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。
isr: 是replicas的一个子集,它只列出当前还存活着的,并且已同步备份了该partition的节点。
sudo bin/kafka-console-producer.sh --bootstrap-server 192.168.3.128:9092 --topic test_topic5.6、消费消息
sudo bin/kafka-console-consumer.sh --bootstrap-server 192.168.3.128:9092 --topic test_topic --from-beginning5.7、单播消费
一条消息只能被某一个消费者消费的模式,类似queue模式,只需让所有消费者在同一个消费组里即可
分别在两个客户端执行如下消费命令,然后往主题里发送消息,结果只有一个客户端能收到消息
sudo bin/kafka-console-consumer.sh --bootstrap-server 192.168.3.128:9092 --consumer-property group.id=testGroup --topic test_topic5.8、多播消费
一条消息能被多个消费者消费的模式,类似publish-subscribe模式费,针对Kafka同一条消息只能被同一个消费组下的某一个消费者消费的特性,要实现多播只要保证这些消费者属于不同的消费组即可。我们再增加一个消费者,该消费者属于testGroup-2消费组,结果两个客户端都能收到消息
分别在两个客户端执行两条消费命令指定不同的消费组
sudo bin/kafka-console-consumer.sh --bootstrap-server 192.168.3.128:9092 --consumer-property group.id=testGroup-1 --topic test_topic sudo bin/kafka-console-consumer.sh --bootstrap-server 192.168.3.128:9092 --consumer-property group.id=testGroup-2 --topic test_topic5.9、多主题消费
分别在两个客户端启动两个主题生产消息,通过以下命令指定消费主题,两个主题分布生产消息都可以被消费
sudo bin/kafka-console-consumer.sh --bootstrap-server 192.168.3.128:9092 --whitelist "test|test_topic"5.10、查看消费组名称
sudo bin/kafka-consumer-groups.sh --bootstrap-server 192.168.3.128:9092 --list5.11、查看消费组的消费偏移量
sudo bin/kafka-consumer-groups.sh --bootstrap-server 192.168.3.128:9092 --describe --group testGroup
current-offset:当前消费组的已消费偏移量
log-end-offset:主题对应分区消息的结束偏移量(HW)
lag:当前消费组未消费的消息数
sudo bin/kafka-topics.sh --bootstrap-server 192.168.3.128:9092 --delete --topic test_topic5.13、修改主题分区数量
sudo bin/kafka-topics.sh --bootstrap-server 192.168.3.128:9092 --alter --topic test_topic --partitions 45.14、关闭zookeeper与kafka
sudo bin/kafka-server-stop.sh config/server.properties sudo bin/zookeeper-server-stop.sh config/zookeeper.properties6、Kafka集群搭建
对于kafka来说,一个单独的broker意味着kafka集群中只有一个节点。要想增加kafka集群中的节点数量,只需要多启动几个broker实例即可。
6.1、搭建zookeeper集群6.1.2、复制三份zookeeper.properties 6.1.3、修改配置文件注意:我这里使用的是Kafka自带的zookeeper搭建集群,如果想要使用单独安装的zookeeper进行集群搭建可以查阅下方链接的文章,里面详细介绍了zookeeper和kafka集群搭建的各项配置
https://blog.csdn.net/cx897459376/article/details/114991401?spm=1001.2014.3001.5501
分别修改三个配置文件的日志数据保存路径,zookeeper端口号
sudo bin/zookeeper-server-start.sh -daemon config/zookeeper-1.properties sudo bin/zookeeper-server-start.sh -daemon config/zookeeper-2.properties sudo bin/zookeeper-server-start.sh -daemon config/zookeeper-3.properties6.2、搭建Kafka集群 6.2.1、复制配置文件
同样复制三份server.properties
修改每一份文件的配置信息
主要修改以下四个配置即可
# 每个配置的id必须不一致 broker.id=0 # 修改每个配置的端口号 listeners=PLAINTEXT://192.168.3.128:9091 log.dirs=/tmp/kafka-logs-1 zookeeper.connect=192.168.3.128:2181,192.168.3.128:2182,192.168.3.128:80836.2.3、启动
sudo bin/kafka-server-start.sh -daemon config/server-1.properties sudo bin/kafka-server-start.sh -daemon config/server-2.properties sudo bin/kafka-server-start.sh -daemon config/server-3.properties
这样就完成了Kafka集群的搭建
6.3 使用集群节点 6.3.1、生产消息sudo bin/kafka-console-producer.sh --bootstrap-server 192.168.3.128:9092,192.168.3.128:9091,192.168.3.128:9093 -topic test6.3.2、消费消息
sudo bin/kafka-console-consumer.sh --bootstrap-server 192.168.3.128:9092,192.168.3.128:9091,192.168.3.128:9093 --topic test --from-beginning7、API
添加客户端依赖
7.1、异步生产消息-没有回调函数org.apache.kafka kafka-clients2.4.1
public class MsgProducer {
private static final String TOPIC_NAME = "test";
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
// 如果没有搭建集群,则只需要写一个ip地址就行
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.3.128:9091,192.168.3.128:9092,192.168.3.128:9093");
// 设置ack机制
props.put(ProducerConfig.ACKS_CONFIG, "all");
// 重试机制
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// 批次大小:消息大小为16384才发送消息
props.put("batch.size", 16384);
// 等待时间:如果消息大小迟迟不为batch.size大小,则等待linger.ms时间后直接发送
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// ReadAccumulator缓冲区大小
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// 序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 构造producer
Producer producer = new KafkaProducer(props);
// 生产消息
for (int i = 1; i <= 10; i++) {
// 构造消息体
producer.send(new ProducerRecord<>(TOPIC_NAME, "test-" + i, "test-" + i));
}
producer.close();
}
}
7.2、异步生产消息-有回调函数
public class MsgProducer {
private static final String TOPIC_NAME = "test";
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.3.128:9091,192.168.3.128:9092,192.168.3.128:9093");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 1);
props.put("batch.size", 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer producer = new KafkaProducer(props);
for (int i = 1; i <= 100; i++) {
// 构造消息体,主要是在这里使用使用了一个回调函数new CallBack()
producer.send(new ProducerRecord<>("test", "test-" + i, "test-" + i), new Callback() {
@Override
public void onCompletion(Recordmetadata recordmetadata, Exception e) {
if (e == null) {
System.out.println(recordmetadata.partition() + "-" + recordmetadata.offset());
} else {
e.printStackTrace();
}
}
});
}
producer.close();
}
}
7.3、 消费消息
private final static String TOPIC_NAME = "test";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.3.128:9091,192.168.3.128:9092,192.168.3.128:9093");
// 设置消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "abc");
// 设置offset的自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 设置offset自动化提交的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// 生产者是序列化,消费者则为反序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
KafkaConsumer consumer = new KafkaConsumer<>(props);
// 这里需要订阅具体的topic
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
// 一直处于监听状态中
while (true) {
// 因为消费者是通过pull获取消息消费的,这里设置间隔1000ms
ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(1000));
// 对获取到的结果遍历
for (ConsumerRecord consumerRecord : consumerRecords) {
System.out.printf("offset=%d, key=%s, value=%sn", consumerRecord.offset(),consumerRecord.key(),consumerRecord.value());
}
}
}
Kafka各项配置参数解析可以查阅-https://blog.csdn.net/cx897459376/article/details/115313962?spm=1001.2014.3001.5501
8、SpringBoot集成Kafka添加客户端依赖
org.springframework.kafka spring-kafka2.7.6
添加kafka相关配置
application.yml
spring:
kafka:
#bootstrap-servers: 192.168.3.128:9091,192.168.3.128:9092,192.168.3.128:9093
bootstrap-servers: 192.168.3.128:9092
producer: # 生产者
retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
batch-size: 16384
buffer-memory: 33554432
# acks=0 意味着生产者能够通过网络吧消息发送出去,那么就认为消息已成功写入Kafka 一定会丢失一些数据
# acks=1 意味着首领在疏导消息并把它写到分区数据问津是会返回确认或者错误响应,还是可能会丢数据
# acks=all 意味着首领在返回确认或错误响应之前,会等待所有同步副本都收到消息
acks: 1
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer: #消费者
group-id: default-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener: # 监听
# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
# RECORD
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
# BATCH
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
# TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
# COUNT
# TIME | COUNT 有一个条件满足时提交
# COUNT_TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
# MANUAL
# 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
# MANUAL_IMMEDIATE
ack-mode: manual_immediate
编写接口进行消息发送
private final static String TOPIC_NAME = "test_topic";
@Autowired
private KafkaTemplate kafkaTemplate;
@GetMapping("/send")
public String send() {
// 指定分区发送
kafkaTemplate.send(TOPIC_NAME, 0, "test", "this is msg");
return "success";
}
添加消费方法
@Component
public class MyConsumer {
@KafkaListener(topics = "test_topic", groupId = "test")
public void listen(ConsumerRecord record, Acknowledgment acknowledgment) {
String key = record.key();
String value = record.value();
System.out.println(key);
System.out.println(value);
System.out.println(record);
// 提交
acknowledgment.acknowledge();
}
}
调用接口后可以看到控制台输出



