栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

pyspark 入门

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

pyspark 入门

文章目录
    • 为什么用
    • 怎么用
      • 初始化类 class pyspark.sql.SparkSession 类
      • 函数说明
      • 利用Dataframe API查询
      • 利用sql查询
      • Dataframe函数说明
        • agg(*exps)
        • alias(alias)
        • approxQuantile(col,pro probabilities, relativeError)
        • cache()
        • checkpoint([eager])
        • coalesce(numPartitions)
        • colRegex(colName)
        • collect()
        • corr(col1, col2[, method])
        • count()
        • cov(col1, col2)
        • createGlobalTempView(name)
        • createOrReplaceGlobalTempView(name)
        • createOrReplaceTempView(name)
        • createTempView(name)
        • crossJoin(other)
        • crosstab(col1, col2)
        • cube(*cols)
        • describe(*cols)
        • distinct()
        • drop(*cols)
        • dropDuplicates([subset])
        • drop_duplicates([subset])
        • dropna([how, thresh, subset])
        • exceptAll(other)
        • explain([extended, mode])
        • fillna(value[, subset])
        • filter(condition)
        • first()
        • foreach(f)
        • foreachPartition(f)
        • freqItems(cols[, support])
        • groupBy(*cols)=group_by
        • head([n]) 返回前n行
        • hint(name, *parameters)
        • inputFiles()
        • intersect(other)
        • intersectAll(other)
        • isLocal()
        • join(other, on, how)
        • limit(num)
        • localCheckpoint([eager])
        • mapInPandas(func, schema)
        • orderBy(*cols, **kwargs)
        • persist([storageLevel])
        • printSchema()
        • randomSplit(weights[, seed])
        • registerTempTable(name)
        • repartition(numPartitions, *cols)
        • repartitionByRange(numPartitions, *cols)
        • replace(to_replace, value, subset)
        • rollup(*cols)
        • sameSemantics(other)
        • sample([withReplacement, fraction, seed])
        • sampleBy(col, fractions[, seed])
        • select(*cols)
        • selectExpr(*expr)
        • semanticHash()
        • show([n, truncate, vertical])
        • sort(*cols, **kwargs)
        • sortWithinPartitions(*cols, **kwargs)
        • subtract(other)
        • summary(*statistics)
        • tail(num)
        • take(num)
        • toDF(*cols)
        • toJSON([use_unicode])
        • toLocalIterator([prefetchPartitions])
        • toPandas()
        • transform(func)
        • union(other)
        • unionAll(other)
        • unionByName(other[, allowMissingColumns])
        • unpersist([blocking])
        • where(condition)
        • withColumn(colName, col)
        • withColumnRenamed(existing, new)
        • withWatermark(eventTime, delayThreshold)
        • writeTo(table)
      • Dataframe属性
        • columns
        • dtypes
        • isStreaming
        • na
        • rdd
        • schema
        • stat
        • storageLevel
        • write
        • writeStream

为什么用

python中pandas是数据分析的利器,具有并行的特兹那个,而且函数和数据计算的方法非常方便,是数据分析中的瑞士军刀。但是受限于单个机器性能和配置的限制,当大规模数据,比如100G-10TB规模的数据时,pandas就显得局限了,就像瑞士军刀杀牛,难以下手。这时就需要基于分布式计算的大数据工具spark,是基于分布式计算,可以基于hadoop和hive,进行分布式的数据计算,同时spark具有python API,可以通过类似python的语法,无门槛的过渡。

怎么用

pyspark支持RDD和Dataframe的数据类型,但是RDD在python中相比于scala和R语言,是非常慢的,而Dataframe使性能在各种语言中都保持稳定。所以我们可以试用pyspark的Dataframe对大规模数据进行数据清理,然后转为pandas.dataframe,进行数据可视化。

初始化类 class pyspark.sql.SparkSession 类
spark = SparkSession.builder 
...     .master("local") 
...     .appName("Word Count") 
...     .config("spark.some.config.option", "some-value") 
...     .getOrCreate

Builder for SparkSession 这个就是生成一个 sparksession 实例。他下面有一些支持的函数
master: 设置 spark master 的 url 。由于我们集群使用的是 spark on yarn 的模式,所以可以用选择不设置这个参数。

appName: 在程序运行的时候看到的应用名称。

config: 其实其他参数也是调用 .config 设置相应的 config 参数,例如 .master 就是调用了 .config(“spark.master”, master)。

enableHiveSupport: 启动对 hive 的支持。例如支持 hivesql 以及 hive udf 等。

getOrCreate: 得到一个现成的 SparkSession ,如果没有就生成一个。

  • SparkSession.catalog:
    提供一个接口来操作 create drop alter query 库或者表,比如:
catalog.listTables().select($"name").show(2,false)
  • SparkSession.createDataframe:
    可以获得从 rdd python list 和 pandas df 创建 df 的能力。下面贴一下官方的例子:
l = [('Alice', 1)]
>>> spark.createDataframe(l).collect()
[Row(_1=u'Alice', _2=1)]
>>> spark.createDataframe(l, ['name', 'age']).collect()
[Row(name=u'Alice', age=1)]


>>> d = [{'name': 'Alice', 'age': 1}]
>>> spark.createDataframe(d).collect()
[Row(age=1, name=u'Alice')]


>>> rdd = sc.parallelize(l)
>>> spark.createDataframe(rdd).collect()
[Row(_1=u'Alice', _2=1)]
>>> df = spark.createDataframe(rdd, ['name', 'age'])
>>> df.collect()
[Row(name=u'Alice', age=1)]
  • SparkSession.sql:
    使用sql方法返回的是df,例如:
>>> df.createOrReplaceTempView("table1")
>>> df2 = spark.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> df2.collect()
[Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
  • SparkSession.table
    这个可以返回指定表的 df ,将表复制成df
函数说明
  • 查询
    1,.show(n),方法默认显示前10行
    2,spark.sql("select * from ").collect()
利用Dataframe API查询
  • swimmers.count() 返回dataframe 的行数
  • swimmers.select().filter().show() 返回筛选后数据
利用sql查询
  • spark.sql(“select count(1) from swimmers”).show()
    sql语句和sql语言一致,也可以试用where 筛选
Dataframe函数说明
  • agecol=people.age 选择其中的列
agg(*exps)

应用到整个没有分组的dataframe上,全称是df.groupby().agg()

alias(alias)

返回一个设置了别名alias的dataframe,具有重命名的功能

df_as1 = df.alias("df_as1")
approxQuantile(col,pro probabilities, relativeError)

返回一个列的中值列表

cache()

返回dataframed的存贮水平

checkpoint([eager])

返回这个数据集的检查点版本,检查点可以用来截断这个Dataframe的逻辑计划,这在计划可能呈指数增长的迭代算法中特别有用。它将保存到SparkContext.setCheckpointDir()设置的检查点目录中的文件中。

coalesce(numPartitions) colRegex(colName)

正则化选择匹配的多列

df = spark.createDataframe([("a", 1), ("b", 2), ("c",  3)], ["Col1", "Col2"])
df.select(df.colRegex("`(Col1)?+.+`")).show()
+----+
|Col2|
+----+
|   1|
|   2|
|   3|
+----+
collect()

返回所有的数据记录的list

corr(col1, col2[, method]) count()

返回dataframe的行数

cov(col1, col2)

计算给定列的样本协方差

createGlobalTempView(name)

通过 df 创建一个全局的临时表。他的生命周期跟 spark 应用的生命周期相关联。如果视图中已经存在这个名字了会抛出 TempTableAlreadyExistsException 的错误。

createOrReplaceGlobalTempView(name)

这个对比 createGlobalTempView 不同的点是不会抛出 TempTableAlreadyExistsException 的错误,会直接替换。

createOrReplaceTempView(name)

使用现成的df创建临时视图,可以使用sql语句获取数据

createTempView(name) crossJoin(other)

和另外一个 df 取笛卡尔积例如

>>> df.select("age", "name").collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df2.select("name", "height").collect()
[Row(name=u'Tom', height=80), Row(name=u'Bob', height=85)]
>>> df.crossJoin(df2.select("height")).select("age", "name", "height").collect()
[Row(age=2, name=u'Alice', height=80), Row(age=2, name=u'Alice', height=85),
 Row(age=5, name=u'Bob', height=80), Row(age=5, name=u'Bob', height=85)]
crosstab(col1, col2) cube(*cols)

使用指定的列为当前 Dataframe 创建一个多维立方体,以便我们可以对它们运行聚合。

df.cube("name", df.age).count().orderBy("name", "age").show()
+-----+----+-----+
| name| age|count|
+-----+----+-----+
| null|null|    2|
| null|   2|    1|
| null|   5|    1|
|Alice|null|    1|
|Alice|   2|    1|
|  Bob|null|    1|
|  Bob|   5|    1|
+-----+----+-----+
describe(*cols)

计算数字和字符串列的基本统计信息

>>> df.describe(['age']).show()
+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|                 2|
|   mean|               3.5|
| stddev|2.1213203435596424|
|    min|                 2|
|    max|                 5|
+-------+------------------+
>>> df.describe().show()
+-------+------------------+-----+
|summary|               age| name|
+-------+------------------+-----+
|  count|                 2|    2|
|   mean|               3.5| null|
| stddev|2.1213203435596424| null|
|    min|                 2|Alice|
|    max|                 5|  Bob|
+-------+------------------+-----+
distinct()

返回一个不包含重复值的新的dataframe

drop(*cols)

返回删除指定列的dataframe

dropDuplicates([subset])

返回一个新的 df ,这个 df 里面不再有重复的记录。可选参数可以让我们选择关心的字段进行去重。

>>> from pyspark.sql import Row
>>> df = sc.parallelize([ 
...     Row(name='Alice', age=5, height=80), 
...     Row(name='Alice', age=5, height=80), 
...     Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+
>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+
drop_duplicates([subset]) dropna([how, thresh, subset])

返回了删除某列含有空值的行的dataframe

exceptAll(other) explain([extended, mode]) fillna(value[, subset])

填充缺失值,subset指定需要填充的列

>>> df4.na.fill(50).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|    80|Alice|
|  5|    50|  Bob|
| 50|    50|  Tom|
| 50|    50| null|
+---+------+-----+
>>> df4.na.fill({'age': 50, 'name': 'unknown'}).show()
+---+------+-------+
|age|height|   name|
+---+------+-------+
| 10|    80|  Alice|
|  5|  null|    Bob|
| 50|  null|    Tom|
| 50|  null|unknown|
+---+------+-------+
filter(condition)

给定特定条件过滤,和where是同名函数

>>> df.filter(df.age > 3).collect()
[Row(age=5, name=u'Bob')]

>>> df.where(df.age == 2).collect()
[Row(age=2, name=u'Alice')]

>>> df.filter("age > 3").collect()
[Row(age=5, name=u'Bob')]

>>> df.where("age = 2").collect()
[Row(age=2, name=u'Alice')]
first()

返回第一条 df

foreach(f)

定义一个函数,会让每个 df 都执行该函数

>>> def f(person):
...     print(person.name)
>>> df.foreach(f)
foreachPartition(f)

定义一个函数,会让每个 partitions 都执行这个函数

>>> def f(people):
...     for person in people:
...         print(person.name)
>>> df.foreachPartition(f)
freqItems(cols[, support]) groupBy(*cols)=group_by

可以使用 agg 方法对其进行各种各样的聚合, spark sql 专门有个类为其提供了非常多的处理函数。See GroupedData for all the available aggregate functions.

>>> df.groupBy().avg().collect()
[Row(avg(age)=3.5)]
>>> sorted(df.groupBy('name').agg({'age': 'mean'}).collect())
[Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)]
>>> sorted(df.groupBy(df.name).avg().collect())
[Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)]
>>> sorted(df.groupBy(['name', df.age]).count().collect())
[Row(name=u'Alice', age=2, count=1), Row(name=u'Bob', age=5, count=1)]
head([n]) 返回前n行

没有指定n时,默认是一行

hint(name, *parameters) inputFiles() intersect(other) intersectAll(other) isLocal() join(other, on, how)

使用给定的条件和其他 df 进行 join。
other:另外一个df.
on:条件,join的位置 列名
how:默认是inner

limit(num)

限制拿多少条

localCheckpoint([eager]) mapInPandas(func, schema) orderBy(*cols, **kwargs)

返回一个被指定 col 排序好的 df

>>> df.sort(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]

>>> df.sort("age", ascending=False).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]

>>> df.orderBy(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]

>>> from pyspark.sql.functions import *
>>> df.sort(asc("age")).collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]

>>> df.orderBy(desc("age"), "name").collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]

>>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
persist([storageLevel]) printSchema()

打印出该 df 的 schema

randomSplit(weights[, seed]) registerTempTable(name)

将 df 注册成一个临时生命周期是 SparkSession 的周期,这个跟上面 createOrReplaceTempView 互为同名函数,就是调用该函数生成的。

repartition(numPartitions, *cols)

返回一个新的 df,这个新的 df 被给定的 numPartitions 数量进行 hash 重分区。numPartitions可以是指定分区或列的目标数量的int。如果它是一个列,它将被用作第一个分区列。如果没有指定,则使用默认分区数。

repartitionByRange(numPartitions, *cols) replace(to_replace, value, subset)

返回一个 df 用参数位置2的值替换掉参数位置是1的值。Dataframe.replace() and DataframeNaFunctions.replace()互为同名函数

>>> df4.na.replace(10, 20).show()
+----+------+-----+
| age|height| name|
+----+------+-----+
|  20|    80|Alice|
|   5|  null|  Bob|
|null|  null|  Tom|
|null|  null| null|
+----+------+-----+

>>> df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()
+----+------+----+
| age|height|name|
+----+------+----+
|  10|    80|   A|
|   5|  null|   B|
|null|  null| Tom|
|null|  null|null|
+----+------+----+
rollup(*cols) sameSemantics(other) sample([withReplacement, fraction, seed]) sampleBy(col, fractions[, seed]) select(*cols)

返回 select 能找到的数据

selectExpr(*expr) semanticHash() show([n, truncate, vertical])

打印前n行数据带控制台

sort(*cols, **kwargs)

根据给定的 cols 进行排序之后返回新的 df

>>> df.sort(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.sort("age", ascending=False).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.orderBy(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> from pyspark.sql.functions import *
>>> df.sort(asc("age")).collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df.orderBy(desc("age"), "name").collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
sortWithinPartitions(*cols, **kwargs) subtract(other)

取目标 df ohter 和 df 的补集。

summary(*statistics) tail(num) take(num)

返回df前num条数据

toDF(*cols)

返回新的df,新的列名对应list顺序

toJSON([use_unicode])

将dataframe 数据格式转成rdd格式

toLocalIterator([prefetchPartitions])

返回一个迭代器,包含了dataframe的所有列

toPandas()

将数据转换为pandas.dataframe数据类型

transform(func)

返回一个新的dataframe

union(other)

交集

unionAll(other)

并集

unionByName(other[, allowMissingColumns])

返回包含这个和其他dataframe中独一无二的列的,交集dataframe

unpersist([blocking]) where(condition)

功能与filter一样

withColumn(colName, col)

返回一个通过添加或替换列名的新的dataframe

withColumnRenamed(existing, new)

返回一个重命名列名的dataframe

withWatermark(eventTime, delayThreshold) writeTo(table) Dataframe属性 columns

以列表形式返回所有列名。

dtypes

以列表形式返回所有列名及其数据类型。

isStreaming na rdd schema

返回dataframe的架构

stat storageLevel write

非流接口向外部存储写入数据

writeStream

流接口向外部存储写入数据

参考文章:
https://www.cnblogs.com/piperck/p/10446720.html
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Dataframe.html

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/313654.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号