Producer API:
4.1.1 Message Send Flow:
Kafka's producer sends messages asynchronously. The send path involves two threads, the main thread and the Sender thread, plus a shared buffer, the RecordAccumulator. The main thread hands messages to the RecordAccumulator, and the Sender thread continuously pulls messages from the RecordAccumulator and ships them to the Kafka brokers.
In the main thread, a message passes through the send(ProducerRecord) call, then the interceptors, the serializer, and the partitioner before it is placed in the RecordAccumulator; the Sender thread then keeps sending the accumulated messages to the Kafka brokers.
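Because send() is asynchronous, it returns as soon as the record has been placed in the RecordAccumulator; to observe the broker's actual response you can pass a Callback, which is invoked once the send completes or fails. A minimal sketch, assuming a producer configured like the example below:
producer.send(new ProducerRecord<>("first", "jin--callback"), (metadata, exception) -> {
    if (exception == null) {
        // Acknowledged by the broker; metadata carries the partition and offset
        System.out.println("partition=" + metadata.partition() + ", offset=" + metadata.offset());
    } else {
        // Delivery failed after the configured retries
        exception.printStackTrace();
    }
});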
Simple producer
package com.jin.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class MyProducer {
    public static void main(String[] args) {
        // 1. Producer configuration
        Properties properties = new Properties();
        // 2. Kafka cluster (broker-list)
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // 3. acks: "all" waits for every in-sync replica to acknowledge
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // 4. Number of retries
        properties.put(ProducerConfig.RETRIES_CONFIG, 1);
        // 5. Batch size: 16 KB
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // 6. Linger time: how long to wait before sending a not-yet-full batch
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // 7. RecordAccumulator buffer size: 32 MB
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // 8. Key/value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // 9. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 10. Send data
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("first", "jin--" + i));
        }
        // 11. Release resources
        producer.close();
    }
}
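To block until the broker has acknowledged a record, call get() on the Future that send() returns; this turns the asynchronous send into a synchronous one at the cost of throughput. A minimal sketch reusing the producer above (get() throws InterruptedException and ExecutionException, which must be caught or declared):
// Synchronous send: get() blocks until the broker responds or the send fails
RecordMetadata metadata = producer.send(new ProducerRecord<>("first", "sync--msg")).get();
System.out.println("partition=" + metadata.partition() + ", offset=" + metadata.offset());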
Simple consumer
package com.jin.producer.Consumer;

import org.apache.kafka.clients.consumer.*;

import java.util.Collections;
import java.util.Properties;

public class MyConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Cluster to connect to
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // Enable automatic offset commits (the offset records how far consumption has progressed)
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // Auto-commit interval: commit offsets every second (the default is 5000 ms)
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "bigdata");
        // Key/value deserializers (required, or the constructor throws a ConfigException)
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic
        consumer.subscribe(Collections.singletonList("first"));
        try {
            // poll(timeout): blocks for up to `timeout` milliseconds when no data is available
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.key() + "---" + consumerRecord.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
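With auto-commit, offsets are committed on the poll cadence regardless of whether the records have actually been processed, so a crash can lose or re-deliver messages. Setting ENABLE_AUTO_COMMIT_CONFIG to "false" lets you commit after processing instead. A minimal sketch of the poll loop with a synchronous manual commit, assuming the same consumer configuration otherwise:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.key() + "---" + record.value());
    }
    // commitSync() blocks until the broker confirms the offsets from this poll
    consumer.commitSync();
}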