栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

将mysql数据通过canal+kafka+sparkstructedstreaming写入hudi并同步hive

将mysql数据通过canal+kafka+sparkstructedstreaming写入hudi并同步hive

1 配置canal 读取mysql日志正则将数据分发(动态分区)至对应kafka topic 2 sparkstructedstreaming获取kafka数据 并将数据存储至hudi

本人有大量表名为 document_xxx(document_1,document_2,document_3…)
通过canal将数据存储kafka topic (document)

object SSSHudiETL {

  case class Model_document(table: String, sql_type: String, data1: documents)

  def main(args: Array[String]): Unit = {


    val sparkConf = new SparkConf().setAppName("sparkstructedstreaming")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.sql.shuffle.partitions", "3")
    //      .setMaster("local[*]")

    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()


    val document_df = sparkSession.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "xxx.xxx.xxx.xxx:6667")
      //topic
      .option("subscribe", "document")
      //如果本地checkpointLocation 后即使配置 也会读取存储的位置开始
      .option("startingOffsets", "earliest")
      //数据保存7 天如果失败 则没数据
      .option("failOnDataLoss", "false")
      .option("maxOffsetsPerTrigger", "30000")
      .load()
	
    val b2s = new Bean2Sql
    import sparkSession.implicits._

    
    val document_query = document_df.selectExpr("cast (value as string)").as[String]
      .map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val table = if (jsonObject.containsKey("table")) jsonObject.getString("table") else ""
        val data = if (jsonObject.containsKey("data")) ParseJsonData.getJsonData(jsonObject.getJSONArray("data").get(0).toString) else null
        val sql_type = if (jsonObject.containsKey("type")) jsonObject.getString("type") else ""
		//将json里的字段转为存储同步Hive的分区字段 和hudi的TS时间戳
        data.put("DT", castDateFormat(data.get("CREATE_DATE").toString))
        data.put("TS", castDateTimeStamp(data.get("CREATE_DATE").toString))
		# 工具类 将json数据转为 case document对象
        val data1 = b2s.json2document(data)
        Model_document(table, sql_type, data1)

      })
      .writeStream.foreachBatch { (batchDF: Dataset[Model_document], _: Long) =>
      batchDF.cache() //缓存数据

      val upsertData = batchDF.filter("sql_type =='INSERT' or sql_type =='UPDATE'") //新增数据 和修改数据
      val deleteData = batchDF.filter("sql_type =='DELETE'") //删除数据

      upsertHudidocument(upsertData, sparkSession)
      deleteHudidocument(deleteData, sparkSession)

      batchDF.unpersist().show()

    }.option("checkpointLocation", "/save_path/checkpoint/document").start()

    document_query.awaitTermination()

  }

  

  def upsertHudidocument(batchDF: Dataset[Model_document], sparkSession: SparkSession) {


    import sparkSession.implicits._
    val result = batchDF.mapPartitions(partitions => {
      partitions.map(item => {
        item.data1

      })
    })

    //写入hudi
    result.write.format("org.apache.hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) //选择表的类型 到底是MERGE_ON_READ 还是 COPY_ON_WRITE
      //本人字段
      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "FPQQLSH") //设置主键
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "TS") //数据更新时间戳的
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "DT") //hudi分区列
      .option("hoodie.table.name", "document") //hudi表名
      .option(DataSourceWriteOptions.HIVE_URL.key(), "jdbc:hive2://aisino-master.pro.com:2181,aisino-slave01.pro.com:2181,aisino-slave02.pro.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2") //hiveserver2地址
      .option(DataSourceWriteOptions.HIVE_DATAbase.key(), "hudi") //设置hudi与hive同步的数据库
      .option(DataSourceWriteOptions.HIVE_TABLE.key(), "document") //设置hudi与hive同步的表名
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "DT") //hive表同步的分区列
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getName) // 分区提取器 按/ 提取分区
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED.key(), "true") //设置数据集注册并同步到hive
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key(), "true") //设置当分区变更时,当前数据的分区目录是否变更
      .option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.GLOBAL_BLOOM.name()) //设置索引类型目前有Hbase,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引 为了保证分区变更后能找到必须设置全局GLOBAL_BLOOM
      .option(HoodieCompactionConfig.INLINE_COMPACT.key(),"true")
      // 本来想通过下列方式让 hudi compact 速度快点,但是试了好像没有效果,好像是flink的配置
      //      .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key(),"true")
//      .option(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key(),"NUM_OR_TIME")
//      .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "3")
//      .option(HoodieCompactionConfig.INLINE_COMPACT_TIME_DELTA_SECONDS.key(),"60")
      .option("hoodie.bulkinsert.shuffle.parallelism", "12")
      .option("hoodie.insert.shuffle.parallelism", "12")
      .option("hoodie.upsert.shuffle.parallelism", "12")
      .option("hoodie.bootstrap.parallelism", "12")
      .mode(SaveMode.Append)
      .save("/save_path/hudi/document")

  }

  
  def deleteHudidocument(batchDF: Dataset[Model_document], sparkSession: SparkSession) = {
    import sparkSession.implicits._
    val result = batchDF.mapPartitions(partitions => {
      partitions.map(item => {
        item.data1
      })
    })
    result.write.format("hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) //选择表的类型 到底是MERGE_ON_READ 还是 COPY_ON_WRITE
      .option(DataSourceWriteOptions.OPERATION.key(), "delete") //删除数据操作
      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "FPQQLSH") //设置主键
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "TS") //数据更新时间戳的
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "DT") //hudi分区列
      .option("hoodie.table.name", "document") //hudi表名
      .option(DataSourceWriteOptions.HIVE_URL.key(), "jdbc:hive2://aisino-master.pro.com:2181,aisino-slave01.pro.com:2181,aisino-slave02.pro.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2") //hiveserver2地址
      .option(DataSourceWriteOptions.HIVE_DATAbase.key(), "hudi") //设置hudi与hive同步的数据库
      .option(DataSourceWriteOptions.HIVE_TABLE.key(), "document") //设置hudi与hive同步的表名
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "DT") //hive表同步的分区列
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getName) // 分区提取器 按/ 提取分区
      //      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[NonPartitionedExtractor].getName) // 没有分区
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED.key(), "true") //设置数据集注册并同步到hive
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key(), "true") //设置当分区变更时,当前数据的分区目录是否变更
      .option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.GLOBAL_BLOOM.name()) //设置索引类型目前有Hbase,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引 为了保证分区变更后能找到必须设置全局GLOBAL_BLOOM
      .option(HoodieCompactionConfig.INLINE_COMPACT.key(),"true")
      //      .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key(),"true")
//      .option(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key(),"NUM_OR_TIME")
//      .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "3")
//      .option(HoodieCompactionConfig.INLINE_COMPACT_TIME_DELTA_SECONDS.key(),"60")
      .option("hoodie.bulkinsert.shuffle.parallelism", "12")
      .option("hoodie.delete.shuffle.parallelism", "12")
      .option("hoodie.bootstrap.parallelism", "12")
      .mode(SaveMode.Append)
      .save("/save_path/hudi/document")
  }

  def castDateFormat(date: String): String = {
    val spf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val spf2 = new SimpleDateFormat("yyyy-MM-dd")
    val dateFormat = spf1.parse(date)
    spf2.format(dateFormat)
  }

  def castDateTimeStamp(date: String): Long = {
    val spf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

    val dateFormat = spf1.parse(date)
    dateFormat.getTime()

  }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/761663.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号