栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

pyspark下dataframe的8种创建方式

pyspark下dataframe的8种创建方式

前言

在spark下,有很多种创建dataframe的方法,下面会一一例举

from pyspark.sql import SparkSession
from datetime import datetime,date
from pyspark.sql.types import *
import pandas as pd
from pyspark.sql import Row
spark = SparkSession.builder.appName('test').master('local').getOrCreate()
方式1:从list中创建dataframe

在这种方式中,每一个list种,就代表着dataframe种的一行数据,好比是一个样本

df = spark.createDataframe([
    [1,2,'string1'],
    [3,4,'string3']
])
df
Dataframe[_1: bigint, _2: bigint, _3: string]
df.show()
+---+---+-------+
| _1| _2|     _3|
+---+---+-------+
|  1|  2|string1|
|  3|  4|string3|
+---+---+-------+

从以上的结果中可以看出,系统通过list构建了一个2行3列的dataframe。
值得注意的地方有两点:
第一点:列名我们事先并没有指定,所以系统会自动给我们添加了列名【_1,_2,_3】
第二点:每一列的值的类型我们也没有指定,是系统自动给我们推算出来的,数字的被定义为bigint,字符串被定义为string。

隐藏的炸弹:
1.以上显然列名我们是需要的,所以我们应该想办法添加列名,而不是系统自己给我们分配的,
2.系统自己根据数据来推断数据类型,前提一定是数据要一致,如果不一致,如:
df = spark.createDataframe([
[1,‘leon’,‘string1’],
[3,4,‘string3’]
])
系统对于第二列就不知道该将其定义为bigint还是string类型。
所以为了改善以上的两个点,引入了schema

schema定义

schema本质就是用来手动指定dataframe的列名和数据类型

schema的第一种写法

构建一个StructType类型,里面是由StructField组成的list

_schema1 = StructType([
    StructField('a',IntegerType(),True),
    StructField('b',IntegerType(),True),
    StructField('c',StringType(),True)
])
df = spark.createDataframe([
    [1,2,'string1'],
    [3,4,'string3']
],schema=_schema1)
df
Dataframe[a: int, b: int, c: string]
df.show()
+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|  2|string1|
|  3|  4|string3|
+---+---+-------+
schema的第二种写法

用一个字符串表示,列名和类型用空格隔开,列之间用逗号隔开

_schema2 = 'a long, b long , c string '
df = spark.createDataframe([
    [1,2,'string1'],
    [3,4,'string3']
],schema=_schema2)
df
Dataframe[a: bigint, b: bigint, c: string]
df.show()
+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|  2|string1|
|  3|  4|string3|
+---+---+-------+
方式2:从字典中创建dataframe

注意的是,和list不一样,这里必须指定列名。
这里键就是列名,列名必须用引号,值是对应的值,表示具体某一列下某一行的值

df = spark.createDataframe([
    {'a':1,'b':2,'c':'string1'},
    {'a':3,'b':3,'c':'string2'}
])
df
Dataframe[a: bigint, b: bigint, c: string]
df.show()
+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|  2|string1|
|  3|  3|string2|
+---+---+-------+

同样的,虽然列名我们已经指定,但是数据的类型我们没有指定,依旧是系统自己给我们推测出来的,如果我们要自己指定也行,那就是上面讲的schema

df = spark.createDataframe([
    {'a':1,'b':2,'c':'string1'},
    {'a':3,'b':3,'c':'string2'}
],schema='a long,b long , c string')
df
Dataframe[a: bigint, b: bigint, c: string]
df.show()
+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|  2|string1|
|  3|  3|string2|
+---+---+-------+

值得留意的是,用schema指定数值类型的时候,也要完整的写schema(列名+数值类型)

方式3:从row中创建

注意这里和用字典创建的时候不同,这里的列名不需要引号
当不指定schema时,数据类型是系统推测出来,自己也可以手动指定

df = spark.createDataframe([
    Row(a=1,b=2,c='string1'),
    Row(a=3,b=4,c='string2')
])
df
Dataframe[a: bigint, b: bigint, c: string]
df.show()
+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|  2|string1|
|  3|  4|string2|
+---+---+-------+
方式4:从pandas中的dataframe中创建

当不指定schema时,数据类型是系统推测出来,自己也可以手动指定

pandas_df = pd.Dataframe({
    'a':[1,2],
    'b':[3,4],
    'c':['string1','string1']
})
df = spark.createDataframe(pandas_df)
df
Dataframe[a: bigint, b: bigint, c: string]
df.show()
+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|  3|string1|
|  2|  4|string1|
+---+---+-------+
方式5:从rdd中创建

当不指定schema时,列名和数据类型是系统推测出来,自己也可以手动指定

不指定schema
rdd = spark.sparkContext.parallelize([
    (1,2,'strin1'),
    (3,4,'string2')
])
df = spark.createDataframe(rdd)
df
Dataframe[_1: bigint, _2: bigint, _3: string]
df.show()
+---+---+-------+
| _1| _2|     _3|
+---+---+-------+
|  1|  2| strin1|
|  3|  4|string2|
+---+---+-------+
指定schema

1.单独指定列名时,数值类型由系统推测出来

rdd = spark.sparkContext.parallelize([
    (1,2,'strin1'),
    (3,4,'string2')
])
df = spark.createDataframe(rdd,schema=['a','b','c'])
df
Dataframe[a: bigint, b: bigint, c: string]
df.show()
+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|  2| strin1|
|  3|  4|string2|
+---+---+-------+

2.列名和数值类型都由我们制定

rdd = spark.sparkContext.parallelize([
    (1,2.,'strin1'),
    (3,4.,'string2')
])
df = spark.createDataframe(rdd,schema='a long,b double,c string')
df
Dataframe[a: bigint, b: double, c: string]
df.show()
+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|2.0| strin1|
|  3|4.0|string2|
+---+---+-------+

3.还有从rdd种创建dataframe的方法,那就是rdd提供的toDF()方法,这也是在开发种最常用的方法

rdd = spark.sparkContext.parallelize([
    (1,2,'strin1'),
    (3,4,'string2')
])
df = rdd.toDF(schema=_schema1)
df
Dataframe[a: int, b: int, c: string]
df.show()
+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|  2| strin1|
|  3|  4|string2|
+---+---+-------+
方式6:从元组中创建

本质和用list创建是一毛一样滴

df = spark.createDataframe([
    (1,2,'string1'),
    (3,4,'string2')
])
df
Dataframe[_1: bigint, _2: bigint, _3: string]
df.show()
+---+---+-------+
| _1| _2|     _3|
+---+---+-------+
|  1|  2|string1|
|  3|  4|string2|
+---+---+-------+

增加schema

df = spark.createDataframe([
    (1,2,'string1'),
    (3,4,'string2')
],schema='a long,b long,c string')
df
Dataframe[a: bigint, b: bigint, c: string]
df.show()
+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|  2|string1|
|  3|  4|string2|
+---+---+-------+
方式7:从csv中创建

1.通用的加载方式
下面只是罗列了这个函数所有的形参。
当所读csv中有表头时:指定inferSchema=“true”,header=‘true’,不指定schema
当所读的csv中美哟表头,指定schema,不指定inferSchema=“true”,header=‘true’

df = spark.read.load('./test.csv',format='csv',sep= ',',inferSchema="true",header='true',schema='col1 long, col2 long ,col3 string')
df.show()
+----+----+-------+
|col1|col2|   col3|
+----+----+-------+
|   1|   3|string1|
|   2|   4|string2|
+----+----+-------+

2.专用读取方式
关于读取的设置,需要在options中完成,也可以在option中完成,如果使用option时,一次只能设置一个,如果使用options,一次可以设置多个

df = spark.read.options(delimiter=',',inferSchema='true',header='true').csv('./test.csv')
df.show()
+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|  3|string1|
|  2|  4|string2|
+---+---+-------+
方式8:从json中读取

1.通用的加载方式
下面只是罗列了这个函数所有的形参。
当所读csv中有表头时:指定inferSchema=“true”,header=‘true’,不指定schema
当所读的csv中美哟表头,指定schema,不指定inferSchema=“true”,header=‘true’

json格式如下
{“name”: “leon”,“age”: “4”}
{“name”: “traveler”,“age”: “5”}
1.通用加载方式

df = spark.read.load('./test.json',format='json',inferSchema='true',header='true')
df
Dataframe[age: string, name: string]
df.show()
+---+--------+
|age|    name|
+---+--------+
|  4|    leon|
|  5|traveler|
+---+--------+

2.专有读取方式

df = spark.read.json('./test.json')
df
Dataframe[age: string, name: string]
df.show()
+---+--------+
|age|    name|
+---+--------+
|  4|    leon|
|  5|traveler|
+---+--------+
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/701744.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号