
Flume Study Notes (3): Source Multiplexing Channel Selector | A Simple Interceptor Implementation | Data Splitting Across Cluster Nodes

If you find any errors in this article, please let me know. Thanks!

Table of Contents

1. References
2. Environment
3. Custom Interceptor
4. Testing

1. References

Video tutorials

2. Environment

CentOS 7, JDK 8, Hadoop 3.3.0, Flume 1.9

Cluster environment

3. Custom Interceptor

pom.xml



<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.uni</groupId>
    <artifactId>custom-interceptor</artifactId>
    <version>1</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>
    </dependencies>
</project>


TypeInterceptor.java is the interceptor class. It inspects each incoming event: if the body contains the string "uni", the event's header is tagged "uni"; otherwise it is tagged "others".

package com.uni.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;


public class TypeInterceptor implements Interceptor {
    // Holds the events after the interceptor has processed them
    private List<Event> addHeaderEvents;

    @Override
    public void initialize() {
        addHeaderEvents = new LinkedList<>();
    }

    @Override
    public Event intercept(Event event) {
        // 1. Get the header and body
        Map<String, String> headers = event.getHeaders();
        String body = new String(event.getBody());
        // 2. Add a different header depending on whether the body contains "uni"
        if (body.contains("uni")) {
            headers.put("type", "uni");
        } else {
            headers.put("type", "others");
        }
        // Return the event
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        // 1. Clear the list
        addHeaderEvents.clear();
        // 2. Process each event in turn
        for (Event event : list) {
            addHeaderEvents.add(intercept(event));
        }
        return addHeaderEvents;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

Package the project with Maven and upload the jar to cluster node hadoop101, placing it in the lib folder under the Flume root directory. This way no classpath needs to be configured at run time: Flume automatically scans the jars under lib.
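Before deploying the jar, the tagging rule itself can be sanity-checked without any Flume classes. The sketch below is a stdlib-only simulation of what `intercept` does to an event's headers; the `tag` helper is hypothetical and exists only for this demo, it is not part of Flume:

```java
import java.util.HashMap;
import java.util.Map;

public class TagDemo {
    // Mirrors TypeInterceptor.intercept: tag the event by whether
    // the body contains the string "uni".
    static Map<String, String> tag(String body) {
        Map<String, String> headers = new HashMap<>();
        headers.put("type", body.contains("uni") ? "uni" : "others");
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(tag("hello uni").get("type"));   // prints "uni"
        System.out.println(tag("hello world").get("type")); // prints "others"
    }
}
```

An event tagged `type=uni` will be routed by the multiplexing selector (configured below) to channel c1 and on to hadoop102, while `type=others` goes to c2 and hadoop103.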

Case architecture diagram


For easy identification, cluster nodes hadoop101, hadoop102, and hadoop103 run Agent 1, Agent 2, and Agent 3 respectively.

Hadoop101 net-flume-avro.conf

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.uni.flume.interceptor.TypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.uni = c1
a1.sources.r1.selector.mapping.others = c2

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4411
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 4412

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

Hadoop102 avro-flume-console-uni.conf

a2.sources = r1
a2.sinks = k1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4411

a2.sinks.k1.type = logger

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.sinks.k1.channel = c1
a2.sources.r1.channels = c1

Hadoop103 avro-flume-console-other.conf

a3.sources = r1
a3.sinks = k1
a3.channels = c1

a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop103
a3.sources.r1.port = 4412

a3.sinks.k1.type = logger
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

a3.sinks.k1.channel = c1
a3.sources.r1.channels = c1
4. Testing

Start the agents whose source is avro first, i.e. start a2 and a3 on hadoop102 and hadoop103, and only then start a1, whose sinks are avro.

[root@hadoop103 flume1.9]# bin/flume-ng agent -c conf/ -n a3 -f job/group3/avro-flume-console-other.conf -Dflume.root.logger=INFO,console

[root@hadoop102 flume1.9]# bin/flume-ng agent -c conf/ -n a2 -f job/group3/avro-flume-console-uni.conf -Dflume.root.logger=INFO,console

[root@hadoop101 flume1.9]# bin/flume-ng agent -c conf/ -n a1 -f job/group3/net-flume-avro.conf

Once Flume is running on every node, open a new terminal on any cluster node and use netcat to connect to hadoop101 on port 44444:

$ nc hadoop101 44444

The messages sent are shown below:

Final test results:
hadoop103:

hadoop102:

hadoop101:
This completes the test of Flume multiplexing and interception across multiple cluster nodes.
