项目地址:https://gitee.com/cch-bigdata/spark-process.git

输入数据
order_number,order_date,purchaser,quantity,product_id,remark
10001,2016-01-16,1001,1,102,机q器w记e录r
10003,2016-01-17,1002,2,105,人工记录
10002,2016-01-19,1002,3,106,人工补录
10004,2016-02-21,1003,4,107,自然交易
10001,2016-01-16,1001,1,102,机器记录

输出数据
+------------+-------------------+---------+--------+----------+------------+
|order_number|         order_date|purchaser|quantity|product_id|      remark|
+------------+-------------------+---------+--------+----------+------------+
|       10001|2016-01-16 00:00:00|     1001|       1|       102|机q器w记e录r|
|       10003|2016-01-17 00:00:00|     1002|       2|       105|    人工记录|
|       10002|2016-01-19 00:00:00|     1002|       3|       106|    人工补录|
|       10004|2016-02-21 00:00:00|     1003|       4|       107|    自然交易|
|       10001|2016-01-16 00:00:00|     1001|       1|       102|  计算机记录|
+------------+-------------------+---------+--------+----------+------------+

程序代码
package com.cch.bigdata.spark.process.replace
import com.cch.bigdata.spark.process.AbstractTransform
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, expr, regexp_replace, when}
/**
 * Transform that replaces values in configured columns of a data set.
 *
 * The configuration is held in four parallel arrays (`column`, `strategy`,
 * `old_value`, `new_value`): the entry at index `i` of each array describes the
 * replacement rule for the column at index `i`.
 *
 * Supported strategies:
 *  - `1`: replace cells that are exactly equal to the old value;
 *  - `2`: replace every substring matching the old value, interpreted as a
 *    regular expression.
 */
class ValueReplacer extends AbstractTransform {

  /**
   * A single replacement rule.
   *
   * @param name      column the rule applies to
   * @param strategy  replacement strategy code (1 = exact value, 2 = regular expression)
   * @param old_value value (or regular expression) to look for
   * @param new_value value to write in its place
   */
  case class ReplaceColumn(name: String, strategy: Int, old_value: Any, new_value: Any)

  // Columns whose values should be replaced.
  private val column = Array[String]("remark")
  // Replacement strategy per column.
  private val strategy = Array[String]("1")
  // Values to be replaced, one per column.
  private val old_value = Array[Any]("机器记录")
  // Replacement values, one per column.
  private val new_value = Array[Any]("计算机记录")

  /**
   * Validates the configuration, loads the orders CSV, applies every configured
   * replacement rule in order and prints the resulting data set.
   *
   * @throws RuntimeException if the configuration is empty, the parallel arrays
   *                          differ in length, or a strategy code is unsupported
   */
  override def process(): Unit = {
    if (column.isEmpty) {
      throw new RuntimeException("替换列配置不能为空")
    }
    if (old_value == null) {
      throw new RuntimeException("替换列被替换值配置不能为空")
    }
    if (new_value == null) {
      throw new RuntimeException("替换列新值配置不能为空")
    }
    // The arrays are indexed in lockstep below; a length mismatch would otherwise
    // surface as a bare ArrayIndexOutOfBoundsException in the middle of the run.
    if (strategy == null ||
        Seq(strategy.length, old_value.length, new_value.length).exists(_ != column.length)) {
      throw new IllegalArgumentException(
        s"替换配置长度不一致: column=${column.length}, " +
          s"strategy=${Option(strategy).map(_.length).getOrElse(0)}, " +
          s"old_value=${old_value.length}, new_value=${new_value.length}")
    }

    // Load the upstream data set.
    val source: DataFrame = loadCsv("src/main/resources/csv/orders.csv", spark)

    val rules = column.indices.map { i =>
      ReplaceColumn(column(i), strategy(i).toInt, old_value(i), new_value(i))
    }

    // Thread the data set through every rule instead of mutating a shared var.
    val replaced = rules.foldLeft(source) { (df, rule) =>
      rule.strategy match {
        // Exact string / numeric value replacement.
        case 1 => valueReplace(df, rule.name, rule.old_value, rule.new_value)
        // Regular-expression replacement.
        case 2 => regexpReplace(df, rule.name, rule.old_value.toString, rule.new_value)
        // Fail with a descriptive message rather than an opaque MatchError.
        case other =>
          throw new IllegalArgumentException(s"不支持的替换方式: $other (列: ${rule.name})")
      }
    }

    replaced.show()
  }

  /**
   * Replaces cells of `columnName` that are equal to `oldValue` with `newValue`;
   * all other cells keep their current value.
   *
   * @param df         input data set
   * @param columnName column to rewrite
   * @param oldValue   value to look for
   * @param newValue   value written where `oldValue` is found
   * @return a data set with `columnName` rewritten
   */
  def valueReplace(df: DataFrame, columnName: String, oldValue: Any, newValue: Any): DataFrame = {
    val current = col(columnName)
    df.withColumn(columnName, when(current === oldValue, newValue).otherwise(current))
  }

  /**
   * Replaces every substring of `columnName` that matches `regexp` with the
   * string form of `newValue`.
   *
   * The pattern and replacement are passed to Spark as values rather than being
   * spliced into a SQL expression string, so quotes or backslashes in either one
   * cannot break (or alter the meaning of) the generated expression.
   *
   * @param df         input data set
   * @param columnName column to rewrite
   * @param regexp     regular expression to match
   * @param newValue   replacement; converted to a string
   * @return a data set with `columnName` rewritten
   */
  def regexpReplace(df: DataFrame, columnName: String, regexp: String, newValue: Any): DataFrame = {
    df.withColumn(columnName, regexp_replace(col(columnName), regexp, s"$newValue"))
  }

  /** Application name reported for this transform. */
  override def getAppName(): String = "值替换"
}
/** Entry point that runs the value-replacement transform once. */
object ValueReplacer {

  /**
   * Builds a [[ValueReplacer]] and executes it.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val transform = new ValueReplacer
    transform.process()
  }
}
参数解释
column:需要进行值替换的列,字符串数组
strategy:替换策略(1:字符串或值替换;2:使用正则表达式替换)
old_value:需要被替换的值
new_value:替换后的新值



