- combineByKey
- join
- leftOuterJoin
- cogroup
- 实现统计每个省份的广告阅读量
- 转换算子
- reduce
- collect
- count
- first
- take
- takeOrdered
- aggregate
- fold
- countByValue
- countByKey
- 不同方法实现wordCount
- save
- foreach
- RDD 序列化
- 后缀表达式
- 递归
- 八皇后
- 迷宫
最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于 aggregate(),combineByKey()允许用户返回值的类型与输入不一致。
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(List(("a", 1), ("b", 1), ("c", 1), ("a", 1)), 2)
val newRDD: RDD[(String, (Int, Int))] = rdd.combineByKey(
v => (v, 1),
(t: (Int, Int), v) => (t._1 + v, t._2 + 1),
(t1:(Int,Int), t2:(Int,Int)) => (t1._1 + t2._1, t1._2 + t2._2)
)
newRDD.collect().foreach(println)
sc.stop()
}
join
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的 (K,(V,W))的 RDD
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(conf)
val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3),("d",7)))
val rdd2 = sc.makeRDD(List(("a", 4), ("b", 5), ("c", 6),("a",8)))
//两个数据源中,相同的key的value会连接在一起,形成元组,如果两个数据源中key没有匹配上,那么数据不会出现在结果中
//两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔乘积
val joinRDD = rdd1.join(rdd2)
joinRDD.collect().foreach(println)
}
leftOuterJoin
类似于 SQL 语句的左外连接
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(conf)
val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
val rdd2 = sc.makeRDD(List(("a", 4), ("b", 5)))
val leftJoinRDD = rdd1.leftOuterJoin(rdd2)
leftJoinRDD.collect().foreach(println)
}
cogroup
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(conf)
val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
val rdd2 = sc.makeRDD(List(("a", 4), ("b", 5)))
val cogroupRDD = rdd1.cogroup(rdd2)
cogroupRDD.collect().foreach(println)
}
实现统计每个省份的广告阅读量
package operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Rep {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc: SparkContext = new SparkContext(conf)
// TODO
//1、获取原始数据:时间戳,省份,城市,用户,广告
val dataRDD: RDD[String] = sc.textFile("datas/agent.log")
val mapRDD: RDD[((String, String), Int)] = dataRDD.map(
line => {
val datas: Array[String] = line.split(" ")
((datas(1), datas(4)), 1)
}
)
val reduceRDD: RDD[((String, String), Int)] = mapRDD.reduceByKey(_ + _)
val newMapRDD: RDD[(String, (String, Int))] = reduceRDD.map {
case ((prv, ad), sum) => {
(prv, (ad, sum))
}
}
val groupRDD: RDD[(String, Iterable[(String, Int)])] = newMapRDD.groupByKey()
val resultRDD = groupRDD.mapValues(
iter => iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
)
resultRDD.collect().foreach(println)
}
}
转换算子
//行动算子其实是触发作业(job)执行的方法 //底层代码调用的是环境对象的runJob方法 //底层代码中会创建ActiveJob,并提交执行reduce
聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据
collect在驱动程序中,以数组 Array 的形式返回数据集的所有元素
count返回 RDD 中元素的个数
first返回 RDD 中的第一个元素
take返回一个由 RDD 的前 n 个元素组成的数组
takeOrdered返回该 RDD 排序后的前 n 个元素组成的数组
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] =sc.makeRDD(List(1,2,3,4))
val i = rdd.reduce(_ + _)
println(i)
//collect():方法会将不同分区的数据按照分区顺序采集到Drive端内存中,形成数组
val ints=rdd.collect()
println(ints.mkString(","))
//first:取出数据的
val first=rdd.first()
println(first)
//take:获取n个数据
val take=rdd.take(3);
println(take.mkString(","))
//takeOrdered
val takeOrdered = rdd.takeOrdered(3)
println(takeOrdered.mkString(","))
sc.stop()
}
aggregate
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] =sc.makeRDD(List(1,2,3,4),2)
val result=rdd.aggregate(10)(_+_,_+_)
//aggregateByKey:初始值只会参与分区内的计算
//aggregate:初始值会参与分区内计算,并且参与分区间就算
//13+17+10=40
println(result)
sc.stop()
}
fold
折叠操作,aggregate 的简化版操作
countByValue统计每种 key 的个数
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] =sc.makeRDD(List(1,2,3,4),2)
val intToLong: collection.Map[Int, Long] = rdd.countByValue()
println(intToLong)
sc.stop()
}
countByKey
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc: SparkContext = new SparkContext(conf)
//val rdd: RDD[Int] =sc.makeRDD(List(1,2,3,4),2)
val rdd:RDD[(String,Int)]=sc.makeRDD(List(("a",1),("a",2),("a",3)))
val intToLong: collection.Map[String, Long] = rdd.countByKey()
println(intToLong)
sc.stop()
}
不同方法实现wordCount
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc: SparkContext = new SparkContext(conf)
wordCount8(sc)
sc.stop()
}
def wordCount1(sc: SparkContext): Unit = {
val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words: RDD[String] = rdd.flatMap(_.split(" "))
val group: RDD[(String, Iterable[String])] = words.groupBy(word => word)
val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
}
def wordCount2(sc: SparkContext): Unit = {
val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words: RDD[String] = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val group = wordOne.groupByKey()
val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
}
def wordCount3(sc: SparkContext): Unit = {
val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words: RDD[String] = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val wordCOunt = wordOne.aggregateByKey(0)(_ + _, _ + _)
//
// wordOne.foldByKey(0)(_+_)
}
def wordCount4(sc: SparkContext): Unit = {
val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words: RDD[String] = rdd.flatMap(_.split(" "))
val wordOne: RDD[(String, Int)] = words.map((_, 1))
val countWord: RDD[(String, Int)] = wordOne.reduceByKey(_ + _)
}
def wordCount5(sc: SparkContext): Unit = {
val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words: RDD[String] = rdd.flatMap(_.split(" "))
val wordOne: RDD[(String, Int)] = words.map((_, 1))
val wordCount: RDD[(String, Int)] = wordOne.combineByKey(
v => v,
(x: Int, y) => x + y,
(x: Int, y: Int) => x + y
)
}
def wordCount6(sc: SparkContext): Unit = {
val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words: RDD[String] = rdd.flatMap(_.split(" "))
val wordOne: RDD[(String, Int)] = words.map((_, 1))
val countWord: collection.Map[String, Long] = wordOne.countByKey()
}
def wordCount7(sc: SparkContext): Unit = {
val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words: RDD[String] = rdd.flatMap(_.split(" "))
val stringToLong: collection.Map[String, Long] = words.countByValue()
}
def wordCount8(sc: SparkContext): Unit = {
val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words: RDD[String] = rdd.flatMap(_.split(" "))
val mapWord = words.map(
word => {
mutable.Map[String, Long]((word, 1))
}
)
val wordCount = mapWord.reduce(
(map1, map2) => {
map2.foreach({
case (word, count) => {
val newCount = map1.getOrElse(word, 0L) + count
map1.update(word, newCount)
}
})
map1
}
)
println(wordCount)
}
save
将数据保存到不同格式的文件中
// 保存成 Text 文件
rdd.saveAsTextFile("output")
// 序列化成对象保存到文件
rdd.saveAsObjectFile("output1")
// 保存成 Sequencefile 文件
//必须格式化成键值类型
rdd.map((_,1)).saveAsSequenceFile("output2")
foreach
分布式遍历 RDD 中的每一个元素,调用指定函数
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
//foreach其实是Drive内存集合的循环遍历方法
rdd.collect().foreach(println)
println("******************")
//foreach其实是executor端内存数据打印
rdd.foreach(println)
sc.stop()
}
RDD 序列化
从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor 端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就 形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor 端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列 化,这个操作我们称之为闭包检测
package operator.transform.action
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object serializable02_function {
def main(args: Array[String]): Unit = {
//1.创建 SparkConf 并设置 App 名称
val conf: SparkConf = new
SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建 SparkContext,该对象是提交 Spark App 的入口
val sc: SparkContext = new SparkContext(conf)
//3.创建一个 RDD
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark",
"hive", "atguigu"))
//3.1 创建一个 Search 对象
val search = new Search("hello")
//3.2 函数传递,打印:ERROR Task not serializable
search.getMatch1(rdd).collect().foreach(println)
//3.3 属性传递,打印:ERROR Task not serializable
search.getMatch2(rdd).collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
class Search(query: String) extends Serializable {
def isMatch(s: String): Boolean = {
s.contains(query)
}
// 函数序列化案例
def getMatch1(rdd: RDD[String]): RDD[String] = {
//rdd.filter(this.isMatch)
rdd.filter(isMatch)
}
class User extends Serializable {
val age: Int = 30
}
//case class User() {
// var age:Int=30
//}
// 属性序列化案例
def getMatch2(rdd: RDD[String]): RDD[String] = {
//rdd.filter(x => x.contains(this.query))
rdd.filter(x => x.contains(query))
//val q = query
//rdd.filter(x => x.contains(q))
}
}
后缀表达式
中缀转后缀表达式:
1)初始化两个栈:运算符栈s1和储存中间结果的栈s2;
2)从左至右扫描中缀表达式;
3)遇到操作数时,将其压s2;
4)遇到运算符时,比较其与s1栈顶运算符的优先级:
(1)如果s1为空,或栈顶运算符为左括号“(”,则直接将此运算符入栈;
(2)否则,若优先级比栈顶运算符的高,也将运算符压入s1;
否则,将s1栈顶的运算符弹出并压入到s2中,再次转到(4-1)与s1中新的栈顶运算符相比较;
5)遇到括号时:
(1) 如果是左括号“(”,则直接压入s1
(2) 如果是右括号“)”,则依次弹出s1栈顶的运算符,并压入s2,直到遇到左括号为止,此时将这一对 括号丢弃
6)重复步骤2至5,直到表达式的最右边
7)将s1中剩余的运算符依次弹出并压入s2
8)依次弹出s2中的元素并输出,结果的逆序即为中缀表达式对应的后缀表达式
package stack;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Stack;
public class PolandNotation {
public static void main(String[] args) {
//完成一个中缀表达式转转成后缀表达式的功能
//说明
//1、1+((2+3)*4)-5=>1 2 3 + 4 * + 5 -
//2、因为直接对str 进行操作不方便,因此先将 1+((2+3)*4)-5=>中缀表达式对应的list
// 即1+((2+3)*4)-5=> ArrayList[1,+,(,(,2,+,3,),*,4,),-,5]
//3、将得到的中缀表达式list 转换成 后缀表达式对应的list
String expression = "1+((2+3)*4)-5";
List infixexpressionList = toInfixexpressionList(expression);
System.out.println(expression+"="+calculator(parseSuffixexpressionList(infixexpressionList)));
}
//将中缀表达式转成对应的list
public static List toInfixexpressionList(String s) {
//定义一个中缀表达式对应的内容
List ls = new ArrayList<>();
int i = 0;//这是一个指针,用于遍历 中缀表达式字符串
String str;//多位数的操作
char c;//每遍历一个字符,就放入到c
do {
if ((c = s.charAt(i)) < 48 || (c = s.charAt(i)) > 57) {
ls.add("" + c);
i++;//i需要后移
} else {//如果是一个数,需要考虑多位数
str = "";
while (i < s.length() && (c = s.charAt(i)) >= 48 && (c = s.charAt(i)) <= 57) {
str += c;//拼接
i++;
}
ls.add(str);
}
} while (i < s.length());
return ls;
}
//将得到的中缀表达式list 转换成 后缀表达式对应的list
public static List parseSuffixexpressionList(List ls) {
//定义两个栈
Stack s1 = new Stack();//符号栈
List s2 = new ArrayList<>();//存储中间结果
//遍历ls
for (String item : ls) {
//如果是一个数,加入s2
if (item.matches("\d+")) {
s2.add(item);
} else if (item.equals("(")) {
s1.push(item);
} else if (item.equals(")")) {
//如果是“)”,则需要依次弹出s1栈顶的运算符,并压入s2,直到遇到左括号为止,此时将这一对括号丢弃
while (!s1.peek().equals("(")) {
s2.add(s1.pop());
}
s1.pop();//消除小括号
} else {
//当item的优先级小于等于栈顶运算符,将s1栈顶的运算符弹出并加入到s2中
while (s1.size() != 0 && Operation.getValue(s1.peek()) >= Operation.getValue(item)) {
s2.add(s1.pop());
}
//需要将item压入栈中
s1.push(item);
}
}
//将s1中剩余的运算符一次弹出并加入s2
while(s1.size()!=0){
s2.add(s1.pop());
}
return s2;
}
@Test
public void test1() {
//先定义一个逆波兰表达式
//说明:为了方便,逆波兰表达式的数字和符号用空格隔开
String suffixexpression = "3 4 + 5 * 6 -";
//思路
//1、先将"3 4 + 5 * 6 -"放到ArrayList中
//2、将ArrayList传递给一个方法,配合栈完成计算
List rpnList = getListString(suffixexpression);
int res = calculator(rpnList);
System.out.println(res);
}
//将一个逆波兰表达式,一次将数据放入到ArrayList中
public static List getListString(String suffixexpression) {
//将suffixexpression分割
String[] split = suffixexpression.split(" ");
return new ArrayList(Arrays.asList(split));
}
//完成对逆波兰表达式的运算
public static int calculator(List ls) {
//创建一个栈,只需要一个栈
Stack stack = new Stack<>();
//遍历ls
for (String item : ls) {
//这里是由正则表达式来取出数
if (item.matches("\d+")) {//匹配多位数
//入栈
stack.push(item);
} else {
//pop出两个数,并运算,再入栈
int num2 = Integer.parseInt(stack.pop());
int num1 = Integer.parseInt(stack.pop());
int res = 0;
if (item.equals("+")) {
res = num1 + num2;
} else if (item.equals("-")) {
res = num1 - num2;
} else if (item.equals("*")) {
res = num1 * num2;
} else if (item.equals("/")) {
res = num1 / num2;
} else {
throw new RuntimeException("运算符有误"+item);
}
//把res 入栈
stack.push("" + res);
}
}
//最后留在stack中数据就是结果
return Integer.parseInt(stack.pop());
}
}
//编写一个类 Operation 可以返回一个运算符 对应的优先级
class Operation {
private static final int ADD = 1;
private static final int SUB = 1;
private static final int MUL = 2;
private static final int DIV = 2;
//写一个方法,返回对应的优先级数字
public static int getValue(String operation) {
int result = 0;
switch (operation) {
case "+":
result = ADD;
break;
case "-":
result = SUB;
break;
case "/":
result = DIV;
break;
case "*":
result = MUL;
break;
default:
System.out.println("不存在该运算符"+operation);
break;
}
return result;
}
}
递归
递归需要遵守的重要规则
1)执行一个方法时,就创建一个新的受保护的独立空间(栈空间)
2)方法的局部变量是独立的,不会相互影响, 比如n变量
3)如果方法中使用的是引用类型变量(比如数组),就会共享该引用类型的数据.
4)递归必须向退出递归的条件逼近,否则就是无限递归,出现StackOverflowError,死龟了:)
5)当一个方法执行完毕,或者遇到return,就会返回,遵守谁调用,就将结果返回给谁,同时当方法执行完毕或者返回时,该方法也就执行完毕。
八皇后package recursion;
public class Queue8 {
//定义一个max表示共有多少个皇后
int max = 8;
//定义数组array,保存皇后放置位置的结果,比如arr={0,4,7,5,2,6,1,3}
int[] array = new int[max];
static int count=0;
public static void main(String[] args) {
Queue8 queue8 = new Queue8();
queue8.check(0);
System.out.println("一共有"+count+"种解法");
}
//编写一个方法,放置第n个皇后
//check 是 每一次递归时,进入到check中都有 for(int i=0;i
迷宫
package recursion;
public class MiGong {
public static void main(String[] args) {
//创建一个二维数组,模拟迷宫
//地图
int[][] map = new int[8][7];
//使用1 表示墙
//上下全部置为1
for (int i = 0; i < 7; i++) {
map[0][i] = 1;
map[7][i] = 1;
}
//左右全部为1
for (int i = 0; i < 8; i++) {
map[i][0] = 1;
map[i][6] = 1;
}
//设置挡板,1表示
map[3][1] = 1;
map[3][2] = 1;
map[4][3] = 1;
System.out.println("迷宫");
for (int i = 0; i < 8; i++) {
for (int j = 0; j < 7; j++) {
System.out.print(map[i][j] + " ");
}
System.out.println();
}
setWay(map, 1, 1);
System.out.println("解迷");
for (int i = 0; i < 8; i++) {
for (int j = 0; j < 7; j++) {
System.out.print(map[i][j] + " ");
}
System.out.println();
}
}
//使用递归回溯未给小球找路
public static boolean setWay(int[][] map, int i, int j) {
if (map[6][5] == 2) {//说明通路已经找到
return true;
} else {
if (map[i][j] == 0) {
//按照策略 下->右->上->左
map[i][j] = 2;//假定该点是可以走通,
if (setWay(map, i + 1, j)) {//向下走
return true;
} else if (setWay(map, i, j + 1)) {//向右走
return true;
} else if (setWay(map, i - 1, j)) {//向上走
return true;
} else if (setWay(map, i, j - 1)) {//向左走
return true;
} else {
//说明该点是走不通,是死路
map[i][j] = 3;
return false;
}
} else {//如果map[i][j]!=0 可能是1,2,3
return false;
}
}
}
//修改策略 改成 上-> 右 -> 下 -> 左
public static boolean setWay2(int[][] map, int i, int j) {
if (map[6][5] == 2) {//说明通路已经找到
return true;
} else {
if (map[i][j] == 0) {
//按照策略 下->右->上->左
map[i][j] = 2;//假定该点是可以走通,
if (setWay(map, i - 1, j)) {//向上走
return true;
} else if (setWay(map, i, j + 1)) {//向右走
return true;
} else if (setWay(map, i + 1, j)) {//向下走
return true;
} else if (setWay(map, i, j - 1)) {//向左走
return true;
} else {
//说明该点是走不通,是死路
map[i][j] = 3;
return false;
}
} else {//如果map[i][j]!=0 可能是1,2,3
return false;
}
}
}
}



