栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

数据采集入门——Flume应用

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

数据采集入门——Flume应用

Flume应用

Flume是一个分布式、可靠、高可用的海量日志采集、聚合和传输的系统。
Flume系统中核心的角色是agent,agent本身是一个Java进程,一般运行在日志收集节点。一个agent内部有三个组件:
· Source:采集源,用于跟数据源对接,以获取数据;
· Channel:agent内部的数据传输通道,用于从source将数据传递到sink;
· Sink:下沉地,采集数据的传送目的,用于往下一级agent传递数据或者往最终存储系统传递数据
在整个数据的传输的过程中,流动的是event,它是Flume内部数据传输的最基本单元。

【工作原理】:

常见的Source、Channel、Sink类型有:

【安装 · 配置Flume环境】
(1)https://flume.apache.org/download.html
下载flume版本

(2)解压缩,进入其bin目录,复制路径
(3)win+R 打开命令行运行界面(进入指定目录)

(4)由于apache-flume运行在java环境下,需配置JDK 环境
A、命令行界面查看java安装路径 where java

B、配置环境变量
方法一、配置JAVA_HOME (只需到JAVA环境下,无需进入bin执行目录)

然后在将JAVA_HOME配置到环境变量PATH中(PATH需要配置到bin目录级别)
(bin 为可执行文件的目录)

方法二、
直接在PATH中配置java路径
直接把JAVA_HOME展开,放在PATH中,到bin目录级别

(5)运行查看flume是否安装成功(如下所示,即为成功)

【案例实现与分析】
启动Flume监听端口
【参考命令行】:进入D:apache-flume-1.9.0-bin目录下(bin目录的前一级)
.binflume-ng agent --conf .conf --conf-file .confexample.conf --name a1 -property flume.root.logger=INFO,console
【参数说明】:
(1)–conf .conf:表示配置文件存储在conf/目录
(2)–name a1 :表示给agent起名为a1
(3)–conf-file .confexample.conf:flume本次启动读取的配置文件是在conf文件夹下的example.conf文件。
(4)flume.root.logger=INFO,console :flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包括:log、info、warn、error。
【安装telnet】:实现对单端口的监控
(telnet 是 windows自带组件)
常在使用netcat监控端口时使用
① 控制面板——程序与功能——启用或关闭window功能——开启telnet客户端

②查看是否安装成功:

案例一、netcat 数据采集(监控端口的信息)

A、监听netcat端口数据,并显示在控制台
【参考配置文件】

#设置Agent上的各个组件名称 
#r1表示a1的输入源
a1.sources = r1  
#k1表示a1的输出目的地
a1.sinks = k1  
#c1表示a1的缓冲区
a1.channels = c1  

#配置Source (数据输入源配置)
#设置a1的输入源类型为netcat端口类型 
a1.sources.r1.type = netcat 
#表示a1的监听的主机 
a1.sources.r1.bind = localhost  
#表示a1的监听的主机端口号
a1.sources.r1.port = 44444 
 
#配置Sink  (数据输出端配置) 
#表示a1的输出目的地是控制台logger类型(显示在控制台)
a1.sinks.k1.type = logger 

#配置Channel  (数据传输通道配置)
#表示a1的channel类型是memory内存型
a1.channels.c1.type = memory 
#表示a1的channel总容量是1000个event
a1.channels.c1.capacity = 1000 
#表示a1的channel传输时收集到了100条event以后再去提交事务
a1.channels.c1.transactionCapacity = 100  

#把Source和Sink绑定到Channel 
a1.sources.r1.channels = c1  
a1.sinks.k1.channel = c1

B、监听netcat端口数据,并输出到文件
【参考配置文件】

#Source和channel的配置内容均不发生变化,因此不做修改,此处不做重复展示

#配置Sink  (数据输出端配置) 
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = 输出文件的存储路径(文件夹)
a1.sinks.k1.sink.rollInterval = 0 
#设置文件输出的命名方式  此处:文件的命名方式为yyyyMMddHH
a1.sinks.k1.sink.file.name.timeFormat = yyyyMMddHH

案例二、Spooling Directory Source的日志采集(监控指定目录下的变化(监控整个文件夹),将目录变化作为数据输入源)
【参考配置文件】

# 设置Agent上的各个组件名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# 配置Source  重点修改部分
#设置a1的输入源类型为spooldir 
a1.sources.r1.type = spooldir
#指定监控的目录(文件夹)
a1.sources.r1.spoolDir = 监控的目录路径(文件夹)
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#是否增加一个header用来存储文件的绝对路径,默认是false
a1.sources.r1.fileHeader = true
 
# 配置Sink
#输出到控制台
a1.sinks.k1.type = logger 
#输出到文件
#a1.sinks.k1.type = file_roll
#a1.sinks.k1.sink.directory = 输出文件的存储路径(文件夹)
#a1.sinks.k1.sink.rollInterval = 0 
#设置文件输出的命名方式  此处:文件的命名方式为yyyyMMddHH
#a1.sinks.k1.sink.file.name.timeFormat = yyyyMMddHH

# 配置Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# 把Source和Sink绑定到Channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

案例三、Exec Source的日志采集(监控单个文件
监控某个日志文件,对单个日志文件新增加的内容显示到服务器的控制台上。

# 设置Agent上的各个组件名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# 配置Source  重点修改部分
#设置a1的输入源类型为Exec 
a1.sources.r1.type = exec
#指定Exec执行的shell命令
a1.sources.r1.command = tail -F 指定文件的绝对路径
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# 配置Sink
#输出到控制台
a1.sinks.k1.type = logger 
#输出到文件
#a1.sinks.k1.type = file_roll
#a1.sinks.k1.sink.directory = 输出文件的存储路径(文件夹)
#a1.sinks.k1.sink.rollInterval = 0 
#设置文件输出的命名方式  此处:文件的命名方式为yyyyMMddHH
#a1.sinks.k1.sink.file.name.timeFormat = yyyyMMddHH

# 配置Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# 把Source和Sink绑定到Channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Exec 可以通过指定的操作对日志进行读取,使用Exec时需要指定shell命令
但可能在windows环境下运行时会出现如下问题:
问题描述:
由于使用Exec Source对日志进行读取时,需要指定shell命令。但常用的tail命令是Linux下用来管理日志的命令,windows系统并未提供。
问题解决:
1、在Linux系统进行操作
2、Windows下安装运行tail的工具tail.exe,将其放在C:WindowsSystem32目录下,即可运行
下载路径: 提取码:o7bi
https://pan.baidu.com/s/14XHrCuuLmNIf9ZczERLYfQ

案例四、基于Avro,实现对telnet数据的采集并上传到统一的服务器
Avro Source负责监听Avro端口,接收来自外部Avro客户端的事件流,利用Avro Source可以达到多级流动、扇出流、扇入流等效果。另外,他还可以接受通过Flume提供的Avro客户端发送的日志信息。
Avro:Flume通过Avro方式在两台及其之间进行数据传输,Flume可以监听和收集指定端口的日志,使用Avro的Source需要说明被监听的主机的IP地址和端口号 =》必须是完整的IP地址。

【案例需求】:
输入要求:每个人通过telnet输入:学号,查看数据是否上传成功
或者上传文件
上传服务器地址:172.17.67.240,端口:12345
参考配置文件

# 设置Agent上的各个组件名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# 配置Source
# 监听netcat数据源,端口信息
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 监听文件夹(上传文件)
#a1.sources.r1.type = spooldir
#a1.sources.r1.spoolDir = 监控的目录路径(文件夹)
#是否增加一个header用来存储文件的绝对路径,默认是false
#a1.sources.r1.fileHeader = true
 
# 配置Sink
a1.sinks.k1.type = avro
#输出的远程主机IP
a1.sinks.k1.hostname = 172.17.67.240
a1.sinks.k1.port = 12345

# 配置Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# 把Source和Sink绑定到Channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

案例五、基于Avro多Agent分布式日志采集
【案例需求】:
Agent1:采集telnet的信息,并已avro的格式将信息sink到Agent2;
Agent2: 将Agent1发过来的信息显示到控制台上;
【实现逻辑】:
Agent1作为数据源,发送数据输出到Agent2,Agent2作为数据的接收者,接收到数据后进行输出显示。
【参考配置文件】:
Agent1配置文件 : ——数据的发送者

# Agent1为数据的发送者
# 设置Agent上的各个组件名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# 配置Source
# 监听netcat数据源,端口信息
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 监听文件夹(上传文件)
#a1.sources.r1.type = spooldir
#a1.sources.r1.spoolDir = 监控的目录路径(文件夹)
#是否增加一个header用来存储文件的绝对路径,默认是false
#a1.sources.r1.fileHeader = true
 
# 配置Sink
a1.sinks.k1.type = avro
#输出的远程主机IP(必须是IP地址)
a1.sinks.k1.hostname = 192.168.43.119
a1.sinks.k1.port = 12345

# 配置Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# 把Source和Sink绑定到Channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Agent2配置文件:——数据的接收者

# Agent1为数据的发送者
# 设置Agent上的各个组件名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# 配置Source
# 监听netcat数据源,端口信息
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.43.119
a1.sources.r1.port = 12345

# 配置Sink
a1.sinks.k1.type = logger

# 配置Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# 把Source和Sink绑定到Channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

【备注】:要先开启Agent2,再开启Agent1

案例六、通过spooldir模式,将文件上传到hdfs
要求日志文件以“年-月-日/时-分/”为文件夹的名字
【参考配置文件】:

#定义三大组件的名称
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# 配置Source组件
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = D:/hky
agent1.sources.source1.fileHeader = false

# 配置Sink组件
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path =hdfs://localhost:9000/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# 设置Channel
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# 把Source和Sink绑定到Channel上
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/845735.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号