1.1 Spark 读取数据的统一入口:
spark.read.format(指定的格式).load(文件的路径)或者spark.read.格式的名称(文件的路径)
1.2 Spark 写出数据的统一出口:
Dataframe.write.format(保存为什么格式).save(保存到哪个路径)或者Dataframe.write.保存的格式(保存到哪个路径)
1.3 Spark 写出数据有 4 种方式:
append:如果数据源或者表已经存在,继续追加overwrite:如果数据源已经存在,覆盖写出ignore:如果数据源已经存在,将忽略(ignore) Dataframe中的数据,如果不存在,则创建并写出。官网的比喻是类似这条SQL语句:create table if not existserrorifexists:如果数据源(可以理解为文件)已经存在,将抛出异常
2.1 SparkSQL读取数据的综合案例:
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 创建上下文对象
spark = SparkSession.builder.appName('test').master('local[*]').config('spark.sql.shuffle.partitions', '6').getOrCreate()
# 1.1-读取json文件并打印
print('json:')
df1=spark.read.format('json').load('file:///export/pyworkspace/pyspark_sz27/pyspark_sql/data/sql/people.json')
df1.printSchema()
df1.show()
# 1.2-写法二
df2=spark.read.json('file:///export/pyworkspace/pyspark_sz27/pyspark_sql/data/sql/people.json')
df2.printSchema()
df2.show()
# 2-读取csv文件并打印
print('csv:')
df3=spark.read.option('sep',';').option('header',True).option('inferSchema',True).csv('file:///export/pyworkspace/pyspark_sz27/pyspark_sql/data/sql/people.csv')
df3.printSchema()
df3.show()
# 3-读取parquet文件并打印
print('parquet:')
# 下面3种写法效果一样
df4=spark.read.format('parquet').load('file:///export/pyworkspace/pyspark_sz27/pyspark_sql/data/sql/users.parquet')
df4.printSchema()
df4.show()
df5=spark.read.load('file:///export/pyworkspace/pyspark_sz27/pyspark_sql/data/sql/users.parquet')
df5.printSchema()
df5.show()
df6=spark.read.parquet('file:///export/pyworkspace/pyspark_sz27/pyspark_sql/data/sql/users.parquet')
df6.printSchema()
df6.show()
# 4-读取jdbc-mysql文件并打印
print('jdbc:')
df7=spark.read.format('jdbc')
.option('url','jdbc:mysql://node1:3306')
.option('user','root')
.option('password','123456')
.option('dbtable','bigdata.people')
.load()
df7.printSchema()
df7.show()
2.2 SparkSQL写出数据的综合案例:
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 创建上下文对象
spark=SparkSession.builder.appName('test').master('local[*]').config('spark.sql.shuffle.partitions','6').getOrCreate()
sc=spark.sparkContext
# 使用SparkContext,读取txt形成RDD,转换成Dataframe
rdd1=sc.textFile('file:///root/1.txt')
rdd2=rdd1.map(lambda str:(str.split(',')[0],int(str.split(',')[1].strip())))
df=rdd2.toDF(['name','age'])
# 并打印schema和数据
df.printSchema()
df.show()
# 1.1-将Dataframe保存成为json文件,
df.coalesce(1).write.format('json').mode('overwrite').save('file:///root/out/json1')
# 1.2-写法二
df.coalesce(1).write.mode('overwrite').json('file:///root/out/json2')
# 2-将Dataframe保存成为csv文件
df.coalesce(1).write.mode('overwrite')
.option('header',True)
.option('sep',';')
.csv('file:///root/out/csv1')
# 3-将Dataframe保存成为parquet文件(SparkSQL模块中默认读取数据文件格式就是parquet列式存储数据)
df.coalesce(1).write.mode('overwrite').save('file:///root/out/parquet1')
# 4-将Dataframe保存成为jdbc-mysql数据
df.write.format('jdbc').mode('overwrite')
.option('url', 'jdbc:mysql://node1:3306')
.option('user', 'root')
.option('password', '123456')
.option('dbtable', 'bigdata.people')
.save()


