- 什么是RDD
- 怎么理解RDD
- 创建RDD3中方式
- 读取数据并产生RDD
- 读取普通文本数据
- 读取json格式的数据
- 读取CSV,TSV格式的数据
- 读取sequenceFile格式的数据
- 读取object个数的数据
- 读取HDFS中的数据
- 读取MySQL数据库中的数据
- 保存RDD的数据到外部存储
- 保存成普通文件
- 保存成json文件
- 保存成CSV,TSV格式文件
- 保存成sequenceFIle文件
- 写数据到HDFS
- 写入到MySQL数据库
- RDD的高级操作
- RDD缓存
- 缓存与释放RDD
- RDD的检查点机制(Checkpoint)
- RDD的依赖关系
- 窄依赖
- 宽依赖
- 广播
- 累加器
- 累加器的分类
(Resilient distributed dataset) 弹性的,分布式的数据集合 在Hadoop中,shuffle过程中当环形缓冲区的内存使用量达到80%,会将内存中的数据溢写到磁盘,防止内存溢出。 而在spark中,允许内存不足的情况下将集合数据溢写到磁盘。体现了弹性这个词,表明spark的内存优先的特性怎么理解RDD
可以从三个不同的角度理解: 数据存储,数据分析,程序调度
- 数据存储
RDD中的数据会被切分成多个分区(切片),分别存储在不同的主机上
内存优先。将数据缓存到内存的spark计算比MapReduce快100倍
血统关系(依赖链)保证数据的可靠性,某个RDD数据丢失,通过RDD的血统关系重新运行就可以恢复。也可以设置检查点缓存中间结果
- 数据分析
通过RDD算子实现数据分析。主要包含转换算子(transformation)和行动算子(action)
- 程序调度
创建RDD3中方式
- Driver 在主方法中创建Spark的上下文实例(SparkContext)
- Job 每出现一个行动操作,就生成一个job
- Stage 只要涉及到数据重组(交叉数据分区如shuffle过程)就产生一个stage
- Task 一个stage包含多个task,一个task一次只处理一个分区的数据。在同一个stage中,有多少分区就有多少task
- Master 在提交Spark程序时,需要与master服务通信,从而申请资源
- Worker 程序申请的运算资源由Worker服务所在的机器提供
- Executor 执行运算任务(Task)的进程,Executor进程负责接收Driver进程派发的任务
- 在程序内部创建
SparkContext.parallelize(Seq)的方式
- 从程序外部读取
SparkContext.textFile(path)
- RDD转换操作
rdd1.flatmap(_.split(" "))
读取数据并产生RDD
读取普通文本数据
// 读取单个文件
val conf = new SparkConf().setMaster("local").setAppName("read Text File")
val sc = new SparkContext(conf)
val inputTextFile = sc.textFile("./spark-demo/data/person.txt")
println(inputTextFile.collect.mkString(","))
// 读取多个文件,使用通配符
val conf = new SparkConf().setMaster("local").setAppName("read Text File")
val sc = new SparkContext(conf)
val inputTextFile = sc.textFile("./spark-demo/data
object MyReadFromTextFile {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("read Text File")
val sc = new SparkContext(conf)
val inputTextFile = sc.textFile("./spark-demo/data/test.json")
val content = inputTextFile.map(JSON.parseFull)
println(content.collect.mkString(","))
}
}
读取CSV,TSV格式的数据
sc.textFile("./spark-demo/data/test.csv").flatMap(_.split(",")).foreach(println)
读取sequenceFile格式的数据
sc.sequenceFile("./spark-demo/data/test.sequence").collect.mkString(",").foreach(println)
读取object个数的数据
sc.objectFile("path")
读取HDFS中的数据
// 显示调用HDFS API
sc.newAPIHadoopFile // 未验证成功
// 方式二
sc.textFile("hdfs://mycluster:8020/data/wc/input/wc").flatMap(_.split(" ")).foreach(println)
读取MySQL数据库中的数据
pom中添加MySQL驱动保存RDD的数据到外部存储 保存成普通文件// 代码 val inputMysql = new JdbcRDD(sc, () => { Class.forName("com.mysql.jdbc.Driver") DriverManager.getConnection("jdbc:mysql://192.168.7.17:3306/mysql_test?useUnicode=true&characterEncoding=utf8", "meifute", "meifute") }, "select * from student where s_id > ? and s_id <= ? ;", 1, 4, 1, // 分区数 // 结果集 r => (r.getInt(1), r.getString(2), r.getString(3), r.getString(4)) ) println("查询到的记录条数: " + inputMysql.count) inputMysql.foreach(println) mysql mysql-connector-java 5.1.27
val conf = new SparkConf().setMaster("local").setAppName("save data")
val sc = new SparkContext(conf)
val data = sc.parallelize(1 to 10)
// 保存到本地文件系统
// data.saveAsTextFile("file:///Users/jinxingguang/java_project/bigdata-chauncy/mySaveData")
// 保存到hdfs,有读到配置文件时
data.saveAsTextFile("./mySaveData") //等价于hdfs://mycluster/user/god/mySaveData/
保存成json文件
// 和保存成普通文件差不多,只是多了一个转化为json的操作
val map1 = Map("name" -> "chauncy", "age" -> "23", "address" -> JSONArray(List("地址1", "地址2")))
val map2 = Map("name" -> "alice", "age" -> 18, "address" -> JSONArray(List("地址1", "地址2")))
val rddData = sc.parallelize(List(JSONObject(map1), JSONObject(map2)))
// rddData.saveAsTextFile("file:///Users/jinxingguang/java_project/bigdata-chauncy/mySaveData")
rddData.saveAsTextFile("./mySaveData2") //等价于hdfs://mycluster/user/god/mySaveData2/
保存成CSV,TSV格式文件
val array = Array("chauncy", 18, "male", "65kg", "182cm")
// 转换成CSV格式
val csvData = sc.parallelize(Array(array.mkString(",")), 1)
csvData.saveAsTextFile("file:///Users/jinxingguang/java_project/bigdata-chauncy/mySaveData")
// 转换成CSV格式
val tsvData = sc.parallelize(Array(array.mkString("t")), 1)
tsvData.saveAsTextFile("file:///Users/jinxingguang/java_project/bigdata-chauncy/mySaveData2")
保存成sequenceFIle文件
// 保存成SequenceFile
val data = List(("name", "Chauncy"), ("age", 18))
val rddData = sc.parallelize(data, 1)
// rddData.saveAsSequenceFile("file:///Users/jinxingguang/java_project/bigdata-chauncy/mySaveData",Some(classOf[GzipCodec]))
val path:String = "file:///Users/jinxingguang/java_project/bigdata-chauncy/mySaveData"
rddData.saveAsSequenceFile(path,Option(classOf[SnappyCodec]))
写数据到HDFS
saveAsTextFile(path="hdfs://路径")写入到MySQL数据库
val conf = new SparkConf().setMaster("local").setAppName("save data")
val sc = new SparkContext(conf)
// 加载驱动
Class.forName("com.mysql.jdbc.Driver")
// 准备数据
val rddData = sc.parallelize(
List(
("09", "木星", "1995-11-11", "男"),
("10", "木叶1", "1995-05-20", "女"),
("11", "木叶2", "1995-05-20", "女"),
("12", "木叶3", "1995-05-20", "女"),
("13", "木叶4", "1995-05-20", "女")
),2)
// 遍历每个分区做批量插入数据
rddData.foreachPartition(
iter =>{
// 建立连接
val conn = DriverManager.getConnection(
"jdbc:mysql://192.168.7.17:3306/mysql_test?useUnicode=true&characterEncoding=utf8",
"meifute",
"meifute")
// 关闭自动提交,等插入完成后一次性提交会提高性能
conn.setAutoCommit(false)
// SQL语句
val preparedStatement = conn.prepareStatement("insert into student (s_id, s_name, s_birth, s_sex) values (?,?,?,?);")
// 赋值
iter.foreach(t=>{
preparedStatement.setString(1,t._1)
preparedStatement.setString(2,t._2)
preparedStatement.setString(3,t._3)
preparedStatement.setString(4,t._4)
preparedStatement.addBatch()
})
// 批量执行
preparedStatement.executeBatch()
// 提交
conn.commit()
// 关闭连接
conn.close()
})
RDD的高级操作
RDD缓存
RDD具有只读的特性,所以每次转换操作都会生成新的RDD。
任务分析过于复杂会产生过多的RDD
如何提高运算效率的同时节约内存空间?
下面就介绍RDD缓存,并通过广播与累加器有优化Spark程序
- 为什么要缓存RDD
内存使用过多,Spark会自动清理最不经常使用的RDD来释放内存
- RDD的生命周期与清理机制
在初始化SparkContext时,会创建ContextCleaner实例,其通过scala的模式匹配来清理缓存
ref.task match {
// 对RDD进行清理
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
// 对shuffle数据块进行清理
case CleanShuffle(shuffleId) =>
doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
// 对广播数据进行清理
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
// 对累加器进行清理
case CleanAccum(accId) =>
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
// 对检查点进行清理
case CleanCheckpoint(rddId) =>
doCleanCheckpoint(rddId)
}
- 缓存RDD的方法
cache方法,是persist的简陋版 缓存到内存 persist方法 -- 可以指定缓存的地方 如内存,磁盘,堆外内存等
- 缓存级别
// 源码: StorageLevel.scala // 变量名中有_2表示数据将被缓存两份 // 不缓存 val NONE = new StorageLevel(false, false, false, false) // 仅将RDD缓存到磁盘 val DISK_onLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) // 仅将RDD缓存到内存,内存不足时,在下次使用时,需要对没有缓存的数据重新计算 val MEMORY_onLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) // 缓存时进行序列化,可节约存储空间,但会消耗更多CPU val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) // 缓存到内存和磁盘,使用时先从内存找然后从磁盘找 val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) // 缓存时进行序列化到内存和磁盘,使用是先从内存找再从磁盘找 val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) // 缓存到堆外内存共享 val OFF_HEAP = new StorageLevel(true, true, true, false, 1)缓存与释放RDD
sogou数据 wget http://download.labs.sogou.com/dl/sogoulabdown/SogouQ/SogouQ.reduced.tar.gz [god@node01 ~]$ hdfs dfs -mkdir /data/sogou [god@node01 ~]$ hdfs dfs -put SogouQ.reduced /data/sogou/
// 标记缓存 rddData1.cache() rddData1.persist() rddData1.persist(StorageLevel.MEMORY_ONLY) // 释放 // 程序结束会调用ContextCleaner // 自己手动释放 rddData1.unpersist(true)RDD的检查点机制(Checkpoint)
缓存RDD不适用于复杂的调用链,过多的依赖链,且数据一般是存储在运算机器上的。
RDD缓存时将RDD数据存储在内存或其他存储介质,不会切断依赖链,缓存失效会重新计算以恢复数据
Checkpoint机制是将数据存储在HDFS或本地磁盘,直接读取检查点目录的数据来恢复RDD数据
Checkpoint机制一般将RDD数据存储在HDFS,切会切断RDD的上游依赖链
val conf = new SparkConf().setMaster("local").setAppName("rdd checkpoint")
val sc = new SparkContext(conf)
// hdfs dfs -mkdir -p /spark_checkpoint/c1
// 设置CheckpointDir,不然会抛异常
sc.setCheckpointDir("hdfs://mycluster/spark_checkpoint/c1")
val rddData1 = sc.parallelize(1 to 100, 2)
val rddData2 = rddData1.map(_ * 2)
println(rddData2.dependencies.head.rdd)
// 标记缓存RDD
rddData2.persist(StorageLevel.DISK_ONLY)
rddData2.checkpoint() // 标记checkpoint,等待action操作
println(rddData2.dependencies.head.rdd) // 获取上游RDD
val rddData3 = rddData2.map(_ + 3)
val rddData4 = rddData2.map(_ + 4)
rddData3.collect() // 行动
rddData4.collect() // 行动
// 上游RDD会被切断,变成rddData4
println(rddData2.dependencies.head.rdd)
rddData2.unpersist(true)
RDD的依赖关系
窄依赖
宽依赖分区策略一样,分区数一样
广播分区策略不同或分区数不同,父RDD分区分叉了
广播数据以序列化的形式发送到计算任务的机器,在计算任务前,通过反序列化将数据复原
package com.chauncy.spark_rdd.spark_rdd_gaoji
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
case class CityInfo(cityCode: String, cityName: String)
case class UserInfo(userID: String, telephone: String, userName: String)
object MyBrocast {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("Br")
val sc = new SparkContext(conf)
val cityDetailMap = Map(
"010" -> "北京",
"021" -> "上海",
"020" -> "广州",
"0755" -> "深圳")
val userDetailMap = Map(
"13212345678" -> ("userID_001", "Spark"),
"13921002300" -> ("userID_002", "Hadoop"),
"13700222200" -> ("userID_003", "Scala"),
"18765655656" -> ("userID_004", "Python"),
"13323455432" -> ("userID_005", "Java"),
"13114700741" -> ("userID_006", "Hive"))
// 将数据加入广播
val cdmBroadcast = sc.broadcast(cityDetailMap)
val udmBroadcast = sc.broadcast(userDetailMap)
val userArray = Array(
("010", "13921002300"),
("010", "18765655656"),
("0755", "13114700741"),
("020", "13323455432"),
("020", "13212345678"))
val userRDD = sc.parallelize(userArray, 2)
val aggregateRDD = userRDD.combineByKey(
(telStr1: String) => mutable.Set[String](telStr1),
(telSet: mutable.Set[String], telStr: String) => telSet += telStr,
(telephoneSet1: mutable.Set[String], telephoneSet2: mutable.Set[String]) => telephoneSet1 ++= telephoneSet2
)
val resultRDD = aggregateRDD.map(info => {
val cityInfo = CityInfo(info._1, cdmBroadcast.value(info._1))
val userInfoSet = collection.mutable.Set[UserInfo]()
for (telephone <- info._2) {
val idAndName = udmBroadcast.value(telephone)
val userInfo = UserInfo(idAndName._1, telephone, idAndName._2)
userInfoSet.add(userInfo)
}
(cityInfo, userInfoSet)
})
print(resultRDD.collect.mkString(","))
cdmBroadcast.unpersist
udmBroadcast.unpersist
sc.stop()
}
}
累加器
累加器的分类跨节点传输数据,累加器提供了一种共享数据的机制
- 长整数累加器
- 双精度浮点累加器
- 集合累加器
- 自定义累加器(2.0版本后) 继承AccumulaorV2抽象类
val rddData = sc.parallelize(Array(
("Bob", 15),
("chauncy", 12),
("chauncy", 23),
("chauncy", 34),
("mft", 45),
("mft", 56),
("mft", 67),
("yiyun", 78),
("yiyun", 89)
), 3)
val acc = sc.longAccumulator("我心飞翔")
rddData.foreach(line => {
if (line._2 % 2 ==0) acc.add(1)
})
println(acc)
// 集合累加器
case class User(name:String,age:Int)
...
val scc = sc.collectionAccumulator[User]("集合累加器")
scc.add(user)
// 自定义累加器
class MyExtendAcc[T] extends AccumulatorV2[T, Array[Int]]



