Kafka开源代码阅读学习之旅(三) - 从一个DEMO入手
- 一、Producer-生产者
-
- 二、Producer核心流程
- 三、Producer初始化
- 1.初始化KafkaProducer对象
- 2.初始化KafkaProducer重要参数
一、Producer-生产者
1.producer类
public class Producer extends Thread {
private final KafkaProducer producer;
private final String topic;
private final Boolean isAsync;
public Producer(String topic, Boolean isAsync) {
Properties props = new Properties();
// 指定kafka集群地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
// client.id一般不做设置
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
// 设置序列化的类。 数据传输的过程中需要进行序列化,消费者获取数据需要反序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//todo: 初始化KafkaProducer对象
producer = new KafkaProducer<>(props);
this.topic = topic;
this.isAsync = isAsync;
}
public void run() {
int messageNo = 1;
// todo: 一直会往kafka发送数据
while (true) {
String messageStr = "Message_" + messageNo;
long startTime = System.currentTimeMillis();
//isAsync , kafka发送数据的时候,有两种方式
//todo:1: 异步发送 isAsync=true
//todo:2: 同步发送 isAsync=false
if (isAsync) { // Send asynchronously
//todo:异步发送
//这样的方式,性能比较好,我们生产代码用的就是这种方式。
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr), new DemoCallBack(startTime, messageNo, messageStr));
} else { // Send synchronously
try {
//todo:同步发送
//todo:发送一条消息,等这条消息所有的后续工作都完成以后才继续下一条消息的发送。
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr)).get(); //阻塞方法
System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
++messageNo;
}
}
}
二、Producer核心流程
- ProducerInterceptors是一个拦截器,对发送的数据进行拦截处理
- Serializer对消息的key和value进行序列化
- 通过使用分区器作用在每一条消息上,实现数据分发进入到topic不同的分区之中
- RecordAccumulator缓存消息,实现批量发送
- Sender从RecordAccumulator中获取消息
- 构建ClientRequest对象
- 将ClientRequest交到NetWorkClient准备发送
- ClientRequest将请求放入到KafkaChannel的缓存
- 发送请求到kafka集群
- Sender线程接受服务端发送的响应
- 执行绑定的回调函数
三、Producer初始化
1.初始化KafkaProducer对象
private final KafkaProducer producer;
//todo: 初始化KafkaProducer对象
producer = new KafkaProducer<>(props);
2.初始化KafkaProducer重要参数
Properties props = new Properties();
// 指定kafka集群地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
// client.id一般不做设置
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
// 设置序列化的类。 数据传输的过程中需要进行序列化,消费者获取数据需要反序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());