栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flume学习四

flume学习四

自定义Interceptor

案例需求:使用Flume采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到Flume拓扑结构中的Multiplexing结构,Multiplexing的原理是,根据event中Header的某个key的值,将不同的event发送到不同的Channel中,所以我们需要自定义一个Interceptor,为不同类型的event的Header中的key赋予不同的值。

在该案例中,我们以端口数据模拟日志,以数据中含有“up”和不含“up”模拟不同类型的日志,我们需要自定义interceptor区分数字和字母,将其分别发往不同的分析系统(Channel)。

 通过netcat端口监听数据传入flume1,通过channel选择器将数据进行分类,含有“up”的传入flume2,不含“up”的传入flume3。flume2和flume3端的sink就打印到控制台就OK了。

在idea里面写代码,首先要添加flume的依赖:


    org.apache.flume
    flume-ng-core
    1.7.0

代码部分:

定义一个类实现 Intercept接口去重写里面的方法:

package test.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TypeInterceptor implements Interceptor {

    //存放事件的集合
    private List addHeaderEvents;

    public void initialize() {

        //初始化
        addHeaderEvents = new ArrayList();

    }

    //单个事件拦截
    public Event intercept(Event event) {

        //1.获取事件中的头信息header
        Map headers = event.getHeaders();

        //2.获取事件中的body
        String body = new String(event.getBody());

        //3.根据body中是否含有'hello'来决定添加怎样的头信息
        if(body.contains("up")){
            headers.put("type", "up");
        }else {
            headers.put("type", "notup");
        }
        return event;
    }

    //批量事件拦截
    public List intercept(List events) {

        //1.清空集合
        addHeaderEvents.clear();

        //2.遍历envents
        for (Event event :events){
            //3.给每一个事件添加头信息
            addHeaderEvents.add(intercept(event));
        }
        return addHeaderEvents;
    }

    public void close() {
    }

    public static class Bulider implements Interceptor.Builder{

        public Interceptor build() {
            return new TypeInterceptor();
        }

        public void configure(Context context) {

        }
    }
}

 写好之后进行打包:

打包成功之后:

 上传至集群

然后写配置文件:
先写hadoop03上的flume2配置文件:

a1.sources - r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

 再配置hadoop04的flume3的配置文件:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4141

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

 最后配置hadoop02上的flume1文件:

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.CustomInterceptor$Builder
a1.sources.r1.selector.type = multiplexing

a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.up = c1
a1.sources.r1.selector.mapping.notup= c2

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

 

配置文件中:
        a1.sources.r1.selector.header = type
        a1.sources.r1.selector.mapping.up = c1
        a1.sources.r1.selector.mapping.notup= c2
这里的type与if语句中的type对应,因为含有up的传入到flume2中,所以c1对应的是up,c1对应的是notup

 最后就是启动我们的flume了,启动的时候,必须先启动后面,也就是hadoop03和hadoop04

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/700203.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号