基于处理时间或者时间时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行。
Context 和 onTimerContext 所持有的 TimerService 对象拥有以下方法:
- currentProcessingTime(): Long 返回当前处理时间
- currentWatermark(): Long 返回当前 watermark 的时间戳
- registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前 key 的 processing time 的定时器。当 processing time 到达定时时间时,触发 timer。
- registerEventTimeTimer(timestamp: Long): Unit 会注册当前 key 的 event time 定时 器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
- deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。 如果没有这个时间戳的定时器,则不执行。
- deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如 果没有此时间戳的定时器,则不执行。
总结下来:定时器主要方法:获取当前时间、注册定时器、删除定时器
因此定时器的应用就被分为两部分:基于处理时间的定时器和基于事件时间的定时器。
案例(基于处理时间):同一个传感器,若 10s 内水位连续上升,则报警,正常数据正常输出,报警数据从侧输出流输出。
package day07;
import bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class Flink08_Case_Timer {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator result = env.socketTextStream("localhost", 1111)
.map(new MapFunction() {
@Override
public WaterSensor map(String value) throws Exception {
String[] els = value.split(" ");
return new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2]));
}
})
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction() {
// 上一次水位线
private ValueState lastVc;
// 定时器定的时间
private ValueState timerTs;
@Override
public void open(Configuration parameters) throws Exception {
lastVc = getRuntimeContext().getState(new ValueStateDescriptor("Vc", Double.class));
timerTs = getRuntimeContext().getState(new ValueStateDescriptor("Ts", Long.class));
}
@Override
public void processElement(WaterSensor value, KeyedProcessFunction.Context ctx, Collector out) throws Exception {
// 如果水位线状态为null 或者 大于这次水位线正常输出
if (lastVc.value() == null || lastVc.value() >= value.getVc()) {
out.collect(value.toString());
// 尝试删除定时器
if (timerTs.value() != null) {
ctx.timerService().deleteProcessingTimeTimer(timerTs.value());
System.out.println("定时器" + timerTs.value() + "删除");
}
} else {
// 水位线升高了
timerTs.update(System.currentTimeMillis() + 10000);
ctx.timerService().registerProcessingTimeTimer(timerTs.value());
System.out.println("定时器" + timerTs.value() + "创建");
}
// 更新水位状态
lastVc.update(value.getVc());
}
@Override
public void onTimer(long timestamp, KeyedProcessFunction.onTimerContext ctx, Collector out) throws Exception {
ctx.output(new OutputTag("告警数据") {
}, ctx.getCurrentKey());
}
});
result.print("正常数据");
result.getSideOutput(new OutputTag("告警数据") {
}).print("告警数据");
env.execute();
}
}
输出结果:



