2021-11-09

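Problem solved: moving data from Greenplum (gp) to Kafka, covering how the dataset is fetched and how each row is transformed into the message sent to Kafka.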

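```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import org.apache.spark.sql.DataFrame

// Assumes `spark` is an existing SparkSession.
// The window (2020-06-27 00:00:00, 2020-06-28 00:00:00] is split into 10 disjoint
// 144-minute slices, one JDBC subquery per slice. The slices must not overlap:
// running the same WHERE clause in every subquery would make the union return each row 10 times.
val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
val dayStart = LocalDateTime.of(2020, 6, 27, 0, 0)
val slices = 10
val sliceMinutes = 24 * 60 / slices

val sourceDF: DataFrame = (0 until slices)
  .map { index =>
    val from = dayStart.plusMinutes(index * sliceMinutes).format(fmt)
    val to = dayStart.plusMinutes((index + 1) * sliceMinutes).format(fmt)
    val dbtable =
      s"""(
         |select
         |aid1, aid2, t1, t2, along_interval, source_id, create_time, dt,
         |thumbnail_id1, thumbnail_url1, image_id1, image_url1,
         |thumbnail_id2, thumbnail_url2, image_id2, image_url2,
         |score1, score2
         |from dwd_bigdata_relation_peer_day_5030
         |WHERE create_time > '$from' AND create_time <= '$to') AS t_tmp_$index""".stripMargin
    println(dbtable)
    spark.read
      .format("jdbc")
      .option("driver", "org.postgresql.Driver")
      .option("url", "jdbc:postgresql://192.168.11.33:2222/bigdata_dwd")
      .option("dbtable", dbtable)
      .option("user", "张三")
      .option("password", "123456")
      .option("fetchsize", "5000")
      .load()
  }
  .reduce((df1, df2) => df1.union(df2)) // each element is a DataFrame, not an RDD
println("Loaded peer events")
sourceDF.show()
```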
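Spark can also issue one query per partition without a manual `union`: `DataFrameReader.jdbc(url, table, predicates, connectionProperties)` takes an array of WHERE-clause fragments and reads each one into its own partition. The sketch below only builds that array. The object name `SlicePredicates` and its `build` helper are hypothetical, and it assumes the same one-day window on `create_time` split into equal, non-overlapping slices; overlapping fragments would return the shared rows more than once, just as identical subqueries in a union would.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

// Hypothetical helper: builds WHERE-clause fragments for DataFrameReader.jdbc's `predicates` argument.
object SlicePredicates {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  /** Splits (start, end] into `n` equal, non-overlapping fragments on `column`. */
  def build(column: String, start: LocalDateTime, end: LocalDateTime, n: Int): Array[String] = {
    require(n > 0, "n must be positive")
    val totalSeconds = Duration.between(start, end).getSeconds
    (0 until n).map { i =>
      // Slice i covers (start + total*i/n, start + total*(i+1)/n]; adjacent slices share only the boundary value,
      // which the strict `>` on the lower bound assigns to exactly one slice.
      val from = start.plusSeconds(totalSeconds * i / n).format(fmt)
      val to = start.plusSeconds(totalSeconds * (i + 1) / n).format(fmt)
      s"$column > '$from' AND $column <= '$to'"
    }.toArray
  }

  def main(args: Array[String]): Unit = {
    val predicates = build("create_time",
      LocalDateTime.of(2020, 6, 27, 0, 0), LocalDateTime.of(2020, 6, 28, 0, 0), 10)
    predicates.foreach(println)
  }
}
```

The array could then be passed as, for example, `spark.read.jdbc(url, "dwd_bigdata_relation_peer_day_5030", predicates, connectionProperties)`, where `connectionProperties` is a `java.util.Properties` carrying `user`, `password` and `driver`. Spark runs one query per fragment, so the parallelism equals the number of slices.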

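```scala
import java.sql.Timestamp
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.Serialization

// Placeholder producer settings: the broker address and topic name are hypothetical.
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
val topic = "relation_peer_events"

val producer = new KafkaProducer[String, String](props)
implicit val formats: Formats = DefaultFormats // json4s formats, created once outside the loop

// collect() brings every row to the driver, which is only reasonable for modest volumes such as one day of events
val peerArray = sourceDF.collect()
try {
  for ((row, i) <- peerArray.zipWithIndex) {
    val sourceAid = row.getAs[String]("aid1")
    val targetAid = row.getAs[String]("aid2")
    val t1 = row.getAs[String]("t1")
    val t2 = row.getAs[String]("t2") // t2 is selected by the query, so pass it through instead of repeating t1
    val source_id = row.getAs[String]("source_id") // used for a reverse lookup of the camera by its source id
    val along_interval = row.getAs[Int]("along_interval")
    val create_time = row.getAs[Timestamp]("create_time")
    val dt = row.getAs[String]("dt")
    val thumbnail_id1 = row.getAs[String]("thumbnail_id1")
    val thumbnail_url1 = row.getAs[String]("thumbnail_url1")
    val image_id1 = row.getAs[String]("image_id1")
    val image_url1 = row.getAs[String]("image_url1")
    val thumbnail_id2 = row.getAs[String]("thumbnail_id2")
    val thumbnail_url2 = row.getAs[String]("thumbnail_url2")
    val image_id2 = row.getAs[String]("image_id2")
    val image_url2 = row.getAs[String]("image_url2")
    val score1 = row.getAs[String]("score1")
    val score2 = row.getAs[String]("score2")
    val event = RelationshipPeer(sourceAid, targetAid, t1, t2, along_interval, source_id, create_time, dt,
      thumbnail_id1, thumbnail_url1, image_id1, image_url1,
      thumbnail_id2, thumbnail_url2, image_id2, image_url2, score1, score2)

    // Serialize the event to a JSON string and send it; send() is asynchronous
    val data: String = Serialization.write(event)
    producer.send(new ProducerRecord[String, String](topic, s"key$i", data))
  }
} finally {
  producer.close() // waits for buffered records to be sent, then releases resources
}

// Event payload serialized to JSON for each Kafka message
case class RelationshipPeer(aid1: String, aid2: String, t1: String, t2: String, along_interval: Int,
                            source_id: String, create_time: Timestamp, dt: String,
                            thumbnail_id1: String, thumbnail_url1: String, image_id1: String, image_url1: String,
                            thumbnail_id2: String, thumbnail_url2: String, image_id2: String, image_url2: String,
                            score1: String, score2: String)
```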
Reposting notice: this article was reposted from www.mshxw.com
Article URL: https://www.mshxw.com/it/457226.html