"""Report peak or average current figures from tab-separated ``.data`` files.

Settings are read from the ``current`` sheet of ``./configs.xlsx`` (column 2,
rows 2-6): the function to run, the data directory, an axis number, a
comma-separated coefficient list and a 1-based column number.
"""

import os
import sys

import openpyxl
import pandas


def warn_pause_exit(msg, pause_num, exit_num):
    """Print a message, wait for ENTER, then terminate the program.

    Args:
        msg: Text to print (a blank line is appended).
        pause_num: Number of times the user must press ENTER before exit.
        exit_num: Exit status passed to ``sys.exit``.

    Raises:
        SystemExit: Always.
    """
    print(msg + '\n')
    for _ in range(pause_num):
        input("Press ENTER to continue......\n")
    sys.exit(exit_num)


def traversal_files(path):
    """List the directories and files directly inside ``path``.

    Sub-directories are not descended into. If ``path`` does not exist the
    program warns and exits with status 11.

    Args:
        path: Directory to scan.

    Returns:
        A ``(dirs, files)`` tuple of lists holding the entry paths.
    """
    if not os.path.exists(path):
        msg = f'数据文件夹{path}不存在,请确认后重试......'
        warn_pause_exit(msg, 1, 11)

    dirs = []
    files = []
    # The context manager releases the directory handle deterministically.
    with os.scandir(path) as entries:
        for item in entries:
            if item.is_dir():
                dirs.append(item.path)
            elif item.is_file():
                files.append(item.path)
    return dirs, files


def initialization():
    """Load the run configuration and collect the data files.

    Reads ``./configs.xlsx`` (sheet ``current``); exits with status 2 when
    the workbook or sheet cannot be opened, and with status 1 when any file
    in the data directory does not end with ``.data``.

    Returns:
        A tuple ``(data_files, func, rc, column, axis)`` where ``rc`` is the
        list of strings obtained by splitting the coefficient cell on ``,``
        and ``column`` / ``axis`` are integers.
    """
    try:
        # read init configurations from config file
        wb_conf = openpyxl.load_workbook('./configs.xlsx', read_only=True)
        ws_conf = wb_conf['current']
    except Exception:
        msg = "无法在当前路径下找到或打开【configs.xlsx】文件,请确认!"
        warn_pause_exit(msg, 1, 2)

    try:
        func = ws_conf.cell(row=2, column=2).value
        data_dir = ws_conf.cell(row=3, column=2).value
        axis = int(ws_conf.cell(row=4, column=2).value)
        rc = ws_conf.cell(row=5, column=2).value.split(',')
        column = int(ws_conf.cell(row=6, column=2).value)
    finally:
        # Close the workbook even when a cell holds an unusable value.
        wb_conf.close()

    _, data_files = traversal_files(data_dir)
    if any(not data_file.endswith('.data') for data_file in data_files):
        msg = "所有文件必须以 .data 结尾,请检查后重新运行。"
        warn_pause_exit(msg, 1, 1)

    return data_files, func, rc, column, axis


def current_max(*args):
    """Print, for every data file, the scaled maximum of the chosen column.

    Args:
        *args: The tuple returned by ``initialization``:
            ``(data_files, func, rc, column, axis)``.

    The printed value is ``max / 1000 * rc[axis - 1]``. Exits with status 0
    once all files have been processed.
    """
    data_files, _func, rc, column, axis = args
    for data_file in data_files:
        df = pandas.read_csv(data_file, sep='\t')
        col = df.columns.values[column - 1]
        c_max = df[col].max()
        # rc entries are strings (split from the config cell); convert
        # before multiplying, otherwise float * str raises TypeError.
        print(f"{data_file}: {c_max / 1000 * float(rc[axis - 1]):.3f}")

    msg = "数据处理完毕......"
    warn_pause_exit(msg, 1, 0)


def current_avg(*args):
    """Print, for every data file, the scaled ``|mean| + std`` of the column.

    The axis index is taken from the file name rather than from the
    configuration: the part of the base name before the first ``_`` is used,
    with every ``j`` removed (e.g. ``j3_xxx.data`` gives axis 3).

    Args:
        *args: The tuple returned by ``initialization``:
            ``(data_files, func, rc, column, axis)``.

    Exits with status 0 once all files have been processed.
    """
    data_files, _func, rc, column, _axis = args
    for data_file in data_files:
        df = pandas.read_csv(data_file, sep='\t')
        col = df.columns.values[column - 1]
        c_std = df[col].std()
        c_avg = df[col].mean()
        # basename handles whichever separator the platform uses; splitting
        # on a literal backslash only worked for Windows-style paths.
        file_name = os.path.basename(data_file)
        axis = int(file_name.split('_')[0].replace('j', ''))
        print(f"{data_file}: {(abs(c_avg) + c_std) / 1000 * float(rc[axis - 1]):.3f}")

    msg = "数据处理完毕......"
    warn_pause_exit(msg, 1, 0)


def full_cycle(*args):
    """Placeholder for the ``cycle`` function; currently does nothing."""
    pass


def main():
    """Load the configuration and run the function it selects."""
    args = initialization()
    handlers = {
        'max': current_max,
        'avg': current_avg,
        'cycle': full_cycle,
    }
    func = args[1]
    handler = handlers.get(func)
    if handler is None:
        # An unknown selector used to end the program silently.
        msg = f"无法识别的功能【{func}】,可选值为 max / avg / cycle,请检查 configs.xlsx。"
        warn_pause_exit(msg, 1, 3)
    handler(*args)


if __name__ == '__main__':
    main()