- 简介
- 下载安装
- 本地交互模式
Spark是伯克利开发的通用内存并行计算框架,通过scala实现,适用于海量数据处理,由于引入了RDD(Resilient Distributed Dataset, 弹性分布式数据集)抽象,从而具备较高等容错性。
| Spark组件 | |
|---|---|
| Core | 核心组件,分布式计算引擎 |
| SQL | 高性能的基于Hadoop的SQL解决方案 |
| Streaming | 实现高吞吐量、具备容错机制的准实时流处理系统 |
| GraphX | 分布式图处理框架 |
| MLlib | 构建在Spark上的分布式机器学习库 |
首先下载spark,然后将其解压到相应文件夹
$ tar xzvf spark-3.2.0-bin-hadoop3.2.tgz $ rm spark-3.2.0-bin-hadoop3.2 /bin/spark
然后修改环境变量
$ vim .bashrc
将其最后一行改为
#SPARK export SPARK_HOME=/bin/spark export PATH=$SPARK_HOME/bin:$PATH
刷新环境变量后,就可以进入spark-shell了。
$ source ./.bashrc
$ spark-shell
...
Welcome to
____ __
/ __/__ ___ _____/ /__
_ / _ / _ `/ __/ '_/
/___/ .__/_,_/_/ /_/_ version 3.2.0
/_/
...
scala> scala> println("hello world")
hello world
scala> def fac(n:Int):Int={
| if(n>1)
| return n*fac(n-1)
| else
| return 1}
fac: (n: Int)Int
scala> fac(10)
res1: Int = 3628800
scala> :quit
本地交互模式
这部分主要翻译自Spark3.20的Quick Start。
进入bin/spark文件夹,然后进入spark-shell,然后打开README.md文件,可以看到其数据类型为字符串类型的Dataset:
$ cd /bin/spark/
$ spark-shell
scala> val file = spark.read.textFile("README.md")//打开文件
file: org.apache.spark.sql.Dataset[String] = [value: string]
scala> file.count() //README.md的项目数(即行数)
res0: Long = 109
scala> file.first() //第一行数据
res1: String = # Apache Spark
filter可以快速对项目内容进行筛选,其原理类似于对file中的每个项目进行遍历,然后筛选出符合要求的内容
scala> val lines = file.filter(line=>line.contains("Spark")) //对每条line,若line中包含“Spark”则符合要求
lines: org.apache.spark.sql.Dataset[String] = [value: string]
scala> lines.count()
res0: Long = 19
map可以根据某种映射创建一个新的Dataset,下面将file中每一行通过" "分割后,返回单词个数,从而创建了一个[int]类型的Dataset。
scala> val dSet = file.map(line=>line.split(" ").size)
dSet: org.apache.spark.sql.Dataset[Int] = [value: int]
reduce则可对二元操作进行归约,包括reduceLeft和reduceRight,方向分别自左、右开始,例如
scala> val L = List(1,2,3,4,5) L: List[Int] = List(1, 2, 3, 4, 5) scala> L.reduceLeft(_-_) res1: Int = -13 //相当于1-2-3-4-5 scala> L.reduceRight(_-_) res2: Int = 3 //相当于1-(2-(3-(4-5)))
reduce默认为reduceLeft,Dataset也支持这种操作:
scala> val dMax = dSet.reduce((a,b)=>Math.max(a,b)) dMax: Int = 16
flatMap相当于先进行map操作,然后进行flatten操作,由于Dataset不支持flatten操作,所以直接测试flatmap
scala> val dFlatMap = file.flatMap(line=>line.split(" "))
dFlatMap: org.apache.spark.sql.Dataset[String] = [value: string]
Dataset支持groupByKey操作,
scala> val nWords = dFlatMap.groupByKey(identity).count() nWords: org.apache.spark.sql.Dataset[(String, Long)] = [key: string, count(1): bigint]
其中,identity顾名思义,返回变量本身。
在spark中,由于每个RDD都是只读变量,也就是说每次转换操作,都会产生一个新等RDD,所以像上面等这些代码在面对较大规模数据时,是非常消耗资源的。
为了复用RDD,我们可以采用缓存操作,调用方式非常简单,只需L.cache()。



