栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flume基础

Flume基础

1. Flume 定义         Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传 输的系统。Flume 基于流式架构,灵活简单。           2.Flume的基础架构

Flume 架构中的组件: 2.1 Agent Agent 是一个 JVM 进程,它以 事件 的形式将数据从源头送至目的。 Agent 主要有 3 个部分组成, Source 、 Channel 、 Sink 。 2.2 Source Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种 格式的日志数据,包括 avro 、thrift、 exec 、jms、 spooling directory 、 netcat 、sequence generator、syslog、http、legacy。 2.3 Sink Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储 或索引系统、或者被发送到另一个 Flume Agent。 Sink 组件目的地包括 hdfs 、 logger 、 avro 、thrift、ipc、 file 、 Hbase 、solr、自定 义。 2.4 Channel Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运 作在不同的速率上。Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个 Sink 的读取操作。 Flume 自带两种 Channel: Memory Channel 和 File Channel 以及 Kafka Channel 。 Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适 用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕 机或者重启都会导致数据丢失。 File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数 据。 2.5 Event 传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。 Event 由 Header 和 Body 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构, Body 用来存放该条数据,形式为字节数组

3.Flume监控本地文件上传到hdfs

说明:需要安装hadoop配置hadoop环境变量

监控/opt/data/test.log,并且上传到hdfs,每小时生成一个新的文件夹,每30秒生成一个新的文件

[atguigu@hadoop112 data]$ touch test.log
[atguigu@hadoop112 job]$ vim flume-file-hdfs.conf
添加如下内容

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
#监控文件的路径
a2.sources.r2.command = tail -F /opt/data/test.log
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop112:9000/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs- 
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 1000
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 30
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

 然后进入flume安装目录执行下面命令

 [atguigu@hadoop112 flume-1.7.0]$bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf

新开一个终端,不断向test.log里面追加数据查看是否监控成功

[atguigu@hadoop112 data]$ echo "你好啊,监控文件成功" >> test.log
[atguigu@hadoop112 data]$ echo "你好啊,监控文件成功" >> test.log
[atguigu@hadoop112 data]$ echo "你好啊,监控文件成功" >> test.log
[atguigu@hadoop112 data]$ echo "你好啊,监控文件成功" >> test.log
[atguigu@hadoop112 data]$ echo "你好啊,监控文件成功" >> test.log

如果如下图所示,恭喜你案例成功

缺点:Exec source 适用于监控一个实时追加的文件,但不能保证数据不丢失;

 4.Flume监控目录的新文件上传hdfs

监控/opt/data目录,并且上传到hdfs,每小时生成一个新的文件夹,每30秒生成一个新的文件

[atguigu@hadoop112 job]$ vim flume-dir-hdfs.conf
添加如下内容

a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/data
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp 结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop112:9000/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload- 
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 30
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

注:以后缀.tmp,. COMPLETED结尾的会被忽略不上传,文件名字不能相同

然后进入flume安装目录执行下面命令

 [atguigu@hadoop112 flume-1.7.0]$bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf

新开一个终端,不断向/opt/data里面追加文件查看是否监控成功

[atguigu@hadoop112 data]$ touch a.txt
[atguigu@hadoop112 data]$ touch b.log
[atguigu@hadoop112 data]$ touch c.tmp

如果如下图所示,恭喜你案例成功

 缺点:Spooldir Source 能够保证数据不丢失,且能够实现断点续传,但延迟较高,不能实时监控

5.Flume实时监控不同目录的多个追加文件(断点续传) Taildir Source 既能够实现断点续传,又可以保证数据不丢失,还能够进行实时监控

监控/opt/data和/data/tmpdata目录下的文件,并且上传到hdfs,

创建/opt/data和/opt/tmpdata两个目录

# agent名字相同不能同时启动
[atguigu@hadoop112 job]$  vim flume-taildir-hdfs.conf
添加如下内容


a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = TAILDIR
# 断点续传,保存文件读取信息
a3.sources.r3.positionFile = /opt/module/flume-1.7.0/tail_dir.json
#声明两个文件组
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/data/.*txt.*
a3.sources.r3.filegroups.f2 = /opt/tmpdata/.*log.*
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop112:9000/flume/upload2/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload2-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 30
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
                                      

注:监控/opt/data/下的*.txt  ,监控/opt/tmpdata/下的*.log

然后进入flume安装目录执行下面命令

[atguigu@hadoop112 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a3 --conf-file job/flume-taildir-hdfs.conf

新开一个终端,不断向/opt/data里面追加文件查看是否监控成功

[atguigu@hadoop112 data]$ touch a.txt
[atguigu@hadoop112 data]$ echo hello >> a.txt
[atguigu@hadoop112 data]$ cd /opt/tmpdata/
[atguigu@hadoop112 tmpdata]$ touch b.txt
[atguigu@hadoop112 tmpdata]$ touch b.log
[atguigu@hadoop112 tmpdata]$ echo hello >> b.log 

如果如下图所示,恭喜你案例成功

 缺点:如果文件名字改变,那么整个文件都会重新上传

优化:1.采用生成日志时自动带标志,后期不会更改名字的框架,如:logback

           2.修改源码,flume断点续传中,如果{"inode":67279386,"pos":6,"file":"/opt/data/a.txt"}中的inode和file有一个改变,则认为整个文件发生改变重新上传,所以我们可以让inode成为为唯一一个确定整个改变因素,从而优化上述缺点

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/327031.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号