Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法,不过spark on yarn的集群部署模式不支持http的方式提交任务,所以需要通过远程docker容器通过docker发送spark-submit的指令到spark容器内部,才可在springboot项目中远程开启任务
spark-submit在服务端的调用流程如下:
spark-on-yarn模式远程提交docker指令流程如下
首先需要先在服务端配置可被远程访问的api端口
vi /lib/systemd/system/docker.service 找到 Execstart=/usr/bin/dockerd,并在后加上如下内容,然后保存退出 -H tcp://0.0.0.0:2375 -H unix://var/run/docker.sock 接着运行以下命令,重启 docker 服务 systemctl daemon-reload service docker restart//重启启动docker 执行如下命令可以查看相关内容,看看 2375 是否已经设置好 systemctl status docker 最后分别执行下面两条命令配置 firewall 防火墙策略 firewall-cmd --permanent --add-port=2375/tcp firewall-cmd --reload
其次发送docker指令,可采用原生的http方式或者引入docker-java的jar包,通过调用api的方式实现,鉴于api非官方,存在不定期更新或者不更新问题,所以建议还是采用http的方式
创建请求对象
@Data
public class ExecCreateReqVo {
private boolean AttachStdin;
private boolean AttachStdout;
private boolean AttachStderr;
private boolean Tty;
private String[] Cmd;
}
获取docker 任务id
1、拼接获取docker任务id的链接
http://192.168.1.1:2375/containers/hadoopnode1/exec
2、定义spark-cubmit指令常量
String cmd = "sh spark-submit --class com.test.calculate--master yarn --executor-cores 10 --executor-memory 10G --driver-memory 40G --deploy-mode cluster --jars hdfs://hadoopcluster/spark-jars/mysql-connector-java-8.0.19.jar --files /home/conf/sparkApplication.properties hdfs://hadoopcluster/spark-jars/calculate-0.0.1-SNAPSHOT.jar"
3、发送http请求
ExecCreateReqVo execCreateReqVo = new ExecCreateReqVo() {{
setAttachStdin(false);
setAttachStdout(false);
//只获取异常输出
setAttachStderr(true);
setTty(false);
setCmd(new String[]{cmd});
}};
ResponseEntity execCreateRes = restTemplate.postForEntity(execCreateUrl, execCreateReqVo, ExecCreateResVo.class);
String taskId = execCreateRes.getBody().getId();
执行远程docker任务
String execStartUrl = MessageFormat.format("http://192.168.1.1:2375/exec/{0}/start", taskId);
ExecStartReqVo execStartReqVo = new ExecStartReqVo() {{
setDetach(false);
setTty(false);
}};
ResponseEntity execStartRes = restTemplate.postForEntity(execStartUrl, execStartReqVo, String.class);
String startResStr = removeSpecialChar(execStartRes.getBody())
public static String removeSpecialChar(String str) {
//去除控制字符
str = RegexPatternConstants.UNICODE_PROP_C.matcher(str).replaceAll("")
//unicode REPLACEMENT CHARACTER,unicode中的未知字符会被转换为65533
.replaceAll(String.valueOf((char)65533), "").trim();
return str;
}



