栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark综合大作业:RDD编程初级实践

Spark综合大作业:RDD编程初级实践

Spark综合大作业:RDD编程初级实践
实验配置:操作系统:Ubuntu16.04 | 环境:Spark版本:2.4.0 | 软件:Python版本:3.4.3。

文章目录
  • 一、实验目的
  • 二、实验平台
  • 三、实验内容和要求
    • 1、pyspark交互式编程
    • 2.编写独立应用程序实现数据去重
    • 3.编写独立应用程序实现求平均值问题
  • 四、环境介绍
  • 五、实验步骤
    • 2、编写独立应用程序实现数据去重
    • 3、编写独立应用程序实现求平均值问题
  • 六、经验总结
  • 七、参考文献


一、实验目的

(1)熟悉Spark的RDD基本操作及键值对操作;
(2)熟悉使用RDD编程解决实际具体问题的方法。

二、实验平台

本次大作业的实验是操作系统:Ubuntu16.04,Spark版本:2.4.0,Python版本:3.4.3。

三、实验内容和要求 1、pyspark交互式编程

本作业提供分析数据data.txt,该数据集包含了某大学计算机系的成绩,数据格式如下所示:
Tom,Database,80
Tom,Algorithm,50
Tom,DataStructure,60
Jim,Database,90
Jim,Algorithm,60
Jim,DataStructure,80
……
请根据给定的实验数据,在pyspark中通过编程来计算以下内容:
(1)该系总共有多少学生;
(2)该系共开设了多少门课程;
(3)Tom同学的总成绩平均分是多少;
(4)求每名同学的选修的课程门数;
(5)该系Database课程共有多少人选修;
(6)各门课程的平均分是多少;
(7)使用累加器计算共有多少人选了Database这门课。

2.编写独立应用程序实现数据去重

对于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。本文给出门课的成绩(A.txt、B.txt)下面是输入文件和输出文件的一个样例,供参考。
输入文件A的样例如下:
20200101 x
20200102 y
20200103 x
20200104 y
20200105 z
20200106 z
输入文件B的样例如下:
20200101 y
20200102 y
20200103 x
20200104 z
20200105 y
根据输入的文件A和B合并得到的输出文件C的样例如下:
20200101 x
20200101 y
20200102 y
20200103 x
20200104 y
20200104 z
20200105 y
20200105 z
20200106 z

3.编写独立应用程序实现求平均值问题

每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。本文给出门课的成绩(Algorithm.txt、Database.txt、Python.txt),下面是输入文件和输出文件的一个样例,供参考。
Algorithm成绩:
小明 92
小红 87
小新 82
小丽 90
Database成绩:
小明 95
小红 81
小新 89
小丽 85
Python成绩:
小明 82
小红 83
小新 94
小丽 91
平均成绩如下:
(小红,83.67)
(小新,88.33)
(小明,89.67)
(小丽,88.67)

四、环境介绍

Spark是云计算大数据的集大成者,是Hadoop的取代者,是第二代云计算大数据技术。它作为一个基于内存计算的云计算大数据平台,在实时流处理、交互式查询、机器学习、图处理、数据统计分析等方面具有无可比拟的优势;Spark 能够比Hadoop快100倍以上;Spark采用一个统一 的堆栈解决了云计算大数据的所有核心问题,这直接奠定了其一统云计算大数据领域的霸主地位。
PySpark 是 Spark 为 Python 开发者提供的 API。
RDD: 弹性分布式数据集分布在不同的集群节点的内存中,可以理解为一大数组,数组的每一个元素就是RDD的一个分区,一个RDD可以分布并被运算在多态计算机节点的内存以及硬盘中,RDD数据块可以放在磁盘上也可以放在内存中(取决于你的设置),如果出现缓冲失效或丢失,RDD分区可以重新计算刷新,RDD是不能被修改的但是可以通过API被变换生成新的RDD。有俩类对RDD的操作(也成算子):
1.变换(懒执行): 有 map flatMap groupByKey reduceByKey 等;
他们只是将一些指令集而不会马上执行,需要有操作的时候才会真正计算出结果;
2.操作(立即执行): 有 count take collect 等;
他们会返回结果,或者把RDD数据输出,这些操作实现了MapReduce的基本函数map,reduce及计算模型,还提供了filter,join,groupBYKey等,另外spark sql 可以用来操作有数据结构的RDD即SPARK DATA frame,它的运行原理和mapreduce是一样的,只是他们的运行方式不同,mr的运算是内存磁盘交互读写,不能在内存中共享数据,而RDD可以被共享和持久化.因为大数据运算经常是交互式和迭代式的,所以数据的重用性很重要,而mr的磁盘交互读写带来的I/O开销导致数度减慢。

五、实验步骤

(一)spark的安装
1、安装spark
(1)解压安装包,更改使用用户名hadoop并修改权限

sudo tar -zxf ~/下载/spark-2.0.2-bin-without-hadoop.tgz -C /usr/local/
cd /usr/local
sudo mv ./spark-2.1.0-bin-without-hadoop/ ./spark
sudo chown -R hadoop:hadoop ./spark     

(2)安装后,还需要修改Spark的配置文件spark-env.sh

cd /usr/local/spark
cp ./conf/spark-env.sh.template ./conf/spark-env.sh

编辑spark-env.sh文件(vim ./conf/spark-env.sh),在第一行添加以下配置信息:

(3)启动spark-shell后,就会进入“scala>”命令提示符状态,如下图所示:

2、Java独立应用程序编程
(1)安装maven
选择安装在/usr/local/maven中:


2.Java应用程序代码
在终端执行如下命令创建一个文件夹sparkapp2作为应用程序根目录

在 ./sparkapp2/src/main/java 下建立一个名为 SimpleApp.java 的文件(vim ./sparkapp2/src/main/java/SimpleApp.java),添加代码如下:

    import org.apache.spark.api.java.*;
    import org.apache.spark.api.java.function.Function;
 
    public class SimpleApp {
        public static void main(String[] args) {
            String logFile = "file:///usr/local/spark/README.md"; // Should be some file on your system
            JavaSparkContext sc = new JavaSparkContext("local", "Simple App",
                "file:///usr/local/spark/", new String[]{"target/simple-project-1.0.jar"});
            JavaRDD logData = sc.textFile(logFile).cache();
 
            long numAs = logData.filter(new Function() {
                public Boolean call(String s) { return s.contains("a"); }
            }).count();
 
            long numBs = logData.filter(new Function() {
                public Boolean call(String s) { return s.contains("b"); }
            }).count();
 
            System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
        }
    }

该程序依赖Spark Java API,因此我们需要通过Maven进行编译打包。在./sparkapp2中新建文件pom.xml(vim ./sparkapp2/pom.xml),添加内容如下,声明该独立应用程序的信息以及与Spark的依赖关系:

 
        edu.berkeley
        simple-project
        4.0.0
        Simple Project
        jar
        1.0
        
            
                Akka repository
                http://repo.akka.io/releases
            
        
        
             
                org.apache.spark
                spark-core_2.11
                2.1.0
            
        
    

3.使用maven打包java程序
为了保证maven能够正常运行,先执行如下命令检查整个应用程序的文件结构:
cd ~/sparkapp2
Find .
文件结构如下图:

接着,我们可以通过如下代码将这整个应用程序打包成Jar
/usr/local/maven/bin/mvn package
如果运行上面命令后出现类似下面的信息,说明生成Jar包成功:

(2)通过spark-submit 运行程序
可以通过spark-submit提交应用程序,该命令的格式如下:

./bin/spark-submit 
  --class   //需要运行的程序的主类,应用程序的入口点
  --master   //Master URL,下面会有具体解释
  --deploy-mode    //部署模式
  ... # other options  //其他参数
    //应用程序JAR包
  [application-arguments] //传递给主类的主方法的参数

最后,针对上面编译打包得到的应用程序,可以通过将生成的jar包通过spark-submit提交到Spark中运行,如下命令:

/usr/local/spark/bin/spark-submit --class "SimpleApp" ~/sparkapp2/target/simple-project-1.0.jar
#上面命令执行后会输出太多信息,可以不使用上面命令,而使用下面命令查看想要的结果
/usr/local/spark/bin/spark-submit --class "SimpleApp" ~/sparkapp2/target/simple-project-1.0.jar 2>&1 | grep "Lines with a"

最后得到的结果如下:

通过运行Spark自带的示例,验证Spark是否安装成功

(二)pyspark的安装配置

1.配置环境变量

进入.bashrc文件,输入hadoop的密码为hadoop

2.在文件中添加如下几行代码:

3.接着让该环境变量激活生效,执行如下代码:

4.执行pyspark后如下所示:

数据来源描述
(1)该系总共有多少学生
1)创建一个sparksqldata的文件

mkdir sparksqldata

2)cp拷贝的命令,执行

 cp data.txt /usr/local/spark/sparksqldata/

进入sparksqldata目录查看data.txt文件

cd /usr/local/spark/sparksqldata/
ls

3)启动pyspark,Cp拷贝的命令,将data.txt拷贝到目录sparkdata下



启动pyspark,pyspark启动成功

4)加载数据集,获取每行数据的第1列;去重操作;取元素总个数

lines = sc.textFile('file///usr/local/spark/sparksqldata/data.txt')
res = lines.map(lambda x:x.split(",")).map(lambda x:x[0])
distinst_res = res.distinct()
distinct_res.count()

答案为265人。
(2)该系共开设了多少门课程;
获取每行数据的第2列;去重操作;取元素总个数

lines = sc.textFile('file///usr/local/spark/sparksqldata/data.txt')
res = lines.map(lambda x:x.split(",")).map(lambda x:x[1])
distinst_res = res.distinct()
distinct_res.count()

答案为8门。
(3)Tom同学的总成绩平均分是多少;
筛选Tom同学的成绩信息
res.foreach(print)
score = res.map(lambda x:int(x[2])) //提取Tom同学的每门成绩,并转换为int类型
num = res.count() //Tom同学选课门数
sum_score = score.reduce(lambda x,y:x+y) //Tom同学的总成绩
avg = sum_score/num // 总成绩/门数=平均分

lines = sc.textFile('file///usr/local/spark/sparksqldata/data.txt')
res = lines.map(lambda x:x.split(",")).filter(lambda x:x[0]=='Tom')
res.foreach(print)


Tom同学的平均分为30.8分
(4)求每名同学的选修的课程门数;
学生每门课程都对应(学生姓名,1),学生有n门课程则有n个(学生姓名,1);按学生姓名获取每个学生的选课总数。

lines = sc.textFile('file///usr/local/spark/sparksqldata/data.txt')
res = lines.map(lambda x:x.split(",")).map(lambda x:(x[1],1))
each_res = res.reduceByKey(lambda x,y: x+y)
each_res.foreach(print)

答案共265行。
(5)该系Database课程共有多少人选修;

lines = sc.textFile('file///usr/local/spark/sparksqldata/data.txt')
res = lines.map(lambda x:x.split(",")).filter(lambda x:x[1]=='Database')
res.count()

答案为1764人。

(6)各门课程的平均分是多少;
为每门课程的分数后面新增一列1,表示1个学生选择了该课程。格式如(‘ComputerNetwork’, (44, 1));按课程名聚合课程总分和选课人数。格式如(‘ComputerNetwork’, (7370, 142));课程总分/选课人数 = 平均分,并利用round(x,2)保留两位小数。

lines = sc.textFile('file///usr/local/spark/sparksqldata/data.txt')
res = lines.map(lambda x:x.split(",")).map(lambda x:x[1],(int(x[2]),1)))
temp = res.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
avg = temp.map(lambda x:(x[0],round(x[1][0]/xx[1][1],2)))
avg.foreach(print)

所以ComputerNetwork的平均分是51.9分,Software的平均分是50.91分,Algorithm的平均分是48.83分,OperatingSystem的平均分是54.94分,python的平均分是57.82分,datastructure的平均分是47.57分,clanguage的平均分是50.61分。

(7)使用累加器计算共有多少人选了Database这门课。
筛选出选了Database课程的数据;定义一个从0开始的累加器accum;遍历res,每扫描一条数据,累加器加1;

lines = sc.textFile('file///usr/local/spark/sparksqldata/data.txt')
res = lines.map(lambda x:x.split(",")).map(lambda x:x[1]=='Database')
accum = sc.accumulator(0)
res.foreach(lambda x:accumn.add(1))
accum.value

答案为1764人。

2、编写独立应用程序实现数据去重

对于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。本文给出门课的成绩(A.txt、B.txt)下面是输入文件和输出文件的一个样例,供参考。
输入文件A的样例如下:
20200101 x
20200102 y
20200103 x
20200104 y
20200105 z
20200106 z
输入文件B的样例如下:
20200101 y
20200102 y
20200103 x
20200104 z
20200105 yv
根据输入的文件A和B合并得到的输出文件C的样例如下:
20200101 x
20200101 y
20200102 y
20200103 x
20200104 y
20200104 z
20200105 y
20200105 z
20200106 z
(1)当前目录为/usr/local/spark/sparksqldata/,首先创建A.py和B.py文件,分别存放A、B两个文件,在当前目录下新建一个C.py文件;

cd /usr/local/spark/sparksqldata/
vim A
vim B




vim C.py

(2)输入以下代码

from pyspark import SparkContext
#初始化SparkContext
sc = SparkContext("local","sparksqldata")
#加载文件 A B ,创建RDD
lines1 = sc.textFile("file:///usr/local/spark/sparksqldata/A")
lines2 = sc.textFile("file:///usr/local/spark/sparksqldata/B")
#合并文件 A B
lines = lines1.union(lines2)
#去重操作
distinct_lines = lines.distinct()
#排序操作
res = distinct_lines.sortBy(lambda x:x)
#让合并结果放入一个文件中
res.repartition(1).saveAsTextFile('file:///usr/local/spark/sparksqldata/ymsresult')

(3)最后在目录/usr/local/spark/sparksqldata/下执行下面命令执行程序
$ python3 C.py

python3 C.py

(4)在目录/usr/local/spark/sparksqldata/result下即可得到结果文件part-00000。

cd ymsresult/
ls

vim part-00000

一共有500行

(5)在本机目录/usr/local/spark/sparksqldata/result下可以看到结果文件part-00000。

3、编写独立应用程序实现求平均值问题

每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。本文给出门课的成绩(Algorithm.txt、Database.txt、Python.txt),下面是输入文件和输出文件的一个样例,供参考。
Algorithm成绩:
小明 92
小红 87
小新 82
小丽 90
Database成绩:
小明 95
小红 81
小新 89
小丽 85
Python成绩:
小明 82
小红 83
小新 94
小丽 91
平均成绩如下:
(小红,83.67)
(小新,88.33)
(小明,89.67)
(小丽,88.67)
(1)创建Algorithm.py和Database.py文件以及Python.py文件

 vim Algorithm


创建Algorithm.py:

创建Database.py

vim Database



创建Python.py文件

vim Python


(2)当前目录为/usr/local/spark/sparksqldata/,首先创建Algorithm.py和Database.py文件以及Python.py文件,分别存放三个科目文件,在当前目录下新建一个avg.py文件求平均值

vim avg.py

(3)输入以下代码:

from pyspark import SparkContext
#初始化SparkContext
sc = SparkContext('local','sparksqldata')
#加载三个文件Algorithm.txt、Database.txt、Python.txt
lines1 =sc.textFile("file:///usr/local/spark/sparksqldata/Algorithm")
lines2 =sc.textFile("file:///usr/local/spark/sparksqldata/Database")
lines3 =sc.textFile("file:///usr/local/spark/sparksqldata/Python")
#合并三个文件的内容
lines1.union(lines2).union(lines3)
#为每个数据增加一列1,方便后续统计每个学生的课程数目。
data = lines.map(lambda x:x.split(" ")).map(lambda x:(x[0],(int(x[1]),1)))
#根据key也就是学生姓名合计每门课程的成绩,以及选秀的课程数目。
res = data.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
#利用总成绩除以选秀的课程来计算每个学生的没门的平均分,并且利用round(0,2)来保留两位小数
result = res.map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))
#将结果写入result文件中,reparttition(1)的作用是让结果合并到一个文件中,不加的话会写入三个文件中
result.repartition(1).saveAsTextFile("file:///usr/local/spark/sparksqldata/newys")

(4)最后在目录/usr/local/spark/sparksqldata/下执行下面命令执行程序
$ python3 avg.py

python3 avg.py

(5)在目录/usr/local/spark/sparksqldata/result1下即可得到结果文件part-00000。

cd newys
ls


(6)进入part-00000查看内容

(7)在本机目录/usr/local/spark/sparksqldata/newre下可以看到结果文件part-00000。


打开part-00000查看具体的内容:

六、经验总结

本次实验操作下来,我学到了很多东西,我对Spark的RDD基本操作及键值对操作有了一定的了解;对如何使用RDD编程解决实际具体问题的方法有了进一步的认识,每个步骤都让我掌握了一定的命令知识,更深入理解大数据。创建RDD有两种方式:一种是通过并行化驱动程序中的已有集合创建,另外一种方法是读取外部数据集;我也了解到我们不应该把RDD 看作存放着特定数据的数据集,而最好把每个RDD 当作我们通过转化操作构建出来的、记录如 何计算数据的指令列表。我也发现自身的不足,以后会处于学习过程中,最后如果有错误的地方欢迎大家指出。

七、参考文献

[1] link.
http://dblab.xmu.edu.cn/blog/2481-2/
[2] link.
http://dblab.xmu.edu.cn/blog/290-2/
[3] link.
http://dblab.xmu.edu.cn/blog/285/
[4] link.
http://dblab.xmu.edu.cn/blog/hadoop-build-project-using-eclipse/

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/652867.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号