栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink入门系列--安装部署及任务提交(1.14.3版本)

Flink入门系列--安装部署及任务提交(1.14.3版本)

部署模式

Flink 的部署模式分为3种:

Application 模式Per-Job 模式Session 模式
Application 模式

Application 模式即 Flink 为1个应用专门创建1个集群,Flink集群和应用同生命周期,Application 的main()方法将由 JobManager执行。在其他部署模式下,用户 Jar 文件是通过 RPC 来进行分发的,因为需要走网络IO,当用户 Jar 文件较大时,JobManager 加载用户 Jar 文件将是1个耗时的操作。为了避免上述问题,Application 模式要求用户将 Jar 文件放置到 Flink 的 classpath 下,一般是 Flink 根目录下的 lib 目录。

Application 模式允许提交由多个 Job 组成的 Application。在main()方法中,如果是使用阻塞的 execute() 来提交的任务,不同任务将依次执行,即同一时刻仅能有1个任务在执行;如果想同时执行多个任务,则需要使用非阻塞的 executeAsync() 方法。

Per-Job 模式

为了提供更好的资源隔离保证,Per-Job 模式基于资源管理框架(Yarn、Kubernetes等)为每个提交的 Job 均单独启动1个集群。 Job 执行完,对应的集群资源也会被释放。当某个 Job 的 JobManager 或 TaskManager 因异常宕掉后,其他 Job 不会受到任何影响。

生产环境下建议使用Per-Job 模式。

Session 模式

Session 模式假定1个正在运行的集群,并使用该集群的资源执行任何提交到该集群上的应用程序。该模式的优点是无需为每个提交的 Job 支付启动完整集群的资源开销,但其缺点也很明显:

因为只有1个集群,JobManager 需要负责所有提交上来的 Job,负载较大;当某个 Job 导致集群的 TaskManager 宕掉后,运行在该集群上的所有 Job 均会受到故障的影响。除了故障本身的负面影响外,这也意味着潜在的大规模恢复过程。

生产环境下不建议使用 Session 模式,除非提交上去的 Job 均是短时执行的离线任务,如果均是常驻的实时任务,Session 模式慎用!

资源管理模式

Flink 自身实现了1套资源管理框架,即 Standalone,同时也支持 Yarn、Kubernetes 等第三方资源管理框架。

Standalone 软件需求

Flink 运行在所有类 UNIX 环境下,例如 Linux,Mac OS X 和 Cygwin (Windows),集群由一个 master 节点以及一个或多个 worker 节点构成。在配置系统之前,请确保在每个节点上安装有以下软件:

Java 1.8.x 或更高版本,ssh (必须运行 sshd 以执行用于管理 Flink 各组件的脚本)

如果集群不满足软件要求,那么你需要安装/更新这些软件。

使集群中所有节点使用免密码 SSH 以及拥有相同的目录结构可以让你使用脚本来控制一切。

JAVA_HOME 配置

Flink 需要 master 和所有 worker 节点设置 JAVA_HOME 环境变量,并指向你的 Java 安装目录。

你可以在 conf/flink-conf.yaml 文件中通过 env.java.home 配置项来设置此变量。

下载

下载地址: https://flink.apache.org/zh/downloads.html

在下载完最新的发布版本后,复制压缩文件到 master 节点并解压:

tar xzf flink-*.tgz
cd flink-*
Flink配置

在解压完文件后,Flink 的 conf 目录下主要为配置文件,bin 目录下主要为启停脚本文件。

其中,bin/start-cluster.sh 和 bin/stop-cluster.sh 依赖 conf/masters 和 conf/workers 来确定集群组件实例的数量。

第1个例子,单机启动1个包含2个 TaskManager 的集群,可以进行如下配置:

conf/masters:

localhost

conf/workers:

localhost
localhost

第2个例子,启动1个分布式集群,该集群包含4个节点(master1, worker1, worker2, worker),且节点间网络是互通的。

conf/masters:

master1

conf/workers:

worker1
worker2
worker3

除此之外,还需要将 conf/flink-conf.yaml 中的 jobmanager.rpc.address 设置为 master1。

当然,你也可以自行修改conf/flink-conf.yaml中的其他配置,特别的:

每个 JobManager 的可用内存值(jobmanager.memory.process.size)每个 TaskManager 的可用内存值 (taskmanager.memory.process.size每台机器的可用 CPU 数(taskmanager.numberOfTaskSlots)集群中所有 CPU 数(parallelism.default)临时目录(io.tmp.dirs)

因为是多节点,在 master1 配置完成后,将 master1 节点的 Flink 目录复制到所有 worker 节点的相同目录下,或者可以使用共享的 NFS 目录。

部署模式支持

Standalone 模式下,只支持 Application 模式和 Session 模式,不支持 Per-Job 模式。

因为不支持Standalone 不支持 Per-Job 模式,生产环境下慎用。

Standalone Flink 集群的 Session 模式

在任意一台节点上(一般在 master 节点)执行 bin/start-cluster.sh 均可启动该集群。

启动时,master 节点会启动用于 JobMaster 的 JVM 进程,各个 worker 节点会启动用于 TaskManager 的 JVM 进程,同时 JobManager 会提供 http://master:8081 用来访问 Flink Web。

提交任务:

$ ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

该过程会启动1个 Client(1个短暂的 JVM 进程)用于向 JobManager 提交任务。

停止集群:

$ ./bin/stop-cluster.sh
Standalone Flink 集群的 Application 模式

要使用嵌入式 Applcation 来启动 Flink JobManager,可以使用 bin/standalone-job.sh 脚本。 通过本地启动 TopSpeedWindowing.jar 示例来演示此模式,该示例在单个 TaskManager 上运行。

应用程序 jar 文件需要在 classpath 中可用。 实现这一目标的最简单方法是将 jar 放入 lib/ 目录下:

$ cp ./examples/streaming/TopSpeedWindowing.jar lib/

然后,我们启动 JobManager 进程:

$ ./bin/standalone-job.sh start --job-classname org.apache.flink.streaming.examples.windowing.TopSpeedWindowing

此时,Flink web(http://localhost:8081) 已经可以访问,但任务还无法执行,因为还没有可用的 TaskManager 进程。

$ ./bin/taskmanager.sh start

当然,如果 Application 需要较多的资源,可以启动多个 TaskManager。

需要停止服务的话,依次执行相应的停止脚本即可:

$ ./bin/taskmanager.sh stop
$ ./bin/standalone-job.sh stop

或者直接执行 bin/stop-all.sh,该脚本将停止所有的 JobManager 和 TaskManager 进程。

除了直接部署,Standalone 集群还支持通过 Docker 和 Kubernetes 来进行部署,后面会专门写1篇文章介绍该部分,此处不再赘述。

Native Kubernetes 基础要求

Kubernetes >= 1.9。KubeConfig,可以通过 ~/.kube/config 配置 pods 和 services 的增删改查权限。你也可以通过运行 kubectl auth can-i pods 来校验相应的权限。支持 Kubernetes DNS。具有 RBAC 权限的默认服务账户,用于 pods 的增加和删除。 部署模式支持

Kubernetes Flink 集群只支持 Application 模式和 Session 模式,不支持 Per-Job 模式。

如果是Kubernetes Flink 集群,生产环境下推荐使用 Application 模式。

Kubernetes Flink 集群的 Session 模式
# (1) 启动 Kubernetes session
$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster

# (2) 提交任务
$ ./bin/flink run 
    --target kubernetes-session 
    -Dkubernetes.cluster-id=my-first-flink-cluster 
    ./examples/streaming/TopSpeedWindowing.jar

# (3) 停止集群
$ kubectl delete deployment/my-first-flink-cluster

除了上述 detached 模式,Session 还支持 attached 模式,允许用户输入命令来控制正在运行的 Flink 集群,例如: stop 停止正在运行的 Session 集群。

# 启动集群
$ ./bin/kubernetes-session.sh 
    -Dkubernetes.cluster-id=my-first-flink-cluster 
    -Dexecution.attached=true
# 停止集群	
$ echo 'stop' | ./bin/kubernetes-session.sh 
    -Dkubernetes.cluster-id=my-first-flink-cluster 
    -Dexecution.attached=true
Kubernetes Flink 集群的 Application 模式

为了让本地 Jar 能够被 K8S 访问到,首先需要1个 base Docker image:

FROM flink
RUN mkdir -p $Flink_HOME/usrlib
COPY /path/of/my-flink-job.jar $Flink_HOME/usrlib/my-flink-job.jar

读者自行替换为自己的 Jar 路径。

在 custom-image-name 下创建并发布 Docker 镜像后,可以使用以下命令启动 Application 集群:

$ ./bin/flink run-application 
    --target kubernetes-application 
    -Dkubernetes.cluster-id=my-first-application-cluster 
    -Dkubernetes.container.image=custom-image-name 
    local:///opt/flink/usrlib/my-flink-job.jar

K8S 的 Applcation 模式仅支持单机部署。

kubernetes.cluster-id 选项指定集群名称并且必须是唯一的。 如果不指定此选项,则 Flink 将生成一个随机名称。

kubernetes.container.image 选项指定启动 pod 的镜像。

部署 Applcation 集群后,可以与之交互:

# 列出集群上正在运行的 Job
$ ./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster
# 取消集群上正在运行的 Job
$ ./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster 
Yarn 基础要求

Yarn Version >= 2.4.1配置 HADOOP_CLASSPATH 环境变量

export HADOOP_CLASSPATH=`hadoop classpath`
部署模式支持

Yarn Flink 集群支持 Session、Application、Per-Job模式。

如果是Yarn Flink 集群,生产环境下推荐使用 Per-Job 模式。

Yarn Flink 集群的 Session 模式

Session 模式有2种操作模式:

附加模式(默认):yarn-session.sh 客户端将 Flink 集群提交给 YARN,但客户端一直在运行,跟踪集群的状态。 如果集群失败,客户端将显示错误。 如果客户端被终止,它也会发出集群关闭的信号。分离模式(-d 或 --detached):yarn-session.sh 客户端将 Flink 集群提交给 YARN,然后客户端返回。 需要再次调用客户端或 YARN 工具来停止 Flink 集群。

session 模式会在 /tmp/.yarn-properties- 中创建一个隐藏的 YARN 属性文件,在提交作业时,命令行界面将获取该文件用于集群发现。

export HADOOP_CLASSPATH=`hadoop classpath`

# 启动 YARN Session
./bin/yarn-session.sh --detached

# 提交任务
./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

# 停止任务
echo "stop" | ./bin/yarn-session.sh -id application_XXXXX_XXX

也可以在提交 Flink 作业时在命令行界面中手动指定目标 YARN 集群。 如:

./bin/flink run -t yarn-session 
  -Dyarn.application.id=application_XXXX_YY 
  ./examples/streaming/TopSpeedWindowing.jar

可以使用以下命令重新附加到 YARN 会话:

./bin/yarn-session.sh -id application_XXXX_YY

除了通过 conf/flink-conf.yaml 文件传递配置外,还可以在提交时使用 -Dkey=value 参数将任何配置传递给 ./bin/yarn-session.sh 客户端。

YARN 会话客户端也有一些常用设置的“快捷参数”。 它们可以用 ./bin/yarn-session.sh -h 列出。

Yarn Flink 集群的 Application 模式

Application 模式将在 YARN 上启动 Flink 集群,其中应用程序 jar 的 main() 方法在 YARN 中的 JobManager 上执行。 应用程序完成后,集群将立即关闭。 也可以使用 yarn application -kill 或取消 Flink 作业来手动停止集群。

./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar

部署 Application 模式集群后,可以与其交互以执行取消或获取保存点等操作。

# 列出集群上正在运行的任务
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
# 取消正在运行的任务
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY 

在 Yarn 模式下,Flink 还支持将应用程序的 Jar 及其依赖项提前上传到 HDFS 等分布式文件系统上,避免任务提交时,Client 还需要将 Jar 文件传输到 Yarn 集群上。

Yarn Flink 集群的 Per-Job 模式

Per-job Cluster 模式会在 YARN 上启动一个 Flink 集群,然后在本地运行提供的应用程序 jar,最后将 JobGraph 提交到 YARN 上的 JobManager。 如果您传递 --detached 参数,一旦提交被接受,客户端将停止。

一旦作业停止,YARN 集群将停止。

./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar

部署 Per-Job 集群后,可以与其交互以执行取消或获取保存点等操作。

# 列出集群上正在运行的任务
./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
# 取消正在运行的任务
./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY 
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/761145.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号