scripts/rokae/brake/brake.py
gitea b335f61c72 [modify] v0.0.3(2024/05/21)
1. just_open函数打开失败的信息中,添加文件名
2. 删除global变量,函数全部通过传参实现
3. configuration.xlsx配置文件增加AXIS常量,表示那个轴,取值为 j1/j2/j3/j4/j5/j6/j7
4. [bugfix] 增加get_threshold_step函数,用来获取在计算row_start时合适的阈值和步长,主要是解决了二轴最差工况下,最大速度是个尖端的问题:
    a. load100_speed100_reachxxx 二轴 threshold = 50 step = 20
    b. 其他 threshold = 50 step = 100
    如上是一个比较保守的设定,因为设定的step比较小,找到点之后要往后延200最好
5. 在find_row_start_dp函数中新增一个参数data_file,方便后续调试
2024-05-23 11:23:32 +08:00

351 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding: utf-8
from os import scandir, remove
from sys import exit
from openpyxl import load_workbook
from win32com.client import DispatchEx
from time import time, strftime, localtime, sleep
from threading import Thread
import pythoncom
from pandas import read_csv
def just_open(filename):
for i in range(3):
try:
pythoncom.CoInitialize()
xlapp = DispatchEx("Excel.Application")
xlapp.Visible = False
xlbook = xlapp.Workbooks.Open(filename)
xlapp.DisplayAlerts = False
xlbook.SaveAs(filename)
xlbook.Close()
xlapp.Quit()
except Exception as Err:
if xlbook is None:
xlbook.SaveAs(filename)
xlbook.close()
if xlapp is not None:
xlapp.Quit()
print(f"使用win32com打开【{filename}】文件,第 {i} 次操作失败,静默三秒钟,等待重新执行......")
sleep(3)
def traversal_files(path):
dirs = []
files = []
for item in scandir(path):
if item.is_dir():
dirs.append(item.path)
elif item.is_file():
files.append(item.path)
return dirs, files
def get_threshold_step(excel_file, AXIS):
conditions = sorted(excel_file.split('\\')[-2].split('_'))
# 只有负载和速度是100%时才会启用更敏感的step
flg = 1 if conditions[0][-3:] == '100' and conditions[2][-3:] == '100' else 0
if flg == 1 and AXIS == 'j2':
threshold = 50
step = 20
else:
threshold = 50
step = 100
return threshold, step
def find_row_start(excel_file, ws_data, conditions, AV, RR, AXIS):
ratio = float(conditions[1].removeprefix('speed'))/100
speed_max = AV * ratio * RR / 6
row_max = ws_data.max_row
row_start = row_max - 1000
threshold, step = get_threshold_step(excel_file, AXIS)
while row_start > 0:
row_end = row_start - step
if row_end < 2:
msg = f"可能是{excel_file.replace('xlsx', 'data')}, 这个文件数据采集有问题,也有可能是程序步长设定问题......" \
f"建议重新采集,或者先删除该文件夹,重新运行程序,先手动处理"
warn_pause_exit(msg, 1, 10)
_a = ws_data[f"A{row_start}"].value
_b = ws_data[f"A{row_end}"].value
if abs(_a-speed_max) < threshold and abs(_b-speed_max) < threshold and abs(_a-_b) < threshold:
row_start -= (step + 200)
break
else:
row_start -= step
else:
remove(excel_file)
msg = f"可能是{excel_file.replace('xlsx', 'data')},这个文件数据采集有问题,比如采集的时机不对,请检查......"
warn_pause_exit(msg, 1, 9)
return row_max, row_start
def find_result_sheet_name(conditions, count):
# 该函数比较简单功能是获取结果文件准确的sheet页名称
# 33%臂展_33%速度_正1
reach = conditions[0].removeprefix('reach')
speed = conditions[1].removeprefix('speed')
result_sheet_name = f"{reach}%臂展_{speed}%速度_正{count}"
return result_sheet_name
def copy_data_to_result(ws_data, ws_result, row_max, row_start):
# 结果文件数据清零
for row in ws_result.iter_rows(min_row=2, min_col=1, max_row=6000 - row_start + 2, max_col=2):
for cell in row:
cell.value = None
# 将合适的数据复制到结果文件
data = []
for row in ws_data.iter_rows(min_row=row_start, min_col=1, max_row=row_max, max_col=2):
for cell in row:
data.append(cell.value)
i = 0
for row in ws_result.iter_rows(min_row=2, min_col=1, max_row=row_max - row_start + 2, max_col=2):
for cell in row:
cell.value = data[i]
i = i + 1
def copy_data_to_excel_file(wb_data, ws_result, row_max, row_start, excel_file, RC, RR):
try:
del wb_data['dp']
wb_data.create_sheet('dp')
ws_dp = wb_data['dp']
except Exception as Err:
wb_data.create_sheet('dp')
ws_dp = wb_data['dp']
data = []
for row in ws_result.iter_rows(min_row=1, min_col=1, max_row=row_max-row_start+2, max_col=5):
for cell in row:
data.append(cell.value)
i = 0
for row in ws_dp.iter_rows(min_row=1, min_col=1, max_row=row_max-row_start+2, max_col=5):
for cell in row:
cell.value = data[i]
i = i + 1
ws_dp.cell(row=5, column=7).value = RC
ws_dp.cell(row=6, column=7).value = RR
wb_data.save(excel_file)
just_open(excel_file) # 为了能读取到公式计算的数值,必须要用 win32com 打开关闭一次
wb_data = load_workbook(excel_file, data_only=True)
ws_dp = wb_data['dp']
return wb_data, ws_dp
def find_row_start_dp(data_file, ws_dp, row_max, row_start, conditions, AV):
ratio = float(conditions[1].removeprefix('speed'))/100
av_max = AV * ratio
row_max_dp = row_max - row_start + 1 + 1 # title row
row_start_dp = row_max_dp - 5
while row_start_dp > 6:
# 处理异常数据当从数据文件中拷贝的有效数据超过5000时会触发该代码块
if ws_dp.cell(row=row_start_dp, column=4).value is None:
row_start_dp -= 100
continue
_a = float(ws_dp.cell(row=row_start_dp, column=4).value)
_b = float(ws_dp.cell(row=row_start_dp - 1, column=4).value)
_c = float(ws_dp.cell(row=row_start_dp - 2, column=4).value)
_d = float(ws_dp.cell(row=row_start_dp - 3, column=4).value)
_e = float(ws_dp.cell(row=row_start_dp - 4, column=4).value)
avg = (_a + _b + _c + _d + _e) / 5
if abs(avg - av_max) < 1:
row_start_dp = row_start_dp + 10 - 5 # +10 是因为结果文件 C2 的值是 10-5是做了保守处理相当于再往前移动 5 个点位
break
else:
row_start_dp -= 5 # 保守一点,每次移动 5 个点位,如果想要加快程序运行,可适当调整更大一些,建议不超过 15
else:
msg = "数据有误,未找到平衡的点,请确认!"
warn_pause_exit(msg, 1, 1)
return row_start_dp
def single_file_process(data_file, wb_result, count, AV, RR, RC, AXIS):
excel_file = data_file.replace('data', 'xlsx')
sheet_name = data_file.split('\\')[-1].removesuffix('.data')
df = read_csv(data_file, sep='\t')
df.to_excel(excel_file, sheet_name=sheet_name, index=False)
conditions = sorted(data_file.split('\\')[-2].split('_')[1:])
# print(f"conditions = {conditions}")
result_sheet_name = find_result_sheet_name(conditions, count)
ws_result = wb_result[result_sheet_name]
wb_data = load_workbook(excel_file)
ws_data = wb_data[sheet_name]
row_max, row_start = find_row_start(excel_file, ws_data, conditions, AV, RR, AXIS)
copy_data_to_result(ws_data, ws_result, row_max, row_start)
wb_data, ws_dp = copy_data_to_excel_file(wb_data, ws_result, row_max, row_start, excel_file, RC, RR)
row_start_dp = find_row_start_dp(data_file, ws_dp, row_max, row_start, conditions, AV)
ws_result["G2"] = int(row_start_dp)
wb_data.save(excel_file)
wb_data.close()
def data_process(result_file, raw_data_dirs, AV, RR, RC, AXIS):
prefix = result_file.split('\\')[-1].split('_')[0]
wb_result = load_workbook(result_file) # 打开和关闭结果文件夹十分耗时间
for raw_data_dir in raw_data_dirs:
if raw_data_dir.split('\\')[-1].split('_')[0] == prefix:
now = strftime('%Y-%m-%d %H:%M:%S', localtime(time()))
print(f"[{now}] 正在处理目录【{raw_data_dir}】中的数据......")
_, data_files = traversal_files(raw_data_dir)
threads = [Thread(target=single_file_process, args=(data_files[0], wb_result, 1, AV, RR, RC, AXIS)),
Thread(target=single_file_process, args=(data_files[1], wb_result, 2, AV, RR, RC, AXIS)),
Thread(target=single_file_process, args=(data_files[2], wb_result, 3, AV, RR, RC, AXIS))]
[t.start() for t in threads]
[t.join() for t in threads]
now = strftime('%Y-%m-%d %H:%M:%S', localtime(time()))
print(f"[{now}] 目录【{raw_data_dir}】中的数据已处理完成......")
now = strftime('%Y-%m-%d %H:%M:%S', localtime(time()))
print(f"[{now}] 结果文件【{result_file}】的数据已整理完成保存文件需要1-2min请耐心等待......")
wb_result.save(result_file)
wb_result.close()
def warn_pause_exit(msg, pause_num, exit_num):
print(msg + '\n')
for i in range(pause_num):
_ = input("Press ENTER to continue......\n")
exit(exit_num)
def check_files(raw_data_dirs, result_files):
if len(result_files) != 3:
msg = "结果文件数目错误,结果文件有且只有三个,请确认!"
for result_file in result_files:
print(result_file)
warn_pause_exit(msg, 1, 3)
prefix = []
for result_file in result_files:
prefix.append(result_file.split('\\')[-1].split('_')[0])
if not sorted(prefix) == sorted(['load33', 'load66', 'load100']):
wd = result_files[0].split('\\')
del wd[-1]
wd = '\\'.join(wd)
msg = f"请关闭所有相关数据文件,并检查工作目录【{wd}】下,有且只允许有类似如下三个文件:\n" \
f"1. load33_自研_制动性能测试.xlsx\n" \
f"2. load66_自研_制动性能测试.xlsx\n" \
f"3. load100_自研_制动性能测试.xlsx"
warn_pause_exit(msg, 1, 8)
for raw_data_dir in raw_data_dirs:
components = raw_data_dir.split('\\')[-1].split('_')
sorted(components)
if components[0] not in ['load33', 'load66', 'load100'] or \
components[1] not in ['speed33', 'speed66', 'speed100'] or \
components[2] not in ['reach33', 'reach66', 'reach100']:
msg = f"报错信息:数据目录【{raw_data_dir}】命名不合规,请参考如下形式\n" \
f"命名规则:\n 1. loadAA_speedBB_reachCC\n 2. loadAA_reachBB_speedCC\n" \
f"规则解释AA/BB/CC 指的是负载/速度/臂展的比例\n" \
f"load66_speed100_reach3366% 负载100% 速度以及 33% 臂展情况下的测试结果文件夹"
warn_pause_exit(msg, 1, 7)
# 直接删掉 excel 文件
_, raw_data_files = traversal_files(raw_data_dir)
for raw_data_file in raw_data_files:
if raw_data_file.endswith(".xlsx"):
remove(raw_data_file)
_, raw_data_files = traversal_files(raw_data_dir)
if len(raw_data_files) != 3:
msg = f"数据目录【{raw_data_dir}】下数据文件个数错误,每个数据目录下有且只能有三个以 .data 为后缀的数据文件"
warn_pause_exit(msg, 1, 6)
for raw_data_file in raw_data_files:
if not raw_data_file.split('\\')[-1].endswith('.data'):
msg = f"数据文件【{raw_data_file}】后缀错误,每个数据目录下有且只能有三个以 .data 为后缀的数据文件"
warn_pause_exit(msg, 1, 5)
print("数据目录合规性检查结束,未发现问题......")
def delete_excel_files(raw_data_dirs):
for raw_data_dir in raw_data_dirs:
_, raw_data_files = traversal_files(raw_data_dir)
for raw_data_file in raw_data_files:
if raw_data_file.endswith('.xlsx'):
remove(raw_data_file)
def initialization():
time_start = time() # 记录开始时间
try:
# read init configurations from config file
wb_conf = load_workbook('./configuration.xlsx', read_only=True)
ws_conf = wb_conf['conf']
data_dir = ws_conf.cell(row=2, column=2).value
AV = int(ws_conf.cell(row=3, column=2).value)
RR = int(ws_conf.cell(row=4, column=2).value)
RC = float(ws_conf.cell(row=5, column=2).value)
AXIS = ws_conf.cell(row=6, column=2).value
wb_conf.close()
raw_data_dirs, result_files = traversal_files(data_dir)
# print("#调试信息======================================")
# print(f"结果文件:{result_files}")
# print(f'数据目录:{raw_data_dirs}')
check_files(raw_data_dirs, result_files)
return raw_data_dirs, result_files, time_start, AV, RR, RC, AXIS
except Exception as Err:
msg = "无法在当前路径下找到【configuration.xlsx】文件请确认"
warn_pause_exit(msg, 1, 2)
def execution(args):
raw_data_dirs, result_files, time_start, AV, RR, RC, AXIS = args
prefix = []
for raw_data_dir in raw_data_dirs:
prefix.append(raw_data_dir.split('\\')[-1].split("_")[0])
try:
threads = []
for result_file in result_files:
if result_file.split('\\')[-1].split('_')[0] not in set(prefix):
continue
else:
now = strftime('%Y-%m-%d %H:%M:%S', localtime(time()))
print(f"[{now}] 正在整理结果文件【{result_file}】的数据......")
# data_process(result_file, raw_data_dirs, AV, RR, RC, AXIS)
threads.append(Thread(target=data_process, args=(result_file, raw_data_dirs, AV, RR, RC, AXIS)))
[t.start() for t in threads]
[t.join() for t in threads]
print("#---------------------------------------------------------")
print("全部处理完毕")
delete_excel_files(raw_data_dirs)
except Exception as Err:
print("程序运行错误,请检查配置文件是否准确设定,以及数据文件组织是否正确!")
delete_excel_files(raw_data_dirs) # 运行结束之后,删除中间临时文件
time_end = time() # 记录结束时间
time_total = time_end - time_start # 计算的时间差为程序的执行时间,单位为秒/s
msg = f"数据处理时间:{time_total//3600:02} h {time_total % 3600/60:05.2f} min"
warn_pause_exit(msg, 1, 0)
def main():
execution(initialization())
if __name__ == "__main__":
main()