Flume是一个分布式、可靠、高可用的海量日志采集、聚合和传输的系统。
Flume系统中核心的角色是agent,agent本身是一个Java进程,一般运行在日志收集节点。一个agent内部有三个组件:
· Source:采集源,用于跟数据源对接,以获取数据;
· Channel:agent内部的数据传输通道,用于从source将数据传递到sink;
· Sink:下沉地,采集数据的传送目的,用于往下一级agent传递数据或者往最终存储系统传递数据
在整个数据的传输的过程中,流动的是event,它是Flume内部数据传输的最基本单元。
【工作原理】:
常见的Source、Channel、Sink类型有:
【安装 · 配置Flume环境】:
(1)https://flume.apache.org/download.html
下载flume版本
(2)解压缩,进入其bin目录,复制路径
(3)win+R 打开命令行运行界面(进入指定目录)
(4)由于apache-flume运行在java环境下,需配置JDK 环境
A、命令行界面查看java安装路径 where java
B、配置环境变量
方法一、配置JAVA_HOME (只需到JAVA环境下,无需进入bin执行目录)
然后在将JAVA_HOME配置到环境变量PATH中(PATH需要配置到bin目录级别)
(bin 为可执行文件的目录)
方法二、
直接在PATH中配置java路径
直接把JAVA_HOME展开,放在PATH中,到bin目录级别
(5)运行查看flume是否安装成功(如下所示,即为成功)
【案例实现与分析】
启动Flume监听端口
【参考命令行】:进入D:apache-flume-1.9.0-bin目录下(bin目录的前一级)
.binflume-ng agent --conf .conf --conf-file .confexample.conf --name a1 -property flume.root.logger=INFO,console
【参数说明】:
(1)–conf .conf:表示配置文件存储在conf/目录
(2)–name a1 :表示给agent起名为a1
(3)–conf-file .confexample.conf:flume本次启动读取的配置文件是在conf文件夹下的example.conf文件。
(4)flume.root.logger=INFO,console :flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包括:log、info、warn、error。
【安装telnet】:实现对单端口的监控
(telnet 是 windows自带组件)
常在使用netcat监控端口时使用
① 控制面板——程序与功能——启用或关闭window功能——开启telnet客户端
②查看是否安装成功:
案例一、netcat 数据采集(监控端口的信息)
A、监听netcat端口数据,并显示在控制台
【参考配置文件】
#设置Agent上的各个组件名称 #r1表示a1的输入源 a1.sources = r1 #k1表示a1的输出目的地 a1.sinks = k1 #c1表示a1的缓冲区 a1.channels = c1 #配置Source (数据输入源配置) #设置a1的输入源类型为netcat端口类型 a1.sources.r1.type = netcat #表示a1的监听的主机 a1.sources.r1.bind = localhost #表示a1的监听的主机端口号 a1.sources.r1.port = 44444 #配置Sink (数据输出端配置) #表示a1的输出目的地是控制台logger类型(显示在控制台) a1.sinks.k1.type = logger #配置Channel (数据传输通道配置) #表示a1的channel类型是memory内存型 a1.channels.c1.type = memory #表示a1的channel总容量是1000个event a1.channels.c1.capacity = 1000 #表示a1的channel传输时收集到了100条event以后再去提交事务 a1.channels.c1.transactionCapacity = 100 #把Source和Sink绑定到Channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
B、监听netcat端口数据,并输出到文件
【参考配置文件】
#Source和channel的配置内容均不发生变化,因此不做修改,此处不做重复展示 #配置Sink (数据输出端配置) a1.sinks.k1.type = file_roll a1.sinks.k1.sink.directory = 输出文件的存储路径(文件夹) a1.sinks.k1.sink.rollInterval = 0 #设置文件输出的命名方式 此处:文件的命名方式为yyyyMMddHH a1.sinks.k1.sink.file.name.timeFormat = yyyyMMddHH
案例二、Spooling Directory Source的日志采集(监控指定目录下的变化(监控整个文件夹),将目录变化作为数据输入源)
【参考配置文件】
# 设置Agent上的各个组件名称 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置Source 重点修改部分 #设置a1的输入源类型为spooldir a1.sources.r1.type = spooldir #指定监控的目录(文件夹) a1.sources.r1.spoolDir = 监控的目录路径(文件夹) a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 #是否增加一个header用来存储文件的绝对路径,默认是false a1.sources.r1.fileHeader = true # 配置Sink #输出到控制台 a1.sinks.k1.type = logger #输出到文件 #a1.sinks.k1.type = file_roll #a1.sinks.k1.sink.directory = 输出文件的存储路径(文件夹) #a1.sinks.k1.sink.rollInterval = 0 #设置文件输出的命名方式 此处:文件的命名方式为yyyyMMddHH #a1.sinks.k1.sink.file.name.timeFormat = yyyyMMddHH # 配置Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 把Source和Sink绑定到Channel上 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
案例三、Exec Source的日志采集(监控单个文件)
监控某个日志文件,对单个日志文件新增加的内容显示到服务器的控制台上。
# 设置Agent上的各个组件名称 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置Source 重点修改部分 #设置a1的输入源类型为Exec a1.sources.r1.type = exec #指定Exec执行的shell命令 a1.sources.r1.command = tail -F 指定文件的绝对路径 a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # 配置Sink #输出到控制台 a1.sinks.k1.type = logger #输出到文件 #a1.sinks.k1.type = file_roll #a1.sinks.k1.sink.directory = 输出文件的存储路径(文件夹) #a1.sinks.k1.sink.rollInterval = 0 #设置文件输出的命名方式 此处:文件的命名方式为yyyyMMddHH #a1.sinks.k1.sink.file.name.timeFormat = yyyyMMddHH # 配置Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 把Source和Sink绑定到Channel上 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
Exec 可以通过指定的操作对日志进行读取,使用Exec时需要指定shell命令
但可能在windows环境下运行时会出现如下问题:
问题描述:
由于使用Exec Source对日志进行读取时,需要指定shell命令。但常用的tail命令是Linux下用来管理日志的命令,windows系统并未提供。
问题解决:
1、在Linux系统进行操作
2、Windows下安装运行tail的工具tail.exe,将其放在C:WindowsSystem32目录下,即可运行
下载路径: 提取码:o7bi
https://pan.baidu.com/s/14XHrCuuLmNIf9ZczERLYfQ
案例四、基于Avro,实现对telnet数据的采集并上传到统一的服务器
Avro Source负责监听Avro端口,接收来自外部Avro客户端的事件流,利用Avro Source可以达到多级流动、扇出流、扇入流等效果。另外,他还可以接受通过Flume提供的Avro客户端发送的日志信息。
Avro:Flume通过Avro方式在两台及其之间进行数据传输,Flume可以监听和收集指定端口的日志,使用Avro的Source需要说明被监听的主机的IP地址和端口号 =》必须是完整的IP地址。
【案例需求】:
输入要求:每个人通过telnet输入:学号,查看数据是否上传成功
或者上传文件
上传服务器地址:172.17.67.240,端口:12345
参考配置文件
# 设置Agent上的各个组件名称 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置Source # 监听netcat数据源,端口信息 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # 监听文件夹(上传文件) #a1.sources.r1.type = spooldir #a1.sources.r1.spoolDir = 监控的目录路径(文件夹) #是否增加一个header用来存储文件的绝对路径,默认是false #a1.sources.r1.fileHeader = true # 配置Sink a1.sinks.k1.type = avro #输出的远程主机IP a1.sinks.k1.hostname = 172.17.67.240 a1.sinks.k1.port = 12345 # 配置Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 把Source和Sink绑定到Channel上 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
案例五、基于Avro多Agent分布式日志采集
【案例需求】:
Agent1:采集telnet的信息,并已avro的格式将信息sink到Agent2;
Agent2: 将Agent1发过来的信息显示到控制台上;
【实现逻辑】:
Agent1作为数据源,发送数据输出到Agent2,Agent2作为数据的接收者,接收到数据后进行输出显示。
【参考配置文件】:
Agent1配置文件 : ——数据的发送者
# Agent1为数据的发送者 # 设置Agent上的各个组件名称 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置Source # 监听netcat数据源,端口信息 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # 监听文件夹(上传文件) #a1.sources.r1.type = spooldir #a1.sources.r1.spoolDir = 监控的目录路径(文件夹) #是否增加一个header用来存储文件的绝对路径,默认是false #a1.sources.r1.fileHeader = true # 配置Sink a1.sinks.k1.type = avro #输出的远程主机IP(必须是IP地址) a1.sinks.k1.hostname = 192.168.43.119 a1.sinks.k1.port = 12345 # 配置Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 把Source和Sink绑定到Channel上 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
Agent2配置文件:——数据的接收者
# Agent1为数据的发送者 # 设置Agent上的各个组件名称 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置Source # 监听netcat数据源,端口信息 a1.sources.r1.type = avro a1.sources.r1.bind = 192.168.43.119 a1.sources.r1.port = 12345 # 配置Sink a1.sinks.k1.type = logger # 配置Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 把Source和Sink绑定到Channel上 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
【备注】:要先开启Agent2,再开启Agent1
案例六、通过spooldir模式,将文件上传到hdfs
要求日志文件以“年-月-日/时-分/”为文件夹的名字
【参考配置文件】:
#定义三大组件的名称 agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1 # 配置Source组件 agent1.sources.source1.type = spooldir agent1.sources.source1.spoolDir = D:/hky agent1.sources.source1.fileHeader = false # 配置Sink组件 agent1.sinks.sink1.type = hdfs agent1.sinks.sink1.hdfs.path =hdfs://localhost:9000/%y-%m-%d/%H-%M agent1.sinks.sink1.hdfs.filePrefix = access_log agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 agent1.sinks.sink1.hdfs.batchSize= 100 agent1.sinks.sink1.hdfs.fileType = DataStream agent1.sinks.sink1.hdfs.writeFormat =Text agent1.sinks.sink1.hdfs.rollSize = 102400 agent1.sinks.sink1.hdfs.rollCount = 1000000 agent1.sinks.sink1.hdfs.rollInterval = 60 agent1.sinks.sink1.hdfs.useLocalTimeStamp = true # 设置Channel agent1.channels.channel1.type = memory agent1.channels.channel1.keep-alive = 120 agent1.channels.channel1.capacity = 500000 agent1.channels.channel1.transactionCapacity = 600 # 把Source和Sink绑定到Channel上 agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1



