怎么知道哪个才是leader partition,只需要获取到元数据不就好了嘛。
说来要怎么获取元数据也不难,只要随便找到集群下某一台服务器就可以了(因为集群中的每一台服务器元数据都是一样的)。
名词3——缓冲区
============================================================================
此时生产者不着急把消息发送出去,而是先放到一个缓冲区。
名词4——Sender
===============================================================================
把消息放进缓冲区之后,与此同时会有一个独立线程Sender去把消息分批次包装成一个个Batch,不难想到如果Kafka真的是一条消息一条消息地传输,一条消息就是一个网络连接,那性能就会被拉得很差。为了提升吞吐量,所以采取了分批次的做法。
整好一个个batch之后,就开始发送给对应的主机上面。此时经过第一篇所提到的Kakfa的网络设计中的模型,然后再写到os cache,再写到磁盘上面。
下图是当时我们已经说明过的Kafka网络设计模型。
生产者代码
=========================================================================
设置参数部分
// 创建配置文件对象
Properties props = new Properties();
// 这个参数目的是为了获取kafka集群的元数据
// 写一台主机也行,多个更加保险
// 这里使用的是主机名,要根据server.properties来决定
// 使用主机名的情况需要配置电脑的hosts文件(重点)
props.put(“bootstrap.servers”, “hadoop1:9092,hadoop2:9092,hadoop3:9092”);
// 这个就是负责把发送的key从字符串序列化为字节数组
// 我们可以给每个消息设置key,作用之后再阐述
props.put(“key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);
// 这个就是负责把你发送的实际的message从字符串序列化为字节数组
props.put(“value.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);
// 以下属于调优,之后再解释
props.put(“acks”, “-1”);
props.put(“retries”, 3);
props.put(“batch.size”, 323840);
props.put(“linger.ms”, 10);
props.put(“buffer.memory”, 33554432);
props.put(“max.block.ms”, 3000);
创建生产者实例
// 创建一个Producer实例:线程资源,跟各个broker建立socket连接资源
KafkaProducer
创建消息
ProducerRecord
“test-topic”, “test-value”);
当然你也可以指定一个key,作用之后会说明:
ProducerRecord
“test-topic”, “test-key”, “test-value”)
发送消息
带有一个回调函数,如果没有异常就返回消息发送成功。
// 这是异步发送的模式
producer.send(record, new Callback() {
@Overridepublic void onCompletion(Recordmetadata metadata, Exception exception) { if(exception == null) {
// 消息发送成功
System.out.println(“消息发送成功”);
} else {
// 消息发送失败,需要重新发送
}}});Thread.sleep(10 * 1000);
// 这是同步发送的模式(是一般不会使用的,性能很差,测试可以使用)
// 你要一直等待人家后续一系列的步骤都做完,发送消息之后
// 有了消息的回应返回给你,你这个方法才会退出来
producer.send(record).get();
关闭连接
producer.close();
干货时间:调优部分的代码
================================================================================
区分是不是一个勤于思考的打字员的部分其实就是在1那里还没有讲到的那部分调优,一个个拿出来单独解释,就是下面这一大串。
props.put(“acks”, “-1”);
props.put(“retries”, 3);
props.put(“batch.size”, 32384);
props.put(“linger.ms”, 100);
props.put(“buffer.memory”, 33554432);
props.put(“max.block.ms”, 3000);
acks 消息验证
=============================================================================
props.put(“acks”, “-1”);
这个acks参数有3个值,分别是-1,0,1,设置这3个不同的值会成为kafka判断消息发送是否成功的依据。Kafka里面的分区是有副本的,如果acks为-1.则说明消息在写入一个分区的leader partition后,这些消息还需要被另外所有这个分区的副本同步完成后,才算发送成功(对应代码就是输出System.out.println(“消息发送成功”)),此时发送数据的性能降低。
如果设置acks为1,需要发送的消息只要写入了leader partition,即算发送成功,但是这个方式存在丢失数据的风险,比如在消息刚好发送成功给leader partition之后,这个leader partition立刻宕机了,此时剩余的follower无论选举谁成为leader,都不存在刚刚发送的那一条消息。
如果设置acks为0,消息只要是发送出去了,就默认发送成功了。啥都不管了。
retries 重试次数(重要)
====================================================================================
这个参数还是非常重要的,在生产环境中是必须设置的参数,为设置消息重发的次数。
props.put(“retries”, 3);
在Kafka中可能会遇到各种各样的异常
【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】 浏览器打开:qq.cn.hn/FTf 免费领取
(可以直接跳到下方的补充异常类型),但是无论是遇到哪种异常,消息发送此时都出现了问题,特别是网络突然出现问题,但是集群不可能每次出现异常都抛出,可能在下一秒网络就恢复了呢,所以我们要设置重试机制。
这里补充一句:设置了retries之后,集群中95%的异常都会自己乘风飞去,我真没开玩笑!
代码中我配置了3次,其实设置5~10次都是合理的,补充说明一个,如果我们需要设置隔多久重试一次,也有参数,没记错的话是retry.backoff.ms,下面我设置了100毫秒重试一次,也就是0.1秒。
props.put(“retry.backoff.ms”,100);
batch.size 批次大小
===================================================================================
批次的大小默认是16K,这里设置了32K,设置大一点可以稍微提高一下吞吐量,设置这个批次的大小还和消息的大小有关,假设一条消息的大小为16K,一个批次也是16K,这样的话批次就失去意义了。所以我们要事先估算一下集群中消息的大小,正常来说都会设置几倍的大小。
props.put(“batch.size”, 32384);
linger.ms 发送时间限制
====================================================================================
比如我现在设置了批次大小为32K,而一条消息是2K,此时已经有了3条消息发送过来,总大小为6K,而生产者这边就没有消息过来了,那在没够32K的情况下就不发送过去集群了吗?显然不是,linger.ms就是设置了固定多长时间,就算没塞满Batch,也会发送,下面我设置了100毫秒,所以就算我的Batch迟迟没有满32K,100毫秒过后都会向集群发送Batch。
props.put(“linger.ms”, 100);
buffer.memory 缓冲区大小
=======================================================================================



