scripts/rokae/aio/brake.py
2024-05-23 17:33:26 +08:00

326 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding: utf-8
import os
import sys
import openpyxl
import time
from threading import Thread
import pandas
def traversal_files(path):
# 功能:以列表的形式分别返回指定路径下的文件和文件夹,不包含子目录
# 参数:路径
# 返回值:路径下的文件夹列表 路径下的文件列表
if not os.path.exists(path):
msg = f'数据文件夹{path}不存在,请确认后重试......'
warn_pause_exit(msg, 1, 11)
else:
dirs = []
files = []
for item in os.scandir(path):
if item.is_dir():
dirs.append(item.path)
elif item.is_file():
files.append(item.path)
return dirs, files
def get_threshold_step(excel_file, AXIS):
# 功能负载和速度100%且是j2的时候做特殊处理
# 参数新生成的excel轴号
# 返回值:速度差阈值,处理步长
conditions = sorted(excel_file.split('\\')[-2].split('_'))
# 只有负载和速度是100%时才会启用更敏感的step
flg = 1 if conditions[0][-3:] == '100' and conditions[2][-3:] == '100' else 0
if flg == 1 and AXIS == 'j2':
threshold = 30
step = 5
else:
threshold = 10
step = 5
return threshold, step
def find_row_start(excel_file, ws_data, conditions, AV, RR, AXIS):
# 功能:查找数据文件中有效数据的行号,也即最后一个速度下降的点位
# 参数:如上
# 返回值:速度下降点位,最后的数据点位
ratio = float(conditions[1].removeprefix('speed'))/100
speed_max = AV * ratio * RR / 6
row_max = row_start = ws_data.max_row
threshold, step = get_threshold_step(excel_file, AXIS)
while row_start > step+1:
speed = ws_data[f"A{row_start}"].value
if speed is None or int(speed) < 1:
row_start -= 50
continue
_ = []
for i in range(row_start, row_start-step+1, -1):
_.append(ws_data[f"A{i}"].value)
speed_avg = abs(sum(_))/len(_)
if abs(speed_avg-speed_max) < threshold:
row_start = row_start - 10
break
else:
row_start -= step
else:
os.remove(excel_file)
msg = f"可能是{excel_file.replace('xlsx', 'data')},这个文件数据采集有问题,比如采集的时机不对,也有可能是程序步长设定问题,请检查......"
warn_pause_exit(msg, 1, 9)
return row_max, row_start
def find_result_sheet_name(conditions, count):
# 功能获取结果文件准确的sheet页名称
# 参数:臂展和速度的列表
# 返回值结果文件对应的sheet name
# 33%臂展_33%速度_正1
reach = conditions[0].removeprefix('reach')
speed = conditions[1].removeprefix('speed')
result_sheet_name = f"{reach}%臂展_{speed}%速度_正{count}"
return result_sheet_name
def copy_data_to_result(ws_data, ws_result, row_max, row_start):
# 功能:将数据文件中有效数据拷贝至结果文件对应的 sheet
# 参数:如上
# 返回值:-
# 结果文件数据清零
for row in ws_result.iter_rows(min_row=2, min_col=1, max_row=2000, max_col=2):
for cell in row:
cell.value = None
# 将合适的数据复制到结果文件
row_max = row_start + 399 if row_max-row_start > 400 else row_max
data = []
for row in ws_data.iter_rows(min_row=row_start, min_col=1, max_row=row_max, max_col=2):
for cell in row:
data.append(cell.value)
i = 0
for row in ws_result.iter_rows(min_row=2, min_col=1, max_row=row_max - row_start + 2, max_col=2):
for cell in row:
cell.value = data[i]
i = i + 1
def single_file_process(data_file, wb_result, count, AV, RR, RC, AXIS):
# 功能:完成单个数据文件的处理
# 参数:如上
# 返回值:-
excel_file = data_file.replace('.data', '.xlsx')
sheet_name = data_file.split('\\')[-1].removesuffix('.data')
df = pandas.read_csv(data_file, sep='\t')
df.to_excel(excel_file, sheet_name=sheet_name, index=False)
conditions = sorted(data_file.split('\\')[-2].split('_')[1:])
result_sheet_name = find_result_sheet_name(conditions, count)
ws_result = wb_result[result_sheet_name]
wb_data = openpyxl.load_workbook(excel_file)
ws_data = wb_data[sheet_name]
row_max, row_start = find_row_start(excel_file, ws_data, conditions, AV, RR, AXIS)
copy_data_to_result(ws_data, ws_result, row_max, row_start)
ws_result["C2"] = int(2)
ws_result["G2"] = int(10+4)
wb_data.save(excel_file)
wb_data.close()
def now_doing_msg(docs, flag):
# 功能:输出正在处理的文件或目录
# 参数文件或目录start 或 done 标识
# 返回值:-
now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
file_type = 'file' if os.path.isfile(docs) else 'dir'
if flag == 'start' and file_type == 'dir':
print(f"[{now}] 正在处理目录【{docs}】中的数据......")
elif flag == 'start' and file_type == 'file':
print(f"[{now}] 正在处理文件【{docs}】中的数据......")
elif flag == 'done' and file_type == 'dir':
print(f"[{now}] 目录【{docs}】数据文件已处理完毕......")
elif flag == 'done' and file_type == 'file':
print(f"[{now}] 文件【{docs}】数据文件已处理完毕......")
def data_process(result_file, raw_data_dirs, AV, RR, RC, AXIS):
# 功能:完成一个结果文件的数据处理
# 参数:结果文件,数据目录,以及预读取的参数
# 返回值:-
prefix = result_file.split('\\')[-1].split('_')[0]
wb_result = openpyxl.load_workbook(result_file)
for raw_data_dir in raw_data_dirs:
if raw_data_dir.split('\\')[-1].split('_')[0] == prefix:
now_doing_msg(raw_data_dir, 'start')
_, data_files = traversal_files(raw_data_dir)
# 数据文件串行处理模式---------------------------------
# count = 1
# for data_file in data_files:
# now_doing_msg(data_file, 'start')
# single_file_process(data_file, wb_result, count, AV, RR, RC, AXIS)
# count += 1
# now_doing_msg(data_file, 'done')
# ---------------------------------------------------
# 数据文件并行处理模式---------------------------------
threads = [Thread(target=single_file_process, args=(data_files[0], wb_result, 1, AV, RR, RC, AXIS)),
Thread(target=single_file_process, args=(data_files[1], wb_result, 2, AV, RR, RC, AXIS)),
Thread(target=single_file_process, args=(data_files[2], wb_result, 3, AV, RR, RC, AXIS))]
[t.start() for t in threads]
[t.join() for t in threads]
now_doing_msg(raw_data_dir, 'done')
# ---------------------------------------------------
now_doing_msg(result_file, 'done')
print(f"保存文件需要1-2min请耐心等待......")
wb_result.save(result_file)
wb_result.close()
def warn_pause_exit(msg, pause_num, exit_num):
# 功能:打印告警信息,并推出程序
# 参数:告警信息,暂停的次数,退出的值
# 返回值:-
print(msg + '\n')
for i in range(pause_num):
_ = input("Press ENTER to continue......\n")
sys.exit(exit_num)
def check_files(raw_data_dirs, result_files):
# 功能:检查数据文件以及结果文件的合规性
# 参数:数据文件夹,结果文件
# 返回值:-
if len(result_files) != 3:
msg = "结果文件数目错误,结果文件有且只有三个,请确认!"
for result_file in result_files:
print(result_file)
warn_pause_exit(msg, 1, 3)
prefix = []
for result_file in result_files:
prefix.append(result_file.split('\\')[-1].split('_')[0])
if not sorted(prefix) == sorted(['load33', 'load66', 'load100']):
wd = result_files[0].split('\\')
del wd[-1]
wd = '\\'.join(wd)
msg = f"请关闭所有相关数据文件,并检查工作目录【{wd}】下,有且只允许有类似如下三个文件:\n" \
f"1. load33_自研_制动性能测试.xlsx\n" \
f"2. load66_自研_制动性能测试.xlsx\n" \
f"3. load100_自研_制动性能测试.xlsx"
warn_pause_exit(msg, 1, 8)
for raw_data_dir in raw_data_dirs:
components = raw_data_dir.split('\\')[-1].split('_')
sorted(components)
if components[0] not in ['load33', 'load66', 'load100'] or \
components[1] not in ['speed33', 'speed66', 'speed100'] or \
components[2] not in ['reach33', 'reach66', 'reach100']:
msg = f"报错信息:数据目录【{raw_data_dir}】命名不合规,请参考如下形式\n" \
f"命名规则:\n 1. loadAA_speedBB_reachCC\n 2. loadAA_reachBB_speedCC\n" \
f"规则解释AA/BB/CC 指的是负载/速度/臂展的比例\n" \
f"load66_speed100_reach3366% 负载100% 速度以及 33% 臂展情况下的测试结果文件夹"
warn_pause_exit(msg, 1, 7)
# 直接删掉 excel 文件
_, raw_data_files = traversal_files(raw_data_dir)
for raw_data_file in raw_data_files:
if raw_data_file.endswith(".xlsx"):
os.remove(raw_data_file)
_, raw_data_files = traversal_files(raw_data_dir)
if len(raw_data_files) != 3:
msg = f"数据目录【{raw_data_dir}】下数据文件个数错误,每个数据目录下有且只能有三个以 .data 为后缀的数据文件"
warn_pause_exit(msg, 1, 6)
for raw_data_file in raw_data_files:
if not raw_data_file.split('\\')[-1].endswith('.data'):
msg = f"数据文件【{raw_data_file}】后缀错误,每个数据目录下有且只能有三个以 .data 为后缀的数据文件"
warn_pause_exit(msg, 1, 5)
print("数据目录合规性检查结束,未发现问题......")
def delete_excel_files(raw_data_dirs):
# 功能:删除数据文件夹里的 .xlsx 文件
# 参数:数据文件夹
# 返回值:-
for raw_data_dir in raw_data_dirs:
_, raw_data_files = traversal_files(raw_data_dir)
for raw_data_file in raw_data_files:
if raw_data_file.endswith('.xlsx'):
os.remove(raw_data_file)
def initialization():
# 功能:初始化,记录开始时间,读取预定义参数
# 参数:-
# 返回值:结果文件,数据文件夹,以及预定义参数
time_start = time.time()
try:
# read init configurations from config file
wb_conf = openpyxl.load_workbook('./configs.xlsx', read_only=True)
ws_conf = wb_conf['brake']
except Exception as Err:
msg = "无法在当前路径下找到或打开【configs.xlsx】文件请确认"
warn_pause_exit(msg, 1, 2)
DATA_DIR = ws_conf.cell(row=2, column=2).value
AXIS = int(ws_conf.cell(row=3, column=2).value)
AV = int(ws_conf.cell(row=4, column=2).value.split(',')[AXIS-1].strip())
RR = int(ws_conf.cell(row=5, column=2).value.split(',')[AXIS-1].strip())
RC = float(ws_conf.cell(row=6, column=2).value.split(',')[AXIS-1].strip())
wb_conf.close()
raw_data_dirs, result_files = traversal_files(DATA_DIR)
delete_excel_files(raw_data_dirs)
check_files(raw_data_dirs, result_files)
return raw_data_dirs, result_files, time_start, AV, RR, RC, AXIS
def execution(args):
# 功能:执行处理所有数据文件
# 参数initialization函数的返回值
# 返回值:-
raw_data_dirs, result_files, time_start, AV, RR, RC, AXIS = args
prefix = []
for raw_data_dir in raw_data_dirs:
prefix.append(raw_data_dir.split('\\')[-1].split("_")[0])
try:
# threads = []
for result_file in result_files:
if result_file.split('\\')[-1].split('_')[0] not in set(prefix):
continue
else:
now_doing_msg(result_file, 'start')
data_process(result_file, raw_data_dirs, AV, RR, RC, AXIS)
# threads.append(Thread(target=data_process, args=(result_file, raw_data_dirs, AV, RR, RC, AXIS)))
# [t.start() for t in threads]
# [t.join() for t in threads]
print("----------------------------------------------------------")
print("全部处理完毕")
delete_excel_files(raw_data_dirs)
except Exception as Err:
print("程序运行错误,请检查配置文件是否准确设定,以及数据文件组织是否正确!")
delete_excel_files(raw_data_dirs)
time_end = time.time()
time_total = time_end - time_start
msg = f"数据处理时间:{time_total // 3600:02} h {time_total % 3600 // 60:02} min {time_total % 60:02} s"
warn_pause_exit(msg, 1, 0)
def main():
execution(initialization())
if __name__ == "__main__":
main()