EFAK(Eagle For Apache Kafka,以前称为 Kafka Eagle)EFAK是开源可视化和管理软件。可以查询、可视化、监控kafka集群,是将 kafka 的集群数据转换为图形可视化的工具。
Kafka-Eagle 是一款完全开源的对 Kafka 集群及应用做全面监控的系统,其核心由以下几个部分组成:
数据采集:核心数据来源 JMX 和 API 获取;
数据存储:支持 MySQL 和 Sqlite 存储;
数据展示:消费者应用、图表趋势监控(包括集群状态、消费生产速率、消费积压等)、开发的分布式 KSQL 查询引擎,通过 KSQL 消息查询;
数据告警:支持常用的 IM 告警(微信,钉钉,WebHook等),同时邮件、短信、电话告警也一并支持。
官方网站:https://www.kafka-eagle.org/
下载地址:
http://download.kafka-eagle.org/
直接下载二进制 .tar.gz 文件即可,推荐使用这个进行linux安装。
下面进行安装步骤:
1、安装JDK
如果Linux服务器上有JDK环境,这一步可以忽略,继续下一步的安装。如果没有JDK,请先到Oracle官网下载JDK。
wget http://download.oracle.com/otn-pub/java/jdk/8u20-b26/jdk-8u20-linux-x64.tar.gz 1.下载后重命名:jdk-8u281-linux-x64.tar.gz 2.解压 tar -zxvf jdk-8u281-linux-x64.tar.gz 3.配置环境变量:vim /etc/profile 最后添加: export JAVA_HOME=/opt/jdk export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$PATH:$JAVA_HOME/bin wq! 6.使变量生效:source /etc/profile 7.java -version
2、安装EFAK
这里我们解压到/data/soft/new目录并解压: tar -zxvf efak-xxx-bin.tar.gz 如果之前安装过版本,删除修改后的版本,重命名当前版本,如下图: rm -rf efak mv efak-xxx efak 然后,配置 EFAK 配置文件 vi /etc/profile export KE_HOME=/data/soft/new/efak export PATH=$PATH:$KE_HOME/bin 最后,我们使用. /etc/profile使配置立即生效。
3、配置EFAK系统文件
根据自身Kafka集群的实际情况配置EFAK,例如zookeeper地址、Kafka集群的版本类型(zk为低版本,kafka为高版本)、开启安全认证的Kafka集群等。
cd ${KE_HOME}/conf
vi system-config.properties
###################################### # multi zookeeper & kafka cluster list # Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead ###################################### efak.zk.cluster.alias=cluster1 cluster1.zk.list=kafka1:2181,kafka2:2181,kafka3:2181 ###################################### # zookeeper enable acl ###################################### cluster1.zk.acl.enable=false cluster1.zk.acl.schema=digest cluster1.zk.acl.username=test cluster1.zk.acl.password=test123 ###################################### # broker size online list ###################################### cluster1.efak.broker.size=20 ###################################### # zk client thread limit ###################################### kafka.zk.limit.size=16 ###################################### # EFAK webui port ###################################### efak.webui.port=8048 ###################################### # EFAK enable distributed ###################################### efak.distributed.enable=false efak.cluster.mode.status=master efak.worknode.master.host=localhost efak.worknode.port=8085 ###################################### # kafka jmx acl and ssl authenticate ###################################### cluster1.efak.jmx.acl=false cluster1.efak.jmx.user=keadmin cluster1.efak.jmx.password=keadmin123 cluster1.efak.jmx.ssl=false cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore cluster1.efak.jmx.truststore.password=ke123456 ###################################### # kafka offset storage ###################################### cluster1.efak.offset.storage=kafka #cluster2.efak.offset.storage=zk ###################################### # kafka jmx uri ###################################### #cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://kafka1:9988/jmxrmi cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi ###################################### # kafka metrics, 15 days by default ###################################### efak.metrics.charts=true efak.metrics.retain=15 ###################################### # kafka sql topic records max ###################################### efak.sql.topic.records.max=5000 efak.sql.topic.preview.records.max=10 ###################################### # delete kafka topic token ###################################### efak.topic.token=keadmin ###################################### # kafka sasl authenticate ###################################### cluster1.efak.sasl.enable=false cluster1.efak.sasl.protocol=SASL_PLAINTEXT cluster1.efak.sasl.mechanism=SCRAM-SHA-256 cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle"; cluster1.efak.sasl.client.id= cluster1.efak.blacklist.topics= cluster1.efak.sasl.cgroup.enable=false cluster1.efak.sasl.cgroup.topics= #cluster2.efak.sasl.enable=false #cluster2.efak.sasl.protocol=SASL_PLAINTEXT #cluster2.efak.sasl.mechanism=PLAIN #cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle"; #cluster2.efak.sasl.client.id= #cluster2.efak.blacklist.topics= #cluster2.efak.sasl.cgroup.enable=false #cluster2.efak.sasl.cgroup.topics= ###################################### # kafka ssl authenticate ###################################### cluster3.efak.ssl.enable=false cluster3.efak.ssl.protocol=SSL cluster3.efak.ssl.truststore.location= cluster3.efak.ssl.truststore.password= cluster3.efak.ssl.keystore.location= cluster3.efak.ssl.keystore.password= cluster3.efak.ssl.key.password= cluster3.efak.ssl.endpoint.identification.algorithm=https cluster3.efak.blacklist.topics= cluster3.efak.ssl.cgroup.enable=false cluster3.efak.ssl.cgroup.topics= ###################################### # kafka sqlite jdbc driver address ###################################### #efak.driver=org.sqlite.JDBC #efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db #efak.username=root #efak.password=www.kafka-eagle.org ###################################### # kafka mysql jdbc driver address ###################################### efak.driver=com.mysql.cj.jdbc.Driver efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull efak.username=root efak.password=MyNewPass4!
3、启动EFAK服务器(独立)
在$KE_HOME/bin目录中,有一个ke.sh脚本文件。执行启动命令如下:
cd ${KE_HOME}/bin
chmod +x ke.sh
ke.sh start
之后,当 EFAK 服务器重新启动或停止时,执行以下命令:
ke.sh restart ke.sh stop
如下图所示:
4、启动EFAK服务器(分布式)
在$KE_HOME/bin目录中,有一个ke.sh脚本文件。执行启动命令如下:
cd ${KE_HOME}/bin
# sync efak package to other worknode node
# if $KE_HOME is /data/soft/new/efak
for i in `cat $KE_HOME/conf/works`;do scp -r $KE_HOME $i:/data/soft/new;done
# sync efak server .bash_profile environment
for i in `cat $KE_HOME/conf/works`;do scp -r ~/.bash_profile $i:~/;done
chmod +x ke.sh
ke.sh cluster start
之后,当 EFAK 服务器重新启动或停止时,执行以下命令:
ke.sh cluster restart ke.sh cluster stop
如下图所示:



