
019 Big Data: Spark


1. Spark Overview

Spark is a fast, general-purpose, scalable, memory-based engine for large-scale data analytics. In the vast majority of data-processing scenarios, Spark does outperform MapReduce. But because Spark computes in memory, a job in a real production environment can fail when memory is insufficient; in that situation MapReduce is actually the better choice, so Spark cannot fully replace MR.

Spark Core:
Spark Core provides Spark's most basic and most central functionality. All other Spark features, such as Spark SQL, Spark Streaming, GraphX and MLlib, are extensions built on top of Spark Core.
Spark SQL:
Spark SQL is the component Spark uses to operate on structured data. With Spark SQL, users can query data using SQL or the Apache Hive dialect of SQL (HQL).
Spark Streaming:
Spark Streaming is Spark's component for stream processing of real-time data, offering a rich API for working with data streams.
Spark MLlib:
MLlib is Spark's machine-learning algorithm library. Besides extras such as model evaluation and data import, MLlib also provides some lower-level machine-learning primitives.
Spark GraphX:
GraphX is Spark's framework and algorithm library for graph computation.

2. Spark Quick Start

2.1 Local Mode

Creating and packaging a Scala project with Maven

spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10

1) --class is the main class of the application to run; replace it with your own application's class.
2) --master local[2] is the deploy mode, which defaults to local; the number is the count of virtual CPU cores (i.e. threads) to allocate, and local[*] uses all available virtual cores.
3) spark-examples_2.12-3.0.0.jar is the jar containing the application class; in practice, point this at your own jar.
4) The trailing 10 is the program's input argument, here setting the number of tasks for the application.
Note: (1) the jar must contain the class file; (2) the paths of the program's input files and of the jar are resolved relative to the directory where spark-submit is executed.

2.2 Running Spark on YARN

Spark on YARN: configuring the history service, the run flow, and a summary of port numbers (the default ports changed between Hadoop 2.x and Hadoop 3.x). Below, configure Spark on YARN and the Spark history server:

[atguigu@hadoop102 conf]$ cat spark-env.sh 
#!/usr/bin/env bash
# export JAVA_HOME=/opt/module/jdk1.8.0_212
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# This file is sourced when running various Spark programs.
# Copy it as spark-env.sh and edit that to configure Spark for your site.

# Options read when launching programs locally with
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public dns name of the driver program

# Options read by executors and drivers running inside the cluster
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
# - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos

# Options read in YARN client/cluster mode
# - SPARK_CONF_DIR, Alternate conf dir. (Default: ${SPARK_HOME}/conf)
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - YARN_CONF_DIR, to point Spark towards YARN configuration files when you use YARN
# Point Spark at YARN's configuration files when running Spark on YARN
YARN_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop

# - SPARK_EXECUTOR_CORES, Number of cores for the executors (Default: 1).
# - SPARK_EXECUTOR_MEMORY, Memory per Executor (e.g. 1000M, 2G) (Default: 1G)
# - SPARK_DRIVER_MEMORY, Memory for Driver (e.g. 1000M, 2G) (Default: 1G)

# Options for the daemons used in the standalone deploy mode
# - SPARK_MASTER_HOST, to bind the master to a different IP address or hostname
# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master
# - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. "-Dx=y")
# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
# - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker
# - SPARK_WORKER_DIR, to set the working directory of worker processes
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
# - SPARK_DAEMON_MEMORY, to allocate to the master, worker and history server themselves (default: 1g).
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080 
-Dspark.history.fs.logDirectory=hdfs://hadoop102:9820/directory 
-Dspark.history.retainedApplications=30"

# - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
# - SPARK_DAEMON_CLASSPATH, to set the classpath for all daemons
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers

# Options for launcher
# - SPARK_LAUNCHER_OPTS, to set config properties and Java options for the launcher (e.g. "-Dx=y")

# Generic options for the daemons used in the standalone deploy mode
# - SPARK_CONF_DIR      Alternate conf dir. (Default: ${SPARK_HOME}/conf)
# - SPARK_LOG_DIR       Where log files are stored.  (Default: ${SPARK_HOME}/logs)
# - SPARK_PID_DIR       Where the pid file is stored. (Default: /tmp)
# - SPARK_IDENT_STRING  A string representing this instance of spark. (Default: $USER)
# - SPARK_NICENESS      The scheduling priority for daemons. (Default: 0)
# - SPARK_NO_DAEMONIZE  Run the proposed command in the foreground. It will not output a PID file.
# Options for native BLAS, like Intel MKL, OpenBLAS, and so on.
# You might get better performance to enable these options if using native BLAS (see SPARK-21305).
# - MKL_NUM_THREADS=1        Disable multi-threading of Intel MKL
# - OPENBLAS_NUM_THREADS=1   Disable multi-threading of OpenBLAS
[atguigu@hadoop102 conf]$ cat spark-defaults.conf 
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.

# Example:
# spark.master                     spark://master:7077
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://hadoop102:9820/directory
# spark.serializer                 org.apache.spark.serializer.KryoSerializer
# spark.driver.memory              5g
# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
# The history server's host is the master node's hostname, hadoop102
spark.yarn.historyServer.address=hadoop102:18080
spark.history.ui.port=18080
[atguigu@hadoop102 conf]$ sbin/start-dfs.sh
[atguigu@hadoop102 conf]$ hadoop fs -mkdir /directory
Examples of submitting an application in cluster mode and in client mode:
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10

3. Spark Runtime Architecture

Terminology

When a Spark application is submitted to a YARN environment, there are generally two deploy modes: Client and Cluster. Their main difference is where the Driver runs. In Client mode, the Driver module responsible for monitoring and scheduling runs on the client machine rather than inside YARN, so this mode is typically used for testing. In Cluster mode, the Driver is launched inside the YARN cluster's resources, so this is the mode generally used in production.

RDD (Resilient Distributed Dataset) is Spark's most fundamental data-processing model. In code it is an abstract class, representing a collection that is resilient, immutable, partitionable, and whose elements can be computed in parallel.

From a computational point of view, data processing requires both compute resources (memory and CPU) and a compute model (the logic). At execution time, the two must be coordinated and combined.

When the Spark framework executes, it first requests resources, then decomposes the application's data-processing logic into individual compute tasks. It dispatches those tasks to the compute nodes that have been allocated resources, where the data is processed according to the specified compute model, finally producing the result. RDD is the core data-processing model in the Spark framework; next, let's look at how RDDs work in a YARN environment:

As the flow above shows, an RDD's main role is to encapsulate the processing logic and generate Tasks that are sent to Executor nodes for computation. The number of RDD partitions determines the total number of Tasks; how the partition count is determined is explained below.

Further reading: MapReduce splits, parallelism and partitioning; how Spark determines the partition count, task count, core count, worker-node count and executor count; the differences between Spark and MapReduce, their underlying principles, and Spark's advantages over MapReduce.

Beyond Map and Reduce (which are not algorithms here: Spark simply provides a Map stage and a Reduce stage, and each stage offers many operators, e.g. map, flatMap, filter and keyBy in the Map stage; reduceByKey, sortByKey, mean, groupBy and sort in the Reduce stage), Spark also supports operations on several data models such as RDD (which encapsulates compute logic and does not store data), DataFrame and Dataset, so its programming model is more flexible. Once Spark exceeds its specified maximum memory, it will use operating-system memory, which both guarantees basic memory usage and avoids the waste of allocating too much memory up front. To keep YARN from killing such containers, modify the Hadoop configuration file /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml:



<property>
     <name>yarn.nodemanager.pmem-check-enabled</name>
     <value>false</value>
</property>

<property>
     <name>yarn.nodemanager.vmem-check-enabled</name>
     <value>false</value>
</property>
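As a concrete illustration of the Map-stage and Reduce-stage operators mentioned above, the classic word count can be sketched on plain Scala collections, with no SparkContext involved (a hedged sketch: groupBy plus a sum stands in for Spark's reduceByKey, and the input lines are our own example data):

```scala
// Word count on ordinary Scala collections, mirroring the Spark operator flow.
val lines = Seq("hello spark", "hello hadoop")

val counts = lines
  .flatMap(_.split(" "))                                // Map stage: line -> words
  .map(word => (word, 1))                               // Map stage: word -> (word, 1)
  .groupBy(_._1)                                        // shuffle-like grouping by key
  .map { case (w, pairs) => (w, pairs.map(_._2).sum) }  // Reduce stage: reduceByKey equivalent

println(counts)
```

In Spark itself the same pipeline would be rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _), with the grouping performed by the shuffle rather than by groupBy.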

4. Spark Core Programming

The relationship between Spark parallelism and partitions; several ways to create an RDD:

// Set the parallelism; local[*] means as many threads as the machine has virtual cores
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
// Set the parallelism explicitly, i.e. the number of tasks (still threads) that can run at once
sparkConf.set("spark.default.parallelism", "4")
val sparkContext = new SparkContext(sparkConf)
// Set the partition count; a partition maps onto one thread, and a thread can be reused by several partitions
// Extreme case: with only one thread but multiple partitions, the partitions' data is processed serially
val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4), 5)
val fileRDD: RDD[String] = sparkContext.textFile("input", 2)
fileRDD.collect().foreach(println)
sparkContext.stop()
4.1 Data is partitioned according to the configured parallelism
val rdd1 : RDD[Int] = sc.makeRDD(Seq(1,2,3,4,5))

def parallelize[T: ClassTag](
     seq: Seq[T],
     numSlices: Int = defaultParallelism): RDD[T] = withScope {
   assertNotStopped()
   new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
 }

override def getPartitions: Array[Partition] = {
   val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
   slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
 }

def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
   if (numSlices < 1) {
     throw new IllegalArgumentException("Positive number of partitions required")
   }
   // Sequences need to be sliced at the same set of index positions for operations
   // like RDD.zip() to behave as expected
   def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
     (0 until numSlices).iterator.map { i =>
       val start = ((i * length) / numSlices).toInt
       val end = (((i + 1) * length) / numSlices).toInt
       (start, end)
     }
   }
   seq match {
     case r: Range =>
       positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
         // If the range is inclusive, use inclusive range for the last slice
         if (r.isInclusive && index == numSlices - 1) {
           new Range.Inclusive(r.start + start * r.step, r.end, r.step)
         }
         else {
           new Range(r.start + start * r.step, r.start + end * r.step, r.step)
         }
       }.toSeq.asInstanceOf[Seq[Seq[T]]]
     case nr: NumericRange[_] =>
       // For ranges of Long, Double, BigInteger, etc
       val slices = new ArrayBuffer[Seq[T]](numSlices)
       var r = nr
       for ((start, end) <- positions(nr.length, numSlices)) {
         val sliceSize = end - start
         slices += r.take(sliceSize).asInstanceOf[Seq[T]]
         r = r.drop(sliceSize)
       }
       slices
     case _ =>
       val array = seq.toArray // To prevent O(n^2) operations for List etc
       positions(array.length, numSlices).map { case (start, end) =>
           array.slice(start, end).toSeq
       }.toSeq
   }
 }
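The `positions` helper above is where the partition boundaries actually come from: partition i of an n-element collection covers indices [i*n/numSlices, (i+1)*n/numSlices). A standalone sketch of the same arithmetic (plain Scala, no Spark required) shows why `makeRDD(List(1,2,3,4), 5)` from the earlier snippet yields one empty partition:

```scala
// Same boundary arithmetic as the positions helper in ParallelCollectionRDD.slice.
def positions(length: Long, numSlices: Int): Seq[(Int, Int)] =
  (0 until numSlices).map { i =>
    (((i * length) / numSlices).toInt, (((i + 1) * length) / numSlices).toInt)
  }

// 4 elements over 5 slices: the first slice gets an empty range
val slices = positions(4, 5).map { case (s, e) => List(1, 2, 3, 4).slice(s, e) }
println(slices)  // Vector(List(), List(1), List(2), List(3), List(4))
```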
4.2 Spark's file reading is Hadoop's file reading under the hood, so the final partition count is the number of splits Hadoop produces for the file
// Set the expected minimum number of splits (partitions)
val rdd: RDD[String] = sc.textFile("data/word*.txt", 2)

def textFile(
     path: String,
     minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
   assertNotStopped()
   hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
     minPartitions).map(pair => pair._2.toString).setName(path)
 }

public class TextInputFormat extends FileInputFormat<LongWritable, Text>

public InputSplit[] getSplits(JobConf job, int numSplits)
  throws IOException {
  StopWatch sw = new StopWatch().start();
  FileStatus[] files = listStatus(job);
  
  // Save the number of input files for metrics/loadgen
  job.setLong(NUM_INPUT_FILES, files.length);
  long totalSize = 0;                           // compute total size
  for (FileStatus file: files) {                // check we have valid files
    if (file.isDirectory()) {
      throw new IOException("Not a file: "+ file.getPath());
    }
    totalSize += file.getLen();
  }
  // Estimated number of bytes each split should handle
  long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
  long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
    FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

  // generate splits
  ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
  NetworkTopology clusterMap = new NetworkTopology();
  for (FileStatus file: files) {
    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0) {
      FileSystem fs = path.getFileSystem(job);
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      if (isSplitable(fs, path)) {
        long blockSize = file.getBlockSize();
        // Compute the final split size; minSize defaults to 1
        long splitSize = computeSplitSize(goalSize, minSize, blockSize);

        long bytesRemaining = length;
        // SPLIT_SLOP = 1.1: no new split is created for a remainder within 10%
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
              length-bytesRemaining, splitSize, clusterMap);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
              splitHosts[0], splitHosts[1]));
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
              - bytesRemaining, bytesRemaining, clusterMap);
          splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
              splitHosts[0], splitHosts[1]));
        }
      } else {
        String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
        splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
      }
    } else { 
      //Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", Timetaken: " + sw.now(TimeUnit.MILLISECONDS));
  }
  return splits.toArray(new FileSplit[splits.size()]);
}
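The split count that getSplits will produce can be reproduced with a few lines of arithmetic. Below is a hedged sketch (our own helper, assuming a single splittable file, the default minSize of 1, and the 1.1 SPLIT_SLOP), reproducing the well-known case where a 7-byte file requested with minPartitions = 2 actually yields 3 partitions:

```scala
// Mirrors computeSplitSize plus the SPLIT_SLOP loop from getSplits above.
def splitCount(fileLen: Long,
               numSplits: Int,
               blockSize: Long = 128L * 1024 * 1024,
               minSize: Long = 1L): Int = {
  val goalSize  = fileLen / (if (numSplits == 0) 1 else numSplits)
  val splitSize = math.max(minSize, math.min(goalSize, blockSize))  // computeSplitSize
  var remaining = fileLen
  var count     = 0
  while (remaining.toDouble / splitSize > 1.1) {  // SPLIT_SLOP
    remaining -= splitSize
    count += 1
  }
  if (remaining != 0) count += 1  // trailing split for the remainder
  count
}

// 7 bytes, minPartitions = 2: goalSize = 3, so splits of 3, 3 and 1 bytes
println(splitCount(7, 2))  // 3
```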
4.3 How data is divided among Spark's partitions is decided by Hadoop
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
// TODO 1. How data is assigned to partitions is also decided by Hadoop.
// TODO 2. Hadoop's logic for computing splits differs from its logic for reading the data.
// TODO 3. Spark reads file data through Hadoop, so Hadoop's reading rules apply:
//         3.1 Hadoop reads data line by line, not byte by byte
//         3.2 Hadoop reads by byte offset
//         3.3 Hadoop never reads the same offset twice
val rdd = sc.textFile("data/word.txt", 3)
rdd.saveAsTextFile("output")
sc.stop()
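The offset rules above explain a counter-intuitive result: for a 7-byte file containing "1\r\n2\r\n3" read with minPartitions = 2, the splits computed as in section 4.2 are [0,3), [3,6) and [6,7), yet the lines land as ("1","2"), ("3") and an empty partition. A hedged sketch of LineRecordReader's ownership rule (the helpers are our own; a line belongs to the split whose range covers its starting offset, exclusive at the start except for offset 0):

```scala
// A split [start, end) owns the lines whose first byte's offset p satisfies
// start < p <= end; the very first line (p == 0) belongs to the first split.
def linesWithOffsets(data: String): List[(Int, String)] = {
  val buf = scala.collection.mutable.ListBuffer[(Int, String)]()
  var lineStart = 0
  for (i <- data.indices if data(i) == '\n') {
    buf += ((lineStart, data.substring(lineStart, i).stripSuffix("\r")))
    lineStart = i + 1
  }
  if (lineStart < data.length) buf += ((lineStart, data.substring(lineStart)))
  buf.toList
}

def linesForSplit(data: String, start: Int, end: Int): List[String] =
  linesWithOffsets(data).collect {
    case (p, line) if (p == 0 && start == 0) || (start < p && p <= end) => line
  }

val text = "1\r\n2\r\n3"             // 7 bytes, lines starting at offsets 0, 3, 6
println(linesForSplit(text, 0, 3))  // List(1, 2) -- split 0 reads past its end
println(linesForSplit(text, 3, 6))  // List(3)    -- offset 3 was already consumed
println(linesForSplit(text, 6, 7))  // List()     -- nothing left
```

This is why saveAsTextFile can produce part files that look lopsided or empty even though the split byte ranges were computed evenly.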

4.4 Operators (distributed computation differs from single-machine computation)

The number of partitions generally stays the same
Which partition a record belongs to generally stays the same
Data is ordered within a partition and unordered across partitions
For a single record, the processing steps (RDD by RDD) are applied in order
Across different records within a partition, processing (RDD by RDD) is unordered

Differences between Spark's map and mapPartitions:
mapPartitions is faster, but memory is only released after a whole partition has been processed
mapPartitionsWithIndex can target a specific partition for processing
A deep dive into Spark's wide and narrow dependencies (ShuffleDependency & NarrowDependency)
Differences between map and flatMap in Spark, explained
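The map-versus-mapPartitions distinction can be seen without a cluster by emulating two partitions with plain Scala collections (a sketch under that assumption; the per-partition iterator is exactly what mapPartitions hands your function):

```scala
// Emulate a 6-element dataset split into two partitions.
val partitions = Seq(Seq(1, 2, 3), Seq(4, 5, 6))

// "map": the function runs once per element (6 invocations in total)
val mapped = partitions.map(_.map(_ * 2))

// "mapPartitions": the function runs once per partition (2 invocations),
// receiving the whole partition as an iterator -- handy for amortizing
// per-partition setup such as opening a database connection, but the
// partition's data stays referenced until the iterator is fully consumed
val mapPartitioned = partitions.map { part =>
  val iter: Iterator[Int] = part.iterator
  iter.map(_ * 2).toList
}

println(mapped)          // List(List(2, 4, 6), List(8, 10, 12))
println(mapPartitioned)  // List(List(2, 4, 6), List(8, 10, 12))
```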
