项目地址:https://gitee.com/cch-bigdata/spark-process.git

输入数据
id,data
1,"Ming,20,15552211521"
2,"hong,19,13287994007"
3,"zhi,21,15552211523"

输出数据
+---+----+---+-----------+
| id| 列1|列2|        列3|
+---+----+---+-----------+
|  1|Ming| 20|15552211521|
|  2|hong| 19|13287994007|
|  3| zhi| 21|15552211523|
+---+----+---+-----------+

程序代码
package com.cch.bigdata.spark.process.split
import com.cch.bigdata.spark.process.AbstractTransform
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Column, DataFrame, SparkSession, functions}
import scala.collection.mutable.ListBuffer
//列拆分
/**
 * Column splitter: breaks one delimited string column into several new columns.
 *
 * The input is read from a CSV file, the configured column is split on the
 * configured separator, and the result (original columns minus the split
 * column, followed by one column per split part) is printed with `show()`.
 */
class Spliter extends AbstractTransform {
  // Name of the column whose value is split.
  private val column = "data"
  // Delimiter between the parts inside the column value (treated as literal text).
  private val separator = ","
  // Names of the new columns; their count determines how many parts are extracted.
  private val new_column_names = Array[String]("c1", "c2", "c3")
  // Chinese display names of the new columns, used as the output aliases.
  private val new_column_cnames = Array[String]("列1", "列2", "列3")

  /**
   * Validates the configuration, loads the input, splits the configured column
   * and prints the resulting rows.
   *
   * @throws RuntimeException if the split column, the separator or the new
   *                          column names are empty, or if the number of
   *                          display names differs from the number of new
   *                          column names
   */
  override def process(): Unit = {
    // ---- parameter validation ----
    if (column.isEmpty) {
      throw new RuntimeException("拆分列不能为空")
    }
    if (separator.isEmpty) {
      throw new RuntimeException("拆分分隔符不能为空")
    }
    if (new_column_names.isEmpty) {
      throw new RuntimeException("拆分后新列名称不能为空")
    }
    // Aliases are looked up by position, so both arrays must line up; without
    // this check a shorter alias array fails with an index error and a longer
    // one is silently truncated.
    if (new_column_cnames.length != new_column_names.length) {
      throw new RuntimeException("新列别名数量必须与新列名称数量一致")
    }

    // Load the input data set.
    val input = loadCsv("src/main/resources/csv/split.csv", spark)

    // Original columns to keep: everything except the column being split.
    val keptColumns: Seq[Column] =
      input.schema.fieldNames.toSeq.filterNot(_ == column).map(col)

    // functions.split interprets its second argument as a regular expression,
    // so the separator is quoted to be matched literally (e.g. "|" or ".").
    val parts: Column =
      functions.split(col(column), java.util.regex.Pattern.quote(separator))

    // One output column per split part, aliased with its display name. The
    // split expression is selected directly, so no temporary column is added
    // to (or has to be dropped from) the data set.
    val splitColumns: Seq[Column] =
      new_column_cnames.toSeq.zipWithIndex.map { case (alias, i) =>
        parts.getItem(i).as(alias)
      }

    input.select(keptColumns ++ splitColumns: _*).show()
  }

  /** Application name reported for this job. */
  override def getAppName(): String = "列拆分"
}
/** Entry point that runs the column-split job once. */
object Spliter {
  /**
   * Creates a [[Spliter]] and executes it.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val job = new Spliter
    job.process()
  }
}
参数解释
column:指定需要拆分的列名
separator:拆分列中的值根据什么分隔符来拆分
new_column_names:为拆分出的每一列指定一个新名称
new_column_cnames:新列的别名(中文名称)



