- 序
- ParquetRowInputFormat
- 间隔获取HDFS上的文件
- 使用广播进行关联
- 广播类型
在前文提到使用Flink SQL 在1.13.2版本下无法支撑定时获取HDFS上的文件(更新等状态),但是Flink 的API上其实是提供了这个方案的。
再次申明,截止1.14版本Flink,是没有SQL能够不通过Hive来定时获取HDFS上的文件的。
ParquetRowInputFormat 是继承于RichInputFormat 的文件读取器,使用它可以定时的去更新整个数据的流向。
根据建表语句来设定MessageType(这个构造可以像我一样啰嗦一点,可以是使用别人的反射,或者直接使用Arvo的)。
在 env.readFile 中,有五个参数的构造,分别是 FileFormat 数据读取器,pathString 路径,FileProcessingMode 文件读取类型,间隔时间。如下我设定了 每一个小时去check一次路径上的文件是否有变化。
final String pathString = "hdfs://path/city/";
final ArrayList cityFields = new ArrayList<>();
cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "province"));
cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "city"));
cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "district"));
cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, "provincecode"));
cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, "citycode"));
cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, "districtcode"));
final ParquetRowInputFormat cityFormat = new ParquetRowInputFormat(new Path(pathString), new MessageType("", cityFields));
final SingleOutputStreamOperator city= env.readFile(cityFormat , pathString
, FileProcessingMode.PROCESS_CONTINUOUSLY, 60 * 60 * 1000)
.map((MapFunction) cityConfig::new)
.returns(cityConfig.class)
.name("getCityConfig");
使用广播进行关联
// 创建 一个广播 final MapStateDescriptor广播类型cityConfigMapStateDescriptor = new MapStateDescriptor<>("city", String.class, CityConfig.class); // 主流进行关联 T 是主表中的javaBean对象 或者说PoJo吧 final SingleOutputStreamOperator volteResult = volteStream .connect(city.broadcast(cityConfigMapStateDescriptor)) // 关联city表,并且广播city .process(new BroadcastProcessFunction () { // 使用Broadcast进行广播,将数据分发到所有节点 @Override // 需要实现两个方法,第一个时当遇到主流数据应该怎么做 // 从 broadcastState 中获取数据来进行关联 public void processElement(T value, BroadcastProcessFunction .ReadonlyContext ctx , Collector out) throws Exception { ReadOnlyBroadcastState broadcastState = ctx.getBroadcastState(cityConfigMapStateDescriptor); String cellKey = value.getProvinceCode() + "_" + value.getCityCode(); if (broadcastState.contains(cellKey)) { final CityConfig cityConfig = broadcastState.get(cellKey); out.collect(value.connect(cityConfig)); } } @Override // 第二个时当遇到广播的数据该怎么做,直接放 broadcastState 中就可以了 public void processBroadcastElement(CityConfig value, BroadcastProcessFunction .Context ctx , Collector out) throws Exception { BroadcastState broadcastState = ctx.getBroadcastState(cityConfigMapStateDescriptor); String cellKey = value.getProvinceCode() + "_" + value.getCityId(); broadcastState.put(cellKey, value); } }) .uid("connectCity") // 请设定uid,因为这个是要存储数据的,最好指定好uid,这样以后升级也能获取到这个数据。 .name("connectCity");
BroadcastProcessFunction 和 KeyBroadcastProcessFunction。
上述使用的是 BroadcastProcessFunction,意思是将所有数据分发到所有节点。
Keyed意思就是使用Hash进行分发,每个节点只有部分数据。



