栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark-RDD操作

Spark-RDD操作

Spark-RDD操作
  • 什么是RDD
  • 怎么理解RDD
  • 创建RDD3中方式
  • 读取数据并产生RDD
    • 读取普通文本数据
    • 读取json格式的数据
    • 读取CSV,TSV格式的数据
    • 读取sequenceFile格式的数据
    • 读取object个数的数据
    • 读取HDFS中的数据
    • 读取MySQL数据库中的数据
  • 保存RDD的数据到外部存储
    • 保存成普通文件
    • 保存成json文件
    • 保存成CSV,TSV格式文件
    • 保存成sequenceFIle文件
    • 写数据到HDFS
    • 写入到MySQL数据库
  • RDD的高级操作
    • RDD缓存
    • 缓存与释放RDD
    • RDD的检查点机制(Checkpoint)
    • RDD的依赖关系
      • 窄依赖
      • 宽依赖
    • 广播
    • 累加器
      • 累加器的分类

什么是RDD
(Resilient distributed dataset) 弹性的,分布式的数据集合

在Hadoop中,shuffle过程中当环形缓冲区的内存使用量达到80%,会将内存中的数据溢写到磁盘,防止内存溢出。
而在spark中,允许内存不足的情况下将集合数据溢写到磁盘。体现了弹性这个词,表明spark的内存优先的特性

怎么理解RDD

可以从三个不同的角度理解: 数据存储,数据分析,程序调度

  • 数据存储
  1. RDD中的数据会被切分成多个分区(切片),分别存储在不同的主机上

  2. 内存优先。将数据缓存到内存的spark计算比MapReduce快100倍

  3. 血统关系(依赖链)保证数据的可靠性,某个RDD数据丢失,通过RDD的血统关系重新运行就可以恢复。也可以设置检查点缓存中间结果

  • 数据分析

通过RDD算子实现数据分析。主要包含转换算子(transformation)和行动算子(action)

  • 程序调度
  1. Driver 在主方法中创建Spark的上下文实例(SparkContext)
  2. Job 每出现一个行动操作,就生成一个job
  3. Stage 只要涉及到数据重组(交叉数据分区如shuffle过程)就产生一个stage
  4. Task 一个stage包含多个task,一个task一次只处理一个分区的数据。在同一个stage中,有多少分区就有多少task
  5. Master 在提交Spark程序时,需要与master服务通信,从而申请资源
  6. Worker 程序申请的运算资源由Worker服务所在的机器提供
  7. Executor 执行运算任务(Task)的进程,Executor进程负责接收Driver进程派发的任务
创建RDD3中方式
  • 在程序内部创建
SparkContext.parallelize(Seq)的方式
  • 从程序外部读取
SparkContext.textFile(path)
  • RDD转换操作
rdd1.flatmap(_.split(" "))
读取数据并产生RDD 读取普通文本数据
// 读取单个文件
val conf = new SparkConf().setMaster("local").setAppName("read Text File")
val sc = new SparkContext(conf)
val inputTextFile = sc.textFile("./spark-demo/data/person.txt")
println(inputTextFile.collect.mkString(","))

// 读取多个文件,使用通配符
val conf = new SparkConf().setMaster("local").setAppName("read Text File")
val sc = new SparkContext(conf)
val inputTextFile = sc.textFile("./spark-demo/data
object MyReadFromTextFile {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("read Text File")
    val sc = new SparkContext(conf)
    val inputTextFile = sc.textFile("./spark-demo/data/test.json")
    val content = inputTextFile.map(JSON.parseFull)
    println(content.collect.mkString(","))
  }
}

读取CSV,TSV格式的数据
sc.textFile("./spark-demo/data/test.csv").flatMap(_.split(",")).foreach(println)
读取sequenceFile格式的数据
sc.sequenceFile("./spark-demo/data/test.sequence").collect.mkString(",").foreach(println)
读取object个数的数据
sc.objectFile("path")
读取HDFS中的数据
// 显示调用HDFS API
sc.newAPIHadoopFile // 未验证成功

// 方式二
sc.textFile("hdfs://mycluster:8020/data/wc/input/wc").flatMap(_.split(" ")).foreach(println)
读取MySQL数据库中的数据
pom中添加MySQL驱动

	mysql
	mysql-connector-java
	5.1.27



// 代码
val inputMysql = new JdbcRDD(sc,
      () => {
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection("jdbc:mysql://192.168.7.17:3306/mysql_test?useUnicode=true&characterEncoding=utf8", "meifute", "meifute")
      },
      "select * from student where s_id > ? and s_id <= ? ;",
      1,
      4,
      1, // 分区数
      // 结果集
      r => (r.getInt(1), r.getString(2), r.getString(3), r.getString(4))
    )
    println("查询到的记录条数: " + inputMysql.count)
    inputMysql.foreach(println)
保存RDD的数据到外部存储 保存成普通文件
val conf = new SparkConf().setMaster("local").setAppName("save data")
val sc = new SparkContext(conf)
val data = sc.parallelize(1 to 10)
// 保存到本地文件系统
// data.saveAsTextFile("file:///Users/jinxingguang/java_project/bigdata-chauncy/mySaveData")
// 保存到hdfs,有读到配置文件时
data.saveAsTextFile("./mySaveData")  //等价于hdfs://mycluster/user/god/mySaveData/
保存成json文件
//  和保存成普通文件差不多,只是多了一个转化为json的操作
val map1 = Map("name" -> "chauncy", "age" -> "23", "address" -> JSONArray(List("地址1", "地址2")))
    val map2 = Map("name" -> "alice", "age" -> 18, "address" -> JSONArray(List("地址1", "地址2")))
    val rddData = sc.parallelize(List(JSONObject(map1), JSONObject(map2)))
//    rddData.saveAsTextFile("file:///Users/jinxingguang/java_project/bigdata-chauncy/mySaveData")
    rddData.saveAsTextFile("./mySaveData2")  //等价于hdfs://mycluster/user/god/mySaveData2/
保存成CSV,TSV格式文件
val array = Array("chauncy", 18, "male", "65kg", "182cm")

    // 转换成CSV格式
    val csvData = sc.parallelize(Array(array.mkString(",")), 1)
    csvData.saveAsTextFile("file:///Users/jinxingguang/java_project/bigdata-chauncy/mySaveData")


    // 转换成CSV格式
    val tsvData = sc.parallelize(Array(array.mkString("t")), 1)
    tsvData.saveAsTextFile("file:///Users/jinxingguang/java_project/bigdata-chauncy/mySaveData2")
保存成sequenceFIle文件
// 保存成SequenceFile 
val data = List(("name", "Chauncy"), ("age", 18))
val rddData = sc.parallelize(data, 1)
//    rddData.saveAsSequenceFile("file:///Users/jinxingguang/java_project/bigdata-chauncy/mySaveData",Some(classOf[GzipCodec]))
val path:String = "file:///Users/jinxingguang/java_project/bigdata-chauncy/mySaveData"
rddData.saveAsSequenceFile(path,Option(classOf[SnappyCodec]))
写数据到HDFS
saveAsTextFile(path="hdfs://路径")

写入到MySQL数据库
val conf = new SparkConf().setMaster("local").setAppName("save data")
val sc = new SparkContext(conf)

// 加载驱动
Class.forName("com.mysql.jdbc.Driver")
// 准备数据
val rddData = sc.parallelize(
List(
("09", "木星", "1995-11-11", "男"),
("10", "木叶1", "1995-05-20", "女"),
("11", "木叶2", "1995-05-20", "女"),
("12", "木叶3", "1995-05-20", "女"),
("13", "木叶4", "1995-05-20", "女")
),2)
// 遍历每个分区做批量插入数据
rddData.foreachPartition(
iter =>{
// 建立连接
val conn = DriverManager.getConnection(
"jdbc:mysql://192.168.7.17:3306/mysql_test?useUnicode=true&characterEncoding=utf8",
"meifute",
"meifute")
// 关闭自动提交,等插入完成后一次性提交会提高性能
conn.setAutoCommit(false)
// SQL语句
val preparedStatement = conn.prepareStatement("insert into student (s_id, s_name, s_birth, s_sex) values (?,?,?,?);")
// 赋值
iter.foreach(t=>{
preparedStatement.setString(1,t._1)
preparedStatement.setString(2,t._2)
preparedStatement.setString(3,t._3)
preparedStatement.setString(4,t._4)
preparedStatement.addBatch()
})
// 批量执行
preparedStatement.executeBatch()
// 提交
conn.commit()
// 关闭连接
conn.close()
})
RDD的高级操作
  1. RDD具有只读的特性,所以每次转换操作都会生成新的RDD。

  2. 任务分析过于复杂会产生过多的RDD

  3. 如何提高运算效率的同时节约内存空间?

下面就介绍RDD缓存,并通过广播与累加器有优化Spark程序

RDD缓存
  • 为什么要缓存RDD

内存使用过多,Spark会自动清理最不经常使用的RDD来释放内存

  • RDD的生命周期与清理机制

在初始化SparkContext时,会创建ContextCleaner实例,其通过scala的模式匹配来清理缓存

ref.task match {
  // 对RDD进行清理
  case CleanRDD(rddId) =>
  doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
  // 对shuffle数据块进行清理
  case CleanShuffle(shuffleId) =>
  doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
  // 对广播数据进行清理
  case CleanBroadcast(broadcastId) =>
  doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
  // 对累加器进行清理
  case CleanAccum(accId) =>
  doCleanupAccum(accId, blocking = blockOnCleanupTasks)
  // 对检查点进行清理
  case CleanCheckpoint(rddId) =>
  doCleanCheckpoint(rddId)
  }
  • 缓存RDD的方法
cache方法,是persist的简陋版  缓存到内存

persist方法  -- 可以指定缓存的地方 如内存,磁盘,堆外内存等
  • 缓存级别
// 源码: StorageLevel.scala
// 变量名中有_2表示数据将被缓存两份

// 不缓存
val NONE = new StorageLevel(false, false, false, false)
// 仅将RDD缓存到磁盘
val DISK_onLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
// 仅将RDD缓存到内存,内存不足时,在下次使用时,需要对没有缓存的数据重新计算
val MEMORY_onLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
// 缓存时进行序列化,可节约存储空间,但会消耗更多CPU
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
// 缓存到内存和磁盘,使用时先从内存找然后从磁盘找
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
// 缓存时进行序列化到内存和磁盘,使用是先从内存找再从磁盘找
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
// 缓存到堆外内存共享
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

缓存与释放RDD
sogou数据
wget http://download.labs.sogou.com/dl/sogoulabdown/SogouQ/SogouQ.reduced.tar.gz

[god@node01 ~]$ hdfs dfs -mkdir /data/sogou
[god@node01 ~]$ hdfs dfs -put SogouQ.reduced /data/sogou/

// 标记缓存
rddData1.cache()
rddData1.persist()
rddData1.persist(StorageLevel.MEMORY_ONLY)

// 释放
// 程序结束会调用ContextCleaner

// 自己手动释放
rddData1.unpersist(true)
RDD的检查点机制(Checkpoint)

缓存RDD不适用于复杂的调用链,过多的依赖链,且数据一般是存储在运算机器上的。

RDD缓存时将RDD数据存储在内存或其他存储介质,不会切断依赖链,缓存失效会重新计算以恢复数据

Checkpoint机制是将数据存储在HDFS或本地磁盘,直接读取检查点目录的数据来恢复RDD数据

Checkpoint机制一般将RDD数据存储在HDFS,切会切断RDD的上游依赖链

val conf = new SparkConf().setMaster("local").setAppName("rdd checkpoint")
    val sc = new SparkContext(conf)
//    hdfs dfs -mkdir -p /spark_checkpoint/c1
		// 设置CheckpointDir,不然会抛异常
    sc.setCheckpointDir("hdfs://mycluster/spark_checkpoint/c1")
    val rddData1 = sc.parallelize(1 to 100, 2)
    val rddData2 = rddData1.map(_ * 2)
    println(rddData2.dependencies.head.rdd)
    // 标记缓存RDD
    rddData2.persist(StorageLevel.DISK_ONLY)
    rddData2.checkpoint() // 标记checkpoint,等待action操作
    println(rddData2.dependencies.head.rdd) // 获取上游RDD

    val rddData3 = rddData2.map(_ + 3)
    val rddData4 = rddData2.map(_ + 4)

    rddData3.collect() // 行动
    rddData4.collect() // 行动
		// 上游RDD会被切断,变成rddData4
    println(rddData2.dependencies.head.rdd)
    rddData2.unpersist(true)
RDD的依赖关系 窄依赖

分区策略一样,分区数一样

宽依赖

分区策略不同或分区数不同,父RDD分区分叉了

广播

广播数据以序列化的形式发送到计算任务的机器,在计算任务前,通过反序列化将数据复原

package com.chauncy.spark_rdd.spark_rdd_gaoji

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable




case class CityInfo(cityCode: String, cityName: String)

case class UserInfo(userID: String, telephone: String, userName: String)

object MyBrocast {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("Br")
    val sc = new SparkContext(conf)

    val cityDetailMap = Map(
      "010" -> "北京",
      "021" -> "上海",
      "020" -> "广州",
      "0755" -> "深圳")

    val userDetailMap = Map(
      "13212345678" -> ("userID_001", "Spark"),
      "13921002300" -> ("userID_002", "Hadoop"),
      "13700222200" -> ("userID_003", "Scala"),
      "18765655656" -> ("userID_004", "Python"),
      "13323455432" -> ("userID_005", "Java"),
      "13114700741" -> ("userID_006", "Hive"))

    // 将数据加入广播
    val cdmBroadcast = sc.broadcast(cityDetailMap)
    val udmBroadcast = sc.broadcast(userDetailMap)

    val userArray = Array(
      ("010", "13921002300"),
      ("010", "18765655656"),
      ("0755", "13114700741"),
      ("020", "13323455432"),
      ("020", "13212345678"))

    val userRDD = sc.parallelize(userArray, 2)

    
    val aggregateRDD = userRDD.combineByKey(
      (telStr1: String) => mutable.Set[String](telStr1),
      (telSet: mutable.Set[String], telStr: String) => telSet += telStr,
      (telephoneSet1: mutable.Set[String], telephoneSet2: mutable.Set[String]) => telephoneSet1 ++= telephoneSet2
    )

    val resultRDD = aggregateRDD.map(info => {
      val cityInfo = CityInfo(info._1, cdmBroadcast.value(info._1))
      val userInfoSet = collection.mutable.Set[UserInfo]()
      for (telephone <- info._2) {
        val idAndName = udmBroadcast.value(telephone)
        val userInfo = UserInfo(idAndName._1, telephone, idAndName._2)
        userInfoSet.add(userInfo)
      }
      (cityInfo, userInfoSet)
    })

    print(resultRDD.collect.mkString(","))

    cdmBroadcast.unpersist
    udmBroadcast.unpersist
    sc.stop()
  }
}

累加器

跨节点传输数据,累加器提供了一种共享数据的机制

累加器的分类
  • 长整数累加器
  • 双精度浮点累加器
  • 集合累加器
  • 自定义累加器(2.0版本后) 继承AccumulaorV2抽象类
val rddData = sc.parallelize(Array(
      ("Bob", 15),
      ("chauncy", 12),
      ("chauncy", 23),
      ("chauncy", 34),
      ("mft", 45),
      ("mft", 56),
      ("mft", 67),
      ("yiyun", 78),
      ("yiyun", 89)
    ), 3)

    val acc = sc.longAccumulator("我心飞翔")
    rddData.foreach(line => {
      if (line._2 % 2 ==0) acc.add(1)
    })
    println(acc)

// 集合累加器
case class User(name:String,age:Int)

...
val scc = sc.collectionAccumulator[User]("集合累加器")

scc.add(user)


// 自定义累加器
class MyExtendAcc[T] extends AccumulatorV2[T, Array[Int]]
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/317550.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号