栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

kafka集群安装

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

kafka集群安装

kafka集群安装
  • 前提
  • 一、安装包获取
  • 二、安装kafka
  • 三、Kafka命令行操作

本人是一名在校大三学生,所选专业大数据技术,为了毕业能有一份可观的工作,
目前正在致力于努力学习中,在学习中的一些笔记和经验,希望可以通过写CSDN
博文记录并和同样正在努力中的你分享
前提
在安装好zookeeper集群的基础上进行安装kafka集群!!!
一、安装包获取
  • 在Windows中在官网下载,然后上传到虚拟机
    链接:https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/
  • 在虚拟机中通过wget命令下载
    • 首先安装wget:yum -y install wget
    • 下载kafka:进入你放置安装包的目录下执行下面命令
      wget http://archive.apache.org/dist/kafka/2.3.1/kafka_2.11-2.3.1.tgz
  • 百度网盘下载
    下载地址:1111
二、安装kafka
  1. 解压安装包

    tar -zvxf kafka_2.11-2.3.1.tgz -C /opt/module/
    
  2. 解压后修改文件名字

    mv kafka_2.11-2.3.1/ kafka
    
  3. 配置环境变量
    vi /etc/profile
    添加以下配置:

    #kafka
    export KAFKA_HOME=/opt/module/kafka
    export PATH=$PATH:$KAFKA_HOME/bin
    

    使配置文件生效
    source /etc/profile

  4. 在/opt/module/kafka目录下创建logs目录**

    mkdir logs
    
  5. 修改/opt/module/kafka/conf目录下的配置文件
    vi server.properties
    将里面的内容修改为一下内容:

    #broker的全局唯一编号,不能重复
    broker.id=0
    #删除topic功能使能
    delete.topic.enable=true
    #处理网络请求的线程数量
    num.network.threads=3
    #用来处理磁盘IO的现成数量
    num.io.threads=8
    #发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    #接收套接字的缓冲区大小
    socket.receive.buffer.bytes=102400
    #请求套接字的缓冲区大小
    socket.request.max.bytes=104857600
    #kafka运行日志存放的路径
    log.dirs=/opt/module/kafka/logs
    #topic在当前broker上的分区个数
    num.partitions=1
    #用来恢复和清理data下数据的线程数量
    num.recovery.threads.per.data.dir=1
    #segment文件保留的最长时间,超时将被删除
    log.retention.hours=168
    #配置连接Zookeeper集群地址
    zookeeper.connect=ethan001:2181,ethan002:2181,ethan003:2181
    

    注意,记得修改最后的主机名为你的集群中的三台主机名

  6. 分发安装包到集群的另外两台节点

    	scp -r kafka root@ethan001:/opt/module/
    	scp -r kafka root@ethan003:/opt/module/
    

注意:
- 分发之后配置好另外两台的环境变量
- 修改/opt/module/kafka/conf目录下的server.properties文件中的broker.id=1 和broker.id=2

7. 启动kafka集群
启动zookeeper集群之后再启动kafka集群

[root@ethan001 kafka]# bin/kafka-server-start.sh -daemon  config/server.properties
[root@ethan002 kafka]# bin/kafka-server-start.sh -daemon  config/server.properties
[root@ethan003 kafka]# bin/kafka-server-start.sh -daemon  config/server.properties
  1. 关闭kafka集群
    [root@ethan001 kafka]# bin/kafka-server-stop.sh stop
    [root@ethan002 kafka]# bin/kafka-server-stop.sh stop
    [root@ethan003 kafka]# bin/kafka-server-stop.sh stop
    
三、Kafka命令行操作

启动kafka集群,然后可以进行kafka的相关命令操作

  1. 查看当前服务器中的所有topic

    [root@ethan002 kafka]# bin/kafka-topics.sh --zookeeper ethan002:2181 --list
    
    
  2. 创建topic
    名字:abc

    [root@ethan002 kafka]# bin/kafka-topics.sh --zookeeper ethan002:2181 --create --replication-factor 3 --partitions 1 --topic abc
    
  3. 启动生产者进程并发送消息

    [root@ethan002 kafka]# bin/kafka-console-producer.sh --broker-list ethan002:9092 --topic abc
    >hello word
    >I love guiZhou
    >hello
    
  4. 启动消费者进程并消费消息
    在另外开一个窗口,可以获取生产的数据

    [root@ethan002 kafka]# bin/kafka-console-consumer.sh --bootstrap-server ethan002:9092 --topic abc --from-beginning
    hello word
    I love guiZhou
    hell
    
  5. 查看某个Topic的详情

    [root@ethan002 kafka]# bin/kafka-topics.sh --zookeeper ethan002:2181 --describe --topic abc
    

    结果:

    Topic:abc       PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: abc      Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 0,2
    
    
  6. 修改分区数

    [root@ethan002 kafka]# bin/kafka-topics.sh --zookeeper ethan002:2181 --alter --topic abc --partitions 6
    

    注意:
    选项说明:
    –topic 定义topic名
    –replication-factor 定义副本数
    –partitions 定义分区数

  7. 删除topic

    	[root@ethan002 kafka]# bin/kafka-topics.sh --zookeeper ethan002:2181 --delete --topic abc
    

    注意:
    需要server.properties中设置delete.topic.enable=true否则只是标记删除

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/603360.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号