栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Linux环境下(Centos7) 下安装Kafka/Zookeeper 详细教程

Linux环境下(Centos7) 下安装Kafka/Zookeeper 详细教程

Linux环境下(Centos7) 下安装Kafka/Zookeeper 详细教程

单机版安装教程

文章目录
  • Linux环境下(Centos7) 下安装Kafka/Zookeeper 详细教程
  • 一、安装Zookeeper
    • 1.1 安装步骤
    • 1.2 常用命令如下:
  • 二、安装Kafka
    • 1.1 安装步骤
    • 1.2 常用命令
    • 1.3 开放端口
    • 1.4 编写脚本启动
    • 1.5 设置脚本开机自动执行
  • 三、springboot整合kafka实现消息发送、消费


一、安装Zookeeper 1.1 安装步骤

创建文件夹

mkdir -p /usr/local/zookeeper

进入文件夹

cd /usr/local/zookeeper

在线下载镜像

wget --no-check-certificate  https://mirrors.aliyun.com/apache/zookeeper/zookeeper-3.5.9/apache-zookeeper-3.5.9-bin.tar.gz

此处的zookeeper版本失效的话去官网下载最新版本即可
解压文件

tar -zxvf apache-zookeeper-3.5.9-bin.tar.gz
#修改文件名 
mv apache-zookeeper-3.5.9-bin zookeeper-3.5.9-bin
#进入文件目录
cd zookeeper-3.5.9-bin/

进入到解压的文件夹后,创建data文件夹,用于存储数据文件;创建logs文件夹,用于存储日志:

mkdir data
mkdir logs

创建配置文件zoo.cfg

vim conf/zoo.cfg
tickTime = 2000
dataDir = /usr/local/zookeeper/zookeeper-3.5.9-bin/data
dataLogDir = /usr/local/zookeeper/zookeeper-3.5.9-bin/logs
tickTime = 2000
clientPort = 2181
initLimit = 5
syncLimit = 2

使用命令 vim conf/zoo.cfg 创建配置文件并打开,虽然文件夹下有了一个zoo_sample.cfg示例配置文件,我们还是新创建一个。至此以安装结束

1.2 常用命令如下:

启动服务:

/usr/local/zookeeper/zookeeper-3.5.9-bin/bin/zkServer.sh start

查看服务状态:

/usr/local/zookeeper/zookeeper-3.5.9-bin/bin/zkServer.sh status

停止服务:

/usr/local/zookeeper/zookeeper-3.5.9-bin/bin/zkServer.sh stop
二、安装Kafka 1.1 安装步骤

下载

 wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.2/kafka_2.12-2.7.2.tgz --no-check-certificate

解压

 tar -zxvf kafka_2.12-2.7.2.tgz 

进入目录并修改server.properties:

cd kafka_2.12-2.7.2
vim config/server.properties

在broker.id= 0 后面增加如下配置:

advertised.listeners=PLAINTEXT://192.168.29.128:9092

这里的192.168.29.128 替换为自己实际服务器ip, 此处端口号默认为9092

1.2 常用命令

启动:

/usr/local/kafka/kafka_2.12-2.7.2/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka_2.12-2.7.2/config/server.properties

停止:

/usr/local/kafka/kafka_2.12-2.7.2/bin/kafka-server-stop.sh
1.3 开放端口
firewall-cmd --zone=public --add-port=2181/tcp --permanent  #工具连接
firewall-cmd --zone=public --add-port=9092/tcp --permanent  #默认端口 
firewall-cmd --reload

1.4 编写脚本启动

kafka启动之前必须先启动zookeeper

创建启动脚本

 vim kafkastart.sh
  #!/bin/sh
  #启动zookeeper
  /usr/local/zookeeper/zookeeper-3.5.9-bin/bin/zkServer.sh start
  echo "zookeeper start success"
  sleep 5
  #启动kafka
  /usr/local/kafka/kafka_2.12-2.7.2/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka_2.12-2.7.2/config/server.properties
  echo "kafka start success"

创建停止脚本

 vim kafkastop.sh
 创建启动脚本

```java
 vim kafkastart.sh
  #!/bin/sh
  #停止zookeeper
  /usr/local/zookeeper/zookeeper-3.5.9-bin/bin/zkServer.sh stop
  echo "zookeeper stop success"
  sleep 5
  #停止kafka
  /usr/local/kafka/kafka_2.12-2.7.2/bin/kafka-server-stop.sh
  echo "kafka stop success"
1.5 设置脚本开机自动执行
vim /etc/rc.local   #编辑,在最后添加一行
sh /usr/local/kafka/kafkastart.sh & #设置开机自动在后台运行脚本

至此,Linux下Kafka单机安装配置完成。

三、springboot整合kafka实现消息发送、消费
  1. 添加依赖


    org.springframework.kafka
    spring-kafka

  1. 配置文件
#====================== kafka ===================
# 自定义 topics  ,用于实现订阅-消费模式
spring.kafka.consumer.topic=earlyWarning
# 指定kafka 代理地址,可以多个 逗号隔开
spring.kafka.bootstrap-servers=101.201.235.150:9092
# 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错
spring.kafka.listener.missing-topics-fatal=false
# 指定listener容器中线程数,用于提高并发量
#spring.kafka.listener.concurrency=3
#kafka 的每次调用来自哪个应用
#spring.kafka.client-id=kafka001

#=============== provider 生产者 =======================

# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
spring.kafka.producer.batch-size=65536
## 设置生产者内存缓冲区的大
spring.kafka.producer.buffer-memory=524288

#=============== consumer  消费者=======================
#设置一个默认组
spring.kafka.consumer.group-id=group-001
#key-value序列化反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#//指定消息被消费之后自动提交偏移量(即消息的编号,表示消费到了哪个位置,消费者每消费完一条消息就会向kafka服务器汇报自己消消费到的那个消息的编号,以便于下次继续消费)。
spring.kafka.consumer.enable-auto-commit=true
#从最近的地方开始消费  广播模式,一般情况下,无需消费历史的消息,从订阅的 Topic 的队列的尾部开始消费即可
spring.kafka.consumer.auto-offset-reset=latest

3.生产者

@Component
public class KafkaSender {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    
    public void sendMessageSync(String channel, String message) throws InterruptedException, ExecutionException, TimeoutException {
        SendResult stringStringSendResult = kafkaTemplate.send(channel, message).get(10, TimeUnit.SECONDS);
        System.out.println(stringStringSendResult);
    }

     
    public void sendMessageAsync(String topic, String message) {
        ListenableFuture> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback>() {
            @Override
            public void onSuccess(SendResult result) {
                System.out.println("success:"+result);
            }

            @Override
            public void onFailure(Throwable ex) {
                System.out.println("failure:"+ex);
            }
        });
    }
}

4.消费者

@Component
public class KafkaConsumer {
    
    @KafkaListener(topics = {"#{'${spring.kafka.consumer.topic}'}"},groupId = "${spring.kafka.consumer.group-id}")
   public void consumerGroup1(ConsumerRecord record){
        Optional message = Optional.ofNullable(record.value());
        if(message.isPresent()){
            Object msg = message.get();
            System.out.println("consumerGroup1 消费了: Topic:" + record.topic() + ",Message:" + msg);
        }
    }

    @KafkaListener(topics = {"earlyWarning11"},groupId = "001")
    public void receiveMessageUser(String message){
        //收到通道的消息之后执行秒杀操作
		System.out.println("user:"+message);
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/673865.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号