转向从机型配置文件获取参数

This commit is contained in:
2025-01-14 14:14:33 +08:00
parent 137122947c
commit 0e67a2831c
25 changed files with 524877 additions and 203 deletions

View File

@ -1,5 +1,148 @@
import pandas
import csv
import openpyxl
from common import clibs
def main():
print("wavelogger")
def find_point(bof, step, margin, threshold, pos, data_file, flag, df, row, w2t):
# bof: backward or forward
# pos: used for debug
# flag: greater than or lower than
row_target = None
row_origin = df.index[-1] - margin + 1
if flag == "gt":
while 0 < row < row_origin:
value = float(df.iloc[row, 2])
if value > threshold:
row = row - step if bof == "backward" else row + step
continue
else:
row_target = row - step if bof == "backward" else row + step
break
else:
if bof == "backward":
clibs.insert_logdb("ERROR", "wavelogger", f"find_point-gt: [{pos}] 在 {data_file} 中,无法正确识别数据,需要确认...")
w2t(f"[{pos}] 在 {data_file} 中,无法正确识别数据,需要确认...", "red", "DataError")
elif bof == "forward":
row_target = row + margin # to end while loop in function `single_file_proc`
elif flag == "lt":
while 0 < row < row_origin:
value = float(df.iloc[row, 2])
if value < threshold:
row = row - step if bof == "backward" else row + step
continue
else:
row_target = row - step if bof == "backward" else row + step
break
else:
if bof == "backward":
clibs.insert_logdb("ERROR", "wavelogger", f"find_point-lt: [{pos}] 在 {data_file} 中,无法正确识别数据,需要确认...")
w2t(f"[{pos}] 在 {data_file} 中,无法正确识别数据,需要确认...", "red", "DataError")
elif bof == "forward":
row_target = row + margin # to end while loop in function `single_file_proc`
return row_target
def get_cycle_info(data_file, step, margin, threshold, w2t):
# end -> middle: low
# middle -> start: high
# 1. 从最后读取数据无论是大于1还是小于1都舍弃找到相反的值的起始点
# 2. 从起始点,继续往前寻找,找到与之数值相反的中间点
# 3. 从中间点,继续往前寻找,找到与之数值相反的结束点,至此,得到了高低数值的时间区间以及一轮的周期时间
csv_reader = csv.reader(open(data_file))
begin = int(next(csv_reader)[1])
df = pandas.read_csv(data_file, sep=",", encoding="gbk", skip_blank_lines=False, header=begin - 1, on_bad_lines="skip")
row = df.index[-1] - margin
if float(df.iloc[row, 2]) < threshold:
row = find_point("backward", step, margin, threshold, "a1", data_file, "lt", df, row, w2t)
_row = find_point("backward", step, margin, threshold, "a2", data_file, "gt", df, row, w2t)
_row = find_point("backward", step, margin, threshold, "a3", data_file, "lt", df, _row, w2t)
row_end = find_point("backward", step, margin, threshold, "a4", data_file, "gt", df, _row, w2t)
row_middle = find_point("backward", step, margin, threshold, "a5", data_file, "lt", df, row_end, w2t)
row_start = find_point("backward", step, margin, threshold, "a6", data_file, "gt", df, row_middle, w2t)
# print(f"row_end = {row_end}")
# print(f"row_middle = {row_middle}")
# print(f"row_start = {row_start}")
return row_end-row_middle, row_middle-row_start, row_end-row_start, df
def initialization(path, w2t):
_, data_files = clibs.traversal_files(path, w2t)
for data_file in data_files:
if not data_file.lower().endswith(".csv"):
clibs.insert_logdb("ERROR", "wavelogger", f"init: {data_file} 文件后缀错误,只允许 .csv 文件,需要确认!")
w2t(f"{data_file} 文件后缀错误,只允许 .csv 文件,需要确认!", "red", "FileTypeError")
return data_files
def preparation(data_file, step, margin, threshold, wb, w2t):
shtname = data_file.split("/")[-1].split(".")[0]
ws = wb.create_sheet(shtname)
low, high, cycle, df = get_cycle_info(data_file, step, margin, threshold, w2t)
return ws, df, low, high, cycle
def single_file_proc(ws, data_file, step, threshold, margin, data_length, df, cycle, w2t):
row, row_lt, row_gt, count, count_i, data = 1, 1, 1, 1, 1, {}
row_max = df.index[-1] - margin
while row < row_max:
if count not in data.keys():
data[count] = []
value = float(df.iloc[row, 2])
if value < threshold:
row_lt = find_point("forward", step, margin, threshold, "c"+str(row), data_file, "lt", df, row, w2t)
start = int(row_gt + (row_lt - row_gt - data_length) / 2)
end = start + data_length
value = df.iloc[start:end, 2].mean() + 3 * df.iloc[start:end, 2].std()
if value > 1:
msg = f"{data_file} 文件第 {count} 轮 第 {count_i} 个数据可能有问题,需人工手动确认,确认有问题可删除,无问题则保留"
clibs.insert_logdb("WARNING", "wavelogger", msg)
w2t(msg, "orange")
data[count].append(value)
count_i += 1
else:
row_gt = find_point("forward", step, margin, threshold, "c"+str(row), data_file, "gt", df, row, w2t)
if row_gt - row_lt > cycle * 2:
count += 1
count_i = 1
row = max(row_gt, row_lt)
for i in range(2, 10):
ws.cell(row=1, column=i).value = f"{i-1}次测试"
ws.cell(row=i, column=1).value = f"{i-1}次精度变化"
for i in sorted(data.keys()):
row, column = 2, i + 1
for value in data[i]:
ws.cell(row=row, column=column).value = float(value)
row += 1
def execution(data_files, w2t):
wb = openpyxl.Workbook()
step, margin, data_length, threshold = 5, 50, 50, 5
for data_file in data_files:
ws, df, low, high, cycle = preparation(data_file, step, margin, threshold, wb, w2t)
single_file_proc(ws, data_file, step, threshold, margin, data_length, df, cycle, w2t)
wd = "/".join(data_files[0].split("/")[:-1])
filename = wd + "/result.xlsx"
wb.save(filename)
wb.close()
w2t("----------------------------------------\n")
w2t("所有文件均已处理完毕\n")
def main():
path = clibs.data_dp["_path"]
w2t = clibs.w2t
data_files = initialization(path, w2t)
execution(data_files, w2t)
if __name__ == "__main__":
main()