栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

1 flink基础

1 flink基础

文章目录
  • flink基础
    • 一、flink简介
      • 1.flink是什么
      • 2.为什么要用flink
      • 3.流处理的发展和演变
      • 4.flink特点
    • 二、flink快速上手
      • 1.pom文件依赖于插件
      • 2.批处理wordcount
      • 3.流处理wordcount
    • 三、flink部署
      • 1.standalone模式
        • 1.1 安装
        • 1.2 web页面任务提交
        • 1.3 命令任务提交
      • 2.yarn模式
        • 2.1 Session-cluster 模式:
        • 2.2 Per-Job-Cluster 模式:
      • 3.kubernetes部署
    • 四、flink运行架构
      • 1.Flink 运行时的组件
        • 1.1 作业管理器(JobManager)
        • 1.2 任务管理器(TaskManager)
        • 1.3 资源管理器(ResourceManager)
        • 1.4 分发器(Dispatcher)
      • 2.任务提交流程
      • 3.任务调度原理
        • 3.1 TaskManger 与 Slots
        • 3.2 并行度(Parallelism)
          • 并行度案例
        • 3.3 程序与数据流(DataFlow)

flink基础 一、flink简介 1.flink是什么

2.为什么要用flink

3.流处理的发展和演变

第一代

第二代

第三代

4.flink特点






二、flink快速上手 1.pom文件依赖于插件
    
        
            org.apache.flink
            flink-scala_2.12
            1.10.1 
        
        
            org.apache.flink
            flink-streaming-scala_2.12
            1.10.1
        
    
    
        
            
            
                net.alchim31.maven
                scala-maven-plugin
                4.4.0
                
                
                    
                    
                        compile
                    
                
                
            
            
                org.apache.maven.plugins
                maven-assembly-plugin
                3.3.0
                
                    
                        jar-with-dependencies
                    
                
                
                    
                        make-assembly
                        package
                        
                            single
                        
                    
                
            
        
    
2.批处理wordcount
package com.scy.wc

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
//批处理的word count
object WordCount {
  def main(args: Array[String]): Unit = {
    //创建一个批处理执行环境--上下文
    val env:ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    //从文件中读取数据
    val inputpath: String="E:\FlinkTutorial\src\main\resources\hello.txt"
    val inputDataSet: DataSet[String]= env.readTextFile(inputpath)

    //对数据进行转换处理统计,先分词,再安装word进行分组,最后进行聚合统计
    val resultDataSet: DataSet[(String,Int)] = inputDataSet
      .flatMap(_.split(" "))
      .map((_,1))
      .groupBy(0)  // 以第一个元素作为key进行分组
      .sum(1)    // 对所有数据的第二个元素求和
    //打印输出
    resultDataSet.print()
  }
}
3.流处理wordcount
package com.scy.wc

//import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 下面范围包含上面包范围,所以可以省略此行
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
object StreamWordCount {
  def main(args: Array[String]): Unit = {
    //创建流处理的执行环境
    val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //env.setParallelism(2) 设置并行度

    //从外部命令提取参数,作为socket主机名和端口号
    val paramTool:ParameterTool = ParameterTool.fromArgs(args)
    val host: String = paramTool.get("host")
    val port: Int = paramTool.getInt("port")

    //接受一个socket文本流
    val inputDataStream:DataStream[String] = env.socketTextStream(host,port) // 进行转换处理统计
    val resultDataStream:DataStream[(String,Int)] = inputDataStream
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map((_,1))//.setParallelism(2)可以在每个算子后面进行并行度设置
      .keyBy(0) //根据key的hashcode进行分组
      .sum(1)

    resultDataStream.print()

    //启动任务执行
    env.execute("stream word count")
    //线程序号默认根据服务器核心数
  }
}
三、flink部署 1.standalone模式 1.1 安装
解压缩 flink-1.10.1-bin-scala_2.12.tgz,进入 conf 目录中。
1)修改 flink/conf/flink-conf.yaml 文件:
		jobmanager.rpc.address: node4  指定任务管理调度主机
2)修改 /conf/slaves 文件:node1...
3)分发给集群中的机子:
4)启动:[scy@node1 flink-1.10.1]$ bin/start-cluster.sh 
访问 http://localhost:8081 可以对 flink 集群和任务进行监控管理。

1.2 web页面任务提交

将流计算代码打成jar包

然后上传至集群提交

task manager上显示真正执行的任务日期

1.3 命令任务提交
## 提交
[scy@node6 flink-1.10.1]$ bin/flink run -c com.scy.wc.StreamWordCount -p 2 /opt/software/FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar --host 10.1.59.65 --port 7777

## 取消任务
[scy@node6 flink-1.10.1]$ bin/flink list   查看运行的任务
[scy@node6 flink-1.10.1]$ bin/flink cancel  c9aaa0285d058a14a7c7fa0c0878d106  停止该任务

查看所有历史的任务 bin/flink list -a

2.yarn模式

以 Yarn 模式部署 Flink 任务时,要求 Flink 是有 Hadoop 支持的版本,Hadoop 环境需要保证版本在 2.2 以上,并且集群中安装有 HDFS 服务。

2.1 Session-cluster 模式:


1.启动 yarn-session

[scy@node6 flink-1.10.1]$ ./bin/yarn-session.sh -jm 1024m -tm 1024m -rm testflink -d```

2.执行任务

```bash
bin/flink run -c com.scy.wc.StreamWordCount -p 2 /opt/software/FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar --host 10.1.59.65 --port 7777

3.去 yarn 控制台查看任务状态

  1. 取消 yarn-session
[scy@node1 lib]$ yarn application --kill application_1638411164084_3754
2.2 Per-Job-Cluster 模式:


不启动 yarn-session,直接执行 job

3.kubernetes部署 四、flink运行架构 1.Flink 运行时的组件

1.1 作业管理器(JobManager)

1.2 任务管理器(TaskManager)

1.3 资源管理器(ResourceManager)

1.4 分发器(Dispatcher)

2.任务提交流程

Flink 集群部署到 YARN 上,那么就会有如下的提交流程

3.任务调度原理

3.1 TaskManger 与 Slots


3.2 并行度(Parallelism)


并行度案例


3.3 程序与数据流(DataFlow)


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/654363.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号