scripts/rokae/brake/brake.py
gitea c61686065e [modify] v0.0.2(2024/05/20)
1. 功能模块化,为后面其他功能奠定一个基本的框架
2. 使用了多线程提高效率
3. 优化了准备工作中的细节
4. 运行初始化时自动删除 raw_data_dir 中的 .xlsx 文件
5. 优化了输出格式
6. 使用 pyinstaller 库进行代码冻结并调试成功
2024-05-23 11:07:35 +08:00

345 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding: utf-8
from os import scandir, remove
from sys import exit
from openpyxl import load_workbook
from win32com.client import DispatchEx
from time import time, strftime, localtime, sleep
from threading import Thread
import pythoncom
import pandas as pd
def just_open(filename):
for i in range(3):
try:
pythoncom.CoInitialize()
xlapp = DispatchEx("Excel.Application")
xlapp.Visible = False
xlbook = xlapp.Workbooks.Open(filename)
xlapp.DisplayAlerts = False
xlbook.SaveAs(filename)
xlbook.Close()
xlapp.Quit()
except Exception as Err:
if xlbook is None:
xlbook.SaveAs(filename)
xlbook.close()
if xlapp is not None:
xlapp.Quit()
print(f"just open 第 {i} 次操作失败,静默三秒钟,重新执行......")
sleep(3)
def traversal_files(path):
dirs = []
files = []
for item in scandir(path):
if item.is_dir():
dirs.append(item.path)
elif item.is_file():
files.append(item.path)
return dirs, files
def find_row_start(excel_file, ws_data, conditions, AV, RR):
ratio = float(conditions[1].removeprefix('speed'))/100
speed_max = AV * ratio * RR / 6
row_max = ws_data.max_row
row_start = row_max - 1000
while row_start > 0:
_a = ws_data[f"A{row_start}"].value
_b = ws_data[f"A{row_start - 200}"].value
if abs(_a-speed_max) < 50 and abs(_b-speed_max) < 50 and abs(_a - _b) < 70:
row_start -= 200
break
else:
row_start -= 200
else:
remove(excel_file)
msg = f"{excel_file.replace('xlsx', 'data')}, 这个文件数据有问题,请检查......"
warn_pause_exit(msg, 1, 9)
return row_max, row_start
def find_result_sheet_name(conditions, count):
# 该函数比较简单功能是获取结果文件准确的sheet页名称
# 33%臂展_33%速度_正1
reach = conditions[0].removeprefix('reach')
speed = conditions[1].removeprefix('speed')
result_sheet_name = f"{reach}%臂展_{speed}%速度_正{count}"
return result_sheet_name
def copy_data_to_result(ws_data, ws_result, row_max, row_start):
# 结果文件数据清零
for row in ws_result.iter_rows(min_row=2, min_col=1, max_row=6000 - row_start + 2, max_col=2):
for cell in row:
cell.value = None
# 将合适的数据复制到结果文件
data = []
for row in ws_data.iter_rows(min_row=row_start, min_col=1, max_row=row_max, max_col=2):
for cell in row:
data.append(cell.value)
i = 0
for row in ws_result.iter_rows(min_row=2, min_col=1, max_row=row_max - row_start + 2, max_col=2):
for cell in row:
cell.value = data[i]
i = i + 1
def copy_data_to_excel_file(wb_data, ws_result, row_max, row_start, excel_file, RC, RR):
try:
del wb_data['dp']
wb_data.create_sheet('dp')
ws_dp = wb_data['dp']
except Exception as Err:
wb_data.create_sheet('dp')
ws_dp = wb_data['dp']
data = []
for row in ws_result.iter_rows(min_row=1, min_col=1, max_row=row_max-row_start+2, max_col=5):
for cell in row:
data.append(cell.value)
i = 0
for row in ws_dp.iter_rows(min_row=1, min_col=1, max_row=row_max-row_start+2, max_col=5):
for cell in row:
cell.value = data[i]
i = i + 1
ws_dp.cell(row=5, column=7).value = RC
ws_dp.cell(row=6, column=7).value = RR
wb_data.save(excel_file)
just_open(excel_file) # 为了能读取到公式计算的数值,必须要用 win32com 打开关闭一次
wb_data = load_workbook(excel_file, data_only=True)
ws_dp = wb_data['dp']
return wb_data, ws_dp
def find_row_start_dp(ws_dp, row_max, row_start, conditions, AV):
ratio = float(conditions[1].removeprefix('speed'))/100
av_max = AV * ratio
row_max_dp = row_max - row_start + 1 + 1 # title row
row_start_dp = row_max_dp - 5
while row_start_dp > 1:
# 处理异常数据当从数据文件中拷贝的有效数据超过5000时会触发该代码块
if ws_dp.cell(row=row_start_dp, column=4).value is None:
row_start_dp -= 100
continue
_a = float(ws_dp.cell(row=row_start_dp, column=4).value)
_b = float(ws_dp.cell(row=row_start_dp - 1, column=4).value)
_c = float(ws_dp.cell(row=row_start_dp - 2, column=4).value)
_d = float(ws_dp.cell(row=row_start_dp - 3, column=4).value)
_e = float(ws_dp.cell(row=row_start_dp - 4, column=4).value)
avg = (_a + _b + _c + _d + _e) / 5
if abs(avg - av_max) < 1:
row_start_dp = row_start_dp + 10 - 5 # +10 是因为结果文件 C2 的值是 10-5是做了保守处理相当于再往前移动 5 个点位
break
else:
row_start_dp -= 5 # 保守一点,每次移动 5 个点位,如果想要加快程序运行,可适当调整更大一些,建议不超过 15
else:
msg = "数据有误,未找到平衡的点,请确认!"
warn_pause_exit(msg, 1, 1)
return row_start_dp
def single_file_process(data_file, wb_result, count, AV, RR, RC):
excel_file = data_file.replace('data', 'xlsx')
sheet_name = data_file.split('\\')[-1].removesuffix('.data')
df = pd.read_csv(data_file, sep='\t')
df.to_excel(excel_file, sheet_name=sheet_name, index=False)
conditions = sorted(data_file.split('\\')[-2].split('_')[1:])
# print(f"conditions = {conditions}")
result_sheet_name = find_result_sheet_name(conditions, count)
ws_result = wb_result[result_sheet_name]
wb_data = load_workbook(excel_file)
ws_data = wb_data[sheet_name]
row_max, row_start = find_row_start(excel_file, ws_data, conditions, AV, RR)
copy_data_to_result(ws_data, ws_result, row_max, row_start)
wb_data, ws_dp = copy_data_to_excel_file(wb_data, ws_result, row_max, row_start, excel_file, RC, RR)
row_start_dp = find_row_start_dp(ws_dp, row_max, row_start, conditions, AV)
ws_result["G2"] = int(row_start_dp)
wb_data.save(excel_file)
wb_data.close()
def data_process(result_file, raw_data_dirs, AV, RR, RC):
prefix = result_file.split('\\')[-1].split('_')[0]
wb_result = load_workbook(result_file) # 打开和关闭结果文件夹十分耗时间
for raw_data_dir in raw_data_dirs:
if raw_data_dir.split('\\')[-1].split('_')[0] == prefix:
now = strftime('%Y-%m-%d %H:%M:%S', localtime(time()))
print(f"[{now}] 正在处理目录【{raw_data_dir}】中的数据......")
_, data_files = traversal_files(raw_data_dir)
threads = [Thread(target=single_file_process, args=(data_files[0], wb_result, 1, AV, RR, RC)),
Thread(target=single_file_process, args=(data_files[1], wb_result, 2, AV, RR, RC)),
Thread(target=single_file_process, args=(data_files[2], wb_result, 3, AV, RR, RC))]
[t.start() for t in threads]
[t.join() for t in threads]
now = strftime('%Y-%m-%d %H:%M:%S', localtime(time()))
print(f"[{now}] 目录【{raw_data_dir}】中的数据已处理完成......")
now = strftime('%Y-%m-%d %H:%M:%S', localtime(time()))
print(f"[{now}] 结果文件【{result_file}】的数据已整理完成保存文件需要1-2min请耐心等待......")
wb_result.save(result_file)
wb_result.close()
def warn_pause_exit(msg, pause_num, exit_num):
print(msg + '\n')
for i in range(pause_num):
_ = input("Press ENTER to continue......")
exit(exit_num)
def check_files(raw_data_dirs, result_files):
if len(result_files) != 3:
msg = "结果文件数目错误,结果文件有且只有三个,请确认!"
for result_file in result_files:
print(result_file)
warn_pause_exit(msg, 1, 3)
prefix = []
for result_file in result_files:
prefix.append(result_file.split('\\')[-1].split('_')[0])
if not sorted(prefix) == sorted(['load33', 'load66', 'load100']):
wd = result_files[0].split('\\')
del wd[-1]
wd = '\\'.join(wd)
msg = f"请关闭所有相关数据文件,并检查工作目录【{wd}】下,有且只允许有类似如下三个文件:\n" \
f"1. load33_自研_制动性能测试.xlsx\n" \
f"2. load66_自研_制动性能测试.xlsx\n" \
f"3. load100_自研_制动性能测试.xlsx"
warn_pause_exit(msg, 1, 8)
for raw_data_dir in raw_data_dirs:
components = raw_data_dir.split('\\')[-1].split('_')
sorted(components)
if components[0] not in ['load33', 'load66', 'load100'] or \
components[1] not in ['speed33', 'speed66', 'speed100'] or \
components[2] not in ['reach33', 'reach66', 'reach100']:
msg = f"报错信息:数据目录【{raw_data_dir}】命名不合规,请参考如下形式\n" \
f"命名规则:\n 1. loadAA_speedBB_reachCC\n 2. loadAA_reachBB_speedCC\n" \
f"规则解释AA/BB/CC 指的是负载/速度/臂展的比例\n" \
f"load66_speed100_reach3366% 负载100% 速度以及 33% 臂展情况下的测试结果文件夹"
warn_pause_exit(msg, 1, 7)
# 直接删掉 excel 文件
_, raw_data_files = traversal_files(raw_data_dir)
for raw_data_file in raw_data_files:
if raw_data_file.endswith(".xlsx"):
remove(raw_data_file)
_, raw_data_files = traversal_files(raw_data_dir)
if len(raw_data_files) != 3:
msg = f"数据目录【{raw_data_dir}】下数据文件个数错误,每个数据目录下有且只能有三个以 .data 为后缀的数据文件"
warn_pause_exit(msg, 1, 6)
for raw_data_file in raw_data_files:
if not raw_data_file.split('\\')[-1].endswith('.data'):
msg = f"数据文件【{raw_data_file}】后缀错误,每个数据目录下有且只能有三个以 .data 为后缀的数据文件"
warn_pause_exit(msg, 1, 5)
print("数据目录合规性检查结束,未发现问题......")
def delete_excel_files():
global data_dir
raw_data_dirs, _ = traversal_files(data_dir)
for raw_data_dir in raw_data_dirs:
_, raw_data_files = traversal_files(raw_data_dir)
for raw_data_file in raw_data_files:
if raw_data_file.endswith('.xlsx'):
remove(raw_data_file)
def initialization():
time_start = time() # 记录开始时间
try:
# read init configurations from config file
wb_conf = load_workbook('./configuration.xlsx')
ws_conf = wb_conf['conf']
global data_dir
global AV
global RR
global RC
data_dir = ws_conf.cell(row=2, column=2).value
AV = int(ws_conf.cell(row=3, column=2).value)
RR = int(ws_conf.cell(row=4, column=2).value)
RC = float(ws_conf.cell(row=5, column=2).value)
wb_conf.close()
raw_data_dirs, result_files = traversal_files(data_dir)
# print("#调试信息======================================")
# print(f"结果文件:{result_files}")
# print(f'数据目录:{raw_data_dirs}')
check_files(raw_data_dirs, result_files)
return raw_data_dirs, result_files, time_start, AV, RR, RC
except Exception as Err:
msg = "无法在当前路径下找到【configuration.xlsx】文件请确认"
warn_pause_exit(msg, 1, 2)
def execution(args):
raw_data_dirs, result_files, time_start, AV, RR, RC = args
prefix = []
for raw_data_dir in raw_data_dirs:
prefix.append(raw_data_dir.split('\\')[-1].split("_")[0])
try:
threads = []
for result_file in result_files:
if result_file.split('\\')[-1].split('_')[0] not in set(prefix):
continue
else:
now = strftime('%Y-%m-%d %H:%M:%S', localtime(time()))
print(f"[{now}] 正在整理结果文件【{result_file}】的数据......")
# data_process(result_file, raw_data_dirs, AV, RR, RC)
threads.append(Thread(target=data_process, args=(result_file, raw_data_dirs, AV, RR, RC)))
[t.start() for t in threads]
[t.join() for t in threads]
print("#---------------------------------------------------------")
print("全部处理完毕")
delete_excel_files()
except Exception as Err:
print("程序运行错误,请检查配置文件是否准确设定,以及数据文件组织是否正确!")
delete_excel_files() # 运行结束之后,删除中间临时文件
time_end = time() # 记录结束时间
time_total = time_end - time_start # 计算的时间差为程序的执行时间,单位为秒/s
msg = f"数据处理时间:{time_total//3600:02} h {time_total % 3600/60:05.2f} min"
warn_pause_exit(msg, 1, 0)
def main():
execution(initialization())
# 定义初始参数,数据文件夹路径/最大角速度/减速比/额定电流
global data_dir # path for data
global AV # AV for Angular velocity
global RR # RR for Angular velocity
global RC # RC for Rated Current
if __name__ == "__main__":
main()