To define a flow within a single agent, you link a source and a sink through a channel. You list the sources, sinks, and channels for the given agent, then point each source and each sink at a channel. A source instance can specify multiple channels, but a sink instance can specify only one channel. The format is as follows:
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>
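The one-channel-per-sink rule is easy to violate in a hand-written properties file. As a sketch, a tiny checker for that rule might look like the following (this simplified parser is for illustration only; it is not Flume's own configuration loader):

```python
# Sanity-check a Flume-style properties snippet: a source may list several
# channels, but a sink's "channel" key must name exactly one channel.
# Simplified illustration only -- not Flume's actual configuration parser.

def check_sink_channels(conf_text):
    problems = []
    for line in conf_text.splitlines():
        line = line.split("#")[0].strip()      # drop comments and whitespace
        if "=" not in line:
            continue
        key, value = (part.strip() for part in line.split("=", 1))
        parts = key.split(".")
        # sink bindings look like <agent>.sinks.<sink>.channel
        if len(parts) == 4 and parts[1] == "sinks" and parts[3] == "channel":
            if len(value.split()) != 1:
                problems.append(key)
    return problems

conf = """
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1 mem-channel-2
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1
"""
print(check_sink_channels(conf))  # → []  (the sink names exactly one channel)
```

A sink line such as `a.sinks.k1.channel = c1 c2` would be flagged, while a source listing two channels is legal.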
Example:
# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1

# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1

# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1

Configuring component properties
Example:
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1

# set channel for sources, sinks

# properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000

# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100

# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata
# ...

Common source and sink types

Common Flume sources
# Avro source:                avro
# Syslog TCP source:          syslogtcp
# Syslog UDP source:          syslogudp
# HTTP source:                http
# Exec source:                exec
# JMS source:                 jms
# Thrift source:              thrift
# Spooling directory source:  spooldir
# Kafka source:               org.apache.flume.source.kafka.KafkaSource
# ...

Common Flume channels
# Memory Channel:  memory
# JDBC Channel:    jdbc
# Kafka Channel:   org.apache.flume.channel.kafka.KafkaChannel
# File Channel:    file

Common Flume sinks
# HDFS Sink:    hdfs
# Hive Sink:    hive
# Logger Sink:  logger
# Avro Sink:    avro
# Kafka Sink:   org.apache.flume.sink.kafka.KafkaSink
# HBase Sink:   hbase

Demo: avro + memory + logger
Avro source: listens on a specified Avro port and receives files sent by an Avro client. As long as an application sends a file through the Avro port, the source component can read the file's contents. The output destination here is Logger.
1.1 Write the collection configuration
[root@tianqinglong01 flume]# mkdir flumeconf
[root@tianqinglong01 flume]# cd flumeconf
[root@tianqinglong01 flumeconf]# vi avro-logger.conf

# name the components
a1.sources=avro-sour1
a1.channels=mem-chan1
a1.sinks=logger-sink1

# properties of the source
a1.sources.avro-sour1.type=avro
a1.sources.avro-sour1.bind=tianqinglong01
a1.sources.avro-sour1.port=9999

# properties of the channel
a1.channels.mem-chan1.type=memory

# properties of the sink
a1.sinks.logger-sink1.type=logger
a1.sinks.logger-sink1.maxBytesToLog=100

# bind the components together
a1.sources.avro-sour1.channels=mem-chan1
a1.sinks.logger-sink1.channel=mem-chan1
1.2 Start the agent
[root@tianqinglong01 flumeconf]# flume-ng agent -c ../conf -f ./avro-logger.conf -n a1 -Dflume.root.logger=INFO,console
# open another terminal
[root@tianqinglong01 ~]# echo "hello flume" >> text
[root@tianqinglong01 ~]# flume-ng avro-client -c $FLUME_HOME/conf -H tianqinglong01 -p 9999 -F ./text

Demo: real-time collection (tailing a file): exec + memory + hdfs
Exec source: runs a specified command and uses the command's output as its data source.

# The most common command is `tail -F file`: whenever an application writes to the log file, the source component picks up the newest content in the file.

memory: the channel that transports the data is a memory channel.

hdfs: the output destination is HDFS.
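The `tail -F` behavior the exec source relies on can be sketched in pure Python. This is a simplified illustration, not Flume's implementation; it skips the log-rotation handling that real `tail -F` provides:

```python
import os

def follow(path):
    """Yield lines appended to `path`, starting from the current end of the
    file, roughly like `tail -F` (file rotation is not handled here)."""
    with open(path) as f:
        f.seek(0, os.SEEK_END)       # start at the end of the file, like tail -F
        while True:
            line = f.readline()
            if not line:
                yield None           # nothing new yet; caller may sleep and retry
            else:
                yield line.rstrip("\n")
```

An exec source does essentially this with the external `tail` process, turning each new line into an event for the channel.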
Configuration
[root@tianqinglong flumeconf]# vi exec-hdfs.conf

a1.sources=r1
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /root/flume-test-exec-hdfs

a1.sinks=k1
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://qianfeng01:8020/flume/tailout/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix=events
a1.sinks.k1.hdfs.round=true
a1.sinks.k1.hdfs.roundValue=10
a1.sinks.k1.hdfs.roundUnit=second
a1.sinks.k1.hdfs.rollInterval=3
a1.sinks.k1.hdfs.rollSize=20
a1.sinks.k1.hdfs.rollCount=5
a1.sinks.k1.hdfs.batchSize=1
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.fileType=DataStream

a1.channels=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
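With `hdfs.round=true`, the timestamp substituted into the path's escape sequences is rounded down to a multiple of `roundValue` `roundUnit`s (10 seconds in this config). Since the path above only uses day granularity (`%Y-%m-%d`), the rounding is invisible here; it matters for finer escapes such as `%H%M%S`. A sketch of the effect:

```python
from datetime import datetime, timezone

def round_down(ts, round_value=10):
    """Round an epoch timestamp down to a multiple of `round_value` seconds,
    mimicking hdfs.round=true with roundValue=10, roundUnit=second."""
    return ts - (ts % round_value)

ts = datetime(2024, 1, 1, 12, 30, 47, tzinfo=timezone.utc).timestamp()
rounded = datetime.fromtimestamp(round_down(ts), tz=timezone.utc)
print(rounded.strftime("%H:%M:%S"))  # → 12:30:40
```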
Start the agent
[root@tianqinglong flumeconf]# flume-ng agent -c ../conf -f ./exec-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
Test data
[root@tianqinglong ~]# echo "hello world" >> flume-test-exec-hdfs

Demo: real-time collection (watching a directory): spool + mem + logger
spool: the source is a directory; any file placed into the directory is ingested. mem: data is transported through memory.

logger: the data is sent to the log output.
Configuration
[root@tianqinglong01 flumeconf]# vi spool-logger.conf

a1.sources = r1
a1.channels = c1
a1.sinks = s1

a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/home/flume/spool
a1.sources.r1.fileSuffix=.COMPLETED
a1.sources.r1.deletePolicy=never
a1.sources.r1.fileHeader=false
a1.sources.r1.fileHeaderKey=file
a1.sources.r1.basenameHeader=false
a1.sources.r1.basenameHeaderKey=basename
a1.sources.r1.batchSize=100
a1.sources.r1.inputCharset=UTF-8
a1.sources.r1.bufferMaxLines=1000

a1.channels.c1.type=memory

a1.sinks.s1.type=logger
a1.sinks.s1.maxBytesToLog=16

a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
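One caveat when feeding `spoolDir`: the spooling directory source expects each file to be complete and immutable once it appears, so writing the file elsewhere and then moving it in is the safer pattern. A Python sketch (the temporary directory below stands in for the `/home/flume/spool` path from the config):

```python
import os
import shutil
import tempfile

def drop_into_spool(spool_dir, name, text):
    """Write the file fully outside the spool directory, then move it in,
    since the spooldir source expects immutable, fully-written files."""
    fd, tmp_path = tempfile.mkstemp()
    with os.fdopen(fd, "w") as f:
        f.write(text)
    shutil.move(tmp_path, os.path.join(spool_dir, name))

spool = tempfile.mkdtemp()   # stand-in here for /home/flume/spool
for i in range(1, 11):
    drop_into_spool(spool, str(i), f"{i}\n")
print(len(os.listdir(spool)))  # → 10
```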
Start the agent
[root@tianqinglong flumeconf]# flume-ng agent -c ../conf -f ./spool-logger.conf -n a1 -Dflume.root.logger=INFO,console
Test
[root@tianqinglong ~]# for i in `seq 1 10`; do echo $i >> /home/flume/spool/$i; done

Demo: http + mem + logger
http: the data source is the HTTP protocol, typically accepting GET or POST requests. Each HTTP request is converted into a Flume Event by a pluggable handler.

mem: data is transported through a memory channel.

logger: the output format is Logger.
Configuration
[root@tianqinglong01 flumeconf]# vi http-logger.conf

a1.sources = r1
a1.channels = c1
a1.sinks = s1

a1.sources.r1.type=http
a1.sources.r1.bind=tianqinglong01
a1.sources.r1.port=6666
a1.sources.r1.handler=org.apache.flume.source.http.JSONHandler

a1.channels.c1.type=memory

a1.sinks.s1.type=logger
a1.sinks.s1.maxBytesToLog=16

a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
Start the agent service
[root@tianqinglong flumeconf]# flume-ng agent -c ../conf -f ./http-logger.conf -n a1 -Dflume.root.logger=INFO,console
Test
[root@tianqinglong ~]# curl -X POST -d '[{"headers":{"name":"zhangsan","pwd":"123456"},"body":"this is my content"}]' http://tianqinglong01:6666
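The same request can be issued from Python with only the standard library. The host, port, and event layout below follow the config and curl example above; note that the JSONHandler expects header values to be strings:

```python
import json
from urllib import request

def build_events(body, **headers):
    """Build the JSON payload the Flume JSONHandler expects: a list of
    events, each with string-valued headers and a body."""
    return json.dumps([{"headers": {k: str(v) for k, v in headers.items()},
                        "body": body}])

payload = build_events("this is my content", name="zhangsan", pwd="123456")

# Sending it requires the agent configured above to be running:
# req = request.Request("http://tianqinglong01:6666",
#                       data=payload.encode("utf-8"),
#                       headers={"Content-Type": "application/json"})
# request.urlopen(req)
```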



