栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flume多路复用及拦截器的使用

flume多路复用及拦截器的使用

主要包括了flume多路复用拦截器的需求分析及原理和代码api及配置的实现!

目录标题
  • 需求
  • 需求分析及原理
  • 实现步骤

需求

使用flume采集服务器本地日志,需要按照日志类型的不同来将不同种类的日志发往不同的分析系统

需求分析及原理

在开发当中,一台服务器产生的日志类型往往是有很多种的,在不同类型的日志有可能要发往不同的分析系统,在这个时候就会用到flume的channel selector中的Multiplexing结构。其原理是:根据event中header的某个key值,将不同的event发送到不同的channel中,所以我们需要自定义一个interceptor,来为不同类型的event的header中的key赋不同的值

在本demo中,以端口数据来模拟日志,以开头为字母或者数字来模拟不同类型的日志,为此需要自定义interceptor来区分数字和字母,并将其发往不同的分析系统(channel)

分析图如下:

实现步骤

1、创建一个maven项目,并在pox.xml中添加如下配置项



    4.0.0

    org.example
    flumeDemo
    1.0-SNAPSHOT

    
        8
        8
    

	
    
        
            org.apache.flume
            flume-ng-core
            1.9.0
        
    
    

2、创建InterceptorDemo类,并实现Interceptor接口
完整代码如下:

package com.lqs.flume;



import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;
import java.util.Map;



public class InterceptorDemo implements Interceptor {

    
    @Override
    public void initialize() {

    }

    
    @Override
    public Event intercept(Event event) {

        //需要配合channel选择器使用,向headers当中put对应的参数
        //根据传入的数据来判断首位是数字还是字母,判断它是不是同类型的日志
        byte[] body = event.getBody();
        byte b = body[0];
        Map headers = event.getHeaders();
        if (b >= '0' && b <= '9') {
            //当b为数字时
            headers.put("type", "number");
        } else if ((b >= 'A' && b <= 'z') || (b >= 'a' && b <= 'z')) {
            //当b为字母时
            headers.put("type", "letter");
        }

        //放回headers,这个这里也可以不写
        event.setHeaders(headers);

        return event;
    }

    
    @Override
    public List intercept(List list) {

        for (Event event : list) {
            intercept(event);
        }

        return list;
    }

    
    @Override
    public void close() {

    }

    
    public static class BuilderDemo implements Builder {

        
        @Override
        public Interceptor build() {

            return new InterceptorDemo();

        }

        
        @Override
        public void configure(Context context) {

        }
    }
}

记得打包上传到flume的lib文件夹下面

3、编辑flume配置文件
在job/group2文件中创建一下新文件:

我是将包名改成了flumeDemo.jar并上传的

[lqs@bdc112 group2]$ pwd
/home/lqs/module/flume-1.9.0/job/group2

[lqs@bdc112 group2]$ touch flume01
[lqs@bdc112 group2]$ touch flume02
[lqs@bdc112 group2]$ touch flume03

打开flume01文件添加如下内容

[lqs@bdc112 group2]$ vim flume01
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sources.r1.selector.type = multiplexing
# 使用headers中的哪些参数
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.number = c1
a1.sources.r1.selector.mapping.letter = c2

# a1.sources.r1.selector.default = c4

# 拦截器配置
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.lqs.flume.InterceptorDemo$BuilderDemo

# Describe the sink

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bdc112
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bdc112
a1.sinks.k2.port = 4142

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

打开flume02文件并添加如下内容:

[lqs@bdc112 group2]$ vim flume02
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = bdc112
a1.sources.r1.port = 4141

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

打开flume03文件并添加如下内容:

[lqs@bdc112 group2]$ vim flume03
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = bdc112
a1.sources.r1.port = 4142

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

4、分别启动flume01、flume02、flume03

#第一个窗口
[lqs@bdc112 flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume03 -Dflume.root.logger=INFO,console

#第二个窗口
[lqs@bdc112 flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume02 -Dflume.root.logger=INFO,console

#第三个窗口
[lqs@bdc112 flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume01 -Dflume.root.loggeINFO,console
#注意,以上三条命令表示的都是同一个意思

注意:一定要按照上述的顺序执行,否则会报错!!!

第一个窗口截图

第二个窗口截图

第三个窗口截图

注意:上面三个截图只是告诉你开三个窗口,没有其它意思。
5、在bdc112使用netcat向localhost:44444发送字母和数字

6、查看flume03和flume02的情况
flume03

flume02

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/698941.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号