首先创建目录,执行命令
mkdir -p /mnt/kafka
创建服务编排配置文件:
vim /mnt/kafka/docker-compose.yml
文件内容为:(192.168.200.188需要改为实际的IP)
version: '3.6'
services:
kafka1:
image: wurstmeister/kafka:2.12-2.4.1
restart: always
hostname: kafka1
container_name: kafka1
privileged: true
ports:
- 9093:9093
environment:
KAFKA_BROKER_ID: 1
KAFKA_ADVERTISED_HOST_NAME: kafka1
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.188:9093
KAFKA_ADVERTISED_PORT: 9093
KAFKA_ZOOKEEPER_CONNECT: 192.168.200.188:3183,192.168.200.188:3184,192.168.200.188:3185/kafka
networks:
seckill_network:
ipv4_address: 172.36.0.93
kafka2:
image: wurstmeister/kafka:2.12-2.4.1
restart: always
hostname: kafka2
container_name: kafka2
privileged: true
ports:
- 9094:9094
environment:
KAFKA_BROKER_ID: 2
KAFKA_ADVERTISED_HOST_NAME: kafka2
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.188:9094
KAFKA_ADVERTISED_PORT: 9094
KAFKA_ZOOKEEPER_CONNECT: 192.168.200.188:3183,192.168.200.188:3184,192.168.200.188:3185/kafka
networks:
seckill_network:
ipv4_address: 172.36.0.94
kafka3:
image: wurstmeister/kafka:2.12-2.4.1
restart: always
hostname: kafka3
container_name: kafka3
privileged: true
ports:
- 9095:9095
environment:
KAFKA_BROKER_ID: 3
KAFKA_ADVERTISED_HOST_NAME: kafka3
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9095
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.188:9095
KAFKA_ADVERTISED_PORT: 9095
KAFKA_ZOOKEEPER_CONNECT: 192.168.200.188:3183,192.168.200.188:3184,192.168.200.188:3185/kafka
networks:
seckill_network:
ipv4_address: 172.36.0.95
networks:
seckill_network:
external:
name: seckill_network
启动命令:
# 进入目录 cd /mnt/kafka/ # 服务编排启动Kafka集群 docker-compose up -d # 服务编排停止Kafka集群并删除容器 docker-compose down8.3 测试Kafka
创建队列:
# 进入容器 Kafka1为容器的名字,可以选择任意节点进入 docker exec -it kafka1 /bin/bash # 进入目录 cd /opt/kafka_2.12-2.4.1/bin # 创建队列 9093需要改为当前节点监听的端口号 ./kafka-topics.sh --create --bootstrap-server localhost:9093 --partitions 3 --replication-factor 2 --topic itemaccess
使用kafka-topics.sh创建队列:
–create:执行创建一个新的队列操作
--bootstrap-server:需要链接的kafka配置,必填
--partitions 1:设置一个topic设置几个分区(就是把数据切割成几块,分别存储)
--replication-factor 1:设置分区的副本数量(就是设置多少个备份,有了备份,一个挂了没事,可以使用备份)
--topic itemaccess:队列的名字叫itemaccess
消息发布
在kafka容器中执行消息发送(接着上面的步骤执行):
# 发送消息
./kafka-console-producer.sh --broker-list localhost:9093 --topic itemaccess
# 发送内容为
{"actime":"2021-4-10 9:50:10","uri":"http://www-seckill.itheima.net/items/333.html","IP":"119.123.33.231","Token":"Bearer itcast"}
消息订阅
# 进入容器 Kafka3为容器的名字,测试集群选择任意节点进入 docker exec -it kafka3 /bin/bash # 进入目录 cd /opt/kafka_2.12-2.4.1/bin # 订阅消息 9095需要改为当前节点监听的端口号 ./kafka-console-consumer.sh --bootstrap-server localhost:9095 --topic itemaccess --from-beginning
其他命令
# 进入容器 docker exec -it kafka1 /bin/bash # 进入目录 cd /opt/kafka_2.12-2.4.1/bin # 查看topic列表 注意修改9093为当前节点端口号 ./kafka-topics.sh --bootstrap-server localhost:9093 --list # 查看指定的topic信息 注意修改9093为当前节点端口号 ./kafka-topics.sh --bootstrap-server localhost:9093 --list # 删除topics 注意修改9093为当前节点端口号 ./kafka-topics.sh --bootstrap-server localhost:9093 --delete --topic itemaccess2



