Counting word frequencies using groupby() and count()
groups = words_nonull.groupby(col("word"))
print(groups)
#
results = words_nonull.groupby(col("word")).count()
print(results)
# Dataframe[word: string, count: bigint]
results.show()
因为Spark是懒惰的,所以它不关心记录的顺序,除非我们明确要求它这样做。由于我们希望看到显示的顶部单词,让我们在数据框中进行一点排序,同时完成程序的最后一步:返回顶部单词频率。
使用orderBy在屏幕上排序结果PySpark为排序提供了两种不同的语法
Displaying the top 10 words in Jane Austen’s Pride and Prejudice
results.orderBy("count", ascending=False).show(10)
results.orderBy(col("count").desc()).show(10)
Writing data from a data frame
就像我们使用read()和SparkReader读取Spark中的数据一样,我们使用write()和SparkWriter对象将数据帧写回磁盘。我专门让SparkWriter将文本导出到CSV文件中,命名simple_count.csv
如果我们看一下结果,我们可以看到PySpark并没有创建一个结果。csv文件。相反,它创建了一个同名的目录,并将201个文件放入该目录中(200个CSV+1_SUCCESS file)。
默认情况下,PySpark将为每个分区提供一个文件。这意味着我们的程序在我的机器上运行时,最终会产生200个分区。为了减少分区的数量,我们将coalesce()方法应用于所需的分区数量。
下一个清单显示了在写入磁盘之前在数据帧上使用coalesce(1)时的区别。我们仍然有一个目录,但里面只有一个CSV文件。
results.coalesce(1).write.csv("./data/simple_count_single_partition.csv")
完整代码如下:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
col,
explode,
lower,
regexp_extract,
split,
)
spark = SparkSession.builder.appName(
"Analyzing the vocabulary of Pride and Prejudice."
).getOrCreate()
book = spark.read.text("./data/gutenberg_books/1342-0.txt")
lines = book.select(split(book.value, " ").alias("line"))
words = lines.select(explode(col("line")).alias("word"))
words_lower = words.select(lower(col("word")).alias("word"))
words_clean = words_lower.select(
regexp_extract(col("word"), "[a-z']*", 0).alias("word")
)
words_nonull = words_clean.where(col("word") != "")
results = words_nonull.groupby(col("word")).count()
results.orderBy("count", ascending=False).show(10)
results.coalesce(1).write.csv("./simple_count_single_partition.csv")
使用PySpark的导入约定简化依赖关系
虽然没有硬性规定,但普遍的做法是使用F来表示PySpark的函数。
Since col, explode, lower, regexp_extract, and split are all in pyspark.sql.functions, we can import the whole module.
通过方法链接简化我们的程序用链式变换方法去除中间变量
list 3.7
# Before
book = spark.read.text("./data/gutenberg_books/1342-0.txt")
lines = book.select(split(book.value, " ").alias("line"))
words = lines.select(explode(col("line")).alias("word"))
words_lower = words.select(lower(col("word")).alias("word"))
words_clean = words_lower.select(
regexp_extract(col("word"), "[a-z']*", 0).alias("word")
)
words_nonull = words_clean.where(col("word") != "")
results = words_nonull.groupby("word").count()
# After
import pyspark.sql.functions as F
results = (
spark.read.text("./data/gutenberg_books/1342-0.txt")
.select(F.split(F.col("value"), " ").alias("line"))
.select(F.explode(F.col("line")).alias("word"))
.select(F.lower(F.col("word")).alias("word"))
.select(F.regexp_extract(F.col("word"), "[a-z']*", 0).alias("word"))
.where(F.col("word") != "")
.groupby("word")
.count()
)
再举一个例子:
如果查看清单3.7中的“after”代码,您会注意到我在等号的右侧以一个圆括号开始(spark=([…])。这是我需要在Python中链接方法时使用的技巧。如果不将结果用括号括起来,则需要在每行末尾添加一个字符,这会给程序增加视觉噪音。PySpark代码在使用方法链接时特别容易出现换行:如下
使用spark submit以批处理模式启动程序以批处理模式提交作业
$ spark-submit ./code/Ch03/word_count_submit.pyScaling up our word frequency program



