栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java > SpringBoot

Sping boot结合Spark做成一个长服务

SpringBoot 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Sping boot结合Spark做成一个长服务

功能
常见的Spark离线任务可能处理完就结束了,这里我们结合SpringBoot实现一个可以一直运行的Spark任务,在这种基础上可以做更多的扩展功能。
项目结构
创建一个maven项目:
src
   main--
java
resource
scala

包含java和scala代码。因为包含scala,所以需要在pom中新增如下插件:
   
     
  net.alchim31.maven
  scala-maven-plugin
  3.3.1
  
      
   scala-compile-first
   process-resources
   
add-source
compile
   
      
      
   scala-test-compile
   process-test-resources
   
testCompile
   
      
  
  
      ${scala.version}
      incremental
      
   -Xlint:unchecked
   -Xlint:deprecation
      
      
   
   -nobootcp
      
      ${java.version}
      ${java.version}
  
     

打包方式
1-首先我们想要所有依赖的包单独放在一个目录下,这样我们更新代码之后只需要传递一个很小的jar包。
需要新增如下两个插件。
		     
		  org.apache.maven.plugins
		  maven-jar-plugin
		  
		      
		   
		true
		lib/
		com.demo.WebApplication
		   
		   
		${project.version}
		   
		      
		  
		     
     
  org.apache.maven.plugins
  maven-dependency-plugin
  
      
   copy
   package
   
copy-dependencies
   
   


    ${project.build.directory}/lib

   
      
  
     

2-针对spark相关的包进行排除:provided

     org.apache.spark
     spark-sql_2.11
     ${spark.version}
     provided
 

		 
		     org.apache.spark
		     spark-core_2.11
		     ${spark.version}
		     
		  
		      org.slf4j
		      slf4j-log4j12
		  
		  
		      com.google.code.gson
		      gson
		  
		 
		     com.fasterxml.jackson.module
		     jackson-module-scala_2.11
		 
		  
		      com.fasterxml.jackson.core
		      jackson-databind
		  
		     
		     provided
		 
代码实现
1-SpringBoot 相关  启动类,controller,service:
@SpringBootApplication
public class WebApplication {

    public static void main(String[] args) {
 SpringApplication.run(WebApplication.class,args);
    }

}

@RestController
public class DemoController {

    @Autowired
    private DemoService demoService;

    @GetMapping("/write")
    public void demo(@RequestParam("path") String path, @RequestParam("outPut")String outPut){

 demoService.test(path,outPut);
    }

}
@Slf4j
@Service
public class DemoService {
    public void test(String path,String outPut){
 log.info("test~~~");
 new Thread(new Runnable() {
     @Override
     public void run() {
  ParquetUtils parquetUtils=new ParquetUtils();
  parquetUtils.write(path,outPut);
     }
 }).start();
    }
}

2-Spark相关  获取sparkSession

public class SingleSpark {


    private static volatile SparkSession sparkSession=null;

    private SingleSpark(){}

    public static SparkSession getInstance(){
 if(sparkSession==null){

     synchronized (SingleSpark.class){

  if(sparkSession==null){
      sparkSession=SparkSession.builder()

.getOrCreate();
  }

     }

 }
 return sparkSession;
    }

}

3-最后在scala包里实现service中具体调用的逻辑这里简单读取本地文件,然后通过spark sql写入parquet文件。
class ParquetUtils extends Serializable{


  def write(path:String,outPut:String): Unit = {


    val sparkSession=SingleSpark.getInstance();
    val sc=sparkSession.sparkContext

    val textRdd=sc.textFile(path)

    val struct=StructType{
      Array(
 StructField("col01",StringType),
 StructField("col02",StringType)
      )
    }

    val df=sparkSession.createDataframe(textRdd.map(line=>{
      val lines=line.split(",")
      Row(lines(0),lines(1))
    }),struct)


    df.select("*").show()

    df.write.parquet(outPut)

    val outDF = sparkSession.read.parquet(outPut)

    // Parquet files can also be used to create a temporary view and then used in SQL statements
    outDF.createOrReplaceTempView("parquetFile")
    val col01DF = sparkSession.sql("SELECT col01 FROM parquetFile")

    col01DF.show()

  }

}

打包测试
mvn clean package
1-首先准备一个hadoop环境和spark。
修改spark-env.sh
HADOOP_CONF_DIR=/home/hadoop/app/hadoop-2.6.0-cdh5.7.0

2-获取springBoot依赖的包
因为打包完成后lib下面会包含所有的包,包括spark相关,但是会引起很多jar包冲突的问题,所以我们需要排除spark相关的jar包。当在
服务器运行的时候,我们安装的spark已经提供了那些包,因此我们
找到jar包下的MANIFEST.MF文件,里面有我们打包完成后启动web项目的依赖Class-Path: 
web-spark-1.0-SNAPSHOT.jarmeta-INFMANIFEST.MF
Class-Path: lib/spring-boot-starter-web-2.0.2.RELEASE.jar ....

我们获取这些包,然后放到环境的lib目录下:
获取lib,不包含spark相关的
cp lib/spring-boot-starter-web-2.0.2.RELEASE.jar lib/spring-boot-starter-json-2.0.2.RELEASE.jar tmp
cp lib/jackson-datatype-jdk8-2.9.5.jar lib/jackson-datatype-jsr310-2.9.5.jar lib/jackson-module-parameter-names-2.9.5.jar lib/spring-boot-starter-tomcat-2.0.2.RELEASE.jar tmp
cp lib/tomcat-embed-core-8.5.31.jar lib/tomcat-embed-el-8.5.31.jar lib/tomcat-embed-websocket-8.5.31.jar lib/spring-web-5.0.6.RELEASE.jar tmp
cp lib/spring-beans-5.0.6.RELEASE.jar lib/spring-webmvc-5.0.6.RELEASE.jar lib/spring-aop-5.0.6.RELEASE.jar lib/spring-context-5.0.6.RELEASE.jar tmp
cp lib/spring-expression-5.0.6.RELEASE.jar lib/spring-boot-starter-2.0.2.RELEASE.jar lib/spring-boot-2.0.2.RELEASE.jar lib/spring-boot-starter-logging-2.0.2.RELEASE.jar tmp
cp lib/log4j-to-slf4j-2.10.0.jar lib/log4j-api-2.10.0.jar lib/javax.annotation-api-1.3.2.jar lib/spring-core-5.0.6.RELEASE.jar lib/spring-jcl-5.0.6.RELEASE.jar lib/snakeyaml-1.19.jar tmp
cp lib/spring-boot-autoconfigure-2.0.2.RELEASE.jar lib/slf4j-nop-1.7.2.jar tmp
cp lib/slf4j-api-1.7.2.jar lib/validation-api-1.1.0.Final.jar lib/hibernate-validator-5.2.4.Final.jar lib/jboss-logging-3.2.1.Final.jar tmp
cp lib/classmate-1.1.0.jar lib/scala-library-2.11.8.jar lib/paranamer-2.3.jar tmp
cp lib/jul-to-slf4j-1.7.16.jar lib/jackson-module-scala_2.11-2.9.5.jar tmp
cp lib/scala-reflect-2.11.11.jar lib/jackson-core-2.9.5.jar lib/jackson-annotations-2.9.5.jar lib/jackson-databind-2.9.5.jar lib/jackson-module-paranamer-2.9.5.jar tmp

mv tmp lib

提交到yarn
1-使用下面的命令提交到yarn运行:
spark-submit --master yarn 
--name web-spark 
--class com.demo.WebApplication 
--executor-memory 1G 
--num-executors 1 
/home/hadoop/jars/web-spark-1.0-SNAPSHOT.jar

如下错误:
Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher

1.因为在spark-default.conf增加了配置
spark.yarn.jars=hdfs://hadoop003:8020/sparkjars/*
所以需要把相关的包上传上去
cd /home/hadoop/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/jars
hdfs dfs -put * hdfs://hadoop003:8020/sparkjars/

再次提交
spark-submit --master yarn 
--name web-spark 
--jars $(echo /home/hadoop/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/jars/*.jar | tr ' ' ',') 
--class com.demo.WebApplication 
--executor-memory 1G 
--num-executors 1 
/home/hadoop/jars/web-spark-1.0-SNAPSHOT.jar

通过controller提供的接口访问:

结果:

+-----+-----+
|col01|col02|
+-----+-----+
|    a|    g|
|    b|    d|
|    c|    f|
+-----+-----+


+-----+
|col01|
+-----+
|    a|
|    b|
|    c|
+-----+

http://192.168.76.142:8088/cluster/scheduler

完整实现
完整实现请参考:
https://github.com/lizu18xz/web-spark
转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号