"""Extract per-cycle statistics from wavelogger CSV files into ``result.xlsx``.

Each CSV is scanned along its third column (index 2) for segments below and
at-or-above a threshold; one statistic per below-threshold segment is written
to a worksheet named after the file.
"""
import csv

import chardet
import openpyxl
import pandas

from common import clibs


def find_point(bof, step, margin, threshold, pos, data_file, flag, df, row, w2t):
    """Walk through column 2 of *df* until the value stops matching *flag*.

    Args:
        bof: Search direction, ``"backward"`` or ``"forward"``. While walking,
            any value other than ``"backward"`` moves forward.
        step: Number of rows to move per iteration.
        margin: Number of rows at the end of *df* excluded from the search.
        threshold: Value the samples are compared against.
        pos: Label of the call site, only used in error messages.
        data_file: Path of the file being processed, only used in messages.
        flag: ``"gt"`` keeps walking while the value is greater than
            *threshold*; ``"lt"`` keeps walking while it is lower.
        df: DataFrame whose column index 2 holds the samples.
        row: Row index to start from.
        w2t: Callback used to report messages to the user.

    Returns:
        The row one *step* past the first sample that no longer satisfies
        *flag*. When the search runs out of range: ``row + margin`` for a
        forward search (so the caller's loop terminates), ``None`` for a
        backward search (after reporting the error). ``None`` is also returned
        for an unknown *flag*.
    """
    row_limit = len(df) - margin + 1
    if flag not in ("gt", "lt"):
        return None

    delta = -step if bof == "backward" else step
    while 0 < row < row_limit:
        value = float(df.iloc[row, 2])
        still_inside = value > threshold if flag == "gt" else value < threshold
        if not still_inside:
            return row + delta
        row += delta

    # The search left the valid range without finding a transition.
    if bof == "backward":
        clibs.insert_logdb("ERROR", "wavelogger", f"find_point-{flag}: [{pos}] 在 {data_file} 中,无法正确识别数据,需要确认...")
        # NOTE(review): presumably w2t aborts on "red" messages; otherwise the
        # None returned below reaches the caller - confirm against clibs.w2t.
        w2t(f"[{pos}] 在 {data_file} 中,无法正确识别数据,需要确认...\n", "red", "DataError")
        return None
    if bof == "forward":
        # Push the row past the caller's limit to end the while loop in
        # `single_file_proc`.
        return row + margin
    return None


def get_cycle_info(data_file, step, margin, threshold, w2t):
    """Load *data_file* and measure one low/high cycle near its end.

    The search runs backward from the end of the data:

    1. Whatever level the data ends on (low or high) is discarded, and the
       start of the opposite level is located.
    2. From there, keep searching backward for the point where the level
       flips again (the middle point).
    3. From the middle point, keep searching backward for the next flip.
       This yields the lengths of the low and high segments and of one
       complete cycle.

    Args:
        data_file: Path to the CSV file.
        step: Row increment used by `find_point`.
        margin: Number of trailing rows to ignore.
        threshold: Level separating low from high samples.
        w2t: Callback used to report messages to the user.

    Returns:
        A tuple ``(low, high, cycle, df)`` where ``low`` is
        ``row_end - row_middle``, ``high`` is ``row_middle - row_start``,
        ``cycle`` is ``row_end - row_start`` (all in rows) and ``df`` is the
        loaded DataFrame.
    """
    # Detect the encoding from the first bytes of the file.
    with open(data_file, "rb") as f:
        raw_data = f.read(1000)
        result = chardet.detect(raw_data)
        encoding = result['encoding']

    # The second field of the first line is used to locate the header row.
    # The file is opened in a `with` block so the handle is always closed.
    with open(data_file, encoding=encoding, newline="") as f:
        begin = int(next(csv.reader(f))[1])

    df = pandas.read_csv(
        data_file,
        sep=",",
        encoding=encoding,
        skip_blank_lines=False,
        header=begin - 1,
        on_bad_lines="skip",
    )

    row = len(df) - margin
    if float(df.iloc[row, 2]) < threshold:
        # The data ends on a low level: skip it first.
        row = find_point("backward", step, margin, threshold, "a1", data_file, "lt", df, row, w2t)

    _row = find_point("backward", step, margin, threshold, "a2", data_file, "gt", df, row, w2t)
    _row = find_point("backward", step, margin, threshold, "a3", data_file, "lt", df, _row, w2t)
    row_end = find_point("backward", step, margin, threshold, "a4", data_file, "gt", df, _row, w2t)
    row_middle = find_point("backward", step, margin, threshold, "a5", data_file, "lt", df, row_end, w2t)
    row_start = find_point("backward", step, margin, threshold, "a6", data_file, "gt", df, row_middle, w2t)
    return row_end - row_middle, row_middle - row_start, row_end - row_start, df


def initialization(path, w2t):
    """Collect the data files under *path* and report any non-CSV file.

    Args:
        path: Directory handed to ``clibs.traversal_files``.
        w2t: Callback used to report messages to the user.

    Returns:
        The list of data files returned by ``clibs.traversal_files``. Files
        with a wrong suffix are reported but not removed from the list.
    """
    _, data_files = clibs.traversal_files(path, w2t)

    for data_file in data_files:
        if not data_file.lower().endswith(".csv"):
            clibs.insert_logdb("ERROR", "wavelogger", f"init: {data_file} 文件后缀错误,只允许 .csv 文件,需要确认!")
            w2t(f"{data_file} 文件后缀错误,只允许 .csv 文件,需要确认!\n", "red", "FileTypeError")

    return data_files


def preparation(data_file, step, margin, threshold, wb, w2t):
    """Create the worksheet for *data_file* and compute its cycle information.

    The sheet is named after the file's base name up to its first dot.

    Returns:
        A tuple ``(ws, df, low, high, cycle)``.
    """
    shtname = data_file.split("/")[-1].split(".")[0]
    ws = wb.create_sheet(shtname)
    low, high, cycle, df = get_cycle_info(data_file, step, margin, threshold, w2t)
    return ws, df, low, high, cycle


def single_file_proc(ws, data_file, step, threshold, margin, data_length, df, cycle, w2t):
    """Compute one statistic per low segment of *df* and write them to *ws*.

    The data is walked forward. For every below-threshold segment, the
    *data_length* rows centred between the end of the previous high segment
    and the end of the low segment are taken and ``mean + 3 * std`` is stored.
    A new round (worksheet column) starts when the distance between the end of
    a high segment and the end of the previous low segment exceeds two cycles.

    Args:
        ws: Worksheet receiving the results.
        data_file: Path of the file being processed, only used in messages.
        step: Row increment used by `find_point`.
        threshold: Level separating low from high samples.
        margin: Number of trailing rows to ignore.
        data_length: Number of rows used for each statistic.
        df: DataFrame whose column index 2 holds the samples.
        cycle: Length of one cycle in rows.
        w2t: Callback used to report messages to the user.
    """
    row, row_lt, row_gt, count, count_i, data = 1, 1, 1, 1, 1, {}
    row_max = len(df) - margin

    while row < row_max:
        data.setdefault(count, [])

        value = float(df.iloc[row, 2])
        if value < threshold:
            row_lt = find_point("forward", step, margin, threshold, f"c{row}", data_file, "lt", df, row, w2t)
            start = int(row_gt + (row_lt - row_gt - data_length) / 2)
            end = start + data_length
            window = df.iloc[start:end, 2].astype(float)
            value = window.mean() + 3 * window.std()
            # NOTE(review): the limit of 1 is hard-coded; its unit is not
            # visible from this file.
            if value > 1:
                msg = f"{data_file} 文件第 {count} 轮 第 {count_i} 个数据可能有问题,需人工手动确认,确认有问题可删除,无问题则保留\n"
                clibs.insert_logdb("WARNING", "wavelogger", msg)
                w2t(msg, "orange")
            data[count].append(value)
            count_i += 1
        else:
            row_gt = find_point("forward", step, margin, threshold, f"c{row}", data_file, "gt", df, row, w2t)
            if row_gt - row_lt > cycle * 2:
                # A long gap marks the beginning of the next round.
                count += 1
                count_i = 1
        row = max(row_gt, row_lt)

    # Header row and header column.
    for i in range(2, 10):
        ws.cell(row=1, column=i).value = f"第{i-1}次测试"
        ws.cell(row=i, column=1).value = f"第{i-1}次精度变化"

    # One column per round, values listed from row 2 downward.
    for i in sorted(data):
        column = i + 1
        for offset, value in enumerate(data[i]):
            ws.cell(row=2 + offset, column=column).value = float(value)


def execution(data_files, w2t):
    """Process every file in *data_files* and save the workbook.

    The workbook is written to ``result.xlsx`` in the directory of the first
    data file.
    """
    wb = openpyxl.Workbook()
    step, margin, data_length, threshold = 5, 50, 50, 5

    for data_file in data_files:
        ws, df, _low, _high, cycle = preparation(data_file, step, margin, threshold, wb, w2t)
        single_file_proc(ws, data_file, step, threshold, margin, data_length, df, cycle, w2t)

    wd = "/".join(data_files[0].split("/")[:-1])
    filename = wd + "/result.xlsx"
    wb.save(filename)
    wb.close()
    w2t("----------------------------------------\n")
    w2t("所有文件均已处理完毕\n")


def main():
    """Run the whole procedure using the path and callback held by `clibs`."""
    path = clibs.data_dp["_path"]
    w2t = clibs.w2t
    data_files = initialization(path, w2t)
    execution(data_files, w2t)


if __name__ == "__main__":
    main()