在 Spark 上完成转换后,您可以使用toPandas()方法轻松地将其转换回 Pandas 。
#Convert PySpark to Pandas pandasDF = pysparkDF.toPandas()
注意: toPandas()方法是将数据收集到 Spark Driver 内存中的操作,因此在处理大型数据集时必须非常小心。如果收集的数据不适合 Spark Driver 内存,您将收到 OutOfMemoryException。
2. 从 Pandas 创建 PySpark DataFrame由于在多台机器上的所有内核上并行执行,PySpark 的运行速度比 Pandas 更快,因此我们经常需要将 Pandas DataFrame 转换为 PySpark(Spark with Python)以获得更好的性能。这是 Pandas 与 PySpark DataFrame 之间的主要区别之一。
#Create PySpark DataFrame from Pandas Pydf_value=pandas_df.values.tolist() Pydf_schema=list(pandas_df.columns) pysparkDF2 = spark.createDataFrame(Pydf_value,Pydf_schema) pysparkDF2.printSchema() pysparkDF2.show()
#注意 当pandas_df的记录数为0时,Pydf_schema无法通过判定pandas_df中数据类型来指定Pydf_schema中每个col的类型,这时候会报错,解决方法是指定Pydf_schema的类型
from pyspark.sql.types import *
cols=list(pandas_df.columns)
Pydf_schema=_schema1 = StructType([
StructField(cols[0],IntegerType(),True),
StructField(cols[1],IntegerType(),True),
StructField(cols[2],StringType(),True),
……
StructField(cols[n],StringType(),True)
])
pysparkDF2 = spark.createDataFrame(Pydf_value,Pydf_schema)
3. 从 PySpark DataFrame转为hive可用SQl查询表
pysparkDF2.createTempTable('hivetablenametemp')
spark.sql('select * from hivetablenametemp')



