栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka初学(自己觉得好难)

kafka初学(自己觉得好难)

kafka初学 一、介绍

Kafka是是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:

    比如基于hadoop的批处理系统低延迟的实时系统Storm/Spark流式处理引擎web/nginx日志访问日志消息服务等等

Kafka用scala语言编写 1. 应用场景

日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。消息系统:解耦和生产者和消费者、缓存消息等。用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。 2. Kafka基本概念

    kafka是一个分布式的,分区的消息服务kafka提供一个消息系统应该具备的功能,但是确有着独特的设计。可以这样来说,Kafka借鉴了JMS规范的思想,但是确并没有完全遵循JMS规范。
2.1 基础的消息(Message)相关术语
名称解释
Broker消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群
TopicKafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic
Producer消息生产者,向Broker发送消息的客户端
Consumer消息消费者,从Broker读取消息的客户端
ConsumerGroup每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个Consumer Group中只能有一个Consumer能够消费该消息
Partition物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的

从一个较高的层面上来看,producer通过网络发送消息到Kafka集群,然后consumer来进行消费:

服务端(brokers)和客户端(producer、consumer)之间通信通过TCP协议来完成。 二、kafka基本使用 1. 安装前的环境准备

安装jdk安装zk官网下载kafka的压缩包:http://kafka.apache.org/downloads解压缩至如下路径

/usr/local/kafka/

修改配置文件:/usr/local/kafka/kafka2.11-2.4/config/server.properties

#broker.id属性在kafka集群中必须要是唯一
broker.id=0
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://192.168.65.60:9092   
#kafka的消息存储文件
log.dir=/usr/local/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=192.168.65.60:2181
2.启动kafka服务器

进入到bin目录下。使用命令来启动

./kafka-server-start.sh -daemon ../config/server.properties

验证是否启动成功:

进入到zk中的节点看id是0的broker有没有存在(上线)

ls /brokers/ids/

server.properties核心配置详解:

PropertyDefaultDescription
broker.id0每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯一的即可。
log.dirs/tmp/kafka-logskafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行。
listenersPLAINTEXT://192.168.65.60:9092server接受客户端连接的端口,ip配置kafka本机ip即可
zookeeper.connectlocalhost:2181zooKeeper连接字符串的格式为:hostname:port,此处hostname和port分别是ZooKeeper集群中某个节点的host和port;zookeeper如果是集群,连接方式为 hostname1:port1, hostname2:port2, hostname3:port3
log.retention.hours168每个日志文件删除之前保存的时间。默认数据保存时间对所有topic都一样。
num.partitions1创建topic的默认分区数
default.replication.factor1自动创建topic的默认副本数量,建议设置为大于等于2
min.insync.replicas1当producer设置acks为-1时,min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生异常
delete.topic.enablefalse是否允许删除主题
3.创建主题topic

topic是什么概念?topic可以实现消息的分类,不同消费者订阅不同的topic。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6028PZRb-1646227957372)(img/截屏2021-07-08 下午2.41.33.png)]

执行以下命令创建名为“test”的topic,这个topic只有一个partition,并且备份因子也设置为1:

./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replication-factor 1 --partitions 1 --topic test

查看当前kafka内有哪些topic

 ./kafka-topics.sh --list --zookeeper 172.16.253.35:2181 
4.发送消息

kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一个行会被当做成一个独立的消息。使用kafka的发送消息的客户端,指定发送到的kafka服务器地址和topic

./kafka-console-producer.sh --broker-list 172.16.253.21:9092 --topic test
5.消费消息

对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输出,默认是消费最新的消息。使用kafka的消费者消息的客户端,从指定kafka服务器的指定topic中消费消息

方式一:从最后一条消息的偏移量+1开始消费

./kafka-console-consumer.sh --bootstrap-server 172.16.253.21:9092 --topic test

方式二:从头开始消费

./kafka-console-consumer.sh --bootstrap-server 172.16.253.21:9092 --from-beginning --topic test

几个注意点:

消息会被存储消息是顺序存储消息是有偏移量的消费时可以指明偏移量进行消费 三、Kafka中的关键细节 1. 消息的顺序存储

消息的生产者会把消息发送到broker中,broker会存储消息,消息是按照顺序进行存储的。消息消费者在消费消息的时候也是按照顺序消费的,消费消息时可以从默认位置(最后一条消息的下一个偏移量)开始消费,也可以指定某个位置开始消费
2. 单播消息的实现

单播消息:一个消费组里,最多只有一个消费者能消费到某一个topic中的消息,除非次消费者挂掉,否则下一个消费者无法消费到此topic中的消息

./kafka-console-consumer.sh --bootstrap-server 172.16.253.21:9092  --consumer-property group.id=testGroup --topic test
3. 多播消息的实现

不同消费者组中的某一个消费者可以同时收到消息在一些业务场景中需要让一条消息被多个消费者消费,那么就可以使用多播模式。

./kafka-console-consumer.sh --bootstrap-server 10.31.167.10:9092  --consumer-property group.id=testGroup1 --topic test
./kafka-console-consumer.sh --bootstrap-server 10.31.167.10:9092  --consumer-property group.id=testGroup2 --topic test

4. 查看消息组及信息

Currennt-offset: 当前消费组的已消费偏移量Log-end-offset: 主题对应分区消息的结束偏移量(HW)Lag: 当前消费组未消费的消息数

# 查看当前主题下有哪些消费组
./kafka-consumer-groups.sh --bootstrap-server 172.16.253.21:9092 --list

# 查看消费组中的具体信息:比如当前偏移量、最后一条消息的偏移量、堆积的消息数量
 ./kafka-consumer-groups.sh --bootstrap-server 172.16.253.21:9092 --describe --group testGroup

四、主题、分区的概念 1. 主题Topic

主题Topic可以理解成是一个类别的名称 2. 分区partition

一个主题中的消息量是非常大的,因此可以通过分区的设置,来分布式存储这些消息。比如一个topic创建了3个分区。那么topic中的消息就会分别存放在这三个分区中。

    为一个主题创建多个分区
./kafka-topics.sh --create --zookeeper 172.16.253.81:2181 --partitions 2 --replication-factor 1 --topic test1
    可以通过这样的命令查看topic的分区信息
./kafka-topics.sh --describe --zookeeper 172.16.253.81:2181 --topic test1
    分区的作用:

可以分布式存储可以并行写

实际上是存在data/kafka-logs/test-0 和 test-1中的0000000.log文件中
小细节:

定期将⾃⼰消费分区的offset提交给kafka内部topic:__consumer_offsets,提交过去的时候,key是consumerGroupId+topic+分区号,value就是当前offset的值,kafka会定期清理topic⾥的消息,最后就保留最新的那条数据

因为__consumer_offsets可能会接收⾼并发的请求,kafka默认给其分配50个分区(可以通过offsets.topic.num.partitions设置),这样可以通过加机器的⽅式抗⼤并发。

通过如下公式可以选出consumer消费的offset要提交到__consumer_offsets的哪个分区

公式:hash(consumerGroupId) % __consumer_offsets主题的分区数

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/750986.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号