Submitting Flink jobs over remote SSH from the Java API

Reference: Jsch (SSH) tool - JschUtil · Hutool reference documentation

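Add the jar dependency (JschUtil itself comes from Hutool's hutool-extra module, which also has to be on the classpath):

    <dependency>
        <groupId>com.jcraft</groupId>
        <artifactId>jsch</artifactId>
        <version>0.1.54</version>
    </dependency>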

Spark job, straight to the code:

    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;

    import cn.hutool.core.io.IoUtil;
    import cn.hutool.core.util.StrUtil;
    import cn.hutool.extra.ssh.ChannelType;
    import cn.hutool.extra.ssh.JschUtil;
    import com.jcraft.jsch.ChannelExec;
    import com.jcraft.jsch.Session;
    // StringUtils is assumed to be org.apache.commons.lang3.StringUtils.
    // log, SystemConstant and BizException come from the surrounding project.

    private String submitSparkJob(String command, StringBuilder localLog) {
        log.info("spark command ={} ", command);
        localLog.append("Launch command: ").append(command).append(SystemConstant.LINE_FEED);
        Session session = JschUtil.getSession("192.168.x.xx", 50022, "admin", "admin");
        // The command is hard-coded for this demo; the `command` parameter is only logged.
        String shell = "/wyyt/software/spark-2.4.8/bin/spark-submit --master yarn --deploy-mode cluster --class com.sjb.example.WordCount --driver-cores 1 --driver-memory 2G --num-executors 1 --executor-cores 3 --executor-memory 3G --name spark-test /wyyt/software/spark-2.4.8/spark-sjb.jar";
        // Variant pointing at spark-sjb-err.jar (presumably for exercising the failure path):
        // String shell = "/wyyt/software/spark-2.4.8/bin/spark-submit --master yarn --deploy-mode cluster --class com.sjb.example.WordCount --driver-cores 1 --driver-memory 2G --num-executors 1 --executor-cores 3 --executor-memory 3G --name spark-test /wyyt/software/spark-2.4.8/spark-sjb-err.jar";
        // Alternative: let Hutool run the command and collect the output itself.
        // OutputStream outputStream = new ByteArrayOutputStream();
        // String exec = JschUtil.exec(session, shell, Charset.defaultCharset(), outputStream);

        final ChannelExec channel = (ChannelExec) JschUtil.createChannel(session, ChannelType.EXEC);
        channel.setCommand(StrUtil.bytes(shell, Charset.defaultCharset()));
        // channel.setInputStream(null);
        // channel.setErrStream(outputStream);
        InputStream err = null;
        String appId = null;

        try {
            // spark-submit logs to stderr, so the application id is read from the error stream.
            // The stream is obtained before connect(), as in the JSch examples.
            err = channel.getErrStream();
            channel.connect();
            // Read line by line so the marker cannot be split across two buffer reads.
            BufferedReader reader = new BufferedReader(new InputStreamReader(err, StandardCharsets.UTF_8));
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.contains("YarnClientImpl: Submitted application")) {
                    appId = line.substring(line.indexOf("application_")).trim();
                }
            }

            // getExitStatus() returns -1 until the channel is closed, so wait for that first.
            while (!channel.isClosed()) {
                Thread.sleep(100);
            }
            // Decide success or failure from the exit status.
            int exitStatus = channel.getExitStatus();
            System.out.println("==>>> exit status: " + exitStatus + " ==<<<");

            localLog.append("Job status channel.getExitStatus()=").append(exitStatus)
                    .append("   appId=").append(appId)
                    .append(SystemConstant.LINE_FEED);

            if (exitStatus != 0) {
                System.out.println("Job failed...........");
                throw new RuntimeException("Execution error, exitstatus=" + exitStatus);
            }
            return appId;

        } catch (Exception ex) {
            String error_info = StringUtils.join(ex.getStackTrace(), "\n");
            log.error(" submitJob() Exception ={} ", error_info);
            localLog.append("submitJob failed while executing the command ....")
                    .append(SystemConstant.LINE_FEED)
                    .append(error_info)
                    .append(SystemConstant.LINE_FEED);
            throw new BizException("submitJob failed while executing the command ....");
        } finally {
            IoUtil.close(err);
            JschUtil.close(channel);
            JschUtil.close(session);
        }
    }
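The Spark method picks the application id out of the `YarnClientImpl: Submitted application` log line. As a rough sketch of that parsing step on its own, the class below scans collected stderr text line by line and pulls the id out with a regular expression. The class name `YarnAppIdParser`, the method `findAppId`, and the sample log text are made up for illustration; the only things taken from the code above are the marker string and the `application_` prefix.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class YarnAppIdParser {
    // YARN application ids have the form application_<clusterTimestamp>_<sequence>.
    private static final Pattern APP_ID = Pattern.compile("application_\\d+_\\d+");
    private static final String MARKER = "YarnClientImpl: Submitted application";

    /** Returns the id found on the first line containing the marker, if there is one. */
    public static Optional<String> findAppId(String stderrText) {
        for (String line : stderrText.split("\\R")) {
            int at = line.indexOf(MARKER);
            if (at < 0) {
                continue; // not the submission line
            }
            Matcher m = APP_ID.matcher(line);
            if (m.find(at)) { // search only after the marker
                return Optional.of(m.group());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Hypothetical log excerpt, not captured from a real cluster.
        String sample = "INFO Client: Uploading resource ...\n"
                + "INFO YarnClientImpl: Submitted application application_1638345600000_0042\n"
                + "INFO Client: Application report for application_1638345600000_0042 (state: ACCEPTED)\n";
        System.out.println(findAppId(sample).orElse("not found"));
    }
}
```

Matching the id with a pattern, rather than taking everything after `application_`, keeps trailing text on the same line out of the result.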


Flink job, straight to the code:

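    import java.nio.charset.Charset;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    import cn.hutool.extra.ssh.JschUtil;
    import com.jcraft.jsch.Session;
    // StringUtils is assumed to be org.apache.commons.lang3.StringUtils.
    // log, SystemConstant and BizException come from the surrounding project.

    public String submitJob(String command, StringBuilder localLog) throws Exception {
        log.info(" command ={} ", command);
        localLog.append("Launch command: ").append(command).append(SystemConstant.LINE_FEED);
        String appId = null;

        Session session = JschUtil.openSession("192.168.x.xx", 50022, "bi", "biadmin");
        try {
            // JschUtil.exec runs the command and returns its standard output as a string.
            String result = JschUtil.exec(session, command, Charset.defaultCharset());

            System.out.println("result : " + result);
            localLog.append(result).append(SystemConstant.LINE_FEED);
            if (!result.contains("YARN application has been deployed successfully")) {
                throw new BizException("Job execution failed..");
            }

            // Extract the YARN application id so the method does not return null.
            // Assumption: the CLI output contains an id of the form application_<timestamp>_<sequence>.
            Matcher m = Pattern.compile("application_\\d+_\\d+").matcher(result);
            if (m.find()) {
                appId = m.group();
            }
        } catch (Exception ex) {
            String error_info = StringUtils.join(ex.getStackTrace(), "\n");
            log.error(" submitJob() Exception ={} ", error_info);
            localLog.append("submitJob failed while executing the command ....")
                    .append(SystemConstant.LINE_FEED)
                    .append(error_info)
                    .append(SystemConstant.LINE_FEED);
            throw new BizException("submitJob failed while executing the command ....");
        } finally {
            JschUtil.close(session);
        }
        return appId;
    }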
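Both methods hand a single command string to the remote shell, so any argument containing spaces or quotes has to be escaped before it is sent. The following is a minimal sketch of one way to assemble such a string, assuming the remote side runs a POSIX shell. `RemoteCommand`, `quote` and `build` are hypothetical helper names, not part of Hutool or JSch; the executable path and flags in `main` are the ones from the spark-submit line above, except for the job name, which has a space added to show the quoting.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RemoteCommand {
    /** Wraps one argument in single quotes so a POSIX shell treats it literally. */
    public static String quote(String arg) {
        // Nothing is special inside single quotes; a literal ' is written as '\''.
        return "'" + arg.replace("'", "'\\''") + "'";
    }

    /** Joins the executable and its arguments into one shell-safe command line. */
    public static String build(String executable, List<String> args) {
        List<String> parts = new ArrayList<>();
        parts.add(quote(executable));
        for (String a : args) {
            parts.add(quote(a));
        }
        return String.join(" ", parts);
    }

    public static void main(String[] args) {
        String cmd = build("/wyyt/software/spark-2.4.8/bin/spark-submit", Arrays.asList(
                "--master", "yarn",
                "--deploy-mode", "cluster",
                "--class", "com.sjb.example.WordCount",
                "--name", "spark test", // the space survives because the value is quoted
                "/wyyt/software/spark-2.4.8/spark-sjb.jar"));
        System.out.println(cmd);
    }
}
```

The resulting string could then be passed as the `command` argument of `submitJob`, or to `channel.setCommand` in the Spark method.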

This is a rough write-up, kept mainly as a note to myself.

When reposting, please credit the source: this article is reposted from www.mshxw.com
Article URL: https://www.mshxw.com/it/433880.html