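If you find any errors in this article, corrections are welcome. Thank you.

Contents
1. Runtime Environment
2. Custom Source
    2.1 Implementation
    2.2 Flume Configuration
    2.3 Testing
3. Custom Sink
    3.1 Writing the Program
    3.2 Flume Configuration
    3.3 Testing

1. Runtime Environment
CentOS 7
JDK 8
Flume 1.9

2. Custom Source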
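2.1 Implementation
Add the dependency to pom.xml:
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>
Write the custom source: UniSource.java
Note: for a polling source like this one, the class extends AbstractSource and implements the Configurable and PollableSource interfaces.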
package com.uni.flume.source;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class UniSource extends AbstractSource implements Configurable, PollableSource {

    // Prefix and suffix added to every generated message
    private String prefix;
    private String subfix;
    // Pause between batches, in milliseconds
    private long delay;

    @Override
    public void configure(Context context) {
        prefix = context.getString("pre", "pre-");
        // Default to "" so that an unset suffix is not rendered as "null"
        subfix = context.getString("sub", "");
        delay = context.getLong("delay", 2000L);
    }

    @Override
    public Status process() throws EventDeliveryException {
        try {
            // 1. Create a batch of five events and pass each one to the channel
            for (int i = 0; i < 5; i++) {
                // 2. Build the event
                Event event = new SimpleEvent();
                Map<String, String> headers = new HashMap<>();
                event.setHeaders(headers);
                event.setBody((prefix + "uni:" + i + subfix).getBytes(StandardCharsets.UTF_8));
                getChannelProcessor().processEvent(event);
            }
            Thread.sleep(delay);
            return Status.READY;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the agent can stop the source cleanly
            Thread.currentThread().interrupt();
            return Status.BACKOFF;
        } catch (Exception e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
    }

    // Both back-off settings are 0, so no extra wait is requested after a BACKOFF result
    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
}
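The two back-off methods at the end of UniSource both return 0. As far as I understand Flume 1.9's PollableSourceRunner, after a BACKOFF result it sleeps for min(consecutive back-offs × increment, max interval) milliseconds, so 0 amounts to an immediate retry. Below is a minimal Flume-free sketch of that calculation. BackoffSketch and sleepMillis are hypothetical names used only for illustration, and the 1000 ms / 5000 ms values are made-up examples, not settings from this article.

```java
// Sketch of the back-off sleep calculation, assuming the runner computes
// min(consecutiveBackoffs * increment, maxInterval). Not a Flume class.
class BackoffSketch {

    /** Sleep time in ms after the given number of consecutive BACKOFF results. */
    static long sleepMillis(int consecutiveBackoffs, long increment, long maxInterval) {
        return Math.min(consecutiveBackoffs * increment, maxInterval);
    }

    public static void main(String[] args) {
        // Values returned by UniSource in this article: both are 0.
        System.out.println(sleepMillis(3, 0L, 0L));        // prints 0

        // Hypothetical alternative: 1000 ms step, capped at 5000 ms.
        System.out.println(sleepMillis(3, 1000L, 5000L));  // prints 3000
        System.out.println(sleepMillis(10, 1000L, 5000L)); // prints 5000
    }
}
```

If the source could fail repeatedly (for example when the channel is full), returning non-zero values from the two methods would keep it from retrying in a tight loop.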
Package the project into a jar with Maven and copy it to the lib folder under the Flume root directory on the node.
2.2 Flume Configuration
Note: set a1.sources.r1.type to the fully qualified class name of the custom source.
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Custom source
a1.sources.r1.type = com.uni.flume.source.UniSource
# Message prefix; defaults to pre-
a1.sources.r1.pre = hhh-
# Message suffix; empty by default
a1.sources.r1.sub = -hhh
# Pause between batches in ms; by default 5 events are sent every 2000 ms
a1.sources.r1.delay = 1000

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

2.3 Testing
$ bin/flume-ng agent -c conf/ -n a1 -f job/custom-source-console.conf -Dflume.root.logger=INFO,console
Test result:
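As a rough check of what to expect, the event bodies can be reproduced without Flume. The sketch below builds one batch the same way UniSource.process() does, using the pre and sub values from the 2.2 configuration. ExpectedBodies and batch are hypothetical names for this illustration, and the logger sink wraps each body in its own log format, which is not shown here.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Flume-free sketch: rebuilds the bodies of one five-event batch.
class ExpectedBodies {

    /** Builds the bodies as prefix + "uni:" + i + suffix for i = 0..4. */
    static List<String> batch(String prefix, String suffix) {
        List<String> bodies = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            byte[] body = (prefix + "uni:" + i + suffix).getBytes(StandardCharsets.UTF_8);
            bodies.add(new String(body, StandardCharsets.UTF_8));
        }
        return bodies;
    }

    public static void main(String[] args) {
        // Values taken from the configuration: pre = hhh-, sub = -hhh
        for (String body : batch("hhh-", "-hhh")) {
            System.out.println(body); // hhh-uni:0-hhh through hhh-uni:4-hhh
        }
    }
}
```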
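3. Custom Sink
3.1 Writing the Program
Add the Flume dependency to pom.xml:
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>
Custom sink class: UniSink.java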
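package com.uni.flume.sink;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;

public class UniSink extends AbstractSink implements Configurable {

    // Logger used to print the decorated messages
    private static final Logger logger = LoggerFactory.getLogger(UniSink.class);

    // Prefix and suffix added to every message
    private String prefix;
    private String subfix;

    @Override
    public void configure(Context context) {
        prefix = context.getString("pre", "pre-");
        // Default to "" so that an unset suffix is not rendered as "null"
        subfix = context.getString("sub", "");
    }

    @Override
    public Status process() throws EventDeliveryException {
        // 1. Get the channel and begin a transaction
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        // 2. Take data from the channel and print it to the console
        try {
            // 2.1 Take one event; take() returns null when the channel is empty
            Event event = channel.take();
            if (event == null) {
                // Nothing to process: end the transaction and back off
                // instead of spinning inside it
                transaction.commit();
                return Status.BACKOFF;
            }
            // 2.2 Process the data
            logger.info(prefix + new String(event.getBody(), StandardCharsets.UTF_8) + subfix);
            // 2.3 Commit the transaction
            transaction.commit();
            return Status.READY;
        } catch (Exception e) {
            // Roll back so the event stays in the channel
            logger.error("Failed to process event, rolling back", e);
            transaction.rollback();
            return Status.BACKOFF;
        } finally {
            transaction.close();
        }
    }
}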
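The take/commit/rollback flow of a sink's process() can be tried outside Flume with a small model. The sketch below is only an illustration under stated assumptions: MiniChannel is a hypothetical stand-in that merges Flume's Channel and Transaction into one object, and SinkLoopSketch, its Status enum, and the "disk full" failure are invented for the example. It takes at most one event per call and reports BACKOFF on an empty channel, which, as far as I know, is also how Flume's built-in LoggerSink behaves.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Flume-free model of the take/commit/rollback flow; not part of the Flume API.
class SinkLoopSketch {

    enum Status { READY, BACKOFF }

    /** Hypothetical in-memory channel with a single open transaction. */
    static final class MiniChannel {
        private final Deque<String> queue = new ArrayDeque<>();
        private String taken; // event removed by the open transaction, if any

        void put(String body) { queue.addLast(body); }

        String take() {
            taken = queue.pollFirst(); // null when the channel is empty
            return taken;
        }

        void commit() { taken = null; }

        void rollback() {
            if (taken != null) {
                queue.addFirst(taken); // the event becomes available again
                taken = null;
            }
        }

        int size() { return queue.size(); }
    }

    /** Takes at most one event, decorates it, and hands it to the writer. */
    static Status process(MiniChannel channel, Consumer<String> writer) {
        try {
            String body = channel.take();
            if (body == null) {
                channel.commit();
                return Status.BACKOFF; // empty channel: let the caller wait
            }
            writer.accept("orz-" + body + "-orz");
            channel.commit();
            return Status.READY;
        } catch (RuntimeException e) {
            channel.rollback();
            return Status.BACKOFF;
        }
    }

    public static void main(String[] args) {
        MiniChannel channel = new MiniChannel();
        channel.put("hello");

        // A failing writer triggers rollback, so the event stays in the channel.
        Status failed = process(channel, line -> { throw new IllegalStateException("disk full"); });
        System.out.println(failed + ", events left: " + channel.size());

        // A working writer consumes the event.
        Status ok = process(channel, System.out::println);
        System.out.println(ok + ", events left: " + channel.size());

        // The channel is now empty.
        System.out.println(process(channel, System.out::println));
    }
}
```

The point of the rollback branch is that a failed write does not lose the event: the next call to process() sees it again.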
As before, package the project with Maven and copy the jar to the lib folder under the Flume root directory on the node.

3.2 Flume Configuration
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Custom sink that adds a prefix and suffix to each message
a1.sinks.k1.type = com.uni.flume.sink.UniSink
a1.sinks.k1.pre = orz-
a1.sinks.k1.sub = -orz

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

3.3 Testing
(1) Start Flume
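$ bin/flume-ng agent -c conf/ -n a1 -f job/netcat-flume-console.conf -Dflume.root.logger=INFO,console
(2) Use netcat to send messages to local port 44444
nc localhost 44444
Messages sent:
Messages printed after passing through the custom sink:
This completes the tests of the custom Source and Sink.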



