栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

大数据之flink定时器

大数据之flink定时器

一、ProcessFunction的使用

1、没有进行keyBy

package cn._51doit.flink.day07;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;


public class NonKeyedProcessFunction {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource lines = env.socketTextStream("localhost", 8888);

        //对NonKeyedDataStream调用Process
        SingleOutputStreamOperator> wordAndOne = lines.process(new ProcessFunction>() {

            @Override
            public void processElement(String line, Context ctx, Collector> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    if (!word.equals("error")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }
            }



        });

        wordAndOne.print();

        env.execute();


    }
}

2、有keyBy

package cn._51doit.flink.day07;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;


public class KeyedProcessFunctionDemo {

    public static void main(String[] args) throws Exception{

        //创建Flink流计算执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(10000);
        //设置重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));

        //创建DataStream
        //Source
        DataStreamSource lines = env.socketTextStream("localhost", 8888);

        //调用Transformation开始
        //调用Transformation
        SingleOutputStreamOperator> tpDataStream = lines.map(new MapFunction>() {
            @Override
            public Tuple3 map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple3.of(fields[0], fields[1], Double.parseDouble(fields[2]));
            }
        });

        KeyedStream, String> keyedStream = tpDataStream.keyBy(t -> t.f0);

        //对KeyedDataStream调用process方法,可以获取KeyedState
        SingleOutputStreamOperator> result = keyedStream.process(new KeyedProcessFunction, Tuple3>() {

            private transient MapState mapState;

            @Override
            public void open(Configuration parameters) throws Exception {
                //定义一个状态描述器
                MapStateDescriptor stateDescriptor = new MapStateDescriptor("kv-state", String.class, Double.class);
                //初始化或恢复历史状态
                mapState = getRuntimeContext().getMapState(stateDescriptor);
            }

            @Override
            public void processElement(Tuple3 value, Context ctx, Collector> out) throws Exception {
                String city = value.f1;
                Double money = value.f2;
                Double historyMoney = mapState.get(city);
                if (historyMoney == null) {
                    historyMoney = 0.0;
                }
                Double totalMoney = historyMoney + money; //累加
                //更新到state中
                mapState.put(city, totalMoney);
                //输出
                value.f2 = totalMoney;
                out.collect(value);
            }
        });

        result.print();

        //启动执行
        env.execute("StreamingWordCount");

    }
}

二、定时器

1、基本使用

package cn._51doit.flink.day07;


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ProcessingTimeTimerDemo {

    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //spark,1
        //spark,2
        //hadoop,1
        DataStreamSource lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator> wordAndCount = lines.map(new MapFunction>() {
            @Override
            public Tuple2 map(String line) throws Exception {
                String[] fields = line.split(",");
                return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
            }
        });

        KeyedStream, String> keyed = wordAndCount.keyBy(t -> t.f0);

        keyed.process(new KeyedProcessFunction, Tuple2>() {

            @Override
            public void processElement(Tuple2 value, Context ctx, Collector> out) throws Exception {
                //获取当前的ProcessingTime
                long currentProcessingTime = ctx.timerService().currentProcessingTime();

                //System.out.println("当前时间:" + currentProcessingTime + ",定时器触发的时间:" + (currentProcessingTime + 30000));

                //将当前的ProcessingTime + 30 秒,注册一个定时器
                ctx.timerService().registerProcessingTimeTimer(currentProcessingTime + 30000);
            }

            //当闹钟到了指定的时间,就执行onTimer方法
            @Override
            public void onTimer(long timestamp, onTimerContext ctx, Collector> out) throws Exception {
                System.out.println("定时器执行了:" + timestamp);
            }
        }).print();

        env.execute();

    }
}

2、先把数据攒起来,满足条件了再输出

package cn._51doit.flink.day07;


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ProcessingTimeTimerDemo02 {

    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //spark,1
        //spark,2
        //hadoop,1
        DataStreamSource lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator> wordAndCount = lines.map(new MapFunction>() {
            @Override
            public Tuple2 map(String line) throws Exception {
                String[] fields = line.split(",");
                return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
            }
        });

        KeyedStream, String> keyed = wordAndCount.keyBy(t -> t.f0);

        keyed.process(new KeyedProcessFunction, Tuple2>() {

            private transient ValueState counter;

            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("wc-state", Integer.class);
                counter = getRuntimeContext().getState(stateDescriptor);
            }

            @Override
            public void processElement(Tuple2 value, Context ctx, Collector> out) throws Exception {
                //获取当前的ProcessingTime

                long currentProcessingTime = ctx.timerService().currentProcessingTime();
                long fireTime = currentProcessingTime - currentProcessingTime % 60000 + 60000;
                //下一分钟
                //如果注册相同数据的TimeTimer,后面的会将前面的覆盖,即相同的timeTimer只会触发一次
                ctx.timerService().registerProcessingTimeTimer(fireTime);

                Integer currentCount = value.f1;
                Integer historyCount = counter.value();
                if(historyCount == null) {
                    historyCount = 0;
                }
                Integer totalCount =  historyCount + currentCount;
                //更新状态
                counter.update(totalCount);

            }

            //当闹钟到了指定的时间,就执行onTimer方法
            @Override
            public void onTimer(long timestamp, onTimerContext ctx, Collector> out) throws Exception {
                //定时器触发,输出当前的结果
                Integer value = counter.value();
                String currentKey = ctx.getCurrentKey();
                //输出key,Value
                //如果想要实现类似滚动窗口,不累加类似数据,只是累加当前窗口的数据,就清空状态
                //counter.update(0);
                out.collect(Tuple2.of(currentKey, value));
            }
        }).print();

        env.execute();

    }
}

三、侧流输出 / 旁路输出

1、获取不同类型的数据,打上不同的标签

package cn._51doit.flink.day07;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * Side-output (split) demo: routes odd numbers, even numbers and non-numeric
 * input to three tagged side outputs while forwarding everything to the main stream.
 */
public class SideOutputDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // Anonymous subclasses ({}) so Flink can capture the OutputTag's generic type.
        OutputTag<String> oddOutputTag = new OutputTag<String>("odd") {};   // odd numbers
        OutputTag<String> evenOutputTag = new OutputTag<String>("even") {}; // even numbers
        OutputTag<String> nanOutputTag = new OutputTag<String>("nan") {};   // not a number

        SingleOutputStreamOperator<String> mainStream = lines.process(new ProcessFunction<String, String>() {

            @Override
            public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                try {
                    int i = Integer.parseInt(value);
                    if (i % 2 == 0) {
                        ctx.output(evenOutputTag, value); // even
                    } else {
                        ctx.output(oddOutputTag, value);  // odd
                    }
                } catch (NumberFormatException e) {
                    // Unparseable input goes to the "nan" side output instead of failing the job.
                    ctx.output(nanOutputTag, value);
                }
                // Every record is also emitted on the main stream.
                out.collect(value);
            }
        });

        // Pull the tagged streams back out of the main operator.
        DataStream<String> evenStream = mainStream.getSideOutput(evenOutputTag);
        DataStream<String> oddStream = mainStream.getSideOutput(oddOutputTag);

        oddStream.print("odd: ");

        evenStream.print("even: ");

        mainStream.print("main: ");

        env.execute();
    }
}

2、使用侧流输出获取窗口迟到的数据

package cn._51doit.flink.day07;

import org.apache.flink.api.common.eventtime.IngestionTimeAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

//使用侧流输出获取窗口迟到的数据
public class WindowLateDateDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //1000,spark,3
        //1200,spark,5
        //2000,hadoop,2
        //socketTextStream返回的DataStream并行度为1
        DataStreamSource lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator dataWithWaterMark = lines.assignTimestampsAndWatermarks(WatermarkStrategy
                .forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner((line, timestamp) -> Long.parseLong(line.split(",")[0])));


        SingleOutputStreamOperator> wordAndCount = dataWithWaterMark.map(new MapFunction>() {
            @Override
            public Tuple2 map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
            }
        });

        //调用keyBy
        KeyedStream, String> keyed = wordAndCount.keyBy(t -> t.f0);

        OutputTag> lateDataTag = new OutputTag>("late-data") {
        };

        //NonKeyd Window: 不调用KeyBy,然后调用windowAll方法,传入windowAssinger
        // Keyd Window: 先调用KeyBy,然后调用window方法,传入windowAssinger
        WindowedStream, String, TimeWindow> windowed = keyed
                .window(TumblingEventTimeWindows.of(Time.seconds(5)));
				.sideOutputLataData(lateDataTag); //将迟到数据打上标签

        SingleOutputStreamOperator> summed = windowed.sum(1);
		
		summed.print();
		
		DataStream> lataDataStream=summed.getSideOutput(lateDataTag); //从主流当中获取迟到数据

        lataDataStream.print("lata-data: ");

        env.execute();

    }

}

四、WindowFunction 使用

1、窗口内增量聚合,且与历史数据聚合

package cn._51oit.flink.day07;

import org.apache.flink.api.common.eventtime.IngestionTimeAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

//使用侧流输出获取窗口迟到的数据
public class WindowLateDateDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //1000,spark,3
        //1200,spark,5
        //2000,hadoop,2
        //socketTextStream返回的DataStream并行度为1
        DataStreamSource lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator dataWithWaterMark = lines.assignTimestampsAndWatermarks(WatermarkStrategy
                .forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner((line, timestamp) -> Long.parseLong(line.split(",")[0])));


        SingleOutputStreamOperator> wordAndCount = dataWithWaterMark.map(new MapFunction>() {
            @Override
            public Tuple2 map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
            }
        });

        //调用keyBy
        KeyedStream, String> keyed = wordAndCount.keyBy(t -> t.f0);

        //NonKeyd Window: 不调用KeyBy,然后调用windowAll方法,传入windowAssinger
        //Keyd Window: 先调用KeyBy,然后调用window方法,传入windowAssinger
        WindowedStream, String, TimeWindow> windowed = keyed
                .window(TumblingEventTimeWindows.of(Time.seconds(5)));

        //如果直接调用sum或reduce,只会聚合窗口内的数据,不去跟历史数据进行累加
        //需求:可以在窗口内进行增量聚合,并且还可以与历史数据进行聚合
        SingleOutputStreamOperator> result = windowed.reduce(new MyReduceFunc(), new MyWindowFunc());

        result.print();

        env.execute();

    }

    public static class MyReduceFunc implements ReduceFunction> {

        @Override
        public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {
            value1.f1 = value1.f1 + value2.f1;
            return value1;
        }
    }

    public static class MyWindowFunc extends ProcessWindowFunction, Tuple2, String, TimeWindow> {

        private transient ValueState sumState;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("wc-state", Integer.class);
            sumState = getRuntimeContext().getState(stateDescriptor);
        }

        @Override
        public void process(String key, Context context, Iterable> elements, Collector> out) throws Exception {

            Integer historyCount = sumState.value();
            if (historyCount == null) {
                historyCount = 0;
            }
            //获取到窗口聚合后输出的结果
            Tuple2 tp = elements.iterator().next();
            Integer windowCount = tp.f1;
            Integer totalCount = historyCount + windowCount;
            //更新状态
            sumState.update(totalCount);
            tp.f1 = totalCount;
            //输出
            out.collect(tp);
        }
    }
}

2、aggregate结合WindowFunction

package cn._51doit.flink.day07;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

//累加当前窗口的数据,并与历史数据进行累加
public class WindowAggregateFunctionDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        env.enableCheckpointing(10000);
        //1000,spark,3
        //1200,spark,5
        //2000,hadoop,2
        //socketTextStream返回的DataStream并行度为1
        DataStreamSource lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator dataWithWaterMark = lines.assignTimestampsAndWatermarks(WatermarkStrategy
                .forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner((line, timestamp) -> Long.parseLong(line.split(",")[0])));


        SingleOutputStreamOperator> wordAndCount = dataWithWaterMark.map(new MapFunction>() {
            @Override
            public Tuple2 map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
            }
        });

        //调用keyBy
        KeyedStream, String> keyed = wordAndCount.keyBy(t -> t.f0);

        OutputTag> lateDataTag = new OutputTag>("late-data") {
        };

        //NonKeyd Window: 不调用KeyBy,然后调用windowAll方法,传入windowAssinger
        // Keyd Window: 先调用KeyBy,然后调用window方法,传入windowAssinger
        WindowedStream, String, TimeWindow> windowed = keyed
                .window(TumblingEventTimeWindows.of(Time.seconds(5)));

        //如果直接调用sum或reduce,只会聚合窗口内的数据,不去跟历史数据进行累加
        //需求:可以在窗口内进行增量聚合,并且还可以与历史数据进行聚合
        SingleOutputStreamOperator> result = windowed.aggregate(new MyAggFunc(), new MyWindowFunc());

        result.print();

        env.execute();

    }


    private static class MyAggFunc implements AggregateFunction, Integer, Integer> {

        //创建一个初始值
        @Override
        public Integer createAccumulator() {
            return 0;
        }

        //数据一条数据,与初始值或中间累加的结果进行聚合
        @Override
        public Integer add(Tuple2 value, Integer accumulator) {
            return value.f1 + accumulator;
        }

        //返回的结果
        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }

        //如果使用的是非SessionWindow,可以不实现
        @Override
        public Integer merge(Integer a, Integer b) {
            return null;
        }
    }


    private static class MyWindowFunc extends ProcessWindowFunction, String, TimeWindow> {

        private transient ValueState sumState;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("wc-state", Integer.class);
            sumState = getRuntimeContext().getState(stateDescriptor);
        }

        @Override
        public void process(String key, Context context, Iterable elements, Collector> out) throws Exception {

            Integer historyCount = sumState.value();
            if (historyCount == null) {
                historyCount = 0;
            }
            //获取到窗口聚合后输出的结果
            Integer windowCount = elements.iterator().next();
            Integer totalCount = historyCount + windowCount;
            //更新状态
            sumState.update(totalCount);

            //输出
            out.collect(Tuple2.of(key, totalCount));
        }
    }
}

3、ProcessWindowFunction使用

package cn._51doit.flink.day07;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

//累加当前串口的数据,并与历史数据进行累加
public class WindowProcessFunctionDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        env.enableCheckpointing(10000);
        //1000,spark,3
        //1200,spark,5
        //2000,hadoop,2
        //socketTextStream返回的DataStream并行度为1
        DataStreamSource lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator dataWithWaterMark = lines.assignTimestampsAndWatermarks(WatermarkStrategy
                .forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner((line, timestamp) -> Long.parseLong(line.split(",")[0])));


        SingleOutputStreamOperator> wordAndCount = dataWithWaterMark.map(new MapFunction>() {
            @Override
            public Tuple2 map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
            }
        });

        //调用keyBy
        KeyedStream, String> keyed = wordAndCount.keyBy(t -> t.f0);

        //NonKeyd Window: 不调用KeyBy,然后调用windowAll方法,传入windowAssinger
        // Keyd Window: 先调用KeyBy,然后调用window方法,传入windowAssinger
        WindowedStream, String, TimeWindow> windowed = keyed
                .window(TumblingEventTimeWindows.of(Time.seconds(5)));

        //如果直接调用sum或reduce,只会聚合窗口内的数据,不去跟历史数据进行累加
        //需求:可以在窗口内进行增量聚合,并且还可以与历史数据进行聚合
        SingleOutputStreamOperator> result = windowed.process(new ProcessWindowFunction, Tuple2, String, TimeWindow>() {
            //窗口触发,才会调用process方法,该方法可以获取窗口内的全量获取窗口的数据,数据是缓存到windowstate中的
            @Override
            public void process(String s, Context context, Iterable> elements, Collector> out) throws Exception {

                for (Tuple2 element : elements) {
                    out.collect(element);
                }
            }
        });

        result.print();

        env.execute();

    }

}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/761339.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号