import os from time import sleep from sys import argv from os import scandir from os.path import exists from paramiko import SSHClient, AutoAddPolicy from json import loads def traversal_files(path, w2t): if not exists(path): msg = f'数据文件夹{path}不存在,请确认后重试......' w2t(msg, 0, 1, 'red', tab_name='Automatic Test') else: dirs = [] files = [] for item in scandir(path): if item.is_dir(): dirs.append(item.path) elif item.is_file(): files.append(item.path) return dirs, files def check_files(path, loadsel, data_dirs, data_files, w2t): if len(data_dirs) != 0 or len(data_files) != 5: w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test') w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test') config_file = reach33 = reach66 = reach100 = prj_file = None for data_file in data_files: filename = data_file.split('\\')[-1] if filename == 'configs.xlsx': config_file = data_file elif filename.startswith('reach33_') and filename.endswith('.xlsx'): reach33 = data_file elif filename.startswith('reach66_') and filename.endswith('.xlsx'): reach66 = data_file elif filename.startswith('reach100_') and filename.endswith('.xlsx'): reach100 = data_file elif filename.endswith('.zip'): prj_file = data_file else: w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test') w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 2, 'red', tab_name='Automatic Test') if config_file and reach33 and reach66 and reach100 and prj_file: result_dirs = [] os.mkdir(f"{path}\\j1") os.mkdir(f"{path}\\j2") os.mkdir(f"{path}\\j3") for _reach in ['reach33', 'reach66', 'reach100']: for _load in [f'load{loadsel.removeprefix("tool")}']: for _speed in ['speed33', 'speed66', 'speed100']: dir_name = '_'.join([_reach, _load, _speed]) result_dirs.append(dir_name) os.mkdir(f"{path}\\j1\\{dir_name}") os.mkdir(f"{path}\\j2\\{dir_name}") if _reach == 'reach100': os.mkdir(f"{path}\\j3\\{dir_name}") w2t("数据目录合规性检查结束,未发现问题......", tab_name='Automatic Test') return config_file, reach33, reach66, reach100, prj_file, result_dirs else: w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test') w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test') def prj_to_xcore(prj_file): ssh = SSHClient() ssh.set_missing_host_key_policy(AutoAddPolicy()) ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019') sftp = ssh.open_sftp() sftp.put(prj_file, '/tmp/target.zip') cmd = 'cd /tmp; rm -rf target/; mkdir target; unzip -d target/ -q target.zip; ' cmd += 'chmod 777 -R target/; rm target.zip' ssh.exec_command(cmd) cmd = 'sudo rm -rf /home/luoshi/bin/controller/projects/target; ' cmd += 'sudo mv /tmp/target/ /home/luoshi/bin/controller/projects/' stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True) stdin.write('luoshi2019' + '\n') stdin.flush() print(stdout.read().decode()) # 必须得输出一下stdout,才能正确执行sudo print(stderr.read().decode()) # 顺便也执行以下stderr _prj_name = prj_file.split('\\')[-1].removesuffix('.zip') cmd = 'cd /home/luoshi/bin/controller/; ' cmd += f'sudo mv projects/target/_build/{_prj_name}.prj projects/target/_build/target.prj' stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True) stdin.write('luoshi2019' + '\n') stdin.flush() print(stdout.read().decode()) # 必须得输出一下stdout,才能正确执行sudo print(stderr.read().decode()) # 顺便也执行以下stderr ssh.close() def validate_resp(_id, response, w2t): match _id: case 'DATA ERR': w2t(f"数据处理错误,需要确认", 0, 4, 'red', tab_name='Automatic Test') case 'DATA READ ERR': w2t(f"无法读取数据,需要确认", 0, 3, 'red', tab_name='Automatic Test') case 'NOT SUPPORT': w2t(f"不支持的功能,需要确认", 0, 2, 'red', tab_name='Automatic Test') if not response: w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test') def execution(cmd, hr, w2t, **kwargs): _id = hr.execution(cmd, **kwargs) _msg = hr.get_from_id(_id) if not _msg: w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name='Automatic Test') else: _response = loads(_msg) validate_resp(_id, _response, w2t) return _response def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t): _count = 0 for condition in result_dirs: _reach = condition.split('_')[0].removeprefix('reach') _load = condition.split('_')[1].removeprefix('load') _speed = condition.split('_')[2].removeprefix('speed') for axis in range(1, 4): if axis == 3 and _reach != '100': continue for count in range(1, 4): _count += 1 w2t(f"[{_count/64}]正在执行{axis}轴{condition}[{count}]的制动测试......", 0, 0, 'purple', 'Automatic Test') # 1. 关闭诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来,切手动模式并下电 _response = execution('diagnosis.open', hr, w2t, open=False, display_open=False) md.trigger_estop() md.reset_estop() _response = execution('state.switch_manual', hr, w2t) _response = execution('state.switch_motor_off', hr, w2t) # 2. 修改未要执行的场景 ssh = SSHClient() ssh.set_missing_host_key_policy(AutoAddPolicy()) ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019') _rl_cmd = f"brake_E(j{axis}_{_reach}_p, j{axis}_{_reach}_n, p_speed, p_tool)" _rl_speed = f"VelSet {_speed}" cmd = 'cd /home/luoshi/bin/controller/; ' cmd += 'sudo sed -i "/brake_E/d" projects/target/_build/brake/main.mod; ' cmd += f'sudo sed -i "/DONOTDELETE/i {_rl_cmd}" projects/target/_build/brake/main.mod; ' cmd += f'sudo sed -i "/VelSet/d" projects/target/_build/brake/main.mod; ' cmd += f'sudo sed -i "/MoveAbsJ/i {_rl_speed}" projects/target/_build/brake/main.mod; ' stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True) stdin.write('luoshi2019' + '\n') stdin.flush() print(stdout.read().decode()) # 必须得输出一下stdout,才能正确执行sudo print(stderr.read().decode()) # 顺便也执行以下stderr # 3. reload工程后,pp2main,并且自动模式和上电 prj_path = 'target/_build/target.prj' _response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['brake']) _response = execution('overview.get_cur_prj', hr, w2t) _response = execution('rl_task.pp_to_main', hr, w2t, tasks=['brake']) _response = execution('state.switch_auto', hr, w2t) _response = execution('state.switch_motor_on', hr, w2t) # 4. 开始运行程序,每种情况运行15s _response = execution('rl_task.run', hr, w2t, tasks=['brake']) for i in range(3): if md.read_ready_to_go() == 1: md.write_act(True) sleep(1) md.write_act(False) break else: sleep(1) else: w2t("未收到机器人的运行信号,需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test') # 5. 打开诊断曲线,并执行采集 _response = execution('diagnosis.open', hr, w2t, open=True, display_open=True) display_pdo_params = [ {"name": "hw_joint_vel_feedback", "channel": 0}, {"name": "hw_joint_vel_feedback", "channel": 1}, {"name": "hw_joint_vel_feedback", "channel": 2}, {"name": "hw_joint_vel_feedback", "channel": 3}, {"name": "hw_joint_vel_feedback", "channel": 4}, {"name": "hw_joint_vel_feedback", "channel": 5}, {"name": "device_servo_trq_feedback", "channel": 0}, {"name": "device_servo_trq_feedback", "channel": 1}, {"name": "device_servo_trq_feedback", "channel": 2}, {"name": "device_servo_trq_feedback", "channel": 3}, {"name": "device_servo_trq_feedback", "channel": 4}, {"name": "device_servo_trq_feedback", "channel": 5}, {"name": "device_safety_estop", "channel": 0}, ] _response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params) sleep(15) # 6. 关闭诊断曲线,停止程序运行,下电并且换成手动模式 _response = execution('diagnosis.open', hr, w2t, open=False, display_open=False) _response = execution('rl_task.stop', hr, w2t, tasks=['brake']) _response = execution('state.switch_motor_off', hr, w2t) _response = execution('state.switch_manual', hr, w2t) sleep(1) # 保证所有数据均已返回 # 7. 保留数据并处理输出 curve_data = [] for _msg in hr.c_msg: if 'diagnosis.result' in _msg: curve_data.insert(0, loads(_msg)) else: _index = 210 for _msg in hr.c_msg: if 'diagnosis.result' in _msg: _index = hr.c_msg.index(_msg) break del hr.c_msg[_index:] hr.c_msg_xs.clear() # gen_result_file(path, loadsel, disc, number, scenario_time) else: w2t(f"\n{loadsel.removeprefix('tool')}%负载的制动性能测试执行完毕,如需采集其他负载,须切换负载类型,并更换其他负载,重新执行。", 0, 0, 'green', 'Automatic Test') def main(path, hr, md, loadsel, w2t): data_dirs, data_files = traversal_files(path, w2t) config_file, reach33, reach66, reach100, prj_file, result_dirs = check_files(path, loadsel, data_dirs, data_files, w2t) prj_to_xcore(prj_file) run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t) if __name__ == '__main__': main(*argv[1:])