
Data Analysis with Python and PySpark, Chapter 4: Analyzing tabular data with pyspark.sql

Creating a SparkSession object to get started with PySpark

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.getOrCreate()
How does PySpark represent tabular data?
my_grocery_list = [
    ["Banana", 2, 1.74],
    ["Apple", 4, 2.04],
    ["Carrot", 1, 1.09],
    ["Cake", 1, 10.99],
]
df_grocery_list = spark.createDataFrame(
    my_grocery_list, ["Item", "Quantity", "Price"]
)
df_grocery_list.printSchema()
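
For this data frame, a stock PySpark build prints the inferred schema as follows:

# root
#  |-- Item: string (nullable = true)
#  |-- Quantity: long (nullable = true)
#  |-- Price: double (nullable = true)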

The first parameter is the data itself: you can provide a list of items (here, a list of lists), a data frame, or a resilient distributed dataset. The second parameter is the schema of the data frame; passing a list of column names makes PySpark infer the types of the columns from the data (string, long, and double, respectively).

The master node knows the structure of the data frame, but the actual data sits on the worker nodes: each column maps to data stored somewhere in the cluster that PySpark manages. We operate on an abstract structure and let the master node delegate the work efficiently.


Using PySpark to analyze and process tabular data

Term: exploratory data analysis (or EDA), summarizing and visualizing a data set to build an understanding of what it contains.

PySpark does not provide any charting capability, and it does not hook into chart libraries such as Matplotlib, seaborn, Altair, or plot.ly. The usual solution is to transform your data with PySpark, convert the PySpark data frame into a pandas data frame via the toPandas() method, and then use your favorite charting library.
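
As a minimal sketch of that workflow, assuming matplotlib is installed and reusing the logs data frame loaded later in this article (the LogServiceID grouping and the top-10 cutoff are arbitrary choices for illustration):

import matplotlib.pyplot as plt

# Do the heavy aggregation in PySpark; only the small 10-row summary
# crosses over into pandas for plotting.
counts = (
    logs.groupBy("LogServiceID")
    .count()
    .orderBy("count", ascending=False)
    .limit(10)
    .toPandas()
)
counts.plot(kind="bar", x="LogServiceID", y="count")
plt.show()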

Data preparation:

Download the file from the Canada Open Data portal (http://mng.bz/y4YJ) and select the BroadcastLogs_2018_Q3_M8 file. You also need to download the Data Dictionary in .doc form, as well as the Reference Tables zip file, unzipping the latter into a ReferenceTables directory under data/broadcast_logs.

Before moving on, make sure you have the following in place under data/broadcast_logs: the BroadcastLogs_2018_Q3_M8 CSV file, the ReferenceTables directory, and the data dictionary document.

Reading and assessing delimited data in PySpark: handling CSV files with the SparkReader
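
A sketch of that read for the BroadcastLogs file, assuming it is pipe-delimited with a header row (the sep, header, and timestampFormat values are assumptions about this particular file, not defaults of the reader):

import os

DIRECTORY = "./data/broadcast_logs"

logs = spark.read.csv(
    os.path.join(DIRECTORY, "BroadcastLogs_2018_Q3_M8.CSV"),
    sep="|",  # the file uses pipes, not commas, as delimiters
    header=True,  # the first row holds the column names
    inferSchema=True,  # sample the file to guess each column's type
    timestampFormat="yyyy-MM-dd",  # how dates such as LogDate are encoded
)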

The basics of data manipulation: selecting, dropping, renaming, ordering, and diagnosing
logs.select("BroadcastLogID", "LogServiceID", "LogDate").show(5, False)
# +--------------+------------+-------------------+
# |BroadcastLogID|LogServiceID|LogDate            |
# +--------------+------------+-------------------+
# |1196192316    |3157        |2018-08-01 00:00:00|
# |1196192317    |3157        |2018-08-01 00:00:00|
# |1196192318    |3157        |2018-08-01 00:00:00|
# |1196192319    |3157        |2018-08-01 00:00:00|
# |1196192320    |3157        |2018-08-01 00:00:00|
# +--------------+------------+-------------------+
# only showing top 5 rows

Four ways to select columns in PySpark, all equivalent in terms of results

# Using the string-to-column conversion
logs.select("BroadcastLogID", "LogServiceID", "LogDate")
logs.select(*["BroadcastLogID", "LogServiceID", "LogDate"])
# Passing the column object explicitly
logs.select(
    F.col("BroadcastLogID"), F.col("LogServiceID"), F.col("LogDate")
)
logs.select(
    *[F.col("BroadcastLogID"), F.col("LogServiceID"), F.col("LogDate")]
)

When explicitly selecting a few columns, you don't have to wrap them in a list. If you are already working with a list of columns, you can unpack it with the * prefix.

A data frame keeps track of its columns in the columns attribute: logs.columns is a plain Python list containing all the column names of the logs data frame.
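
One handy use of that list, sketched below, is peeking at a wide data frame a few columns at a time (the chunk size of three is arbitrary):

# logs.columns is an ordinary list, so regular slicing works;
# show each three-column slice so it fits on screen.
for i in range(0, len(logs.columns), 3):
    logs.select(logs.columns[i : i + 3]).show(5, False)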

Getting rid of columns with the drop() method

logs = logs.drop("BroadcastLogID", "SequenceNO")

Getting rid of columns, select style

logs = logs.select(
    *[x for x in logs.columns if x not in ["BroadcastLogID", "SequenceNO"]]
)

Creating what isn't there: new columns with withColumn()

Extracting the hours, minutes, and seconds from the Duration column
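
A sketch of the idea, assuming Duration is a string column formatted as HH:MM:SS (the dur_* aliases and the Duration_seconds name are illustrative):

logs.select(
    F.col("Duration"),
    F.col("Duration").substr(1, 2).cast("int").alias("dur_hours"),
    F.col("Duration").substr(4, 2).cast("int").alias("dur_minutes"),
    F.col("Duration").substr(7, 2).cast("int").alias("dur_seconds"),
).distinct().show(5)

# Folding the pieces into a single new column with withColumn()
logs = logs.withColumn(
    "Duration_seconds",
    F.col("Duration").substr(1, 2).cast("int") * 60 * 60
    + F.col("Duration").substr(4, 2).cast("int") * 60
    + F.col("Duration").substr(7, 2).cast("int"),
)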

 

WARNING If you create a column with withColumn() and give it a name that
already exists in your data frame, PySpark will happily overwrite the column.

WARNING Creating many (100+) new columns with chained withColumn() calls will
slow Spark down to a grind. If you need to create a lot of columns at once,
use the select() approach. While it will generate the same work, it is less
taxing on the query planner.
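
To make the select() alternative concrete, here is a sketch that adds several columns in one select() call instead of chaining withColumn() (the two new expressions are placeholders):

# Keep every existing column and append the new ones in a single pass,
# which gives the query planner one projection to analyze instead of many.
logs = logs.select(
    *logs.columns,
    F.col("Duration").substr(1, 2).cast("int").alias("dur_hours"),
    F.col("Duration").substr(4, 2).cast("int").alias("dur_minutes"),
)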

 
