栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark核数以及分区分配使用测试

spark核数以及分区分配使用测试

一. 在开始之前先熟悉几个相关的概念

  1. Executor由若干core组成,
    每个Executor的每个core一次只能执行一个Task
  2. 每个Task执行的结果就是生成目标RDD的一个partiton
    换言之task数partition数是对应起来的,但是是rdd的分区数目决定了总的task数目

这里的core是虚拟的core而不是机器的物理CPU核,可以理解为就是Executor的一个工作线程。
3. task被执行的并发度 = Executor数目 * 每个Executor核数(=core总个数)
注意:每个节点可以起一个或多个Executor。

举例:

RDD有10个分区,那么计算的时候就会生成10个task,配置为1个计算节点,每个2个核,同一时刻可以并行的task数目为2,计算这个RDD就需要5个轮次。
如果计算资源不变,你有11个task的话,就需要6个轮次,在最后一轮中,只有一个task在执行,另一个核在空转。
如果资源不变,你的RDD只有1个分区,那么同一时刻只有1个task运行,另1个核空转,造成资源浪费。

代码演示:

package com.brd.engine

import com.brd.util.sparkUtils
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object SparkTest {
  
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SparkTest")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    sparkUtils.mapPartUse(sc)

    val rdd = sc.textFile("src/data/wd.txt")
    val rdd2 = rdd.flatMap(line => line.split(" ")).map(word => {
      word.replaceAll(",", "").trim
    }).map(word => (word, 1))
      .reduceByKey(_ + _)
      .repartitionAndSortWithinPartitions(new HashPartitioner(4))
      .mapPartitions(x => {
        println("**********************the partition line ***********************")
        x.toList.map(y => (y._1, y._2 + 1)).toIterator
      })
    rdd2.foreach(x => println(x))
    sc.stop()
  }
}
------------------------------------------------------------------------------------------------------------------------------------------------
------------------------------------------------------------------------
package com.brd.util

import org.apache.spark.SparkContext

object sparkUtils {
  def mapPartUse(sc:SparkContext): Unit ={
    val rddMap =sc.parallelize(1 to 10,2).repartition(3)
    val rdd_result=rddMap.mapPartitions(x=>{
      println("execute once partition dealing")
      x.toList.map(y=>y*y).toIterator
    })
    rdd_result.foreach(x=>println(x))
  }
}

演示的示例1

核数为 :2 本地节点为 一个 executor
因此可以同时并行2个task

当分区数设置为3 ,则需要2轮次跑完,并且一个核空转
当分区数设置为1 ,则需要1轮跑完,并且一个核空转
当分区数设置为4,则需要2轮跑完,没有空转的核。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/389448.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号