增大分区、提高并行度、解决数据倾斜。
分区元素会随机均匀分发到下游分区,网络开销较大。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SingleOutputStreamOperatorconsole result: 2.rebalance 场景:source = env.addSource(new xxxSourceFunction()); source.shuffle.print();
增大分区、提高并行度、解决数据倾斜。
轮询分区元素,均匀的将元素分发到下游分区,下游每个分区的数据比较均匀,在发生数据倾斜时非常有用,网络开销较大。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SingleOutputStreamOperatorconsole result: 3.rescale 场景:source = env.addSource(new xxxSourceFunction()); source.rebalance.print();
减少分区、防止发生大量网络传输、不会发生全量的重分区。
轮询分区元素,将一个元素集合从上游分区发送给下游分区,发送单位是集合而不是一个个元素。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SingleOutputStreamOperatorconsole result: 4.broadcast 场景:source = env.addSource(new xxxSourceFunction()); source.process(xxx).setParallelism(2); source.rescale.process(xxx).setParallelism(4);
需要使用映射表,且映射表会经常发生变动的场景。
上游中每一个元素内容广播到下游每一个分区中。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SingleOutputStreamOperatorconsole result: 5.global 场景:source = env.addSource(new xxxSourceFunction()); source.process(xxx).setParallelism(2); source.broadcast.process(xxx).setParallelism(4);
并行度降为1。
上游分区的数据只发给下游的第一个分区。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SingleOutputStreamOperatorconsole result: 6.forward 场景:source = env.addSource(new xxxsourceFunction()); source.process(xxx).setParallelism(2); source.global.process(xxx).setParallelism(4);
一对一的数据分发,map、flatMap、filter等都是这种分区策略。
上游分区数据分发到下游对应分区中。
demo:必须保证上下游分区数(并行度)一致,不然会抛出如下异常:
Forward partitioning does not allow change of parallelism
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SingleOutputStreamOperator7.keyBy 场景:source = env.addSource(new xxxSourceFunction()); source.process(xxx).setParallelism(2); source.forward.process(xxx).setParallelism(4);
与业务场景匹配。
根据上游分区元素的Hash值与下游分区数取模计算出,将当前元素分发到下游哪一个分区。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SingleOutputStreamOperator8.PartitionCustom 场景:source = env.addSource(new xxxSourceFunction()); source.process(xxx).setParallelism(2); source.keyBy(0).process(xxx).setParallelism(4);
通过自定义的分区器,来决定元素使如何从上游分区分发到下游分区。
demo:StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SingleOutputStreamOperatorsource = env.addSource(new xxxSourceFunction()); source.process(xxx).setParallelism(2); source.partitionCustom(new CustomPartitioner(), 0).process(xxx).setParallelism(4); class customPartitioner implements Partitioner { @Override public int partition(Long aLong, int numPartitions) { return aLong.intValue() % numPartitions; } }



