欢迎投稿

今日深度:

elasticsearch索引建立过程,elasticsearch索引

elasticsearch索引建立过程,elasticsearch索引


开篇

 怀着佛系心态写的文章,因为发现心急依然看不懂代码,所以只能安慰自己佛系一点,这篇文章希望能够把elasticsearch的index的创建过程讲清楚(包括index但不包括doc的添加过程),希望能够有帮助。


ES的Meta的组成

 Meta是用来描述数据的数据。在ES中,Index的mapping结构、配置、持久化状态等就属于meta数据,集群的一些配置信息也属于meta。ES的Meta信息可以简单的理解为包括ClusterState、MetaData、IndexMetaData。

ClusterState
 集群中的每个节点都会在内存中维护一个当前的ClusterState,表示当前集群的各种状态。ClusterState中包含一个MetaData的结构,MetaData中存储的内容更符合meta的特征,而且需要持久化的信息都在MetaData中,此外的一些变量可以认为是一些临时状态,是集群运行中动态构建出来的。

public class ClusterState implements ToXContentFragment, Diffable<ClusterState> {
    public static final String UNKNOWN_UUID = "_na_";
    public static final long UNKNOWN_VERSION = -1;
    private final long version;
    private final String stateUUID;
    private final RoutingTable routingTable;
    private final DiscoveryNodes nodes;
    private final MetaData metaData;
    private final ClusterBlocks blocks;
    private final ImmutableOpenMap<String, Custom> customs;
    private final ClusterName clusterName;
    private final boolean wasReadFromDiff;
}



MetaData
 MetaData主要是集群的一些配置,集群所有Index的Meta,所有Template的Meta,所有custom的Meta信息。

public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, ToXContentFragment {
    private final String clusterUUID;
    private final long version;
    private final Settings transientSettings;
    private final Settings persistentSettings;
    private final Settings settings;
    private final ImmutableOpenMap<String, IndexMetaData> indices;
    private final ImmutableOpenMap<String, IndexTemplateMetaData> templates;
    private final ImmutableOpenMap<String, Custom> customs;
    private final transient int totalNumberOfShards; 
    private final int numberOfShards;
    private final String[] allIndices;
    private final String[] allOpenIndices;
    private final String[] allClosedIndices;
    private final SortedMap<String, AliasOrIndex> aliasAndIndexLookup;



IndexMetaData
 IndexMetaData指具体某个Index的Meta,比如Index的shard、replica、mappings等

public class IndexMetaData implements Diffable<IndexMetaData>, ToXContentFragment {
    private final int routingNumShards;
    private final int routingFactor;
    private final int routingPartitionSize;
    private final int numberOfShards;
    private final int numberOfReplicas;
    private final Index index;
    private final long version;
    private final long[] primaryTerms;
    private final State state;
    private final ImmutableOpenMap<String, AliasMetaData> aliases;
    private final Settings settings;
    private final ImmutableOpenMap<String, MappingMetaData> mappings;
    private final ImmutableOpenMap<String, Custom> customs;
    private final ImmutableOpenIntMap<Set<String>> inSyncAllocationIds;
    private final transient int totalNumberOfShards;
    private final DiscoveryNodeFilters requireFilters;
    private final DiscoveryNodeFilters includeFilters;
    private final DiscoveryNodeFilters excludeFilters;
    private final DiscoveryNodeFilters initialRecoveryFilters;
    private final Version indexCreatedVersion;
    private final Version indexUpgradedVersion;
    private final ActiveShardCount waitForActiveShards;
}


索引建立过程

阶段一:校验参数阶段

  • 校验当前ClusterState的currentState是否存在该索引,routingTable包含该index、metaData包含该index、alias别名等。

阶段二:配置合并阶段

  • 合并template和request传入的mapping、customs 数据,优先级上request配置优先于template。
  • 合并template和request的setting,优先级上request配置优先于template。

阶段三:构建IndexSettings阶段

  • 构建Settings.Builder indexSettingsBuilder对象,合并templates、request的数据,辅以默认配置值。

阶段四:构建IndexMetaData阶段

  • 构建IndexMetaData.Builder的tmpImdBuilder对象并绑定indexSettingsBuilder生成的actualIndexSettings。
  • 通过tmpImdBuilder.build()构建构建IndexMetaData tmpImd对象。

阶段五:构建IndexService阶段

  • 临时构建indexService对象,IndexService indexService = indicesService.createIndex(tmpImd, Collections.emptyList())。

阶段六:获取Index和Map阶段

  • 获取新创建的index和mapping,Index createdIndex = indexService.index()和MapperService mapperService = indexService.mapperService()。

阶段七:更新mapping到mapperService阶段

  • mapperService.merge()合并request和template合并后生成的最新的mappings。
  • 生成Map mappingsMetaData,key是创建mapping的指定的type。

阶段八:构建IndexMetaData阶段

  • 构建IndexMetaData.Builder indexMetaDataBuilder的indexMetaDataBuilder对象并绑定actualIndexSettings和routingNumShards。
  • indexMetaDataBuilder通过primaryTerm设置primaryTerm、putMapping设置mappingMd,putAlias设置aliasMetaData(template和request请求),putCustom设置customIndexMetaData,indexMetaDataBuilder.state设置state。
  • 创建indexMetaData对象,通过indexMetaData = indexMetaDataBuilder.build()实现。

阶段九:更新IndexMetaData阶段

  • 将IndexMetaData对象更新到MetaData对象中并返回最新的MetaData, MetaData newMetaData = MetaData.builder(currentState.metaData()).put(indexMetaData, false).build()。

阶段十:更新MetaData阶段

  • 更新最新的MetaData对象newMetaData对象到ClusterState,ClusterState.builder(currentState).blocks(blocks).metaData(newMetaData).build()。
public class MetaDataCreateIndexService extends AbstractComponent {

    static class IndexCreationTask extends AckedClusterStateUpdateTask<ClusterStateUpdateResponse> {

        @Override
        /**
         * es当前集群的状态ClusterState
         */
        public ClusterState execute(ClusterState currentState) throws Exception {
            Index createdIndex = null;
            String removalExtraInfo = null;
            IndexRemovalReason removalReason = IndexRemovalReason.FAILURE;
            try {
                // 以下是校验ClusterState中是否存在index索引。
                // 校验当前currentState是否存在该索引,routingTable包含该index、metaData包含该index、alias别名等。
                validator.validate(request, currentState);

                // 校验是否已存在别名
                for (Alias alias : request.aliases()) {
                    aliasValidator.validateAlias(alias, request.index(), currentState.metaData());
                }

                // 以下过程是合并request和template的配置信息
                // 从当前的ClusterState查询是否存在别名
                List<IndexTemplateMetaData> templates = findTemplates(request, currentState);

                // 用来保存请求带过来的mapping
                Map<String, Custom> customs = new HashMap<>();
                Map<String, Map<String, Object>> mappings = new HashMap<>();
                Map<String, AliasMetaData> templatesAliases = new HashMap<>();
                List<String> templateNames = new ArrayList<>();

                // 保存request的mapping到临时变量mappings
                for (Map.Entry<String, String> entry : request.mappings().entrySet()) {
                    mappings.put(entry.getKey(), MapperService.parseMapping(xContentRegistry, entry.getValue()));
                }

                // 保存request的customs信息到临时变量customs当中
                for (Map.Entry<String, Custom> entry : request.customs().entrySet()) {
                    customs.put(entry.getKey(), entry.getValue());
                }

                final Index recoverFromIndex = request.recoverFrom();

                // todo 这里不知道是什么判断,大概是不需要从某个索引进行恢复
                if (recoverFromIndex == null) {
                    // 合并template的mapping、customs、alias到request的参数当中
                    for (IndexTemplateMetaData template : templates) {
                        templateNames.add(template.getName());

                        // 合并request和template的mapping变量
                        for (ObjectObjectCursor<String, CompressedXContent> cursor : template.mappings()) {
                            String mappingString = cursor.value.string();
                            // 如果request包含该命名的mapping,就进行合并,mapping以request传入为主,合并命中的template
                            if (mappings.containsKey(cursor.key)) {
                                XContentHelper.mergeDefaults(mappings.get(cursor.key),
                                    MapperService.parseMapping(xContentRegistry, mappingString));
                            } else {
                                // 如果request不包含该命名的mapping,就直接新增即可
                                mappings.put(cursor.key,
                                    MapperService.parseMapping(xContentRegistry, mappingString));
                            }
                        }
                        // handle custom
                        for (ObjectObjectCursor<String, Custom> cursor : template.customs()) {
                            String type = cursor.key;
                            IndexMetaData.Custom custom = cursor.value;
                            IndexMetaData.Custom existing = customs.get(type);
                            if (existing == null) {
                                customs.put(type, custom);
                            } else {
                                IndexMetaData.Custom merged = existing.mergeWith(custom);
                                customs.put(type, merged);
                            }
                        }

                        // 以request带的alias作为为主,合并template的alias
                        for (ObjectObjectCursor<String, AliasMetaData> cursor : template.aliases()) {
                            AliasMetaData aliasMetaData = cursor.value;

                            if (request.aliases().contains(new Alias(aliasMetaData.alias()))) {
                                continue;
                            }

                            if (templatesAliases.containsKey(cursor.key)) {
                                continue;
                            }

                            if (aliasMetaData.alias().contains("{index}")) {
                                String templatedAlias = aliasMetaData.alias().replace("{index}", request.index());
                                aliasMetaData = AliasMetaData.newAliasMetaData(aliasMetaData, templatedAlias);
                            }

                            aliasValidator.validateAliasMetaData(aliasMetaData, request.index(), currentState.metaData());
                            templatesAliases.put(aliasMetaData.alias(), aliasMetaData);
                        }
                    }
                }

                // 以下是创建setting过程,用于创建索引时候使用。
                // 合并templates的setting配置
                Settings.Builder indexSettingsBuilder = Settings.builder();
                if (recoverFromIndex == null) {
                    // apply templates, here, in reverse order, since first ones are better matching
                    for (int i = templates.size() - 1; i >= 0; i--) {
                        indexSettingsBuilder.put(templates.get(i).settings());
                    }
                }

                // 合并request的setting并覆盖templates的setting
                indexSettingsBuilder.put(request.settings());
                if (indexSettingsBuilder.get(SETTING_NUMBER_OF_SHARDS) == null) {
                    indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5));
                }
                if (indexSettingsBuilder.get(SETTING_NUMBER_OF_REPLICAS) == null) {
                    indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
                }
                if (settings.get(SETTING_AUTO_EXPAND_REPLICAS) != null && 
                    indexSettingsBuilder.get(SETTING_AUTO_EXPAND_REPLICAS) == null) {
                    indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, settings.get(SETTING_AUTO_EXPAND_REPLICAS));
                }

                if (indexSettingsBuilder.get(SETTING_VERSION_CREATED) == null) {
                    DiscoveryNodes nodes = currentState.nodes();
                    final Version createdVersion = Version.min(Version.CURRENT, nodes.getSmallestNonClientNodeVersion());
                    indexSettingsBuilder.put(SETTING_VERSION_CREATED, createdVersion);
                }

                if (indexSettingsBuilder.get(SETTING_CREATION_DATE) == null) {
                    indexSettingsBuilder.put(SETTING_CREATION_DATE, new DateTime(DateTimeZone.UTC).getMillis());
                }
                indexSettingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getProvidedName());
                indexSettingsBuilder.put(SETTING_INDEX_UUID, UUIDs.randomBase64UUID());

                // 以下是创建IndexMetaData的builder的过程
                // 组建IndexMetaData的builder
                final IndexMetaData.Builder tmpImdBuilder = IndexMetaData.builder(request.index());

                final int routingNumShards;

                // routingNumShards的获取逻辑没怎么看懂
                if (recoverFromIndex == null) {
                    Settings idxSettings = indexSettingsBuilder.build();
                    routingNumShards = IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.get(idxSettings);
                } else {
                    assert IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(indexSettingsBuilder.build()) == false
                        : "index.number_of_routing_shards should be present on the target index on resize";
                    final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(recoverFromIndex);
                    routingNumShards = sourceMetaData.getRoutingNumShards();
                }
                // 移除routing_shards配置
                indexSettingsBuilder.remove(IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.getKey());
                tmpImdBuilder.setRoutingNumShards(routingNumShards);

                if (recoverFromIndex != null) {
                    assert request.resizeType() != null;
                    prepareResizeIndexSettings(
                        currentState, mappings.keySet(), 
                        indexSettingsBuilder, recoverFromIndex, request.index(), request.resizeType());
                }

                // 实际的IndexSetting
                final Settings actualIndexSettings = indexSettingsBuilder.build();

                // IndexMetaData的builder添加实际的索引的设置
                tmpImdBuilder.settings(actualIndexSettings);

                if (recoverFromIndex != null) {

                    final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(recoverFromIndex);
                    final long primaryTerm =
                        IntStream
                            .range(0, sourceMetaData.getNumberOfShards())
                            .mapToLong(sourceMetaData::primaryTerm)
                            .max()
                            .getAsLong();
                    for (int shardId = 0; shardId < tmpImdBuilder.numberOfShards(); shardId++) {
                        tmpImdBuilder.primaryTerm(shardId, primaryTerm);
                    }
                }

                // 创建实际的IndexMetaData对象
                final IndexMetaData tmpImd = tmpImdBuilder.build();
                ActiveShardCount waitForActiveShards = request.waitForActiveShards();
                if (waitForActiveShards == ActiveShardCount.DEFAULT) {
                    waitForActiveShards = tmpImd.getWaitForActiveShards();
                }
                if (waitForActiveShards.validate(tmpImd.getNumberOfReplicas()) == false) {
                    throw new IllegalArgumentException("invalid wait_for_active_shards[" + request.waitForActiveShards() +
                        "]: cannot be greater than number of shard copies [" +
                        (tmpImd.getNumberOfReplicas() + 1) + "]");
                }

                // 创建索引服务IndexService
                final IndexService indexService = indicesService.createIndex(tmpImd, Collections.emptyList());
                createdIndex = indexService.index();

                // 获取创建IndexService的mapperService
                MapperService mapperService = indexService.mapperService();
                try {
                    // 合并request和template合并后生成的最新的mappings
                    mapperService.merge(mappings, MergeReason.MAPPING_UPDATE, request.updateAllTypes());
                } catch (Exception e) {
                    removalExtraInfo = "failed on parsing default mapping/mappings on index creation";
                    throw e;
                }

                if (request.recoverFrom() == null) {
                    indexService.getIndexSortSupplier().get();
                }

                final QueryShardContext queryShardContext = indexService.newQueryShardContext(0, null, () -> 0L, null);

                for (Alias alias : request.aliases()) {
                    if (Strings.hasLength(alias.filter())) {
                        aliasValidator.validateAliasFilter(alias.name(), alias.filter(), queryShardContext, xContentRegistry);
                    }
                }
                for (AliasMetaData aliasMetaData : templatesAliases.values()) {
                    if (aliasMetaData.filter() != null) {
                        aliasValidator.validateAliasFilter(aliasMetaData.alias(), aliasMetaData.filter().uncompressed(),
                            queryShardContext, xContentRegistry);
                    }
                }

                // 新建mappingsMetaData对象
                Map<String, MappingMetaData> mappingsMetaData = new HashMap<>();
                for (DocumentMapper mapper : mapperService.docMappers(true)) {
                    MappingMetaData mappingMd = new MappingMetaData(mapper);
                    mappingsMetaData.put(mapper.type(), mappingMd);
                }

                // 以下过程是创建IndexMetaData的过程
                // 创建indexMetaDataBuilder对象,执行真正的创建
                final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index())
                    .settings(actualIndexSettings)
                    .setRoutingNumShards(routingNumShards);

                for (int shardId = 0; shardId < tmpImd.getNumberOfShards(); shardId++) {
                    indexMetaDataBuilder.primaryTerm(shardId, tmpImd.primaryTerm(shardId));
                }

                for (MappingMetaData mappingMd : mappingsMetaData.values()) {
                    indexMetaDataBuilder.putMapping(mappingMd);
                }

                for (AliasMetaData aliasMetaData : templatesAliases.values()) {
                    indexMetaDataBuilder.putAlias(aliasMetaData);
                }
                for (Alias alias : request.aliases()) {
                    AliasMetaData aliasMetaData = AliasMetaData.builder(alias.name()).filter(alias.filter())
                        .indexRouting(alias.indexRouting()).searchRouting(alias.searchRouting()).build();
                    indexMetaDataBuilder.putAlias(aliasMetaData);
                }

                for (Map.Entry<String, Custom> customEntry : customs.entrySet()) {
                    indexMetaDataBuilder.putCustom(customEntry.getKey(), customEntry.getValue());
                }

                indexMetaDataBuilder.state(request.state());

                // 创建indexMetaData对象
                final IndexMetaData indexMetaData;
                try {
                    indexMetaData = indexMetaDataBuilder.build();
                } catch (Exception e) {
                    removalExtraInfo = "failed to build index metadata";
                    throw e;
                }

                indexService.getIndexEventListener().beforeIndexAddedToCluster(indexMetaData.getIndex(),
                    indexMetaData.getSettings());

                // 创建新的MetaData
                MetaData newMetaData = MetaData.builder(currentState.metaData())
                    .put(indexMetaData, false)
                    .build();

                ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
                if (!request.blocks().isEmpty()) {
                    for (ClusterBlock block : request.blocks()) {
                        blocks.addIndexBlock(request.index(), block);
                    }
                }
                blocks.updateBlocks(indexMetaData);

                // 阻塞blocks并更新Meta元数据
                ClusterState updatedState = ClusterState.builder(currentState).blocks(blocks).metaData(newMetaData).build();

                if (request.state() == State.OPEN) {
                    RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable())
                        .addAsNew(updatedState.metaData().index(request.index()));
                    updatedState = allocationService.reroute(
                        ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build(),
                        "index [" + request.index() + "] created");
                }
                removalExtraInfo = "cleaning up after validating index on master";
                removalReason = IndexRemovalReason.NO_LONGER_ASSIGNED;
                return updatedState;
            } finally {
                if (createdIndex != null) {
                    indicesService.removeIndex(createdIndex, removalReason, removalExtraInfo);
                }
            }
        }
    }
}


创建IndexService过程

这部分逻辑暂时没有深入研究,这里只放在这里用于引导提醒一下,具体不做深入研究。

public class IndicesService extends AbstractLifecycleComponent
    implements IndicesClusterStateService.AllocatedIndices<IndexShard, IndexService>, IndexService.ShardStoreDeleter {

    public synchronized IndexService createIndex(
            final IndexMetaData indexMetaData, final List<IndexEventListener> builtInListeners) throws IOException {
        ensureChangesAllowed();
        if (indexMetaData.getIndexUUID().equals(IndexMetaData.INDEX_UUID_NA_VALUE)) {
            throw new IllegalArgumentException("index must have a real UUID found value: [" + indexMetaData.getIndexUUID() + "]");
        }
        final Index index = indexMetaData.getIndex();
        if (hasIndex(index)) {
            throw new ResourceAlreadyExistsException(index);
        }
        List<IndexEventListener> finalListeners = new ArrayList<>(builtInListeners);
        final IndexEventListener onStoreClose = new IndexEventListener() {
            @Override
            public void onStoreClosed(ShardId shardId) {
                indicesQueryCache.onClose(shardId);
            }
        };
        finalListeners.add(onStoreClose);
        finalListeners.add(oldShardsStats);
        final IndexService indexService =
                createIndexService(
                        "create index",
                        indexMetaData,
                        indicesQueryCache,
                        indicesFieldDataCache,
                        finalListeners,
                        indexingMemoryController);
        boolean success = false;
        try {
            indexService.getIndexEventListener().afterIndexCreated(indexService);
            indices = newMapBuilder(indices).put(index.getUUID(), indexService).immutableMap();
            success = true;
            return indexService;
        } finally {
            if (success == false) {
                indexService.close("plugins_failed", true);
            }
        }
    }


    private synchronized IndexService createIndexService(final String reason,
                                                         IndexMetaData indexMetaData,
                                                         IndicesQueryCache indicesQueryCache,
                                                         IndicesFieldDataCache indicesFieldDataCache,
                                                         List<IndexEventListener> builtInListeners,
                                                         IndexingOperationListener... indexingOperationListeners) throws IOException {
        final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexScopeSetting);
        logger.debug("creating Index [{}], shards [{}]/[{}] - reason [{}]",
            indexMetaData.getIndex(),
            idxSettings.getNumberOfShards(),
            idxSettings.getNumberOfReplicas(),
            reason);

        final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry);
        for (IndexingOperationListener operationListener : indexingOperationListeners) {
            indexModule.addIndexOperationListener(operationListener);
        }
        pluginsService.onIndexModule(indexModule);
        for (IndexEventListener listener : builtInListeners) {
            indexModule.addIndexEventListener(listener);
        }
        return indexModule.newIndexService(
                nodeEnv,
                xContentRegistry,
                this,
                circuitBreakerService,
                bigArrays,
                threadPool,
                scriptService,
                client,
                indicesQueryCache,
                mapperRegistry,
                indicesFieldDataCache,
                namedWriteableRegistry
        );
    }
}


参考文章

Elasticsearch分布式一致性原理剖析(二)-Meta篇
elasticsearch index 之 create index(二)


招聘信息

【招贤纳士】

欢迎热爱技术、热爱生活的你和我成为同事,和贝贝共同成长。

贝贝集团诚招算法、大数据、BI、Java、PHP、android、iOS、测试、运维、DBA等人才,有意可投递zhi.wang@beibei.com。

贝贝集团创建于2011年,旗下拥有贝贝网、贝店、贝贷等平台,致力于成为全球领先的家庭消费平台。

贝贝创始团队来自阿里巴巴,先后获得IDG资本、高榕资本、今日资本、新天域资本、北极光等数亿美金的风险投资。

公司地址:杭州市江干区普盛巷9号东谷创业园(上下班有多趟班车)

www.htsjk.Com true http://www.htsjk.com/Elasticsearch/11452.html NewsArticle elasticsearch索引建立过程,elasticsearch索引 开篇  怀着佛系心态写的文章,因为发现心急依然看不懂代码,所以只能安慰自己佛系一点,这篇文章希望能够把elasticsearch的index的创建过程讲...
评论暂时关闭