- 一、测试数据
- 二、任务要求
- 三、编码实现
- 四、实验要求
- 员工信息表:下载地址
- 表字段说明:
- 任务1:求每个部门的员工工资总额
- 任务2:求每个部门员工工资与奖金总额
- 任务3:将任务2中的结果按照部门号进行升序排序
- 任务4:将任务2中的结果按照工资总额进行降序排序
-
创建maven工程
-
添加spark相关依赖,在pom.xml中添加如下依赖
<packaging>jar</packaging>

<properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.4.8</spark.version>
    <!-- 注意:此处须与 scala.version 的二进制版本一致(2.11.8 → 2.11),原文为 2.12 会导致二进制不兼容 -->
    <spark.artifact.version>2.11</spark.artifact.version>
    <hadoop.version>2.7.3</hadoop.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${spark.artifact.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
</dependencies>

<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>4.5.4</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <scalaVersion>${scala.version}</scalaVersion>
            </configuration>
        </plugin>
    </plugins>
</build>
-
【任务一】代码实现如下:
- 实现代码:
import org.apache.spark.{SparkConf, SparkContext} object CountSalary { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName(CountSalary.getClass.getName).setMaster("local[2]") val sc = new SparkContext(sparkConf) sc.textFile("d:/Tools/emp.csv") .map(line => { val strings = line.split(",") val salary = strings(5).toInt val deptNo = strings(7).toInt (deptNo,salary) }) .reduceByKey(_+_) .collect() .foreach(println) // 关闭sc sc.stop() } } - 结果:
- 实现代码:
-
【任务二】代码实现如下:
- 实现代码:
import org.apache.spark.{SparkConf, SparkContext} object CountBonusAndSalary { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName(CountBonusAndSalary.getClass.getName).setMaster("local[2]") val sc = new SparkContext(sparkConf) sc.textFile("d:/Tools/emp.csv") .map(line => { val strings = line.split(",") val salary = strings(5).toInt val deptNo = strings(7).toInt var bonus = 0 if (!"".equals(strings(6)) && null != strings(6)){ bonus = strings(6).toInt } (deptNo,salary+bonus) }) .reduceByKey(_+_) .collect() .foreach(println) // 关闭sc sc.stop() } } - 结果如下:
- 实现代码:
-
【任务三】代码实现如下:
- 代码实现
import org.apache.spark.{SparkConf, SparkContext} object CountBonusAndSalaryByAsc { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName(CountBonusAndSalaryByAsc.getClass.getName).setMaster("local[2]") val sc = new SparkContext(sparkConf) sc.textFile("d:/Tools/emp.csv") .map(line => { val strings = line.split(",") val salary = strings(5).toInt val deptNo = strings(7).toInt var bonus = 0 if (!"".equals(strings(6)) && null != strings(6)){ bonus = strings(6).toInt } (deptNo,salary+bonus) }) .reduceByKey(_+_) .sortByKey(true) .collect() .foreach(println) // 关闭sc sc.stop() } } - 结果如下
- 代码实现
-
【任务四】代码实现如下:
- 代码实现:
import org.apache.spark.{SparkConf, SparkContext} object CountTotalByAsc { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName(CountTotalByAsc.getClass.getName).setMaster("local[2]") val sc = new SparkContext(sparkConf) sc.textFile("d:/Tools/emp.csv") .map(line => { val strings = line.split(",") val salary = strings(5).toInt val deptNo = strings(7).toInt var bonus = 0 if (!"".equals(strings(6)) && null != strings(6)){ bonus = strings(6).toInt } (deptNo,salary+bonus) }) .reduceByKey(_+_) .sortBy(tuple2 => { tuple2._2 },false) .collect() .foreach(println) // 关闭sc sc.stop() } } - 结果如下:
- 代码实现:
- 请使用不同的算子来完成如上的任务



