栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark-3.0.1安装教程(二)

spark-3.0.1安装教程(二)

5 idea环境maven环境搭建及测试

5.1 创建maven项目



   4.0.0

   org.example
   apache_hudi
   1.0-SNAPSHOT

   
       
           org.apache.hudi
           hudi-client
           provided
           0.9.0
           pom
       
       
           org.apache.hudi
           hudi-spark-bundle_2.12
           provided
           0.9.0
       
       
           org.apache.hudi
           hudi-hadoop-mr-bundle
           
               
                   jackson-databind
                   com.fasterxml.jackson.core
               
               
                   com.fasterxml.jackson.core
                   jackson-annotations
               
           
           0.9.0
           provided
       

       
       

       
           org.apache.spark
           spark-core_2.12
           
           provided
           3.0.1
       
       
           org.scala-lang
           scala-library
           2.12.10
       
       
           org.apache.spark
           spark-sql_2.12
           provided
           3.0.1
       
       
           org.apache.spark
           spark-hive_2.12
           provided
           3.0.1
       
       
           org.apache.spark
           spark-avro_2.12
           provided
           3.0.1
       
       
           org.apache.hadoop
           hadoop-client
           provided
          
           2.7.2
       
       
           org.spark-project.hive
           hive-jdbc
           provided
           1.2.1.spark2
       
   


5.2 创建scala 版本的wordcount并本地运行

package com.sinoes

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _01_WordCount {

  def main(args: Array[String]): Unit = {


    // 获取spark的环境
    val conf: SparkConf = new SparkConf()
    conf.setAppName("wordCount")
      .setMaster("local[*]") // 提交到集群模式删除

    val sparkContext: SparkContext = new SparkContext(conf)

    // 读取数据,转换为rdd
    val linesRDD: RDD[String] = sparkContext.textFile(args(0))

    // 对数据进行逻辑处理
    val wordCountRDD: RDD[(String, Int)] = linesRDD
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    // 打印结果数据
    wordCountRDD.foreach(println(_))

    // 关闭数据
    sparkContext.stop()

    //Thread.sleep(100000L)
  }
}

# 启动hdfs
$ start-dfs.sh 
 
# 启动yarn
$ start-yarn.sh 
 
# 启动spark
start-spark-all.sh

命令行运行

$SPARK_HOME/bin/spark-submit 
--class com.sinoes._01_WordCount  
--master spark://centos110:7077 
--driver-memory 512M 
--executor-memory 512M 
--total-executor-cores 1 
/root/wordcount.jar  
10

注:如果出现加载不到主类的错误,可能是打包方式有问题。

解决方式:

构建打包,提交到spark环境运行
  1. 生产环境下一般是将应用构建打包成jar,放到spark集群中运行,所以我们来操作构建打包;

  2. 在菜单上选择"File"->“Project Structure”,弹出窗口点击"Artifacts",选择新增jar,如下图:

    )]

  3. 如下图,在弹出的窗口中,红框1位置输入要运行的class,红框2选择的是单选框的第二个"copy to the output …":
    )]

  4. 在菜单上选择"Build"->“Build Artifacts…”,在弹出的菜单中选择"sparkscalademo:jar"->“Rebuild” 如下图:


    )]

  5. 如果编译成功,在项目的 outartifacts目录下就会生成文件,如下:


(C:UsersuserAppDataRoamingTyporatypora-user-imagesimage-20211029152326862.png)]

spark-submit 详细参数说明
参数名参数说明
–mastermaster 的地址,提交任务到哪里执行,例如 spark://host:port, yarn, local
–deploy-mode在本地 (client) 启动 driver 或在 cluster 上启动,默认是 client
–class应用程序的主类,仅针对 java 或 scala 应用
–name应用程序的名称
–jars用逗号分隔的本地 jar 包,设置后,这些 jar 将包含在 driver 和 executor 的 classpath 下
–packages包含在driver 和executor 的 classpath 中的 jar 的 maven 坐标
–exclude-packages为了避免冲突 而指定不包含的 package
–repositories远程 repository
–conf PROP=VALUE指定 spark 配置属性的值, 例如 -conf spark.executor.extraJavaOptions="-XX:MaxPermSize=256m"
–properties-file加载的配置文件,默认为 conf/spark-defaults.conf
–driver-memoryDriver内存,默认 1G
–driver-java-options传给 driver 的额外的 Java 选项
–driver-library-path传给 driver 的额外的库路径
–driver-class-path传给 driver 的额外的类路径
–driver-coresDriver 的核数,默认是1。在 yarn 或者 standalone 下使用
–executor-memory每个 executor 的内存,默认是1G
–total-executor-cores所有 executor 总共的核数。仅仅在 mesos 或者 standalone 下使用
–num-executors启动的 executor 数量。默认为2。在 yarn 下使用
–executor-core每个 executor 的核数。在yarn或者standalone下使用
6. 配置spark history-server

6.1 作用

可以查询运行的任务,并且可以查看stage,每个task的运行时间,便于后续观察及调优。

6.2. 配置

1)打开开关

复制文件 spark-defaults.conf.template 为 spark-defaults.conf

vi spark-defaults.conf

spark.eventLog.enabled           true

spark.eventLog.dir               hdfs://centos110:9000/spark-logs

后面的dir的,配置的hdfs的集群的地址
  1. 增加环境参数

vi spark-env.sh, 增加

export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18180 -Dspark.history.retainedApplications=30 -Dspark.history.fs.logDirectory=hdfs://centos110:9000/spark-logs"
  1. 分发刚才两个配置的文件,到其他的服务器上

  2. 启动hdfs,创建文件夹 /spark-logs

​ 启动hdfs, 启动spark

  1. 启动 history server : 命令:start-history-server.sh

    可以再hdfs上看到相关的信息

7. HA(高可用)配置

7.1 安装配置zookeeper

spark的高可用依赖于zookeeper,需要先安装zookeeper

zookeeper的安装请参考(Zookeeper3.5.8+CentOS7.6集群安装配置及使用):

https://blog.csdn.net/qq_44665283/article/details/121038105

7.2 spark增加配置

在bin/spark-env.sh 增加:

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=centos110:2181,centos112:2181,centos113:2181 -Dspark.deploy.zookeeper.dir=/spark"
 
# 每个work的线程数
export SPARK_WORKER_CORES=4
# 每个work可用的内存数
export SPARK_WORKER_MEMORY=2g

分发到各个节点

scp ./spark-env.sh centos112:$PWD

scp ./spark-env.sh centos113:$PWD

7.3 启动zookepper 集群

zkStart-all.sh

7.4 在centos110 启动spark

start-spark-all.sh

在centos112 启动

start-master.sh

发现未启动,因此需要去掉配置:

spark-env.sh 里面的配置

#export SPARK_MASTER_HOST=centos110
8 spark-shell 的使用

8.1 bin/spark-shell 进行启动,进入交换窗口

这个没有指定master的地址,用的是spark的local模式运行的,模拟的spark的运行

8.2 bin/spark-shell --master spark://centos110:7077

(centos112:7077,开启高可用需要两个master地址)

启动成功:

8.3 统计hdfs wordcount

sc.textFile("hdfs://centos110:9000/spark_test/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
9 SparkSQL 整合hive

9.1 基本说明

就是使用spark整合hive,利用hive的语法进行处理的。

9.2 已经有hive的配置

可以把 hive-site.xml, hdfs-site.xml, core-site.xml 复制到spark/conf目录下,

修改hive-site.xml 如下:

  
   
     hive.metastore.warehouse.dir
      hdfs://centos110:9000/user/hive/warehouse
   

把相关配置scp到启动spark节点,重新启动spark集群

9.5 上传mysql

把MySQL驱动上传到jar目录下

9.6 配置HADOOP_CONF_DIR

在spark目录下的conf/spark-env.sh,增加如下配置

export HADOOP_CONF_DIR=/opt/apps/hadoop-3.1.3/etc/hadoop

9.6 启动:

启动hdfs集群,启动spark集群

 spark-sql --master spark://centos110:7077 --driver-class-path /opt/hadoop/hive-3.1.2/lib/mysql-connector-java-5.1.48.jar

如果按照9.5提示,上传到jar下面,后面的jdbc驱动路径可以不写

9.7 修改数据的存储文件路径

修改DB_LOCATION_UIR为hdfs:

hdfs://centos110:9000/user/sparkhive/warehouse

重新启动SparkSQL命令行即可以

9.8 测试

请参考hive的使用: http://www.yiyong.info/article/161

spark-sql 支持 -f sql.sql 的启动参数. 这样,把需要执行的sql编写在文件里面,把结果保存在数据库的表里面,实现离线或者定时的任务。

9.9 idea整合

增加pom依赖

 
      org.apache.spark
      spark-hive_2.12
      ${spark.version}
 

在创建sparkSession 的对象时,增加hive支持的。如下

val spark: SparkSession = SparkSession.builder()
      .appName("hiveTask")
      .master("local[*]")
      .enableHiveSupport()  // 启用spark对hive的支持(可以兼容hive的语法)
      .getOrCreate()

添加配置文件:

把spark/conf 下的hive-site.xml, core-site.xm., hdfs-site.xml放到项目的resources 目录,保证程序能找到hive的元数据库及hdfs位置

  
  
    hive.metastore.uris
    thrift://centos110:9083
    Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.
  
package com.sinoes

import org.apache.spark.sql.SparkSession

object _02_HiveTest {
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder()
      .appName("hiveTask")
      .master("local[*]")
      .enableHiveSupport()  // 启用spark对hive的支持(可以兼容hive的语法)
      .getOrCreate()
    spark.sql("use test_ods_db")
    spark.sql("SELECt * FROM emp").show();
  }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/354567.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号