需求:实现FlinkSQL sink到ArangoDB图数据库
分析:自定义Flink Table & SQL connector 支持flink-connector-arangodb,只需要实现sink部分
官网支持user-defined sources&sinks,对Table SQL的source/sink定义提供了解释
Metadata:对表的声明,封装为Catalog,定义外部存储系统的元数据
Planning:Factory实例由Java SPI机制创建,将外部表元数据配置封装为参数化实例(DynamicTableSource/Sink)
Runtime:读取/写入核心逻辑,实现InputFormat/OutputFormat或SourceFunction/SinkFunction接口,构建与外部存储系统的连接和实现读取和写入逻辑。
需要我们扩展的地方:
Dynamic Table Factory
自定义工场类实现org.apache.flink.table.factories.DynamicTableSinkFactory(我这里仅要支持sink,如果要支持source,需要实现org.apache.flink.table.factories.DynamicTableSourceFactory)
DDL语句中的‘connector’配置项作为标识符用来发现对应的工厂类实例
Factory工场类是由Java SPI来实例化的,我们需要在自定义connector模块的resource下添加文件
META-INF/services/org.apache.flink.table.factories.Factory
文件中指定工厂类的全路径
Dynamic Table Sink
Factory工厂类主要构建DynamicTableSource/DynamicTableSink,这是个参数化实例对象,定义connector配置参数。在DynamicTableSource/Sink中数据的传递要使用Flink内部数据结构org.apache.flink.table.data.RowData,这里获取到数据需要对value做一下转换,value数据提取封装为RowData,RowData接口的实现类也比较多,可根据情况选择合适的实现类。
Sink接口实现:
有三个接口的实现会影响DML语句的执行
| 接口 | 描述 |
| SupportsOverwrite | 实现此接口可以使用INSERT OVERWRITE语句覆盖现有的表或分区数据 |
| SupportsPartitioning | 允许写入分区数据 |
| SupportsWritingMetadata | 保存持久化DDL中定义的列和类型 |
Runtime Provider
这里官网并没有对实际读取写入的Runtime实现作详细解释。说一下个人的理解
Dynamic Table Source/Sink 提供了获取RuntimeProvider实例函数getSinkRuntimeProvider(Context context),这个函数需要我们自定义逻辑去声明InputFormat/OutputFormat或者source/sinkFunction实例化对象
两种方式运行Provider
1.OutputFormatProvider.of(InputFormat/OutputFormat)
2.SinkFunctionProvider.of(source/sinkFunction)
使用lambda表达式,执行return () -> xxxFormat/xxxFunction;构建静态provider
关于InputFormat/OutputFormat或者source/sinkFunction
关键方法
1.xxxxFunction// 建立连接 open(); // 执行读取/写入逻辑 invoke(T value,Context context); // 关闭连接 close(); 2.xxxxFormat // 建立连接 open(int taskNumber, int numTasks); // 执行读取/写入逻辑 writeRecord(In record) // 关闭连接 close();
Encoding / Decoding Formats(待完善)
阅读flink-connector模块,对比几个connector的源码,分析后得出简单的connector主体架构:
// 1.工厂类构造source和sink,java SPI创建实例 ArangoDBDynamicTableFactory imp DynamicTableSinkFactory (如果支持source,需imp DynamicTableSourceFactory) - createDynamicTableSink // 创建连接器的参数化实例 -> 封装connector的参数 - optionalOptions - requiredOptions - factoryIdentifier // 2.arangodbsink引出sinkfunction ArangodbDynamicTableSink // 3.sinkFunction/outputFormat建立连接执行写入 - open() // 建立连接 - invoke() // arangodb API - writeRecord - ArangoCollection.insertDocuments(values) - ArangoCollection.updateDocument(key,value) - close() // 关闭连接 // 4.掺杂着其他的辅助类 - convert rowData -> document serialize/deserialize



