Flume Study Notes (4): Custom Source and Custom Sink Examples and Tests


If you find any mistakes in this article, please let me know. Thanks!

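Table of Contents

References
1. Runtime Environment
2. Custom Source
    2.1 Implementation
    2.2 Flume Configuration
    2.3 Testing
3. Custom Sink
    3.1 Writing the Program
    3.2 Flume Configuration
    3.3 Testing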

References

Video link

1. Runtime Environment

CentOS 7
JDK 8
Flume 1.9

2. Custom Source


2.1 Implementation

Add the dependency to pom.xml:


<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>

Write the custom source: UniSource.java
Note: a pollable custom source like this one extends AbstractSource and implements the Configurable and PollableSource interfaces.

package com.uni.flume.source;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;


public class UniSource extends AbstractSource implements Configurable, PollableSource {
    // Prefix and suffix added to every generated message, and the pause between batches
    private String prefix;
    private String suffix;
    private Long delay;

    @Override
    public void configure(Context context) {
        prefix = context.getString("pre", "pre-");
        // Default to an empty suffix: without a default an unset key yields null,
        // which string concatenation would turn into the text "null"
        suffix = context.getString("sub", "");
        delay = context.getLong("delay", 2000L);
    }

    @Override
    public Status process() throws EventDeliveryException {
        try {
            // 1. Create a batch of 5 events and hand each one to the channel
            for (int i = 0; i < 5; i++) {
                // 2. Build the event: empty headers, body = prefix + "uni:" + i + suffix
                Event event = new SimpleEvent();
                Map<String, String> headers = new HashMap<>();
                event.setHeaders(headers);
                event.setBody((prefix + "uni:" + i + suffix).getBytes(StandardCharsets.UTF_8));
                getChannelProcessor().processEvent(event);
            }
            // 3. Wait before producing the next batch
            Thread.sleep(delay);
            return Status.READY;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the agent can stop the source cleanly
            Thread.currentThread().interrupt();
            return Status.BACKOFF;
        } catch (Exception e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
}
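The two back-off getters decide how long Flume pauses after process() returns BACKOFF. As I read Flume 1.9's PollableSourceRunner, after the n-th consecutive BACKOFF it sleeps for min(n × getBackOffSleepIncrement(), getMaxBackOffSleepInterval()) milliseconds, so returning 0 from both (as UniSource does) means a failing source is polled again immediately. The plain-Java sketch below only simulates that formula under this assumption; BackoffSketch and delayAfterBackoff are hypothetical names, not Flume API.

```java
// Simulates the back-off delay a pollable source gets after consecutive
// BACKOFF results, assuming the runner uses min(n * increment, max).
// BackoffSketch is a hypothetical class, not part of Flume.
class BackoffSketch {

    // Delay in ms after the n-th consecutive BACKOFF.
    static long delayAfterBackoff(long consecutiveBackoffs, long increment, long max) {
        return Math.min(consecutiveBackoffs * increment, max);
    }

    public static void main(String[] args) {
        // Values returned by UniSource (0 and 0): no pause at all.
        for (int n = 1; n <= 3; n++) {
            System.out.println("UniSource values, backoff #" + n + ": "
                    + delayAfterBackoff(n, 0L, 0L) + " ms");
        }
        // Hypothetical alternative: 1 s increment, capped at 5 s.
        for (int n = 1; n <= 7; n++) {
            System.out.println("1000/5000, backoff #" + n + ": "
                    + delayAfterBackoff(n, 1000L, 5000L) + " ms");
        }
    }
}
```

With the hypothetical 1000/5000 values, consecutive failures would wait 1 s, 2 s, 3 s, 4 s and then stay at 5 s, which avoids hammering a broken upstream.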

Package the project into a jar with Maven and copy it into the lib folder under the Flume root directory on the node.
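Copying the jar into lib matters because, when a type value is not one of Flume's built-in aliases (such as netcat, logger or memory), Flume treats it as a fully qualified class name and loads it by reflection, so the class has to be on the agent's classpath, which bin/flume-ng builds from the jars in lib. The JDK-only sketch below shows that loading step in isolation; ComponentLoader and loadComponent are hypothetical helpers, not Flume's factory code, and java.util.ArrayList stands in for UniSource so the example runs without Flume.

```java
import java.util.List;

// Loads a component from its fully qualified class name, similar in spirit to how
// "type = com.uni.flume.source.UniSource" is resolved. Hypothetical helper,
// not Flume's factory code; ArrayList/List stand in for UniSource/Source.
class ComponentLoader {

    static <T> T loadComponent(String className, Class<T> expectedType)
            throws ReflectiveOperationException {
        // ClassNotFoundException here corresponds to the jar missing from lib/
        Class<?> clazz = Class.forName(className);
        if (!expectedType.isAssignableFrom(clazz)) {
            throw new IllegalArgumentException(
                    className + " is not a " + expectedType.getName());
        }
        // Instantiate through the no-argument constructor
        return expectedType.cast(clazz.getDeclaredConstructor().newInstance());
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        List<?> list = loadComponent("java.util.ArrayList", List.class);
        System.out.println("Loaded " + list.getClass().getName());
        try {
            // Hypothetical class name that does not exist on the classpath
            loadComponent("com.example.MissingSource", List.class);
        } catch (ClassNotFoundException e) {
            System.out.println("Not on the classpath: " + e.getMessage());
        }
    }
}
```

A ClassNotFoundException for com.uni.flume.source.UniSource in the agent log is therefore the usual sign that the jar was not copied into lib.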

2.2 Flume Configuration

Note: set a1.sources.r1.type to the fully qualified class name of the custom source.

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the custom source
a1.sources.r1.type = com.uni.flume.source.UniSource
# Message prefix, default pre-
a1.sources.r1.pre = hhh-
# Message suffix, default empty
a1.sources.r1.sub = -hhh
# Pause in ms between batches of 5 messages, default 2000
a1.sources.r1.delay = 1000

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
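Each component's configure() receives a Context holding only that component's properties, with the agent/component prefix (here a1.sources.r1.) removed, which is why UniSource reads the keys pre, sub and delay. It also means key names must match exactly: a misspelled key such as deley is simply ignored, and getLong("delay", 2000L) falls back to 2000. The sketch below imitates the prefix stripping with java.util.Properties and a plain Map; ConfigPrefixSketch, subProperties and getLong here are hypothetical stand-ins, not Flume's configuration code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Imitates how a component receives only its own keys, with the
// "a1.sources.r1." prefix stripped. ConfigPrefixSketch is hypothetical.
class ConfigPrefixSketch {

    static Map<String, String> subProperties(Properties props, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                result.put(name.substring(prefix.length()), props.getProperty(name));
            }
        }
        return result;
    }

    // Mirrors the getLong(key, default) lookup used in UniSource.configure().
    static long getLong(Map<String, String> ctx, String key, long defaultValue) {
        String value = ctx.get(key);
        return value == null ? defaultValue : Long.parseLong(value.trim());
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(
                "a1.sources.r1.type = com.uni.flume.source.UniSource\n"
                + "a1.sources.r1.pre = hhh-\n"
                + "a1.sources.r1.delay = 1000\n"
                + "a1.sinks.k1.type = logger\n"));
        Map<String, String> ctx = subProperties(props, "a1.sources.r1.");
        System.out.println(ctx.get("pre"));                // hhh-
        System.out.println(getLong(ctx, "delay", 2000L));  // 1000
        // A misspelled key is ignored and the default is used instead.
        Map<String, String> typo = new HashMap<>();
        typo.put("deley", "1000");
        System.out.println(getLong(typo, "delay", 2000L)); // 2000
    }
}
```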
2.3 Testing
$ bin/flume-ng agent -c conf/ -n a1 -f job/custom-source-console.conf -Dflume.root.logger=INFO,console

Test results:

3. Custom Sink
3.1 Writing the Program

Add the Flume dependency to pom.xml:

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>

Custom Sink class: UniSink.java

package com.uni.flume.sink;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;


public class UniSink extends AbstractSink implements Configurable {
    // Prefix and suffix wrapped around each message
    private String prefix;
    private String suffix;

    // Logger used to print the processed messages
    private static final Logger logger = LoggerFactory.getLogger(UniSink.class);

    @Override
    public void configure(Context context) {
        prefix = context.getString("pre", "pre-");
        // Default to an empty suffix instead of null
        suffix = context.getString("sub", "");
    }

    @Override
    public Status process() throws EventDeliveryException {
        // 1. Get the channel and start a transaction
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            // 2. Take one event; take() returns null when the channel is empty
            Event event = channel.take();
            if (event == null) {
                // Nothing to do: end the transaction and let the runner back off
                // instead of looping inside an open transaction
                transaction.commit();
                return Status.BACKOFF;
            }
            // 3. Process the event: print it with the prefix and suffix
            logger.info(prefix + new String(event.getBody(), StandardCharsets.UTF_8) + suffix);
            // 4. Commit so the event is removed from the channel
            transaction.commit();
            return Status.READY;
        } catch (Exception e) {
            // Roll back so the event stays in the channel and can be retried
            transaction.rollback();
            return Status.BACKOFF;
        } finally {
            transaction.close();
        }
    }
}
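The take/commit/rollback sequence in process() is what gives a sink its delivery guarantee: an event only leaves the channel when the transaction commits, and a rollback leaves it there for a later retry. The self-contained sketch below models that with a toy in-memory channel built on java.util.ArrayDeque; ToyChannel, processOnce and the failDelivery flag are hypothetical and much simpler than Flume's MemoryChannel (no capacity limits, no concurrency, one transaction at a time).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy, single-threaded model of a channel transaction (hypothetical, far simpler
// than Flume's MemoryChannel): taken events are only gone after commit(),
// and rollback() puts them back at the head of the queue.
class ToyChannel {
    enum Status { READY, BACKOFF }

    private final Deque<String> queue = new ArrayDeque<>();
    private final List<String> taken = new ArrayList<>();

    void put(String event) { queue.addLast(event); }

    // Like Channel.take(): returns null when nothing is available.
    String take() {
        String event = queue.pollFirst();
        if (event != null) {
            taken.add(event);
        }
        return event;
    }

    void commit() { taken.clear(); }

    void rollback() {
        // Re-insert in reverse so the original order is preserved.
        for (int i = taken.size() - 1; i >= 0; i--) {
            queue.addFirst(taken.get(i));
        }
        taken.clear();
    }

    int size() { return queue.size(); }

    // One process() call in the style of UniSink; failDelivery simulates an
    // error while handling the event.
    static Status processOnce(ToyChannel channel, String prefix, String suffix,
                              List<String> out, boolean failDelivery) {
        try {
            String event = channel.take();
            if (event == null) {
                channel.commit();
                return Status.BACKOFF;
            }
            if (failDelivery) {
                throw new IllegalStateException("simulated delivery failure");
            }
            out.add(prefix + event + suffix);
            channel.commit();
            return Status.READY;
        } catch (RuntimeException e) {
            channel.rollback();
            return Status.BACKOFF;
        }
    }

    public static void main(String[] args) {
        ToyChannel channel = new ToyChannel();
        channel.put("hello");
        List<String> out = new ArrayList<>();
        System.out.println(processOnce(channel, "orz-", "-orz", out, true));  // BACKOFF
        System.out.println(channel.size());                                    // 1
        System.out.println(processOnce(channel, "orz-", "-orz", out, false)); // READY
        System.out.println(out);                                               // [orz-hello-orz]
        System.out.println(processOnce(channel, "orz-", "-orz", out, false)); // BACKOFF
    }
}
```

The failed first call leaves "hello" in the channel, the second call delivers it, and the third finds the channel empty and backs off.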

As before, package it with Maven and copy the jar into the lib folder under the Flume root directory on the node.

3.2 Flume Configuration
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Configure the custom sink, which adds a prefix and suffix to each message
a1.sinks.k1.type = com.uni.flume.sink.UniSink
a1.sinks.k1.pre = orz-
a1.sinks.k1.sub = -orz

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
3.3 Testing

(1) Start Flume

$ bin/flume-ng agent -c conf/ -n a1 -f job/netcat-flume-console.conf -Dflume.root.logger=INFO,console

(2) Use netcat to send messages to local port 44444

nc localhost 44444
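If nc is not available, the same test can be driven from Java. Flume's netcat source reads newline-terminated lines and, with its default ack-every-event setting, replies OK for each event it accepts. The sketch below uses only java.net.Socket; NetcatClient and sendLines are hypothetical names, and the host and port are taken from the configuration above.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Minimal Java stand-in for "nc localhost 44444"; NetcatClient is hypothetical.
class NetcatClient {

    // Sends each message as one newline-terminated line and reads one reply
    // line per message (the netcat source answers "OK" by default).
    static List<String> sendLines(Writer out, BufferedReader in, List<String> messages)
            throws IOException {
        List<String> replies = new ArrayList<>();
        for (String message : messages) {
            out.write(message + "\n");
            out.flush();
            replies.add(in.readLine());
        }
        return replies;
    }

    public static void main(String[] args) throws IOException {
        // Host and port match a1.sources.r1.bind / a1.sources.r1.port above
        try (Socket socket = new Socket("localhost", 44444);
             Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            System.out.println(sendLines(out, in, Arrays.asList("hello", "flume")));
        }
    }
}
```

With the agent running, each line sent this way should show up in the agent console wrapped by the custom sink's orz- prefix and -orz suffix.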

Messages sent:

Messages printed by the custom sink:

This completes the testing of the custom Source and Sink.

Source: www.mshxw.com
Article URL: https://www.mshxw.com/it/775396.html