参考文档:Jsch(SSH)工具-JschUtil · Hutool 参考文档
引入jar包:
com.jcraft : jsch : 0.1.54
spark任务直接上代码:
/**
 * Submits a Spark job on a remote host over SSH (JSch via Hutool) and extracts the
 * YARN application id from spark-submit's stderr output.
 *
 * NOTE(review): the {@code command} parameter is only logged; the command actually
 * executed is the hard-coded {@code shell} below — confirm whether {@code command}
 * should be executed instead.
 *
 * @param command  command text recorded into the log (currently not the one executed)
 * @param localLog accumulates a human-readable execution log for the caller
 * @return the YARN application id ("application_...") parsed from stderr, or null if not found
 * @throws BizException if connecting/executing fails or the remote command exits non-zero
 */
private String submitSparkJob(String command, StringBuilder localLog) {
    log.info("spark command ={} ", command);
    localLog.append("启动命令:").append(command).append(SystemConstant.LINE_FEED);
    Session session = JschUtil.getSession("192.168.x.xx", 50022, "admin", "admin");
    String shell = "/wyyt/software/spark-2.4.8/bin/spark-submit --master yarn --deploy-mode cluster --class com.sjb.example.WordCount --driver-cores 1 --driver-memory 2G --num-executors 1 --executor-cores 3 --executor-memory 3G --name spark-test /wyyt/software/spark-2.4.8/spark-sjb.jar";
    final ChannelExec channel = (ChannelExec) JschUtil.createChannel(session, ChannelType.EXEC);
    channel.setCommand(StrUtil.bytes(shell, Charset.defaultCharset()));
    InputStream err = null;
    BufferedInputStream reader = null;
    String appId = null;
    try {
        channel.connect();
        // spark-submit in yarn-cluster mode reports progress (including the submitted
        // application id) on stderr, not stdout.
        err = channel.getErrStream();
        reader = new BufferedInputStream(err);
        // Accumulate the whole stderr stream before searching: matching chunk-by-chunk
        // (as the original did) misses a marker that straddles a 1024-byte buffer boundary.
        StringBuilder errOutput = new StringBuilder();
        byte[] buffer = new byte[1024];
        int bytesRead;
        while ((bytesRead = reader.read(buffer)) != -1) {
            // NOTE(review): SystemConstants vs SystemConstant — confirm both classes exist.
            errOutput.append(new String(buffer, 0, bytesRead, SystemConstants.CODE_UTF_8));
        }
        String output = errOutput.toString();
        int markerIdx = output.indexOf("YarnClientImpl: Submitted application");
        if (markerIdx >= 0) {
            int start = output.indexOf("application_", markerIdx);
            if (start >= 0) {
                // Take only the id token itself; the original substring(...) kept the
                // whole remainder of the stream after "application_".
                int end = start;
                while (end < output.length() && !Character.isWhitespace(output.charAt(end))) {
                    end++;
                }
                appId = output.substring(start, end);
            }
        }
        // Exit status is read after stderr reached EOF, i.e. after the remote command finished.
        log.info("==>>>运行状态:{}==<<<", channel.getExitStatus());
        localLog.append("执行的任务状态 channel.getExitStatus()=").append(channel.getExitStatus())
                .append(" appId=").append(appId)
                .append(SystemConstant.LINE_FEED);
        if (channel.getExitStatus() != 0) {
            log.error("任务失败了...........");
            throw new RuntimeException("执行异常 exitstatus=" + channel.getExitStatus());
        }
        return appId;
    } catch (Exception ex) {
        // Join stack frames with a real newline — the original used the literal string "n".
        String errorInfo = StringUtils.join(ex.getStackTrace(), "\n");
        log.error(" submitJob()方法 Exception ={} ", errorInfo);
        localLog.append("submitJob方法执行命令报错 ....")
                .append(SystemConstant.LINE_FEED)
                .append(errorInfo)
                .append(SystemConstant.LINE_FEED);
        throw new BizException("submitJob方法执行命令报错 ....");
    } finally {
        // Close the wrapper first, then the underlying stream, channel and session.
        IoUtil.close(reader);
        IoUtil.close(err);
        JschUtil.close(channel);
        JschUtil.close(session);
    }
}
Flink任务直接上代码:
/**
 * Submits a Flink job on a remote host over SSH and checks that YARN reports a
 * successful deployment.
 *
 * @param command  the flink run command executed on the remote host
 * @param localLog accumulates a human-readable execution log for the caller
 * @return the YARN application id ("application_...") parsed from the command output,
 *         or null if it cannot be found (the original version always returned null)
 * @throws BizException when execution fails or the output lacks the success marker
 */
public String submitJob(String command, StringBuilder localLog) throws Exception {
    log.info(" command ={} ", command);
    localLog.append("启动命令:").append(command).append(SystemConstant.LINE_FEED);
    String appId = null;
    Session session = JschUtil.openSession("192.168.x.xx", 50022, "bi", "biadmin");
    try {
        String result = JschUtil.exec(session, command, Charset.defaultCharset());
        log.info("result : {}", result);
        localLog.append(result).append(SystemConstant.LINE_FEED);
        if (!result.contains("YARN application has been deployed successfully")) {
            throw new BizException("任务执行失败..");
        }
        // Bug fix: the original declared appId but never assigned it, so the method
        // always returned null. Extract the "application_..." token from the output.
        int start = result.indexOf("application_");
        if (start >= 0) {
            int end = start;
            while (end < result.length() && !Character.isWhitespace(result.charAt(end))) {
                end++;
            }
            appId = result.substring(start, end);
        }
    } catch (Exception ex) {
        // Join stack frames with a real newline — the original used the literal string "n".
        String errorInfo = StringUtils.join(ex.getStackTrace(), "\n");
        log.error(" submitJob()方法 Exception ={} ", errorInfo);
        localLog.append("submitJob方法执行命令报错 ....")
                .append(SystemConstant.LINE_FEED)
                .append(errorInfo)
                .append(SystemConstant.LINE_FEED);
        throw new BizException("submitJob方法执行命令报错 ....");
    } finally {
        JschUtil.close(session);
    }
    return appId;
}
写的很简陋,主要是提醒自己。



