首先创建一个scala工程叫做myhctest
因为运行会产生很多日志信息 着你喔导入一个叫log4j的文件进行消除
然后创建以下几个类和特质
首先SaveTrait
trait SaveTrait {
def dfSave(indf:Dataframe, ctx:SparkSession, tableName:String):Unit
}
//构建方法 indf是传入的dataframe也就是传入你hive中的表
//因为spark sql是用的sparkSession中的所以后续要书写一个sparksession类
// tablenname也就是你要重新保存的表名称
hivesaveimpl
trait HiveSaveImpl extends SaveTrait {
override def dfSave(indf: Dataframe, ctx:SparkSession, tableName: String): Unit = {
indf.createOrReplaceTempView("hctest")//创建一个临时视图
ctx.sql("insert overwrite table "+tableName+" select * from hctest")
}
}
mysqlsaveimpl
trait MySqlSaveImpl extends SaveTrait {
override def dfSave(indf: Dataframe, ctx:SparkSession, tableName:String)= {
val prop = new Properties()
prop.setProperty("user","root")
prop.setProperty("password","root")
prop.setProperty("driver","com.mysql.jdbc.Driver")
indf.write.mode(SaveMode.Overwrite)
.jdbc("jdbc:mysql://192.168.80.181:3306/mydemo"
,tableName,prop)
}
}
dataframe 连接mysql 如果只连接hive的话这个可以不写
datasave
class DataSave {
st:SaveTrait=>
def save(indf:Dataframe,ctx:SparkSession,tableName:String) ={
st.dfSave(indf,ctx,tableName)
}
}
//构建方法 indf是传入的dataframe也就是传入你hive中的表
//因为spark sql是用的sparkSession中的所以后续要书写一个sparksession类
// tablenname也就是你要重新保存的表名称
test
object Test {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]")
.appName("test")
.config("hive.metastore.uris","thirft://192.168.80.181:9083")
.enableHiveSupport().getOrCreate()
val df = spark.createDataframe(Seq((1, "zs"), (2, "ls")))
.toDF("id", "name")
// (new DataSave() with MySqlSaveImpl).dfSave(df,spark,"demo")
(new DataSave() with HiveSaveImpl)
.dfSave(df,spark,"mydemo.demo")
}
}
这里可以简单的测试一下
数据处理 (未完结)



