单机版安装教程
文章目录- Linux环境下(Centos7) 下安装Kafka/Zookeeper 详细教程
- 一、安装Zookeeper
- 1.1 安装步骤
- 1.2 常用命令如下:
- 二、安装Kafka
- 1.1 安装步骤
- 1.2 常用命令
- 1.3 开放端口
- 1.4 编写脚本启动
- 1.5 设置脚本开机自动执行
- 三、springboot整合kafka实现消息发送、消费
一、安装Zookeeper 1.1 安装步骤
创建文件夹
mkdir -p /usr/local/zookeeper
进入文件夹
cd /usr/local/zookeeper
在线下载镜像
wget --no-check-certificate https://mirrors.aliyun.com/apache/zookeeper/zookeeper-3.5.9/apache-zookeeper-3.5.9-bin.tar.gz
此处的zookeeper版本失效的话去官网下载最新版本即可
解压文件
tar -zxvf apache-zookeeper-3.5.9-bin.tar.gz #修改文件名 mv apache-zookeeper-3.5.9-bin zookeeper-3.5.9-bin #进入文件目录 cd zookeeper-3.5.9-bin/
进入到解压的文件夹后,创建data文件夹,用于存储数据文件;创建logs文件夹,用于存储日志:
mkdir data mkdir logs
创建配置文件zoo.cfg
vim conf/zoo.cfg
tickTime = 2000 dataDir = /usr/local/zookeeper/zookeeper-3.5.9-bin/data dataLogDir = /usr/local/zookeeper/zookeeper-3.5.9-bin/logs tickTime = 2000 clientPort = 2181 initLimit = 5 syncLimit = 2
使用命令 vim conf/zoo.cfg 创建配置文件并打开,虽然文件夹下有了一个zoo_sample.cfg示例配置文件,我们还是新创建一个。至此以安装结束
1.2 常用命令如下:启动服务:
/usr/local/zookeeper/zookeeper-3.5.9-bin/bin/zkServer.sh start
查看服务状态:
/usr/local/zookeeper/zookeeper-3.5.9-bin/bin/zkServer.sh status
停止服务:
/usr/local/zookeeper/zookeeper-3.5.9-bin/bin/zkServer.sh stop二、安装Kafka 1.1 安装步骤
下载
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.2/kafka_2.12-2.7.2.tgz --no-check-certificate
解压
tar -zxvf kafka_2.12-2.7.2.tgz
进入目录并修改server.properties:
cd kafka_2.12-2.7.2 vim config/server.properties
在broker.id= 0 后面增加如下配置:
advertised.listeners=PLAINTEXT://192.168.29.128:9092
1.2 常用命令这里的192.168.29.128 替换为自己实际服务器ip, 此处端口号默认为9092
启动:
/usr/local/kafka/kafka_2.12-2.7.2/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka_2.12-2.7.2/config/server.properties
停止:
/usr/local/kafka/kafka_2.12-2.7.2/bin/kafka-server-stop.sh1.3 开放端口
firewall-cmd --zone=public --add-port=2181/tcp --permanent #工具连接 firewall-cmd --zone=public --add-port=9092/tcp --permanent #默认端口 firewall-cmd --reload1.4 编写脚本启动
kafka启动之前必须先启动zookeeper
创建启动脚本
vim kafkastart.sh
#!/bin/sh #启动zookeeper /usr/local/zookeeper/zookeeper-3.5.9-bin/bin/zkServer.sh start echo "zookeeper start success" sleep 5 #启动kafka /usr/local/kafka/kafka_2.12-2.7.2/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka_2.12-2.7.2/config/server.properties echo "kafka start success"
创建停止脚本
vim kafkastop.sh
创建启动脚本 ```java vim kafkastart.sh
#!/bin/sh #停止zookeeper /usr/local/zookeeper/zookeeper-3.5.9-bin/bin/zkServer.sh stop echo "zookeeper stop success" sleep 5 #停止kafka /usr/local/kafka/kafka_2.12-2.7.2/bin/kafka-server-stop.sh echo "kafka stop success"1.5 设置脚本开机自动执行
vim /etc/rc.local #编辑,在最后添加一行 sh /usr/local/kafka/kafkastart.sh & #设置开机自动在后台运行脚本
至此,Linux下Kafka单机安装配置完成。
三、springboot整合kafka实现消息发送、消费- 添加依赖
org.springframework.kafka spring-kafka
- 配置文件
#====================== kafka =================== # 自定义 topics ,用于实现订阅-消费模式 spring.kafka.consumer.topic=earlyWarning # 指定kafka 代理地址,可以多个 逗号隔开 spring.kafka.bootstrap-servers=101.201.235.150:9092 # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错 spring.kafka.listener.missing-topics-fatal=false # 指定listener容器中线程数,用于提高并发量 #spring.kafka.listener.concurrency=3 #kafka 的每次调用来自哪个应用 #spring.kafka.client-id=kafka001 #=============== provider 生产者 ======================= # 指定消息key和消息体的编解码方式 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。 spring.kafka.producer.batch-size=65536 ## 设置生产者内存缓冲区的大 spring.kafka.producer.buffer-memory=524288 #=============== consumer 消费者======================= #设置一个默认组 spring.kafka.consumer.group-id=group-001 #key-value序列化反序列化 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer #//指定消息被消费之后自动提交偏移量(即消息的编号,表示消费到了哪个位置,消费者每消费完一条消息就会向kafka服务器汇报自己消消费到的那个消息的编号,以便于下次继续消费)。 spring.kafka.consumer.enable-auto-commit=true #从最近的地方开始消费 广播模式,一般情况下,无需消费历史的消息,从订阅的 Topic 的队列的尾部开始消费即可 spring.kafka.consumer.auto-offset-reset=latest
3.生产者
@Component
public class KafkaSender {
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessageSync(String channel, String message) throws InterruptedException, ExecutionException, TimeoutException {
SendResult stringStringSendResult = kafkaTemplate.send(channel, message).get(10, TimeUnit.SECONDS);
System.out.println(stringStringSendResult);
}
public void sendMessageAsync(String topic, String message) {
ListenableFuture> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback>() {
@Override
public void onSuccess(SendResult result) {
System.out.println("success:"+result);
}
@Override
public void onFailure(Throwable ex) {
System.out.println("failure:"+ex);
}
});
}
}
4.消费者
@Component
public class KafkaConsumer {
@KafkaListener(topics = {"#{'${spring.kafka.consumer.topic}'}"},groupId = "${spring.kafka.consumer.group-id}")
public void consumerGroup1(ConsumerRecord, ?> record){
Optional message = Optional.ofNullable(record.value());
if(message.isPresent()){
Object msg = message.get();
System.out.println("consumerGroup1 消费了: Topic:" + record.topic() + ",Message:" + msg);
}
}
@KafkaListener(topics = {"earlyWarning11"},groupId = "001")
public void receiveMessageUser(String message){
//收到通道的消息之后执行秒杀操作
System.out.println("user:"+message);
}
}



