Flink 也遵循主从原则,主节点为JobManager,从节点为TaskManager
1.1. Client
将任务提交到JobManager,并和JobManager进行任务交互获取任务执行状态。
1.2. JobManager
负责任务的调度和资源的管理。负责Checkpoint的协调过程。
获取到客户端的任务后,会根据集群中 TaskManager 上 TaskSlot 的使用情况, 为提交的任务分配相应的 TaskSlots 资源,并命令 TaskManager 启动。 JobManager 在任务执行过程中,会触发 Checkpoints 操作,每个 TaskManager 收到 Checkpoint 指令后,完成 Checkpoint 操作。完成任务后,Flink 会将结果反 馈给客户端,并释放掉 TaskManager 中的资源。
1.3. TaskManager
负责任务的执行。负责对应任务在每个节点上的资源申请与管理。 TaskManager 从 JobManager 接受到任务后,使用 Slot 资源启动 Task,开始接 受并处理数据。
1.4. ResourceManager
ResourceManager 负责 Flink 集群中的资源提供、回收、分配、管理 task slots。 Flink 为不同的环境和资源提供者(例如 YARN、Mesos、Kubernetes 和 standalone 部署)实现了对应的 ResourceManager。在 standalone 设置中,ResourceManager 只能分配可用 TaskManager 的 slots,而不能自行启动新的 TaskManager。
1.5. Dispatcher
Dispatcher 提供了一个 REST 接口,用来提交 Flink 应用程序执行,并为每 个提交的作业启动一个新的 JobMaster。它还运行 Flink WebUI 用来提供作业执 行信息。
1.6. JobMaster
JobMaster 负责管理单个 JobGraph 的执行。Flink 集群中可以同时运行多个 作业,每个作业都有自己的 JobMaster。
2.1 本地模式
在本地以多线程的方式模拟 Flink 中的多个角色。(开发环境不用)
下载地址:https://flink.apache.org/downloads.html
这里选择下载:flink-1.13.0-bin-scala_2.12.tgz
上传到节点上并解压
tar -zxvf flink-1.13.0-bin-scala_2.12.tgz
启动: 切换到 flink 的 bin 目录下,执行./start-cluster.sh,然后查看进程。
./start-cluster.sh2.1 Standalone 独立集群模式
1.上传、解压 tar 包
当前使用的版本是基于 Scala2.12 的 Flink1.13 版本。
tar -zxvf flink-1.13.0-bin-scala_2.12.tgz
2.修改配置文件
```java vi conf/flink-conf.yaml 指定 Flink 集群 JobManager RPC 通信地址。 vi conf/flink-conf.yaml ------------------------------------------ jobmanager.rpc.address: master ------------------------------------------
注意:flink-conf.yaml中配置key/value时候在 :后面需要有一个空格,否则配置不会生效。
其他重要配置解释
jobmanager 和 taskmanager 通信的端口号 jobmanager.rpc.port: 6123 # JobManager 的总进程内存大小 jobmanager.memory.process.size: 1600m #当前 taskmanager 整个进程占用的内存是多少 taskmanager.memory.process.size: 1728m #每个 taskmanager 可以提供的 slots taskmanager.numberOfTaskSlots: 1 #默认的并行度 parallelism.default: 1 #jobmanager 故障转移策略,1.9 之后出现的新属性,区域性恢复策略 jobmanager.execution.failover-strategy: region #flink web 界面的端口号 rest.port: 8081
vi conf/workers 指定 Flink 集群 TaskManager
vi /conf/workers --------------------------------- master slave1 slave2 ---------------------------------
3.分发 Flink 到其他节点
scp -r flink-1.13.0 slave1:$PWD scp -r flink-1.13.0 slave2:$PWD
4. 配置环境变量
vi /etc/profile.d/myenv.sh ---------------------------------------------- export Flink_HOME=/usr/software/flink-1.13.0 export PATH=$PATH:$Flink_HOME/bin ---------------------------------------------- source /etc/profile
5. 启动集群
在master节点上执行start-cluster.sh
start-cluster.sh
可以通过 master:8081 访问 flink web UI 界面
集群规划
master: JobManager + TaskManager
slave1: JobManager + TaskManager
slave2: TaskManager
修改配置文件
vi masters 修改集群 JobManager 节点
vi master ------------------------- master:8081 slave1:8081 -------------------------
vi workers 修改集群 TaskManager节点
vi workers ------------------------- master slave1 slave2 -------------------------
vi flink-conf.yaml 添加如下配置
vi flink-conf.yaml -------------------------------------------------------------------------- state.backend: filesystem state.backend.fs.checkpointdir: hdfs://master:9000/flink-checkpoints high-availability: zookeeper high-availability.storageDir: hdfs://master:9000/flink/ha/ high-availability.zookeeper.quorum: master:2181,slave1:2181,slave2:2181 --------------------------------------------------------------------------
3. 同步 flink 配置文件目录 conf 到其他两台节点上
scp -r conf slave1:$PWD scp -r conf slave2:$PWD
4.修改 slave1 上的 JobManager 通讯地址
vi flink-conf.yaml --------------------------------------- jobmanager.rpc.address:slave1 ---------------------------------------
5. 启动
启动 ZooKeeper zkServer.sh start 启动 HDFS start-dfs.sh 将 flink-shaded-hadoop-2-uber-2.7.5-10.0.jar 拷贝到 flink 的 lib 目录下,三台 节点都拷贝。 启动 Flink start-cluster.sh
6. 测试
yum -y install nc nc -lk 1234
在新窗口执行如下 jar 包
flink run /usr/software/flink-1.13.0/examples/streaming/SocketWindowWordCount.jar --hostname master --port 12342.3 Flink On Yarn 2.3.1Session 模式
- 向yarn申请一块空间后,资源永远保持不变
- 资源被占用,下一个作业就无法提交
- 适合作业递交比较频繁的场景,小作业比较多的场景
- 一个任务会对应一个job
- 作业与作业之间是相互独立的
- 适合作业比较少的场景、大作业的场景
Session模式
优点
每次递交作业不需要重新申请资源,直接使用已经申请好的资源,从而提高执行效率
缺点
资源不会被释放,因此一直会占用系统资源
Pre-Job模式
优点
作业运行完成,资源会立刻被释放,不会一直占用系统资源
缺点
每次递交作业都需要申请资源,影响执行效率,因为申请资源需要消耗时间
Session模式提交
yarn-session.sh -n 2 -tm 800 -s 1 -d flink run ***.jar -n 表示申请n个Taskmanager -tm 表示每个TaskManager的内存大小 -s 表示每个TaskManager的slots数量 -d 表示以后台程序方式运行
Pre-Job模式提交
flink run -m yarn-cluster -yjm 1024 -ytm 1024 ***.jar -m jobmanager的地址 -yjm 1024 指定jobmanager的内存信息 -ytm 1024 指定taskmanager的内存信息



