scripts/rokae/brake/robot_brake.py

270 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding: utf-8
import os
import pandas as pd
import openpyxl
from win32com.client import DispatchEx
import time
def just_open(filename):
xlApp = DispatchEx("Excel.Application")
xlApp.Visible = False
xlBook = xlApp.Workbooks.Open(filename)
xlApp.DisplayAlerts = 0
xlBook.SaveAs(filename)
xlBook.Close()
def traversal_files(path):
dirs = []
files = []
for item in os.scandir(path):
if item.is_dir():
dirs.append(item.path)
elif item.is_file():
files.append(item.path)
return dirs, files
def find_row_start(excel_file, ws_data, conditions):
global AV
global RR
ratio = float(conditions[1].removeprefix('speed'))/100
speed_max = AV * ratio * RR / 6
row_max = ws_data.max_row
row_start = row_max - 1000
while row_start > 0:
_a = ws_data[f"A{row_start}"].value
_b = ws_data[f"A{row_start - 200}"].value
if abs(_a-speed_max) < 50 and abs(_b-speed_max) < 50 and abs(_a - _b) < 70:
row_start -= 200
break
else:
row_start -= 200
else:
print(f"{excel_file.replace('xlsx', 'data')}, 这个文件数据有问题,请检查......")
os.remove(excel_file)
exit(9)
return row_max, row_start
def find_result_sheet_name(conditions, count):
# 该函数比较简单功能是获取结果文件准确的sheet页名称
# 33%臂展_33%速度_正1
reach = conditions[0].removeprefix('reach')
speed = conditions[1].removeprefix('speed')
result_sheet_name = f"{reach}%臂展_{speed}%速度_正{count}"
return result_sheet_name
def copy_data_to_result(ws_data, ws_result, row_max, row_start):
# 结果文件数据清零
for row in ws_result.iter_rows(min_row=2, min_col=1, max_row=6000 - row_start + 2, max_col=2):
for cell in row:
cell.value = None
# 将合适的数据复制到结果文件
data = []
for row in ws_data.iter_rows(min_row=row_start, min_col=1, max_row=row_max, max_col=2):
for cell in row:
data.append(cell.value)
i = 0
for row in ws_result.iter_rows(min_row=2, min_col=1, max_row=row_max - row_start + 2, max_col=2):
for cell in row:
cell.value = data[i]
i = i + 1
def copy_data_to_excel_file(wb_data, ws_result, row_max, row_start, excel_file):
try:
del wb_data['dp']
wb_data.create_sheet('dp')
ws_dp = wb_data['dp']
except:
wb_data.create_sheet('dp')
ws_dp = wb_data['dp']
data = []
for row in ws_result.iter_rows(min_row=1, min_col=1, max_row=row_max-row_start+2, max_col=5):
for cell in row:
data.append(cell.value)
i = 0
for row in ws_dp.iter_rows(min_row=1, min_col=1, max_row=row_max-row_start+2, max_col=5):
for cell in row:
cell.value = data[i]
i = i + 1
global RC
global RR
ws_dp.cell(row=5, column=7).value = RC
ws_dp.cell(row=6, column=7).value = RR
wb_data.save(excel_file)
just_open(excel_file) # 为了能读取到公式计算的数值,必须要用 win32com 打开关闭一次
wb_data = openpyxl.load_workbook(excel_file, data_only=True)
ws_dp = wb_data['dp']
return wb_data, ws_dp
def find_row_start_dp(ws_dp, row_max, row_start, conditions):
global AV
ratio = float(conditions[1].removeprefix('speed'))/100
av_max = AV * ratio
row_max_dp = row_max - row_start + 1 + 1 # title row
row_start_dp = row_max_dp - 5
print(f"row_start_dp = {row_start_dp}")
while row_start_dp > 1:
# 处理异常数据当从数据文件中拷贝的有效数据超过5000时会触发该代码块
if ws_dp.cell(row=row_start_dp, column=4).value is None:
row_start_dp -= 100
continue
_a = float(ws_dp.cell(row=row_start_dp, column=4).value)
_b = float(ws_dp.cell(row=row_start_dp - 1, column=4).value)
_c = float(ws_dp.cell(row=row_start_dp - 2, column=4).value)
_d = float(ws_dp.cell(row=row_start_dp - 3, column=4).value)
_e = float(ws_dp.cell(row=row_start_dp - 4, column=4).value)
avg = (_a + _b + _c + _d + _e) / 5
if abs(avg - av_max) < 1:
row_start_dp = row_start_dp + 10 - 5 # +10 是因为结果文件 C2 的值是 10-5是做了保守处理相当于再往前移动 5 个点位
break
else:
row_start_dp -= 5 # 保守一点,每次移动 5 个点位,如果想要加快程序运行,可适当调整更大一些,建议不超过 15
else:
print("数据有误,请确认!")
print("未找到平衡的点")
exit(1)
return row_start_dp
def single_file_process(data_file, wb_result, count):
excel_file = data_file.replace('data', 'xlsx')
sheet_name = data_file.split('\\')[-1].removesuffix('.data')
df = pd.read_csv(data_file, sep='\t')
df.to_excel(excel_file, sheet_name=sheet_name, index=False)
conditions = sorted(data_file.split('\\')[-2].split('_')[1:])
print(f"conditions = {conditions}")
result_sheet_name = find_result_sheet_name(conditions, count)
ws_result = wb_result[result_sheet_name]
wb_data = openpyxl.load_workbook(excel_file)
ws_data = wb_data[sheet_name]
row_max, row_start = find_row_start(excel_file, ws_data, conditions)
copy_data_to_result(ws_data, ws_result, row_max, row_start)
wb_data, ws_dp = copy_data_to_excel_file(wb_data, ws_result, row_max, row_start, excel_file)
row_start_dp = find_row_start_dp(ws_dp, row_max, row_start, conditions)
ws_result["G2"] = int(row_start_dp)
wb_data.save(excel_file)
wb_data.close()
def data_process(result_file, raw_data_dirs):
prefix = result_file.split('\\')[-1].split('_')[0]
print(f"prefix = {prefix}")
print(f"raw_data_dirs = {raw_data_dirs}")
wb_result = openpyxl.load_workbook(result_file) # 打开和关闭结果文件夹十分耗时间
for raw_data_dir in raw_data_dirs:
if raw_data_dir.split('\\')[-1].split('_')[0] == prefix:
print(f"正在处理【{raw_data_dir}】中的数据......")
_, data_files = traversal_files(raw_data_dir)
count = 1 # 计数器,对应三次急停数据
for data_file in sorted(data_files):
print(f"正在处理【{data_file}】....")
print(f"count = {count}")
single_file_process(data_file, wb_result, count)
count += 1
wb_result.save(result_file)
wb_result.close()
def check_files(raw_data_dirs, result_files):
if len(result_files) != 3:
print("结果文件数目错误,请参考 readme.txt 中的规则。")
exit(3)
prefix = []
for result_file in result_files:
prefix.append(result_file.split('\\')[-1].split('_')[0])
if not sorted(prefix) == sorted(['load33', 'load66', 'load100']):
wd = result_file.split('\\')
del wd[-1]
wd = '\\'.join(wd)
print(f"请关闭所有相关数据文件,并检查工作目录【{wd}】下,有且只允许有类似如下三个文件:")
print("1. load33_自研_制动性能测试.xlsx")
print("2. load66_自研_制动性能测试.xlsx")
print("3. load100_自研_制动性能测试.xlsx")
exit(8)
for raw_data_dir in raw_data_dirs:
prefix = raw_data_dir.split('\\')[-1].split('_')[0]
if prefix not in ['load33', 'load66', 'load100']:
print(f"报错信息:数据目录【{raw_data_dir}】不合规,请参考如下形式。")
print("命名规则:\n\t1. loadAA_speedBB_reachCC\n\t2. loadAA_reachBB_speedCC")
print("规则解释AA/BB/CC 指的是负载/速度/臂展的比例,比如 load66_speed100_reach33 意思是 66% 负载100% 速度以及 33% 臂展情况下的测试结果文件夹。")
exit(7)
_, raw_data_files = traversal_files(raw_data_dir)
if len(raw_data_files) != 3:
print(f"数据目录【{raw_data_dir}】下数据文件个数错误,每个数据目录下有且只能有三个以 .data 为后缀的数据文件。")
exit(6)
for raw_data_file in raw_data_files:
if not raw_data_file.split('\\')[-1].endswith('.data'):
print(f"数据文件【{raw_data_file}】后缀错误,每个数据目录下有且只能有三个以 .data 为后缀的数据文件。")
exit(5)
print("数据目录合规性检查结束......")
def delete_excel_files():
global data_dir
raw_data_dirs, _ = traversal_files(data_dir)
for raw_data_dir in raw_data_dirs:
_, raw_data_files = traversal_files(raw_data_dir)
for raw_data_file in raw_data_files:
if raw_data_file.endswith('.xlsx'):
os.remove(raw_data_file)
def main():
time_start = time.time() # 记录开始时间
global data_dir
raw_data_dirs, result_files = traversal_files(data_dir)
print("#调试信息======================================")
print(f"结果文件:{result_files}")
print(f'数据目录:{raw_data_dirs}')
check_files(raw_data_dirs, result_files)
for result_file in result_files:
print(f"正在整理【{result_file}】文件的数据......")
data_process(result_file, raw_data_dirs)
delete_excel_files() # 运行结束之后,删除中间临时文件
time_end = time.time() # 记录结束时间
time_total = time_end - time_start # 计算的时间差为程序的执行时间,单位为秒/s
print(f"数据处理时间:{time_total//3600:02} h {time_total % 3600/60:05.2f} min")
# 定义初始参数,数据文件夹路径/最大角速度/减速比/额定电流
global data_dir
global AV
global RR
global RC
data_dir = r'D:\Syncthing\company\D-测试工作\X-自动化测试\99-Data\j1'
AV = 180 # AV for Angular velocity
RR = 120 # RR for Angular velocity
RC = 5.6 # RC for Rated Current
if __name__ == "__main__":
main()