2.环境搭建Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。
官方文档:
https://kafka.apache.org/documentation/#api
2.Kafka简单使用注意:Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装并启动zookeeper
拉取镜像:(安装zookeeper)
docker pull zookeeper:3.4.14创建容器:
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14拉取镜像:(安装kafka)
docker pull wurstmeister/kafka:2.12-2.3.1创建容器:
docker run -d --name kafka --env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 --env KAFKA_ZOOKEEPER_ConNECT=192.168.200.130:2181 --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" --net=host wurstmeister/kafka:2.12-2.3.1
3.Kafka高可用设计导入依赖:
org.springframework.kafka spring-kafkaorg.apache.kafka kafka-clientsorg.apache.kafka kafka-clients生产者发送消息:
public class ProducerQuickStart { public static void main(String[] args) { //1.kafka的配置信息 Properties properties = new Properties(); //kafka的连接地址 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092"); //发送失败,失败的重试次数 properties.put(ProducerConfig.RETRIES_CONFIG,5); //消息key的序列化器 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //消息value的序列化器 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //2.生产者对象 KafkaProducerproducer = new KafkaProducer (properties); //封装发送的消息 ProducerRecord record = new ProducerRecord ("jie-topic-input", "hello aa bb cc"); //3.发送消息 producer.send(record); //4.关闭消息通道,必须关闭,否则消息发送不成功 producer.close(); } } 消费者接收消息:
public class ConsumerQuickStart { public static void main(String[] args) { //1.添加kafka的配置信息 Properties properties = new Properties(); //kafka的连接地址 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092"); //消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2"); //消息的反序列化器 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //2.消费者对象 KafkaConsumerconsumer = new KafkaConsumer (properties); //3.订阅主题 consumer.subscribe(Collections.singletonList("jie-topic-out")); //当前线程一直处于监听状态 while (true) { //4.获取消息 ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord consumerRecord : consumerRecords) { System.out.println(consumerRecord.key()); System.out.println(consumerRecord.value()); } } } } 注意:
生产者发送消息,多个消费者订阅同一个主题,只能有一个消费者收到消息(一对一)
多个消费者设置同一个group
生产者发送消息,多个消费者订阅同一个主题,所有消费者都能收到消息(一对多)
多个消费者设置不同的group
4.kafka生产者1.启集群
进入kafka容器内,修改 broker_id 和 服务端口......
5.Kafka消费者public class ProducerQuickStart { public static void main(String[] args) throws ExecutionException, InterruptedException { //1.kafka的配置信息 Properties properties = new Properties(); //kafka的连接地址 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092"); //消息key的序列化器 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //消息value的序列化器 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //ack配置,消息确认机制 // 0 , 不会等待任何来自服务器的响应 // 1 , 只要主节点收到,就会响应 // ALL ,所有参与赋值的节点全部收到消息,才会响应 properties.put(ProducerConfig.ACKS_CONFIG, "all"); //发送失败,失败的重试次数 properties.put(ProducerConfig.RETRIES_CONFIG, 10); //数据压缩(大数据) //snappy , //lz4 ,压缩和解压缩速度较快 //gzip ,但会提供更高的压缩比 properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); //2.生产者对象 KafkaProducerproducer = new KafkaProducer (properties); //封装发送的消息 主体,分区,key,value ProducerRecord record = new ProducerRecord ("jie-topic", 0, "100001", "hello kafka"); //3.发送消息 producer.send(record); //4.关闭消息通道,必须关闭,否则消息发送不成功 producer.close(); } } 1.同步发送:
使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功//2.生产者对象 KafkaProducerproducer = new KafkaProducer (properties); //封装发送的消息 主体,分区,key,value ProducerRecord record = new ProducerRecord ("jie-topic", 0, "100001", "hello kafka"); //3.发送消息 // producer.send(record); //同步消息 Recordmetadata recordmetadata = producer.send(record).get(); //4.关闭消息通道,必须关闭,否则消息发送不成功 producer.close(); 2.异步发送:
调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数//2.生产者对象 KafkaProducerproducer = new KafkaProducer (properties); //封装发送的消息 主体,分区,key,value ProducerRecord record = new ProducerRecord ("jie-topic", 0, "100001", "hello kafka"); //3.发送消息 // producer.send(record); //同步消息 //Recordmetadata recordmetadata = producer.send(record).get(); //异步消息 producer.send(record, new Callback() { @Override public void onCompletion(Recordmetadata recordmetadata, Exception e) { if (null != e) { System.out.println("记录异常信息到日志表中!"); } System.out.println(recordmetadata.offset()); } }); //4.关闭消息通道,必须关闭,否则消息发送不成功 producer.close(); 3.消息确认:
//ack配置 消息确认机制 prop.put(ProducerConfig.ACKS_CONFIG,"all");
确认机制 说明 acks=0 生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快 acks=1(默认值) 只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应 acks=all 只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应 4.重试机制:
生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms//重试次数 prop.put(ProducerConfig.RETRIES_CONFIG,10);5.消息压缩:(消费端会自动解压)
默认情况下, 消息发送时不会被压缩。
作用:使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。//数据压缩 prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"gzip");
压缩算法 说明 snappy 占用较少的 CPU, 却能提供较好的性能和相当可观的压缩比, 如果看重性能和网络带宽,建议采用 lz4 占用较少的 CPU, 压缩和解压缩速度较快,压缩比也很客观 gzip 占用较多的 CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法
public class ConsumerQuickStart { public static void main(String[] args) { //1.添加kafka的配置信息 Properties properties = new Properties(); //kafka的连接地址 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092"); //消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2"); //消息的反序列化器 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //2.消费者对象 KafkaConsumerconsumer = new KafkaConsumer (properties); //3.订阅主题 consumer.subscribe(Collections.singletonList("jie-topic")); //当前线程一直处于监听状态 while (true) { //4.获取消息 ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord consumerRecord : consumerRecords) { System.out.println(consumerRecord.key()); System.out.println(consumerRecord.value()); } } } } 1.消费者组(Consumer Group) :指的就是由一个或多个消费者组成的群体
一个发布在Topic上消息被分发给此消费者组中的一个消费者
所有消费者都在一个组中,那么这就变成了queue模型
所有消费者都不再一个组中,那么就变成了发布-订阅模型(广播,所有订阅者都消费)
2.消息有序性:
topic分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序。 所以,如果你想要顺序的处理Topic的所有消息,那就只提供一个分区 。3.提交偏移量:
kafka默认enable.auto.commit 为 true 会自动提交偏移量,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去,所以我们需要将其改为false//改为手动提交偏移量 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);同步提交偏移量:
while (true) { //4.获取消息 ConsumerRecordsconsumerRecords = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord consumerRecord : consumerRecords) { System.out.println(consumerRecord.key()); System.out.println(consumerRecord.value()); //同步提交偏移量 try { consumer.commitSync(); } catch (Exception e) { System.out.println("记录失败的异常:e = " + e); } } } 异步提交偏移量:
while (true){ ConsumerRecordsrecords = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord record : records) { System.out.println(record.value()); System.out.println(record.key()); } consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map map, Exception e) { if(e!=null){ System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e); } } }); } 同步+异步提交偏移量:
try { while (true){ ConsumerRecordsrecords = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord record : records) { System.out.println(record.value()); System.out.println(record.key()); } consumer.commitAsync(); } }catch (Exception e){+ e.printStackTrace(); System.out.println("记录错误信息:"+e); }finally { try { consumer.commitSync(); }finally { consumer.close(); } }



