栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

ParquetRowInputFormat Flink 定时获取HDFS上某路径的parquet文件,并作为dim与Kafka中的主表进行关联。

ParquetRowInputFormat Flink 定时获取HDFS上某路径的parquet文件,并作为dim与Kafka中的主表进行关联。

Flink 定时获取HDFS 上某路径的parquet文件,并作为dim进行关联。
    • ParquetRowInputFormat
    • 间隔获取HDFS上的文件
    • 使用广播进行关联
    • 广播类型

在前文提到使用Flink SQL 在1.13.2版本下无法支撑定时获取HDFS上的文件(更新等状态),但是Flink 的API上其实是提供了这个方案的。
再次申明,截止1.14版本Flink,是没有SQL能够不通过Hive来定时获取HDFS上的文件的。

ParquetRowInputFormat

ParquetRowInputFormat 是继承于RichInputFormat 的文件读取器,使用它可以定时的去更新整个数据的流向。

间隔获取HDFS上的文件

根据建表语句来设定MessageType(这个构造可以像我一样啰嗦一点,可以是使用别人的反射,或者直接使用Arvo的)。

在 env.readFile 中,有五个参数的构造,分别是 FileFormat 数据读取器,pathString 路径,FileProcessingMode 文件读取类型,间隔时间。如下我设定了 每一个小时去check一次路径上的文件是否有变化。

final String pathString = "hdfs://path/city/";
        final ArrayList cityFields = new ArrayList<>();
        cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "province"));
        cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "city"));
        cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "district"));
        cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, "provincecode"));
        cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, "citycode"));
        cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, "districtcode"));

        final ParquetRowInputFormat cityFormat = new ParquetRowInputFormat(new Path(pathString), new MessageType("", cityFields));
        final SingleOutputStreamOperator city= env.readFile(cityFormat , pathString
                        , FileProcessingMode.PROCESS_CONTINUOUSLY, 60 * 60 * 1000)
                .map((MapFunction) cityConfig::new)
                .returns(cityConfig.class)
                .name("getCityConfig");
使用广播进行关联
// 创建 一个广播
final MapStateDescriptor cityConfigMapStateDescriptor = new MapStateDescriptor<>("city", String.class, CityConfig.class);

// 主流进行关联 T 是主表中的javaBean对象 或者说PoJo吧

final SingleOutputStreamOperator volteResult = volteStream
                .connect(city.broadcast(cityConfigMapStateDescriptor)) // 关联city表,并且广播city
                .process(new BroadcastProcessFunction() { // 使用Broadcast进行广播,将数据分发到所有节点
                    @Override
                    // 需要实现两个方法,第一个时当遇到主流数据应该怎么做
                    // 从 broadcastState 中获取数据来进行关联
                    public void processElement(T value, BroadcastProcessFunction.ReadonlyContext ctx
                            , Collector out) throws Exception {
                        ReadOnlyBroadcastState broadcastState = ctx.getBroadcastState(cityConfigMapStateDescriptor);
                        String cellKey = value.getProvinceCode() + "_" + value.getCityCode();
                        if (broadcastState.contains(cellKey)) {
                            final CityConfig cityConfig = broadcastState.get(cellKey);
                            out.collect(value.connect(cityConfig));
                        }
                    }

                    @Override
                    // 第二个时当遇到广播的数据该怎么做,直接放 broadcastState 中就可以了
                    public void processBroadcastElement(CityConfig value, BroadcastProcessFunction.Context ctx
                            , Collector out) throws Exception {
                        BroadcastState broadcastState = ctx.getBroadcastState(cityConfigMapStateDescriptor);
                        String cellKey = value.getProvinceCode() + "_" + value.getCityId();
                        broadcastState.put(cellKey, value);
                    }
                })
                .uid("connectCity") // 请设定uid,因为这个是要存储数据的,最好指定好uid,这样以后升级也能获取到这个数据。
                .name("connectCity");
广播类型

BroadcastProcessFunction 和 KeyBroadcastProcessFunction。
上述使用的是 BroadcastProcessFunction,意思是将所有数据分发到所有节点。
Keyed意思就是使用Hash进行分发,每个节点只有部分数据。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/354802.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号