栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka入门,安装,案例(一)

kafka入门,安装,案例(一)

kafka安装

安装包链接自取

链接:https://pan.baidu.com/s/19qKoEH7uY_pxhyuRXOKOdQ 
提取码:l56e 

步骤1)上传、解压、更名、配置环境变量

 [root@qianfeng01 ~]# tar -zxvf kafka_2.11-1.1.1.tgz -C /usr/local/

 [root@qianfeng01 ~]# cd /usr/local/

 [root@qianfeng01 ~]# mv kafka_2.11-1.1.1/ kafka

 [root@qianfeng01 ~]# vim /etc/profile

 .....省略....

 #kafka environment

 export KAFKA_HOME=/usr/local/kafka

 export PATH=$PATH:$KAFKA_HOME/bin

 ​

 [root@qianfeng01 ~]# source /etc/profile

步骤2)修改server.properties

 [root@qianfeng01 kafka]# cd config

 [root@qianfeng01 config]# vim server.properties

 ​# 这个文档是修改,不是删除所有,可以退出插入模式 按下/,输入内容,回车查找

 # 每个节点的唯一标识符的配置

 broker.id=0

 # 设置消息的存储位置,如果有多个目录,可以用逗号隔开

 log.dirs=/usr/local/kafka/data

 # 设置zookeeper的集群地址,同时指定kafka在zookeeper上的各个节点的父znode

 zookeeper.connect=qianfeng01:2181,qianfeng02:2181,qianfeng03:2181/kafka

 ​

 ​

 # 下面属性可以改可不改

 # 发送消息的缓存大小,100K

 socket.send.buffer.bytes=102400

 # 接收消息的缓存大小,100K

 socket.receive.buffer.bytes=102400

 # 服务端处理发送过来的数据的最大字节数 100M

 socket.request.max.bytes=104857600

 # 消息对应的文件保留的时间,默认使7天

 log.retention.hours=168

 # 消息对应的文件的最大字节数,1G

 log.segment.bytes=1073741824

 # 用来检查消息对应的文件是否过期或者是大于1G的时间周期,默认是300秒一检查

 log.retention.check.interval.ms=300000

步骤3)同步到其他节点上

 [root@qianfeng01 local]# scp -r kafka/ qianfeng02:/usr/local/

 [root@qianfeng01 local]# scp -r kafka/ qianfeng03:/usr/local/

 ​

 [root@qianfeng01 local]# scp /etc/profile qianfeng02:/etc/

 [root@qianfeng01 local]# scp /etc/profile qianfeng03:/etc/

步骤4)修改其他节点上的brokerId

下面是kafka的config中的server.properities文件,只需要更改brokerid=1

 qianfeng02的broker.id   为 1

 qianfeng03的broker.id   为 2

启动kafka

步骤1)先启动zookeeper     

       三台机器都要启动zk

   代码是 zkServer.sh start

步骤2)启动三台机器的kafka

      正常启动, 注意,带上配置文件,进行后台启动

           kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

           关闭:

           kafka-server-stop.sh -daemon /usr/local/kafka/config/server.properties

查看Zookeeper维护的数据

首先进入zookeeper

zkCli.sh

[zk: localhost:2181(CONNECTED) 0] ls /        查看目录

[zk: localhost:2181(CONNECTED) 0] get  /kafka/cluster/id    涉及到补全就改get查看

[zk: localhost:2181(CONNECTED) 0] get  /kafka/cluster

下面是总结zookeeper中文件夹内的内容

--cluster/id  :  用于存储kafka集群的唯一标识

   {"version":"1","id":"mO77ods8Q-ek99Y7lTOwUg"}

--controller  :  用于记录多个broker中谁是控制角色

   {"version":1,"brokerid":0,"timestamp":"1644464244261"}

--controller_epoch  :  记录的是第几次选举controller角色

   3

--brokers/ids:  以子znode的形式记录所有的broker唯一标识符

              brokers/ids/0 : 记录着自己的信息

              {..."endpoints":["PLAINTEXT://qianfeng01:9092"],"host":"qianfeng01","port":9092}

              brokers/ids/1 : 记录着自己的信息

              {..."endpoints":["PLAINTEXT://qianfeng02:9092"],"host":"qianfeng02","port":9092}

              brokers/ids/2 : 记录着自己的信息

              {..."endpoints":["PLAINTEXT://qianfeng03:9092"],"host":"qianfeng03","port":9092}

--brokers/topics : 以子znode的形式记录kafka集群中的所有主题名

--consumers :  旧版本用来记录消费者消费消息的偏移量,以便下次继续消费,但是新版本不用该znode,

                                   而是以一个主题"__consumer_offsets"来记录各个消费者的偏移量

Kafka的基本操作
  1. 帮助信息

[root@qianfeng01 data]# kafka-topics.sh

Create, delete, describe, or change a topic.

Option                                   Description

------                                   -----------

--alter                                  修改一个主题的分区数量,副本,以及配置等

--config             修改主题的一个配置。

--create                                 创建一个新的主题

--delete                                 删除一个主题

--delete-config            移除一个配置

--describe                               列出指定的主题的详情信息

--disable-rack-aware                     禁用机架感知副本分配

--force                                  抑制控制台提示

--help                                   打印帮助信息

--if-exists                              如果在更改或删除主题时设置该操作,则该操作只会在主题存在时执行

--if-not-exists                          如果在创建主题时设置,则只在主题不存在时执行该操作

--list                                   列出所有的主题名称

--partitions                                            `必需属性`,创建或者修改时的分区数量

--replica-assignment <.....>             正在创建或更改的主题的手动分区到代理分配列表。

--replication-factor            `必需属性`,创建主题时的副本因子

--topic                   要创建,修改或者描述的主题名称

--topics-with-overrides                  如果在描述主题时设置,则只显示覆盖了配置的主题

--unavailable-partitions                 如果在描述主题时设置,只显示leader不可用的分区

--under-replicated-partitions            如果在描述主题时设置,则只显示除了leader的副本

--zookeeper               `必需属性`,用于zookeeper连接的连接字符串,格式为host:port。可以指定多个主机以允许故障转移。                                   

2)创建主题

进入kafka的data目录(就是一执行kafka会自己创建data目录)执行

[root@qianfeng01 data]# kafka-topics.sh

--zookeeper qianfeng01:2181,qianfeng02:2181,qianfeng03:2181/kafka

--create

--topic food   #主题

--partitions 4   #分区

--replication-factor 2  #副本

案例2:

[root@qianfeng01 data]# kafka-topics.sh

--zookeeper qianfeng01:2181,qianfeng02:2181,qianfeng03:2181/kafka

--create

--topic pet

--partitions 2

--replication-factor 3

小贴士:

 1)  副本因子不能大于broker的数量, 否则报以下错误:

 ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException Replication factor: 4 larger than available brokers: 3

 ​

 2) zookeeper的路径,必需写到server.properties里指定的kafka的根znode.

 3) 创建时,必需指定分区数量,副本因子,zookeeper路径

3.1.3 列出所有的主题
 [root@qianfeng01 data]# kafka-topics.sh 

 --zookeeper qianfeng01:2181,qianfeng02:2181,qianfeng03:2181/kafka 

 --list

原理:

 列出所有主题名称的逻辑:其实就是访问zookeeper的/kafka/brokers/topics/ 子节点的名字

3.1.4 查看指定主题
 [root@qianfeng01 data]# kafka-topics.sh 

 --zookeeper qianfeng01:2181,qianfeng02:2181,qianfeng03:2181/kafka 

 --describe 

 --topic food 

获取的描述信息如下:

 Topic:food   PartitionCount:4       ReplicationFactor:2     

 Configs:

        Topic: food     Partition: 0   Leader: 0       Replicas: 0,1   Isr: 0,1

        Topic: food     Partition: 1   Leader: 1       Replicas: 1,2   Isr: 1,2

        Topic: food     Partition: 2   Leader: 2       Replicas: 2,0   Isr: 2,0

        Topic: food     Partition: 3   Leader: 0       Replicas: 0,2   Isr: 0,2

 ​

 可以获取四个信息:

 Topic: 主题名

 PartitionCount: 该主题的分区数量

 ReplicationFactor: 该主题的每个分区的副本因子

 Configs: 列出的是每个分区的详情信息

          

 摘抄一条解析分区的详情信息: 

 Topic: food     Partition: 0   Leader: 0                       Replicas: 0,1           Isr: 0,1

      ^             ^                 ^                               ^                   ^

      |             |                 |                               |                   |

  主题名称       分区号码       该分区所有副本的leader的所在broker   副本所在的broker   当前可用的副本的broker位置

3.1.5 修改主题
 [root@qianfeng01 ~]# kafka-topics.sh 

 --zookeeper qianfeng01,qianfeng02,qianfeng03/kafka 

 --alter 

 --topic food 

 --partitions 5 

小贴士

 1. 修改主题的分区时,数量只能增加,不能减少

 2. 副本因子不能被修改

3.1.6 删除主题
 [root@qianfeng01 ~]# kafka-topics.sh 

 --zookeeper qianfeng01,qianfeng02,qianfeng03/kafka 

 --delete 

 --topic pet

删除的原理:

 1. 将zookeeper里对应的znode删除

 2. 将kafka的存储目下的该主题的分区目录打标记,过一会,再删除.

3.2 生产和消费消息

3.2.1 生产者生产消息

1)帮助信息

使用 kafka-console-producer.sh脚本,回车,即可显示帮助信息。

 [root@qianfeng01 data]# kafka-console-producer.sh

 Read data from standard input and publish it to Kafka.

 Option                                   Description

 ------                                   -----------

 --batch-size              如果没有同步发送消息,则在单个批处理中发送的消息数。(默认:200)

 --broker-list       `REQUIRED`: 形式为HOST1:PORT1,HOST2:PORT2的代理列表字符串。

 --compression-codec [compression-codec]    'none','gzip', 'snappy', or 'lz4'. 默认值为'gzip'

 --key-serializer                用于序列化key的消息编码器实现的类名。(默认:kafka.serializer.DefaultEncoder)

 --line-reader        用于从标准中读取行的类的类名。默认情况下,每行读取为单独的消息。

                                          (default: kafka.tools. ConsoleProducer$LineMessageReader)

 --max-block-ms                      生产者在发送请求期间阻塞的最大时间 (default: 60000)

 --max-memory-bytes                生产者用来缓冲等待发送到服务器的记录的总内存。(default: 33554432)

 --max-partition-memory-bytes      分配给分区的缓冲区大小。当接收到小于这个大小的记录时,

  生产者将尝试乐观地将它们组合在一起,直到达到这个大小。

                                            (default: 16384)

 --message-send-max-retries        重试次数(default: 3)

 --producer-property              将用户定义的属性以key=value形式传递给生产者的机制。

 --producer.config    引用生产者的配置文件

 --property                  自定义属性

 --queue-enqueuetimeout-ms        如果设置并且生产者以异步模式运行,这将给出等待足够批处理

  大小的消息队列的最大数量。(default: 10000)

 --request-required-acks              在每次重试之前,生产者会刷新相关主题的元数据。由于leader选举需要一些时间,

  这个属性指定生产者在刷新元数据之前等待的时间。 (default: 100)

 --socket-buffer-size      tcp协议的缓存大小。(default: 102400)

 --sync                                   如果设置消息,发送到代理的请求是同步的,每次一个。

 --timeout          如果设置并且生产者在异步模式下运行,这将给出消息队列等待足够大的批处理的最大时间。

  该值的单位是ms。(default: 1000)

 --topic                   `REQUIRED`: 生产的消息去往的主题名称.

 --value-serializer  
 

2)生产数据到food主题的分区里

该脚本的作用是读取控制台的数据,发送到kafka集群中

执行完就可以自己随便在下一行打字

 [root@qianfeng01 data]# kafka-console-producer.sh 

 --broker-list qianfeng01:9092,qianfeng02:9092,qianfeng03:9092 

 --topic food

3.2.2 消费者消费消息

1)帮助信息

使用 kafka-console-consumer.sh脚本,回车,即可显示帮助信息。

 [root@qianfeng01 data]# kafka-console-consumer.sh

 The console consumer is a tool that reads data from Kafka and outputs it to standard output.

 Option                                   Description

 ------                                   -----------

 --blacklist          将主题排除在消费之外的黑名单。

 --bootstrap-server    `REQUIRED` (unless old consumer isused): 要连接的kafka服务器.

 --consumer-property              将用户定义的属性以key=value形式传递给消费者的机制。

 --consumer.config  引用消费者的配置文件

 --csv-reporter-enabled                   如果设置,将启用CSV指标报表器

 --delete-consumer-offsets               如果指定了,则启动时删除zookeeper中的consumer路径

 --enable-systest-events                 除了记录所使用的消息之外,还要记录使用者的生命周期事件。(这是特定于系统测试的。)

 --formatter              用于格式化显示kafka消息的类名。

                                          (default: kafka.tools.DefaultMessageFormatter)

 --from-beginning                         从日志中出现的最早的消息开始消费。而不是消费最新的消息。

                                          普通情况下:消费者消费的数据是最新的数据 也就是消费者启动后生产者新生产的数据

 --group                          消费者所在的消费者组ID

 --isolation-level                设置为read_committed是为了过滤掉未提交的事务性消息。

  设置为read_uncommitted以读取所有消息. (default: read_uncommitted)

 --key-deserializer              key的反序列化要实现的类型

 --max-messages    退出前要使用的最大消息数。如果没有设定,消费是持续的。

 --metrics-dir            如果设置了csv-reporter-enable,并且该参数为isset,则csv指标将在这里输出

 --new-consumer                           使用新的使用者实现。这是默认设置,因此该选项已被弃用,并将在未来的版本中删除。

 ​

 --offset          要消费的偏移id(一个非负数),或者'earliest'表示从开始,或者'latest'表示从最新

                                            (default: latest)

 --partition          指定要消费的具体分区号。除非指定了'——offset',否则消费将从该分区的末尾开始。

 --property                  自定义属性

 --skip-message-on-error                   如果在处理消息时出现错误,跳过它而不是停止。

 --timeout-ms        如果指定了,如果在指定的时间间隔内没有可用的消息,则退出。

 --topic                    要消费的主题

 --value-deserializer              key的反序列化要实现的类型

 --whitelist            要包含用于消费的主题白名单。

 --zookeeper                `REQUIRED` (针对于老版本的消费者是必须的): 

  用于zookeeper连接的连接字符串,格式为host:port。可以提供多个url来允许故障转移。
    消费者消费food主题的数据

每次执行案例都是先执行一个生产者的代码,新开一个窗口执行消费者的代码

默认情况,消费最新消息 (最新消息指的是消费者启动后生产者新生产的消息)

 [root@qianfeng01 data]# kafka-console-consumer.sh 

 --bootstrap-server qianfeng01:9092,qianfeng02:9092,qianfeng03:9092 

 --topic food 

从头消费信息,一个主题的所有分区的消息

 [root@qianfeng01 data]# kafka-console-consumer.sh 

 --bootstrap-server qianfeng01:9092,qianfeng02:9092,qianfeng03:9092 

 --topic food 

 --from-beginning

指定分区,指定偏移量消费消息--------> 指定非负数形式

 [root@qianfeng01 data]# kafka-console-consumer.sh 

 --bootstrap-server qianfeng01:9092,qianfeng02:9092,qianfeng03:9092 

 --topic food 

 --partition 0 

 --offset 0     

 ​

 注意: 0 表示从该分区的第一个消息开始消费

指定分区,指定偏移量消费消息--------> 指定从头消费

 [root@qianfeng01 data]# kafka-console-consumer.sh 

 --bootstrap-server qianfeng01:9092,qianfeng02:9092,qianfeng03:9092 

 --topic food 

 --partition 0 

 --offset earliest

指定分区,指定偏移量消费消息--------> 从最新的消息开始消费

 [root@qianfeng01 data]# kafka-console-consumer.sh 

 --bootstrap-server qianfeng01:9092,qianfeng02:9092,qianfeng03:9092 

 --topic food 

 --partition 0 

 --offset latest

经过测试: 消息默认使用轮询的方式进入到不同的分区里

3.2.3 消费者组的引入

1) 说明

 1. kafka这个消息队列系统,消息是可以共享的

 2. 消息的主题可以分多个分区,想要并行计算。

 3. 方便设置消费者的属性。对多个消费者进行设置,归为一组,只需要最该组进行设置即可。

 ​

 基于上述的设计需求,提供了消费者组的概念:

 ​

 一个消费者组里可以有多个消费者,如果想要并行计算分区的数据,则可以让每一个消费者去尽量均分主题里的分区

 不同的消费者组,可以共享数据,也就是可以重复处理同一个分区的数据

 ​

 小贴士:

    如果一个消费者不指定消费者组,则默认为一个消费者组。   简而言之,Kafka消费数据时,是基于消费者组进行消费的。

2)参数

在consumer.properteis配置文件中,有一个属性,group.id用来指定消费者所属的组id。但是如果想要使用该配置文件,必须在命令行指定。格式如下:

 [root@qianfeng01 data]# kafka-console-consumer.sh 

 --bootstrap-server qianfeng01:9092,qianfeng02:9092,qianfeng03:9092 

 --topic food 

 --consumer.config /usr/local/kafka/config/consumer.properties

3)使用命令行上的参数临时指定:--group
 [root@qianfeng01 data]# kafka-console-consumer.sh 

 --bootstrap-server qianfeng01:9092,qianfeng02:9092,qianfeng03:9092 

 --topic food 

 --group g2

4)消费者组的总结
 #1. kafka是基于消费者组来消费数据的,组的成员可以并行处理消息,加快处理速度

 ​

 #2. 不同的消费者组可以消费同一个主题

 ​

 #3. 消费者组的消费者数量与主题的分区数量的对比

      - 消费者数量=分区数量时,该状态是最优的,一个消费者处理一个分区的数据

      - 消费者数量>分区数量时, 一个消费者处理一个分区的数据,剩下的消费者闲置。

      - 消费者数量<分区数量时, 消费者处理的分区数个数尽量均分。 比如有5个分区,三个消费者,出现的情况是2,2,1. 不可能出现3,1,1情况

      

#

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/735298.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号