- jdk: 1.8
- zookeeper版本:3.4.14
- kafka版本:kafka_2.12-2.3.1
- flume版本:apache-flume-1.8.0-bin.tar.gz
- 操作系统:CentOS Linux release 7.4.1708 (Core)
- zookeeper安装教程:
https://www.runoob.com/w3cnote/zookeeper-setup.html
- 修改/etc/hosts,配置zookeeper域名解析
- 修改kafka安装目录下的配置文件config/server.properties
- 设置zookeeper地址:
zookeeper.connect=zk1:2181
host设置为使用hostname而不是ip,避免连接缓慢
- 设置broker.id(在集群中必须唯一):
broker.id=x
- 设置日志文件存储位置(也可使用默认位置)
log.dirs=/xxx/xxx
- 配置允许kafka远程访问
advertised.listeners=PLAINTEXT://172.23.x.x:9092
- 其余配置
#禁止自动创建主题
auto.create.topics.enable=false
#允许删除主题
delete.topic.enable=true
- 配置文档:
http://kafkadoc.beanmr.com/030_configuration/01_configuration_cn.html#brokerconfigs
- 在kafka安装目录输入以下命令启动kafka
bin/kafka-server-start.sh config/server.properties &
- kafka集群安装好后,在zookeeper安装目录执行以下命令连接zookeeper
./bin/zkCli.sh -server zk1:2181
- 通过zookeeper节点查看kafka状态
命令
ls /brokers/ids
结果:列出已在zookeeper注册的各broker.id,例如 [0, 1]
- 创建topic:在kafka安装目录执行以下脚本:
bin/kafka-topics.sh --create --zookeeper 172.23.x.x:2181 --replication-factor 1 --partitions 2 --topic nginx-log
- 创建成功后,在当前目录输入命令查看topic分片情况
bin/kafka-topics.sh --describe --zookeeper 172.23.x.x:2181 --topic nginx-log
使用flume
- 架构图
- 安装flume
- 官方配置文档:
https://flume.apache.org/releases/content/1.8.0/FlumeUserGuide.html
- 配置agent,数据输出到终端,以便验证方案是否可行
- 新建文件/etc/flume/nginx-log.conf(也可从安装目录的配置模板conf/flume-conf.properties.template开始),配置如下。
aNginx1.sources = rNginx1
aNginx1.sinks = kNginx1
aNginx1.channels = cNginx1

#配置agent aNginx1的source组件rNginx1
aNginx1.sources.rNginx1.type = exec
# 使用 tail -F(按文件名跟踪),nginx日志被logrotate轮转后仍能继续读取新文件
aNginx1.sources.rNginx1.command = tail -F /var/log/nginx/access.log

# Each sink's type must be defined
aNginx1.sinks.kNginx1.type = logger

# Each channel's type is defined.
# 描述和配置 channel 组件,此处使用内存缓存的方式
aNginx1.channels.cNginx1.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
aNginx1.channels.cNginx1.capacity = 1000
aNginx1.channels.cNginx1.transactionCapacity = 100

#配置 source、channel、sink 之间的连接关系
aNginx1.sources.rNginx1.channels = cNginx1
aNginx1.sinks.kNginx1.channel = cNginx1
- 在安装目录执行如下命令启动(将日志输出终端)
bin/flume-ng agent --conf conf --conf-file /etc/flume/nginx-log.conf --name aNginx1 -Dflume.root.logger=INFO,console &
- 再访问Nginx,观察终端输出效果判断是否抓取到nginx日志。
- 配置agent,数据输出到kafka(将/etc/flume/nginx-log.conf修改为如下配置)
aNginx1.sources = rNginx1
aNginx1.sinks = kNginx1
aNginx1.channels = cNginx1

#配置agent aNginx1的source组件rNginx1
aNginx1.sources.rNginx1.type = exec
# 使用 tail -F(按文件名跟踪),nginx日志被logrotate轮转后仍能继续读取新文件
aNginx1.sources.rNginx1.command = tail -F /var/log/nginx/access.log

#调优参数
# exec source:每批最多提交batchSize条事件;batchTimeout(毫秒)内未攒满一批也会提交
aNginx1.sources.rNginx1.batchSize = 2000
aNginx1.sources.rNginx1.batchTimeout = 1000
# KafkaSink:每个事务从channel取出的最大事件数(flume 1.7+写法,旧写法batchSize已废弃)
aNginx1.sinks.kNginx1.flumeBatchSize = 2000
# 注:batchDurationMillis 不是 exec source / KafkaSink 的配置项,不会生效(source端已改用上面的batchTimeout)
#aNginx1.sinks.kNginx1.batchDurationMillis = 1000

# 配置kafka
aNginx1.sinks.kNginx1.type = org.apache.flume.sink.kafka.KafkaSink
# flume 1.7+写法,旧写法brokerList已废弃
aNginx1.sinks.kNginx1.kafka.bootstrap.servers = 172.23.x.x:9092,172.23.x.x:9092
aNginx1.sinks.kNginx1.kafka.topic = nginx-log
#压缩
aNginx1.sinks.kNginx1.kafka.producer.compression.type = snappy
#设置序列化方式
# 注:flume 1.7+的KafkaSink使用新版kafka producer并以字节数组发送事件体,这条旧版(kafka 0.8 producer)配置不会生效
#aNginx1.sinks.kNginx1.serializer.class = kafka.serializer.StringEncoder

# 描述和配置 channel 组件,此处使用内存缓存的方式
aNginx1.channels.cNginx1.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
aNginx1.channels.cNginx1.capacity = 3000
# transactionCapacity 必须不小于 source 的 batchSize 和 sink 的 flumeBatchSize(此处均为2000),
# 否则一个事务放不下一整批事件,会抛出 ChannelException(Put queue / Take list ... full)
aNginx1.channels.cNginx1.transactionCapacity = 2000

#配置 source、channel、sink 之间的连接关系
aNginx1.sources.rNginx1.channels = cNginx1
aNginx1.sinks.kNginx1.channel = cNginx1
- 在安装目录执行如下命令启动(指定日志输出到文件,日志文件配置见安装目录的conf/log4j.properties)
bin/flume-ng agent --conf conf --conf-file /etc/flume/nginx-log.conf --name aNginx1 -Dflume.root.logger=INFO,LOGFILE &
使用flink
- 安装依赖
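<properties>
    <flink.version>1.10.3</flink.version>
    <lombok.version>1.18.10</lombok.version>
    <fastjson.version>1.2.49</fastjson.version>
    <!-- 以下属性名在原文中丢失,此处为推测,请按实际工程核对 -->
    <maven.compiler.plugin.version>3.5.1</maven.compiler.plugin.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>provided</scope>
        <version>${lombok.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>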
- 编写flink程序从kafka抓取nginx日志
// Build the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Kafka consumer configuration.
// No key.deserializer / value.deserializer here: FlinkKafkaConsumer always overwrites them
// with byte-array deserializers and decodes record values with the DeserializationSchema
// passed below. zookeeper.connect is also omitted because the Kafka 2.x consumer never reads it.
Properties props = new Properties();
props.put("bootstrap.servers", "172.23.x.x:9092,172.23.x.x:9092");
props.put("group.id", "flink");
props.put("client.id", "flink-1");
// When a partition already has a committed offset for this group, consumption resumes from it;
// otherwise only records produced after the job starts are consumed.
props.put("auto.offset.reset", "latest");
// Checkpointing is not enabled in this snippet, so offsets are committed by the Kafka
// client's own periodic auto-commit.
props.put("enable.auto.commit", "true");

DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer<String>(
                        "nginx-log",
                        new SimpleStringSchema(),
                        props))
        // One source instance reads all partitions so console output is not interleaved;
        // this does not change the result.
        .setParallelism(1);

// Group the log lines read from Kafka into 5-second windows (no time characteristic is set
// here, so these are processing-time windows) and print each window's lines.
dataStreamSource
        .timeWindowAll(Time.seconds(5L))
        .apply(new AllWindowFunction<String, List<String>, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<String> iterableValues,
                    Collector<List<String>> out) throws Exception {
                List<String> strList = Lists.newArrayList(iterableValues);
                // Defensive guard: nothing to print for an empty window.
                if (strList.isEmpty()) {
                    return;
                }
                strList.forEach(System.out::println);
            }
        });

env.execute("nginx log analyse running");
- 抓取结果
4. 持久化到数据库
- 新增下沉类(sink),用于保存统计结果
public class JdbcPersistence extends RichSinkFunction
- > {
@Override
public void invoke(List
values, Context context) throws Exception { //TODO 解析日志 for (String log : values) { System.out.println(log); } //TODO 构建统计结果 //TODO 入库 } } - 挂载下沉类到flume启动器
// Take the log lines read from Kafka, batch them into 5-second windows and hand each
// non-empty batch to JdbcPersistence.
dataStreamSource
        .timeWindowAll(Time.seconds(5L))
        .apply(new AllWindowFunction<String, List<String>, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<String> iterableValues,
                    Collector<List<String>> out) throws Exception {
                List<String> strList = Lists.newArrayList(iterableValues);
                // Defensive guard: never emit an empty batch to the sink.
                if (strList.isEmpty()) {
                    return;
                }
                out.collect(strList);
            }
        })
        .addSink(new JdbcPersistence());
sink 到数据库 -
flume工程代码
https://download.csdn.net/download/qq_41633199/85188566



