1搭建 maven 工程 FlinkTutorial
pom 文件
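<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
</dependencies>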
2批处理wordcount
//批处理
public class WordCount {
public static void main(String[] args) throws Exception {
//创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//从文件中读取数据
String path = "F:\IDEAJavaProject\Flink\WordCount\src\main\resources\hello.txt";
DataSource dataSource = env.readTextFile(path);
//对数据集进行处理,按空格分词展开,按照第一个位置的word分组,按第二个位置的数字求和
DataSet> resultSet = dataSource.flatMap(new MyFlatMap()).groupBy(0).sum(1);
resultSet.print();
}
//自定义类MyFlatMap,实现FlatMapFunction接口
public static class MyFlatMap implements FlatMapFunction>{
@Override
public void flatMap(String values, Collector> out) throws Exception {
//按空格分词
String[] words = values.split(" ");
//遍历所有word,包装成二元组输出
for (String word : words) {
out.collect(new Tuple2<>(word,1));
}
}
}
}
3流处理wordcount
基于本地文件
//流处理
public class StreamWordCount {
public static void main(String[] args) throws Exception {
//创建流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度
env.setParallelism(8);
//从文件中读取数据
String path = "F:\IDEAJavaProject\Flink\WordCount\src\main\resources\hello.txt";
DataStream dataSource = env.readTextFile(path);
//基于数据流进行在在转换计算
SingleOutputStreamOperator> resultStream = dataSource.flatMap(new WordCount.MyFlatMap()).keyBy(0).sum(1);
resultStream.print();
//执行任务
env.execute();
}
}
基于 Linux(从 socket 文本流读取数据)
/**
 * Streaming word count that reads lines from a socket text stream.
 *
 * <p>The socket address is taken from the program arguments {@code --host} and
 * {@code --port}, for example {@code --host hadoop101 --port 7777}.
 */
public class StreamWordCountLinux {

    /**
     * Entry point of the streaming job.
     *
     * @param args program arguments; must contain {@code --host <name>} and
     *             {@code --port <number>}
     * @throws Exception if a required argument is missing or the job fails
     */
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the job parallelism.
        env.setParallelism(8);

        // Extract the connection settings from the program arguments.
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // getRequired reports a missing --host by name instead of passing null on to
        // socketTextStream; getInt without a default already fails on a missing --port.
        String host = parameterTool.getRequired("host");
        int port = parameterTool.getInt("port");

        // Read lines from the socket text stream.
        DataStream<String> dataSource = env.socketTextStream(host, port);

        // Tokenise each line, key by the word (field 0) and keep a running sum of field 1.
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream =
                dataSource.flatMap(new WordCount.MyFlatMap()).keyBy(0).sum(1);

        resultStream.print();

        // Submit and run the job.
        env.execute();
    }
}
测试—— 在 linux 系统中用 netcat 命令进行发送测试。
nc -lk 7777



