栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark SQL中collect对数据的检索

Spark SQL中collect对数据的检索

每天都在努力的我,每天都在努力的你,总有一天我们会相见

        Spark collect和collectAsList是用于将RDD/Dataframe/DataSet的所有元素检索到驱动程序节点的操作。如果数据集较大,使用collect之后可能会导致内存不足

val data = Seq(Row(Row("James","","Smith"),"36636","M",3000), 

Row(Row("Michael","Rose",""),"40288","M",4000), 

Row(Row("Robert","","Williams"),"42114","M",4000),

 Row(Row("Maria","Anne","Jones"),"39192","F",4000), 

Row(Row("Jen","Mary","Brown"),"","F",-1)) 
val schema = new StructType()
.add("name",new StructType()
.add("firstname",StringType) 
.add("middlename",StringType) 
.add("lastname",StringType)) 
.add("id",StringType) 
.add("gender",StringType) 
.add("salary",IntegerType) 
val dataframe = sparkSession.createDataframe(
sparkSession.sparkContext.parallelize(data), schema)

        在一般的情况下,使用show()默认的情况下显示的前20行。

使用Collect()和CollectAsList()

        collect()动作函数用于数据集中检索所有元素,作为Array[Row]的一部分。

        什么意思呢?

        collect返回的是数组类型的,不过这个数组里面包含的Row对象。拿上面的数据作为例子.

        一个Array[Row] =》("Michael","Rose","")

        collectAsList()函数类似于collect,但是返回的是Java的列表

        collect() : scala.Array[T] collectAsList() : java.util.List[T]

        val colList = df.collectAsList() 
val colData = df.collect() 
colData.foreach(row=> { 
val salary = row.getInt(3)
//Index starts from zero
 println(salary) })

        上面代码就是将dataframe经过Collect之后的返回的Array[Row]进行遍历。

        问题来了,上面的数据,第一个字段name是嵌套字段,如何打印嵌套字段里面的数据。

val collist = dataframe.collectAsList() 
val colData = dataframe.collect() 
colData.foreach(row => {
 // 总共四个字段 name,id,gender,salary 
val salary = row.getInt(3) 
val fullName = row.getStruct(0) 
val firstname = fullName.getString(0) 
val senondname = fullName.get(1) 
val lastName = fullName.getAs[String]("lastname") 
println(salary + " " +firstname + " " + senondname + " " + lastName) })

        对于嵌套字段,使用getStruct首先获取到嵌套字段,获取之后使用get来获取里面的嵌套字段,下表从0开始。

 

何时避免使用Collect()

        collect通常用于处理小数据集,当collect大数据集之后,可能会导致内存不足,避免使用大数据集在collect上

如何删除重复行

        在Spark SQL中可以使用distinct进行去重,删除重复的行或者列。

        distinct可以删除所有列上具有相同值的行,dropDpulicates可以删除多个选定列上具有相同值的行列。

        先看一下数据集

val simpleData = Seq(("James","Sales",3000), 
("michael","Sales",4600),
 ("Robert","Sales",4100), 
("Maria","Finance",3000), 
("James", "Sales", 3000), 
("Scott", "Finance", 3300),
 ("Jen", "Finance", 3900),
 ("Jeff", "Marketing", 3000), 
("Kumar", "Marketing", 2000), 
("Saif", "Sales", 4100)) 
val dataframe = simpleData.toDF("employees_name", "department", "salary") 

dataframe.show()

        可以看到James这个id中有两次出现,所有的值一摸一样,其它的数据有四行在deparment和salary中具有重复值。

        如果只是用distinct的话,那么去重的只有James。

        val value = dataframe.distinct() println(value.count())

        本来有10条数据,经过distinct之后有9条数据,因为去重了一条数据

        如果想要去除某个列相同的数据的行,那么使用dropDuplicate就可以删除Dataframe上重复的行。

        如果使用dropDuplicate函数的时候没有指定参,那么作用就是和distinct的作用是一样的。

下面通过添加参数来实现通过重复列进行删除。

        val value2 = dataframe.dropDuplicates("department", "salary") println(value2.count())

        上面的数据中有四条数据是department和salary都是一样的。

        所以后面经过dropDuplicates之后的数据变成了8行。

总结

        人不可能一天都在学习,但是我们可以保持每天都在学习。

        我的她也要越来越好呢

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/752896.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号