栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

python获取ElasticSearch数据

python获取ElasticSearch数据

1、导入es的python库

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch import helpers
import pandas as pd
import os

2、连接es

# Client for the Elasticsearch node at 192.168.194.16:9200. Requests use a
# timeout of 30 and are retried up to 10 times, including after timeouts.
es = Elasticsearch(
    hosts="http://192.168.194.16:9200/",
    timeout=30,
    max_retries=10,
    retry_on_timeout=True,
)

# Query body that matches every document.
query_body = {"query": {"match_all": {}}}

3、查询数据(search默认只返回10条数据,可以通过参数size指定返回记录数。但es默认限制from+size不能超过10000条,需要一次返回更多记录时,要调大index的max_result_window参数)。

# Set max_result_window to 100000 for a single index (elastic_segment-20211201)
curl -XPUT http://localhost:9200/elastic_segment-20211201/_settings -H "Content-Type:application/json" -d '{"index":{"max_result_window":100000}}'

# Set max_result_window to 100000 for all indexes (_all)
curl -XPUT http://localhost:9200/_all/_settings -H "Content-Type:application/json" -d '{"index":{"max_result_window":100000}}'
# Indexes to export; each one is written to <dir_out><index><suffix>.
indexes = ["elastic_segment-20211201"]
dir_out = "/tf/data/out/"
suffix = ".csv"

for index in indexes:
    file_out = os.path.join(dir_out, index + suffix)
    # Skip indexes that were already exported on a previous run.
    if os.path.exists(file_out):
        continue

    print("file_out:{}".format(file_out))
    # Without `size` a search returns only 10 hits. NOTE: from + size must not
    # exceed the index's max_result_window setting, otherwise the request is
    # rejected by Elasticsearch; raise that setting or use the scroll API.
    query = es.search(index=index, size=500000, request_timeout=1200)

    # One dict per document. Building the frame from dicts aligns values by
    # field name, so documents with differing or reordered fields still land
    # in the right columns.
    rows = [hit["_source"] for hit in query["hits"]["hits"]]
    if not rows:
        # Nothing to export; do not create an empty file, so that a later run
        # retries this index instead of skipping it.
        print("no hits for index:{}".format(index))
        continue

    df = pd.DataFrame(rows)
    df.to_csv(file_out, float_format='%.2f', index=False)

4、当请求数据量比较大时,使用分页或者scroll模式请求数据

#es=Elasticsearch(["localhost:9200"])
# Default request body: match every document. Add a "_source" entry
# (e.g. "_source": ["fileName"]) to restrict the returned fields.
body = {
    "query": {
        "match_all": {}
    }
}


def get_search_result(es, index, scroll='5m', timeout='30m', size=10000, body=body):
    """Collect all hits of a query on ``index`` using the scroll API.

    The first page comes from ``es.search``; further pages are pulled with
    ``es.scroll`` until an empty page is returned. The ``data_binary`` field
    is always excluded from the returned ``_source``.

    Args:
        es: Elasticsearch client exposing ``search``, ``scroll`` and
            ``clear_scroll``.
        index: Name of the index to query.
        scroll: How long the scroll context is kept alive between requests.
        timeout: Accepted for backward compatibility; currently not sent
            with the request.
        size: Number of hits requested per page.
        body: Request body holding the query; defaults to match-all.

    Returns:
        A list of raw hit dicts (each with its ``_source``). If a scroll
        request fails, the hits gathered so far are returned and the error
        is printed (best-effort behaviour).
    """
    response = es.search(
        index=index,
        _source_excludes=["data_binary"],
        scroll=scroll,
        request_timeout=1200,
        size=size,
        body=body,
    )

    page = response["hits"]["hits"]
    hits = list(page)
    if not hits:
        print('empty')

    scroll_id = response["_scroll_id"]
    # The total is an object with a "value" key on some server versions and
    # a plain integer on others; accept both.
    total = response["hits"]["total"]
    if isinstance(total, dict):
        total = total["value"]
    print("scroll_id:{}, total:{}".format(scroll_id, total))

    try:
        # Keep scrolling until a page comes back empty. This does not rely on
        # the reported total, which may be a capped lower bound.
        while page:
            response = es.scroll(scroll_id=scroll_id, scroll=scroll)
            # Always continue with the most recently returned scroll id.
            scroll_id = response.get("_scroll_id", scroll_id)
            page = response["hits"]["hits"]
            hits.extend(page)
    except Exception as exc:
        # Best effort: report the failure and return what was collected.
        print("scroll aborted after {} hits: {!r}".format(len(hits), exc))
    finally:
        # Release the server-side scroll context instead of letting it linger
        # until the keep-alive expires.
        try:
            es.clear_scroll(scroll_id=scroll_id)
        except Exception as exc:
            print("clear_scroll failed: {!r}".format(exc))

    return hits
 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/676887.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号