一、Kafka集群部署方案规划
1、操作系统选择
通常,生产环境应该将Kafka集群部署在Linux操作系统上,原因如下:
(1)Kafka客户端底层使用了Java的selector,selector在Linux上的实现机制是epoll,而在Windows平台上的实现机制是select,因此Kafka部署在Linux上能够获得更高效的I/O性能。
(2)网络传输效率的差别。Kafka需要在磁盘和网络间进行大量数据传输,在Linux部署Kafka能够享受到零拷贝(Zero Copy)技术所带来的快速数据传输特性。
(3)社区的支持度。Apache Kafka社区目前对Windows平台上发现的Kafka Bug不做任何承诺。
2、磁盘
(1)Kafka实现了冗余机制来提供高可靠性,并通过分区机制在软件层面实现负载均衡,因此Kafka的磁盘存储可以不使用磁盘阵列(RAID),使用普通磁盘组成存储空间即可。
(2)使用机械磁盘能够胜任Kafka线上环境,但SSD显然性能更好。
3、磁盘容量
规划磁盘容量时需要考虑:新增消息数、消息留存时间、平均消息大小、备份数、是否启用压缩等因素。
假设公司业务每天需要向Kafka集群发送100000000条消息,每条消息保存两份以防止数据丢失,消息默认保存7天时间,消息的平均大小是1KB,Kafka的数据压缩比是0.75。
每天100000000条1KB大小的消息,保存两份,压缩比0.75,占用空间大小就等于150GB(100000000*1KB*2/1000/1000*0.75),考虑到Kafka集群的索引数据等,需要预留出10%的磁盘空间,因此每天总存储容量是165GB。数据留存7天,因此规划磁盘容量为1155GB(165GB*7)。
4、网络带宽
假设公司的机房环境是千兆网络,即1Gbps,业务需要在1小时内处理1TB的业务数据。假设Kafka Broker会用到70%的带宽资源,超过70%的阈值可能网络丢包,单台Kafka Broker最多能使用大约700Mb的带宽资源,但通常需要再额外为其它服务预留出2/3的资源,即Kafka Broker可以为Kafka服务分配带宽240Mbps(700Mb/3)。1小时处理1TB数据,则每秒需要处理2336Mb(1024*1024*8/3600)数据,除以240,约等于10台服务器。如果还需要额外复制两份,那么服务器台数还要乘以3,即30台。
二、Kafka集群参数配置
1、Broker端参数
Broker端参数也被称为静态参数(Static Configs),静态参数只能在Kafka的配置文件server.properties中进行设置,必须重启Broker进程才能生效。
log.dirs:指定Broker需要使用的若干个文件目录路径,没有默认值,必须指定。在生产环境中一定要为log.dirs配置多个路径,如果条件允许,需要保证目录被挂载到不同的物理磁盘上。优势在于,提升读写性能,多块物理磁盘同时读写数据具有更高的吞吐量;能够实现故障转移(Failover),Kafka 1.1版本引入Failover功能,坏掉磁盘上的数据会自动地转移到其它正常的磁盘上,而且Broker还能正常工作,基于Failover机制,Kafka可以舍弃RAID方案。
zookeeper.connect:CS格式参数,可以指定值为zk1:2181,zk2:2181,zk3:2181,不同Kafka集群可以指定:zk1:2181,zk2:2181,zk3:2181/kafka1,chroot只需要写一次。
listeners:设置内网访问Kafka服务的监听器。
advertised.listeners:设置外网访问Kafka服务的监听器。
auto.create.topics.enable:是否允许自动创建Topic。
unclean.leader.election.enable:是否允许Unclean Leader 选举。
auto.leader.rebalance.enable:是否允许定期进行Leader选举,生产环境中建议设置成false。
log.retention.{hours|minutes|ms}:控制一条消息数据被保存多长时间。优先级:ms设置最高、minutes次之、hours最低。
log.retention.bytes:指定Broker为消息保存的总磁盘容量大小。message.max.bytes:控制Broker能够接收的最大消息大小。
2、Topic级别参数
如果同时设置了Topic级别参数和全局Broker参数,Topic级别参数会覆盖全局Broker参数,而每个Topic都能设置自己的参数值。
生产环境中,应当允许不同部门的Topic根据自身业务需要,设置自己的留存时间。如果只能设置全局Broker参数,那么势必要提取所有业务留存时间的最大值作为全局参数值,此时设置Topic级别参数对Broker参数进行覆盖就是一个不错的选择。
retention.ms:指定Topic消息被保存的时长,默认是7天,只保存最近7天的消息,会覆盖掉Broker端的全局参数值。
retention.bytes:指定为Topic预留多大的磁盘空间。通常在多租户的Kafka集群中使用,默认值是 -1,表示可以无限使用磁盘空间。
max.message.bytes:指定Kafka Broker能够正常接收Topic 的最大消息大小。
Topic级别参数可以在创建Topic时进行设置,也可以在修改Topic 时设置,推荐在修改Topic时进行设置,Apache Kafka社区未来可能统一使用kafka-configs脚本来设置Topic级别参数。
3、JVM参数
Kafka 2.0.0版本已经正式摒弃对Java 7的支持。
Kafka Broker在与客户端进行交互时会在JVM堆上创建大量的Byte Buffer实例,因此JVM端设置的Heap Size不能太小,建议设置6GB。
export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g
JVM端配置的一个重要参数是垃圾回收器的设置。对于Java 7,如果Broker所在机器的CPU资源非常充裕,建议使用CMS收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC。否则,使用吞吐量收集器,开启方法是指定-XX:+UseParallelGC。对于Java 9,用默认的G1收集器,在没有任何调优的情况下,G1表现得要比CMS出色,主要体现在更少的Full GC,需要调整的参数更少等,所以使用G1就好。
export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
4、操作系统参数
件描述符限制:ulimit -n。建议设置成一个超大的值,如ulimit -n 1000000。
文件系统类型:文件系统类型的选择。根据官网的测试报告,XFS 的性能要强于ext4。
Swappiness:推荐设置为一个较小值,如1。如果将swap设置为0,将会完全禁止Kafka Broker进程使用swap空间;当物理内存耗尽时,操作系统会触发OOM killer组件,随机挑选一个进程kill掉,不给用户任何预警。如果设置一个比较小值,当开始使用swap空间时,Broker性能会出现急剧下降,从而给进一步调优和诊断问题的时间。
提交时间:提交时间(Flush落盘时间)。向Kafka发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就认为写入成功,随后操作系统根据LRU算法会定期将页缓存上的脏数据落盘到物理磁盘上。页缓存数据写入磁盘的周期由提交时间来确定,默认是5秒,可以适当地增加提交间隔来降低物理磁盘的写操作。如果在页缓存中的数据在写入到磁盘前机器宕机,数据会丢失,但鉴于Kafka在软件层面已经提供了多副本的冗余机制,拉大提交间隔换取性能是一个合理的做法。
三、Docker镜像选择
1、安装docker
安装Docker:sudo yum install docker
启动Docker:sudo systemctl start docker
docker版本检查:docker version
2、docker-compose安装
docker-compose下载:
sudo curl -L https://github.com/docker/compose/releases/download/1.23.0-rc3/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose
docker-compose安装:
sudo chmod +x /usr/local/bin/docker-compose
docker-compose版本检查:
docker-compose version
3、docker镜像选择
zookeeper镜像选择:
docker search zookeeper
选择star最多的镜像:docker.io/zookeeper
Kafka镜像选择:
docker search kafka
选择star最多的镜像:docker.io/wurstmeister/kafka
kafka-manager镜像选择:
docker search kafka-manager
选择官方镜像:kafkamanager/kafka-manager
四、Kafka单机部署方案
1、编写docker-compose.yml文件
# 单机 zookeeper + kafka + kafka-manager集群
version: '2'
services:
# 定义zookeeper服务
zookeeper-test:
image: zookeeper # zookeeper镜像
restart: always
hostname: zookeeper-test
ports:
- "12181:2181" # 宿主机端口:docker内部端口
container_name: zookeeper-test # 容器名称
# 定义kafka服务
kafka-test:
image: wurstmeister/kafka # kafka镜像
restart: always
hostname: kafka-test
ports:
- "9092:9092" # 对外暴露端口号
- "9999:9999" # 对外暴露JMX_PORT
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.0.105 #
KAFKA_ADVERTISED_PORT: 9092 #
KAFKA_ZOOKEEPER_CONNECT: zookeeper-test:2181 # zookeeper服务
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000 # zookeeper连接超时
KAFKA_LOG_CLEANUP_POLICY: "delete"
KAFKA_LOG_RETENTION_HOURS: 120 # 设置消息数据保存的最长时间为120小时
KAFKA_MESSAGE_MAX_BYTES: 10000000 # 消息体的最大字节数
KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000 #
KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000 #
KAFKA_NUM_PARTITIONS: 1 # 分区数量
KAFKA_DELETE_RETENTION_MS: 10000 #
KAFKA_BROKER_ID: 1 # kafka的ID
KAFKA_COMPRESSION_TYPE: lz4
KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.0.105 -Dcom.sun.management.jmxremote.rmi.port=9999" # 导入KAFKA_JMX_OPTS环境变量
JMX_PORT: 9999 # 导入JMX_PORT环境变量
depends_on:
- zookeeper-test # 依赖
container_name: kafka-test
# 定义kafka-manager服务
kafka-manager-test:
image: kafkamanager/kafka-manager # kafka-manager镜像
restart: always
container_name: kafka-manager-test
hostname: kafka-manager-test
ports:
- "9000:9000" # 对外暴露端口,提供web访问
depends_on:
- kafka-test # 依赖
environment:
ZK_HOSTS: zookeeper-test:2181 # 宿主机IP
KAFKA_BROKERS: kafka-test:9090 # kafka
KAFKA_MANAGER_AUTH_ENABLED: "true" # 开启安全认证
KAFKA_MANAGER_USERNAME: kafka-manager # Kafka Manager登录用户
KAFKA_MANAGER_PASSWORD: 123456 # Kafka Manager登录密码
# 单机 zookeeper + kafka + kafka-manager集群 version: '2' services: # 定义zookeeper服务 zookeeper-test: image: zookeeper # zookeeper镜像 restart: always hostname: zookeeper-test ports: - "12181:2181" # 宿主机端口:docker内部端口 container_name: zookeeper-test # 容器名称 # 定义kafka服务 kafka-test: image: wurstmeister/kafka # kafka镜像 restart: always hostname: kafka-test ports: - "9092:9092" # 对外暴露端口号 - "9999:9999" # 对外暴露JMX_PORT environment: KAFKA_ADVERTISED_HOST_NAME: 192.168.0.105 # KAFKA_ADVERTISED_PORT: 9092 # KAFKA_ZOOKEEPER_CONNECT: zookeeper-test:2181 # zookeeper服务 KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000 # zookeeper连接超时 KAFKA_LOG_CLEANUP_POLICY: "delete" KAFKA_LOG_RETENTION_HOURS: 120 # 设置消息数据保存的最长时间为120小时 KAFKA_MESSAGE_MAX_BYTES: 10000000 # 消息体的最大字节数 KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000 # KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000 # KAFKA_NUM_PARTITIONS: 1 # 分区数量 KAFKA_DELETE_RETENTION_MS: 10000 # KAFKA_BROKER_ID: 1 # kafka的ID KAFKA_COMPRESSION_TYPE: lz4 KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.0.105 -Dcom.sun.management.jmxremote.rmi.port=9999" # 导入KAFKA_JMX_OPTS环境变量 JMX_PORT: 9999 # 导入JMX_PORT环境变量 depends_on: - zookeeper-test # 依赖 container_name: kafka-test # 定义kafka-manager服务 kafka-manager-test: image: kafkamanager/kafka-manager # kafka-manager镜像 restart: always container_name: kafka-manager-test hostname: kafka-manager-test ports: - "9000:9000" # 对外暴露端口,提供web访问 depends_on: - kafka-test # 依赖 environment: ZK_HOSTS: zookeeper-test:2181 # 宿主机IP KAFKA_BROKERS: kafka-test:9090 # kafka KAFKA_MANAGER_AUTH_ENABLED: "true" # 开启安全认证 KAFKA_MANAGER_USERNAME: kafka-manager # Kafka Manager登录用户 KAFKA_MANAGER_PASSWORD: 123456 # Kafka Manager登录密码
需要确认相应端口是否被占用。
2、启动服务
创建kafka目录,将docker-compose.yml文件放入kafka目录,在kafka目录执行命令。
启动:
docker-compose up -d
关闭:
docker-compose down
3、kafka服务查看
进入docker容器:
docker exec -it kafka /bin/bash
创建Topic:
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic test
查看Topic:
kafka-topics.sh --list --zookeeper zookeeper:2181
生产消息:
kafka-console-producer.sh --broker-list kafka:9092 --topic test
消费消息:
kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test --from-beginning
打开两个Terminal,一个执行生产消息的命令,一个执行消费消息的命令,每生产一条消息时消费消息Terminal就会显示一条消息,实现消息队列。
4、Kafka版本查询
wurstmeister/kafka镜像中,kafka安装在/opt目录下,进入/opt目录,kafka_2.12-2.4.0目录即为kafka安装目录。
Scala版本:2.12
Kafka版本:2.4
5、kafka-manager监控
Web方式访问:http://127.0.0.1:9000
五、错误解决
1、容器删除失败
docker rm -f $(docker ps -a --filter status=dead -q |head -n 1)
docker rm -f $(docker ps -a --filter status=dead -q |head -n 1)
报错信息:
ERROR: for f78856fb92e9_zoo1 Driver overlay2 failed to remove root filesystem f78856fb92e97f75ff4c255077de544b39351a4a2a3319737ada2a54df568032: remove /var/lib/docker/overlay2/2c257b8071b6a3d79e216838522f76ba7263d466a470dc92cdbef25c4dd04dc3/merged: device or resource busy
grep docker /proc/*/mountinfo|grep containerid | awk -F ":" '{print $1}' | awk -F "/" '{print $3}'
sudo kill -9 3119
2、kafka服务一直重启
报错信息:
Error response from daemon: Container 9b3f9af8a1196f2ad3cf74fe2b1eeb7ccbd231fe2a93ec09f594d3a0fbb5783c is restarting, wait until the container is running
错误原因:
docker-compose.yml文件对kafka服务配置restart: always,如果kafka服务启动失败会一直重启,可以通过docker logs kafka查看kafka服务启动的日志信息,查找错误原因。



