SparkSql 相比较 HiveSql 具有更快的运行速度和更高的灵活性,平常使用中经常需要进行数据转换,常见的有 RDD[T] -> Dataframe,Dataframe -> RDD[T] 还有 RDD[row] -> sql.dataframe,下面简单介绍下常用用法。
初始化 SparkSession :
// 1.配置Spark
val conf = {
if (local)
new SparkConf().setAppName("spark_sql").setMaster("local[*]")
else
new SparkConf().setAppName("spark_sql")
}
val spark = SparkSession
.builder
.config(conf)
.getOrCreate()
val sc = spark.sparkContext
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
二.RDD[T] 转 DataFrmae
这里 parallelize 模拟一个用户 RDD 包含 name 和 age 属性,通常情况下 map 返回 pairRdd,这里通过隐式转换 sqlContext.implicits._,使得 RDD 转换为 Dataframe,注意这里列名 cols 需要与 pariRdd 元祖的大小匹配。后续通过 createOrReplcaeTempView 可以构建临时表,随后通过 .sql 方法按照 hiveSql 语句进行任务执行命令。
// 2.RDD[T] 转 Dataframe
val userInfo = sc.parallelize(Array[(String, String)](
("A", "90s"),
("B", "00s"),
("C", "10s"),
("D", "20s")))
val userDf = userInfo.map(info => {
val name = info._1
val age = info._2
(name, age)
}).toDF("name", "age")
userDf.createOrReplaceTempView("tmp_user_info")
sqlContext.sql("select * from tmp_user_info where name != 'A'").show(10)
+----+---+ |name|age| +----+---+ | B|00s| | C|10s| | D|20s| +----+---+三.Dataframe 转 RDD[T]
spark 提供 read csv 方法可以将 Text 文本文件 or CSV 文件直接读取为 Dataframe,dataframe 每一行数据为 row,有些同学需要转换为 Array 或者其他类执行后续代码,可以通过下述方法实现:
// 3.Dataframe 转 RDD[T]
val userRdd = spark.read.option("header", "false")
.option("delimiter", "t")
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
.csv(".user.gz").rdd.map(row => {
val info = row.toSeq.toArray.map(f => if (f == null) "NULL" else f.toString)
info
})
userRdd.take(5).foreach(arr => println(arr.mkString(",")))
这里 header 代表是否保留列名,适用于 csv 文件,delimiter 代表文件的分隔符,这里需要注意 timestampFormat 参数,如果这个参数不设置,会报如下错误:
Exception in thread "main" java.lang.IllegalArgumentException: Illegal pattern component: XXX
四.RDD[row] 转 Dataframe除了上述场景,还有对 dataframe 过滤的场景,一方面可以采用 sqlContext 作为临时表进行过滤,也可以采取转 RDD 解析字符串进行解析,这时 Dataframe 就会转变为 RDD[row],如果需要转回 Dataframe,可以执行以下操作:
// 4.RDD[row] 转 sql.dataframe
val nullable: Boolean = true
val schema = StructType(
List(
StructField("name", StringType, nullable),
StructField("age", StringType, nullable)
)
)
val filterUserRdd = userDf.rdd.filter(line => {
val info = line.toSeq.map(_.toString)
val age = info(1)
age.equals("90s")
})
val userSqlDf = sqlContext.createDataframe(filterUserRdd, schema)
userSqlDf.createOrReplaceTempView("new_df")
sqlContext.sql("select * from new_df").show(5)
StructField 标识字段类型,除了 StringType 外,还有其他常用的类型比如 DoubleType,BooleanType 等等,有需要进 rg.apache.spark.sql.types 类寻找即可,这里通过定义列名与列属性结合原始 DF,可以实现 row -> dataframe 的转变,后续可以继续 HiveSql 相关操作。
+----+---+ |name|age| +----+---+ | A|90s| +----+---+



