- 环境准备
- 一.准备代码
- 1.1 maven准备
- 1.2 Java代码准备
- 二.打包
- 三.通过Web UI执行jar文件
- 3.1 上传文件
本地Windows环境已安装Flink 1.9.0版本。
一.准备代码 1.1 maven准备配置Flink的依赖
1.2 Java代码准备org.apache.flink flink-java1.9.0 provided org.apache.flink flink-streaming-java_2.111.9.0 provided org.apache.flink flink-core1.9.0
还是以大家耳熟能详的wordCount程序为例
package com.zqs.study.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;
public class wordCount {
public static void main(String[] args) throws Exception{
//创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//从文件中读取数据
String inputPath = "C:\Users\Administrator\IdeaProjects\FlinkStudy\src\main\java\com\zqs\study\flink\hello.txt";
DataSet inputDataSet = env.readTextFile(inputPath);
// 对数据集进行处理,按空格分词展开,转换成(word, 1)二元组进行统计
DataSet> resultSet = inputDataSet.flatMap(new MyFlatMapper())
.groupBy(0) // 按照第一个位置的word分组
.sum(1); // 将第二个位置上的数据求和;
resultSet.print();
//env.execute();
//env.execute("Word Count Example");
}
//自定义类,实现FlatMapFunction接口
public static class MyFlatMapper implements FlatMapFunction> {
@Override
public void flatMap(String value, Collector> out) throws Exception {
//按空格分词
String[] words = value.split(" ");
//遍历所有word,包成二元组输出
for (String word : words) {
out.collect(new Tuple2(word, 1));
}
}
}
}
二.打包
我这边是直接package了,一般步骤是clean、complie、test、package
如下截图是打包生成的文件路径
选择"Submit New Job"后,选择"Add New"
选中第二步打包生产的jar文件
如下提示上传成功
但是要注意的是,我们只是把jar文件是上传到服务器上,而并没有开始执行
双击界面上的jar文件,可以看到有参数
Entry class 我们需要运行的class的完整路径
Parallelism 并行度
Program Arguments Java程序中的自定义变量
Savepoint Path Savepoint保存的路径
我们直接输入class名称,其余的默认,点击Submit
程序开始执行
等待一会儿,执行成功



