栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

015 大数据之Flume

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

015 大数据之Flume

1、Apache Flume初识

【Flume】Flume 简单理解及使用实例

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

Flume-og采用了多Master的方式。为了保证配置数据的一致性,Flume引入了ZooKeeper,用于保存配置数据,ZooKeeper本身可保证配置数据的一致性和高可用,另外,在配置数据发生变化时,ZooKeeper可以通知Flume Master节点。Flume Master间使用gossip协议同步数据。

Flume-ng最明显的改动就是取消了集中管理配置的 Master 和 Zookeeper,变为一个纯粹的传输工具。Flume-ng另一个主要的不同点是读入数据和写出数据由不同的工作线程处理(称为 Runner)。 在 Flume-og 中,读入线程同样做写出工作(除了故障重试)。如果写出慢的话(不是完全失败),它将阻塞 Flume 接收数据的能力。这种异步的设计使读入线程可以顺畅的工作而无需关注下游的任何问题。

2、Flume安装部署

Flume启动报错,guava.java包冲突
CentOS7 yum提示:another app is currently holding the yum lock;waiting for it to exit

# 将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3
[atguigu@hadoop102 lib]$ rm guava-11.0.2.jar
# 可以通过强制关掉yum进程
[atguigu@hadoop102 ~]$ rm -f /var/run/yum.pid
# 杀掉占用44444端口的进程
[atguigu@hadoop102 ~]$ netstat -tlunp | grep 44444
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
tcp6       0      0 127.0.0.1:44444         :::*                    LISTEN      6303/java           
[atguigu@hadoop102 ~]$ kill -9 6303
[atguigu@hadoop102 apache-flume-1.9.0-bin]$ sudo netstat -nlp | grep 44444
[atguigu@hadoop102 apache-flume-1.9.0-bin]$ cat job/flume-netcat-logger.conf 
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[atguigu@hadoop102 apache-flume-1.9.0-bin]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
# 在一个hadoop102的一个终端在44444端口产生数据
[atguigu@hadoop102 ~]$ nc localhost 44444
# 在另一个hadoop102的一个终端监听44444端口
[atguigu@hadoop102 ~]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

PS:-Dflume.root.logger=INFO,console(-D表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包括:log、info、warn、error)

3、Flume 1.9.0 User Guide

flume之HDFS Sink详解(转载)
Hadoop的core-site.xml配置文件里的fs.default.name和fs.defaultFS
Hadoop2.x与Hadoop3.x的默认端口变化

3.1、单个变化文件的数据读取
[atguigu@hadoop102 job]$ cat flume-file-hdfs.conf
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/apache-hive-3.1.2-bin/logs/hive.log

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9820/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
[atguigu@hadoop102 job]$ cd ..
[atguigu@hadoop102 apache-flume-1.9.0-bin]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf -Dflume.root.logger=INFO,console
[atguigu@hadoop102 logs]$ hive
which: no hbase in (/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/opt/module/jdk1.8.0_212/bin:/opt/module/hadoop-3.1.3/bin:/opt/module/hadoop-3.1.3/sbin:/opt/module/apache-zookeeper-3.5.7-bin/bin:/opt/module/apache-zookeeper-3.5.7-bin/sbin:/opt/module/jdk1.8.0_212/bin:/opt/module/hadoop-3.1.3/bin:/opt/module/hadoop-3.1.3/sbin:/opt/module/apache-hive-3.1.2-bin/bin:/home/atguigu/.local/bin:/home/atguigu/bin)
Hive Session ID = 6956125e-de20-496d-be95-12da80653c98

Logging initialized using configuration in file:/opt/module/apache-hive-3.1.2-bin/conf/hive-log4j2.properties Async: true
Hive Session ID = 35458a63-4e24-40ba-93cd-9f38f8be02ac
hive (default)> 


3.2、同一文件夹下多不变文件的读取
[atguigu@hadoop102 job]$ cat flume-dir-hdfs.conf
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
# 文件夹下的文件内容、文件名不能改变,否则报错
a3.sources.r3.type = spooldir
# 指定文件夹路径
a3.sources.r3.spoolDir = /opt/module/apache-flume-1.9.0-bin/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9820/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
# 监控 /opt/module/apache-flume-1.9.0-bin/upload 下新增的文件
[atguigu@hadoop102 apache-flume-1.9.0-bin]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf -Dflume.root.logger=INFO,console


3.3、同一文件夹下多个变文件的读取

positionFile 参数:File in JSON format to record the inode, the absolute path and the last position of each tailing file.

Taildir Source维护了一个json格式的position File,其会定期的往position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。

Linux中储存文件元数据的区域就叫做inode,每个inode都有一个号码,操作系统用inode号码来识别不同的文件,Unix/Linux系统内部不使用文件名,而使用inode号码来识别文件。

[atguigu@hadoop102 job]$ cat flume-taildir-hdfs.conf
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = TAILDIR
# 记录读取文件的元信息
a3.sources.r3.positionFile = /opt/module/apache-flume-1.9.0-bin/taildir/tail_dir.json
# 设置需要读取的信息
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/apache-flume-1.9.0-bin/taildir/files1/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/apache-flume-1.9.0-bin/taildir/files2/.*log.*

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9820/flume/upload2/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
4、Flume进阶 4.1 Flume事务

4.2、Flume Agent内部原理

DefaultSinkProcessor对应的是单个的Sink;LoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group;LoadBalancingSinkProcessor可以实现负载均衡的功能;FailoverSinkProcessor可以错误恢复的功能; 4.3、Flume的Source和Sink

Flume之Source全面解析flume的Source(数据源)Flume的Sink Flume Sink

AvroSink和AvroSource配合使用,是实现多级流动、扇出流(1到多) 扇入流(多到1) 的基础。AvroSource接收到的是经过avro序列化后的数据,然后反序列化数据继续传输。所以,如果是AvroSource的话,源数据必须是经过avro序列化后的数据(AvroSink),也可以接收通过Flume提供的avro客户端发送的日志信息。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/710702.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号