Flume Installation Resources
·Flume official site: http://flume.apache.org/
·User guide: http://flume.apache.org/FlumeUserGuide.html
·Downloads: http://archive.apache.org/dist/flume/
Flume Overview
·Flume is a highly available, distributed system for collecting, aggregating, and moving large volumes of log data. It is based on a streaming architecture, which keeps it simple and flexible
·Flume can monitor local disk data in real time, read it from the server's local disk as it arrives, aggregate it, and upload it to HDFS and other destinations
Flume Basic Architecture
·An Agent is a JVM process; its main components are the source, channel, and sink
·source
·The source is the component responsible for pulling data into the Flume agent; it can handle log data of many types and formats
·channel
·The channel is a buffer sitting between the source and the sink; it lets the source and sink run at different rates. It is thread-safe and can handle writes from multiple sources and reads from multiple sinks at the same time
·Flume ships with two commonly used channels: Memory Channel and File Channel (Memory Channel is faster but loses events if the agent crashes; File Channel persists events to disk)
·sink (delivers the data to HDFS, another storage system, or the next Flume agent)
·The sink continuously polls the channel for events, removes them in batches, and writes those batches to the storage system
·Event
·The Event is Flume's basic unit of transfer; data travels from the source to the destination as Events
·An Event consists of a Header and a Body
·The Header stores the Event's attributes as key-value pairs; interceptors work by reading and writing the Header
·The Body stores the record itself as a byte array
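As a small illustration of the Event structure, the sketch below builds an Event with a header map and a byte-array body using Flume's EventBuilder (a minimal sketch; the header key "type" is just an example and anticipates the interceptor case later in these notes):
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class EventDemo {
    public static void main(String[] args) {
        // Header: key-value attributes describing the event
        Map<String, String> headers = new HashMap<>();
        headers.put("type", "letter");
        // Body: the payload itself, stored as a byte array
        Event event = EventBuilder.withBody("hello flume".getBytes(StandardCharsets.UTF_8), headers);
        System.out.println(event.getHeaders() + " -> " + new String(event.getBody()));
    }
}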
Flume Installation and Deployment
·Unpack the Flume tarball
·Delete guava-11.0.2.jar from flume/lib for compatibility with Hadoop 3.1.3 (Hadoop ships its own, newer Guava)
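Assuming Flume lives under /opt/module/flume as in the log configuration below, the removal is one command:
·rm /opt/module/flume/lib/guava-11.0.2.jar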
·Edit log4j.properties under /conf to change where logs are written
#console means the log is also printed to the console
flume.root.logger=INFO,LOGFILE,console
#Fixed location for log output
flume.log.dir=/opt/module/flume/logs
#Name of the log file
flume.log.file=flume.log
Flume Getting-Started Case
Requirement: use Flume to listen on a port, collect the data arriving on that port, and print it to the console
Steps:
·Use the nc command to check whether netcat is present; if not, install it first (netcat is a network testing tool that can open TCP/UDP connections between two machines)
·sudo yum install -y nc
·Check whether the port is already in use
·sudo netstat -nlp | grep <port>
·Create a job directory under the Flume home (the conf directory can also be used directly)
·Create the Flume agent configuration file nc-flume-console.conf
·Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source: pull data from netcat
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel. A sink can connect to only one channel; a channel can serve multiple sources and sinks
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
·First start the Flume agent listening on the port
·flume-ng agent -c /flume/conf -f /job/nc-flume-console.conf -n a1
or
·flume-ng agent --conf conf/ --name a1 --conf-file conf/nc-flume-console.conf -Dflume.root.logger=INFO,console
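With the agent running, the pipeline can be verified from a second terminal: every line typed into netcat should be printed on the agent's console by the logger sink:
·nc localhost 44444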
Monitoring Multiple Appending Files in a Directory in Real Time
Requirement: use Flume to watch a whole directory for files being appended to in real time, and upload them to HDFS
·The TAILDIR source type
·Taildir: watches the specified files and tails them in near real time as new lines are appended to each file. It can resume from the last recorded position after a restart
·filegroups: file groups; multiple directories can be monitored, and each group must be given a name
·Configuration file
# Name the components on this agent (a1 is the agent's name)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source TAILDIR
a1.sources.r1.type = TAILDIR
#filegroups: file groups; multiple directories can be monitored, and each group needs a name
a1.sources.r1.filegroups = f1 f2
#Absolute path of the file group; it must point at files, and a regex can be used to match multiple files
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/files1/.*file.*
a1.sources.r1.filegroups.f2 = /opt/module/flume-1.9.0/files2/.*file.*
#JSON file that records the inode, absolute path, and last read position of each tailed file
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/taildir_position.json
# Describe the sink
a1.sinks.k1.type = hdfs
#HDFS path; the host:port prefix may be included or omitted, it is detected automatically
a1.sinks.k1.hdfs.path = /flume/%Y%m%d/%H
#A file prefix/suffix can be configured
a1.sinks.k1.hdfs.filePrefix = log-
#To mitigate the small-files problem, data is first written as a .tmp file and promoted to a final file once a roll condition is met
#Common production values: rollInterval=1h, rollSize=128M, rollCount=0
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0
#Whether to use the local timestamp; required by the time escape sequences in the path above
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#Set the file type; DataStream writes plain text without serialization
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
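To try this agent out, save the configuration under job/ (the file name taildir-flume-hdfs.conf below is just an example) and append to a matching file; a log- prefixed file should appear under the dated HDFS path:
·flume-ng agent -c conf/ -f job/taildir-flume-hdfs.conf -n a1
·echo hello >> /opt/module/flume-1.9.0/files1/file1.txt
·hdfs dfs -ls /flume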
Flume Advanced Topics
·Put transaction
·The transaction between the source and the channel
·Take transaction
·The transaction between the channel and the sink
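A minimal sketch of how these two transactions look at the API level (this is what Flume's own sources and sinks do internally; channel is assumed to be any org.apache.flume.Channel):
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;

public class TxDemo {
    // Put transaction: the source side writes events into the channel
    static void put(Channel channel, Event event) {
        Transaction tx = channel.getTransaction();
        tx.begin();
        try {
            channel.put(event);   // buffered until commit
            tx.commit();          // events become visible to sinks
        } catch (Exception e) {
            tx.rollback();        // nothing is written on failure
            throw e;
        } finally {
            tx.close();
        }
    }

    // Take transaction: the sink side removes events from the channel
    static void take(Channel channel) {
        Transaction tx = channel.getTransaction();
        tx.begin();
        try {
            Event event = channel.take();   // removed only if commit succeeds
            // ... write event to the destination ...
            tx.commit();
        } catch (Exception e) {
            tx.rollback();        // events are returned to the channel
            throw e;
        } finally {
            tx.close();
        }
    }
}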
Flume Internals
·ChannelSelector
·The ChannelSelector decides which channel(s) an Event will be sent to. There are two types: Replicating and Multiplexing
·The ReplicatingSelector sends the same Event to every channel; Multiplexing routes different Events to different channels according to the configured rules.
·SinkProcessor
·There are three types: DefaultSinkProcessor (default, one-to-one), LoadBalancingSinkProcessor (load balancing), and FailoverSinkProcessor (failover)
·The DefaultSinkProcessor drives a single sink; LoadBalancingSinkProcessor and FailoverSinkProcessor drive a sink group. LoadBalancingSinkProcessor spreads events across the group's sinks, while FailoverSinkProcessor provides failure recovery by switching to a standby sink.
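As an illustration, a failover sink group could be declared as follows (the group name g1 and the priorities are arbitrary examples; the sink with the higher priority is active, and the processor fails over to the next one when it fails):
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5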
Replicating Case
·Agent-01
# Name the components on this agent (a1 is the agent's name)
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source TAILDIR
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
#Absolute path of the file group; it must point at files, and a regex can be used to match multiple files
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/files1/.*
#Location of the position file used for resuming after a restart; if left unchanged, a default location is used and resume still works.
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/taildir_position.json
# Replicate the data flow to all channels (replicating is the default, so this line may be omitted)
a1.sources.r1.selector.type = replicating
# Describe the sinks. An Avro sink pairs with an Avro source on the next agent, forming a communication link
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop101
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop101
a1.sinks.k2.port = 4142
#Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
·Agent-02 (uploads to HDFS)
# Name the components on this agent (a1 is the agent's name)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source avro
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 4141
# Describe the sink
a1.sinks.k1.type = hdfs
#HDFS path; the host:port prefix may be included or omitted, it is detected automatically
a1.sinks.k1.hdfs.path = /flumeHDFS/%Y%m%d/%H
#A file prefix/suffix can be configured
a1.sinks.k1.hdfs.filePrefix = log-
#To mitigate the small-files problem, data is first written as a .tmp file and promoted to a final file once a roll condition is met
#Common production values: rollInterval=1h, rollSize=128M, rollCount=0
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0
#Whether to use the local timestamp; required by the time escape sequences in the path above
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#Set the file type
a1.sinks.k1.hdfs.fileType = DataStream
#Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
·Agent-03 (writes to local disk)
# Name the components on this agent (a1 is the agent's name)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source avro
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 4142
# Describe the sink: write to local disk with file_roll
a1.sinks.k1.type = file_roll
#The output directory must already exist; file_roll will not create a missing directory.
a1.sinks.k1.sink.directory = /opt/module/flume-1.9.0/flumeFile_roll_datas
#Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
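Start order matters: an Avro sink can only connect once its peer Avro source is listening, so start Agent-02 and Agent-03 before Agent-01 (the .conf file names below are examples):
·flume-ng agent -c conf/ -f job/replicating-agent2.conf -n a1
·flume-ng agent -c conf/ -f job/replicating-agent3.conf -n a1
·flume-ng agent -c conf/ -f job/replicating-agent1.conf -n a1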
Multiplexing Case
Requirement: use Flume to collect local server logs and, according to log type, send different kinds of logs to different analysis systems
Simulation: in this case port data simulates the logs, with single digits and single letters standing in for the different log types. We need a custom interceptor to distinguish digits from letters and send them to different channels (the "analysis systems").
Analysis: in real development, one server may produce many types of logs, and different types may need to go to different analysis systems. This is what the Multiplexing channel selector is for: it routes each event to a channel based on the value of a chosen key in the event's Header, so we need a custom Interceptor that assigns different values to that Header key for different event types.
·Implement the Interceptor API
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;
import java.util.Map;

public class MyInterceptor implements Interceptor {
    // Initialization; runs once when the agent starts
    @Override
    public void initialize() {
    }
    // Process a single Event; the list method below delegates here, which decouples the logic and makes it reusable
    @Override
    public Event intercept(Event event) {
        // Requirement: add a marker to the event's header
        // so the channel selector can route the event to the right channel
        Map<String, String> header = event.getHeaders();
        byte[] body = event.getBody();
        String info = new String(body);
        // Guard against empty events before reading the first character
        if (info.isEmpty()) {
            return event;
        }
        // Inspect the first character: digits are tagged "number", letters are tagged "letter"
        char c = info.charAt(0);
        if (c >= '0' && c <= '9') {
            header.put("type", "number");
        } else if ((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')) {
            header.put("type", "letter");
        }
        return event;
    }
    // Process a batch of Events; this is the method Flume actually calls
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }
    // Cleanup on shutdown
    @Override
    public void close() {
    }
    public static class Builder implements Interceptor.Builder {
        // Flume creates the interceptor instance through this method
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }
        // Read interceptor configuration (unused here)
        @Override
        public void configure(Context context) {
        }
    }
}
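Compiling this class requires flume-ng-core on the classpath; with Maven the dependency would look like this (match the version to your Flume installation):
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>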
·Build the jar and place it under /flume/lib
·flume-1
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source: pull data from netcat
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Multiplexing channel selector
a1.sources.r1.selector.type = multiplexing
#Which header key the selector reads (matches the key set by the interceptor)
a1.sources.r1.selector.header = type
#number/letter are the header values to match
a1.sources.r1.selector.mapping.number = c1
a1.sources.r1.selector.mapping.letter = c2
#Channel used when no mapping matches
a1.sources.r1.selector.default = c1
#Register the interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.flume.interceptors.MyInterceptor$Builder
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop101
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop101
a1.sinks.k2.port = 4142
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel. A sink can connect to only one channel; a channel can serve multiple sources and sinks
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
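The downstream agents for ports 4141 and 4142 are not shown here; they are ordinary avro-source agents like Agent-02/Agent-03 in the replicating case (e.g. with logger sinks). Once all three agents are running, the routing can be tested with netcat: digits should reach the 4141 agent and letters the 4142 agent:
·nc localhost 44444 (then type, e.g., 1 and a on separate lines)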
Aggregation
Requirement: watch a local directory on one server and listen on a port of a second server; the changes from both are finally printed on the console of a third server
·First server
# Name the components on this agent (a1 is the agent's name)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source TAILDIR
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
#Absolute path of the file group; it must point at files, and a regex can be used to match multiple files
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/files1/.*file.*
#Location of the position file used for resuming after a restart; if left unchanged, a default location is used and resume still works.
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/taildir_position.json
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
#Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
·Second server
# Name the components on this agent (a1 is the agent's name)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4142
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
·Third server
# Name the components on this agent (a1 is the agent's name)
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1
# Describe/configure the sources: avro
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141
a1.sources.r2.type = avro
a1.sources.r2.bind = hadoop103
a1.sources.r2.port = 4142
# Describe the sink
a1.sinks.k1.type = logger
#Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sinks.k1.channel = c1
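As in the replicating case, start the third server's agent first, because it hosts the Avro sources the other two agents connect to (the .conf names below are examples):
·flume-ng agent -c conf/ -f job/agg-server3.conf -n a1 (on hadoop103)
·flume-ng agent -c conf/ -f job/agg-server1.conf -n a1 (on the first server)
·flume-ng agent -c conf/ -f job/agg-server2.conf -n a1 (on the second server)
Then append to a watched file on the first server and send lines via nc localhost 44444 on the second; both should appear on hadoop103's console.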