from openpyxl import load_workbook from os import scandir from os.path import exists from sys import argv from pandas import read_csv from re import match from threading import Thread from time import sleep class GetThreadResult(Thread): def __init__(self, func, args=()): super(GetThreadResult, self).__init__() self.func = func self.args = args self.result = 0 def run(self): sleep(1) self.result = self.func(*self.args) def get_result(self): Thread.join(self) # 等待线程执行完毕 try: return self.result except Exception as Err: return None def w2t_local(msg, wait, w2t): while True: global stop if stop == 0 and wait != 0: sleep(1) w2t(msg, wait) else: break def traversal_files(path, w2t): # 功能:以列表的形式分别返回指定路径下的文件和文件夹,不包含子目录 # 参数:路径 # 返回值:路径下的文件夹列表 路径下的文件列表 if not exists(path): msg = f'数据文件夹{path}不存在,请确认后重试......' w2t(msg, 0, 1) else: dirs = [] files = [] for item in scandir(path): if item.is_dir(): dirs.append(item.path) elif item.is_file(): files.append(item.path) return dirs, files def initialization(path, sub, w2t): _, data_files = traversal_files(path, w2t) count = 0 for data_file in data_files: filename = data_file.split('\\')[-1] if sub != 'cycle': if not (match('j[1-7].*\\.data', filename) or match('j[1-7].*\\.csv', filename)): msg = f"所有文件必须以 jx_ 开头,以 .data/csv 结尾(x取值1-7),请检查后重新运行。" w2t(msg, 0, 2) else: if filename.endswith('.xlsx'): count += 1 elif not (match('j[1-7].*\\.data', filename) or match('j[1-7].*\\.csv', filename)): msg = f"所有文件必须以 jx_ 开头,以 .data/csv 结尾(x取值1-7),请检查后重新运行。" w2t(msg, 0, 3) if sub == 'cycle' and count != 1: w2t("未找到电机电流数据处理excel表格,确认后重新运行!", 0, 4) return data_files def current_max(data_files, rcs, trqh, w2t): current = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0} for data_file in data_files: if data_file.endswith('.data'): df = read_csv(data_file, sep='\t') elif data_file.endswith('.csv'): df = read_csv(data_file, sep=',', encoding='gbk', header=8) axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j')) rca = rcs[axis-1] col = df.columns.values[trqh-1] c_max = df[col].max() scale = 1 if data_file.endswith('.csv') else 1000 _ = abs(c_max/scale*rca) current[axis] = _ w2t(f"{data_file}: {_:.4f}") w2t("【MAX】数据处理完毕......") return current def current_avg(data_files, rcs, trqh, w2t): current = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0} for data_file in data_files: if data_file.endswith('.data'): df = read_csv(data_file, sep='\t') elif data_file.endswith('.csv'): df = read_csv(data_file, sep=',', encoding='gbk', header=8) axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j')) rca = rcs[axis-1] col = df.columns.values[trqh - 1] c_std = df[col].std() c_avg = df[col].mean() scale = 1 if data_file.endswith('.csv') else 1000 _ = (abs(c_avg)+c_std)/scale*rca current[axis] = _ w2t(f"{data_file}: {_:.4f}") w2t("【AVG】数据处理完毕......") return current def current_cycle(dur, data_files, rcs, vel, trq, trqh, w2t): result = None hold = [] single = [] for data_file in data_files: filename = data_file.split('\\')[-1] if data_file.endswith('.xlsx'): result = data_file elif match('j[1-7]_hold_.*\\.data', filename) or match('j[1-7]_hold_.*\\.csv', filename): hold.append(data_file) else: single.append(data_file) w2t(f"正在打开文件 {result},需要 10s 左右", 1) global stop stop = 0 t_excel = GetThreadResult(load_workbook, args=(result, )) t_wait = Thread(target=w2t_local, args=('.', 1, w2t)) t_excel.start() t_wait.start() t_excel.join() wb = t_excel.get_result() stop = 1 sleep(1.1) w2t('') if hold != []: avg = current_avg(hold, rcs, trqh, w2t) for axis, cur_value in avg.items(): try: shtname = f"J{axis}" wb[shtname]["J4"].value = float(cur_value) except: pass if dur == 0: p_single(wb, single, vel, trq, w2t) else: p_scenario() w2t(f"正在保存文件 {result},需要 10s 左右", 1) stop = 0 t_excel = Thread(target=wb.save, args=(result, )) t_wait = Thread(target=w2t_local, args=('.', 1, w2t)) t_excel.start() t_wait.start() t_excel.join() stop = 1 sleep(1.1) w2t('\n') w2t("----------------------------------------------------------") w2t("全部处理完毕") def p_single(wb, single, vel, trq, w2t): # 1. 先找到第一个速度为零的点,数据从后往前找,一开始就是零的情况不予考虑 # 2. 记录第一个点的位置,继续向前查找第二个速度为零的点,同理,一开始为零的点不予考虑 # 3. 记录第二个点的位置,并将其中的数据拷贝至对应位置 for data_file in single: axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j')) shtname = f"J{axis}" ws = wb[shtname] if data_file.endswith('.data'): df = read_csv(data_file, sep='\t') elif data_file.endswith('.csv'): df = read_csv(data_file, sep=',', encoding='gbk', header=8) # 过滤尾部无效数据 _row_e = df.index[-1] _row_s = _row_e - 200 while _row_e > 200: speed_avg = df.iloc[_row_s:_row_e, vel-1].abs().mean() if speed_avg > 2: _row_e -= 50 _row_s -= 50 continue else: break else: w2t("数据有误,需要检查,无法找到第一个有效起始点...", 0, 1) # 找到第一个起始点 row_end,继续找到有数据的部分 row_end = _row_e - 100 _row_e -= 200 _row_s -= 200 while _row_e > 200: speed_avg = df.iloc[_row_s:_row_e, vel-1].abs().mean() if speed_avg < 2: _row_e -= 50 _row_s -= 50 continue else: break else: w2t("数据有误,需要检查,无法找到第二个有效起始点...", 0, 2) # 目前已经有一点的速度值了,继续往前搜寻下一个速度为零的点 _row_e -= 200 _row_s -= 200 while _row_e > 200: speed_avg = df.iloc[_row_s:_row_e, vel-1].abs().mean() if speed_avg > 2: _row_e -= 50 _row_s -= 50 continue else: break else: w2t("数据有误,需要检查,无法找到第三个有效起始点...", 0, 3) row_start = _row_s + 180 data = [] for row in range(row_start, row_end): data.append(df.iloc[row, vel-1]) data.append(df.iloc[row, trq-1]) i = 0 for row in ws.iter_rows(min_row=2, min_col=2, max_row=15000, max_col=3): for cell in row: try: cell.value = data[i] i += 1 except: cell.value = None def p_scenario(): pass # ======================================= def main(path, sub, rcs, vel, trq, trqh, dur, w2t): data_files = initialization(path, sub, w2t) if sub == 'max': current_max(data_files, rcs, trqh, w2t) elif sub == 'avg': current_avg(data_files, rcs, trqh, w2t) elif sub == 'cycle': current_cycle(dur, data_files, rcs, vel, trq, trqh, w2t) else: pass if __name__ == '__main__': stop = 0 main(*argv[1:])