栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Data.Analysis.with.Python.and.PySpark:PySpark的第一个程序(2):Submitting andscaling your firstPySpark prog

Data.Analysis.with.Python.and.PySpark:PySpark的第一个程序(2):Submitting andscaling your firstPySpark prog

Grouping records: Counting word frequencies

Counting word frequencies using groupby() and count()

groups = words_nonull.groupby(col("word"))
print(groups)
# 
results = words_nonull.groupby(col("word")).count()
print(results)
# Dataframe[word: string, count: bigint]
results.show()

 

 因为Spark是懒惰的,所以它不关心记录的顺序,除非我们明确要求它这样做。由于我们希望看到显示的顶部单词,让我们在数据框中进行一点排序,同时完成程序的最后一步:返回顶部单词频率。

使用orderBy在屏幕上排序结果

PySpark为排序提供了两种不同的语法

Displaying the top 10 words in Jane Austen’s Pride and Prejudice

results.orderBy("count", ascending=False).show(10)
results.orderBy(col("count").desc()).show(10)

Writing data from a data frame 

就像我们使用read()和SparkReader读取Spark中的数据一样,我们使用write()和SparkWriter对象将数据帧写回磁盘。我专门让SparkWriter将文本导出到CSV文件中,命名simple_count.csv

如果我们看一下结果,我们可以看到PySpark并没有创建一个结果。csv文件。相反,它创建了一个同名的目录,并将201个文件放入该目录中(200个CSV+1_SUCCESS file)。

 默认情况下,PySpark将为每个分区提供一个文件。这意味着我们的程序在我的机器上运行时,最终会产生200个分区。为了减少分区的数量,我们将coalesce()方法应用于所需的分区数量。

下一个清单显示了在写入磁盘之前在数据帧上使用coalesce(1)时的区别。我们仍然有一个目录,但里面只有一个CSV文件。

results.coalesce(1).write.csv("./data/simple_count_single_partition.csv")

 完整代码如下:

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    explode,
    lower,
    regexp_extract,
    split,
)

spark = SparkSession.builder.appName(
    "Analyzing the vocabulary of Pride and Prejudice."
).getOrCreate()

book = spark.read.text("./data/gutenberg_books/1342-0.txt")

lines = book.select(split(book.value, " ").alias("line"))

words = lines.select(explode(col("line")).alias("word"))

words_lower = words.select(lower(col("word")).alias("word"))

words_clean = words_lower.select(
    regexp_extract(col("word"), "[a-z']*", 0).alias("word")
)

words_nonull = words_clean.where(col("word") != "")

results = words_nonull.groupby(col("word")).count()

results.orderBy("count", ascending=False).show(10)

results.coalesce(1).write.csv("./simple_count_single_partition.csv")
使用PySpark的导入约定简化依赖关系

虽然没有硬性规定,但普遍的做法是使用F来表示PySpark的函数。

Since col, explode, lower, regexp_extract, and split are all in pyspark.sql.functions, we can import the whole module.

通过方法链接简化我们的程序

用链式变换方法去除中间变量

list 3.7

# Before
book = spark.read.text("./data/gutenberg_books/1342-0.txt")
lines = book.select(split(book.value, " ").alias("line"))
words = lines.select(explode(col("line")).alias("word"))
words_lower = words.select(lower(col("word")).alias("word"))
words_clean = words_lower.select(
regexp_extract(col("word"), "[a-z']*", 0).alias("word")
)
words_nonull = words_clean.where(col("word") != "")
results = words_nonull.groupby("word").count()
# After
import pyspark.sql.functions as F
results = (
spark.read.text("./data/gutenberg_books/1342-0.txt")
.select(F.split(F.col("value"), " ").alias("line"))
.select(F.explode(F.col("line")).alias("word"))
.select(F.lower(F.col("word")).alias("word"))
.select(F.regexp_extract(F.col("word"), "[a-z']*", 0).alias("word"))
.where(F.col("word") != "")
.groupby("word")
.count()
)

再举一个例子:

如果查看清单3.7中的“after”代码,您会注意到我在等号的右侧以一个圆括号开始(spark=([…])。这是我需要在Python中链接方法时使用的技巧。如果不将结果用括号括起来,则需要在每行末尾添加一个字符,这会给程序增加视觉噪音。PySpark代码在使用方法链接时特别容易出现换行:如下

使用spark submit以批处理模式启动程序

以批处理模式提交作业

$ spark-submit ./code/Ch03/word_count_submit.py
Scaling up our word frequency program

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758138.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号