栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flume之旅

Flume之旅

Flume之旅
  • Flume概述
    • Flume定义
    • Flume基础架构
      • Agent
      • Source
      • Sink
      • Channel
      • Event
  • Flume入门
    • Flume安装部署
      • 安装地址
      • 安装部署
    • Flume入门案例
      • 监控端口数据官方案例
      • 实时监控单个追加文件
      • 实时监控目录下多个新文件
      • 实时监控目录下的多个追加文件
  • Flume进阶
    • Flume事务
    • Flume Agent内部原理
    • Flume拓扑结构
    • Flume企业开发案例
    • 自定义Interceptor
    • 自定义Source
    • 自定义Sink
    • Flume数据流监控

Flume概述 Flume定义

Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单

Flume 最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS

Flume基础架构

Flume组成架构 :

Agent

Agent 是一个 JVM进程,它以事件的形式将数据从源头送至目的

Agent主要有3个部分组成

  • Source
  • Channel
  • Sink
Source

Source 是负责接收数据到 Flume Agent 的组件

Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、 taildir 、sequence generator、syslog、http、legacy

Sink

Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent

Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、Hbase、solr、自定义

Channel

Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink运作在不同的速率上
Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个 Sink 的读取操作

Flume自带两种Channel

  • Memory Channel
  • File Channel

Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失

File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据

Event

传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地

Event 由 Header 和 Body 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组

Flume入门 Flume安装部署 安装地址
  • Flume官网地址:http://flume.apache.org/
  • 文档查看地址:http://flume.apache.org/FlumeUserGuide.html
  • 下载地址:http://archive.apache.org/dist/flume/
安装部署

将 apache-flume-1.9.0-bin.tar.gz 上传到 linux 的 /opt/software 目录下

解压 apache-flume-1.9.0-bin.tar.gz 到 /opt/module/ 目录下

tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/module/

修改 apache-flume-1.9.0-bin 的名称为 flume-1.9.0

mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume-1.9.0

将 lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3

mv guava-11.0.2.jar guava-11.0.2.blk

配置环境变量

vim /etc/profile.d/my_env.sh
#FLUME_HOME
export FLUME_HOME=/opt/module/flume-1.9.0
export PATH=$PATH:$FLUME_HOME/bin

生效

source /etc/profile

Flume入门案例 监控端口数据官方案例

案例需求:

使用 Flume 监听一个端口,收集该端口数据,并打印到控制台

需求分析:

实现步骤:

  1. 安装netcat工具
sudo yum install -y nc

  1. 判断 44444 端口是否被占用
sudo netstat -nlp | grep 44444

  1. 创建 Flume Agent 配置文件 flume-netcat-logger.conf

  2. 在 flume 目录下创建 job 文件夹并进入 job 文件夹

mkdir job
cd job/

  1. 在 job 文件夹下创建 Flume Agent 配置文件 flume-netcat-logger.conf
vim flume-netcat-logger.conf
  1. 在 flume-netcat-logger.conf 文件中添加
# Name the components on this agent
# a1: agent 名
# r1 : a1的 Source 名
# k1: a1的 Sink 名
# c1: a1的 Channel 名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# a1的输入源类型为 netcat 端口类型
# a1的监听的主机
# a1的监听的端口号
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
# a1 输出目的地是控制台 logger 类型
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
# a1的channel 类型是 memory 内存型
# al的channel 总容量 1000 个 event
# al的channel 传输时收集到了100条 event 以后提交事务
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 将 r1 和 c1 连接起来
# 将 k1 和 cl 连接起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

注:配置文件来源于官方手册 http://flume.apache.org/FlumeUserGuide.html

  1. 先开启 Flume 监听端口
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

参数说明:

  • –conf/-c:表示配置文件存储在conf/目录

  • –name/-n:表示给agent起名为a1

  • –conf-file/-f:flume本次启动读取的配置文件是在job文件夹下的flume-telnet.conf文件。

  • -Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger 参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括 : log、info、warn、error

  1. 使用 netcat 工具向本机的 44444 端口发送内容
nc localhost 44444
  1. 在 Flume 监听页面观察接收数据情况

nc hadoop102 44444,flume能否接收到?

实时监控单个追加文件

案例需求 :

实时监控Hive日志,并上传到HDFS中

需求分析:

实现步骤:

  1. Flume 要想将数据输出到 HDFS,依赖 Hadoop 相关jar包

检查 /etc/profile.d/my_env.sh 文件,确认 Hadoop 和 Java 环境变量配置正确

 

创建 flume-file-hdfs.conf 文件

创建文件

vim flume-file-hdfs.conf

注:要想读取 Linux 系统中的文件,就得按照 Linux 命令的规则执行命令。由于 Hive日志在 Linux 系统中所以读取文件的类型选择:exec 即 execute 执行的意思。表示执行 Linux 命令来读取文件

添加如下内容

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://cpucode100:8020/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

注意:

对于所有与时间相关的转义序列,Event Header 中必须存在以 “ timestamp ” 的 key(除非 hdfs.useLocalTimeStamp 设置为 true,此方法会使用 TimestampInterceptor 自动添加 timestamp)

a3.sinks.k3.hdfs.useLocalTimeStamp = true
  1. 运行Flume
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
  1. 开启 Hadoop 和 Hive 并操作 Hive 产生日志
 
bin/hive
  1. 在HDFS上查看文件
实时监控目录下多个新文件

案例需求:

使用Flume监听整个目录的文件,并上传至HDFS

需求分析:

实现步骤:

  1. 创建配置文件 flume-dir-hdfs.conf

创建一个文件

vim flume-dir-hdfs.conf

添加如下内容

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
  1. 启动监控文件夹命令

说明:在使用Spooling Directory Source时,不要在监控目录中创建并持续修改文件;上传完成的文件会以.COMPLETED结尾;被监控文件夹每500毫秒扫描一次文件变动

  1. 向upload文件夹中添加文件

在 /opt/module/flume 目录下创建 upload 文件夹

向 upload 文件夹中添加文件

查看HDFS上的数据

实时监控目录下的多个追加文件

Exec source 适用于监控一个实时追加的文件,不能实现断点续传;
Spooldir Source 适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;
而 Taildir Source 适合用于监听多个实时追加的文件,并且能够实现断点续传

案例需求 :

使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS

需求分析:

实现步骤:

创建配置文件flume-taildir-hdfs.conf

创建一个文件

vim flume-taildir-hdfs.conf

添加如下内容

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume-1.9.0/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume-1.9.0/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume-1.9.0/files2/.*log.*

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://cpucode100:8020/flume/upload2/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
  1. 启动监控文件夹命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
  1. 向files文件夹中追加内容

在 /opt/module/flume 目录下创建 files 文件夹

mkdir files

向 upload 文件夹中添加文件

echo hello >> file1.txt
echo atguigu >> file2.txt
  1. 查看HDFS上的数据

Taildir 说明:

Taildir Source 维护了一个 json 格式的 position File ,其会定期的往 position File中更新每个文件读取到的最新的位置,因此能够实现断点续传

Position File 的格式如下:

{
	"inode":2496272,
	"pos":12,
	"file":"/opt/module/flume-1.9.0/files/file1.txt"
}
{
	"inode":2496275,
	"pos":12,
	"file":"/opt/module/flume-1.9.0/files/file2.txt"
}

注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用inode 号码来识别文件

Flume进阶 Flume事务


Put事务流程

  • doPut : 将批数据先写入临时缓冲区 putList

  • doComm ne 存队列是否足够合并。

  • doRollback : channel不足, 回滚数据

Take事务
•doTake:将数据取到临时缓冲区takeList, 并将数据发送到HDFS
•doCommit:如果数据全部发送成功,则清除临时缓冲区takeList
•doRollback:数据发送过程中如果出现异常, rollback 将临时缓冲区 takeList 中的数据归还给 channel 内存队列。

Flume Agent内部原理

重要组件:
1)ChannelSelector
ChannelSelector的作用就是选出Event将要被发往哪个Channel。其共有两种类型,分别是Replicating(复制)和Multiplexing(多路复用)。
ReplicatingSelector会将同一个Event发往所有的Channel,Multiplexing会根据相应的原则,将不同的Event发往不同的Channel。
2)SinkProcessor
SinkProcessor共有三种类型,分别是DefaultSinkProcessor、LoadBalancingSinkProcessor和FailoverSinkProcessor
DefaultSinkProcessor对应的是单个的Sink,LoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group,LoadBalancingSinkProcessor可以实现负载均衡的功能,FailoverSinkProcessor可以错误恢复的功能。

Flume拓扑结构

这种模式是将多个flume顺序连接起来了,从最初的source开始到最终sink传送的目的存储系统。此模式不建议桥接过多的flume数量, flume数量过多不仅会影响传输速率,而且一旦传输过程中某个节点flume宕机,会影响整个传输系统。

Flume支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel中,或者将不同数据分发到不同的channel中,sink可以选择传送到不同的目的地。

Flume企业开发案例 自定义Interceptor 自定义Source 自定义Sink Flume数据流监控
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/682359.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号