from threading import Thread
from queue import Queue
from pymongo import MongoClient, collection
from elasticsearch import Elasticsearch, client
from hashlib import md5 #哈希函数
import time
class Putdata(Thread):
    """Worker thread: pull (title, content) pairs off a queue and index them into ES.

    config must contain:
        'es'    -- an Elasticsearch client instance
        'index' -- the target index name
    """
    def __init__(self, queue: Queue, config: dict):
        super(Putdata, self).__init__()
        self.queue = queue
        self.config = config

    def run(self):
        # Loop forever; the thread is started as a daemon, so it dies with the process.
        while True:
            # get() stays OUTSIDE the try so task_done() is only called for
            # items that were actually retrieved.
            title, content = self.queue.get()
            try:
                self.put_document(title, content)
            except Exception as exc:
                # Keep the worker alive on a bad document instead of letting the
                # exception silently kill the thread (the original behavior).
                print(f'failed to index {title!r}: {exc}')
            finally:
                self.queue.task_done()

    @staticmethod
    def get_hash(title):
        """Return the md5 hex digest of the title; used as a stable document id."""
        return md5(title.encode('utf-8')).hexdigest()

    def put_document(self, title, content):
        """Index a single document, using the title hash as the document id.

        ignore=[400,409,500] makes duplicate/conflicting inserts best-effort
        rather than fatal.
        """
        doc = {
            'title': title,
            'content': content,
        }
        _id = self.get_hash(title)
        self.config['es'].index(index=self.config['index'], document=doc, id=_id, ignore=[400, 409, 500])
def read_data(table: collection.Collection, queue: Queue):
    """Stream (title, content) pairs from the Mongo collection into the queue."""
    cursor = table.find({}, {'_id': 0, 'title': 1, 'content': 1})
    for doc in cursor:
        queue.put((doc['title'], doc['content']))
if __name__ =="__main__":
article_queue = Queue()
client = MongoClient()
db = client['yikaowang']
es = Elasticsearch(hosts='http://127.0.0.1:9200/')
put_config = {
'es':es,
'index':'wenzhang',
}
rd = Thread(target=read_data,args=(db['wenzhang'],article_queue))
rd.daemon = True
rd.start()
time.sleep(2)
for _ in range(50):
pd = Putdata(article_queue,put_config)
pd.daemon = True
pd.start()
article_queue.join()
print('成功入es')