栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flume之进阶(二)

Flume之进阶(二)

1. 自定义 Agent 组件 1.1 自定义 Interceptor 1.1.1 需求

检测一个端口的数据,将数据包含codecat发送到一个channel,不包含的发送到另一个channel

1.1.2 分析

在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要
发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构,Multiplexing的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予不同的值。

1.2.3 实现步骤
  1. 创建一个 maven 项目,并引入以下依赖
    
        
            org.apache.flume
            flume-ng-core
            1.9.0
        
    
    
  2. 定义 TypeInterceptor 类并实现 Interceptor 接口
    package com.codecat.interceptor;
    
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    public class TypeInterceptor implements Interceptor {
        // 声明一个存放事件的集合
        private List addHeaderEvents;
    
        @Override
        public void initialize() {
            // 初始化存放事件的集合
            addHeaderEvents = new ArrayList<>();
        }
    
        // 单个事件拦截
        @Override
        public Event intercept(Event event) {
            // 1. 获取事件中的头信息
            Map headers = event.getHeaders();
    
            // 2. 获取事件中的body信息
            String body = new String(event.getBody());
    
            // 3. 根据body中是否有"codecat"来决定添加怎样的头信息
            if (body.contains("codecat")) {
                // 4. 添加头信息
                headers.put("type", "first");
            } else {
                // 4. 添加头信息
                headers.put("type", "second");
            }
            return event;
        }
    
        // 批量事件拦截
        @Override
        public List intercept(List list) {
            // 1. 清空集合
            addHeaderEvents.clear();
    
            // 2. 遍历events
            for (Event event : list) {
                // 3. 给每个事件添加头信息
                addHeaderEvents.add(intercept(event));
            }
    
            // 4. 返回结果
            return addHeaderEvents;
        }
    
        @Override
        public void close() {
    
        }
    
        public static class Builder implements Interceptor.Builder {
    
            @Override
            public Interceptor build() {
                return new TypeInterceptor();
            }
    
            @Override
            public void configure(Context context) {
    
            }
        }
    }
    
  3. 将写好的代码打包,并放到 flume 的 lib 目录下
  4. 配置文件
  • 在 hadoop102 上的 Flume 配置 1 个 netcat source,1 个 sink group(2 个 avro sink),并配置相应的 ChannelSelector 和 interceptor。
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1 c2
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type =
    com.codecat.interceptor.TypeInterceptor$Builder
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = type
    a1.sources.r1.selector.mapping.first = c1
    a1.sources.r1.selector.mapping.second = c2
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop103
    a1.sinks.k1.port = 4141
    a1.sinks.k2.type=avro
    a1.sinks.k2.hostname = hadoop104
    a1.sinks.k2.port = 4242
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Use a channel which buffers events in memory
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 1000
    a1.channels.c2.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 c2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    
  • 在 hadoop103 上的 Flume 配置一个 avro source 和一个 logger sink
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type = avro
    a1.sources.r1.bind = hadoop103
    a1.sources.r1.port = 4141
    
    a1.sinks.k1.type = logger
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sinks.k1.channel = c1
    a1.sources.r1.channels = c1
    
  • 在 hadoop104 上的 Flume 配置一个 avro source 和一个 logger sink
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    a1.sources.r1.type = avro
    a1.sources.r1.bind = hadoop104
    a1.sources.r1.port = 4242
    a1.sinks.k1.type = logger
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    a1.sinks.k1.channel = c1
    a1.sources.r1.channels = c1
    
  1. 启动 flume 进程

    [codecat@hadoop103 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/interceptor/flume-flume-logger.conf -Dflume.root.logger=INFO,console
    
    [codecat@hadoop104 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/interceptor/flume-flume-logger.conf -Dflume.root.logger=INFO,console
    
    [codecat@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/interceptor/flume-netcat-flume.conf 
    
  2. 在 hadoop102 使用 netcat 向 localhost:44444 发送数据

  3. 观察 hadoop103 和 hadoop104 打印的日志

1.2 自定义 Source 1.2.1 需求

使用 flume 接收数据,并给每条数据添加前缀,输出到控制台

1.2.2 分析

1.2.3 实现步骤
  1. 导入 pom 依赖
    
        
            org.apache.flume
            flume-ng-core
            1.9.0
        
    
    
  2. 编写代码
    package com.codecat.source;
    
    import org.apache.flume.Context;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.PollableSource;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.event.SimpleEvent;
    import org.apache.flume.source.AbstractSource;
    
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    
    public class MySource extends AbstractSource implements Configurable, PollableSource {
    
        // 定义配置文件将来要读取的字段
        private Long delay;
        private String field;
    
        @Override
        public void configure(Context context) {
            delay = context.getLong("delay");
            field = context.getString("filed", "Hello!");
        }
    
        @Override
        public Status process() throws EventDeliveryException {
        try {
            // 创建事件头信息
            HashMap headerMap = new HashMap<>();
    
            // 创建事件
            SimpleEvent event = new SimpleEvent();
    
            // 循环封装事件
            for (int i = 0; i < 5; i++) {
                // 给事件设置头信息
                event.setHeaders(headerMap);
                // 给事件设置内容
                event.setBody((field + i).getBytes(StandardCharsets.UTF_8));
                // 将事件写入channel
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (Exception e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
        return Status.READY;
        }
    
        @Override
        public long getBackOffSleepIncrement() {
            return 0;
        }
    
        @Override
        public long getMaxBackOffSleepInterval() {
            return 0;
        }
    }
    
  3. 将写好的代码打包,并放到 flume 的 lib 目录下
  4. 配置文件
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = com.codecat.source.MySource
    a1.sources.r1.delay = 1000
    #a1.sources.r1.field = codecat
    # Describe the sink
    a1.sinks.k1.type = logger
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
  5. 开启任务
    [codecat@hadoop102 flume-1.9.0]$ flume-ng agent -c conf/ -n a1 -f jobs/mysource.conf -Dflume.root.logger=INFO,console
    
  6. 结果展示
1.3 自定义 Sink

Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。

Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。

1.3.1 需求

使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台

1.3.2 分析

1.3.3 实现步骤
  1. 编码

    package com.codecat.sink;
    
    import org.apache.flume.*;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class MySink extends AbstractSink implements Configurable {
        // 创建Logger对象
        private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);
    
        private String prefix;
        private String suffix;
    
        @Override
        public void configure(Context context) {
            // 读取配置文件内容,有默认值
            prefix = context.getString("prefix", "hello");
    
            // 读取配置文件内容,无默认值
            suffix = context.getString("suffix");
        }
    
        @Override
        public Status process() throws EventDeliveryException {
            // 声明返回值状态信息
            Status status;
            // 获取当前sink绑定的channel
            Channel channel = getChannel();
            // 获取事务
            Transaction transaction = channel.getTransaction();
            // 声明事件
            Event event;
            // 开启事务
            transaction.begin();
            // 读取channel中的事件,直到读取到事件结束循环
            while (true) {
                event = channel.take();
                if (event != null) {
                    break;
                }
            }
            try {
                // 处理事件(打印)
                LOG.info(prefix + new String(event.getBody()) + suffix);
    
                // 事务提交
                transaction.commit();
                status = Status.READY;
            } catch (Exception e) {
                // 遇到异常,事务回滚
                transaction.rollback();
                status = Status.BACKOFF;
            } finally {
                // 关闭事务
                transaction.close();
            }
            return status;
        }
    }
    
  2. 将写好的代码打包,并放到flume的 lib 目录下

  3. 配置文件

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    # Describe the sink
    a1.sinks.k1.type = com.codecat.sink.MySink
    #a1.sinks.k1.prefix = codecat:
    a1.sinks.k1.suffix = :codecat
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
  4. 开启任务

    [codecat@hadoop102 flume-1.9.0]$ flume-ng agent -c conf/ -n a1 -f jobs/mysink.conf -Dflume.root.logger=INFO,console
    

  5. 结果展示

2. Flume 数据流监控 2.1 Ganglia 的安装与部署

Ganglia 由 gmond、gmetad 和 gweb 三部分组成

  • gmond(Ganglia Monitoring Daemon)是一种轻量级服务,安装在每台需要收集指标数据的节点主机上。
  • gmetad(Ganglia meta Daemon)整合所有信息,并将其以 RRD格式存储至磁盘的服务
  • gweb(Ganglia Web)Ganglia 可视化工具,gweb 是一种利用浏览器显示 gmetad所存储数据的 PHP 前端
2.1.1 安装 ganglia
  1. 规划
    hadoop102: web gmetad gmod
    hadoop103: gmod
    hadoop104: gmod
    
  2. 在 102 103 104 分别安装 epel-release
    [codecat@hadoop102 flume-1.9.0]$ sudo yum -y install epel-release
    [codecat@hadoop103 flume-1.9.0]$ sudo yum -y install epel-release
    [codecat@hadoop104 flume-1.9.0]$ sudo yum -y install epel-release
    
  3. 在 102 安装
    [codecat@hadoop102 flume-1.9.0]$ sudo yum -y install ganglia-gmetad 
    [codecat@hadoop102 flume-1.9.0]$ sudo yum -y install ganglia-web
    [codecat@hadoop102 flume-1.9.0]$ sudo yum -y install ganglia-gmond
    
  4. 在 103 和 104 安装
    [codecat@hadoop103 flume-1.9.0]$ sudo yum -y install ganglia-gmond
    [codecat@hadoop104 flume-1.9.0]$ sudo yum -y install ganglia-gmond
    
2.1.2 在 102 修改配置文件/etc/httpd/conf.d/ganglia.conf

2.1.3 在 102 修改配置文件/etc/ganglia/gmetad.conf

2.1.4 在 102 103 104 修改配置文件/etc/ganglia/gmond.conf

2.1.5 在 102 修改配置文件/etc/selinux/config

提示:selinux 生效需要重启,如果此时不想重启,可以临时生效之:

sudo setenforce 0
2.1.6 启动 ganglia
  1. 在 102 103 104 启动
    sudo systemctl start gmond
    
  2. 在 102 启动
    sudo systemctl start httpd
    sudo systemctl start gmetad
    
2.1.7 打开网页浏览 ganglia 页面

2.2 操作 Flume 测试监控 2.2.1 启动 Flume 任务
flume-ng agent 
-c conf/ 
-n a1 
-f job/flume-netcat-logger.conf 
-Dflume.root.logger=INFO,console 
-Dflume.monitoring.type=ganglia 
-Dflume.monitoring.hosts=hadoop102:8649
2.2.2 发送数据观察 ganglia 监测图

图例说明

字段(图表名称)字段含义
EventPutAttemptCountsource 尝试写入 channel 的事件总数量
EventPutSuccessCount成功写入 channel 且提交的事件总数量
EventTakeAttemptCount sink尝试从 channel 拉取事件的总数量。
EventTakeSuccessCount sink成功读取的事件的总数量
StartTimechannel 启动的时间(毫秒)
StopTimechannel 停止的时间(毫秒)
ChannelSize目前 channel 中事件的总数量
ChannelFillPercentagechannel 占用百分比
ChannelCapacitychannel 的容量
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/311404.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号