自定义Sink由于工作需要最近学习flink
现记录下Flink介绍和实际使用过程
这是flink系列的第五篇文章
Sink介绍SinkFunction接口介绍RichSinkFunction类介绍
Sink介绍flink的sink是flink三大逻辑结构之一(source,transform,sink),功能就是负责把flink处理后的数据输出到外部系统中。
在编写代码的过程中,我们可以使用flink已经提供的sink,如kafka,es等。
| 连接器 | 是否提供Source支持 | 是否提供Sink支持 |
| Apache Kafka | 是 | 是 |
| Apache Cassandra | 否 | 是 |
| Apache Kinesis Data Streams | 是 | 是 |
| Elasticsearch | 否 | 是 |
| HDFS | 否 | 是 |
| RabbitMQ | 是 | 是 |
| Apache NiFi | 是 | 是 |
| Twitter Streaming API | 是 | 是 |
当然我们也可以通过自定义的方式,来实现我们自己的sink,自定义可以通过两种方式:
- 实现SinkFunction接口继承RichSinkFunction类
首先看下Flink中提供的SinkFunction接口,实现了SinkFunction接口就可以实现自定义Sink。
这是SinkFunction接口的源码,我们只需要实现invoke方法即可实现自定义sink:
public interface SinkFunctionRichSinkFunction类介绍extends Function, Serializable { @Deprecated default void invoke(IN value) throws Exception { } default void invoke(IN value, SinkFunction.Context context) throws Exception { this.invoke(value); } @Public public interface Context { long currentProcessingTime(); long currentWatermark(); Long timestamp(); } }
既然SinkFunction接口即可满足要求,那么为什么要通过继承RichSinkFunction类来实现呢。
这是RichSinkFunction类的源码:
public abstract class RichSinkFunctionextends AbstractRichFunction implements SinkFunction { private static final long serialVersionUID = 1L; public RichSinkFunction() { } }
可以看到RichSinkFunction除了实现了SinkFunction接口,还继承了AbstractRichFunction 类。
这是AbstractRichFunction 类的源码:
public abstract class AbstractRichFunction implements RichFunction, Serializable {
private static final long serialVersionUID = 1L;
private transient RuntimeContext runtimeContext;
public AbstractRichFunction() {
}
public void setRuntimeContext(RuntimeContext t) {
this.runtimeContext = t;
}
public RuntimeContext getRuntimeContext() {
if (this.runtimeContext != null) {
return this.runtimeContext;
} else {
throw new IllegalStateException("The runtime context has not been initialized.");
}
}
public IterationRuntimeContext getIterationRuntimeContext() {
if (this.runtimeContext == null) {
throw new IllegalStateException("The runtime context has not been initialized.");
} else if (this.runtimeContext instanceof IterationRuntimeContext) {
return (IterationRuntimeContext)this.runtimeContext;
} else {
throw new IllegalStateException("This stub is not part of an iteration step function.");
}
}
public void open(Configuration parameters) throws Exception {
}
public void close() throws Exception {
}
}
这里可以看到open和close方法,我们可以通过重写这两个方法,在open方法中连接资源,在close方法中关闭资源、释放资源。
当然我们还可以在open中做更多的操作,比如我们的sink是将数据加入到clickhouse中,加入clickhouse时数据最好是批量加入(不清楚百度哈),所以我们要在open中初始化定时任务去定时处理数据,每10000条加入一次。当然单线程不一定来得及,还要配合上线程池使用,所以open方法中改成初始化线程池去执行定时任务。



