使用Pymongo保存数据的基本方法(增删改查)请参考:Python连接MongoDB,使用pymongo进行增删改查
文章目录
- 1. 基本方法: 逐行保存
- 2. insert_many 批量保存
- 3. Threading 多线程保存数据
1. 基本方法: 逐行保存
这是最基本的保存方法:逐行遍历数据,可以先对每条数据本身做微调,然后再保存。
from pymongo import MongoClient
import pandas as pd
import numpy as np
def get_coll(database, collection, host="127.0.0.1"):
    """Return the target MongoDB collection.

    A new ``MongoClient`` is created on every call, connecting to ``host``
    on port 27017.

    Args:
        database: Name of the database to open.
        collection: Name of the collection inside that database.
        host: Address of the MongoDB server.

    Returns:
        The collection object obtained from the client.
    """
    client = MongoClient(host=host, port=27017)
    return client.get_database(database).get_collection(collection)
def _save_or_update_mongodb(coll, dict_value):
"""根据检查_id,如果存在就覆盖,如果不存在就新增"""
record = coll.find_one({"_id": dict_value['_id']})
if not record:
coll.insert_one(dict_value)
else:
coll.update_one(record, {
"$set": dict_value,
})
def save_dataframe_to_mongo(dataframe):
    """Save ``dataframe`` to ``test_db.test_collection`` one row at a time.

    Each row is converted to a dict, its index label is stored under
    ``_id``, and the result is handed to ``_save_or_update_mongodb``.

    Args:
        dataframe: The pandas DataFrame whose rows are saved.
    """
    target = get_coll("test_db", "test_collection")
    for row_label, row in dataframe.iterrows():
        document = row.to_dict()
        # The row's index label doubles as the document key.
        document["_id"] = row_label
        _save_or_update_mongodb(target, document)
if __name__ == '__main__':
    # Demo data: 10 rows of standard-normal samples in four named columns.
    df = pd.DataFrame(np.random.randn(10, 4), columns=['a', 'b', 'c', 'd'])
    save_dataframe_to_mongo(df)
2. insert_many 批量保存
使用 insert_many 方法可以一次性批量保存一批数据:
from pymongo import MongoClient
import pandas as pd
import numpy as np
import math
def get_coll(database, collection, host="127.0.0.1"):
    """Open ``collection`` in ``database`` on the MongoDB server at ``host``.

    The connection always uses port 27017, and each call builds its own
    ``MongoClient``.

    Args:
        database: Database name.
        collection: Collection name.
        host: MongoDB server address.

    Returns:
        The requested collection object.
    """
    db_handle = MongoClient(host=host, port=27017).get_database(database)
    return db_handle.get_collection(collection)
def save_dataframe_to_mongo(dataframe, step=20):
    """Save ``dataframe`` to ``test_db.test_collection2`` in batches.

    The frame is sliced into consecutive chunks of at most ``step`` rows and
    each chunk is written with a single ``insert_many`` call.

    Args:
        dataframe: The pandas DataFrame to save.
        step: Maximum number of rows per ``insert_many`` call.
    """
    coll = get_coll("test_db", "test_collection2")
    for i in range(math.ceil(dataframe.shape[0] / step)):
        # Use the full orient name: the abbreviated "record" is deprecated
        # since pandas 1.1 and raises ValueError from pandas 2.0 on.
        dict_list = dataframe.iloc[step * i:step * (i + 1)].to_dict(orient="records")
        coll.insert_many(dict_list)
if __name__ == '__main__':
    # Demo data: 900 rows of standard-normal samples in four named columns.
    df = pd.DataFrame(np.random.randn(900, 4), columns=['a', 'b', 'c', 'd'])
    save_dataframe_to_mongo(df)
3. Threading 多线程保存数据
PyMongo 是线程安全的,但不是 fork 安全的(多进程场景下,每个子进程需要各自创建 MongoClient),因此可以放心地使用多线程模式保存数据,示例代码如下:
from pymongo import MongoClient
import pandas as pd
import numpy as np
import math
import threading
def get_coll(database, collection, host="127.0.0.1"):
    """Look up the target collection on the MongoDB server.

    Creates a ``MongoClient`` for ``host`` on port 27017, then resolves the
    database and the collection by name.

    Args:
        database: Name of the database.
        collection: Name of the collection.
        host: Address of the MongoDB server.

    Returns:
        The collection object for ``database``/``collection``.
    """
    return (
        MongoClient(host=host, port=27017)
        .get_database(database)
        .get_collection(collection)
    )
def save_dataframe_to_mongo(dataframe, step=20):
    """Save ``dataframe`` to ``test_db.test_collection2`` using one thread per batch.

    The frame is sliced into consecutive chunks of at most ``step`` rows.
    Each chunk is written by its own thread via ``insert_many``, and the
    function returns only after every thread has been joined.

    Args:
        dataframe: The pandas DataFrame to save.
        step: Maximum number of rows per ``insert_many`` call.
    """
    coll = get_coll("test_db", "test_collection2")
    thread_list = []
    for i in range(math.ceil(dataframe.shape[0] / step)):
        # Batch to save. Use the full orient name: the abbreviated "record"
        # is deprecated since pandas 1.1 and raises ValueError from 2.0 on.
        dict_list = dataframe.iloc[step * i:step * (i + 1)].to_dict(orient="records")
        # One worker thread per batch.
        thread = threading.Thread(target=coll.insert_many, args=(dict_list,))
        thread.start()
        thread_list.append(thread)
    # Wait until every worker thread has finished.
    for _thr in thread_list:
        _thr.join()
if __name__ == '__main__':
    # Demo data: 900 rows of standard-normal samples in four named columns.
    df = pd.DataFrame(np.random.randn(900, 4), columns=['a', 'b', 'c', 'd'])
    save_dataframe_to_mongo(df)



