栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

自定义flume拦截器(1)

自定义flume拦截器(1)

1.进入idea,在maven中导入jar包

pom.xml文件下添加代码

    
        
        
            org.apache.flume
            flume-ng-core
            1.8.0
        
        
        
            com.alibaba
            fastjson
            1.2.48
        
    

注意:修改flume的版本与自己的flume的版本相同,我的是flume版本是1.8.0

2.新建一个类,实现flume的Interceptor接口,再在内里面实现静态内部类,实现Builder

 一个这个自定义flume拦截器类的框架如下:

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;

public class MyInterceptor implements Interceptor {
    @Override    
    public void initialize() {
        
    }

    @Override
    public Event intercept(Event event) {
        return null;
    }

    @Override
    public List intercept(List list) {
        return null;
    }

    @Override
    public void close() {

    }
    public static class Build implements Builder{
        @Override
        public Interceptor build() {
            return null;
        }

        @Override
        public void configure(Context context) {

        }
    }
}
3.实现Interceptor和Builder接口实现方法解析

Interceptor接口方法:

initialize():该方法用来初始化拦截器,在拦截器的构造方法执行之后执行,也就是创建完拦 截器对象之后执行。

intercept():用来处理每一个event对象,该方法不会被系统自动调用,一般在 List intercept(List events) 方法内部调用。

intercept(List list):用来处理一批event对象集合,集合大小与flume启动配置有关,和 transactionCapacity大小保持一致。一般直接调用 Event intercept(Event event) 处理每一个event数据。

close():该方法主要用来销毁拦截器对象值执行,一般是一些释放资源的处理

Builder接口方法:

 Builder接口:通过该静态内部类来创建自定义对象供flume使用,实现Interceptor.Builder接口,并实现其抽象方法

build():该方法主要用来返回创建的自定义类拦截器对象 configure():通过该对象可以获取flume配置自定义拦截器的参数
4.实例 题目:
日志信息如下形式: '{ "host":"www.baidu.com", "user_id":"13755569427", "items":[         {                 "item_type":"eat",                 "active_time":156234         },         {                 "item_type":"car",                 "active_time":156233         }     ] }' 要求:对每一组的 item_type和active_time都过滤出相应的HOST和USERID
1.首先创建flume自定义的拦截器类
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.compress.utils.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
// todo 自定义拦截器
// 1.实现Interceptor接口,实现其方法
// 2.实现静态内部类,实现Builder
public class MyInterceptor implements Interceptor {
    @Override
    //该方法用来设定初始化
    public void initialize() {

    }

    @Override
//    处理一个事件
    
    public Event intercept(Event event) {
        //定义输入输出
        String input = null;
        byte[] output = null;
        //
        ArrayList jsonList = new ArrayList<>();
        //获取消息体数据,转换为String类型
        input = new String(event.getBody(),Charsets.UTF_8);
        //将String转换成Json对象,方便通过key-value获取值
        JSonObject jsonObject = JSON.parseObject(input);
        //key的名字要跟数据中的key一模一样
        String host = jsonObject.getString("host");
        String user_id = jsonObject.getString("user_id");
        //返回一个数组,
        JSonArray items = jsonObject.getJSonArray("items");
        //遍历数组获取数据
        for (Object item:items){
            //因为item数组遍历出来只能使用object进行接收
            //所有需要将object类型在转换成String类型
            JSonObject jsonItem = JSON.parseObject(item.toString());
            String item_type = jsonItem.getString("item_type");  //遍历第一次的值eat 第二次car
            String active_time = jsonItem.getString("active_time");//

            HashMap map = new HashMap<>();
            map.put("host",host);
            map.put("user_id",user_id);
            map.put("item_type",item_type);
            map.put("active_time",active_time);
            //将map转换成String类型,放入到list中
            jsonList.add(new JSonObject(map).toJSonString());
        }
        //
        output = String.join("n",jsonList).getBytes();
        event.setBody(output);
        return event;
    }

    @Override
//    用来调用intercept()方法,将采集到一批event遍历传入到该方法中
    public List intercept(List list) {
        ArrayList result = new ArrayList<>();
        for (Event event:list){
            Event intercept = intercept(event);

            result.add(intercept);
        }
        return result;
    }

    @Override
    //关闭资源,释放资源
    public void close() {

    }

    public static class Buder implements Builder{
        @Override
        //
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
2.通过maven打成jar包

 打开idea右侧的Maven,点击package,当下方出现BUILD SUCCESS代表打包成功,可以在左侧的target目录下发现jar包

3.配置flume的采集文件
a1.sources = s1
a1.channels = c1
a1.sinks = r1

a1.sources.s1.type = TAILDIR
a1.sources.s1.filegroups = f1
a1.sources.s1.filegroups.f1=/opt/data/log/.*log
#使用自定义拦截器
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = MyInterceptor$Buder

a1.channels.c1.type = file

a1.sinks.r1.type = hdfs
a1.sinks.r1.hdfs.path = hdfs://node1:8020/flume/spooldir


a1.sources.s1.channels = c1
a1.sinks.r1.channel = c1

a1.sources.s1.interceptors.i1.type = MyInterceptor$Buder

这个写的是jar包中拦截器的类名

MyInterceptor$Buder代表的是我的(拦截器类名和中间静态类的类名),如果有包名需要在前面添加包名,用.好连接,如:FlumeInterceptor.MyInterceptor$Buder

4.启动flume agent,测试

[root@node1 flumeconf]# flume-ng agent -n a1 -c ../conf/ -f ./taidir-file-hdfs.conf -Dflume.root.logger=INFO,console

[root@node1 data]# vi my.sh

#!/bin/bash
log='{
"host":"www.baidu.com",
"user_id":"13755569427",
"items":[
    {
        "item_type":"eat",
        "active_time":156234
    },
    {
        "item_type":"car",
        "active_time":156233
    }
 ]
}'
echo $log>> /opt/data/log/test.log

[root@node1 data]# bash my.sh

5.查看hdfs中文件内容

hdfs dfs -cat /flume/spooldir/FlumeData.1644173406163

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/761699.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号