HBase的compact流程分析,
关于hbase的compact意义及其应用背景已经有很多文章做了描述,这里不再赘述,本文以1.1.2背景的hbase为例,从源码中分析hbase中compact请求发出到接受处理的流程,限于作者水平有限,分析中可能有疏漏或者错误的地方,希望大家指正,一起学习提高。
从regionserver开始分析compact的流程,regionserver初始化的时候会初始化两个与compact相关的线程分别是compactSplitThread和compactionChecker。其中compactionChecker用于周期性地检查当前是否有compact请求,检查周期由参数threadWakeFrequency控制,默认值是10s,也可以在参数hbase.server.thread.frequency中配置。另一个变量compactSplitThread负责该region server上的compact/split请求具体执行,它的内部定义了如下四个线程池:
private final ThreadPoolExecutor longCompactions; //long合并线程池
private final ThreadPoolExecutor shortCompactions; //short合并线程池
private final ThreadPoolExecutor splits; //split线程池
private final ThreadPoolExecutor mergePool; //merge线程池不同的请求交由不同的线程池来处理,我们关注在compact上,这里区分出long合并和short合并两个不同类型的线程池。此外compactSplitThread中还定义了一个CompactionRunner类型的内部类,该内部类包含了CompactionContext、region、store等成员变量,并实现了Runnable接口,可以看出就是在CompactionRunner中执行了具体的compact操作。
通过对以上两个变量的简单分析,我们可以大概看到compact的概貌:compactionChecker周期性地检查是否有compact请求,如果出现了compact请求,那么将请求以及请求的周边信息一起包装成CompactionContext,将CompactionContext交付给regionserver的compactSplitThread,compactSplitThread会根据compact的类型为它分配合适的线程池,并包装成CompactionRunner交给线程池,线程池调度CompactionRunner,执行它的run方法,完成compact操作。
下面我们从代码入手印证上面的猜想,并就一些细节逐渐展开丰富。下文的讲述分两个角度展开,分别是compact请求的发起和compact动作的执行,在发起机制的讲解中,我们会同时讲述compact文件的筛选原则。
compact请求的发起机制:
compactionChecker周期执行的动作在它的chore方法中定义,chore方法的主要代码如下所示:
protected void chore() {
for (Region r : this.instance.onlineRegions.values()) {
if (r == null)
continue;
for (Store s : r.getStores()) {
try {
long multiplier = s.getCompactionCheckMultiplier();
assert multiplier > 0;
if (iteration % multiplier != 0) continue;
if (s.needsCompaction()) {
// Queue a compaction. Will recognize if major is needed.
this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
+ " requests compaction");
} else if (s.isMajorCompaction()) { //如果是major compact
if (majorCompactPriority == DEFAULT_PRIORITY
|| majorCompactPriority > ((HRegion)r).getCompactPriority()) {
this.instance.compactSplitThread.requestCompaction(r, s, getName()
+ " requests major compaction; use default priority", null); //发起compact请求
} else {
this.instance.compactSplitThread.requestCompaction(r, s, getName()
+ " requests major compaction; use configured priority",
this.majorCompactPriority, null);
}
}
} catch (IOException e) {
LOG.warn("Failed major compaction check on " + r, e);
}
}
}
iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1); //递增器增1
}iteration是一个递增器,只有当iteration达到multiplier的整数倍的时候才会检查该Store是否需要compact,这里还有个细节,对compact request的检查是以store为粒度的。ok,如果需要compact,那么compact请求是如何构造出来的,注意上述代码中的requestCompact方法,requestCompact经过层层调用,最终在compactSplitThread的requestCompactionInternal中完成compact上下文的构造并提交线程池执行,其中的关键代码如下所示:
CompactionContext compaction = null;
if (selectNow) { //通过hbase shell人为触发的合并,selectNow为true
compaction = selectCompaction(r, s, priority, request); //选取待compact文件
if (compaction == null) return null; // message logged inside
}
// We assume that most compactions are small. So, put system compactions into small
// pool; we will do selection there, and move to large pool if necessary.
ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
? longCompactions : shortCompactions; //划分线程池,系统引发的compact全部划分到short pool
pool.execute(new CompactionRunner(s, r, compaction, pool));
还记得我们前面说过,一个compactSplitThread中compact相关的线程池分成了longCompactions和shortCompactions两种,一个compact请求由哪个线程池来处理不是因为minor或者major来分配的,而是由参与compact的所有storefile的总大小决定的,如果总大小超过了throttle定义的阈值则交与longCompactions处理,反之亦然,其中throttle的定义如下所示:
throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());其中longCompaction和shortCompaction两个线程池的大小分别有不同的参数控制:
hbase.regionserver.thread.compaction.large:配置longCompactions线程池中线程的数量,默认是1;
hbase.regionserver.thread.compaction.small:配置smallCompactions线程池中线程的数量,默认是1;
回到requestCompactionInternal方法继续分析,究竟哪些storefile会参与此次compact在selectCompaction方法中决定,下面我们进入该方法,分析compact的文件筛选策略。
selectCompaction调用了HStore中的requestCompaction来筛选出需要compact的文件放入到filesCompacting中,我们把requestCompaction中的主要步骤列出在下面:
CompactionContext compaction = storeEngine.createCompaction();
this.lock.readLock().lock();
try {
synchronized(filesCompaction) {
//preCoprocessor
if (!compaction.hasSelection()) {
try {
compaction.select(sthi.filesCompaction, isUserCompaction, mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
} catch (Exception e) {
.........
}
}
//postCoprocessor
request.setPriority;
request.setDescription;
} finally {
this.lock.readLock().unlock;
}
}StoreEngine中包含了四个成员变量,分别是storeFlusher/compactionPolicy/compactor/storeFileManager,可以看出不同的storeEngine定义了不同的compaction策略,hbase中定义两种storeEngine,分别是defaultCompaction和stripCompaction,其中defaultCompaction定义了两种compact policy,分别是RatioBasedCompactionPolicy和exploringPolicy,其中前者是hbase最基本的compaction文件筛选策略,基本思想就是固定最后一个文件的情况下向前查找第一个文件,直到满足下面的公式:
f[start].size <= ratio * (f[start+1].size + f[start+2].size + ....... + f[end].size)
ratio的默认值是1,该值越大则一次选入compact的文件越多,合并后形成的文件越少,compact过程的IO压力则越大。ratio算法筛选部分代码如下:
final int countOfFiles = candidates.size(); //使用了一套复杂的滑动窗口算法选出了待合并文件
long[] fileSizes = new long[countOfFiles];
long[] sumSize = new long[countOfFiles];
for (int i = countOfFiles - 1; i >= 0; --i) {
StoreFile file = candidates.get(i);
fileSizes[i] = file.getReader().length();
// calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
int tooFar = i + comConf.getMaxFilesToCompact() - 1;
sumSize[i] = fileSizes[i]
+ ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
- ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
}
while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
fileSizes[start] > Math.max(comConf.getMinCompactSize(),
(long) (sumSize[start + 1] * ratio))) {
++start;
}而exploring在ratio算法的基础上更进一步,在候选文件列表中划分出多个子队列,然后选出满足以下三个条件的最优子队列:
1、队列中的每个文件都符合ratio准则;
2、1条件下拥有更多的文件数目;
3、1,2条件下拥有更小的文件大小;
还有第三种方法是StripeCompactionPolicy,该方法首先将所有的storefile文件按照不同的依据分层多个stripe,然后在每个stripe内依旧使用exploring算法筛选出满足条件的storefile,不同stripe的ratio可不同,甚至滑动窗口的方向也可以不同。代码的细节在StripeCompactionPolicy.java中,这里不再列出。
compact的实现流程:
讲解了compact的筛选策略之后,回到HStore的requestCompaction方法,该方法将本次compact将要操作的文件,compact请求描述信息等包装成一个CompactionContext对象返回给调用方。调用方将之组装成CompactionRunner放入线程池中运行,在compactionRunner的run方法中会调用HRegion的compact方法完成region的compact动作。
long start = EnvironmentEdgeManager.currentTime();
boolean completed =
region.compact(compaction, store, compactionThroughputController); //这开始调用到region级别的compact
long now = EnvironmentEdgeManager.currentTime();
LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
this + "; duration=" + StringUtils.formatTimeDiff(now, start));Hregion上的compact主要包括以下几个步骤:1、获取读锁;
2、检查region信息,设置状态位并记录log;
3、遍历该region上的store,调用每个store的compact方法;
4、释放读锁;
这里需要注意的是在compact期间,该region是处于读加锁状态的,此时所有试图以写模式对它进行加锁的线程都会被阻塞,换言之,compact期间的region不能响应move/split等类型的写请求。下面分析store中的compact方法,主要步骤列在下面:
public List<StoreFile> compact(CompactionContext compaction,
CompactionThroughputController throughputController) throws IOException {
assert compaction != null;
List<StoreFile> sfs = null;
CompactionRequest cr = compaction.getRequest(); //首先,从合并上下文中获取合并请求
try {
............
Collection<StoreFile> filesToCompact = cr.getFiles();
............
// Commence the compaction.
List<Path> newFiles = compaction.compact(throughputController); //开始compact,newFiles是合并后的新文件
............
// Do the steps necessary to complete the compaction.
sfs = moveCompatedFilesIntoPlace(cr, newFiles); //将newFiles移动到新的位置,返回StoreFile列表
writeCompactionWalRecord(filesToCompact, sfs); //在WAL中写入Compaction记录
replaceStoreFiles(filesToCompact, sfs); //将新生成的StoreFile列表替换到StoreFileManager的storefile中去
if (cr.isMajor()) { //根据compact类型,累加相应计数器
majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
} else {
compactedCellsCount += getCompactionProgress().totalCompactingKVs;
compactedCellsSize += getCompactionProgress().totalCompactedSize;
}
// At this point the store will use new files for all new scanners. 归档旧文件
completeCompaction(filesToCompact, true); // Archive old files & update store size.
logCompactionEndMessage(cr, sfs, compactionStartTime); //记录日志信息并返回
return sfs;
} finally {
finishCompactionRequest(cr); //完成compaction请求
}
}这里有几个关键步骤要说明一下:
compaction.compact()完成了文件的compact,进入这个函数,可以看到它创建了一个InternalScanner和一个writer,实际上就是使用scanner读取旧文件的数据,并用writer将之写入到新文件,这中间过期的数据和被标记删除的数据会通过scanner过滤掉,最后由newFiles返回新文件。
然后将newFiles移动到新的位置,并返回其包含的StoreFiles列表,同时将本次compact信息写入WAL日志。接着替换StoreFile,其实就是将合并后新生成的文件sfs替换到storeFileManager管理的storefile中,在替换过程中加了写锁。这里的StoreFileManager管理了该store上可对外提供服务的文件列表。
最后一步completeCompaction完成合并动作,主要包括三步,首先遍历已经被合并的文件,关闭其上的Reader;再次调用文件系统删除掉compactedFiles,最后更新store的计数。