同时,我发现了多种可能性,至少以合理的速度如何做到这一点:
import jsonimport pandas as pdimport requests# df is a dataframe or dataframe chunk coming from your reading logicdf['_id'] = df['column_1'] + '_' + df['column_2'] # or whatever makes your _iddf_as_json = df.to_json(orient='records', lines=True)final_json_string = ''for json_document in df_as_json.split('n'): jdict = json.loads(json_document) metadata = json.dumps({'index': {'_id': jdict['_id']}}) jdict.pop('_id') final_json_string += metadata + 'n' + json.dumps(jdict) + 'n'headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}r = requests.post('http://elasticsearch.host:9200/my_index/my_type/_bulk', data=final_json_string, headers=headers, timeout=60)除了使用熊猫的
to_json()方法,还可以使用
to_dict()以下方法。这在我的测试中稍慢一些,但并没有很多:
dicts = df.to_dict(orient='records')final_json_string = ''for document in dicts: metadata = {"index": {"_id": document["_id"]}} document.pop('_id') final_json_string += json.dumps(metadata) + 'n' + json.dumps(document) + 'n'当大数据集运行此,人们可以通过更换Python的默认保存了两三分钟
json与库ujson或rapidjson通过安装它,然后
import ujson as json或
import rapidjson as json分别。
通过将步骤的顺序执行替换为并行步骤,可以实现更大的加速,从而在请求等待Elasticsearch处理所有文档并返回响应时,读取和转换不会停止。这可以通过线程,多处理,Asyncio,任务队列等来完成,但这不在此问题的范围内。
如果您碰巧找到一种更快地执行to-json-conversion的方法,请告诉我。



