- 初始kafka
- 基本概念
- 安装和配置
- 基本使用
kafka起初是有linkedIn公司采用scala语言开发的一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
三大主要角色:
- 消息系统:具有解耦,冗余存贮,削峰,缓冲,异步通信,扩展性,可恢复性等功能。
- 存贮系统:将消息持久化到磁盘上,并且采用多副本机制,降低消息的丢失风险,也可以通过设置将已经消费的消息永久保存。
- 流失处理平台:kafka为每个流行的流式处理框架提供可靠的数据来源和完整的流式处理库。
kafka体系架构:若干个producer,若干broker,若干consumer,以及一个zookeeper集群。
- producer:他是一个生产者,向Broker发送消息的客户端。
- broker:他是服务代理节点,生产者将数据发送到broker,broker收集消息并存贮到磁盘中,一个broker就是一个节点,若干个broker组成一个kafka集群。
- topic:他是kafka的主题,一个broker是由多个topic组成,topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic。
- partition:他是分区,一个topic可以分为多个partition,每个partition内部消息是有序的。
- consumerGroup:他是kafka的消费组,每个Consumer属于一个特定的消费组,一条消息可以被多个不同的消费组消费,但是一个消费组中只能有一个Consumer能够消费该消息,消息的点对点和广播就是通过此实现的。
- consumer:他是一个消费者,负责中broker中订阅并消费消息,从Broker读取消息的客户端。
- zookeeper:他是用来存贮kafka节点元数据以及控制器的选举。
由于Kafka是用Scala语言开发的,运行在JVM上,因此在安装Kafka之前需要先安装JDK,同时因为kafka依赖zookeeper,所以需要还需安装zookeeper。
- 下载kafka版本,并解压(因为国外网站下载较慢,我是下载好直接上传服务器解压)。
wget https://mirror.bit.edu.cn/apache/kafka/2.4.1/kafka_2.11-2.4.1.tgz tar -xzf kafka_2.11-2.4.1.tgz cd kafka_2.11-2.4.1
- 修改配置文件config/server.properties:(集群的话拷贝此文件该端口,再起一个kafka即可)
#broker.id属性在kafka集群中必须要是唯一 broker.id=0 #kafka部署的机器ip和提供服务的端口号 listeners=PLAINTEXT://ip:9092 #kafka的消息存储文件 log.dir=/usr/local/data/kafka-logs #kafka连接zookeeper的地址 zookeeper.connect=ip:2181
- 启动服务
# 启动kafka,运行日志在logs目录的server.log文件里 bin/kafka-server-start.sh -daemon config/server.properties #-daemon后台启动,不会打印日志到控制台 或者用 bin/kafka-server-start.sh config/server.properties & jps #查看是否启动 # 我们进入zookeeper目录通过zookeeper客户端查看下zookeeper的目录树 bin/zkCli.sh ls / #查看zk的根目录kafka相关节点 ls /brokers/ids #查看kafka节点 # 停止kafka bin/kafka-server-stop.sh基本使用
- 创建主题:
bin/kafka-topics.sh --create --zookeeper ip:2181 --replication-factor 1 --partitions 1 --topic test001 Created topic test001.#成功标志
–replication-factor:制定副本因子
–create :创建主题的动作指令
–topic:所创建主题的名称
–zookeeper:所要连接的zookeeper服务地址。
- 查看主题
bin/kafka-topics.sh --list --zookeeper ip:2181
- 删除主题
bin/kafka-topics.sh --delete --topic test --zookeeper ip:2181
- 发送消息
bin/kafka-console-producer.sh --broker-list ip:9092 --topic test
- 消费消息
bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic test
- 消费多主题
bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --whitelist "test|test-2"
- 单播消费
bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --consumer-property group.id=testGroup --topic test
- 多播消费
bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --consumer-property group.id=testGroup --topic test bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --consumer-property group.id=testGroup-2 --topic test



