栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka系统学习

Kafka系统学习

Kafka学习视频

文章目录

一、Kafka 概述二、消息队列

传统消息队列的应用场景使用消息队列的好处消息队列的 两种模式 三、Kafka 基础架构四、Docker安装 Kafka

Kafka命令行 六、Kafka架构深入

Kafka 生产者异步送 发送 API 七、生产者分区

分区好处分区策略自定义分区器生产者 如何提高吞吐量 八、数据可靠性九、数据去重

kafka事务 十、数据顺序十一、Kafka Broker


一、Kafka 概述

Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域

二、消息队列 传统消息队列的应用场景

MQ传统应用场景之异步处理
异步、削峰、解耦

使用消息队列的好处

1)解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

2)可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

3)缓冲

有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况

4)灵活性 & 峰值处理能力

在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

5)异步通信

很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

消息队列的 两种模式

(1 )点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)

消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。消息被消费以后,queue 中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

(2 )发布/ 订阅模式(一对多,消费者消费数据之后不会清除消息)

消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。(缺点:长连接浪费资源 consumer)

三、Kafka 基础架构

1 )Producer :消息生产者,就是向 kafka broker 发消息的客户端;

2 )Consumer :消息消费者,向 kafka broker 取消息的客户端;

3 )Consumer Group (CG ):消费者组,由多个 consumer 组成。 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个 组内 消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即 消费者组是逻辑上的一个订阅者。

4 )Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个topic。

5 )Topic :可以理解为一个队列, 生产者和消费者面向的都是一个 topic;

6 )Partition :为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;

7)Replica: :副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。

8 )leader :每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。

9 )follower :每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 follower。

四、Docker安装 Kafka

学习文档

1、集群规划

官网

启动docker

安装zookeeper

docker pull wurstmeister/zookeeper

安装kafka

docker pull wurstmeister/kafka


启动zookeeper容器
查询自己的虚拟机ip地址

启动kafka容器

docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_ConNECT=192.168.159.128:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.159.128:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka 

进入kafka容器的命令行

docker exec -it kafka /bin/bash

集群搭建

使用docker命令可快速在同一台机器搭建多个kafka,只需要改变brokerId和端口

docker run  -d --name kafka1 -p 9093:9093 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_ConNECT=192.168.159.128:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.159.128:9093 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093 -t wurstmeister/kafka 
docker run  -d --name kafka2 -p 9094:9094 -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_ConNECT=192.168.159.128:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.159.128:9094 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9094 -t wurstmeister/kafka 

启动三台kafka

进入kafka容器实例该目录下

创建Replication为2,Partition为2的topic

bin/kafka-topics.sh --create --zookeeper 192.168.159.128:2181 --replication-factor 2 --partitions 2 --topic partopic

查看topic的状态

bin/kafka-topics.sh --describe --zookeeper 192.168.159.128:2181 --topic partopic

Kafka命令行

查看当前服务器中的所有 topic

bin/kafka-topics.sh --zookeeper 192.168.159.128:2181 --list

创建topic

选项说明:
–topic 定义 topic 名

–replication-factor 定义副本数

–partitions 定义分区数
bin/kafka-topics.sh --zookeeper 192.168.159.128:2181 --create --replication-factor 3 --partitions 1 --topic first

删除 topic

bin/kafka-topics.sh --zookeeper 192.168.159.128:2181 --delete --topic first

发送消息

bin/kafka-console-producer.sh --broker-list 192.168.159.128:9092 --topic first

消费消息

–from-beginning:会把主题中以往所有的数据都读取出来。

bin/kafka-console-consumer.sh  --zookeeper 192.168.159.128:2181 --topic first
bin/kafka-console-consumer.sh --bootstrap-server 192.168.159.128:9092 --topic first
bin/kafka-console-consumer.sh --bootstrap-server 192.168.159.128:9092 --from-beginning --topic first

查看某个 Topic 的详情

bin/kafka-topics.sh --zookeeper 192.168.159.128:2181 --describe --topic first


修改分区数

bin/kafka-topics.sh --zookeeper 192.168.159.128:2181 --alter --topic first --partitions 6
六、Kafka架构深入 Kafka 生产者

①发送原理
在消息发送的过程中,涉及到了 两个线程 ——main 线程和Sender 线程。

在 main 线程中创建了 一个 双端列队列 RecordAccumulator。

main线程将消息发送给RecordAccumulator,Sender线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。

异步送 发送 API

依赖


	
		org.apache.kafka
		kafka-clients
		3.0.0
	


package com.lzm.kafkademo.Producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;


public class CustomProducer {
    public  static  void  main(String[]  args)  throws InterruptedException {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.159.128:9092");
        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // 3. 创建 kafka 生产者对象
        KafkaProducer kafkaProducer  =  new
                KafkaProducer(properties);

        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new
                    ProducerRecord<>("first","lzm " + i));
        }

        // 5. 关闭资源
        kafkaProducer.close();
    }
}


②带回调函数的 异步发送
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元
数据信息(Recordmetadata)和异常信息(Exception)

如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

package com.lzm.kafkademo.Producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;


public class CustomProducerCallback {
    public  static  void  main(String[]  args)  throws InterruptedException {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();

        // 2. 给 kafka 配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                " 192.168.159.128:9092");
        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        // 3. 创建 kafka 生产者对象
        KafkaProducer kafkaProducer  =  new KafkaProducer(properties);

        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 5; i++) {
            // 添加回调
            kafkaProducer.send(new ProducerRecord<>("first","哈哈 " + i), new Callback() {
                // 该方法在 Producer 收到 ack 时调用,为异步调用
                @Override
                public void onCompletion(Recordmetadata metadata, Exception exception) {
                    if (exception == null) {
                        // 没有异常,输出信息到控制台
                        System.out.println(" 主 题 : "  +
                                metadata.topic() + "->" + "分区:" + metadata.partition());
                    } else {
                        // 出现异常打印
                        exception.printStackTrace();
                    }
                }
            });
            // 延迟一会会看到数据发往不同分区
            Thread.sleep(2);
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}


③同步发送 API
只需在异步发送的基础上,再调用一下 get()方法即可

七、生产者分区 分区好处

分区策略



自定义分区器



实现步骤:

(1)定义类实现 Partitioner 接口。
(2)重写 partition()方法

package com.lzm.kafkademo.Producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;


public class MyPartitioner implements Partitioner {
    
    @Override
    public int partition(String topic, Object key, byte[]
            keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 获取消息
        String msgValue = value.toString();
        // 创建 partition
        int partition;
        // 判断消息是否包含 lzm
        if (msgValue.contains("lzm")) {
            partition = 0;
        } else {
            partition = 1;
        }
        // 返回分区号
        return partition;
    }

    // 关闭资源
    @Override
    public void close() {
    }

    // 配置方法
    @Override
    public void configure(Map configs) {
    }
}

生产者 如何提高吞吐量


八、数据可靠性





九、数据去重



如何使用幂等性
开启参数 enable.idempotence 默认为 true,false关闭。

kafka事务


事务api

// 1 初始化事务
void initTransactions();

// 2 开启事务
void beginTransaction() throws ProducerFencedException;

// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map offsets, String  consumerGroupId) throws ProducerFencedException;

// 4 提交事务
void commitTransaction() throws ProducerFencedException;

// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerTransactions {
	public  static  void  main(String[]  args)  throws InterruptedException {
	// 1. 创建 kafka 生产者的配置对象
	Properties properties = new Properties();
	
	// 2. 给 kafka 配置对象添加配置信息
	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
	"hadoop102:9092");
	// key,value 序列化
	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
	StringSerializer.class.getName());
	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
	StringSerializer.class.getName());
	
	// 设置事务 id(必须),事务 id 任意起名
	properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
	"transaction_id_0");
	
	// 3. 创建 kafka 生产者对象
	KafkaProducer  kafkaProducer  =  new KafkaProducer(properties);
	
	// 初始化事务
	kafkaProducer.initTransactions();
	
	// 开启事务
	kafkaProducer.beginTransaction();
	
	try {
		// 4. 调用 send 方法,发送消息
		for (int i = 0; i < 5; i++) {
			// 发送消息
			kafkaProducer.send(new  ProducerRecord<>("first",
			"atguigu " + i));
		}
		// int i = 1 / 0;
		
		// 提交事务
		kafkaProducer.commitTransaction();
		
	} catch (Exception e) {
		// 终止事务
		kafkaProducer.abortTransaction();
	} finally {
	
		// 5. 关闭资源
		kafkaProducer.close();
		}
	}
}

十、数据顺序


十一、Kafka Broker


启动 Zookeeper 客户端:

bin/zkCli.sh

通过 ls命令可以查看 kafka 相关信息:

ls /kafka

prettyZoo




修改ip地址和主机名

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/762316.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号