Cassandra Compaction Code Walkthrough
This walkthrough of compaction is divided into the following parts:
1. Management of compaction threads
2. Selection of candidate SSTables for compaction
3. Merging of the selected SSTables
Parts 1 and 3 are fairly fixed and probably leave little room for optimization; a general understanding of them is enough.
1 Management of compaction threads
1.1 Starting compaction
When Cassandra starts, the ColumnFamilyStore (cfs) is created at the end of opening each table. The cfs constructor instantiates an object for the configured compaction strategy. Let us look at the constructors of the two concrete strategies.
public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
{
    super(cfs, options);
    this.estimatedRemainingTasks = 0;
    String optionValue = options.get(MIN_SSTABLE_SIZE_KEY);
    minSSTableSize = (null != optionValue) ? Long.parseLong(optionValue) : DEFAULT_MIN_SSTABLE_SIZE;
    cfs.setMaximumCompactionThreshold(cfs.metadata.getMaxCompactionThreshold());
    cfs.setMinimumCompactionThreshold(cfs.metadata.getMinCompactionThreshold());
}
public LeveledCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
{
    super(cfs, options);
    int configuredMaxSSTableSize = 5;
    if (options != null)
    {
        String value = options.containsKey(SSTABLE_SIZE_OPTION) ? options.get(SSTABLE_SIZE_OPTION) : null;
        if (null != value)
        {
            try
            {
                configuredMaxSSTableSize = Integer.parseInt(value);
            }
            catch (NumberFormatException ex)
            {
                logger.warn(String.format("%s is not a parsable int (base10) for %s, using default value",
                                          value, SSTABLE_SIZE_OPTION));
            }
        }
    }
    maxSSTableSizeInMB = configuredMaxSSTableSize;

    cfs.getDataTracker().subscribe(this);
    logger.debug("{} subscribed to the data tracker.", this);

    manifest = LeveledManifest.create(cfs, this.maxSSTableSizeInMB);
    logger.debug("Created {}", manifest);

    // override min/max for this strategy
    cfs.setMaximumCompactionThreshold(Integer.MAX_VALUE);
    cfs.setMinimumCompactionThreshold(1);
}
The key piece is the base-class constructor below; it is simple enough to need little explanation:
protected AbstractCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
{
    assert cfs != null;
    this.cfs = cfs;
    this.options = options;

    // start compactions in five minutes (if no flushes have occurred by then to do so)
    Runnable runnable = new Runnable()
    {
        public void run()
        {
            if (CompactionManager.instance.getActiveCompactions() == 0)
            {
                CompactionManager.instance.submitBackground(AbstractCompactionStrategy.this.cfs);
            }
        }
    };
    StorageService.optionalTasks.schedule(runnable, 5 * 60, TimeUnit.SECONDS); // optionalTasks is a scheduled thread pool
}
From these three snippets we can see that the compaction strategy's delayed check is scheduled as soon as Cassandra starts: if no flush has triggered compaction within five minutes, a background compaction check runs.
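The delayed-start mechanism above can be sketched with a plain ScheduledExecutorService. This is an illustrative stand-in, not Cassandra's code: the class and method names here are invented, and 50 ms stands in for the real 5 * 60 seconds.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DelayedCompactionCheck
{
    // The five-minute check only fires a background compaction when
    // nothing else has started one in the meantime.
    static boolean shouldSubmit(int activeCompactions)
    {
        return activeCompactions == 0;
    }

    public static void main(String[] args) throws Exception
    {
        ScheduledExecutorService optionalTasks = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger submitted = new AtomicInteger();
        // 50 ms here stands in for Cassandra's 5 * 60 seconds
        optionalTasks.schedule(() -> {
            if (shouldSubmit(0))
                submitted.incrementAndGet(); // submitBackground(cfs) in the real code
        }, 50, TimeUnit.MILLISECONDS);
        optionalTasks.shutdown(); // pending delayed tasks still run by default
        optionalTasks.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("background checks submitted: " + submitted.get());
    }
}
```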
CompactionManager manages the threads that are actually compacting. Compaction is, generally speaking, single-threaded per column family, which involves synchronization, locking, and a thread pool.
The main entry point is CompactionManager.instance.submitBackground(AbstractCompactionStrategy.this.cfs):
/**
 * Call this whenever a compaction might be needed on the given column family.
 * It's okay to over-call (within reason) since the compactions are single-threaded,
 * and if a call is unnecessary, it will just be no-oped in the bucketing phase.
 */
public Future<Integer> submitBackground(final ColumnFamilyStore cfs)
{
    logger.debug("Scheduling a background task check for {}.{} with {}",
                 new Object[] {cfs.table.name,
                               cfs.columnFamily,
                               cfs.getCompactionStrategy().getClass().getSimpleName()});
    Callable<Integer> callable = new Callable<Integer>()
    {
        public Integer call() throws IOException
        {
            compactionLock.readLock().lock();
            try
            {
                logger.debug("Checking {}.{}", cfs.table.name, cfs.columnFamily); // log after we get the lock so we can see delays from that if any
                if (!cfs.isValid())
                {
                    logger.debug("Aborting compaction for dropped CF");
                    return 0;
                }
                boolean taskExecuted = false;
                AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
                List<AbstractCompactionTask> tasks = strategy.getBackgroundTasks(getDefaultGcBefore(cfs)); // the candidate-selection step, covered in section 2
                logger.debug("{} minor compaction tasks available", tasks.size());
                for (AbstractCompactionTask task : tasks)
                {
                    if (!task.markSSTablesForCompaction()) // the mark phase
                    {
                        logger.debug("Skipping {}; sstables are busy", task);
                        continue;
                    }
                    taskExecuted = true;
                    try
                    {
                        task.execute(executor); // the main merge phase
                    }
                    finally
                    {
                        task.unmarkSSTables();
                    }
                }
                // newly created sstables might have made other compactions eligible
                if (taskExecuted)
                    submitBackground(cfs);
            }
            finally
            {
                compactionLock.readLock().unlock();
            }
            return 0;
        }
    };
    return executor.submit(callable);
}
As the code above shows, the compaction process runs single-threaded.
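The mark/skip pattern in the loop above (markSSTablesForCompaction / unmarkSSTables) ensures two tasks never compact the same sstable. Its essence can be reduced to an all-or-nothing compare-and-set flag per sstable; this is a hypothetical sketch of the idea, not Cassandra's actual implementation (which goes through the DataTracker):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class SSTableMarking
{
    // One flag per sstable: true while some task owns it for compaction.
    static class MarkableSSTable
    {
        final AtomicBoolean compacting = new AtomicBoolean(false);
    }

    // Atomically claim every sstable of a task, or none of them.
    static boolean markAll(List<MarkableSSTable> tables)
    {
        for (int i = 0; i < tables.size(); i++)
        {
            if (!tables.get(i).compacting.compareAndSet(false, true))
            {
                for (int j = 0; j < i; j++)  // roll back partial marks
                    tables.get(j).compacting.set(false);
                return false;                // busy: caller skips this task
            }
        }
        return true;
    }

    static void unmarkAll(List<MarkableSSTable> tables)
    {
        for (MarkableSSTable t : tables)
            t.compacting.set(false);
    }
}
```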
2 Selecting candidate SSTables for compaction
2.1 SizeTieredCompactionStrategy
This strategy uses min/maxCompactionThreshold: when selecting SSTables, a group of similarly sized SSTables with fewer than min members does not form a task, and a single task contains at most max SSTables.
The heart of the strategy is finding all the groups of similarly sized SSTables.
Consider the following code:
/*
 * Group files of similar size into buckets.
 */
static <T> List<List<T>> getBuckets(Collection<Pair<T, Long>> files, long minSSTableSize)
{
    // Sort the list in order to get deterministic results during the grouping below
    List<Pair<T, Long>> sortedFiles = new ArrayList<Pair<T, Long>>(files);
    Collections.sort(sortedFiles, new Comparator<Pair<T, Long>>()
    {
        public int compare(Pair<T, Long> p1, Pair<T, Long> p2)
        {
            return p1.right.compareTo(p2.right);
        }
    });

    Map<Long, List<T>> buckets = new HashMap<Long, List<T>>();
    outer:
    for (Pair<T, Long> pair : sortedFiles)
    {
        long size = pair.right;
        // look for a bucket containing similar-sized files:
        // group in the same bucket if it's w/in 50% of the average for this bucket,
        // or this file and the bucket are all considered "small" (less than `minSSTableSize`)
        for (Entry<Long, List<T>> entry : buckets.entrySet()) // a linear scan even though the list is sorted; not the most efficient, but this is not where the time goes
        {
            List<T> bucket = entry.getValue();
            long oldAverageSize = entry.getKey();
            if ((size > (oldAverageSize / 2) && size < (3 * oldAverageSize) / 2)
                || (size < minSSTableSize && oldAverageSize < minSSTableSize)) // the key part: this is the size-similarity test
            {
                // remove and re-add under new average size
                buckets.remove(oldAverageSize);
                long totalSize = bucket.size() * oldAverageSize;
                long newAverageSize = (totalSize + size) / (bucket.size() + 1);
                bucket.add(pair.left);
                buckets.put(newAverageSize, bucket);
                continue outer;
            }
        }

        // no similar bucket found; put it in a new one
        ArrayList<T> bucket = new ArrayList<T>();
        bucket.add(pair.left);
        buckets.put(size, bucket);
    }

    return new ArrayList<List<T>>(buckets.values());
}
The important output of this code is the groups of similarly sized SSTables, packaged as List<List<SSTableReader>> buckets. At most one task is built per list, and tasks are only rebuilt once all existing tasks have finished executing.
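To make the grouping rule concrete, here is a trimmed, self-contained re-implementation of the bucketing loop above (plain longs replace Pair<T, Long>, and the bucket map is snapshotted before iteration; otherwise the logic mirrors getBuckets). With sizes {5, 6, 100, 110, 1000} and minSSTableSize = 10, it yields three buckets: the two "small" files, the two ~100-sized files, and the 1000-sized file alone.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Bucketing
{
    // Simplified getBuckets: group sizes within 50% of a bucket's running
    // average, or lump everything below minSSTableSize together.
    static List<List<Long>> buckets(List<Long> sizes, long minSSTableSize)
    {
        List<Long> sorted = new ArrayList<>(sizes);
        Collections.sort(sorted);
        Map<Long, List<Long>> buckets = new HashMap<>();
        outer:
        for (long size : sorted)
        {
            // snapshot the entries so we may modify the map inside the loop
            for (Map.Entry<Long, List<Long>> e : new ArrayList<>(buckets.entrySet()))
            {
                long avg = e.getKey();
                if ((size > avg / 2 && size < 3 * avg / 2)
                    || (size < minSSTableSize && avg < minSSTableSize))
                {
                    List<Long> bucket = e.getValue();
                    buckets.remove(avg); // re-key the bucket under its new average
                    long newAvg = (avg * bucket.size() + size) / (bucket.size() + 1);
                    bucket.add(size);
                    buckets.put(newAvg, bucket);
                    continue outer;
                }
            }
            List<Long> bucket = new ArrayList<>(); // no similar bucket found
            bucket.add(size);
            buckets.put(size, bucket);
        }
        return new ArrayList<>(buckets.values());
    }
}
```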
2.2 LeveledCompactionStrategy
This strategy differs from the previous one: there is no threshold. All SSTables have a similar size, and most importantly, within a single level the key ranges of the SSTables do not overlap (except in L0). First the general case (L0 aside): when compacting from Li into Li+1, levels are examined from high to low, as detailed below. On Li, the SSTable is chosen using lastCompactedKeys[i] (sstable.first.compareTo(lastCompactedKeys[level]) > 0); on Li+1, the participating SSTables are chosen by:
return overlapping(sstable, generations[(level + 1)]);
So only one SSTable from Li participates, while every SSTable on Li+1 whose key range overlaps it must join. For an L0-to-L1 compaction, the difference is that all L0 SSTables with overlapping keys also join the compaction (though this is not strictly necessary).
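The overlap test behind overlapping(...) boils down to interval intersection on each sstable's [first, last] key range. A minimal sketch, with long tokens standing in for decorated keys (the Range class here is invented for illustration):

```java
import java.util.ArrayList;
import java.util.List;

public class Overlap
{
    // Stand-in for an sstable's key range: [first, last], inclusive.
    static class Range
    {
        final long first, last;
        Range(long first, long last) { this.first = first; this.last = last; }
    }

    // Collect every sstable in the next level whose key range intersects
    // the candidate's range -- those must all join the compaction.
    static List<Range> overlapping(Range candidate, List<Range> nextLevel)
    {
        List<Range> out = new ArrayList<>();
        for (Range r : nextLevel)
            if (r.first <= candidate.last && candidate.first <= r.last)
                out.add(r);
        return out;
    }
}
```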
public synchronized Collection<SSTableReader> getCompactionCandidates()
{
    // LevelDB gives each level a score of how much data it contains vs its ideal amount, and
    // compacts the level with the highest score. But this falls apart spectacularly once you
    // get behind. Consider this set of levels:
    // L0: 988 [ideal: 4]
    // L1: 117 [ideal: 10]
    // L2: 12 [ideal: 100]
    //
    // The problem is that L0 has a much higher score (almost 250) than L1 (11), so what we'll
    // do is compact a batch of MAX_COMPACTING_L0 sstables with all 117 L1 sstables, and put the
    // result (say, 120 sstables) in L1. Then we'll compact the next batch of MAX_COMPACTING_L0,
    // and so forth. So we spend most of our i/o rewriting the L1 data with each batch.
    //
    // If we could just do *all* L0 a single time with L1, that would be ideal. But we can't
    // -- see the javadoc for MAX_COMPACTING_L0.
    //
    // LevelDB's way around this is to simply block writes if L0 compaction falls behind.
    // We don't have that luxury.
    //
    // So instead, we force compacting higher levels first. This may not minimize the number
    // of reads done as quickly in the short term, but it minimizes the i/o needed to compact
    // optimally which gives us a long term win.
    for (int i = generations.length - 1; i >= 0; i--)
    {
        List<SSTableReader> sstables = generations[i];
        if (sstables.isEmpty())
            continue; // mostly this just avoids polluting the debug log with zero scores

        double score = (double) SSTableReader.getTotalBytes(sstables) / (double) maxBytesForLevel(i);
        logger.debug("Compaction score for level {} is {}", i, score);

        // L0 gets a special case that if we don't have anything more important to do,
        // we'll go ahead and compact even just one sstable
        if (score > 1.001 || i == 0)
        {
            Collection<SSTableReader> candidates = getCandidatesFor(i);
            if (logger.isDebugEnabled())
                logger.debug("Compaction candidates for L{} are {}", i, toString(candidates));
            return candidates;
        }
    }

    return Collections.emptyList();
}
The code above chooses which level to start compacting from; its comments explain the reasoning well, so no further commentary is needed.
This strategy builds only one task at a time.
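The level score is simply actual bytes divided by the level's target capacity, which grows by roughly a factor of 10 per level. The maxBytesForLevel below is a hypothetical reconstruction for illustration; the exact formula (especially L0's special case) may differ in the real source:

```java
public class LevelScore
{
    static final long MB = 1024L * 1024L;

    // Hypothetical target capacity: level L holds about 10^L * maxSSTableSizeInMB.
    static long maxBytesForLevel(int level, long maxSSTableSizeInMB)
    {
        long bytes = maxSSTableSizeInMB * MB;
        for (int i = 0; i < level; i++)
            bytes *= 10;
        return bytes;
    }

    // Score > 1.001 means the level is over capacity and should be compacted.
    static double score(long totalBytes, int level, long maxSSTableSizeInMB)
    {
        return (double) totalBytes / (double) maxBytesForLevel(level, maxSSTableSizeInMB);
    }
}
```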
3 Merging the toCompact SSTables
This part makes heavy use of generics and inheritance, which makes the merge process quite hard to follow. At a minimum, the overall flow should be understood.
/**
* For internal use and testing only. The rest of the system should go through the submit* methods,
* which are properly serialized.
* Caller is in charge of marking/unmarking the sstables as compacting.
*/
public int execute(CompactionExecutorStatsCollector collector) throws IOException
{
// The collection of sstables passed may be empty (but not null); even if
// it is not empty, it may compact down to nothing if all rows are deleted.
assert sstables != null;
Set<SSTableReader> toCompact = new HashSet<SSTableReader>(sstables);
if (!isCompactionInteresting(toCompact))
return 0;
if (compactionFileLocation == null)
compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
if (compactionFileLocation == null && partialCompactionsAcceptable())
{
// If the compaction file path is null that means we have no space left for this compaction.
// Try again w/o the largest one.
while (compactionFileLocation == null && toCompact.size() > 1)
{
logger.warn("insufficient space to compact all requested files " + StringUtils.join(toCompact, ", "));
// Note that we have removed files that are still marked as compacting.
// This suboptimal but ok since the caller will unmark all the sstables at the end.
toCompact.remove(cfs.getMaxSizeFile(toCompact));
compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
}
}
if (compactionFileLocation == null)
{
logger.warn("insufficient space to compact; aborting compaction");
return 0;
}
if (DatabaseDescriptor.isSnapshotBeforeCompaction())
cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-" + "compact-" + cfs.columnFamily);
// sanity check: all sstables must belong to the same cfs
for (SSTableReader sstable : toCompact)
assert sstable.descriptor.cfname.equals(cfs.columnFamily);
CompactionController controller = new CompactionController(cfs, toCompact, gcBefore, isUserDefined);
// new sstables from flush can be added during a compaction, but only the compaction can remove them,
// so in our single-threaded compaction world this is a valid way of determining if we're compacting
// all the sstables (that existed when we started)
logger.info("Compacting {}", toCompact);
long startTime = System.currentTimeMillis();
long totalkeysWritten = 0;
long estimatedTotalKeys = Math.max(DatabaseDescriptor.getIndexInterval(), SSTableReader.getApproximateKeyCount(toCompact));
long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(toCompact) / cfs.getCompactionStrategy().getMaxSSTableSize());
long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
if (logger.isDebugEnabled())
logger.debug("Expected bloom filter size : " + keysPerSSTable);
AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
                              ? new ParallelCompactionIterable(compactionType, toCompact, controller)
                              : new CompactionIterable(compactionType, toCompact, controller); // look at the single-threaded case first
CloseableIterator<AbstractCompactedRow> iter = ci.iterator(); // ManyToOne<in=SSTableScanner, out=AbstractCompactedRow>
Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());
Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
// we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
// replace the old entries. Track entries to preheat here until then.
Map<SSTableReader, Map<DecoratedKey, Long>> cachedKeyMap = new HashMap<SSTableReader, Map<DecoratedKey, Long>>();
Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();
if (collector != null)
collector.beginCompaction(ci);
try
{
if (!nni.hasNext())
{
// don't mark compacted in the finally block, since if there _is_ nondeleted data,
// we need to sync it (via closeAndOpen) first, so there is no period during which
// a crash could cause data loss.
cfs.markCompacted(toCompact, compactionType);
return 0;
}
SSTableWriter writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
writers.add(writer);
while (nni.hasNext())
{
AbstractCompactedRow row = nni.next();
if (row.isEmpty())
continue;
long position = writer.append(row);
totalkeysWritten++;
if (DatabaseDescriptor.getPreheatKeyCache())
{
for (SSTableReader sstable : toCompact)
{
if (sstable.getCachedPosition(row.key, false) != null)
{
cachedKeys.put(row.key, position);
break;
}
}
}
if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer))
{
SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact));
cachedKeyMap.put(toIndex, cachedKeys);
sstables.add(toIndex);
if (nni.hasNext())
{
writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
writers.add(writer);
cachedKeys = new HashMap<DecoratedKey, Long>();
}
}
}
}
catch (Exception e)
{
for (SSTableWriter writer : writers)
writer.abort();
throw FBUtilities.unchecked(e);
}
finally
{
iter.close();
if (collector != null)
collector.finishCompaction(ci);
}
cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);
// TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
for (Entry<SSTableReader, Map<DecoratedKey, Long>> ssTableReaderMapEntry : cachedKeyMap.entrySet())
{
SSTableReader key = ssTableReaderMapEntry.getKey();
for (Entry<DecoratedKey, Long> entry : ssTableReaderMapEntry.getValue().entrySet())
key.cacheKey(entry.getKey(), entry.getValue());
}
long dTime = System.currentTimeMillis() - startTime;
long startsize = SSTable.getTotalBytes(toCompact);
long endsize = SSTable.getTotalBytes(sstables);
double ratio = (double)endsize / (double)startsize;
StringBuilder builder = new StringBuilder();
builder.append("[");
for (SSTableReader reader : sstables)
builder.append(reader.getFilename()).append(",");
builder.append("]");
double mbps = dTime > 0 ? (double)endsize/(1024*1024)/((double)dTime/1000) : 0;
logger.info(String.format("Compacted to %s. %,d to %,d (~%d%% of original) bytes for %,d keys at %fMB/s. Time: %,dms.",
builder.toString(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, mbps, dTime));
logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
return toCompact.size();
}
This code comes from CompactionTask; as described in section 2, a task mainly holds the toCompact SSTables. There are three important points here: 1) choosing the target directory; 2) the merge process itself; 3) writing out the merged result.
The directory chosen is the data directory with the most free space; this is simple and is not covered separately.
3.1 The merge process
The actual merge happens in the following lines:
AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
                              ? new ParallelCompactionIterable(compactionType, toCompact, controller)
                              : new CompactionIterable(compactionType, toCompact, controller); // look at the single-threaded case first
CloseableIterator<AbstractCompactedRow> iter = ci.iterator(); // ManyToOne<in=SSTableScanner, out=AbstractCompactedRow>
Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull()); // the actual merge
These statements are hard to understand: although there are only three of them, they drive the entire merge.
Understanding them requires reading CompactionIterable, Reducer, MergeIterator, ManyToOne, SSTableReader, and SSTableScanner together, and knowing the relationships between the parent and child classes.
public static <T> Iterator<T> filter(final Iterator<T> unfiltered, final Predicate<? super T> predicate)
{
    checkNotNull(unfiltered);
    checkNotNull(predicate);
    return new AbstractIterator<T>()
    {
        @Override protected T computeNext()
        {
            while (unfiltered.hasNext())
            {
                T element = unfiltered.next();
                if (predicate.apply(element))
                {
                    return element;
                }
            }
            endOfData();
            return null;
        }
    };
}
To understand this code, first know the com.google.common.collect package (Guava), which provides many useful containers, AbstractIterator among them. iter is a ManyToOne, which extends MergeIterator, which in turn extends AbstractIterator. Reading the source shows that unfiltered.hasNext() calls computeNext(); by the parent/child relationship, the subclass implementation runs, i.e. ManyToOne.computeNext(). With that, everything falls into place.
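The hasNext() → computeNext() protocol can be shown with a tiny AbstractIterator of our own. This is a simplified stand-in for Guava's class (which tracks state with an enum and supports null elements; this sketch does not):

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

public abstract class MiniAbstractIterator<T> implements Iterator<T>
{
    private T next;       // element computed ahead of time
    private boolean done; // set once computeNext() calls endOfData()

    // Subclasses produce the next element, or call endOfData() when exhausted.
    protected abstract T computeNext();

    protected T endOfData() { done = true; return null; }

    public boolean hasNext()
    {
        if (done) return false;
        if (next == null) next = computeNext(); // hasNext() drives the computation
        return !done;
    }

    public T next()
    {
        if (!hasNext()) throw new NoSuchElementException();
        T r = next;
        next = null;
        return r;
    }
}
```

This is exactly why calling unfiltered.hasNext() in filter() ends up running the subclass's computeNext(), e.g. ManyToOne.computeNext().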
The overall process: all SSTableScanners are put into a PriorityQueue (ordered by each scanner's current row key); scanners are taken from the queue and their current rows merged (via CF.add()) until the key changes; each consumed scanner is then re-added to the queue (now positioned at its next row).
When the queue is empty, the merge is complete. Here is the key code:
protected final Out consume()
{
    reducer.onKeyChange();
    Candidate<In> candidate = queue.peek();
    if (candidate == null)
        return endOfData();
    do
    {
        candidate = queue.poll();
        candidates.push(candidate);
        reducer.reduce(candidate.item); // merge one row
    }
    while (queue.peek() != null && queue.peek().compareTo(candidate) == 0); // keep consuming while the head of the queue has the same key; compareTo compares the candidates' current row keys
    return reducer.getReduced();
}

/** Advance and re-enqueue all items we consumed in the last iteration. */
// A Candidate wraps an SSTableScanner, which parses an sstable into rows; candidate.advance() steps to the next row.
protected final void advance()
{
    // Re-add to the priority queue, which orders candidates by their current head key; only non-exhausted candidates go back in.
    Candidate<In> candidate;
    while ((candidate = candidates.pollFirst()) != null)
        if (candidate.advance()) // once a scanner is exhausted, its sstable has been fully merged
            queue.add(candidate);
}
In the code above, a candidate corresponds to an SSTableScanner; candidate.advance() moves the scanner's iterator to the next row.
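The consume()/advance() pair above is a k-way merge with a reducer applied to equal keys. Stripped of Cassandra's types, the same shape looks like this sketch, which merges sorted long arrays and "reduces" equal keys by emitting them once (in the real code, the reducer builds one AbstractCompactedRow from the rows with the same key):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMerge
{
    // One "scanner" per source: a cursor into a sorted array.
    static class Candidate implements Comparable<Candidate>
    {
        final long[] rows;
        int pos;
        Candidate(long[] rows) { this.rows = rows; }
        long item() { return rows[pos]; }
        boolean advance() { return ++pos < rows.length; } // move to the next row
        public int compareTo(Candidate o) { return Long.compare(item(), o.item()); }
    }

    static List<Long> merge(long[]... sources)
    {
        PriorityQueue<Candidate> queue = new PriorityQueue<>();
        for (long[] s : sources)
            if (s.length > 0)
                queue.add(new Candidate(s));
        List<Long> out = new ArrayList<>();
        Deque<Candidate> consumed = new ArrayDeque<>();
        while (!queue.isEmpty())
        {
            Candidate c = queue.poll();               // consume(): smallest head key
            consumed.push(c);
            long key = c.item();
            while (queue.peek() != null && queue.peek().item() == key)
                consumed.push(queue.poll());          // all rows sharing that key
            out.add(key);                             // reducer.getReduced()
            while ((c = consumed.pollFirst()) != null)
                if (c.advance())                      // advance(): re-enqueue survivors
                    queue.add(c);
        }
        return out;
    }
}
```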
3.2 Flush
With SizeTieredCompactionStrategy the merge produces a single SSTable: there is no size limit on the output file. LeveledCompactionStrategy does impose an SSTable size limit: when the output exceeds the limit, a new SSTable is created and writing continues there.
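The rollover behavior just described (close the current writer and open a new one once the size limit is crossed) can be sketched as follows. This is illustrative only: 0 stands in for "unlimited", approximating SizeTieredCompactionStrategy's single output file, while a positive limit models LeveledCompactionStrategy:

```java
import java.util.ArrayList;
import java.util.List;

public class WriterRollover
{
    // Split a stream of row sizes into "sstables", starting a new one once
    // the current file reaches maxSSTableBytes (0 = unlimited).
    static List<List<Long>> write(List<Long> rowSizes, long maxSSTableBytes)
    {
        List<List<Long>> files = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long written = 0;
        for (long size : rowSizes)
        {
            current.add(size);           // writer.append(row)
            written += size;
            if (maxSSTableBytes > 0 && written >= maxSSTableBytes)
            {
                files.add(current);      // writer.closeAndOpenReader()
                current = new ArrayList<>();
                written = 0;             // cfs.createCompactionWriter(...)
            }
        }
        if (!current.isEmpty())
            files.add(current);          // close the last writer
        return files;
    }
}
```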
4 Summary
Several key classes:
CompactionManager is the overall manager of compaction. It ties the other classes together to carry out the compaction process, including managing the individual compaction tasks.
CompactionStrategy comes in two flavors; it is the compaction thread entity and provides the methods for creating tasks.
CompactionTask also comes in two flavors; it carries the SSTables to be compacted and provides the merge-and-write methods. It is the actual executor of a compaction.
CompactedRow likewise has two variants; it can be understood as the data structure holding a merged row.
A full understanding of the compaction process also requires understanding SSTable, SSTableReader, SSTableScanner, and SSTableWriter.