
Flink源码解析系列--JdbcDialect接口


The Flink source discussed in this article is version 1.15-SNAPSHOT; readers can clone it from GitHub.

Although relational databases all follow common standards, their driver implementations and the SQL syntax they use for inserts, updates, deletes, and queries can differ slightly. Just as spoken Chinese has regional dialects, each database has its own "dialect", which Flink models with the JdbcDialect abstraction.
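For a concrete taste of such dialect differences, consider how "the first N rows" is written across databases. The class below is a hypothetical, self-contained illustration (not Flink code); the dialect names and method are made up for the example:

```java
// Hypothetical demo: the same logical query rendered in different SQL dialects.
public class DialectDifferenceDemo {

    // Build a "first n rows" query for a given dialect name.
    static String limitQuery(String dialect, String table, long n) {
        switch (dialect) {
            case "MySQL":
            case "PostgreSQL":
                return "SELECT * FROM " + table + " LIMIT " + n;
            case "Oracle":
                // Oracle 12c+ syntax
                return "SELECT * FROM " + table + " FETCH FIRST " + n + " ROWS ONLY";
            case "SQLServer":
                return "SELECT TOP " + n + " * FROM " + table;
            default:
                throw new IllegalArgumentException("Unknown dialect: " + dialect);
        }
    }

    public static void main(String[] args) {
        System.out.println(limitQuery("PostgreSQL", "student", 10));
        System.out.println(limitQuery("SQLServer", "student", 10));
    }
}
```

Hiding exactly this kind of per-database variation behind one interface is what JdbcDialect is for.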

JdbcDialect

In Flink, JdbcDialect is defined as an interface.

public interface JdbcDialect extends Serializable {

    // Dialect name
    String dialectName();

    // Gets the converter that turns JDBC objects into Flink's internal objects
    JdbcRowConverter getRowConverter(RowType rowType);

    // Returns the database's limit clause
    String getLimitClause(long limit);

    // Validates the row data types
    void validate(RowType rowType) throws ValidationException;

    // Returns the default driver name
    default Optional<String> defaultDriverName() {
        return Optional.empty();
    }

    // Quotes an identifier
    String quoteIdentifier(String identifier);

    // Returns the upsert SQL, for idempotent inserts
    Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields);

    // Returns the exists SQL
    String getRowExistsStatement(String tableName, String[] conditionFields);

    // Returns the insert SQL
    String getInsertIntoStatement(String tableName, String[] fieldNames);

    // Returns the update SQL
    String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields);

    // Returns the delete SQL
    String getDeleteStatement(String tableName, String[] conditionFields);

    // Returns the select SQL
    String getSelectFromStatement(
            String tableName, String[] selectFields, String[] conditionFields);
}

Since the exists, insert, update, delete, and select SQL statements are largely identical across databases, Flink provides the AbstractDialect abstract class to implement these operations in one place.

public abstract class AbstractDialect implements JdbcDialect {
    // ...
}

Each database then extends AbstractDialect and implements the remaining JdbcDialect methods (chiefly the upsert SQL).
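The template-method idea behind AbstractDialect can be sketched without any Flink dependencies. The class and method names below (TemplateDialectDemo, BaseDialect, MySqlLikeDialect) are hypothetical and only mirror the shape of the real classes: the base class builds standard SQL once, deferring quoting to each concrete dialect.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical sketch of the AbstractDialect pattern (not the real Flink classes).
public class TemplateDialectDemo {

    abstract static class BaseDialect {
        // Each dialect decides how identifiers are quoted.
        abstract String quoteIdentifier(String identifier);

        // Shared SQL construction, written once in the base class.
        String getDeleteStatement(String tableName, String[] conditionFields) {
            String conditions = Arrays.stream(conditionFields)
                    .map(f -> quoteIdentifier(f) + " = :" + f)
                    .collect(Collectors.joining(" AND "));
            return "DELETE FROM " + quoteIdentifier(tableName) + " WHERE " + conditions;
        }
    }

    static class MySqlLikeDialect extends BaseDialect {
        @Override
        String quoteIdentifier(String identifier) {
            return "`" + identifier + "`";
        }
    }

    public static void main(String[] args) {
        BaseDialect d = new MySqlLikeDialect();
        System.out.println(d.getDeleteStatement("student", new String[] {"name"}));
    }
}
```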

As of Flink 1.15, four databases are officially supported: Derby, MySQL, Oracle, and PostgreSQL. We will walk through the PostgreSQL implementation of JdbcDialect:

public class PostgresDialect extends AbstractDialect {

    private static final long serialVersionUID = 1L;

    // Maximum and minimum timestamp precision supported by PostgreSQL
    // https://www.postgresql.org/docs/12/datatype-datetime.html
    private static final int MAX_TIMESTAMP_PRECISION = 6;
    private static final int MIN_TIMESTAMP_PRECISION = 1;

    // Maximum and minimum decimal precision supported by PostgreSQL
    // https://www.postgresql.org/docs/12/datatype-numeric.html#DATATYPE-NUMERIC-DECIMAL
    private static final int MAX_DECIMAL_PRECISION = 1000;
    private static final int MIN_DECIMAL_PRECISION = 1;

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        // Return the PostgreSQL-specific row converter
        return new PostgresRowConverter(rowType);
    }

    @Override
    public String getLimitClause(long limit) {
        return "LIMIT " + limit;
    }

    @Override
    public Optional<String> defaultDriverName() {
        // The default driver name is "org.postgresql.Driver"
        return Optional.of("org.postgresql.Driver");
    }

    @Override
    public Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        String uniqueColumns =
                Arrays.stream(uniqueKeyFields)
                        .map(this::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        String updateClause =
                Arrays.stream(fieldNames)
                        .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));
        return Optional.of(
                getInsertIntoStatement(tableName, fieldNames)
                        + " ON CONFLICT ("
                        + uniqueColumns
                        + ")"
                        + " DO UPDATE SET "
                        + updateClause);
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return identifier;
    }

    @Override
    public String dialectName() {
        return "PostgreSQL";
    }

    @Override
    public Optional<Range> decimalPrecisionRange() {
        return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION));
    }

    @Override
    public Optional<Range> timestampPrecisionRange() {
        return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
    }

    @Override
    public Set<LogicalTypeRoot> supportedTypes() {
        return EnumSet.of(
                LogicalTypeRoot.CHAR,
                LogicalTypeRoot.VARCHAR,
                LogicalTypeRoot.BOOLEAN,
                LogicalTypeRoot.VARBINARY,
                LogicalTypeRoot.DECIMAL,
                LogicalTypeRoot.TINYINT,
                LogicalTypeRoot.SMALLINT,
                LogicalTypeRoot.INTEGER,
                LogicalTypeRoot.BIGINT,
                LogicalTypeRoot.FLOAT,
                LogicalTypeRoot.DOUBLE,
                LogicalTypeRoot.DATE,
                LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
                LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
                LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
                LogicalTypeRoot.ARRAY);
    }
}

Let's write a quick test class to see PostgreSQL's upsert SQL:

public class PostgresDialectTest {
    public static void main(String[] args) {
        String tableName = "student";
        String[] fieldNames = {"name", "age", "score"};
        String[] uniqueKeyFields = {"name"};
        JdbcDialect dialect = new PostgresDialect();
        Optional<String> upsertStatement =
                dialect.getUpsertStatement(tableName, fieldNames, uniqueKeyFields);
        System.out.println(upsertStatement.get());
    }
}

Output:

INSERT INTO student(name, age, score) VALUES (:name, :age, :score) ON CONFLICT (name) DO UPDATE SET name=EXCLUDED.name, age=EXCLUDED.age, score=EXCLUDED.score
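That result can be reproduced without a Flink classpath. The standalone class below is a hypothetical re-implementation of only the string-building logic from getInsertIntoStatement and getUpsertStatement (class and method names are mine, not Flink's); the `:name`-style named placeholders match the output above.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical standalone sketch of the upsert SQL assembly (not Flink code).
public class UpsertDemo {

    // Build "INSERT INTO t(a, b) VALUES (:a, :b)" with named placeholders.
    static String getInsertIntoStatement(String tableName, String[] fieldNames) {
        String columns = String.join(", ", fieldNames);
        String placeholders =
                Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));
        return "INSERT INTO " + tableName + "(" + columns + ") VALUES (" + placeholders + ")";
    }

    // Append the PostgreSQL ON CONFLICT ... DO UPDATE SET clause.
    static String getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        String uniqueColumns = String.join(", ", uniqueKeyFields);
        String updateClause =
                Arrays.stream(fieldNames)
                        .map(f -> f + "=EXCLUDED." + f)
                        .collect(Collectors.joining(", "));
        return getInsertIntoStatement(tableName, fieldNames)
                + " ON CONFLICT (" + uniqueColumns + ") DO UPDATE SET " + updateClause;
    }

    public static void main(String[] args) {
        System.out.println(getUpsertStatement(
                "student", new String[] {"name", "age", "score"}, new String[] {"name"}));
    }
}
```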

JdbcDialectFactory

Flink uses the factory pattern to create the JdbcDialect instance for each database, behind a common JdbcDialectFactory interface:

public interface JdbcDialectFactory {

    // Checks whether this factory accepts the given JDBC URL
    boolean acceptsURL(String url);

    // Creates the JdbcDialect instance for the corresponding database
    JdbcDialect create();
}

Again, PostgreSQL serves as the example:

@Internal
public class PostgresDialectFactory implements JdbcDialectFactory {
    @Override
    public boolean acceptsURL(String url) {
        // Accept URLs that start with "jdbc:postgresql:"
        return url.startsWith("jdbc:postgresql:");
    }

    @Override
    public JdbcDialect create() {
        // Create the PostgresDialect
        return new PostgresDialect();
    }
}
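The URL-based selection can be sketched as follows. Note the hedge: Flink actually discovers factories via Java SPI (ServiceLoader), whereas this self-contained sketch uses a hard-coded list, and its mini-interfaces (Dialect, DialectFactory) are hypothetical stand-ins that only mirror the shape of JdbcDialectFactory.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of picking a dialect by JDBC URL prefix (not Flink code).
public class FactoryLookupDemo {

    interface Dialect {
        String dialectName();
    }

    interface DialectFactory {
        boolean acceptsURL(String url);
        Dialect create();
    }

    static DialectFactory factory(String prefix, String name) {
        return new DialectFactory() {
            public boolean acceptsURL(String url) { return url.startsWith(prefix); }
            public Dialect create() { return () -> name; }
        };
    }

    // Hard-coded registry standing in for what ServiceLoader would discover.
    static final List<DialectFactory> FACTORIES = List.of(
            factory("jdbc:postgresql:", "PostgreSQL"),
            factory("jdbc:mysql:", "MySQL"));

    // Pick the first factory that accepts the URL and create its dialect.
    static Optional<Dialect> load(String url) {
        return FACTORIES.stream()
                .filter(f -> f.acceptsURL(url))
                .findFirst()
                .map(DialectFactory::create);
    }

    public static void main(String[] args) {
        System.out.println(load("jdbc:postgresql://localhost:5432/db").get().dialectName());
    }
}
```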

JdbcDialectTypeMapper

Each database defines its own data types. To convert these JDBC types into Flink table types, Flink provides the JdbcDialectTypeMapper interface.

public interface JdbcDialectTypeMapper {
    DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
            throws SQLException;
}

Once more, taking PostgreSQL as the example:

@Internal
public class PostgresTypeMapper implements JdbcDialectTypeMapper {

    private static final Logger LOG = LoggerFactory.getLogger(PostgresTypeMapper.class);

    private static final String PG_SERIAL = "serial";
    private static final String PG_BIGSERIAL = "bigserial";
    private static final String PG_BYTEA = "bytea";
    private static final String PG_BYTEA_ARRAY = "_bytea";
    private static final String PG_SMALLINT = "int2";
    private static final String PG_SMALLINT_ARRAY = "_int2";
    private static final String PG_INTEGER = "int4";
    private static final String PG_INTEGER_ARRAY = "_int4";
    private static final String PG_BIGINT = "int8";
    private static final String PG_BIGINT_ARRAY = "_int8";
    private static final String PG_REAL = "float4";
    private static final String PG_REAL_ARRAY = "_float4";
    private static final String PG_DOUBLE_PRECISION = "float8";
    private static final String PG_DOUBLE_PRECISION_ARRAY = "_float8";
    private static final String PG_NUMERIC = "numeric";
    private static final String PG_NUMERIC_ARRAY = "_numeric";
    private static final String PG_BOOLEAN = "bool";
    private static final String PG_BOOLEAN_ARRAY = "_bool";
    private static final String PG_TIMESTAMP = "timestamp";
    private static final String PG_TIMESTAMP_ARRAY = "_timestamp";
    private static final String PG_TIMESTAMPTZ = "timestamptz";
    private static final String PG_TIMESTAMPTZ_ARRAY = "_timestamptz";
    private static final String PG_DATE = "date";
    private static final String PG_DATE_ARRAY = "_date";
    private static final String PG_TIME = "time";
    private static final String PG_TIME_ARRAY = "_time";
    private static final String PG_TEXT = "text";
    private static final String PG_TEXT_ARRAY = "_text";
    private static final String PG_CHAR = "bpchar";
    private static final String PG_CHAR_ARRAY = "_bpchar";
    private static final String PG_CHARACTER = "character";
    private static final String PG_CHARACTER_ARRAY = "_character";
    private static final String PG_CHARACTER_VARYING = "varchar";
    private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";

    @Override
    public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
            throws SQLException {
        String pgType = metadata.getColumnTypeName(colIndex);

        int precision = metadata.getPrecision(colIndex);
        int scale = metadata.getScale(colIndex);

        switch (pgType) {
            case PG_BOOLEAN:
                return DataTypes.BOOLEAN();
            case PG_BOOLEAN_ARRAY:
                return DataTypes.ARRAY(DataTypes.BOOLEAN());
            case PG_BYTEA:
                return DataTypes.BYTES();
            case PG_BYTEA_ARRAY:
                return DataTypes.ARRAY(DataTypes.BYTES());
            case PG_SMALLINT:
                return DataTypes.SMALLINT();
            case PG_SMALLINT_ARRAY:
                return DataTypes.ARRAY(DataTypes.SMALLINT());
            case PG_INTEGER:
            case PG_SERIAL:
                return DataTypes.INT();
            case PG_INTEGER_ARRAY:
                return DataTypes.ARRAY(DataTypes.INT());
            case PG_BIGINT:
            case PG_BIGSERIAL:
                return DataTypes.BIGINT();
            case PG_BIGINT_ARRAY:
                return DataTypes.ARRAY(DataTypes.BIGINT());
            case PG_REAL:
                return DataTypes.FLOAT();
            case PG_REAL_ARRAY:
                return DataTypes.ARRAY(DataTypes.FLOAT());
            case PG_DOUBLE_PRECISION:
                return DataTypes.DOUBLE();
            case PG_DOUBLE_PRECISION_ARRAY:
                return DataTypes.ARRAY(DataTypes.DOUBLE());
            case PG_NUMERIC:
                // see SPARK-26538: handle numeric without explicit precision and scale.
                if (precision > 0) {
                    return DataTypes.DECIMAL(precision, metadata.getScale(colIndex));
                }
                return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18);
            case PG_NUMERIC_ARRAY:
                // see SPARK-26538: handle numeric without explicit precision and scale.
                if (precision > 0) {
                    return DataTypes.ARRAY(
                            DataTypes.DECIMAL(precision, metadata.getScale(colIndex)));
                }
                return DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18));
            case PG_CHAR:
            case PG_CHARACTER:
                return DataTypes.CHAR(precision);
            case PG_CHAR_ARRAY:
            case PG_CHARACTER_ARRAY:
                return DataTypes.ARRAY(DataTypes.CHAR(precision));
            case PG_CHARACTER_VARYING:
                return DataTypes.VARCHAR(precision);
            case PG_CHARACTER_VARYING_ARRAY:
                return DataTypes.ARRAY(DataTypes.VARCHAR(precision));
            case PG_TEXT:
                return DataTypes.STRING();
            case PG_TEXT_ARRAY:
                return DataTypes.ARRAY(DataTypes.STRING());
            case PG_TIMESTAMP:
                return DataTypes.TIMESTAMP(scale);
            case PG_TIMESTAMP_ARRAY:
                return DataTypes.ARRAY(DataTypes.TIMESTAMP(scale));
            case PG_TIMESTAMPTZ:
                return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(scale);
            case PG_TIMESTAMPTZ_ARRAY:
                return DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(scale));
            case PG_TIME:
                return DataTypes.TIME(scale);
            case PG_TIME_ARRAY:
                return DataTypes.ARRAY(DataTypes.TIME(scale));
            case PG_DATE:
                return DataTypes.DATE();
            case PG_DATE_ARRAY:
                return DataTypes.ARRAY(DataTypes.DATE());
            default:
                throw new UnsupportedOperationException(
                        String.format("Doesn't support Postgres type '%s' yet", pgType));
        }
    }
}

In essence, this establishes a mapping from JDBC types to Flink table types.
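As a minimal sketch of that mapping idea: the Flink-side names below are illustrative strings only (the real PostgresTypeMapper returns DataType objects, not names), and the class name is hypothetical.

```java
import java.util.Map;

// Hypothetical illustration: PG internal type name -> Flink SQL type name.
public class TypeMappingDemo {

    static final Map<String, String> PG_TO_FLINK = Map.of(
            "int4", "INT",
            "int8", "BIGINT",
            "float8", "DOUBLE",
            "text", "STRING",
            "timestamptz", "TIMESTAMP_LTZ");

    public static void main(String[] args) {
        System.out.println(PG_TO_FLINK.get("int8"));
    }
}
```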

Since Flink officially supports only a limited set of databases, adding JdbcDialect support for a new database means implementing or extending three types: AbstractDialect, JdbcDialectFactory, and JdbcDialectTypeMapper.

When reprinting, please credit www.mshxw.com
Original: https://www.mshxw.com/it/760380.html