Spark is a fast, general-purpose, scalable, in-memory big-data analytics and compute engine. In the vast majority of data-processing scenarios, Spark does have an advantage over MapReduce. But because Spark is memory-based, in real production environments a job may fail when memory resources are insufficient; in those cases MapReduce is actually the better choice, so Spark cannot fully replace MR.
Spark Core:
Spark Core provides Spark's most fundamental and central functionality. All other Spark features, such as Spark SQL, Spark Streaming, GraphX, and MLlib, are built as extensions on top of Spark Core.
Spark SQL:
Spark SQL is Spark's component for working with structured data. With Spark SQL, users can query data using SQL or the Apache Hive dialect of SQL (HQL).
Spark Streaming:
Spark Streaming is Spark's component for stream processing of real-time data, providing a rich API for handling data streams.
Spark MLlib:
MLlib is Spark's machine-learning algorithm library. Besides extra functionality such as model evaluation and data import, MLlib also provides some lower-level machine-learning primitives.
Spark GraphX:
GraphX is Spark's framework and algorithm library for graph computation.
Creating a Scala project with Maven and packaging it
spark-submit --class org.apache.spark.examples.SparkPi --master local[2] ./examples/jars/spark-examples_2.12-3.0.0.jar 10
1) --class is the main class of the program to run; replace it here with your own application's main class.
2) --master local[2] sets the deploy mode; the default is local mode, and the number is how many virtual CPU cores (i.e. threads) to allocate. local[*] uses the maximum number of virtual cores.
3) spark-examples_2.12-3.0.0.jar is the jar containing the application class to run; in practice, substitute the jar you built yourself.
4) The trailing 10 is a program argument, here used to set the number of tasks for the application.
Note: (1) the jar must contain the class files; (2) the paths to the program's input files and the jar are relative to the directory where spark-submit is executed.
Spark on YARN mode | Configuring the history service | Execution flow | Port-number summary | Default-port changes between Hadoop 2.x and Hadoop 3.x
Configuring Spark on YARN and the Spark history server:
[atguigu@hadoop102 conf]$ cat spark-env.sh
#!/usr/bin/env bash
# export JAVA_HOME=/opt/module/jdk1.8.0_212
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This file is sourced when running various Spark programs.
# Copy it as spark-env.sh and edit that to configure Spark for your site.
# Options read when launching programs locally with
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public dns name of the driver program
# Options read by executors and drivers running inside the cluster
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
# - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos
# Options read in YARN client/cluster mode
# - SPARK_CONF_DIR, Alternate conf dir. (Default: ${SPARK_HOME}/conf)
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - YARN_CONF_DIR, to point Spark towards YARN configuration files when you use YARN
# Point Spark at the YARN configuration files when running Spark on YARN
YARN_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop
# - SPARK_EXECUTOR_CORES, Number of cores for the executors (Default: 1).
# - SPARK_EXECUTOR_MEMORY, Memory per Executor (e.g. 1000M, 2G) (Default: 1G)
# - SPARK_DRIVER_MEMORY, Memory for Driver (e.g. 1000M, 2G) (Default: 1G)
# Options for the daemons used in the standalone deploy mode
# - SPARK_MASTER_HOST, to bind the master to a different IP address or hostname
# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master
# - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. "-Dx=y")
# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
# - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker
# - SPARK_WORKER_DIR, to set the working directory of worker processes
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
# - SPARK_DAEMON_MEMORY, to allocate to the master, worker and history server themselves (default: 1g).
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://hadoop102:9820/directory
-Dspark.history.retainedApplications=30"
# - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
# - SPARK_DAEMON_CLASSPATH, to set the classpath for all daemons
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers
# Options for launcher
# - SPARK_LAUNCHER_OPTS, to set config properties and Java options for the launcher (e.g. "-Dx=y")
# Generic options for the daemons used in the standalone deploy mode
# - SPARK_CONF_DIR Alternate conf dir. (Default: ${SPARK_HOME}/conf)
# - SPARK_LOG_DIR Where log files are stored. (Default: ${SPARK_HOME}/logs)
# - SPARK_PID_DIR Where the pid file is stored. (Default: /tmp)
# - SPARK_IDENT_STRING A string representing this instance of spark. (Default: $USER)
# - SPARK_NICENESS The scheduling priority for daemons. (Default: 0)
# - SPARK_NO_DAEMONIZE Run the proposed command in the foreground. It will not output a PID file.
# Options for native BLAS, like Intel MKL, OpenBLAS, and so on.
# You might get better performance to enable these options if using native BLAS (see SPARK-21305).
# - MKL_NUM_THREADS=1 Disable multi-threading of Intel MKL
# - OPENBLAS_NUM_THREADS=1 Disable multi-threading of OpenBLAS
[atguigu@hadoop102 conf]$ cat spark-defaults.conf
# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.
# Example:
# spark.master                     spark://master:7077
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://hadoop102:9820/directory
# spark.serializer                 org.apache.spark.serializer.KryoSerializer
# spark.driver.memory              5g
# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
# The history server address is the master node's hostname, hadoop102
spark.yarn.historyServer.address=hadoop102:18080
spark.history.ui.port=18080
[atguigu@hadoop102 conf]$ sbin/start-dfs.sh
[atguigu@hadoop102 conf]$ hadoop fs -mkdir /directory
Examples of submitting an application in cluster mode and client mode:
bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster ./examples/jars/spark-examples_2.12-3.0.0.jar 10
bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode client ./examples/jars/spark-examples_2.12-3.0.0.jar 10
3. The Spark Runtime Architecture
Terminology
When a Spark application is submitted to a YARN environment, there are generally two deployment modes: Client and Cluster. The main difference between them is where the Driver process runs. In Client mode, the Driver module, which handles monitoring and scheduling, runs on the client rather than inside YARN, so this mode is generally used for testing. In Cluster mode, the Driver is launched inside the YARN cluster's resources, so this mode is generally used in real production environments.
RDD (Resilient Distributed Dataset) is the most fundamental data-processing model in Spark. In code it is an abstract class representing a resilient, immutable, partitionable collection whose elements can be computed in parallel.
From a computational point of view, data processing requires compute resources (memory & CPU) and a compute model (the logic). At execution time, the resources and the model must be coordinated and combined.
When the Spark framework executes, it first requests resources and then decomposes the application's data-processing logic into individual compute tasks. The tasks are then sent to the compute nodes that have been allocated resources, where the data is processed according to the specified compute model, finally producing the result. RDD is the core data-processing model in the Spark framework; next, let's look at how RDDs work in a YARN environment:
As this flow shows, an RDD mainly serves to encapsulate the logic and to generate Tasks that are sent to Executor nodes for computation. The number of RDD partitions determines the total number of Tasks; below we explain how the partition count is determined.
MapReduce splits, parallelism, and partitions | How Spark determines the number of partitions, tasks, cores, worker nodes, and executors in a job
What is the difference between Spark and MapReduce? | Principles of and differences between MapReduce and Spark | Spark's advantages over MapReduce
Beyond Map and Reduce (these are not algorithms; Spark merely provides a Map phase and a Reduce phase, and each phase offers many operators: map, flatMap, filter, keyBy, etc. in the Map phase, and reduceByKey, sortByKey, mean, groupBy, sort, etc. in the Reduce phase), Spark supports multiple data models such as RDD (which encapsulates compute logic but does not store data), DataFrame, and Dataset, so its programming model is more flexible. When Spark exceeds the specified maximum memory it falls back on operating-system memory, which both guarantees basic memory availability and avoids the waste of allocating too much memory up front. Modify the Hadoop configuration file /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml:
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>
4. Spark Core Programming
The relationship between Spark parallelism and partitions | Several ways to create an RDD
// Set the parallelism; local[*] means the parallelism equals the local machine's
// maximum number of virtual cores (threads)
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
// Manually set the parallelism, i.e. the number of tasks (again, threads) that can run concurrently
sparkConf.set("spark.default.parallelism", "4")
val sparkContext = new SparkContext(sparkConf)
// Set the number of partitions; each partition runs on a thread, and one thread
// can be reused by multiple partitions
// Extreme case: with only one thread but multiple partitions, the partitions' data is processed serially
val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4), 5)
val fileRDD: RDD[String] = sparkContext.textFile("input", 2)
fileRDD.collect().foreach(println)
sparkContext.stop()
4.1 Data can be partitioned according to the configured parallelism
val rdd1 : RDD[Int] = sc.makeRDD(Seq(1,2,3,4,5))
def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}

override def getPartitions: Array[Partition] = {
  val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
  slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}

def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
  if (numSlices < 1) {
    throw new IllegalArgumentException("Positive number of partitions required")
  }
  // Sequences need to be sliced at the same set of index positions for operations
  // like RDD.zip() to behave as expected
  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
    (0 until numSlices).iterator.map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }
  }
  seq match {
    case r: Range =>
      positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
        // If the range is inclusive, use inclusive range for the last slice
        if (r.isInclusive && index == numSlices - 1) {
          new Range.Inclusive(r.start + start * r.step, r.end, r.step)
        } else {
          new Range(r.start + start * r.step, r.start + end * r.step, r.step)
        }
      }.toSeq.asInstanceOf[Seq[Seq[T]]]
    case nr: NumericRange[_] =>
      // For ranges of Long, Double, BigInteger, etc
      val slices = new ArrayBuffer[Seq[T]](numSlices)
      var r = nr
      for ((start, end) <- positions(nr.length, numSlices)) {
        val sliceSize = end - start
        slices += r.take(sliceSize).asInstanceOf[Seq[T]]
        r = r.drop(sliceSize)
      }
      slices
    case _ =>
      val array = seq.toArray // To prevent O(n^2) operations for List etc
      positions(array.length, numSlices).map { case (start, end) =>
        array.slice(start, end).toSeq
      }.toSeq
  }
}
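The `positions` helper above fully determines how a collection is sliced: slice i covers the index range [i*length/numSlices, (i+1)*length/numSlices). Here is a minimal standalone sketch of that arithmetic, re-implemented in plain Java so it runs without Spark (`SliceDemo` and `slice` are illustrative names, not Spark API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SliceDemo {
    // Same boundary arithmetic as the positions() helper in ParallelCollectionRDD.slice:
    // slice i covers the index range [i*length/numSlices, (i+1)*length/numSlices)
    static <T> List<List<T>> slice(List<T> seq, int numSlices) {
        if (numSlices < 1) {
            throw new IllegalArgumentException("Positive number of partitions required");
        }
        long length = seq.size();
        List<List<T>> slices = new ArrayList<>();
        for (int i = 0; i < numSlices; i++) {
            int start = (int) ((i * length) / numSlices);
            int end = (int) (((i + 1) * length) / numSlices);
            slices.add(new ArrayList<>(seq.subList(start, end)));
        }
        return slices;
    }

    public static void main(String[] args) {
        // 5 elements over 3 slices: boundaries (0,1), (1,3), (3,5)
        System.out.println(slice(Arrays.asList(1, 2, 3, 4, 5), 3)); // [[1], [2, 3], [4, 5]]
        // More slices than elements: the first boundary pair is (0,0), so one
        // slice comes out empty — which is why makeRDD(List(1,2,3,4), 5) above
        // produces an empty partition
        System.out.println(slice(Arrays.asList(1, 2, 3, 4), 5)); // [[], [1], [2], [3], [4]]
    }
}
```

Note how the integer division spreads any remainder over the later slices rather than dumping it all into the last one.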
4.2 Spark's file reading is built on Hadoop's file reading, so the final partition count is the number of splits Hadoop produces for the file
// Set the expected minimum number of splits (partitions)
val rdd: RDD[String] = sc.textFile("data/word*.txt", 2)

def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}
public class TextInputFormat extends FileInputFormat<LongWritable, Text>

public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
  StopWatch sw = new StopWatch().start();
  FileStatus[] files = listStatus(job);

  // Save the number of input files for metrics/loadgen
  job.setLong(NUM_INPUT_FILES, files.length);
  long totalSize = 0;                // compute total size
  for (FileStatus file: files) {     // check we have valid files
    if (file.isDirectory()) {
      throw new IOException("Not a file: "+ file.getPath());
    }
    totalSize += file.getLen();
  }

  // goalSize is the expected number of bytes each split will process
  long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
  long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
    FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

  // generate splits
  ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
  NetworkTopology clusterMap = new NetworkTopology();
  for (FileStatus file: files) {
    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0) {
      FileSystem fs = path.getFileSystem(job);
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      if (isSplitable(fs, path)) {
        long blockSize = file.getBlockSize();
        // compute the final split size; minSize defaults to 1
        long splitSize = computeSplitSize(goalSize, minSize, blockSize);

        long bytesRemaining = length;
        // SPLIT_SLOP = 1.1: a remainder within 10% does not get a new split
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
              length-bytesRemaining, splitSize, clusterMap);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
              splitHosts[0], splitHosts[1]));
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
              - bytesRemaining, bytesRemaining, clusterMap);
          splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
              splitHosts[0], splitHosts[1]));
        }
      } else {
        String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
        splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
      }
    } else {
      //Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
  }
  return splits.toArray(new FileSplit[splits.size()]);
}
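The split arithmetic above can be made concrete with a small standalone sketch (plain Java, not Hadoop code; `SplitDemo` and `splitLengths` are made-up names): goalSize = totalSize / numSplits, splitSize = max(minSize, min(goalSize, blockSize)), and the SPLIT_SLOP = 1.1 rule that folds a remainder within 10% of splitSize into the last split:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitDemo {
    static final double SPLIT_SLOP = 1.1; // same 10% tolerance as FileInputFormat

    // splitSize = max(minSize, min(goalSize, blockSize)), as in computeSplitSize above
    static long computeSplitSize(long goalSize, long minSize, long blockSize) {
        return Math.max(minSize, Math.min(goalSize, blockSize));
    }

    // Returns the length of each split for a single file of fileLen bytes
    static List<Long> splitLengths(long fileLen, int numSplits, long blockSize, long minSize) {
        long goalSize = fileLen / (numSplits == 0 ? 1 : numSplits);
        long splitSize = computeSplitSize(goalSize, minSize, blockSize);
        List<Long> splits = new ArrayList<>();
        long bytesRemaining = fileLen;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits.add(splitSize);
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            splits.add(bytesRemaining);
        }
        return splits;
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024; // assume the 128 MB HDFS default block size
        // A 7-byte file with minPartitions = 2: goalSize = 3, so textFile(path, 2)
        // actually produces 3 splits of 3, 3 and 1 bytes
        System.out.println(splitLengths(7, 2, blockSize, 1)); // [3, 3, 1]
        // A remainder within 10% of splitSize is folded into the last split:
        // 21 bytes with goalSize = 10 gives splits of 10 and 11 bytes, not 10/10/1
        System.out.println(splitLengths(21, 2, blockSize, 1)); // [10, 11]
    }
}
```

This is why the minPartitions argument of textFile is only a lower bound: the actual partition count can come out higher than requested.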
4.3 How data is divided among partitions is decided by Hadoop
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
// TODO 1. How the data is assigned to partitions is also decided by Hadoop.
// TODO 2. Hadoop uses different logic for computing the partitions than for reading the data.
// TODO 3. Spark reads file data through Hadoop underneath, so the reading rules are Hadoop's:
//   3.1 Hadoop reads data line by line, not byte by byte
//   3.2 Hadoop reads data by offset
//   3.3 Hadoop never reads the same offset twice
val rdd = sc.textFile("data/word.txt", 3)
rdd.saveAsTextFile("output")
sc.stop()
4.4 Operators (distributed computation differs from single-machine computation)
The number of partitions generally does not change
The partition a datum belongs to generally does not change
Data is ordered within a partition and unordered across partitions
Within a partition, a single datum passes through the chained (RDD) processing logic in order
Within a partition, the processing of multiple data items across the chained (RDD) logic is unordered
The difference between Spark's map and mapPartitions
mapPartitions is faster, but memory is released only after the entire partition has been processed
mapPartitionsWithIndex can process only the specified partition(s)
A deep dive into Spark wide and narrow dependencies (ShuffleDependency & NarrowDependency)
The difference between map and flatMap in Spark, explained
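The map versus mapPartitions contrast can be modeled without a cluster. In this toy sketch (plain Java; partitions are modeled as lists, and `mapLike`/`mapPartitionsLike` are invented names, not Spark API), map invokes the function once per element, while mapPartitions invokes it once per partition iterator, trading fewer calls for holding the whole partition until its iterator is consumed:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class MapVsMapPartitions {
    // map-style: f is invoked once per element
    static <A, B> List<List<B>> mapLike(List<List<A>> parts, Function<A, B> f) {
        List<List<B>> out = new ArrayList<>();
        for (List<A> p : parts) {
            List<B> mapped = new ArrayList<>();
            for (A a : p) mapped.add(f.apply(a));
            out.add(mapped);
        }
        return out;
    }

    // mapPartitions-style: f is invoked once per partition, on the partition's
    // iterator; the partition stays reachable until the iterator is fully consumed
    static <A, B> List<List<B>> mapPartitionsLike(List<List<A>> parts,
                                                  Function<Iterator<A>, Iterator<B>> f) {
        List<List<B>> out = new ArrayList<>();
        for (List<A> p : parts) {
            List<B> mapped = new ArrayList<>();
            f.apply(p.iterator()).forEachRemaining(mapped::add);
            out.add(mapped);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4, 5));
        AtomicInteger elementCalls = new AtomicInteger();
        AtomicInteger partitionCalls = new AtomicInteger();
        List<List<Integer>> a = mapLike(partitions,
            x -> { elementCalls.incrementAndGet(); return x * 2; });
        List<List<Integer>> b = mapPartitionsLike(partitions, it -> {
            partitionCalls.incrementAndGet();
            List<Integer> buf = new ArrayList<>();
            it.forEachRemaining(x -> buf.add(x * 2));
            return buf.iterator();
        });
        // Same result either way, but f runs 5 times for map and only 2 for mapPartitions
        System.out.println(elementCalls + " vs " + partitionCalls); // 5 vs 2
        System.out.println(a.equals(b)); // true
    }
}
```

The buffering inside the mapPartitions-style lambda is exactly what the note above warns about: nothing in the partition can be freed until the whole iterator has been drained.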



