栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

大数据平台实时数仓从0到1搭建之 - 09 Flink 安装配置

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

大数据平台实时数仓从0到1搭建之 - 09 Flink 安装配置

大数据平台实时数仓从0到1搭建之 - 09 Flink 安装配置
  • 概述
  • Flink
    • 集群规划
    • 安装配置
    • 文件同步
    • Standalone模式集群启动
    • web UI
    • 测试
    • 关闭集群
  • Flink on Yarn
    • 同步到其他机器
    • 提交任务
  • 安装完成
  • 附件:kafka生产者
  • 附件:WordCount 代码

概述 Flink 集群规划
server110server111server112
FlinkJobManager
TaskManager
TaskManagerTaskManager
安装配置
[root@server110 software]# tar -xzvf flink-1.13.2-bin-scala_2.11.tgz -C /opt/modules/
[root@server110 modules]# cd flink-1.13.2/conf/
#配置文件
[root@server110 conf]# vim flink-conf.yaml
jobmanager.rpc.address: server110
#配置jobManager
[root@server110 conf]# vim masters
server110:8081
#配置taskManager
[root@server110 conf]# vim workers
server110
server111
server112
文件同步
[root@server110 modules]# scp -r flink-1.13.2/ server111:/opt/modules/
[root@server110 modules]# scp -r flink-1.13.2/ server112:/opt/modules/
Standalone模式集群启动
[root@server110 flink-1.13.2]# bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host server110.
Starting taskexecutor daemon on host server110.
Starting taskexecutor daemon on host server111.
Starting taskexecutor daemon on host server112.
[root@server110 flink-1.13.2]# /opt/jpsall.sh 
----------------jps server110 --------------------
31744 QuorumPeerMain
34785 StandaloneSessionClusterEntrypoint
35108 TaskManagerRunner
32182 Kafka
17387 NameNode
17916 NodeManager
17549 DataNode
35215 Jps
----------------jps server111 --------------------
3504 TaskManagerRunner
17265 DataNode
17540 NodeManager
532 QuorumPeerMain
17383 ResourceManager
18029 JobHistoryServer
3582 Jps
975 Kafka
----------------jps server112 --------------------
26338 QuorumPeerMain
17207 DataNode
17289 SecondaryNameNode
26765 Kafka
29565 Jps
29486 TaskManagerRunner
17407 NodeManager
web UI

http://server110:8081/
Flink自带的web管理界面
三个节点正常启动


测试

web界面直接提交任务,
Entry Class : 有main方法的入口类名,带包名
Parallelism : 并行度


填写好对应的入口类,并行度为1

查看执行计划

提交任务

在TaskManager中看到,server110这个节点Free slots为0,说明任务在这个节点上运行

因为代码写的直接把结果输出到控制台,所以查看server110的stdout,可以看到对应的输出

关闭集群
[root@server110 flink-1.13.2]# bin/stop-cluster.sh 
Stopping taskexecutor daemon (pid: 39167) on host server110.
Stopping taskexecutor daemon (pid: 6299) on host server111.
Stopping taskexecutor daemon (pid: 31624) on host server112.
Stopping standalonesession daemon (pid: 38853) on host server110.
Flink on Yarn

用yarn管理flink任务,需要导一个jar包
flink-shaded-hadoop3-uber-blink-3.7.0.jar
这个jar包我是用maven下载的,直接下载不下来


    com.alibaba.blink
    flink-shaded-hadoop3-uber
    blink-3.7.0

同步到其他机器
[root@server110 flink-1.13.2]# scp lib/flink-shaded-hadoop3-uber-blink-3.7.0.jar server111:/opt/modules/flink-1.13.2/lib/
[root@server110 flink-1.13.2]# scp lib/flink-shaded-hadoop3-uber-blink-3.7.0.jar server112:/opt/modules/flink-1.13.2/lib/
提交任务

使用Flink on yarn 模式启动任务,可以直接在yarn管理界面查看到任务http://server111:8088/
点击右方的Tracking UI,可直接跳转到Flink管理界面,
在Running Jobs里可以看到当前正在运行的job,
在TaskManagers里可以看到当前运行的job的运行情况,因为代码直接写的打印,所以最终结果打印在Stdout 里
点击Cancel job,任务停止,刷新flink管理界面自动跳转到yarn管理界面,状态为killed

提交任务脚本,不需要启动start-cluster,直接执行flink run就行

[root@server110 flink-1.13.2]# bin/flink run -m yarn-cluster -c com.z.WordCount /opt/FlinkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar

yarn管理界面

点击Tracking UI , 直接跳转到flink管理界面

查看TaskMamager,yarn管理,只启动需要的节点,所以只有一个节点正在运行

查看输出结果

取消任务

返回yarn管理界面,任务状态killed,Tracking UI也没了

测试结束

安装完成 附件:kafka生产者

需要手动输入数据,才能看到效果

[root@server111 kafka_2.11-2.4.1]# bin/kafka-console-producer.sh --broker-list server112:9092 --topic test-topic
>hello world
>hello sara
>hello scala
>hello flink
>

附件:WordCount 代码
1.13.2
2.11
package com.z

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import java.util.Properties


object WordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //设置并行度1
    env.setParallelism(1)

    //topic
    val topic = "test-topic"
    //kafka的配置信息
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "server110:9092,server111:9092,server112:9092")
    prop.setProperty("group.id", "test-group")

    //创建kafka数据源
    val kafka = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)

    //添加kafka数据源
    val inputStream = env.addSource(kafka)
    val resultStream = inputStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
    //打印结果
    resultStream.print()
    //阻塞进程,一直等待数据
    env.execute()
  }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/287823.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号