栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flume之进阶(一)

Flume之进阶(一)

1. Flume 事务

Put事务:

  • doPut:将批数据先写入临时缓冲区putList
  • doCommit:检查channel内存队列是否足够合并
  • doRollback:channel内存队列空间不足,回滚数据

Take事务:

  • doTake:将数据取到临时缓冲区takeList,并将数据发送到HDFS
  • doCommit:如果数据全部发送成功,则清除临时缓冲区takeList
  • doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区t akeList中的数据归还给channel内存队列
2. Flume Agent 内部原理

SinkProcessor:

  • DefaultSinkProcessor 对应的是单个的 Sink
  • LoadBalancingSinkProcessor对应的是 Sink Group,可以实现负
    载均衡的功能
  • FailoverSinkProcessor 对应的是 Sink Group,可以实现错误恢复的功能
3. Flume 拓扑结构 3.1 简单串联

此模式不建议桥接过多的 flume 数量, flume 数量过多不仅会影响传输速
率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统

3.2 复制和多路复用

Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个
channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的地

案例分析 1. 案例需求

使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem。

2. 需求分析

3. 实现步骤
  1. 准备工作

    • 在/opt/module/flume-1.9.0/jobs 目录下创建 group1 文件夹
      [codecat@hadoop102 jobs]$ mkdir group1
      
    • 在/opt/module/datas/目录下创建 flume3 文件夹
      [codecat@hadoop102 datas]$ mkdir flume3
      
  2. 创建 flume-file-flume.conf

    配置 1 个接收日志文件的 source 和两个 channel、两个 sink,分别输送给 flume-flume-hdfs 和 flume-flume-dir。添加如下内容

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1 c2
    # 将数据流复制给所有 channel
    a1.sources.r1.selector.type = replicating
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/module/hive-3.1.2/logs/hive.log
    a1.sources.r1.shell = /bin/bash -c
    # Describe the sink
    # sink 端的 avro 是一个数据发送者
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop102
    a1.sinks.k1.port = 4141
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = hadoop102
    a1.sinks.k2.port = 4142
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 1000
    a1.channels.c2.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 c2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    
  3. 创建 flume-flume-hdfs.conf

    配置上级 Flume 输出的 Source,输出是到 HDFS 的 Sink。添加如下内容

    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    # Describe/configure the source
    # source 端的 avro 是一个数据接收服务
    a2.sources.r1.type = avro
    a2.sources.r1.bind = hadoop102
    a2.sources.r1.port = 4141
    # Describe the sink
    a2.sinks.k1.type = hdfs
    a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume2/%Y%m%d/%H
    #上传文件的前缀
    a2.sinks.k1.hdfs.filePrefix = flume2-
    #是否按照时间滚动文件夹
    a2.sinks.k1.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a2.sinks.k1.hdfs.roundValue = 1
    #重新定义时间单位
    a2.sinks.k1.hdfs.roundUnit = hour
    #是否使用本地时间戳
    a2.sinks.k1.hdfs.useLocalTimeStamp = true
    #积攒多少个 Event 才 flush 到 HDFS 一次
    a2.sinks.k1.hdfs.batchSize = 100
    #设置文件类型,可支持压缩
    a2.sinks.k1.hdfs.fileType = DataStream
    #多久生成一个新的文件
    a2.sinks.k1.hdfs.rollInterval = 30
    #设置每个文件的滚动大小大概是 128M
    a2.sinks.k1.hdfs.rollSize = 134217700
    #文件的滚动与 Event 数量无关
    a2.sinks.k1.hdfs.rollCount = 0
    # Describe the channel
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1
    
  4. 创建 flume-flume-dir.conf

    配置上级 Flume 输出的 Source,输出是到本地目录的 Sink。添加如下内容

    # Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c2
    # Describe/configure the source
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop102
    a3.sources.r1.port = 4142
    # Describe the sink
    a3.sinks.k1.type = file_roll
    a3.sinks.k1.sink.directory = /opt/module/datas/flume3
    # Describe the channel
    a3.channels.c2.type = memory
    a3.channels.c2.capacity = 1000
    a3.channels.c2.transactionCapacity = 100
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c2
    a3.sinks.k1.channel = c2
    

    输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目

  5. 执行配置文件

    由于avro的source为服务端,所以启动顺序应该为:

    [codecat@hadoop102 flume-1.9.0]$ flume-ng agent -c conf/ -n a3 -f jobs/group1/flume-flume-dir.conf
    
    [codecat@hadoop102 flume-1.9.0]$ flume-ng agent -c conf/ -n a2 -f jobs/group1/flume-flume-hdfs.conf 
    
    [codecat@hadoop102 flume-1.9.0]$ flume-ng agent -c conf/ -n a1 -f jobs/group1/flume-file-flume.conf 
    
  6. 启动hive,检查 HDFS 上数据

  7. 检查/opt/module/datas/flume3 目录中数据

3.3 负载均衡和故障转移

Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能

案例分析 1. 案例需求

使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,采用FailoverSinkProcessor,实现故障转移的功能。

2. 需求分析
  1. 创建 flume-netcat-flume.conf

    配置 1 个 netcat source 和 1 个 channel、1 个 sink group(2 个 sink),分别输送给flume-flume-console1 和 flume-flume-console2。添加如下内容:

    # Name the components on this agent
    a1.sources = r1
    a1.channels = c1
    a1.sinkgroups = g1
    a1.sinks = k1 k2
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    a1.sinkgroups.g1.processor.type = failover
    a1.sinkgroups.g1.processor.priority.k1 = 5
    a1.sinkgroups.g1.processor.priority.k2 = 10
    a1.sinkgroups.g1.processor.maxpenalty = 10000
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop102
    a1.sinks.k1.port = 4141
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = hadoop102
    a1.sinks.k2.port = 4142
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c1
    
  2. 创建 flume-flume-console1.conf

    配置上级Flume 输出的Source,输出是到本地控制台。添加如下内容:

    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    # Describe/configure the source
    a2.sources.r1.type = avro
    a2.sources.r1.bind = hadoop102
    a2.sources.r1.port = 4141
    # Describe the sink
    a2.sinks.k1.type = logger
    # Describe the channel
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1
    
  3. 创建 flume-flume-console2.conf

    配置上级 Flume 输出的 Source,输出是到本地控制台。添加如下内容:

    # Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c2
    # Describe/configure the source
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop102
    a3.sources.r1.port = 4142
    # Describe the sink
    a3.sinks.k1.type = logger
    # Describe the channel
    a3.channels.c2.type = memory
    a3.channels.c2.capacity = 1000
    a3.channels.c2.transactionCapacity = 100
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c2
    a3.sinks.k1.channel = c2
    
  4. 执行配置文件

    [codecat@hadoop102 flume-1.9.0]$ flume-ng agent -c conf/ -n a3 -f jobs/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
    
    [codecat@hadoop102 flume-1.9.0]$ flume-ng agent -c conf/ -n a2 -f jobs/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
    
    [codecat@hadoop102 flume-1.9.0]$ flume-ng agent -c conf/ -n a1 -f jobs/group2/flume-netcat-flume.conf 
    
  5. 使用 netcat 工具向本机的 44444 端口发送内容

  6. 查看 Flume2及 Flume3 的控制台打印日志

3.4 聚合

案例分析 1. 案例需求

hadoop102 上的 Flume-1 监控文件/opt/module/group.log

hadoop103 上的 Flume-2 监控某一个端口的数据流

Flume-1 与 Flume-2 将数据发送给 hadoop104 上的 Flume-3,Flume-3 将最终数据打印到控制台

2. 需求分析 3. 实现步骤
  1. 准备工作

    // 分发 Flume
    [codecat@hadoop102 module]$ xsync flume-1.9.0/
    
    [codecat@hadoop102 jobs]$ mkdir group3
    [codecat@hadoop103 jobs]$ mkdir group3
    [codecat@hadoop104 jobs]$ mkdir group3
    
  2. 在 hadoop102 上创建 flume1-logger-flume.conf

    [codecat@hadoop102 group3]$ vim flume1-logger-flume.conf 
    

    添加如下内容

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/module/group.log
    a1.sources.r1.shell = /bin/bash -c
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop104
    a1.sinks.k1.port = 4141
    
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
  3. 在 hadoop103 上创建 flume2-netcat-flume.conf

    [codecat@hadoop103 group3]$ vim flume2-netcat-flume.conf
    

    添加如下内容

    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    # Describe/configure the source
    a2.sources.r1.type = netcat
    a2.sources.r1.bind = hadoop103
    a2.sources.r1.port = 44444
    # Describe the sink
    a2.sinks.k1.type = avro
    a2.sinks.k1.hostname = hadoop104
    a2.sinks.k1.port = 4141
    # Use a channel which buffers events in memory
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1
    
  4. 在 hadoop104 上创建 flume3-flume-logger.conf

    [codecat@hadoop104 group3]$ vim flume3-flume-logger.conf
    

    添加如下内容

    # Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c1
    # Describe/configure the source
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop104
    a3.sources.r1.port = 4141
    # Describe the sink
    # Describe the sink
    a3.sinks.k1.type = logger
    # Describe the channel
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c1
    a3.sinks.k1.channel = c1
    
  5. 执行配置文件

    [codecat@hadoop104 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a3 -f jobs/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
    
    [codecat@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a2 -f jobs/group3/flume1-logger-flume.conf 
    
    [codecat@hadoop103 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a2 -f jobs/group3/flume2-netcat-flume.conf 
    
  6. 在 hadoop102 上向/opt/module 目录下的 group.log 追加内容

    [codecat@hadoop102 module]$ echo 'hello' > group.log
    
  7. 在 hadoop103 上向 44444 端口发送数据

  8. 检查 hadoop104 上数据

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/307624.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号