栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

大数据之flink分区

大数据之flink分区

一、分区

1、rebalance轮询分区

package cn._51doit.flink.day03;


import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Demonstrates Flink's rebalance() partitioning: records are redistributed
 * round-robin across all downstream subtasks, evening out load.
 *
 * <p>Generic type parameters were stripped by the HTML scrape and are restored
 * here; without them the anonymous {@code RichMapFunction}/{@code RichSinkFunction}
 * overrides of {@code map(String)} / {@code invoke(String, Context)} do not compile.
 */
public class RebalancingPartitioning {

    public static void main(String[] args) throws Exception{

        // Local environment with the web UI enabled, useful for watching subtask distribution.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Socket source is non-parallel (parallelism 1).
        DataStreamSource<String> words = env.socketTextStream("localhost", 8888);

        // Tag each record with the index of the map subtask that produced it.
        SingleOutputStreamOperator<String> mapDataStream = words.map(new RichMapFunction<String, String>() {

            @Override
            public String map(String value) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                return value + " :" + indexOfThisSubtask;
            }
        }).setParallelism(1);

        // rebalance(): send records to downstream subtasks in round-robin order.
        DataStream<String> rebalanced = mapDataStream.rebalance();

        // Sink prints "<value> -> <sink subtask index>" so the round-robin spread is visible.
        rebalanced.addSink(new RichSinkFunction<String>() {

            @Override
            public void invoke(String value, Context context) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(value + " -> " + indexOfThisSubtask);
            }
        });


        env.execute();
    }

}

2、Shuffle随机分区

package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Demonstrates Flink's shuffle() partitioning: each record is sent to a
 * uniformly random downstream subtask.
 *
 * <p>Restores the generic type parameters that the HTML scrape stripped;
 * the raw-typed anonymous classes would not compile otherwise.
 */
public class ShufflePartitioningDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Socket source is a non-parallel source (parallelism 1).
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // Map stage runs with parallelism 2; each record is tagged with its producing subtask index.
        SingleOutputStreamOperator<String> mapped = lines.map(new RichMapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                return value + " : " + indexOfThisSubtask;
            }
        }).setParallelism(2);

        // shuffle(): randomly distribute records to downstream subtasks.
        DataStream<String> shuffled = mapped.shuffle();

        // Sink prints "<value> -> <sink subtask index>" so the random spread is visible.
        shuffled.addSink(new RichSinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                int index = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(value + " -> " + index);
            }
        });

        env.execute();


    }
}

3、Broadcast广播分区

package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Demonstrates Flink's broadcast() partitioning: every record from the
 * upstream operator is replicated to ALL downstream subtasks (each input
 * line is printed once per sink subtask).
 *
 * <p>Restores the generic type parameters that the HTML scrape stripped;
 * the raw-typed anonymous classes would not compile otherwise.
 */
public class BroadcastPartitioningDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Socket source is a non-parallel source (parallelism 1).
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // Map stage explicitly set to parallelism 1; tags each record with its subtask index.
        SingleOutputStreamOperator<String> mapped = lines.map(new RichMapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                return value + " : " + indexOfThisSubtask;
            }
        }).setParallelism(1);

        // broadcast(): the upstream operator replicates each record to all downstream subtasks.
        DataStream<String> shuffled = mapped.broadcast();

        // Each sink subtask prints its own copy of every broadcast record.
        shuffled.addSink(new RichSinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                int index = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(value + " -> " + index);
            }
        });

        env.execute();


    }
}

4、Custom自定义分区

package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class CustomPartitioningDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //Source是一个非并行的Source
        //并行度是1
        DataStreamSource lines = env.socketTextStream("localhost", 8888);

        //并行度2
        SingleOutputStreamOperator> mapped = lines.map(new RichMapFunction>() {
            @Override
            public Tuple2 map(String value) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                return Tuple2.of(value, indexOfThisSubtask);
            }
        });//.setParallelism(2);

        //按照指定的规则进行分区
        DataStream> partitioned = mapped.partitionCustom(new Partitioner() {

            @Override
            public int partition(String key, int numPartitions) {
                //System.out.println("key: " + key  + " ,下游task的并行度:" + numPartitions);
                int res = 0;
                if("spark".equals(key)) {
                    res = 1;
                } else if ("flink".equals(key)){
                    res = 2;
                } else if("hadoop".equals(key)) {
                    res = 3;
                }
                return res;
            }
        }, tp -> tp.f0);

        partitioned.addSink(new RichSinkFunction>() {

            @Override
            public void invoke(Tuple2 value, Context context) throws Exception {

                int index = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(value.f0 + " , 上游 " + value.f1 + " -> 下游 " + index);
            }
        });

        env.execute();
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/735119.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号