第一步:PySpark 应用程序从初始化开始,SparkSession这是 PySpark 的入口点
from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate()
第二步:创建Dataframe,三种方式
Dataframe是在Spark 1.3中正式引入的一种以RDD为基础的不可变的分布式数据集,类似于传统数据库的二维表格,数据在其中以列的形式被组织存储。如果熟悉Pandas,其与Pandas Dataframe是非常类似的东西。
#从行列表创建 PySpark Dataframe
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row
df = spark.createDataframe([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
#从 pandas Dataframe 创建 PySpark Dataframe
pandas_df = pd.Dataframe({
'a': [1, 2, 3],
'b': [2., 3., 4.],
'c': ['string1', 'string2', 'string3'],
'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataframe(pandas_df)
#从包含元组列表的 RDD 创建 PySpark Dataframe
rdd = spark.sparkContext.parallelize([
(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
])
df = spark.createDataframe(rdd, schema=['a', 'b', 'c', 'd', 'e'])
工作中读取数据的方式
#普通读取csv为Dataframes数据
# 读取csv为Dataframe
traffic = spark.read.csv('Pokemon.csv', header='true')
# 创建临时表
traffic.createOrReplaceTempView("traffic")
#通过pandas辅助读取csv
import pandas as pd
df = pd.read_csv('Pokemon.csv')
traffic = spark.createDataframe(df)
traffic.createOrReplaceTempView("traffic")
备注由于Pokemon.csv这个文件中有空值,所以spark.createDataframe()会失败的,但是使用第种方式读取就行了
SparkSQL基础语法 Spark RDD Spark Streaming任务1:PySpark数据处理
任务2:PySpark数据统计
任务3:PySpark分组聚合
任务5:SparkML基础:分类模型
任务6:SparkML基础:回归模型
任务7:SparkML:聚类模型



