
Submitting Spark jobs to a YARN cluster with SparkLauncher

Submitting via spark-launcher
public void createBatchTaskByLauncher() throws Exception {

    SparkApplicationParam sparkAppParams = new SparkApplicationParam();
    sparkAppParams.setJarPath("/home/bd/SPARK/spark-test-1.0.0.jar");
    sparkAppParams.setMainClass("com.starnet.server.bigdata.spark.wordcount.WordCount");

    submitApplication(sparkAppParams);
}

public void submitApplication(SparkApplicationParam sparkAppParams, String... otherParams) throws Exception {

    log.info("Spark job parameters: {}", sparkAppParams.toString());

    Map<String, String> confParams = sparkAppParams.getOtherConfParams();
    SparkLauncher launcher = new SparkLauncher()
        .setSparkHome("/opt/module/spark-yarn")
        .setAppResource(sparkAppParams.getJarPath())
        .setMainClass(sparkAppParams.getMainClass())
        .setMaster(sparkAppParams.getMaster())
        .setDeployMode(sparkAppParams.getDeployMode())
        .setConf("spark.driver.memory", sparkAppParams.getDriverMemory())
        .setConf("spark.executor.memory", sparkAppParams.getExecutorMemory())
        .setConf("spark.executor.cores", sparkAppParams.getExecutorCores());

    if (confParams != null && !confParams.isEmpty()) {
        log.info("Setting extra Spark configuration: {}", JSONObject.toJSONString(confParams));
        for (Map.Entry<String, String> conf : confParams.entrySet()) {
            log.info("{}:{}", conf.getKey(), conf.getValue());
            launcher.setConf(conf.getKey(), conf.getValue());
        }
    }
    if (otherParams.length != 0) {
        log.info("Setting application arguments: {}", Arrays.toString(otherParams));
        launcher.addAppArgs(otherParams);
    }

    log.info("Parameters configured, submitting the Spark job");

    new Thread(new Runnable() {
        @Override
        public void run() {
            try {

                CountDownLatch countDownLatch = new CountDownLatch(1);

                SparkAppHandle handle = launcher.setVerbose(true).startApplication(new SparkAppHandle.Listener() {

                    @Override
                    public void stateChanged(SparkAppHandle sparkAppHandle) {
                        if (sparkAppHandle.getState().isFinal()) {
                            countDownLatch.countDown();
                        }
                        log.info("stateChanged:{}", sparkAppHandle.getState().toString());
                    }

                    @Override
                    public void infoChanged(SparkAppHandle sparkAppHandle) {
                        log.info("infoChanged:{}", sparkAppHandle.getState().toString());
                    }
                });

                log.info("The task is executing, please wait ....");

                // block this thread until the job reaches a final state
                countDownLatch.await();

                log.info("The task is finished!");
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }
    }).start();
}
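A caller can pass additional Spark properties through `otherConfParams`. The sketch below builds such a map using standard Spark property names (`spark.yarn.queue` and friends are real Spark configuration keys; the helper class itself is hypothetical, added only for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class ConfParamsExample {

    // Builds the kind of map that submitApplication() forwards via launcher.setConf()
    public static Map<String, String> buildConfParams() {
        Map<String, String> conf = new HashMap<>();
        conf.put("spark.yarn.queue", "default");               // target YARN queue
        conf.put("spark.executor.instances", "2");             // number of executors
        conf.put("spark.yarn.submit.waitAppCompletion", "false"); // return as soon as the app is accepted
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> conf = buildConfParams();
        System.out.println(conf.get("spark.yarn.queue"));
    }
}
```

Each entry ends up as an extra `--conf key=value` on the underlying spark-submit invocation.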


public class SparkApplicationParam {

    private String mainClass;

    private String jarPath;

    private String master = "yarn";

    private String deployMode = "cluster";

    private String driverMemory = "1g";

    private String executorMemory = "1g";

    private String executorCores = "1";

    private Map<String, String> otherConfParams;

    // getters/setters and toString() omitted (typically generated, e.g. with Lombok @Data)
}
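Thanks to the field initializers, a freshly constructed parameter object already targets `yarn`/`cluster` mode, so a caller only has to set `jarPath` and `mainClass` before submitting. A minimal self-contained check (the inner class here re-declares the relevant fields with hand-written getters, since the original presumably relies on generated accessors):

```java
public class SparkApplicationParamDemo {

    // Trimmed-down stand-in for SparkApplicationParam, defaults copied from the listing
    static class SparkApplicationParam {
        private String master = "yarn";
        private String deployMode = "cluster";
        private String driverMemory = "1g";

        String getMaster() { return master; }
        String getDeployMode() { return deployMode; }
        String getDriverMemory() { return driverMemory; }
    }

    public static void main(String[] args) {
        SparkApplicationParam p = new SparkApplicationParam();
        // No setters called: the defaults alone describe a yarn/cluster submission
        System.out.println(p.getMaster() + "/" + p.getDeployMode());
    }
}
```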

By default, spark-launcher writes the submission logs into the same output as the host application; you can redirect them so that Spark's logs are emitted separately, which makes problems easier to trace. You can also register a custom listener that fires whenever the job's state or info changes, and the returned SparkAppHandle supports stopping, killing, and disconnecting from the application as well as querying its application ID and current state.
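For reference, the launcher configured above builds roughly the same invocation as the following spark-submit command (a sketch assembled from the paths and class name in the listing, not output captured from the tool):

```
/opt/module/spark-yarn/bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class com.starnet.server.bigdata.spark.wordcount.WordCount \
  --conf spark.driver.memory=1g \
  --conf spark.executor.memory=1g \
  --conf spark.executor.cores=1 \
  /home/bd/SPARK/spark-test-1.0.0.jar
```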

Reprinted from www.mshxw.com
Original article: https://www.mshxw.com/it/283080.html