准备
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.registerJobListener(new JobListener() {
@Override
public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
Logger.getLogger("test").info("onJobSubmitted");
}
@Override
public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
Logger.getLogger("test").info("onJobExecuted");
}
});
1. 运算
1)map(MapFunction
输入一个数据,输出一个数据,中间可以做任意变换,下面例子中输入流是TestObj类型,最终输出的是String类型
ListtestObjs=new ArrayList<>(); testObjs.add(new TestObj(1,"苹果,梨")); testObjs.add(new TestObj(2,"柚子,橘子")); testObjs.add(new TestObj(3,"猫,虎")); testObjs.add(new TestObj(4,"狗,狼")); DataStream data=env.fromCollection(testObjs); data.map(new MapFunction () { @Override public String map(TestObj testObj) throws Exception { return testObj.getValue(); } }).print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); }
结果
7> 苹果,梨
8> 柚子,橘子
1> 猫,虎
2> 狗,狼
>号前面的7、8、1、2是子任务序号,后面是输出结果,这里输出了testObj的value值
MapFunction
Function的子接口,泛型T是输入值类型,O是输出值类型,包含唯一方法 O map(T var1) throws Exception,输入T类型数据返回O类型数据,中间可以自定义处理。
2)flatMap(FlatMapFunction
输入一个数据,输出一个或多个数据,下面例子是把testObj的value值用逗号分割后存入集合,最终输出了2倍的数据
data.flatMap(new FlatMapFunction() { @Override public void flatMap(TestObj testObj, Collector collector) throws Exception { String[] ss=testObj.getValue().split(","); for (String s:ss){ collector.collect(testObj.getKey()+":"+s); } } }).print();
结果:
4> 1:苹果
6> 3:猫
7> 4:狗
6> 3:虎
5> 2:柚子
5> 2:橘子
4> 1:梨
7> 4:狼
冒号前面的数字是testObj的key值,相同key的两个数据来自于一个初始数据,可以看到一个初始数据的testObj依然是一个子任务执行的
FlatMapFunction
Function的子接口,泛型T是输入值类型,O是输出值类型,包含唯一方法 void flatMap(T var1, Collector
3)filter(FilterFunction
输入一个数据,根据自定义判断是否保留该数据,下面例子数据key值为2的倍数的testObj,为了方便看结果用map做了一下转换,不然输出的是object地址。
data.filter(new FilterFunction() { @Override public boolean filter(TestObj testObj) throws Exception { return testObj.getKey()%2==0; } }).map(new MapFunction () { @Override public String map(TestObj testObj) throws Exception { return testObj.getKey()+":"+testObj.getValue(); } }).print();
结果:
1> 2:柚子,橘子
3> 4:狗,狼
仅输出了key为2和4的数据
FilterFunction
Function的子接口,泛型T是输入值类型,输出值布尔类型,包含唯一方法 boolean filter(T var1) throws Exception,返回true保留数据,false去掉数据。
4)assignTimestampsAndWatermarks(WatermarkStrategy
水位线,一般用于处理乱序事件
data.assignTimestampsAndWatermarks(new WatermarkStrategy() { @Override public WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator () { @Override public void onEvent(TestObj testObj, long l, WatermarkOutput watermarkOutput) { Logger.getLogger("test").info("onEvent: "+testObj.getKey()); //时间触发时执行,检查并记忆时间戳或生成watermark } @Override public void onPeriodicEmit(WatermarkOutput watermarkOutput) { Logger.getLogger("test").info("onPeriodicEmit: "); //周期性执行,可能会生成新的Watermark } }; } }).map(new MapFunction () { @Override public String map(TestObj testObj) throws Exception { return testObj.getKey()+":"+testObj.getValue(); } }).print();
输出
九月 30, 2021 4:17:03 下午 com.test.flink.Test$1 onJobSubmitted
信息: onJobSubmitted
九月 30, 2021 4:17:03 下午 com.test.flink.Test$3$1 onEvent
信息: onEvent: 1
九月 30, 2021 4:17:03 下午 com.test.flink.Test$3$1 onEvent
信息: onEvent: 2
九月 30, 2021 4:17:03 下午 com.test.flink.Test$3$1 onEvent
信息: onEvent: 3
九月 30, 2021 4:17:03 下午 com.test.flink.Test$3$1 onEvent
信息: onEvent: 4
九月 30, 2021 4:17:03 下午 com.test.flink.Test$3$1 onPeriodicEmit
信息: onPeriodicEmit:
8> 4:狗,狼
6> 2:柚子,橘子
7> 3:猫,虎
5> 1:苹果,梨
九月 30, 2021 4:17:03 下午 com.test.flink.Test$1 onJobExecuted
信息: onJobExecuted
可以看出会给每个数据都打上一个watermark
5) process(ProcessFunction
ProcessFunction比FlatMapFunction多了一个Context参数,context可以获得时间戳和watermark,当然前提是之前设置过,不然返回null
data.assignTimestampsAndWatermarks(new WatermarkStrategy() { @Override public WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator () { @Override public void onEvent(TestObj testObj, long l, WatermarkOutput watermarkOutput) { } @Override public void onPeriodicEmit(WatermarkOutput watermarkOutput) { } }; } }).process(new ProcessFunction () { @Override public void processElement(TestObj testObj, Context context, Collector collector) throws Exception { long ts = context.timestamp(); long cpt = context.timerService().currentProcessingTime(); long cw = context.timerService().currentWatermark(); collector.collect(testObj.getKey()+":"+ts+"-"+cpt+"-"+cw); } }).print();
输出
1> 4:-9223372036854775808-1633654899323--9223372036854775808
6> 1:-9223372036854775808-1633654899323--9223372036854775808
7> 2:-9223372036854775808-1633654899324--9223372036854775808
8> 3:-9223372036854775808-1633654899323--9223372036854775808
2. 分区
1)keyBy(KeySelector
data.keyBy(v->v.getKey()).print(); data.keyBy(TestObj::getKey).print(); data.keyBy(new KeySelector() { @Override public Integer getKey(TestObj testObj) throws Exception { return testObj.getKey(); } }).print();
这几种写法都是一个意思,大概翻了一下源码,好像是用key分了个区。
DataSream.class
publicKeyedStream keyBy(KeySelector key) { Preconditions.checkNotNull(key); return new KeyedStream(this, (KeySelector)this.clean(key)); } protected F clean(F f) { return this.getExecutionEnvironment().clean(f); }
KeyedStream.class
public KeyedStream(DataStreamdataStream, KeySelector keySelector) { this(dataStream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType())); } public KeyedStream(DataStream dataStream, KeySelector keySelector, TypeInformation keyType) { this(dataStream, new PartitionTransformation(dataStream.getTransformation(), new KeyGroupStreamPartitioner(keySelector, 128)), keySelector, keyType); } @Internal KeyedStream(DataStream stream, PartitionTransformation partitionTransformation, KeySelector keySelector, TypeInformation keyType) { super(stream.getExecutionEnvironment(), partitionTransformation); this.keySelector = (KeySelector)this.clean(keySelector); this.keyType = this.validateKeyType(keyType); }
PartitionTransformation.class 的官方说明
This transformation represents a change of partitioning of the input elements.
这个transformation代表输入数据的分区变化
This does not create a physical operation, it only affects how upstream operations are connected to downstream operations.
它不会生成一个实际上的算子,仅影响上游算子如何连接下游算子
也就是说keyBy本身其实没有执行运算,因此如果把后面的print()去掉,仅保留一个keyBy是不能执行的,会报错:java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute. 但是map等方法是可以的。
稍微修改一下数据
ListtestObjs=new ArrayList<>(); testObjs.add(new TestObj(1,"苹果,梨")); testObjs.add(new TestObj(1,"柚子,橘子")); testObjs.add(new TestObj(3,"猫,虎")); testObjs.add(new TestObj(3,"狗,狼")); DataStream data=env.fromCollection(testObjs); data.keyBy(new KeySelector () { @Override public Integer getKey(TestObj testObj) throws Exception { return testObj.getKey(); } }).map(new MapFunction () { @Override public String map(TestObj testObj) throws Exception { return testObj.getKey()+":"+testObj.getValue(); } }).print();
输出
8> 3:猫,虎
6> 1:苹果,梨
8> 3:狗,狼
6> 1:柚子,橘子
可以看出分区之后同一个分区的数据由一个子任务执行
keyBy方法返回的是KeyedSream,DataStream的一个子类,可以使用DataSteam除了分区方法之外的所有方法。
2)forward()
直接保留上游的分区,下面例子先用keyBy分区,然后运算,forward()之后再次运算
data.keyBy(TestObj::getKey).map(new MapFunction() { @Override public TestObj map(TestObj testObj) throws Exception { return testObj; } }).forward().map(new MapFunction () { @Override public String map(TestObj testObj) throws Exception { return testObj.getKey()+":"+testObj.getValue(); } }).print();
输出
8> 3:猫,虎
8> 3:狗,狼
6> 1:苹果,梨
6> 1:柚子,橘子
依然是两两一组
3)rebalence()
把上游数据循环分区到下游
data.keyBy(TestObj::getKey).map(new MapFunction() { @Override public TestObj map(TestObj testObj) throws Exception { return testObj; } }).rebalance().map(new MapFunction () { @Override public String map(TestObj testObj) throws Exception { return testObj.getKey()+":"+testObj.getValue(); } }).print();
输出
1> 3:狗,狼
7> 1:柚子,橘子
8> 3:猫,虎
6> 1:苹果,梨
重新分配使用了不同的子任务
4)shuffle()
随机分配上游数据到下游
data.keyBy(TestObj::getKey).map(new MapFunction() { @Override public TestObj map(TestObj testObj) throws Exception { return testObj; } }).shuffle().map(new MapFunction () { @Override public String map(TestObj testObj) throws Exception { return testObj.getKey()+":"+testObj.getValue(); } }).print();
输出
7> 3:树袋熊
8> 1:苹果,梨
8> 1:葡萄
8> 3:猫,虎
8> 3:羊,牛
5> 1:柚子,橘子
5> 3:狗,狼
为了看出随机效果加了些数据,key还是1,3两个
5)rescale()
把上游分区数据分别循环到下游分区中
data.keyBy(TestObj::getKey).map(new MapFunction() { @Override public TestObj map(TestObj testObj) throws Exception { return testObj; } }).rescale().map(new MapFunction () { @Override public String map(TestObj testObj) throws Exception { return testObj.getKey()+":"+testObj.getValue(); } }).print().setParallelism(4);
输出
1> 1:柚子,橘子
2> 1:葡萄
4> 1:苹果,梨
1> 3:猫,虎
2> 3:狗,狼
3> 3:羊,牛
4> 3:树袋熊
keyBy之后是两个分区,每个分区分别有3个和4个数据,之后设置为四个分区,用rescale重分区后,之前的两个分区各自循环到新分区上
6)global()
上游所有数据分配到下游第一个分区
data.keyBy(TestObj::getKey).map(new MapFunction() { @Override public TestObj map(TestObj testObj) throws Exception { return testObj; } }).global().map(new MapFunction () { @Override public String map(TestObj testObj) throws Exception { return testObj.getKey()+":"+testObj.getValue(); } }).print();
输出
1> 1:苹果,梨
1> 1:柚子,橘子
1> 1:葡萄
1> 3:猫,虎
1> 3:狗,狼
1> 3:羊,牛
1> 3:树袋熊
7)broadcast()
将上游的数据分配到下游的每个分区上
data.keyBy(TestObj::getKey).map(new MapFunction() { @Override public TestObj map(TestObj testObj) throws Exception { return testObj; } }).broadcast().map(new MapFunction () { @Override public String map(TestObj testObj) throws Exception { return testObj.getKey()+":"+testObj.getValue(); } }).print().setParallelism(3);
输出
3> 3:猫,虎
3> 3:树袋熊
3> 1:葡萄
3> 3:猫,虎
3> 3:树袋熊
3> 1:葡萄
3> 3:羊,牛
3> 1:柚子,橘子
3> 3:羊,牛
3> 1:柚子,橘子
3> 3:猫,虎
3> 3:树袋熊
3> 1:葡萄
3> 3:羊,牛
3> 1:柚子,橘子
1> 3:狗,狼
3> 3:狗,狼
3> 1:苹果,梨
2> 3:羊,牛
2> 1:柚子,橘子
2> 3:羊,牛
2> 1:柚子,橘子
2> 3:狗,狼
2> 1:苹果,梨
2> 3:狗,狼
2> 1:苹果,梨
2> 3:羊,牛
2> 1:柚子,橘子
2> 3:狗,狼
2> 1:苹果,梨
2> 3:猫,虎
2> 3:树袋熊
2> 1:葡萄
3> 3:猫,虎
3> 3:树袋熊
3> 1:葡萄
1> 1:苹果,梨
1> 3:狗,狼
1> 1:苹果,梨
1> 3:猫,虎
1> 3:树袋熊
1> 1:葡萄
1> 3:猫,虎
1> 3:树袋熊
1> 1:葡萄
1> 3:狗,狼
1> 1:苹果,梨
1> 3:猫,虎
1> 3:树袋熊
1> 1:葡萄
2> 3:羊,牛
2> 1:柚子,橘子
1> 3:羊,牛
1> 1:柚子,橘子
1> 3:狗,狼
1> 1:苹果,梨
8)partitionCustom(Partitioner
自定义分区,改了下数据,下面例子是按照key值对2取余进行分区的
ListtestObjs=new ArrayList<>(); testObjs.add(new TestObj(1,"苹果,梨")); testObjs.add(new TestObj(2,"柚子,橘子")); testObjs.add(new TestObj(3,"猫,虎")); testObjs.add(new TestObj(4,"狗,狼")); testObjs.add(new TestObj(5,"羊,牛")); testObjs.add(new TestObj(6,"葡萄")); testObjs.add(new TestObj(7,"树袋熊")); DataStream data=env.fromCollection(testObjs); data.partitionCustom(new Partitioner () { @Override public int partition(Integer integer, int i) { return integer%2; } },TestObj::getKey).map(new MapFunction () { @Override public String map(TestObj testObj) throws Exception { return testObj.getKey()+":"+testObj.getValue(); } }).print();
输出
1> 2:柚子,橘子
1> 4:狗,狼
1> 6:葡萄
2> 1:苹果,梨
2> 3:猫,虎
2> 5:羊,牛
2> 7:树袋熊
最终奇数在一个分区,偶数在一个分区
3. 输出
1)print()
已经用过很多次了,输出到控制台
2)writeToSocket(String hostName, int port, SerializationSchema
输出到特定地址
hostName - 主机地址
port - 端口
schema - 序列化方法
3)addSink(SinkFunction
输出到其他位置
data.map(new MapFunction() { @Override public String map(TestObj testObj) throws Exception { return testObj.getKey()+":"+testObj.getValue(); } }).addSink(new SinkFunction () { @Override public void invoke(String value, Context context) throws Exception { //实际输出方法 //e.g. 输出到文件 FileUtils.writeFileUtf8(file,value); } });
4. 剩下的window学完再来补



