wget -P ./ https://mirrors.bfsu.edu.cn/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz tar -zxvf kafka_2.12-2.8.0.tgzZK集群
docker pull zookeeper vim docker-compose.yml
version: '2'
services:
zoo1:
image: zookeeper
restart: always
container_name: zoo1
ports:
- "8001:2181"
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
zoo2:
image: zookeeper
restart: always
container_name: zoo2
ports:
- "8002:2181"
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
zoo3:
image: zookeeper
restart: always
container_name: zoo3
ports:
- "8003:2181"
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
构建启动 COMPOSE_PROJECT_NAME=zk_test docker-compose up -d 停止 COMPOSE_PROJECT_NAME=zk_test docker-compose stop 删除 COMPOSE_PROJECT_NAME=zk_test docker-compose rm
使用 Docker命令行客户端连接ZK集群
docker run -it --rm
--link zoo1:zk1
--link zoo2:zk2
--link zoo3:zk3
--net zktest_default
zookeeper zkCli.sh -server zk1:2181,zk2:2181,zk3:2181;
zk安装包搭建常见问题
zoo.cfg中 server.A=B:C:D A=myid 把本机器的对应的ip改为0.0.0.0,每个机器各自的配置文件都要改 data/myid中id各不同 bin/zkServer.sh status 查看集群状态 bin/zkServer.sh start conf/zoo.cfg 启动配置 server.properties
broker.id=0 每个节点唯一值 delete.topic.enable=true 可以删除topic log.dirs=/root/kafka/kafka_2.12-2.8.0/logs 存放数据 zookeeper.connect=127.0.0.1:8001,127.0.0.1:8002,127.0.0.1:8003 zookeeper listeners=PLAINTEXT://127.0.0.1:9092启动&停止
./bin/kafka-server-start.sh -daemon config/server.properties & ./bin/kafka-server-stop.sh常用命令
查看当前服务器中的所有 topic ./kafka-topics.sh -list --zookeeper 101.132.125.206:8002 创建主题 ./kafka-topics.sh --create --replication-factor 2 --partitions 2 --topic first --zookeeper 101.132.125.206:8002 删除主题 ./kafka-topics.sh --zookeeper 101.132.125.206:8002 --delete --topic first 详情 ./bin/kafka-topics.sh --describe --topic first --zookeeper 101.132.125.206:8002 以主题名称first为例 replication-factor 3 副本 (数据存3份 first-0 first-0 first-0) partitions 3 分区 (数据分成2份存储 first-0 first-1 first-2) 生产者 ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first 消费者 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first --from-beginning --from-beginning 会把主题中以往所有的数据都读取出来 --consumer.config config/consumer.properties 设置消费者组(相同组类似:queue,不同组类似:topic)ISR
问题:leader收到数据,所有 follower 都开始同步数据,但有一个 follower,因为某种故障,迟迟不能与 leader 进行同步,那 leader 就要一直等下去,直到它完成同步,才能发送 ack? Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集 合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 follower 发送 ack。如果 follower 长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms 参数设定。Leader 发生故障之后,就会从 ISR中选举新的 leader。 老版本isr不仅受到replica.lag.time.max.ms时间参数影响还受到条数影响,新版本移出原因: 当生产者条数batch>设置的条数阈值 此时follower还没开始同步时 会被直接踢出isr,follower同步完时又进入isr 这样会频繁消耗kafka性能,而且数据存在zk中,也会消耗zk性能。Kafka 监控 Eagle
下载kafka-eagle-bin-1.2.4.tar.gz
配置环境变量
export KE_HOME=/Users/huziyang/Desktop/kafka/eagle export PATH=$PATH:$KE_HOME/bin export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_192.jdk/Contents/Home export PATH=$PATH:$JAVA_HOME/bin
修改conf目录下system-config.properties 文件
配置文件中配置 zk 集群+mysql 地址即可
cluster1.zk.list=32.9.5.172:2681,32.9.5.173:2681,32.9.5.174:2681 kafka.eagle.driver=com.mysql.jdbc.Driver kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull kafka.eagle.username=root kafka.eagle.password=xxx
#启动 ./ke.sh start #查看状态 ./ke.sh status #查看状态 ./ke.sh stats #关闭 ./ke.sh stop #重启 ./ke.sh restart
http://host:8048/ke
admin/12345



