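With the `parallel_bulk` method you can pass either a list of dicts or a generator that yields one dict at a time, as the documentation for the helpers explains. A generator in Python is used to avoid loading a whole collection into RAM at once. But if you first collect every element (the `action` dict) into a list (`actions`), the generator no longer serves any purpose, because building the list means loading all of its elements into memory. In your case, the generator you pass does not yield a dict (`action`); it yields the whole list of actions (`actions`).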
So either have `_gen_data` return the list itself, which makes it an ordinary function returning a list and no longer a generator:

    def _gen_data(self, index, doc_type, chunk_size):
        sql = """select * from tem_search_engine_1 where rownum <= 10000"""
        self.cursor.execute(sql)
        col_name_list = [col[0].lower() for col in self.cursor.description]
        col_name_len = len(col_name_list)
        actions = []
        start = time.time()
        for row in self.cursor:
            source = {}
            tbl_id = ""
            for i in range(col_name_len):
                source.update({col_name_list[i]: str(row[i])})
                if col_name_list[i] == "tbl_id":
                    tbl_id = row[i]
            action = {
                "_index": index,
                "_type": doc_type,
                "_id": tbl_id,
                "_source": source
            }
            actions.append(action)
        # Return the complete list once every row has been processed.
        return actions

Or skip building the `actions` list and yield each `action` dict as it is created:
    def _gen_data(self, index, doc_type, chunk_size):
        sql = """select * from tem_search_engine_1 where rownum <= 10000"""
        self.cursor.execute(sql)
        col_name_list = [col[0].lower() for col in self.cursor.description]
        col_name_len = len(col_name_list)
        start = time.time()
        for row in self.cursor:
            source = {}
            tbl_id = ""
            for i in range(col_name_len):
                source.update({col_name_list[i]: str(row[i])})
                if col_name_list[i] == "tbl_id":
                    tbl_id = row[i]
            yield {
                "_index": index,
                "_type": doc_type,
                "_id": tbl_id,
                "_source": source
            }
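To make the difference concrete, here is a minimal sketch that does not need a database or a cluster. The names `gen_actions`, `gen_action_lists` and `index_rows` are hypothetical and not from your code, the rows are passed in as plain tuples, and `_type` is left out for brevity. The first generator yields one dict per row, which is the shape `parallel_bulk` expects. The second reproduces the problem by yielding a single list that holds every action.

```python
import types


def gen_actions(rows, columns, index):
    """Yield one action dict per row (the shape the bulk helpers expect)."""
    for row in rows:
        # Stringify every column value, as in the original _gen_data.
        source = {name: str(value) for name, value in zip(columns, row)}
        yield {
            "_index": index,
            "_id": source.get("tbl_id", ""),
            "_source": source,
        }


def gen_action_lists(rows, columns, index):
    """Problematic shape: yields ONE item, a list containing every action."""
    yield list(gen_actions(rows, columns, index))


def index_rows(client, rows, columns, index, chunk_size=500):
    """Hypothetical wrapper: feed the per-row generator to parallel_bulk."""
    # Imported here so the rest of the sketch runs without elasticsearch installed.
    from elasticsearch import helpers

    # parallel_bulk is lazy; nothing is sent until its results are consumed.
    results = helpers.parallel_bulk(
        client, gen_actions(rows, columns, index), chunk_size=chunk_size
    )
    return sum(1 for ok, _ in results if ok)


if __name__ == "__main__":
    rows = [(1, "a"), (2, "b")]
    columns = ["tbl_id", "name"]
    good = gen_actions(rows, columns, "my-index")
    print(isinstance(good, types.GeneratorType))
    print([type(item).__name__ for item in good])
    print([type(item).__name__ for item in gen_action_lists(rows, columns, "my-index")])
```

Iterating the first generator gives dicts, while the second gives a single list, which is not a valid action. Also note that `parallel_bulk` itself returns a generator, so it has to be iterated (as `index_rows` does) for any request to be sent.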


