- 1. Goal
- 2. Approach
- 2.1. Self-implemented demo:
- 2.2. Community implementation:
1. Goal
We want to use Flink SQL to join against a dimension table, and to perform that lookup with asynchronous I/O.
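2. Approach
2.1. Self-implemented demo:
Current community status: none of the Flink SQL connectors implements async I/O dimension-table lookup yet, but the interface already supports it, so a custom implementation is possible. Async I/O lookup for the HBase connector is being worked on in the community and is expected to be available in 1.13.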
    // Nested class: RESOURCE_COUNTER and checkArgument are defined or statically
    // imported outside this snippet.
    public static class AsyncTestValueLookupFunction extends AsyncTableFunction<Row> {

        private static final long serialVersionUID = 1L;
        // Lookup key -> matching dimension rows.
        private final Map<Row, List<Row>> mapping;
        private transient boolean isOpenCalled = false;
        private transient ExecutorService executor;

        protected AsyncTestValueLookupFunction(Map<Row, List<Row>> mapping) {
            this.mapping = mapping;
        }

        @Override
        public void open(FunctionContext context) throws Exception {
            RESOURCE_COUNTER.incrementAndGet();
            isOpenCalled = true;
            // The lookups run on this thread instead of the task thread.
            executor = Executors.newSingleThreadExecutor();
        }

        // Called once per input row; the result is delivered through resultFuture.
        public void eval(CompletableFuture<Collection<Row>> resultFuture, Object... inputs) {
            checkArgument(isOpenCalled, "open() is not called.");
            final Row key = Row.of(inputs);
            if (Arrays.asList(inputs).contains(null)) {
                throw new IllegalArgumentException(
                        String.format(
                                "Lookup key %s contains null value, which should not happen.",
                                key));
            }
            CompletableFuture.supplyAsync(
                            () -> {
                                List<Row> list = mapping.get(key);
                                if (list == null) {
                                    // No match: complete with an empty result.
                                    return Collections.<Row>emptyList();
                                } else {
                                    return list;
                                }
                            },
                            executor)
                    .thenAccept(resultFuture::complete);
        }

        @Override
        public void close() throws Exception {
            RESOURCE_COUNTER.decrementAndGet();
            if (executor != null && !executor.isShutdown()) {
                executor.shutdown();
            }
        }
    }
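The core of the demo above is the pattern inside eval(): the caller passes in a future, the lookup runs on a separate executor, and the future is completed when the result is ready. The following is a minimal sketch of that pattern using only JDK classes (Java 9+), so it can be run without a Flink dependency. The class name AsyncLookupSketch, the String-based key and value types, and the sample data are hypothetical and chosen only for illustration; they are not part of Flink.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-alone sketch of the async lookup pattern; not a Flink class.
public class AsyncLookupSketch {

    // In-memory stand-in for the dimension table: key -> matching values.
    private final Map<String, List<String>> mapping;
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public AsyncLookupSketch(Map<String, List<String>> mapping) {
        this.mapping = mapping;
    }

    // Same shape as eval() in the demo: the caller supplies the future,
    // the lookup runs on the executor, and the future is completed afterwards.
    public void eval(CompletableFuture<Collection<String>> resultFuture, String key) {
        CompletableFuture.supplyAsync(
                        () -> mapping.getOrDefault(key, Collections.<String>emptyList()),
                        executor)
                .thenAccept(resultFuture::complete);
    }

    public void close() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        AsyncLookupSketch lookup =
                new AsyncLookupSketch(Map.of("k1", List.of("v1", "v2")));

        CompletableFuture<Collection<String>> hit = new CompletableFuture<>();
        CompletableFuture<Collection<String>> miss = new CompletableFuture<>();
        lookup.eval(hit, "k1");
        lookup.eval(miss, "unknown");

        System.out.println(hit.join());  // prints [v1, v2]
        System.out.println(miss.join()); // prints []
        lookup.close();
    }
}
```

Like the demo, this sketch only completes the future on success. If the lookup itself can fail (for example, a remote call), a real implementation would probably also want to forward the error with completeExceptionally so the caller is not left waiting.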
2.2. Community implementation:
https://github.com/apache/flink/pull/14684#pullrequestreview-604148209



