from csv import reader
from logging import getLogger, INFO
from sys import argv

from openpyxl import Workbook
from pandas import read_csv

from commons import clibs

logger = getLogger(__file__)
logger.setLevel(INFO)


def find_point(bof, step, pos, data_file, flag, df, row, w2t):
    """Scan column 2 of ``df`` from ``row`` until the value stops matching ``flag``.

    Starting at ``row``, the scan moves ``step`` rows at a time (towards lower
    indices for ``'backward'``, higher for ``'forward'``) for as long as the
    value in column index 2 satisfies the comparison selected by ``flag``.

    Args:
        bof: Scan direction, ``'backward'`` or ``'forward'``.
        step: Number of rows to move per iteration.
        pos: Label included in the error message (used for debugging).
        data_file: File name included in the error message.
        flag: ``'gt'`` to skip rows whose value is ``> 2``; ``'lt'`` to skip
            rows whose value is ``< 2``.
        df: DataFrame whose column index 2 is inspected.
        row: Positional row index at which the scan starts.
        w2t: Reporting callback; called with ``(message, 0, code, 'red')``
            when a backward scan runs out of data.

    Returns:
        The first non-matching row moved one more ``step`` in the scan
        direction. If a forward scan leaves the valid range without finding
        such a row, the out-of-range row plus 100.

    Raises:
        ValueError: If ``bof`` or ``flag`` is not one of the supported values.
        RuntimeError: If a backward scan leaves the valid range and ``w2t``
            returns instead of aborting.
    """
    if bof not in ('backward', 'forward'):
        raise ValueError(f"unsupported bof: {bof!r}")
    if flag not in ('gt', 'lt'):
        raise ValueError(f"unsupported flag: {flag!r}")

    delta = -step if bof == 'backward' else step
    # The last 100 rows are never scanned; row 0 is excluded as well.
    row_limit = df.index[-1] - 100

    while 0 < row < row_limit:
        value = df.iloc[row, 2]
        # Keep the two comparisons separate (rather than negating one) so that
        # NaN values are treated as "not matching" for both flags.
        matches = value > 2 if flag == 'gt' else value < 2
        if not matches:
            return row + delta
        row += delta

    # The scan left the valid range without finding a transition.
    if bof == 'forward':
        return row + 100

    exit_code = 2 if flag == 'gt' else 3
    w2t(f"[{pos}] 在 {data_file} 中,无法正确识别数据,需要确认...", 0, exit_code, 'red')
    # Previously this path fell through to an unbound local; fail explicitly
    # in case w2t returns instead of aborting.
    raise RuntimeError(f"[{pos}] no transition point found in {data_file}")


def get_cycle_info(data_file, df, row, step, w2t):
    """Measure the low span, high span and full cycle length near the end of ``df``.

    The data is walked backwards from ``row`` using ``find_point``:

    1. Starting from the end, the trailing segment (whether low or high) is
       discarded and the start of the opposite-valued segment is located.
    2. From there the search continues backwards to the middle point, where
       the value flips again.
    3. From the middle point the search continues backwards to the point
       where the value flips once more, which yields the low/high spans and
       the length of one full cycle.

    end -> middle is the low span; middle -> start is the high span.

    Returns:
        A tuple ``(row_end - row_middle, row_middle - row_start,
        row_end - row_start)`` measured in rows.
    """
    if df.iloc[row, 2] < 2:
        row = find_point('backward', step, 'a1', data_file, 'lt', df, row, w2t)

    _row = find_point('backward', step, 'a2', data_file, 'gt', df, row, w2t)
    _row = find_point('backward', step, 'a3', data_file, 'lt', df, _row, w2t)
    row_end = find_point('backward', step, 'a4', data_file, 'gt', df, _row, w2t)
    row_middle = find_point('backward', step, 'a5', data_file, 'lt', df, row_end, w2t)
    row_start = find_point('backward', step, 'a6', data_file, 'gt', df, row_middle, w2t)

    return row_end - row_middle, row_middle - row_start, row_end - row_start


def initialization(path, w2t):
    """Collect the data files under ``path`` and report any that are not ``.csv``.

    Files are listed with ``clibs.traversal_files``; every file whose name does
    not end in ``.csv`` (case-insensitive) is reported through ``w2t``.

    Returns:
        The list of data files returned by ``clibs.traversal_files``.
    """
    _, data_files = clibs.traversal_files(path, w2t)

    for data_file in data_files:
        if not data_file.lower().endswith('.csv'):
            w2t(f"{data_file} 文件后缀错误,只允许 .csv 文件,需要确认!", 0, 1, 'red')

    return data_files


def preparation(data_file, wb, w2t):
    """Create the result sheet for ``data_file`` and load its data.

    The second field of the file's first row gives the 1-based line number of
    the header row; 70 is used when the file has no rows at all.

    Args:
        data_file: Backslash-separated path of the CSV file.
        wb: Workbook in which a sheet named after the file is created.
        w2t: Reporting callback forwarded to ``get_cycle_info``.

    Returns:
        A tuple ``(ws, df, low, high, cycle)``: the new worksheet, the loaded
        DataFrame and the three values returned by ``get_cycle_info``.
    """
    shtname = data_file.split('\\')[-1].split('.')[0]
    ws = wb.create_sheet(shtname)

    # Read only the first row, and close the handle afterwards. The encoding
    # matches the one passed to read_csv below.
    with open(data_file, encoding='gbk', newline='') as fh:
        first_row = next(reader(fh), None)
    begin = int(first_row[1]) if first_row is not None else 70

    df = read_csv(data_file, sep=',', encoding='gbk', skip_blank_lines=False,
                  header=begin - 1, on_bad_lines='warn')
    low, high, cycle = get_cycle_info(data_file, df, df.index[-1] - 110, 5, w2t)

    return ws, df, low, high, cycle


def single_file_proc(ws, data_file, df, low, high, cycle, w2t):
    """Compute one statistic per low segment and write the results to ``ws``.

    The data is scanned forwards. For every low segment (value ``< 2``) a
    50-row window centred between the end of the previous high segment and the
    end of the low segment is taken, and ``mean + 3 * std`` of column 2 over
    that window is recorded. When a high segment ends more than two cycles
    after the last low segment, subsequent values are stored under the next
    test number.

    ``low`` and ``high`` are accepted for interface compatibility but unused.
    """
    _row = _row_lt = _row_gt = count = 1
    _step = 5
    _data = {}
    row_max = df.index[-1] - 100

    while _row < row_max:
        _data.setdefault(count, [])

        _value = df.iloc[_row, 2]
        if _value < 2:
            _row_lt = find_point('forward', _step, 'c' + str(_row), data_file, 'lt', df, _row, w2t)
            # Centre a 50-row window between the two segment boundaries.
            _start = int(_row_gt + (_row_lt - _row_gt - 50) / 2)
            _end = _start + 50
            window = df.iloc[_start:_end, 2]
            _data[count].append(window.mean() + 3 * window.std())
        else:
            _row_gt = find_point('forward', _step, 'c' + str(_row), data_file, 'gt', df, _row, w2t)
            # A gap longer than two cycles marks the start of the next test.
            if _row_gt - _row_lt > cycle * 2:
                count += 1

        _row = max(_row_gt, _row_lt)

    # Header row (columns 2..9) and header column (rows 2..9).
    for i in range(2, 10):
        ws.cell(row=1, column=i).value = f"第{i-1}次测试"
        ws.cell(row=i, column=1).value = f"第{i-1}次精度变化"

    # One column per test number, values written downwards from row 2.
    for i in sorted(_data):
        for offset, value in enumerate(_data[i]):
            ws.cell(row=2 + offset, column=i + 1).value = float(value)


def execution(data_files, w2t):
    """Process every data file into its own sheet and save ``result.xlsx``.

    The workbook is saved next to the first data file; the directory is
    derived by dropping the last backslash-separated path component.
    """
    wb = Workbook()
    for data_file in data_files:
        ws, df, low, high, cycle = preparation(data_file, wb, w2t)
        single_file_proc(ws, data_file, df, low, high, cycle, w2t)

    wd = '\\'.join(data_files[0].split('\\')[:-1])
    filename = wd + '\\result.xlsx'
    wb.save(filename)
    wb.close()
    w2t('----------------------------------------')
    w2t('所有文件均已处理完毕')


def main(path, w2t):
    """Collect the data files under ``path`` and process all of them."""
    data_files = initialization(path, w2t)
    execution(data_files, w2t)


if __name__ == '__main__':
    # NOTE(review): argv[2] is a string, yet w2t is called as a function
    # throughout this module - confirm how this entry point is meant to be used.
    main(path=argv[1], w2t=argv[2])