diff --git a/aio/README.md b/aio/README.md index 83a4732..b5d3ff6 100644 --- a/aio/README.md +++ b/aio/README.md @@ -424,3 +424,8 @@ v0.1.7.6(2024/07/04) v0.1.8.0(2024/07/04) 1. [APIs: do_current.py]: 完成了堵转电流和惯量负载电机电流的采集和处理,至此,电机电流的自动化工作基本完成 +v0.1.8.1(2024/07/05) +1. [APIs: do_brake.py]: 完成了制动性能测试框架的搭建,可以顺利执行完整的测试程序,但是未实现急停和数据处理 +2. [APIs: aio.py]: 修改了do_brake主函数的参数 +3. 增加工程文件target.zip + diff --git a/aio/assets/target.zip b/aio/assets/target.zip new file mode 100644 index 0000000..92a9174 Binary files /dev/null and b/aio/assets/target.zip differ diff --git a/aio/code/aio.py b/aio/code/aio.py index 0674425..45a0461 100644 --- a/aio/code/aio.py +++ b/aio/code/aio.py @@ -459,7 +459,7 @@ class App(customtkinter.CTk): func_dict[flag](path=args[0], w2t=self.write2textbox) elif flag == 5: self.pre_warning() - func_dict[flag](path=args[0], hr=self.hr, loadsel=args[1], w2t=self.write2textbox) + func_dict[flag](path=args[0], hr=self.hr, md=self.md_at, loadsel=args[1], w2t=self.write2textbox) elif flag == 6: self.pre_warning() func_dict[flag](path=args[0], hr=self.hr, md=self.md_at, loadsel=args[1], w2t=self.write2textbox) diff --git a/aio/code/automatic_test/do_brake.py b/aio/code/automatic_test/do_brake.py index d60c955..5310abd 100644 --- a/aio/code/automatic_test/do_brake.py +++ b/aio/code/automatic_test/do_brake.py @@ -1,3 +1,4 @@ +import os from time import sleep from sys import argv from os import scandir @@ -7,9 +8,6 @@ from json import loads def traversal_files(path, w2t): - # 功能:以列表的形式分别返回指定路径下的文件和文件夹,不包含子目录 - # 参数:路径 - # 返回值:路径下的文件夹列表 路径下的文件列表 if not exists(path): msg = f'数据文件夹{path}不存在,请确认后重试......' w2t(msg, 0, 1, 'red', tab_name='Automatic Test') @@ -25,7 +23,7 @@ def traversal_files(path, w2t): return dirs, files -def check_files(data_dirs, data_files, w2t): +def check_files(path, loadsel, data_dirs, data_files, w2t): if len(data_dirs) != 0 or len(data_files) != 5: w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test') w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test') @@ -48,8 +46,23 @@ def check_files(data_dirs, data_files, w2t): w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 2, 'red', tab_name='Automatic Test') if config_file and reach33 and reach66 and reach100 and prj_file: + result_dirs = [] + os.mkdir(f"{path}\\j1") + os.mkdir(f"{path}\\j2") + os.mkdir(f"{path}\\j3") + + for _reach in ['reach33', 'reach66', 'reach100']: + for _load in [f'load{loadsel.removeprefix("tool")}']: + for _speed in ['speed33', 'speed66', 'speed100']: + dir_name = '_'.join([_reach, _load, _speed]) + result_dirs.append(dir_name) + os.mkdir(f"{path}\\j1\\{dir_name}") + os.mkdir(f"{path}\\j2\\{dir_name}") + if _reach == 'reach100': + os.mkdir(f"{path}\\j3\\{dir_name}") + w2t("数据目录合规性检查结束,未发现问题......", tab_name='Automatic Test') - return config_file, reach33, reach66, reach100, prj_file + return config_file, reach33, reach66, reach100, prj_file, result_dirs else: w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test') w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test') @@ -60,14 +73,9 @@ def prj_to_xcore(prj_file): ssh.set_missing_host_key_policy(AutoAddPolicy()) ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019') sftp = ssh.open_sftp() - # stdin, stdout, stderr = ssh.exec_command('rm /tmp/target.zip') - # ssh.exec_command('rm /tmp/target.zip') sftp.put(prj_file, '/tmp/target.zip') - cmd = 'cd /tmp; ' - cmd += 'rm -rf target/; ' - cmd += 'mkdir target; ' - cmd += 'unzip -d target/ -q target.zip; ' - cmd += 'rm target.zip; ' + cmd = 'cd /tmp; rm -rf target/; mkdir target; unzip -d target/ -q target.zip; ' + cmd += 'chmod 777 -R target/; rm target.zip' ssh.exec_command(cmd) cmd = 'sudo rm -rf /home/luoshi/bin/controller/projects/target; ' @@ -78,20 +86,17 @@ def prj_to_xcore(prj_file): print(stdout.read().decode()) # 必须得输出一下stdout,才能正确执行sudo print(stderr.read().decode()) # 顺便也执行以下stderr - cmd = 'cd /home/luoshi/bin/controller/; sudo mv projects/target/_build/*.prj projects/target/_build/target.prj ' + _prj_name = prj_file.split('\\')[-1].removesuffix('.zip') + cmd = 'cd /home/luoshi/bin/controller/; ' + cmd += f'sudo mv projects/target/_build/{_prj_name}.prj projects/target/_build/target.prj' stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True) stdin.write('luoshi2019' + '\n') stdin.flush() print(stdout.read().decode()) # 必须得输出一下stdout,才能正确执行sudo print(stderr.read().decode()) # 顺便也执行以下stderr - ssh.close() -def modify_prj(): - pass - - def validate_resp(_id, response, w2t): match _id: case 'DATA ERR': @@ -105,7 +110,7 @@ def validate_resp(_id, response, w2t): def execution(cmd, hr, w2t, **kwargs): - _id = hr.excution(cmd, **kwargs) + _id = hr.execution(cmd, **kwargs) _msg = hr.get_from_id(_id) if not _msg: w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name='Automatic Test') @@ -115,60 +120,115 @@ def execution(cmd, hr, w2t, **kwargs): return _response -def run_rl(hr, w2t): - # prj_path = 'target/_build/target.prj' - # _response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['Durable_Test_Com', 'Mechanical_Test_Com']) - # print(f"reload prj: {_response}") - # # _response = execution('overview.get_cur_prj', hr, w2t) - # # print(f"get cur prj name: {_response}") - # _response = execution('rl_task.pp_to_main', hr, w2t, tasks=['Mechanical_Test_Com']) - # print(f"set pp2main of prj: {_response}") - # - # _response = execution('state.switch_auto', hr, w2t) - # _response = execution('state.switch_motor_on', hr, w2t) - # - # _response = execution('rl_task.run', hr, w2t, tasks=['Mechanical_Test_Com']) - # print(f"run prj: {_response}") - # sleep(10) - # - # _response = execution('state.switch_motor_off', hr, w2t) - # _response = execution('state.switch_manual', hr, w2t) +def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t): + _count = 0 + for condition in result_dirs: + _reach = condition.split('_')[0].removeprefix('reach') + _load = condition.split('_')[1].removeprefix('load') + _speed = condition.split('_')[2].removeprefix('speed') - _response = execution('diagnosis.open', hr, w2t, open=True, display_open=True) - print(f"打开诊断: {_response}") - # _response = execution('diagnosis.get_params', hr, w2t) - # print(f"显示诊断状态: {_response}") - display_pdo_params = [ - {"name": "device_servo_trq_feedback", "channel": 0} - ] - _response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params) - print(f"执行采样: {_response}") - sleep(5) + for axis in range(1, 4): + if axis == 3 and _reach != '100': + continue + for count in range(1, 4): + _count += 1 + w2t(f"[{_count/64}]正在执行{axis}轴{condition}[{count}]的制动测试......", 0, 0, 'purple', 'Automatic Test') - _response = execution('diagnosis.open', hr, w2t, open=False, display_open=False) - print(f"关闭诊断: {_response}") - # sleep(1) - for _msg in hr.c_msg: - if 'diagnosis.result' in _msg: - print(_msg) - # _msg = json.loads(_msg) - # if 'channel' in _msg and 'name' in _msg: - # if int(_msg['channel']) == 0 and _msg['name'] == 'device_servo_trq_feedback': - # print(f"diagnosis.result: {_msg}") - # count += 1 - # if count * 50 > 5 * 1000: - # break + # 1. 关闭诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来,切手动模式并下电 + _response = execution('diagnosis.open', hr, w2t, open=False, display_open=False) + md.trigger_estop() + md.reset_estop() + _response = execution('state.switch_manual', hr, w2t) + _response = execution('state.switch_motor_off', hr, w2t) + # 2. 修改未要执行的场景 + ssh = SSHClient() + ssh.set_missing_host_key_policy(AutoAddPolicy()) + ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019') + _rl_cmd = f"brake_E(j{axis}_{_reach}_p, j{axis}_{_reach}_n, p_speed, p_tool)" + _rl_speed = f"VelSet {_speed}" + cmd = 'cd /home/luoshi/bin/controller/; ' + cmd += 'sudo sed -i "/brake_E/d" projects/target/_build/brake/main.mod; ' + cmd += f'sudo sed -i "/DONOTDELETE/i {_rl_cmd}" projects/target/_build/brake/main.mod; ' + cmd += f'sudo sed -i "/VelSet/d" projects/target/_build/brake/main.mod; ' + cmd += f'sudo sed -i "/MoveAbsJ/i {_rl_speed}" projects/target/_build/brake/main.mod; ' + stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True) + stdin.write('luoshi2019' + '\n') + stdin.flush() + print(stdout.read().decode()) # 必须得输出一下stdout,才能正确执行sudo + print(stderr.read().decode()) # 顺便也执行以下stderr + # 3. reload工程后,pp2main,并且自动模式和上电 + prj_path = 'target/_build/target.prj' + _response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['brake']) + _response = execution('overview.get_cur_prj', hr, w2t) + _response = execution('rl_task.pp_to_main', hr, w2t, tasks=['brake']) + _response = execution('state.switch_auto', hr, w2t) + _response = execution('state.switch_motor_on', hr, w2t) + + # 4. 开始运行程序,每种情况运行15s + _response = execution('rl_task.run', hr, w2t, tasks=['brake']) + for i in range(3): + if md.read_ready_to_go() == 1: + md.write_act(True) + sleep(1) + md.write_act(False) + break + else: + sleep(1) + else: + w2t("未收到机器人的运行信号,需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test') + + # 5. 打开诊断曲线,并执行采集 + _response = execution('diagnosis.open', hr, w2t, open=True, display_open=True) + display_pdo_params = [ + {"name": "hw_joint_vel_feedback", "channel": 0}, + {"name": "hw_joint_vel_feedback", "channel": 1}, + {"name": "hw_joint_vel_feedback", "channel": 2}, + {"name": "hw_joint_vel_feedback", "channel": 3}, + {"name": "hw_joint_vel_feedback", "channel": 4}, + {"name": "hw_joint_vel_feedback", "channel": 5}, + {"name": "device_servo_trq_feedback", "channel": 0}, + {"name": "device_servo_trq_feedback", "channel": 1}, + {"name": "device_servo_trq_feedback", "channel": 2}, + {"name": "device_servo_trq_feedback", "channel": 3}, + {"name": "device_servo_trq_feedback", "channel": 4}, + {"name": "device_servo_trq_feedback", "channel": 5}, + {"name": "device_safety_estop", "channel": 0}, + ] + _response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params) + sleep(15) + + # 6. 关闭诊断曲线,停止程序运行,下电并且换成手动模式 + _response = execution('diagnosis.open', hr, w2t, open=False, display_open=False) + _response = execution('rl_task.stop', hr, w2t, tasks=['brake']) + _response = execution('state.switch_motor_off', hr, w2t) + _response = execution('state.switch_manual', hr, w2t) + sleep(1) # 保证所有数据均已返回 + + # 7. 保留数据并处理输出 + curve_data = [] + for _msg in hr.c_msg: + if 'diagnosis.result' in _msg: + curve_data.insert(0, loads(_msg)) + else: + _index = 210 + for _msg in hr.c_msg: + if 'diagnosis.result' in _msg: + _index = hr.c_msg.index(_msg) + break + del hr.c_msg[_index:] + hr.c_msg_xs.clear() + # gen_result_file(path, loadsel, disc, number, scenario_time) else: - sleep(1) + w2t(f"\n{loadsel.removeprefix('tool')}%负载的制动性能测试执行完毕,如需采集其他负载,须切换负载类型,并更换其他负载,重新执行。", 0, 0, 'green', 'Automatic Test') -def main(path, hr, loadsel, w2t): +def main(path, hr, md, loadsel, w2t): data_dirs, data_files = traversal_files(path, w2t) - config_file, reach33, reach66, reach100, prj_file = check_files(data_dirs, data_files, w2t) - # prj_to_xcore(prj_file) - run_rl(hr, w2t) + config_file, reach33, reach66, reach100, prj_file, result_dirs = check_files(path, loadsel, data_dirs, data_files, w2t) + prj_to_xcore(prj_file) + run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t) if __name__ == '__main__':