Preface:
On a whim I decided to combine Spring Boot, Spark, and Scala: package everything into a single jar, submit that jar with spark-submit, and have Spring Boot expose a few HTTP endpoints through which Spark can be driven.
The whole thing went fairly smoothly; most of the friction is in getting Spring Boot and Spark to coexist — logging conflicts, the packaging setup, and so on. The fixes live mainly in pom.xml, which I've already sorted out, so you can use mine as-is.
- 1. Usage
- 2. Code structure
  - application.yml
  - SparkConfig (Java)
  - collect (Java)
  - Service (Scala)
1. Usage
GitHub repo: https://github.com/sgr-china/SpringSpark.git
After cloning the project, put your own hdfs-site.xml, core-site.xml, and hive-site.xml into the resources directory, adjust application.yml to your environment, and the project is ready to run.
The repo also carries two extra jars; if you hit a conflict at runtime, simply replace Spark's original jars with the corresponding ones.
This code was originally part of a larger tool that I've pulled out on its own, so it contains some unrelated code — that doesn't affect anything; just focus on the test endpoint and extend from there once you're familiar with it. The heart of it is the Spring Boot + Spark integration (pom.xml is already taken care of).
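The post doesn't show the actual submit command; purely as a rough sketch — the main class and jar path are placeholders, use whatever your build actually produces:

spark-submit \
  --class <your-spring-boot-main-class> \
  --master local[*] \
  /path/to/SpringSpark.jar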
2. Code structure
application.yml
The service port and Spark-related settings. Note that the spark.* entries here are plain Spring properties, injected via @Value in SparkConfig below.
server:
  port: 30001
  tomcat:
    max-threads: 250
  compression:
    enabled: true
spark:
  app:
    name: SpringSpark
  master:
    uri: local[*]
  driver:
    memory: 1g
  executor:
    cores: 1
    memory: 1g
  num:
    executors: 1
SparkConfig (Java)
Spark initialization config. The SparkSession is exposed as a Spring bean; @ConditionalOnMissingBean means it is only created when no other SparkSession bean has been registered.
package com.sgr.conf;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SparkConfig {

    @Value("${spark.app.name}")
    private String appName;
    @Value("${spark.master.uri}")
    private String sparkMasterUri;
    @Value("${spark.driver.memory}")
    private String sparkDriverMemory;
    @Value("${spark.executor.memory}")
    private String sparkExecutorMemory;
    @Value("${spark.executor.cores}")
    private String sparkExecutorCores;
    @Value("${spark.num.executors}")
    private String sparkExecutorsNum;

    // Only built when no SparkSession bean has been registered elsewhere.
    @Bean
    @ConditionalOnMissingBean(SparkSession.class)
    public SparkSession sparkSession() {
        SparkConf sparkConf = new SparkConf().setAppName(appName)
                .setMaster(sparkMasterUri)
                // Note: spark.driver.memory only takes effect if set before the
                // driver JVM starts, so under spark-submit pass it on the command line.
                .set("spark.driver.memory", sparkDriverMemory)
                .set("spark.executor.memory", sparkExecutorMemory)
                .set("spark.executor.cores", sparkExecutorCores)
                // Note: the built-in key for executor count is spark.executor.instances;
                // spark.num.executors is a custom key with no effect on Spark itself.
                .set("spark.num.executors", sparkExecutorsNum);
        // enableHiveSupport() needs hive-site.xml on the classpath (see section 1).
        SparkSession spark = new SparkSession.Builder().enableHiveSupport()
                .config(sparkConf)
                .config("spark.debug.maxToStringFields", 1000)
                .getOrCreate();
        return spark;
    }
}
collect (Java)
Focus on the test endpoint; it's easy to extend from there.
package com.sgr.collect;

import com.sgr.serviceScala.ScalaExportService;
import com.sgr.serviceScala.TestScalaService;
import org.hibernate.validator.constraints.NotBlank;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class ExportCollect {

    @Resource
    private TestScalaService testScalaService;

    // Run a SQL statement through Spark and log the result.
    @RequestMapping("/test")
    public void test(@RequestParam("sql") @NotBlank String sql) {
        testScalaService.getData(sql);
    }

    @Resource
    private ScalaExportService scalaExportService;

    // Export the result of a SQL statement to the given path.
    @RequestMapping("/exportT")
    public void testSpark(@RequestParam("sql") @NotBlank String sql,
                          @RequestParam("code") @NotBlank String code,
                          @RequestParam("path") @NotBlank String path) {
        scalaExportService.exportData(sql, code, path, "1000");
    }
}
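With the service up (port 30001, per application.yml), both endpoints are plain GET requests. For example — the table name below is a placeholder, and the SQL should be URL-encoded in a real call:

http://localhost:30001/test?sql=select * from some_table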
Service (Scala)
The service layer lives under Scala mainly because driving Spark from Scala is more convenient and concise.
package com.sgr.serviceScala

import com.sgr.conf.LazyLogging
import org.apache.spark.sql.SparkSession
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service

@Service
class TestScalaService extends LazyLogging {

  // Field-injected by Spring; the null initializer is the usual idiom
  // when wiring Spring beans into Scala classes.
  @Autowired
  private val sparkSession: SparkSession = null

  // Run the SQL through Spark, print the result, and log the row count.
  def getData(sql: String): Unit = {
    logger.info(sql)
    val df = sparkSession.sql(sql)
    df.show()
    logger.info(df.count().toString)
  }
}
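TestScalaService mixes in com.sgr.conf.LazyLogging, which isn't shown in this post. A minimal sketch of what such a trait typically looks like on top of SLF4J (the version in the repo may differ):

package com.sgr.conf

import org.slf4j.{Logger, LoggerFactory}

trait LazyLogging {
  // Lazily created SLF4J logger, named after the concrete class that mixes this in
  protected lazy val logger: Logger = LoggerFactory.getLogger(getClass)
}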
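ScalaExportService, which backs the /exportT endpoint above, also isn't reproduced here. Purely as a guess at its shape — the CSV output format and the handling of the code parameter are my assumptions, so treat the repo as the source of truth:

package com.sgr.serviceScala

import com.sgr.conf.LazyLogging
import org.apache.spark.sql.SparkSession
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service

@Service
class ScalaExportService extends LazyLogging {

  @Autowired
  private val sparkSession: SparkSession = null

  // Run the query, cap the result at `limit` rows, and write it to `path`.
  // CSV output and the unused `code` parameter are assumptions for illustration.
  def exportData(sql: String, code: String, path: String, limit: String): Unit = {
    logger.info(s"export: sql=$sql, code=$code, path=$path, limit=$limit")
    val df = sparkSession.sql(sql).limit(limit.toInt)
    df.write.option("header", "true").csv(path)
  }
}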
A quick plug: I highly recommend Wu Lei (吴磊)'s Spark course on Geek Time (极客时间). I got a great deal out of it, and the comments under each lesson are worth reading too. If your work touches Spark, it's an easy pick — and there's a discount if you buy through my link.