栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka简介与安装

kafka简介与安装

零、坐标火星,leader让研究一下kafka+websocket做一套即时通讯工具出来,需求紧急,调研了一番。 一、Kafka简介

1、消息队列(Message Queue)

Message Queue消息传送系统提供传送服务。消息传送依赖于大量支持组件,这些组件负责处理连接服务、消息的路由和传送、持久性、安全性以及日志记录。消息服务器可以使用一个或多个代理实例。

JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,翻译为Java消息服务。

2、MQ消息模型

 3、MQ消息队列分类

     3.1 点对点

消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

     3.2 发布/订阅

消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。

4、MQ消息队列对比

RabbitMQ:支持的协议多,非常重量级消息队列,对路由(Routing),负载均衡(Loadbalance)或者数据持久化都有很好的支持。

ZeroMQ:号称最快的消息队列系统,尤其针对大吞吐量的需求场景,擅长的高级/复杂的队列,但是技术也复杂,并且只提供非持久性的队列。

ActiveMQ:Apache下的一个子项,类似ZeroMQ,能够以代理人和点对点的技术实现队列。

Redis:是一个key-Value的NOSql数据库,但也支持MQ功能,数据量较小,性能优于RabbitMQ,数据超过10K就慢的无法忍受。

5、Kafka简介

Kafka是分布式发布-订阅消息系统,它最初由 linkedIn 公司开发,使用 Scala语言编写,之后成为 Apache 项目的一部分。在Kafka集群中,没有“中心主节点”的概念,集群中所有的服务器都是对等的,因此,可以在不做任何配置的更改的情况下实现服务器的的添加与删除,同样的消息的生产者和消费者也能够做到随意重启和机器的上下线。
 

 二、术语介绍

1、Kafka基本属性

1、消息生产者:即:Producer,是消息的产生的源头,负责生成消息并发送到Kafka服务器上

2、消息消费者:即:Consumer,是消息的使用方,负责消费Kafka服务器上的消息。

3、主题:即:Topic,由用户定义并配置在Kafka服务器,用于建立生产者和消息者之间的订阅关系:生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。

4、消息分区:即:Partition,一个Topic下面会分为很多分区,例如:“kafka-test”这个Topic下可以分为6个分区,分别由两台服务器提供,那么通常可以配置为让每台服务器提供3个分区,假如服务器ID分别为0、1,则所有的分区为0-0、0-1、0-2和1-0、1-1、1-2。Topic物理上的分组,一个 topic可以分为多个 partition,每个 partition 是一个有序的队列。partition中的每条消息都会被分配一个有序的 id(offset)。

5、Broker:即Kafka的服务器,用户存储消息,Kafa集群中的一台或多台服务器统称为 broker。

6、消费者分组:Group,用于归组同类消费者,在Kafka中,多个消费者可以共同消息一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。

7、Offset:消息存储在Kafka的Broker上,消费者拉取消息数据的过程中需要知道消息在文件中的偏移量,这个偏移量就是所谓的Offset。

2、Kafka中的服务器

Broker:即Kafka的服务器,用户存储消息,Kafa集群中的一台或多台服务器统称为 broker。

Message在Broker中通Log追加的方式进行持久化存储。并进行分区(patitions)。

为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数。

Broker没有副本机制,一旦broker宕机,该broker的消息将都不可用。Message消息是有多份的。

Broker不保存订阅者的状态,由订阅者自己保存。

无状态导致消息的删除成为难题(可能删除的消息正在被订阅),kafka采用基于时间的SLA(服务水平保证),消息保存一定时间(通常为7天)后会被删除。

消息订阅者可以rewind back到任意位置重新进行消费,当订阅者故障时,可以选择最小的offset(id)进行重新读取消费消息。

3、Kafka中的Message组成

Message消息:是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息。

Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。

partition中的每条Message包含了以下三个属性:

offset      即:消息唯一标识:对应类型:long

MessageSize 对应类型:int32

data        是message的具体内容。

4、Kafka的Partitions分区

Kafka基于文件存储.通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存。

可以将一个topic切分多任意多个partitions,来消息保存/消费的效率。

越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力。

5、Kafka的Consumers

在 kafka中,我们可以认为一个group是一个“订阅者”,一个Topic中的每个partions,只会被一个“订阅者”中的一个consumer消费,不过一个 consumer可以消费多个partitions中的消息(消费者数据小于Partions的数量时)。注意:kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息。
一个partition中的消息只会被group中的一个consumer消息。每个group中consumer消息消费互相独立。

6、Kafka的持久化

一个Topic可以认为是一类消息,每个topic将被分成多partition(区),每个partition在存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),partition是以文件的形式存储在文件系统中。
Logs文件根据broker中的配置要求,保留一定时间后删除来释放磁盘空间。

Partition:Topic物理上的分组,一个 topic可以分为多个 partition,每个 partition 是一个有序的队列。partition中的每条消息都会被分配一个有序的 id(offset)。

7、Kafka的通讯协议

Kafka的Producer、Broker和Consumer之间采用的是一套自行设计基于TCP层的协议,根据业务需求定制,而非实现一套类似ProtocolBuffer的通用协议。

基本数据类型:(Kafka是基于Scala语言实现的,类型也是Scala中的数据类型)

定长数据类型:int8,int16,int32和int64,对应到Java中就是byte, short, int和long。

变长数据类型:bytes和string。变长的数据类型由两部分组成,分别是一个有符号整数N(表示内容的长度)和N个字节的内容。其中,N为-1表示内容为null。bytes的长度由int32表示,string的长度由int16表示。

数组:数组由两部分组成,分别是一个由int32类型的数字表示的数组长度NN个元素。

8、Kafka数据传输事务

1、at most once:最多一次,这个和JMS中"非持久化"消息类似.发送一次,无论成败,将不会重发。

at most once:消费者fetch消息,然后保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中出现了异常,导致部分消息未能继续处理.那么此后"未处理"的消息将不能被fetch到,这就是"atmost once"。

2、at least once:消息至少发送一次,如果消息未能接受成功,可能会重发,直到接收成功。

at least once:消费者fetch消息,然后处理消息,然后保存offset.如果消息处理成功之后,但是在保存offset阶段zookeeper异常导致保存操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是"atleast once",原因offset没有及时的提交给zookeeper,zookeeper恢复正常还是之前offset状态。

3、exactly once:消息只会发送一次。

exactly once: kafka中并没有严格的去实现(基于2阶段提交,事务),我们认为这种策略在kafka中是没有必要的。

注:通常情况下"at-least-once"是我们首选。(相比at most once而言,重复接收数据总比丢失数据要好)。


三、Kafka的安装与部署 3.1 下载安装

官网:http://kafka.apache.org/downloads.htmlhttp://kafka.apache.org/downloads.html

[root@along ~]# wget http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.12-3.0.0.tgz
[root@along ~]# tar -C /data/ -xvf kafka_2.12-3.0.0.tgz
[root@along ~]# cd /data/kafka_2.12-3.0.0/
3.2 配置启动zookeeper

kafka正常运行,必须配置zookeeper,否则无论是kafka集群还是客户端的生存者和消费者都无法正常的工作的;所以需要配置启动zookeeper服务。

(1)zookeeper需要java环境

(2)这里kafka下载包已经包括zookeeper服务,所以只需修改配置文件,启动即可。

如果需要下载指定zookeeper版本;可以单独去zookeeper官网http://mirrors.shu.edu.cn/apache/zookeeper/下载指定版本。

[root@along ~]# cd /data/kafka_2.12-3.0.0/
[root@along kafka_2.12-3.0.0]# grep "^[^#]" config/zookeeper.properties
dataDir=/tmp/zookeeper   #数据存储目录
clientPort=2181   #zookeeper端口
maxClientCnxns=0

注:可自行添加修改zookeeper配置

3.3 配置kafka

(1)修改配置文件

[root@along kafka_2.12-3.0.0]# grep "^[^#]" config/server.properties
broker.id=0
listeners=PLAINTEXT://:9092
port=9092
host.name=localhost
advertised.host.name=localhost
advertised.port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

注:可根据自己需求修改配置文件

  •  broker.id:唯一标识ID
  •  listeners=PLAINTEXT://localhost:9092:kafka服务监听地址和端口
  •  log.dirs:日志存储目录
  •  zookeeper.connect:指定zookeeper服务
  • zookeeper.connection.timeout.ms=6000 zk链接超时时间
  • num.network.threads=3:网络线程数
  • num.io.threads=8:磁盘操作IO线程数
  • socket.send.buffer.bytes=102400 发送数据缓存大小
  • socket.receive.buffer.bytes=102400 接收数据缓存大小

(2)配置环境变量

[root@along ~]# vim /etc/profile.d/kafka.sh
export KAFKA_HOME="/data/kafka_2.12-3.0.0"
export PATH="${KAFKA_HOME}/bin:$PATH"
[root@along ~]# source /etc/profile.d/kafka.sh

(3)配置Kafka启停脚本

创建启动脚本  cd /usr/local/kafka    vi kafkastart.sh #编辑,添加以下代码

#!/bin/sh
#启动zookeeper
nohup /data/kafka_2.12-3.0.0/bin/zookeeper-server-start.sh /data/kafka_2.12-3.0.0/config/zookeeper.properties &
sleep 3 #等3秒后执行
#启动kafka
nohup /data/kafka_2.12-3.0.0/bin/kafka-server-start.sh /data/kafka_2.12-3.0.0/config/server.properties &

创建关闭脚本 vi kafkastop.sh #编辑,添加以下代码

#!/bin/sh
#关闭kafka
nohup /data/kafka_2.12-3.0.0/bin/kafka-server-stop.sh /data/kafka_2.12-3.0.0/config/server.properties &
sleep 3 #等3秒后执行
#关闭zookeeper
nohup /data/kafka_2.12-3.0.0/bin/zookeeper-server-stop.sh /data/kafka_2.12-3.0.0/config/zookeeper.properties &

给脚本授权

chmod +x kafkastart.sh
chmod +x kafkastop.sh
3.4 启动kafka服务

①设置脚本开机自动执行

vi /etc/rc.d/rc.local 添加一行

sh /usr/local/kafka/kafkastart.sh &

②手动启动 sh /usr/local/kafka/kafkastart.sh  #启动kafka

sh /usr/local/kafka/kafkastop.sh #关闭kafka

ss -nutl     #查看端口启用情况 当看到 2181zookeeper端口; 9092kafka端口

service kafka status 查看kafka运行状态 Running 82302 代表kafka运行正常

3.5启动日志

路径在 /tmp/kafka-logs/  server.log 可以看到日志

四、kafka使用简单入门 4.1 创建主题topics

创建一个名为“along”的主题,它只包含一个分区,只有一个副本:

[root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic along
Created topic "along".

使用list topic命令,查看新创建的主题

[root@along ~]# kafka-topics.sh --list --zookeeper localhost:2181
along
4.2 发送一些消息

Kafka附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到Kafka集群。默认情况下,每行将作为单独的消息发送。

运行生产者,然后在控制台中键入一些消息以发送到服务器。

[root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic along
>This is a message
>This is another 
4.3 启动消费者

Kafka还有一个命令行使用者,它会将消息转储到标准输出。

[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic along --from-beginning
This is a message
This is another message

所有命令行工具都有其他选项; 运行不带参数的命令将显示更详细地记录它们的使用信息。

五、设置多代理kafka群集

到目前为止,我们一直在与一个broker运行,但这并不好玩。对于Kafka,单个代理只是一个大小为1的集群,因此除了启动一些代理实例之外没有太多变化。但是为了感受它,让我们将我们的集群扩展到三个节点(仍然在我们的本地机器上)。

5.1 准备配置文件
[root@along kafka_2.12-3.0.0]# cd /data/kafka_2.12-3.0.0/
[root@along kafka_2.12-3.0.0]# cp config/server.properties config/server-1.properties
[root@along kafka_2.12-3.0.0]# cp config/server.properties config/server-2.properties
[root@along kafka_2.12-3.0.0]# vim config/server-1.properties
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1
[root@along kafka_2.11-2.1.0]# vim config/server-2.properties
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2

注:该broker.id 属性是群集中每个节点的唯一且永久的名称。我们必须覆盖端口和日志目录,因为我们在同一台机器上运行这些,并且我们希望让所有代理尝试在同一端口上注册或覆盖彼此的数据。

5.2 开启集群另2个kafka服务
[root@along ~]# nohup kafka-server-start.sh /data/kafka_2.12-3.0.0/config/server-1.properties &
[root@along ~]# nohup kafka-server-start.sh /data/kafka_2.12-3.0.0/config/server-2.properties &
[root@along ~]# ss -nutl
Netid State      Recv-Q Send-Q     Local Address:Port                    Peer Address:Port                           
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9092                              :::*                  
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9093                              :::*                                 
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9094                              :::* 
5.3 在集群中进行操作

(1)现在创建一个复制因子为3的新主题my-replicated-topic

[root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".

(2)在一个集群中,运行“describe topics”命令查看哪个broker正在做什么

[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 2,0,1 Isr: 2,0,1

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/434019.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号