示例1. WordCount
public class WordCount {
public static void main(String[] args) throws Exception {
// 创建flink运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//调用Source生成DataStream
DataStreamSource line = env.socketTextStream("linux01", 1993);
//调用transformation
SingleOutputStreamOperator> wordAndOne = line.flatMap(new LineSplitor());
KeyedStream, Tuple> keyed = wordAndOne.keyBy(0);
SingleOutputStreamOperator> result = keyed.sum(1);
//调用SinK
result.print();
//启动
env.execute("WordCount");
}
//使用内部静态类传参数
private static class LineSplitor extends RichFlatMapFunction>{
@Override
public void flatMap(String line, Collector> collector) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
collector.collect(Tuple2.of(word,1));
}
}
}
}
示例2. MapFunction和RichMapFunction的使用
/**
 * Demonstrates MapFunction versus RichMapFunction: both upper-case each input
 * line, but the rich variant also has open/close lifecycle hooks and access to
 * the runtime context.
 */
public class MapFunctionDemo {

    /**
     * Builds the pipeline (socket source -> map -> print) and runs it.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job cannot be executed
     */
    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: each record is one line of text read from linux01:7777.
        DataStreamSource<String> line = env.socketTextStream("linux01", 7777);

        // Transformation. To try the plain (non-rich) variant instead, use:
        //   line.map(new MyMapFunction());
        SingleOutputStreamOperator<String> upper = line.map(new MyRichFunction());

        // Sink: print the results.
        upper.print();

        // Start the job.
        env.execute("MapFunctionDemo");
    }

    /** Upper-cases each record and logs every lifecycle call with the subtask index. */
    private static class MyRichFunction extends RichMapFunction<String, String> {

        /** Runs once at initialization; a suitable place to open e.g. a database connection. */
        @Override
        public void open(Configuration parameters) throws Exception {
            // Index of the subtask this function instance is running in.
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            System.out.println("open 方法被执行了->" + indexOfThisSubtask);
        }

        /**
         * Runs once per record; a connection opened in open() could be used here to enrich data.
         *
         * @param value the input record
         * @return the record converted to upper case
         */
        @Override
        public String map(String value) throws Exception {
            // Index of the subtask this function instance is running in.
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            System.out.println("map方法被执行了->" + indexOfThisSubtask);
            return value.toUpperCase();
        }

        /** Runs once when the job stops; a suitable place to close connections. */
        @Override
        public void close() throws Exception {
            // Index of the subtask this function instance is running in.
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            System.out.println("close方法被执行了->" + indexOfThisSubtask);
        }
    }

    /** Plain MapFunction variant: upper-cases each record, with no lifecycle hooks. */
    private static class MyMapFunction implements MapFunction<String, String> {

        /**
         * @param value the input record
         * @return the record converted to upper case
         */
        @Override
        public String map(String value) throws Exception {
            return value.toUpperCase();
        }
    }
}
示例3.并行度
/**
 * Prints the parallelism of the execution environment and of each stage
 * (socket source, map, print sink) of a small pipeline.
 */
public class ParallelismDemo {

    /**
     * Builds the pipeline, reports the parallelism of every stage, then runs the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job cannot be executed
     */
    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Uncomment to set the environment's parallelism explicitly:
        // env.setParallelism(12);

        // Read the environment's current parallelism. When running locally, the default
        // is the number of logical CPU cores (hardware threads) of the machine.
        int parallelism = env.getParallelism();
        System.out.println("当前运行环境的并行度->" + parallelism);

        // Source: each record is one line of text read from linux01:7777.
        DataStreamSource<String> line = env.socketTextStream("linux01", 7777);

        // The DataStream created by socketTextStream always has a parallelism of 1.
        int parallelism1 = line.getParallelism();
        System.out.println("SocketSource创建的DataStream的并行度->" + parallelism1);

        // map is a parallel operator; unless set on the operator itself, its
        // parallelism defaults to that of the execution environment.
        SingleOutputStreamOperator<String> upperStream = line.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value.toUpperCase();
            }
        });
        int parallelism2 = upperStream.getParallelism();
        System.out.println("map算子所在Task的并行度->" + parallelism2);

        // The print sink is also a parallel operator; unless set on the operator itself,
        // its parallelism defaults to that of the execution environment.
        DataStreamSink<String> streamSink = upperStream.print();
        int parallelism3 = streamSink.getTransformation().getParallelism();
        System.out.println("Sink算子所在Task的并行度->" + parallelism3);

        // Start the job.
        env.execute();
    }
}
示例4.自己实现print算子的简单功能
/**
 * Implements a simplified version of the built-in print sink: each record is
 * written to stdout prefixed with the 1-based index of the emitting subtask.
 */
public class MyPrintSink {

    /**
     * Builds the pipeline (socket source -> map -> custom print sink) and runs it.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job cannot be executed
     */
    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: each record is one line of text read from linux01:7777.
        DataStreamSource<String> line = env.socketTextStream("linux01", 7777);

        // Transformation: upper-case every record.
        SingleOutputStreamOperator<String> upperStream = line.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value.toUpperCase();
            }
        });

        // Attach the hand-written print sink.
        upperStream.addSink(new MyPrint());

        // Start the job.
        env.execute("MyPrintSink");
    }

    /** Custom print sink: writes "<subtask index + 1> > <value>" to stdout. */
    private static class MyPrint extends RichSinkFunction<String> {

        /**
         * Called once per record.
         *
         * @param value   the record to print
         * @param context sink context supplied by the runtime (not used)
         */
        @Override
        public void invoke(String value, Context context) throws Exception {
            // Runtime context of the currently running function instance.
            RuntimeContext runtimeContext = getRuntimeContext();
            // Zero-based index of this subtask.
            int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
            // Shift to a 1-based index; parenthesised so the addition is clearly arithmetic.
            System.out.println((indexOfThisSubtask + 1) + " > " + value);
        }
    }
}



