bin/flink run -m yarn-cluster -yn 3 -ys 3 -ynm bjsxt02 -c com.test.flink.wc.StreamWordCount ./appjars/test-1.0-SNAPSHOT.jar1.1.2、参数解释
-yn,--container表示分配容器的数量,也就是 TaskManager 的数量。 -d,--detached:设置在后台运行。 -yjm,--jobManagerMemory :设置 JobManager 的内存,单位是 MB。 -ytm,--taskManagerMemory :设置每个 TaskManager 的内存,单位是 MB。 -ynm,--name:给当前 Flink application 在 Yarn 上指定名称。 -yq,--query:显示 yarn 中可用的资源(内存、cpu 核数) -yqu,--queue :指定 yarn 资源队列 -ys,--slots :每个 TaskManager 使用的 Slot 数量。 -yz,--zookeeperNamespace :针对 HA 模式在 Zookeeper 上创建 NameSpace -yid,--applicationID : 指定 Yarn 集群上的任务 ID,附着到一个后台独立运行的 Yarn Session 中。 -s,-fromSavepoint 要恢复作业的保存点的路径 的保存点的路径(例如hdfs:///flink/savepoint-1537)。)
1 # 参数必选 :
-n,--container 分配多少个yarn容器 (=taskmanager的数量)
2 # 参数可选 :
-D 动态属性
-d,--detached 独立运行
-jm,--jobManagerMemory JobManager的内存 [in MB]
-nm,--name 在YARN上为一个自定义的应用设置一个名字
-q,--query 显示yarn中可用的资源 (内存, cpu核数)
-qu,--queue 指定YARN队列.
-s,--slots 每个TaskManager使用的slots数量
-tm,--taskManagerMemory 每个TaskManager的内存 [in MB]
-z,--zookeeperNamespace 针对HA模式在zookeeper上创建NameSpace
-id,--applicationId YARN集群上的任务id,附着到一个后台运行的yarn session中
3 # run [OPTIONS]
run操作参数:
-c,--class 如果没有在jar包中指定入口类,则需要在这里通过这个参数指定
-m,--jobmanager 指定需要连接的jobmanager(主节点)地址,使用这个参数可以指定一个不同于配置文件中的jobmanager
-p,--parallelism 指定程序的并行度。可以覆盖配置文件中的默认值。
4 # 启动一个新的yarn-session,它们都有一个y或者yarn的前缀
例如:./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
连接指定host和port的jobmanager:
./bin/flink run -m SparkMaster:1234 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
启动一个新的yarn-session:
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
5 # 注意:命令行的选项也可以使用./bin/flink 工具获得。
6 # 动作 "run "编译并运行一个程序。
语法:run [OPTIONS] 带有程序入口点的类
("main "方法或 "getPlan() "方法。
只有在JAR文件没有
在其清单中指定类的情况下才需要。
-C,-classpath 在所有节点上的每个用户代码中添加一个URL
簇中所有节点上的类加载器添加一个URL。
集群中的每个用户代码类加载器添加一个URL。路径必须指定一个
协议(如file://),并且在所有节点上都能访问(如
所有节点上都可以访问(例如,通过
的NFS共享)。) 你可以多次使用这个
选项,可以多次指定
一个以上的URL。该协议必须
是由{@link
java.net.URLClassLoader}.
-d,-detached 如果存在,以分离的方式运行作业
模式运行作业
-n,-allowNonRestoredState 允许跳过不能恢复的保存点状态。
允许跳过不能被恢复的保存点状态。你需要允许
如果你从你的程序中删除了一个操作者
你的程序中删除了一个操作符,而该操作符在保存点出现时是该程序的一部分。
你需要允许这样做,如果你从你的程序中删除了一个操作者,而这个操作者在保存点被触发时是
触发了。
-p,-parallelism 用来运行程序的并行性。程序。可选的标志,用于覆盖配置中指定的默认值。
-q,-sysoutLogging 如果存在,抑制日志输出到标准输出。
-s,-fromSavepoint 要恢复作业的保存点的路径 的保存点的路径(例如hdfs:///flink/savepoint-1537)。)
7 #Yarn-cluster 模式的选项。
-d,-detached 如果存在,在分离模式下运行作业
模式运行作业
-m,-jobmanager 要连接的JobManager(主站)的地址。
的地址。使用这个标志可以
连接到一个不同的JobManager,而不是
中指定的那个不同的JobManager。
配置中指定的JobManager。
-yD 使用给定属性的值
-yd,-yarndetached 如果存在,在分离模式下运行作业
模式运行作业(已废弃;使用非YARN
特定的选项代替)
-yh,--yarnhelp Yarn会话CLI的帮助。
-yid,-yarnapplicationId 附加到正在运行的YARN会话上
-yj,
1.2 、yarn-session 模式
1.2.1、命令行
bin/yarn-session.sh -n 3 -s 3 -nm bjsxt11.2.2、参数解释
-n,--container二、并行度 2.1并行度(Parallelism)表示分配容器的数量(也就是 TaskManager 的数量)。 -D 动态属性。 -d,--detached 在后台独立运行。 -jm,--jobManagerMemory :设置 JobManager 的内存,单位是 MB。 -nm,--name:在 YARN 上为一个自定义的应用设置一个名字。 -q,--query:显示 YARN 中可用的资源(内存、cpu 核数)。 -qu,--queue :指定 YARN 队列。 -s,--slots :每个 TaskManager 使用的 Slot 数量。 -tm,--taskManagerMemory :每个 TaskManager 的内存,单位是 MB。 -z,--zookeeperNamespace :针对 HA 模式在 ZooKeeper 上创建 NameSpace。 -id,--applicationId :指定 YARN 集群上的任务 ID,附着到一个后台独立运行的 yarn session 中
与Spark类似的,一个Flink Job在生成执行计划时也划分成多个Task。Task可以是Source、Sink、算子或算子链(算子链有点意思,之后会另写文章详细说的)。Task可以由多线程并发执行,每个线程处理Task输入数据的一个子集。而并发的数量就称为Parallelism,即并行度。
Flink程序中设定并行度有4种级别,从低到高分别为:算子级别、执行环境(ExecutionEnvironment)级别、客户端(命令行)级别、配置文件(flink-conf.yaml)级别。实际执行时,优先级则是反过来的,算子级别最高。简单示例如下。
算子级别
dataStream.flatMap(new SomeFlatMapFunction()).setParallelism(4);
执行环境级别
streamExecutionEnvironment.setParallelism(4);
命令行级别
bin/flink -run --parallelism 4 example-0.1.jar
flink-conf.yaml级别
parallelism.default: 4



