Cassandra Compaction Code Walkthrough

This walkthrough of compaction is divided into the following parts:

1. Management of compaction threads

2. Selection of candidate sstables for compaction

3. Merging of the selected sstables

Parts 1 and 3 are fairly fixed and unlikely to need optimization; a general understanding is enough.

1 Management of compaction threads

1.1 Starting compaction

After Cassandra starts, the ColumnFamilyStore (cfs) is created at the end of opening a table. Its constructor instantiates an object for the configured compaction strategy. Let's look at the constructors of the two concrete strategies.

public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
{
    super(cfs, options);
    this.estimatedRemainingTasks = 0;
    String optionValue = options.get(MIN_SSTABLE_SIZE_KEY);
    minSSTableSize = (null != optionValue) ? Long.parseLong(optionValue) : DEFAULT_MIN_SSTABLE_SIZE;
    cfs.setMaximumCompactionThreshold(cfs.metadata.getMaxCompactionThreshold());
    cfs.setMinimumCompactionThreshold(cfs.metadata.getMinCompactionThreshold());
}
 
 
 
public LeveledCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
{
    super(cfs, options);
    int configuredMaxSSTableSize = 5;
    if (options != null)
    {
        String value = options.containsKey(SSTABLE_SIZE_OPTION) ? options.get(SSTABLE_SIZE_OPTION) : null;
        if (null != value)
        {
            try
            {
                configuredMaxSSTableSize = Integer.parseInt(value);
            }
            catch (NumberFormatException ex)
            {
                logger.warn(String.format("%s is not a parsable int (base10) for %s, using default value",
                                          value, SSTABLE_SIZE_OPTION));
            }
        }
    }
    maxSSTableSizeInMB = configuredMaxSSTableSize;

    cfs.getDataTracker().subscribe(this);
    logger.debug("{} subscribed to the data tracker.", this);

    manifest = LeveledManifest.create(cfs, this.maxSSTableSizeInMB);
    logger.debug("Created {}", manifest);
    // override min/max for this strategy
    cfs.setMaximumCompactionThreshold(Integer.MAX_VALUE);
    cfs.setMinimumCompactionThreshold(1);
}
 


 

The key part is the shared base constructor; it is simple enough to mostly speak for itself:

protected AbstractCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
{
    assert cfs != null;
    this.cfs = cfs;
    this.options = options;

    // start compactions in five minutes (if no flushes have occurred by then to do so)
    Runnable runnable = new Runnable()
    {
        public void run()
        {
            if (CompactionManager.instance.getActiveCompactions() == 0)
            {
                CompactionManager.instance.submitBackground(AbstractCompactionStrategy.this.cfs);
            }
        }
    };
    StorageService.optionalTasks.schedule(runnable, 5 * 60, TimeUnit.SECONDS); // optionalTasks is a scheduled executor
}


From these three snippets we can see that after Cassandra starts, each compaction strategy schedules a check that fires five minutes later and kicks off a background compaction if none is already active (i.e. if no flush has triggered one in the meantime).
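As a sketch of this startup pattern, the snippet below reproduces the delayed one-shot check with a plain ScheduledExecutorService. The names scheduleCheck and submitted are illustrative stand-ins, not Cassandra's API.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of AbstractCompactionStrategy's delayed kickoff:
// after a delay, submit a background compaction only if nothing is running.
class DelayedKickoff {
    static boolean scheduleCheck(final int activeCompactions, long delayMs) {
        ScheduledExecutorService optionalTasks = Executors.newSingleThreadScheduledExecutor();
        final AtomicBoolean submitted = new AtomicBoolean(false);
        optionalTasks.schedule(new Runnable() {
            public void run() {
                if (activeCompactions == 0)
                    submitted.set(true); // CompactionManager.instance.submitBackground(cfs) in Cassandra
            }
        }, delayMs, TimeUnit.MILLISECONDS); // 5 * 60 seconds in Cassandra
        optionalTasks.shutdown();
        try {
            optionalTasks.awaitTermination(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return submitted.get();
    }
}
```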

CompactionManager manages the threads that are currently compacting; compaction is generally single-threaded (per column family). It handles synchronization, locking, and the thread pool.

Now for the main entry point: CompactionManager.instance.submitBackground(AbstractCompactionStrategy.this.cfs);

 

/**
 * Call this whenever a compaction might be needed on the given column family.
 * It's okay to over-call (within reason) since the compactions are single-threaded,
 * and if a call is unnecessary, it will just be no-oped in the bucketing phase.
 */
public Future<Integer> submitBackground(final ColumnFamilyStore cfs)
{
    logger.debug("Scheduling a background task check for {}.{} with {}",
                 new Object[] {cfs.table.name,
                               cfs.columnFamily,
                               cfs.getCompactionStrategy().getClass().getSimpleName()});
    Callable<Integer> callable = new Callable<Integer>()
    {
        public Integer call() throws IOException
        {
            compactionLock.readLock().lock();
            try
            {
                logger.debug("Checking {}.{}", cfs.table.name, cfs.columnFamily); // log after we get the lock so we can see delays from that if any
                if (!cfs.isValid())
                {
                    logger.debug("Aborting compaction for dropped CF");
                    return 0;
                }

                boolean taskExecuted = false;
                AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
                List<AbstractCompactionTask> tasks = strategy.getBackgroundTasks(getDefaultGcBefore(cfs)); // selects the candidate sstables (covered in section 2)
                logger.debug("{} minor compaction tasks available", tasks.size());
                for (AbstractCompactionTask task : tasks)
                {
                    if (!task.markSSTablesForCompaction()) // marking phase
                    {
                        logger.debug("Skipping {}; sstables are busy", task);
                        continue;
                    }

                    taskExecuted = true;
                    try
                    {
                        task.execute(executor); // the main merge
                    }
                    finally
                    {
                        task.unmarkSSTables();
                    }
                }

                // newly created sstables might have made other compactions eligible
                if (taskExecuted)
                    submitBackground(cfs);
            }
            finally
            {
                compactionLock.readLock().unlock();
            }
            return 0;
        }
    };
    return executor.submit(callable);
}


This code shows that the compaction process is single-threaded.
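The mark/execute/unmark/resubmit loop inside submitBackground can be distilled into a small sketch; the Task interface here is a hypothetical simplification of AbstractCompactionTask, not Cassandra's actual type.

```java
import java.util.List;

// Hypothetical sketch of the submitBackground loop: run candidate tasks one
// by one, marking sstables before and always unmarking after, counting how
// many actually ran (if any ran, the real code resubmits the check).
class BackgroundLoop {
    interface Task {
        boolean mark();   // markSSTablesForCompaction()
        void execute();
        void unmark();    // unmarkSSTables()
    }

    static int runAll(List<Task> tasks) {
        int executed = 0;
        for (Task t : tasks) {
            if (!t.mark())          // sstables busy: skip this task
                continue;
            try {
                t.execute();
                executed++;
            } finally {
                t.unmark();         // always release the mark
            }
        }
        return executed;
    }
}
```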

2 Selecting candidate sstables for compaction

2.1 SizeTieredCompactionStrategy

Because this strategy has min/maxCompactionThreshold, fewer than min similar-sized sstables do not form a task, and a single task contains at most max sstables.

The most important part of this strategy is finding all the groups of similar-sized sstables.

See the following code:

 

  

/*
 * Group files of similar size into buckets.
 */
static <T> List<List<T>> getBuckets(Collection<Pair<T, Long>> files, long minSSTableSize)
{
    // Sort the list in order to get deterministic results during the grouping below
    List<Pair<T, Long>> sortedFiles = new ArrayList<Pair<T, Long>>(files);
    Collections.sort(sortedFiles, new Comparator<Pair<T, Long>>()
    {
        public int compare(Pair<T, Long> p1, Pair<T, Long> p2)
        {
            return p1.right.compareTo(p2.right);
        }
    });
    Map<Long, List<T>> buckets = new HashMap<Long, List<T>>();
    outer:
    for (Pair<T, Long> pair : sortedFiles)
    {
        long size = pair.right;
        // look for a bucket containing similar-sized files:
        // group in the same bucket if it's w/in 50% of the average for this bucket,
        // or this file and the bucket are all considered "small" (less than `minSSTableSize`)
        for (Entry<Long, List<T>> entry : buckets.entrySet()) // a linear scan despite the sort; inefficient, but this is not where the time goes
        {
            List<T> bucket = entry.getValue();
            long oldAverageSize = entry.getKey();
            if ((size > (oldAverageSize / 2) && size < (3 * oldAverageSize) / 2)
                || (size < minSSTableSize && oldAverageSize < minSSTableSize)) // the key test: is this size "similar"?
            {
                // remove and re-add under new average size
                buckets.remove(oldAverageSize);
                long totalSize = bucket.size() * oldAverageSize;
                long newAverageSize = (totalSize + size) / (bucket.size() + 1);
                bucket.add(pair.left);
                buckets.put(newAverageSize, bucket);
                continue outer;
            }
        }
        // no similar bucket found; put it in a new one
        ArrayList<T> bucket = new ArrayList<T>();
        bucket.add(pair.left);
        buckets.put(size, bucket);
    }
    return new ArrayList<List<T>>(buckets.values());
}

The core of this code is finding the lists of similar-sized sstables, packed into a List<List<SSTableReader>> of buckets. At most one task is built per list, and new tasks are only built after all existing tasks have finished.
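Here is a self-contained sketch of the bucketing above, operating on plain sizes instead of Pair<SSTableReader, Long> (a simplification; the grouping rule is the same):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of SizeTiered bucketing: a size joins a bucket if it is
// within 50% of the bucket's running average, or if both the size and the
// bucket average are below minSize ("small" files group together).
class SizeBuckets {
    static List<List<Long>> buckets(List<Long> sizes, long minSize) {
        List<Long> sorted = new ArrayList<>(sizes);
        Collections.sort(sorted);
        Map<Long, List<Long>> buckets = new HashMap<>();
        outer:
        for (long size : sorted) {
            // copy the entry set so we can mutate the map inside the loop
            for (Map.Entry<Long, List<Long>> e : new ArrayList<>(buckets.entrySet())) {
                long avg = e.getKey();
                if ((size > avg / 2 && size < 3 * avg / 2)
                        || (size < minSize && avg < minSize)) {
                    List<Long> bucket = e.getValue();
                    long newAvg = (avg * bucket.size() + size) / (bucket.size() + 1);
                    buckets.remove(avg);     // re-key the bucket under its new average
                    bucket.add(size);
                    buckets.put(newAvg, bucket);
                    continue outer;
                }
            }
            // no similar bucket found; start a new one
            List<Long> bucket = new ArrayList<>();
            bucket.add(size);
            buckets.put(size, bucket);
        }
        return new ArrayList<>(buckets.values());
    }
}
```

For example, sizes {100, 105, 110} fall into one bucket and 1000 into another, while several files all below minSize share a bucket regardless of ratio.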

2.2 LeveledCompactionStrategy

This strategy differs from the previous one: it has no thresholds. All sstables have a similar size, and, most importantly, within a level the sstables' key ranges do not overlap (except in L0). In the general case (L0 aside), when compacting sstables from Li into Li+1, levels are considered from high to low (details below). The sstable in Li is chosen using lastCompactedKeys[i] (sstable.first.compareTo(lastCompactedKeys[level]) > 0); the Li+1 sstables that join the compaction are chosen by:

return overlapping(sstable, generations[(level + 1)]);

So only one sstable from Li participates, plus every sstable in Li+1 whose keys overlap it. For L0-to-L1 compactions, the difference is that all L0 sstables with overlapping keys also participate (though this is not strictly necessary).
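The overlap-based selection can be sketched as follows; Range is a hypothetical stand-in for an sstable's [first, last] key span (the real code compares DecoratedKeys, not longs):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of leveled candidate selection: one sstable picked from level i,
// plus every sstable in level i+1 whose key range overlaps it.
class LeveledOverlap {
    static class Range {
        final long first, last;
        Range(long first, long last) { this.first = first; this.last = last; }
    }

    // closed intervals [first, last] overlap iff each starts before the other ends
    static boolean overlaps(Range a, Range b) {
        return a.first <= b.last && b.first <= a.last;
    }

    static List<Range> candidates(Range picked, List<Range> nextLevel) {
        List<Range> out = new ArrayList<>();
        out.add(picked);                 // the single sstable from Li
        for (Range r : nextLevel)
            if (overlaps(picked, r))
                out.add(r);              // every overlapping sstable from Li+1
        return out;
    }
}
```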

public synchronized Collection<SSTableReader> getCompactionCandidates()
{
    // LevelDB gives each level a score of how much data it contains vs its ideal amount, and
    // compacts the level with the highest score. But this falls apart spectacularly once you
    // get behind.  Consider this set of levels:
    // L0: 988 [ideal: 4]
    // L1: 117 [ideal: 10]
    // L2: 12 [ideal: 100]
    //
    // The problem is that L0 has a much higher score (almost 250) than L1 (11), so what we'll
    // do is compact a batch of MAX_COMPACTING_L0 sstables with all 117 L1 sstables, and put the
    // result (say, 120 sstables) in L1. Then we'll compact the next batch of MAX_COMPACTING_L0,
    // and so forth.  So we spend most of our i/o rewriting the L1 data with each batch.
    //
    // If we could just do *all* L0 a single time with L1, that would be ideal.  But we can't
    // -- see the javadoc for MAX_COMPACTING_L0.
    //
    // LevelDB's way around this is to simply block writes if L0 compaction falls behind.
    // We don't have that luxury.
    //
    // So instead, we force compacting higher levels first.  This may not minimize the number
    // of reads done as quickly in the short term, but it minimizes the i/o needed to compact
    // optimally which gives us a long term win.
    for (int i = generations.length - 1; i >= 0; i--)
    {
        List<SSTableReader> sstables = generations[i];
        if (sstables.isEmpty())
            continue; // mostly this just avoids polluting the debug log with zero scores
        double score = (double) SSTableReader.getTotalBytes(sstables) / (double) maxBytesForLevel(i);
        logger.debug("Compaction score for level {} is {}", i, score);

        // L0 gets a special case that if we don't have anything more important to do,
        // we'll go ahead and compact even just one sstable
        if (score > 1.001 || i == 0)
        {
            Collection<SSTableReader> candidates = getCandidatesFor(i);
            if (logger.isDebugEnabled())
                logger.debug("Compaction candidates for L{} are {}", i, toString(candidates));
            return candidates;
        }
    }

    return Collections.emptyList();
}


The code above chooses which level to start compaction from; its comments explain the reasoning well, so no further commentary is needed.
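As a sketch of the scoring loop above: pickLevel scans from the highest level down and returns the first level worth compacting. The assumption that each level's ideal size grows 10x per level (maxBytesForLevel) follows the usual leveled layout; treat it as illustrative rather than the exact Cassandra formula.

```java
// Sketch of leveled scoring: score = bytes in level / ideal bytes for that
// level; compact the first (highest) level whose score exceeds 1.001, and
// let L0 compact even without exceeding its ideal.
class LevelScore {
    // assumed 10x-per-level growth, anchored at the per-sstable size
    static long maxBytesForLevel(int level, long maxSSTableBytes) {
        return (long) (maxSSTableBytes * Math.pow(10, level));
    }

    // Returns the level to compact, or -1 if none qualifies.
    static int pickLevel(long[] bytesPerLevel, long maxSSTableBytes) {
        for (int i = bytesPerLevel.length - 1; i >= 0; i--) {
            if (bytesPerLevel[i] == 0)
                continue;            // empty level: nothing to score
            double score = (double) bytesPerLevel[i] / maxBytesForLevel(i, maxSSTableBytes);
            if (score > 1.001 || i == 0)
                return i;            // L0 is the special case: always eligible
        }
        return -1;
    }
}
```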

This strategy builds only one task at a time.

 

3 Merging the selected sstables

This part uses a lot of generics and inheritance, which makes the merge process quite hard to follow. At minimum, aim to understand the overall flow.

/**
     * For internal use and testing only.  The rest of the system should go through the submit* methods,
     * which are properly serialized.
     * Caller is in charge of marking/unmarking the sstables as compacting.
     */
    public int execute(CompactionExecutorStatsCollector collector) throws IOException
    {
        // The collection of sstables passed may be empty (but not null); even if
        // it is not empty, it may compact down to nothing if all rows are deleted.
        assert sstables != null;

        Set<SSTableReader> toCompact = new HashSet<SSTableReader>(sstables);
        if (!isCompactionInteresting(toCompact))
            return 0;

        if (compactionFileLocation == null)
            compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));

        if (compactionFileLocation == null && partialCompactionsAcceptable())
        {
            // If the compaction file path is null that means we have no space left for this compaction.
            // Try again w/o the largest one.
            while (compactionFileLocation == null && toCompact.size() > 1)
            {
                logger.warn("insufficient space to compact all requested files " + StringUtils.join(toCompact, ", "));
                // Note that we have removed files that are still marked as compacting.
                // This suboptimal but ok since the caller will unmark all the sstables at the end.
                toCompact.remove(cfs.getMaxSizeFile(toCompact));
                compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
            }
        }
        if (compactionFileLocation == null)
        {
            logger.warn("insufficient space to compact; aborting compaction");
            return 0;
        }

        if (DatabaseDescriptor.isSnapshotBeforeCompaction())
            cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-" + "compact-" + cfs.columnFamily);

        // sanity check: all sstables must belong to the same cfs
        for (SSTableReader sstable : toCompact)
            assert sstable.descriptor.cfname.equals(cfs.columnFamily);

        CompactionController controller = new CompactionController(cfs, toCompact, gcBefore, isUserDefined);
        // new sstables from flush can be added during a compaction, but only the compaction can remove them,
        // so in our single-threaded compaction world this is a valid way of determining if we're compacting
        // all the sstables (that existed when we started)
        logger.info("Compacting {}", toCompact);

        long startTime = System.currentTimeMillis();
        long totalkeysWritten = 0;

        long estimatedTotalKeys = Math.max(DatabaseDescriptor.getIndexInterval(), SSTableReader.getApproximateKeyCount(toCompact));
        long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(toCompact) / cfs.getCompactionStrategy().getMaxSSTableSize());
        long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
        if (logger.isDebugEnabled())
            logger.debug("Expected bloom filter size : " + keysPerSSTable);

        AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
                                      ? new ParallelCompactionIterable(compactionType, toCompact, controller)
                                      : new CompactionIterable(compactionType, toCompact, controller); // start with the single-threaded case
        CloseableIterator<AbstractCompactedRow> iter = ci.iterator(); // ManyToOne<in = SSTableScanner, out = AbstractCompactedRow>
        Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());
        Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();

        // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
        // replace the old entries.  Track entries to preheat here until then.
        Map<SSTableReader, Map<DecoratedKey, Long>> cachedKeyMap =  new HashMap<SSTableReader, Map<DecoratedKey, Long>>();

        Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
        Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();

        if (collector != null)
            collector.beginCompaction(ci);
        try
        {
            if (!nni.hasNext())
            {
                // don't mark compacted in the finally block, since if there _is_ nondeleted data,
                // we need to sync it (via closeAndOpen) first, so there is no period during which
                // a crash could cause data loss.
                cfs.markCompacted(toCompact, compactionType);
                return 0;
            }

            SSTableWriter writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
            writers.add(writer);
            while (nni.hasNext())
            {
                AbstractCompactedRow row = nni.next();
                if (row.isEmpty())
                    continue;

                long position = writer.append(row);
                totalkeysWritten++;

                if (DatabaseDescriptor.getPreheatKeyCache())
                {
                    for (SSTableReader sstable : toCompact)
                    {
                        if (sstable.getCachedPosition(row.key, false) != null)
                        {
                            cachedKeys.put(row.key, position);
                            break;
                        }
                    }
                }
                if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer))
                {
                    SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact));
                    cachedKeyMap.put(toIndex, cachedKeys);
                    sstables.add(toIndex);
                    if (nni.hasNext())
                    {
                        writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
                        writers.add(writer);
                        cachedKeys = new HashMap<DecoratedKey, Long>();
                    }
                }
            }
        }
        catch (Exception e)
        {
            for (SSTableWriter writer : writers)
                writer.abort();
            throw FBUtilities.unchecked(e);
        }
        finally
        {
            iter.close();
            if (collector != null)
                collector.finishCompaction(ci);
        }

        cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);
        // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
        for (Entry<SSTableReader, Map<DecoratedKey, Long>> ssTableReaderMapEntry : cachedKeyMap.entrySet())
        {
            SSTableReader key = ssTableReaderMapEntry.getKey();
            for (Entry<DecoratedKey, Long> entry : ssTableReaderMapEntry.getValue().entrySet())
               key.cacheKey(entry.getKey(), entry.getValue());
        }

        long dTime = System.currentTimeMillis() - startTime;
        long startsize = SSTable.getTotalBytes(toCompact);
        long endsize = SSTable.getTotalBytes(sstables);
        double ratio = (double)endsize / (double)startsize;

        StringBuilder builder = new StringBuilder();
        builder.append("[");
        for (SSTableReader reader : sstables)
            builder.append(reader.getFilename()).append(",");
        builder.append("]");

        double mbps = dTime > 0 ? (double)endsize/(1024*1024)/((double)dTime/1000) : 0;
        logger.info(String.format("Compacted to %s.  %,d to %,d (~%d%% of original) bytes for %,d keys at %fMB/s.  Time: %,dms.",
                                  builder.toString(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, mbps, dTime));
        logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
        return toCompact.size();
}



This code comes from CompactionTask; as described in section 2, a task mainly holds the sstables to compact. Three points matter here: 1) choosing the directory; 2) the merge itself; 3) writing out the merged result.

The directory chosen is the data directory with the most free space; this is straightforward and not covered separately.

3.1 The merge

The actual merge happens in the following few lines:

AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
                              ? new ParallelCompactionIterable(compactionType, toCompact, controller)
                              : new CompactionIterable(compactionType, toCompact, controller); // start with the single-threaded case

CloseableIterator<AbstractCompactedRow> iter = ci.iterator(); // ManyToOne<in = SSTableScanner, out = AbstractCompactedRow>

Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull()); // the actual merge

These lines are hard to understand; although there are only three of them, they are the entire merge.

Understanding them requires reading CompactionIterable, Reducer, MergeIterator, ManyToOne, SSTableReader, and SSTableScanner together, and knowing how the parent and child classes relate.

public static <T> Iterator<T> filter(final Iterator<T> unfiltered, final Predicate<? super T> predicate)
{
    checkNotNull(unfiltered);
    checkNotNull(predicate);
    return new AbstractIterator<T>()
    {
        @Override protected T computeNext()
        {
            while (unfiltered.hasNext())
            {
                T element = unfiltered.next();
                if (predicate.apply(element))
                {
                    return element;
                }
            }
            endOfData();
            return null;
        }
    };
}


To understand this code, first know the com.google.common.collect package (Guava), which provides many useful containers, AbstractIterator among them. iter is a ManyToOne, which extends MergeIterator, which extends AbstractIterator. Reading the source shows that unfiltered.hasNext() calls computeNext(); by the subclassing relationship, the subclass implementation runs, i.e. ManyToOne.computeNext(). With that, everything falls into place.
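To make the hasNext()-drives-computeNext() mechanics concrete, here is a minimal reimplementation of the computeNext() contract (a sketch, not Guava's actual code):

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Sketch of Guava's AbstractIterator contract: hasNext() lazily invokes
// computeNext(), and the subclass signals exhaustion by calling endOfData().
abstract class SimpleAbstractIterator<T> implements Iterator<T> {
    private T next;
    private boolean done, computed;

    protected abstract T computeNext();

    protected T endOfData() { done = true; return null; }

    public boolean hasNext() {
        if (!computed && !done) {
            next = computeNext(); // may call endOfData()
            computed = true;
        }
        return !done;
    }

    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        computed = false; // force a fresh computeNext() on the next hasNext()
        return next;
    }
}
```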

The general flow: every SSTableScanner is put into a PriorityQueue ordered by its current row key. Scanners are taken from the queue and their current rows merged (ColumnFamily.add()) until the key changes; the consumed scanners are then re-enqueued (now pointing at their next rows).

The merge is complete when the queue is empty. Here is the key code:

protected final Out consume()
{
    reducer.onKeyChange();
    Candidate<In> candidate = queue.peek();
    if (candidate == null)
        return endOfData();
    do
    {
        candidate = queue.poll();
        candidates.push(candidate);
        reducer.reduce(candidate.item); // the merge
    }
    while (queue.peek() != null && queue.peek().compareTo(candidate) == 0); // keep consuming while the head has the same key (compareTo compares the candidates' current row keys)
    return reducer.getReduced();
}

/** Advance and re-enqueue all items we consumed in the last iteration. */
// A Candidate wraps an SSTableScanner, which parses a sstable row by row;
// candidate.advance() moves it to the next row.
protected final void advance()
{
    Candidate<In> candidate;
    while ((candidate = candidates.pollFirst()) != null)
        if (candidate.advance()) // once a scanner is exhausted, its part of the merge is done
            queue.add(candidate); // re-enqueue; the queue orders by each candidate's current row key
}

In the code above a candidate wraps an SSTableScanner, and candidate.advance() moves the scanner's iterator to the next row.
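The consume()/advance() cycle can be condensed into a runnable sketch: sorted integer iterators play the role of SSTableScanners, and the "reduce" step simply keeps one value per key:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Minimal sketch of the ManyToOne merge: candidates are ordered by their
// current item in a PriorityQueue; equal items are consumed together, then
// the consumed candidates are advanced and re-enqueued.
class ManyToOneSketch {
    static class Candidate implements Comparable<Candidate> {
        final Iterator<Integer> iter;
        Integer item;
        Candidate(Iterator<Integer> iter) { this.iter = iter; }
        boolean advance() {
            if (!iter.hasNext()) return false;
            item = iter.next();
            return true;
        }
        public int compareTo(Candidate o) { return item.compareTo(o.item); }
    }

    // Merge sorted inputs, collapsing equal keys (the "reduce" step).
    static List<Integer> merge(List<List<Integer>> inputs) {
        PriorityQueue<Candidate> queue = new PriorityQueue<>();
        for (List<Integer> in : inputs) {
            Candidate c = new Candidate(in.iterator());
            if (c.advance()) queue.add(c);
        }
        List<Integer> out = new ArrayList<>();
        Deque<Candidate> consumed = new ArrayDeque<>();
        while (!queue.isEmpty()) {
            Candidate first = queue.poll();       // consume(): pull the smallest key
            consumed.push(first);
            while (queue.peek() != null && queue.peek().compareTo(first) == 0)
                consumed.push(queue.poll());      // drain every candidate with the same key
            out.add(first.item);                  // reducer.getReduced(): one merged row per key
            Candidate c;                          // advance(): re-enqueue at the next row
            while ((c = consumed.pollFirst()) != null)
                if (c.advance())
                    queue.add(c);
        }
        return out;
    }
}
```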

3.2 Flush

With SizeTieredCompactionStrategy the merge produces a single sstable, with no size limit on the write. LeveledCompactionStrategy does limit sstable size: when the output exceeds the limit, a new sstable is created and writing continues there.
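As a sketch of this rollover behavior: rows stream into the current output until its total size crosses the cap, then a new output is started. SizeTiered is the degenerate case of an unlimited cap. Row sizes here are plain longs standing in for serialized rows.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the leveled write path: split a stream of row sizes into
// output "sstables" that each roll over once they reach maxBytes.
class RollingWriter {
    static List<List<Long>> write(List<Long> rowSizes, long maxBytes) {
        List<List<Long>> outputs = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long written = 0;
        for (long row : rowSizes) {
            current.add(row);
            written += row;
            if (written >= maxBytes) {   // threshold reached: close this sstable
                outputs.add(current);
                current = new ArrayList<>();
                written = 0;
            }
        }
        if (!current.isEmpty())
            outputs.add(current);        // flush the final partial sstable
        return outputs;
    }
}
```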

4 Summary

A few key classes:

CompactionManager manages the whole compaction process, tying the other classes together, including managing the individual compaction tasks.

CompactionStrategy (there are two concrete kinds) is the compaction thread entity and provides the methods that create tasks.

CompactionTask (also two kinds) carries the sstables to be compacted and provides the merge-and-write methods; it is where compaction actually executes.

CompactedRow (two variants) can be understood as the data structure holding a merged row.

A full understanding of the compaction process also requires understanding SSTable, SSTableReader, SSTableScanner, and SSTableWriter.




