
04 - Using Built-in Interceptors & Custom Interceptors

Using the Built-in Interceptors

While a Flume agent is running, it can modify or drop events in flight; this is done through interceptors (Interceptors). Interceptors have the following characteristics:

  • An interceptor must implement the org.apache.flume.interceptor.Interceptor interface.
  • An interceptor can modify or drop events based on any criteria the developer chooses.
  • Interceptors follow the chain-of-responsibility pattern: multiple interceptors can be chained in a configured order.
  • The event list returned by one interceptor is passed on to the next interceptor in the chain.
  • To drop an event, an interceptor simply omits it from the event list it returns.
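The last point can be sketched in plain Java (no Flume dependency; the string event type and the "drop bodies starting with '#'" rule are made up for illustration): an interceptor never removes anything from the chain itself, it just returns a list that omits the events to drop.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of how an interceptor "deletes" events: it returns a list
// that simply does not contain them. Events are plain strings here.
public class DropByOmission {
    static List<String> intercept(List<String> events) {
        List<String> kept = new ArrayList<>();
        for (String e : events) {
            if (!e.startsWith("#")) {
                kept.add(e);   // kept events flow on to the next interceptor
            }                  // dropped events are simply never added
        }
        return kept;
    }

    public static void main(String[] args) {
        System.out.println(intercept(List.of("event-1", "#drop-me", "event-2")));
        // prints [event-1, event-2]
    }
}
```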
Commonly used interceptors:
  1. Timestamp Interceptor: adds the current timestamp (in milliseconds) to the event headers under the key timestamp. Not used all that often on its own.
  2. Host Interceptor: adds the hostname or IP address of the machine running the Flume agent to the event headers, under the key host (the key name is configurable).
  3. Static Interceptor: adds a fixed key/value pair to the event headers.

Example

This example uses the timestamp (plus host and static) interceptors with a SyslogTcp source; events flow through a memory channel and are finally written to HDFS.

Configuration

[root@tianqinglong01 flumeconf]# vi ts.conf
a1.sources = r1
a1.channels = c1
a1.sinks = s1

a1.sources.r1.type=syslogtcp
a1.sources.r1.host=tianqinglong01
a1.sources.r1.port=6666
a1.sources.r1.interceptors=i1 i2 i3
a1.sources.r1.interceptors.i1.type=timestamp
a1.sources.r1.interceptors.i1.preserveExisting=false
a1.sources.r1.interceptors.i2.type=host
a1.sources.r1.interceptors.i2.preserveExisting=false
a1.sources.r1.interceptors.i2.useIP=true
a1.sources.r1.interceptors.i2.hostHeader=hostname
a1.sources.r1.interceptors.i3.type=static
a1.sources.r1.interceptors.i3.preserveExisting=false
a1.sources.r1.interceptors.i3.key=hn
a1.sources.r1.interceptors.i3.value=tianqinglong01

a1.channels.c1.type=memory

a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://tianqinglong01:8020/flume/%Y/%m/%d/%H%M
a1.sinks.s1.hdfs.filePrefix=%{hostname}
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true

a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

Start the agent

[root@tianqinglong flumeconf]# flume-ng agent -c ../conf -f ./ts.conf -n a1 -Dflume.root.logger=INFO,console

Test

[root@tianqinglong ~]# echo "hello world hello interceptor" | nc tianqinglong01 6666

# nc must be installed first
yum install -y nmap-ncat
Using Channel Selectors

Overview

A channel selector in Flume operates on the source side: it is the component that decides which channel(s) each event received by a source should be written to. The selector informs the channel processor, which then writes the event to the chosen channel(s).

How an agent's components interact

Because Flume does not use a two-phase commit, an event is written to one channel and committed before it is written to the next. If a write to one channel fails, the copies of the same event already committed to other channels cannot be rolled back. When such a failure occurs, the channel processor throws a ChannelException and the transaction fails. If the source then writes the same event again (most sources retry; only sources such as Syslog and Exec cannot, because there is no way to regenerate the same data), the duplicate is written to the channels whose earlier commit succeeded, and Flume ends up with duplicate events.

Channel selectors are configured through the channel processor. A selector can mark one set of channels as required and another as optional.

Flume ships with two kinds of selectors. If the source configuration does not specify one, the replicating selector is used automatically.

  • replicating: copies every event to all of the channels listed in the source's channels parameter.
  • multiplexing: a selector designed for dynamic routing; it chooses which channel an event is written to based on the value of a specific event header.
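The multiplexing rule can be sketched in plain Java (no Flume dependency; the header name "state", the USER/ORDER values, and the channel names are illustrative): the selector looks up one header's value in a mapping table and falls back to a default channel when there is no match.

```java
import java.util.Map;

// Sketch of multiplexing-selector routing: pick a channel by the value
// of one event header, with a default when the value is unmapped.
public class RouteByHeader {
    static final Map<String, String> MAPPING = Map.of("USER", "c1", "ORDER", "c2");
    static final String DEFAULT_CHANNEL = "c1";

    static String route(Map<String, String> headers) {
        String value = headers.getOrDefault("state", "");
        return MAPPING.getOrDefault(value, DEFAULT_CHANNEL);
    }

    public static void main(String[] args) {
        System.out.println(route(Map.of("state", "ORDER"))); // prints c2
        System.out.println(route(Map.of("state", "FOO")));   // prints c1 (default)
    }
}
```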

Example: replicating selector

Configuration

[root@tianqinglong01 flumeconf]# vi rep.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = s1 s2

a1.sources.r1.type=syslogtcp
a1.sources.r1.host = tianqinglong01
a1.sources.r1.port = 6666
a1.sources.r1.selector.type=replicating

a1.channels.c1.type=memory

a1.channels.c2.type=memory

a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://tianqinglong01:8020/flume/%Y/%m/%d/rep
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true

a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://tianqinglong01:8020/flume/%Y/%m/%d/rep
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.inUseSuffix=.tmp
a1.sinks.s2.hdfs.rollInterval=60
a1.sinks.s2.hdfs.rollSize=1024
a1.sinks.s2.hdfs.rollCount=10
a1.sinks.s2.hdfs.idleTimeout=0
a1.sinks.s2.hdfs.batchSize=100
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.round=true
a1.sinks.s2.hdfs.roundValue=1
a1.sinks.s2.hdfs.roundUnit=second
a1.sinks.s2.hdfs.useLocalTimeStamp=true

a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2

Start the agent

[root@tianqinglong flumeconf]# flume-ng agent -c ../conf -f ./rep.conf -n a1 -Dflume.root.logger=INFO,console

Test

[root@tianqinglong ~]# echo "hello world hello interceptor" | nc tianqinglong01 6666

# nc must be installed first
yum install -y nmap-ncat

Example: multiplexing selector

Configuration

[root@tianqinglong01 flumeconf]# vi mul.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = s1 s2

a1.sources.r1.type=syslogtcp
a1.sources.r1.host = tianqinglong01
a1.sources.r1.port = 6666
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.USER = c1
a1.sources.r1.selector.mapping.ORDER = c2
a1.sources.r1.selector.default = c1


a1.channels.c1.type=memory

a1.channels.c2.type=memory

a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://tianqinglong01:8020/flume/%Y/%m/%d/mul
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true

a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://tianqinglong01:8020/flume/%Y/%m/%d/mul
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.inUseSuffix=.tmp
a1.sinks.s2.hdfs.rollInterval=60
a1.sinks.s2.hdfs.rollSize=1024
a1.sinks.s2.hdfs.rollCount=10
a1.sinks.s2.hdfs.idleTimeout=0
a1.sinks.s2.hdfs.batchSize=100
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.round=true
a1.sinks.s2.hdfs.roundValue=1
a1.sinks.s2.hdfs.roundUnit=second
a1.sinks.s2.hdfs.useLocalTimeStamp=true

a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2

Start the agent

[root@tianqinglong flumeconf]# flume-ng agent -c ../conf -f ./mul.conf -n a1 -Dflume.root.logger=INFO,console

Test

[root@tianqinglong ~]# curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this is my multiplex to c2"}]' http://tianqinglong01:6666
[root@tianqinglong ~]# curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this is my content"}]' http://tianqinglong01:6666

# Note: this JSON event format is the one accepted by Flume's HTTP source;
# to drive the test with curl, set a1.sources.r1.type=http instead of syslogtcp.
Custom Interceptors

Requirements

To make Flume extensible, users can define their own interceptors.

Events whose body starts with a digit are stored under hdfs://tianqinglong01:8020/flume/number.log (sink s1).
Events whose body starts with a letter are stored under hdfs://tianqinglong01:8020/flume/character.log (sink s2).
All other events are stored under hdfs://tianqinglong01:8020/flume/other.log (sink s3).

pom.xml


<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.8.0</version>
</dependency>

Code

package com.qf;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;

public class MyIntercepter implements Interceptor {
    private static final String LOCATION_KEY="location";
    private static final String LOCATION_NUMBER="number";
    private static final String LOCATION_CHARACTER="character";
    private static final String LOCATION_OTHER="other";


    // Tag each event with a "location" header based on its first body byte.
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        if (body.length == 0) {
            // empty body: classify as "other" (also avoids an index error)
            event.getHeaders().put(LOCATION_KEY, LOCATION_OTHER);
        } else if (body[0] >= '0' && body[0] <= '9') {
            event.getHeaders().put(LOCATION_KEY, LOCATION_NUMBER);
        } else if ((body[0] >= 'a' && body[0] <= 'z') || (body[0] >= 'A' && body[0] <= 'Z')) {
            event.getHeaders().put(LOCATION_KEY, LOCATION_CHARACTER);
        } else {
            event.getHeaders().put(LOCATION_KEY, LOCATION_OTHER);
        }
        return event;
    }

    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    public void initialize() {

    }
    public void close() {

    }
    public static class MyBuilder implements Builder{

        public Interceptor build() {
            return new MyIntercepter();
        }

        public void configure(Context context) {

        }
    }
}

Package and upload

Package the interceptor with Maven, then upload the resulting jar, together with the fastjson dependency, to Flume's lib directory.

Configuration

a1.sources=r1
a1.channels=c1 c2 c3
a1.sinks=s1 s2 s3
a1.sources.r1.channels=c1 c2 c3
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
a1.sinks.s3.channel=c3
# source properties
a1.sources.r1.type=syslogtcp
a1.sources.r1.host=tianqinglong01
a1.sources.r1.port=12345
# interceptor
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=com.qf.MyIntercepter$MyBuilder
# selector properties
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=location
a1.sources.r1.selector.mapping.number=c1
a1.sources.r1.selector.mapping.character=c2
a1.sources.r1.selector.mapping.other=c3
# channel properties
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c3.type=memory
a1.channels.c3.capacity=1000
# sink properties
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://tianqinglong01:8020/flume/customInterceptor/s1/%Y-%m-%d-%H
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s1.hdfs.filePrefix=regex
a1.sinks.s1.hdfs.rollInterval=0
a1.sinks.s1.hdfs.rollSize=102400
a1.sinks.s1.hdfs.rollCount=30
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text

a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://tianqinglong01:8020/flume/customInterceptor/s2/%Y-%m-%d-%H
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sinks.s2.hdfs.filePrefix=regex
a1.sinks.s2.hdfs.rollInterval=0
a1.sinks.s2.hdfs.rollSize=102400
a1.sinks.s2.hdfs.rollCount=30
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text

a1.sinks.s3.type=hdfs
a1.sinks.s3.hdfs.path=hdfs://tianqinglong01:8020/flume/customInterceptor/s3/%Y-%m-%d-%H
a1.sinks.s3.hdfs.useLocalTimeStamp=true
a1.sinks.s3.hdfs.filePrefix=regex
a1.sinks.s3.hdfs.rollInterval=0
a1.sinks.s3.hdfs.rollSize=102400
a1.sinks.s3.hdfs.rollCount=30
a1.sinks.s3.hdfs.fileType=DataStream
a1.sinks.s3.hdfs.writeFormat=Text

Start the agent

[root@tianqinglong01 flumeconf]# flume-ng agent -c ../conf/ -f ./mytest.conf -n a1 -Dflume.root.logger=INFO,console

Test

echo "hello world" | nc  tianqinglong01 12345
echo "123 hello world" | nc  tianqinglong01 12345
echo ".123 hello world" | nc  tianqinglong01 12345