栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark:自定义输出文件名与文件格式,并为 CSV 文件添加列头

Spark:自定义输出文件名与文件格式,并为 CSV 文件添加列头

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer


/**
 * Spark job that exports `ad_dw.D301_DWM_CUSTOMER` as CSV files under an S3 prefix
 * keyed by the current date (`.../etl_date=yyyyMMdd/`).
 *
 * Rows are spread over `count / RowsPerFile + 1` partitions. Each partition is written
 * under the key `d301_dwm_customer_<etl_date>_<partitionId>` via [[CustomOutputFormat]],
 * and every file starts with an upper-cased, comma-separated header line.
 */
object PushPreMain {
  val spark: SparkSession = SparkSession.builder().appName("CdpPushPre")
    .enableHiveSupport().getOrCreate()
  // NOTE(review): DirectFileOutputCommitter is not part of stock Hadoop; presumably it is
  // supplied by the cluster distribution — confirm it is on the classpath.
  spark.sparkContext.hadoopConfiguration.set("mapred.output.committer.class", "org.apache.hadoop.mapred.DirectFileOutputCommitter")
  // Do not write a _SUCCESS marker file into the output directory.
  spark.sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

  /** Divisor used to size the output: roughly this many data rows per CSV file. */
  private val RowsPerFile = 330000L

  /**
   * Runs the export and always closes the SparkSession, even if the job fails.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    try {
      // Capture "now" once so the partition date and the logged timestamp agree.
      val time = new Date().getTime
      val etl_date = new SimpleDateFormat("yyyyMMdd").format(time)
      println(new SimpleDateFormat("yyyyMMddHH").format(time))

      val location = s"s3://cdp2-dev-raw/data/cdp/d201_dwm_customer/etl_date=$etl_date/"
      val filename = "D301_DWM_CUSTOMER"

      val workSql =
        """
          |SELECt CUSTOMER_ID
          |,FREQUENT_CHANNEL_STORE_R12
          |,frequent_puchased_city
          |,'' EXTENDS
          |FROM ad_dw.D301_DWM_CUSTOMER
          |""".stripMargin
      // Cached because it is scanned twice: once by count() and once by the write.
      val customers = spark.sql(workSql).cache()

      // Upper-cased column names, written as the first line of every output file.
      val header = customers.columns.mkString(",").toUpperCase
      val numFiles = (customers.count() / RowsPerFile + 1).toInt
      val filePrefix = s"${filename.toLowerCase}_${etl_date}_"

      // Only local vals are captured in the closure below: touching a member of this
      // object on an executor would run the object initializer, which builds a SparkSession.
      customers.rdd.repartition(numFiles)
        .mapPartitions { rows =>
          // Key = target file name (CustomOutputFormat appends ".csv"); constant per partition.
          val fileKey = filePrefix + TaskContext.getPartitionId()
          // Stream the partition instead of buffering it; the header always comes first.
          // Row.mkString(",") equals the old toString-minus-brackets form.
          // NOTE(review): fields are joined without CSV quoting/escaping, so a value that
          // contains ',' will break the column layout — confirm downstream tolerates this.
          Iterator.single((fileKey, header)) ++ rows.map(row => (fileKey, row.mkString(",")))
        }
        .saveAsHadoopFile(location, classOf[String], classOf[String], classOf[CustomOutputFormat])
    } finally {
      spark.close()
    }
  }
}


/**
 * Old-API (`mapred`) output format that routes each record to a file named
 * `<key>.csv` and writes only the record's value on each line.
 *
 * Before the job starts, it deletes the whole output directory. A re-run that produces
 * fewer files than a previous run therefore leaves no stale files behind.
 */
class CustomOutputFormat() extends MultipleTextOutputFormat[Any, Any] {

  /** Names the output file after the record key; keys are expected to be Strings. */
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + ".csv"

  /** Returns `null` as the key so that only the value is written to the file. */
  override def generateActualKey(key: Any, value: Any): String = null

  /**
   * Best-effort removal of the existing output directory, so that same-named files are
   * replaced and stale files from an earlier, larger run disappear.
   *
   * @param ignored FileSystem supplied by the caller; not used. It may not be the
   *                filesystem that owns the output path (e.g. an `s3://` output while
   *                the default FS is HDFS), so the FS is resolved from the path instead.
   * @param job     job configuration holding the output path
   */
  override def checkOutputSpecs(ignored: FileSystem, job: JobConf): Unit = {
    import scala.util.control.NonFatal

    Option(FileOutputFormat.getOutputPath(job)).foreach { outDir =>
      try {
        val fs = outDir.getFileSystem(job)
        fs.delete(outDir, true)
      } catch {
        // Deliberately best-effort, but no longer silent; fatal errors still propagate.
        case NonFatal(e) =>
          System.err.println(s"CustomOutputFormat: could not delete output directory $outDir: $e")
      }
      FileOutputFormat.setOutputPath(job, outDir)
    }
  }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/389102.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号