栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark环境搭建

Spark环境搭建

Spark环境搭建

前言Spark环境搭建-Local-本地模式

准备工作原理操作-开箱即用测试 Spark环境搭建-Standalone-独立集群

原理操作测试 Spark环境搭建-Standalone-HA

原理操作测试 Spark环境搭建-Spark-On-Yarn

原理准备工作

0.关闭之前的Spark-Standalone集群1.配置Yarn历史服务器并关闭资源检查2.配置Spark的历史服务器和Yarn的整合3.配置依赖的Spark 的jar包4.启动服务 两种模式

client-了解cluster模式-开发使用 操作

client模式cluster模式 补充:spark-shell和spark-submit Spark程序开发-重点

准备工作代码实现-WordCount代码实现-On-YarnWordCount图解 扩展:练习题 蒙特卡罗算法求PI

思路代码实现


前言

  Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。

  本项目学习内容:

Spark基础概念Spark不同运行模式环境搭建:local、Standalone、Standalone HA、Spark on YARNIDEA搭建Spark开发环境,实现CountWord示例demo

  本项目属于《Spark系列》。详细示例代码和图片展示,点下面github链接:

[《Spark环境搭建》]
https://github.com/xiaoguangbiao-github/bigdata_spark_env.git

[《Spark内核原理及RDD》]
*https://github.com/xiaoguangbiao-github/bigdata_spark_core.git

[《SparkStreaming & SparkSql》]
https://github.com/xiaoguangbiao-github/bigdata_sparkstreaming_sparksql.git

[《StructuredStreaming & Spark综合案例》]
https://github.com/xiaoguangbiao-github/bigdata_structuredstreaming_sparkdemo.git

[《Spark3.0新特性 & Spark多语言开发》]
https://github.com/xiaoguangbiao-github/bigdata_spark3_languagedevelop.git


Spark环境搭建-Local-本地模式 准备工作

1.JDK

2.ScalaSDK只需要在Windows安装即可

3.Spark安装包
http://spark.apache.org/downloads.html
我这里使用:spark-3.0.1-bin-hadoop2.7.tgz

原理

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gwYuztzb-1648349412133)(img/1609551528270.png)]

操作-开箱即用

1.上传解压安装包

tar -zxvf spark-3.0.1-bin-hadoop2.7.tgz

2.修改权限

chown -R root /export/server/spark-3.0.1-bin-hadoop2.7

chgrp -R root /export/server/spark-3.0.1-bin-hadoop2.7

3.创建软连接(或改名)

ln -s /export/server/spark-3.0.1-bin-hadoop2.7 /export/server/spark

4.查看安装目录

测试

1.启动spark交互式窗口

/export/server/spark/bin/spark-shell

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vFrPqpYj-1648349412134)(img/1609551940572.png)]

2.打开http://ip:4040

3.准备文件

vim /root/words.txt

hello me you her
hello me you
hello me
hello

4.spark交互式窗口中执行WordCount

val textFile = sc.textFile("file:///root/words.txt")
val counts = textFile.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
counts.collect
Spark环境搭建-Standalone-独立集群 原理

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fX3JpBY7-1648349412138)(img/1609553123267.png)]

操作

1.环境准备

node1:master

ndoe2:worker/slave

node3:worker/slave

《先安装hadoop集群》

2.配置slaves/workers

进入配置目录

cd /export/server/spark/conf

修改配置文件名称

mv slaves.template slaves

vim slaves

内容如下:

node2
node3

3.配置master

进入配置目录

cd /export/server/spark/conf

修改配置文件名称

mv spark-env.sh.template spark-env.sh

修改配置文件

vim spark-env.sh

增加如下内容:

## 设置JAVA安装目录
JAVA_HOME=/export/server/jdk

## HADOOP软件配置文件目录,读取HDFS上文件和运行Spark在YARN集群时需要,先提前配上
HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
YARN_CONF_DIR=/export/server/hadoop/etc/hadoop

## 指定spark老大Master的IP和提交任务的通信端口
SPARK_MASTER_HOST=node1
SPARK_MASTER_PORT=7077

SPARK_MASTER_WEBUI_PORT=8080

SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=1g

4.分发

将配置好的将 Spark 安装包分发给集群中其它机器,命令如下:

cd /export/server/

scp -r spark-3.0.1-bin-hadoop2.7 root@node2:$PWD

scp -r spark-3.0.1-bin-hadoop2.7 root@node3:$PWD

创建软连接

ln -s /export/server/spark-3.0.1-bin-hadoop2.7 /export/server/spark

测试

1.集群启动和停止

在主节点上启动spark集群

/export/server/spark/sbin/start-all.sh

在主节点上停止spark集群

/export/server/spark/sbin/stop-all.sh

在主节点上单独启动和停止Master:

start-master.sh

stop-master.sh

在从节点上单独启动和停止Worker(Worker指的是slaves配置文件中的主机名)

start-slaves.sh

stop-slaves.sh

2.jps查看进程

node1:master

node2/node3:worker

3.http://node1:8080/

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gqqxMdN3-1648349412139)(img/1609553906442.png)]

4.启动spark-shell

/export/server/spark/bin/spark-shell --master spark://node1:7077

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fhOWpfiZ-1648349412140)(img/1609554065732.png)]

5.提交WordCount任务

注意:上传文件到hdfs方便worker读取

上传文件到hdfs

hadoop fs -put /root/words.txt /wordcount/input/words.txt

目录如果不存在可以创建

hadoop fs -mkdir -p /wordcount/input

结束后可以删除测试文件夹

hadoop fs -rm -r /wordcount

val textFile = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")
val counts = textFile.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
counts.collect
counts.saveAsTextFile("hdfs://node1:8020/wordcount/output47")

6.查看结果

http://node1:50070/explorer.html#/wordcount/output47

7.查看spark任务web-ui

http://node1:4040/jobs/

总结:

spark: 4040 任务运行web-ui界面端口

spark: 8080 spark集群web-ui界面端口

spark: 7077 spark提交任务时的通信端口

hadoop: 50070集群web-ui界面端口

hadoop:8020/9000(老版本) 文件上传下载通信端口

8.停止集群

/export/server/spark/sbin/stop-all.sh

Spark环境搭建-Standalone-HA 原理

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lcNZBjww-1648349412140)(img/1609555149171.png)]

操作

1.启动zk

2.修改配置

vim /export/server/spark/conf/spark-env.sh

注释

#SPARK_MASTER_HOST=node1

增加

SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node1:2181,node2:2181,node3:2181 -Dspark.deploy.zookeeper.dir=/spark-ha"

3.分发配置

cd /export/server/spark/conf

scp -r spark-env.sh root@node2:$PWD

scp -r spark-env.sh root@node3:$PWD

测试

0.启动zk服务

zkServer.sh status

zkServer.sh stop

zkServer.sh start

1.node1上启动Spark集群执行

/export/server/spark/sbin/start-all.sh

2.在node2上再单独只起个master:

/export/server/spark/sbin/start-master.sh

3.查看WebUI

http://node1:8080/

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FjJC4YA3-1648349412141)(img/1609555501043.png)]

http://node2:8080/

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jLjtPA5o-1648349412141)(img/1609555527846.png)]

4.模拟node1宕机

jps

kill -9 进程id

5.再次查看web-ui

http://node1:8080/

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HT7LEVkw-1648349412141)(img/1609555613242.png)]

http://node2:8080/

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JXFJ7DkR-1648349412142)(img/1609555644807.png)]

6.测试WordCount

/export/server/spark/bin/spark-shell --master spark://node1:7077,node2:7077

运行

val textFile = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")
val counts = textFile.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
counts.collect
counts.saveAsTextFile("hdfs://node1:8020/wordcount/output47_2")
Spark环境搭建-Spark-On-Yarn 原理

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Equ2AtXc-1648349412142)(img/1609556988870.png)]

注意:

在实际开发中, 大数据任务都有统一的资源管理和任务调度工具来进行管理! —Yarn使用的最多!

因为它成熟稳定, 支持多种调度策略:FIFO/Capcity/Fair

可以使用Yarn调度管理MR/Hive/Spark/Flink

准备工作 0.关闭之前的Spark-Standalone集群

/export/server/spark/sbin/stop-all.sh

1.配置Yarn历史服务器并关闭资源检查

vim /export/server/hadoop/etc/hadoop/yarn-site.xml


    
    
        yarn.resourcemanager.hostname
        node1
    
    
        yarn.nodemanager.aux-services
        mapreduce_shuffle
    
    
    
        yarn.nodemanager.resource.memory-mb
        20480
    
    
        yarn.scheduler.minimum-allocation-mb
        2048
    
    
        yarn.nodemanager.vmem-pmem-ratio
        2.1
    
    
    
        yarn.log-aggregation-enable
        true
    
    
    
        yarn.log-aggregation.retain-seconds
        604800
    
    
    
        yarn.log.server.url
        http://node1:19888/jobhistory/logs
    
    
    
        yarn.nodemanager.pmem-check-enabled
        false
    
    
        yarn.nodemanager.vmem-check-enabled
        false
    

注意:如果之前没有配置,现在配置了需要分发并重启yarn

cd /export/server/hadoop/etc/hadoop

scp -r yarn-site.xml root@node2:$PWD

scp -r yarn-site.xml root@node3:$PWD

/export/server/hadoop/sbin/stop-yarn.sh

/export/server/hadoop/sbin/start-yarn.sh

2.配置Spark的历史服务器和Yarn的整合

修改spark-defaults.conf

进入配置目录

cd /export/server/spark/conf

修改配置文件名称

mv spark-defaults.conf.template spark-defaults.conf

vim spark-defaults.conf

添加内容:

spark.eventLog.enabled                  true
spark.eventLog.dir                      hdfs://node1:8020/sparklog/
spark.eventLog.compress                 true
spark.yarn.historyServer.address        node1:18080

修改spark-env.sh

修改配置文件

vim /export/server/spark/conf/spark-env.sh

增加如下内容:

## 配置spark历史日志存储地址
SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node1:8020/sparklog/ -Dspark.history.fs.cleaner.enabled=true"

注意:sparklog需要手动创建

hadoop fs -mkdir -p /sparklog

修改日志级别

进入目录

cd /export/server/spark/conf

修改日志属性配置文件名称

mv log4j.properties.template log4j.properties

改变日志级别

vim log4j.properties

修改内容如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9Nb653k0-1648349412143)(img/1609557920445.png)]

分发-可选,如果只在node1上提交spark任务到yarn,那么不需要分发

cd /export/server/spark/conf

scp -r spark-env.sh root@node2:$PWD

scp -r spark-env.sh root@node3:$PWD

scp -r spark-defaults.conf root@node2:$PWD

scp -r spark-defaults.conf root@node3:$PWD

scp -r log4j.properties root@node2:$PWD

scp -r log4j.properties root@node3:$PWD

3.配置依赖的Spark 的jar包

1.在HDFS上创建存储spark相关jar包的目录

hadoop fs -mkdir -p /spark/jars/

2.上传$SPARK_HOME/jars所有jar包到HDFS

hadoop fs -put /export/server/spark/jars*Test.* ** object WordCount { def main(args: Array[String]): Unit = { //TODO 1.env/准备sc/SparkContext/Spark上下文执行环境 val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) sc.setLogLevel("WARN") //TODO 2.source/读取数据 //RDD:A Resilient Distributed Dataset (RDD):弹性分布式数据集,简单理解为分布式集合!使用起来和普通集合一样简单! //RDD[就是一行行的数据] val lines: RDD[String] = sc.textFile("data/input/words.txt") //TODO 3.transformation/数据操作/转换 //切割:RDD[一个个的单词] val words: RDD[String] = lines.flatMap(_.split(" ")) //记为1:RDD[(单词, 1)] val wordAndOnes: RDD[(String, Int)] = words.map((_,1)) //分组聚合:groupBy + mapValues(_.map(_._2).reduce(_+_)) ===>在Spark里面分组+聚合一步搞定:reduceByKey val result: RDD[(String, Int)] = wordAndOnes.reduceByKey(_+_) //TODO 4.sink/输出 //直接输出 result.foreach(println) //收集为本地集合再输出 println(result.collect().toBuffer) //输出到指定path(可以是文件/夹) result.repartition(1).saveAsTextFile("data/output/result") result.repartition(2).saveAsTextFile("data/output/result2") //为了便于查看Web-UI可以让程序睡一会 Thread.sleep(1000 * 60) //TODO 5.关闭资源 sc.stop() } } 代码实现-On-Yarn

package cn.xiaoguangbiao.hello

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object WordCount {
  def main(args: Array[String]): Unit = {
    if(args.length < 2){
      println("请指定input和output")
      System.exit(1)//非0表示非正常退出程序
    }
    //TODO 1.env/准备sc/SparkContext/Spark上下文执行环境
    val conf: SparkConf = new SparkConf().setAppName("wc")//.setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    //TODO 2.source/读取数据
    //RDD:A Resilient Distributed Dataset (RDD):弹性分布式数据集,简单理解为分布式集合!使用起来和普通集合一样简单!
    //RDD[就是一行行的数据]
    val lines: RDD[String] = sc.textFile(args(0))//注意提交任务时需要指定input参数

    //TODO 3.transformation/数据操作/转换
    //切割:RDD[一个个的单词]
    val words: RDD[String] = lines.flatMap(_.split(" "))
    //记为1:RDD[(单词, 1)]
    val wordAndOnes: RDD[(String, Int)] = words.map((_,1))
    //分组聚合:groupBy + mapValues(_.map(_._2).reduce(_+_)) ===>在Spark里面分组+聚合一步搞定:reduceByKey
    val result: RDD[(String, Int)] = wordAndOnes.reduceByKey(_+_)

    //TODO 4.sink/输出
    //直接输出
    //result.foreach(println)
    //收集为本地集合再输出
    //println(result.collect().toBuffer)
    //输出到指定path(可以是文件/夹)
    //如果涉及到HDFS权限问题不能写入,需要执行:
    //hadoop fs -chmod -R 777  /
    //并添加如下代码
    System.setProperty("HADOOP_USER_NAME", "root")
    result.repartition(1).saveAsTextFile(args(1))//注意提交任务时需要指定output参数

    //为了便于查看Web-UI可以让程序睡一会
    //Thread.sleep(1000 * 60)

    //TODO 5.关闭资源
    sc.stop()
  }
}

1.打包

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8HcXRjvh-1648349412147)(img/1609574002301.png)]

2.修改别名

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7koEtWaJ-1648349412147)(img/1609574144462.png)]

3.上传到linux

4.提交任务

SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit 
--master yarn 
--deploy-mode cluster 
--driver-memory 512m 
--executor-memory 512m 
--num-executors 1 
--class cn.xiaoguangbiao.hello.WordCount 
/root/wc.jar 
hdfs://node1:8020/wordcount/input/words.txt 
hdfs://node1:8020/wordcount/output47_3

5.查看任务

http://node1:8088/cluster

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tynoojJ1-1648349412148)(img/1609574459201.png)]

6.查看结果

http://node1:50070/explorer.html#/wordcount/output47_3

WordCount图解

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xC7U2K0D-1648349412148)(img/1609575069348.png)]

扩展:练习题 蒙特卡罗算法求PI 思路

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mMHPU8bJ-1648349412148)(img/1609575765598.png)]

代码实现
package cn.xiaoguangbiao.temp;

import java.util.Random;


public class Interview_4 {
    public static void main(String[] args) {
        Random random = new Random();
        int sum = 1000000000;//总共撒的点的数量
        int count = 0;//落在扇形区域的点的数量
        double  x = 0;
        double  y = 0;
        for (int i = 0; i < sum ;i++){
            x = random.nextDouble();//随机产生一个[0.0~1.0)之间的随机数
            y = random.nextDouble();//随机产生一个[0.0~1.0)之间的随机数
            if(x*x + y*y < 1){ //落在扇形区域
                count++;
            }
        }
        double  Pi = 4.0 * count / sum;
        System.out.println("计算出的Pi的值为:"+Pi);
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/779409.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号