栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

客快物流大数据项目(六十):将消费的kafka数据转换成bean对象

客快物流大数据项目(六十):将消费的kafka数据转换成bean对象

目录

将消费的kafka数据转换成bean对象

一、将OGG数据转换成bean对象

二、将Canal数据转换成bean对象

三、完整代码


将消费的kafka数据转换成bean对象

一、​​​​​​​将OGG数据转换成bean对象

实现步骤:

消费kafka的 logistics Topic数据将消费到的数据转换成OggMessageBean对象递交作业启动运行

实现过程:

消费kafka的 logistics Topic数据

//2.1:获取物流系统相关的数据
val logisticsDF: Dataframe = getKafkaSource(sparkSession, Configuration.kafkaLogisticsTopic)

将消费到的数据转换成OggMessageBean对象

默认情况下表名带有数据库名,因此需要删除掉数据库名

//3.1:物流相关数据的转换
val logsticsMessageBean: Dataset[OggMessageBean] = logisticsDF.filter(!_.isNullAt(0)).mapPartitions(iters => {
  iters.map(row => {
    //获取到value列的值(字符串)
    val jsonStr: String = row.getAs[String](0)
    //将字符串转换成javabean对象
    JSON.parseObject(jsonStr, classOf[OggMessageBean])
  }).toList.iterator
})(Encoders.bean(classOf[OggMessageBean]))

递交作业启动运行

// 设置Streaming应用输出及启动
logisticsDF.writeStream.outputMode(OutputMode.Update())
  .format("console").queryName("logistics").start()

二、​​​​​​​将Canal数据转换成bean对象

实现步骤:

消费kafka的 crm Topic数据将消费到的数据转换成 CanalMessageBean 对象递交作业启动运行

实现过程:

消费kafka的 crm Topic数据

//2.2:获取客户关系系统相关的数据
val crmDF: Dataframe = getKafkaSource(sparkSession, Configuration.kafkaCrmTopic)

将消费到的数据转换成CanalMessageBean 对象

//3.2:客户关系相关数据的转换
val crmMessageBean: Dataset[CanalMessageBean] = crmDF.filter(!_.isNullAt(0)).mapPartitions(iters=>{
  //canal同步的数据除了增删改操作以外,还有清空表数据的操作,因此将清空表数据的操作过滤掉
  iters.filter(row=>{
    //取到value列的数据
    val line: String = row.getAs[String](0)
    //如果value列的值不为空,且是清空表的操作
    if(line!=null && line.toUpperCase().contains("TRUNCATE")) false else true
  }).map(row=>{
    //取到value列的数据
    val jsonStr: String = row.getAs[String](0)
    //将json字符串转换成javaBean对象
    JSON.parseObject(jsonStr, classOf[CanalMessageBean])
  }).toList.toIterator
})(Encoders.bean(classOf[CanalMessageBean]))

递交作业启动运行

crmDF.writeStream.outputMode(OutputMode.Update())
  .format("console").queryName("crm").start()

三、完整代码
package cn.it.logistics.etl.realtime
import java.sql.Connection

import cn.it.logistics.common.{Configuration, SparkUtils, TableMapping, Tools}
import cn.it.logistics.common.beans.parser.{CanalMessageBean, OggMessageBean}
import cn.it.logistics.etl.parser.DataParser
import com.alibaba.fastjson.JSON
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{Dataframe, Dataset, Encoders, SparkSession}


object KuduStreamApp2 extends StreamApp {

  
  def main(args: Array[String]): Unit = {
    //创建sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(this.getClass.getSimpleName)
    )

    //数据处理
    execute(sparkConf)
  }

  
  override def execute(sparkConf: SparkConf): Unit = {
    
    //1)创建sparksession对象
    val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //2)获取数据源(获取物流相关数据以及crm相关数据)
    //2.1:获取物流系统相关的数据
    val logisticsDF: Dataframe = getKafkaSource(sparkSession, Configuration.kafkaLogisticsTopic)

    //2.2:获取客户关系系统相关的数据
    val crmDF: Dataframe = getKafkaSource(sparkSession, Configuration.kafkaCrmTopic)

    //导入隐式转换
    import  sparkSession.implicits._

    //导入自定义的POJO的隐士转换
    import  cn.it.logistics.common.BeanImplicit._

    //3)对数据进行处理(返回的数据是字符串类型,需要转换成javabean对象)
    //3.1:物流相关数据的转换
    val logsticsMessageBean: Dataset[OggMessageBean] = logisticsDF.filter(!_.isNullAt(0)).mapPartitions(iters => {
      iters.map(row => {
        //获取到value列的值(字符串)
        val jsonStr: String = row.getAs[String](0)
        //将字符串转换成javabean对象
        JSON.parseObject(jsonStr, classOf[OggMessageBean])
      }).toList.iterator
    })(Encoders.bean(classOf[OggMessageBean]))

    //3.2:客户关系相关数据的转换
    val crmMessageBean: Dataset[CanalMessageBean] = crmDF.filter(!_.isNullAt(0)).mapPartitions(iters=>{
      //canal同步的数据除了增删改操作以外,还有清空表数据的操作,因此将清空表数据的操作过滤掉
      iters.filter(row=>{
        //取到value列的数据
        val line: String = row.getAs[String](0)
        //如果value列的值不为空,且是清空表的操作
        if(line!=null && line.toUpperCase().contains("TRUNCATE")) false else true
      }).map(row=>{
        //取到value列的数据
        val jsonStr: String = row.getAs[String](0)
        //将json字符串转换成javaBean对象
        JSON.parseObject(jsonStr, classOf[CanalMessageBean])
      }).toList.toIterator
    })(Encoders.bean(classOf[CanalMessageBean]))

    //输出数据
    
    logsticsMessageBean.writeStream.outputMode(OutputMode.Update()).format("console").queryName("logistics").start()

    
    crmMessageBean.writeStream.outputMode(OutputMode.Update()).format("console").queryName("crm").start()

    //8)启动运行等待停止
    val stream = sparkSession.streams
    //stream.active:获取当前活动流式查询的列表
    stream.active.foreach(query => println(s"准备启动的查询:${query.name}"))
    //线程阻塞,等待终止
    stream.awaitAnyTermination()
  }

  
  override def save(dataframe: Dataframe, tableName: String, isAutoCreateTable: Boolean = true): Unit = {
  }


博客主页:https://lansonli.blog.csdn.net欢迎点赞  收藏 ⭐留言  如有错误敬请指正!本文由 Lansonli 原创,首发于 CSDN博客大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/757988.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号