栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark-pyspark实现基本词频计算-ssh远程测试-spark on yarn配置/启动-pyspark两种部署方式

spark-pyspark实现基本词频计算-ssh远程测试-spark on yarn配置/启动-pyspark两种部署方式

基于pycharm中pyspark的使用

pyspark的计算过程

pycharm实现wordcount

注意点

错误说明: 找不到 java home

原因:
	当前python程序是通过ssh 连接到远端, 在远端进行执行的 , 在执行的时候, 需要加载各种环境变量, 而python代码在远端执行的时候, 主要是加载 .bashrc 文件, 但是这个文件中并没有配置jdk相关信息 从而导致无法使用

解决方案:
	需要修改 linux服务器中 /root/.bashrc 文件, 添加以下两行内容:
		export JAVA_HOME=/export/server/jdk1.8.0_241/
		export PYSPARK_PYTHON=/root/anaconda3/bin/python
	添加后, 执行 source  重新加载即可
		source /root/.bashrc
	说明:
		PYSPARK_PYTHON 设置的是 base环境的python环境即可 如果是其他虚拟环境的, 设置为:
			/root/anaconda3/envs/虚拟环境名称/bin/python    
			
	
说明: 在后续所有的spark的代码中, 都需要添加以下内容:
import os
# 读取hdfs的文件数据, 完成WordCount案例

# 目的: 锁定远端操作环境, 避免存在多个版本环境的问题
os.environ["SPARK_HOME"]="/export/server/spark"
os.environ["PYSPARK_PYTHON"]="/root/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"]="/root/anaconda3/bin/python"

本地实现

#spark程序的编写wordcount
#导入spark的对象
from pyspark import SparkContext, SparkConf

import os

os.environ["SPARK_HOME"]="/export/server/spark"
os.environ["PYSPARK_PYTHON"]="/root/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"]="/root/anaconda3/bin/python"


#程序的入口
if __name__ == '__main__':
    print('spark程序的入口')

    #创建sc(sparkContext)对象
    conf = SparkConf().setMaster('local[*]').setAppName('wordcount_01')
    sc = SparkContext(conf=conf)

    #编写wordcount案例
    #读取文件 读取本地文件: file:///   读取hdfs的协议:hdfs://node1:8020/  ,读取的时候是按行读取的
    rdd1 = sc.textFile("file:export/data/workspace/_01_pyspark_base/data/words.txt")

    #将读取到的每一行数据,进行切割操作,然后得到一个大列表,放置每一个单词
    # rdd2 = rdd1.map(lambda x: x.split(' '))
    #打印
    # print(rdd2.collect())

    #flatmap 第一个作用跟map的样,第二个作用对结果进行压扁得到一个大的列表
    rdd2 = rdd1.flatMap(lambda x: x.split(' '))
    #返回一个(单词,1)的形式
    rdd3 = rdd2.map(lambda word: (word, 1))
    #进行分组操作
    rdd4 = rdd3.reduceByKey(lambda agg, curr: agg + curr)
    #打印输出
    print(rdd4.collect())

hdfs上的实现

from pyspark import SparkContext, SparkConf

import os
os.environ["SPARK_HOME"]="/export/server/spark"
os.environ["PYSPARK_PYTHON"]="/root/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"]="/root/anaconda3/bin/python"


if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("wordcount_02")
    sc = SparkContext(conf=conf)

    #读取数据
    rdd1 = sc.textFile("hdfs://node1:8020/words.text")
    #进行数据处理,连式编程
    rdd2 = rdd1.flatMap(lambda x : x.split(' ')).map(lambda word : (word, 1)).reduceByKey(lambda agg, curr: agg+curr)
    #排序的方式一
    rdd3 = rdd2.sortBy(lambda data: data[1], ascending=False)
    print(rdd3.take(3))

    print('======================')

    #排序的方式二
    rdd4 = rdd2.map(lambda data: (data[1], data[0])).sortByKey(ascending=False).map(
        lambda data: (data[1], data[0]))

    print(rdd4.take(3))

    print('===========================')

    #排序方式三
    print(rdd2.top(3, lambda data:data[1]))

    #释放sc对象资源
    sc.stop()
pycharm基于ssh远程测试

开发环境和生产环境

	一般在企业中, 会存在两套线上环境, 一套环境是用于开发(测试)环境, 一套环境是用于生产环境, 首先一般都是先在开发测试环境上进行编写代码, 并且在此环境上进行测试, 当整个项目全部开发完成后, 需要将其上传到生产环境, 面向用于使用
	
	如果说还是按照之前的本地模式开发方案, 每个人的环境有可能都不一致, 导致整个团队无法统一一套开发环境进行使用, 从而导致后续在进行测试 上线的时候, 出现各种各样环境问题
	
	pycharm提供了一些解决方案: 远程连接方案, 允许所有的程序员都去连接远端的测试环境的, 确保大家的环境都是统一, 避免各种环境问题发生, 而且由于连接的远程环境, 所有在pycharm编写代码, 会自动上传到远端环境中, 在执行代码的时候, 相当于是直接在远端环境上进行执行操作
spark on yarn环境的搭建

spark on yarn的本质

spark on yarn 本质:  将 spark的程序提交到yarn平台, 有yarn进行执行调度操作

这种调度方案在企业中也是非常常见的, 因为一旦使用on yarn方案后, 企业就不需要在维护一个spark集群环境, 可以减少节点维护, 节省资源(省钱)

注意点

注意:
	虽然不需要有spark集群, 但是需要有spark的客户端:  spark-submit  
		所有环境中依然是存在spark的包的, 但是不需要启动集群而已 (仅仅需要一台即可, 当做客户端) 

spark on yarn的配置

#修改spark-env.sh配置

cd /export/server/spark/conf
vim /export/server/spark/conf/spark-env.sh

添加以下内容:
HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
YARN_CONF_DIR=/export/server/hadoop/etc/hadoop

同步到其他两台
cd /export/server/spark/conf
scp -r spark-env.sh node2:$PWD
scp -r spark-env.sh node3:$PWD
--------------------------------------------------

#修改Hadoop中yarn-site.xml配置
cd /export/server/hadoop-3.3.0/etc/hadoop/
vim /export/server/hadoop-3.3.0/etc/hadoop/yarn-site.xml
添加配置:

        yarn.nodemanager.resource.memory-mb
        20480
    
    
        yarn.scheduler.minimum-allocation-mb
        2048
    
    
        yarn.nodemanager.vmem-pmem-ratio
        2.1
    
分发给其余两台服务器
cd /export/server/hadoop/etc/hadoop
scp -r yarn-site.xml node2:$PWD
scp -r yarn-site.xml node3:$PWD

-----------------------------------------------
#spark设置历史服务地址
cd /export/server/spark/conf
cp spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf

添加以下内容
spark.yarn.historyServer.address        node1:18080

#设置日志级别
cd /export/server/spark/conf
cp log4j.properties.template log4j.properties
vim log4j.properties

为warn

分发给其余两台服务器
cd /export/server/spark/conf
scp -r spark-defaults.conf log4j.properties node2:$PWD
scp -r spark-defaults.conf log4j.properties node3:$PWD

------------------------------------------------
#配置依赖spark jar包
hadoop fs -mkdir -p /spark/jars/
hadoop fs -put /export/server/spark/jars/* /spark/jars/

#修改spark-defaults.conf
cd /export/server/spark/conf
vim spark-defaults.conf
添加以下内容:
spark.yarn.jars  hdfs://node1:8020/spark/jars/*

同步到其他节点
cd /export/server/spark/conf
scp -r spark-defaults.conf root@node2:$PWD
scp -r spark-defaults.conf root@node3:$PWD

启动服务说明

#先重启Hadoop服务
stop-all.sh
start-all.sh
#启动MRHistoryServer服务,在node1执行命令
mr-jobhistory-daemon.sh start historyserver
#启动Spark HistoryServer服务,,在node1执行命令
/export/server/spark/sbin/start-history-server.sh

#Spark HistoryServer服务WEB UI页面地址
http://node1:8080/
#yarn的日志服务地址
node1:19888

spark on yarn的测试使用

  • 测试: pi.py 圆周率计算脚本, 将其运行在yarn上
cd /export/server/spark
./bin/spark-submit 
--master yarn 
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" 
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" 
/export/server/spark/examples/src/main/python/pi.py 
10
  • 第二种提交方式
cd /export/server/spark
./bin/spark-submit 
--master yarn 
--driver-memory 512m 
--executor-memory 512m 
--executor-cores 1 
--num-executors 3 
--queue default 
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" 
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" 
/export/server/spark/examples/src/main/python/pi.py 
10


参数说明:
	--driver-memory : 设置 diver的内存大小
	--executor-memory: 设置 执行器的内存大小
	--executor-cores : 设置 执行器占用几核cpu
	--num-executors : 启动多少个执行器
	--queue : 将任务提交yarn集群的那个队列  默认就是 default
	--conf : 设置相关的配置

说明:
spark程序在运行的时候, 主要分为二大部分: 一个是Driver 一个是Exector

Driver:  相当于 yarn中 applicationMaster
	yarn中会为每一个任务启动一个applicationMaster, 主要负责任务的管理操作:
		任务的分配, 资源的申请,  任务的监控 等  跟任务相关管理工作
	spark中Driver其实和 applicationMaster的作用是几乎是一致的, 甚至说 在on yarn的时候, Driver和 appMaster是一个进程

executor:  执行器 , 理解为是一个线程池
	因为spark是基于线程运行的, 线程必须运行在某一个进程中, executor目的让线程运行其中 完成任务的执行

两种部署方式

本质:指的是spark程序中Driver程序应该允许在什么位置

  • 第一种:client模式(客户端模式)

默认提交spark程序,采用模式就是client
	指的:  将spark程序中Driver程序运行在提交任务的本地客户端上
	好处:
		可以直接在客户端中看到执行的结果, 方便进行测试
	
	弊端: 
		加剧网络的传输压力, 影响执行效率
  • 第二种:cluster模式(集群模式)

	指的: 将spark程序中Driver程序运行在worker节点上或者nodemanager节点上
	
	好处:  
		减少数据传输压力, 提高执行效率
	弊端: 
		无法直接看到输出结果, 需要借助历史日志服务器, 查看, 不方便测试
	
	适合于生产环境使用

如何设置部署方式

cd /export/server/spark
./bin/spark-submit 
--master yarn 
--deploy-mode client|cluster (二选一)  
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" 
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" 
/export/server/spark/examples/src/main/python/pi.py 
10

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/439160.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号