在启动spark-shell时候,必须指定 jar 和 driver类路径,这两个路径是一致的,都是mysql驱动的路径。
然后选择对应的jar包
2.2 读取数据
import org.apache.spark.sql.SparkSession
object sparkSQLTestJDBC {
  /** Reads the `student` table from MySQL over JDBC and prints it to stdout. */
  def main(args: Array[String]): Unit = {
    // Build the SparkSession entry point in local mode.
    val spark = SparkSession.builder().master("local").appName("sparkSQLTest").getOrCreate()
    // Gather all JDBC connection settings in one map instead of chained .option calls.
    val jdbcOptions = Map(
      "url" -> "jdbc:mysql://10.10.10.10:3306/dbName?serverTimezone=GMT", // host IP and database name
      "driver" -> "com.mysql.jdbc.Driver",
      "dbtable" -> "student", // table to read
      "user" -> "root",
      "password" -> "123456"
    )
    // Load the table into a DataFrame via the JDBC data source.
    val studentDF = spark.read.format("jdbc").options(jdbcOptions).load()
    // Print the DataFrame contents.
    studentDF.show()
  }
}
2.3 写入数据
写入数据跟编程方式定义RDD一样
//for create table schema
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.Row
//for save connection info
import java.util.Properties
object sparkSQLTestJDBCWrite {
  /**
   * Builds a small DataFrame from hard-coded rows and appends it to the
   * MySQL `student` table over JDBC.
   */
  def main(args: Array[String]): Unit = {
    // Create the SparkSession entry point (local mode).
    val spark = SparkSession.builder().appName("sparkSQLTest").master("local").getOrCreate()
    // Table schema: id/score are Int, name/course are String; all fields nullable.
    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("score", IntegerType, true),
      StructField("course", StringType, true)
    ))
    // Rows to insert, encoded as space-separated fields: id name score course.
    val insertRDD: RDD[String] = spark.sparkContext.parallelize(Array("6 墨子 66 地理", "7 孔子 86 英语", "8 非子 96 化学"))
    // Split each line and build a Row with the field types the schema expects.
    val rowRDD = insertRDD.map(_.split(" "))
      .map(x => Row(x(0).trim.toInt, x(1), x(2).trim.toInt, x(3)))
    // BUG FIX: the SparkSession method is createDataFrame (capital F);
    // the original called a non-existent createDataframe and would not compile.
    val dataframe = spark.createDataFrame(rowRDD, schema)
    // JDBC connection properties (credentials and driver class).
    val prop = new Properties()
    prop.put("user","bigdata")
    prop.put("password","Bigdata2021")
    prop.put("driver", "com.mysql.jdbc.Driver")
    // Append the rows to the `student` table in the `finereport` database.
    dataframe.write.mode("append")
      .jdbc("jdbc:mysql://10.253.38.9:3306/finereport?serverTimezone=GMT","student",prop)
    // Release Spark resources once the write completes.
    spark.stop()
  }
}
结果校验:
ps:在指定字段数据类型时候,注意包的选择



