背景:
数据中某字段A需要进行转换,批次拉取后进行行处理
为提高效率,将大批次分为10个小批次,分线程处理
read_df = hive_context.sql(hivesql)
allrows = read_df.collect()
#此处将大批次分为10个小批次,分线程处理
temp_list = list_of_groups(allrows, 10)
# step3 line handel
threads = []
for i in range(len(temp_list)):
th = threading.Thread(target=lineHandel, args=(temp_list[i],))
threads.append(th)
for i in range(len(temp_list)):
threads[i].start()
for i in range(len(temp_list)):
threads[i].join()
行处理逻辑省略。。。
问题出现点:
处理完成后的Dataframe我使用registerTempTable函数的方式写入Hive
分析原因:
临时表线程共享,会导致多个线程多次写入的操作
解决方法:
直接使用
DF.write.insertInto("database.table")



