一、开启新链
package cn._51doit.flink.day05;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class StartNewChainDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//并行度就是1
DataStreamSource lines = env.socketTextStream("localhost", 8888);
SingleOutputStreamOperator words = lines.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String value, Collector out) throws Exception {
String[] words = value.split(",");
for (String word : words) {
out.collect(word);
}
}
});
SingleOutputStreamOperator filtered = words.filter(new FilterFunction() {
@Override
public boolean filter(String value) throws Exception {
return !value.startsWith("error");
}
}).startNewChain(); //从filter开始,开启一个新链
SingleOutputStreamOperator> wordAndOne = filtered.map(new MapFunction>() {
@Override
public Tuple2 map(String value) throws Exception {
return Tuple2.of(value, 1);
}
});
SingleOutputStreamOperator> summed = wordAndOne.keyBy(t -> t.f0).sum(1);
summed.print();
env.execute();
}
}
二、断开链
package cn._51doit.flink.day05;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class DisableChainingDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//并行度就是1
DataStreamSource lines = env.socketTextStream("localhost", 8888);
SingleOutputStreamOperator words = lines.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String value, Collector out) throws Exception {
String[] words = value.split(",");
for (String word : words) {
out.collect(word);
}
}
});
SingleOutputStreamOperator filtered = words.filter(new FilterFunction() {
@Override
public boolean filter(String value) throws Exception {
return !value.startsWith("error");
}
}).disableChaining(); //将该Operator前后的链都断开
SingleOutputStreamOperator> wordAndOne = filtered.map(new MapFunction>() {
@Override
public Tuple2 map(String value) throws Exception {
return Tuple2.of(value, 1);
}
});
SingleOutputStreamOperator> summed = wordAndOne.keyBy(t -> t.f0).sum(1);
summed.print();
env.execute();
}
}
三、设置共享资源槽
package cn._51doit.flink.day05;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class SetSharingGroupDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//并行度就是1
DataStreamSource lines = env.socketTextStream(args[0], 8888);
SingleOutputStreamOperator words = lines.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String value, Collector out) throws Exception {
String[] words = value.split(",");
for (String word : words) {
out.collect(word);
}
}
});
SingleOutputStreamOperator filtered = words.filter(new FilterFunction() {
@Override
public boolean filter(String value) throws Exception {
return !value.startsWith("error");
}
}).setParallelism(2).disableChaining().slotSharingGroup("doit");
//从这个算子开始,后期的task的共享资源槽的名称都是doit(就近原则)
SingleOutputStreamOperator> wordAndOne = filtered.map(new MapFunction>() {
@Override
public Tuple2 map(String value) throws Exception {
return Tuple2.of(value, 1);
}
}).setParallelism(2).slotSharingGroup("default");
SingleOutputStreamOperator> summed = wordAndOne.keyBy(t -> t.f0).sum(1).setParallelism(2).slotSharingGroup("default");;
summed.print().setParallelism(2).slotSharingGroup("default");;
env.execute();
}
}



