栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark源码编译和集群部署以及idea中sbt开发环境集成

spark源码编译和集群部署以及idea中sbt开发环境集成

spark源码编译和集群部署以及idea中sbt开发环境集成

源码下载源码编译

maven 下载scala 下载编译参数编译编译分发的二进制包 单机启动集群部署开发环境集成源码编译的3.2.0版本无法在window上直接用spark-shell启动总结
项目地址:https://gitee.com/jyq_18792721831/studyspark.git

源码下载

打开Apache Spark™ - Unified Engine for large-scale data analytics,下载源码

在下载界面中选择源码下载

随便选择哪个地址

下载源码后上传到hadoop01服务器的/spark目录下

使用tar -zxvf spark-3.2.0.tgz解压到/spark目录下

源码编译

返回下载界面,选择文档,选择最新版或者你下载的版本

或者你可以直接打开Overview - Spark 3.2.0 documentation (apache.org)界面

选择跳转到打包构建的文档

有两种构建方式:maven和sbt(scala build tool)

我们选择maven

maven 下载

在编译文档中,要求我们尽可能使用新的maven

所以我们使用手动安装的方式,安装最新版

首先打开Maven – Welcome to Apache Maven,跳转到下载页

我们选择最新的二进制包

右键拷贝链接,直接在hadoop01上下载

在根目录下创建/maven目录,然后执行curl -O https://dlcdn.apache.org/maven/maven-3/3.8.4/binaries/apache-maven-3.8.4-bin.tar.gz下载

使用tar -zxvf apache-maven-3.8.4-bin.tar.gz解压

删除压缩包,并把maven里面的内容移动出来,使得maven的home为/maven

因为之前就在hadoop01上安装过了java,并且配置了环境变量,所以可以直接配置maven的环境变量即可,高版本需要jdk1.7以上

配置环境变量

然后使用source ~/.bash_profile生效

使用mvn --version验证版本

scala 下载

到The Scala Programming Language (scala-lang.org)进入下载界面

选择scala2吧,scala3实在没用过。

拉到最下面,下载二进制包

相同的方式,拷贝链接,在hadoop01中下载

然后配置环境变量

使用source ~/.bash_profile生效后验证版本

编译参数

根据文档,首先设置maven的信息

我们将MAVEN_OPTS放入~/.bash_profile中

使用source ~/.bash_profile使之生效。

接着往下看构建文档,找到支持hive和jdbc的构建

这是maven命令,使用源码包中自带的maven,其中-P和-D是一些参数。

这些参数和pom.xml文件中的profile和properties有关。

-D对应properties

-P对应profile

我们查看pom.xml文件

这只是一部分,具体的可以自行查看


    UTF-8
    UTF-8
    
    1.8
    ${java.version}
    ${java.version}
    
    3.6.3
    1.6.0
    spark
    1.7.30
    1.2.17
    
    3.3.1
    2.5.0
    ${hadoop.version}
    3.6.2
    2.13.0
    org.apache.hive
    core
    
    
    2.3.9
    2.3.9
    
    2.3
    
    2.8.0
    
    10.14.2.0
    1.12.1
    1.6.11
    9.4.43.v20210629
    4.0.3
    0.10.0
    2.5.0
    2.0.8
    
    4.2.0
    1.10.2
    1.12.0
    
    1.11.655
    
    0.12.8
    
    4.5.13

所以,我们的构建命令中-D要设置的如下

mvn -Djava.version=11 -Dmaven.version=3.8.1 -Dhadoop.version=2.9.2 -Dscala.version=2.13.8 -DskipTests clean package

接下来是-P参数

-P参数对应的是pom.xml中的profile属性

    
      hadoop-2.7
      
        2.7.4
        2.7.1
        2.4
        hadoop-client
        hadoop-yarn-api
        hadoop-client
      
    
	
      hive-2.3
    
	
      yarn
      
        resource-managers/yarn
        common/network-yarn
      
    
	
      hive-thriftserver
      
        sql/hive-thriftserver
      
    
	
      scala-2.13
      
        2.13.5
        2.13
      
      
        
          
            
              net.alchim31.maven
              scala-maven-plugin
              
                
                  -unchecked
                  -deprecation
                  -feature
                  -explaintypes
                  -target:jvm-1.8
                  -Wconf:cat=deprecation:wv,any:e
                  -Wconf:cat=scaladoc:wv
                  -Wconf:cat=lint-multiarg-infix:wv
                  -Wconf:cat=other-nullary-override:wv
                  -Wconf:cat=other-match-analysis&site=org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupFunction.catalogFunction:wv
                  -Wconf:cat=other-pure-statement&site=org.apache.spark.streaming.util.FilebasedWriteAheadLog.readAll.readFile:wv
                  -Wconf:cat=other-pure-statement&site=org.apache.spark.scheduler.OutputCommitCoordinatorSuite.<local OutputCommitCoordinatorSuite">>.futureAction:wv
                  -Wconf:msg=^(?=.*?method|value|type|object|trait|inheritance)(?=.*?deprecated)(?=.*?since 2.13).+$:s
                  -Wconf:msg=^(?=.*?Widening conversion from)(?=.*?is deprecated because it loses precision).+$:s
                  -Wconf:msg=Auto-application to `()` is deprecated:s
                  -Wconf:msg=method with a single empty parameter list overrides method without any parameter list:s
                  -Wconf:msg=method without a parameter list overrides a method with a single empty one:s
                  -Wconf:cat=deprecation&msg=procedure syntax is deprecated:e
                
                
                
              
            
          
        
      
	

所以-P的参数大概就这么多

mvn -Phadoop-2.7 -Phive-2.3 -Pyarn -Phive-thriftserver -Pscala-2.13 -Djava.version=11 -Dmaven.version=3.8.4 -Dhadoop.version=2.9.2 -Dscala.version=2.13.8 -Dscala.binary.version=2.13 -DskipTests clean package

编译

如果使用的是scala-2.13版本,那么需要切换scala版本

查看change-scala-version.sh的内容,主要是环境变量的配置和pom.xml文件中的版本号的修改。

在pom.xml中,scala相关的配置是properties,理论上我们使用-D参数就能修改,不过可能有其他的配置,不一定是pom.xml的,反正执行一次最好。

而且,需要注意的是,执行这个命令,必须在spark主目录下,因为在change-scala-version.sh脚本中使用相对路径执行其他脚本,所以如果你在其他位置执行,那么会因为上下文的问题,导致执行失败。

实际上,change-scala-version.sh脚本还会使用spark主目录下build/mvn程序。

我们使用上面拼接好的命令,在SPARK_HOME下执行

第一次执行,maven 会下载相关的依赖,比较慢

哎,依赖是在是多,耐心等待吧

等待了一段时间,发现编译错误了

很正常,一次成功才不正常。

我们查看错误的堆栈,发现是因为找不到一个为RangePartitioner的类,查看源码,这是一个在apache/spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala中定义的类

问题在于scala-2.13中没有打包生成class文件,而scala-2.12中就会打包生成calss文件

生成的class文件存储在/spark/core/target/scala-2.12/classes/org/apache/spark目录下

不知道是不是scala和spark的兼容性问题。(我自己试了好多次,2.12.15可以编译成功,2.13.8就编译失败了。)

在使用scala-2.12编译的时候,编译到spark-sql的时候,卡主,怀疑是内存不足了

幸好是虚拟机,可以增加内存,我们尝试将内存增加到4G

然后终止编译,重新编译,并使用-X启动debug输出

经过尝试,调大内存在一定程度上能加快编译速度。

第一次编译可以采用一步一步执行,可以先编译后打包,其实打包也包含编译,我单独执行编译是为了验证scala-2.12是否会生成相关的class文件,打包成功如下

编译成功就会生成二进制文件,存储于spark根目录的bin目录下

其实这只是一部分,这些脚本调用的jar包也会生成。

我们在源码的bin目录下就可以开心的使用spark了

直接启动spark-shell

这些版本信息和我们在编译的时候指定的一模一样。

编译分发的二进制包

编译好只是这个环境编译好了,里面既有源码文件,也有可执行文件,我们不能在集群环境的每个节点上都进行编译一次,那么太过费时,而且也不可行。

所以我们需要编译可分发的二进制包,就像官网提供的spark一样,没有源码,全部是执行文件。

在官网中是建议我们使用工具编译分发的二进制包

查看这个脚本,发现还是maven命令

最后将编译后在bin目录中的内容进行打包而已

我们在spark的根目录下执行./dev/make-distribution.sh --name 2.12 --tgz --mvn mvn -Phadoop-2.7 -Phive-2.3 -Pyarn -Phive-thriftserver -Pscala-2.12 -Djava.version=11 -Dmaven.version=3.8.4 -Dhadoop.version=2.9.2 -Dscala.version=2.12.15 -Dscala.binary.version=2.12 -DskipTests clean package

打包完成后就会生成psark-2.9.2-bin-2.12.tgz压缩包。

并不会打印maven的日志,等待就行了,可能需要和maven打包同样长的时间。(2核,8G,大约40分钟)

完成后会在spark根目录下生成可分发二进制文件包

实际上当我们使用maven命令完成打包后,只是单独的想打包分发二进制包,那么是完全不需要重新执行maven打包的,毕竟maven打包太慢了。我们可以将spark根目录下的./dev/make-distribution.sh文件中的相关maven的命令跳过,注释掉脚本中的这两行即可。

然后把原来脚本中使用maven获取版本号的方式自定义指定即可

这样就不会重复执行maven命令了(maven命令太费时间了)

然后在maven打包完成的前提下,执行分发的二进制包的命令./dev/make-distribution.sh --name 2.12 --tgz --mvn mvn -Phadoop-2.7 -Phive-2.3 -Pyarn -Phive-thriftserver -Pscala-2.12 -Djava.version=11 -Dmaven.version=3.8.4 -Dhadoop.version=2.9.2 -Dscala.version=2.12.15 -Dscala.binary.version=2.12 -DskipTests clean package

spark源码编译实际上就是maven编译,如果你了解maven编译,那么在spark源码编译中遇到的问题,你都能很好的解决。

而二进制分发包的打包脚本,就是对maven编译的使用,然后在把编译后的文件重新组合。

单机启动

我们将编译好的二进制包,从hadoop01分发到hadoop02上,使用scp spark-3.2.0-bin-2.12.tgz hadoop02:/spark/

然后在hadoop02上解压,tar -zxvf spark-3.2.0-bin-2.12.tgz

我们选择在hadoop02上验证二进制分发包是否可用。

然后将SPARK_HOME设置到环境变量,并使之生效

然后你可以在任意位置使用spark-shell启动

相关的文档在这里Quick Start - Spark 3.2.0 documentation (apache.org)

启动后如下

可见启动spark并不需要scala环境,应该把scala的相关的包集成进去了

我们尝试一下quick-started中的例子

然后打开http://hadoop02:4040查看执行历史

这种启动方式,在启动中会打印如下数据

这其实是spark的standalone的启动方式

相关的文档在这Overview - Spark 3.2.0 documentation (apache.org)

关于standalone的模式的相关文档

Spark Standalone Mode - Spark 3.2.0 documentation (apache.org)

集群部署

为什么直接启动spark-shell就是单机版的模式呢?

spark的配置和hadoop差不多,有个slaves文件,里面会指定从节点的域名

默认的配置在/spark/conf文件里面

但是你在这里面是找不到slaves文件的,不过有一个workers.template文件

查看这个文件,默认是localhost

在sbin目录下有一个slaves.sh文件

slaves.sh文件跳转到了workers.sh文件中了

通过这个脚本基本上就能明白了,spark的从节点配置在/sbin/spark-config.sh和/conf中都可以配置

其中所有的配置都可以在/conf/spark-env.sh中配置

这些是一些standalone的配置

基本上都是通用的。

我们搭建一个spark的集群,集群信息如下

节点角色其他
hadoop01主节点一个执行器
hadoop02从节点一个执行器
hadoop03从节点一个执行器

完整的/conf/spark-env.sh如下

# Options read when launching programs locally with
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public dns name of the driver program

# hadoop 的配置信息
HADOOP_CONF_DIR=/hadoop/etc/hadoop

# Options read by executors and drivers running inside the cluster
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
# - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos

# spark shuffle的数据目录,
SPARK_LOCAL_DIRS=/spark/shuffle_data

# Options read in YARN client/cluster mode
# - SPARK_CONF_DIR, Alternate conf dir. (Default: ${SPARK_HOME}/conf)
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - YARN_CONF_DIR, to point Spark towards YARN configuration files when you use YARN
# - SPARK_EXECUTOR_CORES, Number of cores for the executors (Default: 1).
# - SPARK_EXECUTOR_MEMORY, Memory per Executor (e.g. 1000M, 2G) (Default: 1G)
# - SPARK_DRIVER_MEMORY, Memory for Driver (e.g. 1000M, 2G) (Default: 1G)

# yarn的配置文件在hadoop的配置文件中
YARN_CONF_DIR=$HADOOP_CONF_DIR
# 每个节点启动几个执行器
SPARK_EXECUTOR_CORES=1
# 每个执行器可以使用多大内存
SPARK_EXECUTOR_MEMORY=1800M
# 每个driver可以使用多大内存
SPARK_DRIVER_MEMORY=1800M

# Options for the daemons used in the standalone deploy mode
# - SPARK_MASTER_HOST, to bind the master to a different IP address or hostname
# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master
# - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. "-Dx=y")
# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
# - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker
# - SPARK_WORKER_DIR, to set the working directory of worker processes
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
# - SPARK_DAEMON_MEMORY, to allocate to the master, worker and history server themselves (default: 1g).
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
# - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
# - SPARK_DAEMON_CLASSPATH, to set the classpath for all daemons
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers

# 主节点
SPARK_MASTER_HOST=hadoop01
# 端口,不要和worker的端口重复,通信端口
SPARK_MASTER_PORT=4040
# 界面端口
SPARK_MASTER_WEBUI_PORT=8089
# 每个从节点启动的执行器
SPARK_WORKER_CORES=1
# 每个从节点可使用最大内存
SPARK_WORKER_MEMORY=1800M
# 从节点的通信端口
SPARK_WORKER_PORT=4040
# 界面端口
SPARK_WORKER_WEBUI_PORT=8089
# 从节点工作目录
SPARK_WORKER_DIR=/spark/worker

# Options for launcher
# - SPARK_LAUNCHER_OPTS, to set config properties and Java options for the launcher (e.g. "-Dx=y")

# Generic options for the daemons used in the standalone deploy mode
# - SPARK_CONF_DIR      Alternate conf dir. (Default: ${SPARK_HOME}/conf)
# - SPARK_LOG_DIR       Where log files are stored.  (Default: ${SPARK_HOME}/logs)
# - SPARK_LOG_MAX_FILES Max log files of Spark daemons can rotate to. Default is 5.
# - SPARK_PID_DIR       Where the pid file is stored. (Default: /tmp)
# - SPARK_IDENT_STRING  A string representing this instance of spark. (Default: $USER)
# - SPARK_NICENESS      The scheduling priority for daemons. (Default: 0)
# - SPARK_NO_DAEMonIZE  Run the proposed command in the foreground. It will not output a PID file.
# Options for native BLAS, like Intel MKL, OpenBLAS, and so on.
# You might get better performance to enable these options if using native BLAS (see SPARK-21305).
# - MKL_NUM_THREADS=1        Disable multi-threading of Intel MKL
# - OPENBLAS_NUM_THREADS=1   Disable multi-threading of OpenBLAS

# spark 的日志目录
SPARK_LOG_DIR=/spark/logs

相关的配置项说明在这Spark Standalone Mode - Spark 3.2.0 documentation (apache.org)

然后配置workers

接着把这个配置好的文件分发到hadoop02的/spark目录下

使用scp -r /spark hadoop02:/spark/

然后在hadoop01上启动(当然如果你在hadoop01上配置那么就可以不需要从hadoop02分发到hadoop01上了)

别忘记SPARK_HOME

接着切换到/spark/sbin目录下,执行启动脚本

启动过程中提示hadoop03还没有相关文件

并且提示我们hadoop02的JAVA_HOME没有设置,这就离谱了,我们第一个配置的就是JAVA_HOME。

错误信息中给出了启动worker的命令,我们直接拷贝到hadoop02上执行看下,发现是可以执行的

接着我们在界面中看下

到worker里面看下,发现和我们配置的一样

查看java进程,发现master已经在hadoop01上启动

hadoop02启动的是worker

现在剩下一个问题,在hadoop01上启动,无法启动hadoop02的worker

我们查看下start-all.sh

实际上start-all.sh启动了start-master.sh和start-worker.sh

重点看start-worker.sh

可以看到会执行这两个脚本,用于处理配置,我们把java环境在spark-config.sh中设置下

然后重新分发并重启

就没有报错,并且在hadoop02上成功启动了

将/spark目录分发到hadoop03并进行重启

需要注意,我们配置主节点和从节点的通信端口都是4040,界面访问端口都是8089。这也就是意味着,如果要在hadoop01上启动Worker ,会出现端口冲突。

不过我们有三个节点,本来就是打算让hadoop01成为master的。

开发环境集成

我们开发spark程序是在idea中开发的,所以需要在idea中安装scala的插件

然后在windows环境中配置scala的环境

先把scala的文件从hadoop01上下载到windows中

然后配置环境变量

接着在idea中指定scala的sdk

idea会自动扫描全部的scala的sdk

选择scala目录

有了scala就可以创建spark的项目了,不过还需要scala的打包构建工具,也就是sbt

我们到sbt - The interactive build tool (scala-sbt.org)下载sbt

下载zip包就行

将下载的sbt解压到windows中,并配置环境变量

继续创建项目(scala版本最好选择和我们编译spark相同的版本,当然创建好项目后在修改也是可以的)

关于sbt的更多资料请看

sbt入门_a18792721831的博客-CSDN博客

sbt使用教程_a18792721831的博客-CSDN博客

根据上面这两篇资料的内容,简单了解下sbt。不希望能做到更好,只要能正常使用就行了。

然后重新打开idea,打开新建的studyspark项目

sbt就开始下载依赖了

此时sbt加载项目会异常的,因为idea默认使用自己的sbt,而不是我们安装的sbt

需要在settings中设置我们自己的sbt

重新构建项目

会弹出sbt-shell(如果你没有把sbt配置的全部勾选,那么应该是不会弹出sbt-shell的,那么这步可以跳过)

/*************************跳过开始*****************************

如果你也出现上述提示,那么表示在等待sbt使用默认的配置,从国外下载依赖

我们在sbt的配置中增加如下参数

-Dsbt.override.build.repos=true
-Dsbt.repository.config=E:sbtconfrepo.properties

第一行表示使用全局的仓库配置

第二行则是指定使用的仓库配置的文件路径

记得取消这个勾选

然后重新导入项目

这里发现sbt-shell中文乱码了,很可惜,我尝试了网上找到的方式,都失败了,或许你可以试试,在sbt的参数中增加

-Dfile.encoding=UTF-8
-Dconsold.encoding=GBK

对我都是无效的

********************跳过结束*********************

此时项目结构如下

项目相关的配置可以在build.sbt中修改

build.sbt如下

// 创建配置,用于标识spark的版本
lazy val sparkVersion = SettingKey[String]("spark-version", "spark version")
// 定义公共的包基础
lazy val packagebaseDir = SettingKey[String]("package-base-dir", "package base dir")
// 抽取公共配置
lazy val commonSettings = Seq(
  version := "1.0",
  scalaVersion := "2.12.15",
  sbtVersion := "1.6.1",
  sparkVersion := "3.2.0",
  // 定义项目包基础
  packagebaseDir := "com.study.spark",
  // 定义每个项目的包前缀
  ThisProject / idePackagePrefix := Some(packagebaseDir.value + "." + name.value),
  // 定义输出目录
  ideOutputDirectory := Some(file("target")),
  organization := "com.study.spark",
  // 申明源码路径
  sourceDirectories := Seq(
    file("src/main/scala"),
    file("src/main/java"),
    file("src/test/scala"),
    file("src/test/java")
  ),
  // 声明资源路径
  resourceDirectories := Seq(
    file("src/main/resources"),
    file("src/test/resources")
  ),
  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
    "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
    "org.apache.spark" %% "spark-mllib" % sparkVersion.value % "provided",
    "org.apache.spark" %% "spark-streaming" % sparkVersion.value % "provided"
  )
)
// 设置根项目
lazy val root = (project in file("."))
  .settings(
    name := "studyspark",
    commonSettings
  )

接着我们把sbt-assembly插件加入到项目中

等待sbt刷新项目后如下

因为我们在根项目中是不会进行任何编码的,所以我们可以将根项目中的src目录删除

最终我们的代码是需要上传到git的,所以创建.gitignore文件,将target/排除

我们创建一个helloworld的模块,验证sbt下的scala是否可用

别忘记在build.sbt中定义helloworld项目

lazy val helloworld = (project in file("helloworld"))
  .settings(
    name := "helloworld",
    commonSettings
  )

别忘记创建目录main,test,scala,java,resources等.

创建好后,我们创建如下代码

在运行之前需要先执行compile操作

你可以在sbt-shell中执行

也可以在sbt工具窗口中执行(我比较喜欢在sbt-shell中操作)

点击运行

但是现在我们只是能在ide中执行,我们的目的是打出jar包的,所以我们还需要配置sbt-assembly插件

打开sbt-assembly插件的官方文档sbt-assembly (scala-lang.org)

lazy val helloworld = (project in file("helloworld"))
  .settings(
    name := "helloworld",
    // 定义主类
	assembly / mainClass := Some(idePackagePrefix.value.get + ".Hello"),
    // 定义jar包名 项目名字_scala版本_项目版本.jar
    assembly / assemblyJarName := name.value + "_" + scalaVersion.value + "_" + version.value + ".jar",
    // 依赖合并策略
    assemblyMergeStrategy := {
      case PathList("javax", "servlet", xs @ _*)         => MergeStrategy.first
      case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
      case "application.conf"                            => MergeStrategy.concat
      case "unwanted.txt"                                => MergeStrategy.discard
      case x =>
        val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
        oldStrategy(x)
    },
    commonSettings
  )

然后执行assembly操作,记得切换到helloworld项目下,你也可以在sbt工具窗口执行

不过我喜欢在sbt-shell中执行

然后耐心的等待吧,第一次会比较慢的

可以看到jar包已经生成了

我们使用java -jar 执行

可以看到打出来的jar包成功执行了

到了这里scala和sbt都集成完毕,此时需要集成spark了

spark的依赖下载会比较慢,而且新版本国内的镜像不一定有,需要从国外下载

这里推荐一个Idea的插件:Big Data Tools

它可以连接到远程的hadoop,hdfs,spark的集群上,用于监控和交互,安装后重启idea会在下面出现工具窗口

支持的还是挺多的

首先我们将插件和hadoop01进行连接

展示如下,和我们在浏览器查看的效果是一样的

连接hdfs,需要注意,填写的hdfs的通信地址,不是浏览器地址

连接上之后,就可以像操作idea里面的项目文件一样,上传下载等

接着我们启动hadoop01的spark集群

可惜了,连接spark还是有问题

到了这里,工具装了一大堆,但是还没开始开发一行代码,接下来就开发一个小案例wordcount试试吧

当我们安装了big data tools插件后,新建module的时候,就可以选择big data tools下的spark模板了

这是一个类似于spring boot的模板,里面自动创建一些文件和配置,更准确应该是和maven比较像

加载完毕后项目结构如下

不过这种创建项目只是适合创建顶级项目,对于子级项目是不行的,会覆盖已有的build.sbt等信息的

我们前面已经搭建好了顶级项目,所以我们可以直接创建一个scala项目就行了

然后指定模块的名字就行了

我们做个优化,把打包的配置,除了主类之外的,都放在公共配置中

// 抽取公共配置
lazy val commonSettings = Seq(
  version := "1.0",
  scalaVersion := "2.12.15",
  sbtVersion := "1.6.1",
  sparkVersion := "3.2.0",
  // 定义项目包基础
  packagebaseDir := "com.study.spark",
  // 定义每个项目的包前缀
  ThisProject / idePackagePrefix := Some(packagebaseDir.value + "." + name.value),
  // 定义输出目录
  ideOutputDirectory := Some(file("target")),
  organization := "com.study.spark",
  // 申明源码路径
  sourceDirectories := Seq(
    file("src/main/scala"),
    file("src/main/java"),
    file("src/test/scala"),
    file("src/test/java")
  ),
  // 声明资源路径
  resourceDirectories := Seq(
    file("src/main/resources"),
    file("src/test/resources")
  ),
  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
    "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
    "org.apache.spark" %% "spark-mllib" % sparkVersion.value % "provided",
    "org.apache.spark" %% "spark-streaming" % sparkVersion.value % "provided"
  ),
  // 定义jar包名 项目名字_scala版本_项目版本.jar
  ThisProject / assembly / assemblyJarName := name.value + "_" + scalaVersion.value + "_" + version.value + ".jar",
  // 依赖合并策略
  ThisBuild / assemblyMergeStrategy := {
    case PathList("javax", "servlet", xs @ _*)         => MergeStrategy.first
    case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
    case "application.conf"                            => MergeStrategy.concat
    case "unwanted.txt"                                => MergeStrategy.discard
    case x =>
      val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
      oldStrategy(x)
  },
)

然后把wordcount加入到根项目中,主类先不写

lazy val wordcount = (project in file("wordcount"))
  .settings(
    name := "wordcount",
    commonSettings
  )

然后创建目录

我们在scala目录下创建主类

首先使用本地进行执行

package com.study.spark.wordcount

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]): Unit = {
    // 配置spark
    val conf = new SparkConf().setMaster("local").setAppName("wordcount")
    // 获取sparkContext
    val sc = new SparkContext(conf)
    // 从本地读取文件,其实就是 build.sbt 文件
    sc.textFile("file:///E:\java\studyspark\build.sbt")
      // 按照空格拆分
      .flatMap(_.split(" "))
      // 每个单词计数1
      .map(_->1)
      // 按单词分组统计
      .reduceByKey(_+_)
      // 计算
      .collect()
      // 打印统计结果
      .foreach(println(_))
  }

}

别忘记在build.sbt中配置主类(之前我们只有一个有效的项目,所以我们可以直接取参数的值,但是现在有了多个项目,就需要在取值的时候,限定作用域)

需要注意的是我们配置的spark依赖是不会在classpath中使用的,因为默认当我们把spark任务的jar包提交给spark集群的时候,spark相关的依赖是不需要在jar包中的。

但是当我们需要本地启动的时候,如果没有spark的依赖,会导致spark相关的类找不到。

所以我们需要在本地启动的spark项目中,再次配置spark的依赖,区别是不写后面provided作用域

然后切换到wordcount项目下,编译并执行(使用run执行,而不是使用ide的执行按钮)

如果一切顺利,就会看到执行的日志

我们也可以打包的,使用sbt-assembly打包

然后使用java -jar 执行

执行会抛出异常,这是因为缺少了scala的依赖,我们需要把scala的依赖也加入

我们要求scala的依赖每次都必须打包,注意这里是一个%

打出来的jar包不能简单的使用java -jar 运行,需要提交到spark环境中执行,因为我们在打包的时候并没有打spark相关的包。

为了能在服务器上使用,我们需要修改我们的代码,需要链接到服务器上的spark集群,而且读取文件应该读取hdfs中的文件

首先启动服务器上的hdfs服务

接着启动spark集群

接着把build.sbt上传到hdfs中

然后重新打包,并上传到服务器

然后使用

spark-submit --master spark://hadoop01:4040 --class com.study.spark.wordcount.WordCount /spark/wordcount_2.12.15_1.0.jar

提交到spark集群,spark就开始执行了

执行结果和在本地执行一模一样

而且在spark的界面中也能看到

如果你还没有配置历史服务,那么会启动报错的,提示没有配置历史数据存储路径

拷贝spark-defaults.conf.template为spark-defaults.conf

内容如下

# 是否开启日志
spark.eventLog.enabled true
# 日志存储路径
spark.eventLog.dir hdfs://hadoop01:8020/logs
# 是否压缩
spark.eventLog.compress true

同时在spark-env.sh中配置以下内容

# 配置spark历史服务
SPARK_HISTORY_OPTS="-Dspark.history.ui.port=8086 -Dspark.history.retainedApplications=10 -Dspark.history.fs.logDirectory=hdfs://hadoop01:8020/logs"

我们启动spark历史记录服务

启动历史记录服务不报错

而且界面能打开

然后在执行一次,因为我们的任务很快,所以开启历史记录,方便我们查看job的信息

这时候就能在历史界面查看了

我们实际上是想看看dag图

很漂亮。

源码编译的3.2.0版本无法在window上直接用spark-shell启动

这个问题我发现不仅仅是我们自己编译的二进制分发包有问题,就连官方下载的二进制分发包,如果你直接使用spark-shell启动,也是会无法启动的。这个问题困扰我了好几天,我在网上找到了相关的讨论windows - Spark illegal character in path - Stack Overflow

但是也没给出问题原因和解决方案,只是说降低版本。

之前是源码编译二进制包,所以我也有spark的源码,我自己根据堆栈找了好长时间,奈何自己水平太低,从源码中没有找到相关的代码。

不过从我自己理解的源码来看,目前好像就是spark-shell的repl的启动存在问题,而提交作业等貌似是没有问题的。(我自己没试)

总结

整个文章的过程比较坎坷,比较艰难。

我是在看书的时候,知道了scala的sdk的二进制不兼容,以及目前市面上大多数公司使用cdh的发行包,于是我也到cdh的官网想下载最新的发行包,结果发现cdh版本貌似也开始收费了,于是只能自己使用源码编译。

源码编译时第一个阶段,当我历经艰难,完成了源码编译后,发现不能在windows上使用spark-shell,于是在服务器集群上搭建spark集群。这里主要学习了spark的启动和集群部署,以及一些配置等。

上述这两个阶段基本上是年前完成的,在搭建spark开发环境的时候,才知道spark或者说scala推荐使用sbt编译工具,可怜我maven都不是很熟悉,我哪会sbt呢。没辙,想办法研究sbt,研究sbt刚开始,就过年了,中间大概10多天吧,就是啥都没做,新年开工,才开始正式研究sbt,研究几天,基本上做到了入门,就继续在idea中集成spark,sbt环境。

最终实现了idea本地开发编码,本地在sbt的基础上执行,然后打包提交到spark集群中执行。

整个过程还是比较长的,但是收获也很多。

当然,我自己深深的知道,自己对上面这些,也仅仅是入门。还需努力,少年!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/734149.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号