欢迎投稿

今日深度:

ElasticSearch文档API,elasticsearchapi

ElasticSearch文档API,elasticsearchapi



文档API(Document APIs)

本节介绍以下CRUD API

                                     单文档API:

                                                        Index API (创建索引)

                                                        GET API (获取索引)

                                                        Delete API (删除索引)

                                                        Update API (更新索引)

                                     多文档API:

                                                        Multi GEt API (批量获取索引)

                                                        BulkAPI (批量操作索引)

注意:所有CRUD API都是单索引API。 index参数接受单个索引名称或指向单个索引的别名。


创建索引(Index API)

    创建索引API允许调用者将一个Json类型的数据存入到一个特定的索引下,并使其可以被检索到。

    构建Json数据

    这里有几种构建Json数据的的方法。

           手动构建Json类型的字节数组(byte[])或字符串。

           使用将自动转换为Json的Map。

           使用第三方类库将你的实体序列化成Json。

            使用内部工具类XContentFactory.jsonBuilder()。

在Client内部,所有类型都被转换为字节数组(byte[]  (所以String也是被转换为byte[])),因此,如果对象已经是这种形式,这样使用时最好的,

jsonBuilder是高度优化的Json生成器,它可以直接构造一个字节数组(byte[])。


手动构建Json

手动构建Json是很简单的,但是有一点你必须注意,你需要将日期编码成ElasticSearch可识别的格式。例如:

String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";


使用Map构建Json

Map 是一个 键值对(Key:value)的集合,它本身就能代表Json。

Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");

序列化你的实体

请添加Jackson Databind到你的项目中,这样你就可以使用Jackson来序列化你的实体为一个JSON。你可以使用ObjectMapper来序列化你的实体。

import com.fasterxml.jackson.databind.*;

// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse

// generate json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);


使用 ElasticSearch的帮助类

ElasticSearch提供一个工具类来构建Json内容。

import static org.elasticsearch.common.xcontent.XContentFactory.*;

XContentBuilder builder = jsonBuilder()
    .startObject()
        .field("user", "kimchy")
        .field("postDate", new Date())
        .field("message", "trying out Elasticsearch")
    .endObject()

注意: 你可以使用startArray(String) 和endArray()方法添加数组。 顺便提醒一下,field方法支持多种对象,你可以添加数字,日期,甚至其他的XContentBuilder对象。

如果你想查看构建好的Json内容,你可以使用string()方法

String json = builder.string();

创建索引

下面的例子将为名字为twitter的索引项添加一条Json数据,将这条数据添加到类型tweet下面,并指定其id为1.

import static org.elasticsearch.common.xcontent.XContentFactory.*;

IndexResponse response = client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        .get();

注意: 你可以构建一个JSON格式的字符串,并且不指定它的ID,这样创建索引同样会成功,ElasticSearch将会自动生成一个ID。

String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";

IndexResponse response = client.prepareIndex("twitter", "tweet")
        .setSource(json)
        .get();

IndexResponse会给你一个添加结果的报告:

// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status has stored current instance statement.
RestStatus status = response.status();


如果想要获得更多信息,请查看REST的创建索引的文档。

REST 创建索引返回的JSON内容:

{
    "_shards" : {
        "total" : 2,
        "failed" : 0,
        "successful" : 2
    },
    "_index" : "twitter",
    "_type" : "tweet",
    "_id" : "1",
    "_version" : 1,
    "created" : true,
    "result" : created
}

_shards提供了索引操作过程中的复制过程信息。

             total 指示对多少分片进行操作(主分片和备份分片)。

             failed  操作失败的分片数和失败信息。

             successful  操作成功的分片数。

             

获取索引

GetResponse 可以通过索引的ID获取一个JSON格式的数据,下面的例子,将获取twitter索引下的tweet类型的id为1的一条数据。

GetResponse response = client.prepareGet("twitter", "tweet", "1").get();


如果想要获得更多信息,请查看REST的获取索引的文档。

REST 获取索引返回的JSON内容:

{
    "_index" : "twitter",
    "_type" : "tweet",
    "_id" : "1",
    "_version" : 1,
    "found": true,
    "_source" : {
        "user" : "kimchy",
        "date" : "2009-11-15T14:12:12",
        "likes": 0,
        "message" : "trying out Elasticsearch"
    }
}

上述结果包括我们希望检索的文档的_index,_type,_id和_version,如果能查找的到的话,它还包含_source.


删除索引

prepareDelete 方法允许根据ID从特定的索引中删除一条数据,一下示例将从twitter索引的tweet下删除ID为1的数据。

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();

如果想要获得更多信息,请查看REST的删除索引的文档。

REST 删除索引返回的JSON内容:

{
    "_shards" : {
        "total" : 10,
        "failed" : 0,
        "successful" : 10
    },
    "found" : true,
    "_index" : "twitter",
    "_type" : "tweet",
    "_id" : "1",
    "_version" : 2,
    "result": "deleted"
}

更新索引

你可以创建一个UpdateRequest发送给客户端(client):

UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject());
client.update(updateRequest).get();

或者使用prepareUpdate()方法:

client.prepareUpdate("ttl", "doc", "1")
        .setScript(new Script("ctx._source.gender = \"male\"" 《1》  , ScriptService.ScriptType.INLINE, null, null))
        .get();

client.prepareUpdate("ttl", "doc", "1")
        .setDoc(jsonBuilder()               《2》
            .startObject()
                .field("gender", "male")
            .endObject())
        .get();
       《1》 你的脚本, 它也可以是存储在本地的脚本文件的名称,如果你要使用本地存储的脚本,你需要将使用 ScriptService.ScriptType.FILE。

        《2》 将存在的字段合并到现有的数据中。

请注意,你不能同时调用setScript和setDoc。


通过脚本更新

通过脚本更新数据:

UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
        .script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();

通过合并数据更新

更新API支持传递部分数据,这些传递的数据将被合并到原有数据中(简单递归合并,内部对象合并,替换核心“键/值”或者数组)。如下例:

UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject());
client.update(updateRequest).get();

更新插入

ElasticSearch支持更新插入,如果需要更新的文档不存在,upsert的内容将会被应用到新建的数据上。

IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
            .startObject()
                .field("name", "Joe Smith")
                .field("gender", "male")
            .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject())
        .upsert(indexRequest);     《1》         
client.update(updateRequest).get();

           《1》 如果数据不存在,indexRequest将会被添加。

如果数据 index/type/1 已经存在,我们执行上述代码后,数据将改变成这样:

{
    "name"  : "Joe Dalton",
    "gender": "male"    《1》    
}

           《1》这条数据来自于更新请求。


 如果 index/type/1 不存在, 我们将得到一条新的数据:

{
    "name" : "Joe Smith",
    "gender": "male"
}


批量获取

批量获取API允许基于它们的索引(index),类型(type)和id来获得文档的列表:

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
    .add("twitter", "tweet", "1")       《1》    
    .add("twitter", "tweet", "2", "3", "4")  《2》
    .add("another", "type", "foo")          《3》
    .get();

for (MultiGetItemResponse itemResponse : multiGetItemResponses) { 《4》
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {                      《5》
        String json = response.getSourceAsString();  《6》
    }
}


          《1》单一id查询
          《2》 根据相同索引/类型的一组id查询。

          《3》你同样可以获得其他索引下的数据。

          《4》遍历请求的返回

          《5》验证请求的数据是否存在

           《6》获取_source的值


批量操作API

批量API允许在一个请求中索引和删除多个文档。 例如

import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}

使用批量处理器

BulkProcessor类提供了一个简单的接口,可根据请求的数量或大小,或在给定期限后自动刷新批量操作。

想要使用BulkProcessor,需要先创建一个它的实例:

默认参数:
                  setBulkActions是1000                   setBulkSize是5MB
                  setFlushInterval 不设置
                  setConcurrentRequests 为 1
                  setBackoffPolicy 重试8次,每次间隔50MS,总等待时间大约为5.1秒。
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,                                              //添加你的ElasticSearch 客户端
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... }  //此方法在批量执行之前调用,
								//例如,您可以使用request.numberOfActions()查看numberOfActions,

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } //此方法在批量执行之后调用,例如,你可以使用response.hasFailures()方法查								//看失败的调用。

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } // 此方法在处理失败或者抛出异常时调用。
        })
        .setBulkActions(10000)  			//指定每次处理多少请求,此处是10000次。
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))  // 每次刷新到请求的字节数, 此处是5M
        .setFlushInterval(TimeValue.timeValueSeconds(5))     //无论请求量多少 我们都5秒刷新一次请求(即每五秒将所有请求执行)。
        .setConcurrentRequests(1) 			//设置并发请求数。 值为0表示只允许执行单个请求。 值为1表示在累积新的批量请求时允许执行							//1个并发请求。
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) //设置回退策略,当请求执行错误时,可进行回退操作,
				//TimeValue.timeValueMillis(100)执行错误后延迟100MS,重试三次后执行回退。
				//要禁用回退,请传递BackoffPolicy.noBackoff()。
        .build(); 


添加请求
创建完成后你就可以添加你的请求到BulkProcessor:
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

关闭批量处理器:
当所有文档都加载到BulkProcessor中后,我们可以使用awaitClose 或者 close 方法去关闭它:
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);

bulkProcessor.close();

两种方法都刷新任何剩余的文档,并禁用所有其他计划刷新(如果通过设置flushInterval进行调度)。调用awaitClose方法是,如果有正在执行的并发请求,awaitClose将会等待
请求完成或者批量请求超时后,才会返回true,若在此过程中超出了awaitClose指定的等待时间,则返回false。close方法不会等待任何剩余的批量请求完成,并立即退出。

在测试中使用BulkProcessor
如果你使用ElasticSearch和BulkProcessor填充你的数据集,你最好将concurrentRequests 设置为0,这样你就能以同步方式执行刷新操作了:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
        .setBulkActions(10000)
        .setConcurrentRequests(0)
        .build();

// Add your requests
bulkProcessor.add(/* Your requests */);

// Flush any remaining requests
bulkProcessor.flush();

// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// Now you can start searching!
client.prepareSearch().get();



            
                
	

www.htsjk.Com true http://www.htsjk.com/Elasticsearch/26083.html NewsArticle ElasticSearch文档API,elasticsearchapi 文档API(Document APIs) 本节介绍以下CRUD API :                                      单文档API:                          ...
相关文章
    暂无相关文章
评论暂时关闭