栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink ProcessFucntion

Flink ProcessFucntion

文章目录
      • 1.为什么要学习底层 ProcessFuntion API
      • 2.Flink提供了哪些 ProcessFuntion
      • 3.这些 ProcessFuntion有什么不同

1.为什么要学习底层 ProcessFuntion API
  • 为了访问 时间戳 watermark以及注册定时事件
2.Flink提供了哪些 ProcessFuntion
  • ProcessFunction
  • KeyedProcessFunction: keyBy后调用
  • CoProcessFunction
  • ProcessJoinFunction
  • BroadcastProcessFunction
  • KeyedBroadcastProcessFunction
  • ProcessWindowFunction: 开窗之后调用
  • ProcessAllWindowFunction
public class ProcessTest1_KeyedProcessFunction {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // socket文本流
        DataStream inputStream = env.socketTextStream("localhost", 7777);

        // 转换成SensorReading类型
        DataStream dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // 测试KeyedProcessFunction,先分组然后自定义处理
        dataStream.keyBy("id")
                .process( new MyProcess() )
                .print();

        env.execute();
    }

    // 实现自定义的处理函数
    public static class MyProcess extends KeyedProcessFunction{
        ValueState tsTimerState;

        @Override
        public void open(Configuration parameters) throws Exception {
            tsTimerState =  getRuntimeContext().getState(new ValueStateDescriptor("ts-timer", Long.class));
        }


        @Override
        public void processElement(SensorReading value, Context ctx, Collector out) throws Exception {
            out.collect(value.getId().length());

            // context
            ctx.timestamp();
            ctx.getCurrentKey();
           ctx.output();
           // 创建定时器 定时器源码:eventTimeTimersQueue.add(new TimerHeapInternalTimer(Timer))
            ctx.timerService().currentProcessingTime();
            ctx.timerService().currentWatermark();
            ctx.timerService().registerProcessingTimeTimer( ctx.timerService().currentProcessingTime() + 5000L);
            tsTimerState.update(ctx.timerService().currentProcessingTime() + 1000L);
//            ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000L);
//            ctx.timerService().deleteProcessingTimeTimer(tsTimerState.value());
        }


        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
            System.out.println(timestamp + " 定时器触发");
            ctx.getCurrentKey();
//           ctx.output();
            ctx.timeDomain();
        }

        @Override
        public void close() throws Exception {
            tsTimerState.clear();
        }
    }
}

3.这些 ProcessFuntion有什么不同
  • 调用的地方不同
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/663442.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号