栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

自定义 Flume Interceptor

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

自定义 Flume Interceptor

目录

1.需求

2.实现步骤

(1)在maven项目中导入依赖

(2)定义 TypeInterceptor 类并实现 Interceptor 接口。

(3)将maven项目导包并上传到hadoop02的lib目录

(4)编辑 flume 配置文件

(5)启动进程        


1.需求

        在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要 发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构,Multiplexing 的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel 中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予 不同的值。

        在该案例中,我们以端口数据模拟日志,以是否包含”love”模拟不同类型的日志, 我们需要自定义 interceptor 区分数据中是否包含”love”,将其分别发往不同的分析 系统(Channel)。

2.实现步骤

(1)在maven项目中导入依赖

 org.apache.flume
 flume-ng-core
 1.9.0

(2)定义 TypeInterceptor 类并实现 Interceptor 接口。
package com.my.bigdata.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TypeInterceptor implements Interceptor {
    //声明一个存放事件的集合
    private List addHeaderEvents;

    @Override
    public void initialize() {
        //初始化存放事件的集合
        addHeaderEvents = new ArrayList<>();
    }

    //单个事件拦截
    @Override
    public Event intercept(Event event) {
        //1.获取事件中的头信息
        Map headers = event.getHeaders();
        //2.获取事件中的 body 信息
        String body = new String(event.getBody());
        //3.根据 body 中是否有"atguigu"来决定添加怎样的头信息
        if (body.contains("love")) {
            //4.添加头信息
            headers.put("type", "one");
        } else {
            //4.添加头信息
            headers.put("type", "two");
        }
        return event;
    }

    //批量事件拦截
    @Override
    public List intercept(List events) {
        //1.清空集合
        addHeaderEvents.clear();
        //2.遍历 events
        for (Event event : events) {
            //3.给每一个事件添加头信息
            addHeaderEvents.add(intercept(event));
        }
        //4.返回结果
        return addHeaderEvents;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

(3)将maven项目导包并上传到hadoop02的lib目录

(4)编辑 flume 配置文件

(1)在 hadoop02 上 配置一个netcat-group.conf文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.bigdata.flume.TypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.one = c1
a1.sources.r1.selector.mapping.two = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop03
a1.sinks.k1.port = 8181
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop04
a1.sinks.k2.port = 8282
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

(2)在 hadoop03上 配置一个flume3.conf文件

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop03
a1.sources.r1.port = 8181
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

(3)在 hadoop04上 配置一个flume4.conf文件

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop04
a1.sources.r1.port = 8282
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

(5)启动进程        

(1)分别在 hadoop02,hadoop03,hadoop04 上启动 flume 进程,注意先后顺序(先启动hadoop03和hadoop04的flume进程)。

bin/flume-ng agent -c conf/ -n a1 -f conf/netcat-group.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a1 -f conf/flume3.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a1 -f conf/flume4.conf -Dflume.root.logger=INFO,console

(2)在hadoop02上使用 netcat 工具向本机的 44444 端口发送内容

 nc localhost 44444

(3)在hadoop03和hadoop04上查看打印日志

谢谢观看 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/644868.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号