ElasticSearch文档API,elasticsearchapi
文档API(Document APIs)
本节介绍以下CRUD API:
单文档API:
Index API (创建索引)
GET API (获取索引)
Delete API (删除索引)
Update API (更新索引)
多文档API:
Multi GEt API (批量获取索引)
BulkAPI (批量操作索引)
注意:所有CRUD API都是单索引API。 index参数接受单个索引名称或指向单个索引的别名。
创建索引(Index API)
创建索引API允许调用者将一个Json类型的数据存入到一个特定的索引下,并使其可以被检索到。
构建Json数据
这里有几种构建Json数据的的方法。
手动构建Json类型的字节数组(byte[])或字符串。
使用将自动转换为Json的Map。
使用第三方类库将你的实体序列化成Json。
使用内部工具类XContentFactory.jsonBuilder()。
在Client内部,所有类型都被转换为字节数组(byte[] (所以String也是被转换为byte[])),因此,如果对象已经是这种形式,这样使用时最好的,
jsonBuilder是高度优化的Json生成器,它可以直接构造一个字节数组(byte[])。
手动构建Json
手动构建Json是很简单的,但是有一点你必须注意,你需要将日期编码成ElasticSearch可识别的格式。例如:
String json = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
使用Map构建Json
Map 是一个 键值对(Key:value)的集合,它本身就能代表Json。
Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");序列化你的实体
请添加Jackson Databind到你的项目中,这样你就可以使用Jackson来序列化你的实体为一个JSON。你可以使用ObjectMapper来序列化你的实体。
import com.fasterxml.jackson.databind.*;
// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse
// generate json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
使用 ElasticSearch的帮助类
ElasticSearch提供一个工具类来构建Json内容。
import static org.elasticsearch.common.xcontent.XContentFactory.*;
XContentBuilder builder = jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
注意: 你可以使用startArray(String) 和endArray()方法添加数组。 顺便提醒一下,field方法支持多种对象,你可以添加数字,日期,甚至其他的XContentBuilder对象。
如果你想查看构建好的Json内容,你可以使用string()方法
String json = builder.string();创建索引
下面的例子将为名字为twitter的索引项添加一条Json数据,将这条数据添加到类型tweet下面,并指定其id为1.
import static org.elasticsearch.common.xcontent.XContentFactory.*;
IndexResponse response = client.prepareIndex("twitter", "tweet", "1")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
)
.get();
注意: 你可以构建一个JSON格式的字符串,并且不指定它的ID,这样创建索引同样会成功,ElasticSearch将会自动生成一个ID。
String json = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
IndexResponse response = client.prepareIndex("twitter", "tweet")
.setSource(json)
.get();
IndexResponse会给你一个添加结果的报告:
// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status has stored current instance statement.
RestStatus status = response.status();
如果想要获得更多信息,请查看REST的创建索引的文档。
REST 创建索引返回的JSON内容:
{
"_shards" : {
"total" : 2,
"failed" : 0,
"successful" : 2
},
"_index" : "twitter",
"_type" : "tweet",
"_id" : "1",
"_version" : 1,
"created" : true,
"result" : created
}
_shards提供了索引操作过程中的复制过程信息。
total 指示对多少分片进行操作(主分片和备份分片)。
failed 操作失败的分片数和失败信息。
successful 操作成功的分片数。
获取索引
GetResponse 可以通过索引的ID获取一个JSON格式的数据,下面的例子,将获取twitter索引下的tweet类型的id为1的一条数据。
GetResponse response = client.prepareGet("twitter", "tweet", "1").get();
如果想要获得更多信息,请查看REST的获取索引的文档。
REST 获取索引返回的JSON内容:
{
"_index" : "twitter",
"_type" : "tweet",
"_id" : "1",
"_version" : 1,
"found": true,
"_source" : {
"user" : "kimchy",
"date" : "2009-11-15T14:12:12",
"likes": 0,
"message" : "trying out Elasticsearch"
}
}上述结果包括我们希望检索的文档的_index,_type,_id和_version,如果能查找的到的话,它还包含_source.
删除索引
prepareDelete 方法允许根据ID从特定的索引中删除一条数据,一下示例将从twitter索引的tweet下删除ID为1的数据。
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();
如果想要获得更多信息,请查看REST的删除索引的文档。
REST 删除索引返回的JSON内容:
{
"_shards" : {
"total" : 10,
"failed" : 0,
"successful" : 10
},
"found" : true,
"_index" : "twitter",
"_type" : "tweet",
"_id" : "1",
"_version" : 2,
"result": "deleted"
}更新索引
你可以创建一个UpdateRequest发送给客户端(client):
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject());
client.update(updateRequest).get();
或者使用prepareUpdate()方法:
client.prepareUpdate("ttl", "doc", "1")
.setScript(new Script("ctx._source.gender = \"male\"" 《1》 , ScriptService.ScriptType.INLINE, null, null))
.get();
client.prepareUpdate("ttl", "doc", "1")
.setDoc(jsonBuilder() 《2》
.startObject()
.field("gender", "male")
.endObject())
.get(); 《1》 你的脚本, 它也可以是存储在本地的脚本文件的名称,如果你要使用本地存储的脚本,你需要将使用 ScriptService.ScriptType.FILE。《2》 将存在的字段合并到现有的数据中。
请注意,你不能同时调用setScript和setDoc。
通过脚本更新
通过脚本更新数据:
UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
.script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();
通过合并数据更新
更新API支持传递部分数据,这些传递的数据将被合并到原有数据中(简单递归合并,内部对象合并,替换核心“键/值”或者数组)。如下例:
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject());
client.update(updateRequest).get();更新插入
ElasticSearch支持更新插入,如果需要更新的文档不存在,upsert的内容将会被应用到新建的数据上。
IndexRequest indexRequest = new IndexRequest("index", "type", "1")
.source(jsonBuilder()
.startObject()
.field("name", "Joe Smith")
.field("gender", "male")
.endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject())
.upsert(indexRequest); 《1》
client.update(updateRequest).get();
《1》 如果数据不存在,indexRequest将会被添加。
如果数据 index/type/1 已经存在,我们执行上述代码后,数据将改变成这样:
{
"name" : "Joe Dalton",
"gender": "male" 《1》
}
《1》这条数据来自于更新请求。
如果 index/type/1 不存在, 我们将得到一条新的数据:
{
"name" : "Joe Smith",
"gender": "male"
}
批量获取
批量获取API允许基于它们的索引(index),类型(type)和id来获得文档的列表:
MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
.add("twitter", "tweet", "1") 《1》
.add("twitter", "tweet", "2", "3", "4") 《2》
.add("another", "type", "foo") 《3》
.get();
for (MultiGetItemResponse itemResponse : multiGetItemResponses) { 《4》
GetResponse response = itemResponse.getResponse();
if (response.isExists()) { 《5》
String json = response.getSourceAsString(); 《6》
}
}
《1》单一id查询
《2》 根据相同索引/类型的一组id查询。
《3》你同样可以获得其他索引下的数据。
《4》遍历请求的返回
《5》验证请求的数据是否存在
《6》获取_source的值
批量操作API
批量API允许在一个请求中索引和删除多个文档。 例如:
import static org.elasticsearch.common.xcontent.XContentFactory.*;
BulkRequestBuilder bulkRequest = client.prepareBulk();
// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
)
);
bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "another post")
.endObject()
)
);
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
// process failures by iterating through each bulk response item
}
使用批量处理器
BulkProcessor类提供了一个简单的接口,可根据请求的数量或大小,或在给定期限后自动刷新批量操作。
想要使用BulkProcessor,需要先创建一个它的实例:
默认参数:
setBulkActions是1000 setBulkSize是5MB
setFlushInterval 不设置
setConcurrentRequests 为 1
setBackoffPolicy 重试8次,每次间隔50MS,总等待时间大约为5.1秒。
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
BulkProcessor bulkProcessor = BulkProcessor.builder(
client, //添加你的ElasticSearch 客户端
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId,
BulkRequest request) { ... } //此方法在批量执行之前调用,
//例如,您可以使用request.numberOfActions()查看numberOfActions,
@Override
public void afterBulk(long executionId,
BulkRequest request,
BulkResponse response) { ... } //此方法在批量执行之后调用,例如,你可以使用response.hasFailures()方法查 //看失败的调用。
@Override
public void afterBulk(long executionId,
BulkRequest request,
Throwable failure) { ... } // 此方法在处理失败或者抛出异常时调用。
})
.setBulkActions(10000) //指定每次处理多少请求,此处是10000次。
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // 每次刷新到请求的字节数, 此处是5M
.setFlushInterval(TimeValue.timeValueSeconds(5)) //无论请求量多少 我们都5秒刷新一次请求(即每五秒将所有请求执行)。
.setConcurrentRequests(1) //设置并发请求数。 值为0表示只允许执行单个请求。 值为1表示在累积新的批量请求时允许执行 //1个并发请求。
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) //设置回退策略,当请求执行错误时,可进行回退操作,
//TimeValue.timeValueMillis(100)执行错误后延迟100MS,重试三次后执行回退。
//要禁用回退,请传递BackoffPolicy.noBackoff()。
.build();
添加请求
创建完成后你就可以添加你的请求到BulkProcessor:
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));关闭批量处理器:
当所有文档都加载到BulkProcessor中后,我们可以使用awaitClose 或者 close 方法去关闭它:
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
或
bulkProcessor.close();
两种方法都刷新任何剩余的文档,并禁用所有其他计划刷新(如果通过设置flushInterval进行调度)。调用awaitClose方法是,如果有正在执行的并发请求,awaitClose将会等待
请求完成或者批量请求超时后,才会返回true,若在此过程中超出了awaitClose指定的等待时间,则返回false。close方法不会等待任何剩余的批量请求完成,并立即退出。
在测试中使用BulkProcessor
如果你使用ElasticSearch和BulkProcessor填充你的数据集,你最好将concurrentRequests 设置为0,这样你就能以同步方式执行刷新操作了:
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
.setBulkActions(10000)
.setConcurrentRequests(0)
.build();
// Add your requests
bulkProcessor.add(/* Your requests */);
// Flush any remaining requests
bulkProcessor.flush();
// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();
// Refresh your indices
client.admin().indices().prepareRefresh().get();
// Now you can start searching!
client.prepareSearch().get();