唯一一次Exactly once,两步提交
在checkpoint之前开启事务,checkpoint之后提交事务
–可能造成数据读取显示两次重复(类似kafka的生产者了)
—读取kafka的数据(6,5,4,| 3,2,1),flink的source端做checkpoint安装挡板,写回kafka
—假设挡板在3和4之间,读取5的时候任务失败了,恢复后数据还是恢复到3这个,但是4,5之前会写入kafka中了,还会写一次
—checkpoint之前开启事务,之后提交事务
—使用两步提交,flink消费kafka数据又checkpoint可以直接恢复
—flink数据写回kafka中,读取需要使用只读提交的数据–isolation-level
read_committed
object Demo05flinkonKafkaExactlyOnce {
def main(args: Array[String]): Unit = {
//创建flink的环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(20000)
// 高级选项:
// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
val config: CheckpointConfig = env.getCheckpointConfig
//任务失败后自动保留最新的checkpoint文件
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//设置状态后端,保存状态的位置
val stateBackend: StateBackend = new RocksDBStateBackend("hdfs://master:9000/flink/checkpoint", true)
env.setStateBackend(stateBackend)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "master:9092")
properties.setProperty("group.id", "mdsgfgf")
//创建kafka的消费者
val flinkKafkaConsumer: FlinkKafkaConsumer[String] = new FlinkKafkaConsumer[String]("test1", new SimpleStringSchema(), properties)
//读取最新的数据
flinkKafkaConsumer.setStartFromLatest()
val lineDS: DataStream[String] = env.addSource(flinkKafkaConsumer)
//统计单词个数
val wordDS: DataStream[String] = lineDS.flatMap(_.split(","))
// val myProducer = new FlinkKafkaProducer[String](
// "master:9092", //broker集群
// "test2", // 目标 topic
// new SimpleStringSchema)
val properties1: Properties = new Properties()
properties1.setProperty("bootstrap.servers", "master:9092")
//事务超时时间
properties1.setProperty("transaction.timeout.ms", 5 * 60 * 1000 + "")
val myProducer = new FlinkKafkaProducer[String](
"test2", // 目标 topic
new SimpleStringSchema,
properties1,
null, //分区方法
Semantic.EXACTLY_ONCE, // 唯一一次
5 //produce线程池的大小
) // 序列化 schema
wordDS.addSink(myProducer)
env.execute()
}
}



