from openpyxl import load_workbook from os import scandir from os.path import exists from sys import argv from pandas import read_csv def traversal_files(path, w2t): # 功能:以列表的形式分别返回指定路径下的文件和文件夹,不包含子目录 # 参数:路径 # 返回值:路径下的文件夹列表 路径下的文件列表 if not exists(path): msg = f'数据文件夹{path}不存在,请确认后重试......' w2t(msg, 0, 1) else: dirs = [] files = [] for item in scandir(path): if item.is_dir(): dirs.append(item.path) elif item.is_file(): files.append(item.path) return dirs, files def initialization(path, sub, w2t): _, data_files = traversal_files(path, w2t) count = 0 for data_file in data_files: if sub != 'cycle': if not (data_file.endswith('.data') or data_file.endswith('.csv')): msg = f"所有文件必须以 .data 结尾,请检查后重新运行。" w2t(msg, 0, 2) else: if data_file.endswith('.xlsx'): count += 1 elif not (data_file.endswith('.data') or data_file.endswith('.csv')): msg = f"所有文件必须以 .data 结尾,请检查后重新运行。" w2t(msg, 0, 3) if sub == 'cycle' and count != 1: w2t("未找到电机电流数据处理excel表格,确认后重新运行!", 0, 4) return data_files def current_max(data_files, rc, trq, w2t): current = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0} for data_file in data_files: if data_file.endswith('.data'): df = read_csv(data_file, sep='\t') elif data_file.endswith('.csv'): df = read_csv(data_file, sep=',', encoding='gbk', header=8) axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j')) rca = rc[axis-1] col = df.columns.values[trq-1] c_max = df[col].max() scale = 1 if data_file.endswith('.csv') else 1000 _ = abs(c_max/scale*rca) current[axis] = _ w2t(f"{data_file}: {_:.4f}") w2t("【MAX】数据处理完毕......") return current def current_avg(data_files, rc, trq, w2t): current = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0} for data_file in data_files: if data_file.endswith('.data'): df = read_csv(data_file, sep='\t') elif data_file.endswith('.csv'): df = read_csv(data_file, sep=',', encoding='gbk', header=8) axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j')) rca = rc[axis-1] col = df.columns.values[trq - 1] c_std = df[col].std() c_avg = df[col].mean() scale = 1 if data_file.endswith('.csv') else 1000 _ = (abs(c_avg)+c_std)/scale*rca current[axis] = _ w2t(f"{data_file}: {_:.4f}") w2t("【AVG】数据处理完毕......") return current def current_cycle(dur, data_files, rc, vel, trq, w2t): result = None hold = [] single = [] for data_file in data_files: if data_file.endswith('.xlsx'): result = data_file elif (data_file.endswith('.csv') or data_file.endswith('.data')) and data_file.startswith('hold', 3, 10): hold.append(data_file) else: single.append(data_file) if hold != []: avg = current_avg(hold, rc, trq, w2t) wb = load_workbook(result) for k, v in avg.items(): try: shtname = f"J{k}" wb[shtname]["J4"].value = v except: pass for data_file in single: axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j')) rcs = rc[axis-1] pass # ======================================= def main(path, sub, rc, vel, trq, dur, w2t): data_files = initialization(path, sub, w2t) if sub == 'max': current_max(data_files, rc, trq, w2t) elif sub == 'avg': current_avg(data_files, rc, trq, w2t) elif sub == 'cycle': current_cycle(dur, data_files, rc, vel, trq, w2t) else: pass if __name__ == '__main__': main(*argv[1:])