The Flink source analyzed in this article is version 1.15-SNAPSHOT; readers can clone it from GitHub.
Before diving into the source, let's look at the official Flink DataStream JDBC-Sink example:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
        .fromElements(...)
        .addSink(JdbcSink.sink(
                "insert into books (id, title, author, price, qty) values (?, ?, ?, ?, ?)",
                (ps, t) -> {
                    ps.setInt(1, t.id);
                    ps.setString(2, t.title);
                    ps.setString(3, t.author);
                    ps.setDouble(4, t.price);
                    ps.setInt(5, t.qty);
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(getDbmetadata().getUrl())
                        .withDriverName(getDbmetadata().getDriverClass())
                        .build()));
env.execute();
As shown, the argument passed to addSink is built by the JdbcSink class via its sink method.
public static <T> SinkFunction<T> sink(
        String sql,
        JdbcStatementBuilder<T> statementBuilder,
        JdbcConnectionOptions connectionOptions) {
    // Essentially delegates to the second sink method
    return sink(sql, statementBuilder, JdbcExecutionOptions.defaults(), connectionOptions);
}
The second sink method additionally accepts the execution configuration, JdbcExecutionOptions:
public static <T> SinkFunction<T> sink(
        String sql,
        JdbcStatementBuilder<T> statementBuilder,
        JdbcExecutionOptions executionOptions,
        JdbcConnectionOptions connectionOptions) {
    // Creates a GenericJdbcSinkFunction...
    return new GenericJdbcSinkFunction<>(
            // ...which wraps a JdbcOutputFormat
            new JdbcOutputFormat<>(
                    // the default ("simple") connection provider
                    new SimpleJdbcConnectionProvider(connectionOptions),
                    executionOptions,
                    // the default ("simple") batch statement executor
                    context ->
                            JdbcBatchStatementExecutor.simple(
                                    sql, statementBuilder, Function.identity()),
                    JdbcOutputFormat.RecordExtractor.identity()));
}
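For illustration, here is a hypothetical call to this overload, reusing the INSERT statement from the opening example but with explicit execution options; statementBuilder and connectionOptions stand for the lambda and connection settings shown earlier, and the option values are arbitrary:
SinkFunction<Book> bookSink =
        JdbcSink.sink(
                "insert into books (id, title, author, price, qty) values (?, ?, ?, ?, ?)",
                statementBuilder, // the (ps, t) -> { ... } lambda from the opening example
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)      // flush every 1000 buffered records...
                        .withBatchIntervalMs(200) // ...or every 200 ms, whichever comes first
                        .build(),
                connectionOptions); // the JdbcConnectionOptions from the opening example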
So the call chain is JdbcSink –> GenericJdbcSinkFunction –> JdbcOutputFormat.
Let's start with the JdbcOutputFormat class.
JdbcOutputFormat
First, look at the constructor of JdbcOutputFormat:
public JdbcOutputFormat(
        @Nonnull JdbcConnectionProvider connectionProvider,
        @Nonnull JdbcExecutionOptions executionOptions,
        @Nonnull StatementExecutorFactory<JdbcExec> statementExecutorFactory,
        @Nonnull RecordExtractor<In, JdbcIn> recordExtractor) {
    this.connectionProvider = checkNotNull(connectionProvider);
    this.executionOptions = checkNotNull(executionOptions);
    this.statementExecutorFactory = checkNotNull(statementExecutorFactory);
    this.jdbcRecordExtractor = checkNotNull(recordExtractor);
}
The first parameter is the interface that provides JDBC connections:
@Internal
public interface JdbcConnectionProvider {
    // Returns the existing database connection, if any
    @Nullable
    Connection getConnection();

    // Checks whether the connection is still valid
    boolean isConnectionValid() throws SQLException;

    // Returns the existing connection, establishing a new one if absent
    Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException;

    // Closes the database connection
    void closeConnection();

    // Closes the existing connection and creates a new one
    Connection reestablishConnection() throws SQLException, ClassNotFoundException;
}
Flink ships a default implementation, SimpleJdbcConnectionProvider.
@NotThreadSafe
public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(SimpleJdbcConnectionProvider.class);
    private static final long serialVersionUID = 1L;

    private final JdbcConnectionOptions jdbcOptions;
    private transient Driver loadedDriver;
    private transient Connection connection;

    static {
        // Eagerly triggers DriverManager's static initialization
        DriverManager.getDrivers();
    }

    public SimpleJdbcConnectionProvider(JdbcConnectionOptions jdbcOptions) {
        this.jdbcOptions = jdbcOptions;
    }

    @Override
    public Connection getConnection() {
        return connection;
    }

    @Override
    public boolean isConnectionValid() throws SQLException {
        return connection != null
                && connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds());
    }

    private static Driver loadDriver(String driverName)
            throws SQLException, ClassNotFoundException {
        Preconditions.checkNotNull(driverName);
        // If the driver is already registered with DriverManager, return it directly
        Enumeration<Driver> drivers = DriverManager.getDrivers();
        while (drivers.hasMoreElements()) {
            Driver driver = drivers.nextElement();
            if (driver.getClass().getName().equals(driverName)) {
                return driver;
            }
        }
        // Otherwise instantiate a new Driver via reflection
        Class<?> clazz =
                Class.forName(driverName, true, Thread.currentThread().getContextClassLoader());
        try {
            return (Driver) clazz.newInstance();
        } catch (Exception ex) {
            throw new SQLException("Fail to create driver of class " + driverName, ex);
        }
    }

    private Driver getLoadedDriver() throws SQLException, ClassNotFoundException {
        if (loadedDriver == null) {
            loadedDriver = loadDriver(jdbcOptions.getDriverName());
        }
        return loadedDriver;
    }

    @Override
    public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
        if (connection != null) {
            return connection;
        }
        if (jdbcOptions.getDriverName() == null) {
            // No driver name configured: create the connection via DriverManager
            connection =
                    DriverManager.getConnection(
                            jdbcOptions.getDbURL(),
                            jdbcOptions.getUsername().orElse(null),
                            jdbcOptions.getPassword().orElse(null));
        } else {
            Driver driver = getLoadedDriver();
            Properties info = new Properties();
            jdbcOptions.getUsername().ifPresent(user -> info.setProperty("user", user));
            jdbcOptions.getPassword().ifPresent(password -> info.setProperty("password", password));
            // Create the connection via the loaded Driver
            connection = driver.connect(jdbcOptions.getDbURL(), info);
            if (connection == null) {
                throw new SQLException(
                        "No suitable driver found for " + jdbcOptions.getDbURL(), "08001");
            }
        }
        return connection;
    }

    @Override
    public void closeConnection() {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                LOG.warn("JDBC connection close failed.", e);
            } finally {
                connection = null;
            }
        }
    }

    @Override
    public Connection reestablishConnection() throws SQLException, ClassNotFoundException {
        closeConnection();
        return getOrEstablishConnection();
    }
}
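To make the provider's lifecycle concrete, here is a minimal sketch; the URL and driver name are placeholders, not values from the source:
static void demo() throws Exception {
    JdbcConnectionOptions options =
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:mysql://localhost:3306/bookstore") // placeholder URL
                    .withDriverName("com.mysql.cj.jdbc.Driver")       // placeholder driver
                    .build();
    JdbcConnectionProvider provider = new SimpleJdbcConnectionProvider(options);

    Connection conn = provider.getOrEstablishConnection(); // lazily opens the connection
    if (!provider.isConnectionValid()) {
        conn = provider.reestablishConnection(); // close and reconnect on a failed check
    }
    provider.closeConnection(); // logs on failure and nulls out the cached connection
}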
JdbcExecutionOptions
The second constructor parameter defines the execution configuration.
@PublicEvolving
public class JdbcExecutionOptions implements Serializable {
    // Default number of retries: 3
    public static final int DEFAULT_MAX_RETRY_TIMES = 3;
    // Default flush interval: 0 ms (i.e. no timer-based flush)
    private static final int DEFAULT_INTERVAL_MILLIS = 0;
    // Default batch size: 5000
    public static final int DEFAULT_SIZE = 5000;

    private final long batchIntervalMs;
    private final int batchSize;
    private final int maxRetries;

    private JdbcExecutionOptions(long batchIntervalMs, int batchSize, int maxRetries) {
        Preconditions.checkArgument(maxRetries >= 0);
        this.batchIntervalMs = batchIntervalMs;
        this.batchSize = batchSize;
        this.maxRetries = maxRetries;
    }

    public long getBatchIntervalMs() {
        return batchIntervalMs;
    }

    public int getBatchSize() {
        return batchSize;
    }

    public int getMaxRetries() {
        return maxRetries;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        JdbcExecutionOptions that = (JdbcExecutionOptions) o;
        return batchIntervalMs == that.batchIntervalMs
                && batchSize == that.batchSize
                && maxRetries == that.maxRetries;
    }

    @Override
    public int hashCode() {
        return Objects.hash(batchIntervalMs, batchSize, maxRetries);
    }

    public static Builder builder() {
        return new Builder();
    }

    public static JdbcExecutionOptions defaults() {
        return builder().build();
    }

    public static final class Builder {
        private long intervalMs = DEFAULT_INTERVAL_MILLIS;
        private int size = DEFAULT_SIZE;
        private int maxRetries = DEFAULT_MAX_RETRY_TIMES;

        public Builder withBatchSize(int size) {
            this.size = size;
            return this;
        }

        public Builder withBatchIntervalMs(long intervalMs) {
            this.intervalMs = intervalMs;
            return this;
        }

        public Builder withMaxRetries(int maxRetries) {
            this.maxRetries = maxRetries;
            return this;
        }

        public JdbcExecutionOptions build() {
            return new JdbcExecutionOptions(intervalMs, size, maxRetries);
        }
    }
}
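A quick sketch of how these options are built in user code; the custom values below are illustrative only:
// defaults(): batchSize = 5000, batchIntervalMs = 0 (no timer-based flush), maxRetries = 3
JdbcExecutionOptions defaults = JdbcExecutionOptions.defaults();

// Custom settings via the builder
JdbcExecutionOptions custom =
        JdbcExecutionOptions.builder()
                .withBatchSize(1000)      // flush every 1000 buffered records...
                .withBatchIntervalMs(200) // ...or every 200 ms, whichever comes first
                .withMaxRetries(5)        // retry a failed batch up to 5 times
                .build();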
StatementExecutorFactory
The third parameter is the factory for prepared-statement executors.
public interface StatementExecutorFactory<T extends JdbcBatchStatementExecutor<?>>
        extends SerializableFunction<RuntimeContext, T> {}
It is an interface whose type parameter is bounded to subtypes of JdbcBatchStatementExecutor, and it extends the serializable function interface SerializableFunction.
An implementation of the function takes a RuntimeContext as input and produces a T, i.e. an instance of a JdbcBatchStatementExecutor implementation.
So what exactly is JdbcBatchStatementExecutor?
@Internal
public interface JdbcBatchStatementExecutor<T> {
    // Creates the prepared statement(s) from the given connection
    void prepareStatements(Connection connection) throws SQLException;

    void addToBatch(T record) throws SQLException;

    // Submits the buffered batch
    void executeBatch() throws SQLException;

    // Closes the prepared statement(s)
    void closeStatements() throws SQLException;

    // Creates a KeyedBatchStatementExecutor
    static <T, K> JdbcBatchStatementExecutor<T> keyed(
            String sql, Function<T, K> keyExtractor, JdbcStatementBuilder<K> statementBuilder) {
        return new KeyedBatchStatementExecutor<>(sql, keyExtractor, statementBuilder);
    }

    // Creates a SimpleBatchStatementExecutor
    static <T, V> JdbcBatchStatementExecutor<T> simple(
            String sql, JdbcStatementBuilder<V> paramSetter, Function<T, V> valueTransformer) {
        return new SimpleBatchStatementExecutor<>(sql, paramSetter, valueTransformer);
    }
}
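Tying the two interfaces together: a StatementExecutorFactory is typically just a lambda that builds an executor via one of these static methods. A minimal sketch, where sql and statementBuilder are assumed to be the INSERT statement and the JdbcStatementBuilder<Book> lambda from the opening example:
StatementExecutorFactory<JdbcBatchStatementExecutor<Book>> factory =
        context ->
                JdbcBatchStatementExecutor.simple(sql, statementBuilder, Function.identity());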
JdbcBatchStatementExecutor has several implementation classes, such as the SimpleBatchStatementExecutor and KeyedBatchStatementExecutor created by the static factory methods above.
The logic is straightforward in all of them: each record is first added to a batch buffer, and executeBatch() writes the whole batch to the database when the buffer is full, when a snapshot is taken, or when the flush timer fires (if configured).
Take SimpleBatchStatementExecutor as an example:
class SimpleBatchStatementExecutor<T, V> implements JdbcBatchStatementExecutor<T> {
    private static final Logger LOG = LoggerFactory.getLogger(SimpleBatchStatementExecutor.class);

    private final String sql;
    // Function that fills in the parameters of the prepared statement
    private final JdbcStatementBuilder<V> parameterSetter;
    private final Function<T, V> valueTransformer;
    // The batch buffer
    private final List<V> batch;
    private transient PreparedStatement st;

    SimpleBatchStatementExecutor(
            String sql, JdbcStatementBuilder<V> statementBuilder, Function<T, V> valueTransformer) {
        this.sql = sql;
        this.parameterSetter = statementBuilder;
        this.valueTransformer = valueTransformer;
        this.batch = new ArrayList<>();
    }

    @Override
    public void prepareStatements(Connection connection) throws SQLException {
        // Builds the PreparedStatement from the given SQL
        this.st = connection.prepareStatement(sql);
    }

    @Override
    public void addToBatch(T record) {
        // Adds the record to the buffer
        batch.add(valueTransformer.apply(record));
    }

    @Override
    public void executeBatch() throws SQLException {
        // Persists all buffered records to the database in one go
        if (!batch.isEmpty()) {
            for (V r : batch) {
                // Calls the consumer's accept method to set the statement parameters
                parameterSetter.accept(st, r);
                st.addBatch();
            }
            st.executeBatch();
            // Clears the buffer, ready for the next batch
            batch.clear();
        }
    }

    @Override
    public void closeStatements() throws SQLException {
        if (st != null) {
            st.close();
            st = null;
        }
    }
}
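The call order, as later driven by JdbcOutputFormat, is easy to replay by hand. A minimal sketch, assuming an open java.sql.Connection, two hypothetical Book records, and SQL/STATEMENT_BUILDER constants holding the statement and lambda from the opening example:
static void replay(Connection conn, Book book1, Book book2) throws SQLException {
    JdbcBatchStatementExecutor<Book> executor =
            JdbcBatchStatementExecutor.simple(SQL, STATEMENT_BUILDER, Function.identity());

    executor.prepareStatements(conn); // open(): create the PreparedStatement
    executor.addToBatch(book1);       // writeRecord(): buffer the records
    executor.addToBatch(book2);
    executor.executeBatch();          // flush(): one JDBC batch round-trip, then clear
    executor.closeStatements();       // close(): release the statement
}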
When constructing a SimpleBatchStatementExecutor, sql and parameterSetter are the two key inputs.
sql is simply the SQL statement.
parameterSetter is the JdbcStatementBuilder functional interface.
@PublicEvolving
public interface JdbcStatementBuilder<T>
        extends BiConsumerWithException<PreparedStatement, T, SQLException>, Serializable {}
It extends Flink's own BiConsumerWithException functional interface:
@FunctionalInterface
public interface BiConsumerWithException<T, U, E extends Throwable> {
    // Consumes the first input T and the second input U;
    // throws E if consumption fails
    void accept(T t, U u) throws E;

    // Converts a BiConsumerWithException into a plain JDK BiConsumer
    static <A, B> BiConsumer<A, B> unchecked(
            BiConsumerWithException<A, B, ?> biConsumerWithException) {
        return (A a, B b) -> {
            try {
                biConsumerWithException.accept(a, b);
            } catch (Throwable t) {
                ExceptionUtils.rethrow(t);
            }
        };
    }
}
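As a small illustration of unchecked (the lambda below is made up): it lets a throwing consumer be used where the JDK's BiConsumer is expected, rethrowing the checked exception as an unchecked one.
// A statement-parameter setter that may throw the checked SQLException
BiConsumerWithException<PreparedStatement, Integer, SQLException> setId =
        (ps, id) -> ps.setInt(1, id);

// unchecked() adapts it to a plain java.util.function.BiConsumer;
// any SQLException is rethrown unchecked via ExceptionUtils.rethrow
BiConsumer<PreparedStatement, Integer> plain = BiConsumerWithException.unchecked(setId);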
Since it is a functional interface, callers usually supply a lambda expression or an anonymous class.
The example at the start of this article is a typical lambda:
(ps, t) -> {
    ps.setInt(1, t.id);
    ps.setString(2, t.title);
    ps.setString(3, t.author);
    ps.setDouble(4, t.price);
    ps.setInt(5, t.qty);
}
RecordExtractor
The fourth parameter is the record-extraction functional interface.
public interface RecordExtractor<F, T> extends Function<F, T>, Serializable {
    // Static factory returning the input itself
    static <T> RecordExtractor<T, T> identity() {
        return x -> x;
    }
}
With the constructor covered, let's look at the other methods of JdbcOutputFormat:
@Internal
public class JdbcOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStatementExecutor<JdbcIn>>
        extends RichOutputFormat<In> implements Flushable, InputTypeConfigurable {

    protected final JdbcConnectionProvider connectionProvider;
    // Serializer for the input type
    @Nullable private TypeSerializer<In> serializer;

    ...

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        try {
            connectionProvider.getOrEstablishConnection();
        } catch (Exception e) {
            throw new IOException("unable to open JDBC writer", e);
        }
        jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);
        // If the configured batch interval is non-zero and the batch size is not 1,
        // create a scheduled thread pool that forces a batch flush at a fixed interval
        if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
            this.scheduler =
                    Executors.newScheduledThreadPool(
                            1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
            this.scheduledFuture =
                    this.scheduler.scheduleWithFixedDelay(
                            () -> {
                                synchronized (JdbcOutputFormat.this) {
                                    if (!closed) {
                                        try {
                                            flush();
                                        } catch (Exception e) {
                                            flushException = e;
                                        }
                                    }
                                }
                            },
                            executionOptions.getBatchIntervalMs(),
                            executionOptions.getBatchIntervalMs(),
                            TimeUnit.MILLISECONDS);
        }
    }

    ...

    @Override
    public final synchronized void writeRecord(In record) throws IOException {
        checkFlushException();
        try {
            // If a serializer for the input type is present, copy the record first,
            // then add it to the batch buffer
            In recordCopy = copyIfNecessary(record);
            addToBatch(record, jdbcRecordExtractor.apply(recordCopy));
            batchCount++;
            // When the buffer reaches the configured batch size,
            // flush() persists the buffered records to the database
            if (executionOptions.getBatchSize() > 0
                    && batchCount >= executionOptions.getBatchSize()) {
                flush();
            }
        } catch (Exception e) {
            throw new IOException("Writing records to JDBC failed.", e);
        }
    }

    private In copyIfNecessary(In record) {
        // Copy only when a serializer is available
        return serializer == null ? record : serializer.copy(record);
    }

    protected void addToBatch(In original, JdbcIn extracted) throws SQLException {
        jdbcStatementExecutor.addToBatch(extracted);
    }

    @Override
    public synchronized void flush() throws IOException {
        checkFlushException();
        // Retry up to maxRetries times
        for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
            try {
                attemptFlush();
                batchCount = 0;
                break;
            } catch (SQLException e) {
                LOG.error("JDBC executeBatch error, retry times = {}", i, e);
                if (i >= executionOptions.getMaxRetries()) {
                    throw new IOException(e);
                }
                try {
                    if (!connectionProvider.isConnectionValid()) {
                        updateExecutor(true);
                    }
                } catch (Exception exception) {
                    LOG.error(
                            "JDBC connection is not valid, and reestablish connection failed.",
                            exception);
                    throw new IOException("Reestablish JDBC connection failed", exception);
                }
                try {
                    // The pause between retries grows with each attempt:
                    // 0 s between attempts 1 and 2, 1 s between 2 and 3,
                    // 2 s between 3 and 4, and so on
                    Thread.sleep(1000 * i);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new IOException(
                            "unable to flush; interrupted while doing another attempt", e);
                }
            }
        }
    }

    protected void attemptFlush() throws SQLException {
        jdbcStatementExecutor.executeBatch();
    }

    @Override
    public synchronized void close() {
        if (!closed) {
            closed = true;
            if (this.scheduledFuture != null) {
                scheduledFuture.cancel(false);
                this.scheduler.shutdown();
            }
            if (batchCount > 0) {
                try {
                    flush();
                } catch (Exception e) {
                    LOG.warn("Writing records to JDBC failed.", e);
                    throw new RuntimeException("Writing records to JDBC failed.", e);
                }
            }
            try {
                if (jdbcStatementExecutor != null) {
                    jdbcStatementExecutor.closeStatements();
                }
            } catch (SQLException e) {
                LOG.warn("Close JDBC writer failed.", e);
            }
        }
        connectionProvider.closeConnection();
        checkFlushException();
    }

    public static Builder builder() {
        return new Builder();
    }

    ...
A few important points to note:
1. If the batch interval in the execution options is non-zero and the batch size is not 1, a scheduled thread pool is created to force a batch flush at a fixed interval. The motivation is freshness: suppose the batch size is 1000, the buffer holds 990 records, and the upstream then goes quiet for a long time. The database would lag 990 records behind reality indefinitely, hurting both freshness and consistency. With a flush interval configured, a flush is forced even when the buffer stops growing, which bounds the lag and preserves consistency.
2. Flink's flush() has a retry policy, and the pause between retries grows with each attempt.
3. JdbcOutputFormat is thread-safe, guaranteed by the synchronized keyword.
Having walked through the innermost JdbcOutputFormat, let's move to the outer GenericJdbcSinkFunction.
GenericJdbcSinkFunction
// Extends the RichSinkFunction abstract class
@Internal
public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T>
        implements CheckpointedFunction, InputTypeConfigurable {
    private final JdbcOutputFormat<T, ?, ?> outputFormat;

    // The constructor takes a single JdbcOutputFormat argument
    public GenericJdbcSinkFunction(@Nonnull JdbcOutputFormat<T, ?, ?> outputFormat) {
        this.outputFormat = Preconditions.checkNotNull(outputFormat);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RuntimeContext ctx = getRuntimeContext();
        outputFormat.setRuntimeContext(ctx);
        outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
    }

    @Override
    public void invoke(T value, Context context) throws IOException {
        // All real work is delegated to outputFormat
        outputFormat.writeRecord(value);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {}

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // On snapshotState, outputFormat.flush() is forced so that buffered
        // records are committed to the database as part of the checkpoint
        outputFormat.flush();
    }

    @Override
    public void close() {
        outputFormat.close();
    }

    @Override
    public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
        outputFormat.setInputType(type, executionConfig);
    }
}

Summary
The DataStream jdbc-sink is implemented along the path JdbcSink –> GenericJdbcSinkFunction –> JdbcOutputFormat.
The second argument of JdbcSink.sink is essentially a Flink-defined BiConsumer-style functional interface, usually supplied as a lambda expression.
One extension worth noting: since programs are generally written in an object-oriented style, the records flowing into the sink are usually POJOs. Following the official example, every POJO requires hand-writing one INSERT statement and one JdbcStatementBuilder, yet both can be derived from the POJO itself via reflection. We can therefore build a small enhancement on top of the official implementation to support a generic POJO sink, as sketched below.
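A minimal sketch of that idea, under simplifying assumptions: field names match column names, declared-field order matches the column order, and PreparedStatement.setObject handles the type mapping. PojoJdbcSink and its parameters are hypothetical helpers, not part of Flink:
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.stream.Collectors;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public final class PojoJdbcSink {

    // Derives "insert into <table> (f1, f2, ...) values (?, ?, ...)" plus a
    // reflective JdbcStatementBuilder from the POJO's declared fields
    public static <T> SinkFunction<T> sink(
            String table, Class<T> pojoClass, JdbcConnectionOptions connectionOptions) {
        Field[] fields = pojoClass.getDeclaredFields();
        String columns =
                Arrays.stream(fields).map(Field::getName).collect(Collectors.joining(", "));
        String placeholders =
                Arrays.stream(fields).map(f -> "?").collect(Collectors.joining(", "));
        String sql =
                String.format("insert into %s (%s) values (%s)", table, columns, placeholders);

        JdbcStatementBuilder<T> statementBuilder =
                (ps, record) -> {
                    // Fields are re-resolved here because java.lang.reflect.Field is
                    // not serializable, while the statement builder must be
                    Field[] fs = pojoClass.getDeclaredFields();
                    try {
                        for (int i = 0; i < fs.length; i++) {
                            fs[i].setAccessible(true);
                            ps.setObject(i + 1, fs[i].get(record));
                        }
                    } catch (IllegalAccessException e) {
                        throw new RuntimeException("Failed to read POJO field", e);
                    }
                };
        return JdbcSink.sink(sql, statementBuilder, connectionOptions);
    }
}
With such a helper, sinking a POJO stream would shrink to a one-liner like PojoJdbcSink.sink("books", Book.class, connectionOptions), at the cost of per-record reflection overhead and the naming assumptions stated above.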