- 一、实验目的
- 二、实验平台
- 三、实验步骤
- 1. 准备工作
- (1)安装spark及其API程序
- (2)配置环境变量
- (3)启动Hadoop
- 2. Spark读取文件系统的数据
- 3. 编写独立应用程序实现数据去重
- 4. 编写独立应用程序实现求平均值问题
- 四、实验总结
一、实验目的
- 掌握使用 Spark 访问本地文件和 HDFS 文件的方法
- 掌握 Spark 应用程序的编写、编译和运行方法
二、实验平台
- 操作系统:Ubuntu18.04(或Ubuntu16.04)
- Spark版本:3.2.0
- Hadoop版本:3.3.2
三、实验步骤 1. 准备工作 (1)安装spark及其API程序
安装 spark:Ubuntu下安装Spark3.2.0教程
安装 sbt:Ubuntu下为Spark安装配置sbt
修改你的 .bashrc 文件:
vim ~/.bashcrc
添加以下内容至文件顶部:
export SPARK_HOME=/usr/local/spark export PATH=$PATH:$SPARK_HOME/bin
使修改立即生效:
source ~/.bashrc(3)启动Hadoop
进入 Hadoop 安装目录:
cd /usr/local/hadoop
启动并使用 jps 检查结点:
./sbin/start-dfs.sh jps2. Spark读取文件系统的数据
(1)在spark-shell中读取Linux系统本地文件“/home/hadoop/test.txt”,然后统计出文件的行数;
创建 test.txt:
echo -e "HellonThis is a testnBye!" >> ~/test.txt
启动 spark-shell:
cd /usr/local/spark ./bin/spark-shell
Scala 命令:
val textFile=sc.textFile("file:///home/hadoop/test.txt")
textFile.count()
输出如下:
scala> val textFile=sc.textFile("file:///home/hadoop/test.txt")
textFile: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/test.txt MapPartitionsRDD[1] at textFile at :23
scala> textFile.count()
res0: Long = 3
(2)在spark-shell中读取HDFS系统文件“/user/hadoop/test.txt”(如果该文件不存在,请先创建),然后,统计出文件的行数;
上传 test.txt 文件至 HDFS 中(终端中执行,退出 spark-shell):
/usr/local/hadoop/bin/hdfs dfs -put ~/test.txt
Scala 命令如下(spark-shell):
val textFile=sc.textFile("hdfs://localhost:9000/user/hadoop/test.txt")
textFile.count()
输出如下:
scala> val textFile=sc.textFile("hdfs://localhost:9000/user/hadoop/test.txt")
textFile: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/hadoop/test.txt MapPartitionsRDD[3] at textFile at :23
scala> textFile.count()
res1: Long = 3
(3)编写独立应用程序(推荐使用Scala语言),读取HDFS系统文件“/user/hadoop/test.txt”(如果该文件不存在,请先创建),然后,统计出文件的行数;通过sbt工具将整个应用程序编译打包成 JAR包,并将生成的JAR包通过 spark-submit 提交到 Spark 中运行命令。
进入 spark 安装目录:
cd /usr/local/spark mkdir mycode && cd mycode
创建 HDFStest 目录并编写 Scala 文件:
mkdir -p HDFStest/src/main/scala vim ./HDFStest/src/main/scala/HDFStest.scala
代码如下:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object HDFStest {
def main(args: Array[String]) {
val logFile = "hdfs://localhost:9000/user/hadoop/test.txt"
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2)
val num = logData.count()
printf("The num of this file is %dn", num)
}
}
进入 HDFStest 目录,创建 simple.sbt:
cd HDFStest vim simple.sbt
内容如下:
name := "A Simple HDFS Test" version := "1.0" scalaVersion := "2.12.15" libraryDependencies += "org.apache.spark" %% "spark-core" % "3.2.0"
注意这里的 scalaVersion 是你的 Scala 版本,spark-core 是你的 spark 版本。接下来,可以通过如下代码将整个应用程序打包成 JAR:
/usr/local/sbt/sbt package
打包成功输出如下:
运行如下代码使用生成的 jar 包:
/usr/local/spark/bin/spark-submit --class "HDFStest" /usr/local/spark/mycode/HDFStest/target/scala-2.12/a-simple-hdfs-test_2.12-1.0.jar 2>& 1 | grep The
输出如下:
3. 编写独立应用程序实现数据去重对于两个输入文件 A 和 B,编写 Spark 独立应用程序(推荐使用 Scala 语言),对两个文件进行合并,并剔除其中重复的内容,得到一个新文件 C。下面是输入文件和输出文件的一个样例,供参考。
输入文件 A 的样例如下:
20170101 x 20170102 y 20170103 x 20170104 y 20170105 z 20170106 z
输入文件 B 的样例如下:
20170101 y 20170102 y 20170103 x 20170104 z 20170105 y
根据输入的文件 A 和 B 合并得到的输出文件 C 的样例如下:
20170101 x 20170101 y 20170102 y 20170103 x 20170104 y 20170104 z 20170105 y 20170105 z 20170106 z
进入到 mycode 目录,新建 RemDup 目录,
cd /usr/local/spark/mycode mkdir -p RemDup/src/main/scala cd RemDup
新建 datas 目录,写入文件 A 和文件 B:
mkdir datas
注意这里 A 和 B 文件内容不能有多余的换行符或者空格!
vim ./datas/A
vim ./datas/B
编写 Scala 文件:
vim ./src/main/scala/RemDup.scala
代码如下:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
object RemDup {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("RemDup")
val sc = new SparkContext(conf)
val dataFile = "file:///usr/local/spark/mycode/RemDup/datas"
val data = sc.textFile(dataFile,2)
val res = data.filter(_.trim().length>0).map(line=>(line.trim,"")).partitionBy(new HashPartitioner(1)).groupByKey().sortByKey().keys
res.saveAsTextFile("file:///usr/local/spark/mycode/RemDup/result")
}
}
编写 simple.sbt 文件:
vim simple.sbt
内容如下:
name := "Remove Duplication" version := "1.0" scalaVersion := "2.12.15" libraryDependencies += "org.apache.spark" %% "spark-core" % "3.2.0"
使用如下命令打包:
/usr/local/sbt/sbt package
使用生成的 jar 包:
/usr/local/spark/bin/spark-submit --class "RemDup" /usr/local/spark/mycode/RemDup/target/scala-2.12/remove-duplication_2.12-1.0.jar
使用如下命令查看输出:
cat result/*
输出如下:
4. 编写独立应用程序实现求平均值问题每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写 Spark 独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。
Algorithm 成绩:
小明 92 小红 87 小新 82 小丽 90
Database 成绩:
小明 95 小红 81 小新 89 小丽 85
Python 成绩:
小明 82 小红 83 小新 94 小丽 91
平均成绩如下:
(小红,83.67) (小新,88.33) (小明,89.67) (小丽,88.67)
进入到 mycode 目录,新建 AvgScore 目录,
cd /usr/local/spark/mycode mkdir -p AvgScore/src/main/scala cd AvgScore
新建 datas 目录,写入文件 algorithm、database、python:
mkdir datas
注意这里 algorithm、database 和 python 文件内容不能有多余的换行符或者空格!
vim ./datas/algorithm
vim ./datas/database
vim ./datas/python
编写 Scala 文件:
vim ./src/main/scala/AvgScore.scala
代码如下:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
object AvgScore {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("AvgScore")
val sc = new SparkContext(conf)
val dataFile = "file:///usr/local/spark/mycode/AvgScore/datas"
val data = sc.textFile(dataFile,3)
val res = data.filter(_.trim().length>0).map(line=>(line.split(" ")(0).trim(),line.split(" ")(1).trim().toInt)).partitionBy(new HashPartitioner(1)).groupByKey().map(x => {
var n = 0
var sum = 0.0
for(i <- x._2){
sum = sum + i
n = n +1
}
val avg = sum/n
val format = f"$avg%1.2f".toDouble
(x._1,format)
})
res.saveAsTextFile("file:///usr/local/spark/mycode/AvgScore/result")
}
}
编写 simple.sbt 文件:
vim simple.sbt
内容如下:
name := "Average Score" version := "1.0" scalaVersion := "2.12.15" libraryDependencies += "org.apache.spark" %% "spark-core" % "3.2.0"
使用如下命令打包:
/usr/local/sbt/sbt package
使用生成的 jar 包:
/usr/local/spark/bin/spark-submit --class "AvgScore" /usr/local/spark/mycode/AvgScore/target/scala-2.12/average-score_2.12-1.0.jar
使用如下命令查看输出:
cat result/*
输出如下:
四、实验总结



