栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark学习之RDD与算子

spark学习之RDD与算子

spark学习之RDD

Spark计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:

  • RDD : 弹性分布式数据集

  • 累加器:分布式共享只写变量

  • 广播变量:分布式共享只读变量

什么是RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

# 弹性
    - 存储的弹性:内存与磁盘的自动切换;
    - 容错的弹性:数据丢失可以自动恢复;
    - 计算的弹性:计算出错重试机制;
    - 分片的弹性:可根据需要重新分片。
# 分布式:数据存储在大数据集群不同节点上
# 数据集:RDD封装了计算逻辑,并不保存数据
# 数据抽象:RDD是一个抽象类,需要子类具体实现
# 不可变:RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新	的RDD里面封装计算逻辑
# 可分区、并行计算

核心属性

执行原理

1.启动Yarn集群环境

2.Spark通过申请资源创建调度节点和计算节点

3.Spark框架根据需求将计算逻辑根据分区划分成不同的任务

4.调度节点将任务根据计算节点状态发送到对应的计算节点进行计算

RDD的创建

主要的创建方式有两种:从集合(内存中创建),从外部存储(文件)中创建

从内存中创建

从集合中创建RDD,Spark主要提供了两个方法:parallelize和makeRDD

val sparkConf =
    new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val rdd1 = sparkContext.parallelize(
    List(1,2,3,4)
)
//这种方法用的多,其实底层就是parallelize()
val rdd2 = sparkContext.makeRDD(
    List(1,2,3,4)
)
rdd1.collect().foreach(println)
rdd2.collect().foreach(println)
sparkContext.stop()

从文件中创建

由外部存储系统的数据集创建RDD包括:本地的文件系统,所有Hadoop支持的数据集,比如HDFS、Hbase等。

val sparkConf =
    new SparkConf().setMaster("local[*]").setAppName("spark")

val sparkContext = new SparkContext(sparkConf)
val fileRDD: RDD[String] = sparkContext.textFile("input") //主持目录或者通配符pihao*.txt
fileRDD.collect().foreach(println)
sparkContext.stop()
RDD并行度与分区

默认情况下,Spark可以将一个作业切分多个任务后,发送给Executor节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建RDD时指定。记住,这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。

内存创建RDD分区策略

def main(args: Array[String]): Unit = {

      val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
      // conf.set("spark.default.parallelism", "4")
      val sc = new SparkContext(conf)

      // TODO 从内存中创建RDD - 分区

      // 1. 如果构建RDD时,没有指定数据处理分区的数量,那么会使用默认分区数量
      // makeRDD方法存在第二个参数,这个参数表示分区数量numSlices(存在默认值)
      // scheduler.conf.getInt("spark.default.parallelism", totalCores)
      // totalCores : 当前Master环境的总(虚拟)核数
      // 分区设置的优先级 : 方法参数 > 配置参数 > 环境配置
      
    
        // kafka生产者分区策略
        // 【1,3,5】【2,4】 : 轮询(2个分区)
        // 【1,2, 3】【5,4】 :范围(2个分区)
        // 【1,2】【3,4】【5】 :范围(3个分区)
        // Spark分区策略
        // 【1,2】【3,4,5】 :范围(2个分区)
        // 【1】【2,3】【4,5】:范围(3个分区)
      val rdd1 : RDD[Int] = sc.makeRDD(
        Seq(1,2,3,4,5), 3
      )

      // saveAsTextFile方法可以生成分区文件
      rdd1.saveAsTextFile("output2")
      sc.stop()

    }

文件创建RDD分区策略

def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(conf)

    // TODO 从文件中创建RDD

    // textFile方法可以在读取文件时,设定分区
    // 设定分区时,应该传递第二个参数,如果不设定,存在默认值
    //    math.min(defaultParallelism, 2)
    // 第二个参数表示最小分区数,所以最终的分区数量可以大于这个值的。

    // TODO 1. spark读取文件其实底层就是hadoop读取文件
    // TODO 2. spark的分区数量其实就来自于hadoop读取文件的切片

    // (想要切片数量)numSplits = 2
    // totalSize = 7
    // (预计每个分区的字节大小)goalSize = totalSize / numSplits = 7 / 2 = 3
    // splitSize = Math.max(minSize(1), Math.min(goalSize(5G), blockSize(128M))) = 128;
    // 7 / 3 = 2...1 = 2 + 1 = 3
    val rdd = sc.textFile("data/word.txt", 2)

    rdd.saveAsTextFile("output")


    sc.stop()

  }
 TODO 1. 分区数据的处理也是由Hadoop决定的。
 TODO 2. hadoop在计算分区时会处理数据时的逻辑不一样。
 TODO 3. Spark读取文件数据底层使用的就是hadoop读取的,所以读取规则用的是hadoop
         3.1 hadoop读取数据是按行读取的,不是按字节读取
         3.2 hadoop读取数据是偏移量读取的
         3.3 hadoop读取数据时,不会重复读取相同的偏移量


	原始数据(13个字节):
			123
			456
			789
         1. 计算有多少个分区?
            13 / 3 = 4 (每个分区放4个字节)
            13 / 4 = 3...1 = 3 + 1 = 4 (最终确定4个分区)
         2. 计算每个分区放什么数据?

            123@@ => 01234
            456@@ => 56789
            789   => 101112
            ************************************
			计算读取偏移量(每个分区4个字节),从0开始,总共13个字节
            [0, 4] => [123] (0~4,第一行读完)
            [4, 8] => [456]	(4已经被读了,从5开始,5-8,但是读就会读一行,9也被读了)
            [8, 12] => [789](9被读了,从10开始)
            [12, 13] => []  (没数据了)


RDD算子

将RDD的方法称为算子的原因是为了和Scala集合的方法进行区分,RDD的方法很多,但是一般分为两大类:

逻辑封装,将旧的逻辑转换为新的逻辑,称之为转换算子

执行逻辑,将封装好的逻辑开始执行,让整个作业运行起来,称之为行动算子

RDD转换算子

RDD根据数据处理方式的不同将算子整体上分为Value类型、双Value类型和Key-Value类型

Value类型 map
//使用案例
def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(conf)

        // TODO 算子 - 转换 - map
        val rdd = sc.makeRDD(List(1,2,3,4))

        // map算子表示将数据源中的每一条数据进行处理
        // map算子的参数是函数类型: Int => U(不确定)
        def mapFunction( num : Int ): Int = {
            num * 2
        }

        // A => B 把旧的RDD转换成新的RDD,这个代码不会执行
        //val rdd1: RDD[Int] = rdd.map(mapFunction)
        val rdd1: RDD[Int] = rdd.map(_ * 2)

        rdd1.collect().foreach(println) //collect()是行动算子,会执行


        sc.stop()

    }
//存在分区的情况:在RDD进行转换时,新的RDD分区数量与旧的RDD分区数量保持一致
//数据在处理过程中,默认的分区不变,原来在哪个分区,现在也在哪个分区
//数据在处理过程中,遵循执行的顺序:分区内有序,分区间无序,前提是多线程
def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(conf)

        // 设置两个分区: 【1,2】,【3,4】
        val rdd = sc.makeRDD(List(1,2,3,4), 2)

    // TODO 分区 - 2
    // 数据执行顺序
    //rdd就是逻辑的封装,如果有多个Rdd的话,那么第一条数据应该是所有的逻辑执行完毕后,才执行下一条数据	
        val rdd1: RDD[Int] = rdd.map(
            num => {
                println("********* num = " + num) 
                num
            }
        )

        val rdd2: RDD[Int] = rdd1.map(
            num => {
                println("######## num = " + num)
                num
            }
        )
		
        rdd2.saveAsTextFile("output")
        sc.stop()

    }

小练习

从服务器日志数据apache.log中获取用户请求URL资源路径

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(conf)
    val lineRDD = sc.textFile("data/apache.log")

    val url = lineRDD.map(
      line => {
        val data = line.split(" ")
        data(6)
      }
    )
    url.collect().foreach(println)

  }
mapPartitions

以一个分区的数据进行转换,有点类似批处理的感觉,将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。

mapPartitions的入参函数的入参和返回值类型都要求是可迭代的对象

//可以过滤数据
val dataRDD1: RDD[Int] = dataRDD.mapPartitions(
    datas => {
        datas.filter(_==2)
    }
)
//以分区的数据做转换
def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(conf)

        // TODO 算子 - 转换 -
        val rdd = sc.makeRDD(List(1,2,3,4), 2)

        val rdd1 = rdd.mapPartitions(
            list => {
                println("*********************")  //走两遍,如果是map的话要走4遍
                list.map(_*2)
            }
        )

        rdd1.collect().foreach(println)
        sc.stop()

    }

小练习

获取每个分区的最大值

def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(conf)

    // TODO 算子 - 转换 -
    val rdd = sc.makeRDD(List(1,2,3,4), 2)
    // 获取每个数据分区的最大值
    // 【1,2】【3,4】
    val rdd1 = rdd.mapPartitions(
      list => {
        val max = list.max
        List(max).iterator //包装转换成iterator类型
      }
    )
    rdd1.collect.foreach(println)
    sc.stop()

  }

map和mapPartitions的区别

#数据处理角度
Map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作。

#功能的角度
Map算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据

#性能的角度
Map算子因为类似于串行操作,所以性能比较低,而是mapPartitions算子类似于批处理,所以性能较高。但是mapPartitions算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用map操作。

#完成比完美更重要
mapPartitionsWithIndex

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。

假如有三个分区,我只要想第二个分区的数量,要怎么做呢

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(conf)

        val rdd = sc.makeRDD(List(1,2,3,4,5,6), 3)
		//	  0       1	       2 分区索引从0开始
        // 【1,2】,【3,4】,【5,6】

        val rdd1 = rdd.mapPartitionsWithIndex(
            (ind, list) => {
                if ( ind == 1 ) {
                    list
                } else {
                    Nil.iterator
                }
            }
        )
    
        rdd1.collect().foreach(println)
        sc.stop()
    }
flatMap

将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(conf)

        // TODO 算子 - 转换 - 扁平化
        val rdd = sc.makeRDD(
            List(
                "Hello Scala", "Hello Spark"
            )
        )
        val rdd1 = sc.makeRDD(
            List(
                List(1,2), List(3,4)
            )
        )

        // 整体 => 个体
        //val rdd2 = rdd.flatMap(_.split(" "))
        val rdd2 = rdd.flatMap(
            str => { // 整体(1)
                // 容器 接受-->(个体(N))
                str.split(" ")
            }
        )

        val rdd3 = rdd1.flatMap(
            list => {
                list   //这里的两个list的含义不同
            }
        )

        rdd3.collect().foreach(println)


        sc.stop()

    }

小案例

将List(List(1,2),3,List(4,5))进行扁平化操作

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(conf)

        val rdd = sc.makeRDD(
            List(List(1,2),3,List(4,5))
        )

        val rdd1 = rdd.flatMap { //数据类型不一致,使用模式匹配来做
            case list : List[_] => list
            case other => List(other)
        }

        rdd1.collect.foreach(println)
        sc.stop()

    }
glom

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(conf)

        //两分区 123,456
        val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6), 2)

        val rdd1: RDD[Array[Int]] = rdd.glom() //类型Array[Int]

        rdd1.collect().foreach(a => println(a.mkString(",")))// 123   456
        sc.stop()

}

小案例

计算所有分区最大值求和(分区内取最大值,分区间最大值求和)

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(conf)

        // TODO 算子 - 转换 - 扁平化
        val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6), 2)

        val rdd1: RDD[Array[Int]] = rdd.glom()

        val rdd2: RDD[Int] = rdd1.map(_.max)

        println(rdd2.collect().sum) // 9
        sc.stop()

    }
groupBy

将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中。

一个组的数据必须在一个分区中,但是并不是说一个分区中只有一个组(假如只有一个分区,多个组都放)

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        conf.set("spark.local.dir", "e:/test") //落盘会一闪而过
        val sc = new SparkContext(conf)

        val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6))

        // groupBy算子根据函数计算结果进行分组。
        // groupBy算子执行结果为KV数据类型
        // k是为分组的标识, v就是同一个组的数据集合
        val rdd1: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)
        // 1 => 1
        // 2 => 0
        // 3 => 1
        // 4 => 0
        // 5 => 1
        // 6 => 0

        sc.stop()
}

//分组后,一个分区的数据被打乱重新和其他分区的数据组合在一起,这个操作称为Shuffle
//shuffle操作不允许在内存中等待,必须落盘

shuffle会将完整的计算过程一分为二,形成两个阶段,一个阶段用于写数据,一个阶段用于读数据,写数据的过程没有完成不允许读数据

shuffle的操作是可以更改分区的。会浪费资源你懂的

小练习1

将List(“Hello”, “hive”, “hbase”, “Hadoop”)根据单词首写字母进行分组

从服务器日志数据apache.log中获取每个时间段访问量

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        conf.set("spark.local.dir", "e:/test")
        val sc = new SparkContext(conf)

//		  将List("Hello", "hive", "hbase", "Hadoop")根据单词首写字母进行分组    
//        val rdd = sc.makeRDD(
//            List("Hello", "hive", "hbase", "Hadoop")
//        )
//
//        val rdd1 = rdd.groupBy(_.substring(0, 1).toUpperCase())
//        rdd1.collect.foreach(println)

    
    //178.152.18.59 - - 19/05/2015:04:05:08 +0000 GET /reset.css
        // 从服务器日志数据apache.log中获取每个时间段访问量
        // (10, 100), (11, 101)
        val lines = sc.textFile("data/apache.log")

        // (time, List((time,1),(time, 1)))
        // TODO groupBy算子可以实现 WordCount ( 1 / 10 )
        val groupRDD: RDD[(String, Iterable[(String, Int)])] = lines.map(
            lines => {
                val datas = lines.split(" ")
                val time = datas(3)
                val times = time.split(":")
                (times(1), 1)
            }
        ).groupBy(_._1)

        val timeCnt: RDD[(String, Int)] = groupRDD.mapValues(_.size)
        timeCnt.collect.foreach(println)

        sc.stop()
    }
filter

按照指定的规则对每一条数据筛选过滤

true为保留,false丢弃

当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        conf.set("spark.local.dir", "e:/test")
        val sc = new SparkContext(conf)

        // TODO 算子 - 转换
        val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6))

        // filter算子可以按照指定的规则对每一条数据进行筛选过滤
        // 数据处理结果为true,表示数据保留,如果为false,数据就丢弃
        val rdd1 = rdd.filter(
            num => num % 2 == 1
        )

        rdd1.collect.foreach(println)
        sc.stop()

    }

小案例

从服务器日志数据apache.log中获取2015年5月17日的请求路径

178.152.18.59 - - 19/05/2015:04:05:08 +0000 GET /reset.css

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        conf.set("spark.local.dir", "e:/test")
        val sc = new SparkContext(conf)

        // TODO 算子 - 转换
        val lines = sc.textFile("data/apache.log")

        val filterLines = lines.filter(
            line => {
                //line.contains("17/05/2015")
                val datas = line.split(" ")
                val time = datas(3)
                time.startsWith("17/05/2015")
            }
        )

        val r = filterLines.map(
            line => {
                val datas = line.split(" ")
                datas(6)
            }
        )

        r.collect().foreach(println)
        sc.stop()

    }
sample

根据指定的规则从数据集中抽取数据

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        conf.set("spark.local.dir", "e:/test")
        val sc = new SparkContext(conf)

        // TODO 算子 - 转换
        val rdd : RDD[Int] = sc.makeRDD(1 to 10)

        // 抽取数据,采样数据
        // 第一个参数表示抽取数据的方式:true. 抽取放回,false. 抽取不放回
        // 第二个参数和第一个参数有关系
        //     如果抽取不放回的场合:参数表示每条数据被抽取的几率
        //     如果抽取放回的场合:参数表示每条数据希望被重复抽取的次数
        // 第三个参数是【随机数】种子
        //     随机数不随机,所谓的随机数,其实是通过随机算法获取的一个数
        //     3 = xxxxx(10)
        //     7 = xxxxx(3)
        //val rdd1: RDD[Int] = rdd.sample(false, 0.5)
        //val rdd1: RDD[Int] = rdd.sample(true, 2)
        val rdd1: RDD[Int] = rdd.sample(false, 0.5, 2)
        rdd1.collect.foreach(println)


        sc.stop()

    }
coalesce

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率

当spark程序中,存在过多的小任务的时候,可以通过coalesce方法,收缩合并分区,减少分区的个数,减小任务调度成本

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        conf.set("spark.local.dir", "e:/test")
        val sc = new SparkContext(conf)

        // TODO 算子 - 转换 - 缩减分区
        val rdd : RDD[Int] = sc.makeRDD(
            List(1,2,3,4,5,6), 3
        )

        // 缩减 (合并), 默认情况下,缩减分区不会shuffle,还可能把两个数据多的分区合并了
        //val rdd1: RDD[Int] = rdd.coalesce(2)
        // 这种方式在某些情况下,无法解决数据倾斜问题,所以还可以在缩减分区的同时,进行数据的shuffle操作
        val rdd2: RDD[Int] = rdd.coalesce(2, true)

        rdd.saveAsTextFile("output")
        rdd2.saveAsTextFile("output1")
        sc.stop()

    }
distinct

去重

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        conf.set("spark.local.dir", "e:/test")
        val sc = new SparkContext(conf)

        
        val rdd : RDD[Int] = sc.makeRDD(
            List(1,1,1)
        )

        // map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
        // 【1,1,1】
        // 【(1, null),(1, null),(1, null)】
        // 【null, null, null】
        // 【null, null】
        // 【(1, null)】
        // 【1】
        val rdd1: RDD[Int] = rdd.distinct() // 存在shuffle,因此distinct()可以传入分区数
        rdd1.collect.foreach(println)

        //List(1,1,1,1,1).distinct  与单点集合的去重方法区分开
        sc.stop()

    }
repartition

该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        conf.set("spark.local.dir", "e:/test")
        val sc = new SparkContext(conf)

        // TODO 算子 - 转换 - 缩减分区
        val rdd : RDD[Int] = sc.makeRDD(
            List(1,2,3,4,5,6), 2
        )

        // 扩大分区 - repartition
        // 在不shuffle的情况下,coalesce算子扩大分区是没有意义的。
        //val rdd1: RDD[Int] = rdd.coalesce(3, true)

        val rdd1: RDD[Int] = rdd.repartition(3) // shuffle默认为true


        rdd.saveAsTextFile("output")
        rdd1.saveAsTextFile("output1")
        sc.stop()

    }
sortBy

该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为升序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。中间存在shuffle的过程

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        conf.set("spark.local.dir", "e:/test")
        val sc = new SparkContext(conf)

        // TODO 算子 - 转换 - 排序
        val rdd : RDD[Int] = sc.makeRDD(
            List(1,4,3,2,6,5),2
        )
		
    	//肯定存在shuffle,刚开始是143,265,现在排序后变成了123,456
        val rdd1: RDD[Int] = rdd.sortBy(num => num, false) //false表示降序
        println(rdd1.collect.mkString(",")) // 1,2,3,4,5,6
        sc.stop()

    }
双Value类型

intersection,union,intersection,zip

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        conf.set("spark.local.dir", "e:/test")
        val sc = new SparkContext(conf)

        // TODO 算子 - 转换 - 排序
        val rdd : RDD[Int] = sc.makeRDD(
            List(1,2,3,4),2
        )

        val rdd1 : RDD[Int] = sc.makeRDD(
            List(3,4,5,6),2
        )
        val rdd2 : RDD[String] = sc.makeRDD(
            List("3","4","5", "6"),2
        )
        // 交集
        //println(rdd.intersection(rdd1).collect().mkString(","))
        // 并集
        //println(rdd.union(rdd1).collect().mkString(","))    //1234445
        // 差集
        //println(rdd.subtract(rdd1).collect().mkString(","))
		// 交集并集差集的类型必须一样
		

        // 拉链:前提条件分区数量相等,每个分区的中的元素个数相同
        // 英文翻译:
        // Can only zip RDDs with same number of elements in each partition
        // Can't zip RDDs with unequal numbers of partitions: List(2, 3)
        //println(rdd.zip(rdd1).collect().mkString(","))
        println(rdd.zip(rdd2).collect().mkString(","))  //类型可以不一样

        sc.stop()

    }
Key-Value类型 partitionBy

将数据按照指定Partitioner重新进行分区。Spark默认的分区器是HashPartitioner

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        conf.set("spark.local.dir", "e:/test")
        val sc = new SparkContext(conf)

        // TODO 算子 - 转换 - K-V
        val rdd : RDD[Int] = sc.makeRDD(
            List(1,2,3,4),2
        )
        rdd.sortBy(num=>num)
        //rdd.partitionBy(null);

        // partitionBy: 算子是根据指定的规则对每一条数据进行重分区
        // repartition : 强调分区数量的变化,数据怎么变不关心
        // partitionBy : 关心数据的分区规则

        val rdd1: RDD[(Int, Int)] = rdd.map((_, 1))

        // 下面调用RDD对象的partitionBy方法一定会报错。
        // 二次编译(隐式转换)
        // RDD => PairRDDFunctions
        // HashPartitioner是Spark中默认shuffle分区器,也就是hashcode多数量取余
        rdd1.partitionBy(new HashPartitioner(2)).saveAsTextFile("output");

        sc.stop()

    }
reduceByKey

可以将数据按照相同的Key对Value进行聚合

reduceByKey可以在shuffle之前,对分区内的数据进行预聚合,称之为combine,稍后会讲

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        conf.set("spark.local.dir", "e:/test")
        val sc = new SparkContext(conf)

        // TODO 算子 - 转换 - reduceByKey
        val rdd : RDD[(String, Int)] = sc.makeRDD(
            List(
                ("a", 1),
                ("a", 1),
                ("a", 1)
            )
        )

        // reduceByKey算子的作用,是将相同的key的value分在一个组中,然后进行reduce操作
        // TODO reduceByKey可以实现WordCount ( 2 / 10 )
        val wordCount: RDD[(String, Int)] = rdd.reduceByKey(_ + _)

        wordCount.collect.foreach(println)

        sc.stop()

    }
groupByKey

将数据源的数据根据key对value进行分组

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        conf.set("spark.local.dir", "e:/test")
        val sc = new SparkContext(conf)

        // TODO 算子 - 转换 - groupByKey
        val rdd : RDD[(String, Int)] = sc.makeRDD(
            List(
                ("a", 1),
                ("a", 1),
                ("a", 1)
            )
        )
        val value: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1)

        // TODO groupByKey & groupBy
        // 1. groupBy不需要考虑数据类型,groupByKey必须保证数据kv类型
        // 2. groupBy按照指定的规则进行分组,groupByKey必须根据key对value分组
        // 3. 返回结果类型
        //    groupByKey => (String, Iterable[Int])  、key作为键
        //    groupBy    => (String, Iterable[(String, Int)]) 、 那个指定规则的结果为键

        // groupByKey算子将相同key数据的value分在一个组中
        // TODO groupByKey也可以实现 WordCount ( 3 / 10 )
        val rdd1: RDD[(String, Iterable[Int])] = rdd.groupByKey()

        val rdd2: RDD[(String, Int)] = rdd1.mapValues(_.size)

        rdd2.collect.foreach(println)


        sc.stop()

    }

思考一个问题:reduceByKey和groupByKey的区别?

# 从shuffle的角度:
reduceByKey和groupByKey都存在shuffle的操作,但是reduceByKey可以在shuffle前对分区内相同key的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而groupByKey只是进行分组,不存在数据量减少的问题,reduceByKey性能比较高。

# 从功能的角度:
reduceByKey其实包含分组和聚合的功能。groupByKey只能分组,不能聚合,所以在分组聚合的场合下,推荐使用reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用groupByKey
aggregateByKey

将数据根据不同的规则进行分区内计算和分区间计算

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        conf.set("spark.local.dir", "e:/test")
        val sc = new SparkContext(conf)

        // TODO 取出每个分区内相同key的最大值然后分区间相加
        // 【(a,1),(a,2),(b,3)】
        //      => 【(a, 2), (b, 3)】
        //                => 【 (a, 8), (b, 8) 】
        //      => 【(b, 5), (a, 6)】
        // 【(b,4),(b,5),(a,6)】
        val rdd = sc.makeRDD(
            List(
                ("a",1),("a",2),("b",3),
                ("b",4),("b",5),("a",6)
            ),
            2
        )

        // aggregateByKey算子存在函数柯里化
        // 第一个参数列表中有一个参数
        //     参数为零值,表示计算初始值 zero, z, 用于数据进行分区内计算
        // 第二个参数列表中有两个参数
        //     第一个参数表示 分区内计算规则
        //     第二个参数表示 分区间计算规则
        val rdd1 = rdd.aggregateByKey(5)(
            (x, y) => {
                math.max(x, y)
            },
            (x, y) => {
                x + y
            }
        )

        rdd1.collect.foreach(println)
        sc.stop()

    }
foldByKey

当分区内计算规则和分区间计算规则相同时,aggregateByKey就可以简化为foldByKey

val rdd = sc.makeRDD(
            List(
                ("a",1),("a",2),("b",3),
                ("b",4),("b",5),("a",6)
            ),
            2
        )

// TODO aggregateByKey也可以实现WordCount ( 4 / 10 )
val rdd2 = rdd.aggregateByKey(0)(_+_, _+_)

// TODO foldByKey也可以实现WordCount ( 5 / 10 )
// TODO 如果aggregateByKey算子的分区内计算逻辑和分区间计算逻辑相同,那么可以使用foldByKey算子简化
val rdd3 = rdd.foldByKey(0)(_+_)

rdd3.collect.foreach(println)
combineByKey
def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        conf.set("spark.local.dir", "e:/test")
        val sc = new SparkContext(conf)

        // TODO 求每个key的平均值
        val rdd = sc.makeRDD(
            List(
                ("a", 1), ("a", 2), ("b", 3),
                ("b", 4), ("b", 5), ("a", 6)
            ),
            2
        )
        val rdd1 = sc.makeRDD(
            List(
                ("a", (1,1)), ("a", 2), ("b", (3,1)),
                ("b", (4,1)), ("b", 5), ("a", (6,1))
            ),
            2
        )
    	// 这几种算子都做不到
        // groupByKey() => total / cnt = avg
        // reduceByKey() => total / cnt = avg
        // aggregateByKey(z)(f1, f2) => total / cnt = avg
        // foldByKey(z)(f1) => total / cnt = avg
        // (a, 3)(b, 4)

        // combineByKey算子有三个参数
        // 第一个参数表示: 当第一个数据不符合我们的规则时,用于进行转换的操作
        // 第二个参数表示: 分区内计算规则
        // 第三个参数表示: 分区间计算规则
        val rdd2 = rdd.combineByKey(
            num => (num, 1),
            (x : (Int, Int), y) => {
                (x._1 + y, x._2 + 1)
            },
            ( x : (Int, Int), y:(Int, Int) ) => {
                (x._1 + y._1, x._2 + y._2)
            }
        )
        rdd2.collect.foreach(println)


        sc.stop()

    }
sortByKey

在一个(K,V)的RDD上调用,K必须实现Ordered接口(特质),返回一个按照key进行排序的

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        conf.set("spark.local.dir", "e:/test")
        val sc = new SparkContext(conf)

        val rdd = sc.makeRDD(
            List(
                ("a", 2), ("a", 1), ("c", 3), ("b", 4)
            )
        )

        //  ("a", 2) ("a", 1)("b", 4) ("c", 3)
        // sortByKey算子就是按照key排序,不管value的值
        val rdd1: RDD[(String, Int)] = rdd.sortByKey()

        rdd1.collect.foreach(println)

        sc.stop()

    }

设置key为自定义类User

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        conf.set("spark.local.dir", "e:/test")
        val sc = new SparkContext(conf)

        val rdd = sc.makeRDD(
            List(
                (new User(), 2), (new User(), 1), (new User(), 3), (new User(), 4)
            )
        )

        // sortByKey算子就是按照key排序,不管value的值
        val rdd1: RDD[(User, Int)] = rdd.sortByKey(false)

        rdd1.collect.foreach(println)
        sc.stop()

    }

	//排序的话必须实现一个Ordered特质
    class User extends Ordered[User]{
        override def compare(that: User): Int = {
            1
        }
    }
join
def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        conf.set("spark.local.dir", "e:/test")
        val sc = new SparkContext(conf)

        val rdd1 = sc.makeRDD(
            List(
                ("a", 1),  ("b", 2), ("c", 3)
            )
        )
        val rdd2 = sc.makeRDD(
            List(
                ("a", 4),  ("d", 5), ("c", 6)
            )
        )

        // spark中join操作主要针对于两个数据集中相同的key的数据连接
        // join操作可能会产生笛卡尔乘积,可能会出现shuffle,性能比较差
        // 所以如果能使用其他方式实现同样的功能,不推荐使用join
        //val rdd3: RDD[(String, (Int, Int))] = rdd1.join(rdd2)  |(a,(1,4))(c,(3,6))

        // 主,从表
        //val rdd3 = rdd1.leftOuterJoin(rdd2)
        //val rdd4 = rdd1.rightOuterJoin(rdd2)
        //val rdd5 = rdd1.fullOuterJoin(rdd2)
    
        // connect + group 比较重要
        val rdd6 = rdd1.cogroup(rdd2)

        //rdd3.collect.foreach(println)
        println("**********************")
        //rdd4.collect.foreach(println)
        println("**********************")
        //rdd5.collect.foreach(println)
        println("**********************")
        rdd6.collect.foreach(println)

//(a,(CompactBuffer(1),CompactBuffer(4, 5, 6)))
//(b,(CompactBuffer(2),CompactBuffer())) 
//(c,(CompactBuffer(3),CompactBuffer()))

    
        sc.stop()

    }
实操案例
  1. 数据准备

    agent.log:时间戳,省份,城市,用户,广告,中间字段使用空格分隔。

  2. 需求描述

    统计出每一个省份每个广告被点击数量排行的Top3

  3. 需求分析

  4. 功能实现

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("Req")
        val sc = new SparkContext(conf)

        // TODO 统计出每一个省份每个广告被点击数量排行的Top3

        // TODO 1. 读取数据文件,获取原始数据
        //      1516609143867 6 7 64 16
        val lines = sc.textFile("data/agent.log")

        // TODO 2. 将原始数据进行结构的转换
        //     line => ((省份,广告),1)
        val wordToOne = lines.map(
            line => {
                val datas = line.split(" ")
                ( (datas(1), datas(4)), 1 )
            }
        )

        // TODO 3. 将转换结构后的进行统计
        //     ((省份,广告),1) => ((省份,广告),sum)
        val wordToSum = wordToOne.reduceByKey(_+_)

        // TODO 4. 将统计结果进行结构的转换,将省份独立出来
        //     ((省份,广告),sum) => (省份,(广告,sum))
        val wordToTuple = wordToSum.map {
            case ( (prv, adv), sum ) => {
                ( prv, (adv, sum) )
            }
        }

        // TODO 5. 将数据按照省份进行分组
        //    (省份,List[(广告,sum), (广告1,sum1), (广告2,sum2)])
        val groupRDD: RDD[(String, Iterable[(String, Int)])] = wordToTuple.groupByKey()

        // TODO 6. 将分组后的数据,根据点击数量进行排行(降序)
        // TODO 7. 将排序后的数据取前3
        val top3 = groupRDD.mapValues(
            iter => {
                iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
            }
        )

        // TODO 8. 将结果采集后打印在控制台上
        top3.collect.foreach(println)

        sc.stop()

    }

另一种实现方式,先省份分组,再做广告的统计,排序

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("Req")
        val sc = new SparkContext(conf)

        // TODO 统计出每一个省份每个广告被点击数量排行的Top3

        // TODO 1. 读取数据文件,获取原始数据
        //      1516609143867 6 7 64 16
        val lines = sc.textFile("data/agent.log")

        // TODO 2. 将原始数据进行结构的转换
        //     line => (省份,(广告,1))
        val wordToOne = lines.map(
            line => {
                val datas = line.split(" ")
                (datas(1), (datas(4), 1 ))
            }
        )

        val groupRDD: RDD[(String, Iterable[(String, Int)])] = wordToOne.groupByKey()

        val top3 = groupRDD.mapValues(
            iter => {
                val wordCountMap: Map[String, Int] = iter.groupBy(_._1).mapValues(_.size)
                wordCountMap.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
            }
        )

        top3.collect.foreach(println)
        sc.stop()

    }

以上两个例子,推荐使用第一种方式,先统计分析,然后再做分组,这样性能获得到极大的提升。

也就是先reduceByKey,而不是GroupByKey

局部排序与全局排序

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("Req")
        val sc = new SparkContext(conf)

        // TODO 统计出每一个省份每个广告被点击数量排行的Top3

        // TODO 1. 读取数据文件,获取原始数据
        //      1516609143867 6 7 64 16
        val lines = sc.textFile("data/agent.log")

        // TODO 2. 将原始数据进行结构的转换
        //     line => ((省份,广告),1)
        val wordToOne = lines.map(
            line => {
                val datas = line.split(" ")
                ( (datas(1), datas(4)), 1 )
            }
        )

        // TODO 3. 将转换结构后的进行统计
        //     ((省份,广告),1) => ((省份,广告),sum)
        val wordToSum = wordToOne.reduceByKey(_+_)

        // TODO 4. 将统计结果进行结构的转换,将省份独立出来
        //     ((省份,广告),sum) => (省份,(广告,sum))
        val wordToTuple = wordToSum.map {
            case ( (prv, adv), sum ) => {
                ( prv, (adv, sum) )
            }
        }

        // TODO 5. 将数据按照省份进行分组
        //    (省份,List[(广告,sum), (广告1,sum1), (广告2,sum2)])
        //val groupRDD: RDD[(String, Iterable[(String, Int)])] = wordToTuple.groupByKey()

        // TODO 6. 将分组后的数据,根据点击数量进行排行(降序)
        // TODO 7. 将排序后的数据取前3
//        val top3 = groupRDD.mapValues(
//            iter => {
//                iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
//            }
//        )
        // 【 (a, 1), (a, 2), (a, 3) 】 => 【(a, 2), (a, 3) 】
        // 【 (a, 4), (a, 5), (a, 6) 】 => 【 (a, 5), (a, 6)】
        // 【 (a, 7), (a, 8), (a, 9) 】 => 【(a, 8), (a, 9)】 => 【(a, 8), (a, 9)】
        // 分区内排序取前3名
        // 分区间排序取前3名
        // 【(广告,sum), (广告,sum), (广告,sum), (广告,sum)】 
    	//上面的toList很消耗内存,这种方式极大节省内存,分区内排序,然后再分区间排序
        val top3 = wordToTuple.aggregateByKey(ArrayBuffer[(String, Int)]())(
            (buff, t) => {
                buff.append(t)
                buff.sortBy(_._2)(Ordering.Int.reverse).take(3)
            },
            ( buff1, buff2 ) => {
                buff1.appendAll(buff2)
                buff1.sortBy(_._2)(Ordering.Int.reverse).take(3)
            }
        )

        // TODO 8. 将结果采集后打印在控制台上
        top3.collect.foreach(println)

        sc.stop()

    }
RDD行动算子

Spark RDD方法分为2大类,其中一个是转换算子,一个为行动算子

行动算子在被调用时,会触发Spark作业的执行,collect算子就是行动算子,行动算子执行时,会构建新的作业

reduce,collect,count,first,take,tabkeOrdered
def main(args: Array[String]): Unit = {

    // 一个应用程序, Driver程序
    val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(conf)

    // TODO 算子 - 行动
    val rdd = sc.makeRDD(List(1,4,3,2),2)

    // reduce算子
    val i: Int = rdd.reduce(_ + _)
    println(i) //10

    // 将数据从Executor端采集回到Driver端
    // collect会将数据全部拉取到Driver端的内存中,形成数据集合,可能会导致内存溢出
    val ints: Array[Int] = rdd.collect()
    println(ints.mkString(",")) //1,4,3,2

    val l: Long = rdd.count() 
    println(l)//4

    val i1: Int = rdd.first()
    println(i1) //1

    val ints1: Array[Int] = rdd.take(3)
    println(ints1.mkString(","))//1,4,3

    // 【1,2,3】
    val ints2: Array[Int] = rdd.takeOrdered(3) 
    println(ints2.mkString(",")) //1,2,3 排完序后取3个

    sc.stop()

  }
aggregate

分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

def main(args: Array[String]): Unit = {

        // 一个应用程序, Driver程序
        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(conf)

        // TODO 算子 - 行动
        val rdd = sc.makeRDD(List(1,4,3,2),2)
		val x: Int = rdd.aggregate(0)(_ + _, _ + _) // 10
    
        // aggregate & aggregateByKey的区别?
        // 1. 数据格式
        // 2. aggregateByKey是一个转换算子,所以执行后会产生新的RDD
        //    aggregate是一个行动算子,所以执行后会得到结果
        // 3. aggregateByKey执行计算时,初始值只会参与分区内计算
        //    aggregate执行计算时,初始值会参与分区内计算,也会参与分区间的计算
        //  【1,4】,【3,2】
        //  【5,1,4】,【5,3,2】
        // 【10】【10】
        // 【5, 10, 10】
        val i: Int = rdd.aggregate(5)(_ + _, _ + _) // 25
        val j: Int = rdd.fold(5)(_ + _)
        val k: Int = rdd.reduce(_ + _)

        println(i)

        sc.stop()

}
countByKey
def main(args: Array[String]): Unit = {

        // 一个应用程序, Driver程序
        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(conf)

        // TODO 算子 - 行动
        val rdd = sc.makeRDD(List(1,4,3,2),2)

        // countByKey算子表示相同key出现的次数
        val rdd1: RDD[(String, Int)] = rdd.map(("a", _))
        // (a, 1), (a, 4), (a, 3), (a, 2)
        // (a, 4) => (a, 1),(a, 1),(a, 1),(a, 1)
        // (a, 10)

        // TODO countByKey算子可以实现 WordCount (7 / 10)
        val map: collection.Map[String, Long] = rdd1.countByKey()
        println(map)

        sc.stop()

    }
countByValue

统计每个值出现的次数

def main(args: Array[String]): Unit = {

        // 一个应用程序, Driver程序
        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(conf)

        // TODO 算子 - 行动
        //val rdd = sc.makeRDD(List(1,1,1,1,2,2,3),2)  (1,4次),(2,两次)
        val rdd = sc.makeRDD(List(
            ("a", 2), ("a", 3)
        ),2)

        // countByKey算子表示相同key出现的次数
        //val rdd1: RDD[(String, Int)] = rdd.map(("a", _))

        // countByValue中Value不是KV键值对中的v的意思
        // 单value,双value,K-V
        // TODO countByValue可以实现 WordCount (8 / 10)

        // ("a", 2) => "a", "a"
        // ("a", 3) => "a", "a", "a"

        // ( a, 5 )
        val map = rdd.countByValue()

        println(map)
        sc.stop()

}
save
def main(args: Array[String]): Unit = {

        // 一个应用程序, Driver程序
        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(conf)

        // TODO 算子 - 行动
        val rdd = sc.makeRDD(List(
            ("a", 2), ("a", 3)
        ),2)

        rdd.saveAsTextFile("output")
        rdd.saveAsObjectFile("output1")
        rdd.saveAsSequenceFile("output2")

        sc.stop()
}
foreach
def main(args: Array[String]): Unit = {

        // 一个应用程序, Driver程序
        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(conf)

        // TODO 算子 - 行动
        val rdd = sc.makeRDD(
            List(1,4,3,2),2
        )

        // collect是按照分区号码进行采集,collect会按照分区号采集,先采集0号分区
        rdd.collect.foreach(println) //这个是数组的方法,单点的内存操作 1432
        println("****************************")
        rdd.foreach(println) //这个是rdd的,是算子(数据可能打乱,分布式的)

        sc.stop()
}
RDD序列化

从计算的角度, 算子以外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行。那么在scala的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给Executor端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。Scala2.12版本后闭包编译方式发生了改变

闭包检测
def main(args: Array[String]): Unit = {

        // 一个应用程序, Driver程序
        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(conf)

        // TODO 算子 - 行动
        val rdd = sc.makeRDD(
            List[Int](),2
        )

        val user = new User() // Driver
        rdd.foreach(
            num => {
                println(user.age + num) // Executor
            }
        )

        // Scala语法 : 闭包
        // Spark在执行算子时,如果算子的内部使用了外部的变量(对象),那么意味着一定会出现闭包
        // 在这种场景中,需要将Driver端的变量通过网络传递给Executor端执行,这个操作不用执行也能判断出来
        // 可以在真正执行之前,对数据进行序列化校验,
        // Spark在执行作业前,需要先进行闭包检测功能。

        sc.stop()

    }
    class User {
        val age = 30
    }
def main(args: Array[String]): Unit = {

        // 一个应用程序, Driver程序
        val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(conf)

        // TODO 算子 - 行动
        val rdd = sc.makeRDD(List(
            "Hello", "Hive", "Spark", "Scala"
        ))

        val s = new Search("S")
        s.filterByQuery(rdd).foreach(println)

        sc.stop()
    }

// 加上case也不会报错,因为样例类中默认会添加很多方法,其中旧继承了序列化
    class Search( q : String )  {
        def filterByQuery( rdd : RDD[String] ): RDD[String] = {
            // 算子外 -> Driver
            // 算子内 -> Executor
            val s : String = this.q; // 
            rdd.filter(_.startsWith(s) ) //这种方式不会报错,如果直接是_.startwith(q)报错
        }
    }


// 先讲一个scala语法,如果一个构造参数在一个普通方法中调用,那么这个参数会被编译为这个类的属性
    class Test( name:String ) {
        
        def test(): Unit = {
            println(name)  //this.name
        }
    }
Kryo序列化框架

参考地址: https://github.com/EsotericSoftware/kryo

Java的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。

注意:即使使用Kryo序列化,也要继承Serializable接口。

RDD依赖关系 RDD血缘关系

相邻两个RDD之间的关系,称之为依赖关系,多个连续的依赖关系称之为血缘关系

RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。相当于一个容错机制

def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    val sc = new SparkContext(conf)

    val lines = sc.textFile("data/word.txt")
    println(lines.toDebugString)
    println("*******************************")

    val words = lines.flatMap(_.split(" "))
    println(words.toDebugString)
    println("*******************************")

    val wordToOne = words.map((_,1))
    println(wordToOne.toDebugString)
    println("*******************************")

    val wordToCount = wordToOne.reduceByKey(_+_)
    println(wordToCount.toDebugString)
    println("*******************************")

    wordToCount.collect().foreach(println)

    sc.stop()

  }

=============================================
(3) data/word.txt MapPartitionsRDD[1] at textFile at Test1.scala:12 []
 |  data/word.txt HadoopRDD[0] at textFile at Test1.scala:12 []
*******************************
(3) MapPartitionsRDD[2] at flatMap at Test1.scala:16 []
 |  data/word.txt MapPartitionsRDD[1] at textFile at Test1.scala:12 []
 |  data/word.txt HadoopRDD[0] at textFile at Test1.scala:12 []
*******************************
(3) MapPartitionsRDD[3] at map at Test1.scala:20 []
 |  MapPartitionsRDD[2] at flatMap at Test1.scala:16 []
 |  data/word.txt MapPartitionsRDD[1] at textFile at Test1.scala:12 []
 |  data/word.txt HadoopRDD[0] at textFile at Test1.scala:12 []
*******************************
(3) ShuffledRDD[4] at reduceByKey at Test1.scala:24 []
 +-(3) MapPartitionsRDD[3] at map at Test1.scala:20 []
    |  MapPartitionsRDD[2] at flatMap at Test1.scala:16 []
    |  data/word.txt MapPartitionsRDD[1] at textFile at Test1.scala:12 []
    |  data/word.txt HadoopRDD[0] at textFile at Test1.scala:12 []
*******************************
依赖关系

依赖关系分为两类:

窄依赖(OneToOneDependency)

宽依赖(ShuffleDependency)

上游RDD的一个分区的数据被下游的RDD的一个分区所独享,称之为窄依赖
上游RDD的一个分区的数据被下游的RDD的多个分区所共享,称之为宽依赖,会打乱,shuffle
def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    val sc = new SparkContext(conf)

    val lines = sc.textFile("data/word.txt")
    println(lines.dependencies)
    println("*******************************")

    val words = lines.flatMap(_.split(" "))
    println(words.dependencies)
    println("*******************************")

    val wordToOne = words.map((_,1))
    println(wordToOne.dependencies)
    println("*******************************")

    val wordToCount = wordToOne.reduceByKey(_+_)
    println(wordToCount.dependencies)
    println("*******************************")

    wordToCount.collect().foreach(println)

    sc.stop()
  }

==================================================
List(org.apache.spark.OneToOneDependency@18d910b3)
*******************************
List(org.apache.spark.OneToOneDependency@59dc36d4)
*******************************
List(org.apache.spark.OneToOneDependency@4d41ba0f)
*******************************
List(org.apache.spark.ShuffleDependency@2daf06fc)
*******************************
RDD持久化

RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

def main(args: Array[String]): Unit = {


        val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
        val sc = new SparkContext(conf)

        val lines = sc.makeRDD(
            List("Hadoop Hive Hbase", "Spark scala Java")
        )
        val words = lines.flatMap(_.split(" "))
        val wordToOne = words.map(
            t => {
                println("*************************")
                (t, 1)
            }
        )
        // 设定数据持久化
        // cache方法可以将血缘关系进行修改,添加一个和缓存相关的依赖关系
        // cache操作不安全。
        wordToOne.cache()
        // 如果持久化的话,那么持久化的文件只能自己用。而且使用完毕后, 会删除
        wordToOne.persist(StorageLevel.DISK_ONLY_2)

        val wordToCount = wordToOne.reduceByKey(_+_)
        println(wordToCount.toDebugString)
        wordToCount.collect()//.foreach(println)
        println("--------------------------------------------")
       // val rdd2: RDD[(Int, Iterable[(String, Int)])] = wordToOne.groupBy(_._2)
       // rdd2.collect()
        println(wordToCount.toDebugString)

        sc.stop()

    }
RDD检查点

所谓的检查点其实就是通过将RDD中间结果写入磁盘

由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。

对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。

def main(args: Array[String]): Unit = {


        val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
        val sc = new SparkContext(conf)
        sc.setCheckpointDir("cp") //设置检查点目录

        val lines = sc.makeRDD(
            List("Hadoop Hive Hbase", "Spark scala Java")
        )
        val words = lines.flatMap(_.split(" "))
        val wordToOne = words.map(
            t => {
                println("*************************")
                (t, 1)
            }
        )

        // Spark可以将中间的计算结果保存到检查点中,让其他的应用使用数据
        // Checkpoint directory has not been set in the SparkContext
        wordToOne.checkpoint()

        val wordToCount = wordToOne.reduceByKey(_+_)
        println(wordToCount.toDebugString)
        wordToCount.collect()//.foreach(println)
        println("--------------------------------------------")
       // val rdd2: RDD[(Int, Iterable[(String, Int)])] = wordToOne.groupBy(_._2)
       // rdd2.collect()
        println(wordToCount.toDebugString)

        sc.stop()

    }
def main(args: Array[String]): Unit = {


        val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
        val sc = new SparkContext(conf)
        sc.setCheckpointDir("cp")

        val lines = sc.makeRDD(
            List("Hadoop Hive Hbase", "Spark scala Java")
        )
        val words = lines.flatMap(_.split(" "))
        val wordToOne = words.map(
            t => {
                println("*************************")
                (t, 1)
            }
        )

        // Spark可以将中间的计算结果保存到检查点中,让其他的应用使用数据
        // Checkpoint directory has not been set in the SparkContext
        // 检查点可以切断血缘关系。
        // 检查点为了数据的安全,会重新再执行一遍作业,所以会执行2次
        // 为了解决这个问题,可以将检查点和缓存联合使用
        wordToOne.cache()
        wordToOne.checkpoint()

        val wordToCount = wordToOne.reduceByKey(_+_)
       // println(wordToCount.toDebugString)
        wordToCount.collect()//.foreach(println)
        println("--------------------------------------------")
       val rdd2: RDD[(Int, Iterable[(String, Int)])] = wordToOne.groupBy(_._2)
       rdd2.collect()
        //println(wordToCount.toDebugString)

        sc.stop()

    }

缓存和检查点的区别

1)Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。
2)Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。
3)建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。
RDD分区器

Spark目前支持Hash分区和Range分区,和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区,进而决定了Reduce的个数。

自定义分区器

def main(args: Array[String]): Unit = {


        val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
        val sc = new SparkContext(conf)

        val lines = sc.makeRDD(
            List(
                ("nba", "xxxxxx"),
                ("cba", "11111"),
                ("nba", "yyyyy"),
                ("wnba", "22222")
            ),2
        )

        val rdd2: RDD[(String, String)] = lines.partitionBy(new MyPartitioner())

        rdd2.saveAsTextFile("output")

        sc.stop()

    }
    // 自定义分区器
    // 1. 继承Partitioner
    // 2. 重写方法
    class MyPartitioner extends Partitioner {
        // TODO 分区数量
        override def numPartitions: Int = {
            3
        }

        // TODO 根据数据的key返回所在的分区编号,从0开始
        override def getPartition(key: Any): Int = {
            key match {
                case "nba" => 0
                case "cba" => 1
                case "wnba" => 2
            }
        }
    }
RDD文件读取与保存
 def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
        val sc = new SparkContext(conf)

        //val rdd1: RDD[String] = sc.textFile("output")
        //rdd1.collect().foreach(println)

        //val rdd1 = sc.objectFile[(String, Int)]("output1") 泛型(原来存进去是什么类型)
        //rdd1.collect().foreach(println)

        val rdd2 = sc.sequenceFile[String, Int]("output2") 泛型
        rdd2.collect().foreach(println)

        sc.stop()

    }
累加器

累加器:分布式共享只写变量

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
        val sc = new SparkContext(conf)

        val rdd = sc.makeRDD(
            List(1,2,3,4),2
        )

        var sum = 0; // Driver
        rdd.foreach(
            num => {
                sum = sum + num; // Executor
            }
        )

        println(sum); // 0,这是Driver端的,发现Executor端的sum无法传回Driver
        sc.stop()

    }
def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
        val sc = new SparkContext(conf)

        val rdd = sc.makeRDD(
            List(1,2,3,4),2
        )

        // 1.创建累加器
        val sum = sc.longAccumulator("sum")
        //sc.collectionAccumulator()
    	//sc.doubleAccumulator()

        rdd.foreach(
            num => {
                // 2.使用累计器
                sum.add(num)
            }
        )

        // 3.获取累加器的结果
        println(sum.value);

        sc.stop()

    }

自定义累加器完成wordCount

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
        val sc = new SparkContext(conf)

        val rdd = sc.makeRDD(
            List(
                "scala",
                "scala",
                "scala",
                "scala",
                "scala",
                "scala",
                "spark",
                "spark",
                "spark",
                "spark"
            )
        )

        // TODO 创建累加器
        val acc = new WordCountAccumulator()
        // TODO 向Spark进行注册
        sc.register(acc, "wordCount")

        rdd.foreach(
            word => {
                // TODO 将单词放入到累加器中
                acc.add(word)
            }
        )

        // TODO 获取累加器的累加结果
        println(acc.value)


        sc.stop()

    }
    // 自定义数据累加器
    // 1. 继承AccumulatorV2
    // 2. 定义泛型
    //    IN : String 		输入类型
    //    OUT : Map[K, V] 	输出类型
    // 3. 重写方法(3 + 3)
    class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] {

        private val wcMap = mutable.Map[String,Int]()

        // 判断累加器是否为初始状态
        // copyAndReset must return a zero value copy
        // TODO 3. true
        override def isZero: Boolean = {
            wcMap.isEmpty
        }

        // TODO 1.
        override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
            new WordCountAccumulator()
        }

        // 重置累加器
        // TODO 2.
        override def reset(): Unit = {
            wcMap.clear()
        }

        // 从外部向累加器中添加数据
        override def add(word: String): Unit = {
            val oldCnt = wcMap.getOrElse(word, 0)
            wcMap.update(word, oldCnt + 1)
        }

        // 合并两个累加器的结果
        override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {

            other.value.foreach {
                case (word, cnt) => {
                    val oldCnt = wcMap.getOrElse(word, 0)
                    wcMap.update( word, oldCnt + cnt )
                }
            }
        }

        // 将结果返回到外部
        override def value: mutable.Map[String, Int] = wcMap
    }
广播变量

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark会为每个任务分别发送。

def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
        val sc = new SparkContext(conf)

        val rdd1 = sc.makeRDD(
            List(
                ("a", 1), ("b", 2)
            )
        )

//        val rdd2 = sc.makeRDD(
//            List(
//                ("a", 3), ("b", 4)
//            )
//        )
        val map = mutable.Map[String, Int](
            ("a", 3), ("b", 4)
        )
    	//包装成广播变量,这是提供一个结构
        val bcMap: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)


        val rdd3 = rdd1.map {
            case ( word, cnt ) => {
				//bcMap映射,但是存在性能问题,如果分区过多,存在大量的冗余,寻求一个共享的数据
                val cnt2 = bcMap.value.getOrElse(word, 0) 

                (word, (cnt, cnt2))
            }
        }

        rdd3.collect.foreach(println)

        sc.stop()

    }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/423034.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号