ElasticSearch增删改查之python sort、scroll、scan,
1、用python操作elasticsearch有两个库可以调用
# ElasticSearch不支持scroll(分页查询)查询
from pyelasticsearch import ElasticSearch
# Elasticsearch支持scroll查询,一般建议使用这个库
from elasticsearch import helpers,Elasticsearch
""" 注意:以上两个库各自在查询或更新传递的参数是不同的 """
# ElasticSearch查询使用方式
ES = ElasticSearch(URL)
res = ES.search(
query,
index=index,
size=size
)
# Elasticsearch查询使用方式
ES = ElasticSearch(URL)
res = ES.search(
body=query,
index=index,
size=size
)
- ES中的高性能的部分大部分在helpers中实现
2、Elasticsearch中search scroll使用
- scroll的优势:支持分页查询,自动排序,并把查询结果返回
- scroll使用方式:每次查询获取下一次查询需要使用的scroll_id,查询时传递参数scroll='2m',后台ES即可以将查询的结果保存2分钟
- 查询时常用技巧
1、将必须包含字段添加到 must中
2、将必须不包含字段添加到 must_not中
3、单一条件匹配选用 term,多个单一条件任何一个匹配选用 terms
4、from 指定从结果数据中的第多少条开始返回,from的最大值超不过2000,所以在使用大数据查询基本使用不上
5、size 指定结果数据中共返回多少条数据
# 在使用时一定要注意Elasticsearch与ElasticSearch还是有一定的区别的,传递参数不一样
from elasticsearch import Elasticsearch
ES_SEARCH_HOSTURL = 'http://domain:9000/'
ES = Elasticsearch(ES_SEARCH_HOSTURL)
query = {
"query": {
"bool": {
"must": [],
"must_not": []
}
}
}
# index可以为索引的列表或者单个索引,如果是索引的列表,则使用search时不能传递doc_type,也就是如果同时查询多个索引,不能指定文档的类型
def scroll_search(index, query, size, page):
""" 使用scroll查询ES,实现分页查询
:param index: type of list or str
:param query: type of dict,查询条件
:param size: type of int(1-100),指定返回数据中每页的数据条数
:param page: type of int(>0),指定返回第几页数据
:return: 查询结果总数和某页的数据
"""
try:
res = ES.search(
index=index,
scroll='5m', # 查询一次数据在ES中缓存5分钟再销毁
size=size,
body=query,
sort="modified:desc", # sort增加排序功能,多个字段排序可以以逗号隔开
# sort="modified:desc,_score:desc", # 指定某个字段按照升序或者降序排列,modifie为数据字段
# sort="_doc", # ES会计算一个最优的排序方案
# search_type='scan', # 如果不关注排序的话,可以增加该字段,查询速度十分高效,性能比较好
)
except Exception as e:
raise e
else:
sid = res['_scroll_id'] # 获得查询下一条数据的scroll_id
total = res['hits']['total'] # 获取查询结果中总数据的条数
hits = res["hits"]["hits"] # 首次查询返回第一页的结果数据
results = [hit["_source"] for hit in hits]
first_page = 1
while page > first_page:
try:
res = ES.scroll(scroll_id=sid, scroll='2m')
except Exception as e:
raise e
else:
sid = res['_scroll_id']
hits = res["hits"]["hits"]
results = [hit["_source"] for hit in hits]
first_page += 1
return total, results
# terms使用,其中categories为list类型,含义为categories中任何一个满足条件即可
temp = {"terms": {"categories": categories}}
query["query"]["bool"]["must"].append(temp)
3、Elasticsearch中update局部更新
""" 功能:从多个索引中查询需要更新的对应数据的id,再更新此数据 """
from elasticsearch import Elasticsearch
ES_SEARCH_HOSTURL = 'http://domain:9000/'
ES = Elasticsearch(ES_SEARCH_HOSTURL)
indexs = [index1, index2]
query = {
"query": {
"bool": {
"must": []
}
}
}
for index in indexs:
try:
res = ES.search(body=query, index=index, doc_type='info')
except Exception as e:
print(e)
# logger.error("Request search_indicator function error. Error: %s" % e)
message = "Internal server error"
results = data_formatter(message=message)
return Response(results, status=500)
else:
if res["hits"]["total"] > 0:
hits = res["hits"]["hits"][0]
update_id = hits["_id"]
try:
# 注意,如果ES为pyelasticsearch的对象,则需要更新的参数传递形式应该为doc= {"revoked": revoked}
ES.update(index=index, doc_type='indicator_info', id=update_id, body={"doc": {"revoked": revoked}})
except Exception as e:
print(e)
message = "Update {} failed".format(id)
results = data_formatter(message=message)
return Response(results, status=500)
else:
results = data_formatter()
return Response(results, status=200)
本站文章为和通数据库网友分享或者投稿,欢迎任何形式的转载,但请务必注明出处.
同时文章内容如有侵犯了您的权益,请联系QQ:970679559,我们会在尽快处理。