from random import randint from time import sleep, time from sys import argv from os import scandir, mkdir from os.path import exists from paramiko import SSHClient, AutoAddPolicy from json import loads from openpyxl import load_workbook import pandas RADIAN = 57.3 # 180 / 3.1415926 def traversal_files(path, w2t): if not exists(path): msg = f'数据文件夹{path}不存在,请确认后重试......' w2t(msg, 0, 1, 'red', tab_name='Automatic Test') else: dirs = [] files = [] for item in scandir(path): if item.is_dir(): dirs.append(item.path) elif item.is_file(): files.append(item.path) return dirs, files def check_files(path, loadsel, data_dirs, data_files, w2t): if len(data_dirs) != 0 or len(data_files) != 5: w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test') w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test') config_file = reach33 = reach66 = reach100 = prj_file = None for data_file in data_files: filename = data_file.split('\\')[-1] if filename == 'configs.xlsx': config_file = data_file elif filename.startswith('reach33_') and filename.endswith('.xlsx'): reach33 = data_file elif filename.startswith('reach66_') and filename.endswith('.xlsx'): reach66 = data_file elif filename.startswith('reach100_') and filename.endswith('.xlsx'): reach100 = data_file elif filename.endswith('.zip'): prj_file = data_file else: w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test') w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 2, 'red', tab_name='Automatic Test') if config_file and reach33 and reach66 and reach100 and prj_file: result_dirs = [] mkdir(f"{path}\\j1") mkdir(f"{path}\\j2") mkdir(f"{path}\\j3") for _reach in ['reach33', 'reach66', 'reach100']: for _load in [f'load{loadsel.removeprefix("tool")}']: for _speed in ['speed33', 'speed66', 'speed100']: dir_name = '_'.join([_reach, _load, _speed]) result_dirs.append(dir_name) mkdir(f"{path}\\j1\\{dir_name}") mkdir(f"{path}\\j2\\{dir_name}") if _reach == 'reach100': mkdir(f"{path}\\j3\\{dir_name}") w2t("数据目录合规性检查结束,未发现问题......", tab_name='Automatic Test') return config_file, reach33, reach66, reach100, prj_file, result_dirs else: w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test') w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test') def prj_to_xcore(prj_file): ssh = SSHClient() ssh.set_missing_host_key_policy(AutoAddPolicy()) ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019') sftp = ssh.open_sftp() sftp.put(prj_file, '/tmp/target.zip') cmd = 'cd /tmp; rm -rf target/; mkdir target; unzip -d target/ -q target.zip; ' cmd += 'chmod 777 -R target/; rm target.zip' ssh.exec_command(cmd) cmd = 'sudo rm -rf /home/luoshi/bin/controller/projects/target; ' cmd += 'sudo mv /tmp/target/ /home/luoshi/bin/controller/projects/' stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True) stdin.write('luoshi2019' + '\n') stdin.flush() print(stdout.read().decode()) # 必须得输出一下stdout,才能正确执行sudo print(stderr.read().decode()) # 顺便也执行以下stderr _prj_name = prj_file.split('\\')[-1].removesuffix('.zip') cmd = 'cd /home/luoshi/bin/controller/; ' cmd += f'sudo mv projects/target/_build/{_prj_name}.prj projects/target/_build/target.prj' stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True) stdin.write('luoshi2019' + '\n') stdin.flush() print(stdout.read().decode()) # 必须得输出一下stdout,才能正确执行sudo print(stderr.read().decode()) # 顺便也执行以下stderr ssh.close() def validate_resp(_id, response, w2t): match _id: case 'DATA ERR': w2t(f"数据处理错误,需要确认", 0, 4, 'red', tab_name='Automatic Test') case 'DATA READ ERR': w2t(f"无法读取数据,需要确认", 0, 3, 'red', tab_name='Automatic Test') case 'NOT SUPPORT': w2t(f"不支持的功能,需要确认", 0, 2, 'red', tab_name='Automatic Test') if not response: w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test') def execution(cmd, hr, w2t, **kwargs): _id = hr.execution(cmd, **kwargs) _msg = hr.get_from_id(_id) if not _msg: w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name='Automatic Test') else: _response = loads(_msg) validate_resp(_id, _response, w2t) return _response def gen_result_file(path, curve_data, axis, _reach, _load, _speed, count): _d2d_vel = {'hw_joint_vel_feedback': []} _d2d_trq = {'device_servo_trq_feedback': []} _d2d_stop = {'device_safety_estop': []} for data in curve_data: dict_results = data['data'] # dict_results.reverse() for item in dict_results: item['value'].reverse() if item.get('channel', None) == axis-1 and item.get('name', None) == 'hw_joint_vel_feedback': _d2d_vel['hw_joint_vel_feedback'].extend(item['value']) elif item.get('channel', None) == axis-1 and item.get('name', None) == 'device_servo_trq_feedback': _d2d_trq['device_servo_trq_feedback'].extend(item['value']) elif item.get('channel', None) == 0 and item.get('name', None) == 'device_safety_estop': _d2d_stop['device_safety_estop'].extend(item['value']) df1 = pandas.DataFrame.from_dict(_d2d_vel) df2 = pandas.DataFrame.from_dict(_d2d_trq) df3 = pandas.DataFrame.from_dict(_d2d_stop) df = pandas.concat([df1, df2, df3], axis=1) _filename = f"{path}\\j{axis}\\reach{_reach}_load{_load}_speed{_speed}\\reach{_reach}_load{_load}_speed{_speed}_{count}.data" df.to_csv(_filename, sep='\t', index=False) def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t): _count = 0 speed_max = 0 display_pdo_params = [ {"name": "hw_joint_vel_feedback", "channel": 0}, {"name": "hw_joint_vel_feedback", "channel": 1}, {"name": "hw_joint_vel_feedback", "channel": 2}, {"name": "hw_joint_vel_feedback", "channel": 3}, {"name": "hw_joint_vel_feedback", "channel": 4}, {"name": "hw_joint_vel_feedback", "channel": 5}, {"name": "device_servo_trq_feedback", "channel": 0}, {"name": "device_servo_trq_feedback", "channel": 1}, {"name": "device_servo_trq_feedback", "channel": 2}, {"name": "device_servo_trq_feedback", "channel": 3}, {"name": "device_servo_trq_feedback", "channel": 4}, {"name": "device_servo_trq_feedback", "channel": 5}, {"name": "device_safety_estop", "channel": 0}, ] wb = load_workbook(config_file, read_only=True) ws = wb['Target'] if ws.cell(row=1, column=1).value == 'positive': md.write_pon(True) elif ws.cell(row=1, column=1).value == 'negative': md.write_pon(False) else: w2t("configs.xlsx中Target页面A1单元格填写不正确,检查后重新运行...", 0, 111, 'red', 'Automatic Test') for condition in result_dirs: _reach = condition.split('_')[0].removeprefix('reach') _load = condition.split('_')[1].removeprefix('load') _speed = condition.split('_')[2].removeprefix('speed') for axis in range(1, 4): md.write_axis(axis) speed_max = 0 if axis == 3 and _reach != '100': continue for count in range(1, 4): _count += 1 w2t(f"[{_count}/63-{count}] 正在执行{axis}轴{condition}的制动测试......", 0, 0, 'purple', 'Automatic Test') # 1. 关闭诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来,切手动模式并下电 md.trigger_estop() md.reset_estop() md.write_act(False) _response = execution('diagnosis.open', hr, w2t, open=False, display_open=False) sleep(1) # 让曲线彻底关闭 _response = execution('state.switch_manual', hr, w2t) _response = execution('state.switch_motor_off', hr, w2t) # 2. 修改未要执行的场景 ssh = SSHClient() ssh.set_missing_host_key_policy(AutoAddPolicy()) ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019') if ws.cell(row=1, column=1).value == 'positive': _rl_cmd = f"brake_E(j{axis}_{_reach}_p, j{axis}_{_reach}_n, p_speed, p_tool)" elif ws.cell(row=1, column=1).value == 'negative': _rl_cmd = f"brake_E(j{axis}_{_reach}_n, j{axis}_{_reach}_p, p_speed, p_tool)" else: w2t("configs.xlsx中Target页面A1单元格填写不正确,检查后重新运行...", 0, 111, 'red', 'Automatic Test') _rl_speed = f"VelSet {_speed}" cmd = 'cd /home/luoshi/bin/controller/; ' cmd += 'sudo sed -i "/brake_E/d" projects/target/_build/brake/main.mod; ' cmd += f'sudo sed -i "/DONOTDELETE/i {_rl_cmd}" projects/target/_build/brake/main.mod; ' cmd += 'sudo sed -i "/VelSet/d" projects/target/_build/brake/main.mod; ' cmd += f'sudo sed -i "/MoveAbsJ/i {_rl_speed}" projects/target/_build/brake/main.mod' stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True) stdin.write('luoshi2019' + '\n') stdin.flush() print(stdout.read().decode()) # 必须得输出一下stdout,才能正确执行sudo print(stderr.read().decode()) # 顺便也执行以下stderr # 3. reload工程后,pp2main,并且自动模式和上电,最后运行程序 prj_path = 'target/_build/target.prj' _response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['brake', 'stop0_related']) _response = execution('rl_task.pp_to_main', hr, w2t, tasks=['brake', 'stop0_related']) _response = execution('state.switch_auto', hr, w2t) _response = execution('state.switch_motor_on', hr, w2t) _response = execution('rl_task.run', hr, w2t, tasks=['brake', 'stop0_related']) _t_start = time() while True: if md.read_ready_to_go() == 1: md.write_act(True) break else: if (time() - _t_start) // 20 > 1: w2t("20s内未收到机器人的运行信号,需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test') else: sleep(1) # 4. 第一次打开诊断曲线,并执行采集8s,之后触发软急停,关闭曲线采集,找出最大速度,传递给RL程序,最后清除相关记录 if count == 1: _response = execution('diagnosis.open', hr, w2t, open=True, display_open=True) _response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params) sleep(10) # 前10秒获取实际最大速度 md.trigger_estop() _response = execution('diagnosis.open', hr, w2t, open=False, display_open=False) # 找出最大速度 for _msg in hr.c_msg: if 'diagnosis.result' in _msg: dict_results = loads(_msg)['data'] for item in dict_results: if item.get('channel', None) == axis-1 and item.get('name', None) == 'hw_joint_vel_feedback': _ = abs(RADIAN*sum(item['value'])/len(item['value'])) speed_max = max(_, speed_max) speed_target = float(ws.cell(row=3, column=axis+1).value) * float(_speed) / 100 if speed_max < speed_target*0.95 or speed_max > speed_target*1.05: w2t(f"Axis: {axis}-{count} | Speed: {speed_max} | Shouldbe: {speed_target}", 0, 0, 'indigo', 'Automatic Test') md.write_speed_max(speed_max) sleep(1) for _msg in hr.c_msg: if 'diagnosis.result' in _msg: _index = hr.c_msg.index(_msg) del hr.c_msg[_index:] hr.c_msg_xs.clear() break # 5. 清除软急停,重新运行程序,并打开曲线发送继续运动信号,当速度达到最大值时,通过DO触发急停 md.reset_estop() _response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['brake', 'stop0_related']) _response = execution('rl_task.pp_to_main', hr, w2t, tasks=['brake', 'stop0_related']) _response = execution('state.switch_auto', hr, w2t) _response = execution('state.switch_motor_on', hr, w2t) _response = execution('rl_task.run', hr, w2t, tasks=['brake', 'stop0_related']) for i in range(3): if md.read_ready_to_go() == 1: md.write_act(True) break else: sleep(1) else: w2t("未收到机器人的运行信号,需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test') _response = execution('diagnosis.open', hr, w2t, open=True, display_open=True) _response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params) sleep(randint(3, 6)) md.write_probe(True) _t_start = time() while True: if md.read_brake_done() == 1: sleep(1) # 保证所有数据均已返回 md.write_probe(False) _response = execution('diagnosis.open', hr, w2t, open=False, display_open=False) sleep(1) # 保证所有数据均已返回 break else: if (time() - _t_start) > 30: w2t(f"30s内未触发急停,该条数据无效,需要确认RL/Python程序编写正确并正常执行,或者判别是否是机器本体问题...", 0, 0, 'red', 'Automatic Test') md.write_probe(False) _response = execution('diagnosis.open', hr, w2t, open=False, display_open=False) sleep(1) # 保证所有数据均已返回 break else: sleep(1) # 6. 保留数据并处理输出 curve_data = [] for _msg in hr.c_msg: if 'diagnosis.result' in _msg: curve_data.insert(0, loads(_msg)) else: for _msg in hr.c_msg: if 'diagnosis.result' in _msg: _index = hr.c_msg.index(_msg) del hr.c_msg[_index:] hr.c_msg_xs.clear() break gen_result_file(path, curve_data, axis, _reach, _load, _speed, count) else: w2t(f"\n{loadsel.removeprefix('tool')}%负载的制动性能测试执行完毕,如需采集其他负载,须切换负载类型,并更换其他负载,重新执行。", 0, 0, 'green', 'Automatic Test') def main(path, hr, md, loadsel, w2t): _s_time = time() data_dirs, data_files = traversal_files(path, w2t) config_file, reach33, reach66, reach100, prj_file, result_dirs = check_files(path, loadsel, data_dirs, data_files, w2t) prj_to_xcore(prj_file) run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t) _e_time = time() time_total = _e_time - _s_time w2t(f"处理总时长:{time_total // 3600:02.0f} h {time_total % 3600 // 60:02.0f} m {time_total % 60:02.0f} s", 0, 0, 'green', 'Automatic Test') if __name__ == '__main__': main(*argv[1:])