- Kafka简介
- 一、Kafka基础
- 1、kafka的安装部署
- 2、Kafka的基本概念
- 3、创建topic
- 4、发送与消费消息
- 5、消息的细节
- 6、查看消费组的详细信息
- 二、Kafka主题与分区
- 1、Topic
- 2、Partition
- 3、Kafka日志文件
- 三、Kafka集群
- 1、搭建Kafka集群(3个broker)
- 2、副本(Replication)
- 3、集群消息发送与消费
- 4、分区消费组的消费者细节
- 四、Kafka的生产者
- 1、java方式生产者实现
- 2、生产者的ack参数
- 3)消息发送的缓存区
- 五、Kafka的消费者
- 1、java方式消费者实现
- 2、消费者手动与自动提交offset
- 3、消费者Poll消息过程
- 4、其他的一些配置
- 六、Kafka集群机制
- 1、Controller
- 2、Reblance机制
- 七、Kafka-eagle监控平台
- 1、搭建
- 八、资源链接
Kafka是最初由linkedin公司开发,是⼀个分布式、支持分区的(partition)、多副本的 (replica),基于zookeeper 协调的分布式消息系统,它的最⼤的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、 Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
一、Kafka基础 1、kafka的安装部署1)JDK环境和Zookeeper环境的搭建。
JDK downloads:https://www.oracle.com/java/technologies/downloads/
Zookeeper downloads:http://zookeeper.apache.org/releases.html
2)下载Kafka的安装包:http://kafka.apache.org/downloads
3)上传到kafka目录上: /usr/local/kafka,解压。
4)进⼊到config⽬录内,修改server.properties
#broker.id属性在kafka集群中必须要是唯⼀ broker.id=0 #kafka部署的机器ip和提供服务的端⼝号 listeners=PLAINTEXT://x.x.x.x:9092 #kafka的消息存储⽂件 log.dir=/usr/local/data/kafka-logs #kafka连接zookeeper的地址 zookeeper.connect=x.x.x.x:2181
5)进⼊到bin⽬录内,执⾏以下命令来启动kafka服务器(带着配置⽂件
./kafka-server-start.sh -daemon ../config/server.properties
6)检测Kafka是否启动成功:
进⼊到zk内查看是否有kafka的节点: /brokers/ids/0
| 名称 | 解释 |
|---|---|
| Broker | 消息中间件处理节点(kafka服务器),⼀个Kafka节点就是⼀个broker,⼀个或者多个Broker可以组成⼀个Kafka集群。 |
| Topic | Topic 是逻辑上的概念,Kafka根据topic对消息进⾏归类,发布到Kafka集群的每条消息都需要指定⼀个topic。 |
| Producer | 消息⽣产者,向Broker发送消息的客户端。 |
| Consumer | 消息消费者,从Broker读取消息的客户端。 |
通过kafka命令向zk中创建⼀个主题:
./kafka-topics.sh --create --zookeeper x.x.x.x:2181 --replication-factor 1 --partitions 1 --topic test
查看当前zk中所有的主题:
./kafka-topics.sh --list --zookeeper x.x.x.x:2181 test
删除Topic:
bin/kafka-topics.sh --zookeeper x.x.x.x:2181 --delete --topic test
注:需要server.properties中设置delete.topic.enable=true否则只是标记删除(如果当前Topic没有使用过,即没有传输过信息,则可以彻底删除。)。
4、发送与消费消息发送消息:
把消息发送给broker中的某个topic,打开⼀个kafka发送消息的客户端,然后开始⽤客户端向
kafka服务器发送消息。
./kafka-console-producer.sh --bootstrap-server x.x.x.x:9092 --topic test
或
./kafka-console-producer.sh --broker-list x.x.x.x:9092 --topic test
–bootstrap-server:指定了连接kafka集群的地址。
–broker-list:指定了连接kafka集群的地址。
消费消息:
打开⼀个消费消息的客户端,向kafka服务器的某个主题消费消息。
⽅式⼀:从当前主题中的最后⼀条消息的offset(偏移量位置)+1开始消费。
./kafka-console-consumer.sh --bootstrap-server x.x.x.x:9092 --topic test
⽅式⼆:从当前主题中的第⼀条消息开始消费。
./kafka-console-consumer.sh --bootstrap-server x.x.x.x:9092 --from-beginning --topic test5、消息的细节
1)生产者将消息发送给broker,broker会将消息保存在本地的日志文件中。
/usr/local/kafka/data/kafka-logs/主题-分区/00000000.log
2)消息的保存是有序的,通过offset偏移量来描述消息的有序性。
3)消费者消费消息时也是通过offset来描述当前要消费的那条消息的位置。
通过以下命令可以查看到消费组的相关信息:
./kafka-consumer-groups.sh --bootstrap-server x.x.x.x:9092 --describe --group testGroup
重点关注以下信息:
current-offset:最后被消费的消息的偏移量。
Log-end-offset:消息总量(最后⼀条消息的偏移量)。
Lag:积压了多少条消息。
主题(topic)在kafka中是⼀个逻辑的概念,kafka通过topic将消息进行分类。不同的topic会被
订阅该topic的消费者消费。
但是有⼀个问题,如果说这个topic中的消息非常非常多,多到需要几T来存,因为消息是会被
保存到log日志文件中的。为了解决这个文件过大的问题,kafka提出了Partition分区的概念。
1)分区的概念
通过partition将⼀个topic中的消息分区来存储。这样的好处有多个:
a)分区存储,可以解决统⼀存储文件过大的问题(分布式存储)。
b)提供了读写的吞吐量:读和写可以同时在多个分区中进行(可以并行写)。
1.1)分区的策略
- 轮询分配策略(默认)
如果在生产消息时,key为null,则使用轮询算法均衡地分配分区。 - 随机策略(早版本默认,现已不是)
也是为了均匀的写入到每个分区,但后续轮询策略表现更佳,故很少使用随机策略了。 - 按Key分配策略
按key分配策略,有可能出现“数据倾斜”。例如,某个key包含了大量的数据,因为key值一样,所以所有的数据将都分配到一个分区中,造成该分区的数量远大于其他的分区。
2)创建多分区的主题
./kafka-topics.sh --create --zookeeper x.x.x.x:2181 --replication-factor 1 --partitions 2 --topic test1
查看topic的分区信息
./kafka-topics.sh --describe --zookeeper x.x.x.x:2181 --topic test1
修改分区数量
bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6
注:修改分区数时,仅能增加分区个数。若是使其减少partition个数,则会报错。
3、Kafka日志文件1)000000000.log: 这个文件中保存的就是消息。
2)__consumer_offsets-49:
kafka内部自己创建了__consumer_offsets主题包含了50个分区(通过offsets.topic.num.partitions设置)。这个主题用来存放消费者消费某个主题的偏移量(offset)。因为每个消费者都会自己维护着消费的主题的偏移量,也就是说每个消费者会把消费的主题的偏移量自主上报给kafka中的默认主题 consumer_offsets。因此kafka为了提升这个主题的并发性,默认设置了50个分区。
- 提交到哪个分区:通过hash函数:hash(consumerGroupId) % __consumer_offsets主题的分区数
- 提交到该主题中的内容是:key是consumerGroupId+topic+分区号,value就是当前offset的值。
3)文件中保存的消息,默认保存7天。七天到后消息会被删除。
准备3个server.properties文件,每个文件中的这些内容要调整:
server.properties
broker.id=0 listeners=PLAINTEXT://x.x.x.x:9092 log.dir=/usr/local/data/kafka-logs
server1.properties
broker.id=1 listeners=PLAINTEXT://x.x.x.x:9093 log.dir=/usr/local/data/kafka-logs-1
server2.properties
broker.id=2 listeners=PLAINTEXT://x.x.x.x:9094 log.dir=/usr/local/data/kafka-logs-2
然后分别启动三个brokers:
./kafka-server-start.sh -daemon ../config/server.properties
搭建完后通过查看zk中的/brokers/ids 看是否启动成功。
2、副本(Replication)副本是对分区的备份。在集群中,不同的副本会被部署在不同的broker上。下面例子:创建1
个主题,2个分区、3个副本。
./kafka-topics.sh --create --zookeeper x.x.x.x:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic
通过查看主题信息,其中的关键数据:
- replicas:
当前副本存在的broker节点 。 - leader:副本里的概念
每个partition都有⼀个broker作为leader。
消息发送方要把消息发给哪个broker?就看副本的leader是在哪个broker上面。副本里的leader专门用来接收消息。接收到消息,其他follower通过poll的方式来同步数据。 - follower:leader处理所有针对这个partition的读写请求,而follower被动复制leader,不提供读写(主要是为了保证多副本数据与消费的⼀致性),如果leader所在的broker挂掉,那么就会进行新leader的选举,至于怎么选,在之后的controller的概念中介绍。
- ISR:
可以同步的broker节点和已同步的broker节点,存放在ISR集合中。
kafka集群中由多个broker组成。
⼀个broker中存放⼀个topic的不同partition——副本。
Kafka集群消息的发送:
./kafka-console-producer.sh --broker-list x.x.x.x:9092, x.x.x.x:9093, x.x.x.x:9094 --topic my-replicated-topic
Kafka集群消息的消费:
./kafka-console-consumer.sh --bootstrap-server x.x.x.x:9092, x.x.x.x:9093, x.x.x.x:9094 --from-beginning --topic my-relicated-topic
指定消费组来消费消息:
./kafka-console-consumer.sh --bootstrap-server x.x.x.x:9092, x.x.x.x:9093, x.x.x.x:9094 --from-beginning --consumer-property group.id=testGroup1 --topic my-replicated-topic4、分区消费组的消费者细节
- 图中Kafka集群有两个broker,每个broker中有多个partition。⼀个partition只能被⼀个消费组里的某⼀个消费者消费,从而保证消费顺序。Kafka只在partition的范围内保证消息消费的局部顺序性,不能在同⼀个topic中的多个partition中保证总的消费顺序性。⼀个消费者可以消费多个partition。
- 消费组中消费者的数量不能比⼀个topic中的partition数量多,否则多出来的消费者消费不到消息。
- 如果消费者挂了,那么会触发rebalance机制(后面介绍),会让其他消费者来消费该分区。
引入依赖
org.apache.kafka kafka-clients 2.4.1
生产者发送消息的基本实现:
同步发消息
public class MySimpleProducer {
private final static String TOPIC_NAME = "my-replicated-topic";
public static void main(String[] args) throws ExecutionException,
InterruptedException {
//1.设置参数
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"x.x.x.x:9092, x.x.x.x:9093, x.x.x.x:9094");
//把发送的key从字符串序列化为字节数组
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
//把发送消息value从字符串序列化为字节数组
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
//2.创建⽣产消息的客户端,传⼊参数
Producer producer = new KafkaProducer(props);
//3.创建消息
//key:作⽤是决定了往哪个分区上发,value:具体要发送的消息内容
//0:代表发送消息到哪个分区上
ProducerRecord producerRecord = new ProducerRecord<>
(TOPIC_NAME, 0,"mykeyvalue","hellokafka");
//4.发送消息,得到消息发送的元数据并输出
Recordmetadata metadata = producer.send(producerRecord).get();
System.out.println("同步⽅式发送消息结果:" + "topic-" +metadata.topic()
+ "|partition-"+ metadata.partition() + "|offset-" + metadata.offset());
}
}
异步发消息
生产者发消息,发送完后不用等待broker给回复,直接执行下面的业务逻辑。可以提供callback,让broker异步的调用callback,告知生产者,消息发送的结果。
producer.send(producerRecord, new Callback() {
public void onCompletion(Recordmetadata metadata, Exception
exception) {
if (exception != null) {
System.err.println("发送消息失败:"+ exception.getStackTrace());
}
if (metadata != null) {
System.out.println("异步⽅式发送消息结果:" + "topic-"+ metadata
.topic() + "|partition-"+ metadata.partition() + "|offset-" +
metadata.offset());
}
}
});
2、生产者的ack参数
在同步发消息的场景下:生产者发到broker上后,ack会有3种不同的选择:
1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下⼀条消息。性能最高,但是最容易丢消息。
2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下⼀条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
3)acks=-1或all: 需要等待min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,这种策略会保证只要有⼀个备份存活就不会丢失数据。这是最强的数据保证。⼀般除非是金融级别,或跟钱打交道的场景才会使用这种配置。
代码配置:
props.put(ProducerConfig.ACKS_CONFIG, "1"); props.put(ProducerConfig.RETRIES_CONFIG, 3); //重试间隔设置 props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);3)消息发送的缓存区
kafka默认会创建⼀个消息缓冲区,用来存放要发送的消息,缓冲区是32m。
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
kafka本地线程会去缓冲区中⼀次拉16k的数据,发送到broker。
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
如果线程拉不到16k的数据,间隔10ms也会将已拉到的数据发到broker。
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);五、Kafka的消费者 1、java方式消费者实现
public class MySimpleConsumer {
private final static String TOPIC_NAME = "my-replicated-topic";
private final static String CONSUMER_GROUP_NAME = "testGroup";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"x.x.x.x:9092, x.x.x.x:9093, x.x.x.x:9094");
// 消费分组名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
//1.创建⼀个消费者的客户端
KafkaConsumer consumer = new KafkaConsumer(props);
//2. 消费者订阅主题列表
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord record : records) {
//4.打印消息
System.out.printf("收到消息:partition = %d,offset = %d,
key =%s, value = %s%n", record.partition(),record.offset(),
record.key(), record.value());
}
}
}
}
2、消费者手动与自动提交offset
消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的某个主题+消费的某个分区及消费的偏移量,这样的信息提交到集群的_consumer_offsets主题里面。
1)自动提交offset
消费者poll到消息后默认情况下,会⾃动向broker的_consumer_offsets主题提交当前主题-分
区消费的偏移量。
设置自动提交参数 - 默认
// 是否自动提交offset,默认就是true props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 自动提交offset的间隔时间 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
自动提交会丢消息:因为如果消费者还没消费完poll下来的消息就自动提交了偏移量,那么此
时消费者挂了,于是下⼀个消费者会从已提交的offset的下⼀个位置开始消费消息。之前未被
消费的消息就丢失掉了。
2)手动提交offset
需要把⾃动提交的配置改成false
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
手动提交分手动同步提交和手动异步提交:
- 手动同步提交:
在消费完消息后调用同步提交的方法,当集群返回ack前⼀直阻塞,返回ack后表示提交成功,执行之后的逻辑。
if (records.count() > 0) {
// ⼿动同步提交offset,当前线程会阻塞直到offset提交成功
// ⼀般使⽤同步提交,因为提交之后⼀般也没有什么逻辑代码了
consumer.commitSync();
}
- 手动异步提交:
在消息消费完后提交,不需要等到集群ack,直接执行之后的逻辑,可以设置⼀个回调方法,供集群调用。
if (records.count() > 0) {
// ⼿动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后⾯的程序逻辑
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map
offsets, Exception exception) {
if (exception != null) {
System.err.println("Commit failed for " + offsets);
System.err.println("Commit failed exception: " +
exception.getStackTrace());
}
}
});
}
3、消费者Poll消息过程
消费者建立了与broker之间的长连接,开始poll消息,默认情况下,消费者⼀次会poll500条消息。
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
如果两次poll的间隔超过30s,集群会认为该消费者的消费能力过弱,该消费者被踢出消费组,触发rebalance机制,rebalance机制会造成性能开销。可以通过设置这个参数,让⼀次poll的消息条数少⼀点可以通过这个值进行设置:
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
如果每隔1s内没有poll到任何消息,则继续去poll消息,循环往复,直到poll到消息。如果超出了1s,则此次长轮询结束。
ConsumerRecordsrecords = consumer.poll(Duration.ofMillis( 1000));
消费者发送⼼跳的时间间隔
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
kafka如果超过10秒没有收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);4、其他的一些配置
1)指定分区消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
2)消息回溯消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
3)指定offset消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
4)新消费组的消费offset规则
新消费组中的消费者在启动以后,默认会从当前分区的最后⼀条消息的offset+1开始消费(消费新消息)。可以通过以下的设置,让新的消费者第⼀次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1)。
- latest:默认的,消费新消息
- earliest:第⼀次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");六、Kafka集群机制 1、Controller
- 集群中谁来充当controller
每个broker启动时会向zk创建⼀个临时序号节点,获得的序号最小的那个broker将会作为集群中的controller。 - controller负责管理整个集群中的所有分区和副本的状态:
1)当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。
2)当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。
3)当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责让新分区被其他节点感知到。
前提:消费组中的消费者没有指明分区来消费 。
触发的条件:当消费组中的消费者和分区的关系发生变化的时候 。
分区分配的策略:在rebalance之前,分区怎么分配会有这么三种策略 :
-
range:根据公示计算得到每个消费消费哪几个分区:前面的消费者是分区总数/消费
者数量+1,之后的消费者是分区总数/消费者数量 。
-
轮询:大家轮着来 。
-
sticky:粘合策略,如果需要rebalance,会在之前已分配的基础上调整,不会改变之
前的分配情况。如果这个策略没有开,那么就要进行全部的重新分配。建议开启。
III)HW和LEO
HW俗称高水位,HighWatermark的缩写,取⼀个partition对应的ISR中最小的LEO(log-end-offset)作为HW,consumer最多只能消费到HW所在的位置。
另外每个replica都有HW,leader和follower各自负责更新自己的HW的状态。对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。
LEO是某个副本最后消息的消息位置(log-end-offset)。
1)去Kafka-eagle官网下载压缩包
EFAK (kafka-eagle.org)
然后前置环境JDK按照好。
2)放到Linux指定目录中,解压
tar -zxvf kafka-eagle-bin.x.x.x.tar.gz
3)给kafka-eagle配置环境变量
export KE_HOME=/usr/local/kafkaeagle/kafka-eagle export PATH=$PATH:$KE_HOME/bin
4)修改kafka-eagle里面conf里面system-config.properties文件
5)运行bin目录下的ke.sh
./ke.sh start
运行成功!
6)在浏览器访问10.0.50.245:8048/ke
7)成功进入
基础完,待续…
八、资源链接zookeeper官网
JDK Download
Kafka官网下载



