我们的海外集群上线spot动态节点后,由于动态节点不定期会被回收,为了保证作业运行稳定性,需要指定某些重点作业运行在固定节点,而这需要使用到CapacityScheduler的NodeLabel功能。虽然社区也有FairScheduler支持NodeLabel的patch(YARN-2497),但并不成熟,不适合在生产环境使用。
此外,目前社区的开发重点也偏向CapacityScheduler,随着版本的更新,它对于调度吞吐量做了众多改进,比如多线程调度、细粒度锁等。经过讨论后,我们决定将目前使用的FairScheduler切换为CapacityScheduler。
二、集群现状目前我们的海外集群队列数800+,每天提交约18万个Hive、Spark等作业,我们需要在用户无感知情况下将调度器切换成CapacityScheduler,兼容调度器之间的功能差异,并保障集群调度性能。
三、配置转换除了将yarn.resourcemanager.scheduler.class 配置为CapacityScheduler外,我们重点需要将队列配置fair-scheduler.xml转换为capacity-scheduler.xml,因为两者的配置方式差异较大,结合社区的资料(YARN-9698),我整理出一些关键配置的映射关系:
| FairScheduler配置 | CapacityScheduler配置 | 配置说明 |
|---|---|---|
| minResources | yarn.scheduler.capacity. | 队列最小资源 |
| maxResources | yarn.scheduler.capacity. | 队列最大资源 |
| aclSubmitApps | yarn.scheduler.capacity. | 队列提交授权 |
| aclAdministerApps | yarn.scheduler.capacity. | 队列管理授权 |
| maxAMShare | yarn.scheduler.capacity. | AM资源限额管理 |
| queueMaxAppsDefault | yarn.scheduler.capacity. | 最大apps限制 |
| maxRunningApps | yarn.scheduler.capacity. | 最大运行apps限制 |
| maxContainerAllocation (“X mb, Y vcores”) | yarn.scheduler.capacity. | container最大资源 |
| yarn.scheduler.fair.preemption | yarn.resourcemanager.scheduler.monitor.enable | 是否开启抢占,默认值为false |
| yarn.scheduler.fair.allow-undeclared-pools | yarn.scheduler.capacity. | 是否允许自动创建队列,FS默认为true,CS默认为false |
| yarn.scheduler.fair.user-as-default-queue | yarn.scheduler.capacity.queue-mappings | 用户队列映射,FS默认为true |
值得一提的是,在Hadoop 3.3.0版本,yarn提供了一个命令行工具来进行配置转换,使用方式如下:
yarn fs2cs -y /path/to/yarn-site.xml [-f /path/to/fair-scheduler.xml] {-o /output/path/ | -p} [-t] [-s] [-d]
但目前这个工具还不完善,我在试用过后发现有很多参数不支持转换,比如:
– default maximum applications per user – minimum resources for a queue – maximum resources for a queue
最终我选择自己写脚本完成配置文件的转换,这个过程中遇到一些CapacityScheduler队列配置需要重点注意的地方:
1、资源计算器
yarn.scheduler.capacity.resource-calculator:资源计算器用于在调度器中比较资源。
默认值是 org.apache.hadoop.yarn.util.resource.DefaultResourseCalculator,只使用内存进行比较,而我们的集群中需要同时限制用户对内存和vcore的使用,所以需要将该参数设置为org.apache.hadoop.yarn.util.resource.DominantResourceCalculator。
2、排序策略
yarn.scheduler.capacity.
对叶子队列配置时,用于对该队列内的作业进行排序,可选值有fifo 和 fair,默认是fifo。
两者的配置值不能混淆,如果父队列配置了fifo 或 fair,则会报错。
3、队列有效资源
由于使用FairScheduler时队列配置的是绝对资源,转换过后CapacityScheduler也将使用绝对资源配置,而所有队列配置的绝对资源总和可能超过集群的总资源,这种情况下,队列会计算出有效资源:
队列的有效资源(Effective Capacity)= 队列绝对资源占比 * 集群总资源
队列的Effective Max Capacity也不会超过集群总资源
4、队列最大作业数
yarn.scheduler.capacity.
队列的maximum-applications = 队列绝对资源占比 * yarn.scheduler.capacity.maximum-applications(默认为10000)
5、用户资源限制
yarn.scheduler.capacity.
6、队列映射
使用FairScheduler时,用户提交作业若未指定队列,则会提交到同名队列,如果同名队列不存在,则会自动创建,
而在CapacityScheduler中如果要实现用户提交作业自动映射到同名队列,需要配置yarn.scheduler.capacity.queue-mappings=u:%user:%user,且CapacityScheduler默认不允许自动创建队列,可通过yarn.scheduler.capacity.
7、队列配置自动刷新
在使用FairScheduler时,后台线程会每10s检查一次配置文件fair-scheduler.xml有没有被修改,如果被修改则重新加载,
而CapacityScheduler没有自动刷新的机制,修改capacity-scheduler.xml后需要执行 yarn rmadmin -refreshQueues 命令来刷新队列。
社区在3.4.0版本中添加了CapacityScheduler自动刷新配置的功能,详见:YARN-10623。
当配置转换好之后,我们还需要在正式上线前进行模拟测试,这里需要使用到YARN Scheduler Load Simulator (SLS)。
Yarn SLS可以实现在一台机器上模拟大规模的 YARN 集群和应用程序负载,可以很方便地测试调度器。具体使用方式可参考官方文档:https://hadoop.apache.org/docs/stable/hadoop-sls/SchedulerLoadSimulator.html
这里主要介绍我在使用过程中遇到的一些问题,以下基于Hadoop 3.3.0版本
1、运行rumen2sls.sh脚本时报错
Error: Could not find or load main class org.apache.hadoop.yarn.sls.RumenToSLSConverter
解决:https://issues.apache.org/jira/browse/YARN-9994
2、运行rumen2sls.sh脚本报错:
Exception in thread "main" java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
at org.apache.hadoop.yarn.sls.RumenToSLSConverter.createSLSTasks(RumenToSLSConverter.java:210)
at org.apache.hadoop.yarn.sls.RumenToSLSConverter.createSLSJob(RumenToSLSConverter.java:191)
at org.apache.hadoop.yarn.sls.RumenToSLSConverter.generateSLSLoadFile(RumenToSLSConverter.java:133)
at org.apache.hadoop.yarn.sls.RumenToSLSConverter.main(RumenToSLSConverter.java:117)
解决:修改代码
RumenToSLSConverter.java
- long taskFinish = (Long) rumenAttempt.get("finishTime");
+ long taskFinish =-1;
+ try{
+ taskFinish = Long.parseLong(rumenAttempt.get("finishTime").toString());
+ } catch (Exception ex){
+ LOG.error(rumenAttempt.get("finishTime"), ex);
+ }
3、运行rumen2sls.sh脚本报错:
Exception in thread "main" java.lang.IllegalArgumentException: Network Location is null
at org.apache.hadoop.net.Nodebase.normalize(Nodebase.java:156)
at org.apache.hadoop.net.Nodebase.(Nodebase.java:51)
at org.apache.hadoop.yarn.sls.utils.SLSUtils.getRackHostName(SLSUtils.java:72)
at org.apache.hadoop.yarn.sls.RumenToSLSConverter.createSLSTasks(RumenToSLSConverter.java:229)
at org.apache.hadoop.yarn.sls.RumenToSLSConverter.createSLSJob(RumenToSLSConverter.java:193)
at org.apache.hadoop.yarn.sls.RumenToSLSConverter.generateSLSLoadFile(RumenToSLSConverter.java:135)
at org.apache.hadoop.yarn.sls.RumenToSLSConverter.main(RumenToSLSConverter.java:119)
解决:修改代码
RumenToSLSConverter.java
- String rackHost[] = SLSUtils.getRackHostName(hostname);
- if (rackNodeMap.containsKey(rackHost[0])) {
- rackNodeMap.get(rackHost[0]).add(rackHost[1]);
- } else {
- Set hosts = new TreeSet();
- hosts.add(rackHost[1]);
- rackNodeMap.put(rackHost[0], hosts);
+ if(hostname != null){
+ String rackHost[] = SLSUtils.getRackHostName(hostname);
+ if (rackNodeMap.containsKey(rackHost[0])) {
+ rackNodeMap.get(rackHost[0]).add(rackHost[1]);
+ } else {
+ Set hosts = new TreeSet();
+ hosts.add(rackHost[1]);
+ rackNodeMap.put(rackHost[0], hosts);
+ }
4、打开http://localhost:10001/simulate页面时报错,无法加载js,css资源
java.lang.NullPointerException
at org.eclipse.jetty.server.ResourceService.doGet(ResourceService.java:235)
at org.eclipse.jetty.server.handler.ResourceHandler.handle(ResourceHandler.java:256)
at org.apache.hadoop.yarn.sls.web.SLSWebApp$1.handle(SLSWebApp.java:159)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
at org.eclipse.jetty.server.Server.handle(Server.java:494)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:374)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:268)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129)
at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:367)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:782)
at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:918)
at java.lang.Thread.run(Thread.java:748)
解决:https://issues.apache.org/jira/browse/YARN-6275
以上问题都解决之后,顺利通过测试,于是便正式在海外集群上线了CapacityScheduler,运行了几天之后,RM突然报错并异常退出
异步调度线程并发BUG异常日志如下:
2021-10-25 11:37:06,585 ERROR org.apache.hadoop.yarn.event.EventDispatcher: Error in handling event type NODE_UPDATE to the Event Dispatcher
java.lang.IllegalArgumentException: Comparison method violates its general contract!
at java.util.TimSort.mergeLo(TimSort.java:777)
at java.util.TimSort.mergeAt(TimSort.java:514)
at java.util.TimSort.mergeForceCollapse(TimSort.java:457)
at java.util.TimSort.sort(TimSort.java:254)
at java.util.Arrays.sort(Arrays.java:1512)
at java.util.ArrayList.sort(ArrayList.java:1460)
at java.util.Collections.sort(Collections.java:175)
at org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy.getAssignmentIterator(PriorityUtilizationQueueOrderingPolicy.java:221)
at org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue.sortAndGetChildrenAllocationIterator(ParentQueue.java:783)
at org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue.assignContainersToChildQueues(ParentQueue.java:794)
at org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue.assignContainers(ParentQueue.java:626)
at org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler.allocateOrReserveNewContainers(CapacityScheduler.java:1658)
at org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler.allocateContainerOnSingleNode(CapacityScheduler.java:1596)
at org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler.allocateContainersToNode(CapacityScheduler.java:1768)
at org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler.allocateContainersToNode(CapacityScheduler.java:1504)
at org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler.nodeUpdate(CapacityScheduler.java:1342)
at org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler.handle(CapacityScheduler.java:1832)
at org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler.handle(CapacityScheduler.java:171)
at org.apache.hadoop.yarn.event.EventDispatcher$EventProcessor.run(EventDispatcher.java:71)
at java.lang.Thread.run(Thread.java:748)
排查之后发现原因:当队列发生修改,RM中父队列通过调用ParentQueue#reinitialize刷新子队列,同时异步调度线程在调用ParentQueue#sortAndGetChildrenAllocationIterator时对子队列进行排序。由于 TimSort 不处理它正在排序的对象的并发修改,因此可能会发生竞争条件并抛出异常。
社区有相关补丁:https://issues.apache.org/jira/browse/YARN-10178,做法是在对子队列进行排序之前,对队列资源数据做一个快照,排序过程中使用这个快照的数据,这样不需要加锁也可避免并发冲突,对性能影响很小。
在线上集群上线这个补丁后成功修复该BUG,此后一直正常运行,未出现大的调度问题。
后续以上分享的是我在将海外集群使用的FairScheduler切换到CapacityScheduler过程中遇到的一些比较细节的问题,同时发现社区也正在进行这方面的开发,而目前针对我们集群的场景还有一些问题没有彻底解决,例如:
- CapacityScheduler递归加载队列配置耗时过长
- 使用绝对资源配置时maxApplications参数被重新计算
- 无法根据NodeLabel分区获取队列当前资源限制
后续在优化修复后,也希望能将我们的经验共享到社区,帮助遇到同样场景问题的人。



