栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark(四)

spark(四)

spark+java数据结构
        • Kryo 序列化框架
        • RDD依赖关系
          • RDD血缘关系
          • RDD 依赖关系
          • RDD 窄依赖
          • RDD 宽依赖
          • RDD 阶段划分
          • 任务划分源码
  • 排序
    • 时间复杂度
      • 度量一个程序(算法)执行时间的两种方法
      • 常见复杂度
      • 平均时间复杂度和最坏时间复杂度
    • 空间复杂度
    • 冒泡排序
    • 选择排序
    • 插入排序
    • 希尔排序
    • 快速排序
    • 归并排序

Kryo 序列化框架

Java 的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也 比较大。Spark 出于性能的考虑,Spark2.0 开始支持另外一种 Kryo 序列化机制。Kryo 速度 是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型 已经在 Spark 内部使用 Kryo 来序列化。 注意:即使使用 Kryo 序列化,也要继承 Serializable 接口。

object serializable_Kryo {
 def main(args: Array[String]): Unit = {
 val conf: SparkConf = new SparkConf()
 .setAppName("SerDemo")
 .setMaster("local[*]")
 // 替换默认的序列化机制
 .set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
 // 注册需要使用 kryo 序列化的自定义类
 .registerKryoClasses(Array(classOf[Searcher]))
 val sc = new SparkContext(conf)
 val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu", 
"atguigu", "hahah"), 2)
 val searcher = new Searcher("hello")
 val result: RDD[String] = searcher.getMatchedRDD1(rdd)
 result.collect.foreach(println)
 }
}
case class Searcher(val query: String) {
 def isMatch(s: String) = {
 s.contains(query)
 }
 def getMatchedRDD1(rdd: RDD[String]) = {
 rdd.filter(isMatch) 
 }
 def getMatchedRDD2(rdd: RDD[String]) = {
 val q = query
 rdd.filter(_.contains(q))
 }
}
RDD依赖关系 RDD血缘关系

RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage (血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转 换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的 数据分区。

package dep

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Dep1 {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc: SparkContext =new SparkContext(sparkConf)
    val lines: RDD[String] = sc.textFile("datas/word.txt")
    println(lines.toDebugString)
    println("******************")

    val words: RDD[String] =lines.flatMap(_.split(" "))
    println(words.toDebugString)
    println("******************")

    val wordToOne: RDD[(String, Int)] =words.map(word=>(word,1))
    println(wordToOne.toDebugString)
    println("******************")

    val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
    println(wordToSum.toDebugString)
    println("******************")

    val array: Array[(String, Int)] = wordToSum.collect()
    array.foreach(println)

    sc.stop()
  }

}

RDD 依赖关系

这里所谓的依赖关系,其实就是两个相邻 RDD 之间的关系

package dep

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Dep1 {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc: SparkContext =new SparkContext(sparkConf)
    val lines: RDD[String] = sc.textFile("datas/word.txt")
    println(lines.toDebugString)
    println("******************")

    val words: RDD[String] =lines.flatMap(_.split(" "))
    println(words.toDebugString)
    println("******************")

    val wordToOne: RDD[(String, Int)] =words.map(word=>(word,1))
    println(wordToOne.toDebugString)
    println("******************")

    val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
    println(wordToSum.toDebugString)
    println("******************")

    val array: Array[(String, Int)] = wordToSum.collect()
    array.foreach(println)

    sc.stop()
  }

}

RDD 窄依赖

窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用,

RDD 宽依赖

宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会 引起 Shuffle,总结:宽依赖我们形象的比喻为多生。

RDD 阶段划分

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向, 不会闭环。例如,DAG 记录了 RDD 的转换过程和任务的阶段。

任务划分源码
val tasks: Seq[Task[_]] = try {
 stage match {
 case stage: ShuffleMapStage =>
 partitionsToCompute.map { id =>
 val locs = taskIdToLocations(id)
 val part = stage.rdd.partitions(id)
 new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
 taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, 
Option(jobId),
 Option(sc.applicationId), sc.applicationAttemptId)
 }
 case stage: ResultStage =>
 partitionsToCompute.map { id =>
 val p: Int = stage.partitions(id)
 val part = stage.rdd.partitions(p)
 val locs = taskIdToLocations(id)
 new ResultTask(stage.id, stage.latestInfo.attemptId,
 taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
 Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
 }
 }
……
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
……
override def findMissingPartitions(): Seq[Int] = {
mapOutputTrackerMaster
 .findMissingPartitions(shuffleDep.shuffleId)
 .getOrElse(0 until numPartitions)
}

排序 时间复杂度

1)时间频度:一个算法执行所耗费的时间,从理论上是不能算出来的,必须上机运行测试才能知道。但我们不可能也没有必要对每个算法都上机测试,只需知道哪个算法花费的时间多,哪个算法花费的时间少就可以了。并且一个算法花费的时间与算法中语句的执行次数成正比例,哪个算法中语句执行次数多,它花费时间就多。一个算法中的语句执行次数称为语句频度或时间频度。记为T(n)。

2)时间复杂度: 在刚才提到的时间频度中,n称为问题的规模,当n不断变化时,时间频度T(n)也会不断变化。但有时我们想知道它变化时呈现什么规律。为此,我们引入时间复杂度概念。 一般情况下,算法中基本操作重复执行的次数是问题规模n的某个函数,用T(n)表示,若有某个辅助函数f(n),使得当n趋近于无穷大时,**T(n)/f(n)的极限值为不等于零的常数,则称f(n)是T(n)的同数量级函数。记作T(n)=**O(f(n)),称O(f(n))为算法的渐进时间复杂度,简称时间复杂度

度量一个程序(算法)执行时间的两种方法

1)事后统计的方法
这种方法可行, 但是有两个问题:一是要想对设计的算法的运行性能进行评测,需要实际运行该程序;二是所得时间的统计量依赖于计算机的硬件、软件等环境因素, 这种方式,要在同一台计算机的相同状态下运行,才能比较那个算法速度更快。

2)事前估算的方法
通过分析某个算法的时间复杂度来判断哪个算法更优.

常见复杂度

1)常数阶O(1)

无论代码执行了多少行,只要是没有循环等复杂结构,那这个代码的时间复杂度就都是O(1)

2)对数阶O(log2n)

在while循环里面,每次都将 i 乘以 2,乘完之后,i 距离 n 就越来越近了。假设循环x次之后,i 就大于 2 了,此时这个循环就退出了,也就是说 2 的 x 次方等于 n,那么 x = log2n也就是说当循环 log2n 次以后,这个代码就结束了。因此这个代码的时间复杂度为:O(log2n) 。 O(log2n) 的这个2 时间上是根据代码变化的,i = i * 3 ,则是 O(log3n) .

3)线性阶O(n)

这段代码,for循环里面的代码会执行n遍,因此它消耗的时间是随着n的变化而变化的,因此这类代码都可以用O(n)来表示它的时间复杂度

4)线性对数阶O(nlog2n)

线性对数阶O(nlogN) 其实非常容易理解,将时间复杂度为O(logn)的代码循环N遍的话,那么它的时间复杂度就是 n * O(logN),也就是了O(nlogN)

5)平方阶O(n^2)

平方阶O(n²) 就更容易理解了,如果把 O(n) 的代码再嵌套循环一遍,它的时间复杂度就是 O(n²),这段代码其实就是嵌套了2层n循环,它的时间复杂度就是 O(nn),即 O(n²) 如果将其中一层循环的n改成m,那它的时间复杂度就变成了 O(mn)

6)立方阶O(n^3)

7)k次方阶O(n^k)

8)指数阶O(2^n)

•常见的算法时间复杂度由小到大依次为:Ο(1)<Ο(log2n)<Ο(n)<Ο(nlog2n)<Ο(n2)<Ο(n3)<Ο(nk) <Ο(2n) ,随着问题规模n的不断增大,上述时间复杂度不断增大,算法的执行效率越低

平均时间复杂度和最坏时间复杂度

1)平均时间复杂度是指所有可能的输入实例均以等概率出现的情况下,该算法的运行时间。

2)最坏情况下的时间复杂度称最坏时间复杂度。一般讨论的时间复杂度均是最坏情况下的时间复杂度。 这样做的原因是:最坏情况下的时间复杂度是算法在任何输入实例上运行时间的界限,这就保证了算法的运行时间不会比最坏情况更长。

3)平均时间复杂度和最坏时间复杂度是否一致,和算法有关

空间复杂度

1)类似于时间复杂度的讨论,一个算法的空间复杂度(Space Complexity)定义为该算法所耗费的存储空间,它也是问题规模n的函数。

2)空间复杂度(Space Complexity)是对一个算法在运行过程中临时占用存储空间大小的量度。有的算法需要占用的临时工作单元数与解决问题的规模n有关,它随着n的增大而增大,当n较大时,将占用较多的存储单元,例如快速排序和归并排序算法就属于这种情况

3)在做算法分析时,主要讨论的是时间复杂度。从用户使用体验上看,更看重的程序执行的速度。一些缓存产品(redis, memcache)和算法(基数排序)本质就是用空间换时间.

冒泡排序
package Sort;

import java.text.SimpleDateFormat;
import java.util.Date;

public class BubbleSort {
    public static void main(String[] args) {

        //测试
        //创建一个80000个数据的随机数组
        int[] arr = new int[80000];
        for (int i = 0; i < arr.length; i++) {
            arr[i] = (int) (Math.random() * 8000000);
        }
        //冒泡排序,时间复杂度O(n^2)
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        long time1 = System.currentTimeMillis();
        System.out.println("排序前时间:"+format.format(new Date()));

        bubbleSort(arr);

        System.out.println("排序后时间:"+format.format(new Date()));
        long time2=System.currentTimeMillis();

        System.out.println("总共用时"+(time2-time1)/1000+"秒");

    }

    //将冒泡排序 封装成方法
    public static void bubbleSort(int[] arr) {
        boolean flag = false;//标识变量
        //冒泡排序,时间复杂度O(n^2)
        for (int i = 0; i < arr.length - 1; i++) {
            for (int j = 0; j < arr.length - 1 - i; j++) {
                //如果前面的数比后面的大,则交换
                if (arr[j + 1] < arr[j]) {
                    flag = true;
                    int temp = arr[j + 1];
                    arr[j + 1] = arr[j];
                    arr[j] = temp;
                }
            }
            if (!flag) {
                break;
            } else {
                flag = false;//重置flag,进行下次的判断
            }
        }
    }
}

选择排序
package Sort;

import java.text.SimpleDateFormat;
import java.util.Date;

public class SelectSort {
    public static void main(String[] args) {
        int[] arr = new int[80000];
        for (int i = 0; i < arr.length; i++) {
            arr[i] = (int) (Math.random() * 8000000);
        }
        //冒泡排序,时间复杂度O(n^2)
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        long time1 = System.currentTimeMillis();
        System.out.println("排序前时间:"+format.format(new Date()));

        selectSort(arr);

        System.out.println("排序后时间:"+format.format(new Date()));
        long time2=System.currentTimeMillis();

        System.out.println("总共用时"+(time2-time1)/1000+"秒");


    }

    //选择排序
    public static void selectSort(int[] arr) {

        for (int i = 0; i < arr.length - 1; i++) {
            int temp = arr[i];
            int index = i;
            for (int j = i + 1; j < arr.length; j++) {
                if (arr[j] < temp) {
                    temp = arr[j];
                    index = j;
                }
            }

            if (index != i) {
                arr[index] = arr[i];
                arr[i] = temp;
            }
        }

    }
}

插入排序

插入排序(Insertion Sorting)的基本思想是:把n个待排序的元素看成为一个有序表和一个无序表,开始时有序表中只包含一个元素,无序表中包含有n-1个元素,排序过程中每次从无序表中取出第一个元素,把它的排序码依次与有序表元素的排序码进行比较,将它插入到有序表中的适当位置,使之成为新的有序表。

package Sort;

import java.text.SimpleDateFormat;
import java.util.Date;

public class InsertSort {
    public static void main(String[] args) {

        //测试
        //创建一个80000个数据的随机数组
        int[] arr = new int[80000];
        for (int i = 0; i < arr.length; i++) {
            arr[i] = (int) (Math.random() * 8000000);
        }
        //冒泡排序,时间复杂度O(n^2)
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        long time1 = System.currentTimeMillis();
        System.out.println("排序前时间:"+format.format(new Date()));

        insertSort(arr);

        System.out.println("排序后时间:"+format.format(new Date()));
        long time2=System.currentTimeMillis();

        System.out.println("总共用时"+(time2-time1)/1000+"秒");



    }
    //插入排序
    public static void insertSort(int[]arr){
        for (int i = 1; i =0&&insertVal 
 
希尔排序 

希尔排序法介绍

希尔排序是希尔(Donald Shell)于1959年提出的一种排序算法。希尔排序也是一种插入排序,它是简单插入排序经过改进之后的一个更高效的版本,也称为缩小增量排序。

希尔排序法基本思想

希尔排序是把记录按下标的一定增量分组,对每组使用直接插入排序算法排序;随着增量逐渐减少,每组包含的关键词越来越多,当增量减至1时,整个文件恰被分成一组,算法便终止

package Sort;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;

public class ShellSort {
    public static void main(String[] args) {
        int[] arr = new int[80000];
        for (int i = 0; i < arr.length; i++) {
            arr[i] = (int) (Math.random() * 8000000);
        }

        //冒泡排序,时间复杂度O(n^2)
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        long time1 = System.currentTimeMillis();
        System.out.println("排序前时间:" + format.format(new Date()));

        shellSort2(arr);

        System.out.println("排序后时间:" + format.format(new Date()));
        long time2 = System.currentTimeMillis();

        System.out.println("总共用时" + (time2 - time1) / 1000 + "秒");


    }

    //交换排序
    public static void shellSort(int[] arr) {

        int temp = 0;
        for (int gap = arr.length / 2; gap > 0; gap /= 2) {
            for (int i = gap; i < arr.length; i++) {

                //遍历各组中所有的元素(共有gap组,每组有两个元素)
                for (int j = i - gap; j >= 0; j -= gap) {
                    //如果当前的元素大于加上步长后的那个元素,,说明交换
                    if (arr[j] > arr[j + gap]) {
                        temp = arr[j];
                        arr[j] = arr[j + gap];
                        arr[j + gap] = temp;
                    }
                }

            }
        }
    }

    //移位法(优化)
    public static void shellSort2(int[] arr) {
        //增量gap,并逐步的缩小增量
        for (int gap = arr.length / 2; gap > 0; gap /= 2) {
            //从第gap个元素,逐个对其所在的组进行直接插入排序
            for (int i = gap; i < arr.length; i++) {
                int j = i;
                int temp = arr[j];
                if (arr[j] < arr[j - gap]) {
                    while (j - gap >= 0 && temp < arr[j - gap]) {
                        //移动
                        arr[j] = arr[j - gap];
                        j -= gap;
                    }
                    //当退出while后,就给temp找到插入的位置
                    arr[j] = temp;
                }
            }
        }
    }
}

快速排序

快速排序(Quicksort)是对冒泡排序的一种改进。基本思想是:通过一趟排序将要排序的数据分割成独立的两部分,其中一部分的所有数据都比另外一部分的所有数据都要小,然后再按此方法对这两部分数据分别进行快速排序,整个排序过程可以递归进行,以此达到整个数据变成有序序列

package Sort;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;

public class QuickSort {
    public static void main(String[] args) {
        int[] arr = new int[80000];
        for (int i = 0; i < arr.length; i++) {
            arr[i] = (int) (Math.random() * 8000000);
        }

        //冒泡排序,时间复杂度O(n^2)
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        long time1 = System.currentTimeMillis();
        System.out.println("排序前时间:" + format.format(new Date()));

        quickSort(arr,0,arr.length-1);

        System.out.println("排序后时间:" + format.format(new Date()));
        long time2 = System.currentTimeMillis();

        System.out.println("总共用时" + (time2 - time1) / 1000 + "秒");
    }

    public static void quickSort(int[] arr, int left, int right) {
        int l = left;//左下标
        int r = right;//右下标
        //pivot 中值
        int pivot = arr[(left + right) / 2];
        int temp =0;//临时变量
        //while循环的目的是让比pivot 值小的放到左边
        //比pivot 值大放到右边
        while (l < r) {

            //在pivot的左边一直找,找到大于等于pivot的值,才退出
            while (arr[l] < pivot) {
                l += 1;
            }
            //在pivot的右边一直找,找到小于等于pivot的值,才推出
            while (arr[r] > pivot) {
                r -= 1;
            }
            //如果l>=r说明pivot 的左右两的值,已经按照左边全部是
            //小于等于pivot值,右边全部是大于等于pivot的值
            if (l >= r) {
                break;
            }
            //交换
            temp=arr[l];
            arr[l]=arr[r];
            arr[r]=temp;

            //如果交换完后,发现这个arr[l]==pivot 前移
            if(arr[l]==pivot){
                r--;
            }
            //如果交换完后arr[r]==pivot 后移
            if(arr[r]==pivot){
                l++;
            }
        }
        //如果l==r,必须让l++,r--,否会出现栈溢出
        if(l==r){
            l++;
            r--;
        }

        //向左递归
        if(leftl){
            quickSort(arr,l,right);
        }

    }
}
归并排序

归并排序(MERGE-SORT)是利用归并的思想实现的排序方法,该算法采用经典的分治(divide-and-conquer)策略(分治法将问题分(divide)成一些小的问题然后递归求解,而治(conquer)的阶段则将分的阶段得到的各答案修补在一起,即分而治之)。

package Sort;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;

public class MergeSort {
    public static void main(String[] args) {
        int[] arr = new int[80000];
        for (int i = 0; i < arr.length; i++) {
            arr[i] = (int) (Math.random() * 8000000);
        }

        //冒泡排序,时间复杂度O(n^2)
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        long time1 = System.currentTimeMillis();
        System.out.println("排序前时间:" + format.format(new Date()));

        mergeSort(arr,0,arr.length-1,new int[arr.length]);

        System.out.println("排序后时间:" + format.format(new Date()));
        long time2 = System.currentTimeMillis();

        System.out.println("总共用时" + (time2 - time1) / 1000 + "秒");
    }

    //分+和方法
    public static void mergeSort(int []arr,int left,int right,int[]temp){
        if(left
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/307580.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号