spark概述Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。
Spark基于MR框架的,但是优化了其中的计算过程,使用内存来代替计算结果
Spark基于Scala语言开发,更适合迭代计算和数据挖掘计算
Spark中计算模型非常丰富,MR中只有Mapper和Reducer两种,Scala中有map,filter,group by,sort by等等
spark与hadoopSpark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。
hadoop的MapReduce已经是比较熟悉的框架,为什么还要学习新的spark计算框架呢,这样就要说一下Spark与Hadoop的关系
从时间看
# Hadoop - 2006年1月,Doug Cutting加入Yahoo,领导Hadoop的开发 - 2008年1月,Hadoop成为Apache顶级项目 - 2011年1.0正式发布 - 2012年3月稳定版发布 - 2013年10月发布2.X (Yarn)版本 # Spark - 2009年,Spark诞生于伯克利大学的AMPLab实验室 - 2010年,伯克利大学正式开源了Spark项目 - 2013年6月,Spark成为了Apache基金会下的项目 - 2014年2月,Spark以飞快的速度成为了Apache的顶级项目 - 2015年至今,Spark变得愈发火爆,大量的国内公司开始重点部署或者使用Spark
从功能看
# Hadoop - Hadoop是由java语言编写的,在分布式服务器集群上存储海量数据并运行分布式分析应用的开源框架 - 作为Hadoop分布式文件系统,HDFS处于Hadoop生态圈的最下层,存储着所有的数据,支持着Hadoop的所有服务。它的理论基础源于Google的TheGoogleFileSystem这篇论文,它是GFS的开源实现。 - MapReduce是一种编程模型,Hadoop根据Google的MapReduce论文将其实现,作为Hadoop的分布式计算模型,是Hadoop的核心。基于这个框架,分布式并行程序的编写变得异常简单。综合了HDFS的分布式存储和MapReduce的分布式计算,Hadoop在处理海量数据时,性能横向扩展变得非常容易。 - Hbase是对Google的Bigtable的开源实现,但又和Bigtable存在许多不同之处。Hbase是一个基于HDFS的分布式数据库,擅长实时地随机读/写超大规模数据集。它也是Hadoop非常重要的组件。 # Spark - Spark是一种由Scala语言开发的快速、通用、可扩展的大数据分析引擎 - Spark Core中提供了Spark最基础与最核心的功能 - Spark SQL是Spark用来操作结构化数据的组件。通过Spark SQL,用户可以使用SQL或者 Apache Hive版本的SQL方言(HQL)来查询数据。 - Spark Streaming是Spark平台上针对实时数据进行流式计算的组件,提供了丰富的处理数 据流的API。spark核心模块
# Spark Core Spark Core中提供了Spark最基础与最核心的功能,Spark其他的功能如:Spark SQL,Spark Streaming,GraphX, MLlib都是在Spark Core的基础上进行扩展的 # Spark SQL Spark SQL是Spark用来操作结构化数据的组件。通过Spark SQL,用户可以使用SQL或者Apache Hive版本的SQL方言(HQL)来查询数据。 # Spark Streaming Spark Streaming是Spark平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的API。 # Spark MLlib MLlib是Spark提供的一个机器学习算法库。MLlib不仅提供了模型评估、数据导入等额外的功能,还提供了一些更底层的机器学习原语。 # Spark GraphX GraphX是Spark面向图计算提供的框架与算法库。spark快速上手
简单的体会一下spark
创建Maven项目,添加如下依赖
org.apache.spark spark-core_2.12 3.0.0
添加Scala插件
Spark版本为3.0.0,默认采用的Scala编译版本为2.12
重写wordCount案例
package com.pihao.spark.wc
import org.apache.spark.{SparkConf, SparkContext}
object WordCount_Env {
def main(args: Array[String]): Unit = {
// TODO 使用Spark
// 1,增加依赖
// 2,获取Spark的连接(环境)
val conf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(conf)
//3,读取文件
val lines = sc.textFile("data/word.txt")
//4,将文件中的数据进行分词
val words = lines.flatMap(_.split(" "))
//5,将分词后的数据进行了分组
val wordGroup = words.groupBy(word => word)
//6,对分组后的数据进行了统计分析
val wordCount = wordGroup.mapValues(_.size)
//7,将统计结果打印在控制台
wordCount.collect().foreach(println)
sc.stop()
}
}
执行的时候会打印很多的日志信息,如需关闭,可在resource下创建log4j.properties
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to ERROR. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
spark运行环境
Local模式Spark作为一个数据处理框架和计算引擎,被设计在所有常见的集群环境中运行, 在国内工作中主流的环境为Yarn,不过逐渐容器式环境也慢慢流行起来。接下来,我们就分别看看不同环境下Spark的运行
所谓的Local模式,就是不需要其他任何节点资源就可以在本地执行Spark代码的环境,一般用于教学,调试,演示等,之前在IDEA中运行代码的环境我们称之为开发环境,一次性就没了,不太一样。
解压缩文件
将spark-3.0.0-bin-hadoop3.2.tgz文件上传到Linux /opt/software并解压缩
tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module cd /opt/module mv spark-3.0.0-bin-hadoop3.2 spark-local
启动Local环境
bin/spark-shell
启动成功后,可以输入网址进行Web UI监控页面访问
执行命令行工具
在解压缩文件夹下的data目录中,添加word.txt文件。在命令行工具中执行如下代码指令
sc.textFile("data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
退出本地模式
:quit
提交应用
上面的方法讲的是在命令行spark-shell命令行窗口中直接执行代码的方式,提交应用是我们写好的代码已经打好包的方式提交
# 这个是官方自带的 bin/spark-submit --class org.apache.spark.examples.SparkPi --master local[2] ./examples/jars/spark-examples_2.12-3.0.0.jar 10 #1)--class表示要执行程序的主类,此处可以更换为咱们自己写的应用程序 #2)--master local[2] 部署模式,默认为本地模式,数字表示分配的虚拟CPU核数量 #3)spark-examples_2.12-3.0.0.jar 运行的应用类所在的jar包,实际使用时,可以设 定为咱们自己打的jar包 #4)数字10表示程序的入口参数,用于设定当前应用的任务数量Standalone模式
local本地模式毕竟只是用来进行练习演示的,真实工作中还是要将应用提交到对应的集群中去执行,这里我们来看看只使用Spark自身节点运行的集群模式,也就是我们所谓的独立部署(Standalone)模式。Spark的Standalone模式体现了经典的master-slave模式。
集群规划
| hadoop102 | hadoop103 | hadoop104 | |
|---|---|---|---|
| Spark | Worker Master | Worker | Worker |
加压缩文件
tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module cd /opt/module mv spark-3.0.0-bin-hadoop3.2 spark-standalone
修改配置文件
1.进入解压缩后路径的conf目录,修改slaves.template文件名为slaves
mv slaves.template slaves # vim修改为如下,启动3个机器 hadoop102 hadoop103 hadoop104
- 修改spark-env.sh.template文件名为spark-env.sh
mv spark-env.sh.template spark-env.sh # vim修改如下,添加JAVA_HOME环境变量和集群对应的master节点 export JAVA_HOME=/opt/module/1.8.0_212 SPARK_MASTER_HOST=hadoop102 SPARK_MASTER_PORT=7077
注意:7077端口,相当于hadoop3内部通信的8020端口,此处的端口需要确认自己的Hadoop配置
3.分发spark-standalone目录
[atguigu@hadoop102 module]$ pwd /opt/module [atguigu@hadoop102 module]$ my_rsync spark-standalone
启动集群
sbin/start-all.sh
# 查看进程 [atguigu@hadoop102 spark-standlone]$ my_jps.sh hadoop102,hadoop103,hadoop104进程启动情况 =================hadoop102 jps情况================= 5842 Jps 5785 Worker 5693 Master =================hadoop103 jps情况================= 23895 Worker 23947 Jps =================hadoop104 jps情况================= 15638 Worker 15690 Jps [atguigu@hadoop102 spark-standlone]$
查看Master资源监控Web UI界面: hadoop102:8080
提交应用
bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://hadoop102:7077 ./examples/jars/spark-examples_2.12-3.0.0.jar 10
提交参数说明
| 参数 | 解释 | 可选值举例 |
|---|---|---|
| –class | Spark程序中包含主函数的类 | |
| –master | Spark程序运行的模式(环境) | 模式:local[*]、spark://linux1:7077、Yarn |
| –executor-memory 1G | 指定每个executor可用内存为1G | 符合集群内存配置即可,具体情况具体分析。 |
| –total-executor-cores 2 | 指定所有executor使用的cpu核数为2个 | |
| –executor-cores | 指定每个executor使用的cpu核数 | |
| application-jar | 打包好的应用jar,包含依赖。这个URL在集群中全局可见。 比如hdfs:// 共享存储系统,如果是file:// path,那么所有的节点的path都包含同样的jar | |
| application-arguments | 传给main()方法的参数 |
配置历史服务
由于spark-shell停止掉后,集群监控hadoop102:4040页面就看不到历史任务的运行情况,所以开发时都配置历史服务器记录任务运行情况。
# 1)修改spark-defaults.conf.template文件名为spark-defaults.conf mv spark-defaults.conf.template spark-defaults.conf # 2)修改spark-default.conf文件,配置日志存储路径 spark.eventLog.enabled true spark.eventLog.dir hdfs://hadoop102:8020/directory ##注意:需要启动hadoop集群,HDFS上的directory目录需要提前存在。手动页面创建或者如下命令创建 sbin/start-dfs.sh hadoop fs -mkdir /directory # 3)修改spark-env.sh文件, 添加日志配置 export SPARK_HISTORY_OPTS=" -Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/directory -Dspark.history.retainedApplications=30" 参数1含义:WEB UI访问的端口号为18080 参数2含义:指定历史服务器日志存储路径 参数3含义:指定保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。 # 4)分发配置文件 my_rsync conf # 5)重新启动集群和历史服务 sbin/start-all.sh sbin/start-history-server.sh # 6)重新执行任务 bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://hadoop102:7077 ./examples/jars/spark-examples_2.12-3.0.0.jar 10 # 7)查看历史服务:http://hadoop102:18080
配置高可用HA
所谓的高可用是因为当前集群中的Master节点只有一个,所以会存在单点故障问题。所以为了解决单点故障问题,需要在集群中配置多个Master节点,一旦处于活动状态的Master发生故障时,由备用Master提供服务,保证作业可以继续执行。这里的高可用一般采用Zookeeper设置
集群规划
| hadoop102 | hadoop103 | hadoop104 | |
|---|---|---|---|
| Spark | Master,Zookeeper,Worker | Master,Zookeeper,Worker | Zookeeper,Worker |
# 1)停止集群 sbin/stop-all.sh # 2)启动Zookeeper集群 zk_client start # 3)修改spark-env.sh文件添加如下配置 注释如下内容: #SPARK_MASTER_HOST=hadoop102 #SPARK_MASTER_PORT=7077 添加如下内容: # Master监控页面默认访问端口为8080,但是可能会和Zookeeper冲突,所以改成8989,也可以自定义,访问UI监控页面时请注意 SPARK_MASTER_WEBUI_PORT=8989 export SPARK_DAEMON_JAVA_OPTS=" -Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hadoop102,hadoop103,hadoop104 -Dspark.deploy.zookeeper.dir=/spark" # 4)分发配置文件 my_rsync conf/ # 5)启动集群 sbin/start-all.sh # 6)启动hadoop103的单独Master节点,此时hadoop103节点Master状态处于备用状态 [root@hadoop103 spark-standalone]# sbin/start-master.sh # 7)提交应用到高可用集群 bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://hadoop102:7077,hadoop103:7077 ./examples/jars/spark-examples_2.12-3.0.0.jar 10 # 8)停止hadoop102的Master资源监控进程 kill -9 master的ID 9)查看hadoop103的Master 资源监控Web UI,稍等一段时间后,hadoop103节点的Master状态提升为活动状态Alive,说明配置高可用成功Yarn模式
独立部署(Standalone)模式由Spark自身提供计算资源,无需其他框架提供资源。这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是你也要记住,Spark主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成会更靠谱一些。所以接下来我们来学习在强大的Yarn环境下Spark是如何工作的(其实是因为在国内工作中,Yarn使用的非常多)。
解压缩文件
tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module cd /opt/module mv spark-3.0.0-bin-hadoop3.2 spark-yarn
修改配置文件
1.修改hadoop配置文件/opt/module/hadoop/etc/hadoop/yarn-site.xml, 并分发
yarn.nodemanager.pmem-check-enabled false yarn.nodemanager.vmem-check-enabled false
2.修改conf/spark-env.sh,添加JAVA_HOME和YARN_CONF_DIR配置
mv spark-env.sh.template spark-env.sh 。。。 export JAVA_HOME=/opt/module/1.8.0_212 YARN_CONF_DIR=/opt/module/hadoop/etc/hadoop
启动HDFS以及Yarn集群
start-dfs.sh start-yarn.sh
提交应用
bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster ./examples/jars/spark-examples_2.12-3.0.0.jar 10
查看http://hadoop103:8088页面,点击History,查看历史页面
配置历史服务器
# 1)修改spark-defaults.conf.template文件名为spark-defaults.conf mv spark-defaults.conf.template spark-defaults.conf # 2)修改spark-default.conf文件,配置日志存储路径 spark.eventLog.enabled true spark.eventLog.dir hdfs://hadoop102:8020/directory 注意:需要启动hadoop集群,HDFS上的目录需要提前存在。 [root@linux1 hadoop]# sbin/start-dfs.sh [root@linux1 hadoop]# hadoop fs -mkdir /directory # 3)修改spark-env.sh文件, 添加日志配置 export SPARK_HISTORY_OPTS=" -Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/directory -Dspark.history.retainedApplications=30" 参数1含义:WEB UI访问的端口号为18080 参数2含义:指定历史服务器日志存储路径 参数3含义:指定保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。 # 4)修改spark-defaults.conf spark.yarn.historyServer.address=hadoop102:18080 spark.history.ui.port=18080 # 5)启动历史服务 sbin/start-history-server.sh # 6)重新提交应用 # bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode client ./examples/jars/spark-examples_2.12-3.0.0.jar 10 # 7)Web页面查看日志:http://hadoop103:8088
部署模式对比
| 模式 | Spark安装机器数 | 需启动的进程 | 所属者 | 应用场景 |
|---|---|---|---|---|
| Local | 1 | 无 | Spark | 测试 |
| Standalone | 3 | Master及Worker | Spark | 单独部署 |
| Yarn | 1 | Yarn及HDFS | Hadoop | 混合部署 |
端口号
-
Spark查看当前Spark-shell运行任务情况端口号:4040(计算)
-
Spark Master内部通信服务端口号:7077
-
Standalone模式下,Spark Master Web端口号:8080(资源)
-
Spark历史服务器端口号:18080
-
Hadoop YARN任务运行情况查看端口号:8088
核心组件Spark框架的核心是一个计算引擎,整体来说,它采用了标准 master-slave 的结构。
如下图所示,它展示了一个 Spark执行时的基本结构。图形中的Driver表示master,负责管理整个集群中的作业任务调度。图形中的Executor 则是 slave,负责实际执行任务。
spark的核心组件两个:Driver,Executor
Driver
Spark驱动器节点,用于执行Spark任务中的main方法,负责实际代码的执行工作。
Driver在Spark作业执行时主要负责:
-
将用户程序转化为作业(job)
-
在Executor之间调度任务(task)
-
跟踪Executor的执行情况
-
通过UI展示查询运行情况
实际上,我们无法准确地描述Driver的定义,因为在整个的编程过程中没有看到任何有关Driver的字眼。所以简单理解,所谓的Driver就是驱使整个应用运行起来的程序,也称之为Driver类。
Executor
Spark Executor是集群中工作节点(Worker)中的一个JVM进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立。Spark 应用启动时,Executor节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有Executor节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他Executor节点上继续运行
Executor有两个核心功能:
-
负责运行组成Spark应用的任务,并将结果返回给驱动器进程
-
它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。
Executor与Core
Spark Executor是集群中运行在工作节点(Worker)中的一个JVM进程,是整个集群中的专门用于计算的节点。在提交应用中,可以提供参数指定计算节点的个数,以及对应的资源。这里的资源一般指的是工作节点Executor的内存大小和使用的虚拟CPU核(Core)数量。
应用程序启动参数如下:
| 名称 | 说明 |
|---|---|
| –num-executors | 配置Executor的数量 |
| –executor-memory | 配置每个Executor的内存大小 |
| –executor-cores | 配置每个Executor的虚拟CPU core数量 |
并行度(Parallelism)
这里我们将整个集群并行执行任务的数量称之为并行度。
有向无环图
这里所谓的有向无环图,并不是真正意义的图形,而是由Spark程序直接映射成的数据流的高级抽象模型。简单理解就是将整个程序计算的执行过程用图形表示出来,这样更直观,更便于理解,可以用于表示程序的拓扑结构。
提交流程
所谓的提交流程,其实就是我们开发人员根据需求写的应用程序通过Spark客户端提交给Spark运行环境执行计算的流程。在不同的部署环境中,这个提交过程基本相同,但是又有细微的区别,我们这里不进行详细的比较,但是因为国内工作中,将Spark引用部署到Yarn环境中会更多一些,所以本课程中的提交流程是基于Yarn环境的。
Spark应用程序提交到Yarn环境中执行的时候,一般会有两种部署执行的方式:Client和Cluster。两种模式主要区别在于:Driver程序的运行节点位置。



