1)获取内存中的RDD
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//从内存中获取RDD
object _01FromMemory {
def main(args: Array[String]): Unit = {
//配置文件
val conf: SparkConf = new SparkConf().setMaster("local").setAppName("createRDD")
//获取上下文对象
val sc = new SparkContext(conf)
//从内存中创建RDD
val list = List(3, 6, 78, 9)
val rdd: RDD[Int] = sc.parallelize(list)
rdd.foreach(println)
println("--------------------------------")
val rdd2 = sc.makeRDD(list)
rdd2.foreach(println)
//3
//6
//78
//9
//--------------------------------
//3
//6
//78
//9
}
}
2)获取外部数据的RDD
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//从外部数据获取RDD
object _02FromOthers {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("getRDD")
val sc = new SparkContext(conf)
val rdd = sc.textFile("D://input//words.txt")
val rdd2: RDD[String] = sc.textFile("data/1.txt")
//打印结果
rdd.collect().foreach(println)
println("-------------------------")
rdd2.collect().foreach(println)
//hello world
//hello spark
//hello scala
//hello java
//hello spark
//-------------------------
//12
//12345
//3
}
}
3)读取内存数据的并行度和分区
import org.apache.spark.{SparkConf, SparkContext}
//读取内存中的分区和并行度
//并行度指的就是分区的数量,也同时指的就是Task的数量
object _03PartitionFromMemory {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("readRDD")
conf.set("spark.default.parallelism","4") //如果取消掉就是走local[*]
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(List(3, 5, 6, 6, 7))
rdd.saveAsTextFile("out")
sc.stop()
//我使用的是4core 所以会有四个分区 每个分区平均分配到一个数据
}
}
4)读取外部数据的并行度和分区
package com.qf.spark.wc.day03.getrdd
import org.apache.spark.{SparkConf, SparkContext}
object _04PartitionFromOuterFile {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("readRDD")
val sc = new SparkContext(conf)
//textFile的第二个参数解析:表示设置最小分区数,因此实际分区数可能大于该值
val rdd = sc.textFile("data/1.txt", 2)
rdd.saveAsTextFile("out")
sc.stop()
}
//这里会生成了两个分区,因为第二个参数进行了设置
}
//生成分区1的数据
//12
//12345
//分区2
//3
5)内存分区的数据
import org.apache.spark.{SparkConf, SparkContext}
object _05PartitionDataFromMemory {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MemoryData")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(List(1, 3, 8, 9),2)
//获取分区数的方法
rdd.saveAsTextFile("out")//生成分区
sc.stop()
//result
//1 3 一个分分区
//8 9 一个分区
//总结:均分,如不能均分就从后面的分区开始加入数据
}
}
6)外部文件的分区的数据
import org.apache.spark.{SparkConf, SparkContext}
//获取外部文件分区的数据
//spark的分区数量就是和MR的分片数量有关,而MR的分片计算有一个阈值1.1
object _06PartitionDataFromFile {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MemoryData")
val sc = new SparkContext(conf)
val rdd = sc.textFile("data/1.txt", 2)
rdd.saveAsTextFile("out")
sc.stop()
//解释
// 比如数据如下:
// 12@@
// 12345@@
// 1
//
// 总共12个字节,期望每个分区6个字节,但是通常来说,上一个分区和下一个分区的数据的临界行,会跨这两个分区
// 也就是上一个分区的最后一行的数据是溢出的,因此要读取到换行符号才行
// 第一个分区的数据: 12@@
// 12345@@ 在同一个分区
// 第二个分区的数据,不会重读读过的数据,因此只有1
//result 生成如下两个分区
//12
//12345
//-----------------
//1
}
}
6)RDD的读取和保存
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object _07RDDIO {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MemoryData")
val sc = new SparkContext(conf)
// val value = sc.textFile("data/1.txt")
// val value: RDD[(String, String)] = sc.wholeTextFiles("data")
// val value: RDD[Nothing] = sc.objectFile("data/part-00001")
val value: RDD[(String, Int)] = sc.sequenceFile("data/part-00001")
value.saveAsTextFile("out")
// value.saveAsObjectFile("outObject")
// value.map((_,1)).saveAsSequenceFile("outSequence")
value.foreach(println)
}
}



