栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

pandas dataframe 写入到es中

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

pandas dataframe 写入到es中

1.pandas中的dataframe

1.工作原因,需要从hive读取数据到pandas中进行操作,然后处理完全以后需要再读取到es中。

读取hive数据:因为脚本在生产上,所以,导入的包如果有缺失,可以自己补一下。

import pandas as pd
from pyhive import hive
import json
import time
from elasticsearch import Elasticsearch

2.读取Hive数据库

conn=hive.connect(host='',port='',username='',database='')
sql='select * from table1'
start_time=time.time()
tb=pd.read_sql(sql=sql,con=conn)
end_time=time.time()
# 显示的是加载数据的时间
print(t)

2.调用es的包,并写入到ES中

在es中并不需要提前去新增索引,可以按照自己的喜好命名。
我参照的是他的代码@'Humz:我觉得还可以,只是有些需要优化下。


def connect_es(frame, index_, type_):

    try:
        es = Elasticsearch(host, http_auth=(user, password), port='9200')
        # 把df转化成json
        df_as_json = frame.to_json(orient='records', lines=True)
        bulk_data = []
        # ID最好是需求但是一定要唯一,可以是两个str拼接起来的
        for json_document in df_as_json.split('n'):
            json_document1=json_document.loads(json_document)
            id1=json_document1['id']+json_document1['versionn']
            bulk_data.append({"index": {
                '_index': index_,
                '_type': type_,
                '_id':id1
            }})
            bulk_data.append(json.loads(json_document))
            
            # 一次bulk request包含1000条数据
            if len(bulk_data) > 1000:
                es.bulk(bulk_data)
                bulk_data = []
                
        es.bulk(bulk_data)
        print('database connet successfully')

    except Exception as e:
    
        print(e)
connect_es(tb,index_name,'docs')
3.写到最后

由于我对es并不是很熟,我一开始想着先读取一个es到df中看下es的构成是那些的,算是一些启发吧。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/355287.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号