
A Detailed Explanation of Using EventTime + Watermark in Flink to Handle Out-of-Order Data


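Generally speaking, under event-time semantics, if the events of a stream do not reach the Flink job in increasing order of event time, the stream is called an out-of-order data stream. The figure below illustrates such a stream.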

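Assume the clocks of all sensors are synchronized. In theory, the events they detect should then be ordered 1, 2, 3, 3, 5, 6, 7, 9. After network transmission, however, they reach the Flink job in the order 2, 3, 1, 7, 3, 5, 9, 6: many event times are swapped, so the stream is out of order. There are two events with timestamp 3; they may belong to different keys, or the sensor may have sent event 3 twice. In a real distributed environment it is therefore very hard to guarantee fully accurate results.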
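To make the notion of "out of order" concrete, here is a minimal sketch in plain Java (no Flink dependency). The class and method names are illustrative and not part of the article's code. It scans the arrival sequence above and collects every event whose timestamp is smaller than the largest timestamp seen so far.

```java
import java.util.ArrayList;
import java.util.List;

final class OutOfOrderDemo {
    /** Returns the timestamps that arrive after a strictly larger timestamp has already been seen. */
    static List<Long> findOutOfOrder(long[] arrivals) {
        List<Long> outOfOrder = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        for (long ts : arrivals) {
            if (ts < maxSeen) {
                // A later event time was already observed, so this event is out of order.
                outOfOrder.add(ts);
            } else {
                maxSeen = ts;
            }
        }
        return outOfOrder;
    }

    public static void main(String[] args) {
        long[] arrivals = {2, 3, 1, 7, 3, 5, 9, 6};
        System.out.println(findOutOfOrder(arrivals)); // prints [1, 3, 5, 6]
    }
}
```

For the sequence above, four of the eight events (1, 3, 5 and 6) arrive behind a larger timestamp.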

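The rest of this article therefore shows how the EventTime and Watermark mechanisms deal with out-of-order data: Flink delays triggering the window computation, waits for a while so that the out-of-order data can arrive, and only then triggers the computation, which produces the correct result.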
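One common way to "wait for a while" is to let the watermark trail the largest event time seen so far by a fixed delay. The sketch below models that idea in plain Java; it is not the generator used later in this article, and the class name and the `maxDelay` parameter are assumptions made for illustration. Flink ships a built-in strategy along these lines, `WatermarkStrategy.forBoundedOutOfOrderness(...)`.

```java
final class BoundedDelayWatermark {
    private final long maxDelay;
    private long maxTimestamp;

    BoundedDelayWatermark(long maxDelay) {
        this.maxDelay = maxDelay;
        // Start low enough that the first watermark computation cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxDelay + 1;
    }

    /** Records an event timestamp; only the maximum seen so far matters. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Current watermark: events with a timestamp at or below this value are no longer expected. */
    long currentWatermark() {
        return maxTimestamp - maxDelay - 1;
    }

    public static void main(String[] args) {
        BoundedDelayWatermark wm = new BoundedDelayWatermark(2);
        for (long ts : new long[] {2, 3, 1, 7}) {
            wm.onEvent(ts);
            System.out.println("event " + ts + " -> watermark " + wm.currentWatermark());
        }
    }
}
```

A larger `maxDelay` tolerates more disorder but delays results; a smaller one produces results sooner but may treat slow events as late.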

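Out-of-order data without late events

The figure below shows out-of-order data without late events. "Without late events" means that, with the EventTime and Watermark mechanisms, every out-of-order event still falls into the correct window for computation.

Code: EventTimeWatermark.java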

package com.atguigu.apitest.watermark;

import com.atguigu.apitest.example.ClickEvent;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;


public class EventTimeWatermark {
    public static void main(String[] args) throws Exception {
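        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event-time semantics (this call is deprecated since Flink 1.12, where event time is the default)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Interval for periodic watermark generation (3 seconds)
        env.getConfig().setAutoWatermarkInterval(1000*3L);
        // Parallelism
        env.setParallelism(1);
        // Demo data: (key, event time, value)
        DataStreamSource<ClickEvent> mySource = env.fromElements(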
                new ClickEvent("user1", 2L, 2),
                new ClickEvent("user1", 3L, 3),
                new ClickEvent("user1", 1L, 1),
                new ClickEvent("user1", 7L, 7),
                new ClickEvent("user1", 3L, 3),
                new ClickEvent("user1", 5L, 5),
                new ClickEvent("user1", 9L, 9),
                new ClickEvent("user1", 6L, 6),
                new ClickEvent("user1", 12L, 12),
                new ClickEvent("user1", 17L, 17),
                new ClickEvent("user1", 10L, 10),
                new ClickEvent("user1", 16L, 16),
                new ClickEvent("user1", 19L, 19),
                new ClickEvent("user1", 11L, 11),
                new ClickEvent("user1", 18L, 18),
                new ClickEvent("user1", 13L, 13),
                new ClickEvent("user1", 20L, 20)

        );
        // The output type String is an assumption: the window function does not emit any result yet
        SingleOutputStreamOperator<String> streamDS = mySource
                .assignTimestampsAndWatermarks(new WatermarkStrategy<ClickEvent>() {
                    @Override
                    public WatermarkGenerator<ClickEvent> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        // Custom punctuated (non-periodic) WatermarkGenerator
                        return new PunctuatedAssigner();
                    }
                }.withTimestampAssigner((event, recordTimestamp) ->
                        {
                            // Extract the event timestamp; set a breakpoint here to check that it is called
                            return event.getDateTime();
                        }))
                .keyBy(event -> event.getKey())
                .window(TumblingEventTimeWindows.of(Time.milliseconds(4L)))
                .process(new MyProcessWindowFunctionOrder());
                //.sum("Value")
        // Print the results
        streamDS.print();
        env.execute();
    }
    private static class PunctuatedAssigner implements
            WatermarkGenerator<ClickEvent> {
        // Number of events seen so far; used to tell apart events with the same timestamp
        int size = 0;

        @Override
        public void onEvent(ClickEvent clickEvent, long l, WatermarkOutput watermarkOutput) {
            size++;
            // Called once for every incoming event
            System.out.println("EventTime:"+clickEvent.getDateTime()+"");
            if (clickEvent.getDateTime() == 3 && size == 5){
                // Second event with timestamp 3 (the fifth event overall)
                watermarkOutput.emitWatermark(new Watermark(4L));
                System.out.println("Watermark:4-->");
            }
            else if (clickEvent.getDateTime() == 6){
                watermarkOutput.emitWatermark(new Watermark(9L));
                System.out.println("Watermark:9-->");
            }
            else if (clickEvent.getDateTime() == 11L){
                watermarkOutput.emitWatermark(new Watermark(11L));
                System.out.println("Watermark:11-->");
            }
            else if (clickEvent.getDateTime() == 13){
                watermarkOutput.emitWatermark(new Watermark(13L));
                System.out.println("Watermark;13-->");
            }
            else if (size == 17){
                // Last event: signal the end of the stream
                watermarkOutput.emitWatermark(new Watermark(Long.MAX_VALUE));
                System.out.println("Watermark:"+Long.MAX_VALUE+"-->");
                System.out.println("****************************************************");
            }
            else {
                // Emit nothing for all other events
            }
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
            // Nothing to do: this generator is not periodic
        }
    }
}

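In the code above, mySource, declared as a DataStreamSource, provides the simulated out-of-order data. Because the watermarks for this data follow no obvious periodic pattern, the code defines a custom class, PunctuatedAssigner, that generates watermarks non-periodically: it emits specific watermarks based on the timestamp and the index of the current element.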
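The article does not list the ClickEvent class. The following is a minimal sketch of what it might look like, inferred only from how it is used: the constructor call `new ClickEvent("user1", 2L, 2)`, the getters `getKey()` and `getDateTime()`, and the `toString()` format visible in the program output. The field types (in particular `int` for the value) and the extra getter `getValue()` are assumptions.

```java
final class ClickEvent {
    private final String key;     // e.g. "user1"
    private final long dateTime;  // event time in milliseconds
    private final int value;      // payload; the int type is an assumption

    ClickEvent(String key, long dateTime, int value) {
        this.key = key;
        this.dateTime = dateTime;
        this.value = value;
    }

    String getKey() {
        return key;
    }

    long getDateTime() {
        return dateTime;
    }

    int getValue() {
        return value;
    }

    @Override
    public String toString() {
        // Mirrors the format seen in the program output.
        return "ClickEvent{key='" + key + "', DateTime=" + dateTime + ", Value=" + value + "}";
    }

    public static void main(String[] args) {
        System.out.println(new ClickEvent("user1", 1L, 1));
    }
}
```

The original class may well differ, for example by being public and having setters and a no-argument constructor.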
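window(TumblingEventTimeWindows.of(Time.milliseconds(4L))) defines an event-time tumbling window with a size of 4 milliseconds.
The process method calls MyProcessWindowFunctionOrder to handle the out-of-order data. The code is as follows.
MyProcessWindowFunctionOrder.java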

package com.atguigu.apitest.watermark;

import com.atguigu.apitest.example.ClickEvent;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.*;


// Type parameters: input ClickEvent, output String (an assumption, nothing is emitted yet), key String, window TimeWindow
public class MyProcessWindowFunctionOrder extends ProcessWindowFunction<ClickEvent, String, String, TimeWindow> {
    @Override
    public void process(String s, Context context, Iterable<ClickEvent> iterable, Collector<String> collector) throws Exception {
        // Print the current watermark and the bounds of the window that fired
        System.out.print("Watermark="+context.currentWatermark() + ",");
        System.out.print("window:Start=" + context.window().getStart() + ",");
        System.out.print("End=" + context.window().getEnd() + ",");
        System.out.print("maxTimeStamp=" + context.window().maxTimestamp());
        System.out.println();
        System.out.println("****************************************************");
        // Copy the window contents into a list so they can be sorted
        List<ClickEvent> listClickEvent = new ArrayList<>();
        for (ClickEvent clickEvent : iterable) {
            listClickEvent.add(clickEvent);
        }
        // Sort by event time in ascending order (1, 2, 3, ...);
        // comparingLong avoids the overflow risk of casting a long difference to int
        listClickEvent.sort(Comparator.comparingLong(ClickEvent::getDateTime));
        listClickEvent.forEach(System.out::println);
        System.out.println("****************************************************");
        // A result, such as a sum, could be emitted here
        //collector.collect(...);
    }
}

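The process method is invoked when the window's trigger condition is met. Through the Context object it can access the window information, the current watermark, and so on. The Iterable iterable holds all elements of the current window, so the data can be processed flexibly by iterating over it.
The following walks through the execution of the program and explains the window snapshots at key points in time, which helps to understand in depth how watermarks work in Flink.
EventTimeWatermark.java creates an event-time tumbling window with a length of 4 milliseconds. When the element with event time 1 reaches the window operator, it falls into the assigned window [0,4), as shown in the figure below.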

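Because the window operator has not received a watermark yet, the trigger condition is not met, so the window does not fire.
When the element with event time 7 reaches the window operator, its event time lies outside window [0,4), so another window, [4,8), is assigned, as illustrated below.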
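The window an element falls into follows directly from its timestamp. The sketch below, in plain Java with illustrative names that are not from the article, computes the tumbling window for each of the first eight events. It assumes a zero window offset and non-negative timestamps, which holds for the demo data.

```java
final class TumblingWindowMath {
    /** Start of the tumbling window containing the timestamp (zero offset, non-negative timestamps). */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Exclusive end of the tumbling window containing the timestamp. */
    static long windowEnd(long timestamp, long size) {
        return windowStart(timestamp, size) + size;
    }

    public static void main(String[] args) {
        long size = 4; // window size in milliseconds, as in the article
        for (long ts : new long[] {2, 3, 1, 7, 3, 5, 9, 6}) {
            System.out.println(ts + " -> [" + windowStart(ts, size) + "," + windowEnd(ts, size) + ")");
        }
    }
}
```

With a size of 4, timestamps 1, 2 and 3 map to [0,4), timestamp 7 maps to [4,8), and timestamp 9 maps to [8,12).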

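When watermark W(4) reaches the window operator, W(4) is greater than or equal to 3, the maximum timestamp of window [0,4) (its end, 4, minus 1), so the trigger condition is met. The elements in window [0,4) can now be summed, giving 9 (1 + 2 + 3 + 3), as shown in the figure below.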
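The trigger condition used in this walkthrough can be written as a small predicate. This is a sketch in plain Java with illustrative names; it models the rule that an event-time window fires once the watermark reaches the window's maximum timestamp, which is the window end minus 1 (the program output likewise shows maxTimeStamp=3 for End=4).

```java
final class WindowTrigger {
    /** Largest timestamp that still belongs to a window with the given exclusive end. */
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /** A window fires once the watermark has reached its maximum timestamp. */
    static boolean shouldFire(long watermark, long windowEnd) {
        return watermark >= maxTimestamp(windowEnd);
    }

    public static void main(String[] args) {
        System.out.println(shouldFire(4, 4));   // W(4) against window [0,4)
        System.out.println(shouldFire(9, 8));   // W(9) against window [4,8)
        System.out.println(shouldFire(13, 16)); // W(13) against window [12,16)
    }
}
```

The first two checks return true and the third returns false, because 13 is below the maximum timestamp 15 of window [12,16).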

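When the element with event time 9 reaches the window operator, its event time lies outside window [4,8), so another window, [8,12), is assigned, as shown below.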

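When watermark W(9) reaches the window operator, W(9) is greater than or equal to 7, the maximum timestamp of window [4,8), so the trigger condition is met. The elements in window [4,8) can now be summed, giving 18 (5 + 6 + 7). The process is shown in the figure below.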

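When the element with event time 12 reaches the window operator, its event time lies outside window [8,12), so another window, [12,16), is assigned. Likewise, when the element with event time 17 arrives, its event time lies outside window [12,16), so another window, [16,20), is assigned.
When watermark W(11) reaches the window operator, W(11) is greater than or equal to 11, the maximum timestamp of window [8,12), so the trigger condition is met. The elements in window [8,12) can now be summed, giving 30 (9 + 10 + 11). The process is shown in the figure below.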

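When the element with event time 20 reaches the window operator, its event time lies outside window [16,20), so another window, [20,24), is assigned.
Watermark W(13) does not satisfy the trigger condition of any of the three open windows: 13 < 15 for [12,16), 13 < 19 for [16,20), and 13 < 23 for [20,24), so none of them fires. Note that in the code W(13) is emitted right after the element with event time 13, just before the element with event time 20 is read. The process is shown in the figure below.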

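Finally, a watermark with the value Long.MAX_VALUE is emitted as the end-of-stream marker. When W(Long.MAX_VALUE) is broadcast to the three remaining windows, every trigger condition is met, that is, 9223372036854775807 >= 23, 9223372036854775807 >= 15 and 9223372036854775807 >= 19, so all three windows fire, as shown below.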
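The whole walkthrough can be replayed without Flink. The following sketch is a simplified model in plain Java, not Flink's implementation; all names are illustrative. It feeds the 17 demo events in arrival order, applies the five watermarks at the positions where PunctuatedAssigner emits them, and records the sum of each window when it fires. It sums the event times, which equal the values in the demo data. It does not model late-event handling, which is not needed here because no event arrives after its window has fired.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class WatermarkReplay {
    static final long SIZE = 4; // tumbling window size in milliseconds

    /** Replays events and watermarks; returns window start -> sum, in firing order. */
    static Map<Long, Long> replay(long[] events, Map<Integer, Long> watermarkAfterIndex) {
        TreeMap<Long, List<Long>> open = new TreeMap<>(); // open windows keyed by start
        Map<Long, Long> fired = new LinkedHashMap<>();
        for (int i = 0; i < events.length; i++) {
            long ts = events[i];
            long start = ts - (ts % SIZE);
            open.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
            Long wm = watermarkAfterIndex.get(i);
            if (wm == null) {
                continue; // no watermark after this event
            }
            Iterator<Map.Entry<Long, List<Long>>> it = open.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, List<Long>> window = it.next();
                long maxTs = window.getKey() + SIZE - 1;
                if (wm >= maxTs) { // trigger condition
                    long sum = 0;
                    for (long v : window.getValue()) {
                        sum += v;
                    }
                    fired.put(window.getKey(), sum);
                    it.remove();
                }
            }
        }
        return fired;
    }

    /** Demo data and watermark positions (0-based event index) taken from the article's code. */
    static Map<Long, Long> demo() {
        long[] events = {2, 3, 1, 7, 3, 5, 9, 6, 12, 17, 10, 16, 19, 11, 18, 13, 20};
        Map<Integer, Long> watermarks = new HashMap<>();
        watermarks.put(4, 4L);              // after the second 3
        watermarks.put(7, 9L);              // after 6
        watermarks.put(13, 11L);            // after 11
        watermarks.put(15, 13L);            // after 13
        watermarks.put(16, Long.MAX_VALUE); // after 20, end of stream
        return replay(events, watermarks);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {0=9, 4=18, 8=30, 12=25, 16=70, 20=20}
    }
}
```

The first three sums (9, 18 and 30) match the values derived step by step above; the last three windows fire together on the final watermark.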

Running the code above produces the following output.

(IDE launch command omitted: java.exe from JDK 1.8.0_181, with the project classes and the Flink 1.13.2 jars on the classpath, main class com.atguigu.apitest.watermark.EventTimeWatermark)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
EventTime:2
EventTime:3
EventTime:1
EventTime:7
EventTime:3
Watermark:4-->
EventTime:5
EventTime:9
EventTime:6
Watermark:9-->
EventTime:12
EventTime:17
EventTime:10
EventTime:16
EventTime:19
EventTime:11
Watermark:11-->
EventTime:18
EventTime:13
Watermark;13-->
EventTime:20
Watermark:9223372036854775807-->
****************************************************
Watermark=4,window:Start=0,End=4,maxTimeStamp=3
****************************************************
ClickEvent{key='user1', DateTime=1, Value=1}
ClickEvent{key='user1', DateTime=2, Value=2}
ClickEvent{key='user1', DateTime=3, Value=3}
ClickEvent{key='user1', DateTime=3, Value=3}
****************************************************
Watermark=9,window:Start=4,End=8,maxTimeStamp=7
****************************************************
ClickEvent{key='user1', DateTime=5, Value=5}
ClickEvent{key='user1', DateTime=6, Value=6}
ClickEvent{key='user1', DateTime=7, Value=7}
****************************************************
Watermark=11,window:Start=8,End=12,maxTimeStamp=11
****************************************************
ClickEvent{key='user1', DateTime=9, Value=9}
ClickEvent{key='user1', DateTime=10, Value=10}
ClickEvent{key='user1', DateTime=11, Value=11}
****************************************************
Watermark=9223372036854775807,window:Start=12,End=16,maxTimeStamp=15
****************************************************
ClickEvent{key='user1', DateTime=12, Value=12}
ClickEvent{key='user1', DateTime=13, Value=13}
****************************************************
Watermark=9223372036854775807,window:Start=16,End=20,maxTimeStamp=19
****************************************************
ClickEvent{key='user1', DateTime=16, Value=16}
ClickEvent{key='user1', DateTime=17, Value=17}
ClickEvent{key='user1', DateTime=18, Value=18}
ClickEvent{key='user1', DateTime=19, Value=19}
****************************************************
Watermark=9223372036854775807,window:Start=20,End=24,maxTimeStamp=23
****************************************************
ClickEvent{key='user1', DateTime=20, Value=20}
****************************************************

Process finished with exit code 0

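The output shows that the elements in each window match the walkthrough above, as expected. The sample code also re-sorts the data within each window in ascending order of event time.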
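A note on sorting by a long timestamp: a comparator that casts the difference of two long values to int can give wrong results when the difference does not fit into 32 bits. This cannot happen with the small demo timestamps, but it can with real epoch-millisecond values that are far apart. The sketch below, in plain Java with illustrative names, shows the problem and a safe alternative based on Comparator.comparingLong.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class SafeTimestampSort {
    /** Returns a copy of the timestamps sorted in ascending order, without overflow risk. */
    static List<Long> sortAscending(List<Long> timestamps) {
        List<Long> copy = new ArrayList<>(timestamps);
        copy.sort(Comparator.comparingLong(Long::longValue));
        return copy;
    }

    public static void main(String[] args) {
        long a = 0L;
        long b = 4_294_967_296L; // 2^32
        // The difference is -2^32; its low 32 bits are all zero, so the cast claims "equal".
        System.out.println((int) (a - b));      // prints 0
        System.out.println(Long.compare(a, b)); // prints -1
        System.out.println(sortAscending(Arrays.asList(b, 7L, a))); // prints [0, 7, 4294967296]
    }
}
```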
