栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark业务开发-列拆分

spark业务开发-列拆分

spark业务开发-列拆分

项目地址:https://gitee.com/cch-bigdata/spark-process.git 输入数据

id,data
1,"Ming,20,15552211521"
2,"hong,19,13287994007"
3,"zhi,21,15552211523"
输出数据
+---+----+---+-----------+
| id| 列1|列2|        列3|
+---+----+---+-----------+
|  1|Ming| 20|15552211521|
|  2|hong| 19|13287994007|
|  3| zhi| 21|15552211523|
+---+----+---+-----------+
程序代码
package com.cch.bigdata.spark.process.split

import com.cch.bigdata.spark.process.AbstractTransform
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Column, Dataframe, SparkSession, functions}

import scala.collection.mutable.ListBuffer

//列拆分
class Spliter extends AbstractTransform {

  //需要拆分的列
  private val column = "data"

  //列值分隔符
  private val separator = ","

  //新列名称
  private val new_column_names = Array[String]("c1","c2","c3")

  //新列中文明村
  private val new_column_cnames = Array[String]("列1","列2","列3")


  override def process(): Unit = {

    //参数判断 start
    if (column.isEmpty) {
      throw new RuntimeException("拆分列不能为空")
    }

    if (separator.isEmpty) {
      throw new RuntimeException("拆分分隔符不能为空")
    }

    if (new_column_names.isEmpty) {
      throw new RuntimeException("拆分后新列名称不能为空")
    }
    //参数判断 end


    //获取输入流
    val df: Dataframe = loadCsv("src/main/resources/csv/split.csv",spark)

    //获取输入流的字段
    val names: Array[String] = df.schema.fieldNames

    //指定一个拆分列的名称,它在dataframe中也是一个列
    //最后要删除它
    val splitColumnName = "split_column"

    //根据指定的列名拆分列
    //split_column可以随便指定,这个是新产生的列
    //比如之前的列是id,data
    //那么现在就是 id,data,split_column
    //新列的值是[值1, 值2, 值3...],值是column类型,使用getItem来获取每一个列
    val splitDataframe: Dataframe = df.withColumn(
      splitColumnName,
      functions.split(col(column),
        separator
      ))

    //构造结果列list,用来存储最后需要展示的列
    val list: ListBuffer[Column] = new ListBuffer()

    //添加原来的字段到list,并去除被拆分字段
    //比如原来的列是id,data(被拆分列)
    //那么结果列中,就不需要显示这个data列了
    names.foreach(c => {
      if (!c.equals(column)) {
        list.append(col(c))
      }
    })

    //遍历新列,将新列定义添加到 结果列 列表中
    var index = 0
    new_column_names.foreach(c => {
      list.append(col(splitColumnName).getItem(index).as(new_column_cnames(index)))
      index += 1
    })

    //查询结果列
    splitDataframe.select(list.map(c => {
      c
    }): _*).drop(splitColumnName).show()

  }

  override def getAppName(): String = "列拆分"

}

object Spliter{
  def main(args: Array[String]): Unit = {
    new Spliter().process()
  }
}


参数解释

column:指定需要拆分的列名separator:拆分列中需要根据什么分隔符来拆分new_column_names:为分割的列指定一个新名词new_column_cnames:新列的别名

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/706749.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号