1、使用的组件类型①netcat source: 作用就是监听某个tcp端口手动的数据,将每行数据封装为一个event。
工作原理类似于nc -l 端口
配置:
必须属性:
type – The component type name, needs to be netcat
bind – Host name or IP address to bind to
port – Port # to bind to
②logger sink: 作用使用logger(日志输出器)将event输出到文件或控制台,使用info级别记录event!
必须属性:
type – The component type name, needs to be logger
可选属性:
maxBytesToLog 16 Maximum number of bytes of the Event body to log③memery channel
必须属性:
type – The component type name, needs to be memory
可选属性:
capacity 100 The maximum number of events stored in the channel
transactionCapacity 100 The maximum number of events the channel will take from a source or give to a sink per transaction
2、编写配置文件案例二:监控文件将数据发送到HDFS上#a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
a1.sources = r1
a1.sinks = k1
a1.channels = c1#组名名.属性名=属性值
#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=hadoop102
a1.sources.r1.port=44444#定义sink
a1.sinks.k1.type=logger
a1.sinks.k1.maxBytesToLog=100#定义chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
1. 使用的组件类型①EXECSource
介绍: execsource会在agent启动时,运行一个linux命令,运行linux命令的进程要求是一个可以持续产生数据的进程!
将标准输出的数据封装为event!
通常情况下,如果指定的命令退出了,那么source也会退出并且不会再封装任何的数据!
所以使用这个source一般推荐类似cat ,tail -f 这种命令,而不是date这种只会返回一个数据,并且执行完就退出的命令!
配置:
必须配置:
type – The component type name, needs to be exec
command – The command to execute
②HDFSSink
介绍: hdfssink将event写入到HDFS!目前只支持生成两种类型的文件: text | sequenceFile,这两种文件都可以使用压缩!
写入到HDFS的文件可以自动滚动(关闭当前正在写的文件,创建一个新文件)。基于时间、events的数量、数据大小进行周期性的滚动!
支持基于时间和采集数据的机器进行分桶和分区操作!
HDFS数据所上传的目录或文件名可以包含一个格式化的转义序列,这个路径或文件名会在上传event时,被自动替换,替换为完整的路径名!
使用此Sink要求本机已经安装了hadoop,或持有hadoop的jar包!
配置:
必须配置:
type – The component type name, needs to be hdfs
hdfs.path – HDFS directory path (eg hdfs://namenode/flume/webdata/)
可选参考word
2、配置:案例三:监控目录将数据发送到HDFS上#a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
a1.sources = r1
a1.sinks = k1
a1.channels = c1#组名名.属性名=属性值
#定义source
a1.sources.r1.type=exec
a1.sources.r1.command=tail -f /opt/module/hive/logs/hive.log#定义chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000#定义sink
a1.sinks.k1.type = hdfs
#一旦路径中含有基于时间的转义序列,要求event的header中必须有timestamp=时间戳,如果没有需要将useLocalTimeStamp = true
a1.sinks.k1.hdfs.path = hdfs://hadoop101:9000/flume/%Y%m%d/%H/%M
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-#以下三个和目录的滚动相关,目录一旦设置了时间转义序列,基于时间戳滚动
#是否将时间戳向下舍
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = minute#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100#以下三个和文件的滚动相关,以下三个参数是或的关系!以下三个参数如果值为0都代表禁用!
#30秒滚动生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 30
#设置每个文件到128M时滚动
a1.sinks.k1.hdfs.rollSize = 134217700
#每写多少个event滚动一次
a1.sinks.k1.hdfs.rollCount = 0
#以不压缩的文本形式进行存储
a1.sinks.k1.hdfs.fileType=DataStream#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
1、使用的组件类型1.SpoolingDirSource
简介:
SpoolingDirSource指定本地磁盘的一个目录为"Spooling(自动收集)"的目录!这个source可以读取目录中
新增的文件,将文件的内容封装为event!
SpoolingDirSource在读取一整个文件到channel之后,它会采取策略,要么删除文件(是否可以删除取决于配置),要么对文件
进程一个完成状态的重命名,这样可以保证source持续监控新的文件!
SpoolingDirSource和execsource不同,SpoolingDirSource是可靠的!即使flume被杀死或重启,依然不丢数据!但是为了保证
这个特性,付出的代价是,一旦flume发现以下情况,flume就会报错,停止!
①一个文件已经被放入目录,在采集文件时,不能被修改
②文件的名在放入目录后又被重新使用(出现了重名的文件)
要求: 必须已经封闭的文件才能放入到SpoolingDirSource,在同一个SpoolingDirSource中都不能出现重名的文件!
使用:
必需配置:
type – The component type name, needs to be spooldir.
spoolDir – The directory from which to read files from.
2、配置:#a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
a1.sources = r1
a1.sinks = k1
a1.channels = c1#组名名.属性名=属性值
#定义source
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/root/flume#定义chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000#定义sink
a1.sinks.k1.type = hdfs
#一旦路径中含有基于时间的转义序列,要求event的header中必须有timestamp=时间戳,如果没有需要将useLocalTimeStamp = true
a1.sinks.k1.hdfs.path = hdfs://hadoop101:9000/flume/%Y%m%d/%H/%M
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-#以下三个和目录的滚动相关,目录一旦设置了时间转义序列,基于时间戳滚动
#是否将时间戳向下舍
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = minute#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100#以下三个和文件的滚动相关,以下三个参数是或的关系!以下三个参数如果值为0都代表禁用!
#30秒滚动生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 30
#设置每个文件到128M时滚动
a1.sinks.k1.hdfs.rollSize = 134217700
#每写多少个event滚动一次
a1.sinks.k1.hdfs.rollCount = 0
#以不压缩的文本形式保存数据
a1.sinks.k1.hdfs.fileType=DataStream
#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1



