The data we read from MySQL is of type Date.
But after printing it and writing it to Doris, what actually showed up was a raw number rather than a date.
2. After asking around in the community, the issues led me to Issue #317 of ververica/flink-cdc-connectors on GitHub: when Flink-CDC syncs a field of timestamp type, the snapshot (initialization) data and the incremental data end up with inconsistent time zones.
That thread gave me the information I was looking for.
3. We found the RowDataDebeziumDeserializeSchema class: the idea is to borrow its approach and do the conversion ourselves.
4. So what are these numbers that the Date-type data read from MySQL turns into?
They are the number of days since 1970-01-01.
So what we need to do now is find the fields that are of Date type and convert that day count back into a Date or a string, as in the sketch below.
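As a minimal illustration of that encoding (18993 is just an example value):

import java.time.LocalDate

object EpochDayDemo {
  def main(args: Array[String]): Unit = {
    // A MySQL DATE such as 2022-01-01 arrives from Debezium as the Int 18993:
    // the number of days between 1970-01-01 and that date
    val epochDays = 18993
    println(LocalDate.ofEpochDay(epochDays)) // prints 2022-01-01
  }
}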
5. Looking for the key method: the debugger can tell us a lot here.
First look at the Struct.
Then look at the Schema.
At this point we notice there is a Date class, and it has a method
that we can try calling. It does work, but the data format is not what I want; feel free to try it yourselves. You need:
import org.apache.kafka.connect.data.Date
I put together a simple piece of code myself and printed the output.
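A minimal sketch of that kind of quick test, assuming the method in question is Date.toLogical and reusing 18993 as the illustrative epoch-day value:

import org.apache.kafka.connect.data.Date

object DateToLogicalTest {
  def main(args: Array[String]): Unit = {
    // Date.SCHEMA carries the logical name that toLogical checks for
    val utilDate = Date.toLogical(Date.SCHEMA, 18993)
    // Prints a java.util.Date, e.g. "Sat Jan 01 00:00:00 CST 2022" --
    // the right instant, but not the plain "2022-01-01" string we actually want
    println(utilDate)
  }
}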
6. Below is the Scala version written by an expert (note: my own code is pretty rough; thanks to 谢中杨):
import com.ververica.cdc.debezium.DebeziumDeserializationSchema
import com.ververica.cdc.debezium.utils.TemporalConversions
import io.debezium.time._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
import org.apache.kafka.connect.data.{SchemaBuilder, Struct}
import org.apache.kafka.connect.source.SourceRecord
import java.sql
import java.time.{Instant, LocalDateTime, ZoneId}
import scala.collection.JavaConverters._
import scala.util.parsing.json.JSONObject
// Deserialize Debezium SourceRecords into Flink Rows, rendering temporal fields
// as readable strings in the given server time zone
class StructDebeziumDeserializationSchema(serverTimeZone: String) extends DebeziumDeserializationSchema[Row] {

  override def deserialize(sourceRecord: SourceRecord, collector: Collector[Row]): Unit = {
    // Parse the primary key
    val key = sourceRecord.key().asInstanceOf[Struct]
    val keyJs = parseStruct(key)
    // Parse the value: source metadata plus the before/after images
    val value = sourceRecord.value().asInstanceOf[Struct]
    val source = value.getStruct("source")
    val before = parseStruct(value.getStruct("before"))
    val after = parseStruct(value.getStruct("after"))
    val row = Row.withNames()
    row.setField("table", s"${source.get("db")}.${source.get("table")}")
    row.setField("key", keyJs)
    row.setField("op", value.get("op"))
    row.setField("op_ts", LocalDateTime.ofInstant(Instant.ofEpochMilli(source.getInt64("ts_ms")), ZoneId.of(serverTimeZone)))
    row.setField("current_ts", LocalDateTime.ofInstant(Instant.ofEpochMilli(value.getInt64("ts_ms")), ZoneId.of(serverTimeZone)))
    row.setField("before", before)
    row.setField("after", after)
    collector.collect(row)
  }

  // Turn a Debezium Struct into a JSON string, normalizing temporal fields on the way
  private def parseStruct(struct: Struct): String = {
    if (struct == null) return null
    val map = struct.schema().fields().asScala.map(field => {
      val v = struct.get(field)
      val typ = field.schema().name()
      println(s"$v, $typ, ${field.name()}") // debug output
      val value = v match {
        case long if long.isInstanceOf[Long] => convertLongToTime(long.asInstanceOf[Long], typ)
        case iv if iv.isInstanceOf[Int] => convertIntToDate(iv.asInstanceOf[Int], typ)
        case iv if iv == null => null
        case _ => convertObjToTime(v, typ)
      }
      (field.name(), value)
    }).filter(_._2 != null).toMap
    JSONObject.apply(map).toString()
  }

  // Fields that arrive as objects (e.g. ZonedTimestamp ISO strings)
  private def convertObjToTime(obj: Any, typ: String): Any = {
    typ match {
      case Time.SCHEMA_NAME | MicroTime.SCHEMA_NAME | NanoTime.SCHEMA_NAME =>
        sql.Time.valueOf(TemporalConversions.toLocalTime(obj)).toString
      case Timestamp.SCHEMA_NAME | MicroTimestamp.SCHEMA_NAME | NanoTimestamp.SCHEMA_NAME | ZonedTimestamp.SCHEMA_NAME =>
        sql.Timestamp.valueOf(TemporalConversions.toLocalDateTime(obj, ZoneId.of(serverTimeZone))).toString
      case _ => obj
    }
  }

  // Long-valued fields: millis/micros/nanos since epoch, depending on the Debezium schema name
  private def convertLongToTime(obj: Long, typ: String): Any = {
    val time_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Time")
    val date_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Date")
    val timestamp_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Timestamp")
    typ match {
      case Time.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Time.toLogical(time_schema, obj.asInstanceOf[Int]).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalTime.toString
      case MicroTime.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Time.toLogical(time_schema, (obj / 1000).asInstanceOf[Int]).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalTime.toString
      case NanoTime.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Time.toLogical(time_schema, (obj / 1000 / 1000).asInstanceOf[Int]).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalTime.toString
      case Timestamp.SCHEMA_NAME =>
        val t = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDateTime
        java.sql.Timestamp.valueOf(t).toString
      case MicroTimestamp.SCHEMA_NAME =>
        val t = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj / 1000).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDateTime
        java.sql.Timestamp.valueOf(t).toString
      case NanoTimestamp.SCHEMA_NAME =>
        val t = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj / 1000 / 1000).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDateTime
        java.sql.Timestamp.valueOf(t).toString
      case Date.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Date.toLogical(date_schema, obj.asInstanceOf[Int]).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDate.toString
      case _ => obj
    }
  }

  // Int-valued Date fields: days since 1970-01-01
  private def convertIntToDate(obj: Int, typ: String): Any = {
    val date_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Date")
    typ match {
      case Date.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Date.toLogical(date_schema, obj).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDate.toString
      case _ => obj
    }
  }

  override def getProducedType: TypeInformation[Row] = {
    TypeInformation.of(classOf[Row])
  }
}
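For completeness, here is a sketch of how this deserializer might be wired into a MySQL CDC source and simply printed (the connection settings are placeholders, and this assumes the flink-cdc 2.x MySqlSource builder API rather than a Doris sink):

import com.ververica.cdc.connectors.mysql.source.MySqlSource
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.types.Row

object CdcDateDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Placeholder connection settings -- replace with your own
    val source = MySqlSource.builder[Row]()
      .hostname("localhost")
      .port(3306)
      .databaseList("test_db")
      .tableList("test_db.test_table")
      .username("root")
      .password("password")
      .serverTimeZone("Asia/Shanghai")
      // Plug in the custom deserializer so Date/Timestamp fields come out as readable strings
      .deserializer(new StructDebeziumDeserializationSchema("Asia/Shanghai"))
      .build()
    env.fromSource(source, WatermarkStrategy.noWatermarks[Row](), "MySQL CDC Source").print()
    env.execute("mysql-cdc-date-demo")
  }
}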
7. Summary:
Although this is only a small detail, working through it is a good way to get a deeper understanding of CDC, and I think the time spent was worth it.
Supplement: for detecting timestamp types and handling time zones in code, what I did here is the simplest possible case, for reference only:
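A minimal sketch of the time-zone point, assuming an epoch-millisecond value like the ones io.debezium.time.Timestamp delivers (the value below is just 2022-01-01T00:00:00Z):

import java.time.{Instant, LocalDateTime, ZoneId}

object TimestampZoneDemo {
  def main(args: Array[String]): Unit = {
    val epochMillis = 1640995200000L // 2022-01-01T00:00:00Z
    // Interpreting the same instant in an explicit server time zone vs. UTC
    val inUtc = LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneId.of("UTC"))
    val inShanghai = LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneId.of("Asia/Shanghai"))
    println(s"UTC: $inUtc, Asia/Shanghai: $inShanghai") // the two differ by 8 hours
  }
}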