前言Spark环境搭建-Local-本地模式
准备工作原理操作-开箱即用测试 Spark环境搭建-Standalone-独立集群
原理操作测试 Spark环境搭建-Standalone-HA
原理操作测试 Spark环境搭建-Spark-On-Yarn
原理准备工作
0.关闭之前的Spark-Standalone集群1.配置Yarn历史服务器并关闭资源检查2.配置Spark的历史服务器和Yarn的整合3.配置依赖的Spark 的jar包4.启动服务 两种模式
client-了解cluster模式-开发使用 操作
client模式cluster模式 补充:spark-shell和spark-submit Spark程序开发-重点
准备工作代码实现-WordCount代码实现-On-YarnWordCount图解 扩展:练习题 蒙特卡罗算法求PI
思路代码实现
前言
Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。
本项目学习内容:
Spark基础概念Spark不同运行模式环境搭建:local、Standalone、Standalone HA、Spark on YARNIDEA搭建Spark开发环境,实现CountWord示例demo
本项目属于《Spark系列》。详细示例代码和图片展示,点下面github链接:
[《Spark环境搭建》]
https://github.com/xiaoguangbiao-github/bigdata_spark_env.git
[《Spark内核原理及RDD》]
*https://github.com/xiaoguangbiao-github/bigdata_spark_core.git
[《SparkStreaming & SparkSql》]
https://github.com/xiaoguangbiao-github/bigdata_sparkstreaming_sparksql.git
[《StructuredStreaming & Spark综合案例》]
https://github.com/xiaoguangbiao-github/bigdata_structuredstreaming_sparkdemo.git
[《Spark3.0新特性 & Spark多语言开发》]
https://github.com/xiaoguangbiao-github/bigdata_spark3_languagedevelop.git
Spark环境搭建-Local-本地模式 准备工作
1.JDK
2.ScalaSDK只需要在Windows安装即可
3.Spark安装包
http://spark.apache.org/downloads.html
我这里使用:spark-3.0.1-bin-hadoop2.7.tgz
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gwYuztzb-1648349412133)(img/1609551528270.png)]
操作-开箱即用1.上传解压安装包
tar -zxvf spark-3.0.1-bin-hadoop2.7.tgz
2.修改权限
chown -R root /export/server/spark-3.0.1-bin-hadoop2.7
chgrp -R root /export/server/spark-3.0.1-bin-hadoop2.7
3.创建软连接(或改名)
ln -s /export/server/spark-3.0.1-bin-hadoop2.7 /export/server/spark
4.查看安装目录
测试1.启动spark交互式窗口
/export/server/spark/bin/spark-shell
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vFrPqpYj-1648349412134)(img/1609551940572.png)]
2.打开http://ip:4040
3.准备文件
vim /root/words.txt
hello me you her hello me you hello me hello
4.spark交互式窗口中执行WordCount
val textFile = sc.textFile("file:///root/words.txt")
val counts = textFile.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
counts.collect
Spark环境搭建-Standalone-独立集群
原理
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fX3JpBY7-1648349412138)(img/1609553123267.png)]
操作1.环境准备
node1:master
ndoe2:worker/slave
node3:worker/slave
《先安装hadoop集群》
2.配置slaves/workers
进入配置目录
cd /export/server/spark/conf
修改配置文件名称
mv slaves.template slaves
vim slaves
内容如下:
node2 node3
3.配置master
进入配置目录
cd /export/server/spark/conf
修改配置文件名称
mv spark-env.sh.template spark-env.sh
修改配置文件
vim spark-env.sh
增加如下内容:
## 设置JAVA安装目录 JAVA_HOME=/export/server/jdk ## HADOOP软件配置文件目录,读取HDFS上文件和运行Spark在YARN集群时需要,先提前配上 HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop YARN_CONF_DIR=/export/server/hadoop/etc/hadoop ## 指定spark老大Master的IP和提交任务的通信端口 SPARK_MASTER_HOST=node1 SPARK_MASTER_PORT=7077 SPARK_MASTER_WEBUI_PORT=8080 SPARK_WORKER_CORES=1 SPARK_WORKER_MEMORY=1g
4.分发
将配置好的将 Spark 安装包分发给集群中其它机器,命令如下:
cd /export/server/
scp -r spark-3.0.1-bin-hadoop2.7 root@node2:$PWD
scp -r spark-3.0.1-bin-hadoop2.7 root@node3:$PWD
创建软连接
ln -s /export/server/spark-3.0.1-bin-hadoop2.7 /export/server/spark
测试1.集群启动和停止
在主节点上启动spark集群
/export/server/spark/sbin/start-all.sh
在主节点上停止spark集群
/export/server/spark/sbin/stop-all.sh
在主节点上单独启动和停止Master:
start-master.sh
stop-master.sh
在从节点上单独启动和停止Worker(Worker指的是slaves配置文件中的主机名)
start-slaves.sh
stop-slaves.sh
2.jps查看进程
node1:master
node2/node3:worker
3.http://node1:8080/
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gqqxMdN3-1648349412139)(img/1609553906442.png)]
4.启动spark-shell
/export/server/spark/bin/spark-shell --master spark://node1:7077
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fhOWpfiZ-1648349412140)(img/1609554065732.png)]
5.提交WordCount任务
注意:上传文件到hdfs方便worker读取
上传文件到hdfs
hadoop fs -put /root/words.txt /wordcount/input/words.txt
目录如果不存在可以创建
hadoop fs -mkdir -p /wordcount/input
结束后可以删除测试文件夹
hadoop fs -rm -r /wordcount
val textFile = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")
val counts = textFile.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
counts.collect
counts.saveAsTextFile("hdfs://node1:8020/wordcount/output47")
6.查看结果
http://node1:50070/explorer.html#/wordcount/output47
7.查看spark任务web-ui
http://node1:4040/jobs/
总结:
spark: 4040 任务运行web-ui界面端口
spark: 8080 spark集群web-ui界面端口
spark: 7077 spark提交任务时的通信端口
hadoop: 50070集群web-ui界面端口
hadoop:8020/9000(老版本) 文件上传下载通信端口
8.停止集群
/export/server/spark/sbin/stop-all.sh
Spark环境搭建-Standalone-HA 原理[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lcNZBjww-1648349412140)(img/1609555149171.png)]
操作1.启动zk
2.修改配置
vim /export/server/spark/conf/spark-env.sh
注释
#SPARK_MASTER_HOST=node1
增加
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node1:2181,node2:2181,node3:2181 -Dspark.deploy.zookeeper.dir=/spark-ha"
3.分发配置
cd /export/server/spark/conf
scp -r spark-env.sh root@node2:$PWD
scp -r spark-env.sh root@node3:$PWD
测试0.启动zk服务
zkServer.sh status
zkServer.sh stop
zkServer.sh start
1.node1上启动Spark集群执行
/export/server/spark/sbin/start-all.sh
2.在node2上再单独只起个master:
/export/server/spark/sbin/start-master.sh
3.查看WebUI
http://node1:8080/
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FjJC4YA3-1648349412141)(img/1609555501043.png)]
http://node2:8080/
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jLjtPA5o-1648349412141)(img/1609555527846.png)]
4.模拟node1宕机
jps
kill -9 进程id
5.再次查看web-ui
http://node1:8080/
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HT7LEVkw-1648349412141)(img/1609555613242.png)]
http://node2:8080/
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JXFJ7DkR-1648349412142)(img/1609555644807.png)]
6.测试WordCount
/export/server/spark/bin/spark-shell --master spark://node1:7077,node2:7077
运行
val textFile = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")
val counts = textFile.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
counts.collect
counts.saveAsTextFile("hdfs://node1:8020/wordcount/output47_2")
Spark环境搭建-Spark-On-Yarn
原理
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Equ2AtXc-1648349412142)(img/1609556988870.png)]
注意:
在实际开发中, 大数据任务都有统一的资源管理和任务调度工具来进行管理! —Yarn使用的最多!
因为它成熟稳定, 支持多种调度策略:FIFO/Capcity/Fair
可以使用Yarn调度管理MR/Hive/Spark/Flink
准备工作 0.关闭之前的Spark-Standalone集群/export/server/spark/sbin/stop-all.sh
1.配置Yarn历史服务器并关闭资源检查vim /export/server/hadoop/etc/hadoop/yarn-site.xml
yarn.resourcemanager.hostname node1 yarn.nodemanager.aux-services mapreduce_shuffle yarn.nodemanager.resource.memory-mb 20480 yarn.scheduler.minimum-allocation-mb 2048 yarn.nodemanager.vmem-pmem-ratio 2.1 yarn.log-aggregation-enable true yarn.log-aggregation.retain-seconds 604800 yarn.log.server.url http://node1:19888/jobhistory/logs yarn.nodemanager.pmem-check-enabled false yarn.nodemanager.vmem-check-enabled false
注意:如果之前没有配置,现在配置了需要分发并重启yarn
cd /export/server/hadoop/etc/hadoop
scp -r yarn-site.xml root@node2:$PWD
scp -r yarn-site.xml root@node3:$PWD
/export/server/hadoop/sbin/stop-yarn.sh
/export/server/hadoop/sbin/start-yarn.sh
2.配置Spark的历史服务器和Yarn的整合修改spark-defaults.conf
进入配置目录
cd /export/server/spark/conf
修改配置文件名称
mv spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf
添加内容:
spark.eventLog.enabled true spark.eventLog.dir hdfs://node1:8020/sparklog/ spark.eventLog.compress true spark.yarn.historyServer.address node1:18080
修改spark-env.sh
修改配置文件
vim /export/server/spark/conf/spark-env.sh
增加如下内容:
## 配置spark历史日志存储地址 SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node1:8020/sparklog/ -Dspark.history.fs.cleaner.enabled=true"
注意:sparklog需要手动创建
hadoop fs -mkdir -p /sparklog
修改日志级别
进入目录
cd /export/server/spark/conf
修改日志属性配置文件名称
mv log4j.properties.template log4j.properties
改变日志级别
vim log4j.properties
修改内容如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9Nb653k0-1648349412143)(img/1609557920445.png)]
分发-可选,如果只在node1上提交spark任务到yarn,那么不需要分发
cd /export/server/spark/conf
scp -r spark-env.sh root@node2:$PWD
scp -r spark-env.sh root@node3:$PWD
scp -r spark-defaults.conf root@node2:$PWD
scp -r spark-defaults.conf root@node3:$PWD
scp -r log4j.properties root@node2:$PWD
scp -r log4j.properties root@node3:$PWD
3.配置依赖的Spark 的jar包1.在HDFS上创建存储spark相关jar包的目录
hadoop fs -mkdir -p /spark/jars/
2.上传$SPARK_HOME/jars所有jar包到HDFS
hadoop fs -put /export/server/spark/jars*Test.*
1.打包 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8HcXRjvh-1648349412147)(img/1609574002301.png)] 2.修改别名 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7koEtWaJ-1648349412147)(img/1609574144462.png)] 3.上传到linux 4.提交任务 5.查看任务 http://node1:8088/cluster [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tynoojJ1-1648349412148)(img/1609574459201.png)] 6.查看结果 http://node1:50070/explorer.html#/wordcount/output47_3 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xC7U2K0D-1648349412148)(img/1609575069348.png)] [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mMHPU8bJ-1648349412148)(img/1609575765598.png)]package cn.xiaoguangbiao.hello
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
if(args.length < 2){
println("请指定input和output")
System.exit(1)//非0表示非正常退出程序
}
//TODO 1.env/准备sc/SparkContext/Spark上下文执行环境
val conf: SparkConf = new SparkConf().setAppName("wc")//.setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")
//TODO 2.source/读取数据
//RDD:A Resilient Distributed Dataset (RDD):弹性分布式数据集,简单理解为分布式集合!使用起来和普通集合一样简单!
//RDD[就是一行行的数据]
val lines: RDD[String] = sc.textFile(args(0))//注意提交任务时需要指定input参数
//TODO 3.transformation/数据操作/转换
//切割:RDD[一个个的单词]
val words: RDD[String] = lines.flatMap(_.split(" "))
//记为1:RDD[(单词, 1)]
val wordAndOnes: RDD[(String, Int)] = words.map((_,1))
//分组聚合:groupBy + mapValues(_.map(_._2).reduce(_+_)) ===>在Spark里面分组+聚合一步搞定:reduceByKey
val result: RDD[(String, Int)] = wordAndOnes.reduceByKey(_+_)
//TODO 4.sink/输出
//直接输出
//result.foreach(println)
//收集为本地集合再输出
//println(result.collect().toBuffer)
//输出到指定path(可以是文件/夹)
//如果涉及到HDFS权限问题不能写入,需要执行:
//hadoop fs -chmod -R 777 /
//并添加如下代码
System.setProperty("HADOOP_USER_NAME", "root")
result.repartition(1).saveAsTextFile(args(1))//注意提交任务时需要指定output参数
//为了便于查看Web-UI可以让程序睡一会
//Thread.sleep(1000 * 60)
//TODO 5.关闭资源
sc.stop()
}
}
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit
--master yarn
--deploy-mode cluster
--driver-memory 512m
--executor-memory 512m
--num-executors 1
--class cn.xiaoguangbiao.hello.WordCount
/root/wc.jar
hdfs://node1:8020/wordcount/input/words.txt
hdfs://node1:8020/wordcount/output47_3
package cn.xiaoguangbiao.temp;
import java.util.Random;
public class Interview_4 {
public static void main(String[] args) {
Random random = new Random();
int sum = 1000000000;//总共撒的点的数量
int count = 0;//落在扇形区域的点的数量
double x = 0;
double y = 0;
for (int i = 0; i < sum ;i++){
x = random.nextDouble();//随机产生一个[0.0~1.0)之间的随机数
y = random.nextDouble();//随机产生一个[0.0~1.0)之间的随机数
if(x*x + y*y < 1){ //落在扇形区域
count++;
}
}
double Pi = 4.0 * count / sum;
System.out.println("计算出的Pi的值为:"+Pi);
}
}



