栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

使用spark将MySQL数据导入hive

使用spark将MySQL数据导入hive

import org.apache.spark.sql.{Dataframe, SaveMode, SparkSession}

object spark_from_mysql_to_hive {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master(“local[*]”).enableHiveSupport()
.config(“spark.debug.maxToStringFields”, “100”)
.config(“spark.sql.debug.maxToStringFields”, “100”)
.config(“hive.metastore.uris”, “thrift://ip:9083”)
//由于 Hive 和 SparkSQL 在 Decimal 类型上使用了不同的转换方式写入 Parquet,
// 导致 Hive 无法正确读取 SparkSQL 所导入的数据。对于已有的使用 SparkSQL 导入的数据,
// 如果有被 Hive/Impala 使用的需求,建议加上 spark.sql.parquet.writeLegacyFormat=true,重新导入数据。
.config(“spark.sql.parquet.writeLegacyFormat”, true)
.appName(“mysql_to_hive”).getOrCreate();
import spark.implicits._

val jdbcDF: Dataframe = spark.read.format(“jdbc”)
.option(“url”,“jdbc:mysql://ip:3306/finance? characterEncoding=utf-8&serverTimezone=UTC&useSSL=false”)
.option(“driver”,“com.mysql.jdbc.Driver”)
.option(“dbtable”,“base_store”)
.option(“user”,“root”)
.option(“password”,“d0sD6Ffs7sGkHDF8mPnHJ2cl”)
.load();
// spark.sql(“create table finance.base_store( id int, student String) ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘t’;”);
jdbcDF.show();
jdbcDF.createOrReplaceTempView(“temp”);
println("+"*500);
spark.sql(“select id,store_brand from temp”).show();
spark.sql(“use finance”);
// spark.sql(“set hive.stats.autogather=false”);
spark.sql(“drop table if exists base_store”);
spark.sql(“CREATE TABLE if not exists finance.base_store(id int,store_brand String)ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘,’”);
println(“开始导入”);
// spark.sql(“insert into finance.base_store select id,store_brand from temp”);
// 在saveAsTable之前要执行spark.table
val df = spark.table(“temp”);
// spark.conf.set(“spark.sql.parquet.writeLegacyFormat”, true);
df.write.mode(SaveMode.Overwrite).saveAsTable(“hive_records”);
spark.sql(“select * from hive_records”).show();
// jdbcDF.write.format(“hive”).saveAsTable(“base_store”);
// jdbcDF.write.mode(SaveMode.Overwrite).saveAsTable(“base_store”);
// jdbcDF.write.mode(“Overwrite”).saveAsTable(“base_store”);

println("导入完成");
spark.stop();

}

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758298.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号