批处理:批处理任务失败,支持replay重跑就可
流处理:需要机制能够保证任务出错或数据出错时能够保证数据是正确有效的。
数据出现故障了(计算错误),还能进行恢复并重新计算,保证数据有效处理一次.
在计算时:比如1+2+3+4+,在+5的时候报错,恢复后重新从+5开始计算,所以我们需要把中间结果通过checkpoint存起来在写出端,出现写入错误,重写时会出现重复,需要幂等写入或事务机制
数据处理语义的分类
- At-least-once 至少一次语义,允许重复
- At-Most-once 至多一次语义,不允许重复,可以丢失
- Exactly-once 精确一致性 ,数据被不多不少有效的处理一次
处理仅一次语义的方式(at least once)
-
at least once + 去重
-
at least once + 幂等
幂等:处理数据的次数和处理数据本身不影响。 比如 1 ^ N = 1,
幂等性实现如下2种
①插入数据时,进行upsert操作,当前key存在就update更新 ,否则就插入
② Hbase、redis 根据 key插入时,不考虑版本,基于key,value只有一个
- 分布式快照 (checkpoint) + 事务机制(二段提交)
端对端的仅一次实现分布式快照: checkpoint将 读入source,处理 transformation,写出sink等中间结果都保存在checkpoint
事务机制: 预提交+正式提交(回滚)
事务机制(二段提交),先预提交到底层的预写文件中事务成功了,正式提交,提交失败了就回滚.目前,oracle0.11+,kafka,mysql都支持事务机制
source
记录每次消费的位置,将 offset 保存到指定位置 checkpoint 中
transformation
算子的状态要保存到 checkpoint 中,定期保存,如果程序出现bug,从checkpoint中快速恢复,恢复到之前的最新一次快照。
sink
Flink-kafka的端对端的仅一次语义事务机制 Flink借鉴了数据库中的事务处理技术,同时结合自身的Checkpoint机制来保证Sink只对外部输出产生一次影响。
Flink 事务写提供两个方式(具体实现方式) 预写日志(Write-Ahead-Log,WAL)和 两阶段提交(Two-Phase-Commit,2PC) ;
两种方式区别:
WAL方式通用性更强,适合几乎所有外部系统,但也不能提供百分百端到端的Exactly-Once,因为WAL预习日志会先写内存,而内存是易失介质。
如果外部系统自身就支持事务(比如MySQL、Kafka),可以使用2PC方式,可以提供百分百端到端的Exactly-Once。
事务写入缺点:事务写的方式能提供端到端的Exactly-Once一致性,它的代价也是非常明显的,就是牺牲了延迟。但输出数据不再是实时写入到外部系统,而
是分批次地提交。
幂等写入
幂等写操作是指:任意多次向一个系统写入数据,只对目标系统产生一次结果影响。
比如,重复向一个HashMap里插入同一个Key-Value二元对,第一次插入时这个HashMap发生变化,后续的插入操作不会改变HashMap的结果,这就是一个幂等写操作。
Hbase、Redis和Cassandra这样的KV数据库一般经常用来作为Sink,用以实现端到端的Exactly-Once。
注意:并不是说一个KV数据库就百分百支持幂等写。幂等写对KV对有要求,那就是Key-Value必须是可确定性(Deterministic)计算的。假如我们设计的Key是:name+ curTimestamp,每次执行数据重发时,生成的Key都不相同,会产生多次结果,整个操作不是幂等的。因此,为了追求端到端的Exactly-Once,我们设计业务逻辑时要尽量使用确定性的计算逻辑和数据模型
需求 从kafka中读取数据并写入到 kafka ,模拟数据出错,快速恢复并保证数据的端对端仅一次语义
需要外部系统支持事务机制 比如kafka 0.11+ ,普通数据库基本都支持事务机制
- Flink 处理数据到 kafka 或 数据库,需要 TwoPhaseCommitSinkFunction 二段提交,完成四个方法的重写
beginTransaction 开启事务,创建临时的文件,数据库的文件
precommit 将结果数据写入到这个临时文件
commit将临时文件移动到数据库中的文件中
abort 如果当前写入失败,直接删除临时文件
端对端的仅一次语义 Flink 实现方式 :checkpoint + 二段提交
代码实现步骤
1.checkpoint
2.支持事务机制,sink 保证 Semantic.Exactly-once ,设置参数为 事务的超时时间
//todo 获取流执行环境 //todo 设置chk 1s 状态后端到hdfs或本地file //todo 设置chk属性配置,仅一次模式、超时、并行、容忍、最小间隔、取消任务保存chk //todo 设置重启策略 3次,10s间隔 //todo 配置kafka consumer 属性:服务器、消费组、重置从最新、自动发现分区 //todo 设置consumer设置从最新的读取 //todo 设置提交offset到chk //todo 添加kafka数据源 //todo 切分单词并记1。遍历每个单词中,随机从0~4中给一个值,如果该值大于3就模拟异常bug,将[单词,1]收集 //todo 对数据流进行分组、聚合 //todo 对最终word和count进行map映射成 word:::count //todo 设置写到kafka的属性 服务器和事务超时时间 5s //todo 创建 FlinkKafkaProducer(底层实现了二段提交TwoPhaseCommitSinkFunction) //todo 将 producer 添加到sink //todo 执行流环境
准备: linux 开启kafka的生产者和消费者
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic words_output bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic words_input
详情代码
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
import java.util.Properties;
import java.util.Random;
import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerbase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;
public class KafkaToKafkaExactlyOnce {
public static void main(String[] args) throws Exception {
//todo 获取流执行环境,设置并行度
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//todo 设置chk 1s 状态后端到hdfs或本地file
env.enableCheckpointing(1000);
CheckpointConfig config = env.getCheckpointConfig();
//config.setCheckpointStorage("hdfs://node1:8020/flink-checkpoints");
config.setCheckpointStorage("file:///d:/chk-02");
//todo 设置chk属性配置,仅一次模式、超时、并行、容忍、最小间隔、取消任务保存chk
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setCheckpointTimeout(1000 * 60);
config.setMaxConcurrentCheckpoints(1);
config.setTolerableCheckpointFailureNumber(10);
config.setMinPauseBetweenCheckpoints(500);
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//todo 设置重启策略 3次,10s间隔
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2 * 1000));
//todo 配置kafka consumer 属性:服务器、消费组、重置从最新、自动发现分区
Properties consumer_props = new Properties();
consumer_props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
//设置消费者组
consumer_props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "_consumer_word_input_");
//如果当前失败从哪里消费
consumer_props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
//分区自动发现
consumer_props.setProperty(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, 30 * 60 * 1000 + "");
//创建 FlinkKafkaConsumer
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer(
"words_input",
new SimpleStringSchema(),
consumer_props
);
//todo 设置consumer设置从最新的读取
consumer.setStartFromLatest();
//todo 设置提交offset到chk
consumer.setCommitOffsetsOnCheckpoints(true);
//todo 添加kafka数据源
DataStreamSource source = env.addSource(consumer);
//todo 切分单词并记1。遍历每个单词中,随机从0~4中给一个值,如果该值大于3就模拟异常bug,将[单词,1]收集
//通过 , 来分割字符串 hello,world
SingleOutputStreamOperator> flatMapStream = source
.flatMap(new FlatMapFunction>() {
Random rm = new Random();
@Override
public void flatMap(String value, Collector> out) throws Exception {
String[] words = value.split(",");
//生成一个 5 以内的随机数
int idx = rm.nextInt(5);
System.out.println("当前随机值是:" + idx);
for (String word : words) {
if (idx > 3) {
//抛出异常,模拟bug
throw new RuntimeException("除零错误,程序bug");
}
out.collect(Tuple2.of(word, 1));
}
}
});
//todo 对数据流进行分组、聚合flatMapStream
SingleOutputStreamOperator result = flatMapStream.keyBy(t -> t.f0)
.sum(1)
//todo 对最终word和count进行map映射成 word:::count
.map(new MapFunction, String>() {
@Override
public String map(Tuple2 value) throws Exception {
return value.f0 + ":::" + value.f1;
}
});
//将结果数据落地kafka
Properties producer_props = new Properties();
//todo 设置写到kafka的属性 服务器和事务超时时间 5s
producer_props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
producer_props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 15 * 1000 + "");
//todo 将 producer 添加到sink
//todo 创建 FlinkKafkaProducer
result.addSink(new FlinkKafkaProducer(
"words_output",
new KafkaSerializationSchema() {
@Override
public ProducerRecord serialize(String element, @Nullable Long timestamp) {
return new ProducerRecord(
"words_output",
element.getBytes()
);
}
},
producer_props,
FlinkKafkaProducer.Semantic.AT_LEAST_onCE
));
//todo 执行流环境
env.execute();
}
}
Flink-mysql实现端对端的仅一次语义(二段提交)
需求 从socket中读取数据将数据进行wordcount处理,使用二段提交的方式将数据保存到数据库中,如果出现错误,九江数据回滚到之前最新的latest的状态重新提交
代码步骤
//todo 创建流执行环境、设置并行度
//todo 开启chk 5s 开启chk状态后端为 hdfs或file
//todo 设置chk属性 超时时间、最大并行chk、chkMode、最小间隔时间
//todo 开启socket数据源
//todo wordcount求和生成 Tuple2
//todo 将数据实时写入到MySQL通过二段提交方式
//todo 执行流环境
//todo 继承TwoPhaseCommitSinkFunction, ConnectionState, Void>
//todo 构造方法 super(new KryoSerializer<>(ConnectionState.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
//todo 重写invoke方法
//通过state获取连接,通过连接获取预编译状态SQL
//"INSERT INTO t_wordcount (word, counts) VALUES (?, ?) on"
//设置每个参数
//执行更新并关闭
//手动制造异常,如果当前f0=hive 抛个 1/0 错误。
//todo 重写 beginTransaction 方法
//设置驱动类,创建连接,设置连接不自动提交,返回连接状态
//todo 重写 preCommit 方法 给个提示
//todo 重写 commit 方法
//获取连接,执行提交和连接关闭
//todo 重写 abort 方法
//获取连接,回滚并关闭连接
//todo 静态内部类 ConnectionState
//创建变量 Connection,构造方法传参
准备
//开启9999端口 nc -lk 9999 //建库建表 create database mysql; use mysql; create table t_wordcount( word char(10) not null, counts tinyint not null );
详情代码
public class SocketToMysqlExactlyOnce {
//生成一个 Logger
private static Logger Logger = LoggerFactory.getLogger(SocketToMysqlExactlyOnce.class.getSimpleName());
public static void main(String[] args) throws Exception {
//todo 创建流执行环境、设置并行度
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//todo 开启chk 5s 开启chk状态后端为 hdfs或file
env.enableCheckpointing(5000L);
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointStorage("file:///d:/chk-03");
//todo 设置chk属性 超时时间、最大并行chk、chkMode、最小间隔时间
config.setCheckpointTimeout(60000);
config.setMaxConcurrentCheckpoints(1);
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setMinPauseBetweenCheckpoints(500);
//任务被取消是否保留checkpoint
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//todo 开启socket数据源
DataStreamSource source = env.socketTextStream("node1", 9999);
//todo wordcount求和生成 Tuple2
SingleOutputStreamOperator> result = source.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
}).keyBy(t -> t.f0)
.sum(1);
//todo 将数据实时写入到MySQL通过二段提交方式
result.addSink(new MyTwoPhaseCommitsink());
//todo 执行流环境
env.execute();
}
//todo 继承TwoPhaseCommitSinkFunction, ConnectionState, Void>
private static class MyTwoPhaseCommitsink extends TwoPhaseCommitSinkFunction,
ConnectionState, Void> {
public MyTwoPhaseCommitsink() {
//todo 构造方法 super(new KryoSerializer<>(ConnectionState.class, new ExecutionConfig()), VoidSerializer
// .INSTANCE);
super(new KryoSerializer<>(ConnectionState.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
}
public MyTwoPhaseCommitsink(TypeSerializer transactionSerializer,
TypeSerializer contextSerializer) {
super(transactionSerializer, contextSerializer);
}
//todo 重写 beginTransaction 方法
@Override
protected ConnectionState beginTransaction() throws Exception {
//设置驱动类,创建连接,设置连接不自动提交,返回连接状态
Class.forName("com.mysql.jdbc.Driver");
//创建连接
Connection conn = DriverManager.getConnection(
//mysql表示数据库名称,
"jdbc:mysql://node1:3306/mysql?useSSL=false&charactorEncoding=utf-8",
"root",
"123456"
);
//将当前连接的事务自动提交修改位手动提交
conn.setAutoCommit(false);
//将生成的连接放到连接池里
ConnectionState connectionState = new ConnectionState(conn);
return connectionState;
}
//todo 重写invoke方法
@Override
protected void invoke(ConnectionState transaction, Tuple2 value,
Context context) throws Exception {
//通过state获取连接,通过连接获取预编译状态SQL
Connection conn = transaction.conn;
//定义写入数据库的 sql
String sql = "insert into t_wordcount(word,counts) value(?,?) on duplicate key update counts=?";
//生成 statement
PreparedStatement ps = conn.prepareStatement(sql);
//"INSERT INTO t_wordcount (word, counts) VALUES (?, ?) on"
//设置每个参数
ps.setString(1, value.f0);
ps.setInt(2, value.f1);
ps.setInt(3, value.f1);
//执行更新并关闭
ps.executeUpdate();
//手动制造异常,如果当前f0=hive 抛个 1/0 错误。
if (value.f0.equalsIgnoreCase("error")) {
throw new RuntimeException("程序出现 bug,请检查!");
}
}
//todo 重写 preCommit 方法 给个提示
@Override
protected void preCommit(ConnectionState connectionState) throws Exception {
Logger.info("当前程序被预提交");
}
//todo 重写 commit 方法
@Override
protected void commit(ConnectionState transaction) {
//获取连接,执行提交和连接关闭
Connection conn = transaction.conn;
//将当前的事务提交并关闭连接
try {
conn.commit();
if (!conn.isClosed()) {
conn.commit();
}
} catch (SQLException e) {
Logger.error("当前提交数据异常" + e.getSQLState() + e.getMessage());
}
}
//todo 重写 abort 方法
//获取连接,回滚并关闭连接
@Override
protected void abort(ConnectionState transaction) {
//获取连接,回滚并关闭连接
Connection conn = transaction.conn;
//当前程序回滚
try {
conn.rollback();
} catch (SQLException e) {
Logger.error("当前程序回滚!");
}finally {
try {
if (!conn.isClosed()){
conn.close();
}
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
}
}
//todo 静态内部类 ConnectionState
//创建变量 Connection,构造方法传参
*/
/*
static class ConnectionState {
//创建连接mysql的连接对象
//为了保证连接不被序列化,加一个 关键字 transient
//valatile : 场景是多线程读取,保证读取直接修改内存,而不是修改CPU的缓存
private transient Connection conn;
//将当前连接的对象进行 构造器 实例化
public ConnectionState(Connection _conn) {
this.conn = _conn;
}
}
}



