- Kryo 序列化框架
- RDD依赖关系
- RDD血缘关系
- RDD 依赖关系
- RDD 窄依赖
- RDD 宽依赖
- RDD 阶段划分
- 任务划分源码
- 排序
- 时间复杂度
- 度量一个程序(算法)执行时间的两种方法
- 常见复杂度
- 平均时间复杂度和最坏时间复杂度
- 空间复杂度
- 冒泡排序
- 选择排序
- 插入排序
- 希尔排序
- 快速排序
- 归并排序
Java 的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也 比较大。Spark 出于性能的考虑,Spark2.0 开始支持另外一种 Kryo 序列化机制。Kryo 速度 是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型 已经在 Spark 内部使用 Kryo 来序列化。 注意:即使使用 Kryo 序列化,也要继承 Serializable 接口。
object serializable_Kryo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setAppName("SerDemo")
.setMaster("local[*]")
// 替换默认的序列化机制
.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
// 注册需要使用 kryo 序列化的自定义类
.registerKryoClasses(Array(classOf[Searcher]))
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu",
"atguigu", "hahah"), 2)
val searcher = new Searcher("hello")
val result: RDD[String] = searcher.getMatchedRDD1(rdd)
result.collect.foreach(println)
}
}
case class Searcher(val query: String) {
def isMatch(s: String) = {
s.contains(query)
}
def getMatchedRDD1(rdd: RDD[String]) = {
rdd.filter(isMatch)
}
def getMatchedRDD2(rdd: RDD[String]) = {
val q = query
rdd.filter(_.contains(q))
}
}
RDD依赖关系
RDD血缘关系
RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage (血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转 换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的 数据分区。
package dep
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Dep1 {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc: SparkContext =new SparkContext(sparkConf)
val lines: RDD[String] = sc.textFile("datas/word.txt")
println(lines.toDebugString)
println("******************")
val words: RDD[String] =lines.flatMap(_.split(" "))
println(words.toDebugString)
println("******************")
val wordToOne: RDD[(String, Int)] =words.map(word=>(word,1))
println(wordToOne.toDebugString)
println("******************")
val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
println(wordToSum.toDebugString)
println("******************")
val array: Array[(String, Int)] = wordToSum.collect()
array.foreach(println)
sc.stop()
}
}
RDD 依赖关系
这里所谓的依赖关系,其实就是两个相邻 RDD 之间的关系
package dep
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Dep1 {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc: SparkContext =new SparkContext(sparkConf)
val lines: RDD[String] = sc.textFile("datas/word.txt")
println(lines.toDebugString)
println("******************")
val words: RDD[String] =lines.flatMap(_.split(" "))
println(words.toDebugString)
println("******************")
val wordToOne: RDD[(String, Int)] =words.map(word=>(word,1))
println(wordToOne.toDebugString)
println("******************")
val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
println(wordToSum.toDebugString)
println("******************")
val array: Array[(String, Int)] = wordToSum.collect()
array.foreach(println)
sc.stop()
}
}
RDD 窄依赖
窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用,
RDD 宽依赖宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会 引起 Shuffle,总结:宽依赖我们形象的比喻为多生。
RDD 阶段划分DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向, 不会闭环。例如,DAG 记录了 RDD 的转换过程和任务的阶段。
任务划分源码val tasks: Seq[Task[_]] = try {
stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, stage.latestInfo.taskMetrics, properties,
Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
}
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
}
}
……
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
……
override def findMissingPartitions(): Seq[Int] = {
mapOutputTrackerMaster
.findMissingPartitions(shuffleDep.shuffleId)
.getOrElse(0 until numPartitions)
}
排序
时间复杂度
1)时间频度:一个算法执行所耗费的时间,从理论上是不能算出来的,必须上机运行测试才能知道。但我们不可能也没有必要对每个算法都上机测试,只需知道哪个算法花费的时间多,哪个算法花费的时间少就可以了。并且一个算法花费的时间与算法中语句的执行次数成正比例,哪个算法中语句执行次数多,它花费时间就多。一个算法中的语句执行次数称为语句频度或时间频度。记为T(n)。
2)时间复杂度: 在刚才提到的时间频度中,n称为问题的规模,当n不断变化时,时间频度T(n)也会不断变化。但有时我们想知道它变化时呈现什么规律。为此,我们引入时间复杂度概念。 一般情况下,算法中基本操作重复执行的次数是问题规模n的某个函数,用T(n)表示,若有某个辅助函数f(n),使得当n趋近于无穷大时,**T(n)/f(n)的极限值为不等于零的常数,则称f(n)是T(n)的同数量级函数。记作T(n)=**O(f(n)),称O(f(n))为算法的渐进时间复杂度,简称时间复杂度
度量一个程序(算法)执行时间的两种方法1)事后统计的方法
这种方法可行, 但是有两个问题:一是要想对设计的算法的运行性能进行评测,需要实际运行该程序;二是所得时间的统计量依赖于计算机的硬件、软件等环境因素, 这种方式,要在同一台计算机的相同状态下运行,才能比较那个算法速度更快。
2)事前估算的方法
通过分析某个算法的时间复杂度来判断哪个算法更优.
1)常数阶O(1)
无论代码执行了多少行,只要是没有循环等复杂结构,那这个代码的时间复杂度就都是O(1)
2)对数阶O(log2n)
在while循环里面,每次都将 i 乘以 2,乘完之后,i 距离 n 就越来越近了。假设循环x次之后,i 就大于 2 了,此时这个循环就退出了,也就是说 2 的 x 次方等于 n,那么 x = log2n也就是说当循环 log2n 次以后,这个代码就结束了。因此这个代码的时间复杂度为:O(log2n) 。 O(log2n) 的这个2 时间上是根据代码变化的,i = i * 3 ,则是 O(log3n) .
3)线性阶O(n)
这段代码,for循环里面的代码会执行n遍,因此它消耗的时间是随着n的变化而变化的,因此这类代码都可以用O(n)来表示它的时间复杂度
4)线性对数阶O(nlog2n)
线性对数阶O(nlogN) 其实非常容易理解,将时间复杂度为O(logn)的代码循环N遍的话,那么它的时间复杂度就是 n * O(logN),也就是了O(nlogN)
5)平方阶O(n^2)
平方阶O(n²) 就更容易理解了,如果把 O(n) 的代码再嵌套循环一遍,它的时间复杂度就是 O(n²),这段代码其实就是嵌套了2层n循环,它的时间复杂度就是 O(nn),即 O(n²) 如果将其中一层循环的n改成m,那它的时间复杂度就变成了 O(mn)
6)立方阶O(n^3)
7)k次方阶O(n^k)
8)指数阶O(2^n)
•常见的算法时间复杂度由小到大依次为:Ο(1)<Ο(log2n)<Ο(n)<Ο(nlog2n)<Ο(n2)<Ο(n3)<Ο(nk) <Ο(2n) ,随着问题规模n的不断增大,上述时间复杂度不断增大,算法的执行效率越低
平均时间复杂度和最坏时间复杂度1)平均时间复杂度是指所有可能的输入实例均以等概率出现的情况下,该算法的运行时间。
2)最坏情况下的时间复杂度称最坏时间复杂度。一般讨论的时间复杂度均是最坏情况下的时间复杂度。 这样做的原因是:最坏情况下的时间复杂度是算法在任何输入实例上运行时间的界限,这就保证了算法的运行时间不会比最坏情况更长。
3)平均时间复杂度和最坏时间复杂度是否一致,和算法有关
空间复杂度1)类似于时间复杂度的讨论,一个算法的空间复杂度(Space Complexity)定义为该算法所耗费的存储空间,它也是问题规模n的函数。
2)空间复杂度(Space Complexity)是对一个算法在运行过程中临时占用存储空间大小的量度。有的算法需要占用的临时工作单元数与解决问题的规模n有关,它随着n的增大而增大,当n较大时,将占用较多的存储单元,例如快速排序和归并排序算法就属于这种情况
3)在做算法分析时,主要讨论的是时间复杂度。从用户使用体验上看,更看重的程序执行的速度。一些缓存产品(redis, memcache)和算法(基数排序)本质就是用空间换时间.
冒泡排序package Sort;
import java.text.SimpleDateFormat;
import java.util.Date;
public class BubbleSort {
public static void main(String[] args) {
//测试
//创建一个80000个数据的随机数组
int[] arr = new int[80000];
for (int i = 0; i < arr.length; i++) {
arr[i] = (int) (Math.random() * 8000000);
}
//冒泡排序,时间复杂度O(n^2)
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long time1 = System.currentTimeMillis();
System.out.println("排序前时间:"+format.format(new Date()));
bubbleSort(arr);
System.out.println("排序后时间:"+format.format(new Date()));
long time2=System.currentTimeMillis();
System.out.println("总共用时"+(time2-time1)/1000+"秒");
}
//将冒泡排序 封装成方法
public static void bubbleSort(int[] arr) {
boolean flag = false;//标识变量
//冒泡排序,时间复杂度O(n^2)
for (int i = 0; i < arr.length - 1; i++) {
for (int j = 0; j < arr.length - 1 - i; j++) {
//如果前面的数比后面的大,则交换
if (arr[j + 1] < arr[j]) {
flag = true;
int temp = arr[j + 1];
arr[j + 1] = arr[j];
arr[j] = temp;
}
}
if (!flag) {
break;
} else {
flag = false;//重置flag,进行下次的判断
}
}
}
}
选择排序
package Sort;
import java.text.SimpleDateFormat;
import java.util.Date;
public class SelectSort {
public static void main(String[] args) {
int[] arr = new int[80000];
for (int i = 0; i < arr.length; i++) {
arr[i] = (int) (Math.random() * 8000000);
}
//冒泡排序,时间复杂度O(n^2)
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long time1 = System.currentTimeMillis();
System.out.println("排序前时间:"+format.format(new Date()));
selectSort(arr);
System.out.println("排序后时间:"+format.format(new Date()));
long time2=System.currentTimeMillis();
System.out.println("总共用时"+(time2-time1)/1000+"秒");
}
//选择排序
public static void selectSort(int[] arr) {
for (int i = 0; i < arr.length - 1; i++) {
int temp = arr[i];
int index = i;
for (int j = i + 1; j < arr.length; j++) {
if (arr[j] < temp) {
temp = arr[j];
index = j;
}
}
if (index != i) {
arr[index] = arr[i];
arr[i] = temp;
}
}
}
}
插入排序
插入排序(Insertion Sorting)的基本思想是:把n个待排序的元素看成为一个有序表和一个无序表,开始时有序表中只包含一个元素,无序表中包含有n-1个元素,排序过程中每次从无序表中取出第一个元素,把它的排序码依次与有序表元素的排序码进行比较,将它插入到有序表中的适当位置,使之成为新的有序表。
package Sort;
import java.text.SimpleDateFormat;
import java.util.Date;
public class InsertSort {
public static void main(String[] args) {
//测试
//创建一个80000个数据的随机数组
int[] arr = new int[80000];
for (int i = 0; i < arr.length; i++) {
arr[i] = (int) (Math.random() * 8000000);
}
//冒泡排序,时间复杂度O(n^2)
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long time1 = System.currentTimeMillis();
System.out.println("排序前时间:"+format.format(new Date()));
insertSort(arr);
System.out.println("排序后时间:"+format.format(new Date()));
long time2=System.currentTimeMillis();
System.out.println("总共用时"+(time2-time1)/1000+"秒");
}
//插入排序
public static void insertSort(int[]arr){
for (int i = 1; i =0&&insertVal
希尔排序
希尔排序法介绍
希尔排序是希尔(Donald Shell)于1959年提出的一种排序算法。希尔排序也是一种插入排序,它是简单插入排序经过改进之后的一个更高效的版本,也称为缩小增量排序。
希尔排序法基本思想
希尔排序是把记录按下标的一定增量分组,对每组使用直接插入排序算法排序;随着增量逐渐减少,每组包含的关键词越来越多,当增量减至1时,整个文件恰被分成一组,算法便终止
package Sort;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
public class ShellSort {
public static void main(String[] args) {
int[] arr = new int[80000];
for (int i = 0; i < arr.length; i++) {
arr[i] = (int) (Math.random() * 8000000);
}
//冒泡排序,时间复杂度O(n^2)
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long time1 = System.currentTimeMillis();
System.out.println("排序前时间:" + format.format(new Date()));
shellSort2(arr);
System.out.println("排序后时间:" + format.format(new Date()));
long time2 = System.currentTimeMillis();
System.out.println("总共用时" + (time2 - time1) / 1000 + "秒");
}
//交换排序
public static void shellSort(int[] arr) {
int temp = 0;
for (int gap = arr.length / 2; gap > 0; gap /= 2) {
for (int i = gap; i < arr.length; i++) {
//遍历各组中所有的元素(共有gap组,每组有两个元素)
for (int j = i - gap; j >= 0; j -= gap) {
//如果当前的元素大于加上步长后的那个元素,,说明交换
if (arr[j] > arr[j + gap]) {
temp = arr[j];
arr[j] = arr[j + gap];
arr[j + gap] = temp;
}
}
}
}
}
//移位法(优化)
public static void shellSort2(int[] arr) {
//增量gap,并逐步的缩小增量
for (int gap = arr.length / 2; gap > 0; gap /= 2) {
//从第gap个元素,逐个对其所在的组进行直接插入排序
for (int i = gap; i < arr.length; i++) {
int j = i;
int temp = arr[j];
if (arr[j] < arr[j - gap]) {
while (j - gap >= 0 && temp < arr[j - gap]) {
//移动
arr[j] = arr[j - gap];
j -= gap;
}
//当退出while后,就给temp找到插入的位置
arr[j] = temp;
}
}
}
}
}
快速排序
快速排序(Quicksort)是对冒泡排序的一种改进。基本思想是:通过一趟排序将要排序的数据分割成独立的两部分,其中一部分的所有数据都比另外一部分的所有数据都要小,然后再按此方法对这两部分数据分别进行快速排序,整个排序过程可以递归进行,以此达到整个数据变成有序序列
package Sort;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
public class QuickSort {
public static void main(String[] args) {
int[] arr = new int[80000];
for (int i = 0; i < arr.length; i++) {
arr[i] = (int) (Math.random() * 8000000);
}
//冒泡排序,时间复杂度O(n^2)
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long time1 = System.currentTimeMillis();
System.out.println("排序前时间:" + format.format(new Date()));
quickSort(arr,0,arr.length-1);
System.out.println("排序后时间:" + format.format(new Date()));
long time2 = System.currentTimeMillis();
System.out.println("总共用时" + (time2 - time1) / 1000 + "秒");
}
public static void quickSort(int[] arr, int left, int right) {
int l = left;//左下标
int r = right;//右下标
//pivot 中值
int pivot = arr[(left + right) / 2];
int temp =0;//临时变量
//while循环的目的是让比pivot 值小的放到左边
//比pivot 值大放到右边
while (l < r) {
//在pivot的左边一直找,找到大于等于pivot的值,才退出
while (arr[l] < pivot) {
l += 1;
}
//在pivot的右边一直找,找到小于等于pivot的值,才推出
while (arr[r] > pivot) {
r -= 1;
}
//如果l>=r说明pivot 的左右两的值,已经按照左边全部是
//小于等于pivot值,右边全部是大于等于pivot的值
if (l >= r) {
break;
}
//交换
temp=arr[l];
arr[l]=arr[r];
arr[r]=temp;
//如果交换完后,发现这个arr[l]==pivot 前移
if(arr[l]==pivot){
r--;
}
//如果交换完后arr[r]==pivot 后移
if(arr[r]==pivot){
l++;
}
}
//如果l==r,必须让l++,r--,否会出现栈溢出
if(l==r){
l++;
r--;
}
//向左递归
if(leftl){
quickSort(arr,l,right);
}
}
}
归并排序
归并排序(MERGE-SORT)是利用归并的思想实现的排序方法,该算法采用经典的分治(divide-and-conquer)策略(分治法将问题分(divide)成一些小的问题然后递归求解,而治(conquer)的阶段则将分的阶段得到的各答案修补在一起,即分而治之)。
package Sort;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
public class MergeSort {
public static void main(String[] args) {
int[] arr = new int[80000];
for (int i = 0; i < arr.length; i++) {
arr[i] = (int) (Math.random() * 8000000);
}
//冒泡排序,时间复杂度O(n^2)
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long time1 = System.currentTimeMillis();
System.out.println("排序前时间:" + format.format(new Date()));
mergeSort(arr,0,arr.length-1,new int[arr.length]);
System.out.println("排序后时间:" + format.format(new Date()));
long time2 = System.currentTimeMillis();
System.out.println("总共用时" + (time2 - time1) / 1000 + "秒");
}
//分+和方法
public static void mergeSort(int []arr,int left,int right,int[]temp){
if(left 


