pyspark的计算过程
注意点
错误说明: 找不到 java home 原因: 当前python程序是通过ssh 连接到远端, 在远端进行执行的 , 在执行的时候, 需要加载各种环境变量, 而python代码在远端执行的时候, 主要是加载 .bashrc 文件, 但是这个文件中并没有配置jdk相关信息 从而导致无法使用 解决方案: 需要修改 linux服务器中 /root/.bashrc 文件, 添加以下两行内容: export JAVA_HOME=/export/server/jdk1.8.0_241/ export PYSPARK_PYTHON=/root/anaconda3/bin/python 添加后, 执行 source 重新加载即可 source /root/.bashrc 说明: PYSPARK_PYTHON 设置的是 base环境的python环境即可 如果是其他虚拟环境的, 设置为: /root/anaconda3/envs/虚拟环境名称/bin/python 说明: 在后续所有的spark的代码中, 都需要添加以下内容: import os # 读取hdfs的文件数据, 完成WordCount案例 # 目的: 锁定远端操作环境, 避免存在多个版本环境的问题 os.environ["SPARK_HOME"]="/export/server/spark" os.environ["PYSPARK_PYTHON"]="/root/anaconda3/bin/python" os.environ["PYSPARK_DRIVER_PYTHON"]="/root/anaconda3/bin/python"
本地实现
#spark程序的编写wordcount
#导入spark的对象
from pyspark import SparkContext, SparkConf
import os
os.environ["SPARK_HOME"]="/export/server/spark"
os.environ["PYSPARK_PYTHON"]="/root/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"]="/root/anaconda3/bin/python"
#程序的入口
if __name__ == '__main__':
print('spark程序的入口')
#创建sc(sparkContext)对象
conf = SparkConf().setMaster('local[*]').setAppName('wordcount_01')
sc = SparkContext(conf=conf)
#编写wordcount案例
#读取文件 读取本地文件: file:/// 读取hdfs的协议:hdfs://node1:8020/ ,读取的时候是按行读取的
rdd1 = sc.textFile("file:export/data/workspace/_01_pyspark_base/data/words.txt")
#将读取到的每一行数据,进行切割操作,然后得到一个大列表,放置每一个单词
# rdd2 = rdd1.map(lambda x: x.split(' '))
#打印
# print(rdd2.collect())
#flatmap 第一个作用跟map的样,第二个作用对结果进行压扁得到一个大的列表
rdd2 = rdd1.flatMap(lambda x: x.split(' '))
#返回一个(单词,1)的形式
rdd3 = rdd2.map(lambda word: (word, 1))
#进行分组操作
rdd4 = rdd3.reduceByKey(lambda agg, curr: agg + curr)
#打印输出
print(rdd4.collect())
hdfs上的实现
from pyspark import SparkContext, SparkConf
import os
os.environ["SPARK_HOME"]="/export/server/spark"
os.environ["PYSPARK_PYTHON"]="/root/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"]="/root/anaconda3/bin/python"
if __name__ == '__main__':
conf = SparkConf().setMaster("local[*]").setAppName("wordcount_02")
sc = SparkContext(conf=conf)
#读取数据
rdd1 = sc.textFile("hdfs://node1:8020/words.text")
#进行数据处理,连式编程
rdd2 = rdd1.flatMap(lambda x : x.split(' ')).map(lambda word : (word, 1)).reduceByKey(lambda agg, curr: agg+curr)
#排序的方式一
rdd3 = rdd2.sortBy(lambda data: data[1], ascending=False)
print(rdd3.take(3))
print('======================')
#排序的方式二
rdd4 = rdd2.map(lambda data: (data[1], data[0])).sortByKey(ascending=False).map(
lambda data: (data[1], data[0]))
print(rdd4.take(3))
print('===========================')
#排序方式三
print(rdd2.top(3, lambda data:data[1]))
#释放sc对象资源
sc.stop()
pycharm基于ssh远程测试
开发环境和生产环境
一般在企业中, 会存在两套线上环境, 一套环境是用于开发(测试)环境, 一套环境是用于生产环境, 首先一般都是先在开发测试环境上进行编写代码, 并且在此环境上进行测试, 当整个项目全部开发完成后, 需要将其上传到生产环境, 面向用于使用 如果说还是按照之前的本地模式开发方案, 每个人的环境有可能都不一致, 导致整个团队无法统一一套开发环境进行使用, 从而导致后续在进行测试 上线的时候, 出现各种各样环境问题 pycharm提供了一些解决方案: 远程连接方案, 允许所有的程序员都去连接远端的测试环境的, 确保大家的环境都是统一, 避免各种环境问题发生, 而且由于连接的远程环境, 所有在pycharm编写代码, 会自动上传到远端环境中, 在执行代码的时候, 相当于是直接在远端环境上进行执行操作spark on yarn环境的搭建
spark on yarn的本质
spark on yarn 本质: 将 spark的程序提交到yarn平台, 有yarn进行执行调度操作 这种调度方案在企业中也是非常常见的, 因为一旦使用on yarn方案后, 企业就不需要在维护一个spark集群环境, 可以减少节点维护, 节省资源(省钱)
注意点
注意: 虽然不需要有spark集群, 但是需要有spark的客户端: spark-submit 所有环境中依然是存在spark的包的, 但是不需要启动集群而已 (仅仅需要一台即可, 当做客户端)
spark on yarn的配置
#修改spark-env.sh配置 cd /export/server/spark/conf vim /export/server/spark/conf/spark-env.sh 添加以下内容: HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop YARN_CONF_DIR=/export/server/hadoop/etc/hadoop 同步到其他两台 cd /export/server/spark/conf scp -r spark-env.sh node2:$PWD scp -r spark-env.sh node3:$PWD -------------------------------------------------- #修改Hadoop中yarn-site.xml配置 cd /export/server/hadoop-3.3.0/etc/hadoop/ vim /export/server/hadoop-3.3.0/etc/hadoop/yarn-site.xml 添加配置:yarn.nodemanager.resource.memory-mb 20480 yarn.scheduler.minimum-allocation-mb 2048 分发给其余两台服务器 cd /export/server/hadoop/etc/hadoop scp -r yarn-site.xml node2:$PWD scp -r yarn-site.xml node3:$PWD ----------------------------------------------- #spark设置历史服务地址 cd /export/server/spark/conf cp spark-defaults.conf.template spark-defaults.conf vim spark-defaults.conf 添加以下内容 spark.yarn.historyServer.address node1:18080 #设置日志级别 cd /export/server/spark/conf cp log4j.properties.template log4j.properties vim log4j.properties 为warn 分发给其余两台服务器 cd /export/server/spark/conf scp -r spark-defaults.conf log4j.properties node2:$PWD scp -r spark-defaults.conf log4j.properties node3:$PWD ------------------------------------------------ #配置依赖spark jar包 hadoop fs -mkdir -p /spark/jars/ hadoop fs -put /export/server/spark/jars/* /spark/jars/ #修改spark-defaults.conf cd /export/server/spark/conf vim spark-defaults.conf 添加以下内容: spark.yarn.jars hdfs://node1:8020/spark/jars/* 同步到其他节点 cd /export/server/spark/conf scp -r spark-defaults.conf root@node2:$PWD scp -r spark-defaults.conf root@node3:$PWD yarn.nodemanager.vmem-pmem-ratio 2.1
启动服务说明
#先重启Hadoop服务 stop-all.sh start-all.sh #启动MRHistoryServer服务,在node1执行命令 mr-jobhistory-daemon.sh start historyserver #启动Spark HistoryServer服务,,在node1执行命令 /export/server/spark/sbin/start-history-server.sh #Spark HistoryServer服务WEB UI页面地址 http://node1:8080/ #yarn的日志服务地址 node1:19888
spark on yarn的测试使用
- 测试: pi.py 圆周率计算脚本, 将其运行在yarn上
cd /export/server/spark ./bin/spark-submit --master yarn --conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" --conf "spark.pyspark.python=/root/anaconda3/bin/python3" /export/server/spark/examples/src/main/python/pi.py 10
- 第二种提交方式
cd /export/server/spark ./bin/spark-submit --master yarn --driver-memory 512m --executor-memory 512m --executor-cores 1 --num-executors 3 --queue default --conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" --conf "spark.pyspark.python=/root/anaconda3/bin/python3" /export/server/spark/examples/src/main/python/pi.py 10 参数说明: --driver-memory : 设置 diver的内存大小 --executor-memory: 设置 执行器的内存大小 --executor-cores : 设置 执行器占用几核cpu --num-executors : 启动多少个执行器 --queue : 将任务提交yarn集群的那个队列 默认就是 default --conf : 设置相关的配置
说明:
spark程序在运行的时候, 主要分为二大部分: 一个是Driver 一个是Exector
Driver: 相当于 yarn中 applicationMaster yarn中会为每一个任务启动一个applicationMaster, 主要负责任务的管理操作: 任务的分配, 资源的申请, 任务的监控 等 跟任务相关管理工作 spark中Driver其实和 applicationMaster的作用是几乎是一致的, 甚至说 在on yarn的时候, Driver和 appMaster是一个进程 executor: 执行器 , 理解为是一个线程池 因为spark是基于线程运行的, 线程必须运行在某一个进程中, executor目的让线程运行其中 完成任务的执行
两种部署方式
本质:指的是spark程序中Driver程序应该允许在什么位置
- 第一种:client模式(客户端模式)
默认提交spark程序,采用模式就是client 指的: 将spark程序中Driver程序运行在提交任务的本地客户端上 好处: 可以直接在客户端中看到执行的结果, 方便进行测试 弊端: 加剧网络的传输压力, 影响执行效率
- 第二种:cluster模式(集群模式)
指的: 将spark程序中Driver程序运行在worker节点上或者nodemanager节点上 好处: 减少数据传输压力, 提高执行效率 弊端: 无法直接看到输出结果, 需要借助历史日志服务器, 查看, 不方便测试 适合于生产环境使用
如何设置部署方式
cd /export/server/spark ./bin/spark-submit --master yarn --deploy-mode client|cluster (二选一) --conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" --conf "spark.pyspark.python=/root/anaconda3/bin/python3" /export/server/spark/examples/src/main/python/pi.py 10



