【Flume】Flume 简单理解及使用实例
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
Flume-og采用了多Master的方式。为了保证配置数据的一致性,Flume引入了ZooKeeper,用于保存配置数据,ZooKeeper本身可保证配置数据的一致性和高可用,另外,在配置数据发生变化时,ZooKeeper可以通知Flume Master节点。Flume Master间使用gossip协议同步数据。
Flume-ng最明显的改动就是取消了集中管理配置的 Master 和 Zookeeper,变为一个纯粹的传输工具。Flume-ng另一个主要的不同点是读入数据和写出数据由不同的工作线程处理(称为 Runner)。 在 Flume-og 中,读入线程同样做写出工作(除了故障重试)。如果写出慢的话(不是完全失败),它将阻塞 Flume 接收数据的能力。这种异步的设计使读入线程可以顺畅的工作而无需关注下游的任何问题。
2、Flume安装部署Flume启动报错,guava.java包冲突
CentOS7 yum提示:another app is currently holding the yum lock;waiting for it to exit
# 将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3 [atguigu@hadoop102 lib]$ rm guava-11.0.2.jar # 可以通过强制关掉yum进程 [atguigu@hadoop102 ~]$ rm -f /var/run/yum.pid # 杀掉占用44444端口的进程 [atguigu@hadoop102 ~]$ netstat -tlunp | grep 44444 (Not all processes could be identified, non-owned process info will not be shown, you would have to be root to see it all.) tcp6 0 0 127.0.0.1:44444 :::* LISTEN 6303/java [atguigu@hadoop102 ~]$ kill -9 6303
[atguigu@hadoop102 apache-flume-1.9.0-bin]$ sudo netstat -nlp | grep 44444 [atguigu@hadoop102 apache-flume-1.9.0-bin]$ cat job/flume-netcat-logger.conf # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 [atguigu@hadoop102 apache-flume-1.9.0-bin]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
# 在一个hadoop102的一个终端在44444端口产生数据 [atguigu@hadoop102 ~]$ nc localhost 44444 # 在另一个hadoop102的一个终端监听44444端口 [atguigu@hadoop102 ~]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
PS:-Dflume.root.logger=INFO,console(-D表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包括:log、info、warn、error)
3、Flume 1.9.0 User Guideflume之HDFS Sink详解(转载)
Hadoop的core-site.xml配置文件里的fs.default.name和fs.defaultFS
Hadoop2.x与Hadoop3.x的默认端口变化
[atguigu@hadoop102 job]$ cat flume-file-hdfs.conf # Name the components on this agent a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source a2.sources.r2.type = exec a2.sources.r2.command = tail -F /opt/module/apache-hive-3.1.2-bin/logs/hive.log # Describe the sink a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://hadoop102:9820/flume/%Y%m%d/%H #上传文件的前缀 a2.sinks.k2.hdfs.filePrefix = logs- #是否按照时间滚动文件夹 a2.sinks.k2.hdfs.round = true #多少时间单位创建一个新的文件夹 a2.sinks.k2.hdfs.roundValue = 1 #重新定义时间单位 a2.sinks.k2.hdfs.roundUnit = hour #是否使用本地时间戳 a2.sinks.k2.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a2.sinks.k2.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a2.sinks.k2.hdfs.fileType = DataStream #多久生成一个新的文件 a2.sinks.k2.hdfs.rollInterval = 60 #设置每个文件的滚动大小 a2.sinks.k2.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a2.sinks.k2.hdfs.rollCount = 0 # Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2 [atguigu@hadoop102 job]$ cd .. [atguigu@hadoop102 apache-flume-1.9.0-bin]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf -Dflume.root.logger=INFO,console
[atguigu@hadoop102 logs]$ hive which: no hbase in (/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/opt/module/jdk1.8.0_212/bin:/opt/module/hadoop-3.1.3/bin:/opt/module/hadoop-3.1.3/sbin:/opt/module/apache-zookeeper-3.5.7-bin/bin:/opt/module/apache-zookeeper-3.5.7-bin/sbin:/opt/module/jdk1.8.0_212/bin:/opt/module/hadoop-3.1.3/bin:/opt/module/hadoop-3.1.3/sbin:/opt/module/apache-hive-3.1.2-bin/bin:/home/atguigu/.local/bin:/home/atguigu/bin) Hive Session ID = 6956125e-de20-496d-be95-12da80653c98 Logging initialized using configuration in file:/opt/module/apache-hive-3.1.2-bin/conf/hive-log4j2.properties Async: true Hive Session ID = 35458a63-4e24-40ba-93cd-9f38f8be02ac hive (default)>
[atguigu@hadoop102 job]$ cat flume-dir-hdfs.conf a3.sources = r3 a3.sinks = k3 a3.channels = c3 # Describe/configure the source # 文件夹下的文件内容、文件名不能改变,否则报错 a3.sources.r3.type = spooldir # 指定文件夹路径 a3.sources.r3.spoolDir = /opt/module/apache-flume-1.9.0-bin/upload a3.sources.r3.fileSuffix = .COMPLETED a3.sources.r3.fileHeader = true #忽略所有以.tmp结尾的文件,不上传 a3.sources.r3.ignorePattern = ([^ ]*.tmp) # Describe the sink a3.sinks.k3.type = hdfs a3.sinks.k3.hdfs.path = hdfs://hadoop102:9820/flume/upload/%Y%m%d/%H #上传文件的前缀 a3.sinks.k3.hdfs.filePrefix = upload- #是否按照时间滚动文件夹 a3.sinks.k3.hdfs.round = true #多少时间单位创建一个新的文件夹 a3.sinks.k3.hdfs.roundValue = 1 #重新定义时间单位 a3.sinks.k3.hdfs.roundUnit = hour #是否使用本地时间戳 a3.sinks.k3.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a3.sinks.k3.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a3.sinks.k3.hdfs.fileType = DataStream #多久生成一个新的文件 a3.sinks.k3.hdfs.rollInterval = 60 #设置每个文件的滚动大小大概是128M a3.sinks.k3.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a3.sinks.k3.hdfs.rollCount = 0 # Use a channel which buffers events in memory a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3
# 监控 /opt/module/apache-flume-1.9.0-bin/upload 下新增的文件 [atguigu@hadoop102 apache-flume-1.9.0-bin]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf -Dflume.root.logger=INFO,console
positionFile 参数:File in JSON format to record the inode, the absolute path and the last position of each tailing file.
Taildir Source维护了一个json格式的position File,其会定期的往position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。
Linux中储存文件元数据的区域就叫做inode,每个inode都有一个号码,操作系统用inode号码来识别不同的文件,Unix/Linux系统内部不使用文件名,而使用inode号码来识别文件。
[atguigu@hadoop102 job]$ cat flume-taildir-hdfs.conf a3.sources = r3 a3.sinks = k3 a3.channels = c3 # Describe/configure the source a3.sources.r3.type = TAILDIR # 记录读取文件的元信息 a3.sources.r3.positionFile = /opt/module/apache-flume-1.9.0-bin/taildir/tail_dir.json # 设置需要读取的信息 a3.sources.r3.filegroups = f1 f2 a3.sources.r3.filegroups.f1 = /opt/module/apache-flume-1.9.0-bin/taildir/files1/.*file.* a3.sources.r3.filegroups.f2 = /opt/module/apache-flume-1.9.0-bin/taildir/files2/.*log.* # Describe the sink a3.sinks.k3.type = hdfs a3.sinks.k3.hdfs.path = hdfs://hadoop102:9820/flume/upload2/%Y%m%d/%H #上传文件的前缀 a3.sinks.k3.hdfs.filePrefix = upload- #是否按照时间滚动文件夹 a3.sinks.k3.hdfs.round = true #多少时间单位创建一个新的文件夹 a3.sinks.k3.hdfs.roundValue = 1 #重新定义时间单位 a3.sinks.k3.hdfs.roundUnit = hour #是否使用本地时间戳 a3.sinks.k3.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a3.sinks.k3.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a3.sinks.k3.hdfs.fileType = DataStream #多久生成一个新的文件 a3.sinks.k3.hdfs.rollInterval = 60 #设置每个文件的滚动大小大概是128M a3.sinks.k3.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a3.sinks.k3.hdfs.rollCount = 0 # Use a channel which buffers events in memory a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c34、Flume进阶 4.1 Flume事务 4.2、Flume Agent内部原理
DefaultSinkProcessor对应的是单个的Sink;LoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group;LoadBalancingSinkProcessor可以实现负载均衡的功能;FailoverSinkProcessor可以错误恢复的功能; 4.3、Flume的Source和Sink
Flume之Source全面解析flume的Source(数据源)Flume的Sink Flume Sink
AvroSink和AvroSource配合使用,是实现多级流动、扇出流(1到多) 扇入流(多到1) 的基础。AvroSource接收到的是经过avro序列化后的数据,然后反序列化数据继续传输。所以,如果是AvroSource的话,源数据必须是经过avro序列化后的数据(AvroSink),也可以接收通过Flume提供的avro客户端发送的日志信息。



