The Flink source code referenced in this article is version 1.15-SNAPSHOT; readers can clone it from GitHub.
Although relational databases all follow common standards, their driver implementations and the exact SQL syntax for inserts, deletes, updates, and queries can differ. Just as Chinese has regional dialects, each database speaks its own "dialect", which Flink models as JdbcDialect.
JdbcDialect
In Flink, JdbcDialect is defined as an interface:
public interface JdbcDialect extends Serializable {

    // Name of this dialect
    String dialectName();

    // Returns the converter that maps JDBC objects to Flink's internal data structures
    JdbcRowConverter getRowConverter(RowType rowType);

    // Returns the database's LIMIT clause
    String getLimitClause(long limit);

    // Validates the row type against what this dialect supports
    void validate(RowType rowType) throws ValidationException;

    // Returns the default driver class name
    default Optional<String> defaultDriverName() {
        return Optional.empty();
    }

    // Quotes an identifier (e.g. with backticks or double quotes)
    String quoteIdentifier(String identifier);

    // Returns the upsert SQL, enabling idempotent writes
    Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields);

    // Returns the row-exists SQL
    String getRowExistsStatement(String tableName, String[] conditionFields);

    // Returns the insert SQL
    String getInsertIntoStatement(String tableName, String[] fieldNames);

    // Returns the update SQL
    String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields);

    // Returns the delete SQL
    String getDeleteStatement(String tableName, String[] conditionFields);

    // Returns the select SQL
    String getSelectFromStatement(
            String tableName, String[] selectFields, String[] conditionFields);
}
Since the Exists, Insert, Update, Delete, and Select SQL is largely identical across databases, Flink provides the AbstractDialect abstract class with unified implementations of these statements.
public abstract class AbstractDialect implements JdbcDialect {
    // ...
}
Each database then extends AbstractDialect and implements the remaining JdbcDialect methods (most importantly the upsert SQL).
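The shared statements boil down to plain string assembly over the field arrays. As a self-contained illustration (the class and method names below are hypothetical, not Flink's actual source), the insert and delete statements can be composed like this:

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Illustrative sketch of the generic SQL assembly done in AbstractDialect.
// Class and method names are hypothetical, not Flink's actual API.
public class DialectSqlSketch {

    // INSERT INTO t(a, b) VALUES (:a, :b)
    public static String buildInsert(String table, String[] fields) {
        String columns = String.join(", ", fields);
        String placeholders =
                Arrays.stream(fields).map(f -> ":" + f).collect(Collectors.joining(", "));
        return "INSERT INTO " + table + "(" + columns + ") VALUES (" + placeholders + ")";
    }

    // DELETE FROM t WHERE a = :a AND b = :b
    public static String buildDelete(String table, String[] conditionFields) {
        String where =
                Arrays.stream(conditionFields)
                        .map(f -> f + " = :" + f)
                        .collect(Collectors.joining(" AND "));
        return "DELETE FROM " + table + " WHERE " + where;
    }

    public static void main(String[] args) {
        System.out.println(buildInsert("student", new String[] {"name", "age"}));
        System.out.println(buildDelete("student", new String[] {"name"}));
    }
}
```

Note the named :field placeholders, matching the style of Flink's generated SQL.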
As of Flink 1.15, four databases are officially supported: Derby, MySQL, Oracle, and PostgreSQL. We will walk through the PostgreSQL implementation of JdbcDialect:
public class PostgresDialect extends AbstractDialect {
    private static final long serialVersionUID = 1L;

    // Maximum and minimum timestamp precision in PostgreSQL
    // https://www.postgresql.org/docs/12/datatype-datetime.html
    private static final int MAX_TIMESTAMP_PRECISION = 6;
    private static final int MIN_TIMESTAMP_PRECISION = 1;

    // Maximum and minimum decimal precision in PostgreSQL
    // https://www.postgresql.org/docs/12/datatype-numeric.html#DATATYPE-NUMERIC-DECIMAL
    private static final int MAX_DECIMAL_PRECISION = 1000;
    private static final int MIN_DECIMAL_PRECISION = 1;

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        // Returns the PostgresRowConverter
        return new PostgresRowConverter(rowType);
    }

    @Override
    public String getLimitClause(long limit) {
        return "LIMIT " + limit;
    }

    @Override
    public Optional<String> defaultDriverName() {
        // The default driver class is "org.postgresql.Driver"
        return Optional.of("org.postgresql.Driver");
    }

    @Override
    public Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        String uniqueColumns =
                Arrays.stream(uniqueKeyFields)
                        .map(this::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        String updateClause =
                Arrays.stream(fieldNames)
                        .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));
        return Optional.of(
                getInsertIntoStatement(tableName, fieldNames)
                        + " ON CONFLICT ("
                        + uniqueColumns
                        + ")"
                        + " DO UPDATE SET "
                        + updateClause);
    }

    @Override
    public String quoteIdentifier(String identifier) {
        // PostgreSQL identifiers are left unquoted
        return identifier;
    }

    @Override
    public String dialectName() {
        return "PostgreSQL";
    }

    @Override
    public Optional<Range> decimalPrecisionRange() {
        return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION));
    }

    @Override
    public Optional<Range> timestampPrecisionRange() {
        return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
    }

    @Override
    public Set<LogicalTypeRoot> supportedTypes() {
        return EnumSet.of(
                LogicalTypeRoot.CHAR,
                LogicalTypeRoot.VARCHAR,
                LogicalTypeRoot.BOOLEAN,
                LogicalTypeRoot.VARBINARY,
                LogicalTypeRoot.DECIMAL,
                LogicalTypeRoot.TINYINT,
                LogicalTypeRoot.SMALLINT,
                LogicalTypeRoot.INTEGER,
                LogicalTypeRoot.BIGINT,
                LogicalTypeRoot.FLOAT,
                LogicalTypeRoot.DOUBLE,
                LogicalTypeRoot.DATE,
                LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
                LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
                LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
                LogicalTypeRoot.ARRAY);
    }
}
Let's write a small test class to see PostgreSQL's upsert SQL:
public class PostgresDialectTest {
    public static void main(String[] args) {
        String tableName = "student";
        String[] fieldNames = {"name", "age", "score"};
        String[] uniqueKeyFields = {"name"};
        JdbcDialect dialect = new PostgresDialect();
        Optional<String> upsertStatement =
                dialect.getUpsertStatement(tableName, fieldNames, uniqueKeyFields);
        System.out.println(upsertStatement.get());
    }
}
Output:
INSERT INTO student(name, age, score) VALUES (:name, :age, :score) ON CONFLICT (name) DO UPDATE SET name=EXCLUDED.name, age=EXCLUDED.age, score=EXCLUDED.score
JdbcDialectFactory
Flink uses the factory pattern to create the JdbcDialect instance for each database, via the uniform JdbcDialectFactory interface:
public interface JdbcDialectFactory {

    // Whether this factory can handle the given JDBC URL
    boolean acceptsURL(String url);

    // Creates the JdbcDialect instance for the corresponding database
    JdbcDialect create();
}
Again using PostgreSQL as the example:
@Internal
public class PostgresDialectFactory implements JdbcDialectFactory {

    @Override
    public boolean acceptsURL(String url) {
        // Accepts URLs starting with "jdbc:postgresql:"
        return url.startsWith("jdbc:postgresql:");
    }

    @Override
    public JdbcDialect create() {
        // Creates the PostgresDialect
        return new PostgresDialect();
    }
}
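At runtime the right factory is picked by probing each one's acceptsURL with the configured JDBC URL. A simplified, self-contained sketch of that selection logic (class and method names here are illustrative; Flink discovers JdbcDialectFactory instances via Java's ServiceLoader):

```java
import java.util.List;
import java.util.Optional;

// Simplified sketch of dialect-factory selection by JDBC URL prefix.
public class DialectLookupSketch {

    // Stand-in for a JdbcDialectFactory: a URL predicate plus the dialect it would create.
    static class Factory {
        final String urlPrefix;
        final String dialectName;

        Factory(String urlPrefix, String dialectName) {
            this.urlPrefix = urlPrefix;
            this.dialectName = dialectName;
        }

        boolean acceptsURL(String url) {
            return url.startsWith(urlPrefix);
        }
    }

    static final List<Factory> FACTORIES =
            List.of(
                    new Factory("jdbc:postgresql:", "PostgreSQL"),
                    new Factory("jdbc:mysql:", "MySQL"),
                    new Factory("jdbc:derby:", "Derby"));

    // Probe each factory in turn; the first one accepting the URL wins.
    static Optional<String> load(String url) {
        return FACTORIES.stream()
                .filter(f -> f.acceptsURL(url))
                .map(f -> f.dialectName)
                .findFirst();
    }

    public static void main(String[] args) {
        System.out.println(load("jdbc:postgresql://localhost:5432/db").orElse("none"));
        System.out.println(load("jdbc:unknown://x").orElse("none"));
    }
}
```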
JdbcDialectTypeMapper
Each database defines its own data types. To convert those JDBC types into Flink table types, Flink provides the JdbcDialectTypeMapper interface.
public interface JdbcDialectTypeMapper {
    DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
            throws SQLException;
}
Once more, taking PostgreSQL as the example:
@Internal
public class PostgresTypeMapper implements JdbcDialectTypeMapper {
    private static final Logger LOG = LoggerFactory.getLogger(PostgresTypeMapper.class);

    private static final String PG_SERIAL = "serial";
    private static final String PG_BIGSERIAL = "bigserial";
    private static final String PG_BYTEA = "bytea";
    private static final String PG_BYTEA_ARRAY = "_bytea";
    private static final String PG_SMALLINT = "int2";
    private static final String PG_SMALLINT_ARRAY = "_int2";
    private static final String PG_INTEGER = "int4";
    private static final String PG_INTEGER_ARRAY = "_int4";
    private static final String PG_BIGINT = "int8";
    private static final String PG_BIGINT_ARRAY = "_int8";
    private static final String PG_REAL = "float4";
    private static final String PG_REAL_ARRAY = "_float4";
    private static final String PG_DOUBLE_PRECISION = "float8";
    private static final String PG_DOUBLE_PRECISION_ARRAY = "_float8";
    private static final String PG_NUMERIC = "numeric";
    private static final String PG_NUMERIC_ARRAY = "_numeric";
    private static final String PG_BOOLEAN = "bool";
    private static final String PG_BOOLEAN_ARRAY = "_bool";
    private static final String PG_TIMESTAMP = "timestamp";
    private static final String PG_TIMESTAMP_ARRAY = "_timestamp";
    private static final String PG_TIMESTAMPTZ = "timestamptz";
    private static final String PG_TIMESTAMPTZ_ARRAY = "_timestamptz";
    private static final String PG_DATE = "date";
    private static final String PG_DATE_ARRAY = "_date";
    private static final String PG_TIME = "time";
    private static final String PG_TIME_ARRAY = "_time";
    private static final String PG_TEXT = "text";
    private static final String PG_TEXT_ARRAY = "_text";
    private static final String PG_CHAR = "bpchar";
    private static final String PG_CHAR_ARRAY = "_bpchar";
    private static final String PG_CHARACTER = "character";
    private static final String PG_CHARACTER_ARRAY = "_character";
    private static final String PG_CHARACTER_VARYING = "varchar";
    private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";

    @Override
    public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
            throws SQLException {
        String pgType = metadata.getColumnTypeName(colIndex);
        int precision = metadata.getPrecision(colIndex);
        int scale = metadata.getScale(colIndex);
        switch (pgType) {
            case PG_BOOLEAN:
                return DataTypes.BOOLEAN();
            case PG_BOOLEAN_ARRAY:
                return DataTypes.ARRAY(DataTypes.BOOLEAN());
            case PG_BYTEA:
                return DataTypes.BYTES();
            case PG_BYTEA_ARRAY:
                return DataTypes.ARRAY(DataTypes.BYTES());
            case PG_SMALLINT:
                return DataTypes.SMALLINT();
            case PG_SMALLINT_ARRAY:
                return DataTypes.ARRAY(DataTypes.SMALLINT());
            case PG_INTEGER:
            case PG_SERIAL:
                return DataTypes.INT();
            case PG_INTEGER_ARRAY:
                return DataTypes.ARRAY(DataTypes.INT());
            case PG_BIGINT:
            case PG_BIGSERIAL:
                return DataTypes.BIGINT();
            case PG_BIGINT_ARRAY:
                return DataTypes.ARRAY(DataTypes.BIGINT());
            case PG_REAL:
                return DataTypes.FLOAT();
            case PG_REAL_ARRAY:
                return DataTypes.ARRAY(DataTypes.FLOAT());
            case PG_DOUBLE_PRECISION:
                return DataTypes.DOUBLE();
            case PG_DOUBLE_PRECISION_ARRAY:
                return DataTypes.ARRAY(DataTypes.DOUBLE());
            case PG_NUMERIC:
                // see SPARK-26538: handle numeric without explicit precision and scale.
                if (precision > 0) {
                    return DataTypes.DECIMAL(precision, metadata.getScale(colIndex));
                }
                return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18);
            case PG_NUMERIC_ARRAY:
                // see SPARK-26538: handle numeric without explicit precision and scale.
                if (precision > 0) {
                    return DataTypes.ARRAY(
                            DataTypes.DECIMAL(precision, metadata.getScale(colIndex)));
                }
                return DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18));
            case PG_CHAR:
            case PG_CHARACTER:
                return DataTypes.CHAR(precision);
            case PG_CHAR_ARRAY:
            case PG_CHARACTER_ARRAY:
                return DataTypes.ARRAY(DataTypes.CHAR(precision));
            case PG_CHARACTER_VARYING:
                return DataTypes.VARCHAR(precision);
            case PG_CHARACTER_VARYING_ARRAY:
                return DataTypes.ARRAY(DataTypes.VARCHAR(precision));
            case PG_TEXT:
                return DataTypes.STRING();
            case PG_TEXT_ARRAY:
                return DataTypes.ARRAY(DataTypes.STRING());
            case PG_TIMESTAMP:
                return DataTypes.TIMESTAMP(scale);
            case PG_TIMESTAMP_ARRAY:
                return DataTypes.ARRAY(DataTypes.TIMESTAMP(scale));
            case PG_TIMESTAMPTZ:
                return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(scale);
            case PG_TIMESTAMPTZ_ARRAY:
                return DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(scale));
            case PG_TIME:
                return DataTypes.TIME(scale);
            case PG_TIME_ARRAY:
                return DataTypes.ARRAY(DataTypes.TIME(scale));
            case PG_DATE:
                return DataTypes.DATE();
            case PG_DATE_ARRAY:
                return DataTypes.ARRAY(DataTypes.DATE());
            default:
                throw new UnsupportedOperationException(
                        String.format("Doesn't support Postgres type '%s' yet", pgType));
        }
    }
}
Essentially, this establishes a mapping from JDBC types to Flink table types.
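That mapping can be illustrated without a live database. The reduced sketch below (a hypothetical method covering only a few types and returning Flink type names as plain strings) mirrors the switch in PostgresTypeMapper, including the fallback for numeric columns declared without an explicit precision:

```java
// Reduced sketch of the JDBC-type -> Flink-type mapping in PostgresTypeMapper.
// Returns Flink type names as strings for illustration only.
public class PgTypeMappingSketch {

    static String mapPgType(String pgType, int precision, int scale) {
        switch (pgType) {
            case "bool":
                return "BOOLEAN";
            case "int4":
            case "serial":
                return "INT";
            case "int8":
            case "bigserial":
                return "BIGINT";
            case "numeric":
                // numeric without explicit precision falls back to a wide DECIMAL
                return precision > 0
                        ? "DECIMAL(" + precision + ", " + scale + ")"
                        : "DECIMAL(38, 18)";
            case "varchar":
                return "VARCHAR(" + precision + ")";
            default:
                throw new UnsupportedOperationException("Unsupported Postgres type: " + pgType);
        }
    }

    public static void main(String[] args) {
        System.out.println(mapPgType("numeric", 10, 2)); // DECIMAL(10, 2)
        System.out.println(mapPgType("serial", 0, 0));   // INT
    }
}
```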
Since Flink officially supports only a limited set of databases, adding JdbcDialect support for a new database means implementing or extending these three interfaces/abstract classes:
- AbstractDialect
- JdbcDialectFactory
- JdbcDialectTypeMapper



