栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

pyspark通过JDBC链接mysql(DataFrame)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

pyspark通过JDBC链接mysql(DataFrame)

一前言
Mysql版本:8.0.21
spark版本:3.1.1
hadoop版本:2.7.5
JDBC驱动程序版本:mysql-connector-java-5.1.46.tar.gz

二、正文
1、先在mysql里建立spark数据库,同时建立一个student表,向表中插入一些数据

mysql> create database spark;
mysql> use spark;
mysql> create table student (id int(4), name char(20), gender char(4), age int(4));
mysql> alter table student change id id int auto_increment primary key;
mysql> insert into student values(1,'Xueqian','F',23);
mysql> insert into student values(2,'Weiliang','M',24);
mysql> select * from student;
+----+----------+--------+------+
| id | name     | gender | age  |
+----+----------+--------+------+
|  1 | Xueqian  | F      |   23 |
|  2 | Weiliang | M      |   24 |
+----+----------+--------+------+

2、下载JDBC驱动程序
将下载的文件解压到spark目录下的jars目录里。

3、启动msyql服务
service mysql start

4、启动pyspark

cd /opt/module/spark
./bin/pyspark 
--jars /opt/module/spark-3.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.46-bin.jar 
--driver-class-path /opt/module/spark-3.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.46-bin.jar

在一行的末尾加入斜杠,是为了告诉pyspark,命令还没有结束。

5、连接数据库

>>>jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://192.168.64.130:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "mysql密码").load()


6、查看数据

>>> jdbcDF.show()


7、写入数据

>>> from pyspark.sql.types import Row
>>> from pyspark.sql.types import StructType
>>> from pyspark.sql.types import StructField
>>> from pyspark.sql.types import StringType
>>> from pyspark.sql.types import IntegerType
>>> studentRDD = spark.sparkContext.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]).map(lambda line : line.split(" "))
//下面要设置模式信息
>>> schema = StructType([StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
>>> rowRDD = studentRDD.map(lambda p : Row(p[1].strip(), p[2].strip(),int(p[3])))
//建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
>>> studentDF = spark.createDataframe(rowRDD, schema)
>>> prop = {}
>>> prop['user'] = 'root'
>>> prop['password'] = '填写mysql密码'  
>>> prop['driver'] = "com.mysql.jdbc.Driver"
>>> studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark",'student','append', prop)

8、检验数据是否写入成功

mysql> select * from student;
+----+-----------+--------+------+
| id | name      | gender | age  |
+----+-----------+--------+------+
|  1 | Xueqian   | F      |   23 |
|  2 | Weiliang  | M      |   24 |
|  3 | Guanhua   | M      |   27 |
|  4 | Rongcheng | M      |   26 |
+----+-----------+--------+------+
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/631434.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号