AIO/codes/analysis/brake.py
2025-04-07 10:38:59 +08:00

212 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os.path
import time
import pandas
from PySide6.QtCore import Signal, QThread
import openpyxl
import re
from codes.common import clibs
class BrakeDataProcess(QThread):
    """Fill brake-test result workbooks from raw ``.data`` recordings.

    ``processing`` is the entry point visible in this file: it validates the
    layout of ``dir_path``, reads the robot configuration, and copies the
    relevant window of every raw recording into the matching worksheet of the
    ``reachXX_*.xlsx`` result workbooks.

    NOTE(review): several methods keep executing after
    ``clibs.logger("ERROR", ...)`` as if that call aborts the current
    operation (presumably by raising) -- confirm against ``clibs.logger``.
    """

    def __init__(self, dir_path, /):
        """Remember the root directory that holds config, results and raw data."""
        super().__init__()
        self.dir_path = dir_path
        self.idx = 0  # slot written in clibs.running by processing()

    def check_files(self, rawdata_dirs, result_files):
        """Validate the directory layout and pick the files to process.

        Args:
            rawdata_dirs: "/"-separated paths of the raw-data directories,
                each named ``reachAA_loadBB_speedCC``.
            result_files: "/"-separated paths of the four expected files
                (one ``*.cfg`` and three ``reachXX_*.xlsx`` workbooks).

        Returns:
            ``(config_file, result_files)`` where ``result_files`` only holds
            the workbooks whose reach prefix has at least one raw-data
            directory.
        """
        msg_wrong = "需要有四个文件和若干个数据文件夹,可参考如下确认:<br>"
        msg_wrong += "- reach33/66/100_XXXXXXX.xlsx<br>- *.cfg<br>"
        msg_wrong += "- reach33_load33_speed33<br>- reach33_load33_speed66<br>...<br>- reach100_load100_speed66<br>- reach100_load100_speed100<br>"

        if len(result_files) != 4 or len(rawdata_dirs) == 0:
            clibs.logger("ERROR", "brake", msg_wrong, "red")

        config_file, reach33_file, reach66_file, reach100_file = None, None, None, None
        for result_file in result_files:
            filename = result_file.split("/")[-1]
            # The suffix must really be ".cfg"; a name that merely contains
            # ".cfg" (e.g. "x.cfg.bak") is not a configuration file.
            if filename.endswith(".cfg"):
                config_file = result_file
            elif filename.startswith("reach33_") and filename.endswith(".xlsx"):
                reach33_file = result_file
            elif filename.startswith("reach66_") and filename.endswith(".xlsx"):
                reach66_file = result_file
            elif filename.startswith("reach100_") and filename.endswith(".xlsx"):
                reach100_file = result_file

        if not (config_file and reach33_file and reach66_file and reach100_file):
            clibs.logger("ERROR", "brake", msg_wrong, "red")

        reach_s = ['reach33', 'reach66', 'reach100']
        load_s = ['load33', 'load66', 'load100']
        speed_s = ['speed33', 'speed66', 'speed100']
        prefix = []
        for rawdata_dir in rawdata_dirs:
            components = rawdata_dir.split("/")[-1].split('_')  # reach_load_speed
            prefix.append(components[0])
            # Guard the length first so a short name reports the naming rule
            # instead of failing with an IndexError.
            if (
                len(components) < 3
                or components[0] not in reach_s
                or components[1] not in load_s
                or components[2] not in speed_s
            ):
                msg = f"报错信息:数据目录 {rawdata_dir} 命名不合规,请参考如下形式<br>"
                msg += "命名规则reachAA_loadBB_speedCCAA/BB/CC 指的是臂展/负载/速度的比例<br>"
                msg += "规则解释reach66_load100_speed33表示 66% 臂展100% 负载以及 33% 速度情况下的测试结果文件夹<br>"
                clibs.logger("ERROR", "brake", msg, "red")

            _, rawdata_files = clibs.traversal_files(rawdata_dir)
            if len(rawdata_files) != 3:
                msg = f"数据目录 {rawdata_dir} 下数据文件个数错误,每个数据目录下有且只能有三个以 .data 为后缀的数据文件"
                clibs.logger("ERROR", "brake", msg, "red")

            for rawdata_file in rawdata_files:
                if not rawdata_file.endswith(".data"):
                    msg = f"数据文件 {rawdata_file} 后缀错误,每个数据目录下有且只能有三个以 .data 为后缀的数据文件"
                    clibs.logger("ERROR", "brake", msg, "red")

        # Keep only the workbooks for which raw data of the same reach exists.
        used_prefixes = set(prefix)
        result_files = [
            workbook
            for workbook in (reach33_file, reach66_file, reach100_file)
            if workbook.split("/")[-1].split("_")[0] in used_prefixes
        ]

        clibs.logger("INFO", "brake", "数据目录合规性检查结束,未发现问题......", "green")
        return config_file, result_files

    def get_configs(self, config_file):
        """Read the joint max speed and reduction ratio of the tested axis.

        The axis number is taken from the name of the directory containing
        ``config_file``, which must match ``[jJ][123]``.

        Returns:
            ``(av, rr)``: the ``MOTION.JOINT_MAX_SPEED`` entry and the absolute
            ``TRANSMISSION.REDUCTION_RATIO_NUMERATOR`` entry for that axis.
            Any failure is reported through ``clibs.logger("ERROR", ...)``.
        """
        try:
            with open(config_file, mode="r", encoding="utf-8") as f_config:
                configs = json.load(f_config)

            p_dir = config_file.split('/')[-2]
            if not re.match("^[jJ][123]$", p_dir):
                clibs.logger("ERROR", "brake", "被处理的根文件夹命名必须是 [Jj][123] 的格式", "red")

            axis = int(p_dir[-1])  # axis to process
            # rr stands for reduction ratio
            rrs = [abs(_) for _ in configs["TRANSMISSION"]["REDUCTION_RATIO_NUMERATOR"]]
            avs = configs["MOTION"]["JOINT_MAX_SPEED"]
            rr = rrs[axis-1]
            av = avs[axis-1]

            return av, rr
        except Exception as err:
            clibs.logger("ERROR", "brake", f"无法打开 {config_file},或者使用了错误的机型配置文件,需检查<br>{err}", "red")

    def now_doing_msg(self, docs, flag):
        """Log a timestamped start/done message for a file or a directory.

        Args:
            docs: path being processed; anything that is not an existing file
                is reported as a directory.
            flag: ``'start'`` or ``'done'``; other values log nothing.
        """
        now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
        is_file = os.path.isfile(docs)

        if flag == 'start':
            if is_file:
                clibs.logger("INFO", "brake", f"[{now}] 正在处理文件 {docs} 中的数据......")
            else:
                clibs.logger("INFO", "brake", f"[{now}] 正在处理目录 {docs} 中的数据......")
        elif flag == 'done':
            if is_file:
                clibs.logger("INFO", "brake", f"[{now}] 文件 {docs} 数据已处理完毕")
            else:
                clibs.logger("INFO", "brake", f"[{now}] 目录 {docs} 数据文件已处理完毕")

    @staticmethod
    def data2result(df, ws_result, row_start, row_end):
        """Copy rows ``row_start..row_end-1`` of ``df`` into ``ws_result``.

        The first three columns of each selected row go into worksheet
        columns 1-3, starting at worksheet row 2. The remaining rows of the
        target range are set to ``None`` so stale values from an earlier run
        do not survive.
        """
        samples = [
            (df.iloc[row, 0], df.iloc[row, 1], df.iloc[row, 2])
            for row in range(row_start, row_end)
        ]

        count = row_end - row_start
        # Worksheet rows 2..999 offer 998 slots; once the data no longer fits,
        # switch to the padded range so the last sample is not dropped.
        row_max = 1000 if count + 2 <= 1000 else count + 100

        blank = (None, None, None)
        for offset, row in enumerate(range(2, row_max)):
            values = samples[offset] if offset < len(samples) else blank
            for column, value in enumerate(values, start=1):
                ws_result.cell(row=row, column=column).value = value

    def get_row_range(self, data_file, df, conditions, av, rr):
        """Locate the slice of ``df`` that covers the emergency stop.

        Args:
            data_file: path of the recording, used in log messages only.
            df: recording; column 0 is read as speed and column 2 as the
                ESTOP flag (assumption taken from the log messages -- confirm).
            conditions: ``[reachXX, loadXX, speedXX]`` tokens of the directory.
            av: maximum joint speed from the configuration.
            rr: reduction ratio from the configuration.

        Returns:
            ``(row_start, row_end)`` row positions to export.
        """
        row_start, row_end = 0, 0
        ratio = float(conditions[2].removeprefix('speed')) / 100
        av_max = av * ratio
        threshold = 0.95
        last_row = df.index[-1] - 1

        # Walk backwards in steps of 10 until the ESTOP column is non-zero.
        for row in range(last_row, -1, -10):
            if df.iloc[row, 2] != 0:
                row_start = max(row - 20, 0)  # keep 20 samples before the stop
                break
        else:
            clibs.logger("ERROR", "brake", f"数据文件 {data_file} 采集的数据中没有 ESTOP 为非 0 的情况,需要确认", "red")

        # Walk forwards in steps of 10 until the converted speed drops below 1.
        for row in range(row_start, last_row, 10):
            speed_row = df.iloc[row, 0] * clibs.RADIAN * rr * 60 / 360
            if abs(speed_row) < 1:
                row_end = min(row + 100, last_row)
                break
        else:
            clibs.logger("ERROR", "brake", f"数据文件 {data_file} 最后的速度未降为零", "red")

        # Average speed over the (up to) 20 samples preceding row_start. The
        # lower bound is clamped: a negative slice start would be counted from
        # the end of the frame and yield an empty window, silently skipping
        # the check below.
        window = df.iloc[max(row_start - 20, 0):row_start, 0]
        if not window.empty:
            av_estop = abs(window.abs().mean() * clibs.RADIAN)
            if abs(av_estop / av_max) < threshold:
                filename = data_file.split("/")[-1]
                msg = f"[av_estop: {av_estop:.2f} | shouldbe: {av_max:.2f}] 数据文件 {filename} 触发 ESTOP 时未采集到指定百分比的最大速度,需要检查"
                clibs.logger("WARNING", "brake", msg, "#8A2BE2")

        return row_start, row_end

    @staticmethod
    def get_shtname(conditions, count):
        """Build the worksheet name, e.g. ``33%负载_33%速度_1``.

        ``conditions`` is ``[reachXX, loadXX, speedXX]``; ``count`` is the
        1-based index of the recording inside its directory.
        """
        load = conditions[1].removeprefix('load')
        speed = conditions[2].removeprefix('speed')
        return f"{load}%负载_{speed}%速度_{count}"

    def single_file_process(self, data_file, wb, count, av, rr):
        """Export one tab-separated recording into its worksheet of ``wb``."""
        df = pandas.read_csv(data_file, sep='\t')
        conditions = data_file.split("/")[-2].split("_")  # reach/load/speed
        ws = wb[self.get_shtname(conditions, count)]

        row_start, row_end = self.get_row_range(data_file, df, conditions, av, rr)
        self.data2result(df, ws, row_start, row_end)

    def data_process(self, result_file, rawdata_dirs, av, rr):
        """Fill one result workbook from every raw-data directory of its reach.

        The workbook is loaded once, updated with the three recordings of each
        matching directory, then saved back to ``result_file``.
        """
        filename = result_file.split("/")[-1]

        clibs.logger("INFO", "brake", f"正在打开文件 {filename},这可能需要一些时间......", "blue")
        try:
            wb = openpyxl.load_workbook(result_file)
        except Exception as err:
            clibs.logger("ERROR", "brake", f"{filename}文件打开失败,可能是文件已损坏,确认后重新执行!<br>{err}", "red")

        prefix = filename.split('_')[0]
        for rawdata_dir in rawdata_dirs:
            if rawdata_dir.split("/")[-1].split('_')[0] != prefix:
                continue

            self.now_doing_msg(rawdata_dir, 'start')
            _, data_files = clibs.traversal_files(rawdata_dir)
            # The three recordings are handled one after another; the sheet
            # suffix is the 1-based position in the directory listing.
            for idx in range(3):
                self.single_file_process(data_files[idx], wb, idx+1, av, rr)
            self.now_doing_msg(rawdata_dir, 'done')

        clibs.logger("INFO", "brake", f"正在保存文件 {filename},这可能需要一些时间......<br>", "blue")
        wb.save(result_file)
        wb.close()

    def processing(self):
        """Run the whole pipeline for ``self.dir_path`` and log the duration."""
        time_start = time.time()
        clibs.running[self.idx] = 1

        rawdata_dirs, result_files = clibs.traversal_files(self.dir_path)
        config_file, result_files = self.check_files(rawdata_dirs, result_files)
        av, rr = self.get_configs(config_file)

        for result_file in result_files:
            self.data_process(result_file, rawdata_dirs, av, rr)

        clibs.logger("INFO", "brake", "-"*60 + "<br>全部处理完毕<br>", "purple")
        time_total = time.time() - time_start
        msg = f"处理时间:{time_total // 3600:02.0f} h {time_total % 3600 // 60:02.0f} m {time_total % 60:02.0f} s"
        clibs.logger("INFO", "brake", msg)