5.1 创建maven项目
4.0.0 org.example apache_hudi 1.0-SNAPSHOT org.apache.hudi hudi-client provided 0.9.0 pom org.apache.hudi hudi-spark-bundle_2.12 provided 0.9.0 org.apache.hudi hudi-hadoop-mr-bundle jackson-databind com.fasterxml.jackson.core com.fasterxml.jackson.core jackson-annotations 0.9.0 provided org.apache.spark spark-core_2.12 provided 3.0.1 org.scala-lang scala-library 2.12.10 org.apache.spark spark-sql_2.12 provided 3.0.1 org.apache.spark spark-hive_2.12 provided 3.0.1 org.apache.spark spark-avro_2.12 provided 3.0.1 org.apache.hadoop hadoop-client provided 2.7.2 org.spark-project.hive hive-jdbc provided 1.2.1.spark2
5.2 创建scala 版本的wordcount并本地运行
package com.sinoes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object _01_WordCount {
def main(args: Array[String]): Unit = {
// 获取spark的环境
val conf: SparkConf = new SparkConf()
conf.setAppName("wordCount")
.setMaster("local[*]") // 提交到集群模式删除
val sparkContext: SparkContext = new SparkContext(conf)
// 读取数据,转换为rdd
val linesRDD: RDD[String] = sparkContext.textFile(args(0))
// 对数据进行逻辑处理
val wordCountRDD: RDD[(String, Int)] = linesRDD
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
// 打印结果数据
wordCountRDD.foreach(println(_))
// 关闭数据
sparkContext.stop()
//Thread.sleep(100000L)
}
}
# 启动hdfs $ start-dfs.sh # 启动yarn $ start-yarn.sh # 启动spark start-spark-all.sh
命令行运行
$SPARK_HOME/bin/spark-submit --class com.sinoes._01_WordCount --master spark://centos110:7077 --driver-memory 512M --executor-memory 512M --total-executor-cores 1 /root/wordcount.jar 10
注:如果出现加载不到主类的错误,可能是打包方式有问题。
解决方式:
构建打包,提交到spark环境运行-
生产环境下一般是将应用构建打包成jar,放到spark集群中运行,所以我们来操作构建打包;
-
在菜单上选择"File"->“Project Structure”,弹出窗口点击"Artifacts",选择新增jar,如下图:
)] -
如下图,在弹出的窗口中,红框1位置输入要运行的class,红框2选择的是单选框的第二个"copy to the output …":
)] -
在菜单上选择"Build"->“Build Artifacts…”,在弹出的菜单中选择"sparkscalademo:jar"->“Rebuild” 如下图:
)] -
如果编译成功,在项目的 outartifacts目录下就会生成文件,如下:
(C:UsersuserAppDataRoamingTyporatypora-user-imagesimage-20211029152326862.png)]
| 参数名 | 参数说明 |
|---|---|
| –master | master 的地址,提交任务到哪里执行,例如 spark://host:port, yarn, local |
| –deploy-mode | 在本地 (client) 启动 driver 或在 cluster 上启动,默认是 client |
| –class | 应用程序的主类,仅针对 java 或 scala 应用 |
| –name | 应用程序的名称 |
| –jars | 用逗号分隔的本地 jar 包,设置后,这些 jar 将包含在 driver 和 executor 的 classpath 下 |
| –packages | 包含在driver 和executor 的 classpath 中的 jar 的 maven 坐标 |
| –exclude-packages | 为了避免冲突 而指定不包含的 package |
| –repositories | 远程 repository |
| –conf PROP=VALUE | 指定 spark 配置属性的值, 例如 -conf spark.executor.extraJavaOptions="-XX:MaxPermSize=256m" |
| –properties-file | 加载的配置文件,默认为 conf/spark-defaults.conf |
| –driver-memory | Driver内存,默认 1G |
| –driver-java-options | 传给 driver 的额外的 Java 选项 |
| –driver-library-path | 传给 driver 的额外的库路径 |
| –driver-class-path | 传给 driver 的额外的类路径 |
| –driver-cores | Driver 的核数,默认是1。在 yarn 或者 standalone 下使用 |
| –executor-memory | 每个 executor 的内存,默认是1G |
| –total-executor-cores | 所有 executor 总共的核数。仅仅在 mesos 或者 standalone 下使用 |
| –num-executors | 启动的 executor 数量。默认为2。在 yarn 下使用 |
| –executor-core | 每个 executor 的核数。在yarn或者standalone下使用 |
6.1 作用
可以查询运行的任务,并且可以查看stage,每个task的运行时间,便于后续观察及调优。
6.2. 配置
1)打开开关
复制文件 spark-defaults.conf.template 为 spark-defaults.conf
vi spark-defaults.conf
spark.eventLog.enabled true spark.eventLog.dir hdfs://centos110:9000/spark-logs 后面的dir的,配置的hdfs的集群的地址
- 增加环境参数
vi spark-env.sh, 增加
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18180 -Dspark.history.retainedApplications=30 -Dspark.history.fs.logDirectory=hdfs://centos110:9000/spark-logs"
-
分发刚才两个配置的文件,到其他的服务器上
-
启动hdfs,创建文件夹 /spark-logs
启动hdfs, 启动spark
-
启动 history server : 命令:start-history-server.sh
可以再hdfs上看到相关的信息
7.1 安装配置zookeeper
spark的高可用依赖于zookeeper,需要先安装zookeeper
zookeeper的安装请参考(Zookeeper3.5.8+CentOS7.6集群安装配置及使用):
https://blog.csdn.net/qq_44665283/article/details/121038105
7.2 spark增加配置
在bin/spark-env.sh 增加:
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=centos110:2181,centos112:2181,centos113:2181 -Dspark.deploy.zookeeper.dir=/spark" # 每个work的线程数 export SPARK_WORKER_CORES=4 # 每个work可用的内存数 export SPARK_WORKER_MEMORY=2g
分发到各个节点
scp ./spark-env.sh centos112:$PWD scp ./spark-env.sh centos113:$PWD
7.3 启动zookepper 集群
zkStart-all.sh
7.4 在centos110 启动spark
start-spark-all.sh
在centos112 启动
start-master.sh
发现未启动,因此需要去掉配置:
spark-env.sh 里面的配置
#export SPARK_MASTER_HOST=centos1108 spark-shell 的使用
8.1 bin/spark-shell 进行启动,进入交换窗口
这个没有指定master的地址,用的是spark的local模式运行的,模拟的spark的运行
8.2 bin/spark-shell --master spark://centos110:7077
(centos112:7077,开启高可用需要两个master地址)
启动成功:
8.3 统计hdfs wordcount
sc.textFile("hdfs://centos110:9000/spark_test/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
9 SparkSQL 整合hive
9.1 基本说明
就是使用spark整合hive,利用hive的语法进行处理的。
9.2 已经有hive的配置
可以把 hive-site.xml, hdfs-site.xml, core-site.xml 复制到spark/conf目录下,
修改hive-site.xml 如下:
hive.metastore.warehouse.dir hdfs://centos110:9000/user/hive/warehouse
把相关配置scp到启动spark节点,重新启动spark集群
9.5 上传mysql
把MySQL驱动上传到jar目录下
9.6 配置HADOOP_CONF_DIR
在spark目录下的conf/spark-env.sh,增加如下配置
export HADOOP_CONF_DIR=/opt/apps/hadoop-3.1.3/etc/hadoop
9.6 启动:
启动hdfs集群,启动spark集群
spark-sql --master spark://centos110:7077 --driver-class-path /opt/hadoop/hive-3.1.2/lib/mysql-connector-java-5.1.48.jar
如果按照9.5提示,上传到jar下面,后面的jdbc驱动路径可以不写
9.7 修改数据的存储文件路径
修改DB_LOCATION_UIR为hdfs:
hdfs://centos110:9000/user/sparkhive/warehouse
重新启动SparkSQL命令行即可以
9.8 测试
请参考hive的使用: http://www.yiyong.info/article/161
spark-sql 支持 -f sql.sql 的启动参数. 这样,把需要执行的sql编写在文件里面,把结果保存在数据库的表里面,实现离线或者定时的任务。
9.9 idea整合
增加pom依赖
org.apache.spark spark-hive_2.12 ${spark.version}
在创建sparkSession 的对象时,增加hive支持的。如下
val spark: SparkSession = SparkSession.builder()
.appName("hiveTask")
.master("local[*]")
.enableHiveSupport() // 启用spark对hive的支持(可以兼容hive的语法)
.getOrCreate()
添加配置文件:
把spark/conf 下的hive-site.xml, core-site.xm., hdfs-site.xml放到项目的resources 目录,保证程序能找到hive的元数据库及hdfs位置
hive.metastore.uris thrift://centos110:9083 Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.
package com.sinoes
import org.apache.spark.sql.SparkSession
object _02_HiveTest {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.appName("hiveTask")
.master("local[*]")
.enableHiveSupport() // 启用spark对hive的支持(可以兼容hive的语法)
.getOrCreate()
spark.sql("use test_ods_db")
spark.sql("SELECt * FROM emp").show();
}
}



