# 1. Import the Elasticsearch Python client libraries
import os

import pandas as pd

from elasticsearch import Elasticsearch, helpers
from elasticsearch.helpers import bulk
# 2. Connect to Elasticsearch
es = Elasticsearch(hosts="http://192.168.194.16:9200/",timeout=30, max_retries=10, retry_on_timeout=True)
query_body = {
"query":{
"match_all":{}
}
}
# 3. Query data. search() returns 10 hits by default; use the size parameter to
#    ask for more. Elasticsearch caps from+size at 10000 unless the index's
#    max_result_window setting is raised, as shown below.
# 设置某个index的max_result_window
# curl -XPUT http://localhost:9200/elastic_segment-20211201/_settings -H "Content-Type:application/json" -d '{"index":{"max_result_window":100000}}'
#设置所有index的max_result_window
# curl -XPUT http://localhost:9200/_all/_settings -H "Content-Type:application/json" -d '{"index":{"max_result_window":100000}}'
indexes = ["elastic_segment-20211201"]
dir_out ="/tf/data/out/"
suffix = ".csv"
for index in indexes :
file_out = dir_out + index + suffix
if os.path.exists(file_out):
continue
print("file_out:{}".format(file_out))
# size默认是1000行
query = es.search(index=index,size=500000, request_timeout=1200)
arr = []
for i in query["hits"]["hits"]:
# print(i)
col = i["_source"].keys()
#print(i)
# for key, val in i.items():
# print(key,"= ", val)
# print(i["_source"])
temp = [i["_source"][c] for c in col]
arr.append(temp)
# arr.append([i["_source"]["total"],i["_source"]["percentage"],i["_source"]["match"],i["_source"]["time_bucket"], i["_source"]["entity_id"]])
# break;
df = pd.Dataframe(arr, columns=col)
# df.head()
df.to_csv(file_out, float_format='%.2f', index=False)
# 4. For large result sets, use pagination or the scroll API to fetch the
#    data in pages instead of one huge request.
#es=Elasticsearch(["localhost:9200"])
body={
#"_source":["fileName"],
#"_source":["fullPath"],
#"_source":["HashFeature"],
"query":{
"match_all":{}
}
}
def get_search_result(es,index,scroll='5m',timeout='30m',size=10000,body=body):
arr = []
queryData = es.search(
index = index,
_source_excludes=["data_binary"],
scroll = scroll,
# timeout = timeout,
request_timeout=1200,
size = size,
body = body
)
mdata = queryData.get("hits").get("hits")
# print (queryData)
if not mdata:
print('empty')
# for i in mdata:
# #print(i)
# col = i["_source"].keys()
# #print(i)
# # for key, val in i.items():
# # print(key,"= ", val)
# # print(i["_source"])
# temp = [i["_source"][c] for c in col]
# arr.append(temp)
scroll_id = queryData["_scroll_id"]
total = queryData["hits"]["total"]["value"]
print("scroll_id:{}, total:{}".format(scroll_id, total))
print("num:{}".format(int(total/size)))
cnt = 0
try:
for i in range(int(total/size)):
res = es.scroll(scroll_id=scroll_id, scroll=scroll)
mdata = mdata + res["hits"]["hits"]
if not mdata:
print('empty')
# print(res)
# for i in mdata:
# #print(i)
# col = i["_source"].keys()
# #print(i)
# # for key, val in i.items():
# # print(key,"= ", val)
# # print(i["_source"])
# temp = [i["_source"][c] for c in col]
# arr.append(temp)
mdata = mdata + res["hits"]["hits"]
# scroll_id = queryData["_scroll_id"]
# print("scroll_id:{}", scroll_id)
# cnt += 1
# if cnt > 2:
# break
except Exception, e:
pass
finally:
return mdata



