栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

11.2.1、flink核心

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

11.2.1、flink核心

local 本地测试
:idea运行

flink集群搭建

1、standallone cluster集群搭建
1)、准备工作
	配置JAVA_HOME
	免密钥
2)、上传解压
	tar -xvf flink-1.11.0-bin-scala_2.11.tgz
	配置环境变量

3)、修改配置文件
	vim conf/flink-conf.yaml
		jobmanager.rpc.address: master   主节点ip地址

	vim conf/workers
		增加从节点  node1  node2

	vim conf/masters  
		改成主节点ip

```bash
	同步到所有节点
		scp -r flink-1.11.0/ node1:`pwd`


4)、启动集群
	bin/start-cluster.sh

http://master:8081   访问web界面
## 2、standallone cluster提交任务方式

	1)、在web页面提交任务

	2)、同flink命令提交任务
		flink run -c com.shujia.core.Demo01WordCount flinkproject-1.0-SNAPSHOT.jar

	3)、rpc方式提交任务

# 3、部署flink on yarn

```bash
只需要部署一个节点
(1)、配置HADOOP_CONF_DIR
	vim /etc/profile
	export HADOOP_CONF_DIR=/usr/local/soft/hadoop-2.7.6/etc/hadoop/

(2)、将hadoop依赖jar上传到flink  lib目录
flink-shaded-hadoop-2-uber-2.6.5-10.0

flink和spark一样都是粗粒度资源申请
4、flink yarn的启动方式 (提交任务2种)

1)、yarn-session 在yarn里面启动一个flink集群 jobManager
先启动hadoop
yarn-session.sh -jm 1024m -tm 1096m (jobManager和taskManager的内存大小)

提交任务  任务提交的是根据并行度动态申请taskmanager
	a、在web页面提交任务

	b、同flink命令提交任务
		flink run -c com.shujia.core.Demo01WordCount flinkproject-1.0-SNAPSHOT.jar

	c、rpc方式提交任务

2)、直接提交任务到yarn 每一个任务都会有一个jobManager
flink run -m yarn-cluster -yjm 1024m -ytm 1096m -c com.shujia.core.

杀掉yarn上的任务
yarn application -kill application_1599820991153_0005(资源需要足够才可以执行)

3)2种yarn提交方式的区别
yarn-session先在yarn中启动一个jobMansager ,所有的任务共享一个jobmanager (提交任务更快,任务之间共享jobmanager , 相互有影响)
直接提交任务模型,为每一个任务启动一个joibmanager (每一个任务独立jobmanager , 任务运行稳定)

--远程RPC提交
object Demo01distances {

  def main(args: Array[String]): Unit = {
    //创建flink的环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment
      .createRemoteEnvironment("master"
        , 37851
        , "D:\Program Files\IDEA\IdeaProject\shujiabigdata\flinkproject\target\flinkproject-1.0-SNAPSHOT.jar")

    
    //设置并行度,不设置的话,不同数据在不同task中,12核就是12个数字前面
    //数据被分到不同的核中计算数据
    env.setParallelism(2)
    //读取scoket的数据
    val lineDS: DataStream[String] = env.socketTextStream("master", 8888)

    //将数据拆开
    val wordsDS: DataStream[String] = lineDS.flatMap(_.split(","))

    //将数据后面个数设置为1
    val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))

    //使用KeyBy进行分组,将不同的数据放在不同的reduce task中
    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)

    //求和,对第二个元素进行求和,传入下标,按下标的数据进行分,下标从0开始
    //有状态算子
    val countDS: DataStream[(String, Int)] = keyByDS.sum(1)

    //打印输出
    countDS.print()

    //启动,执行流,7*24小时执行任务
    env.execute()
  }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/686345.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号