- 任务简介
- 具体步骤简介
- 第一步:开启动态分区
- 第二步:提取前一天时间
- 第三步:读取MYSQL数据
- 第四步:全量写入数据
- 第五步:Main
- 第六步:打包集群
- 第七步:找到jar包
- 第八步:把jar包打包到集群目录下
- 第九步:进入Master目录下运行
- 第十步: 输入代码及——运行结果
- 整体代码
val spark: SparkSession =new sql.SparkSession
.Builder()
.appName("data_Extraction")
.master("master//:7707")
.config("hive.exec.dynamic",value=true)//开启动态分区
.config("hive.exec.dynamic.partitions","nonstrict")//关闭严格模式,否则必须要有个静态分区
.config("hvie.exec.dynamic.partition",10000)//动态分区数量
.enableHiveSupport()
.getOrCreate()
第二步:提取前一天时间
//提取前一天时间
def getYesterday(): String ={
val simpleDateFormat =new SimpleDateFormat("yyyyMMdd")//设置日期格式为年月日
val calendar=Calendar.getInstance()
calendar.add(Calendar.DATE,-1)//提取前一天的时间
simpleDateFormat.format(Calendar.getInstance)
}
第三步:读取MYSQL数据
//读取MYSQL数据
def extract(tableName:String):Dataframe={
val prop=new Properties()
prop.put("user","root")//输入用户名 登陆密码 以及链接驱动
prop.put("password","123456")
prop.put("dirver","com.mysql.jdbc.Diver")
val df=spark
.read
.jdbc(MYSQL_URL,tableName.toUpperCase,prop)
df
第四步:全量写入数据
//全量写入
def overWiteTable(tableName:String): Unit ={
val df=extract(tableName)
val yesterday=getYesterday()
df.withColumn("etldate",lit(yesterday))
.write
.format("hive")
.mode(SaveMode.Overwrite)
.insertInto(tableName)
第五步:Main
//主方法
def main(args: Array[String]): Unit = {
sql("use ods")//进入
overWiteTable("customer")
第六步:打包集群
可能会遇见的错误:没有那个文件目录
import java.text.SimpleDateFormat
import java.util.Calendar
import java.util.Properties
import org.apache.spark.sql.functions._
import org.apache.spark.sql
import org.apache.spark.sql.{Dataframe,SaveMode,SparkSession}
object DataExtraction {
val spark: SparkSession =new sql.SparkSession
.Builder()
.appName("data_Extraction")
.master("spark://192.168.3.89:7077")
.config("hive.exec.dynamic",value = true)//开启动态分区
.config("hive.exec.dynamic.partition.mode","nonstrict")//关闭严格模式,否则必须要有个静态分区
.config("hive.exec.max.dynamic.partitions",10000)//动态分区数量
.enableHiveSupport()
.getOrCreate()
val MYSQL_URL="jdbc:mysql://master:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8"
import spark.sql
//提取前一天时间
def getYesterday(): String ={
val simpleDateFormat =new SimpleDateFormat("yyyyMMdd")//设置日期格式为年月日
val calendar=Calendar.getInstance()
calendar.add(Calendar.DATE,-1)//提取前一天的时间
simpleDateFormat.format(calendar.getTime)
}
//读取MYSQL数据
def extract(tableName:String):Dataframe={
val prop=new Properties()
prop.put("user","root")//输入用户名 登陆密码 以及链接驱动
prop.put("password","123456")
prop.put("driver","com.mysql.jdbc.Driver")
val df=spark
.read
.jdbc(MYSQL_URL,tableName.toUpperCase,prop)
df
}
//全量写入
def overWiteTable(tableName:String): Unit ={
val df=extract(tableName)
val yesterday=getYesterday()
df.withColumn("etldate",lit(yesterday))
.write
.format("hive")
.mode(SaveMode.Overwrite)//覆盖,适合全量更新,增量更新为.append
.insertInto(tableName)
}
//本地运行出错为没有链接到hive
//主方法
def main(args: Array[String]): Unit = {
sql("use ods").show//进入
overWiteTable("customer")
}
}



