栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SparkSQL中读取数据和写出数据(附案例)

SparkSQL中读取数据和写出数据(附案例)

1.1  Spark 读取数据的统一入口:

spark.read.format(指定的格式).load(文件的路径)或者spark.read.格式的名称(文件的路径)

1.2  Spark 写出数据的统一出口:

Dataframe.write.format(保存为什么格式).save(保存到哪个路径)或者Dataframe.write.保存的格式(保存到哪个路径)

1.3  Spark 写出数据有 4 种方式:

append:如果数据源或者表已经存在,继续追加overwrite:如果数据源已经存在,覆盖写出ignore:如果数据源已经存在,将忽略(ignore) Dataframe中的数据,如果不存在,则创建并写出。官网的比喻是类似这条SQL语句:create table if not existserrorifexists:如果数据源(可以理解为文件)已经存在,将抛出异常


2.1  SparkSQL读取数据的综合案例:
from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 创建上下文对象
    spark = SparkSession.builder.appName('test').master('local[*]').config('spark.sql.shuffle.partitions', '6').getOrCreate()
    
    # 1.1-读取json文件并打印
    print('json:')
    df1=spark.read.format('json').load('file:///export/pyworkspace/pyspark_sz27/pyspark_sql/data/sql/people.json')
    df1.printSchema()
    df1.show()
    # 1.2-写法二
    df2=spark.read.json('file:///export/pyworkspace/pyspark_sz27/pyspark_sql/data/sql/people.json')
    df2.printSchema()
    df2.show()

    # 2-读取csv文件并打印
    print('csv:')
    df3=spark.read.option('sep',';').option('header',True).option('inferSchema',True).csv('file:///export/pyworkspace/pyspark_sz27/pyspark_sql/data/sql/people.csv')
    df3.printSchema()
    df3.show()

    # 3-读取parquet文件并打印
    print('parquet:')
    # 下面3种写法效果一样
    df4=spark.read.format('parquet').load('file:///export/pyworkspace/pyspark_sz27/pyspark_sql/data/sql/users.parquet')
    df4.printSchema()
    df4.show()
    df5=spark.read.load('file:///export/pyworkspace/pyspark_sz27/pyspark_sql/data/sql/users.parquet')
    df5.printSchema()
    df5.show()
    df6=spark.read.parquet('file:///export/pyworkspace/pyspark_sz27/pyspark_sql/data/sql/users.parquet')
    df6.printSchema()
    df6.show()

    # 4-读取jdbc-mysql文件并打印
    print('jdbc:')
    df7=spark.read.format('jdbc')
        .option('url','jdbc:mysql://node1:3306')
        .option('user','root')
        .option('password','123456')
        .option('dbtable','bigdata.people')
        .load()
    df7.printSchema()
    df7.show()

2.2  SparkSQL写出数据的综合案例:
from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 创建上下文对象
    spark=SparkSession.builder.appName('test').master('local[*]').config('spark.sql.shuffle.partitions','6').getOrCreate()
    sc=spark.sparkContext
    # 使用SparkContext,读取txt形成RDD,转换成Dataframe
    rdd1=sc.textFile('file:///root/1.txt')
    rdd2=rdd1.map(lambda str:(str.split(',')[0],int(str.split(',')[1].strip())))
    df=rdd2.toDF(['name','age'])
    # 并打印schema和数据
    df.printSchema()
    df.show()

    # 1.1-将Dataframe保存成为json文件,
    df.coalesce(1).write.format('json').mode('overwrite').save('file:///root/out/json1')
    # 1.2-写法二
    df.coalesce(1).write.mode('overwrite').json('file:///root/out/json2')

    # 2-将Dataframe保存成为csv文件
    df.coalesce(1).write.mode('overwrite')
        .option('header',True)
        .option('sep',';')
        .csv('file:///root/out/csv1')

    # 3-将Dataframe保存成为parquet文件(SparkSQL模块中默认读取数据文件格式就是parquet列式存储数据)
    df.coalesce(1).write.mode('overwrite').save('file:///root/out/parquet1')

    # 4-将Dataframe保存成为jdbc-mysql数据
    df.write.format('jdbc').mode('overwrite') 
        .option('url', 'jdbc:mysql://node1:3306') 
        .option('user', 'root') 
        .option('password', '123456') 
        .option('dbtable', 'bigdata.people') 
        .save()
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745244.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号