栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

pyspark笔记40-dataframe的read、agg集合函数、别名和数据缓存、设置断点

pyspark笔记40-dataframe的read、agg集合函数、别名和数据缓存、设置断点

对应课件:3.3 SparkSQL中的核心数据结构Dataframe第40.pdf

对应视频:40、agg聚合方法及数据缓存方法cache的讲解.mp4

一、spark.read方法读取文件创建dataframe。

dir(spark.read)查看。spark.read支持多种格式的文件的读取,包括:'csv', 'format', 'jdbc', 'json', 'load', 'option', 'options', 'orc', 'parquet', 'schema', 'table', 'text'

hdfs dfs -mkdir /sql

hdfs dfs -put users.parquet   /sql

hdfs dfs -ls /sql

将文件users.parquet上传到创建好的目录下,查看一下。

df = spark.read.parquet('/sql/users.parquet') #读取文件

df.show() #展示样本数据

 df.printSchema()  #查看dataframe的结构

|-- element: integer (containsNull = true)表示favorite_numbers字段里边是数组
二、聚合函数dataframe.agg(*exprs),传入要聚合的字段及聚合的方式,以字典的方式进行组合

     通过read上的csv方法读取hdfs上的/customers.csv,返回dataframe,由关键字参数header指定保留文件的头信息,并使用头信息作为Dataframe的columns。调用printSchema方法打印出Dataframe的结构信息。

df = spark.read.csv('/sql/customers.csv',header=True)
df.printSchema()


df.show()

 

   有了dataframe之后,怎样来聚合呢?

这个Dataframe中第一列为编号,第二列为性别,第三列为年龄,第四列为实际收入,第五列为消费积分。现在就可以用agg聚合方法来找出Age的最大或最小值,实际收入的均值,消费积分的均值等信息了。例如计算年龄最大值、收入均值、消费积分均值.

    对多个字段求得聚合放到一个字典中,键为列名称,值为用来计算的函数名称,当然这里的函数可以是自定义的聚合函数udaf。注意:字典里边的键和值要用单引号或者双引号扩起来。

df.agg({"Age": "max","Annual Income (k$)":'mean','Spending Score (1-100)':"mean"}).show()

 三、dataframe.alias(‘别名’)。为Dataframe定义一个别名,稍后再函数中就可以利用这个别名来做相关的运算,例如说自关联.

from pyspark.sql.functions import *
df = spark.read.csv('/sql/customers.csv',header=True)

 

 df2 = df.alias('cus2')

df3 = df1.join(df2,col('cus1.CustomerId')==col('cus2.CustomerId'),'inner')

df3.count()

Out[26]:

 

df3.show(4)

 四、cache(),将Dataframe缓存到StorageLevel对应的缓存级别中,默认是MEMORY_AND_DISK 

df = spark.read.csv('/sql/customers.csv',header=True)
a = df.cache()
a.show()

 

    将频繁需要查询的数据缓存起来,这样下一次要查询的时候就可以直接在内存中读取数据,
从而提升数据的读取速度提升计算的效率。

五、dataframe.checkpoint(eager=True) 。关键字参数eager,默认为True表示是否立即设置断
点。

    对Dataframe设置断点,这个方法是Spark2.1引入的方法,这个方法的调用会斩断在这个
Dataframe上的逻辑执行计划,将前后的依赖关系持久化到checkpoint文件中去。这个方法
对于需要大量需要迭代的算法非常有用,因为算法在迭代的过程中逻辑计划的数据量会呈现
指数级别的上升。要使用这个方法需要使用sparkcontext上面的setCheckpointDir设置检查
点数据在HDFS中的存储目录。

sc=spark.sparkContext   #从现有的sparksession对象spark中获取到sparkContext

sc.master

 

sc.setCheckpointDir('/datas/checkpoint')

#设置检查点数据在HDFS中的存储目录,hdfs上会多一个/checkpoint目录

a.checkpoint()
a.show()

运行完成之后,在hdfs的/datas/checkpoint目录下会看到checkpoint的数据

!hdfs dfs -ls /datas/checkpoint

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/280890.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号