欢迎投稿

今日深度:

【Hadoop】JobQueueTaskScheduler,

【Hadoop】JobQueueTaskScheduler,


JobQueueTaskScheduler是JT默认的任务调度器,其本质上维护一个priority-based FIFO作业队列,基本的机制是优先调度高优先级/提交时间早的作业。但是,具体的调度策略需要综合考虑一系列的问题,比如cluster资源利用率、系统吞吐率、任务data locality等等。在复杂的集群环境和不同的系统负载下,完美的调度策略是不可能的,从Hadoop早期的jira讨论中可以发现,最终的调度策略往往是经过激烈地讨论和在大集群下不断benchmark得出的权衡。下面将从几个重点的设计因素出发,来学习该调度器的策略。


单个TaskTracker利用率

最初,JT调度器每次heartbeat只为TT分配一个task,这显然不利于充分利用TT,比较极端的情况是,每个task的运行时间都小于heartbeat interval,这样TT同时总是只能有一个task在运行。于是,HADOOP-3136提出每次heartbeat应该为分配多个task。因此,assignTask方法中出现这样的loop:

for (int i=0; i < availableMapSlots; ++i) {
        for (JobInProgress job : jobQueue){
...
}
}

TaskTracker集群均衡负载

前面提到为了提高单个TT利用率,每个heartbeat要分配多个task。这带来一个问题:在一次heartbeat中,如果总是最大限度的分配task,也就是说TT有多少free slot就分配多少task给它,当系统负载不是很高的情况下,将会出现某些TT非常繁忙,而其他TT相对空闲(因为来一个TT要任务,你就最大限度满足它,导致后续的TT来要任务时,已经没有任务可分配了)。

为了解决上述问题,load factor被加入调度策略:给某个TT分配任务时,要计算集群的负载因子(remainingLoad / clusterCapacity),确保TT负载不会超过系统均衡负载。比如说,TT slot是4,当前集群负载因子是0.5(也就是说集群只要50%负载就可以满足当前任务需求),那么单个TT负载度也不能超过50%,也就是该TT不能分配超过2个任务给它,确保任务能均衡分配到集群所有TT上。

// Compute the 'load factor' for maps and reduces
    double mapLoadFactor = 0.0;
    if (clusterMapCapacity > 0) {
      mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
    }
    double reduceLoadFactor = 0.0;
    if (clusterReduceCapacity > 0) {
      reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
    }

final int trackerCurrentMapCapacity = 
      Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity), 
                              trackerMapCapacity);
    int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;

MapTask数据局部性

为了提高系统吞吐率,降低网络负载,将MapTask分配给data-local/rack-local的TT非常重要。当JT可以一次分配多个任务时,又要面对这样一个问题:如果优先级高的job对于该TT没有data-local/rack-local任务,是否可以分配non-local任务?分配几个?

一方面,试想如果不分配non-local任务,转而优先分配低优先级job的data-local任务,将有两个问题:1、优先级没有得到足够的尊重,高优先级job用户通常希望job尽快地执行完 2、如果其data-local TT处于繁忙状态,该任务可能要等待过长的时间才能分配给data-local TT。 

另一方面,试想尽最大限度(TT maxCapacity)分配non-local任务,将会降低集群整体的data-local比例。假如下一个heartbeat的TT是data-local的,我现在就把任务分配出去了,岂不是剥夺了它分配给其他data-local TT的机会了(steal local-task from other TT)。

作为一种权衡的调度策略是,如果当前高优先级的job没有data-local/rack-local任务,会尝试找它的non-local任务,但是一次最多只能分配一个,以免steal太多潜在的local-task。


降低ReduceTask带来的IO负载

每个Reduce任务第一步是shuffle,需要从各个MapTask TT上拷贝数据,如果同时分配多个Reduce任务给TT,它们将同时启动IO,IO负载太高,将相互影响性能,所以JT总是一次只分配一个Reduce任务。

    for (int i=0; i < availableMapSlots; ++i) {
      synchronized (jobQueue) {
        for (JobInProgress job : jobQueue) {
          if (job.getStatus().getRunState() != JobStatus.RUNNING) {
            continue;
          }

          Task t = null;
          
          // Try to schedule a node-local or rack-local Map task
          t = 
            job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus, 
                numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts());
          if (t != null) {
            assignedTasks.add(t);
            ++numLocalMaps;
            
            // Don't assign map tasks to the hilt!
            // Leave some free slots in the cluster for future task-failures,
            // speculative tasks etc. beyond the highest priority job
            if (exceededMapPadding) {
              break scheduleMaps;
            }
           
            // Try all jobs again for the next Map task 
            break;
          }
          
          // Try to schedule a node-local or rack-local Map task
          t = 
            job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,
                                   taskTrackerManager.getNumberOfUniqueHosts());
          
          if (t != null) {
            assignedTasks.add(t);
            ++numNonLocalMaps;
            
            // We assign at most 1 off-switch or speculative task
            // This is to prevent TaskTrackers from stealing local-tasks
            // from other TaskTrackers.
            break scheduleMaps;
          }
        }
      }
    }

留有余地

中国人常说话说七分留三分,任何事不要做得太极致。JT分配任务时,也遵循这个古训:)。为什么呢?因为留有余下以便不时只需,比如重试failed任务、启动speculative任务等。因此,可以看到调度器loop中有exceededMapPadding

和exceededReducePadding。下面的算法中,MIN_CLUSTER_SIZE_FOR_PADDING值越大,留的余地就越大,默认值是0.01。当然,调度器比较尊重优先级任务,从调度器loop中可以看到最高优先级任务的需求可以越过该exceededPadding限制。

        //
        // Beyond the highest-priority task, reserve a little 
        // room for failures and speculative executions; don't 
        // schedule tasks to the hilt.
        //
        totalNeededTasks += 
          isMapTask ? job.desiredMaps() : job.desiredReduces();
        int padding = 0;
        if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
          padding = 
            Math.min(maxTaskTrackerSlots,
                     (int) (totalNeededTasks * padFraction));
        }
        if (totalTasks + padding >= totalTaskCapacity) {
          exceededPadding = true;
          break;
        }

www.htsjk.Com true http://www.htsjk.com/Hadoop/41359.html NewsArticle 【Hadoop】JobQueueTaskScheduler, JobQueueTaskScheduler是JT默认的任务调度器,其本质上维护一个priority-based FIFO作业队列,基本的机制是优先调度高优先级/提交时间早的作业。但是,具体的调度...
相关文章
    暂无相关文章
评论暂时关闭