栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink中State管理与恢复之状态后端Backend案例

Flink中State管理与恢复之状态后端Backend案例

设置 HDFS 文件系统的状态后端,取消 Job 之后再次恢复 Job。

package state

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment


object TestCheckPointByHDFS {
  //使用WordCount案例来测试一下HDFS的状态后端,先运行一段时间Job,然后cansol,在重新启动,看看状态是否是连续的
  def main(args: Array[String]): Unit = {
    //1.初始化Flink流计算的环境
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //开启CheckPoint并设置一些参数
    environment.enableCheckpointing(5000)//每个5秒开启一次CheckPoint
    environment.setStateBackend(new FsStateBackend("hdfs://mycluster/checkpoint/cp1"))//存放检查点数据
    environment.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    environment.getCheckpointConfig.setCheckpointTimeout(5000)
    environment.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    environment.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)//终止job保留检查点数据

    //修改并行度
    environment.setParallelism(1)
    //2.导入隐式转换
    import org.apache.flink.streaming.api.scala._
    //3.读取数据,读取sock流中的数据,DataStream => 相当于spark中的DStream
    val stream: DataStream[String] = environment.socketTextStream("node1", 8888)
    //4.转换和处理数据
    val result: DataStream[(String, Int)] = stream.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0) //分组算子,0 或者 1 代表下标,前面的DataStream[二元组],0=>代表单词 1=>代表出现的次数
      .sum(1) //聚合累加算子
    //5.打印结果
    result.print("结果")
    //6.启动流计算程序
    environment.execute("wordCount")
  }
}

打包在服务器上执行

查看执行结果,可以看出 flink 出现三


取消 Job,可以看到 Job 已经停止

查看 HDFS 目录上的状态文件

重启任务,再次输入两个flink单词

查看结果,flink单词统计为5

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/687307.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号