1、准备一个json的log文件(如果没有的话,可以到我的资源里去下载,0积分就可以下载)
2、spark环境的安装和配置(如果还没有安装的话,跳转到保姆级教程)
json数组分为cm、ap、et,cm里又分为ln,sv…等等等。
这样光看可读性比较差,大家可以使用菜鸟教程的在线解析json工具去看。很好用。
本次案例操作是在spark黑窗口就行的
1、首先先导入两个包,方便下面的代码使用
import org.apache.spark.sql.types._ import org.apache.spark.sql.functions._
2、读取文件
val optionRDD = sc.textFile("file:///opt/tmp/op.log")
3、读取cm、ap、et
val option1 = optionRDD.map(x=>x.split('|')).map(x=>(x(0),x(1)))
val jsonStr = option1.map(x=>x._2)
val jsonstrDF = jsonStr.toDF()
val jsonobj = jsonstrDF.select(get_json_object($"value","$.ap").as("ap"),get_json_object($"value","$.cm").as("cm"),get_json_object($"value","$.et").as("et"))
jsonobj.printSchema
jsonobj.show()
4、获取cm中的子元素
val jsonobj2 = jsonobj.select($"ap",get_json_object($"cm","$.ln").as("ln"),
get_json_object($"cm","$.sv").as("sv"),
get_json_object($"cm","$.os").as("os"),
get_json_object($"cm","$.g").as("g"),
get_json_object($"cm","$.mid").as("mid"),
get_json_object($"cm","$.nw").as("nw"),
get_json_object($"cm","$.l").as("l"),
get_json_object($"cm","$.vc").as("vc"),
get_json_object($"cm","$.hw").as("hw"),
get_json_object($"cm","$.ar").as("ar"),
get_json_object($"cm","$.uid").as("uid"),
get_json_object($"cm","$.t").as("t"),
get_json_object($"cm","$.la").as("la"),
get_json_object($"cm","$.md").as("md"),
get_json_object($"cm","$.vn").as("vn"),
get_json_object($"cm","$.ba").as("ba"),
get_json_object($"cm","$.sr").as("sr"),
$"et")
jsonobj2.show()
5、定义et子元素的schema
val schema = ArrayType(StructType(StructField("ett",StringType)::StructField("en",StringType)::StructField("kv",StringType)::Nil))
6、输出元素
jsonobj2.select(
$"ap",$"ln",$"sv",$"os",
$"g",$"mid",$"nw",$"l",
$"vc",$"hw",$"ar",$"uid",
$"t",$"la",$"md",$"vn",
$"ba",$"sr",from_json($"et",schema).as("events")
).withColumn("events", explode(col("events"))).select($"ap",$"ln",$"sv",$"os",
$"g",$"mid",$"nw",$"l",
$"vc",$"hw",$"ar",$"uid",
$"t",$"la",$"md",$"vn",
$"ba",$"sr",$"events.ett", $"events.en", $"events.kv").show()



