/**
 * Demo entry point: builds the parameters for the WordCount Spark job
 * and hands them off to {@link #submitApplication}.
 *
 * @throws Exception propagated from {@code submitApplication}
 */
public void crateBatchTaskByLauncher() throws Exception {
    // NOTE(review): method name has a typo ("crate" -> "create"); kept as-is
    // so existing callers are not broken.
    SparkApplicationParam params = new SparkApplicationParam();
    params.setJarPath("/home/bd/SPARK/spark-test-1.0.0.jar");
    params.setMainClass("com.starnet.server.bigdata.spark.wordcount.WordCount");
    submitApplication(params);
}
/**
 * Submits a Spark application asynchronously via {@code SparkLauncher}.
 * Configuration is taken from {@code sparkAppParams}; extra application
 * arguments may be passed through {@code otherParams}. The submission runs
 * on a freshly spawned thread that blocks until the application reaches a
 * terminal state (reported by the registered listener).
 *
 * @param sparkAppParams jar path, main class, master, deploy mode, memory
 *                       and core settings, plus optional extra spark conf
 * @param otherParams    optional application arguments forwarded verbatim
 * @throws Exception if building the launcher fails before the async submit
 */
public void submitApplication(SparkApplicationParam sparkAppParams, String... otherParams) throws Exception {
    log.info("spark任务传入参数:{}", sparkAppParams.toString());
    // Parameterized map: with the raw type, Map.Entry.getKey() returns Object,
    // which does not type-check against SparkLauncher.setConf(String, String).
    Map<String, String> confParams = sparkAppParams.getOtherConfParams();
    SparkLauncher launcher = new SparkLauncher()
            .setSparkHome("/opt/module/spark-yarn")
            .setAppResource(sparkAppParams.getJarPath())
            .setMainClass(sparkAppParams.getMainClass())
            .setMaster(sparkAppParams.getMaster())
            .setDeployMode(sparkAppParams.getDeployMode())
            .setConf("spark.driver.memory", sparkAppParams.getDriverMemory())
            .setConf("spark.executor.memory", sparkAppParams.getExecutorMemory())
            .setConf("spark.executor.cores", sparkAppParams.getExecutorCores());
    if (confParams != null && !confParams.isEmpty()) {
        log.info("开始设置spark job运行参数:{}", JSONObject.toJSONString(confParams));
        for (Map.Entry<String, String> conf : confParams.entrySet()) {
            log.info("{}:{}", conf.getKey(), conf.getValue());
            launcher.setConf(conf.getKey(), conf.getValue());
        }
    }
    if (otherParams.length != 0) {
        // Cast to Object so the String[] is logged as a single argument rather
        // than being expanded into the logger's varargs.
        log.info("开始设置spark job参数:{}", (Object) otherParams);
        launcher.addAppArgs(otherParams);
    }
    log.info("参数设置完成,开始提交spark任务");
    // NOTE(review): an ExecutorService would be preferable to a bare Thread;
    // kept as a single fire-and-forget thread to preserve existing behavior.
    new Thread(() -> {
        try {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            SparkAppHandle handle = launcher.setVerbose(true).startApplication(new SparkAppHandle.Listener() {
                @Override
                public void stateChanged(SparkAppHandle sparkAppHandle) {
                    // Release the latch once the app reaches a terminal state.
                    if (sparkAppHandle.getState().isFinal()) {
                        countDownLatch.countDown();
                    }
                    log.info("stateChanged:{}", sparkAppHandle.getState().toString());
                }

                @Override
                public void infoChanged(SparkAppHandle sparkAppHandle) {
                    log.info("infoChanged:{}", sparkAppHandle.getState().toString());
                }
            });
            log.info("The task is executing, please wait ....");
            // Block this worker thread until the listener signals completion.
            countDownLatch.await();
            log.info("The task is finished!");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupt status
            log.error(e.getMessage(), e);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }).start();
}
public class SparkApplicationParam {
private String mainClass;
private String jarPath;
private String master = "yarn";
private String deployMode = "cluster";
private String driverMemory = "1g";
private String executorMemory = "1g";
private String executorCores = "1";
private Map otherConfParams;
}
spark-launcher 提交任务时,其日志默认会与宿主程序的日志混在一起输出,可以通过其他方式将其单独打印。之后若要正式接入 Spark,建议将这部分日志分开输出,便于问题回溯。此外,还可以自定义监听器,在任务信息或状态变更时执行相应操作;通过 SparkAppHandle 可以实现暂停、停止、断开连接、获取 AppId、获取 State 等多种功能。



