新建如下测试excel表:
一:目标匹配股票公司.xlsx,一个sheet,一张表 二:招商.xlsx sheet0,有额外的无效列sheet1 两张表, 表的位置从头开始,中间有间隔
sheet2
sheet3 三张表,两表中间间隔不同,表从头开始 sheet4 三张表, 表的位置不是从头开始 三: 中金.xlsx 四:汇总表.xlsx,合并效果如下五:匹配结果.xlsx,匹配结果如下: 六:代码如下
# enconding = 'utf-8'
from pathlib import Path
import pandas as pd
import numpy as np
import os,re,time,sys
class ExeclProc(object):
def __init__(self, target_excel, merge_excel, match_result_excel):
self.cur_path = Path(os.path.dirname(__file__)) #获取当前执行文件的路径
self.merge_excel = os.path.join(self.cur_path, merge_excel) #合并总表的文件相对路径
self.workbook = pd.ExcelWriter(self.merge_excel) #新建总表
self.excels = self.cur_path.glob('*.xlsx*') # 获取文件夹下所有工作簿的文件路径
self.target_excel = os.path.join(self.cur_path, target_excel)
self.match_result = os.path.join(self.cur_path, match_result_excel)
self.save_wb = pd.ExcelWriter(self.match_result)
def exclude_new_create_execl(self, file_name):
exclude_new_create_file = []
exclude_new_create_file.append(os.path.basename(self.merge_excel))
exclude_new_create_file.append(os.path.basename(self.target_excel))
exclude_new_create_file.append(os.path.basename(self.match_result))
if file_name in exclude_new_create_file:
return True
return False
def is_single_sheet_multi_table(self,cloums):
for column in cloums:
if re.findall(r'Unnamed: d', column):#通过识别表头中有Unnamed判断有多个表
return True
return False
def save_single_sheet_multi_table_column_start_end_index(self,file, sheet_name, index, start, end):
table_name = re.sub('.xls[x]*', "-",file) + sheet_name + '-{}'
if start != end:
self.table_column_start_end[table_name.format(index)] = (start, end)
return True
return False
def calc_single_sheet_multi_table_start_end_idx(self,file,sheet_name,colums):
self.table_column_start_end = {}
column_start_idx = 0
column_end_idx = 0
table_head_serial_num = 0
for column_idx,column in enumerate(colums):
if re.findall(r'Unnamed: d', column):
if column_idx + 1 < len(colums) and re.findall(r'Unnamed: d', colums[column_idx + 1]):
self.save_single_sheet_multi_table_column_start_end_index(file, sheet_name, table_head_serial_num, column_start_idx, column_end_idx)
else:
if self.save_single_sheet_multi_table_column_start_end_index(file, sheet_name, table_head_serial_num, column_start_idx, column_end_idx):
table_head_serial_num = table_head_serial_num + 1
column_start_idx = column_idx + 1
else:
column_end_idx = column_idx + 1
if column_idx + 1 == len(colums):
column_end_idx = column_idx + 1
self.save_single_sheet_multi_table_column_start_end_index(file, sheet_name, table_head_serial_num, column_start_idx, column_end_idx)
# print(self.table_column_start_end)
def merge_multi_table_to_multi_sheet(self,file_path,sheet_name):
df = pd.read_excel(file_path, sheet_name=sheet_name)
for key in self.table_column_start_end.keys():
data = {}
start,end = self.table_column_start_end[key]
for s in range(start,end):
colunmn_name = re.sub('.d', '' ,df.columns.values[s]) #将数量.1中".数字"去掉
data[colunmn_name] = list(df.values[:,s])
sheet = pd.DataFrame(data)
sheet.to_excel(self.workbook,sheet_name = key,index=False)
def merge_excel_proc(self):
for file in self.excels:
if self.exclude_new_create_execl(file.name):
#print("跳过{}".format(file.name))
continue
file_path = os.path.join(self.cur_path, file.name)
sheets = pd.ExcelFile(file_path).sheet_names
for sheet_name in sheets:
df = pd.read_excel(file_path, sheet_name=sheet_name)
if self.is_single_sheet_multi_table(df.columns.values):
self.calc_single_sheet_multi_table_start_end_idx(file.name, sheet_name, df.columns.values)
self.merge_multi_table_to_multi_sheet(file_path, sheet_name)
else:
df.to_excel(self.workbook,sheet_name=sheet_name, index=False)
self.workbook.save()
self.workbook.close()
def read_flag_excel_info(self):
sheets = pd.ExcelFile(self.target_excel).sheet_names
self.company = []
for index,sheet_name in enumerate(sheets):
df = pd.read_excel(self.target_excel, sheet_name=sheet_name)
for columns in df.columns.values:
if not self.is_match_security_name(columns): continue
self.company.extend(df[columns]) #保存目标公司名称
break
def is_match_security_name(self, security_name):
security_name_list = ['证券名称','证券简称', '股票','股票公司']
if security_name in security_name_list:return True
return False
def save_match_company_info(self):
merge_sheets = pd.ExcelFile(self.merge_excel).sheet_names
self.match_company = []
for sheet_name in merge_sheets:
df = pd.read_excel(self.merge_excel, sheet_name=sheet_name)
is_find_match_company = False
match_company_info = []
for columns in df.columns.values:
if not self.is_match_security_name(columns): continue
for company in self.company:
for rowIdx, company_name in enumerate(df[columns]):
if company == company_name:
is_find_match_company = True
match_company_info.extend([df.values[rowIdx]])
break
if is_find_match_company:
self.match_company.extend([df.columns.values])
self.match_company.extend(match_company_info)
self.match_company.extend([''])
if len(self.match_company) > 0:
data=pd.DataFrame(self.match_company)
data.to_excel(self.save_wb, sheet_name="匹配结果",index=False, header=None)
self.save_wb.save()
self.save_wb.close()
if __name__ == '__main__':
# ExeclProc 第一参数:表示目标
# 第二个参数:新建总表名字,用于保存所有表格中sheet合并结果
# 第三个参数:新建匹配结果表,用于保存匹配目标股票公司信息
print('n=========================匹配开始============================n')
match_result = '匹配结果.xlsx'
excel = ExeclProc('目标匹配股票公司.xlsx', '汇总表.xlsx', match_result)
excel.read_flag_excel_info()
excel.merge_excel_proc()
excel.save_match_company_info()
print('=========================匹配完成============================n')
print('======================请到 [{}] 查看匹配结果====================='.format(match_result))
time.sleep(3)
sys.exit(0)



