栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

11.2.6、flink核心

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

11.2.6、flink核心

1、checkpoink的原理流程

jobmanager定期发起checkpoint,向source task发送触发标记(设置的时间)
—当sourcetask收到这个标记后会在数据流中安装挡板,同时自己会进行checkpoint,会将挡板向下游传递(防止分组前后的checkpoint点不一样)
—下游task收到挡板后进行checkpoint
—当所有的task都处理同一次checkpoint后,一次checkpoint就完成了
—删除旧了的checkpoint,只保留最新一次

2、增量快照导包

    org.apache.flink
    flink-statebackend-rocksdb_2.11
    1.11.2

3、代码实现

开启checkpoint
—任务失败后重新启动需要指定从哪一个checkpoint的路径中恢复任务 hdfs:///master:9000/flink/checkpoint/183606dcf5bcfde823b5a495ca435a04/chk-17
—命令行加一个-s恢复一样 flink run -c com.shujia.state.Demo4Checkpoint -s hdfs://master:9000/flink/checkpoint/8ae8e8bb237063d1c90295b84ae32a17/chk-31
flink-1.0.jar

object Demo04checkpoint {
  def main(args: Array[String]): Unit = {
    //创建flink环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    
    // 每 1000ms 开始一次 checkpoint
    env.enableCheckpointing(10000)

    // 高级选项:

    // 设置模式为精确一次 (这是默认值)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // 确认 checkpoints 之间的时间会进行 500 ms
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

    // Checkpoint 必须在一分钟内完成,否则就会被抛弃
    env.getCheckpointConfig.setCheckpointTimeout(60000)

    // 同一时间只允许一个 checkpoint 进行
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

    val config: CheckpointConfig = env.getCheckpointConfig
    //任务失败后自动保留最新的checkpoint文件
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    //设置状态后端,保存状态的位置
    //val stateBackend: StateBackend = new FsStateBackend("hdfs://master:9000/flink/checkpoint", true)
    //增量快照
     val stateBackend: StateBackend = new RocksDBStateBackend("hdfs://master:9000/flink/checkpoint", true)
    env.setStateBackend(stateBackend)
  
    //获取数据
    val lineDS: DataStream[String] = env.socketTextStream("master", 8888)
    //将单词切分
    val wordsDS = lineDS.flatMap(_.split(","))
    //将数据存储kv
    val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))
    //将数据分组
    kvDS.keyBy(_._1)
      .sum(1)
      .print()
    env.execute()
    
  }

}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/696470.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号