栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark 抽样,spark数据分析实践

spark 抽样,spark数据分析实践

模块B离线数据抽取
  • 任务简介
  • 具体步骤简介
    • 第一步:开启动态分区
    • 第二步:提取前一天时间
    • 第三步:读取MYSQL数据
    • 第四步:全量写入数据
    • 第五步:Main
    • 第六步:打包集群
    • 第七步:找到jar包
    • 第八步:把jar包打包到集群目录下
    • 第九步:进入Master目录下运行
    • 第十步: 输入代码及——运行结果
  • 整体代码

任务简介









具体步骤简介

第一步:开启动态分区
val spark: SparkSession =new sql.SparkSession
    .Builder()
  .appName("data_Extraction")
  .master("master//:7707")
  .config("hive.exec.dynamic",value=true)//开启动态分区
  .config("hive.exec.dynamic.partitions","nonstrict")//关闭严格模式,否则必须要有个静态分区
  .config("hvie.exec.dynamic.partition",10000)//动态分区数量
  .enableHiveSupport()
  .getOrCreate()
第二步:提取前一天时间

//提取前一天时间

def getYesterday(): String ={
    val simpleDateFormat =new SimpleDateFormat("yyyyMMdd")//设置日期格式为年月日
    val calendar=Calendar.getInstance()
    calendar.add(Calendar.DATE,-1)//提取前一天的时间
    simpleDateFormat.format(Calendar.getInstance)
}
第三步:读取MYSQL数据
  //读取MYSQL数据
    def extract(tableName:String):Dataframe={
        val prop=new Properties()
        prop.put("user","root")//输入用户名 登陆密码 以及链接驱动
        prop.put("password","123456")
        prop.put("dirver","com.mysql.jdbc.Diver")
        val df=spark
          .read
          .jdbc(MYSQL_URL,tableName.toUpperCase,prop)
          df
第四步:全量写入数据
 //全量写入
    def overWiteTable(tableName:String): Unit ={
    val df=extract(tableName)
    val  yesterday=getYesterday()
     df.withColumn("etldate",lit(yesterday))
          .write
          .format("hive")
          .mode(SaveMode.Overwrite)
          .insertInto(tableName)
第五步:Main
  //主方法
    def main(args: Array[String]): Unit = {
        sql("use ods")//进入
        overWiteTable("customer")
第六步:打包集群


第七步:找到jar包


第八步:把jar包打包到集群目录下


第九步:进入Master目录下运行


可能会遇见的错误:没有那个文件目录

第十步: 输入代码及——运行结果


整体代码
import java.text.SimpleDateFormat
import java.util.Calendar
import java.util.Properties

import org.apache.spark.sql.functions._
import org.apache.spark.sql
import org.apache.spark.sql.{Dataframe,SaveMode,SparkSession}



object DataExtraction {
val spark: SparkSession =new sql.SparkSession
    .Builder()
  .appName("data_Extraction")
  .master("spark://192.168.3.89:7077")
  .config("hive.exec.dynamic",value = true)//开启动态分区
  .config("hive.exec.dynamic.partition.mode","nonstrict")//关闭严格模式,否则必须要有个静态分区
  .config("hive.exec.max.dynamic.partitions",10000)//动态分区数量
  .enableHiveSupport()
  .getOrCreate()

    val MYSQL_URL="jdbc:mysql://master:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8"
    import spark.sql

    //提取前一天时间
    def getYesterday(): String ={
        val simpleDateFormat =new SimpleDateFormat("yyyyMMdd")//设置日期格式为年月日
        val calendar=Calendar.getInstance()
        calendar.add(Calendar.DATE,-1)//提取前一天的时间
        simpleDateFormat.format(calendar.getTime)
    }
    //读取MYSQL数据
    def extract(tableName:String):Dataframe={
        val prop=new Properties()
        prop.put("user","root")//输入用户名 登陆密码 以及链接驱动
        prop.put("password","123456")
        prop.put("driver","com.mysql.jdbc.Driver")
        val df=spark
          .read
          .jdbc(MYSQL_URL,tableName.toUpperCase,prop)
          df
    }
    //全量写入
    def overWiteTable(tableName:String): Unit ={
    val df=extract(tableName)
    val  yesterday=getYesterday()
     df.withColumn("etldate",lit(yesterday))
          .write
          .format("hive")
          .mode(SaveMode.Overwrite)//覆盖,适合全量更新,增量更新为.append
          .insertInto(tableName)
    }
    //本地运行出错为没有链接到hive
    //主方法
    def main(args: Array[String]): Unit = {
        sql("use ods").show//进入
        overWiteTable("customer")
    }
}
转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号