Spark是一种基于内存的快速,通用,可扩展的大数据分析计算引擎; 1.2 Spark && Hadoop
| SPark | Hadoop |
|---|---|
| 1.Scala开发, 快速通用,扩展的大数据分析引擎 | 1. Java开发, 在分布式服务器集群上存储海量数据并运行分布式分析应用的开源框架 |
| 2. Spark Core 提供了Spark最基础,核心的内容 | 2. HDFS基于GFS理论, 分布式存储数据 |
| 3. SpaekSQL 是Spark用来操作结构化数据的组件, 通过Spark SQL, 用户了已使用SQL或者HQL来查询数据 | 3. MapReduce基于Goole MapReduce, 分布式计算 |
| 4. SparkStreaming是Spark平台上针对实时数据进行了流式计算的组件, 提供了丰富的处理数据流的API | 5. Hbase基于Bigtable, 分布式数据库, 擅长实时的随机读写超大规模数据集 |
一次性数据计算: 框架在处理数据的时候, 会从存储设备中读取数据, 进行逻辑操作, 然后将处理的结果重新存储到介质中。 1.3 Spark Or Hadoop 1.4 Spark 核心模块
|模块|功能|
|Spark Core| SparkCore提供了Spark最基础与最核心的功能, Spark其他的功能模块都是在SparkCore的基础上进行扩展的|
|Spark SQL||
简单的WordCount程序必须的环境:
- IDEA装好Scala插件: 参考本文
new project ->选则合适的project jdk(jdk 1.8) ->下一步->填好合适的gav
在 settings -> plugins 安装好scala插件, 并给参考本文
在 project structure -> global Libraries中添加 scala的sdk
在maven项目中新建新的module, 对项目进行分类
pom文件中, 添加Spark 3.0 的依赖
org.apache.spark spark-core_2.12 3.0.0
- 配置log4j, 更好的跟踪程序执行日志, 即在maven项目的resources目录创建log4j.properties文件, 并添加日志配置信息如下:
控制日志级别. 只有ERROR才会显示
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd
HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to ERROR. When running the spark-shell,
the
# log level for this class is used to overwrite the root logger's log level, so
that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent
UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
2.2 Spark’s WordCount
让我们先用Scala的思路写一遍wordcount流程
不熟悉Scala集合函数的可以翻看笔者的文章: 五-2, Scala集合常用函数全总结
object SparkWordCountDemo {
def main(args: Array[String]): Unit = {
/spark api 操作
//2. 创建配置对象, 并设置好必须的参数
var conf = new SparkConf
conf.setMaster("local[*]") //本地模式
conf.setAppName("WordCountEasy")
//1.创建Spark上下文,
var sc: SparkContext = new SparkContext(conf)
//3. 操作
//3.1 读取文件数据
val line: RDD[String] = sc.textFile("spark_demo_data/input")
/3.2 把每行的数据按空格切分, 然后每个单词单独存储起来
val words: RDD[String] = line.flatMap(x => x.split(" ")) /// ? 产生的是怎样的数据
/3.3 对单词进行分组. 记住, 组名作为key, 每一组的元素是一个value
val groupedWords: RDD[(String, Iterable[String])] = words.groupBy(words => words)
3.4 转换单词组为 (word, words集合) => (wrods, String的一个迭代器) => (words, words.size)
val resRDD: RDD[(String, Int)] = groupedWords.map(x => (x._1, x._2.size))
3.5 将转换结果采集到控制台
val res: Array[(String, Int)] = resRDD.collect()
println("=====")
println(res.mkString("Array(",",",")"))
//sc.groupBy
//4. 关闭连接
sc.stop()
}
}
既然我们已经在学习Spark了, 就要尝试下用spark的聚合计算方法 ‘reduceByKey’
object WordCount {
def main(args: Array[String]): Unit = {
//2. 创建Spark的配置文件对象, 并设置一些必要的配置
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("StandardWordCount")
//1. 创建Spark上下文,
//2.1 向上下文对象中传入配置对象
val sc = new SparkContext(conf)
//3. 读取文件的每一行
val line: RDD[String] = sc.textFile("spark_demo_data/input")
//4. 格式化每一行, 分词
val words: RDD[String] = line.flatMap(_.split(" "))
//5. 给每个单词都加上1标识, 即 (word, 1)
val wordsWithOne: RDD[(String, Int)] = words.map(x => (x, 1))
//6. 规约
val resRDD: RDD[(String, Int)] = wordsWithOne.reduceByKey(_ + _)
//7. 从内存中把结果取出来, 并输出
val resArray: Array[(String, Int)] = resRDD.collect()
println(resArray.mkString("Array(",",",")") + "n")
//8. 关闭资源连接
sc.stop()
}
}
沿着上面的这个思路, 我们还可以利用Scala集合的高级函数实现wordcount
package cn.cyy.spark.core.wordcountdemo.wordcount
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
//2. 创建Spark的配置文件对象, 并设置一些必要的配置
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("StandardWordCount")
//1. 创建Spark上下文,
//2.1 向上下文对象中传入配置对象
val sc = new SparkContext(conf)
//3. 读取文件的每一行
val line: RDD[String] = sc.textFile("spark_demo_data/input")
//4. 格式化每一行, 分词
val words: RDD[String] = line.flatMap(_.split(" "))
//5. 给每个单词都加上1标识, 即 (word, 1)
val wordsWithOne: RDD[(String, Int)] = words.map(x => (x, 1))
//5.1 分组
val groupedWordWithOne: RDD[(String, Iterable[(String, Int)])] = wordsWithOne.groupBy(tuple => tuple._1)
//5.2 先reduce, 之后map
val resRDD: RDD[(String, Int)] = groupedWordWithOne.map {
case (word, list) => {
list.reduce(
(x, y) => {
(x._1, x._2 + y._2)
}
)
}
}
// //6. 规约
// val resRDD: RDD[(String, Int)] = wordsWithOne.reduceByKey(_ + _)
//7. 从内存中把结果取出来, 并输出
val resArray: Array[(String, Int)] = resRDD.collect()
println(resArray.mkString("Array(",",",")"))
//8. 关闭资源连接
sc.stop()
}
}



