栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spring boot整合spark,基于yarn运行提交spark任务 spark on yarn

spring boot整合spark,基于yarn运行提交spark任务 spark on yarn

前言

之前项目是基于springboot整合spark,在standalone上运行,曾经写过一篇博客,链接:

https://blog.csdn.net/qq_41587243/article/details/112918052?spm=1001.2014.3001.5501

现在使用同样的方案,不过是在生产环境yarn集群上提交spark,并且需进行kerbores验证,如下。

背景

公司项目需求,通过手机信令位置数据,做一个分析性平台。基于目前线上环境spark+hadoop+yarn做分析。数据量10亿用户。

spark on yarn 问题总结

首先在开发过程中,前提保证版本的一致性,否则问题更多!!

一、standalone和yarn区别
  1. 在standalone模式下运行,这个只需要指定好setMaster的url为spark://172.31.13.100:7077,启动方式多样,java -jar、tomcat、spark-submit都可以启动的。
  2. on yarn集群上,所以只能用spark-submit去启动,由Spark-Yarn-Client托管应用的jar,否则,应用的jar中缺少向Yarn资源管理器申请资源的模块,无法正常启动。
    如下:
./spark-submit 
--conf spark.yarn.jars="/xxxx/spark/jars/*,hdfs://ns1:8020/xxxxx/lib/*" 
--driver-java-options "-Dorg.springframework.boot.logging.LoggingSystem=none -Dspring.profiles.active=test -Dspark.yarn.dist.files=/yarn-site.xml" --
master yarn
--deploy-mode client  
--executor-cores 3 
--num-executors 80 
--executor-memory 12g 
--driver-memory 12g 
--name xxxxx_analysis 
--queue xxxxxxx 
--class org.springframework.boot.loader.JarLauncher 
./com.xxx-1.0-SNAPSHOT.jar 
--principal xxxx 
--keytab xxx.keytab >> xxx.log 2>&1 &
参数解释:
--conf spark.yarn.jars 指定的是运行在项目需要的一些jar包,先传到hdfs对应目录下;
                  Spark客户端上面需要的一些jar包,springboot依赖的jar包
--driver-java-options  指定应用的运行环境参数.
--class 必须指定springboot的main所在的class
--master   --deploy-mode  指定yarn的client端
--queue  指定hdaoop中yarn队列
--principal  指定kerberos的用户
--keytab   指定kerberos的票据
二、日志冲突


解决:
日志冲突,spark-submit内部使用log4j作为日志模块,springboot采用的是logbak作为日志模块。两种方案,一是springbooe中exclusion掉这个logback-classic这个jar包,二是直接在启动运行环境时,将-Dorg.springframework.boot.logging.LoggingSystem=none。

三、GSON版本冲突

Spark自带的GSON版本可能与SpringBoot依赖的版本冲突,引起如下异常:

很明显的问题,jar包冲突,在两个位置gson包,版本不同,直接移除掉spark的jars中的gson版本即可。

四、guava和validation-api包 冲突

可以在springboot中直接把所有涉及到的东西都exclusion掉,其中涉及到的有spark和javaee-api。

五、序列化和反序列问题


刚看到错误就反应过来,哦!序列化,java使用JavaSerializer序列化方式,spark使用kyro方式,然后修改spark代码,去extend继承javaSerializer,然而并不管用,百度查不到,困扰好几天,最后解决,把springboot打包后的target文件夹下面的com.asia-1.0-SNAPSHOT.jar.original这个文件,重命名为com.asia-api.jar,这个就是我们自己开发程序的jar包,将这个jar上传到我们提前指定的hdfs的项目依赖的路径下即可,解决!!

六、 运行时找不到应用中的类

1)将应用的jar解压,将lib目录下所有jar上传HDFS目录"hdfs://xxx:8020/lib"
2)"hdfs://xxx:8020//lib"追加到命令行“spark.yarn.jars”参数中
3)删除其中可能与集群环境冲突的包
未指定“spark.yarn.jars”参数,导致executor节点缺少依赖库,命令行指定“spark.yarn.jars”参数,将必要的库目录都加上,并将springboot上的依赖包中排除hadoop*、spark*、scala*的包,也是防止冲突并加上了–calss

七、 冲突…

这个问题在本地测试的时候就出现过,是缺少这个包,所以在子pom中添加了

现在提交到集群上,和spark中的又是冲突了,所以直接在自己的项目中排除掉,相应的hdfs上依赖的jar中去除就可以了。

yarn 资源释放

为了节省yarn的资源,打算让每个application运行完成后,自动释放资源。有两种方案:
第一种:
每个程序运行完成后,直接释放掉sparkSession.close(),他在释放的过程中,也关闭了sparkContext,所以不在占用yarn资源,在下一次任务启动后,又会自动启动spark任务,申请资源。 SparkSession不直接加入springboot启动项,在每次运行任务前去加载初始化sparkSession。
第二种:
直接不进行释放sparkSession,设置动态资源,最大使用executors和最小使用。

1.将spark.dynamicAllocation.enabled设置为true。意思就是启动动态资源功能;
2.此外关于动态资源分配还有以下相关参数
3.spark.dynamicAllocation.initialExecutors:
初始executor数量,如果--num-executors设置的值比这个值大,那么将使用--num-executors设置的值作为初始executor数量。
4.spark.dynamicAllocation.maxExecutors:
executor数量的上限,默认是无限制的。
5.spark.dynamicAllocation.minExecutors:
executor数量的下限,默认是0个
6.spark.dynamicAllocation.cachedExecutorIdleTimeout:
如果executor内有缓存数据(cache data),并且空闲了N秒。则remove该executor。默认值无限制。也就是如果有缓存数据,则不会remove该executor
为什么?比如在写shuffle数据时候,executor可能会写到磁盘也可能会保存在内存中,如果保存在内存中,该executor又remove掉了,那么数据也就丢失了。
--executor-memory 20g 
--executor-cores 5 
--driver-memory 10g 
--driver-cores 5 
--conf spark.dynamicAllocation.enabled=true 
--conf spark.shuffle.service.enabled=true 
--conf spark.dynamicAllocation.initialExecutors=20 
--conf spark.dynamicAllocation.minExecutors=20 
--conf spark.dynamicAllocation.maxExecutors=400 
--conf spark.dynamicAllocation.executorIdleTimeout=300s 
--conf spark.dynamicAllocation.schedulerBacklogTimeout=10s 
hadoop token超时问题

HDFS_DELEGATION_TOKEN token 13619910 for zh2_xxxx) can’t be found in cache

首先,由此问题想出,keytab有过期时间,24小时过期,所以不能再springboot直接引入一次,定时刷新票据,或者在使用前刷新一次票据;
其次,这个问题就是连接hadoop判断文件是否存在的问题,因为代码是在启动hadoop的时候就加载了hadoop的票据,以及建立连接的,这个明显是超时的问题;
解决办法:不去定时刷新票据,因为现在是sparkSession每次都会关闭,所以在每次程序开始之前,刷新一次票据。
delegation token其实就是hadoop里一种轻量级认证方法,作为kerberos认证的一种补充。
详解:https://blog.csdn.net/qq_41587243/article/details/122255689

POM文件依赖

        1.8
        1.1.9
        1.2.0
        2.2.0
        2.11.8
        2.8.2
        2.1.6.RELEASE
        2.9.2
    

    
        
            org.springframework.boot
            spring-boot-starter-web
            






            
                
                    javax.validation
                    validation-api
                
            
        
        
        
            com.alibaba
            druid-spring-boot-starter
            ${druid.version}
        

        
            org.mybatis.spring.boot
            mybatis-spring-boot-starter
            ${mybatis.version}
        

        
        
            org.springframework.boot
            spring-boot-starter-tomcat
            ${spring.boot.version}
            
            provided
        

        
            org.apache.spark
            spark-core_2.11
            ${spark.version}






            
                
                    javax.validation
                    validation-api
                
                    
                        com.google.guava
                        guava
                    
            
        

        
            io.netty
            netty-all
            4.0.43.Final
        
        
        
            org.apache.spark
            spark-sql_2.11
            ${spark.version}
            
                
                    com.google.guava
                    guava
                
                
                    org.codehaus.janino
                    janino
                
                
                    org.codehaus.janino
                    commons-compiler
                
            
        

        
            org.apache.spark
            spark-yarn_2.11
            ${spark.version}
            
                
                    com.google.guava
                    guava
                
            
        


        
        
            org.scala-lang
            scala-library
            ${scala.version}
        

        
            org.springframework.boot
            spring-boot-configuration-processor
            true
            ${spring.boot.version}
        

        
        
            io.springfox
            springfox-swagger2
            ${swagger.version}
        
        
            io.springfox
            springfox-swagger-ui
            ${swagger.version}
        

        
            javax
            javaee-api
            8.0
            
                
                    javax.validation
                    validation-api
                
            
        
        
            org.apache.hadoop
            hadoop-common
            ${hadoop.version}
            
                
                    org.slf4j
                    slf4j-log4j12
                
                
                    com.google.guava
                    guava
                
            
        

        
            org.apache.hadoop
            hadoop-mapreduce-client-core
            ${hadoop.version}
            
                
                    org.slf4j
                    slf4j-log4j12
                
                
                    commons-net
                    commons-net
                
            
        

        
            net.sf.json-lib
            json-lib
            2.4
            jdk15
        
        
        
            org.projectlombok
            lombok
            1.16.10
            provided
        

        
        
            org.postgresql
            postgresql
            42.2.18
        
    

    
        
            
                org.scala-tools
                maven-scala-plugin
                
                    
                        
                            compile
                            testCompile
                        
                    
                
                
                    incremental
                    ${scala.version}
                    
                        
                            app
                            com.asia.xxxApplication
                            
                                -deprecation
                            
                            
                                -Xms2056m
                                -Xmx4096m
                            
                        
                    
                
            
            
                org.springframework.boot
                spring-boot-maven-plugin
                
                
                    
                        
                            com.asia
                            business
                        
                    
                
            
        
    

这就是项目上springboot和spark整合中的一些坑,中间可能略过了一些很小的问题,如有问题可沟通交流哦! 欢迎留言
后面也尝试了利用多线程线程池,提交多个job的案例,也是可以缩短分析时间,更充分利用资源的,有时间再分享!!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/689836.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号