Reading and Writing MySQL Data with Spark DataFrames

(1) In MySQL, create a database named sparktest, then create a table named employee containing the two rows below.
Table 1: Existing data in the employee table

id  name   gender  age
1   Alice  F       22
2   John   M       25

Run the following commands in MySQL:
1) Start the MySQL client

mysql -uroot -p

2) Create the database and the table

create database sparktest; 
use sparktest;
create table employee(id int(4),name char(50), gender char(20), age int(10)); 
insert into employee values(1,'Alice','F',22);
insert into employee values(2,'John','M',25);
select * from employee;

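(2) Configure Spark to connect to MySQL over JDBC, use a DataFrame to insert the rows below into MySQL, and finally print the maximum and the sum of the age column.
Table 2: Rows to add to the employee table

id  name  gender  age
3   Mary  F       26
4   Tom   M       23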

1) Download the MySQL JDBC driver (a zip archive; extract it to obtain mysql-connector-java-5.1.46-bin.jar):

https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.46.zip

2) Start spark-shell with the driver JAR on the classpath

spark-shell --jars /home/hadoop/stuzip/mysql-connector-java-5.1.46-bin.jar --driver-class-path /home/hadoop/stuzip/mysql-connector-java-5.1.46-bin.jar

3) Run the following commands in spark-shell

scala> import java.util.Properties
scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.SparkSession
scala> val spark=SparkSession.builder().appName("TestMySQL").master("local").getOrCreate()
scala> import  spark.implicits._
scala> val employeeRDD=spark.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23")).map(_.split(" "))
scala> val schema=StructType(List(StructField("id",IntegerType,true),StructField("name",StringType,true),StructField("gender",StringType,true),StructField("age",IntegerType,true)))
scala> val rowRDD=employeeRDD.map(p=>Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt))
scala> val employeeDF=spark.createDataFrame(rowRDD,schema)
scala> val prop=new Properties()
scala> prop.put("user","root")
scala> prop.put("password","root")
scala> prop.put("driver","com.mysql.jdbc.Driver")
scala> employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest?characterEncoding=utf-8&useSSL=false", "sparktest.employee", prop)
scala> val jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/sparktest?characterEncoding=utf-8&useSSL=false").option("driver","com.mysql.jdbc.Driver").option("dbtable", "employee").option("user", "root").option("password", "root").load()
scala> jdbcDF.agg("age" -> "max", "age" -> "sum").show()
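
As a possible alternative to building an RDD and a schema by hand, the rows can be created with toDF, and the aggregates can be written with the typed helpers in org.apache.spark.sql.functions. The sketch below is only an illustration: it runs entirely in memory with no MySQL connection, it includes the two original rows solely to mimic the final table contents, and the aliases max_age and sum_age are names chosen here rather than taken from the original.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, sum}

// In spark-shell a session already exists; getOrCreate() simply reuses it.
val spark = SparkSession.builder().appName("TestMySQL").master("local").getOrCreate()
import spark.implicits._

// Assumption: the table ends up holding exactly these four rows.
val allRows = Seq(
  (1, "Alice", "F", 22),
  (2, "John", "M", 25),
  (3, "Mary", "F", 26),
  (4, "Tom", "M", 23)
).toDF("id", "name", "gender", "age")

// Equivalent in intent to agg("age" -> "max", "age" -> "sum"), with explicit aliases.
val stats = allRows.agg(max("age").as("max_age"), sum("age").as("sum_age")).first()

val maxAge = stats.getInt(0)   // max over an integer column stays an Int
val sumAge = stats.getLong(1)  // sum over an integer column is widened to Long
println(s"max_age=$maxAge sum_age=$sumAge")
```

To write only the two new rows to MySQL, the same toDF call can be applied to a Seq holding just rows 3 and 4, followed by the write.mode("append").jdbc(...) call shown above.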

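4) Result

Assuming employee now holds exactly the four rows listed above, the query reports a maximum age of 26 and an age sum of 96 (22 + 25 + 26 + 23).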


*Note: Error 1: writing with a JDBC URL that omits useSSL=false fails as follows:

scala> employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest","sparktest.employee",prop)
Fri Jan 21 23:51:59 CST 2022 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 1,648 milliseconds ago.  The last packet sent successfully to the server was 1,628 milliseconds ago.
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
  at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:990)
  at com.mysql.jdbc.ExportControlled.transformSocketToSSLSocket(ExportControlled.java:201)
  at com.mysql.jdbc.MysqlIO.negotiateSSLConnection(MysqlIO.java:4912)
  at com.mysql.jdbc.MysqlIO.proceedHandshakeWithPluggableAuthentication(MysqlIO.java:1663)
  at com.mysql.jdbc.MysqlIO.doHandshake(MysqlIO.java:1224)
  at com.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2190)
  at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2221)
  at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2016)
  at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:776)
  at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:47)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
  at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:386)
  at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:330)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:59)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:50)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:59)
  at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:426)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
  at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:446)
  ... 52 elided
Caused by: javax.net.ssl.SSLHandshakeException: No appropriate protocol (protocol is disabled or cipher suites are inappropriate)
  at sun.security.ssl.HandshakeContext.<init>(HandshakeContext.java:171)
  at sun.security.ssl.ClientHandshakeContext.<init>(ClientHandshakeContext.java:101)
  at sun.security.ssl.TransportContext.kickstart(TransportContext.java:238)
  at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:394)
  at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:373)
  at com.mysql.jdbc.ExportControlled.transformSocketToSSLSocket(ExportControlled.java:186)
  ... 73 more
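Fix: the JDBC URL did not disable SSL, so the driver attempted an SSL handshake and the JVM rejected it (see the SSLHandshakeException above). Append useSSL=false to the URL, here together with characterEncoding=utf-8:
jdbc:mysql://localhost:3306/sparktest?characterEncoding=utf-8&useSSL=false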
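
Because the same URL is needed for both the write and the read, one way to avoid forgetting the parameters is to build it in a single place. The helper below is a minimal sketch; buildJdbcUrl is a hypothetical name introduced here, not part of Spark or Connector/J, and it does no escaping of parameter values.

```scala
// Hypothetical helper: joins connection parameters onto a MySQL JDBC URL.
def buildJdbcUrl(host: String, port: Int, database: String,
                 params: Seq[(String, String)]): String = {
  val base = s"jdbc:mysql://$host:$port/$database"
  if (params.isEmpty) base
  else base + params.map { case (k, v) => s"$k=$v" }.mkString("?", "&", "")
}

// Same parameters as the corrected URL above.
val url = buildJdbcUrl(
  "localhost", 3306, "sparktest",
  Seq("characterEncoding" -> "utf-8", "useSSL" -> "false")
)
println(url)
```

The resulting url value can then be passed to both employeeDF.write.mode("append").jdbc(url, ...) and the option("url", url) call on the reader.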