欢迎投稿

今日深度:

HBase的compact流程分析,

HBase的compact流程分析,


关于hbase的compact意义及其应用背景已经有很多文章做了描述,这里不再赘述,本文以1.1.2背景的hbase为例,从源码中分析hbase中compact请求发出到接受处理的流程,限于作者水平有限,分析中可能有疏漏或者错误的地方,希望大家指正,一起学习提高。

从regionserver开始分析compact的流程,regionserver初始化的时候会初始化两个与compact相关的线程分别是compactSplitThread和compactionChecker。其中compactionChecker用于周期性地检查当前是否有compact请求,检查周期由参数threadWakeFrequency控制,默认值是10s,也可以在参数hbase.server.thread.frequency中配置。另一个变量compactSplitThread负责该region server上的compact/split请求具体执行,它的内部定义了如下四个线程池:

  private final ThreadPoolExecutor longCompactions;     //long合并线程池
  private final ThreadPoolExecutor shortCompactions;    //short合并线程池
  private final ThreadPoolExecutor splits;              //split线程池
  private final ThreadPoolExecutor mergePool;           //merge线程池

不同的请求交由不同的线程池来处理,我们关注在compact上,这里区分出long合并和short合并两个不同类型的线程池。此外compactSplitThread中还定义了一个CompactionRunner类型的内部类,该内部类包含了CompactionContext、region、store等成员变量,并实现了Runnable接口,可以看出就是在CompactionRunner中执行了具体的compact操作。

通过对以上两个变量的简单分析,我们可以大概看到compact的概貌:compactionChecker周期性地检查是否有compact请求,如果出现了compact请求,那么将请求以及请求的周边信息一起包装成CompactionContext,将CompactionContext交付给regionserver的compactSplitThread,compactSplitThread会根据compact的类型为它分配合适的线程池,并包装成CompactionRunner交给线程池,线程池调度CompactionRunner,执行它的run方法,完成compact操作。

下面我们从代码入手印证上面的猜想,并就一些细节逐渐展开丰富。下文的讲述分两个角度展开,分别是compact请求的发起和compact动作的执行,在发起机制的讲解中,我们会同时讲述compact文件的筛选原则。

compact请求的发起机制:

compactionChecker周期执行的动作在它的chore方法中定义,chore方法的主要代码如下所示:

protected void chore() {
      for (Region r : this.instance.onlineRegions.values()) {
        if (r == null)
          continue;
        for (Store s : r.getStores()) {
          try {
            long multiplier = s.getCompactionCheckMultiplier();
            assert multiplier > 0;
            if (iteration % multiplier != 0) continue;
            if (s.needsCompaction()) {
              // Queue a compaction. Will recognize if major is needed.
              this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
                  + " requests compaction");
            } else if (s.isMajorCompaction()) {       //如果是major compact
              if (majorCompactPriority == DEFAULT_PRIORITY
                  || majorCompactPriority > ((HRegion)r).getCompactPriority()) {
                this.instance.compactSplitThread.requestCompaction(r, s, getName()
                    + " requests major compaction; use default priority", null);    //发起compact请求
              } else {
                this.instance.compactSplitThread.requestCompaction(r, s, getName()
                    + " requests major compaction; use configured priority",
                  this.majorCompactPriority, null);
              }
            }
          } catch (IOException e) {
            LOG.warn("Failed major compaction check on " + r, e);
          }
        }
      }
      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);    //递增器增1
 }

iteration是一个递增器,只有当iteration达到multiplier的整数倍的时候才会检查该Store是否需要compact,这里还有个细节,对compact request的检查是以store为粒度的。ok,如果需要compact,那么compact请求是如何构造出来的,注意上述代码中的requestCompact方法,requestCompact经过层层调用,最终在compactSplitThread的requestCompactionInternal中完成compact上下文的构造并提交线程池执行,其中的关键代码如下所示:

CompactionContext compaction = null;
if (selectNow) {    //通过hbase shell人为触发的合并,selectNow为true
   compaction = selectCompaction(r, s, priority, request);   //选取待compact文件
   if (compaction == null) return null; // message logged inside
}

// We assume that most compactions are small. So, put system compactions into small
// pool; we will do selection there, and move to large pool if necessary.
ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
   ? longCompactions : shortCompactions;  //划分线程池,系统引发的compact全部划分到short pool
pool.execute(new CompactionRunner(s, r, compaction, pool));

还记得我们前面说过,一个compactSplitThread中compact相关的线程池分成了longCompactions和shortCompactions两种,一个compact请求由哪个线程池来处理不是因为minor或者major来分配的,而是由参与compact的所有storefile的总大小决定的,如果总大小超过了throttle定义的阈值则交与longCompactions处理,反之亦然,其中throttle的定义如下所示:

throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
          2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());

其中longCompaction和shortCompaction两个线程池的大小分别有不同的参数控制:

hbase.regionserver.thread.compaction.large:配置longCompactions线程池中线程的数量,默认是1;

hbase.regionserver.thread.compaction.small:配置smallCompactions线程池中线程的数量,默认是1;

回到requestCompactionInternal方法继续分析,究竟哪些storefile会参与此次compact在selectCompaction方法中决定,下面我们进入该方法,分析compact的文件筛选策略。

selectCompaction调用了HStore中的requestCompaction来筛选出需要compact的文件放入到filesCompacting中,我们把requestCompaction中的主要步骤列出在下面:

CompactionContext compaction = storeEngine.createCompaction();
this.lock.readLock().lock();
try {
   synchronized(filesCompaction) {
      //preCoprocessor
      if (!compaction.hasSelection()) {
         try {
             compaction.select(sthi.filesCompaction, isUserCompaction, mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
         } catch (Exception e) {
             .........
         }
      }
      //postCoprocessor
             request.setPriority;
             request.setDescription;
   } finally {
      this.lock.readLock().unlock;
   }
}

StoreEngine中包含了四个成员变量,分别是storeFlusher/compactionPolicy/compactor/storeFileManager,可以看出不同的storeEngine定义了不同的compaction策略,hbase中定义两种storeEngine,分别是defaultCompaction和stripCompaction,其中defaultCompaction定义了两种compact policy,分别是RatioBasedCompactionPolicy和exploringPolicy,其中前者是hbase最基本的compaction文件筛选策略,基本思想就是固定最后一个文件的情况下向前查找第一个文件,直到满足下面的公式:

f[start].size <= ratio * (f[start+1].size + f[start+2].size + ....... + f[end].size)

ratio的默认值是1,该值越大则一次选入compact的文件越多,合并后形成的文件越少,compact过程的IO压力则越大。ratio算法筛选部分代码如下:

 final int countOfFiles = candidates.size();     //使用了一套复杂的滑动窗口算法选出了待合并文件
 long[] fileSizes = new long[countOfFiles];
 long[] sumSize = new long[countOfFiles];
 for (int i = countOfFiles - 1; i >= 0; --i) {
   StoreFile file = candidates.get(i);
   fileSizes[i] = file.getReader().length();
   // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
   int tooFar = i + comConf.getMaxFilesToCompact() - 1;
   sumSize[i] = fileSizes[i]
     + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
     - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
 }


 while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
   fileSizes[start] > Math.max(comConf.getMinCompactSize(),
      (long) (sumSize[start + 1] * ratio))) {
   ++start;
 }

而exploring在ratio算法的基础上更进一步,在候选文件列表中划分出多个子队列,然后选出满足以下三个条件的最优子队列:

1、队列中的每个文件都符合ratio准则;

2、1条件下拥有更多的文件数目;

3、1,2条件下拥有更小的文件大小;

还有第三种方法是StripeCompactionPolicy,该方法首先将所有的storefile文件按照不同的依据分层多个stripe,然后在每个stripe内依旧使用exploring算法筛选出满足条件的storefile,不同stripe的ratio可不同,甚至滑动窗口的方向也可以不同。代码的细节在StripeCompactionPolicy.java中,这里不再列出。

compact的实现流程:

讲解了compact的筛选策略之后,回到HStore的requestCompaction方法,该方法将本次compact将要操作的文件,compact请求描述信息等包装成一个CompactionContext对象返回给调用方。调用方将之组装成CompactionRunner放入线程池中运行,在compactionRunner的run方法中会调用HRegion的compact方法完成region的compact动作。

long start = EnvironmentEdgeManager.currentTime();
boolean completed =
      region.compact(compaction, store, compactionThroughputController);    //这开始调用到region级别的compact
long now = EnvironmentEdgeManager.currentTime();
LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
      this + "; duration=" + StringUtils.formatTimeDiff(now, start));
Hregion上的compact主要包括以下几个步骤:
1、获取读锁;

2、检查region信息,设置状态位并记录log;

3、遍历该region上的store,调用每个store的compact方法;

4、释放读锁;

这里需要注意的是在compact期间,该region是处于读加锁状态的,此时所有试图以写模式对它进行加锁的线程都会被阻塞,换言之,compact期间的region不能响应move/split等类型的写请求。下面分析store中的compact方法,主要步骤列在下面:

public List<StoreFile> compact(CompactionContext compaction,
      CompactionThroughputController throughputController) throws IOException {
    assert compaction != null;
    List<StoreFile> sfs = null;
    CompactionRequest cr = compaction.getRequest();     //首先,从合并上下文中获取合并请求
    try {
      ............
      Collection<StoreFile> filesToCompact = cr.getFiles();
      
      ............
      // Commence the compaction.
      List<Path> newFiles = compaction.compact(throughputController);   //开始compact,newFiles是合并后的新文件
      ............

      // Do the steps necessary to complete the compaction.
      sfs = moveCompatedFilesIntoPlace(cr, newFiles);  //将newFiles移动到新的位置,返回StoreFile列表
      writeCompactionWalRecord(filesToCompact, sfs);   //在WAL中写入Compaction记录
      replaceStoreFiles(filesToCompact, sfs);     //将新生成的StoreFile列表替换到StoreFileManager的storefile中去
      if (cr.isMajor()) {                           //根据compact类型,累加相应计数器
        majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
        majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
      } else {
        compactedCellsCount += getCompactionProgress().totalCompactingKVs;
        compactedCellsSize += getCompactionProgress().totalCompactedSize;
      }
      // At this point the store will use new files for all new scanners. 归档旧文件
      completeCompaction(filesToCompact, true); // Archive old files & update store size.

      logCompactionEndMessage(cr, sfs, compactionStartTime);   //记录日志信息并返回
      return sfs;
    } finally {
      finishCompactionRequest(cr);      //完成compaction请求
    }
}
这里有几个关键步骤要说明一下:

compaction.compact()完成了文件的compact,进入这个函数,可以看到它创建了一个InternalScanner和一个writer,实际上就是使用scanner读取旧文件的数据,并用writer将之写入到新文件,这中间过期的数据和被标记删除的数据会通过scanner过滤掉,最后由newFiles返回新文件。

然后将newFiles移动到新的位置,并返回其包含的StoreFiles列表,同时将本次compact信息写入WAL日志。接着替换StoreFile,其实就是将合并后新生成的文件sfs替换到storeFileManager管理的storefile中,在替换过程中加了写锁。这里的StoreFileManager管理了该store上可对外提供服务的文件列表。

最后一步completeCompaction完成合并动作,主要包括三步,首先遍历已经被合并的文件,关闭其上的Reader;再次调用文件系统删除掉compactedFiles,最后更新store的计数。



www.htsjk.Com true http://www.htsjk.com/hbase/30949.html NewsArticle HBase的compact流程分析, 关于hbase的compact意义及其应用背景已经有很多文章做了描述,这里不再赘述,本文以1.1.2背景的hbase为例,从源码中分析hbase中compact请求发出到接受处理的流程,...
相关文章
    暂无相关文章
评论暂时关闭