栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink 高阶编程:定时器

Flink 高阶编程:定时器

基于处理时间或者时间时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行。

Context 和 onTimerContext 所持有的 TimerService 对象拥有以下方法:

  • currentProcessingTime(): Long 返回当前处理时间
  • currentWatermark(): Long 返回当前 watermark 的时间戳
  • registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前 key 的 processing time 的定时器。当 processing time 到达定时时间时,触发 timer。
  • registerEventTimeTimer(timestamp: Long): Unit 会注册当前 key 的 event time 定时 器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
  • deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。 如果没有这个时间戳的定时器,则不执行。
  • deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如 果没有此时间戳的定时器,则不执行。

总结下来:定时器主要方法:获取当前时间、注册定时器、删除定时器

因此定时器的应用就被分为两部分:基于处理时间的定时器和基于事件时间的定时器。

案例(基于处理时间):同一个传感器,若 10s 内水位连续上升,则报警,正常数据正常输出,报警数据从侧输出流输出。

package day07;

import bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class Flink08_Case_Timer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator result = env.socketTextStream("localhost", 1111)
                .map(new MapFunction() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] els = value.split(" ");
                        return new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2]));
                    }
                })
                .keyBy(WaterSensor::getId)
                .process(new KeyedProcessFunction() {
                    // 上一次水位线
                    private ValueState lastVc;
                    // 定时器定的时间
                    private ValueState timerTs;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        lastVc = getRuntimeContext().getState(new ValueStateDescriptor("Vc", Double.class));
                        timerTs = getRuntimeContext().getState(new ValueStateDescriptor("Ts", Long.class));
                    }

                    @Override
                    public void processElement(WaterSensor value, KeyedProcessFunction.Context ctx, Collector out) throws Exception {
                        // 如果水位线状态为null 或者 大于这次水位线正常输出
                        if (lastVc.value() == null || lastVc.value() >= value.getVc()) {
                            out.collect(value.toString());
                            // 尝试删除定时器
                            if (timerTs.value() != null) {
                                ctx.timerService().deleteProcessingTimeTimer(timerTs.value());
                                System.out.println("定时器" + timerTs.value() + "删除");
                            }
                        } else {
                            // 水位线升高了
                            timerTs.update(System.currentTimeMillis() + 10000);
                            ctx.timerService().registerProcessingTimeTimer(timerTs.value());
                            System.out.println("定时器" + timerTs.value() + "创建");
                        }
                        // 更新水位状态
                        lastVc.update(value.getVc());
                    }

                    @Override
                    public void onTimer(long timestamp, KeyedProcessFunction.onTimerContext ctx, Collector out) throws Exception {
                        ctx.output(new OutputTag("告警数据") {
                        }, ctx.getCurrentKey());
                    }
                });

        result.print("正常数据");

        result.getSideOutput(new OutputTag("告警数据") {
        }).print("告警数据");

        env.execute();
    }
}

输出结果:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/281599.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号