tar -zxvf apache-flume-1.6.0-bin.tar.gz

2、重命名目录,并配置环境变量
mv apache-flume-1.6.0-bin/ flume-1.6.0
# Set up the Flume environment.
# NOTE: these must be two separate commands — if both exports are issued in a
# single command, $FLUME_HOME expands (to empty) before the same command
# defines it, leaving a broken ":/bin" entry on PATH.
export FLUME_HOME=/home/sunshj/flume-1.6.0
export PATH=$PATH:$FLUME_HOME/bin
source ~/.bashrc

3、查看flume版本
flume-ng version
4、测试flume

(1). spoolingtest.conf

Flume 1.6.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 2561a23240a71ba20bf288c7c2cda88f443c2080
Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015
-
监控一个目录,将数据打印出来
- 新建spoolingtest.conf,配置文件
# spoolingtest.conf — watch a directory and print each event to the logger.
# Agent is named "a"; source r1, sink k1, channel c1.
a.sources = r1
a.sinks = k1
a.channels = c1

# spooldir source: ingest files dropped into spoolDir
a.sources.r1.type = spooldir
a.sources.r1.spoolDir = /home/sunshj/flumescript/spoolData
a.sources.r1.fileHeader = true
a.sources.r1.interceptors = i1
a.sources.r1.interceptors.i1.type = timestamp

# logger sink: print events to the agent's log
a.sinks.k1.type = logger

# memory channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# max events the sink takes from the channel per transaction
a.channels.c1.transactionCapacity = 100

# wire source and sink to the channel
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
- 启动agent
flume-ng agent -n a -f spoolingtest.conf -Dflume.root.logger=DEBUG,console
- 新建/home/sunshj/flumescript/spoolData目录
mkdir /home/sunshj/flumescript/spoolData
- 在spoolData目录下新建文件,输入内容,观察flume进程打印的日志
# 随意在a.txt中加入一些内容
vim a.txt
-
配置文件
# spoolingToHDFS.conf — watch a directory and write each file's events to HDFS.
# Agent is named "a"; source r1, sink k1, channel c1.
a.sources = r1
a.sinks = k1
a.channels = c1

# spooldir source: ingest files dropped into spoolDir
a.sources.r1.type = spooldir
a.sources.r1.spoolDir = /home/sunshj/flumescript/spoolData
a.sources.r1.fileHeader = true
a.sources.r1.interceptors = i1
a.sources.r1.interceptors.i1.type = timestamp

# HDFS sink
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = /flume/data/dir1
# output file name prefix
a.sinks.k1.hdfs.filePrefix = student
# roll the file after this many bytes
a.sinks.k1.hdfs.rollSize = 102400
# roll the file after this many events
a.sinks.k1.hdfs.rollCount = 1000
# DataStream: write events through unmodified (no SequenceFile wrapping)
a.sinks.k1.hdfs.fileType = DataStream
# plain-text output format
a.sinks.k1.hdfs.writeFormat = text
# output file name suffix
a.sinks.k1.hdfs.fileSuffix = .txt

# memory channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# max events the sink takes from the channel per transaction
a.channels.c1.transactionCapacity = 100

# wire source and sink to the channel
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
- 在 /home/sunshj/flumescript/spoolData目录下准备数据
-
启动agent
flume-ng agent -n a -f spoolingToHDFS.conf -Dflume.root.logger=DEBUG,console
- 配置文件
# hbaseLogToHDFS.conf — tail the HBase master log and write events to HDFS.
# Agent is named "a"; source r1, sink k1, channel c1.
a.sources = r1
a.sinks = k1
a.channels = c1

# exec source: stream new lines appended to the HBase master log
a.sources.r1.type = exec
a.sources.r1.command = tail -f /home/sunshj/hbase-1.4.13/logs/hbase-sunshj-master-master.log

# HDFS sink
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = /flume/data/dir2
# output file name prefix
a.sinks.k1.hdfs.filePrefix = hbaselog
# roll the file after this many bytes
a.sinks.k1.hdfs.rollSize = 102400
# roll the file after this many events
a.sinks.k1.hdfs.rollCount = 1000
# DataStream: write events through unmodified (no SequenceFile wrapping)
a.sinks.k1.hdfs.fileType = DataStream
# plain-text output format
a.sinks.k1.hdfs.writeFormat = text
# output file name suffix
a.sinks.k1.hdfs.fileSuffix = .txt

# memory channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# max events the sink takes from the channel per transaction
a.channels.c1.transactionCapacity = 100

# wire source and sink to the channel
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
- 启动agent
flume-ng agent -n a -f hbaseLogToHDFS.conf -Dflume.root.logger=DEBUG,console

(3). netcatToLogger
监听telnet端口
- 安装telnet
yum install telnet
- 配置文件
# netcatToLogger.conf — listen on a TCP port and print each line to the logger.
# Agent is named "a"; source r1, sink k1, channel c1.
a.sources = r1
a.sinks = k1
a.channels = c1

# netcat source: accept lines over TCP on master:8888
a.sources.r1.type = netcat
a.sources.r1.bind = master
a.sources.r1.port = 8888

# logger sink: print events to the agent's log
a.sinks.k1.type = logger

# memory channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# max events the sink takes from the channel per transaction
a.channels.c1.transactionCapacity = 100

# wire source and sink to the channel
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
- 先启动agent
flume-ng agent -n a -f netcatToLogger.conf -Dflume.root.logger=DEBUG,console
- 再启动telnet
telnet master 8888

(4). httpToLogger
- 配置文件
# httpToLogger.conf — accept events over HTTP and write them to HDFS.
# NOTE(review): despite the file name, the sink here is hdfs, not logger.
# Agent is named "a"; source r1, sink k1, channel c1.
a.sources = r1
a.sinks = k1
a.channels = c1

# http source: accept JSON-encoded events via POST on master:8888
a.sources.r1.type = http
a.sources.r1.bind = master
a.sources.r1.port = 8888

# HDFS sink: output directory on HDFS
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = hdfs://master:9000/flume/output
a.sinks.k1.hdfs.writeFormat = Text
a.sinks.k1.hdfs.fileType = DataStream
# roll purely by time (every 10 s); size/count rolling disabled (0 = never)
a.sinks.k1.hdfs.rollInterval = 10
a.sinks.k1.hdfs.rollSize = 0
a.sinks.k1.hdfs.rollCount = 0
# timestamp-based file name prefix; needs a timestamp, taken from local clock
a.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
a.sinks.k1.hdfs.useLocalTimeStamp = true

# file channel: durable, spools to local disk
a.channels.c1.type = file
a.channels.c1.checkpointDir = /home/sunshj/flumescript/checkpoint
a.channels.c1.dataDirs = /home/sunshj/flumescript/data

# wire source and sink to the channel
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
-
启动
- 先启动agent
flume-ng agent -n a -f httpToLogger.conf -Dflume.root.logger=DEBUG,console
- 再使用curl发起一个http请求
curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]' http://master:8888
# fileToHdfs.conf — watch a directory and upload each file's events to HDFS.
# Agent is named "a"; source r1, sink k1, channel c1.
a.sources = r1
a.sinks = k1
a.channels = c1

# spooldir source: ingest files dropped into spoolDir
a.sources.r1.type = spooldir
a.sources.r1.spoolDir = /home/sunshj/flumescript/file
a.sources.r1.fileHeader = true
a.sources.r1.interceptors = i1
a.sources.r1.interceptors.i1.type = timestamp

# HDFS sink
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = hdfs://master:9000/flume/file
a.sinks.k1.hdfs.filePrefix = pre-
# allow writing even when fewer than the default number of replicas are up
a.sinks.k1.hdfs.minBlockReplicas = 1
# DataStream: write events through unmodified (no SequenceFile wrapping)
a.sinks.k1.hdfs.fileType = DataStream

# memory channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# max events the sink takes from the channel per transaction
a.channels.c1.transactionCapacity = 100

# wire source and sink to the channel
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
-
在 /home/sunshj/flumescript/file目录下准备数据
-
启动agent
flume-ng agent -n a -f fileToHdfs.conf -Dflume.root.logger=DEBUG,console



