自定义带超时时间的计数窗口触发器
import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; public class CountTriggerWithTimeoutextends Trigger { private int maxCount; private TimeCharacteristic timeType; private ReducingStateDescriptor countStateDescriptor = new ReducingStateDescriptor("counter", new Sum(), LongSerializer.INSTANCE); public CountTriggerWithTimeout(int maxCount, TimeCharacteristic timeType) { this.maxCount = maxCount; this.timeType = timeType; } private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception { clear(window, ctx); return TriggerResult.FIRE_AND_PURGE; } @Override public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { ReducingState countState = ctx.getPartitionedState(countStateDescriptor); countState.add(1L); if (countState.get() >= maxCount) { return fireAndPurge(window, ctx); } if (timestamp >= window.getEnd()) { return fireAndPurge(window, ctx); } else { return TriggerResult.CONTINUE; } } @Override public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { if (timeType != TimeCharacteristic.ProcessingTime) { return TriggerResult.CONTINUE; } if (time >= window.getEnd()) { return TriggerResult.CONTINUE; } else { return fireAndPurge(window, ctx); } } @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { if (timeType != TimeCharacteristic.EventTime) { return TriggerResult.CONTINUE; } if (time >= window.getEnd()) { return TriggerResult.CONTINUE; } else { return fireAndPurge(window, ctx); } } @Override public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ReducingState countState = ctx.getPartitionedState(countStateDescriptor); countState.clear(); } class Sum implements ReduceFunction { @Override public Long reduce(Long value1, Long value2) throws Exception { return value1 + value2; } } }
使用案例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);//设置窗口时间类型为系统时间,否则会报错
envStream.keyBy(t-> t)
.timeWindow(Time.of(5, TimeUnit.SECONDS))//设置超时时间为5秒
.trigger(new CountTriggerWithTimeout(500, TimeCharacteristic.ProcessingTime))//窗口数据条数为500条
.process(new CdcMyProcessWindowFunction())//对窗口内的数据类型做一下转换,否则不能调Sink
.addSink()
public class CdcMyProcessWindowFunction extends ProcessWindowFunction, String, TimeWindow> { @Override public void process(String s, Context context, Iterable elements, Collector > out) throws Exception { ArrayList
sqlArr = Lists.newArrayList(elements); if (sqlArr.size() > 0) { out.collect(sqlArr); sqlArr.clear(); } } }



