目录
背景
Prometheus Sink
如何对外暴露Http Endpoint
服务注册源码
服务发现源码
如何采集spark metrics
MetricsReporter源码
配置spark conf启用prometheus sink
Prometheus Sink启动的问题
Spark Metrics System
spark启动过程源码分析
spark-submit.sh
SparkSubmit执行流程
--jars、--driver-library-path和--driver-class-path
spark main jar
YarnClusterApplication
AM启动流程
AM CLASSPATH环境变量的设置
Container资源上传到HDFS
启动AM
ApplicationMaster
解决ClassNotFound
executor也出现ClassNotFoundException
背景
当前有一个multi-module project,其中有ModuleA,ModuleB,ModuleC。
ModuleA:定义了main函数,会通过SPI和反射等机制,从ModuleC中加载Spark foreachRDD方法的具体执行逻辑。
ModuleB:定义了Spark Streaming的StreamingContext配置,以及DStream的处理顺序,Prometheus Sink相关的代码就写在这个工程中。
ModuleC:主要定义了foreachRDD的具体执行逻辑,例如将RDD对象反序列化后,写入Hbase中。
spark job使用spark-submit启动,部署模式为yarn-cluster(--master yarn --deploy-mode cluster)。
Prometheus Sink
增加prometheus,主要有两个挑战。
如何对外暴露Http Endpoint
由于spark提交的任务数量是不确定的,且使用yarn进行部署,因此要求动态分配port;且由于prometheus采用的pull模型,因此要求http endpoint需要支持服务发现的能力。
prometheus-2.30支持Http Service Discovery功能,因此决定使用zookeeper+springboot的方式实现服务注册和服务发现功能。
服务注册源码
package ...;
import com.codahale.metrics.MetricRegistry;
import ....PrometheusReporter;
import io.prometheus.client.exporter.HTTPServer;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryForever;
import org.apache.spark.metrics.sink.Sink;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * Spark metrics {@link Sink} that serves the metric registry on a Prometheus HTTP endpoint bound to a
 * dynamically chosen free port, and advertises that endpoint's URL in ZooKeeper as an
 * EPHEMERAL_SEQUENTIAL znode under {@code /waterfall/spark/metrics/promethues/<jobName>/}, so that an
 * HTTP service-discovery server can hand it to Prometheus.
 *
 * <p>Options, read from {@code spark.metrics.conf.[instance].sink.prometheus.*}:
 * {@code zkHosts} (required), {@code jobName} (required), {@code sessionTimeoutMs} (default 30000),
 * {@code connectTimeoutMs} (default 10000), {@code retryIntervalMs} (default 3000).
 */
public class PrometheusSink implements Sink {
    private final static Logger LOG = LoggerFactory.getLogger(PrometheusSink.class);
    private final static String ZK_HOSTS_KEY = "zkHosts";
    private final static String ZK_SESSION_TIMEOUT_MS = "sessionTimeoutMs";
    private final static String ZK_CONNECT_TIMEOUT_MS = "connectTimeoutMs";
    private final static String ZK_RETRY_INTERVAL_MS = "retryIntervalMs";
    private final static String SPARK_JOB_NAME = "jobName";
    /** Scrape URL template, filled with the local IP address and the bound port. */
    private final static String METRICS_URL_TEMPLATE = "http://%s:%d/metrics";
    /** Number of attempts to pick a free port and start the HTTP server before giving up. */
    private final static int MAX_BIND_ATTEMPTS = 3;
    /** Period of the scheduled report, in seconds. */
    private final static long REPORT_PERIOD_SECONDS = 15;

    private final Properties property;
    private final MetricRegistry metricRegistry;
    private HTTPServer httpServer;
    private CuratorFramework zkClient;
    private PrometheusReporter prometheusReporter;

    /**
     * Constructor in the (Properties, MetricRegistry, SecurityManager) shape that Spark's MetricsSystem
     * uses when it instantiates a sink class by name; keep this signature unchanged.
     *
     * @param property    the sink's options
     * @param registry    the registry whose metrics are exported
     * @param securityMgr unused
     */
    public PrometheusSink(Properties property,
                          MetricRegistry registry,
                          org.apache.spark.SecurityManager securityMgr) {
        this.property = property;
        this.metricRegistry = registry;
    }

    /**
     * Starts the HTTP endpoint, registers its URL in ZooKeeper and starts the periodic reporter.
     *
     * @throws RuntimeException wrapping the cause in any of these cases:
     *         <ul>
     *         <li>a required option is missing;</li>
     *         <li>no port could be bound after several attempts;</li>
     *         <li>ZooKeeper registration fails.</li>
     *         </ul>
     *         Anything already started is released before the exception is thrown.
     */
    @Override
    public void start() {
        try {
            String zkHosts = this.property.getProperty(ZK_HOSTS_KEY);
            if (StringUtils.isEmpty(zkHosts))
                throw new RuntimeException("[[instance].sink.prometheus.zkHosts]不能为空");
            int sessionTimeoutMs = Integer.parseInt(this.property
                    .getProperty(ZK_SESSION_TIMEOUT_MS, "30000"));
            int connectTimeoutMs = Integer.parseInt(this.property
                    .getProperty(ZK_CONNECT_TIMEOUT_MS, "10000"));
            int retryInterval = Integer.parseInt(this.property
                    .getProperty(ZK_RETRY_INTERVAL_MS, "3000"));
            String jobName = this.property.getProperty(SPARK_JOB_NAME);
            if (StringUtils.isEmpty(jobName))
                throw new RuntimeException("[[instance].sink.prometheus.jobName]不能为空");
            this.prometheusReporter = new PrometheusReporter(this.metricRegistry, jobName);
            String url = startHttpServer();
            LOG.info("start [Prometheus] metrics sink\nHttp EndPoint:{}", url);
            registerInZookeeper(zkHosts, sessionTimeoutMs, connectTimeoutMs, retryInterval, jobName, url);
        } catch (Exception e) {
            RuntimeException failure = new RuntimeException("启动prometheus metrics失败", e);
            // Release whatever was already started (HTTP port, ZK session) instead of leaking it.
            try {
                stop();
            } catch (RuntimeException cleanupFailure) {
                failure.addSuppressed(cleanupFailure);
            }
            throw failure;
        }
        this.prometheusReporter.start(REPORT_PERIOD_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Picks a free port on the local host and starts {@link #httpServer} on it, retrying up to
     * {@link #MAX_BIND_ATTEMPTS} times.
     *
     * @return the scrape URL of the endpoint that was actually started
     * @throws IOException the last failure if every attempt fails
     */
    private String startHttpServer() throws IOException {
        InetAddress localHost = InetAddress.getLocalHost();
        IOException lastFailure = null;
        for (int attempt = 0; attempt < MAX_BIND_ATTEMPTS; attempt++) {
            try {
                int port = findFreePort(localHost);
                // Another process may grab the port between the probe and this bind; that is why
                // the whole step is retried.
                this.httpServer = new HTTPServer(port);
                // The URL is built per attempt, so it always carries the port that was really bound.
                return String.format(METRICS_URL_TEMPLATE, localHost.getHostAddress(), port);
            } catch (IOException e) {
                lastFailure = e;
            }
        }
        throw lastFailure;
    }

    /** Asks the OS for a free ephemeral port on {@code address} and releases it immediately. */
    private static int findFreePort(InetAddress address) throws IOException {
        try (ServerSocket probe = new ServerSocket()) {
            probe.bind(new InetSocketAddress(address, 0));
            return probe.getLocalPort();
        }
    }

    /**
     * Connects to ZooKeeper and publishes {@code url} as an EPHEMERAL_SEQUENTIAL znode, so the entry is
     * removed by ZooKeeper when this client's session ends.
     */
    private void registerInZookeeper(String zkHosts, int sessionTimeoutMs, int connectTimeoutMs,
                                     int retryIntervalMs, String jobName, String url) throws Exception {
        this.zkClient = CuratorFrameworkFactory.newClient(zkHosts, sessionTimeoutMs, connectTimeoutMs,
                new RetryForever(retryIntervalMs));
        this.zkClient.start();
        LOG.info("启动zk client,zkHosts=[{}],sessionTimeout=[{}] ms,connectTimeout=[{}] ms," +
                        "failOverRetryInterval=[{}] ms", zkHosts, sessionTimeoutMs, connectTimeoutMs,
                retryIntervalMs);
        this.zkClient.blockUntilConnected();
        // NOTE: "promethues" is misspelled, but the service-discovery side is presumably configured
        // with this exact path; do not change one without the other.
        String zNodePath = "/waterfall/spark/metrics/promethues/" + jobName + "/config";
        this.zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(zNodePath, url.getBytes(StandardCharsets.US_ASCII));
    }

    /** Closes the ZooKeeper client, the HTTP server and the reporter; safe after a failed start(). */
    @Override
    public void stop() {
        if (this.zkClient != null) {
            this.zkClient.close();
        }
        if (this.httpServer != null) {
            this.httpServer.close();
        }
        if (this.prometheusReporter != null) {
            this.prometheusReporter.stop();
        }
    }

    /**
     * Reports once. According to the author's reading of the Spark source, this is only triggered
     * right before the JVM stops.
     */
    @Override
    public void report() {
        LOG.info("JVM进程即将停止,会自动调用一次report");
        if (this.prometheusReporter != null) {
            this.prometheusReporter.report();
        }
    }
}
服务发现源码
package ...;
import ....config.zookeeper.ZookeeperConfigProperties;
import ....config.mvc.exception.BusinessException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.retry.RetryForever;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Watches a ZooKeeper subtree ({@code zkConfig.getFlinkMetricsZnodePath()}) with a Curator
 * {@link TreeCache} and keeps an in-memory map of task name to the set of metrics endpoint URLs
 * registered under it.
 *
 * <p>Expected layout: {@code <parent>/<taskName>/<child znode whose data is the endpoint URL>}.
 * Child znodes that appear or disappear add or remove URLs. Deleting a task znode drops the task.
 */
public class FlinkServiceDiscovery implements Closeable {
    private final static Logger LOG = LoggerFactory.getLogger(FlinkServiceDiscovery.class);
    private final ZookeeperConfigProperties zkConfig;
    private final Path parentPath;
    private CuratorFramework zkClient;
    private TreeCache treeCache;
    // Currently running tasks: task name -> endpoint URLs (znode data). Guarded by serviceMapLock.
    private final Map<String, Set<String>> activeServiceMap = new HashMap<>();
    private final ReadWriteLock serviceMapLock = new ReentrantReadWriteLock();
    // Together these let waitUntilWatcherReady() block until the TreeCache reports INITIALIZED for the
    // first time (used to start the Spring context only after the initial cache load).
    private final AtomicBoolean hasStart = new AtomicBoolean(false);
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    public FlinkServiceDiscovery(ZookeeperConfigProperties zkConfig) {
        this.zkConfig = zkConfig;
        this.parentPath = Paths.get(this.zkConfig.getFlinkMetricsZnodePath());
    }

    /**
     * Connects to ZooKeeper (retrying forever every 3 s) and starts watching the configured path.
     *
     * @return {@code true} if the watch was established. {@code false} if the path does not exist or
     *         setup failed; in both cases the ZooKeeper resources are released.
     * @throws InterruptedException if interrupted while waiting for the ZooKeeper connection
     */
    public boolean start() throws InterruptedException {
        LOG.info("start flink metrics server的服务发现功能,连接的zookeeper:[{}],监控的ZNode Path:[{}]",
                this.zkConfig.getZkHosts(), this.zkConfig.getFlinkMetricsZnodePath());
        this.zkClient = CuratorFrameworkFactory
                .newClient(this.zkConfig.getZkHosts(), new RetryForever(3000));
        this.zkClient.start();
        // Wait until the connection is established
        this.zkClient.blockUntilConnected();
        try {
            Stat stat = this.zkClient.checkExists()
                    .forPath(this.zkConfig.getFlinkMetricsZnodePath());
            if (stat == null) {
                LOG.warn("不存在ZNode路径:[{}],因此不会启动服务发现功能", this.zkConfig.getFlinkMetricsZnodePath());
                closeQuietly();
                return false;
            }
            LOG.info("开始对ZNode:[{}]建立监听", this.zkConfig.getFlinkMetricsZnodePath());
            this.treeCache = TreeCache.newBuilder(this.zkClient, this.zkConfig.getFlinkMetricsZnodePath())
                    .setCacheData(true)
                    .build();
            this.treeCache.getListenable().addListener(this::onZnodeEvent);
            this.treeCache.start();
            return true;
        } catch (Exception exception) {
            LOG.warn("建立服务发现功能失败", exception);
            closeQuietly();
            return false;
        }
    }

    /**
     * Blocks until the TreeCache has delivered its first INITIALIZED event. Note that this waits
     * indefinitely if {@link #start()} returned {@code false}.
     */
    public void waitUntilWatcherReady() throws InterruptedException {
        LOG.info("等待watcher Node Cache initial");
        if (!this.hasStart.get()) {
            this.countDownLatch.await();
        }
        LOG.info("watcher Node Cache初始化完成");
    }

    /**
     * Returns a snapshot of the active services (task name -> endpoint URLs). The snapshot is a deep
     * copy taken under the read lock, so callers can iterate it while the watcher keeps updating.
     */
    public Map<String, Set<String>> findAllActiveServices() {
        this.serviceMapLock.readLock().lock();
        try {
            Map<String, Set<String>> snapshot = new HashMap<>();
            for (Map.Entry<String, Set<String>> entry : this.activeServiceMap.entrySet()) {
                snapshot.put(entry.getKey(), new HashSet<>(entry.getValue()));
            }
            return snapshot;
        } finally {
            this.serviceMapLock.readLock().unlock();
        }
    }

    /** TreeCache listener: dispatches znode add/remove events and signals the first INITIALIZED. */
    private void onZnodeEvent(CuratorFramework client, TreeCacheEvent event) {
        switch (event.getType()) {
            case NODE_ADDED:
                this.addServiceMap(event);
                break;
            case NODE_UPDATED:
                // Should not happen in theory
                LOG.warn("node update,{}", event.getData().toString());
                break;
            case NODE_REMOVED:
                this.deleteServiceMap(event);
                break;
            case CONNECTION_LOST:
            case CONNECTION_SUSPENDED:
                LOG.warn("zookeeper连接丢失,event type={}", event.getType());
                break;
            case INITIALIZED:
                LOG.debug("event type={}", event.getType());
                if (this.hasStart.compareAndSet(false, true)) {
                    this.countDownLatch.countDown();
                }
                break;
            default:
                LOG.debug("event type={}", event.getType());
        }
    }

    /** Decodes znode data as UTF-8; null or empty data becomes "". */
    private static String decodeData(byte[] data) {
        return (data == null || data.length == 0) ? "" : new String(data, StandardCharsets.UTF_8);
    }

    /** Records the URL of a newly added {@code <taskName>/<child>} znode; other depths are only logged. */
    private void addServiceMap(TreeCacheEvent addEvent) {
        String watchPath = addEvent.getData().getPath();
        String watchData = decodeData(addEvent.getData().getData());
        LOG.debug("node add,path={},data={}", watchPath, watchData);
        // ZooKeeper paths are shaped like file-system paths, so java.nio.file.Path is used to split them
        Path subPath = this.parentPath.relativize(Paths.get(watchPath));
        if (subPath.getNameCount() != 2) {
            LOG.warn("ADD变化的ZNode Path={}", watchPath);
            return;
        }
        String taskName = subPath.getName(0).toString();
        this.serviceMapLock.writeLock().lock();
        try {
            this.activeServiceMap.computeIfAbsent(taskName, key -> new HashSet<>()).add(watchData);
        } finally {
            this.serviceMapLock.writeLock().unlock();
        }
    }

    /**
     * Handles a removed znode.
     * <ul>
     * <li>A removed task znode drops the whole task.</li>
     * <li>A removed endpoint znode drops that URL, and the task too once it has no URLs left.</li>
     * </ul>
     */
    private void deleteServiceMap(TreeCacheEvent delEvent) {
        String watchPath = delEvent.getData().getPath();
        String watchData = decodeData(delEvent.getData().getData());
        LOG.debug("node delete,path={},data={}", watchPath, watchData);
        Path subPath = this.parentPath.relativize(Paths.get(watchPath));
        int depth = subPath.getNameCount();
        if (depth == 1) {
            String taskName = subPath.getFileName().toString();
            LOG.warn("ZNode被手动删除,Path={}", watchPath);
            // The watched root itself relativizes to an empty path (also one name); skip it.
            if (!StringUtils.hasText(taskName))
                return;
            this.serviceMapLock.writeLock().lock();
            try {
                this.activeServiceMap.remove(taskName);
            } finally {
                this.serviceMapLock.writeLock().unlock();
            }
        } else if (depth == 2) {
            String taskName = subPath.getName(0).toString();
            LOG.debug("flink task:[{}], metric server:[{}]的进程结束", taskName, watchData);
            this.serviceMapLock.writeLock().lock();
            try {
                Set<String> endpoints = this.activeServiceMap.get(taskName);
                if (endpoints != null) {
                    endpoints.remove(watchData);
                    if (endpoints.isEmpty()) {
                        this.activeServiceMap.remove(taskName);
                    }
                }
            } finally {
                this.serviceMapLock.writeLock().unlock();
            }
        } else {
            LOG.warn("DELETE变化的ZNode Path={}", watchPath);
        }
    }

    /** close() for cleanup paths that must not throw. */
    private void closeQuietly() {
        try {
            this.close();
        } catch (IOException ignored) {
            // best-effort cleanup after a failed start; nothing more can be done here
        }
    }

    /** Stops the TreeCache and closes the ZooKeeper client (previously the client was never closed). */
    @Override
    public void close() throws IOException {
        LOG.info("关闭flink metrics服务发现功能");
        if (this.treeCache != null) {
            this.treeCache.close();
            this.treeCache = null;
        }
        if (this.zkClient != null) {
            this.zkClient.close();
            this.zkClient = null;
        }
    }
}
如何采集spark metrics(注:紧随其后的这段代码与上一节“服务发现源码”重复,是排版时重复粘贴的;本节真正的内容见下文)
package ...;
import ....config.zookeeper.ZookeeperConfigProperties;
import ....config.mvc.exception.BusinessException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.retry.RetryForever;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Watches a ZooKeeper subtree ({@code zkConfig.getFlinkMetricsZnodePath()}) with a Curator
 * {@link TreeCache} and keeps an in-memory map of task name to the set of metrics endpoint URLs
 * registered under it.
 *
 * <p>Expected layout: {@code <parent>/<taskName>/<child znode whose data is the endpoint URL>}.
 * Child znodes that appear or disappear add or remove URLs. Deleting a task znode drops the task.
 */
public class FlinkServiceDiscovery implements Closeable {
    private final static Logger LOG = LoggerFactory.getLogger(FlinkServiceDiscovery.class);
    private final ZookeeperConfigProperties zkConfig;
    private final Path parentPath;
    private CuratorFramework zkClient;
    private TreeCache treeCache;
    // Currently running tasks: task name -> endpoint URLs (znode data). Guarded by serviceMapLock.
    private final Map<String, Set<String>> activeServiceMap = new HashMap<>();
    private final ReadWriteLock serviceMapLock = new ReentrantReadWriteLock();
    // Together these let waitUntilWatcherReady() block until the TreeCache reports INITIALIZED for the
    // first time (used to start the Spring context only after the initial cache load).
    private final AtomicBoolean hasStart = new AtomicBoolean(false);
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    public FlinkServiceDiscovery(ZookeeperConfigProperties zkConfig) {
        this.zkConfig = zkConfig;
        this.parentPath = Paths.get(this.zkConfig.getFlinkMetricsZnodePath());
    }

    /**
     * Connects to ZooKeeper (retrying forever every 3 s) and starts watching the configured path.
     *
     * @return {@code true} if the watch was established. {@code false} if the path does not exist or
     *         setup failed; in both cases the ZooKeeper resources are released.
     * @throws InterruptedException if interrupted while waiting for the ZooKeeper connection
     */
    public boolean start() throws InterruptedException {
        LOG.info("start flink metrics server的服务发现功能,连接的zookeeper:[{}],监控的ZNode Path:[{}]",
                this.zkConfig.getZkHosts(), this.zkConfig.getFlinkMetricsZnodePath());
        this.zkClient = CuratorFrameworkFactory
                .newClient(this.zkConfig.getZkHosts(), new RetryForever(3000));
        this.zkClient.start();
        // Wait until the connection is established
        this.zkClient.blockUntilConnected();
        try {
            Stat stat = this.zkClient.checkExists()
                    .forPath(this.zkConfig.getFlinkMetricsZnodePath());
            if (stat == null) {
                LOG.warn("不存在ZNode路径:[{}],因此不会启动服务发现功能", this.zkConfig.getFlinkMetricsZnodePath());
                closeQuietly();
                return false;
            }
            LOG.info("开始对ZNode:[{}]建立监听", this.zkConfig.getFlinkMetricsZnodePath());
            this.treeCache = TreeCache.newBuilder(this.zkClient, this.zkConfig.getFlinkMetricsZnodePath())
                    .setCacheData(true)
                    .build();
            this.treeCache.getListenable().addListener(this::onZnodeEvent);
            this.treeCache.start();
            return true;
        } catch (Exception exception) {
            LOG.warn("建立服务发现功能失败", exception);
            closeQuietly();
            return false;
        }
    }

    /**
     * Blocks until the TreeCache has delivered its first INITIALIZED event. Note that this waits
     * indefinitely if {@link #start()} returned {@code false}.
     */
    public void waitUntilWatcherReady() throws InterruptedException {
        LOG.info("等待watcher Node Cache initial");
        if (!this.hasStart.get()) {
            this.countDownLatch.await();
        }
        LOG.info("watcher Node Cache初始化完成");
    }

    /**
     * Returns a snapshot of the active services (task name -> endpoint URLs). The snapshot is a deep
     * copy taken under the read lock, so callers can iterate it while the watcher keeps updating.
     */
    public Map<String, Set<String>> findAllActiveServices() {
        this.serviceMapLock.readLock().lock();
        try {
            Map<String, Set<String>> snapshot = new HashMap<>();
            for (Map.Entry<String, Set<String>> entry : this.activeServiceMap.entrySet()) {
                snapshot.put(entry.getKey(), new HashSet<>(entry.getValue()));
            }
            return snapshot;
        } finally {
            this.serviceMapLock.readLock().unlock();
        }
    }

    /** TreeCache listener: dispatches znode add/remove events and signals the first INITIALIZED. */
    private void onZnodeEvent(CuratorFramework client, TreeCacheEvent event) {
        switch (event.getType()) {
            case NODE_ADDED:
                this.addServiceMap(event);
                break;
            case NODE_UPDATED:
                // Should not happen in theory
                LOG.warn("node update,{}", event.getData().toString());
                break;
            case NODE_REMOVED:
                this.deleteServiceMap(event);
                break;
            case CONNECTION_LOST:
            case CONNECTION_SUSPENDED:
                LOG.warn("zookeeper连接丢失,event type={}", event.getType());
                break;
            case INITIALIZED:
                LOG.debug("event type={}", event.getType());
                if (this.hasStart.compareAndSet(false, true)) {
                    this.countDownLatch.countDown();
                }
                break;
            default:
                LOG.debug("event type={}", event.getType());
        }
    }

    /** Decodes znode data as UTF-8; null or empty data becomes "". */
    private static String decodeData(byte[] data) {
        return (data == null || data.length == 0) ? "" : new String(data, StandardCharsets.UTF_8);
    }

    /** Records the URL of a newly added {@code <taskName>/<child>} znode; other depths are only logged. */
    private void addServiceMap(TreeCacheEvent addEvent) {
        String watchPath = addEvent.getData().getPath();
        String watchData = decodeData(addEvent.getData().getData());
        LOG.debug("node add,path={},data={}", watchPath, watchData);
        // ZooKeeper paths are shaped like file-system paths, so java.nio.file.Path is used to split them
        Path subPath = this.parentPath.relativize(Paths.get(watchPath));
        if (subPath.getNameCount() != 2) {
            LOG.warn("ADD变化的ZNode Path={}", watchPath);
            return;
        }
        String taskName = subPath.getName(0).toString();
        this.serviceMapLock.writeLock().lock();
        try {
            this.activeServiceMap.computeIfAbsent(taskName, key -> new HashSet<>()).add(watchData);
        } finally {
            this.serviceMapLock.writeLock().unlock();
        }
    }

    /**
     * Handles a removed znode.
     * <ul>
     * <li>A removed task znode drops the whole task.</li>
     * <li>A removed endpoint znode drops that URL, and the task too once it has no URLs left.</li>
     * </ul>
     */
    private void deleteServiceMap(TreeCacheEvent delEvent) {
        String watchPath = delEvent.getData().getPath();
        String watchData = decodeData(delEvent.getData().getData());
        LOG.debug("node delete,path={},data={}", watchPath, watchData);
        Path subPath = this.parentPath.relativize(Paths.get(watchPath));
        int depth = subPath.getNameCount();
        if (depth == 1) {
            String taskName = subPath.getFileName().toString();
            LOG.warn("ZNode被手动删除,Path={}", watchPath);
            // The watched root itself relativizes to an empty path (also one name); skip it.
            if (!StringUtils.hasText(taskName))
                return;
            this.serviceMapLock.writeLock().lock();
            try {
                this.activeServiceMap.remove(taskName);
            } finally {
                this.serviceMapLock.writeLock().unlock();
            }
        } else if (depth == 2) {
            String taskName = subPath.getName(0).toString();
            LOG.debug("flink task:[{}], metric server:[{}]的进程结束", taskName, watchData);
            this.serviceMapLock.writeLock().lock();
            try {
                Set<String> endpoints = this.activeServiceMap.get(taskName);
                if (endpoints != null) {
                    endpoints.remove(watchData);
                    if (endpoints.isEmpty()) {
                        this.activeServiceMap.remove(taskName);
                    }
                }
            } finally {
                this.serviceMapLock.writeLock().unlock();
            }
        } else {
            LOG.warn("DELETE变化的ZNode Path={}", watchPath);
        }
    }

    /** close() for cleanup paths that must not throw. */
    private void closeQuietly() {
        try {
            this.close();
        } catch (IOException ignored) {
            // best-effort cleanup after a failed start; nothing more can be done here
        }
    }

    /** Stops the TreeCache and closes the ZooKeeper client (previously the client was never closed). */
    @Override
    public void close() throws IOException {
        LOG.info("关闭flink metrics服务发现功能");
        if (this.treeCache != null) {
            this.treeCache.close();
            this.treeCache = null;
        }
        if (this.zkClient != null) {
            this.zkClient.close();
            this.zkClient = null;
        }
    }
}
如何采集spark metrics
spark的metrics system使用的数据结构定义,与prometheus不兼容,因此需要实现一个Reporter类,专门负责数据结构的转换工作。
MetricsReporter源码
package ...;
import com.codahale.metrics.*;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class PrometheusReporter extends ScheduledReporter {
private static final Pattern APPLICATION_ID_REGEX = Pattern.compile("(application_\d+_\d+)");
private static io.prometheus.client.Gauge GAUGE_COLLECTOR = null;
//NOTE:这里涉及到spark submit的源码部分,AM会阻塞(默认100S)等待driver端的sparkContext初始化结束
private static AtomicInteger INDENTITY_COUNTER = new AtomicInteger(0);//1:driver,2:AM
public PrometheusReporter(MetricRegistry registry, String jobName) {
super(registry, "prometheus-reporter", MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.SECONDS);
jobName = jobName.replaceAll("\.", "_");
String gaugeNameSuffix = null;
if(INDENTITY_COUNTER.incrementAndGet()==1) {
gaugeNameSuffix = "_driver";
}else {
gaugeNameSuffix = "_am";
}
GAUGE_COLLECTOR = io.prometheus.client.Gauge.build("spark_guage_" + jobName+gaugeNameSuffix,
"spark gauge metrics")
.labelNames("application_id", "origin_lable").register();
}
@SuppressWarnings("all")
@Override
public void report(SortedMap gaugeMap,
SortedMap countMap,
SortedMap histogramMap,
SortedMap meterMap,
SortedMap timerMap) {
if (gaugeMap != null) {
for (Map.Entry gaugeEntry : gaugeMap.entrySet()) {
String metricName = gaugeEntry.getKey();
String[] splitKeyworlds = this.spilitMetricName(metricName);
Gauge gauge = gaugeEntry.getValue();
if(gauge.getValue() instanceof Number) {
GAUGE_COLLECTOR.labels(splitKeyworlds[1],splitKeyworlds[0]).set(Double.parseDouble(String.valueOf(gauge.getValue())));
}
}
}
if(countMap!=null) {
for (Map.Entry counterEntry : countMap.entrySet()) {
String metricName = counterEntry.getKey();
String[] splitKeyworlds = this.spilitMetricName(metricName);
Counter counter = counterEntry.getValue();
GAUGE_COLLECTOR.labels(splitKeyworlds[1],splitKeyworlds[0])
.set(counter.getCount());
}
}
}
private String[] spilitMetricName(String metricName) {
String[] result = new String[2];
result[0] = metricName;
Matcher matcher = APPLICATION_ID_REGEX.matcher(metricName);
if(matcher.find()) {
result[0] = metricName.replaceAll("application_\d+_\d+","appid");
result[1] = matcher.group(1);
}else {
result[1] = "NaN";
}
result[0] = result[0].replaceAll("\.","_");
return result;
}
}
配置spark conf启用prometheus sink
配置方式有很多,例如在${SPARK_HOME}/conf/metrics.properties中增加配置。我个人更喜欢通过spark-submit --conf spark.metrics.conf.*的方式设置。
spark-submit --master yarn --deploy-mode cluster --conf spark.metrics.conf.*.sink.prometheus.class=...spark.metric.sink.PrometheusSink --conf spark.metrics.conf.*.sink.prometheus.zkHosts=xxx:2181 --conf spark.metrics.conf.*.sink.prometheus.sessionTimeoutMs=30000 --conf spark.metrics.conf.*.sink.prometheus.connectTimeoutMs=10000 --conf spark.metrics.conf.*.sink.prometheus.retryIntervalMs=3000 --conf spark.metrics.conf.*.sink.prometheus.jobName=MyJob1 --jars moduleB.jar,moduleC.jar --class mainClass /tmp/moduleA.jar
到这里为止,已经按照官方的metric system文档完成了所有的开发工作,但在运行起来后,你会发现下面的问题:
Prometheus Sink启动的问题
22/01/27 10:20:16 ERROR metrics.MetricsSystem: Sink class ....spark.metric.sink.PrometheusSink cannot be instantiated
22/01/27 10:20:16 ERROR yarn.ApplicationMaster: Uncaught exception:
java.lang.ClassNotFoundException: ....spark.metric.sink.PrometheusSink
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:242)
at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:198)
at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:194)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:194)
at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:102)
at org.apache.spark.deploy.yarn.ApplicationMaster.createAllocator(ApplicationMaster.scala:433)
at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:460)
at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:275)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:805)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:804)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:804)
at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
在spark-submit中,通过--jars moduleB.jar设置了prometheus sink的代码(位于ModuleB中),理论上来说应该没有问题。
Spark Metrics System
spark metrics system的文档中有这么一段话:
# syntax: [instance].sink|source.[name].[options]=[value]
#
# This file configures Spark's internal metrics system. The metrics system is
# divided into instances which correspond to internal components.
# Each instance can be configured to report its metrics to one or more sinks.
# Accepted values for [instance] are "master", "worker", "executor", "driver",
# and "applications". A wildcard "*" can be used as an instance name, in
# which case all instances will inherit the supplied property.
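也就是说,instance配置为*时,所有metric instance都会加载prometheus sink。对于一个spark on yarn(cluster模式)任务,相关的instance除了driver和executor,还有ApplicationMaster自己的instance(名为applicationMaster)。上面的异常栈正是从ApplicationMaster.createAllocator → MetricsSystem.start抛出的。下面我把配置限定为driver: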
--conf spark.metrics.conf.driver.sink.prometheus.class=...spark.metric.sink.PrometheusSink --conf spark.metrics.conf.driver.sink.prometheus.zkHosts=xxx:2181 --conf spark.metrics.conf.driver.sink.prometheus.sessionTimeoutMs=30000 --conf spark.metrics.conf.driver.sink.prometheus.connectTimeoutMs=10000 --conf spark.metrics.conf.driver.sink.prometheus.retryIntervalMs=3000 --conf spark.metrics.conf.driver.sink.prometheus.jobName=MyJob1
以这个配置重写spark-submit,是可以正常运行的(这里注意:如果上面的配置,driver改为executor,依然会报ClassNotFoundException,这个原因和ApplicationMaster是一样的)。
到这里,可以说明spark driver和YarnAM之间加载的jar包是不一样的,需要通过源码分析一下。
spark启动过程源码分析
spark-submit.sh
SparkSubmit执行流程
这个类的主要作用是解析spark-submit传入的args,然后通过反射实例化org.apache.spark.deploy.yarn.YarnClusterApplication,并调用它的start方法。YarnClusterApplication实现的是SparkApplication接口,入口是start而不是main函数。源码很长,这里不过多解读,只关注几个重要的地方。
--jars、--driver-library-path和--driver-class-path
在经过SparkSubmit处理后,进入YarnClusterApplication方法之前,会构造出三个spark config出来:
- spark.yarn.dist.jars:等价于--jars中指定的jar包列表,但会做一些URI转换工作,例如相对路径转绝对路径、glob路径解析、FTP文件下载到本地等。
- spark.driver.extraClassPath:等价于--driver-class-path,不做任何修改。
- spark.driver.extraLibraryPath:等价于--driver-library-path,不做任何修改。
spark main jar
spark main jar在源码中被称为primary resource。SparkSubmit会把这个信息构造为传给YarnClusterApplication.start方法的args,形式为:
--jar moduleA.jar --class moduleA.mainClassName --arg moduleA.main.args
到这里,SparkSubmit逻辑结束,进入YarnClusterApplication中。
YarnClusterApplication
这个类用于启动yarn AM,并在AM中启动spark driver端(也就是ModuleA)的代码。
其核心在AM的jar包是如何设置的。
AM启动流程
// Excerpt from Spark's YARN client code that YarnClusterApplication drives (presumably
// org.apache.spark.deploy.yarn.Client in Spark 2.4). It does the following, in order:
//   1. asks the ResourceManager for a new application;
//   2. builds the AM container's launch context and the submission context;
//   3. submits the application and returns the assigned ApplicationId.
// If anything fails after an id was obtained, the staging directory is cleaned up and the
// error is rethrown.
def submitApplication(): ApplicationId = {
var appId: ApplicationId = null
try {
launcherBackend.connect()
yarnClient.init(hadoopConf)
yarnClient.start()
logInfo("Requesting a new application from cluster with %d NodeManagers"
.format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
// Get a new application from our RM
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
appId = newAppResponse.getApplicationId()
new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
Option(appId.toString)).setCurrentContext()
// Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)
// KEY POINT: this builds the AM container's launch context; everything related to jars
// and the CLASSPATH is decided inside this call
val containerContext = createContainerLaunchContext(newAppResponse)
val appContext = createApplicationSubmissionContext(newApp, containerContext)
// Finally, submit and monitor the application
logInfo(s"Submitting application $appId to ResourceManager")
yarnClient.submitApplication(appContext)
launcherBackend.setAppId(appId.toString)
reportLauncherState(SparkAppHandle.State.SUBMITTED)
appId
} catch {
case e: Throwable =>
// Remove the staging directory of a partially submitted application, then rethrow
if (appId != null) {
cleanupStagingDir(appId)
}
throw e
}
}
AM CLASSPATH环境变量的设置
// Excerpt (truncated) from Spark: builds the ContainerLaunchContext for the AM container.
// Only the classpath-related part is shown; the tail that builds the launch command and
// returns the context is omitted, so this excerpt is not complete Scala.
private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
: ContainerLaunchContext = {
logInfo("Setting up container launch context for our AM")
val appId = newAppResponse.getApplicationId
val appStagingDirPath = new Path(appStagingbaseDir, getAppStagingDir(appId))
val pySparkArchives =
if (sparkConf.get(IS_PYTHON_APP)) {
findPySparkArchives()
} else {
Nil
}
// Example value in the author's cluster: appStagingDirPath=hdfs://nameservice1/user/spark_cdh/.sparkStaging
// pySparkArchives: not relevant here (only non-empty when IS_PYTHON_APP is set)
// Build the AM's environment variables, including CLASSPATH
val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
// Resources the YARN container must localize (download) before it starts, e.g. jars, log4j.properties
val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)
val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources.asJava)
amContainer.setEnvironment(launchEnv.asJava)
// (omitted) The remaining code builds the AM launch command -- essentially a
// `java -server ... org.apache.spark.deploy.yarn.ApplicationMaster ...` command line.
}
// Excerpt from Spark: builds the environment variables of the AM container:
//   - the classpath, via populateClasspath, extended with spark.driver.extraClassPath;
//   - SPARK_YARN_STAGING_DIR and SPARK_USER;
//   - every spark.yarn.appMasterEnv.* entry;
//   - ENV_DIST_CLASSPATH (SPARK_DIST_CLASSPATH) when it is set in the submitter's environment.
private def setupLaunchEnv(
stagingDirPath: Path,
pySparkArchives: Seq[String]): HashMap[String, String] = {
logInfo("Setting up the launch environment for our AM container")
val env = new HashMap[String, String]()
// KEY POINT: the AM classpath can be extended through this setting, and this setting is
// exactly what --driver-class-path populates.
// DRIVER_CLASS_PATH = spark.driver.extraClassPath
populateClasspath(args, hadoopConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH))
env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
// Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
val amEnvPrefix = "spark.yarn.appMasterEnv."
sparkConf.getAll
.filter { case (k, v) => k.startsWith(amEnvPrefix) }
.map { case (k, v) => (k.substring(amEnvPrefix.length), v) }
.foreach { case (k, v) => YarnSparkHadoopUtil.addPathToEnvironment(env, k, v) }
// SPARK_DIST_CLASSPATH
sys.env.get(ENV_DIST_CLASSPATH).foreach { dcp =>
env(ENV_DIST_CLASSPATH) = dcp
}
env
}
Container资源上传到HDFS(注:紧随其后的两段代码与上一节重复,是排版时重复粘贴的;本节内容见下文)
// Excerpt (truncated) from Spark: builds the ContainerLaunchContext for the AM container.
// Only the classpath-related part is shown; the tail that builds the launch command and
// returns the context is omitted, so this excerpt is not complete Scala.
private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
: ContainerLaunchContext = {
logInfo("Setting up container launch context for our AM")
val appId = newAppResponse.getApplicationId
val appStagingDirPath = new Path(appStagingbaseDir, getAppStagingDir(appId))
val pySparkArchives =
if (sparkConf.get(IS_PYTHON_APP)) {
findPySparkArchives()
} else {
Nil
}
// Example value in the author's cluster: appStagingDirPath=hdfs://nameservice1/user/spark_cdh/.sparkStaging
// pySparkArchives: not relevant here (only non-empty when IS_PYTHON_APP is set)
// Build the AM's environment variables, including CLASSPATH
val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
// Resources the YARN container must localize (download) before it starts, e.g. jars, log4j.properties
val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)
val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources.asJava)
amContainer.setEnvironment(launchEnv.asJava)
// (omitted) The remaining code builds the AM launch command -- essentially a
// `java -server ... org.apache.spark.deploy.yarn.ApplicationMaster ...` command line.
}
// Excerpt from Spark: builds the environment variables of the AM container:
//   - the classpath, via populateClasspath, extended with spark.driver.extraClassPath;
//   - SPARK_YARN_STAGING_DIR and SPARK_USER;
//   - every spark.yarn.appMasterEnv.* entry;
//   - ENV_DIST_CLASSPATH (SPARK_DIST_CLASSPATH) when it is set in the submitter's environment.
private def setupLaunchEnv(
stagingDirPath: Path,
pySparkArchives: Seq[String]): HashMap[String, String] = {
logInfo("Setting up the launch environment for our AM container")
val env = new HashMap[String, String]()
// KEY POINT: the AM classpath can be extended through this setting, and this setting is
// exactly what --driver-class-path populates.
// DRIVER_CLASS_PATH = spark.driver.extraClassPath
populateClasspath(args, hadoopConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH))
env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
// Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
val amEnvPrefix = "spark.yarn.appMasterEnv."
sparkConf.getAll
.filter { case (k, v) => k.startsWith(amEnvPrefix) }
.map { case (k, v) => (k.substring(amEnvPrefix.length), v) }
.foreach { case (k, v) => YarnSparkHadoopUtil.addPathToEnvironment(env, k, v) }
// SPARK_DIST_CLASSPATH
sys.env.get(ENV_DIST_CLASSPATH).foreach { dcp =>
env(ENV_DIST_CLASSPATH) = dcp
}
env
}
Container资源上传到HDFS
这个方法也很长,只需要了解:
- spark primary resource(也就是moduleA.jar)上传到HDFS,并重命名为__app__.jar。
- --jars指定的文件,上传到HDFS。
启动AM
核心是生成一个command,其内容大致为:
LD_LIBRARY_PATH="{spark.driver.extraLibraryPath}:$LD_LIBRARY_PATH"
{{JAVA_HOME}}/bin/java
-server
org.apache.spark.deploy.yarn.ApplicationMaster
--class MyModuleAMainClass
--jar ModuleA.jar
--arg
1>/stdout
2>/stderr
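【这里有个问题没有弄懂】spark.driver.extraLibraryPath(也就是通过--driver-library-path设置的路径)不会影响AM的CLASSPATH,那么设置LD_LIBRARY_PATH的意义究竟何在?

补充:--driver-library-path/spark.driver.extraLibraryPath指定的是native库(.so)的搜索路径。它供JNI通过System.loadLibrary加载本地库时使用,本来就与加载Java类的CLASSPATH无关,所以是通过LD_LIBRARY_PATH传入的。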
从上面的命令可以分析出,通过spark-submit --jars设置的jar包,并不会作为AM的CLASSPATH,那么spark driver能启动,也就说明AM与driver的关系应该如下图:
ApplicationMaster
// 1. Entry point of the AM process (runs on the main thread): parses the command-line
// arguments, creates the ApplicationMaster and exits the JVM with the code run() returns.
def main(args: Array[String]): Unit = {
SignalUtils.registerLogger(log)
val amArgs = new ApplicationMasterArguments(args)
master = new ApplicationMaster(amArgs)
System.exit(master.run())
}
// 2. Runs runImpl() inside doAsUser and returns exitCode. Per the author's note, doAsUser is
// where the Kerberos / user security context is applied.
final def run(): Int = {
doAsUser {
runImpl()
}
exitCode
}
// In cluster mode the user's driver runs inside this AM (runDriver); otherwise (presumably
// client mode) runExecutorLauncher() is used instead.
private def runImpl(): Unit = {
if (isClusterMode) {
runDriver()
} else {
runExecutorLauncher()
}
}
// Excerpt (truncated) of runDriver. It does the following, in order:
//   1. starts the user's main class on a "Driver" thread;
//   2. waits, for a bounded time, for that thread's SparkContext;
//   3. registers the AM and creates the allocator.
// The rest of the method -- including the try's handlers and runDriver's closing brace -- is
// omitted from this excerpt.
private def runDriver(): Unit = {
addAmIpFilter(None)
// KEY POINT: start the Spark driver (the user's main class), see startUserApplication below
userClassThread = startUserApplication()
// This a bit hacky, but we need to wait until the spark.driver.port property has
// been set by the Thread executing the user class.
logInfo("Waiting for spark context initialization...")
val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
try {
// KEY POINT: wait until the SparkContext has finished initializing (i.e. the driver started
// successfully); the wait is bounded by AM_MAX_WAIT_TIME (100 s by default)
val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
Duration(totalWaitTime, TimeUnit.MILLISECONDS))
if (sc != null) {
rpcEnv = sc.env.rpcEnv
val userConf = sc.getConf
val host = userConf.get("spark.driver.host")
val port = userConf.get("spark.driver.port").toInt
registerAM(host, port, userConf, sc.ui.map(_.webUrl))
val driverRef = rpcEnv.setupEndpointRef(
RpcAddress(host, port),
YarnSchedulerBackend.ENDPOINT_NAME)
// KEY POINT: this is where the AM-side metrics system -- and therefore the AM's prometheus
// sink -- is started (the stack trace shows createAllocator -> MetricsSystem.start)
createAllocator(driverRef, userConf)
}
}
// ... (rest of runDriver omitted in this excerpt)
// Starts the user's main class on a new thread named "Driver" whose context class loader is
// userClassLoader, and returns that thread.
private def startUserApplication(): Thread = {
// userClassLoader loads __app__.jar (i.e. moduleA.jar) and every jar passed via spark-submit --jars
val mainMethod = userClassLoader.loadClass(args.userClass)
.getMethod("main", classOf[Array[String]])
val userThread = new Thread {
// (body omitted) invokes moduleA.jar's main method, which initializes the SparkContext
}
userThread.setContextClassLoader(userClassLoader)
userThread.setName("Driver")
userThread.start()
userThread
}
// Class loader for user code, built from Client.getUserClasspath(sparkConf). Its parent is
// Utils.getContextOrSparkClassLoader, so classes loaded by that parent (e.g. from jars added with
// --driver-class-path) cannot see classes that exist only in these URLs. In cluster mode it is
// child-first when Client.isUserClassPathFirst(sparkConf, isDriver = true) is true (presumably
// spark.driver.userClassPathFirst); otherwise it delegates to the parent first.
private val userClassLoader = {
val classpath = Client.getUserClasspath(sparkConf)
val urls = classpath.map { entry =>
new URL("file:" + new File(entry.getPath()).getAbsolutePath())
}
if (isClusterMode) {
if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
} else {
new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
}
} else {
new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
}
}
解决ClassNotFound
到这里我们知道了,AM的默认CLASSPATH是不包含moduleA.jar、moduleB.jar和moduleC.jar的,因此找不到prometheus sink很正常,添加如下配置让AM增加moduleB.jar
spark-submit ... --driver-class-path moduleB.jar --class xxx /moduleA.jar ...
在最开始的背景部分提到了,moduleA、moduleB和moduleC之间存在类的引用,因此上面配置后,AM的JVM classloader结构如下:
由于上面的类分布,在ModuleA启动时,会去加载ModuleB的ClassB,而ClassB需要使用ModuleC的ClassC,此时从ClassB去load ClassC时,就会出现ClassC Not Found Exception。
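要解决上面的问题,只要保证通过--driver-class-path设置的jar不引用--jars和main jar中的类就可以了(反过来,main jar和--jars可以引用--driver-class-path的jar)。原因是:--driver-class-path的jar由AM的系统类加载器加载,而它是userClassLoader的父加载器;父加载器看不到子加载器(userClassLoader)中的类。我的解决方案是将prometheus sink的代码独立成一个jar包。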
executor也出现ClassNotFoundException
原因与AM出现的一样,spark-submit增加下面的配置即可解决。注意这里使用的是相对路径,前提是该jar同时通过--jars分发:这样它会被下载到每个container的工作目录中,相对路径才能生效。
--conf spark.executor.extraClassPath=spark-prometheus-metrics-1.0.jar


![[Spark2.4] 增加Prometheus Sink带来的问题,并通过源码分析原因 [Spark2.4] 增加Prometheus Sink带来的问题,并通过源码分析原因](http://www.mshxw.com/aiimages/31/720401.png)
