- 一、在mysql建立数据库和表
- 二、下载mysql的jdbc驱动
- 三、启动pyspark,连接数据库,读写数据
1.启动mysql
[ root@hadoop00 mysql5.7]#service mysql start [ root@hadoop00 mysql5.7]#./bin/mysql -u root -p #启动mysql
2.建立数据库和表
mysql> create database spark; mysql> use spark; mysql> create table student (id int(4), name char(20), gender char(4), age int(4)); mysql> alter table student change id id int auto_increment primary key; mysql> insert into student values(1,'Xueqian','F',23); mysql> insert into student values(2,'Weiliang','M',24); mysql> select * from student;
结果:
1.wget+对应版本下载地址 下载驱动包
mysql的jdbc驱动包下载地址
wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-8.0.26.tar.gz
2.解压包,移动到spark的jars目录下
tar -zvxf mysql-connector-java-8.0.26.tar.gzmvcal - connector-iava-8 0.26/ cp -r mysql-connector-java-8.0.26 /usr/local/spark3.1/jars
查看:
三、启动pyspark,连接数据库,读写数据1.启动pyspark,附加如下参数。
因为启动pyspark时,必须指定mysql连接驱动jar包(如果你前面已经采用下面方式启动了pyspark,就不需要重复启动了)。表示换行
cd /usr/local/spark ./bin/pyspark --jars /usr/local/spark/jars/mysql-connector-java-8.0.26/mysql-connector-java-8.0.26.jar --driver-class-path /usr/local/spark/jars/mysql-connector-java-8.0.26/mysql-connector-java-8.0.26.jar
过程:
2.通过spark.read.format(“jdbc”).option().option()…操作通过JDBC连接MySql数据库时。
jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.cj.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "123456").load()
jdbcDF.show()
结果如图:
在pyspark中执行如下命令对数据进行读写:
from pyspark.sql.types import Row
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
studentRDD = spark.sparkContext.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]).map(lambda line : line.split(" "))
//下面要设置模式信息
schema = StructType([StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
rowRDD = studentRDD.map(lambda p : Row(p[1].strip(), p[2].strip(),int(p[3])))
//建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
studentDF = spark.createDataframe(rowRDD, schema)
prop = {}
prop['user'] = 'root'
prop['password'] = 'hadoop'
prop['driver'] = "com.mysql.cj.jdbc.Driver"
studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark",'student','append', prop)
其中参数:
| 参数名称 | 含义 |
|---|---|
| url | 数据库的连接地址,如:jdbc:mysql://localhost:3306/spark(最后为数据库名) |
| Driver | 数据库的驱动程序,如:com.mysql.cj.jdbc.Driver |
| dbtable | 需要访问的表名 |
| user | 数据库用户名 |
| password | 数据库用户密码 |
过程:
在mysql中查看:
四、参考
1、驱动包下载和安装
Spark2.1.0入门:通过JDBC连接数据库(Dataframe)(Python版)
2、数据库连接
centos7下载spark连接mysql数据库提取数据(pyspark,Scala,python独立执行)
3、java、驱动包版本和mysql对应关系
mysql-connector-java与mysql版本的对应关系



