
PySpark: Creating a DataFrame from an RDD and a Schema


#%%
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StructField, StructType

# Create (or reuse) a SparkSession and take its SparkContext for RDD work
spark = SparkSession.builder.appName("myapp").getOrCreate()
sc = spark.sparkContext
#%%
rdd_test = spark.sparkContext.parallelize([(1,2,3),(4,5,6),(7,8,9)])

print(type(rdd_test))
#%%
# countByKey treats the first element of each tuple as the key
rdd_test.countByKey()
#%%
rdd_test.collect()
#%%
rdd_test.count()
#%%
rdd_test.first()
#%%
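# Build an RDD of lists, then raise the columns to the 2nd, 3rd and 4th powers
rdd = spark.sparkContext.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
rdd.collect()[0][0]  # first element of the first row
rdds = rdd.map(lambda x: [x[0]**2, x[1]**3, x[2]**4])
rdds.collect()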
#%%
from pyspark.sql.types import IntegerType, StructField, StructType

# Three nullable integer columns
schema_test = StructType([
    StructField("col1", IntegerType(), True),
    StructField("col2", IntegerType(), True),
    StructField("col3", IntegerType(), True),
])
#%%
# The method is createDataFrame (capital F); createDataframe raises AttributeError
indf_one = spark.createDataFrame(rdd_test, schema_test)
indf_two = spark.createDataFrame(rdds, schema_test)
#%%
indf_one.printSchema()
indf_two.printSchema()
#%%
indf_one.show()
indf_two.show()
#%%
import math
#%%
# Collect the rows to the driver and add 1 to every column value
list_s = []
for row in indf_two.collect():
    print(row)
    list_s.append([row["col1"] + 1, row["col2"] + 1, row["col3"] + 1])
#%%
list_s
#%%
rdd_outdf = sc.parallelize(list_s)
rdd_outdf.collect()
#%%
outdf_schema = StructType([
    StructField("square+1",IntegerType()),
    StructField("cube+1",IntegerType()),
    StructField("quartic+1",IntegerType())
])
#%%
outdf = spark.createDataFrame(rdd_outdf, outdf_schema)
outdf.show()
#%%
outdf.createOrReplaceTempView("view")
#%%
spark.sql("""
    select `square+1` as column_one,
    `cube+1` as column_two,
    `quartic+1` as  column_three 
    from view 
    limit 2
    """).show()

When reposting, please credit the source: this article is reposted from www.mshxw.com
Article URL: https://www.mshxw.com/it/342326.html