- channels
- type
- bind
- port
```
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
```

2、Exec Source
- channels
- type
- command
```
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /home/test.txt
# difference between -f and -F: -F keeps retrying after a failure (e.g. the file is rotated or recreated)
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.k1.type=logger
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
```
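To see the exec source in action, append lines to the tailed file while the agent is running. A minimal simulation, using a temporary file for illustration (the config above tails /home/test.txt):

```shell
# Simulate new log lines arriving in the tailed file. A temp file is used
# here so the sketch is self-contained; with the config above the path
# would be /home/test.txt instead.
f=$(mktemp)
echo "event 1" >> "$f"
echo "event 2" >> "$f"
# `tail -F` on this file (the command the exec source runs) would stream both lines
tail -n 2 "$f"
```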
```
# launch command
flume-ng agent -n a1 -f conf/flume_exec_logger.conf -c conf/ -Dflume.root.logger=INFO,console
```

3、Spooling Directory Source
- channels
- type
- spoolDir # note: it is spoolDir with a capital D, not spooldir
- fileSuffix
- includePattern
- ignorePattern
```
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/home/data
a1.sources.r1.fileHeader=true
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://master:9000/flume/%Y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix=event-
a1.sinks.k1.hdfs.rollInterval=30
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
```

4、Taildir Source
In short, this source combines Exec Source and Spooling Directory Source, and supports resuming from where it left off after a restart.
It was introduced in Flume 1.7.0.
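The resume support comes from the positionFile, a JSON file in which the source records, for each tracked file, the inode, the byte offset read so far, and the path. Roughly (values illustrative):

```json
[
  {"inode": 9891717, "pos": 6, "file": "/home/data/app.log"}
]
```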
- channels
- filegroups
- filegroups.&lt;filegroupName&gt;
- positionFile ~/.flume/taildir_position.json
```
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
a1.sources.r1.filegroups.f1=/home/data/.*
a1.sources.r1.positionFile=/home/taildir_position.json
a1.sources.r1.fileHeader=true  # store the file's absolute path in the event header
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://master:9000/flume/%Y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix=event-
a1.sinks.k1.hdfs.rollInterval=30
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
```

5、NetCat Source
- channels
- type
- bind
- port
```
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=netcat
a1.sources.r1.bind=master
a1.sources.r1.port=44444
a1.sinks.k1.type=logger
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
```

Flume Sink

1、HDFS Sink
| Alias | Description |
|---|---|
| %{host} | Substitute value of event header named “host”. Arbitrary header names are supported. |
| %t | Unix time in milliseconds |
| %a | locale’s short weekday name (Mon, Tue, …) |
| %A | locale’s full weekday name (Monday, Tuesday, …) |
| %b | locale’s short month name (Jan, Feb, …) |
| %B | locale’s long month name (January, February, …) |
| %c | locale’s date and time (Thu Mar 3 23:05:25 2005) |
| %d | day of month (01) |
| %e | day of month without padding (1) |
| %D | date; same as %m/%d/%y |
| %H | hour (00…23) |
| %I | hour (01…12) |
| %j | day of year (001…366) |
| %k | hour ( 0…23) |
| %m | month (01…12) |
| %n | month without padding (1…12) |
| %M | minute (00…59) |
| %p | locale’s equivalent of am or pm |
| %s | seconds since 1970-01-01 00:00:00 UTC |
| %S | second (00…59) |
| %y | last two digits of year (00…99) |
| %Y | year (2010) |
| %z | +hhmm numeric timezone (for example, -0400) |
| %[localhost] | Substitute the hostname of the host where the agent is running |
| %[IP] | Substitute the IP address of the host where the agent is running |
| %[FQDN] | Substitute the canonical hostname of the host where the agent is running |
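As an illustration, with an hdfs.path using the date escapes above (and `event-` as filePrefix), an event timestamped 2021-03-03 23:05 would land under a path like (timestamp values illustrative):

```
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/%Y-%m-%d/%H-%M
# events written under hdfs://master:9000/flume/2021-03-03/23-05/
```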
- channel
- type hdfs
- hdfs.path
- hdfs.filePrefix
- hdfs.fileSuffix
- hdfs.rollInterval 30s
- hdfs.rollSize 1024
- hdfs.rollCount 10
- hdfs.batchSize 100
- hdfs.codeC Compression codec, one of the following: gzip, bzip2, lzo, lzop, snappy
- hdfs.fileType File format: currently SequenceFile, DataStream or CompressedStream (1) DataStream does not compress the output file, so do not set codeC (2) CompressedStream requires hdfs.codeC to be set to an available codec
- hdfs.round whether the timestamp should be rounded down (if true, affects all time-based escape sequences except %t)
- hdfs.useLocalTimeStamp
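For example, to write gzip-compressed output, fileType and codeC must be set together (sink name k1 assumed from the earlier examples; with DataStream, by contrast, codeC would be left unset):

```
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
```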
2、Logger Sink

- channel
- type
3、Avro Sink

- channel
- type
- hostname
- port
4、File Roll Sink

- channel
- type file_roll
- sink.directory
- sink.rollInterval
- batchSize
```
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=netcat
a1.sources.r1.bind=master
a1.sources.r1.port=44444
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.k1.type=file_roll
a1.sinks.k1.sink.directory=/home/data
a1.sinks.k1.sink.rollInterval=10
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
```

5、Kafka Sink
This version does not support Kafka 0.8.x.
- type org.apache.flume.sink.kafka.KafkaSink
- kafka.bootstrap.servers
- kafka.topic
- flumeBatchSize
- kafka.producer.acks
```
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=netcat
a1.sources.r1.bind=master
a1.sources.r1.port=44444
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers=master:9092
a1.sinks.k1.kafka.topic=second
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
```



