在 Spark 中，将 DataFrame 数据源解析为 RDD 后，通过添加对应的字段信息，再转换回 DataFrame。直接上代码，记录一下：
package com.ku.test

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

import scala.collection.mutable
/**
 * Demo: take a DataFrame whose "message" column packs several values into one
 * underscore-delimited string, drop down to RDD[Row] to split it apart, then
 * rebuild a DataFrame by attaching an explicit schema.
 */
object TestAddField1 {
  def main(args: Array[String]): Unit = {
    // Local Spark session for the demo.
    val spark = SparkSession
      .builder()
      .appName("test")
      .master("local")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    // Source data: each "message" packs name/age/gender/address joined by '_'.
    // Fix: Spark's DataFrame type is spelled `DataFrame`, not `Dataframe`.
    val sourceFile: DataFrame = Seq(
      (1, "张三_24_男_上海市"),
      (2, "李四_33_女_北京市"),
      (2, "王四_23_女_广州市"),
      (2, "赵四_53_女_西安市"),
      (2, "高四_32_女_成都市")
    ).toDF("id", "message")
    sourceFile.show()

    // Split each message into its tokens and rebuild the row from them.
    // (Replaces the original mutable `var testSeq` append loop — split
    // already yields the tokens in order, so no accumulator is needed.)
    val dataRDD: RDD[Row] = sourceFile.rdd.map { row =>
      val message = row.getAs[String]("message")
      Row.fromSeq(message.split("_").toSeq)
    }
    dataRDD.foreach(row => println(row))

    // LinkedHashMap preserves insertion order, so the schema columns line up
    // with the token order produced by split above.
    // Fix: the class is `mutable.LinkedHashMap`, not `mutable.linkedHashMap`.
    val fieldsConf = new mutable.LinkedHashMap[String, DataType]()
    fieldsConf.put("name", StringType)
    fieldsConf.put("age", StringType)
    fieldsConf.put("gender", StringType)
    fieldsConf.put("address", StringType)
    val fields = fieldsConf.map { case (name, dataType) =>
      StructField(name, dataType)
    }.toArray
    val schema = StructType(fields)

    // Re-attach the schema to the RDD[Row] and show the rebuilt DataFrame.
    // Fix: the method is `createDataFrame`, not `createDataframe`.
    spark.createDataFrame(dataRDD, schema).show()
  }
}
上面有三次输出数据,格式如下:
+---+-----------------+ | id| message| +---+-----------------+ | 1|张三_24_男_上海市| | 2|李四_33_女_北京市| | 2|王四_23_女_广州市| | 2|赵四_53_女_西安市| | 2|高四_32_女_成都市| +---+-----------------+ [张三,24,男,上海市] [李四,33,女,北京市] [王四,23,女,广州市] [赵四,53,女,西安市] [高四,32,女,成都市] +----+---+------+-------+ |name|age|gender|address| +----+---+------+-------+ |张三| 24| 男| 上海市| |李四| 33| 女| 北京市| |王四| 23| 女| 广州市| |赵四| 53| 女| 西安市| |高四| 32| 女| 成都市| +----+---+------+-------+ Process finished with exit code 0



