零、本讲学习目标一、Spark简介
(一)什么是Spark?(二)有了Hadoop为什么还要Spark? 二、Spark官网
(一)官网网址(二)Spark四大库 三、Spark发展历史四、Spark的特点
(一)快速(二)易用性(三)通用性(四)随处运行(五)代码简洁
1、采用MapReduce实现词频统计2、采用Spark实现词频统计 五、Spark存储层次六、Spark生态圈
(一)Spark SQL(二)Spark Streaming(三)MLlib(四)GraphX 七、Spark应用场景
(一)腾讯(二)Yahoo(三)淘宝(四)优酷土豆 八、课后思考题
零、本讲学习目标- 了解Spark发展史了解Spark的特点了解Spark存储层次了解Spark生态圈了解Spark应用场景
Apache Spark™ is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.Spark是基于内存计算的大数据并行计算框架.Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量的廉价硬件之上,形成集群。Spark诞生于加州大学伯利克分校AMPLab,AMPLab开发以Spark为核心的BDAS时提出的目标是:one stack to rule them all,也就是说在一套软件栈内完成各种大数据分析任务。. (二)有了Hadoop为什么还要Spark?
Spark是MapReduce的替代方案,而且兼容HDFS、Hive等分布式存储层,可融入
Hadoop的生态系统,以弥补缺失MapReduce的不足。Spark相比Hadoop MapReduce有诸多优势。Hadoop最初设计时,shuffle的过程中数据要频繁落地到磁盘中,会大大影响性能,这在最初硬件相对昂贵,内存十分宝贵时,是一种可以理解的选择。但是随着数据量越来越大,处理的流程越来越复杂,hadoop shuflle过程中因为数据落磁盘而造成性能的低下越来越让人无法容忍,特别是,当需要连续多次mr才能完成计算时,每一次的MR都经过shuffle数据落地,性能低下的缺点就格外的明显。所以才有了最初spark设计的目的 —— 完全基于内存进行计算,数据尽量不落地,提高效率,虽然占用能存很高,但是效率同样得到了大大的提升,可以达到hadoop的10~100倍。这在硬件逐渐廉价而数据量越来越大的情况下优势越来越明显。
二、Spark官网
(一)官网网址
https://spark.apache.org
(二)Spark四大库
SQL and DataframeSpark StreamingMLlib (machine learning)GraphX (graph)
三、Spark发展历史
对于一个具有相当技术门槛与复杂度的平台,Spark从诞生到正式版本的成熟,经历的时间如此之短,让人感到惊诧。2009年,Spark诞生于伯克利大学AMPLab,最开初属于伯克利大学的研究性项目。它于2010年正式开源,并于2013年成为了Aparch基金项目,并于2014年成为Aparch基金的顶级项目,整个过程不到五年时间。
Spark目前最新版本是2022年1月26日发布的Spark3.2.1
四、Spark的特点
Spark官网上给出Spark的特点
(一)快速
一般情况下,对于迭代次数较多的应用程序,Spark程序在内存中的运行速度是Hadoop MapReduce运行速度的100多倍,在磁盘上的运行速度是Hadoop MapReduce运行速度的10多倍。
(二)易用性
Spark支持使用Scala、Python、Java及R语言快速编写应用。同时Spark提供超过80个高级运算符,使得编写并行应用程序变得容易并且可以在Scala、Python或R的交互模式下使用Spark。
(三)通用性
Spark可以与SQL、Streaming及复杂的分析良好结合。Spark还有一系列的高级工具,包括Spark SQL、MLlib(机器学习库)、GraphX(图计算)和Spark Streaming,并且支持在一个应用中同时使用这些组件。
(四)随处运行
用户可以使用Spark的独立集群模式运行Spark,也可以在EC2(亚马逊弹性计算云)、Hadoop YARN或者Apache Mesos上运行Spark。并且可以从HDFS、Cassandra、Hbase、Hive、Tachyon和任何分布式文件系统读取数据。
(五)代码简洁
参看【采用多种方式实现词频统计】
1、采用MapReduce实现词频统计
编写WordCountMapper
package net.hw.wc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] data = line.split(" "); for (int i = 0; i < data.length; i++) { context.write(new Text(data[i]), new IntWritable(1)); } } }
编写WordCountReducer
package net.hw.wc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReducer extends Reducer{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable value : values) { count = count + value.get(); } context.write(key, new IntWritable(count)); } }
编写WordCountDriver
package net.hw.wc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.net.URI;
public class WordCountDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
String uri = "hdfs://hadoop:9000";
Path inputPath = new Path(uri + "/word");
Path outputPath = new Path(uri + "/word/result");
FileSystem fs = FileSystem.get(new URI(uri), conf);
fs.delete(outputPath, true);
FileInputFormat.addInputPath(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);
System.out.println("统计结果:");
FileStatus[] fileStatuses = fs.listStatus(outputPath);
for (int i = 1; i < fileStatuses.length; i++) {
System.out.println(fileStatuses[i].getPath());
FSDataInputStream in = fs.open(fileStatuses[i].getPath());
IOUtils.copyBytes(in, System.out, 4096, false);
}
}
}
运行程序WordCountDriver,查看结果
2、采用Spark实现词频统计
编写WordCount
package net.hw.spark.wc
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("wordcount")
val sc = new SparkContext(conf)
val rdd = sc.textFile("test.txt")
.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
rdd.foreach(println)
rdd.saveAsTextFile("result")
}
}
启动WordCount,查看结果
大家可以看出,完成同样的词频统计任务,Spark代码比MapReduce代码简洁很多。
五、Spark存储层次
Spark 不仅可以将任何Hadoop 分布式文件系统(HDFS)上的文件读取为分布式数据集,也可以支持其他支持Hadoop 接口的系统,比如本地文件、亚马逊S3、Cassandra、Hive、Hbase 等。我们需要弄清楚的是,Hadoop 并非Spark 的必要条件,Spark 支持任何实现了Hadoop 接口的存储系统。Spark 支持的Hadoop 输入格式包括文本文件、SequenceFile、Avro、Parquet 等。 六、Spark生态圈
生态圈越来越丰富能做的事越来越多 ——SparkCore、SparkSQL、SparkStreaming、GraphX、MLib。支持操作方法多,不像Hadoop只有MR。支持的语言多 Java、Python、Scala、R。可以使用HDFS作为存储结构,可以使用Yarn作为协调框架。
以其RDD模型的强大表现能力,逐渐形成了一套自己的生态圈,提供了full-stack(全栈)的解决方案。主要包括Spark核心Core内存中批处理,Spark SQL交互式查询,Spark Streaming流式计算,GraphX和MLib提供的常用图计算和机器学习算法。
Seamlessly mix SQL queries with Spark programs. Spark SQL lets you query structured data inside Spark programs, using either SQL or a familiar Dataframe API. Usable in Java, Scala, Python and R.Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做Dataframe并且作为分布式SQL查询引擎的作用。为什么要学习Spark SQL?我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所以Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!同时Spark SQL也支持从Hive中读取数据。Apply functions to results of SQL queries.
Query and join different data sources
(二)Spark Streaming
Spark Streaming brings Apache Spark’s language-integrated API to stream processing, letting you write streaming jobs the same way you write batch jobs. It supports Java, Scala and Python.Spark Streaming是一套框架,是Spark核心API的一个扩展,可以实现高吞吐量的,具备容错机制的实时流数据处理。支持多种数据源获取数据。
Spark Streaming接收Kafka、Flume、HDFS等各种来源的实时输入数据,进行处理后,处理结构保存在HDFS、Database等各种地方。Find words with higher frequency than historic data
(三)MLlib
MLlib fits into Spark’s APIs and interoperates with NumPy in Python (as of Spark 0.9) and R libraries (as of Spark 1.5). You can use any Hadoop data source (e.g. HDFS, Hbase, or local files), making it easy to plug into Hadoop workflows.Spark MLlib 是Spark中可以扩展的机器学习库,它有一系列的机器学习算法和实用程序组成。包括分类、回归、聚类、协同过滤、等,还包含一些底层优化的方法。
(四)GraphX
Seamlessly work with both graphs and collections. GraphX unifies ETL, exploratory analysis, and iterative graph computation within a single system. You can view the same data as both graphs and collections, transform and join graphs with RDDs efficiently, and write custom iterative graph algorithms using the Pregel API.
Using GraphX in Scala
七、Spark应用场景
(一)腾讯
广点通是最早使用Spark的应用之一。腾讯大数据精准推荐借助Spark快速迭代的优势,围绕“数据+算法+系统”这套技术方案,实现了在“数据实时采集、算法实时训练、系统实时预测”的全流程实时并行高维算法,最终成功应用于广点通pCTR (Predict Click-Through Rate) 投放系统上,支持每天上百亿的请求量。 (二)Yahoo
Yahoo将Spark用在Audience Expansion中。Audience Expansion是广告中寻找目标用户的一种方法,首先广告者提供一些观看了广告并且购买产品的样本客户,据此进行学习,寻找更多可能转化的用户,对他们定向广告。Yahoo采用的算法是Logistic Regression。同时由于某些SQL负载需要更高的服务质量,又加入了专门跑Shark的大内存集群,用于取代商业BI/OLAP工具,承担报表/仪表盘和交互式/即席查询,同时与桌面BI工具对接。 (三)淘宝
淘宝技术团队使用了Spark来解决多次迭代的机器学习算法、高计算复杂度的算法等,将Spark运用于淘宝的推荐相关算法上,同时还利用GraphX解决了许多生产问题,包括以下计算场景:基于度分布的中枢节点发现、基于最大连通图的社区发现、基于三角形计数的关系衡量、基于随机游走的用户属性传播等。 (四)优酷土豆
目前Spark已经广泛使用在优酷土豆的视频推荐,广告业务等方面,相比Hadoop,Spark交互查询响应快,性能比Hadoop提高若干倍。一方面,使用Spark模拟广告投放的计算效率高、延迟小(同Hadoop比延迟至少降低一个数量级)。另一方面,优酷土豆的视频推荐往往涉及机器学习及图计算,而使用Spark解决机器学习、图计算等迭代计算能够大大减少网络传输、数据落地等的次数,极大地提高了计算性能。 八、课后思考题
- 为什么需要Spark?Spark有哪些特点?Spark生态圈包含哪些环境?



