前言:本文主要介绍Spark中结构化API的使用。
一、数据源Read API的结构:
DataframeReader.format(文件类型).option(属性,值).schema(自定义的模式).load(文件路径)
format、schema、一系列option选项,每一步转换都会返回一个DataframeReader。
例如:
spark.read.format("csv")
.option("headr",true)
.option("mode",FAILFAST)
.option("inferSchema",true)
.load("D:/data/spark-data.csv")
读取模式:通过 option("mode",值)设置
permissive:当遇到错误格式的记录时,将所有字段设置为null并将所有错误格式的记录放在名为_corrupt_record字符串列中dropMalformed:删除包含错误格式记录的行failFast:遇到错误格式记录后立即返回失败
Write API:
DataframeWriter.format(文件格式)
.option(属性,值)
.partitionBy(字段)
.bucketBy(字段)
.sortBy(字段)
.save(路径)
1.CSV:
读文件
spark.read.format("csv")
.option("headr",true)
.option("mode",FAILFAST)
.option("inferSchema",true)
.load("D:/data/spark-data.csv")
写文件
df.write.repartition(1)
.format("csv")
.mode("overwrite")
.option("sep","t")
.save("D:/spark/data/spark-data.csv")
2.JSON
读文件
spark.read.format("json")
.option("header",true")
.option("inferSchmea",true)
.load("D:/spark/data/spark-data.json")
写文件
df.write.repartition(1)
.format("json")
.mode("overwrite")
.option("sep","t")
.save("D:/spark/data/spark-data.json")
数据的读取格式基本都一样。
二、API操作本文使用的数据为
航班数据
| DEST_COUNTRY_NAME | ORIGIN_COUNTRY_NAME | count |
| United States | Romania | 15 |
| ......... | ...... | ... |
| United States | Croatia | 1 |
1.取列:col() column()$"列名" '列名
2.selectExpr函数
可以利用selectExpr来构建复杂的表达式来创建Dataframe
df.selectExpr("*","(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry").show(2)
3.添加列:withColumn(列名,列)
df.withColumn("numberOne",lit(1)).show(2)
4.重命名列 withColumnRename(原列名,新列名)
df.withColumnRenamed("DEST_COUNTRY_NAME","DEST").show(2)
5. 删除列:drop(列名)
df.drop("DEST_COUNTRY_NAME").show(2)
6.过滤行:filter、where
df.filter(col("count")<2).show(2)
df.where("count<2").show(2)
7.去重 distinct()
//scala
df.select("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME").distinct().count()
//sql
select count(distinct(DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME)) from dfTable
8.连接连个Dataframe,要求具有相同的Schema
df.union(newDF)
9.行排序 sort和orderBy 默认是升序
df.sort("count").show(3)
df.orderBy("count","DEST_COUNTRY_NAME").show(3)
import org.apache.spark.sql.functions.{desc,asc}
df.sort(desc("count")).show(2)
df.orderBy(desc("count")).show(2)



