AIO/codes/autotest/do_brake.py
2025-03-30 18:55:39 +08:00

372 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import threading
import time
import os
import paramiko
import openpyxl
import pandas
import json
from PySide6.QtCore import Signal, QThread
from codes.common import clibs
class DoBrakeTest(QThread):
def __init__(self, dir_path, tool, /):
super().__init__()
self.dir_path = dir_path
self.tool = tool
self.idx = 4
self.logger = clibs.logger
def initialization(self, data_dirs, data_files):
def check_files():
msg = "初始路径下不允许有文件夹,只能存在如下五个文件,且文件为关闭状态,确认后重新运行!<br>"
msg += "1. configs.xlsx<br>2. reach33/reach66/reach100_xxxx.xlsx<br>3. xxxx.zip"
if len(data_dirs) != 0 or len(data_files) != 5:
self.logger("ERROR", "do_brake", msg, "red")
config_file, reach33_file, reach66_file, reach100_file, prj_file, result_dirs = None, None, None, None, None, []
for data_file in data_files:
filename = data_file.split("/")[-1]
if filename == "configs.xlsx":
config_file = data_file
elif filename.startswith("reach33_") and filename.endswith(".xlsx"):
reach33_file = data_file
elif filename.startswith("reach66_") and filename.endswith(".xlsx"):
reach66_file = data_file
elif filename.startswith("reach100_") and filename.endswith(".xlsx"):
reach100_file = data_file
elif filename.endswith(".zip"):
prj_file = data_file
else:
self.logger("ERROR", "do_brake", msg, "red")
if config_file and reach33_file and reach66_file and reach100_file and prj_file:
os.mkdir(f"{self.dir_path}/j1")
os.mkdir(f"{self.dir_path}/j2")
os.mkdir(f"{self.dir_path}/j3")
load = f"load{self.tool.removeprefix('tool')}"
for reach in ["reach33", "reach66", "reach100"]:
for speed in ["speed33", "speed66", "speed100"]:
dir_name = "_".join([reach, load, speed])
result_dirs.append(dir_name)
os.mkdir(f"{self.dir_path}/j1/{dir_name}")
os.mkdir(f"{self.dir_path}/j2/{dir_name}")
if reach == "reach100":
os.mkdir(f"{self.dir_path}/j3/{dir_name}")
return config_file, prj_file, result_dirs
else:
self.logger("ERROR", "do_brake", msg, "red")
def get_configs():
robot_type = None
msg_id = clibs.c_hr.execution("controller.get_params")
records = clibs.c_hr.get_from_id(msg_id)
for record in records:
if "请求发送成功" not in record[0]:
robot_type = eval(record[0])["data"]["robot_type"]
server_file = f"/home/luoshi/bin/controller/robot_cfg/{robot_type}/{robot_type}.cfg"
local_file = self.dir_path + f"/{robot_type}.cfg"
clibs.c_pd.pull_file_from_server(server_file, local_file)
try:
with open(local_file, mode="r", encoding="utf-8") as f_config:
configs = json.load(f_config)
except Exception as Err:
self.logger("ERROR", "do_brake", f"无法打开 {local_file}<br>{Err}", "red")
# 最大角速度,额定电流,减速比,额定转速
version = configs["VERSION"]
avs = configs["MOTION"]["JOINT_MAX_SPEED"]
self.logger("INFO", "do_brake", f"get_configs: 机型文件版本 {robot_type}_{version}")
self.logger("INFO", "do_brake", f"get_configs: 各关节角速度 {avs}")
return avs
self.logger("INFO", "do_brake", "正在做初始化校验和配置,这可能需要一点时间......", "green")
_config_file, _prj_file, _result_dirs = check_files()
_avs = get_configs()
self.logger("INFO", "do_brake", "数据目录合规性检查结束,未发现问题......", "green")
return _config_file, _prj_file, _result_dirs, _avs
def gen_result_file(self, axis, end_time, reach, load, speed, speed_max, rounds):
d_vel, d_trq, d_stop, threshold = [], [], [], 0.95
s_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time-clibs.INTERVAL*12))
e_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time))
try:
clibs.lock.acquire(True)
clibs.cursor.execute(f"SELECT content FROM logs WHERE timestamp BETWEEN '{s_time}' AND '{e_time}' AND content LIKE '%diagnosis.result%' ORDER BY id ASC")
records = clibs.cursor.fetchall()
finally:
clibs.lock.release()
for record in records: # 保留最后12s的数据
data = eval(record[0])["data"]
for item in data:
d_item = reversed(item["value"])
if item.get("channel", None) == axis-1 and item.get("name", None) == "hw_joint_vel_feedback":
d_vel.extend(d_item)
elif item.get("channel", None) == axis-1 and item.get("name", None) == "device_servo_trq_feedback":
d_trq.extend(d_item)
elif item.get("channel", None) == 0 and item.get("name", None) == "device_safety_estop":
d_stop.extend(d_item)
idx = 0
for idx in range(len(d_stop)-10, 0, -1):
if d_stop[idx] == 1:
break
av_estop = abs(sum(d_vel[idx - 20:idx])/20 * clibs.RADIAN)
if av_estop / speed_max < threshold:
self.logger("WARNING", "do_brake", f"[av_estop: {av_estop:.2f} | shouldbe: {speed_max:.2f}] 处理数据时,本次触发 ESTOP 时未采集到指定百分比的最大速度,即将重试!", "#8A2BE2")
clibs.count += 1
if clibs.count < 3:
return "retry"
else:
clibs.count = 0
self.logger("WARNING", "do_brake", f"尝试三次后仍无法获取正确数据,本次数据无效,继续执行...", "red")
df1 = pandas.DataFrame.from_dict({"hw_joint_vel_feedback": d_vel})
df2 = pandas.DataFrame.from_dict({"device_servo_trq_feedback": d_trq})
df3 = pandas.DataFrame.from_dict({"device_safety_estop": d_stop})
df = pandas.concat([df1, df2, df3], axis=1)
filename = f"{self.dir_path}/j{axis}/reach{reach}_load{load}_speed{speed}/reach{reach}_load{load}_speed{speed}_{rounds}.data"
df.to_csv(filename, sep="\t", index=False)
@staticmethod
def change_curve_state(stat):
if not stat:
display_pdo_params = []
else:
display_pdo_params = [{"name": name, "channel": chl} for name in ["hw_joint_vel_feedback", "device_servo_trq_feedback"] for chl in range(6)]
display_pdo_params.append({"name": "device_safety_estop", "channel": 0})
clibs.c_hr.execution("diagnosis.open", open=stat, display_open=stat)
clibs.c_hr.execution("diagnosis.set_params", display_pdo_params=display_pdo_params)
def run_rl(self, config_file, prj_file, result_dirs, avs):
count, total, speed_target = 0, 63, 0
prj_name = ".".join(prj_file.split("/")[-1].split(".")[:-1])
wb = openpyxl.load_workbook(config_file, read_only=True)
ws = wb["Target"]
get_init_speed = float(ws.cell(row=2, column=2).value)
single_brake = str(ws.cell(row=3, column=2).value).strip()
pon = ws.cell(row=4, column=2).value
io_name = ws.cell(row=5, column=2).value.upper().strip()
wb.close()
msg = f"基本参数配置get_init_speed = {get_init_speed}, single_brake = {single_brake}, pon = {pon}, IO = {io_name}"
self.logger("INFO", "do_brake", msg)
if pon == "positive":
clibs.c_md.write_pon(1)
elif pon == "negative":
clibs.c_md.write_pon(0)
else:
self.logger("ERROR", "do_brake", "configs.xlsx 中 Target 页面 B5 单元格填写不正确,检查后重新运行...", "red")
self.change_curve_state(True)
for condition in result_dirs:
reach = condition.split("_")[0].removeprefix("reach")
load = condition.split("_")[1].removeprefix("load")
speed = condition.split("_")[2].removeprefix("speed")
# for single condition test
single_axis = -1
if single_brake != "0":
total = 3
single_axis = int(single_brake.split("-")[0])
if reach != single_brake.split("-")[1] or load != single_brake.split("-")[2] or speed != single_brake.split("-")[3]:
continue
for axis in range(1, 4):
# for single condition test
if (single_axis != -1 and single_axis != axis) or (axis == 3 and reach != "100"):
continue
clibs.c_md.write_axis(axis)
self.logger("INFO", "brake", "-" * 90, "purple")
speed_max = 0
for rounds in range(1, 4):
if clibs.stop_flag:
self.logger("ERROR", "do_brake", "后台数据清零完成,现在可以重新运行其他程序。", "green")
count += 1
_ = 3 if count % 3 == 0 else count % 3
this_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
prj_path = f"{prj_name}/_build/{prj_name}.prj"
msg = f"[{this_time} | {count}/{total}] 正在执行 {axis}{condition} 的第 {_} 次制动测试..."
self.logger("INFO", "do_brake", msg)
# 1. 触发软急停,并解除,目的是让可能正在运行着的机器停下来,切手动模式并下电
clibs.c_md.r_soft_estop(0)
clibs.c_md.r_soft_estop(1)
clibs.c_ec.sr_string(f"setdo:{io_name},true")
clibs.c_md.r_reset_estop()
clibs.c_md.r_clear_alarm()
clibs.c_md.write_act(0)
while count % 3 == 1:
# 2. 修改要执行的场景
rl_cmd = ""
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname=clibs.ip_addr, port=clibs.ssh_port, username=clibs.username, password=clibs.password)
if pon == "positive":
rl_cmd = f"brake_E(j{axis}_{reach}_p, j{axis}_{reach}_n, p_speed, p_tool)"
elif pon == "negative":
rl_cmd = f"brake_E(j{axis}_{reach}_n, j{axis}_{reach}_p, p_speed, p_tool)"
rl_speed = f"VelSet {speed}"
rl_tool = f"tool p_tool = {self.tool}"
cmd = "cd /home/luoshi/bin/controller/; "
cmd += f'sudo sed -i "/brake_E/d" projects/{prj_name}/_build/brake/main.mod; '
cmd += f'sudo sed -i "/DONOTDELETE/i {rl_cmd}" projects/{prj_name}/_build/brake/main.mod; '
cmd += f'sudo sed -i "/VelSet/d" projects/{prj_name}/_build/brake/main.mod; '
cmd += f'sudo sed -i "/MoveAbsJ/i {rl_speed}" projects/{prj_name}/_build/brake/main.mod; '
cmd += f'sudo sed -i "/tool p_tool/d" projects/{prj_name}/_build/brake/main.mod; '
cmd += f'sudo sed -i "/VelSet/i {rl_tool}" projects/{prj_name}/_build/brake/main.mod; '
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write(clibs.password + "\n")
stdout.read().decode() # 需要read一下才能正常执行
stderr.read().decode()
# 3. reload工程后pp2main并且自动模式和上电最后运行程序
clibs.c_hr.execution("overview.reload", prj_path=prj_path, tasks=["brake"])
clibs.c_hr.execution("rl_task.pp_to_main", tasks=["brake"])
clibs.c_hr.execution("state.switch_auto")
clibs.c_hr.execution("state.switch_motor_on")
clibs.c_hr.execution("rl_task.set_run_params", loop_mode=True, override=1.0)
clibs.c_hr.execution("rl_task.run", tasks=["brake"])
t_start = time.time()
while True:
if clibs.c_md.read_ready_to_go() == 1:
clibs.c_md.write_act(True)
break
else:
time.sleep(1)
if (time.time() - t_start) > 15:
self.logger("ERROR", "do_brake", "15s 内未收到机器人的运行信号,需要确认 RL 程序编写正确并正常执行...", "red")
# 4. 找出最大速度传递给RL程序最后清除相关记录
time.sleep(5) # 消除前 5s 的不稳定数据
start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
time.sleep(get_init_speed) # 指定时间后获取实际【正|负】方向的最大速度可通过configs.xlsx配置
end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
clibs.c_hr.execution("rl_task.stop", tasks=["brake"])
# 找出最大速度
@clibs.db_lock
def get_speed_max():
_speed_max = 0
clibs.cursor.execute(f"SELECT content FROM logs WHERE timestamp BETWEEN '{start_time}' AND '{end_time}' AND content LIKE '%diagnosis.result%' ORDER BY id ASC")
records = clibs.cursor.fetchall()
for record in records:
data = eval(record[0])["data"]
for item in data:
if item.get("channel", None) == axis-1 and item.get("name", None) == "hw_joint_vel_feedback":
_ = clibs.RADIAN * sum(item["value"]) / len(item["value"])
if pon == "positive":
_speed_max = max(_, _speed_max)
elif pon == "negative":
_speed_max = min(_, _speed_max)
return _speed_max
speed_max = abs(get_speed_max())
speed_target = avs[axis-1] * float(speed) / 100
self.logger("INFO", "do_brake", f"axis = {axis}, direction = {pon}, max speed = {speed_max}")
if speed_max < speed_target*0.95 or speed_max > speed_target*1.05:
self.logger("WARNING", "do_brake", f"Axis: {axis}-{count} | 采集获取最大 Speed: {speed_max} | Shouldbe: {speed_target}", "indigo")
clibs.c_md.write_speed_max(speed_max)
if speed_max < 10:
clibs.c_md.r_clear_alarm()
self.logger("WARNING", "do_brake", f"未获取到正确的速度,即将重新获取...", "red")
continue
else:
break
while 1:
clibs.c_ec.sr_string(f"setdo:{io_name},true")
clibs.c_md.r_reset_estop()
clibs.c_md.r_clear_alarm()
clibs.c_md.write_act(0)
# 5. 重新运行程序发送继续运动信号当速度达到最大值时通过DO触发急停
clibs.c_hr.execution("rl_task.pp_to_main", tasks=["brake"])
clibs.c_hr.execution("state.switch_auto")
clibs.c_hr.execution("state.switch_motor_on")
t_start = time.time()
while 1:
clibs.c_md.r_clear_alarm()
clibs.c_hr.execution("rl_task.run", tasks=["brake"])
time.sleep(1)
if clibs.c_md.w_program_state == 1:
break
else:
time.sleep(2)
if time.time() - t_start > 60:
self.logger("ERROR", "do_brake", "60s 内程序未能正常执行,需检查...", "red")
for i in range(16):
if clibs.c_md.read_ready_to_go() == 1:
clibs.c_md.write_act(1)
break
else:
time.sleep(1)
else:
self.logger("ERROR", "do_brake", "16s 内未收到机器人的运行信号,需要确认 RL 程序配置正确并正常执行...", "red")
def exec_brake():
flag, start, data, record = True, time.time(), None, None
while flag:
time.sleep(0.05)
if time.time() - start > 20:
self.logger("ERROR", "do_brake", "20s 内未触发急停,需排查......", "red")
try:
clibs.lock.acquire(True)
clibs.cursor.execute(f"select content from logs where content like '%diagnosis.result%' order by id desc limit 1")
record = clibs.cursor.fetchone()
data = eval(record[0])["data"]
finally:
clibs.lock.release()
for item in data:
if item.get("channel", None) != axis-1 or item.get("name", None) != "hw_joint_vel_feedback":
continue
speed_moment = clibs.RADIAN * sum(item["value"]) / len(item["value"])
if abs(speed_moment) > speed_max - 2:
if (pon == "positive" and speed_moment > 0) or (pon == "negative" and speed_moment < 0):
clibs.c_ec.sr_string(f"setdo:{io_name},false")
time.sleep(clibs.INTERVAL*2) # wait speed goes down to 0
flag = False
break
return time.time()
time.sleep(11) # 排除从其他位姿到零点位姿,再到轴极限位姿的时间
end_time = exec_brake()
# 6. 保留数据并处理输出
ret = self.gen_result_file(axis, end_time, reach, load, speed, speed_max, rounds)
if ret != "retry":
clibs.count = 0
break
else:
time.sleep(clibs.INTERVAL*2)
self.change_curve_state(False)
msg = f"<br>{self.tool.removeprefix('tool')}%负载的制动性能测试执行完毕,如需采集其他负载,须切换负载类型,并更换其他负载,重新执行"
self.logger("INFO", "do_brake", msg, "green")
def processing(self):
time_start = time.time()
clibs.running[self.idx] = 1
if clibs.status["hmi"] != 1 or clibs.status["md"] != 1 or clibs.status["ec"] != 1:
self.logger("ERROR", "do_brake", "processing: 需要在网络设置中连接HMIModbus通信以及外部通信", "red")
data_dirs, data_files = clibs.traversal_files(self.dir_path)
config_file, prj_file, result_dirs, avs = self.initialization(data_dirs, data_files)
clibs.c_pd.push_prj_to_server(prj_file)
self.run_rl(config_file, prj_file, result_dirs, avs)
self.logger("INFO", "do_brake", "-"*60 + "<br>全部处理完毕<br>", "purple")
time_total = time.time() - time_start
msg = f"处理时间:{time_total // 3600:02.0f} h {time_total % 3600 // 60:02.0f} m {time_total % 60:02.0f} s"
self.logger("INFO", "do_brake", msg)