local 本地测试
:idea运行
flink集群搭建
1、standallone cluster集群搭建1)、准备工作 配置JAVA_HOME 免密钥 2)、上传解压 tar -xvf flink-1.11.0-bin-scala_2.11.tgz 配置环境变量 3)、修改配置文件 vim conf/flink-conf.yaml jobmanager.rpc.address: master 主节点ip地址 vim conf/workers 增加从节点 node1 node2 vim conf/masters 改成主节点ip ```bash 同步到所有节点 scp -r flink-1.11.0/ node1:`pwd` 4)、启动集群 bin/start-cluster.sh http://master:8081 访问web界面
## 2、standallone cluster提交任务方式 1)、在web页面提交任务 2)、同flink命令提交任务 flink run -c com.shujia.core.Demo01WordCount flinkproject-1.0-SNAPSHOT.jar 3)、rpc方式提交任务 # 3、部署flink on yarn ```bash 只需要部署一个节点 (1)、配置HADOOP_CONF_DIR vim /etc/profile export HADOOP_CONF_DIR=/usr/local/soft/hadoop-2.7.6/etc/hadoop/ (2)、将hadoop依赖jar上传到flink lib目录 flink-shaded-hadoop-2-uber-2.6.5-10.0 flink和spark一样都是粗粒度资源申请4、flink yarn的启动方式 (提交任务2种)
1)、yarn-session 在yarn里面启动一个flink集群 jobManager
先启动hadoop
yarn-session.sh -jm 1024m -tm 1096m (jobManager和taskManager的内存大小)
提交任务 任务提交的是根据并行度动态申请taskmanager a、在web页面提交任务 b、同flink命令提交任务 flink run -c com.shujia.core.Demo01WordCount flinkproject-1.0-SNAPSHOT.jar c、rpc方式提交任务
2)、直接提交任务到yarn 每一个任务都会有一个jobManager
flink run -m yarn-cluster -yjm 1024m -ytm 1096m -c com.shujia.core.
杀掉yarn上的任务 yarn application -kill application_1599820991153_0005(资源需要足够才可以执行)
3)2种yarn提交方式的区别
yarn-session先在yarn中启动一个jobMansager ,所有的任务共享一个jobmanager (提交任务更快,任务之间共享jobmanager , 相互有影响)
直接提交任务模型,为每一个任务启动一个joibmanager (每一个任务独立jobmanager , 任务运行稳定)
--远程RPC提交
object Demo01distances {
def main(args: Array[String]): Unit = {
//创建flink的环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment
.createRemoteEnvironment("master"
, 37851
, "D:\Program Files\IDEA\IdeaProject\shujiabigdata\flinkproject\target\flinkproject-1.0-SNAPSHOT.jar")
//设置并行度,不设置的话,不同数据在不同task中,12核就是12个数字前面
//数据被分到不同的核中计算数据
env.setParallelism(2)
//读取scoket的数据
val lineDS: DataStream[String] = env.socketTextStream("master", 8888)
//将数据拆开
val wordsDS: DataStream[String] = lineDS.flatMap(_.split(","))
//将数据后面个数设置为1
val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))
//使用KeyBy进行分组,将不同的数据放在不同的reduce task中
val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)
//求和,对第二个元素进行求和,传入下标,按下标的数据进行分,下标从0开始
//有状态算子
val countDS: DataStream[(String, Int)] = keyByDS.sum(1)
//打印输出
countDS.print()
//启动,执行流,7*24小时执行任务
env.execute()
}
}



