官网地址:http://spark.apache.org/sql/
1.1.1.什么是spark sqlspark sql是spark用来处理结构化数据的一个模块,它提供了一个编程抽象Dataframe,作为分布式SQL查询的引擎,它是将spark sql转换成RDD,然后提交到集群中去运行,执行效率非常快。支持多种使用方式:SQL、Dataframe API、DataSet API。
相比于spark RDD API,spark sql包含了对结构化数据和在其上运算的更多信息,spark sql使用这些信息进行了额外的优化,对结构化数据的操作更加高效和方便。
1.1.2.spark sql优点
易整合
将sql查询与spark程序无缝整合。可以使用java、scala、python、R等语言的API操作。
统一的数据访问方式
以相同的方式连接到任何数据源。
兼容Hive
支持hive SQL的语法。
标准的数据连接
可以使用行业标准的JDBC或ODBC进行连接
六字诀: 易用、兼容、标准
1.2.Dataframe 1.2.1.什么是Dataframe在spark中,Dataframe是一种以RDD为基础的分布式数据集,它的前身是SchemaRDD,类似于传统数据库的二维表格。
从Spark 1.3.0开始由SchemaRDD更名为Dataframe。它与SchemaRDD的主要区别:Dataframe不再直接继承自RDD,而是自己实现了RDD的绝大多数功能。可以在Dataframe上调用rdd方法将其转换为一个RDD。
Dataframe带有Schema元数据信息,即Dataframe所表示的二维表数据集的每一列都带有名称和类型,底层做了更多的优化。可以通过多种方式构建Dataframe:已经存在的RDD、结构化文件、外部数据库、Hive表。
dataframe记录了对应列的名称和类型
dataframe引入schema和off-heap(使用操作系统层面上的内存)
1、解决了RDD的缺点
序列化和反序列化开销大频繁的创建和销毁对象造成大量的GC 2、丢失了RDD的优点
RDD编译时进行类型检查RDD具有面向对象编程的特性 1.2.2.Dataframe与RDD区别
RDD是分布式的对象的集合,Spark并不知道对象的详细模式信息
Dataframe是分布式的Row对象的集合,其提供了由列组成的详细模式信息
Dataframe还引入了off-heap,意味着JVM堆以外的内存, 这些内存直接受操作系统管理(而不是JVM)
Dataframe除了提供了比RDD更丰富的算子以外,更重要的特点是减少数据读取以及执行计划的优化,提升执行效 率。易用性更好
1.2.3.Dataframe与RDD的优缺点RDD优缺点
优点:
编译时类型安全面向对象的编程风格 缺点:
序列化和反序列化的性能开销大GC的性能开销大 Dataframe优缺点
优点:
Dataframe引入了schema,存储了结构化数据的元数据信息,操作方便Dataframe引入了off-heap(非堆内存),解决了RDD 中GC性能开销大的问题 缺点:
Dataframe不是类型安全的Dataframe不是面向对象的 1.2.4.如何创建Dataframe
在spark2.0版本之前,spark sql中SqlContext是创建Dataframe和执行SQL的入口。利用hiveContext通过hive sql语句操作hive表数据,兼容hive操作,并且hiveContext继承自SQLContext。
在spark2.0之后,这些都统一于SparkSession,SparkSession 封装了SparkContext,SqlContext,通过SparkSession可以获取到SparkConetxt,SqlContext对象。
启动spark-shell:(node03)
#cd /export/servers/spark-2.0.2-bin-hadoop2.7/bin spark-shell --master local[2]1.2.4.1.读取文本文件创建Dataframe
1.准备数据文件
people.txt:文件内容有三列(id、name、age)
1 zhangsan 20 2 lisi 29 3 wangwu 25 4 zhaoliu 30 5 tianqi 35 6 kobe 40
2.加载数据文件、关联样例类
#在spark-shell中加载数据文件
val lineRDD= sc.textFile("file:///export/servers/testdata/people.txt").map(_.split(" "))
#定义People样例类
case class People(id:Int,name:String,age:Int)
#将lineRDD和样例类People关联
val peopleRDD=lineRDD.map(x=>People(x(0).toInt,x(1),x(2).toInt))
3.将peopleRDD转换成Dataframe
val peopleDF=peopleRDD.toDF
4.显示Dataframe中数据:
peopleDF.show
5.显示Dataframe中schema信息:
peopleDF.printSchema
6.直接通过SparkSession创建Dataframe
val dataframe=spark.read.text("file:///export/servers/testdata/people.txt")
1.2.4.2.读取json文件创建Dataframe
1.准备数据文件
使用spark安装包下提供了数据文件:
#文件路径
$SPARK_HOME/examples/src/main/resources/people.json
#加载数据文件
val jsonDF=spark.read.json("file:///export/servers/spark-2.0.2-bin-hadoop2.7/examples/src/main/resources/people.json")
1.2.4.3.读取parquet列式存储格式文件创建Dataframe
1.准备数据文件
使用spark安装包下提供了数据文件:
#文件路径
$SPARK_HOME/examples/src/main/resources/users.parquet
#加载数据文件
val parquetDF= spark.read.parquet("file:///export/servers/spark-2.0.2-bin-hadoop2.7/examples/src/main/resources/users.parquet")
细节:关于SparkSession读取文件的相关方法
输入**spark.read.**按tab键,自动提示
1.3.Dataframe常用操作 1.3.1.DSL风格语法说明:DSL(Domain Specific Language)特定领域语言,是一种为了特定任务而设计的开发语言。比如XSLT、Html等。
1.3.1.1.启动spark-shellspark-shell --master local[2]1.3.1.2.查看Dataframe完整内容
#加载数据文件
val rdd1= sc.textFile("file:///export/servers/testdata/people.txt").map(_.split(" "))
#定义样例类
case class People(id:Int,name:String,age:Int)
#将rdd与样例类进行关联
val rdd2=rdd1.map(x=>People(x(0).toInt,x(1),x(2).toInt))
#将rdd2转换成Dataframe
val peopleDF=rdd2.toDF
#查看peopleDF中的完整内容 peopleDF.show1.3.1.3.查看Dataframe部分列内容
peopleDF.select("name").show
peopleDF.select(col("name")).show
peopleDF.select($"name").show
peopleDF.select(peopleDF.col("name")).show
1.3.1.4.查看Dataframe部分列内容,在列上执行操作
peopleDF.select("name","age").show
peopleDF.select(col("name"),col("age")).show
#取出年龄列数据,并且执行+1操作
peopleDF.select(col("name"),col("age"),col("age")+1).show
1.3.1.5查看Dataframe满足条件的内容
#查看年龄大于等于30的用户
peopleDF.filter(col("age")>=30).show
1.3.1.6.执行统计操作
#统计用户记录数 peopleDF.count1.3.1.7.执行分组统计操作
#按照年龄列进行分组统计
peopleDF.groupBy("age").count.show
1.3.1.8.打印Dataframe的schema信息
peopleDF.printSchema1.3.2.SQL风格语法
说明:Dataframe的一个强大之处就是我们可以将它看作是一个关系型数据表,可以通过在程序中使用spark.sql() 来执行SQL语句查询,结果返回一个Dataframe。
使用SQL风格的语法,需要将Dataframe注册成表,采用如下的方式:
Dataframe.registerTempTable("表名称")
1.3.2.1.将Dataframe注册成表
peopleDF.registerTempTable("t_people")
1.3.2.2.查询年龄最大的前两名
spark.sql("select * from t_people order by age desc limit 2").show
1.3.2.3.查询年龄大于30的人信息
spark.sql("select * from t_people where age>30").show
1.3.2.4.显示表的schema信息
spark.sql("desc t_people").show
1.4.DataSet
1.4.1.什么是DataSet
DataSet是分布式的数据集合,提供了强类型支持,是在RDD的每行数据加了类型约束。DataSet是在spark1.6中添加的新的接口,它集中了RDD的优点和spark sql优化的执行引擎。
DataSet包含了Dataframe的功能,Dataframe表示为DataSet[Row],即DataSet的子集。在spark2.0中两者已经统一。
1.4.2.RDD、Dataframe、DataSet区别RDD:
Dataframe:
DataSet:
**细节:**DataSet融合了RDD和Dataframe两者的优点。
DataSet中,数据有类型信息(Dataframe的优点)DataSet会在编译时检查类型(RDD的优点)DataSet是面向对象编程的接口(RDD的优点) 1.4.3.Dataframe与DataSet转换 1.4.3.1.Dataframe转换为DataSet
Dataframe.as[ElementType]1.4.3.2.DataSet转换为Dataframe
DataSet.toDF()1.4.4.创建DataSet 1.4.4.1.Dataframe转换生成DataSet
#加载数据文件
val df1= spark.read.json("file:///export/servers/spark-2.0.2-bin-hadoop2.7/examples/src/main/resources/people.json")
#定义样例类
case class Person(age:BigInt,name:String)
#通过as[类型]转换成DataSet
val ds1=df1.as[Person]
1.4.4.2.通过toDS方法生成DataSet
#定义样例类 case class Person(id:Long,name:String,age:Long) #定义集合数据 val data=List(Person(1,"zhangsan",18),Person(2,"lisi",28)) #通过toDs方法转换成DataSet val ds2=data.toDS1.4.4.3.spark.createDataSet方法创建DataSet
#从已经存在的scala集合中构建DataSet
val ds3=spark.createDataset(1 to 5)
#从已经存在的rdd构建DataSet
val rdd4=sc.textFile("file:///export/servers/testdata/people.txt")
val ds4=spark.createDataset(rdd4)
2.编程方式执行spark sql查询
2.1.编写spark sql程序将RDD转换成Dataframe
2.1.1.通过反射推断schema方式
scala支持使用case class类型导入RDD转换为Dataframe,通过case class创建schema,case class的参数名称会被利用反射机制作为列名。这种RDD可以高效的转换为Dataframe并注册为表。
2.1.1.1.创建项目 2.1.1.2.导入依赖2.1.1.3.编写代码4.0.0 cn.liny spark-teach-day06-01project 1.0-SNAPSHOT jar 2.11.8 2.0.2 org.scala-lang scala-library ${scala.version} org.apache.spark spark-sql_2.11 ${spark.version} org.scala-tools maven-scala-plugin 2.15.2 compile org.apache.maven.plugins maven-compiler-plugin 3.2 1.8 1.8 UTF-8
scala版本:
package cn.liny.rdd.df
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataframe, Row, SparkSession}
// 定义样例类
case class People(id:Int,name:String,age:Int)
object RddToDFScala {
// 执行入口
def main(args: Array[String]): Unit = {
// 1.创建SparkSession对象
val spark: SparkSession = SparkSession.builder().appName("RddToDFScala").master("local[2]").getOrCreate()
// 2.从SparkSession中,获取SparkContext对象
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
// 3.加载数据文件
val peopleRDD: RDD[String] = sc.textFile("D:\02teach\02resources\0322\04bigdata\testdata\people.txt")
// 4.切分每一行记录
val splitRDD: RDD[Array[String]] = peopleRDD.map(x=>x.split(" "))
// 5.将RDD与样例类进行关联
val casePeopleRDD: RDD[People] = splitRDD.map(x=>People(x(0).toInt,x(1),x(2).toInt))
// 6.通过toDF方法把RDD转换成Dataframe
import spark.implicits._
val peopleDF: Dataframe = casePeopleRDD.toDF
// 7.通过DSL方式操作Dataframe==============================DSL
// 显示schema信息
peopleDF.printSchema()
// 显示所有字段
peopleDF.columns.foreach(x=>println(x))
// 显示数据,默认显示20行
peopleDF.show()
// 显示第一行记录
val firstRow: Row = peopleDF.head()
println(firstRow)
// 统计记录数量
println(peopleDF.count())
// 显示指定列的值
peopleDF.select("id","name").show()
// 过滤操作
peopleDF.filter($"age">30).show()
// 分组统计操作
peopleDF.groupBy("age").count().show()
// 8.通过sql语句方式操作Dataframe===============================sql
println("通过sql语句方式操作Dataframe===============================sql")
// 把Dataframe注册为表
peopleDF.createOrReplaceTempView("t_people")
// 显示表的schema信息
spark.sql("desc t_people").show()
// 查询全表数据
spark.sql("select * from t_people").show()
// 9.释放资源
sc.stop()
spark.stop()
}
}
java版本:
People javaBean:
package cn.liny.jrdd.po;
import java.io.Serializable;
public class People implements Serializable {
private int id;
private String name;
private int age;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String toString() {
return "People{" +
"id=" + id +
", name='" + name + ''' +
", age=" + age +
'}';
}
}
RddToDFJava:
package cn.liny.jrdd.df;
import cn.liny.jrdd.po.People;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
public class RddToDFJava {
public static void main(String[] args) {
// 1.创建SparkSession对象
SparkSession spark = SparkSession.builder().appName("RddToDFJava").master("local[2]").getOrCreate();
// 2获取SparkContext对象,设置日志输出级别
SparkContext sc = spark.sparkContext();
sc.setLogLevel("WARN");
// 3.加载数据文件
Dataset peopleDS = spark.read().textFile("D:\02teach\02resources\0322\04bigdata\testdata\people.txt");
JavaRDD stringJavaRDD = peopleDS.toJavaRDD();
// 4.切分每一行数据,转换成 JavaRDD对象
JavaRDD peopleJavaRDD = stringJavaRDD.map(line -> {
String[] arr = line.split(" ");
// 创建People对象
People people = new People();
people.setId(Integer.valueOf(arr[0]));
people.setName(arr[1]);
people.setAge(Integer.valueOf(arr[2]));
return people;
});
// 5.调用createDataframe转换
Dataset peopleDF = spark.createDataframe(peopleJavaRDD, People.class);
// 6.通过DSL方式操作Dataframe==============================DSL
// 显示schema信息
peopleDF.printSchema();
// 显示所有字段
String[] columns = peopleDF.columns();
System.out.println(Arrays.asList(columns));
// 显示数据,默认显示20行
peopleDF.show();
// 显示第一行记录
Row head = peopleDF.head();
System.out.println(head);
// 统计记录数量
System.out.println(peopleDF.count());
// 显示指定列的值
peopleDF.select("id","name","age").show();
// 分组统计操作
peopleDF.groupBy("age").count().show();
// 7.通过sql语句方式操作Dataframe===============================sql
System.out.println("通过sql语句方式操作Dataframe===============================sql");
// 将Dataframe注册成表
peopleDF.createOrReplaceTempView("t_people");
// 显示表的schema信息
spark.sql("desc t_people").show();
// 查询全表数据
Dataset allRow = spark.sql("select * from t_people");
allRow.show();
// 查询年龄大于等于30
Dataset ageRow = spark.sql("select * from t_people where age>=30");
ageRow.show();
// 8.释放资源
sc.stop();
spark.stop();
}
}
2.1.2.通过StructType定义schema方式
2.1.2.1.scala版本
package cn.liny.rdd.df
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Dataframe, Row, SparkSession}
object RddToDFScalaByStructType {
def main(args: Array[String]): Unit = {
// 1.创建SparkSession
val spark: SparkSession = SparkSession.builder().appName("RddToDFScalaByStructType").master("local[2]").getOrCreate()
// 2.通过SparkSession,获取SparkContext
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
// 3.加载数据文件
val peopleRDD: RDD[String] = sc.textFile("D:\02teach\02resources\0322\04bigdata\testdata\people.txt")
// 4.切分每一行数据
val splitRDD: RDD[Array[String]] = peopleRDD.map(x=>x.split(" "))
// 5.加载数据到Row对象中
val rowRDD: RDD[Row] = splitRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt))
// 6.创建schema
val structType: StructType = (new StructType)
.add(StructField("id",IntegerType,false))
.add(StructField("name",StringType,false))
.add(StructField("age",IntegerType,false))
// 7.利用RDD和schema创建Dataframe
val rowDF: Dataframe = spark.createDataframe(rowRDD,structType)
// 8.通过DSL方式操作Dataframe==============================DSL
// 显示schema信息
rowDF.printSchema()
// 显示数据,默认显示20行
rowDF.show()
// 9.通过sql语句方式操作Dataframe===============================sql
println("通过sql语句方式操作Dataframe===============================sql")
// 将Dataframe注册为表
rowDF.createOrReplaceTempView("t_people")
// 显示表结构schema信息
spark.sql("desc t_people").show()
// 查询全部数据
spark.sql("select * from t_people").show()
// 10.释放资源
sc.stop()
spark.stop()
}
}
2.1.2.2.java版本
package cn.liny.jrdd.df;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.ArrayList;
import java.util.List;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.StringType;
public class RddToDFJavaByStructType {
public static void main(String[] args) {
// 1.创建SparkSession
SparkSession spark = SparkSession.builder().appName("RddToRDDJavaByStructType").master("local[2]").getOrCreate();
// 2.通过SparkSession,获取SparkContext
SparkContext sc = spark.sparkContext();
sc.setLogLevel("WARN");
// 3.加载数据文件,并且转换成JavaRDD
Dataset stringDS = spark.read().textFile("D:\02teach\02resources\0322\04bigdata\testdata\people.txt");
JavaRDD stringJavaRDD = stringDS.toJavaRDD();
// 4.切分每一行数据,加载数据到Row对象中
JavaRDD rowJavaRDD = stringJavaRDD.map(line -> {
String[] arr = line.split(" ");
Integer id = Integer.valueOf(arr[0]);
String name = arr[1];
Integer age = Integer.valueOf(arr[2]);
// 通过RowFactory工厂,创建Row对象
return RowFactory.create(id, name, age);
});
// 5.通过StructType定义schema
List fields = new ArrayList();
fields.add(DataTypes.createStructField("id",IntegerType,false));
fields.add(DataTypes.createStructField("name",StringType,false));
fields.add(DataTypes.createStructField("age",IntegerType,false));
StructType structType = DataTypes.createStructType(fields);
// 6.利用RDD和schema定义Dataframe
Dataset peopleDF = spark.createDataframe(rowJavaRDD, structType);
// 7.通过DSL方式操作Dataframe==============================DSL
// 显示schema信息
peopleDF.printSchema();
// 显示数据,默认显示20行
peopleDF.show();
// 8.通过sql语句方式操作Dataframe===============================sql
System.out.println("通过sql语句方式操作Dataframe===============================sql");
// 把Dataframe注册成表
peopleDF.createOrReplaceTempView("t_people");
// 显示表结构schema信息
spark.sql("desc t_people").show();
// 查询全部数据
spark.sql("select * from t_people").show();
// 9.释放资源
sc.stop();
spark.stop();
}
}
2.2.编写spark sql程序操作HiveContext
HiveContext是对应spark-hive这个项目,与hive有部分耦合, 支持hql,是SqlContext的子类。
在spark2.0之后,HiveContext和SqlContext在SparkSession进行了统一,可以通过操作SparkSession来操作HiveContext和SqlContext。
2.2.1.导入依赖2.2.2.编写代码org.apache.spark spark-hive_2.11 ${spark.version}
package cn.liny.hi
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
object SparkHiveSupport {
def main(args: Array[String]): Unit = {
// 1.创建SparkSession对象
val spark = SparkSession.builder()
.appName("SparkHiveSupport")
.master("local[2]")
.config("spark.sql.warehouse.dir","D:\02teach\03tmp\spark\spark-warehouse")
.enableHiveSupport()// 开启支持hive
.getOrCreate()
// 2.通过SparkSession,获取SparkContext对象
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
// 3.创建hive表
spark.sql("CREATE TABLE IF NOT EXISTS student(id int, name string, age int) row format delimited fields terminated by ' '")
// 4.导入数据到hive表
spark.sql("LOAD DATA LOCAL INPATH 'D:/02teach/02resources/0322/04bigdata/testdata/people.txt' INTO TABLE student")
// 5.执行sql查询
spark.sql("select * from student ").show()
// 6.释放资源
sc.stop()
spark.stop()
}
}
3.数据源
spark sql可以通过JDBC从关系型数据库中读取数据的方式创建Dataframe,通过对Dataframe一系列的计算后,再将数据写回关系型数据库中。
3.1.spark sql 从mysql数据库中加载数据 3.1.1.编写代码操作 3.1.1.1.导入依赖3.1.1.2.编写代码5.1.38 mysql mysql-connector-java ${mysql.version}
scala版本:
package cn.liny.db
import java.util.Properties
import org.apache.spark.sql.{Dataframe, SparkSession}
object DataFromMysql {
def main(args: Array[String]): Unit = {
// 1.创建SparkSession对象
val spark: SparkSession = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate()
// 2.创建Properties对象,设置连接mysql的用户名和密码
val prop: Properties =new Properties()
prop.setProperty("user","root")
prop.setProperty("password","admin")
// 3.读取mysql中的数据
val iplocationDF: Dataframe = spark.read.jdbc("jdbc:mysql://127.0.0.1:3308/spark","iplocation",prop)
// 4.显示mysql中表的数据
iplocationDF.show()
// 5.释放资源
spark.stop()
}
}
java版本:
package cn.liny.jrdd.db;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.util.Properties;
public class DataFromMysqlForJava {
public static void main(String[] args) {
// 1.创建SparkSession对象
SparkSession spark = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate();
//2.创建Properties对象,设置连接mysql的用户名和密码
Properties prop = new Properties();
prop.setProperty("user","root");
prop.setProperty("password","admin");
// 3.读取mysql中的数据
Dataset iplocation = spark.read().jdbc("jdbc:mysql://127.0.0.1:3308/spark", "iplocation", prop);
// 4.显示数据
iplocation.show();
// 5.释放资源
spark.stop();
}
}
3.1.2.通过spark-shell 操作
3.1.2.1.启动spark-shell,需要指定mysql驱动包
spark-shell --master spark://node01:7077 --executor-memory 1g --total-executor-cores 2 --jars /export/servers/testjar/mysql-connector-java-5.1.32-bin.jar --driver-class-path /export/servers/testjar/mysql-connector-java-5.1.32-bin.jar3.1.2.2.从mysql中加载数据
val mysqlDF = spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://192.168.53.120:3306/mysql", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "iplocation", "user" -> "root", "password" -> "123456")).load()
mysqlDF.show
3.2.spark sql将数据写入到mysql数据库中
3.2.1.本地模式运行
package cn.liny.db
import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataframe, SaveMode, SparkSession}
// 定义样例类
case class Teacher(id:Int,name:String,age:Int)
object DataToMysql {
def main(args: Array[String]): Unit = {
// 1.创建SparkSession对象
val spark: SparkSession = SparkSession.builder().appName("DataToMysql").master("local[2]").getOrCreate()
// 2.读取数据
val data: RDD[String] = spark.sparkContext.textFile(args(0))
// 3.切分每一行
val arrRDD: RDD[Array[String]] = data.map(_.split(" "))
// 4.RDD关联样例类
val teacherRDD: RDD[Teacher] = arrRDD.map(x=>Teacher(x(0).toInt,x(1),x(2).toInt))
// 5.导入隐式转换
import spark.implicits._
// 6.将RDD转换成Dataframe
val teacherDF: Dataframe = teacherRDD.toDF()
// 7.将Dataframe注册成表
teacherDF.createOrReplaceTempView("teacher")
// 8.操作teacher表 ,按照年龄进行降序排列
val resultDF: Dataframe = spark.sql("select * from teacher order by age desc")
// resultDF.show()
// 9.把结果保存在mysql表中
// 9.1.创建Properties对象,设置连接mysql的用户名和密码
val prop: Properties =new Properties()
prop.setProperty("user","root")
prop.setProperty("password","admin")
// 9.2.操作数据库表
resultDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://127.0.0.1:3308/spark","teacher",prop)
// 释放资源
spark.stop()
}
}
3.2.2.打包提交到集群环境运行
3.2.2.1.改造代码
package cn.liny.db
import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataframe, SaveMode, SparkSession}
// 定义样例类
case class TeacherMaster(id:Int,name:String,age:Int)
object DataToMysqlForMaster {
def main(args: Array[String]): Unit = {
// 1.创建SparkSession对象
val spark: SparkSession = SparkSession.builder().appName("DataToMysqlForMaster").getOrCreate()
// 2.读取数据
val data: RDD[String] = spark.sparkContext.textFile(args(0))
// 3.切分每一行
val arrRDD: RDD[Array[String]] = data.map(_.split(" "))
// 4.RDD关联样例类
val teacherRDD: RDD[TeacherMaster] = arrRDD.map(x=>TeacherMaster(x(0).toInt,x(1),x(2).toInt))
// 5.导入隐式转换
import spark.implicits._
// 6.将RDD转换成Dataframe
val teacherDF: Dataframe = teacherRDD.toDF()
// 7.将Dataframe注册成表
teacherDF.createOrReplaceTempView("teacher")
// 8.操作teacher表 ,按照年龄进行降序排列
val resultDF: Dataframe = spark.sql("select * from teacher order by age desc")
// resultDF.show()
// 9.把结果保存在mysql表中
// 9.1.创建Properties对象,设置连接mysql的用户名和密码
val prop: Properties =new Properties()
prop.setProperty("user","root")
prop.setProperty("password","bigdata")
// 9.2.操作数据库表
resultDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.80.22:3306/spark","teacher",prop)
// 释放资源
spark.stop()
}
}
3.2.2.2.打包
3.2.2.3.提交集群运行
#将文件上传到hdfs hdfs dfs -mkdir -p /spark/sql/input hdfs dfs -put people.txt /spark/sql/input #提交执行任务 spark-submit --class cn.liny.db.DataToMysqlForMaster --master spark://node01:7077 --executor-memory 1g --total-executor-cores 2 --jars /export/servers/testjar/mysql-connector-java-5.1.32-bin.jar --driver-class-path /export/servers/testjar/mysql-connector-java-5.1.32-bin.jar /export/servers/testjar/spark_sql-1.0-SNAPSHOT.jar hdfs://192.168.53.100:8020/spark/sql/input/people.txt



