我这里是完成编码之后,打包发送到集群上运行的!!!
1.使用IDEA创建MAVEN项目pom配置如下
4.0.0 com.tledu llll1.0-SNAPSHOT ${project.artifactId} My wonderfull scala app 2018 My License http://.... repo 1.8 1.8 UTF-8 2.11.11 2.11 4.2.0 org.scala-lang scala-library${scala.version} org.apache.spark spark-core_${scala.compat.version}2.3.2 provided org.apache.spark spark-sql_${scala.compat.version}2.3.2 provided org.apache.spark spark-hive_2.112.0.2 provided mysql mysql-connector-java8.0.23 junit junit4.12 test org.scalatest scalatest_${scala.compat.version}3.0.5 test org.specs2 specs2-core_${scala.compat.version}${spec2.version} test org.specs2 specs2-junit_${scala.compat.version}${spec2.version} test src/main/scala src/test/scala net.alchim31.maven scala-maven-plugin3.3.2 compile testCompile -dependencyfile ${project.build.directory}/.scala_dependencies org.apache.maven.plugins maven-surefire-plugin2.21.0 true org.scalatest scalatest-maven-plugin2.0.0 ${project.build.directory}/surefire-reports . TestSuiteReport.txt samples.AppTest test test maven-assembly-plugin jar-with-dependencies make-assembly package assembly
编码过程如下
// 1. 构建sparkSession
val sparkSession = SparkSession
.builder()
.appName("抽取mysql数据到hive")
.enableHiveSupport() // 开启hive支持
//.master("local[2]") // 指定运行模式,使用本地模式进行调试, 启动的时候指定即可,这个参数只在本地调试的时候使用
.getOrCreate()
//定义函数,获取mysql链接
def extractFromMysql(sparkSession: SparkSession, tableName: String): Dataframe = {
val DB_URL = "jdbc:mysql:// ip地址 /库名"
val jdbcMap = Map(
"driver" -> "com.mysql.jdbc.Driver",
"url" -> DB_URL,
"dbtable" -> tableName,
"user" -> "用户名",
"password" -> "密码"
)
sparkSession.read.format("jdbc").options(jdbcMap).load()
}
//调用函数获取dataframe
val df = extractFromMysql(sparkSession, "tablename")
// 加载hive表数据
// 切换数据库
sparkSession.sql("use hive库名")
// 读取数据
// spark 可以直接操作非事务表,但是无法操作事务表
val customerDF = sparkSession.sql(
"""
| select * from customer
|""".stripMargin)
// hive表中的数据
customerDF.show()
// 把数据存进去,全量的数据存储
df.write.mode(SaveMode.Append).format("hive").saveAsTable("customer")
sparkSession.close()
编码之后可能爆红,因为没有引入spark的jar包和mysql-connect的jar包
点击idea右上角的这里
引入你的jar包
(我在这里引入完之后 习惯 先mvn clean install 再rebuild 再restart 如果你不习惯,就看下一句)
之后就好了 不行就 rebuild 和 restart
最后mvn clean mvn package 执行打包操作
2.在集群上建个空的hive表可以用脚本建表(我用的脚本)
#! /bin/bash
hive -e "
use hive库名;
CREATE TABLE CUSTOMER (
CUSTKEY INT comment '',
NAME string comment '',
ADDRESS string comment '',
NATIonKEY string comment '',
PHONE string comment '',
ACCTBAL string comment '',
MKTSEGMENT string comment '',
COMMENT string comment ''
)
comment 'customer表'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ' 01'
LINES TERMINATED BY 'n'
STORED AS textfile
TBLPROPERTIES(
'transactional'='false'
);
"
3. 上传你打好的包
在集群上 rz -bye 即可上传
可以写个脚本运行你的包(我写的脚本)
#! /bin/bash export HADOOP_CONF_DIR=/usr/hdp/3.1.0.0-78/hadoop/conf /usr/hdp/3.1.0.0-78/spark2/bin/spark-submit --class 这里是你要运行的类 --master local[2] --driver-memory 512m --executor-memory 512m --num-executors 2 /这里是你jar包的地址 最前面有这个/哦
sh start.sh 运行脚本 MYSQL数据就导入进HIVE数据库了



