Flume is a distributed, highly reliable, and highly available system for collecting, aggregating, and transporting large volumes of log data.
It is built from three main components: the Source, the Channel (buffer), and the Sink.
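Conceptually, a source produces events into a channel (a bounded buffer) and a sink drains them. A rough Python sketch of that flow (an illustration only, not Flume code):

```python
from queue import Queue

# Channel: a bounded in-memory buffer, like Flume's memory channel
# with capacity = 1000.
channel = Queue(maxsize=1000)

def source(lines):
    """Source: turn each input line into an event and put it on the channel."""
    for line in lines:
        event = {"headers": {}, "body": line.encode("utf-8")}
        channel.put(event)

def sink():
    """Sink: drain events from the channel (here, just collect the bodies)."""
    out = []
    while not channel.empty():
        out.append(channel.get()["body"].decode("utf-8"))
    return out

source(["hello", "flume"])
print(sink())  # → ['hello', 'flume']
```

The channel decouples the two ends: the source and sink run at their own pace, and the buffer absorbs bursts up to its capacity.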
Installing Flume
1. Windows: just unzip the distribution.
2. Linux: unzip, then rename the template files under conf (drop the .template suffix).

a1.conf is usually placed under the conf directory.
```
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# sink
a1.sinks.k1.type = logger

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

Starting Flume
Go to the bin directory.

Windows (short options):

```
flume-ng agent -c ../conf -f ../conf/a1.conf -n a1 -Dflume.root.logger=INFO,console
```

Linux:

```
flume-ng agent --conf ../conf --conf-file ../conf/a1.conf --name a1 -Dflume.root.logger=INFO,console
```

Monitoring a directory: spooldir
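Once the agent is running, the netcat source accepts newline-terminated text lines over plain TCP, one line per event. A minimal client sketch (the `send_event` helper is hypothetical; host and port match the a1.conf above):

```python
import socket

def send_event(host, port, line):
    """Send one newline-terminated text line to a Flume netcat source."""
    with socket.create_connection((host, port)) as sock:
        sock.sendall((line + "\n").encode("utf-8"))

# With the a1 agent running, each call becomes one Flume event,
# which the logger sink then prints on the console:
# send_event("localhost", 44444, "hello flume")
```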
Watches a directory; whenever a new file appears there, its contents are read.
```
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = spooldir
# directory to watch
a1.sources.r1.spoolDir = D:/SogouQ
# suffix appended once a file has been fully ingested
a1.sources.r1.fileSuffix = .COMPLETED
# put the file's absolute path into the event header (default false)
a1.sources.r1.fileHeader = true
# without this, non-UTF-8 input files cause an error
a1.sources.r1.inputCharset = GBK
# ignore files ending in .tmp (still being written): any prefix, then .tmp
#a1.sources.r1.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

exec
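The spooldir source treats a file as complete the moment it appears, so files must never be written in place inside the spool directory. A common pattern is to write under a .tmp name (matched by the ignorePattern above) and rename when done; a sketch with a hypothetical `drop_file` helper:

```python
import os

def drop_file(spool_dir, name, text):
    """Write to a .tmp file (ignored by the spooldir source), then rename
    so the finished file appears atomically in the spool directory."""
    tmp_path = os.path.join(spool_dir, name + ".tmp")
    final_path = os.path.join(spool_dir, name)
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(text)
    os.rename(tmp_path, final_path)  # atomic on the same filesystem
    return final_path
```

After ingestion the source renames the file again, to name + the configured fileSuffix (e.g. events.log.COMPLETED).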
Watches a single file; new content appended to it flows into Flume.
This demo sinks to Kafka.
exec mode is not supported on Windows (the source runs a Unix command such as tail).
```
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/dingke/text.log

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = kafka-test-topic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

avro
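Because `tail -F` keeps following the file (even across rotation), anything the application appends to the log ends up in Kafka. A small sketch of a writer feeding the tailed file (the `append_lines` helper name is illustrative):

```python
def append_lines(log_path, lines):
    """Append newline-terminated lines and flush after each one, so
    tail -F (and hence the exec source) picks them up promptly."""
    with open(log_path, "a", encoding="utf-8") as f:
        for line in lines:
            f.write(line + "\n")
            f.flush()
```

Note that the exec source offers no delivery guarantees: if the agent is down while lines are appended, those lines are simply missed.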
avro enables cross-machine log collection: machine a collects the logs and forwards them over Avro RPC to machine b, which sinks them to Kafka.
Machine a:
```
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/dingke/text.log

# sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.66.1
a1.sinks.k1.port = 4545

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
Machine b:
```
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.66.1
a1.sources.r1.port = 4545

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = kafka-test-topic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
Flume provides many more sources and sinks; the official user guide is worth studying:
http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
In big data pipelines, Flume is usually used in combination with other tools.



