栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink窗口大小怎么确定的,开始时间是时候?

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink窗口大小怎么确定的,开始时间是时候?

面试官:Flink窗口大小怎么确定的,开始时间是什么时候,结束是什么时候?

这篇文章会很短,但是我认为很有意义,今天晚上微醺了,纯手敲的。

我还记得的在我刚学习flink的时候,B站的老师说过,Flink窗口的开始时间和结束时间和你想的不一样。那个时候我好像记得老师说过,flink的窗口大小会根据你的时间单位来进行修正。

然后在现如今,很多人还是不是很了解窗口机制,以及watermark。更别提什么窗口什么时候,什么时候结束。所以呢,今天从源码角度给大家普及一下窗口什么时候开始,什么时候结束。

我们可以来编写一个简单的代码,来看一下效果,我习惯用java来写flink,所以也就使用java了。

	@Override
	public Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) {
		if (timestamp > Long.MIN_VALUE) {
			List windows = new ArrayList<>((int) (size / slide));
      //获取窗口开始时间
			long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
			for (long start = lastStart;
				start > timestamp - size;
				start -= slide) {
				windows.add(new TimeWindow(start, start + size));
			}
			return windows;
		} else {
			throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
					"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
					"'DataStream.assignTimestampsAndWatermarks(...)'?");
		}
	}
	public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    //窗口开始计算的时间
		return timestamp - (timestamp - offset + windowSize) % windowSize;
	}

我们可以看出来窗口开始时间, 是取模过后的时间,我们来简单的分析一番。

假如我们第一条数据的时间戳是1000,offset暂时不需要管,因为他是时间的偏移量例如,东八区什么的。我们假如窗口大小是5s,

那么接下来的公式计算也就是 1000 - (1000 - 0 + 5000)%5000,那么我们可以计算出来的结果就是0,也就是说,开始的时间窗口是0.更大的时间窗口大小,各位大佬可以下面自己算一下。

也就是说开始时间是 0,结束的时间窗口也就是4999,因为到5000的时候就触发计算了。那么我们接下来就进行验证一番和我们分析的是否一致。

接下来我们写一个简单的wordcount,因为在多并行度下,不是很好分析,我们设置为单并行读。如果有对watermark还不是很理解的大佬,可以看我的这篇文章,https://blog.csdn.net/weixin_43704599/article/details/117411252

public class WindowSizeTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource source = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator source1 = source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.of(0, TimeUnit.MILLISECONDS)) {
            @Override
            public long extractTimestamp(String s) {

                return Long.parseLong(s.split(",")[0]);
            }
        });

        SingleOutputStreamOperator> map = source1.map(new MapFunction>() {
            @Override
            public Tuple2 map(String s) throws Exception {
                return Tuple2.of(s.split(",")[1],1);
            }
        });

        WindowedStream, Tuple, TimeWindow> window = map.keyBy(0).window(TumblingEventTimeWindows.of(Time.of(5000, TimeUnit.MILLISECONDS)));
        window.sum(1).print();
        env.execute();
    }
}

接下来我们,就看看我们分析的是否正确

很明显是正确的。那么一天的窗口大小你会计算吗?

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/709209.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号