package com.aisainfo
import org.apache.spark.sql.{Column, Dataframe, SparkSession}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
object Test {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().appName("test").master("local[2]").getOrCreate()
val linesDF: Dataframe = spark.read.json("E:\work_space\work_space\work_asiainfo\java\scala\kafka_spark_hive_json\src\main\scala\com\aisainfo\json")
val columns1: List[Column] = getAllColumns(linesDF).toList
linesDF.createOrReplaceTempView("test")
spark.sql(s"select ${columns1.mkString(", ")} from test").show()
println(columns1.mkString(", "))
println(linesDF.schema.fields.mkString(", "))
//{"asd":{"device_id": 0, "device_type": "sensor-ipad", "ip": {"fgh":123, "456":456}},"qwe":"zxc"}
//linesDF.select("asd.ip.fgh").show()
}
def getAllColumns(df: Dataframe) = {
df.schema.fields.flatMap { data: StructField =>
recursiveSolution(data)
}
}
private def recursiveSolution(data: StructField, prefix: String = ""): List[Column] = {
data match {
case column_info if column_info.dataType.isInstanceOf[StructType] => {
column_info.dataType.asInstanceOf[StructType].fields.flatMap { field =>
if (prefix != "") recursiveSolution(field, s"$prefix.${column_info.name}")
else recursiveSolution(field, s"${column_info.name}")
}.toList
}
case column_info => {
//json字段名称
if (prefix != "") List(col(s"$prefix.${column_info.name}"))
//json路径拼接字段名称
//if (prefix != "") List(col(s"$prefix.${column_info.name} as ${prefix.replace(".", "_")}_${column_info.name}"))
else List(col(s"${column_info.name}"))
}
}
}
}