栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

使用 Spark SQL 解析高德 API 字段

使用 Spark SQL 解析高德 API 字段

代码如下:

package com.yh.musicproject.eds.machine

import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import com.yh.musicproject.common.ConfigUtils
import org.apache.commons.lang3.StringUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import scalaj.http.{Http, HttpResponse}

import scala.collection.mutable.ListBuffer

/**
 * Spark job that resolves the most frequently reported coordinate of every machine
 * into a structured address and stores the result in `tw_mac_loc_d`.
 *
 * Steps:
 *  1. From `TO_YCAK_USR_LOC_D` (database `songdb`) pick, per `mid`, the (lat, lng) pair
 *     reported by the largest number of distinct users on the given date.
 *  2. Send those coordinates, in batches, to the Amap reverse-geocoding endpoint.
 *  3. Flatten each returned `regeocode` into a row and overwrite the
 *     `data_dt=<date>` partition of `tw_mac_loc_d`.
 *
 * Program argument: `args(0)` is the `data_dt` value to process.
 */
object GenerateTwMacLocD {

  /** Number of coordinates sent in one batch request (the original code used 20). */
  private val BatchSize = 20

  /** Reverse-geocoding endpoint. */
  private val RegeoUrl = "https://restapi.amap.com/v3/geocode/regeo"

  // SECURITY NOTE(review): the API key is hard-coded in source. It is kept here to
  // preserve behaviour, but it should be moved to configuration / a secret store.
  private val AmapKey = "72740e3cea1bc1b29502af781d1f0b39"

  /** Schema of the rows produced by [[toOutputRow]]; the field order must match the Row order. */
  private val OutputSchema: StructType = StructType(Array[StructField](
    StructField("MID", IntegerType),
    StructField("X", StringType),
    StructField("Y", StringType),
    StructField("CNT", IntegerType),
    StructField("ADDER", StringType),
    StructField("PRVC", StringType),
    StructField("CTY", StringType),
    StructField("CTY_CD", StringType),
    StructField("DISTRICT", StringType),
    StructField("AD_CD", StringType),
    StructField("TOWN_SHIP", StringType),
    StructField("TOWN_CD", StringType),
    StructField("NB_NM", StringType),
    StructField("NB_TP", StringType),
    StructField("BD_NM", StringType),
    StructField("BD_TP", StringType),
    StructField("STREET", StringType),
    StructField("STREET_NB", StringType),
    StructField("STREET_LOC", StringType),
    StructField("STREET_DRCTION", StringType),
    StructField("STREET_DSTANCE", StringType),
    StructField("BUS_INFO", StringType)
  ))

  /**
   * Job entry point.
   *
   * @param args `args(0)` is the date (`data_dt`) to process; the job exits with
   *             status 1 when no argument is supplied.
   */
  def main(args: Array[String]): Unit = {
    if (args.isEmpty) {
      // Print the hint BEFORE exiting; the original called System.exit first,
      // so the message could never be shown.
      println("请输入日期")
      System.exit(1)
    }
    val inputDate: String = args(0)

    // ConfigUtils.LOCAL_RUN switches between a local master and the YARN cluster;
    // every other builder setting is identical for both modes.
    val master = if (ConfigUtils.LOCAL_RUN) "local[*]" else "yarn"
    // NOTE(review): `HIVE_metaSTORE_URIS` is kept exactly as referenced in the original;
    // the casing looks mangled (probably HIVE_METASTORE_URIS) - confirm against ConfigUtils.
    val spark: SparkSession = SparkSession.builder()
      .master(master)
      .appName(this.getClass.getName)
      .config("hive.metastore.uris", ConfigUtils.HIVE_metaSTORE_URIS)
      .config("spark.sql.shuffle.partitions", 2)
      .enableHiveSupport()
      .getOrCreate()

    try {
      spark.sparkContext.setLogLevel("WARN")
      spark.sql("use songdb")

      // One machine can have coordinates reported by many users, and the reported
      // coordinates may differ. Group by machine id and coordinate, then keep the
      // coordinate with the highest distinct-user count for each machine.
      val df: DataFrame = spark.sql(
        s"""
           |select
           |	mid, x, y, cnt
           |from (
           |	select
           |		mid, x, y,cnt,
           |		row_number() over (partition by mid order by cnt desc) pm
           |	from(
           |		select mid ,lat x ,lng y , count(distinct uid) cnt
           |		from TO_YCAK_USR_LOC_D
           |		where data_dt = '${inputDate}' and lat != ''  and lng != ''
           |		group by mid,lat,lng
           |		) t1
           |	) t2
           |where pm = 1
           |""".stripMargin)

      // Geocode each partition in batches. `grouped` walks the partition iterator
      // once, so the whole partition no longer has to be copied into a List.
      val rdd1: RDD[Row] = df.rdd.mapPartitions { rows =>
        rows.grouped(BatchSize).flatMap(batch => reverseGeocode(batch))
      }

      // createOrReplaceTempView does not fail when a view with this name already exists.
      spark.createDataFrame(rdd1, OutputSchema).createOrReplaceTempView("mac_location")

      spark.sql(
        s"""
           |insert overwrite table tw_mac_loc_d partition(data_dt='${inputDate}')
           |select * from mac_location
           |""".stripMargin)
    } finally {
      // Release the session even when a step above throws.
      spark.stop()
    }
  }

  /**
   * Sends one batch of rows to the reverse-geocoding endpoint and converts the
   * answer into output rows.
   *
   * @param batch source rows carrying the columns `mid`, `x` (lat), `y` (lng) and `cnt`
   * @return one output row per returned regeocode; empty when the response body is
   *         blank, `infocode` is not "10000", or `regeocodes` is absent
   */
  private def reverseGeocode(batch: Seq[Row]): Seq[Row] = {
    // Build "lng,lat|lng,lat|...". The original assigned instead of appending,
    // so only the LAST coordinate of each batch was ever sent.
    val location = batch
      .map(row => row.getAs[String]("y") + "," + row.getAs[String]("x"))
      .mkString("|")

    val res: HttpResponse[String] = Http(RegeoUrl)
      .param("key", AmapKey)
      .param("batch", "true")
      .param("location", location)
      .asString

    val regeocodes: Option[JSONArray] = Option(res.body)
      .filterNot(body => StringUtils.isBlank(body))
      .flatMap(body => Option(JSON.parseObject(body)))
      .filter(json => "10000".equals(json.getString("infocode")))
      .flatMap(json => Option(json.getJSONArray("regeocodes")))

    regeocodes.toSeq.flatMap { codes =>
      // Results are matched to source rows by position; `take` guards against a
      // response that contains more entries than the batch that was sent.
      batch.zipWithIndex.take(codes.size).map { case (source, idx) =>
        toOutputRow(source, codes.getJSONObject(idx))
      }
    }
  }

  /**
   * Flattens one regeocode entry together with its source row into an output row
   * laid out according to [[OutputSchema]].
   *
   * Missing nested objects yield `null` column values instead of a NullPointerException.
   */
  private def toOutputRow(source: Row, regeocode: JSONObject): Row = {
    val component    = childObject(regeocode, "addressComponent")
    val neighborhood = component.flatMap(c => childObject(c, "neighborhood"))
    val building     = component.flatMap(c => childObject(c, "building"))
    val streetNumber = component.flatMap(c => childObject(c, "streetNumber"))

    // Row columns are nullable, so an absent value is represented as null.
    def text(obj: Option[JSONObject], key: String): String =
      obj.map(_.getString(key)).orNull

    Row(
      source.getAs[String]("mid").toInt,
      source.getAs[String]("x"),
      source.getAs[String]("y"),
      source.getAs[Long]("cnt").toInt,
      text(Option(regeocode), "formatted_address"),
      text(component, "province"),
      text(component, "city"),
      text(component, "citycode"),
      text(component, "district"),
      text(component, "adcode"),
      text(component, "township"),
      text(component, "towncode"),
      text(neighborhood, "name"),
      text(neighborhood, "type"),
      text(building, "name"),
      text(building, "type"),
      text(streetNumber, "street"),
      text(streetNumber, "number"),
      text(streetNumber, "location"),
      text(streetNumber, "direction"),
      text(streetNumber, "distance"),
      text(component, "businessAreas")
    )
  }

  /** Returns `parent(key)` when it is present and is a JSON object, otherwise `None`. */
  private def childObject(parent: JSONObject, key: String): Option[JSONObject] =
    Option(parent).map(_.get(key)).collect { case obj: JSONObject => obj }

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/780430.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号