// 自己记录一下 — personal note: append a new column (col2) from one parquet dataset onto another by joining their RDDs on col1.
// Target schema: the original schema from path1 plus a nullable String column "col2".
val new_schema = spark.read.parquet(path1).schema.add("col2", DataTypes.StringType, true)

// Side input: (join key col1) -> single-field Row holding the new col2 value.
// NOTE(review): original code used getAs[String]("") — empty column names; "col1"/"col2"
// are assumed from the schema above. TODO: confirm the real column names.
val rdd1 = spark.read.parquet(path2).rdd.map { row =>
  val col1 = row.getAs[String]("col1")
  val col2 = row.getAs[String]("col2")
  (col1, Row(col2)) // fixed: original was missing the closing parenthesis
}

// Main input keyed by col1, keeping the full original Row as the value.
// NOTE(review): original read `path` here, but the schema is taken from path1 and the
// other paths are path2 (side input) and path3 (output) — assuming path1 was intended.
val rdd2 = spark.read.parquet(path1).rdd.map { row =>
  val col1 = row.getAs[String]("col1")
  (col1, row)
}

// Left outer join so every original row survives; rows with no match get a null col2.
// fixed: method is leftOuterJoin (the lowercase `leftouterjoin` does not exist on RDD).
val rdd3 = rdd2.leftOuterJoin(rdd1).map { case (_, (oldRow, maybeCol2)) =>
  // Option.getOrElse replaces the original `== None` comparison.
  val col2Row = maybeCol2.getOrElse(Row(null))
  Row.merge(oldRow, col2Row)
}

// fixed: method is createDataFrame (capital F); `createDataframe` does not exist.
val rdd4 = spark.createDataFrame(rdd3, new_schema)

// Single output file; repartition(1) forces all data through one task — fine for small data.
rdd4.repartition(1).write.parquet(path3)



