栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

大数据之flink资源槽

大数据之flink资源槽

一、开启新链

package cn._51doit.flink.day05;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StartNewChainDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        //并行度就是1
        DataStreamSource lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator words = lines.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String value, Collector out) throws Exception {
                String[] words = value.split(",");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });

        SingleOutputStreamOperator filtered = words.filter(new FilterFunction() {
            @Override
            public boolean filter(String value) throws Exception {
                return !value.startsWith("error");
            }
        }).startNewChain(); //从filter开始,开启一个新链

        SingleOutputStreamOperator> wordAndOne = filtered.map(new MapFunction>() {
            @Override
            public Tuple2 map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });

        SingleOutputStreamOperator> summed = wordAndOne.keyBy(t -> t.f0).sum(1);

        summed.print();

        env.execute();


    }
}

二、断开链

package cn._51doit.flink.day05;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class DisableChainingDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //并行度就是1
        DataStreamSource lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator words = lines.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String value, Collector out) throws Exception {
                String[] words = value.split(",");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });

        SingleOutputStreamOperator filtered = words.filter(new FilterFunction() {
            @Override
            public boolean filter(String value) throws Exception {
                return !value.startsWith("error");
            }
        }).disableChaining(); //将该Operator前后的链都断开

        SingleOutputStreamOperator> wordAndOne = filtered.map(new MapFunction>() {
            @Override
            public Tuple2 map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });

        SingleOutputStreamOperator> summed = wordAndOne.keyBy(t -> t.f0).sum(1);

        summed.print();

        env.execute();


    }
}

三、设置共享资源槽

package cn._51doit.flink.day05;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SetSharingGroupDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        //并行度就是1
        DataStreamSource lines = env.socketTextStream(args[0], 8888);

        SingleOutputStreamOperator words = lines.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String value, Collector out) throws Exception {
                String[] words = value.split(",");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });

        SingleOutputStreamOperator filtered = words.filter(new FilterFunction() {
            @Override
            public boolean filter(String value) throws Exception {
                return !value.startsWith("error");
            }
        }).setParallelism(2).disableChaining().slotSharingGroup("doit");

        //从这个算子开始,后期的task的共享资源槽的名称都是doit(就近原则)
        SingleOutputStreamOperator> wordAndOne = filtered.map(new MapFunction>() {
            @Override
            public Tuple2 map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        }).setParallelism(2).slotSharingGroup("default");

        SingleOutputStreamOperator> summed = wordAndOne.keyBy(t -> t.f0).sum(1).setParallelism(2).slotSharingGroup("default");;

        summed.print().setParallelism(2).slotSharingGroup("default");;

        env.execute();


    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745966.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号