编写难度大不能很好充分利用系统内存一个作业多个MR任务嵌套不友好(每一次都要涉及磁盘或dfs或db和网络 的IO)(期望以pipeline 流水线的方式从头到尾) 数据处理三段论:(实时离线都是)
读数据(read)–> 规整(ETL)–> 写(write)
Spark、Flink 都是批流一体(一站式解决 one stack to rule them all)
Batch: MR、Hive、RDD、Dataframe、DataSet
Stream:Storm、SS(spark streaming)、SSS(结构化流)
SQL :Hive、Spark SQL
ML:Mathout MLlib
Real time: Hbase
Lightning-fast unified analytics engine (超快的统一分析引擎)
1.speed 快
快的方面:基于内存;基于多线程;pipeline 流水线的方式(DAG图)
2. easy of use 支持多语言 Java、scala、python、R、sql
3. Generality 具有共性的(批流一体)
4. runs everywhere 能运行在hadoop(能读hdfs数据、能跑在yarn)、Mesos、K8S、standalone(spark集群)、获取其他云端。能对接各种数据源
(马铁)Martei 的博士论文最后孵化出了Spark
Spark与Hadoop、MapReducespark 只是个计算引擎,不需要存储数据。只负责将去取出来的数据分析计算
Hadoop 与 Spark
作用:Distributed Storage + Compute Compute
MapReduce Stack
存储:Disk / HDFS Disk / Memory
时间开销: Hadoop Batch > Spark Batch
MapReduce 与 Spark
Spark 并不能替换 Hadoop,勉强可以说Spark能替换MapReduce
sparkCore_01_B RDDResilient Distributed Dataset --> 弹性分布式数据集
弹性:故障无感知,可以转移到其他机器
分布式:可以运行在任意节点上
不可变: rdd1 通过操作(map) 得到的是 新的rdd2
可以被分区:集合的数据可以分区,每个分区一个task运行,也就是分区并行运行的
(有依赖关系的数据会被分在同一个分区)
注意异常不能序列化:spark也会涉及到 磁盘或网络IO,传输的对象一定要 extends Serializable
注解@triansient 加在属性上,表示这个属性不会序列化(不会将真实的值序列化到文件里,也读取不到)
RDD 的容错是以分区为单位(故障隔离),某一分区出现故障,可以在当前分区找到上一次依赖重新计算即可
在RDD中: n个 partition == n个 task == n个 线程 == 输出文件的个数 n个
在MR中:inputspilt == mapTask == JVM 进程数
安装部署Saprk
如果选择Pre-built 预编译版本,需要和已安装的Hadoop和scala版本匹配的spark包(从压缩包名字可以看出)
spark-3.2.0-bin-hadoop3.2 下载解压、环境变量;
bin 文件夹下有 spark-shell、spark-submit、spark-sql等命令
conf 中有配置文件template
sbin 是与启动服务相关的(集群模式的,但是生产一般都是使用yarn或k8s服务,不用standland)
学习的时候可以使用下standland:
bin/spark-shell --master local[2] 进入交互式命令行(把IDEA写好的代码可以直接拿过来运行)
– 可以使用 http://gargantua:4040/ 进入spark任务的web页面
以2个线程启动
[liqiang@Gargantua ~]$ spark-shell --master local[2] Spark context available as 'sc' (master = local[2], app id = local-1642090369041). Spark session available as 'spark'. scala>
web 端访问,默认端口4040
gargantua:4040
scala> val rdd = sc.parallelize(List(1, 2, 3, 4, 5)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at:23 scala> rdd.count res1: Long = 5 scala> sc.stop scala> [liqiang@Gargantua ~]$ 【Ctrl C 停止】
在idea 中也可以本地启动依赖里的spark-shell
不需要连线上spark-shell,也就是不需要部署也能开发
引入依赖
org.apache.spark spark-core_2.123.2.0
创建 SparkContext
1. val sparkConf = new SparkConf()
sparkConf.setAppName("Your App Name").setMaster("local[2]")
2. val sc = new SparkContext(sparkConf)
3. sc 可以创建RDD sc.makeRDD() 或 sc.parallelize()
local[2] 代表指定并行度,也就是2个线程 == 2个分区 == 2个任务
sc.parallelize()中也可以重新指定并行度
sc.textFile() 中也可以指定分区度(好像范围不一样)
RDD 两大操作
transformations 和 actions
不可变:针对一个已有的RDD通过转换得到的是另一个RDD
lazy:不会立刻触发spark (不会提交作业), 只有等到action
rdd.map…filter… 每一个transformations 都会记录,而不会每次都计算
map 作用在每个元素mapPartition 作用在每个分区,迭代器是在分区上filterflatmap = flatten + map
这几个算子底层都用 MapPatitionRDD来实现
使用MapPatitionRDD 来实现map:
(MapPatitionRDD 需要放在指定的包org.ahache.spark.rdd下,可以自己建一个同名的包。。。)
MapPatitionRDD中就是对迭代器使用scala的map等算子来实现。
同理使用scala的filter就实现spark的filter
mapValues()
flatMapValues()
keys 就是获取所有的key
values 获取所有value ==> map(_._2).collect
keyby 把不是k,v 的元素(只有v),通过指定函数作为key得到k,v
reduceByKey 作用在k,v类型,将相同key的value两两计算
使用reduceByKey做wc:
rdd.flatMap(_.split(",")).map(_,1).reduceByKey(_+_)
groupByKey 作用在k,v类型,将k相同的v合并到一个集合中,作为新的k,v
使用groupByKey 做wc:
rdd.flatMap(_.split(",")).map(_,1).groupByKey().map(_1,_2.sum)
rdd.flatMap(_.split(",")).map(_,1).groupByKey().mapValues(_.size)
groupBy 不需作用k,v,通过指定条件分组,作为key,满足条件的原内容组成集合作为v
比如按奇偶数分组
使用groupBy 做wc:
rdd.flatMap(_.split(",")).groupBy(x => x).mapValues(_.size)
sortBy 指定排序条件即可,默认升序。降序可以条件前加-号
distanct 底层还是用的 reduceByKey
union 就是简单的合起来,分区数等于union前分区数之和。
union没有经过shuffle,可以通过web页面看DAG图,判断是否经过shuffle (只有一个红框(stage)就是没有shuffle)
sample() 取样,随机抽取一个数,参数true控制取的数要不要放回去
zip() 拉链。(分区要一致,元素个数也要一致才能拉)



