This section mainly uses PySpark: we create a pandas DataFrame and a Spark (PySpark) DataFrame, and convert between the two.
1. Import the required libraries and classes, and create the corresponding objects:
import pandas as pd  # print(pd.__version__) can be used to verify the pandas installation
from pyspark.sql import SparkSession
from pyspark import SparkConf

spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
2. Create a DataFrame with pandas and print its type
dict_data = {'id': [1, 2, 3, 4, 5, 6],
             'name': ['Alice', 'Bob', 'Cindy',
                      'Eric', 'Helen', 'Grace'],
             'math': [90, 89, 99, 78, 97, 93],
             'english': [89, 94, 80, 94, 94, 90]}
pandasDF_in = pd.DataFrame(dict_data)
print(type(pandasDF_in))
Output: <class 'pandas.core.frame.DataFrame'>
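Before handing this frame to Spark it can be worth a quick sanity check; a minimal sketch using only pandas (recreating the same dictionary so it runs standalone):

```python
import pandas as pd

# Same dictionary as in step 2
dict_data = {'id': [1, 2, 3, 4, 5, 6],
             'name': ['Alice', 'Bob', 'Cindy', 'Eric', 'Helen', 'Grace'],
             'math': [90, 89, 99, 78, 97, 93],
             'english': [89, 94, 80, 94, 94, 90]}
pandasDF_in = pd.DataFrame(dict_data)

# Basic inspection: 6 rows x 4 columns, and the column names
print(pandasDF_in.shape)          # (6, 4)
print(list(pandasDF_in.columns))  # ['id', 'name', 'math', 'english']
```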
3. Create a DataFrame with PySpark and print its type
sparkDF_in = spark.createDataFrame([(1, "spark"), (2, "hadoop"),
                                    (3, "scala"), (4, "java"), (5, "python"),
                                    (6, "spark"), (7, "java"), (8, "java"),
                                    (9, "python"), (10, "python")],
                                   ["id", "category"])
print(type(sparkDF_in))
Output: <class 'pyspark.sql.dataframe.DataFrame'>
&1: Conversion between a PySpark DataFrame and a pandas DataFrame (verified below)
pandasDF_out = spark.createDataFrame(pandasDF_in)
print(type(pandasDF_out))
# %%
sparkDF_out = sparkDF_in.toPandas()
print(type(sparkDF_out))
The output is:
<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pandas.core.frame.DataFrame'>
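For larger frames these conversions can be slow, because rows are otherwise serialized one at a time; Spark supports Apache Arrow for a faster columnar transfer. A hedged sketch (config fragment, assuming the `spark` session from step 1; the key shown is the PySpark 3.x one, older releases use `spark.sql.execution.arrow.enabled`):

```python
# Enable Arrow-based columnar transfer (PySpark 3.x config key;
# older releases use spark.sql.execution.arrow.enabled).
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# createDataFrame() and toPandas() now use Arrow when the column
# types are supported, falling back to the row-by-row path otherwise.
pandasDF_out = spark.createDataFrame(pandasDF_in)
sparkDF_out = sparkDF_in.toPandas()
```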
&2: Run SQL statements indirectly via spark.sql to operate on the pandas-derived DataFrame, then convert the result back to a pandas DataFrame
pandasDF_out.createOrReplaceTempView("pd_data")
# %%
spark.sql("select * from pd_data").show()
# %%
res = spark.sql("""select * from pd_data
where math>= 90
order by english desc""")
res.show()
# %%
output_DF = res.toPandas()
print(type(output_DF))
The steps above, using pandasDF_out as the example:
1) Create a temporary view
2) Select everything from the temporary view; the output is the original data created with pandas:
3) Process the table with SQL; the statement is:
select * from pd_data where math>= 90 order by english desc
The query result is:
4) Convert the result back to a pandas DataFrame and continue with the usual pandas DataFrame processing.
output_DF = res.toPandas()
print(type(output_DF))
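The same filter-and-sort can also be expressed directly in pandas once the data is back in a pandas DataFrame; a minimal standalone sketch (recreating the scores table from step 2 so it runs without Spark):

```python
import pandas as pd

df = pd.DataFrame({'id': [1, 2, 3, 4, 5, 6],
                   'name': ['Alice', 'Bob', 'Cindy', 'Eric', 'Helen', 'Grace'],
                   'math': [90, 89, 99, 78, 97, 93],
                   'english': [89, 94, 80, 94, 94, 90]})

# Equivalent of: select * from pd_data where math >= 90 order by english desc
output_DF = (df[df['math'] >= 90]
             .sort_values('english', ascending=False)
             .reset_index(drop=True))
print(output_DF)  # Helen, Grace, Alice, Cindy
```

For small data that already fits in memory, doing this in pandas avoids the Spark round trip; spark.sql is the better choice when the source data is large and only the filtered result needs to come back.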



