栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark:基于PySpark的DataFrame、SQL、TableAPI操作

Spark:基于PySpark的DataFrame、SQL、TableAPI操作

记录下

文章目录

官网文档环境测试说明Dataframe创建

从列表构建DF从Row对象创建DF为DF设置数据类型-指定类型为DF设置数据类型-字符串设置数据类型为DF设置数据类型-(单个)字符串设置数据类型从Parquet文件创建DF从JSON数据创建DF从CSV文件创建DF Dataframe 操作

数据去重、列重命名、增加列、更改列数据、删除列空值处理转成JSON SQL操作自定义函数UDFDataframe TableAPI

select、where、agg聚合、describe日期操作(使用内置函数)透视-转置列(groupby、pivot、sum)条件选择关联查询

官网文档

https://spark.apache.org/docs/2.4.5/api/python/pyspark.sql.html

环境

Python3spark2.4.8jupyternotebookJDK8

jupyternotebook整合spark可参考:https://blog.csdn.net/maoyuanming0806/article/details/122886518

测试说明

测试基于pyspark,整合jupyternotebook测试,代码段测试如下,后面的功能测试只贴代码段部分

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("test2").getOrCreate()
sc = spark.sparkContext

# ===============代码段start====================

# ===============代码段end====================
sc.stop()
Dataframe创建 从列表构建DF
print("列表创建df " + "=======================")
a = [('Jack', 32),('Smith', 33)]
df = spark.createDataframe(a)
print(df.collect())
df.show()
# 设置df的列名
df = spark.createDataframe(a, ['name', 'age'])
df.show()
列表创建df =======================
[Row(_1='Jack', _2=32), Row(_1='Smith', _2=33)]
+-----+---+
|   _1| _2|
+-----+---+
| Jack| 32|
|Smith| 33|
+-----+---+

+-----+---+
| name|age|
+-----+---+
| Jack| 32|
|Smith| 33|
+-----+---+
从Row对象创建DF
print("Row对象创建df " + "=======================")
# 从Row对象创建df
rdd = sc.parallelize(a)
from pyspark.sql import Row
Person = Row('name', 'age')
# 把rdd数据转换成person对象,person是个Row类型
person = rdd.map(lambda r:Person(*r)) 
df = spark.createDataframe(person)
df.printSchema()
df.show()

执行结果

Row对象创建df =======================
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

+-----+---+
| name|age|
+-----+---+
| Jack| 32|
|Smith| 33|
+-----+---+
为DF设置数据类型-指定类型
print("设置数据类型创建df " + "=======================")
# 设置数据类型
from pyspark.sql.types import *
schema = StructType([StructField("name", StringType(), True), StructField("age", IntegerType(), True)])
df = spark.createDataframe(rdd, schema)
print(df.collect())
df.printSchema()
df.show()

执行结果

设置数据类型创建df =======================
[Row(name='Jack', age=32), Row(name='Smith', age=33)]
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+-----+---+
| name|age|
+-----+---+
| Jack| 32|
|Smith| 33|
+-----+---+
为DF设置数据类型-字符串设置数据类型
print("通过字符串设置数据类型创建df " + "=======================")
# 直接设置数据类型
df = spark.createDataframe(rdd, "name: string, age: int")
df.printSchema()
df.show()

执行结果

通过字符串设置数据类型创建df =======================
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+-----+---+
| name|age|
+-----+---+
| Jack| 32|
|Smith| 33|
+-----+---+
为DF设置数据类型-(单个)字符串设置数据类型
print("通过字符串(单字段)设置数据类型创建df " + "=======================")
# 单字段直接设置数据类型
rdd2 = rdd.map(lambda x:x[0])
df = spark.createDataframe(rdd2, "string")
df.printSchema()
df.show()

执行结果

通过字符串(单字段)设置数据类型创建df =======================
root
 |-- value: string (nullable = true)

+-----+
|value|
+-----+
| Jack|
|Smith|
+-----+
从Parquet文件创建DF
print("从parquet文件创建df " + "=======================")
# 写入parquet,从parquet文件读取
a = [('Jack', 32),('Smith', 33),('李四', 36)]
rdd = sc.parallelize(a)
df = spark.createDataframe(rdd, "name: string, age: int")
# 写入 myuser.parquet 是目录必须不存在否则报错
df.write.parquet("D://a/myuser.parquet")
# 读取 myuser.parquet 是目录必须存在否则报错
peopleDf = spark.read.parquet("D://a/myuser.parquet")
peopleDf.show()

执行结果

从parquet文件创建df =======================
+-----+---+
| name|age|
+-----+---+
| 李四| 36|
|Smith| 33|
| Jack| 32|
+-----+---+
从JSON数据创建DF
print("从Json数据创建df " + "=======================")
# 写入json文件,从json文件读取
a = [('Jack', 32),('Smith', 33),('李四', 36)]
rdd = sc.parallelize(a)
df = spark.createDataframe(rdd, "name: string, age: int")
# 写入 myuser.json 是目录必须不存在否则报错
df.write.json("D://a/myuser.json")
# 读取 myuser.json 是目录必须存在否则报错
peopleDf = spark.read.format("json").load("D://a/myuser.json")
print(peopleDf.dtypes)
peopleDf.show()

执行结果

从Json数据创建df =======================
[('age', 'bigint'), ('name', 'string')]
+---+-----+
|age| name|
+---+-----+
| 36| 李四|
| 33|Smith|
| 32| Jack|
+---+-----+
从CSV文件创建DF
print("从CSV文件创建df " + "=======================")
# 写入CSV文件,从CSV文件读取
a = [('Jack', 32),('Smith', 33),('李四', 36)]
rdd = sc.parallelize(a)
df = spark.createDataframe(rdd, "name: string, age: int")
# 写入 myuser.csv 是目录必须不存在否则报错
df.write.csv("D://a/myuser.csv")
# 读取 myuser.json 是目录必须存在否则报错
peopleDf = spark.read.csv("D://a/myuser.csv")
print(peopleDf.dtypes)
peopleDf.show()

执行结果

从CSV文件创建df =======================
[('_c0', 'string'), ('_c1', 'string')]
+-----+---+
|  _c0|_c1|
+-----+---+
| 李四| 36|
|Smith| 33|
| Jack| 32|
+-----+---+

还有更多创建DF方式,例如读取JDBC等,更多请参看官方文档

Dataframe 操作 数据去重、列重命名、增加列、更改列数据、删除列
a = [('Jack', 32),('Smith', 33),('李四', 36), ('李四', 36)]
rdd = sc.parallelize(a)
df = spark.createDataframe(rdd, "name: string, age: int")
# 数据去重
print("数据去重 " + "=======================================")
df_distinct = df.distinct()
df_distinct.show()

# 列重命名
print("列重命名 " + "=======================================")
df.withColumnRenamed("name", "%xdel名").show()

# 增加列
print("增加列 " + "=======================================")
df.withColumn("age2", df.age + 1).show()

# 选择某列更改数据:按照表达式
print("更改列数据:按表达式 " + "=======================================")
df.selectExpr("age + 1", "name + '_后缀'").show()

# 删除列
print("删除列-按列明名 " + "=======================================")
df_drop_age = df.drop("age")
df_drop_age.show()
print("删除列-按另一个DF " + "=======================================")
df1 = spark.createDataframe([("a", 1), ("a", 1), ("a", 1), ("a", 2), ("b",  3), ("c", 4)], ["C1", "C2"])
df2 = spark.createDataframe([("a", 1), ("b", 3)], ["C1", "C2"])
df1.exceptAll(df2).show() # 类似df相减

执行结果

数据去重 =======================================
+-----+---+
| name|age|
+-----+---+
|Smith| 33|
| Jack| 32|
| 李四| 36|
+-----+---+

列重命名 =======================================
+-------+---+
|%xdel名|age|
+-------+---+
|   Jack| 32|
|  Smith| 33|
|   李四| 36|
|   李四| 36|
+-------+---+

增加列 =======================================
+-----+---+----+
| name|age|age2|
+-----+---+----+
| Jack| 32|  33|
|Smith| 33|  34|
| 李四| 36|  37|
| 李四| 36|  37|
+-----+---+----+

更改列数据:按表达式 =======================================
+---------+--------------+
|(age + 1)|(name + _后缀)|
+---------+--------------+
|       33|          null|
|       34|          null|
|       37|          null|
|       37|          null|
+---------+--------------+

删除列-按列明名 =======================================
+-----+
| name|
+-----+
| Jack|
|Smith|
| 李四|
| 李四|
+-----+

删除列-按另一个DF =======================================
+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  1|
|  a|  2|
|  c|  4|
+---+---+
空值处理
# 空值处理
print("空值处理 " + "=======================================")
# 注意None值在df中显示的是null
a = [   ('01','张三', '男',32,5000),
        ('01', None, '男',33,6000),
        ('01','王五', '女',36,None),
        ('02','Jack', '男',42,7000),
        ('02','Smith', '女',27,6500),
        ('02','Lily', '女',45,None),
]
rdd = sc.parallelize(a)
peopleDf = spark.createDataframe(rdd,"deptId:string,name:string,gender:string,age:int,salary:int")
peopleDf.show()
#将空值进行替换
peopleDf.na.fill({'salary':0, 'name':'unknown'}).show()

执行结果

空值处理 =======================================
+------+-----+------+---+------+
|deptId| name|gender|age|salary|
+------+-----+------+---+------+
|    01| 张三|    男| 32|  5000|
|    01| null|    男| 33|  6000|
|    01| 王五|    女| 36|  null|
|    02| Jack|    男| 42|  7000|
|    02|Smith|    女| 27|  6500|
|    02| Lily|    女| 45|  null|
+------+-----+------+---+------+

+------+-------+------+---+------+
|deptId|   name|gender|age|salary|
+------+-------+------+---+------+
|    01|   张三|    男| 32|  5000|
|    01|unknown|    男| 33|  6000|
|    01|   王五|    女| 36|     0|
|    02|   Jack|    男| 42|  7000|
|    02|  Smith|    女| 27|  6500|
|    02|   Lily|    女| 45|     0|
+------+-------+------+---+------+
转成JSON
# 转Json
print("转json " + "=======================================")
a = [('Jack', 32),('Smith', 33),('李四', 36), ('李四', 36)]
rdd = sc.parallelize(a)
df = spark.createDataframe(rdd, "name: string, age: int")
#转成JSON格式
print(df.toJSON().collect())

执行结果

转json =======================================
['{"name":"Jack","age":32}', '{"name":"Smith","age":33}', '{"name":"李四","age":36}', '{"name":"李四","age":36}']
SQL操作

SQL操作满足标准sql的DQL语法,这里只简单演示几个,基本标准HIVE的SQL都能执行

a = [('Jack', 32),('Smith', 33),('李四', 36), ('李四', 36)]
rdd = sc.parallelize(a)
df = spark.createDataframe(rdd, "name: string, age: int")
# 创建视图表
df.createOrReplaceTempView("user")
# 查询
print("SQL:常规查询 " + "=======================================")
df_select_count = spark.sql("select count(*) as counter from user")
df_select_count.show()

# 关联查询
print("SQL:关联查询 " + "=======================================")
staff = [   ('01','张三', '男',32,5000),
        ('01','李四', '男',33,6000),
        ('01','王五', '女',38,5500),
        ('02','Jack', '男',42,7000),
        ('02','Smith', '女',27,6500),
        ('02','Lily', '女',45,9500)
]
staffRDD = sc.parallelize(staff)
staffDf = spark.createDataframe(staffRDD,"deptId:string,name:string,gender:string,age:int,salary:int")
dept = [   ('01','销售部'),
        ('02','研发部')
]
deptRDD = sc.parallelize(dept)
deptDf = spark.createDataframe(deptRDD, "id:string,name:string")
staffDf.createOrReplaceTempView("staff")
deptDf.createOrReplaceTempView("dept")
df_join = spark.sql("select staff.*, dept.name as deptName from staff left join dept on staff.deptId = dept.id")
df_join.show()

执行结果

SQL:常规查询 =======================================
+-------+
|counter|
+-------+
|      4|
+-------+

SQL:关联查询 =======================================
+------+-----+------+---+------+--------+
|deptId| name|gender|age|salary|deptName|
+------+-----+------+---+------+--------+
|    01| 张三|    男| 32|  5000|  销售部|
|    01| 李四|    男| 33|  6000|  销售部|
|    01| 王五|    女| 38|  5500|  销售部|
|    02| Jack|    男| 42|  7000|  研发部|
|    02|Smith|    女| 27|  6500|  研发部|
|    02| Lily|    女| 45|  9500|  研发部|
+------+-----+------+---+------+--------+
自定义函数UDF
conf = sc.getConf().get("spark.sql.shuffle.partitions")
print(conf)
# 注册UDF
strlen = spark.udf.register("strLen", lambda x: len(x))
# 准备数据和表
a = [('Jack', 32),('Smith', 33),('李四', 36)]
rdd = sc.parallelize(a)
df = spark.createDataframe(rdd, "name: string, age: int")
df.createOrReplaceTempView("user")
# 测试UDF
df2 = spark.sql("select *,strLen(name) as len from user")
df2.show()

执行结果

None
+-----+---+---+
| name|age|len|
+-----+---+---+
| Jack| 32|  4|
|Smith| 33|  5|
| 李四| 36|  2|
+-----+---+---+
Dataframe TableAPI select、where、agg聚合、describe
strlen = spark.udf.register("strLen", lambda x: len(x))
person_info = [('Jack', 32),('Smith', 33),('李四', 36)]
rdd = sc.parallelize(person_info)
df = spark.createDataframe(rdd, "name: string, age: int")
df.createOrReplaceTempView("user")
# select where
print("TableAPI:selectwhere " + "=======================================")
df2 = df.select("name").where(strlen("name") > 4)
df2.show()

# agg 聚合
print("TableAPI:agg聚合 " + "=======================================")
df3 = df.agg({"age":"max"})
df3.show()

# describe
print("TableAPI:describe " + "=======================================")
df.describe(['age']).show()

执行结果

TableAPI:selectwhere =======================================
+-----+
| name|
+-----+
|Smith|
+-----+

TableAPI:agg聚合 =======================================
+--------+
|max(age)|
+--------+
|      36|
+--------+

TableAPI:describe =======================================
+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|                 3|
|   mean|33.666666666666664|
| stddev| 2.081665999466133|
|    min|                32|
|    max|                36|
+-------+------------------+
日期操作(使用内置函数)
print("TableAPI:日期操作 " + "=======================================")
df = spark.createDataframe([('2020-05-10',),('2020-05-09',)], ['dt'])
from pyspark.sql.functions import add_months
df.select(add_months(df.dt, 1).alias('next_month')).show()

执行结果

TableAPI:日期操作 =======================================
+----------+
|next_month|
+----------+
|2020-06-10|
|2020-06-09|
+----------+
透视-转置列(groupby、pivot、sum)
# 透视-转置列
print("TableAPI:透视-转置列 " + "=======================================")
staff = [   ('01','张三', '男',32,5000),
        ('01','李四', '男',33,6000),
        ('01','王五', '女',38,5500),
        ('02','Jack', '男',42,7000),
        ('02','Smith', '女',27,6500),
        ('02','Lily', '女',45,9500)
]
staffRDD = sc.parallelize(staff)
staffDf = spark.createDataframe(staffRDD, "deptId:string,name:string,gender:string,age:int,salary:int")
df_pivot = staffDf.groupBy("deptId") 
        .pivot("gender") 
        .sum("salary")
df_pivot.show()

执行结果

TableAPI:透视-转置列 =======================================
+------+-----+-----+
|deptId|   女|   男|
+------+-----+-----+
|    01| 5500|11000|
|    02|16000| 7000|
+------+-----+-----+
条件选择
# 条件选择
print("TableAPI:条件选择 " + "=======================================")
staff = [   ('01','张三', '男',32,5000),
        ('01','李四', '男',33,6000),
        ('01','王五', '女',38,5500),
        ('02','Jack', '男',42,7000),
        ('02','Smith', '女',27,6500),
        ('02','Lily', '女',45,9500)
]
staffRDD = sc.parallelize(staff)
staffDf = spark.createDataframe(staffRDD, 
staffDf.select("name",staffDf.salary.between(6000,9500)).show()
staffDf.select("name","age").where(staffDf.name.like("Smi%")).show()

执行结果

TableAPI:条件选择 =======================================
+-----+---------------------------------------+
| name|((salary >= 6000) AND (salary <= 9500))|
+-----+---------------------------------------+
| 张三|                                  false|
| 李四|                                   true|
| 王五|                                  false|
| Jack|                                   true|
|Smith|                                   true|
| Lily|                                   true|
+-----+---------------------------------------+

+-----+---+
| name|age|
+-----+---+
|Smith| 27|
+-----+---+
关联查询
print("TableAPI:关联查询 " + "=======================================")
staff = [   ('01','张三', '男',32,5000),
        ('01','李四', '男',33,6000),
        ('01','王五', '女',38,5500),
        ('02','Jack', '男',42,7000),
        ('02','Smith', '女',27,6500),
        ('02','Lily', '女',45,9500)
]
staffRDD = sc.parallelize(staff)
staffDf = spark.createDataframe(staffRDD,"deptId:string,name:string,gender:string,age:int,salary:int")
dept = [   ('01','销售部'),
        ('02','研发部')
]
deptRDD = sc.parallelize(dept)
deptDf = spark.createDataframe(deptRDD, "id:string,name:string")
#join函数第三个参数默认为inner,其他选项为:
# inner, cross, outer, full, full_outer, left, left_outer, 
# right, right_outer, left_semi, and left_anti.
staffDf.join(deptDf, staffDf.deptId == deptDf.id,'inner') 
  .groupBy(deptDf.name, "gender") 
  .agg({"salary": "avg", "age": "max"}) 
  .show()

执行结果

TableAPI:关联查询 =======================================
+------+------+-----------+--------+
|  name|gender|avg(salary)|max(age)|
+------+------+-----------+--------+
|研发部|    男|     7000.0|      42|
|销售部|    男|     5500.0|      33|
|销售部|    女|     5500.0|      38|
|研发部|    女|     8000.0|      45|
+------+------+-----------+--------+

以上只是演示了部分,更多完整请看官方文档

如有跑不出来的,请看移步:测试说明

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/734680.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号