唯一一次Exactly once(由消费者维护自己读取的偏移量)
Flink 分布式快照保存数据计算的状态和消费的偏移量,保证程序重启之后不丢失状态和消费偏移量
–读取kafka的数据(6,5,4,| 3,2,1),flink的source端做checkpoint安装挡板,做keyby,计算个数
—假设挡板在3和4之间,读取5的时候任务失败了,恢复后数据还是恢复到3这个checkpoint,会再次读取4,5…,所以虽然读取了两次的4,5数据,但是结果个数没有变
—chenkpoint的时候会将消费偏移量和计算结果保存到hdfs上
—运行注意依赖
—由消费者维护自己读取的偏移量,读取到哪个数据在checkoint中,恢复不会丢失
—做了checkpoint只会,每次只会从checkpoing位置开始计算数据
object Demo04flinkonKafkaExactlyOnce {
def main(args: Array[String]): Unit = {
//创建flink的环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(20000)
// 高级选项:
// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
val config: CheckpointConfig = env.getCheckpointConfig
//任务失败后自动保留最新的checkpoint文件
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//设置状态后端,保存状态的位置
val stateBackend: StateBackend = new RocksDBStateBackend("hdfs://master:9000/flink/checkpoint", true)
env.setStateBackend(stateBackend)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "master:9092")
properties.setProperty("group.id", "mdsgfgf")
//创建kafka的消费者
val flinkKafkaConsumer: FlinkKafkaConsumer[String] = new FlinkKafkaConsumer[String]("test1", new SimpleStringSchema(), properties)
//读取最新的数据
flinkKafkaConsumer.setStartFromLatest()
val lineDS: DataStream[String] = env.addSource(flinkKafkaConsumer)
//统计单词个数
lineDS.flatMap(_.split(","))
.map((_, 1))
.keyBy(_._1)
.sum(1)
.print()
env.execute()
}
}



