栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flume

Flume

Flume安装地址
·Flume官网地址:http://flume.apache.org/
·文档查看地址:http://flume.apache.org/FlumeUserGuide.html
·下载地址:http://archive.apache.org/dist/flume/
Flume概述
·Flume是一个高可用的,分布式的海量日志采集、聚合和传输的系统:基于流式架构,简单灵活
·Flume可以实时的监控本地磁盘的数据,实时读取服务器本地磁盘,聚合在一起,上传到HDFS等

Flume基础架构

·Agent是一个JVM进程,主要组成的是source、channel、sink
	·source
		·source是负责拉取数据到Flume agent的组件。source组件可以处理各种类型、各种格式的日志数据
	·channel(管道)
		·channel是位于source和sink之间的缓冲区;channel允许source和sink运作速率不同。线程安全的,可以同时处理多个source的写入和sink的读取操作
		·Flume自带两种channel:Memroy channel、File channel
	·sink(将数据传送到HDFS或者存储系统或另一个Flume Agent)		
		·不断地轮询Channel中的event且批量删除,并将这些event批量写入存储体系等
·Event
	·Flume传输基本单元,以Event形式将数据从源头送至目的地
	·Event有Header和Body组成
		·Header用来存放Event的一些属性,为K-V结构;拦截器就是利用Header来的
		·Body用来存储该条数据,为字节数组
Flume安装部署
·解压flume的压缩包
·将flume/lib下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3
·修改/conf下的log4j.properties日志打印位置
	#console表示同时将日志输出到控制台
	flume.root.logger=INFO,LOGFILE,console
	#固定日志输出的位置
	flume.log.dir=/opt/module/flume/logs
	#日志文件的名称
	flume.log.file=flume.log			
Flume入门案例
需求:使用Flume监听一个端口,收集该端口数据,并打印到控制台
步骤:
	·通过nc命令查看是否存在netcat,如果没有先安装netcat工具(网络测试工具,可以将两台机器进行tcp/udp连接)
		·sudo yum install -y nc
	·查看端口是否被占用
		·sudo netstat -nlp | grep 端口
	·在flume下创建一个job目录,也可以直接使用conf目录
	·创建flume agent配置文件nc-flume-console.conf
	·添加以下内容
		添加内容如下:
		# Name the components on this agent
		a1.sources = r1
		a1.sinks = k1
		a1.channels = c1
		
		# Describe/configure the source 从netcat中拿取数据
		a1.sources.r1.type = netcat
		a1.sources.r1.bind = localhost
		a1.sources.r1.port = 44444
		
		# Describe the sink
		a1.sinks.k1.type = logger
		
		# Use a channel which buffers events in memory
		a1.channels.c1.type = memory
		a1.channels.c1.capacity = 1000
		a1.channels.c1.transactionCapacity = 100
		
		# Bind the source and sink to the channel 一个sink只能接一个channel,一个channels可以接多个source和sink
		a1.sources.r1.channels = c1
		a1.sinks.k1.channel = c1
	·先开启flume监听端口
		·flume-ng agent -c /flume/conf -f /job/nc-flume-console.conf -n a1
		或
		·flume-ng agent --conf conf/ --name a1 --conf-file conf/nc-flume-log.conf -Dflume.root.logger=INFO,console
实时监控目录下的多个追加文件
需求:使用flume监听整个目录的实时追加文件,并上传到hdfs
·source的TAILDIR类型
	·Taildir:观察指定的文件,并在检测到附加到每个文件的新行后几乎实时地跟踪它们。具有断点续传的功能
	·filegroups:文件组,可以监控多个文件夹,需要给文件夹起个名字
	·配置文件
		
	# Name the components on this agent a1为agent的别名
	a1.sources = r1
	a1.sinks = k1
	a1.channels = c1
	
	# Describe/configure the source TAILDIR
	a1.sources.r1.type = TAILDIR
	#filegroups:文件组,可以监控多个文件夹,需要给文件夹起个名字
	a1.sources.r1.filegroups = f1 f2
	#文件组的绝对路径,必须到精确到文件,可以写匹配表达式匹配多个文件
	a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/files1/.*file.*
	a1.sources.r1.filegroups.f2 = /opt/module/flume-1.9.0/files2/.*file.*
	#JSON格式的文件,记录每个尾号文件的inode、绝对路径和最后位置。
	a1.sources.r1.positionFile =  /opt/module/flume-1.9.0/taildir_position.json
	
	# Describe the sink
	a1.sinks.k1.type = hdfs
	#HDFS路径,主机名:端口号可写可不写,会自动识别
	a1.sinks.k1.hdfs.path = /flume/%Y%m%d/%H
	#可设置前缀、后缀
	a1.sinks.k1.hdfs.filePrefix = log-
	#为解决小文件问题,先存为.tmp文件,达到一定要求转为正式文件
	#业内经验rollInterval=1h,rollSize=128M,rollCount=0
	a1.sinks.k1.hdfs.rollInterval = 10
	a1.sinks.k1.hdfs.rollSize = 134217700
	a1.sinks.k1.hdfs.rollCount = 0
	#是否使用本地时间戳,以上占位符会使用的到
	a1.sinks.k1.hdfs.useLocalTimeStamp = true
	#设置文件类型,DataStrem不带序列化
	a1.sinks.k1.hdfs.fileType = DataStream

	# Use a channel which buffers events in memory
	a1.channels.c1.type = memory
	a1.channels.c1.capacity = 1000
	a1.channels.c1.transactionCapacity = 100
	
	# Bind the source and sink to the channel
	a1.sources.r1.channels = c1
	a1.sinks.k1.channel = c1
Flume进阶

·Put事务
	·source和channel之间的事务
·Take事务
	·channel和sink之间的事务
Flume内部原理

·ChannelSelector
	·ChannelSelector的作用是选出Event将要被发往哪个Channel。共两种类型:Replicating(复制)和Multiplexing(多路复用)
	·ReplicatingSelector会将同一个Event发往所有的Channel,Multiplexing会根据相应的原则,将不同的Event发往不同的Channel。
·SinkProcessor
	·共三种类型:DefaultSinkProcessor(默认1对1)、LoadBalancingSinkProcessor(负载均衡)和FailoverSinkProcessor(故障转移)
	·DefaultSinkProcessor对应的是单个的Sink;LoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group,LoadBalancingSinkProcessor可以实现负载均衡的功能,FailoverSinkProcessor可以错误恢复的功能。
Replication(复制)案例

·Agent-01
	# Name the components on this agent a1为agent的别名
	a1.sources = r1
	a1.sources = r1
	a1.sinks = k1 k2
	a1.channels = c1 c2
	
	# Describe/configure the source TAILDIR
	a1.sources.r1.type = TAILDIR
	a1.sources.r1.filegroups = f1
	#文件组的绝对路径,必须到精确到文件,可以写匹配表达式匹配多个文件
	a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/files1/.*
	#实现断点续传的文件存放位置 不改有默认位置也能实现断点续传。
	a1.sources.r1.positionFile =  /opt/module/flume-1.9.0/taildir_position.json
	# 将数据流复制给所有channel 默认参数可以不写
	a1.sources.r1.selector.type = replicating
	
	# Describe the sink Avro的source和sink是一组的,能形成通信
	a1.sinks.k1.type = avro
	a1.sinks.k1.hostname = hadoop101
	a1.sinks.k1.port = 4141
	
	a1.sinks.k2.type = avro
	a1.sinks.k2.hostname = hadoop101
	a1.sinks.k2.port = 4142
	
	#Use a channel which buffers events in memory
	a1.channels.c1.type = memory
	a1.channels.c1.capacity = 1000
	a1.channels.c1.transactionCapacity = 100
	
	a1.channels.c2.type = memory
	a1.channels.c2.capacity = 1000
	a1.channels.c2.transactionCapacity = 100
	
	
	# Bind the source and sink to the channel
	a1.sources.r1.channels = c1 c2
	a1.sinks.k1.channel = c1
	a1.sinks.k2.channel = c2
·Agent-02(上传到hdfs)
	# Name the components on this agent a1为agent的别名
	a1.sources = r1
	a1.sinks = k1
	a1.channels = c1
	
	# Describe/configure the source avro
	a1.sources.r1.type = avro
	a1.sources.r1.bind = hadoop101
	a1.sources.r1.port = 4141
	
	# Describe the sink
	a1.sinks.k1.type = hdfs
	#HDFS路径,主机名:端口号可写可不写,会自动识别
	a1.sinks.k1.hdfs.path = /flumeHDFS/%Y%m%d/%H
	#可设置前缀、后缀
	a1.sinks.k1.hdfs.filePrefix = log-
	#为解决小文件问题,先存为.tmp文件,达到一定要求转为正式文件
	#业内经验rollInterval=1h,rollSize=128M,rollCount=0
	a1.sinks.k1.hdfs.rollInterval = 10
	a1.sinks.k1.hdfs.rollSize = 134217700
	a1.sinks.k1.hdfs.rollCount = 0
	#是否使用本地时间戳,以上占位符会使用的到
	a1.sinks.k1.hdfs.useLocalTimeStamp = true
	#设置文件类型
	a1.sinks.k1.hdfs.fileType = DataStream
	
	#Use a channel which buffers events in memory
	a1.channels.c1.type = memory
	a1.channels.c1.capacity = 1000
	a1.channels.c1.transactionCapacity = 100
	
	# Bind the source and sink to the channel
	a1.sources.r1.channels = c1
	a1.sinks.k1.channel = c1
·Agent-03(传到本地)
	# Name the components on this agent a1为agent的别名
	a1.sources = r1
	a1.sinks = k1
	a1.channels = c1
	
	# Describe/configure the source TAILDIR
	a1.sources.r1.type = avro
	a1.sources.r1.bind = hadoop101
	a1.sources.r1.port = 4142
	
	# Describe the sink 输出到本地磁盘: file_roll
	a1.sinks.k1.type = file_roll
	#输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录。
	a1.sinks.k1.sink.directory = /opt/module/flume-1.9.0/flumeFile_roll_datas
	
	#Use a channel which buffers events in memory
	a1.channels.c1.type = memory
	a1.channels.c1.capacity = 1000
	a1.channels.c1.transactionCapacity = 100
	
	# Bind the source and sink to the channel
	a1.sources.r1.channels = c1
	a1.sinks.k1.channel = c1
Multiplexing多路复用

	需求:使用Flume采集服务器本地日志,按照日志类型的不同,将不同种类的日志发送到不同分析系统
	模拟:在该案例中,我们以端口数据模拟日志,以数字(单个)和字母(单个)模拟不同类型的日志,我们需要自定义interceptor区分数字和字母,将其分别发往不同的分析系统(Channel)。
	分析:在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到Flume的channel selecter中的Multiplexing结构,Multiplexing的原理是,根据event中Header的某个key的值,将不同的event发送到不同的Channel中,所以我们需要自定义一个Interceptor,为不同类型的event的Header中的key赋予不同的值。
·编写拦截器API
		
	public class MyInterceptor implements Interceptor {
	    //初始化,程序开始走以便
	    @Override
	    public void initialize() {
	
	    }
	
	    //处理单个Event,可以嵌套到下面的方法,解耦增加复用性
	    @Override
	    public Event intercept(Event event) {
	        //需求:在event的header中添加标记
	        //提供给channel selector 选择发送不同的channel
	        Map header = event.getHeaders();
	
	        byte[] body = event.getBody();
	        String info = new String(body);
	
	        //判断info的开头的第一个字符,数字发送给channel,字母发送给channel2
	        char c = info.charAt(0);
	        if (c >= '0' && c <='9'){
	            header.put("type","number");
	        }else if((c >= 'a'&&c <='z')||(c >= 'A'&&c <='Z')){
	            header.put("type","letter");
	        }
	        return event;
	    }
	
	    //处理多个Event,系统调用的是此方法
	    @Override
	    public List intercept(List events) {
	        for (Event event : events) {
	            intercept(event);
	        }
	        return events;
	    }
	
	    //关闭
	    @Override
	    public void close() {
	
	    }
	
	    public static class Builder implements Interceptor.Builder{
	        //通过此方法创建拦截器对象
	        @Override
	        public Interceptor build() {
	            return new MyInterceptor();
	        }
	        //配置文件
	        @Override
	        public void configure(Context context) {
	
	        }
	    }
	}
	·打的jar包要放到/flume/lib下
·flume-1
	# Name the components on this agent
	a1.sources = r1
	a1.sinks = k1 k2
	a1.channels = c1 c2
	
	# Describe/configure the source 从netcat中拿取数据
	a1.sources.r1.type = netcat
	a1.sources.r1.bind = localhost
	a1.sources.r1.port = 44444
	
	# 多路复用
	a1.sources.r1.selector.type = multiplexing
	#使用header中的哪些参数,对应header中的key
	a1.sources.r1.selector.header = type
	#number/letter对应value
	a1.sources.r1.selector.mapping.number = c1
	a1.sources.r1.selector.mapping.letter = c2
	#匹配不上走的channel
	a1.sources.r1.selector.default = c1
	
	
	#注册拦截器
	a1.sources.r1.interceptors = i1
	a1.sources.r1.interceptors.i1.type = com.flume.interceptors.MyInterceptor$builder
	
	# Describe the sink
	a1.sinks.k1.type = avro
	a1.sinks.k1.hostname = hadoop101
	a1.sinks.k1.port = 4141
	
	a1.sinks.k2.type = avro
	a1.sinks.k2.hostname = hadoop101
	a1.sinks.k2.port = 4142
	
	# Use a channel which buffers events in memory
	a1.channels.c1.type = memory
	a1.channels.c1.capacity = 1000
	a1.channels.c1.transactionCapacity = 100
	
	a1.channels.c2.type = memory
	a1.channels.c2.capacity = 1000
	a1.channels.c2.transactionCapacity = 100

	# Bind the source and sink to the channel 一个sink只能接一个channel,一个channels可以接多个source和sink
	a1.sources.r1.channels = c1 c2
	a1.sinks.k1.channel = c1
	a1.sinks.k2.channel = c2
聚合

需求: 监听一台服务器的一个本地文件夹,监听另一台服务器的一个端口,前两台服务器的变化最终输出到第三台服务器的控制台上
·第一服务器
	# Name the components on this agent a1为agent的别名
	a1.sources = r1
	a1.sinks = k1
	a1.channels = c1
	
	# Describe/configure the source TAILDIR
	a1.sources.r1.type = TAILDIR
	a1.sources.r1.filegroups = f1
	#文件组的绝对路径,必须到精确到文件,可以写匹配表达式匹配多个文件
	a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/files1/.*file.*
	#实现断点续传的文件存放位置 不改有默认位置也能实现断点续传。
	a1.sources.r1.positionFile =  /opt/module/flume-1.9.0/taildir_position.json
	
	# Describe the sink
	a1.sinks.k1.type = avro
	a1.sinks.k1.hostname = hadoop103
	a1.sinks.k1.port = 4141
	
	#Use a channel which buffers events in memory
	a1.channels.c1.type = memory
	a1.channels.c1.capacity = 1000
	a1.channels.c1.transactionCapacity = 100
	
	# Bind the source and sink to the channel
	a1.sources.r1.channels = c1
	a1.sinks.k1.channel = c1
·第二台服务器
	 # Name the components on this agent a1为agent的别名
	a1.sources = r1
	a1.sinks = k1
	a1.channels = c1

	# Describe/configure the source
	a1.sources.r1.type = netcat
	a1.sources.r1.bind = localhost
	a1.sources.r1.port = 44444

	# Describe the sink
	a1.sinks.k1.type = avro
	a1.sinks.k1.hostname = hadoop103
	a1.sinks.k1.port = 4142


	# Use a channel which buffers events in memory
	a1.channels.c1.type = memory
	a1.channels.c1.capacity = 1000
	a1.channels.c1.transactionCapacity = 100
	
	# Bind the source and sink to the channel
	a1.sources.r1.channels = c1
	a1.sinks.k1.channel = c1
·第三台服务器
	# Name the components on this agent a1为agent的别名
	a1.sources = r1 r2
	a1.sinks = k1
	a1.channels = c1
	
	# Describe/configure the source TAILDIR
	a1.sources.r1.type = avro
	a1.sources.r1.bind = hadoop103
	a1.sources.r1.port = 4141
	
	a1.sources.r2.type = avro
	a1.sources.r2.bind = hadoop103
	a1.sources.r2.port = 4142
	# Describe the sink
	a1.sinks.k1.type = logger
	
	#Use a channel which buffers events in memory
	a1.channels.c1.type = memory
	a1.channels.c1.capacity = 1000
	a1.channels.c1.transactionCapacity = 100
	
	# Bind the source and sink to the channel
	a1.sources.r1.channels = c1
	a1.sources.r2.channels = c1
	a1.sinks.k1.channel = c1
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/741900.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号