import threading import time import os import paramiko import openpyxl import pandas import json from PySide6.QtCore import Signal, QThread from codes.common import clibs class DoBrakeTest(QThread): def __init__(self, dir_path, tool, /): super().__init__() self.dir_path = dir_path self.tool = tool self.idx = 4 def initialization(self, data_dirs, data_files): def check_files(): msg = "初始路径下不允许有文件夹,只能存在如下五个文件,且文件为关闭状态,确认后重新运行!
" msg += "1. configs.xlsx
2. reach33/reach66/reach100_xxxx.xlsx
3. xxxx.zip" if len(data_dirs) != 0 or len(data_files) != 5: clibs.logger("ERROR", "do_brake", msg, "red") config_file, reach33_file, reach66_file, reach100_file, prj_file, result_dirs = None, None, None, None, None, [] for data_file in data_files: filename = data_file.split("/")[-1] if filename == "configs.xlsx": config_file = data_file elif filename.startswith("reach33_") and filename.endswith(".xlsx"): reach33_file = data_file elif filename.startswith("reach66_") and filename.endswith(".xlsx"): reach66_file = data_file elif filename.startswith("reach100_") and filename.endswith(".xlsx"): reach100_file = data_file elif filename.endswith(".zip"): prj_file = data_file else: clibs.logger("ERROR", "do_brake", msg, "red") if config_file and reach33_file and reach66_file and reach100_file and prj_file: os.mkdir(f"{self.dir_path}/j1") os.mkdir(f"{self.dir_path}/j2") os.mkdir(f"{self.dir_path}/j3") load = f"load{self.tool.removeprefix('tool')}" for reach in ["reach33", "reach66", "reach100"]: for speed in ["speed33", "speed66", "speed100"]: dir_name = "_".join([reach, load, speed]) result_dirs.append(dir_name) os.mkdir(f"{self.dir_path}/j1/{dir_name}") os.mkdir(f"{self.dir_path}/j2/{dir_name}") if reach == "reach100": os.mkdir(f"{self.dir_path}/j3/{dir_name}") return config_file, prj_file, result_dirs else: clibs.logger("ERROR", "do_brake", msg, "red") def get_configs(): robot_type = None msg_id = clibs.c_hr.execution("controller.get_params") records = clibs.c_hr.get_from_id(msg_id) for record in records: if "请求发送成功" not in record[0]: robot_type = eval(record[0])["data"]["robot_type"] server_file = f"/home/luoshi/bin/controller/robot_cfg/{robot_type}/{robot_type}.cfg" local_file = self.dir_path + f"/{robot_type}.cfg" clibs.c_pd.pull_file_from_server(server_file, local_file) try: with open(local_file, mode="r", encoding="utf-8") as f_config: configs = json.load(f_config) except Exception as Err: clibs.logger("ERROR", "do_brake", f"无法打开 {local_file}
{Err}", "red") # 最大角速度,额定电流,减速比,额定转速 version = configs["VERSION"] avs = configs["MOTION"]["JOINT_MAX_SPEED"] clibs.logger("INFO", "do_brake", f"get_configs: 机型文件版本 {robot_type}_{version}") clibs.logger("INFO", "do_brake", f"get_configs: 各关节角速度 {avs}") return avs clibs.logger("INFO", "do_brake", "正在做初始化校验和配置,这可能需要一点时间......", "green") _config_file, _prj_file, _result_dirs = check_files() _avs = get_configs() clibs.logger("INFO", "do_brake", "数据目录合规性检查结束,未发现问题......", "green") return _config_file, _prj_file, _result_dirs, _avs def gen_result_file(self, axis, end_time, reach, load, speed, speed_max, rounds): d_vel, d_trq, d_stop, threshold = [], [], [], 0.95 s_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time-clibs.INTERVAL*12)) e_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time)) try: clibs.lock.acquire(True) clibs.cursor.execute(f"SELECT content FROM logs WHERE timestamp BETWEEN '{s_time}' AND '{e_time}' AND content LIKE '%diagnosis.result%' ORDER BY id ASC") records = clibs.cursor.fetchall() finally: clibs.lock.release() for record in records: # 保留最后12s的数据 data = eval(record[0])["data"] for item in data: d_item = reversed(item["value"]) if item.get("channel", None) == axis-1 and item.get("name", None) == "hw_joint_vel_feedback": d_vel.extend(d_item) elif item.get("channel", None) == axis-1 and item.get("name", None) == "device_servo_trq_feedback": d_trq.extend(d_item) elif item.get("channel", None) == 0 and item.get("name", None) == "device_safety_estop": d_stop.extend(d_item) idx = 0 for idx in range(len(d_stop)-10, 0, -1): if d_stop[idx] == 1: break av_estop = abs(sum(d_vel[idx - 20:idx])/20 * clibs.RADIAN) if av_estop / speed_max < threshold: clibs.logger("WARNING", "do_brake", f"[av_estop: {av_estop:.2f} | shouldbe: {speed_max:.2f}] 处理数据时,本次触发 ESTOP 时未采集到指定百分比的最大速度,即将重试!", "#8A2BE2") clibs.count += 1 if clibs.count < 3: return "retry" else: clibs.count = 0 clibs.logger("WARNING", "do_brake", f"尝试三次后仍无法获取正确数据,本次数据无效,继续执行...", "red") df1 = pandas.DataFrame.from_dict({"hw_joint_vel_feedback": d_vel}) df2 = pandas.DataFrame.from_dict({"device_servo_trq_feedback": d_trq}) df3 = pandas.DataFrame.from_dict({"device_safety_estop": d_stop}) df = pandas.concat([df1, df2, df3], axis=1) filename = f"{self.dir_path}/j{axis}/reach{reach}_load{load}_speed{speed}/reach{reach}_load{load}_speed{speed}_{rounds}.data" df.to_csv(filename, sep="\t", index=False) @staticmethod def change_curve_state(stat): if not stat: display_pdo_params = [] else: display_pdo_params = [{"name": name, "channel": chl} for name in ["hw_joint_vel_feedback", "device_servo_trq_feedback"] for chl in range(6)] display_pdo_params.append({"name": "device_safety_estop", "channel": 0}) clibs.c_hr.execution("diagnosis.open", open=stat, display_open=stat) clibs.c_hr.execution("diagnosis.set_params", display_pdo_params=display_pdo_params) def run_rl(self, config_file, prj_file, result_dirs, avs): count, total, speed_target = 0, 63, 0 prj_name = ".".join(prj_file.split("/")[-1].split(".")[:-1]) wb = openpyxl.load_workbook(config_file, read_only=True) ws = wb["Target"] get_init_speed = float(ws.cell(row=2, column=2).value) single_brake = str(ws.cell(row=3, column=2).value).strip() pon = ws.cell(row=4, column=2).value io_name = ws.cell(row=5, column=2).value.upper().strip() wb.close() msg = f"基本参数配置:get_init_speed = {get_init_speed}, single_brake = {single_brake}, pon = {pon}, IO = {io_name}" clibs.logger("INFO", "do_brake", msg) if pon == "positive": clibs.c_md.write_pon(1) elif pon == "negative": clibs.c_md.write_pon(0) else: clibs.logger("ERROR", "do_brake", "configs.xlsx 中 Target 页面 B5 单元格填写不正确,检查后重新运行...", "red") self.change_curve_state(True) for condition in result_dirs: reach = condition.split("_")[0].removeprefix("reach") load = condition.split("_")[1].removeprefix("load") speed = condition.split("_")[2].removeprefix("speed") # for single condition test single_axis = -1 if single_brake != "0": total = 3 single_axis = int(single_brake.split("-")[0]) if reach != single_brake.split("-")[1] or load != single_brake.split("-")[2] or speed != single_brake.split("-")[3]: continue for axis in range(1, 4): # for single condition test if (single_axis != -1 and single_axis != axis) or (axis == 3 and reach != "100"): continue clibs.c_md.write_axis(axis) clibs.logger("INFO", "brake", "-" * 90, "purple") speed_max = 0 for rounds in range(1, 4): if clibs.stop_flag: clibs.logger("ERROR", "do_brake", "后台数据清零完成,现在可以重新运行其他程序。", "green") count += 1 _ = 3 if count % 3 == 0 else count % 3 this_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())) prj_path = f"{prj_name}/_build/{prj_name}.prj" msg = f"[{this_time} | {count}/{total}] 正在执行 {axis} 轴 {condition} 的第 {_} 次制动测试..." clibs.logger("INFO", "do_brake", msg) # 1. 触发软急停,并解除,目的是让可能正在运行着的机器停下来,切手动模式并下电 clibs.c_md.r_soft_estop(0) clibs.c_md.r_soft_estop(1) clibs.c_ec.sr_string(f"setdo:{io_name},true") clibs.c_md.r_reset_estop() clibs.c_md.r_clear_alarm() clibs.c_md.write_act(0) while count % 3 == 1: # 2. 修改要执行的场景 rl_cmd = "" ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(hostname=clibs.ip_addr, port=clibs.ssh_port, username=clibs.username, password=clibs.password) if pon == "positive": rl_cmd = f"brake_E(j{axis}_{reach}_p, j{axis}_{reach}_n, p_speed, p_tool)" elif pon == "negative": rl_cmd = f"brake_E(j{axis}_{reach}_n, j{axis}_{reach}_p, p_speed, p_tool)" rl_speed = f"VelSet {speed}" rl_tool = f"tool p_tool = {self.tool}" cmd = "cd /home/luoshi/bin/controller/; " cmd += f'sudo sed -i "/brake_E/d" projects/{prj_name}/_build/brake/main.mod; ' cmd += f'sudo sed -i "/DONOTDELETE/i {rl_cmd}" projects/{prj_name}/_build/brake/main.mod; ' cmd += f'sudo sed -i "/VelSet/d" projects/{prj_name}/_build/brake/main.mod; ' cmd += f'sudo sed -i "/MoveAbsJ/i {rl_speed}" projects/{prj_name}/_build/brake/main.mod; ' cmd += f'sudo sed -i "/tool p_tool/d" projects/{prj_name}/_build/brake/main.mod; ' cmd += f'sudo sed -i "/VelSet/i {rl_tool}" projects/{prj_name}/_build/brake/main.mod; ' stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True) stdin.write(clibs.password + "\n") stdout.read().decode() # 需要read一下才能正常执行 stderr.read().decode() # 3. reload工程后,pp2main,并且自动模式和上电,最后运行程序 clibs.c_hr.execution("overview.reload", prj_path=prj_path, tasks=["brake"]) clibs.c_hr.execution("rl_task.pp_to_main", tasks=["brake"]) clibs.c_hr.execution("state.switch_auto") clibs.c_hr.execution("state.switch_motor_on") clibs.c_hr.execution("rl_task.set_run_params", loop_mode=True, override=1.0) clibs.c_hr.execution("rl_task.run", tasks=["brake"]) t_start = time.time() while True: if clibs.c_md.read_ready_to_go() == 1: clibs.c_md.write_act(True) break else: time.sleep(1) if (time.time() - t_start) > 15: clibs.logger("ERROR", "do_brake", "15s 内未收到机器人的运行信号,需要确认 RL 程序编写正确并正常执行...", "red") # 4. 找出最大速度,传递给RL程序,最后清除相关记录 time.sleep(5) # 消除前 5s 的不稳定数据 start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())) time.sleep(get_init_speed) # 指定时间后获取实际【正|负】方向的最大速度,可通过configs.xlsx配置 end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())) clibs.c_hr.execution("rl_task.stop", tasks=["brake"]) # 找出最大速度 @clibs.db_lock def get_speed_max(): _speed_max = 0 clibs.cursor.execute(f"SELECT content FROM logs WHERE timestamp BETWEEN '{start_time}' AND '{end_time}' AND content LIKE '%diagnosis.result%' ORDER BY id ASC") records = clibs.cursor.fetchall() for record in records: data = eval(record[0])["data"] for item in data: if item.get("channel", None) == axis-1 and item.get("name", None) == "hw_joint_vel_feedback": _ = clibs.RADIAN * sum(item["value"]) / len(item["value"]) if pon == "positive": _speed_max = max(_, _speed_max) elif pon == "negative": _speed_max = min(_, _speed_max) return _speed_max speed_max = abs(get_speed_max()) speed_target = avs[axis-1] * float(speed) / 100 clibs.logger("INFO", "do_brake", f"axis = {axis}, direction = {pon}, max speed = {speed_max}") if speed_max < speed_target*0.95 or speed_max > speed_target*1.05: clibs.logger("WARNING", "do_brake", f"Axis: {axis}-{count} | 采集获取最大 Speed: {speed_max} | Shouldbe: {speed_target}", "indigo") clibs.c_md.write_speed_max(speed_max) if speed_max < 10: clibs.c_md.r_clear_alarm() clibs.logger("WARNING", "do_brake", f"未获取到正确的速度,即将重新获取...", "red") continue else: break while 1: clibs.c_ec.sr_string(f"setdo:{io_name},true") clibs.c_md.r_reset_estop() clibs.c_md.r_clear_alarm() clibs.c_md.write_act(0) # 5. 重新运行程序,发送继续运动信号,当速度达到最大值时,通过DO触发急停 clibs.c_hr.execution("rl_task.pp_to_main", tasks=["brake"]) clibs.c_hr.execution("state.switch_auto") clibs.c_hr.execution("state.switch_motor_on") t_start = time.time() while 1: clibs.c_md.r_clear_alarm() clibs.c_hr.execution("rl_task.run", tasks=["brake"]) time.sleep(1) if clibs.c_md.w_program_state == 1: break else: time.sleep(2) if time.time() - t_start > 60: clibs.logger("ERROR", "do_brake", "60s 内程序未能正常执行,需检查...", "red") for i in range(16): if clibs.c_md.read_ready_to_go() == 1: clibs.c_md.write_act(1) break else: time.sleep(1) else: clibs.logger("ERROR", "do_brake", "16s 内未收到机器人的运行信号,需要确认 RL 程序配置正确并正常执行...", "red") def exec_brake(): flag, start, data, record = True, time.time(), None, None while flag: time.sleep(0.05) if time.time() - start > 20: clibs.logger("ERROR", "do_brake", "20s 内未触发急停,需排查......", "red") try: clibs.lock.acquire(True) clibs.cursor.execute(f"select content from logs where content like '%diagnosis.result%' order by id desc limit 1") record = clibs.cursor.fetchone() data = eval(record[0])["data"] finally: clibs.lock.release() for item in data: if item.get("channel", None) != axis-1 or item.get("name", None) != "hw_joint_vel_feedback": continue speed_moment = clibs.RADIAN * sum(item["value"]) / len(item["value"]) if abs(speed_moment) > speed_max - 2: if (pon == "positive" and speed_moment > 0) or (pon == "negative" and speed_moment < 0): clibs.c_ec.sr_string(f"setdo:{io_name},false") time.sleep(clibs.INTERVAL*2) # wait speed goes down to 0 flag = False break return time.time() time.sleep(11) # 排除从其他位姿到零点位姿,再到轴极限位姿的时间 end_time = exec_brake() # 6. 保留数据并处理输出 ret = self.gen_result_file(axis, end_time, reach, load, speed, speed_max, rounds) if ret != "retry": clibs.count = 0 break else: time.sleep(clibs.INTERVAL*2) self.change_curve_state(False) msg = f"
{self.tool.removeprefix('tool')}%负载的制动性能测试执行完毕,如需采集其他负载,须切换负载类型,并更换其他负载,重新执行" clibs.logger("INFO", "do_brake", msg, "green") def processing(self): time_start = time.time() clibs.running[self.idx] = 1 if clibs.status["hmi"] != 1 or clibs.status["md"] != 1 or clibs.status["ec"] != 1: clibs.logger("ERROR", "do_brake", "processing: 需要在网络设置中连接HMI,Modbus通信以及外部通信!", "red") data_dirs, data_files = clibs.traversal_files(self.dir_path) config_file, prj_file, result_dirs, avs = self.initialization(data_dirs, data_files) clibs.c_pd.push_prj_to_server(prj_file) self.run_rl(config_file, prj_file, result_dirs, avs) clibs.logger("INFO", "do_brake", "-"*60 + "
全部处理完毕
", "purple") time_total = time.time() - time_start msg = f"处理时间:{time_total // 3600:02.0f} h {time_total % 3600 // 60:02.0f} m {time_total % 60:02.0f} s" clibs.logger("INFO", "do_brake", msg)