栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hadoop高手之路--06--Flume日志采集

Hadoop高手之路--06--Flume日志采集

Flume日志采集 一、Flume简介


Logo表示通过河道(频道)把木材(数据)从一个地方(数据源)传送到另一个地方(数据目的地)

使用文档

二、Flume的安装配置 1. 下载并上传到服务器

2. 解压

3. 配置

根据模板复制flume-env.sh


修改flume-env.sh

4. 配置环境变量

5. 使环境变量起作用

三、Flume入门使用 1. 配置采集方案

查看官网



案例:连接和监听某个端口,采集数据显示
用notepad++进行配置

创建新文件


复制官网的采集配置示例:


源代码:

# example.conf: A single-node Flume configuration

# Name the components on this agent
# 定义代理的名字及各个组件sources,sinks,channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# 定义数据源
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
# 定义数据的目的地(下沉)
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
# 定义管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 组装组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. 指定采集方案启动flume

查看官网

命令详解

3. 采集数据测试

可以用telnet命令向本机的44444端口发送数据
先安装telnet


安装telnet后,启动telnet


在telnet端发送数据

在flume短后采集的数据:

四、案例1:监控某个文件夹的变化,将添加的新文件采集存入到hdfs

采集服务器下的某个文件夹(日志文件夹),在该文件夹下产生一个新文件,则该文件中的数据就会被传送到hdfs上的某个文件夹下

1. 采集方案需要确定三大部分

数据源:查看官网


数据下沉:


2. 采集配置文件:


最终配置文件:

# example.conf: A single-node Flume configuration

# Name the components on this agent
# 定义代理的名字a1及各个组件sources、sinks和channels

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# 定义数据源
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /var/log/apache/flumeSpool
a1.sources.r1.fileHeader = true

# Describe the sink
# 定义数据的目的地(下沉)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true


# Use a channel which buffers events in memory
# 定义管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 组装组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# 启动命令
# flume-ng agent --conf conf --conf-file conf/spoolingdir-hdfs.conf --name a1 -Dflume.root.logger=INFO,console

3. 启动flume

4. 测试

flume会检测/var/log/apache/flumeSpool/文件夹下是否有新文件产生,往该文件夹下上传一个文件,flume会把该文件上传到hdfs集群中,

5. 出现错误

由于guava版本不一致造成

6. 重新启动flume,并往日志文件夹上传一个文件,查看结果


查看文件有乱码,说明采集方案需要改进

7. 修改采集方案,重新启动flume,上传文件测试结果

没有乱码了

五、案例2:监控某个文件的变化,把变化的内容存储到hdfs上 1. 采集方案

数据源


数据下沉不要修改


配置代码:

# example.conf: A single-node Flume configuration

# Name the components on this agent
# 定义代理的名字a1及各个组件sources、sinks和channels

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# 定义数据源
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/text.log


# Describe the sink
# 定义数据的目的地(下沉)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
# 是否循环创建文件夹
a1.sinks.k1.hdfs.round = true
# 循环创建文件夹的时间间隔是10分钟
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# 使用本地时间格式
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 时间间隔
a1.sinks.k1.hdfs.rollInterval = 3
# 大小间隔
a1.sinks.k1.hdfs.rollSize = 20
# event的个数,这三个参数谁先满足就出发循环滚动
a1.sinks.k1.hdfs.rollCount = 5
# 批处理数量
a1.sinks.k1.hdfs.batchSize = 1
# 文件格式,表示普通文本格式
a1.sinks.k1.hdfs.fileType = DataStream 

# Use a channel which buffers events in memory
# 定义管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 组装组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# 启动命令
# flume-ng agent --conf conf --conf-file conf/exec-hdfs.conf --name a1 -Dflume.root.logger=INFO,console

2. 测试采集功能

模拟场景
先写一个shell脚本,持续输出当前日期到监控文件/var/log/test.log中,模拟服务器日志的文件

3. 再克隆一个会话,查看新增的内容

4. 启动flume

5. 查看HDFS上的结果

六、Flume的可靠性保证-负载均衡

配置的采集方案是通过唯一一个Sink作为接收器接收后续需要的数据,但会出现当前Sink故障或数据收集请求量较大的情况,这时单一Sink配置可能就无法保证Flume开发的可靠性。因此,Flume 提供Flume Sink Processors解决上述问题。
Sink处理器允许定义Sink groups,将多个sink分组到一个实体中,Sink处理器就可通过组内多个sink为服务提供负载均衡功能。

1. 搭建并配置flume集群

三台服务器的flume集群:hadoop01、hadoop02、hadoop03

1) 分发hadoop01上的flume文件到hadoop02和hadoop03上


2) 分发环境变量配置文件

3) 使环境变量起作用

2. 配置采集方案 1) 查看官方文档的示例

2) 在hadoop01上配置第一级采集方案

两级的配置方案


Avro



代码:

3) 在hadoop02和hadoop03上配置第二级方案

Hadoop02:

# example.conf: A single-node Flume configuration

# Name the components on this agent
# 定义代理的名字a1及各个组件sources、sinks和channels

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

#定义组的属性
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance

#定义负载均衡
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
a1.sinkgroups.g1.processor.selector.maxTimeOut = 30000


# Describe/configure the source
# 定义数据源
a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/test1.log

# Describe the sink
# 定义数据的目的地1(下沉)
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = hadoop02
a1.sinks.k1.port = 4545

# 定义数据的目的地2(下沉)
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = hadoop03
a1.sinks.k2.port = 4545


# Use a channel which buffers events in memory
# 定义管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 启动命令
# flume-ng agent --conf conf --conf-file conf/exec-avro.conf --name a1 -Dflume.root.logger=INFO,console
# flume-ng agent --conf conf --conf-file exec-avro.conf --name a1 -Dflume.root.logger=INFO,console


代码:

# example.conf: A single-node Flume configuration

# Name the components on this agent
# 定义代理的名字a1及各个组件sources、sinks和channels

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# 定义数据源

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop02
a1.sources.r1.port = 4545
# Describe the sink
# 定义数据的目的地(下沉)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
# 是否循环创建文件夹
a1.sinks.k1.hdfs.round = true
# 循环创建文件夹的时间间隔是十分钟
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# 使用本地时间个数
a1.sinks.k1.hdfs.useLocalTimeStamp = true
// 列编辑模式,按住alt选择多列

# 时间间隔
a1.sinks.k1.hdfs.rollInterval = 3
# 大小间隔
a1.sinks.k1.hdfs.rollSize = 20
# event的个数,这三个参数谁先满足就出发循环滚动
a1.sinks.k1.hdfs.rollCount = 5
# 批处理数量
a1.sinks.k1.hdfs.batchSize = 1 
# 文件格式 表示普通文本文件
a1.sinks.k1.hdfs.fileType = DataStream


# Use a channel which buffers events in memory
# 定义管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 组装组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# 启动命令
# flume-ng agent --conf conf --conf-file conf/avro-hdfs.conf --name a1 -Dflume.root.logger=INFO,console
# flume-ng agent --conf conf --conf-file avro-hdfs.conf --name a1 -Dflume.root.logger=INFO,console

Hadoop03:

3. 启动flume 1) 在hadoop02和hadoop03上flume

从最后一级开始启动flume
Hadoop02:


Hadoop03:

2) 在hadoop01上启动flume


同时在hadoop02和hadoop03上也提示连接成功

4. 负载均衡的测试

克隆hadoop01的一个会话,编写脚本并运行

5. 查看结果

Hadoop02:


Hadoop03:


Hadoop集群上:


七、Flume的可靠性保证-故障恢复

1. 配置采集方案

只需要改动第一级采集方案部分内容


改为故障恢复:


第二级采集方案不用修改

2. 启动flume

从最后一级开始启动flume
再启动测试脚本

3. 故障恢复

把hadoop03的flume关闭,等待十秒钟(超时时间,在采集方案中进行的定义)

八、Flume拦截器

1. 场景:

在实际开发的应用场景中,两台服务器A、B在实时产生日志数据,日志类型主要为access.log、nginx.log和web.log。现需要将A、B两台服务器产生的日志数据access.log、nginx.log和web.log采集汇总到服务器C上,并统一收集并上传到HDFS文件系统进行保存。在HDFS中保存日志数据的文件必须按照以下要求进行归类统计(20180723表示收集日志数据的当前日期):
/source/logs/access/20180723/**
/source/logs/nginx/20180723/**
/source/logs/web/20180723/**

2. 日志数据采集流程图

3. Hadoop02和hadoop03的配置文件
# example.conf: A single-node Flume configuration

# Name the components on this agent
# 定义代理的名字a1及各个组件sources、sinks和channels

a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# 定义r1数据源

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/access.log

# 定义拦截器r1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access

# 定义r2数据源

a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /var/log/nginx.log

# 定义拦截器r2
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx

# 定义r3数据源

a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /var/log/web.log

# 定义拦截器r3
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web

# Describe the sink
# 定义数据的目的地(下沉)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop01
a1.sinks.k1.port = 41414



# Use a channel which buffers events in memory
# 定义管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 组装组件
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1

# 启动命令
# flume-ng agent --conf conf --conf-file conf/exec-avro.conf --name a1 -Dflume.root.logger=INFO,console
4. Hadoop01上的采集方案配置文件
# Name the components on this agent
# 定义代理的名字a1及各个组件sources、sinks和channels

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# 定义数据源
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop01
a1.sources.r1.port = 41414

#定义拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

# Describe the sink
# 定义数据的目的地(下沉)

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/logs/%{type}/%y%m%d
a1.sinks.k1.hdfs.filePrefix = events-
# 是否循环创建文件夹
a1.sinks.k1.hdfs.round = true
# 循环创建文件夹的时间间隔是十分钟
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# 使用本地时间个数
a1.sinks.k1.hdfs.useLocalTimeStamp = true
// 列编辑模式,按住alt选择多列

# 时间间隔
a1.sinks.k1.hdfs.rollInterval = 0
# 大小间隔
a1.sinks.k1.hdfs.rollSize = 10485760
# event的个数,这三个参数谁先满足就出发循环滚动
a1.sinks.k1.hdfs.rollCount = 0
# 批处理数量
a1.sinks.k1.hdfs.batchSize = 10 
# 文件格式 表示普通文本文件
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.threadsPoolSize=10
a1.sinks.k1.hdfs.callTimeout=30000


# Use a channel which buffers events in memory
# 定义管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 组装组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# 启动命令
# flume-ng agent --conf conf --conf-file conf/avro-hdfs.conf --name a1 -Dflume.root.logger=INFO,console
5. 启动flume

先启动hadoop01上的flume


再启动hadoop02和hadoop03上的flume

6. 测试效果

在hadoop02和hadoop03上分别克隆三个会话,分别执行以下三个脚本,用来产生生产日志数据

while true;do echo "access log ……"  >> /var/log/access.log;sleep 5;done



结果:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/701662.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号