栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink——角色及安装部署

Flink——角色及安装部署

一、Flink 中的角色

Flink 也遵循主从原则,主节点为JobManager,从节点为TaskManager

1.1. Client
将任务提交到JobManager,并和JobManager进行任务交互获取任务执行状态。

1.2. JobManager
负责任务的调度和资源的管理。负责Checkpoint的协调过程。

获取到客户端的任务后,会根据集群中 TaskManager 上 TaskSlot 的使用情况, 为提交的任务分配相应的 TaskSlots 资源,并命令 TaskManager 启动。 JobManager 在任务执行过程中,会触发 Checkpoints 操作,每个 TaskManager 收到 Checkpoint 指令后,完成 Checkpoint 操作。完成任务后,Flink 会将结果反 馈给客户端,并释放掉 TaskManager 中的资源。

1.3. TaskManager
负责任务的执行。负责对应任务在每个节点上的资源申请与管理。 TaskManager 从 JobManager 接受到任务后,使用 Slot 资源启动 Task,开始接 受并处理数据。

1.4. ResourceManager
ResourceManager 负责 Flink 集群中的资源提供、回收、分配、管理 task slots。 Flink 为不同的环境和资源提供者(例如 YARN、Mesos、Kubernetes 和 standalone 部署)实现了对应的 ResourceManager。在 standalone 设置中,ResourceManager 只能分配可用 TaskManager 的 slots,而不能自行启动新的 TaskManager。

1.5. Dispatcher
Dispatcher 提供了一个 REST 接口,用来提交 Flink 应用程序执行,并为每 个提交的作业启动一个新的 JobMaster。它还运行 Flink WebUI 用来提供作业执 行信息。

1.6. JobMaster
JobMaster 负责管理单个 JobGraph 的执行。Flink 集群中可以同时运行多个 作业,每个作业都有自己的 JobMaster。

二、Flink 的安装部署

2.1 本地模式
在本地以多线程的方式模拟 Flink 中的多个角色。(开发环境不用)
下载地址:https://flink.apache.org/downloads.html
这里选择下载:flink-1.13.0-bin-scala_2.12.tgz
上传到节点上并解压

 tar -zxvf flink-1.13.0-bin-scala_2.12.tgz 

启动: 切换到 flink 的 bin 目录下,执行./start-cluster.sh,然后查看进程。

./start-cluster.sh
2.1 Standalone 独立集群模式


1.上传、解压 tar 包

当前使用的版本是基于 Scala2.12 的 Flink1.13 版本。

tar -zxvf flink-1.13.0-bin-scala_2.12.tgz

2.修改配置文件

```java
vi conf/flink-conf.yaml 指定 Flink 集群 JobManager RPC 通信地址。 

vi conf/flink-conf.yaml 
------------------------------------------
jobmanager.rpc.address: master
------------------------------------------

注意:flink-conf.yaml中配置key/value时候在 :后面需要有一个空格,否则配置不会生效。

其他重要配置解释

jobmanager 和 taskmanager 通信的端口号
jobmanager.rpc.port: 6123 
# JobManager 的总进程内存大小 
jobmanager.memory.process.size: 1600m 
#当前 taskmanager 整个进程占用的内存是多少 
taskmanager.memory.process.size: 1728m 
#每个 taskmanager 可以提供的 slots 
taskmanager.numberOfTaskSlots: 1 
#默认的并行度 parallelism.default: 1 
#jobmanager 故障转移策略,1.9 之后出现的新属性,区域性恢复策略 
jobmanager.execution.failover-strategy: region 
#flink web 界面的端口号 
rest.port: 8081

vi conf/workers 指定 Flink 集群 TaskManager

vi /conf/workers
---------------------------------
master
slave1
slave2
---------------------------------

3.分发 Flink 到其他节点

scp -r flink-1.13.0 slave1:$PWD
scp -r flink-1.13.0 slave2:$PWD

4. 配置环境变量

vi /etc/profile.d/myenv.sh 
----------------------------------------------
export Flink_HOME=/usr/software/flink-1.13.0 
export PATH=$PATH:$Flink_HOME/bin 
----------------------------------------------
source /etc/profile

5. 启动集群
在master节点上执行start-cluster.sh

start-cluster.sh


可以通过 master:8081 访问 flink web UI 界面

2.2 StandaloneHA高可用集群模式

集群规划

master: JobManager + TaskManager

slave1: JobManager + TaskManager

slave2: TaskManager

修改配置文件

vi masters 修改集群 JobManager 节点

vi master
-------------------------
master:8081
slave1:8081
-------------------------

vi workers 修改集群 TaskManager节点

vi workers
-------------------------
master
slave1
slave2
-------------------------

vi flink-conf.yaml 添加如下配置

vi flink-conf.yaml
--------------------------------------------------------------------------
state.backend: filesystem 
state.backend.fs.checkpointdir: hdfs://master:9000/flink-checkpoints 
high-availability: zookeeper 
high-availability.storageDir: hdfs://master:9000/flink/ha/ 
high-availability.zookeeper.quorum: master:2181,slave1:2181,slave2:2181
--------------------------------------------------------------------------

3. 同步 flink 配置文件目录 conf 到其他两台节点上

scp -r conf slave1:$PWD
scp -r conf slave2:$PWD

4.修改 slave1 上的 JobManager 通讯地址

vi flink-conf.yaml
---------------------------------------
jobmanager.rpc.address:slave1
---------------------------------------

5. 启动

启动 ZooKeeper
zkServer.sh start

启动 HDFS
start-dfs.sh
将 flink-shaded-hadoop-2-uber-2.7.5-10.0.jar 拷贝到 flink 的 lib 目录下,三台 节点都拷贝。

启动 Flink
start-cluster.sh

6. 测试

yum -y install nc
nc -lk 1234

在新窗口执行如下 jar 包

flink run /usr/software/flink-1.13.0/examples/streaming/SocketWindowWordCount.jar --hostname master --port 1234
2.3 Flink On Yarn

2.3.1Session 模式
  • 向yarn申请一块空间后,资源永远保持不变
  • 资源被占用,下一个作业就无法提交
  • 适合作业递交比较频繁的场景,小作业比较多的场景
2.3.2Pre-Job模式
  • 一个任务会对应一个job
  • 作业与作业之间是相互独立的
  • 适合作业比较少的场景、大作业的场景


Session模式
优点
每次递交作业不需要重新申请资源,直接使用已经申请好的资源,从而提高执行效率
缺点
资源不会被释放,因此一直会占用系统资源

Pre-Job模式
优点
作业运行完成,资源会立刻被释放,不会一直占用系统资源
缺点
每次递交作业都需要申请资源,影响执行效率,因为申请资源需要消耗时间

Session模式提交

yarn-session.sh -n 2 -tm 800 -s 1 -d
flink run ***.jar
	-n 表示申请n个Taskmanager
	 -tm 表示每个TaskManager的内存大小
	-s 表示每个TaskManager的slots数量
	-d 表示以后台程序方式运行

Pre-Job模式提交

flink run -m yarn-cluster -yjm 1024 -ytm 1024 ***.jar
	-m  jobmanager的地址
	-yjm 1024 指定jobmanager的内存信息
	-ytm 1024 指定taskmanager的内存信息
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/423463.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号