栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

11.2.8、flinkj核心

11.2.8、flinkj核心

1、kafka消费端

唯一一次Exactly once(由消费者维护自己读取的偏移量)
Flink 分布式快照保存数据计算的状态和消费的偏移量,保证程序重启之后不丢失状态和消费偏移量

2、flink读取kafka数据,统计数据个数(聚合运算)(这时第三种的 exactly once的了)

–读取kafka的数据(6,5,4,| 3,2,1),flink的source端做checkpoint安装挡板,做keyby,计算个数
—假设挡板在3和4之间,读取5的时候任务失败了,恢复后数据还是恢复到3这个checkpoint,会再次读取4,5…,所以虽然读取了两次的4,5数据,但是结果个数没有变
—chenkpoint的时候会将消费偏移量和计算结果保存到hdfs上
—运行注意依赖
—由消费者维护自己读取的偏移量,读取到哪个数据在checkoint中,恢复不会丢失
—做了checkpoint只会,每次只会从checkpoing位置开始计算数据

object Demo04flinkonKafkaExactlyOnce {
  def main(args: Array[String]): Unit = {
    //创建flink的环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
    // 每 1000ms 开始一次 checkpoint
    env.enableCheckpointing(20000)
    // 高级选项:
    // 设置模式为精确一次 (这是默认值)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // 确认 checkpoints 之间的时间会进行 500 ms
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    // Checkpoint 必须在一分钟内完成,否则就会被抛弃
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    // 同一时间只允许一个 checkpoint 进行
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    val config: CheckpointConfig = env.getCheckpointConfig
    //任务失败后自动保留最新的checkpoint文件
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    //设置状态后端,保存状态的位置
    val stateBackend: StateBackend = new RocksDBStateBackend("hdfs://master:9000/flink/checkpoint", true)
    env.setStateBackend(stateBackend)
    
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("group.id", "mdsgfgf")
    //创建kafka的消费者
    val flinkKafkaConsumer: FlinkKafkaConsumer[String] = new FlinkKafkaConsumer[String]("test1", new SimpleStringSchema(), properties)
    //读取最新的数据
    flinkKafkaConsumer.setStartFromLatest()
    val lineDS: DataStream[String] = env.addSource(flinkKafkaConsumer)
    //统计单词个数
    lineDS.flatMap(_.split(","))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
      .print()
    env.execute()
  }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/701019.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号