
Flume Basic Usage


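1. Getting started: monitoring port data

Requirement: use Flume to listen on a port, collect the data, and print it to the console.

Implementation steps

    Install the netcat tool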

sudo yum install -y nc

    Check whether port 44444 is already in use

sudo netstat -nlp | grep 44444

    Create the Flume agent configuration file flume-netcat-logger.conf

    vim /opt/app/flume/job/flume-netcat-logger.conf

    Start the agent and listen on the port

    bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

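    Parameter description:

    -c: the configuration files are stored in the conf/ directory

    -n: names the agent a1

    -f: the configuration file Flume reads at startup, here flume-netcat-logger.conf in the job directory.

    -Dflume.root.logger=INFO,console: -D overrides the flume.root.logger property at run time, setting the log level to INFO and printing the logs to the console. Log levels include debug, info, warn, and error.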

    Use netcat to send content to port 44444 on this machine

    nc localhost 44444
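
Instead of typing into nc, you can also drive the netcat source from a script. The Python sketch below is a minimal client, assuming the agent above is listening on localhost:44444 and that the netcat source keeps its default ack-every-event behaviour, in which it answers each received line with a short acknowledgement (typically OK). The function name send_lines is hypothetical.

```python
from __future__ import annotations

import socket


def send_lines(host: str, port: int, lines: list[str], timeout: float = 5.0) -> list[str]:
    """Send each line to a netcat-style listener and return one reply line per line sent."""
    replies = []
    with socket.create_connection((host, port), timeout=timeout) as sock, \
            sock.makefile("r", encoding="utf-8", newline="\n") as reader:
        for line in lines:
            # Each newline-terminated line becomes one event on the netcat source.
            sock.sendall((line + "\n").encode("utf-8"))
            # With ack-every-event enabled, the source answers every event it receives.
            replies.append(reader.readline().rstrip("\n"))
    return replies
```

For example, send_lines("localhost", 44444, ["hello", "flume"]) should make both events appear in the agent's console and return one acknowledgement per line.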

Configuration file

# Name the components on this agent
# r1: name of a1's source; k1: name of a1's sink; c1: name of a1's channel
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# a1's source type is netcat, listening on host localhost, port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
# a1's sink is of type logger, which prints events to the console
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
# a1's channel is a memory channel that holds at most 1000 events
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
# At most 100 events are taken from the source or given to the sink per transaction
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# Connect r1 to c1, and k1 to c1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
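
Flume loads the agent file as a Java properties file, so text after a value (such as a trailing note on the same line) becomes part of that value. The Python sketch below is a hypothetical pre-flight checker, not a Flume tool: it reads simple key = value lines (no escapes, no ':' separators, no line continuations) and reports sources or sinks that are not wired to a declared channel.

```python
from __future__ import annotations


def parse_properties(text: str) -> dict[str, str]:
    """Parse a small subset of the Java properties format: key = value lines, # or ! comments."""
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            # Everything after '=' is the value, including any trailing note.
            props[key.strip()] = value.strip()
    return props


def check_bindings(props: dict[str, str], agent: str) -> list[str]:
    """List wiring problems between the sources, sinks and channels of one agent."""
    problems = []
    channels = set(props.get(f"{agent}.channels", "").split())
    for src in props.get(f"{agent}.sources", "").split():
        bound = props.get(f"{agent}.sources.{src}.channels", "").split()
        if not bound:
            problems.append(f"source {src} is not bound to any channel")
        problems += [f"source {src} uses undeclared channel {ch}" for ch in bound if ch not in channels]
    for sink in props.get(f"{agent}.sinks", "").split():
        bound = props.get(f"{agent}.sinks.{sink}.channel", "").split()
        if len(bound) != 1:
            problems.append(f"sink {sink} must be bound to exactly one channel")
        elif bound[0] not in channels:
            problems.append(f"sink {sink} uses undeclared channel {bound[0]}")
    return problems
```

On a configuration whose annotations are all on separate # lines, check_bindings(parse_properties(text), "a1") returns an empty list; a note appended to the a1.sources line, for instance, shows up as an extra, unbound source.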

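Real-time monitoring of a single appended file

Requirement: monitor the Hive log in real time and upload it to HDFS.

Implementation steps

    Create the flume-file-hdfs.conf configuration file

    vim /opt/app/flume-1.9.0/job/flume-file-hdfs.conf

    Run Flume

    bin/flume-ng agent -c conf/ -n a2 -f job/flume-file-hdfs.conf

Configuration file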
# Name the components on this agent

a2.sources = r2
a2.sinks = k2
a2.channels = c2



# Describe/configure the source

a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/app/hive-3.1.2/logs/hive.log



# Describe the sink

a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop001:8020/flume/%Y%m%d/%H



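# Prefix of uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to create (roll) directories by time
a2.sinks.k2.hdfs.round = true
# Number of time units per new directory
a2.sinks.k2.hdfs.roundValue = 1
# Unit of time used for rounding
a2.sinks.k2.hdfs.roundUnit = hour
# Use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# Number of events accumulated before they are flushed to HDFS
a2.sinks.k2.hdfs.batchSize = 100
# File type (SequenceFile, DataStream, or CompressedStream, which supports compression)
a2.sinks.k2.hdfs.fileType = DataStream
# How often, in seconds, to roll to a new file
a2.sinks.k2.hdfs.rollInterval = 60
# Roll each file at this size in bytes (about 128 MB)
a2.sinks.k2.hdfs.rollSize = 134217700
# File rolling does not depend on the number of events
a2.sinks.k2.hdfs.rollCount = 0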

#Use a channel which buffers events in memory

a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

#Bind the source and sink to the channel

a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
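
To see which HDFS directory an event ends up in with hdfs.path = .../%Y%m%d/%H and round, roundValue and roundUnit set as above, the Python sketch below approximates the escape-and-round step: it rounds the local timestamp down to a multiple of roundValue units and then expands the escapes with strftime. It is a simplified model under that assumption, not Flume's own code, and only covers escapes that strftime shares with Flume, such as %Y, %m, %d, %H and %M. The function names are hypothetical.

```python
from __future__ import annotations

from datetime import datetime


def round_down(ts: datetime, value: int, unit: str) -> datetime:
    """Round a timestamp down to a multiple of `value` units (second, minute or hour)."""
    if unit == "hour":
        return ts.replace(hour=ts.hour - ts.hour % value, minute=0, second=0, microsecond=0)
    if unit == "minute":
        return ts.replace(minute=ts.minute - ts.minute % value, second=0, microsecond=0)
    if unit == "second":
        return ts.replace(second=ts.second - ts.second % value, microsecond=0)
    raise ValueError(f"unsupported roundUnit: {unit}")


def hdfs_dir(path_pattern: str, ts: datetime, round_value: int = 1, round_unit: str = "hour") -> str:
    """Expand %Y%m%d/%H style escapes for the rounded-down timestamp."""
    return round_down(ts, round_value, round_unit).strftime(path_pattern)
```

With roundValue = 1 and roundUnit = hour, rounding does not change the %H directory; it makes a difference for finer paths, for example a %H%M path with roundValue = 10 and roundUnit = minute gives one directory per 10 minutes.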

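Real-time monitoring of multiple new files in a directory

Requirement: use Flume to monitor all files in a directory and upload them to HDFS.

Implementation steps

    Create the flume-dir-hdfs.conf configuration file

    vim flume-dir-hdfs.conf

    Start Flume (the agent name is a3)

Configuration file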
a3.sources = r3
a3.sinks = k3
a3.channels = c3

#Describe/configure the source

a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/app/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
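# Ignore (do not upload) files ending in .tmp; the backslash is doubled because Flume reads this file as a Java properties file
a3.sources.r3.ignorePattern = ([^ ]*\\.tmp)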



#Describe the sink

a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop001:8020/flume/upload/%Y%m%d/%H

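# Prefix of uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to create (roll) directories by time
a3.sinks.k3.hdfs.round = true
# Number of time units per new directory
a3.sinks.k3.hdfs.roundValue = 1
# Unit of time used for rounding
a3.sinks.k3.hdfs.roundUnit = hour
# Use the local timestamp; required here because the event headers carry no timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# Number of events accumulated before they are flushed to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type (SequenceFile, DataStream, or CompressedStream, which supports compression)
a3.sinks.k3.hdfs.fileType = DataStream
# How often, in seconds, to roll to a new file
a3.sinks.k3.hdfs.rollInterval = 60
# Roll each file at this size in bytes (about 128 MB)
a3.sinks.k3.hdfs.rollSize = 134217700
# File rolling does not depend on the number of events
a3.sinks.k3.hdfs.rollCount = 0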



#Use a channel which buffers events in memory

a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100



# Bind the source and sink to the channel

a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
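
The spooling directory source reads each file in the directory once, renames it with fileSuffix (.COMPLETED here) when done, and skips names that match ignorePattern. The Python sketch below imitates only that bookkeeping on a local directory, assuming ignorePattern must match the whole file name and that hidden files are skipped; it does not turn file contents into events or write to HDFS, and ingest_once is a hypothetical name.

```python
from __future__ import annotations

import re
from pathlib import Path


def ingest_once(spool_dir: Path, ignore_pattern: str = r"([^ ]*\.tmp)",
                completed_suffix: str = ".COMPLETED") -> list[str]:
    """Mimic one pass of a spooling directory source.

    Returns the names of files that would be read and renames each of them
    with `completed_suffix`, the way the spooldir source marks finished files.
    """
    pattern = re.compile(ignore_pattern)
    consumed = []
    for path in sorted(spool_dir.iterdir()):  # sorted() builds a list, so renaming is safe
        name = path.name
        if (not path.is_file()
                or name.startswith(".")
                or name.endswith(completed_suffix)
                or pattern.fullmatch(name)):  # assumed whole-name match
            continue
        consumed.append(name)
        path.rename(path.with_name(name + completed_suffix))
    return consumed
```

An unescaped dot, as in ([^ ]*.tmp), matches any character, so a file named datatmp would also be skipped. Because the agent file is read as a Java properties file, a literal backslash in the pattern has to be written as \\ there.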
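
Real-time monitoring of multiple appended files in a directory

Requirement: use Flume to monitor files in a directory that are appended to in real time, and upload them to HDFS.

Implementation steps

    Create the flume-taildir-hdfs.conf configuration file

    vim flume-taildir-hdfs.conf

    Start Flume (the agent name is a3)

Configuration file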
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/app/flume-1.9.0/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/app/flume-1.9.0/files1/.*file.*
a3.sources.r3.filegroups.f2 = /opt/app/flume-1.9.0/files2/.*log.*

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop001:8020/flume/upload2/%Y%m%d/%H

# Prefix of uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to create (roll) directories by time
a3.sinks.k3.hdfs.round = true
# Number of time units per new directory
a3.sinks.k3.hdfs.roundValue = 1
# Unit of time used for rounding
a3.sinks.k3.hdfs.roundUnit = hour
# Use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# Number of events accumulated before they are flushed to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type (SequenceFile, DataStream, or CompressedStream, which supports compression)
a3.sinks.k3.hdfs.fileType = DataStream
# How often, in seconds, to roll to a new file
a3.sinks.k3.hdfs.rollInterval = 60
# Roll each file at this size in bytes (about 128 MB)
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
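
The TAILDIR source records in its positionFile (tail_dir.json above) how far it has read each file, so after a restart it resumes from there instead of re-reading or skipping data. The Python sketch below models that idea with a JSON list of {"inode", "pos", "file"} records, which is the general shape of Flume's position file; treat the exact format and the helper name read_new_lines as assumptions. It handles a single file and ignores rotation and truncation.

```python
from __future__ import annotations

import json
import os
from pathlib import Path


def read_new_lines(log_file: Path, position_file: Path) -> list[str]:
    """Return lines appended to log_file since the last call, tracking offsets in position_file."""
    positions = json.loads(position_file.read_text()) if position_file.exists() else []
    inode = os.stat(log_file).st_ino
    # Look up the saved byte offset for this inode (start at 0 for a new file).
    record = next((r for r in positions if r["inode"] == inode), None)
    if record is None:
        record = {"inode": inode, "pos": 0, "file": str(log_file)}
        positions.append(record)
    with open(log_file, "rb") as f:
        f.seek(record["pos"])
        data = f.read()
    # Consume only complete lines; a trailing partial line is picked up next time.
    end = data.rfind(b"\n") + 1
    record["pos"] += end
    position_file.write_text(json.dumps(positions))
    return data[:end].decode("utf-8").splitlines()
```

Keeping the offset outside the process is what lets TAILDIR survive restarts, unlike the exec source with tail -F, which has no memory of how far it got.
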
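
Replicating and multiplexing

Requirement: Flume-1 monitors a file for changes and passes the new content to Flume-2, which stores it in HDFS. At the same time, Flume-1 passes the new content to Flume-3, which writes it to the local file system.

Implementation steps

    Create flume-file-flume.conf with one source that reads the log file, plus two channels and two sinks that deliver to flume-flume-hdfs and flume-flume-dir respectively.

    vim /opt/app/flume-1.9.0/job/group1/flume-file-flume.conf

    Create flume-flume-hdfs.conf, with a source that receives the output of the upstream Flume and a sink that writes to HDFS.

    vim /opt/app/flume-1.9.0/job/group1/flume-flume-hdfs.conf

    Create flume-flume-dir.conf, with a source that receives the output of the upstream Flume and a sink that writes to a local directory.

    vim /opt/app/flume-1.9.0/job/group1/flume-flume-dir.conf

    Run the configuration files

    Start Hadoop and Hive

Configuration files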

flume-file-flume.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Replicate the data flow to all channels
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/app/hive-3.1.2/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# On the sink side, avro acts as a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop001
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop001
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
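
With selector.type = replicating, each event from r1 is put into every channel listed in a1.sources.r1.channels, so k1 (to port 4141) and k2 (to port 4142) each receive a full copy. The Python sketch below is a toy model of that fan-out, with a bounded queue standing in for a memory channel; the class and function names are hypothetical, and a full channel simply raises an error here, loosely mirroring how Flume's memory channel rejects a put when it is full.

```python
from __future__ import annotations

from collections import deque


class MemoryChannel:
    """Toy bounded in-memory queue standing in for a Flume memory channel."""

    def __init__(self, capacity: int = 1000) -> None:
        self.capacity = capacity
        self.events: deque[str] = deque()

    def put(self, event: str) -> None:
        # A full channel rejects the event rather than silently dropping old ones.
        if len(self.events) >= self.capacity:
            raise RuntimeError("channel full")
        self.events.append(event)

    def take(self, batch: int) -> list[str]:
        # A sink takes at most `batch` events per transaction (cf. transactionCapacity).
        return [self.events.popleft() for _ in range(min(batch, len(self.events)))]


def replicate(event: str, channels: list[MemoryChannel]) -> None:
    """Replicating selector: every channel bound to the source gets a copy."""
    for channel in channels:
        channel.put(event)
```

Because each sink drains its own channel, a slow HDFS sink does not hold back the local file_roll sink, as long as its channel still has room.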

flume-flume-hdfs.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# On the source side, avro acts as a data receiving service
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop001
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop001:8020/flume2/%Y%m%d/%H
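# Prefix of uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
# Whether to create (roll) directories by time
a2.sinks.k1.hdfs.round = true
# Number of time units per new directory
a2.sinks.k1.hdfs.roundValue = 1
# Unit of time used for rounding
a2.sinks.k1.hdfs.roundUnit = hour
# Use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# Number of events accumulated before they are flushed to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# File type (SequenceFile, DataStream, or CompressedStream, which supports compression)
a2.sinks.k1.hdfs.fileType = DataStream
# How often, in seconds, to roll to a new file
a2.sinks.k1.hdfs.rollInterval = 30
# Roll each file at this size in bytes (about 128 MB)
a2.sinks.k1.hdfs.rollSize = 134217700
# File rolling does not depend on the number of events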
a2.sinks.k1.hdfs.rollCount = 0
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
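
The HDFS sink closes the current file and starts a new one when any enabled roll condition is met: rollInterval seconds have passed, rollSize bytes have been written, or rollCount events have been written, where 0 disables a condition. The Python sketch below is a simplified model of that decision using this agent's settings (30 s, 134217700 bytes, count disabled); should_roll is a hypothetical name, and Flume actually checks the time condition with a timer rather than on each write.

```python
from __future__ import annotations


def should_roll(elapsed_s: float, bytes_written: int, events_written: int,
                roll_interval: int = 30, roll_size: int = 134217700,
                roll_count: int = 0) -> bool:
    """Return True if any enabled roll condition is met (0 disables a condition)."""
    by_time = roll_interval > 0 and elapsed_s >= roll_interval
    by_size = roll_size > 0 and bytes_written >= roll_size
    by_count = roll_count > 0 and events_written >= roll_count
    return by_time or by_size or by_count
```

134217700 bytes is 28 bytes short of 128 MiB (128 × 1024 × 1024 = 134217728), which is what the "about 128 MB" comments refer to.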

flume-flume-dir.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop001
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/app/data/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

Load balancing and failover

Requirement: Flume1 monitors a port; the sinks in its sink group connect to Flume2 and Flume3 respectively, and a FailoverSinkProcessor provides failover.

Implementation steps:

    Create flume-netcat-flume.conf

    vim /opt/app/flume-1.9.0/job/group2/flume-netcat-flume.conf

    Create flume-flume-console1.conf

    vim /opt/app/flume-1.9.0/job/group2/flume-flume-console1.conf

    Create flume-flume-console2.conf

    vim /opt/app/flume-1.9.0/job/group2/flume-flume-console2.conf

    Run the configuration files

Configuration files:

flume-netcat-flume.conf

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop001
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop001
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
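
With processor.type = failover, the sink group sends through the highest-priority sink that is currently healthy (k2, priority 10, ahead of k1, priority 5); a sink that fails is set aside for a penalty period that grows with repeated failures and is capped by maxpenalty (10000 ms here). The Python sketch below is a toy model of that behaviour with an assumed doubling back-off starting at 1000 ms; Flume's exact back-off schedule may differ, and the class and method names are hypothetical.

```python
from __future__ import annotations


class FailoverGroup:
    """Toy model of a failover sink processor."""

    def __init__(self, priorities: dict[str, int], max_penalty_ms: int = 10000,
                 base_penalty_ms: int = 1000) -> None:
        self.priorities = priorities
        self.max_penalty_ms = max_penalty_ms
        self.base_penalty_ms = base_penalty_ms
        self.failures: dict[str, int] = {}  # consecutive failures per sink
        self.retry_at: dict[str, int] = {}  # time (ms) when a failed sink may be retried

    def pick(self, now_ms: int) -> str:
        """Return the highest-priority sink that is not serving a penalty."""
        live = [s for s in self.priorities if self.retry_at.get(s, 0) <= now_ms]
        if not live:
            raise RuntimeError("all sinks have failed")
        return max(live, key=self.priorities.__getitem__)

    def report_failure(self, sink: str, now_ms: int) -> None:
        n = self.failures.get(sink, 0) + 1
        self.failures[sink] = n
        # Assumed schedule: double the penalty per consecutive failure, capped at maxpenalty.
        self.retry_at[sink] = now_ms + min(self.base_penalty_ms * 2 ** (n - 1), self.max_penalty_ms)

    def report_success(self, sink: str) -> None:
        self.failures.pop(sink, None)
        self.retry_at.pop(sink, None)
```

To observe failover with the configurations here, stop the agent behind k2 (port 4142): events should then appear on the console of the agent behind k1 (port 4141).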

flume-flume-console1.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop001
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume-flume-console2.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop001
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

Load balancing

Implementation steps: the same as above, except that part of flume-netcat-flume.conf is changed.

Configuration file

flume-netcat-flume.conf

Change the source section to:

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinkgroups.g1.processor.type = load_balance
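
With processor.type = load_balance, the group spreads events across k1 and k2 instead of always using the same sink; Flume's built-in selectors are round_robin (the default) and random, and processor.backoff can temporarily skip a failing sink. The Python sketch below models round-robin selection that skips sinks marked as failed; it is an illustration with a hypothetical class name, not Flume's implementation.

```python
from __future__ import annotations

from itertools import cycle


class RoundRobin:
    """Toy round-robin sink selector that skips sinks currently marked as failed."""

    def __init__(self, sinks: list[str]) -> None:
        self.sinks = sinks
        self.order = cycle(sinks)
        self.failed: set[str] = set()

    def next_sink(self) -> str:
        # Try each sink at most once per call before giving up.
        for _ in range(len(self.sinks)):
            sink = next(self.order)
            if sink not in self.failed:
                return sink
        raise RuntimeError("no sink available")
```

In practice the two downstream consoles should each show part of the input rather than all of it.
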

Aggregation

Requirement:

Flume-1 on hadoop001 monitors the file /opt/app/data/flume/1.log,

Flume-2 on hadoop002 monitors the data stream on a port,

Flume-1 and Flume-2 send their data to Flume-3 on hadoop003, which prints the final data.

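Implementation steps:

    On hadoop001, create flume1-logger-flume.conf

    Configure the source to monitor the log file and the sink to send data to the next-level Flume.

    vim /opt/app/flume-1.9.0/job/group3/flume1-logger-flume.conf

    On hadoop002, create flume2-netcat-flume.conf

    Configure the source to monitor the data stream on port 44444 and the sink to send data to the next-level Flume:

    vim /opt/app/flume-1.9.0/job/group3/flume2-netcat-flume.conf

    On hadoop003, create flume3-flume-logger.conf

    Configure the source to receive the data streams sent by flume1 and flume2; after merging, the sink prints them to the console.

    vim /opt/app/flume-1.9.0/job/group3/flume3-flume-logger.conf

    Run the configuration files

    On hadoop001, append content to the file monitored by flume1 (/opt/app/data/flume/1.log); on hadoop002, send data to port 44444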

Configuration files

Configuration file on hadoop001: flume1-logger-flume.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/app/data/flume/1.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop003
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Configuration file on hadoop002: flume2-netcat-flume.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop002
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop003
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

Configuration file on hadoop003: flume3-flume-logger.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop003
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
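
In this fan-in setup, the avro sinks on hadoop001 and hadoop002 must both target the host and port where the avro source on hadoop003 is bound (hadoop003:4141). The Python sketch below is a hypothetical helper that cross-checks that wiring across several agents, each given as a dict of properties; it only looks at avro sources and sinks, and it compares host names literally, so a source bound to 0.0.0.0 would need special handling.

```python
from __future__ import annotations


def avro_endpoints(props: dict[str, str], agent: str) -> tuple[set, set]:
    """Return (listening avro sources, avro sink targets) as (host, port) pairs for one agent."""
    listening, targets = set(), set()
    for src in props.get(f"{agent}.sources", "").split():
        p = f"{agent}.sources.{src}."
        if props.get(p + "type") == "avro":
            listening.add((props[p + "bind"], int(props[p + "port"])))
    for sink in props.get(f"{agent}.sinks", "").split():
        p = f"{agent}.sinks.{sink}."
        if props.get(p + "type") == "avro":
            targets.add((props[p + "hostname"], int(props[p + "port"])))
    return listening, targets


def dangling_targets(agents: list[tuple[dict[str, str], str]]) -> set:
    """Avro sink targets that no avro source among the given agents listens on."""
    listening, targets = set(), set()
    for props, name in agents:
        src, dst = avro_endpoints(props, name)
        listening |= src
        targets |= dst
    return targets - listening
```

Start the aggregating agent on hadoop003 first, so that its avro source is already listening when the upstream agents connect.
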
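
Custom Interceptor

Requirement: use Flume to collect the server's local logs and, depending on the log type, send different kinds of logs to different analysis systems.

Implementation steps:

    Write the custom Interceptor in IDEA, add the required dependencies, package it, and copy the jar into Flume's lib directory.

    The implementation is in the flume-demo project in IDEA; the class name is TypeInterceptor.

    For Flume1 on hadoop001, configure one netcat source and two avro sinks, together with the corresponding ChannelSelector and interceptor.

    vim /opt/app/flume-1.9.0/job/group4/flume1.conf

    For Flume2 on hadoop002, configure one avro source and one logger sink.

    vim /opt/app/flume-1.9.0/job/group4/flume2.conf

    For Flume3 on hadoop003, configure one avro source and one logger sink.

    vim /opt/app/flume-1.9.0/job/group4/flume3.conf

    Start flume2, flume3, and then flume1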
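
The TypeInterceptor source code is not included here. As an illustration only, the Python sketch below models what such an interceptor does for the multiplexing selector configured in flume1: it adds a type header to each event. The rule used (body contains "bigdata" gives type=bigdata, anything else type=other) is an assumption, not confirmed by this post. A real Flume interceptor is a Java class implementing org.apache.flume.interceptor.Interceptor with a nested Builder, which is why the configuration refers to TypeInterceptor$Builder.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Event:
    """Minimal stand-in for a Flume event: a header map plus a byte body."""
    body: bytes
    headers: dict[str, str] = field(default_factory=dict)


def intercept(event: Event) -> Event:
    """Tag the event with a `type` header (hypothetical rule: body contains b"bigdata")."""
    event.headers["type"] = "bigdata" if b"bigdata" in event.body else "other"
    return event


def intercept_all(events: list[Event]) -> list[Event]:
    """Batch form, mirroring the list-based intercept method of Flume interceptors."""
    return [intercept(e) for e in events]
```

The header name and its values must match selector.header and the selector.mapping keys in flume1, otherwise events will not reach the intended channel.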

Configuration files

flume1 on hadoop001

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = bigdata.interceptor.TypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.bigdata = c1
a1.sources.r1.selector.mapping.other = c2

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop002
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop003
a1.sinks.k2.port = 4242

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
    
flume2.conf on hadoop002

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop002
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
    
flume3.conf on hadoop003

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop003
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
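
With selector.type = multiplexing and selector.header = type, flume1 looks up each event's type header in the mapping: bigdata goes to c1 (sink k1, hadoop002:4141) and other goes to c2 (sink k2, hadoop003:4242). The Python sketch below models that lookup; the optional default (selector.default in Flume) is included for completeness even though this configuration does not set one, and route is a hypothetical name.

```python
from __future__ import annotations


def route(headers: dict[str, str], header: str, mapping: dict[str, list[str]],
          default: list[str] | None = None) -> list[str]:
    """Return the channels an event is sent to under a multiplexing selector."""
    value = headers.get(header)
    if value in mapping:
        return mapping[value]
    # No mapping matched: fall back to the default channels, if any.
    return default or []
```

For the configuration above the mapping is {"bigdata": ["c1"], "other": ["c2"]}; the sketch returns an empty list for a header value that has no mapping when no default is configured.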
    
Please credit the source when reposting: this article is reposted from www.mshxw.com
Article URL: https://www.mshxw.com/it/733775.html