- 1、Transform算子
- 1.1 map
- 1.2 flatmap
- 1.3 groupBy和groupBykey
- 1.4 filter
- 1.5 mapPartitions
- 1.6 mapValues
- 1.7 sort
- 1.8 sample
- 1.9 union
- 2、 Actions算子
- 2.1 count,collect,reduce,save,lookup
- 2.2 foreach 和 foreachPartition
package com.shujia.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Demonstrates the map and filter transformations on an RDD.
 * Run locally; keeps the driver alive so the Spark web UI can be inspected.
 */
object SparkRDDMap {
  def main(args: Array[String]): Unit = {
    // Create the Spark context for a local run
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDdemo1")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // parallelize turns a local collection into an RDD
    // (textFile would read an RDD from a file instead)
    val listRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9))

    // map is lazy: the println inside only fires when an action runs on mapRDD
    val mapRDD: RDD[Int] = listRDD.map(i => {
      println("i的值" + i)
      i * 20
    })
    // mapRDD.foreach(println)
    // listRDD.foreach(println)

    // Keep only the odd numbers.
    // The predicate is the filter condition directly — no mutable flag needed.
    val jiShuRDD: RDD[Int] = listRDD.filter(i => i % 2 == 1)
    jiShuRDD.foreach(println)

    // Block the driver so the Spark web UI (default http://localhost:4040)
    // stays reachable; sleep instead of busy-spinning so we don't peg a core.
    while (true) {
      Thread.sleep(1000)
    }
  }
}
1.2 flatmap
package com.shujia.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
/**
 * Demonstrates flatMap plus a groupBy-based word count.
 * Actions at the end print every intermediate RDD.
 */
object SparkRDDFlatmap {
  def main(args: Array[String]): Unit = {
    // Local Spark context
    val sparkConf: SparkConf = new SparkConf()
    sparkConf.setAppName("SparkRDDFlatmap")
    sparkConf.setMaster("local")
    val sc: SparkContext = new SparkContext(sparkConf)

    val lineRDD: RDD[String] = sc.parallelize(List("java,java,scala,python","hadoop,hive,hbase","spark,filk,MapReduce"))

    // flatMap: each comma-separated line expands into several words
    val wordsRDD: RDD[String] = lineRDD.flatMap(_.split(","))

    // Group identical words together; the word itself is the key
    val groupedRDD: RDD[(String, Iterable[String])] = wordsRDD.groupBy(identity)

    // Word count: the size of each group is the number of occurrences
    val countsRDD: RDD[(String, Int)] = groupedRDD.map {
      case (word, occurrences) => (word, occurrences.size)
    }

    countsRDD.foreach(println)
    groupedRDD.foreach(println)
    wordsRDD.foreach(println)
    lineRDD.foreach(println)
  }
}
1.3 groupBy和groupBykey
package com.shujia.core
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/**
 * Counts students per class twice: once with groupBy (keeps whole tuples)
 * and once with groupByKey (keeps only the values).
 */
object SparkRDDGroupBy {
  def main(args: Array[String]): Unit = {
    // Local Spark context
    val conf: SparkConf = new SparkConf()
      .setAppName("SparkRDDGroupBy")
      .setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // Read the student records (one CSV line per student)
    val lines: RDD[String] = sc.textFile("spark/data/stu/students.txt")

    // Pair each student with a 1, keyed by class name (column index 4)
    val classPairs: RDD[(String, Int)] = lines.map(line => (line.split(",")(4), 1))
    //classPairs.foreach(println)

    // groupBy keeps the entire (class, 1) tuple inside every group
    val byClass: RDD[(String, Iterable[(String, Int)])] = classPairs.groupBy(_._1)
    // byClass.foreach(println)

    // Sum the 1s of each group to obtain the class size
    val classSizes: RDD[(String, Int)] = byClass.map {
      case (clazz, pairs) => (clazz, pairs.map(_._2).sum)
    }
    //classSizes.foreach(println)

    // groupByKey keeps only the values (the 1s), so summing is direct
    val byKey: RDD[(String, Iterable[Int])] = classPairs.groupByKey()
    byKey.map(kv => (kv._1, kv._2.sum)).foreach(println)
  }
}
1.4 filter
package com.shujia.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Demonstrates filter: keeps only science-track students.
 */
object SparkRDDFilter{
  def main(args: Array[String]): Unit = {
    // Local Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDFilter")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // Load the student records
    val lineRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")

    // Keep students whose class (column 4) starts with "理科" (science track);
    // startsWith tests a string prefix
    val scienceRDD: RDD[String] = lineRDD.filter { line =>
      line.split(",")(4).startsWith("理科")
    }
    scienceRDD.foreach(println)
  }
}
1.5 mapPartitions
package com.shujia.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Contrasts mapPartitions / map / mapPartitionsWithIndex and explains when
 * partition-level processing is preferable (e.g. external connections).
 */
object SparkRDDMappartitions {
  def main(args: Array[String]): Unit = {
    // Local Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDMappartitions")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // Three files in the directory -> three partitions,
    // each partition handled by at least one task
    val lineRDD: RDD[String] = sc.textFile("spark/data/words")

    // mapPartitions processes a whole partition at a time.
    // It suits operators that need an external connection (e.g. a database):
    // one connection per partition instead of one per record, cutting setup cost.
    lineRDD.mapPartitions((iter: Iterator[String]) => {
      println("map partitions") // printed once per partition (three times here)
      // Iterator itself supports map/flatMap, so processing stays lazy
      iter.flatMap(line => {
        line.split(",")
      })
    }).foreach(println)

    // map processes one record at a time: with N records you would create
    // N connections to an external source such as MySQL, which is slow and
    // can exhaust the server's connection limit
    lineRDD.map(line => {
      println("map")
      val strings: Array[String] = line.split(",")
      strings
    }).foreach(println)

    // mapPartitionsWithIndex additionally exposes the partition number
    lineRDD.mapPartitionsWithIndex((index, iter) => {
      println("当前的分区索引:" + index)
      // BUG FIX: the original split on ",0" (a typo) — that splits on the
      // two-character sequence ",0", not on commas. Use "," as elsewhere.
      iter.flatMap(line => line.split(","))
    }).foreach(println)
  }
}
1.6 mapValues
package com.shujia.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Demonstrates mapValues: transforms only the value side of a key-value RDD.
 */
object SparkRDDMapvalues {
  def main(args: Array[String]): Unit = {
    // Local Spark context
    val conf: SparkConf = new SparkConf()
    // FIX: appName previously read "SparkRDDGroupBy" (copy-paste leftover)
    conf.setAppName("SparkRDDMapvalues")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // mapValues only works on key-value RDDs: the key is kept unchanged,
    // the function is applied to each value
    val rdd: RDD[(String, Int)] = sc.parallelize(List(("张三",1),("李四",2),("王五",3)))
    rdd.mapValues(i => i * i).foreach(println)
  }
}
1.7 sort
package com.shujia.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Demonstrates sortBy: students sorted by age, descending.
 */
object SparkRDDsort {
  def main(args: Array[String]): Unit = {
    // Local Spark context
    val conf: SparkConf = new SparkConf()
    // FIX: appName previously read "SparkRDDGroupBy" (copy-paste leftover)
    conf.setAppName("SparkRDDsort")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")

    // Sort by age (column 2), descending; ascending defaults to true.
    // FIX: sort numerically via .toInt — comparing the raw strings is
    // lexicographic, which would order "9" after "10".
    stuRDD.sortBy(stu => stu.split(",")(2).toInt, ascending = false)
      .foreach(println)
  }
}
1.8 sample
package com.shujia.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Demonstrates sample: draw roughly 20% of records without replacement.
 */
object SparkRDDSample {
  def main(args: Array[String]): Unit = {
    // Local Spark context
    val conf: SparkConf = new SparkConf()
    // FIX: appName previously read "SparkRDDSimple" — align with the object name
    conf.setAppName("SparkRDDSample")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val lineRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")
    // sample(withReplacement = false, fraction = 0.2):
    // each record is kept independently with probability ~0.2
    val sampleRDD: RDD[String] = lineRDD.sample(false, 0.2)
    sampleRDD.foreach(println)
  }
}
1.9 union
package com.shujia.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Demonstrates union: concatenates two RDDs of the same element type
 * (duplicates are kept — union does not deduplicate).
 */
object SparkRDDUnion {
  def main(args: Array[String]): Unit = {
    // Local Spark context
    val conf: SparkConf = new SparkConf()
    // FIX: appName previously read "SparkRDDGroupBy" (copy-paste leftover)
    conf.setAppName("SparkRDDUnion")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // Build RDDs from local collections;
    // both sides of a union must have the same element type
    val rdd1: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6))
    val rdd2: RDD[Int] = sc.parallelize(List(4,5,6,7,8,9))
    // (removed the redundant extra parentheses around rdd2)
    rdd1.union(rdd2).foreach(println)
  }
}
2、 Actions算子
2.1 count,collect,reduce,save,lookup
package com.shujia.core
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/**
 * Demonstrates action operators: foreach, count, collect, reduce,
 * saveAsTextFile and lookup.
 */
object SparkRDDAction {
  def main(args: Array[String]): Unit = {
    // Local Spark context
    val conf: SparkConf = new SparkConf()
    // FIX: appName previously read "SparkRDDGroupBy" (copy-paste leftover)
    conf.setAppName("SparkRDDAction")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // Read the student records
    val stuRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")

    // foreach: action with no return value, triggers a job;
    // takes a function from the RDD's element type to Unit
    stuRDD.foreach(println)

    // count: number of records
    println(stuRDD.count())

    // collect: pull the whole RDD back to the driver as a local array
    val stuArr: Array[String] = stuRDD.collect()

    val blackListRDD: RDD[String] = sc.parallelize(List("1500100001","1500100007","1500100009"))
    // Collect a small RDD on the driver first, then use the local array
    // inside another operator (arrays serialize fine, RDDs cannot be nested)
    val blackList: Array[String] = blackListRDD.collect()
    stuRDD.filter(stu => blackList.contains(stu.split(",")(0)))
      .foreach(println)

    // reduce: global aggregation (all records form one group), like
    //   select sum(age) from students
    // FIX: the original reduced Strings, which CONCATENATES ages instead of
    // summing them, and then called foreach on the resulting String (printing
    // it character by character). Convert to Int first and print the sum.
    val ageSum: Int = stuRDD.map(line => line.split(",")(2).toInt)
      .reduce((i, j) => i + j)
    println(ageSum)

    // FIX: the original passed an empty path (""), which fails at runtime;
    // write the RDD to a concrete output directory instead
    stuRDD.saveAsTextFile("spark/data/stu_out")

    // lookup: return every value associated with one key of a key-value RDD
    val ids: Seq[String] = stuRDD.map(line => (line.split(",")(1), line.split(",")(0)))
      .lookup("宣谷芹")
    println(ids)
  }
}
2.2 foreach 和 foreachPartition
package com.shujia.core
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Contrasts foreach and foreachPartition when writing to MySQL over JDBC:
 * foreach would open one connection per record; foreachPartition opens one
 * connection per partition and batches the inserts.
 */
object SparkRDDForeach {
  def main(args: Array[String]): Unit = {
    // Local Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDForeach")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // Read the students file with (at least) 4 partitions
    val lineRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt", 4)
    println(lineRDD.getNumPartitions)

    // A JDBC connection is not serializable, so it must be created inside the
    // operator. Doing that in foreach would mean one connection per record
    // (e.g. 1000 records -> 1000 connections), a serious performance problem;
    // foreachPartition shares one connection across the whole partition.
    lineRDD.foreachPartition(iter => {
      val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "123456")
      // FIX: try/finally so the statement and connection are closed even when
      // an insert fails — the original leaked both on any exception
      try {
        val ps: PreparedStatement = conn.prepareStatement("insert into student2 values(?,?,?,?,?)")
        try {
          // This foreach is Iterator's own method, not an RDD action, so it
          // runs inside the task and reuses the connection above
          iter.foreach(line => {
            val splits: Array[String] = line.split(",")
            ps.setInt(1, splits(0).toInt)
            ps.setString(2, splits(1))
            ps.setInt(3, splits(2).toInt)
            ps.setString(4, splits(3))
            ps.setString(5, splits(4))
            // Queue the row rather than executing one insert per record
            // (ps.execute() here would be a round-trip per row)
            ps.addBatch()
          })
          // Single batched round-trip per partition
          ps.executeBatch()
        } finally {
          ps.close()
        }
      } finally {
        conn.close()
      }
    })
  }
}
说明:代码中所涉及到的数据,可以联系本人获取



