- Environment
- 1. Maven configuration
- 2. Writing the Java code
- 3. Packaging with Maven and uploading
- 4. Running the jar
- 5. Running other classes in the jar
- References
Environment
The test server runs CDH 6.3.1 with Flink 1.9 installed.
Contents of the hello.txt file:
hello word
hello hdfs
hello mapreduce
hello yarn
hello hive
hello spark
hello flink
1. Maven configuration
Flink dependency configuration:
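<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>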
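Because the Java code is written locally, packaged into a jar, and then uploaded to the server to run, the jar needs a main entry point (the Main-Class entry in its manifest); otherwise running it fails with an error. Configure it with maven-jar-plugin: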
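<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-jar-plugin</artifactId>
    <version>3.0.2</version>
    <configuration>
        <archive>
            <manifest>
                <addClasspath>true</addClasspath>
                <mainClass>org.example.wordCount</mainClass>
            </manifest>
        </archive>
    </configuration>
</plugin>
<plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
</plugin>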
Adjust org.example.wordCount to match your own project:
org.example is the package name
wordCount is the class name
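Once the jar has been built, one way to confirm that Main-Class really equals package name plus class name is to read it back from the jar's manifest. The following is a minimal sketch using only the JDK's java.util.jar API; the class name ManifestCheck and the jar path argument are hypothetical and not part of the original project.

```java
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

// Hypothetical helper: reads the Main-Class entry that maven-jar-plugin writes.
public class ManifestCheck {

    // Returns the Main-Class attribute of the jar's manifest, or null if it is absent.
    static String mainClassOf(String jarPath) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            Manifest manifest = jar.getManifest();
            if (manifest == null) {
                return null; // the jar has no META-INF/MANIFEST.MF
            }
            return manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
        }
    }

    public static void main(String[] args) throws IOException {
        // Example: java ManifestCheck target/FlinkStudy-1.0-SNAPSHOT.jar
        System.out.println("Main-Class: " + mainClassOf(args[0]));
    }
}
```

With the configuration above, the printed value should be org.example.wordCount; a null result suggests the manifest section of the plugin configuration was not applied.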
2. Writing the Java code
The code is as follows:
package org.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class wordCount {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the data from a file on HDFS
        String inputPath = "hdfs://hp1:8020/user/hive/warehouse/hello.txt";
        DataSet<String> inputDataSet = env.readTextFile(inputPath);
        // Split each line on spaces into (word, 1) tuples, then count per word
        DataSet<Tuple2<String, Integer>> resultSet = inputDataSet.flatMap(new MyFlatMapper())
                .groupBy(0) // group by the word in the first field
                .sum(1);    // sum the values in the second field
        // In the DataSet API, print() triggers execution itself, so env.execute() is not
        // needed; calling it after print() fails because no new data sinks are defined.
        resultSet.print();
        //env.execute();
        //env.execute("Word Count Example");
    }

    // Custom class implementing the FlatMapFunction interface
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // Split the line on spaces
            String[] words = value.split(" ");
            // Emit every word as a (word, 1) tuple
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
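Before submitting to the cluster, it can help to know what output to expect. The sketch below reproduces the flatMap, groupBy(0) and sum(1) logic in plain Java with no Flink dependency, assuming hello.txt holds the seven "hello ..." lines listed at the start; the class LocalWordCount is hypothetical and only meant for checking results by hand.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local check: mirrors flatMap -> groupBy(0) -> sum(1) without Flink.
public class LocalWordCount {

    // Splits each line on a single space (as MyFlatMapper does) and adds 1 per word.
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys for stable output
        for (String line : lines) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum); // plays the role of sum(1) per word
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "hello word", "hello hdfs", "hello mapreduce", "hello yarn",
                "hello hive", "hello spark", "hello flink");
        // Prints one word=count line per distinct word, sorted by word
        count(lines).forEach((word, n) -> System.out.println(word + "=" + n));
    }
}
```

With those lines, hello should come out as 7 and every other word as 1. Flink's print() shows the same counts as tuples such as (hello,7), but the order of the printed tuples may differ. Note that split(" ") produces empty strings when a line contains consecutive spaces; split("\\s+") would be a stricter alternative if the input is not cleanly formatted.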
3. Packaging with Maven and uploading
Package the project with the mvn package command:
C:\Users\Administrator\IdeaProjects\FlinkStudy>mvn package
[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for org.example:FlinkStudy:jar:1.0-SNAPSHOT
[WARNING] 'build.pluginManagement.plugins.plugin.(groupId:artifactId)' must be unique but found duplicate declaration of plugin org.apache.maven.plugins:maven-compiler-plugin @ line 98, column 17
[WARNING] 'build.pluginManagement.plugins.plugin.(groupId:artifactId)' must be unique but found duplicate declaration of plugin org.apache.maven.plugins:maven-surefire-plugin @ line 107, column 17
[WARNING] 'build.pluginManagement.plugins.plugin.(groupId:artifactId)' must be unique but found duplicate declaration of plugin org.apache.maven.plugins:maven-jar-plugin @ line 116, column 17
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO]
[INFO] -----------------------< org.example:FlinkStudy >-----------------------
[INFO] Building FlinkStudy 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-resources-plugin:3.0.2:resources (default-resources) @ FlinkStudy ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 0 resource
[INFO]
[INFO] --- maven-compiler-plugin:3.6.0:compile (default-compile) @ FlinkStudy ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 2 source files to C:\Users\Administrator\IdeaProjects\FlinkStudy\target\classes
[INFO]
[INFO] --- maven-resources-plugin:3.0.2:testResources (default-testResources) @ FlinkStudy ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 0 resource
[INFO]
[INFO] --- maven-compiler-plugin:3.6.0:testCompile (default-testCompile) @ FlinkStudy ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- maven-surefire-plugin:2.19:test (default-test) @ FlinkStudy ---
[INFO] Tests are skipped.
[INFO]
[INFO] --- maven-jar-plugin:3.0.2:jar (default-jar) @ FlinkStudy ---
[INFO] Building jar: C:\Users\Administrator\IdeaProjects\FlinkStudy\target\FlinkStudy-1.0-SNAPSHOT.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1.848 s
[INFO] Finished at: 2021-08-25T09:41:03+08:00
[INFO] ------------------------------------------------------------------------
C:\Users\Administrator\IdeaProjects\FlinkStudy>
Then upload the generated FlinkStudy-1.0-SNAPSHOT.jar (found in the target directory) to the server.
4. Running the jar
Command:
flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 /home/flink/FlinkStudy-1.0-SNAPSHOT.jar
Run output:
Execution result shown in the web UI:
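5. Running other classes in the jar
Because the pom specifies a main class, flink run executes that class whenever no class is given on the command line. What if we want to run a different class from the same project? In that case, use the -c or --class option to specify the entry class.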
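Conceptually, -c only changes which class's main method gets called; without it, the manifest's Main-Class is used. The sketch below illustrates that idea with plain JDK reflection. It is a simplified model, not Flink's actual client code, and the names EntryPointDemo, First, Second and run are hypothetical.

```java
import java.lang.reflect.Method;

// Simplified model (not Flink's implementation): choose an entry class by name, call its main.
public class EntryPointDemo {

    static String lastRun = null; // records which entry point ran, for the demo

    public static class First {
        public static void main(String[] args) { lastRun = "First"; }
    }

    public static class Second {
        public static void main(String[] args) { lastRun = "Second"; }
    }

    // Uses the explicit class if given (like -c / --class), otherwise the manifest default.
    static void run(String manifestMainClass, String explicitClass, String[] programArgs)
            throws Exception {
        String entry = (explicitClass != null) ? explicitClass : manifestMainClass;
        Method mainMethod = Class.forName(entry).getMethod("main", String[].class);
        mainMethod.invoke(null, (Object) programArgs); // static method, so the receiver is null
    }

    public static void main(String[] args) throws Exception {
        // Nested classes are loaded by their binary names, e.g. EntryPointDemo$First
        String manifestDefault = EntryPointDemo.class.getName() + "$First";
        run(manifestDefault, null, new String[0]);
        System.out.println(lastRun); // prints First
        run(manifestDefault, EntryPointDemo.class.getName() + "$Second", new String[0]);
        System.out.println(lastRun); // prints Second
    }
}
```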
Code:
package org.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class wordCount2 {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the data from a file on HDFS
        String inputPath = "hdfs://hp1:8020/user/hive/warehouse/hello.txt";
        DataSet<String> inputDataSet = env.readTextFile(inputPath);
        // Split each line on spaces into (word, 1) tuples, then count per word
        DataSet<Tuple2<String, Integer>> resultSet = inputDataSet.flatMap(new MyFlatMapper())
                .groupBy(0) // group by the word in the first field
                .sum(1);    // sum the values in the second field
        // print() triggers execution itself, so env.execute() is not needed here
        resultSet.print();
        //env.execute();
        //env.execute("Word Count Example");
        // Marker line showing that this second class, not wordCount, was run
        System.out.println("This is the second wordcount test");
    }

    // Custom class implementing the FlatMapFunction interface
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // Split the line on spaces
            String[] words = value.split(" ");
            // Emit every word as a (word, 1) tuple
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
Commands:
-- Correct
flink run -m yarn-cluster -c org.example.wordCount2 FlinkStudy-1.0-SNAPSHOT.jar
-- Correct
flink run -m yarn-cluster --class org.example.wordCount2 FlinkStudy-1.0-SNAPSHOT.jar
-- Wrong (still runs the main class from the pom, because anything after the jar path is passed to the program as its arguments)
flink run -m yarn-cluster FlinkStudy-1.0-SNAPSHOT.jar -c org.example.wordCount2
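The wrong variant fails because flink run follows the pattern flink run [OPTIONS] <jar-file> <arguments>: options are only recognized before the jar path, and every token after it is handed to the program's main(String[] args). The toy parser below models just that rule. It is a hypothetical simplification (the class RunArgsModel is not Flink code), and it assumes every option takes exactly one value, which holds for -m, -c, --class, -yn, -yjm and -ytm as used in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model of "flink run [OPTIONS] <jar-file> <arguments>".
// Assumption: every option takes exactly one value; the real CLI is more complex.
public class RunArgsModel {

    String entryClass;                             // from -c / --class, null if absent
    String jarFile;                                // first non-option token
    List<String> programArgs = new ArrayList<>();  // everything after the jar path

    static RunArgsModel parse(String... tokens) {
        RunArgsModel r = new RunArgsModel();
        int i = 0;
        // Options are read only until the first non-option token (the jar path).
        while (i < tokens.length && tokens[i].startsWith("-")) {
            String opt = tokens[i];
            if (i + 1 >= tokens.length) {
                throw new IllegalArgumentException("missing value for " + opt);
            }
            if (opt.equals("-c") || opt.equals("--class")) {
                r.entryClass = tokens[i + 1];
            }
            // Values of other options (e.g. -m yarn-cluster) are consumed but not modeled.
            i += 2;
        }
        if (i < tokens.length) {
            r.jarFile = tokens[i++];
        }
        // Remaining tokens, including a late "-c", become the program's own arguments.
        r.programArgs.addAll(Arrays.asList(tokens).subList(i, tokens.length));
        return r;
    }

    public static void main(String[] args) {
        RunArgsModel wrong = parse("-m", "yarn-cluster", "FlinkStudy-1.0-SNAPSHOT.jar",
                "-c", "org.example.wordCount2");
        System.out.println(wrong.entryClass);  // prints null, so the manifest Main-Class is used
        System.out.println(wrong.programArgs); // prints [-c, org.example.wordCount2]
    }
}
```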
Execution screenshot:
References
- https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/projectsetup/dependencies.html



