栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka

Kafka

1.简介

Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。

官方文档:
        https://kafka.apache.org/documentation/#api

2.环境搭建

注意:Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装并启动zookeeper

拉取镜像:(安装zookeeper)
        docker pull zookeeper:3.4.14

创建容器:
        docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14

拉取镜像:(安装kafka)
        docker pull wurstmeister/kafka:2.12-2.3.1

创建容器:

docker run -d --name kafka 
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 
--env KAFKA_ZOOKEEPER_ConNECT=192.168.200.130:2181 
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" 
--net=host wurstmeister/kafka:2.12-2.3.1

2.Kafka简单使用

导入依赖:

    
    
        org.springframework.kafka
        spring-kafka
        
            
                org.apache.kafka
                kafka-clients
            
        
    
    
        org.apache.kafka
        kafka-clients
    

生产者发送消息:

public class ProducerQuickStart {
    public static void main(String[] args) {
        //1.kafka的配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        //发送失败,失败的重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG,5);
        //消息key的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //消息value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //2.生产者对象
        KafkaProducer producer = new KafkaProducer(properties);
        //封装发送的消息
        ProducerRecord record = new ProducerRecord("jie-topic-input", "hello aa bb cc");
        //3.发送消息
        producer.send(record);
        //4.关闭消息通道,必须关闭,否则消息发送不成功
        producer.close();
    }
}

消费者接收消息:

public class ConsumerQuickStart {
    public static void main(String[] args) {
        //1.添加kafka的配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
        //消息的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //2.消费者对象
        KafkaConsumer consumer = new KafkaConsumer(properties);

        //3.订阅主题
        consumer.subscribe(Collections.singletonList("jie-topic-out"));

        //当前线程一直处于监听状态
        while (true) {
            //4.获取消息
            ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
            }
        }
    }
}

注意:
        生产者发送消息,多个消费者订阅同一个主题,只能有一个消费者收到消息(一对一)
                多个消费者设置同一个group
        生产者发送消息,多个消费者订阅同一个主题,所有消费者都能收到消息(一对多)
                多个消费者设置不同的group

3.Kafka高可用设计

1.启集群
        进入kafka容器内,修改 broker_id 和 服务端口

......

4.kafka生产者
public class ProducerQuickStart {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.kafka的配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        //消息key的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //消息value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //ack配置,消息确认机制
        // 0    , 不会等待任何来自服务器的响应
        // 1    , 只要主节点收到,就会响应
        // ALL  ,所有参与赋值的节点全部收到消息,才会响应
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        //发送失败,失败的重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);
        //数据压缩(大数据)
        //snappy    ,
        //lz4       ,压缩和解压缩速度较快
        //gzip      ,但会提供更高的压缩比
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        //2.生产者对象
        KafkaProducer producer = new KafkaProducer(properties);
        //封装发送的消息 主体,分区,key,value
        ProducerRecord record = new ProducerRecord("jie-topic", 0, "100001", "hello kafka");
        //3.发送消息
        producer.send(record);
        //4.关闭消息通道,必须关闭,否则消息发送不成功
        producer.close();
    }
}

1.同步发送:
        使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功

        //2.生产者对象
        KafkaProducer producer = new KafkaProducer(properties);
        //封装发送的消息 主体,分区,key,value
        ProducerRecord record = new ProducerRecord("jie-topic", 0, "100001", "hello kafka");
        //3.发送消息
        // producer.send(record);
        //同步消息
        Recordmetadata recordmetadata = producer.send(record).get();
        //4.关闭消息通道,必须关闭,否则消息发送不成功
        producer.close();

2.异步发送:
        调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数

        //2.生产者对象
        KafkaProducer producer = new KafkaProducer(properties);
        //封装发送的消息 主体,分区,key,value
        ProducerRecord record = new ProducerRecord("jie-topic", 0, "100001", "hello kafka");
        //3.发送消息
        // producer.send(record);
        //同步消息
        //Recordmetadata recordmetadata = producer.send(record).get();
        //异步消息
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(Recordmetadata recordmetadata, Exception e) {
                if (null != e) {
                    System.out.println("记录异常信息到日志表中!");
                }
                System.out.println(recordmetadata.offset());
            }
        });
        //4.关闭消息通道,必须关闭,否则消息发送不成功
        producer.close();

3.消息确认:

//ack配置  消息确认机制
prop.put(ProducerConfig.ACKS_CONFIG,"all");
确认机制说明
acks=0生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快
acks=1(默认值)只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应
acks=all只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应

4.重试机制:
        生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms

//重试次数
prop.put(ProducerConfig.RETRIES_CONFIG,10);

5.消息压缩:(消费端会自动解压)
        默认情况下, 消息发送时不会被压缩。
        作用:使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。

//数据压缩
prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"gzip");
压缩算法说明
snappy占用较少的 CPU, 却能提供较好的性能和相当可观的压缩比, 如果看重性能和网络带宽,建议采用
lz4占用较少的 CPU, 压缩和解压缩速度较快,压缩比也很客观
gzip占用较多的 CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法

5.Kafka消费者
public class ConsumerQuickStart {
    public static void main(String[] args) {
        //1.添加kafka的配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
        //消息的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //2.消费者对象
        KafkaConsumer consumer = new KafkaConsumer(properties);
        //3.订阅主题
        consumer.subscribe(Collections.singletonList("jie-topic"));
        //当前线程一直处于监听状态
        while (true) {
            //4.获取消息
            ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
            }
        }

    }
}

1.消费者组(Consumer Group) :指的就是由一个或多个消费者组成的群体
        一个发布在Topic上消息被分发给此消费者组中的一个消费者
                所有消费者都在一个组中,那么这就变成了queue模型
                所有消费者都不再一个组中,那么就变成了发布-订阅模型(广播,所有订阅者都消费)
 

2.消息有序性:


        topic分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序。 所以,如果你想要顺序的处理Topic的所有消息,那就只提供一个分区 。

 3.提交偏移量:
        kafka默认enable.auto.commit 为 true 会自动提交偏移量,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去,所以我们需要将其改为false

        //改为手动提交偏移量
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        同步提交偏移量:

        while (true) {
            //4.获取消息
            ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());

                //同步提交偏移量
                try {
                    consumer.commitSync();
                } catch (Exception e) {
                    System.out.println("记录失败的异常:e = " + e);
                }
            }
        }

        异步提交偏移量:

        while (true){
            ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord record : records) {
                System.out.println(record.value());
                System.out.println(record.key());
            }
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map map, Exception e) {
                    if(e!=null){
                        System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);
                    }
                }
            });
        }

        同步+异步提交偏移量:

try {
    while (true){
        ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord record : records) {
            System.out.println(record.value());
            System.out.println(record.key());
        }
        consumer.commitAsync();
    }
}catch (Exception e){+
    e.printStackTrace();
    System.out.println("记录错误信息:"+e);
}finally {
    try {
        consumer.commitSync();
    }finally {
        consumer.close();
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/682967.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号