栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark Sql之扁平化嵌套结构列

Spark Sql之扁平化嵌套结构列

因为有她,所有我要走的更远,更久

前言:

        之前的博客中也有一些关于嵌套列的讲解。不过当时的嵌套列普遍只嵌套了一列。但是解决的做法是什么?

        数据集中含有一层嵌套列,通过StructType创建schema映射。

        创建dataframe的时候将数据集和schema字段映射一起创建。

        就是上面的流程,不过在创建schema字段映射的时候,对于之前的嵌套字段,也仅仅只是多了一个new StructType().add就可以将嵌套的一级字段加上。但是遇到多级嵌套怎么办呢、。今天这篇博客讲解的就是嵌套多级字段的处理方法。

        如何展平嵌套数组列

        先来创建一下数据集

val structureData = Seq(
 Row(Row("James ","","Smith"),Row(Row("CA","Los Angles"),Row("CA","Sandiago"))), 

Row(Row("Michael ","Rose",""),Row(Row("NY","New York"),Row("NJ","Newark"))), 

Row(Row("Robert ","","Williams"),Row(Row("DE","Newark"),Row("CA","Las Vegas"))), 

Row(Row("Maria ","Anne","Jones"),Row(Row("PA","Harrisburg"),Row("CA","Sandiago"))),

 Row(Row("Jen","Mary","Brown"),Row(Row("CA","Los Angles"),Row("NJ","Newark"))) )

        可以看到,嵌套了很多列,最多时候的嵌套,将近有两个层级。

        如果使用之前的StructType进行schema映射,应该是下面的写法

val structureSchema = new StructType() 
.add("name",new StructType()
     .add("firstname",StringType) 
     .add("middlename",StringType) 
    .add("lastname",StringType))
.add("address",new StructType() 
    .add("current",new StructType() 
        .add("state",StringType)
        .add("city",StringType))
     .add("previous",new StructType()
         .add("state",StringType) 
        .add("city",StringType)))

        拿address这个嵌套字段来说,下面有两个嵌套字段,current和previous。在current和previous下面又有字段。嵌套了两个层级。

创建dataframe并且打印schema

        

        虽然看着挺多,但是实质上还有只有两个字段,一个name字段,一个struct字段。

 

        如果我们想要将里面嵌套的字段也展开,因该怎么办?

val df2 = df.select(col("name.*"),
 col("address.current.*"), 
col("address.previous.*")) 

df2.toDF("fname","mename","lname","currAddState", "currAddCity","prevAddState","prevAddCity") .show(false)

        上面的代码的实质是通过df.select之后返回的就是一个sql.Dataframe类型,将该对象df2通过toDF生成一个新的dataframe,并且赋予字段名。就会生成一个新的dataframe。

        来看一下展开后的效果

        可以看到,还是可以展开的。

        由于我们的列是有限的,只有两三个,并没有几十个,试想一下,如果有100多个列并在一个选择中引用所有的列,会非常的麻烦。

        为此,我们可以创建一个函数,可以轻松的展平几百个嵌套的列,通过创建一个函数就可以完成这个操作,这个函数可以在每个级别的迭代里面创建一个Array[Column]

        这个函数的本质是通过递归。一个递归函数进行递归,将StructType进行迭代,如果了解递归,那么这个函数你会非常的清楚。

def flattenStructSchema(schema: StructType, prefix: String = null) : Array[Column] = { 

schema.fields.flatMap(
    f => { val columnName = if (prefix == null) f.name else (prefix + "." + f.name) 

    f.dataType match { 
case st: StructType => flattenStructSchema(st, columnName) 
case _ => Array(col(columnName).as(columnName.replace(".","_"))) } }) }

 

        下面展示一下,经过函数处理后,进行打印shcema和表

val df3 = df.select(flattenStructSchema(df.schema):_*) df3.printSchema() df3.show(false)

        进行表结构的展示

        对schema进行展示

        这个函数扁平化嵌套列可以使我们不再一个个手敲每个字段的名字。特别的方便。还有在创建StructType的时候,可以先对schema进行创建字符串,将所需要的字段名放入字符串中,然后对字符串进行map操作,进行创建每一个StructField对象。将map后的对象放入ArrayType对象当中。

总结

        今天的内容还是挺贴近实战的。

        希望我的那个她每天都能快乐的成长。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758035.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号