【Hadoop】JobQueueTaskScheduler,
JobQueueTaskScheduler是JT默认的任务调度器,其本质上维护一个priority-based FIFO作业队列,基本的机制是优先调度高优先级/提交时间早的作业。但是,具体的调度策略需要综合考虑一系列的问题,比如cluster资源利用率、系统吞吐率、任务data locality等等。在复杂的集群环境和不同的系统负载下,完美的调度策略是不可能的,从Hadoop早期的jira讨论中可以发现,最终的调度策略往往是经过激烈地讨论和在大集群下不断benchmark得出的权衡。下面将从几个重点的设计因素出发,来学习该调度器的策略。
单个TaskTracker利用率
最初,JT调度器每次heartbeat只为TT分配一个task,这显然不利于充分利用TT,比较极端的情况是,每个task的运行时间都小于heartbeat interval,这样TT同时总是只能有一个task在运行。于是,HADOOP-3136提出每次heartbeat应该为分配多个task。因此,assignTask方法中出现这样的loop:
for (int i=0; i < availableMapSlots; ++i) {
for (JobInProgress job : jobQueue){
...
}
}
TaskTracker集群均衡负载
前面提到为了提高单个TT利用率,每个heartbeat要分配多个task。这带来一个问题:在一次heartbeat中,如果总是最大限度的分配task,也就是说TT有多少free slot就分配多少task给它,当系统负载不是很高的情况下,将会出现某些TT非常繁忙,而其他TT相对空闲(因为来一个TT要任务,你就最大限度满足它,导致后续的TT来要任务时,已经没有任务可分配了)。
为了解决上述问题,load factor被加入调度策略:给某个TT分配任务时,要计算集群的负载因子(remainingLoad / clusterCapacity),确保TT负载不会超过系统均衡负载。比如说,TT slot是4,当前集群负载因子是0.5(也就是说集群只要50%负载就可以满足当前任务需求),那么单个TT负载度也不能超过50%,也就是该TT不能分配超过2个任务给它,确保任务能均衡分配到集群所有TT上。
// Compute the 'load factor' for maps and reduces
double mapLoadFactor = 0.0;
if (clusterMapCapacity > 0) {
mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
}
double reduceLoadFactor = 0.0;
if (clusterReduceCapacity > 0) {
reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
}
final int trackerCurrentMapCapacity =
Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity),
trackerMapCapacity);
int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
MapTask数据局部性
为了提高系统吞吐率,降低网络负载,将MapTask分配给data-local/rack-local的TT非常重要。当JT可以一次分配多个任务时,又要面对这样一个问题:如果优先级高的job对于该TT没有data-local/rack-local任务,是否可以分配non-local任务?分配几个?
一方面,试想如果不分配non-local任务,转而优先分配低优先级job的data-local任务,将有两个问题:1、优先级没有得到足够的尊重,高优先级job用户通常希望job尽快地执行完 2、如果其data-local TT处于繁忙状态,该任务可能要等待过长的时间才能分配给data-local TT。
另一方面,试想尽最大限度(TT maxCapacity)分配non-local任务,将会降低集群整体的data-local比例。假如下一个heartbeat的TT是data-local的,我现在就把任务分配出去了,岂不是剥夺了它分配给其他data-local TT的机会了(steal local-task from other TT)。
作为一种权衡的调度策略是,如果当前高优先级的job没有data-local/rack-local任务,会尝试找它的non-local任务,但是一次最多只能分配一个,以免steal太多潜在的local-task。
降低ReduceTask带来的IO负载
每个Reduce任务第一步是shuffle,需要从各个MapTask TT上拷贝数据,如果同时分配多个Reduce任务给TT,它们将同时启动IO,IO负载太高,将相互影响性能,所以JT总是一次只分配一个Reduce任务。
for (int i=0; i < availableMapSlots; ++i) {
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
if (job.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
}
Task t = null;
// Try to schedule a node-local or rack-local Map task
t =
job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus,
numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts());
if (t != null) {
assignedTasks.add(t);
++numLocalMaps;
// Don't assign map tasks to the hilt!
// Leave some free slots in the cluster for future task-failures,
// speculative tasks etc. beyond the highest priority job
if (exceededMapPadding) {
break scheduleMaps;
}
// Try all jobs again for the next Map task
break;
}
// Try to schedule a node-local or rack-local Map task
t =
job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts());
if (t != null) {
assignedTasks.add(t);
++numNonLocalMaps;
// We assign at most 1 off-switch or speculative task
// This is to prevent TaskTrackers from stealing local-tasks
// from other TaskTrackers.
break scheduleMaps;
}
}
}
}
留有余地
中国人常说话说七分留三分,任何事不要做得太极致。JT分配任务时,也遵循这个古训:)。为什么呢?因为留有余下以便不时只需,比如重试failed任务、启动speculative任务等。因此,可以看到调度器loop中有exceededMapPadding
和exceededReducePadding。下面的算法中,MIN_CLUSTER_SIZE_FOR_PADDING值越大,留的余地就越大,默认值是0.01。当然,调度器比较尊重优先级任务,从调度器loop中可以看到最高优先级任务的需求可以越过该exceededPadding限制。 //
// Beyond the highest-priority task, reserve a little
// room for failures and speculative executions; don't
// schedule tasks to the hilt.
//
totalNeededTasks +=
isMapTask ? job.desiredMaps() : job.desiredReduces();
int padding = 0;
if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
padding =
Math.min(maxTaskTrackerSlots,
(int) (totalNeededTasks * padFraction));
}
if (totalTasks + padding >= totalTaskCapacity) {
exceededPadding = true;
break;
}