栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

06-SparkSQL

06-SparkSQL

1.spark sql 1.1.spark sql概述

官网地址:http://spark.apache.org/sql/

1.1.1.什么是spark sql

spark sql是spark用来处理结构化数据的一个模块,它提供了一个编程抽象Dataframe,作为分布式SQL查询的引擎,它是将spark sql转换成RDD,然后提交到集群中去运行,执行效率非常快。支持多种使用方式:SQL、Dataframe API、DataSet API。

相比于spark RDD API,spark sql包含了对结构化数据和在其上运算的更多信息,spark sql使用这些信息进行了额外的优化,对结构化数据的操作更加高效和方便。

1.1.2.spark sql优点

易整合

将sql查询与spark程序无缝整合。可以使用java、scala、python、R等语言的API操作。

统一的数据访问方式

以相同的方式连接到任何数据源。

兼容Hive

支持hive SQL的语法。

标准的数据连接

可以使用行业标准的JDBC或ODBC进行连接

六字诀: 易用、兼容、标准

1.2.Dataframe 1.2.1.什么是Dataframe

在spark中,Dataframe是一种以RDD为基础的分布式数据集,它的前身是SchemaRDD,类似于传统数据库的二维表格。

从Spark 1.3.0开始由SchemaRDD更名为Dataframe。它与SchemaRDD的主要区别:Dataframe不再直接继承自RDD,而是自己实现了RDD的绝大多数功能。可以在Dataframe上调用rdd方法将其转换为一个RDD。

Dataframe带有Schema元数据信息,即Dataframe所表示的二维表数据集的每一列都带有名称和类型,底层做了更多的优化。可以通过多种方式构建Dataframe:已经存在的RDD、结构化文件、外部数据库、Hive表。

dataframe记录了对应列的名称和类型

dataframe引入schema和off-heap(使用操作系统层面上的内存)

1、解决了RDD的缺点

序列化和反序列化开销大频繁的创建和销毁对象造成大量的GC 2、丢失了RDD的优点

RDD编译时进行类型检查RDD具有面向对象编程的特性 1.2.2.Dataframe与RDD区别

RDD是分布式的对象的集合,Spark并不知道对象的详细模式信息

Dataframe是分布式的Row对象的集合,其提供了由列组成的详细模式信息

Dataframe还引入了off-heap,意味着JVM堆以外的内存, 这些内存直接受操作系统管理(而不是JVM)

Dataframe除了提供了比RDD更丰富的算子以外,更重要的特点是减少数据读取以及执行计划的优化,提升执行效 率。易用性更好

1.2.3.Dataframe与RDD的优缺点

RDD优缺点

优点:

编译时类型安全面向对象的编程风格 缺点:

序列化和反序列化的性能开销大GC的性能开销大 Dataframe优缺点

优点:

Dataframe引入了schema,存储了结构化数据的元数据信息,操作方便Dataframe引入了off-heap(非堆内存),解决了RDD 中GC性能开销大的问题 缺点:

Dataframe不是类型安全的Dataframe不是面向对象的 1.2.4.如何创建Dataframe

在spark2.0版本之前,spark sql中SqlContext是创建Dataframe和执行SQL的入口。利用hiveContext通过hive sql语句操作hive表数据,兼容hive操作,并且hiveContext继承自SQLContext。

在spark2.0之后,这些都统一于SparkSession,SparkSession 封装了SparkContext,SqlContext,通过SparkSession可以获取到SparkConetxt,SqlContext对象。

启动spark-shell:(node03)

#cd /export/servers/spark-2.0.2-bin-hadoop2.7/bin
spark-shell --master local[2]

1.2.4.1.读取文本文件创建Dataframe

1.准备数据文件

people.txt:文件内容有三列(id、name、age)

1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 kobe 40

2.加载数据文件、关联样例类

#在spark-shell中加载数据文件
val lineRDD= sc.textFile("file:///export/servers/testdata/people.txt").map(_.split(" "))

#定义People样例类
case class People(id:Int,name:String,age:Int)

#将lineRDD和样例类People关联
val peopleRDD=lineRDD.map(x=>People(x(0).toInt,x(1),x(2).toInt))

3.将peopleRDD转换成Dataframe

val peopleDF=peopleRDD.toDF

4.显示Dataframe中数据:

peopleDF.show

5.显示Dataframe中schema信息:

peopleDF.printSchema

6.直接通过SparkSession创建Dataframe

val dataframe=spark.read.text("file:///export/servers/testdata/people.txt")

1.2.4.2.读取json文件创建Dataframe

1.准备数据文件

使用spark安装包下提供了数据文件:

#文件路径
$SPARK_HOME/examples/src/main/resources/people.json

#加载数据文件
val jsonDF=spark.read.json("file:///export/servers/spark-2.0.2-bin-hadoop2.7/examples/src/main/resources/people.json")

1.2.4.3.读取parquet列式存储格式文件创建Dataframe

1.准备数据文件

使用spark安装包下提供了数据文件:

#文件路径
$SPARK_HOME/examples/src/main/resources/users.parquet
#加载数据文件
val parquetDF= spark.read.parquet("file:///export/servers/spark-2.0.2-bin-hadoop2.7/examples/src/main/resources/users.parquet")

细节:关于SparkSession读取文件的相关方法

输入**spark.read.**按tab键,自动提示

1.3.Dataframe常用操作 1.3.1.DSL风格语法

说明:DSL(Domain Specific Language)特定领域语言,是一种为了特定任务而设计的开发语言。比如XSLT、Html等。

1.3.1.1.启动spark-shell
spark-shell --master local[2]

1.3.1.2.查看Dataframe完整内容
#加载数据文件
val rdd1= sc.textFile("file:///export/servers/testdata/people.txt").map(_.split(" "))
#定义样例类
case class People(id:Int,name:String,age:Int)
#将rdd与样例类进行关联
val rdd2=rdd1.map(x=>People(x(0).toInt,x(1),x(2).toInt))
#将rdd2转换成Dataframe
val peopleDF=rdd2.toDF

#查看peopleDF中的完整内容
peopleDF.show

1.3.1.3.查看Dataframe部分列内容
peopleDF.select("name").show

peopleDF.select(col("name")).show

peopleDF.select($"name").show

peopleDF.select(peopleDF.col("name")).show

1.3.1.4.查看Dataframe部分列内容,在列上执行操作
peopleDF.select("name","age").show

peopleDF.select(col("name"),col("age")).show

#取出年龄列数据,并且执行+1操作
peopleDF.select(col("name"),col("age"),col("age")+1).show

1.3.1.5查看Dataframe满足条件的内容
#查看年龄大于等于30的用户
peopleDF.filter(col("age")>=30).show

1.3.1.6.执行统计操作
#统计用户记录数
peopleDF.count

1.3.1.7.执行分组统计操作
#按照年龄列进行分组统计
peopleDF.groupBy("age").count.show

1.3.1.8.打印Dataframe的schema信息
peopleDF.printSchema

1.3.2.SQL风格语法

说明:Dataframe的一个强大之处就是我们可以将它看作是一个关系型数据表,可以通过在程序中使用spark.sql() 来执行SQL语句查询,结果返回一个Dataframe。

使用SQL风格的语法,需要将Dataframe注册成表,采用如下的方式:

Dataframe.registerTempTable("表名称")
1.3.2.1.将Dataframe注册成表
peopleDF.registerTempTable("t_people")

1.3.2.2.查询年龄最大的前两名
spark.sql("select * from t_people order by age desc limit 2").show

1.3.2.3.查询年龄大于30的人信息
spark.sql("select * from t_people where age>30").show

1.3.2.4.显示表的schema信息
spark.sql("desc t_people").show

1.4.DataSet 1.4.1.什么是DataSet

DataSet是分布式的数据集合,提供了强类型支持,是在RDD的每行数据加了类型约束。DataSet是在spark1.6中添加的新的接口,它集中了RDD的优点和spark sql优化的执行引擎。

DataSet包含了Dataframe的功能,Dataframe表示为DataSet[Row],即DataSet的子集。在spark2.0中两者已经统一。

1.4.2.RDD、Dataframe、DataSet区别

RDD:

Dataframe:

DataSet:

**细节:**DataSet融合了RDD和Dataframe两者的优点。

DataSet中,数据有类型信息(Dataframe的优点)DataSet会在编译时检查类型(RDD的优点)DataSet是面向对象编程的接口(RDD的优点) 1.4.3.Dataframe与DataSet转换 1.4.3.1.Dataframe转换为DataSet

Dataframe.as[ElementType]
1.4.3.2.DataSet转换为Dataframe
DataSet.toDF()
1.4.4.创建DataSet 1.4.4.1.Dataframe转换生成DataSet
#加载数据文件
val df1= spark.read.json("file:///export/servers/spark-2.0.2-bin-hadoop2.7/examples/src/main/resources/people.json")
#定义样例类
case class Person(age:BigInt,name:String)
#通过as[类型]转换成DataSet
val ds1=df1.as[Person]

1.4.4.2.通过toDS方法生成DataSet
#定义样例类
case class Person(id:Long,name:String,age:Long)
#定义集合数据
val data=List(Person(1,"zhangsan",18),Person(2,"lisi",28))
#通过toDs方法转换成DataSet
val ds2=data.toDS

1.4.4.3.spark.createDataSet方法创建DataSet
#从已经存在的scala集合中构建DataSet
val ds3=spark.createDataset(1 to 5)
#从已经存在的rdd构建DataSet
val rdd4=sc.textFile("file:///export/servers/testdata/people.txt")
val ds4=spark.createDataset(rdd4)

2.编程方式执行spark sql查询 2.1.编写spark sql程序将RDD转换成Dataframe 2.1.1.通过反射推断schema方式

scala支持使用case class类型导入RDD转换为Dataframe,通过case class创建schema,case class的参数名称会被利用反射机制作为列名。这种RDD可以高效的转换为Dataframe并注册为表。

2.1.1.1.创建项目 2.1.1.2.导入依赖


    4.0.0

    cn.liny
    spark-teach-day06-01project
    1.0-SNAPSHOT

    jar

    
        2.11.8
        2.0.2
    

    
    
        
        
            org.scala-lang
            scala-library
            ${scala.version}
        
        
        
            org.apache.spark
            spark-sql_2.11
            ${spark.version}
        
    

    
    
        
            
            
                org.scala-tools
                maven-scala-plugin
                2.15.2
                
                    
                        
                            compile
                        
                    
                
            
            
            
                org.apache.maven.plugins
                maven-compiler-plugin
                3.2
                
                    1.8
                    1.8
                    UTF-8
                
            
        
    
    

2.1.1.3.编写代码

scala版本:

package cn.liny.rdd.df

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataframe, Row, SparkSession}



// 定义样例类
case class People(id:Int,name:String,age:Int)

object RddToDFScala {

  // 执行入口
  def main(args: Array[String]): Unit = {

    // 1.创建SparkSession对象
    val spark: SparkSession = SparkSession.builder().appName("RddToDFScala").master("local[2]").getOrCreate()

    // 2.从SparkSession中,获取SparkContext对象
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    // 3.加载数据文件
    val peopleRDD: RDD[String] = sc.textFile("D:\02teach\02resources\0322\04bigdata\testdata\people.txt")

    // 4.切分每一行记录
    val splitRDD: RDD[Array[String]] = peopleRDD.map(x=>x.split(" "))

    // 5.将RDD与样例类进行关联
    val casePeopleRDD: RDD[People] = splitRDD.map(x=>People(x(0).toInt,x(1),x(2).toInt))

    // 6.通过toDF方法把RDD转换成Dataframe
    
    import spark.implicits._
    val peopleDF: Dataframe = casePeopleRDD.toDF

    // 7.通过DSL方式操作Dataframe==============================DSL
    // 显示schema信息
    peopleDF.printSchema()
    // 显示所有字段
    peopleDF.columns.foreach(x=>println(x))
    // 显示数据,默认显示20行
    peopleDF.show()
    // 显示第一行记录
    val firstRow: Row = peopleDF.head()
    println(firstRow)
    // 统计记录数量
    println(peopleDF.count())
    // 显示指定列的值
    peopleDF.select("id","name").show()
    // 过滤操作
    peopleDF.filter($"age">30).show()
    // 分组统计操作
    peopleDF.groupBy("age").count().show()

    // 8.通过sql语句方式操作Dataframe===============================sql
    println("通过sql语句方式操作Dataframe===============================sql")

    // 把Dataframe注册为表
    peopleDF.createOrReplaceTempView("t_people")

    // 显示表的schema信息
    spark.sql("desc t_people").show()
    // 查询全表数据
    spark.sql("select * from t_people").show()

    // 9.释放资源
    sc.stop()
    spark.stop()

  }

}

java版本:

People javaBean:

package cn.liny.jrdd.po;

import java.io.Serializable;


public class People implements Serializable {

    private int id;
    private String name;
    private int age;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String toString() {
        return "People{" +
                "id=" + id +
                ", name='" + name + ''' +
                ", age=" + age +
                '}';
    }
}

RddToDFJava:

package cn.liny.jrdd.df;

import cn.liny.jrdd.po.People;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;


public class RddToDFJava {

    public static void main(String[] args) {
        // 1.创建SparkSession对象
        SparkSession spark = SparkSession.builder().appName("RddToDFJava").master("local[2]").getOrCreate();

        // 2获取SparkContext对象,设置日志输出级别
        SparkContext sc = spark.sparkContext();
        sc.setLogLevel("WARN");

        // 3.加载数据文件
        Dataset peopleDS = spark.read().textFile("D:\02teach\02resources\0322\04bigdata\testdata\people.txt");
        JavaRDD stringJavaRDD = peopleDS.toJavaRDD();

        // 4.切分每一行数据,转换成 JavaRDD对象
        JavaRDD peopleJavaRDD = stringJavaRDD.map(line -> {
            String[] arr = line.split(" ");

            // 创建People对象
            People people = new People();
            people.setId(Integer.valueOf(arr[0]));
            people.setName(arr[1]);
            people.setAge(Integer.valueOf(arr[2]));

            return people;
        });

        // 5.调用createDataframe转换
        Dataset peopleDF = spark.createDataframe(peopleJavaRDD, People.class);

        // 6.通过DSL方式操作Dataframe==============================DSL
        // 显示schema信息
        peopleDF.printSchema();
        // 显示所有字段
        String[] columns = peopleDF.columns();
        System.out.println(Arrays.asList(columns));

        // 显示数据,默认显示20行
        peopleDF.show();
        // 显示第一行记录
        Row head = peopleDF.head();
        System.out.println(head);
        // 统计记录数量
        System.out.println(peopleDF.count());
        // 显示指定列的值
        peopleDF.select("id","name","age").show();
        // 分组统计操作
        peopleDF.groupBy("age").count().show();

        // 7.通过sql语句方式操作Dataframe===============================sql
        System.out.println("通过sql语句方式操作Dataframe===============================sql");
        // 将Dataframe注册成表
        peopleDF.createOrReplaceTempView("t_people");

        // 显示表的schema信息
        spark.sql("desc t_people").show();

        // 查询全表数据
        Dataset allRow = spark.sql("select * from t_people");
        allRow.show();

        // 查询年龄大于等于30
        Dataset ageRow = spark.sql("select * from t_people where age>=30");
        ageRow.show();

        // 8.释放资源
        sc.stop();
        spark.stop();
    }
}
2.1.2.通过StructType定义schema方式 2.1.2.1.scala版本
package cn.liny.rdd.df

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Dataframe, Row, SparkSession}


object RddToDFScalaByStructType {

  def main(args: Array[String]): Unit = {
    // 1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().appName("RddToDFScalaByStructType").master("local[2]").getOrCreate()

    // 2.通过SparkSession,获取SparkContext
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    // 3.加载数据文件
    val peopleRDD: RDD[String] = sc.textFile("D:\02teach\02resources\0322\04bigdata\testdata\people.txt")

    // 4.切分每一行数据
    val splitRDD: RDD[Array[String]] = peopleRDD.map(x=>x.split(" "))

    // 5.加载数据到Row对象中
    val rowRDD: RDD[Row] = splitRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt))


    // 6.创建schema
    val structType: StructType = (new StructType)
      .add(StructField("id",IntegerType,false))
      .add(StructField("name",StringType,false))
      .add(StructField("age",IntegerType,false))

    // 7.利用RDD和schema创建Dataframe
    val rowDF: Dataframe = spark.createDataframe(rowRDD,structType)

    // 8.通过DSL方式操作Dataframe==============================DSL
    // 显示schema信息
    rowDF.printSchema()
    // 显示数据,默认显示20行
    rowDF.show()

    // 9.通过sql语句方式操作Dataframe===============================sql
    println("通过sql语句方式操作Dataframe===============================sql")
    // 将Dataframe注册为表
    rowDF.createOrReplaceTempView("t_people")

    // 显示表结构schema信息
    spark.sql("desc t_people").show()

    // 查询全部数据
    spark.sql("select * from t_people").show()

    // 10.释放资源
    sc.stop()
    spark.stop()
  }

}
2.1.2.2.java版本
package cn.liny.jrdd.df;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.StringType;


public class RddToDFJavaByStructType {

    public static void main(String[] args) {
        // 1.创建SparkSession
        SparkSession spark = SparkSession.builder().appName("RddToRDDJavaByStructType").master("local[2]").getOrCreate();

        // 2.通过SparkSession,获取SparkContext
        SparkContext sc = spark.sparkContext();
        sc.setLogLevel("WARN");

        // 3.加载数据文件,并且转换成JavaRDD
        Dataset stringDS = spark.read().textFile("D:\02teach\02resources\0322\04bigdata\testdata\people.txt");
        JavaRDD stringJavaRDD = stringDS.toJavaRDD();

        // 4.切分每一行数据,加载数据到Row对象中
        JavaRDD rowJavaRDD = stringJavaRDD.map(line -> {
            String[] arr = line.split(" ");
            Integer id = Integer.valueOf(arr[0]);
            String name = arr[1];
            Integer age = Integer.valueOf(arr[2]);

            // 通过RowFactory工厂,创建Row对象
            return RowFactory.create(id, name, age);

        });

        // 5.通过StructType定义schema
        List fields = new ArrayList();
        fields.add(DataTypes.createStructField("id",IntegerType,false));
        fields.add(DataTypes.createStructField("name",StringType,false));
        fields.add(DataTypes.createStructField("age",IntegerType,false));

        StructType structType = DataTypes.createStructType(fields);

        // 6.利用RDD和schema定义Dataframe
        Dataset peopleDF = spark.createDataframe(rowJavaRDD, structType);

        // 7.通过DSL方式操作Dataframe==============================DSL
        // 显示schema信息
        peopleDF.printSchema();
        // 显示数据,默认显示20行
        peopleDF.show();

        // 8.通过sql语句方式操作Dataframe===============================sql
        System.out.println("通过sql语句方式操作Dataframe===============================sql");
        // 把Dataframe注册成表
        peopleDF.createOrReplaceTempView("t_people");
        // 显示表结构schema信息
        spark.sql("desc t_people").show();
        // 查询全部数据
        spark.sql("select * from t_people").show();

        // 9.释放资源
        sc.stop();
        spark.stop();

    }
}
2.2.编写spark sql程序操作HiveContext

HiveContext是对应spark-hive这个项目,与hive有部分耦合, 支持hql,是SqlContext的子类。

在spark2.0之后,HiveContext和SqlContext在SparkSession进行了统一,可以通过操作SparkSession来操作HiveContext和SqlContext。

2.2.1.导入依赖

    org.apache.spark
    spark-hive_2.11
    ${spark.version}

2.2.2.编写代码
package cn.liny.hi

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession


object SparkHiveSupport {

  def main(args: Array[String]): Unit = {
    // 1.创建SparkSession对象
    val spark = SparkSession.builder()
                            .appName("SparkHiveSupport")
                            .master("local[2]")
                            .config("spark.sql.warehouse.dir","D:\02teach\03tmp\spark\spark-warehouse")
                            .enableHiveSupport()// 开启支持hive
                            .getOrCreate()

    // 2.通过SparkSession,获取SparkContext对象
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    // 3.创建hive表
    spark.sql("CREATE TABLE IF NOT EXISTS student(id int, name string, age int) row format delimited fields terminated by ' '")

    // 4.导入数据到hive表
    spark.sql("LOAD DATA LOCAL INPATH 'D:/02teach/02resources/0322/04bigdata/testdata/people.txt' INTO TABLE student")

    // 5.执行sql查询
    spark.sql("select * from student ").show()

    // 6.释放资源
    sc.stop()
    spark.stop()

  }

}
3.数据源

spark sql可以通过JDBC从关系型数据库中读取数据的方式创建Dataframe,通过对Dataframe一系列的计算后,再将数据写回关系型数据库中。

3.1.spark sql 从mysql数据库中加载数据 3.1.1.编写代码操作 3.1.1.1.导入依赖
5.1.38

 

   mysql
   mysql-connector-java
   ${mysql.version}

3.1.1.2.编写代码

scala版本:

package cn.liny.db

import java.util.Properties

import org.apache.spark.sql.{Dataframe, SparkSession}


object DataFromMysql {

  def main(args: Array[String]): Unit = {
    // 1.创建SparkSession对象
    val spark: SparkSession = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate()

    // 2.创建Properties对象,设置连接mysql的用户名和密码
    val prop: Properties =new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","admin")

    // 3.读取mysql中的数据
    val iplocationDF: Dataframe = spark.read.jdbc("jdbc:mysql://127.0.0.1:3308/spark","iplocation",prop)

    // 4.显示mysql中表的数据
    iplocationDF.show()

    // 5.释放资源
    spark.stop()

  }

}

java版本:

package cn.liny.jrdd.db;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;


public class DataFromMysqlForJava {

    public static void main(String[] args) {
        // 1.创建SparkSession对象
        SparkSession spark = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate();

        //2.创建Properties对象,设置连接mysql的用户名和密码
        Properties prop = new Properties();
        prop.setProperty("user","root");
        prop.setProperty("password","admin");

        // 3.读取mysql中的数据
        Dataset iplocation = spark.read().jdbc("jdbc:mysql://127.0.0.1:3308/spark", "iplocation", prop);

        // 4.显示数据
        iplocation.show();

        // 5.释放资源
        spark.stop();
    }
}
3.1.2.通过spark-shell 操作 3.1.2.1.启动spark-shell,需要指定mysql驱动包
spark-shell 
--master spark://node01:7077 
--executor-memory 1g 
--total-executor-cores  2 
--jars /export/servers/testjar/mysql-connector-java-5.1.32-bin.jar 
--driver-class-path /export/servers/testjar/mysql-connector-java-5.1.32-bin.jar

3.1.2.2.从mysql中加载数据
val mysqlDF = spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://192.168.53.120:3306/mysql", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "iplocation", "user" -> "root", "password" -> "123456")).load()

mysqlDF.show

3.2.spark sql将数据写入到mysql数据库中 3.2.1.本地模式运行
package cn.liny.db

import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataframe, SaveMode, SparkSession}



// 定义样例类
case class Teacher(id:Int,name:String,age:Int)

object DataToMysql {

  def main(args: Array[String]): Unit = {

    // 1.创建SparkSession对象
    val spark: SparkSession = SparkSession.builder().appName("DataToMysql").master("local[2]").getOrCreate()

    // 2.读取数据
    val data: RDD[String] = spark.sparkContext.textFile(args(0))

    // 3.切分每一行
    val arrRDD: RDD[Array[String]] = data.map(_.split(" "))

    // 4.RDD关联样例类
    val teacherRDD: RDD[Teacher] = arrRDD.map(x=>Teacher(x(0).toInt,x(1),x(2).toInt))

    // 5.导入隐式转换
    import spark.implicits._

    // 6.将RDD转换成Dataframe
    val teacherDF: Dataframe = teacherRDD.toDF()

    // 7.将Dataframe注册成表
    teacherDF.createOrReplaceTempView("teacher")

    // 8.操作teacher表 ,按照年龄进行降序排列
    val resultDF: Dataframe = spark.sql("select * from teacher order by age desc")
   // resultDF.show()

    // 9.把结果保存在mysql表中
    // 9.1.创建Properties对象,设置连接mysql的用户名和密码
    val prop: Properties =new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","admin")

    // 9.2.操作数据库表
    
    resultDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://127.0.0.1:3308/spark","teacher",prop)

    // 释放资源
    spark.stop()

  }

}

3.2.2.打包提交到集群环境运行 3.2.2.1.改造代码
package cn.liny.db

import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataframe, SaveMode, SparkSession}



// 定义样例类
case class TeacherMaster(id:Int,name:String,age:Int)

object DataToMysqlForMaster {

  def main(args: Array[String]): Unit = {

    // 1.创建SparkSession对象
    val spark: SparkSession = SparkSession.builder().appName("DataToMysqlForMaster").getOrCreate()

    // 2.读取数据
    val data: RDD[String] = spark.sparkContext.textFile(args(0))

    // 3.切分每一行
    val arrRDD: RDD[Array[String]] = data.map(_.split(" "))

    // 4.RDD关联样例类
    val teacherRDD: RDD[TeacherMaster] = arrRDD.map(x=>TeacherMaster(x(0).toInt,x(1),x(2).toInt))

    // 5.导入隐式转换
    import spark.implicits._

    // 6.将RDD转换成Dataframe
    val teacherDF: Dataframe = teacherRDD.toDF()

    // 7.将Dataframe注册成表
    teacherDF.createOrReplaceTempView("teacher")

    // 8.操作teacher表 ,按照年龄进行降序排列
    val resultDF: Dataframe = spark.sql("select * from teacher order by age desc")
   // resultDF.show()

    // 9.把结果保存在mysql表中
    // 9.1.创建Properties对象,设置连接mysql的用户名和密码
    val prop: Properties =new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","bigdata")

    // 9.2.操作数据库表
    
    resultDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.80.22:3306/spark","teacher",prop)

    // 释放资源
    spark.stop()

  }

}
3.2.2.2.打包

3.2.2.3.提交集群运行
#将文件上传到hdfs
hdfs dfs -mkdir -p /spark/sql/input

hdfs dfs -put people.txt /spark/sql/input

#提交执行任务
spark-submit 
--class cn.liny.db.DataToMysqlForMaster 
--master spark://node01:7077 
--executor-memory 1g 
--total-executor-cores 2 
--jars /export/servers/testjar/mysql-connector-java-5.1.32-bin.jar 
--driver-class-path /export/servers/testjar/mysql-connector-java-5.1.32-bin.jar 
/export/servers/testjar/spark_sql-1.0-SNAPSHOT.jar  
hdfs://192.168.53.100:8020/spark/sql/input/people.txt

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/773918.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号