数据:
sensor_1,1547718199,35.8 sensor_1,1547718199,23.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_7,1547718202,3.7 sensor_10,1547718205,38.1 sensor_10,1543228205,32.1 sensor_10,1147248205,40.1
1:时间滚动窗口,聚合函数,全窗口函数
package com.atguigu.window;
import com.atguigu.bean.SensorReading;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.IterableUtils;
public class TimeWindow01 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource socketTextStream = env.socketTextStream("hadoop112", 7777);
DataStream inputStream = socketTextStream.map(value -> {
String[] split = value.split(",");
return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
});
SingleOutputStreamOperator res = inputStream.keyBy("id")
// .countWindow(10,2); //参数:窗口大小,时间步长
// .window(EventTimeSessionWindows.withGap(Time.minutes(1)));
// .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
.timeWindow(Time.seconds(10)) //参数:窗口大小,时间步长
.aggregate(new AggregateFunction() {
@Override
public Integer createAccumulator() { //创建一个累加器,初始值
return 0;
}
@Override
public Integer add(SensorReading value, Integer accumulator) { //累加规则
return accumulator + 1;
}
@Override
public Integer getResult(Integer accumulator) { //结果
return accumulator;
}
@Override
public Integer merge(Integer a, Integer b) { //合并
return a + b;
}
});
SingleOutputStreamOperator> apply = inputStream.keyBy("id")
.timeWindow(Time.seconds(10))
.apply(new WindowFunction, Tuple, TimeWindow>() {
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable input, Collector> out) throws Exception {
String id = tuple.getField(0);
Long windowEndTime =window.getEnd();
int size = IteratorUtils.toList(input.iterator()).size();
out.collect(new Tuple3<>(id,windowEndTime,size));
}
});
apply.print();
env.execute();
}
}
1:计数滚动窗口,聚合函数
package com.atguigu.window;
import com.atguigu.bean.SensorReading;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class CountWindow01 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource socketTextStream = env.socketTextStream("hadoop112", 7777);
DataStream inputStream = socketTextStream.map(value -> {
String[] split = value.split(",");
return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
});
SingleOutputStreamOperator aggregate = inputStream.keyBy("id")
.countWindow(5, 2)
.aggregate(new AggregateFunction, Double>() {
@Override
public Tuple2 createAccumulator() {
return new Tuple2<>(0.0, 0);
}
@Override
public Tuple2 add(SensorReading value, Tuple2 accumulator) {
return new Tuple2<>(accumulator.f0 + value.getTemperature(), accumulator.f1 + 1);
}
@Override
public Double getResult(Tuple2 accumulator) {
return accumulator.f0 / accumulator.f1;
}
@Override
public Tuple2 merge(Tuple2 a, Tuple2 b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
});
aggregate.print();
env.execute();
}
}
其它可选 API
•
.trigger() —— 触发器
➢
定义 window 什么时候关闭,触发计算并输出结果
•
.evictor() —— 移除器
➢
定义移除某些数据的逻辑
•
.allowedLateness() —— 允许处理迟到的数据,到时间先输出聚合结果,然后迟到数据来一个计算一次
•
.sideOutputLateData() —— 将迟到的数据放入侧输出流
•
.getSideOutput() —— 获取侧输出流
SingleOutputStreamOperator<SensorReading> sumStream = inputStream.keyBy("id")
.timeWindow(Time.seconds(10))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(new OutputTag<SensorReading>("late") {
})
.sum("temperature");
//获取侧输出流
DataStream



