栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink SQL中使用异步io关联维表

Flink SQL中使用异步io关联维表

目录
      • 一、目标
      • 二、方案
          • 2.1、 自己实现demo:
          • 2.2、 社区实现:

一、目标

希望使用flink sql来关联维度表,但是想用异步IO的方式关联。

二、方案

当前社区进展:目前Flink SQL 中的connector都没实现异步io关联维表,但是接口是已经支持了的,可以自定义实现;Hbase connector 社区有人正在支持异步io关联维表,预计1.13可以使用。

2.1、 自己实现demo:
    
    public static class AsyncTestValueLookupFunction extends AsyncTableFunction {

        private static final long serialVersionUID = 1L;
        private final Map> mapping;
        private transient boolean isOpenCalled = false;
        private transient ExecutorService executor;

        protected AsyncTestValueLookupFunction(Map> mapping) {
            this.mapping = mapping;
        }

        @Override
        public void open(FunctionContext context) throws Exception {
            RESOURCE_COUNTER.incrementAndGet();
            isOpenCalled = true;
            executor = Executors.newSingleThreadExecutor();
        }

        public void eval(CompletableFuture> resultFuture, Object... inputs) {
            checkArgument(isOpenCalled, "open() is not called.");
            final Row key = Row.of(inputs);
            if (Arrays.asList(inputs).contains(null)) {
                throw new IllegalArgumentException(
                        String.format(
                                "Lookup key %s contains null value, which should not happen.",
                                key));
            }
            CompletableFuture.supplyAsync(
                            () -> {
                                List list = mapping.get(key);
                                if (list == null) {
                                    return Collections.emptyList();
                                } else {
                                    return list;
                                }
                            },
                            executor)
                    .thenAccept(resultFuture::complete);
        }

        @Override
        public void close() throws Exception {
            RESOURCE_COUNTER.decrementAndGet();
            if (executor != null && !executor.isShutdown()) {
                executor.shutdown();
            }
        }
    }
2.2、 社区实现:

https://github.com/apache/flink/pull/14684#pullrequestreview-604148209

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/633000.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号