栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink cdc 2.0.2 针对mysql Date类型数据的解决思路

flink cdc 2.0.2 针对mysql Date类型数据的解决思路

1,实践了flink cdc 的小伙伴肯定会遇到时间字段的问题,以mysql作为数据源为案例
我们读取的数据为Date类型

实际打印写入doris之后发现是:

2,在社区里询问通过issues了解到

Flink-CDC 在同步字段为timestamp类型的数据时,初始化数据和增量数据时区不一致 · Issue #317 · ververica/flink-cdc-connectors · GitHub

 从这里我找到了自己想要的信息:

3,我们找到 RowDataDebeziumDeserializeSchema类:

 就是参考类似的思想,然后转换。

4,我们从mysql读取的Date类型的数据变成

这些数字是啥呢? 

是距离1970-01-01的天数,如下面所示:

 

那我们现在需要作的就是找到是Date类型的字段,然后给将天数转成Date类型或者字符串。

5,寻找到关键方法思路

我们在debugger里面可以得知到很多信息:

先看Struct

再看Schema

 

我们这时候发现有Date类,并且有个方法

 可以尝试调用,结果是可用的,但是数据格式不是我想要的,大家可以懂事试试,需要

import org.apache.kafka.connect.data.Date

自己做个简单的代码如下:

 打印输出:

6,然后下面是大佬scala写的代码

注:我写的代码很low,感谢 谢中杨大佬:

 

import com.ververica.cdc.debezium.DebeziumDeserializationSchema
import com.ververica.cdc.debezium.utils.TemporalConversions
import io.debezium.time._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
import org.apache.kafka.connect.data.{SchemaBuilder, Struct}
import org.apache.kafka.connect.source.SourceRecord

import java.sql
import java.time.{Instant, LocalDateTime, ZoneId}
import scala.collection.JavaConverters._
import scala.util.parsing.json.JSonObject


 
class StructDebeziumDeserializationSchema(serverTimeZone: String) extends DebeziumDeserializationSchema[Row] {

  override def deserialize(sourceRecord: SourceRecord, collector: Collector[Row]): Unit = {
    // 解析主键
    val key = sourceRecord.key().asInstanceOf[Struct]
    val keyJs = parseStruct(key)

    // 解析值
    val value = sourceRecord.value().asInstanceOf[Struct]
    val source = value.getStruct("source")
    val before = parseStruct(value.getStruct("before"))
    val after = parseStruct(value.getStruct("after"))

    val row = Row.withNames()
    row.setField("table", s"${source.get("db")}.${source.get("table")}")
    row.setField("key", keyJs)
    row.setField("op", value.get("op"))
    row.setField("op_ts", LocalDateTime.ofInstant(Instant.ofEpochMilli(source.getInt64("ts_ms")), ZoneId.of(serverTimeZone)))
    row.setField("current_ts", LocalDateTime.ofInstant(Instant.ofEpochMilli(value.getInt64("ts_ms")), ZoneId.of(serverTimeZone)))
    row.setField("before", before)
    row.setField("after", after)
    collector.collect(row)
  }

  
  private def parseStruct(struct: Struct): String = {
    if (struct == null) return null
    val map = struct.schema().fields().asScala.map(field => {
      val v = struct.get(field)
      val typ = field.schema().name()
      println(s"$v, $typ, ${field.name()}")
      val value = v match {
        case long if long.isInstanceOf[Long] => convertLongToTime(long.asInstanceOf[Long], typ)
        case iv if iv.isInstanceOf[Int] => convertIntToDate(iv.asInstanceOf[Int], typ)
        case iv if iv == null => null
        case _ => convertObjToTime(v, typ)
      }
      (field.name(), value)
    }).filter(_._2 != null).toMap
    JSONObject.apply(map).toString()
  }

  
  private def convertObjToTime(obj: Any, typ: String): Any = {
    typ match {
      case Time.SCHEMA_NAME | MicroTime.SCHEMA_NAME | NanoTime.SCHEMA_NAME =>
        sql.Time.valueOf(TemporalConversions.toLocalTime(obj)).toString
      case Timestamp.SCHEMA_NAME | MicroTimestamp.SCHEMA_NAME | NanoTimestamp.SCHEMA_NAME | ZonedTimestamp.SCHEMA_NAME =>
        sql.Timestamp.valueOf(TemporalConversions.toLocalDateTime(obj, ZoneId.of(serverTimeZone))).toString
      case _ => obj
    }
  }

  
  private def convertLongToTime(obj: Long, typ: String): Any = {
    val time_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Time")
    val date_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Date")
    val timestamp_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Timestamp")
    typ match {
      case Time.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Time.toLogical(time_schema, obj.asInstanceOf[Int]).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalTime.toString
      case MicroTime.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Time.toLogical(time_schema, (obj / 1000).asInstanceOf[Int]).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalTime.toString
      case NanoTime.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Time.toLogical(time_schema, (obj / 1000 / 1000).asInstanceOf[Int]).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalTime.toString
      case Timestamp.SCHEMA_NAME =>
        val t = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDateTime
        java.sql.Timestamp.valueOf(t).toString
      case MicroTimestamp.SCHEMA_NAME =>
        val t = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj / 1000).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDateTime
        java.sql.Timestamp.valueOf(t).toString
      case NanoTimestamp.SCHEMA_NAME =>
        val t = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj / 1000 / 1000).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDateTime
        java.sql.Timestamp.valueOf(t).toString
      case Date.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Date.toLogical(date_schema, obj.asInstanceOf[Int]).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDate.toString
      case _ => obj
    }
  }

  private def convertIntToDate(obj:Int, typ: String): Any ={
    val date_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Date")
    typ match {
      case Date.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Date.toLogical(date_schema, obj).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDate.toString
      case _ => obj
    }
  }

  override def getProducedType: TypeInformation[Row] = {
    TypeInformation.of(classOf[Row])
  }
}
7,总结:

虽然这只是一个小小的细节问题,但是可以通过解决问题的思路,更深一步去了解cdc,我觉得花时间是值得的。

补充:代码判断时间戳,时区问题,我这里做的最简单的案例,仅供参考:

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/467642.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号