感谢尚硅谷教育对资源的无私开放 一.概念 1. 定义
Kafka是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。
2.问题与解决方案 缓存/消峰双十一期间,消息的发布速度远大于消息的消费(处理)速度,此时通过kafka可以接收发布的消息,消费者端按照自身的处理能力接收消息队列中的消息。
解耦kafka提供接口,使不同渠道的消息发布者可以按照该接口将消息发布到队列中,消息的消费者根据接口订阅队列中的消息,不需要每个发布者针对不同的消费者而进行不同的处理。
异步处理在appstore通过绑定支付宝买东西时,支付订单后就显示支付成功,但是扣钱的行为却是在一小时之后或者更久,这就是通过类似于kafka这种消息队列的异步处理,用户只需要支付订单,就会返回成功消息,而真实的扣钱行为却是在支付成功之后处理,从而提高用户体验。
3.消息队列的两种模式 点对点模式发布者将消息发布到消息队列中,消费者接收消息,向队列确认收到,队列就会将该消息删除掉。
发布订阅模式发布者将消息发布到队列,队列对消息进行分类,不同的消费者消费不同类别队列中的消息,接收消息后也不会删除队列中的消息。
4.结构 Consumer Group(CG)消费者组。消费者组内每个消费者负责消费不同分区的数据(如果组内有三个消费者,topic主题队列有三个分区,则每个消费者,消费不同的分区的队列中的消息)。消费者组之间互不影响。
Broker一台Kafka服务器就是一个broker。一个kafka集群由多个broker组成。一个broker可以容纳多个topic。
Topic可以理解为一个队列,生产者和消费者面向的都是topic。
Partition为了实现扩展性,一个非常大的topic可以分布到多个broker上分为多个分区。
Replica副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个Follower。
生产者发送数据的对象,以及消费者消费数据的对象都是 Leader。
每个分区多个副本中的Follower实时从 Leader 中同步数据,保持和Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。
zookeeper(xy36)
kafka(9co5)
linux版本jdk8(5d0o)
# 第一步:集群部署zookeeper ========================================start======================================== # 启动zookeeper前需要使服务器拥有java环境 # 添加可执行权限 chmod +x jdk-8u151-linux-x64.rpm # 安装RPM软件包 rpm -ivh jdk-8u151-linux-x64.rpm java -version # 修改zookeeper/bin/zkEnv.sh文件,添加内容: JAVA_HOME="/opt/1.8.0_151" # 启动集群中所有服务器的zookeeper ./zkServer.sh start # 配置zookeeper环境变量 vi /etc/profile.d/zookeeper.sh # 添加内容: ==========star========== export ZOOKEEPER_HOME=/zookeeper目录 export PATH=$PATH:$ZOOKEEPER_HOME/bin ==========end========== source /etc/profile # 创建文件夹 mkdir /opt/apache-zookeeper/zkdata # zkdata目录下创建myid文件,文件内容为数字012... # 修改zookeeper配置文件zoo.cfg ==========start========== dataDir=/opt/apache-zookeeper/zkdata # 0,1,2对应的是myid文件中的0,1,2,否则报错!!!!!!!!!!!!!!!!!!! server.0=192.168.1.1:2888:3888 server.1=192.168.1.2:2888:3888 server.2=192.168.1.3:2888:3888 ==========end========== # 每个zookeeper配置文件都相同,只是myid中的内容不同 # 启动 ./zkServer.sh start ./zkServer.sh status # 批量启动的脚本 ==========start========== for host in 192.168.1.1 192.168.1.2 192.168.1.3 do ssh $host "source /etc/profile;/root/apps/zookeeper/bin/zkServer.sh start" done ==========end========== ========================================end======================================== # 第一步:集群部署kafka ========================================start======================================== # 解压到/opt/module目录下 tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/ cd /opt/module # 修改文件名为kafka mv kafka_2.12-3.0.0/ kafka # 修改server.properties文件 cd /opt/module/kafka/config vim server.properties ==========start========== (修改处一) #broker 的全局唯一编号,集群内不能重复,只能是数字(跟zookeeper配置文件中的server0,server1对应吧,别他么又错了,又要看日志找问题)。 broker.id=0 # 处理网络请求的线程数量 num.network.threads=3 # 用来处理磁盘 IO 的线程数量 num.io.threads=8 # 发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 # 接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 # 请求套接字的缓冲区大小 socket.request.max.bytes=104857600 (修改处二) # kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔 log.dirs=/opt/module/kafka/datas # topic 在当前 broker 上的分区个数 num.partitions=1 # 用来恢复和清理 data 下数据的线程数量 num.recovery.threads.per.data.dir=1 # 每个 topic 创建时的副本数,默认时 1 个副本 offsets.topic.replication.factor=1 # segment 文件保留的最长时间,超时将被删除 log.retention.hours=168 # 每个 segment 文件的大小,默认最大 1G log.segment.bytes=1073741824 # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期 log.retention.check.interval.ms=300000 (修改处三) # 配置连接 Zookeeper 集群地址 zookeeper.connect=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181 (修改处四) # 配置监听器(监听各自ip的) listeners=PLAINTEXT://192.168.159.1:9092 ==========end========== # 在/etc/profile.d/my_env.sh 文件中增加 kafka 环境变量配置 sudo vim /etc/profile.d/my_env.sh ==========start========== # KAFKA_HOME export KAFKA_HOME=/opt/module/kafka export PATH=$PATH:$KAFKA_HOME/bin ==========end========== # 刷新环境变量 source /etc/profile # 启动kafka cd /opt/module/kafka bin/kafka-server-start.sh -daemon config/server.properties # 关闭kafka bin/kafka-server-stop.sh ========================================end========================================
集群启停脚本kf.sh
#! /bin/bash
case $1 in
"start"){
for i in 192.168.159.10 192.168.159.11 192.168.159.13
do
echo " --------启动 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
done
};;
"stop"){
for i in 192.168.159.10 192.168.159.11 192.168.159.13
do
echo " --------停止 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
done
};;
esac
启停命令
chmod +x kf.sh kf.sh start kf.sh stop
注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止,Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程。
集群部署中遇到的问题 问题一:org.apache.kafka.common.KafkaException: Socket server failed to bind to 114.115.219.189:9092: Cannot assign requested address.解决方案:
由于监听别的ip的9092端口导致的,修改kafka的server.properties文件:
listeners=PLAINTEXT://本机IP:9092
当kafka异常关闭时,日志记录就会出现异常,会把当时的情况记录到meta.properties文件中,重新启动时此文件会对启动造成影响。
解决方案:
删除kafka日志目录下meta.properties文件
基础架构
| 参数 | 描述 |
|---|---|
| –bootstrap-server | 连接的 Kafka Broker 主机名称和端口号 |
| –topic | 操作的 topic 名称 |
| –create | 创建主题 |
| –delete | 删除主题 |
| –alter | 修改主题 |
| –list | 查看所有主题 |
| –describe | 查看主题详细描述 |
| –partitions | 设置分区数 |
| –replication-factor | 设置分区副本 |
| –config | 更新系统默认的配置 |
cd /opt/module/kafka/bin # 查看操作主题命令参数 ./kafka-topics.sh # 查看当前服务器中的所有 topic ./kafka-topics.sh --bootstrap-server 192.168.159.10:9092,192.168.159.11:9092,192.168.159.13:9092 --list # 查看指定topic的详情 ./kafka-topics.sh --bootstrap-server 192.168.159.10:9092,192.168.159.11:9092,192.168.159.13:9092 --topic first --describe # 创建test主题 ./kafka-topics.sh --bootstrap-server 192.168.159.10:9092,192.168.159.11:9092,192.168.159.13:9092 --create --partitions 1 --replication-factor 3 --topic test # 修改分区数(分区数只能增加,不能减少) ./kafka-topics.sh --bootstrap-server 192.168.159.10:9092,192.168.159.11:9092,192.168.159.13:9092 --alter --partitions 2 --topic test # 删除test主题 ./kafka-topics.sh --bootstrap-server 192.168.159.10:9092,192.168.159.11:9092,192.168.159.13:9092 --delete --topic test2.producer
| 参数 | 描述 |
|---|---|
| –bootstrap-server | 连接的 Kafka Broker 主机名称和端口号 |
| –topic | 操作的 topic 名称 |
# 查看操作生产者命令参数 ./kafka-console-producer.sh # 生产者发送消息 ./kafka-console-producer.sh --bootstrap-server 192.168.159.10:9092,192.168.159.11:9092,192.168.159.13:9092 --topic first3.consumer
| 参数 | 描述 |
|---|---|
| –bootstrap-server | 连接的 Kafka Broker 主机名称和端口号 |
| –topic | 操作的 topic 名称 |
| –from-beginning | 从头开始消费 |
| –group | 指定消费者组名称 |
# 查看操作消费者命令参数 ./kafka-console-consumer.sh ## 消费消息 ./kafka-console-consumer.sh --bootstrap-server 192.168.159.10:9092,192.168.159.11:9092,192.168.159.13:9092 --topic first # 把主题中所有的数据都读取出来(包括历史数据) ./kafka-console-consumer.sh --bootstrap-server 192.168.159.10:9092,192.168.159.11:9092,192.168.159.13:9092 --from-beginning --topic first遇到个无法解决的问题:第一次时producer发送消息,consumer可以接收到,关闭consumer后,consumer再接收数据,就无法接收到。 四.Kafka 生产者 1.生产者发送数据到kafka集群的流程
生产者发送流程
在消息发送的过程中,涉及到main 线程和 Sender 线程。main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。
pom.xml
org.apache.kafka kafka-clients 3.0.0
Producer1.java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class Producer1 {
public static void main(String[] args) {
// 1.创建发送者对象
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.10:9092,192.168.159.11:9092,192.168.159.13:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer producer = new KafkaProducer(properties);
// 2.发送数据到kafka集群
for (int i = 0; i < 5; i++) {
// producer.send(new ProducerRecord("topic1", "value:" + i));
// 回调函数会在 producer 收到 ack 时调用,为异步调用(send()方法后加.get(),变为同步发送)
producer.send(new ProducerRecord<>("topic1", "值:" + i), (metadata, exception) -> {
if (exception == null) {
System.out.println("主题:" + metadata.topic() + ",分区:" + metadata.partition());
}
}
);
}
// 3.关闭资源
producer.close();
}
}
Kafka分区好处
合理使用存储资源。100T的数据通过三个分区存储,每个分区对应的服务器只需要33T。
提高并行度。生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。
默认的分区器DefaultPartitioner
public class DefaultPartitioner implements Partitioner {
自定义分区器,取消使用默认分区器
需求:发送者发送的数据中包含name,则将该数据放到0分区,包含age,则将该数据放到1分区,其他放到2分区。
第一步:创建Partitioner接口的实现类MyPartition.java
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class MyPartition implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int partiton = 2;
String content = value.toString();
if (content != null){
if (content.contains("name")){
partiton = 0;
}else if (content.contains("age")){
partiton = 1;
}
}
return partiton;
}
@Override
public void close() {
}
@Override
public void configure(Map configs) {
}
}
第二步:发送者指定分区器
Producer1.java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class Producer1 {
public static void main(String[] args) {
// 1.创建发送者对象
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.10:9092,192.168.159.11:9092,192.168.159.13:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// value值为自定义分区器类的全类名
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "MyPartition");
KafkaProducer producer = new KafkaProducer(properties);
// 2.发送数据到kafka集群
for (int i = 0; i < 5; i++) {
//producer.send(new ProducerRecord("topic1", "value:" + i));
// 回调函数会在 producer 收到 ack 时调用,为异步调用
producer.send(new ProducerRecord<>("topic1", "b", "fdshjkfsdf:" + i), (metadata, exception) -> {
if (exception == null) {
System.out.println("主题:" + metadata.topic() + ",分区:" + metadata.partition());
}
}
);
}
// 3.关闭资源
producer.close();
}
}
3.生产者提高吞吐量的方式
生产者如何提高吞吐量
设置缓冲区的大小
设置从缓存区发往broker的每批次的大小(batch.size)
设置从缓存区发往broker的等待时间的大小(linger.time)
设置压缩每批次发送的数据,使不改变每批次大小的条件下一次发送更多的数据
Producer1.java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class Producer1 {
public static void main(String[] args) {
// 1.创建发送者对象
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.10:9092,192.168.159.11:9092,192.168.159.13:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// value值为自定义分区器类的全类名
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "MyPartition");
// batch.size:批次大小,默认 16K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// linger.ms:等待时间,默认 0
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
KafkaProducer producer = new KafkaProducer(properties);
// 2.发送数据到kafka集群
for (int i = 0; i < 5; i++) {
//producer.send(new ProducerRecord("topic1", "value:" + i));
// 回调函数会在 producer 收到 ack 时调用,为异步调用
producer.send(new ProducerRecord<>("topic1", "b", "fdshjkfsdf:" + i), (metadata, exception) -> {
if (exception == null) {
System.out.println("主题:" + metadata.topic() + ",分区:" + metadata.partition());
}
}
);
}
// 3.关闭资源
producer.close();
}
}
4.发送的数据的可靠性
生产者将数据通过broker发送给消费者时,可能会发生相关问题。
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。
生产者发送完数据后就应答。
生产者发送过来的数据,Leader收到数据后应答。
生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。
设置ack与重试次数:
// 设置 acks properties.put(ProducerConfig.ACKS_CONFIG, "all"); // 重试次数 retries,默认是 int 最大值,2147483647 properties.put(ProducerConfig.RETRIES_CONFIG, 3);
问题一:
可能会出现follower同步leader的数据时,follower挂掉了,导致一直没有应答,但是leader和其他follower是已经接收到数据了的。
解决办法:
Leader维护了一个动态的ISR,意为和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。
如果Follower长时间(默认30s)未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR,这样就不用等长期联系不上或者已经故障的节点。
问题二:
如果分区副本设置为1个,或者ISR里应答的最小副本数量设置为1,即leader:0,isr:0,这样和和ack=1的效果是一样的,仍然有丢数的风险。
结论:
数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2(副本数量=leader+follower的数量)
生产者发送数据时,leader和follower都接收到了,但是可能会出现leader挂掉了,就没有给producer应答,此时某个follower会成为新的leader,并且这个新的leader中已经含有了生产者发送的数据,但是由于之前没有给producer应答,producer会再次发送该数据给leader,但是leader已经有该数据了,导致发送的数据重复。
至少一次= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
精确一次:幂等性和事务 + 至少一次
幂等性是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
开启参数 enable.idempotence 即开启幂等性。默认为 true,false 关闭。
重复数据的判断标准:具有
PID:Kafka每次重启都会分配一个新的PID
Partition :分区号
Sequence Number:单调自增的数字。
幂等性只能保证的是在单会话、单分区、单主题内不重复。
事务解决了kafka服务器不小心关掉再重启后可能导致的数据重复问题。
开启事务,必须开启幂等性。
事务工作原理
PS:这工作原理听一遍很难记住啊!
// 1 初始化事务 void initTransactions(); // 2 开启事务 void beginTransaction() throws ProducerFencedException; // 3 在事务内提交已经消费的偏移量(主要用于消费者) void sendOffsetsToTransaction(Map代码offsets, String consumerGroupId) throws ProducerFencedException; // 4 提交事务 void commitTransaction() throws ProducerFencedException; // 5 放弃事务(类似于回滚事务的操作) void abortTransaction() throws ProducerFencedException;
使用步骤:
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionid1");
// 初始化事务
producer.initTransactions();
// 开启事务
producer.beginTransaction();
try {
// 2.发送数据到kafka集群
producer.send();...............................
// 提交事务
producer.commitTransaction();
} catch (Exception e) {
// 终止事务
producer.abortTransaction();
} finally {
// 3.关闭资源
producer.close();
}
Producer1.java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class Producer1 {
public static void main(String[] args) {
// 1.创建发送者对象
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.130:9092,192.168.159.131:9092,192.168.159.133:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// value值为自定义分区器类的全类名
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "MyPartition");
// batch.size:批次大小,默认 16K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// linger.ms:等待时间,默认 0
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionid1");
KafkaProducer producer = new KafkaProducer(properties);
// 设置事务 id(必须),事务 id 任意起名
// 初始化事务
producer.initTransactions();
// 开启事务
producer.beginTransaction();
try {
// 2.发送数据到kafka集群
for (int i = 0; i < 5; i++) {
//producer.send(new ProducerRecord("topic1", "value:" + i));
// 回调函数会在 producer 收到 ack 时调用,为异步调用
producer.send(new ProducerRecord<>("topic1", "b", "trans:" + i), (metadata, exception) -> {
if (exception == null) {
System.out.println("主题:" + metadata.topic() + ",分区:" + metadata.partition());
}
}
);
}
// 提交事务
producer.commitTransaction();
} catch (Exception e) {
// 终止事务
producer.abortTransaction();
} finally {
// 3.关闭资源
producer.close();
}
}
}
6.数据有序
1)kafka在1.x版本之前保证数据单分区有序,条件为:
max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)
2)kafka在1.x及以后版本保证数据单分区有序,条件为:
开启幂等性,max.in.flight.requests.per.connection需要设置小于等于5即可;
或者未开启幂等性,max.in.flight.requests.per.connection设置为1
因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,无论如何都可以保证最近5个request的数据都是有序的。
prettyZoo
总体工作流程



