1. Business background
The business needs to write data into ClickHouse while also supporting primary-key updates. The open-source Flink 1.11 used here ships no ClickHouse connector, and the jobs are written in Flink SQL, so the flink-connector-jdbc source code has to be modified so that real-time data can be written to the ClickHouse cluster via JDBC (the ClickHouse JDBC driver, ru.yandex.clickhouse:clickhouse-jdbc, must also be on the classpath). Out of the box, JdbcDialects only registers the Derby, MySQL and Postgres dialects:
package org.apache.flink.connector.jdbc.dialect;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public final class JdbcDialects {

    private static final List<JdbcDialect> DIALECTS = Arrays.asList(
            new DerbyDialect(),
            new MySQLDialect(),
            new PostgresDialect()
    );

    /** Fetch the JdbcDialect corresponding to a given database url. */
    public static Optional<JdbcDialect> get(String url) {
        for (JdbcDialect dialect : DIALECTS) {
            if (dialect.canHandle(url)) {
                return Optional.of(dialect);
            }
        }
        return Optional.empty();
    }
}
2. Implement a custom JdbcDialect, modeled on MySQLDialect
package org.apache.flink.connector.jdbc.dialect;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import org.apache.flink.connector.jdbc.internal.converter.ClickhouseRowConverter;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;

/** JDBC dialect for ClickHouse, modeled on MySQLDialect. */
public class ClickhouseDialect extends AbstractDialect {

    private static final long serialVersionUID = 1L;

    // MAX/MIN precision of TIMESTAMP and DECIMAL types, carried over from MySQLDialect
    // (see https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html
    // and https://dev.mysql.com/doc/refman/8.0/en/fixed-point-types.html).
    private static final int MAX_TIMESTAMP_PRECISION = 6;
    private static final int MIN_TIMESTAMP_PRECISION = 1;
    private static final int MAX_DECIMAL_PRECISION = 65;
    private static final int MIN_DECIMAL_PRECISION = 1;

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:clickhouse:");
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new ClickhouseRowConverter(rowType);
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
    }

    @Override
    public String quoteIdentifier(String identifier) {
        // ClickHouse identifiers are used as-is, without quoting.
        return identifier;
    }

    @Override
    public int maxDecimalPrecision() {
        return MAX_DECIMAL_PRECISION;
    }

    @Override
    public int minDecimalPrecision() {
        return MIN_DECIMAL_PRECISION;
    }

    @Override
    public int maxTimestampPrecision() {
        return MAX_TIMESTAMP_PRECISION;
    }

    @Override
    public int minTimestampPrecision() {
        return MIN_TIMESTAMP_PRECISION;
    }

    @Override
    public List<LogicalTypeRoot> unsupportedTypes() {
        return Arrays.asList(
                LogicalTypeRoot.BINARY,
                LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
                LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
                LogicalTypeRoot.INTERVAL_YEAR_MONTH,
                LogicalTypeRoot.INTERVAL_DAY_TIME,
                LogicalTypeRoot.ARRAY,
                LogicalTypeRoot.MULTISET,
                LogicalTypeRoot.MAP,
                LogicalTypeRoot.ROW,
                LogicalTypeRoot.DISTINCT_TYPE,
                LogicalTypeRoot.STRUCTURED_TYPE,
                LogicalTypeRoot.NULL,
                LogicalTypeRoot.RAW,
                LogicalTypeRoot.SYMBOL,
                LogicalTypeRoot.UNRESOLVED);
    }

    @Override
    public String dialectName() {
        return "clickhouse";
    }
}
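Note that ClickhouseDialect keeps the default getUpsertStatement() of the JdbcDialect interface, which returns Optional.empty(); that is what later makes the runtime fall back to InsertOrUpdateJdbcExecutor for the upsert sink (see step 5). An alternative to patching that executor would be to resolve upserts to plain INSERTs already in the dialect. The following is only a sketch, not part of the patch described in this post; it assumes the default getInsertIntoStatement() helper of JdbcDialect and that ClickHouse itself deduplicates rows by key (for example via a ReplacingMergeTree table engine):

@Override
public Optional<String> getUpsertStatement(
        String tableName, String[] fieldNames, String[] uniqueKeyFields) {
    // Hypothetical alternative: never generate an UPDATE, always INSERT.
    // Deduplication by primary key is assumed to happen inside ClickHouse.
    return Optional.of(getInsertIntoStatement(tableName, fieldNames));
}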
3. Implement a custom JdbcRowConverter, modeled on MySQLRowConverter. As with MySQLRowConverter, the subclass only needs to provide converterName(); AbstractJdbcRowConverter already handles the standard JDBC type conversions.
package org.apache.flink.connector.jdbc.internal.converter;

import org.apache.flink.table.types.logical.RowType;

public class ClickhouseRowConverter extends AbstractJdbcRowConverter {

    private static final long serialVersionUID = 1L;

    public ClickhouseRowConverter(RowType rowType) {
        super(rowType);
    }

    @Override
    public String converterName() {
        return "clickhouse";
    }
}
4. Register the newly created ClickhouseDialect in JdbcDialects
package org.apache.flink.connector.jdbc.dialect;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public final class JdbcDialects {

    private static final List<JdbcDialect> DIALECTS = Arrays.asList(
            new DerbyDialect(),
            new MySQLDialect(),
            new PostgresDialect(),
            new ClickhouseDialect()   // newly registered ClickHouse dialect
    );

    public static Optional<JdbcDialect> get(String url) {
        for (JdbcDialect dialect : DIALECTS) {
            if (dialect.canHandle(url)) {
                return Optional.of(dialect);
            }
        }
        return Optional.empty();
    }
}
With this, the flink-connector-jdbc changes for ClickHouse support are complete.
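To sanity-check the registration, a quick throwaway snippet can resolve a ClickHouse URL (the class name and URL below are placeholders, not part of the patch):

import java.util.Optional;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialects;

public class ClickhouseDialectSmokeTest {
    public static void main(String[] args) {
        // Any URL starting with "jdbc:clickhouse:" should now resolve to the new dialect.
        Optional<JdbcDialect> dialect =
                JdbcDialects.get("jdbc:clickhouse://host:8123/database");
        System.out.println(dialect.map(JdbcDialect::dialectName).orElse("none"));
        // prints: clickhouse
    }
}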
5. Business SQL
create table tableA (
  client_time string,
  user_id string,
  client_ip string,
  session_id string,
  query string,
  dayno string
) COMMENT 'tableA'
WITH (
  'connector' = 'kafka',
  'topic' = 'kafka_topic',
  'properties.bootstrap.servers' = 'kafka_servers',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="xxx" password="xxx";',
  'properties.group.id' = 'kafka_group_id',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1638962488170',
  'format' = 'csv',
  'csv.ignore-parse-errors' = 'true',
  'csv.allow-comments' = 'true',
  'csv.field-delimiter' = U&'\0009'  -- tab character as a Unicode escape
);
create table tableB (
  client_time string,
  user_id string,
  session_id string,
  query string,
  dayno string,
  primary key (session_id) NOT ENFORCED
) COMMENT 'data aggregated by session_id'
WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:clickhouse://host:port/database',
  'connector.table' = 'clickhouse_table',
  'connector.username' = 'xxx',
  'connector.password' = 'xxx',
  'connector.write.flush.max-rows' = '5000',
  'connector.write.flush.interval' = '5'
);
insert into tableB
select
  client_time,
  user_id,
  session_id,
  query,
  dayno
from (
  select
    LISTAGG(client_time, ',') client_time,
    max(user_id) user_id,
    session_id,
    LISTAGG(query, ',') query,
    min(dayno) dayno,
    count(1) cnt
  from tableA
  where REGEXP(dayno, '[0-9]{4}-[0-9]{2}-[0-9]{2}')
    and session_id is not null
    and session_id <> ''
  group by session_id
) x
where cnt <= 10;
The SQL above produces a continuously updating result, so the JDBC sink generates UPDATE statements. ClickHouse, however, does not support UPDATE and only accepts INSERT. Guided by the resulting error message, the corresponding executor code was modified so that only INSERT statements are executed against ClickHouse, after which the job ran successfully.
package org.apache.flink.connector.jdbc.internal.executor;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import ru.yandex.clickhouse.ClickHousePreparedStatementImpl;

import javax.annotation.Nonnull;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Executor that runs an UPDATE when the key already exists and an INSERT otherwise.
 * Modified for ClickHouse: since ClickHouse does not support UPDATE, every record is
 * written as an INSERT when the prepared statement is a ClickHouse statement.
 */
@Internal
public final class InsertOrUpdateJdbcExecutor<R, K, V> implements JdbcBatchStatementExecutor<R> {

    private static final Logger LOG = LoggerFactory.getLogger(InsertOrUpdateJdbcExecutor.class);

    private final String existSQL;
    private final String insertSQL;
    private final String updateSQL;

    private final JdbcStatementBuilder<K> existSetter;
    private final JdbcStatementBuilder<V> insertSetter;
    private final JdbcStatementBuilder<V> updateSetter;

    private final Function<R, K> keyExtractor;
    private final Function<R, V> valueMapper;

    private final Map<K, V> batch;

    private transient PreparedStatement existStatement;
    private transient PreparedStatement insertStatement;
    private transient PreparedStatement updateStatement;

    public InsertOrUpdateJdbcExecutor(
            @Nonnull String existSQL,
            @Nonnull String insertSQL,
            @Nonnull String updateSQL,
            @Nonnull JdbcStatementBuilder<K> existSetter,
            @Nonnull JdbcStatementBuilder<V> insertSetter,
            @Nonnull JdbcStatementBuilder<V> updateSetter,
            @Nonnull Function<R, K> keyExtractor,
            @Nonnull Function<R, V> valueExtractor) {
        this.existSQL = checkNotNull(existSQL);
        this.insertSQL = checkNotNull(insertSQL);
        this.updateSQL = checkNotNull(updateSQL);
        this.existSetter = checkNotNull(existSetter);
        this.insertSetter = checkNotNull(insertSetter);
        this.updateSetter = checkNotNull(updateSetter);
        this.keyExtractor = checkNotNull(keyExtractor);
        this.valueMapper = checkNotNull(valueExtractor);
        this.batch = new HashMap<>();
    }

    @Override
    public void prepareStatements(Connection connection) throws SQLException {
        existStatement = connection.prepareStatement(existSQL);
        insertStatement = connection.prepareStatement(insertSQL);
        updateStatement = connection.prepareStatement(updateSQL);
    }

    @Override
    public void addToBatch(R record) {
        batch.put(keyExtractor.apply(record), valueMapper.apply(record));
    }

    @Override
    public void executeBatch() throws SQLException {
        if (!batch.isEmpty()) {
            for (Map.Entry<K, V> entry : batch.entrySet()) {
                processOneRowInBatch(entry.getKey(), entry.getValue());
            }
            if (updateStatement instanceof ClickHousePreparedStatementImpl) {
                // ClickHouse: UPDATE is unsupported, so only the insert batch is executed.
                insertStatement.executeBatch();
            } else {
                updateStatement.executeBatch();
                insertStatement.executeBatch();
            }
            batch.clear();
        }
    }

    private void processOneRowInBatch(K pk, V row) throws SQLException {
        if (updateStatement instanceof ClickHousePreparedStatementImpl) {
            // ClickHouse: skip the key-existence check and always add an INSERT.
            insertSetter.accept(insertStatement, row);
            insertStatement.addBatch();
        } else if (exist(pk)) {
            updateSetter.accept(updateStatement, row);
            updateStatement.addBatch();
        } else {
            insertSetter.accept(insertStatement, row);
            insertStatement.addBatch();
        }
    }

    private boolean exist(K pk) throws SQLException {
        existSetter.accept(existStatement, pk);
        try (ResultSet resultSet = existStatement.executeQuery()) {
            return resultSet.next();
        }
    }

    @Override
    public void closeStatements() throws SQLException {
        for (PreparedStatement s : Arrays.asList(existStatement, insertStatement, updateStatement)) {
            if (s != null) {
                s.close();
            }
        }
    }
}
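One trade-off worth noting: with this change, an update stream becomes append-only on the ClickHouse side, so multiple rows per session_id can accumulate in the target table. This setup assumes that deduplication by key is handled inside ClickHouse (for example by a ReplacingMergeTree table engine) or in downstream queries; the Flink side no longer performs the key-existence check for ClickHouse connections.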



