
For the 1024 Programmer's Day, here is a post on Flume

Flume Source

1、Avro Source
  • channels
  • type
  • bind
  • port
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
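
To smoke-test this source, Flume ships an Avro client that sends the contents of a local file to a running agent. A possible invocation (the file path here is just an example):

```shell
# send each line of a local file as an event to the Avro source on port 4141
flume-ng avro-client -H localhost -p 4141 -F /home/test.txt
```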

2、Exec Source
  • channels
  • type
  • command
a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /home/test.txt  # difference between -f and -F: -F keeps retrying if the file is missing or rotated

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=logger

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

# command to start the agent
flume-ng agent -n a1 -f conf/flume_exec_logger.conf -c conf/ -Dflume.root.logger=INFO,console
3、Spooling Directory Source
  • channels
  • type
  • spoolDir # note the capital D: it is spoolDir, not spooldir
  • fileSuffix
  • includePattern
  • ignorePattern
a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/home/data
a1.sources.r1.fileHeader=true

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://master:9000/flume/%Y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix=event-
a1.sinks.k1.hdfs.rollInterval=30
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp=true

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

4、Taildir Source

This source is essentially a combination of Exec Source and Spooling Directory Source, and it supports resuming from a recorded position after a restart.

It was introduced in Flume 1.7.0.

  • channels
  • filegroups
  • filegroups.<filegroupName>
  • positionFile ~/.flume/taildir_position.json
a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
a1.sources.r1.filegroups.f1=/home/data/.*
a1.sources.r1.positionFile=/home/taildir_position.json
a1.sources.r1.fileHeader=true # add the absolute file path to the event header

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://master:9000/flume/%Y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix=event-
a1.sinks.k1.hdfs.rollInterval=30
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp=true

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

5、NetCat Source
  • channels
  • type
  • bind
  • port
a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=netcat
a1.sources.r1.bind=master
a1.sources.r1.port=44444

a1.sinks.k1.type=logger

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
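
With the agent running, events can be fed to the NetCat source from another terminal, for example with nc (using the hostname configured above):

```shell
# each line typed becomes one Flume event, which the logger sink prints to the console
nc master 44444
```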
Flume Sink

1、HDFS Sink
Escape sequences supported in hdfs.path (Alias / Description):
%{host}       substitute value of the event header named "host" (arbitrary header names are supported)
%t            Unix time in milliseconds
%a            locale's short weekday name (Mon, Tue, …)
%A            locale's full weekday name (Monday, Tuesday, …)
%b            locale's short month name (Jan, Feb, …)
%B            locale's long month name (January, February, …)
%c            locale's date and time (Thu Mar 3 23:05:25 2005)
%d            day of month (01)
%e            day of month without padding (1)
%D            date; same as %m/%d/%y
%H            hour (00…23)
%I            hour (01…12)
%j            day of year (001…366)
%k            hour (0…23)
%m            month (01…12)
%n            month without padding (1…12)
%M            minute (00…59)
%p            locale's equivalent of am or pm
%s            seconds since 1970-01-01 00:00:00 UTC
%S            second (00…59)
%y            last two digits of year (00…99)
%Y            year (2010)
%z            +hhmm numeric timezone (for example, -0400)
%[localhost]  substitute the hostname of the host where the agent is running
%[IP]         substitute the IP address of the host where the agent is running
%[FQDN]       substitute the canonical hostname of the host where the agent is running
  • channel
  • type hdfs
  • hdfs.path
  • hdfs.filePrefix
  • hdfs.fileSuffix
  • hdfs.rollInterval 30s
  • hdfs.rollSize 1024
  • hdfs.rollCount 10
  • hdfs.batchSize 100
  • hdfs.codeC Compression codec, one of: gzip, bzip2, lzo, lzop, snappy
  • hdfs.fileType File format: SequenceFile, DataStream or CompressedStream. (1) DataStream does not compress the output file, so do not set codeC. (2) CompressedStream requires hdfs.codeC to be set to an available codec.
  • hdfs.round Whether the timestamp should be rounded down (if true, this affects all time-based escape sequences except %t)
  • hdfs.useLocalTimeStamp
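Putting the properties above together, a sketch of a compressed, time-bucketed HDFS sink might look like the following. The agent/channel names and the NameNode address are assumptions matching the earlier examples; hdfs.roundValue and hdfs.roundUnit are the companion properties that hdfs.round uses in the Flume docs:

```properties
a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.path=hdfs://master:9000/flume/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix=event-
a1.sinks.k1.hdfs.rollInterval=60       # roll a new file every 60 s
a1.sinks.k1.hdfs.rollSize=134217728    # or when the file reaches 128 MB
a1.sinks.k1.hdfs.rollCount=0           # 0 disables rolling by event count
a1.sinks.k1.hdfs.fileType=CompressedStream
a1.sinks.k1.hdfs.codeC=gzip            # CompressedStream requires a codec
a1.sinks.k1.hdfs.round=true
a1.sinks.k1.hdfs.roundValue=10
a1.sinks.k1.hdfs.roundUnit=minute      # bucket paths by 10-minute windows
a1.sinks.k1.hdfs.useLocalTimeStamp=true
```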
2、Logger Sink
  • channel
  • type
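The logger sink needs nothing beyond these two required properties; a minimal sketch (agent and channel names assumed, matching the earlier examples):

```properties
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1
```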
3、Avro Sink
  • channel
  • type
  • hostname
  • port
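The Avro sink is typically paired with the Avro source of a downstream agent to chain agents together. A sketch of the sending side (the hostname and port here are assumptions and must match the downstream agent's Avro source):

```properties
a1.sinks.k1.type=avro
a1.sinks.k1.channel=c1
# downstream agent's Avro source address (hypothetical host)
a1.sinks.k1.hostname=collector01
a1.sinks.k1.port=4141
```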
4、File Roll Sink
  • channel
  • type file_roll
  • sink.directory
  • sink.rollInterval
  • batchSize
a1.sources=r1
a1.channels=c1
a1.sinks=k1

a1.sources.r1.type=netcat
a1.sources.r1.bind=master
a1.sources.r1.port=44444

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=file_roll
a1.sinks.k1.sink.directory=/home/data
a1.sinks.k1.sink.rollInterval=10

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

5、Kafka Sink

This version of the sink does not support Kafka 0.8.x.

  • type org.apache.flume.sink.kafka.KafkaSink
  • kafka.bootstrap.servers
  • kafka.topic
  • flumeBatchSize
  • kafka.producer.acks
a1.sources=r1
a1.channels=c1
a1.sinks=k1

a1.sources.r1.type=netcat
a1.sources.r1.bind=master
a1.sources.r1.port=44444

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers=master:9092
a1.sinks.k1.kafka.topic=second

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
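
To verify delivery, the topic can be read back with the console consumer that ships with Kafka (broker address as configured above):

```shell
# print every event the sink has written to the "second" topic
kafka-console-consumer.sh --bootstrap-server master:9092 --topic second --from-beginning
```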

Reprinted from www.mshxw.com; please credit the source when republishing.
Original article: https://www.mshxw.com/it/350633.html