部署Debezium Oracle Connecter需要将odbc的驱动包放到kafka的lib目录下。由于之前使用的cdc程序不是Debezium,当时程序使用的是ojdbc版本是7,但是官方版本要求是odbc8。由于自己的粗心,导致启动一直报错,以致于被这个问题困惑了许久。
2.数字类型解析到kafka格式异常number类型的数字到kafka中格式出现异常
查阅官方文档得知Debezium默认使用java.math.BigDecimal以二进制形式精确表示值。
解决:连接器参数添加
"decimal.handling.mode": "string"
数据展示正常
3.Debezium捕获Oracle数据延迟大成功跑起来Debezium并捕获到了数据,但是发现捕获数据的延迟过大,常常需要5-10分钟后数据库变更数据才会进入kafka中。
求助官方社群得知需要修改相关配置,查阅官方文档
解决:连接器参数添加
"log.mining.strategy":"online_catalog"
延迟基本都在2s以内了。
4.Debezium时间戳相比数据库多了8小时Debezium 默认将所有时间戳字段转换为 UTC,所以会导致时间与本地相差了8小时。搜索网络上的解决方案,通过添加以下连接器参数均无效
"database.serverTimezone":"UTC" "database.serverTimezone":"Asia/Shanghai"
解决:通过自定义Converter 来将UTC时间转换为本地时间,校准时间戳。(Custom Converters)
参考实现:
public class OracleTimeConverter implements CustomConverter{ private static final ZoneId GMT_ZONE_ID = ZoneId.systemDefault(); private static final Pattern TO_DATE = Pattern.compile("TO_DATE\('(.*)',[ ]*'(.*)'\)", Pattern.CASE_INSENSITIVE); private static final Pattern TO_TIMESTAMP = Pattern.compile("TO_TIMESTAMP\('(.*)'\)", Pattern.CASE_INSENSITIVE); private static final Pattern TIMESTAMP_OR_DATE_REGEX = Pattern.compile("^TIMESTAMP[(]\d[)]$|^DATE$", Pattern.CASE_INSENSITIVE); private static final DateTimeFormatter TIMESTAMP_AM_PM_SHORT_FORMATTER = new DateTimeFormatterBuilder() .parseCaseInsensitive() .appendPattern("dd-MMM-yy hh.mm.ss") .optionalStart() .appendPattern(".") .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, false) .optionalEnd() .appendPattern(" a") .toFormatter(Locale.ENGLISH); private static final DateTimeFormatter TIMESTAMP_FORMATTER = new DateTimeFormatterBuilder() .parseCaseInsensitive() .appendPattern("yyyy-MM-dd HH:mm:ss") .optionalStart() .appendPattern(".") .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, false) .optionalEnd() .toFormatter(); @Override public void configure(Properties props) { } @Override public void converterFor(RelationalColumn column, ConverterRegistration registration) { String typeName = column.typeName(); if (TIMESTAMP_OR_DATE_REGEX.matcher(typeName).matches()) { registration.register(SchemaBuilder.int64().optional(), value -> { if (value == null) { if (column.isOptional()) { return null; } else if (column.hasDefaultValue()) { return column.defaultValue(); } else { return null; } } if (value instanceof Long) { return value; } if (value instanceof String) { Instant instant = resolveTimestampStringAsInstant((String) value); if (instant != null) { return instant.toEpochMilli(); } } return null; }); } } private Instant resolveTimestampStringAsInstant(String data) { LocalDateTime dateTime; final Matcher toTimestampMatcher = TO_TIMESTAMP.matcher(data); if (toTimestampMatcher.matches()) { String dateText = toTimestampMatcher.group(1); if (dateText.indexOf(" AM") > 0 || dateText.indexOf(" PM") > 0) { dateTime = LocalDateTime.from(TIMESTAMP_AM_PM_SHORT_FORMATTER.parse(dateText.trim())); } else { dateTime = LocalDateTime.from(TIMESTAMP_FORMATTER.parse(dateText.trim())); } return dateTime.atZone(GMT_ZONE_ID).toInstant(); } final Matcher toDateMatcher = TO_DATE.matcher(data); if (toDateMatcher.matches()) { dateTime = LocalDateTime.from(TIMESTAMP_FORMATTER.parse(toDateMatcher.group(1))); return dateTime.atZone(GMT_ZONE_ID).toInstant(); } // 解析失败 return null; } }



