栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Kafka集群的安装(传统方式&Docker方式)&Springboot整合Kafka

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Kafka集群的安装(传统方式&Docker方式)&Springboot整合Kafka

目录

一、什么是kafka

二、卡夫卡安装

传统方式

docker方式安装

三、整合SpringBoot

一般模式消费

生产者回调模式

Kafka事务

 消费者批量消费消息

 消费者手动确认

 指定消费

 指定自定义分区器

 消费端异常处理

 消息过滤器


一、什么是kafka

        Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。——来自百度百科

二、卡夫卡安装

传统方式

下载地址:Apache Download Mirrors

安装环境:

1、Java8+ ,参考Linux系统下安装jdk17&jdk8安装_熟透的蜗牛的博客-CSDN博客

2、安装ZK,参考搭建ZooKeeper3.7.0集群(传统方式&Docker方式)_熟透的蜗牛的博客-CSDN博客

3、解压文件

[root@localhost ~]# tar -zxvf kafka_2.13-3.0.0.tgz 

4、移动到/usr/local/kafka

[root@localhost ~]# mv kafka_2.13-3.0.0 /usr/local/kafka

5、修改kafka配置文件

[root@localhost config]# vim server.properties 

broker1

broker.id=0
#监听
listeners=PLAINTEXT://192.168.139.155:9092
#zk地址
zookeeper.connect=192.168.139.155:2181, 192.168.139.155:2182, 192.168.139.155:2183

broker2

broker.id=1
#监听
listeners=PLAINTEXT://192.168.139.156:9092
#zk地址
zookeeper.connect=192.168.139.155:2181, 192.168.139.155:2182, 192.168.139.155:2183

broker3

broker.id=2
#监听
listeners=PLAINTEXT://192.168.139.157:9094
#zk地址
zookeeper.connect=192.168.139.155:2181, 192.168.139.155:2182, 192.168.139.155:2183

6、分别启动kafka

[root@localhost kafka]# ./bin/kafka-server-start.sh -daemon config/server.properties

7、在其中一台创建topic

[root@localhost kafka]# ./bin/kafka-topics.sh --bootstrap-server 192.168.139.155:9092 --create --topic test-topic --partitions 3   --replication-factor 3 

通过zk的可视化工具可知,分区已经创建完成。 

 8、测试

发送消息

[root@localhost kafka]# ./bin/kafka-console-producer.sh --topic test-topic --bootstrap-server 192.168.139.155:9092

消费消息 在另一台broker上接收消息

[root@localhost kafka]# ./bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server 192.168.139.156:9092

docker方式安装

1、拉取镜像

[root@localhost ~]# docker pull wurstmeister/kafka 

2、安装

Broker1

docker run -d --name kafka1 
-p 9092:9092 
-e KAFKA_BROKER_ID=1 
-e KAFKA_ZOOKEEPER_ConNECT=192.168.139.155:2181,192.168.139.155:2182,192.168.139.155:2183 
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.139.155:9092 
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

Broker2

docker run -d --name kafka2 
-p 9093:9093 
-e KAFKA_BROKER_ID=2 
-e KAFKA_ZOOKEEPER_ConNECT=192.168.139.155:2181,192.168.139.155:2182,192.168.139.155:2183 
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.139.155:9093 
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093 -t wurstmeister/kafka

Broker3

docker run -d --name kafka3 
-p 9094:9094 
-e KAFKA_BROKER_ID=3 
-e KAFKA_ZOOKEEPER_ConNECT=192.168.139.155:2181,192.168.139.155:2182,192.168.139.155:2183 
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.139.155:9094 
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9094 -t wurstmeister/kafka

3、测试

进入容器

[root@bogon ~]# docker container exec -it 2d4be3823f16 /bin/bash 

进入/opt/kafka_2.13-2.7.1/bin目录

创建topic

bash-5.1# ./kafka-topics.sh --bootstrap-server 192.168.139.155:9092 --create --topic my-topic --partitions 3   --replication-factor 3 

创建消息

bash-5.1# ./kafka-console-producer.sh --topic my-topic --bootstrap-server 192.168.139.155:9092 

消费者 ,进入另一个容器进行消费

bash-5.1# ./kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server 192.168.139.155:9093

三、整合SpringBoot

一般模式消费

默认情况下是自动提交offset值。可通过consumer下的属性配置

enable-auto-commit: false 

生产者

   public void sendMSg(){
        System.out.println(">>>>>>>>>>>>>>>>>");
        for (int i=0;i<5;i++){
            kafkaTemplate.send("xiaojie-topic","test message>>>>>>>>>>>>>>>>>>>>>>"+i);
        }
    }

消费者

  @KafkaListener(groupId = "xiaojie_group",topics = {"xiaojie-topic"})
    public void onMessage(ConsumerRecord record) {
        log.info("消费主题>>>>>>{},消费分区>>>>>>>>{},消费偏移量>>>>>{},消息内容>>>>>{}",
                record.topic(), record.partition(), record.offset(), record.value());
    }

生产者回调模式

生产者回调函数,可以确认消息是否成功发送到broker,发送失败,进行重试或者人工补偿措施,确保消息投递到broker。有以下两种方式

方式1:

    public void sendMsgCallback(String callbackMessage){
        kafkaTemplate.send("callback-topic","xiaojie_key",callbackMessage).addCallback(success -> {
        //当消息发送成功的回调函数
            // 消息发送到的topic
            String topic = success.getRecordmetadata().topic();
            // 消息发送到的分区
            int partition = success.getRecordmetadata().partition();
            // 消息在分区内的offset
            long offset = success.getRecordmetadata().offset();
            System.out.println("发送消息成功>>>>>>>>>>>>>>>>>>>>>>>>>" + topic + "-" + partition + "-" + offset);
        }, failure -> {
            //消息发送失败的回调函数
            System.out.println("消息发送失败,可以进行人工补偿");
        });
    }

方式2

public void sendMsgCallback1(String callbackMessage){
        kafkaTemplate.send("callback-topic","xiaojie_key",callbackMessage).addCallback(new ListenableFutureCallback>() {
            @Override
            public void onFailure(Throwable ex) {
                //发送失败
                System.out.println("发送失败。。。。。。。。。。。");
            }
            @Override
            public void onSuccess(SendResult result) {
                //分区信息
                Integer partition = result.getRecordmetadata().partition();
                //主题
                String topic=result.getProducerRecord().topic();
                String key=result.getProducerRecord().key();
                //发送成功
                System.out.println("发送成功。。。。。。。。。。。分区为:"+partition+",主题topic:"+topic+",key:"+key);
            }
        });
    }

Kafka事务

应用场景

  1. 最简单的需求是producer发的多条消息组成一个事务这些消息需要对consumer同时可见或者同时不可见 。
  2. producer可能会给多个topic,多个partition发消息,这些消息也需要能放在一个事务里面,这就形成了一个典型的分布式事务。
  3. kafka的应用场景经常是应用先消费一个topic,然后做处理再发到另一个topic,这个consume-transform-produce过程需要放到一个事务里面,比如在消息处理或者发送的过程中如果失败了,消费位点也不能提交。
  4. producer或者producer所在的应用可能会挂掉,新的producer启动以后需要知道怎么处理之前未完成的事务 。
  5. 流式处理的拓扑可能会比较深,如果下游只有等上游消息事务提交以后才能读到,可能会导致rt非常长吞吐量也随之下降很多,所以需要实现read committed和read uncommitted两种事务隔离级别

在spring.kafka.producer.transaction-id-prefix: tx #开启事务管理

注意:此时重试retries不能为0,acks=-1或者all

    
    public void sendTx(){
        kafkaTemplate.executeInTransaction(kafkaOperations -> {
            String msg="这是一条测试事务的数据......";
            kafkaOperations.send("tx-topic",msg);
            int i=1/0; //报错之后,由于事务存在,消息并不会发送到broker
            return null;
        });
    }

 消费者批量消费消息

消费者批量消费消息,如果此时开启批量消费模式,那么同样的topic,消费者将会进行批量消费,不再进行逐条消费。

spring.kafka.listener.type=batch 开启批量消费

    
    @KafkaListener(topics = "xiaojie-topic")
    public void batchonMessage(List> records) {
        for (ConsumerRecord record : records) {
            log.info("批量消费消息>>>>>>>>>>>>>>>>>{}", record.value());
        }
    }

 消费者手动确认

Kafak并不会像rabbitmq那样,消息消费之后,会将消息从队列中删除,Kafka通常根据时间决定数据可以保留多久。默认使用log.retention.hours参数配置时间,默认值是168小时,也就是一周。除此之外,还有其他两个参数,log.retention.minutes和log.retention.ms,这三个参数作用是一样的,都是决定消息多久以会被删除,不过还是推荐使用log.retention.ms,如果指定了不止一个参数,Kafka会优先使用最小值的那个参数。卡夫卡是以offset的位置进行消费,如果不进行,确认那么消费者下次消费的时候,还会从上次消费的位置进行消费。

修改消费者自动提交为false:enable-auto-commit: false

配置工厂类

 
@Bean("manualListenerContainerFactory")
    public KafkaListenerContainerFactory> manualListenerContainerFactory(
            ConsumerFactory consumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setPollTimeout(1500);
        factory.setBatchListener(true); //设置批量为true,那么消费端就要一批量的形式接收信息
        //配置手动提交offset
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }

消费者

    
    @KafkaListener(containerFactory = "manualListenerContainerFactory", topics = "xiaojie-topic",
            errorHandler = "consumerAwareListenerErrorHandler"
    )
    public void onMessageManual(List> record, Acknowledgment ack) {
        for (int i=0;i 

 指定消费
  
    @KafkaListener(groupId = "xiaojie_group",topicPartitions = {
            @TopicPartition(topic = "test-topic", partitions = {"1"}),
            @TopicPartition(topic = "xiaojie-test-topic", partitions = {"1"},
                    partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "15"))
    })
    public void onMessage1(ConsumerRecord record) {
        //指定消费某个topic,的某个分区,指定消费位置
        //执行消费xiaojie-test-topic的1号分区,和xiaojie-test-topic的1和2号分区,并且2号分区从15开始消费
        log.info("消费主题>>>>>>:{},消费分区>>>>>>>>:{},消费偏移量>>>>>:{},消息内容>>>>>:{}",
                record.topic(), record.partition(), record.offset(), record.value());
    }

 指定自定义分区器
我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:
1、若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;
2、若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;这种方式可以解决消息顺序消费
3、patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;
package com.xiaojie.config;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.springframework.stereotype.Component;

import java.util.Map;


@Component
public class CustomizePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //计算分区器
        System.out.println("key>>>>>>>>>>>>>"+key);
        if ("weixin".equals(key)&&"test-topic".equals(topic)){
            return 1;
        }
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map configs) {

    }
}

 消费端异常处理
package com.xiaojie.config;

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.stereotype.Component;


@Component
public class MyErrorHandler {
    @Bean
    ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler(){
        return (message, e, consumer) -> {
            System.out.println("消息消费异常"+message.getPayload());
            System.out.println("异常信息>>>>>>>>>>>>>>>>>"+e);
            return null;
        };
    }
}

使用方法 

    @KafkaListener(containerFactory = "manualListenerContainerFactory", topics = "xiaojie-topic",
            errorHandler = "consumerAwareListenerErrorHandler"
    )
    public void onMessageManual(List> record, Acknowledgment ack) {
        for (int i=0;i 

 消息过滤器
  @Bean("filterFactory")
    public ConcurrentKafkaListenerContainerFactory filterFactory(ConsumerFactory consumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        factory.setAckDiscarded(true);
        factory.setRecordFilterStrategy(consumerRecord -> {
            String value = (String) consumerRecord.value();
            if (value.contains("hello")) {
                //返回false消息没有被过滤继续消费
                return false;
            }
            System.out.println("....................");
            //返回true 消息被过滤掉了
            return true;
        });
        return factory;
    }

 使用方法

  
    @KafkaListener(topics = "filter-topic",containerFactory = "filterFactory")
    public void filteronmessage(ConsumerRecord record){
        log.info("消费到的消息是:》》》》》》》》》》》{}",record.value());
    }

完整代码请参考,kafka部分:spring-boot: Springboot整合redis、消息中间件等相关代码 

参考:SpringBoot集成kafka全面实战_Felix-CSDN博客

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/327632.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号