- 概述
- Flink
- 集群规划
- 安装配置
- 文件同步
- Standalone模式集群启动
- web UI
- 测试
- 关闭集群
- Flink on Yarn
- 同步到其他机器
- 提交任务
- 安装完成
- 附件:kafka生产者
- 附件:WordCount 代码
| server110 | server111 | server112 | |
|---|---|---|---|
| Flink | JobManager TaskManager | TaskManager | TaskManager |
[root@server110 software]# tar -xzvf flink-1.13.2-bin-scala_2.11.tgz -C /opt/modules/ [root@server110 modules]# cd flink-1.13.2/conf/ #配置文件 [root@server110 conf]# vim flink-conf.yaml jobmanager.rpc.address: server110 #配置jobManager [root@server110 conf]# vim masters server110:8081 #配置taskManager [root@server110 conf]# vim workers server110 server111 server112文件同步
[root@server110 modules]# scp -r flink-1.13.2/ server111:/opt/modules/ [root@server110 modules]# scp -r flink-1.13.2/ server112:/opt/modules/Standalone模式集群启动
[root@server110 flink-1.13.2]# bin/start-cluster.sh Starting cluster. Starting standalonesession daemon on host server110. Starting taskexecutor daemon on host server110. Starting taskexecutor daemon on host server111. Starting taskexecutor daemon on host server112.
[root@server110 flink-1.13.2]# /opt/jpsall.sh ----------------jps server110 -------------------- 31744 QuorumPeerMain 34785 StandaloneSessionClusterEntrypoint 35108 TaskManagerRunner 32182 Kafka 17387 NameNode 17916 NodeManager 17549 DataNode 35215 Jps ----------------jps server111 -------------------- 3504 TaskManagerRunner 17265 DataNode 17540 NodeManager 532 QuorumPeerMain 17383 ResourceManager 18029 JobHistoryServer 3582 Jps 975 Kafka ----------------jps server112 -------------------- 26338 QuorumPeerMain 17207 DataNode 17289 SecondaryNameNode 26765 Kafka 29565 Jps 29486 TaskManagerRunner 17407 NodeManagerweb UI
http://server110:8081/
Flink自带的web管理界面
三个节点正常启动
web界面直接提交任务,
Entry Class : 有main方法的入口类名,带包名
Parallelism : 并行度
填写好对应的入口类,并行度为1
查看执行计划
提交任务
在TaskManager中看到,server110这个节点Free slots为0,说明任务在这个节点上运行
因为代码写的直接把结果输出到控制台,所以查看server110的stdout,可以看到对应的输出
[root@server110 flink-1.13.2]# bin/stop-cluster.sh Stopping taskexecutor daemon (pid: 39167) on host server110. Stopping taskexecutor daemon (pid: 6299) on host server111. Stopping taskexecutor daemon (pid: 31624) on host server112. Stopping standalonesession daemon (pid: 38853) on host server110.Flink on Yarn
用yarn管理flink任务,需要导一个jar包
flink-shaded-hadoop3-uber-blink-3.7.0.jar
这个jar包我是用maven下载的,直接下载不下来
同步到其他机器com.alibaba.blink flink-shaded-hadoop3-uber blink-3.7.0
[root@server110 flink-1.13.2]# scp lib/flink-shaded-hadoop3-uber-blink-3.7.0.jar server111:/opt/modules/flink-1.13.2/lib/ [root@server110 flink-1.13.2]# scp lib/flink-shaded-hadoop3-uber-blink-3.7.0.jar server112:/opt/modules/flink-1.13.2/lib/提交任务
使用Flink on yarn 模式启动任务,可以直接在yarn管理界面查看到任务http://server111:8088/
点击右方的Tracking UI,可直接跳转到Flink管理界面,
在Running Jobs里可以看到当前正在运行的job,
在TaskManagers里可以看到当前运行的job的运行情况,因为代码直接写的打印,所以最终结果打印在Stdout 里
点击Cancel job,任务停止,刷新flink管理界面自动跳转到yarn管理界面,状态为killed
提交任务脚本,不需要启动start-cluster,直接执行flink run就行
[root@server110 flink-1.13.2]# bin/flink run -m yarn-cluster -c com.z.WordCount /opt/FlinkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar
yarn管理界面
点击Tracking UI , 直接跳转到flink管理界面
查看TaskMamager,yarn管理,只启动需要的节点,所以只有一个节点正在运行
查看输出结果
取消任务
返回yarn管理界面,任务状态killed,Tracking UI也没了
测试结束
安装完成 附件:kafka生产者需要手动输入数据,才能看到效果
[root@server111 kafka_2.11-2.4.1]# bin/kafka-console-producer.sh --broker-list server112:9092 --topic test-topic >hello world >hello sara >hello scala >hello flink >附件:WordCount 代码
1.13.2 2.11
package com.z
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import java.util.Properties
object WordCount {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度1
env.setParallelism(1)
//topic
val topic = "test-topic"
//kafka的配置信息
val prop = new Properties()
prop.setProperty("bootstrap.servers", "server110:9092,server111:9092,server112:9092")
prop.setProperty("group.id", "test-group")
//创建kafka数据源
val kafka = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)
//添加kafka数据源
val inputStream = env.addSource(kafka)
val resultStream = inputStream
.flatMap(_.split(" "))
.map((_, 1))
.keyBy(0)
.sum(1)
//打印结果
resultStream.print()
//阻塞进程,一直等待数据
env.execute()
}
}



