From fc56d81e9c1b8edf16f44074a3c33bf737ccaef1 Mon Sep 17 00:00:00 2001 From: gitea Date: Thu, 4 Jul 2024 21:08:13 +0800 Subject: [PATCH] =?UTF-8?q?v0.1.8.0(2024/07/04)=201.=20[APIs:=20do=5Fcurre?= =?UTF-8?q?nt.py]:=20=E5=AE=8C=E6=88=90=E4=BA=86=E5=A0=B5=E8=BD=AC?= =?UTF-8?q?=E7=94=B5=E6=B5=81=E5=92=8C=E6=83=AF=E9=87=8F=E8=B4=9F=E8=BD=BD?= =?UTF-8?q?=E7=94=B5=E6=9C=BA=E7=94=B5=E6=B5=81=E7=9A=84=E9=87=87=E9=9B=86?= =?UTF-8?q?=E5=92=8C=E5=A4=84=E7=90=86=EF=BC=8C=E8=87=B3=E6=AD=A4=EF=BC=8C?= =?UTF-8?q?=E7=94=B5=E6=9C=BA=E7=94=B5=E6=B5=81=E7=9A=84=E8=87=AA=E5=8A=A8?= =?UTF-8?q?=E5=8C=96=E5=B7=A5=E4=BD=9C=E5=9F=BA=E6=9C=AC=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- aio/README.md | 3 + aio/assets/file_version_info.txt | 8 +- aio/assets/vers | 2 +- aio/code/aio.py | 2 +- aio/code/automatic_test/do_current.py | 156 +++++++++++++++++--------- 5 files changed, 113 insertions(+), 58 deletions(-) diff --git a/aio/README.md b/aio/README.md index e0110fd..83a4732 100644 --- a/aio/README.md +++ b/aio/README.md @@ -421,3 +421,6 @@ v0.1.7.6(2024/07/04) - 优化了get_from_id的逻辑 4. [autotest.xml]: 新增了scenario_time只写寄存器 +v0.1.8.0(2024/07/04) +1. [APIs: do_current.py]: 完成了堵转电流和惯量负载电机电流的采集和处理,至此,电机电流的自动化工作基本完成 + diff --git a/aio/assets/file_version_info.txt b/aio/assets/file_version_info.txt index 73047a0..02f3e89 100644 --- a/aio/assets/file_version_info.txt +++ b/aio/assets/file_version_info.txt @@ -6,8 +6,8 @@ VSVersionInfo( ffi=FixedFileInfo( # filevers and prodvers should be always a tuple with four items: (1, 2, 3, 4) # Set not needed items to zero 0. - filevers=(0, 1, 7, 0), - prodvers=(0, 1, 7, 0), + filevers=(0, 1, 8, 0), + prodvers=(0, 1, 8, 0), # Contains a bitmask that specifies the valid bits 'flags'r mask=0x3f, # Contains a bitmask that specifies the Boolean attributes of the file. @@ -31,12 +31,12 @@ VSVersionInfo( '040904b0', [StringStruct('CompanyName', 'Rokae - https://www.rokae.com/'), StringStruct('FileDescription', 'All in one automatic toolbox'), - StringStruct('FileVersion', '0.1.7.0 (2024-06-26)'), + StringStruct('FileVersion', '0.1.8.0 (2024-07-04)'), StringStruct('InternalName', 'AIO.exe'), StringStruct('LegalCopyright', '© 2024-2024 Manford Fan'), StringStruct('OriginalFilename', 'AIO.exe'), StringStruct('ProductName', 'AIO'), - StringStruct('ProductVersion', '0.1.7.0 (2024-06-26)')]) + StringStruct('ProductVersion', '0.1.8.0 (2024-07-04)')]) ]), VarFileInfo([VarStruct('Translation', [1033, 1200])]) ] diff --git a/aio/assets/vers b/aio/assets/vers index f01032e..e3c9505 100644 --- a/aio/assets/vers +++ b/aio/assets/vers @@ -1 +1 @@ -0.1.7.0 @ 06/26/2024 \ No newline at end of file +0.1.8.0 @ 07/04/2024 \ No newline at end of file diff --git a/aio/code/aio.py b/aio/code/aio.py index 9b60a32..0674425 100644 --- a/aio/code/aio.py +++ b/aio/code/aio.py @@ -71,7 +71,7 @@ class App(customtkinter.CTk): btns_func['log']['btn'].configure(command=lambda: self.thread_it(self.func_log_callback)) btns_func['end']['btn'].configure(command=lambda: self.thread_it(self.func_end_callback)) # create version info - self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.1.7.0\nDate: 06/26/2024", font=self.my_font, text_color="#4F4F4F") + self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.1.8.0\nDate: 07/04/2024", font=self.my_font, text_color="#4F4F4F") self.frame_func.rowconfigure(6, weight=1) self.label_version.grid(row=6, column=0, padx=20, pady=20, sticky='s') # ===================================================================== diff --git a/aio/code/automatic_test/do_current.py b/aio/code/automatic_test/do_current.py index 1ddf512..09e538b 100644 --- a/aio/code/automatic_test/do_current.py +++ b/aio/code/automatic_test/do_current.py @@ -24,7 +24,7 @@ def traversal_files(path, w2t): return dirs, files -def check_files(path, data_dirs, data_files, w2t): +def check_files(path, loadsel, data_dirs, data_files, w2t): if len(data_dirs) != 0 or len(data_files) != 3: w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下三个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test') w2t(' 1. configs.xlsx\n 2. T_电机电流.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test') @@ -44,12 +44,13 @@ def check_files(path, data_dirs, data_files, w2t): if config_file and current_file and prj_file: w2t("数据目录合规性检查结束,未发现问题......", tab_name='Automatic Test') - os.mkdir(f"{path}\\single") - os.mkdir(f"{path}\\s_1") - os.mkdir(f"{path}\\s_2") - os.mkdir(f"{path}\\s_3") - os.mkdir(f"{path}\\inertia") - os.mkdir(f"{path}\\hold") + if loadsel == 'tool100': + os.mkdir(f"{path}\\single") + os.mkdir(f"{path}\\s_1") + os.mkdir(f"{path}\\s_2") + os.mkdir(f"{path}\\s_3") + elif loadsel == 'inertia': + os.mkdir(f"{path}\\inertia") return config_file, current_file, prj_file else: w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下三个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test') @@ -61,8 +62,6 @@ def prj_to_xcore(prj_file): ssh.set_missing_host_key_policy(AutoAddPolicy()) ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019') sftp = ssh.open_sftp() - # stdin, stdout, stderr = ssh.exec_command('rm /tmp/target.zip') - # ssh.exec_command('rm /tmp/target.zip') sftp.put(prj_file, '/tmp/target.zip') cmd = 'cd /tmp; rm -rf target/; mkdir target; unzip -d target/ -q target.zip; ' cmd += 'chmod 777 -R target/; rm target.zip' @@ -110,7 +109,7 @@ def execution(cmd, hr, w2t, **kwargs): return _response -def data_proc(path, filename, channel): +def data_proc_regular(path, filename, channel, scenario_time): if channel in list(range(6)): with open(filename, 'r', encoding='utf-8') as f_obj: lines = f_obj.readlines() @@ -175,69 +174,127 @@ def data_proc(path, filename, channel): df_01 = pandas.DataFrame.from_dict(_d2d_vel_0) df_02 = pandas.DataFrame.from_dict(_d2d_trq_0) df = pandas.concat([df_01, df_02], axis=1) - _filename = f'{path}\\s_{channel-5}\\j1_s_1.data' + _filename = f'{path}\\s_{channel-5}\\j1_s_{channel-5}_{scenario_time}.data' df.to_csv(_filename, sep='\t', index=False) df_01 = pandas.DataFrame.from_dict(_d2d_vel_1) df_02 = pandas.DataFrame.from_dict(_d2d_trq_1) df = pandas.concat([df_01, df_02], axis=1) - _filename = f'{path}\\s_{channel-5}\\j2_s_1.data' + _filename = f'{path}\\s_{channel-5}\\j2_s_{channel-5}_{scenario_time}.data' df.to_csv(_filename, sep='\t', index=False) df_01 = pandas.DataFrame.from_dict(_d2d_vel_2) df_02 = pandas.DataFrame.from_dict(_d2d_trq_2) df = pandas.concat([df_01, df_02], axis=1) - _filename = f'{path}\\s_{channel-5}\\j3_s_1.data' + _filename = f'{path}\\s_{channel-5}\\j3_s_{channel-5}_{scenario_time}.data' df.to_csv(_filename, sep='\t', index=False) df_01 = pandas.DataFrame.from_dict(_d2d_vel_3) df_02 = pandas.DataFrame.from_dict(_d2d_trq_3) df = pandas.concat([df_01, df_02], axis=1) - _filename = f'{path}\\s_{channel-5}\\j4_s_1.data' + _filename = f'{path}\\s_{channel-5}\\j4_s_{channel-5}_{scenario_time}.data' df.to_csv(_filename, sep='\t', index=False) df_01 = pandas.DataFrame.from_dict(_d2d_vel_4) df_02 = pandas.DataFrame.from_dict(_d2d_trq_4) df = pandas.concat([df_01, df_02], axis=1) - _filename = f'{path}\\s_{channel-5}\\j5_s_1.data' + _filename = f'{path}\\s_{channel-5}\\j5_s_{channel-5}_{scenario_time}.data' df.to_csv(_filename, sep='\t', index=False) df_01 = pandas.DataFrame.from_dict(_d2d_vel_5) df_02 = pandas.DataFrame.from_dict(_d2d_trq_5) df = pandas.concat([df_01, df_02], axis=1) - _filename = f'{path}\\s_{channel-5}\\j6_s_1.data' + _filename = f'{path}\\s_{channel-5}\\j6_s_{channel-5}_{scenario_time}.data' + df.to_csv(_filename, sep='\t', index=False) + elif channel in list(range(9, 15)): + with open(filename, 'r', encoding='utf-8') as f_obj: + lines = f_obj.readlines() + _d2d_vel = {'hw_joint_vel_feedback': []} + _d2d_trq = {'device_servo_trq_feedback': []} + for line in lines: + data = eval(line.strip())['data'] + for item in data: + if item.get('channel', None) == channel-9 and item.get('name', None) == 'hw_joint_vel_feedback': + _d2d_vel['hw_joint_vel_feedback'].extend(item['value']) + elif item.get('channel', None) == channel-9 and item.get('name', None) == 'device_servo_trq_feedback': + _d2d_trq['device_servo_trq_feedback'].extend(item['value']) + + df1 = pandas.DataFrame.from_dict(_d2d_vel) + df2 = pandas.DataFrame.from_dict(_d2d_trq) + df = pandas.concat([df1, df2], axis=1) + _filename = f'{path}\\single\\j{channel-8}_hold.data' df.to_csv(_filename, sep='\t', index=False) -def gen_result_file(path, loadsel, disc, number): +def data_proc_inertia(path, filename, channel): + with open(filename, 'r', encoding='utf-8') as f_obj: + lines = f_obj.readlines() + _d2d_vel = {'hw_joint_vel_feedback': []} + _d2d_trq = {'device_servo_trq_feedback': []} + for line in lines: + data = eval(line.strip())['data'] + for item in data: + if item.get('channel', None) == channel+3 and item.get('name', None) == 'hw_joint_vel_feedback': + _d2d_vel['hw_joint_vel_feedback'].extend(item['value']) + elif item.get('channel', None) == channel+3 and item.get('name', None) == 'device_servo_trq_feedback': + _d2d_trq['device_servo_trq_feedback'].extend(item['value']) + + df1 = pandas.DataFrame.from_dict(_d2d_vel) + df2 = pandas.DataFrame.from_dict(_d2d_trq) + df = pandas.concat([df1, df2], axis=1) + _filename = f'{path}\\inertia\\j{channel+4}_inertia.data' + df.to_csv(_filename, sep='\t', index=False) + + +def gen_result_file(path, loadsel, disc, number, scenario_time): filename = path + f'\\data.txt' with open(filename, 'w', encoding='utf-8') as f_obj: for line in disc[number][1]: f_obj.write(str(line)+'\n') if loadsel == 'tool100': - data_proc(path, filename, number) + data_proc_regular(path, filename, number, scenario_time) elif loadsel == 'inertia': - data_proc(path, filename, 'all') + data_proc_inertia(path, filename, number) -def regular_load(path, hr, md, loadsel, w2t): - conditions = [ - " scenario(0, j1_p, j1_n, p_speed, p_tool)", - " scenario(0, j2_p, j2_n, p_speed, p_tool)", - " scenario(0, j3_p, j3_n, p_speed, p_tool)", - " scenario(0, j4_p, j4_n, p_speed, p_tool)", - " scenario(0, j5_p, j5_n, p_speed, p_tool)", - " scenario(0, j6_p, j6_n, p_speed, p_tool)", - " scenario(1, j5_p, j5_n, p_speed, p_tool)", - " scenario(2, j5_p, j5_n, p_speed, p_tool)", - " scenario(3, j5_p, j5_n, p_speed, p_tool)", +def run_rl(path, hr, md, loadsel, w2t): + c_regular = [ + "scenario(0, j1_p, j1_n, p_speed, p_tool, i_tool)", + "scenario(0, j2_p, j2_n, p_speed, p_tool, i_tool)", + "scenario(0, j3_p, j3_n, p_speed, p_tool, i_tool)", + "scenario(0, j4_p, j4_n, p_speed, p_tool, i_tool)", + "scenario(0, j5_p, j5_n, p_speed, p_tool, i_tool)", + "scenario(0, j6_p, j6_n, p_speed, p_tool, i_tool)", + "scenario(1, j6_p, j6_n, p_speed, p_tool, i_tool)", + "scenario(2, j6_p, j6_n, p_speed, p_tool, i_tool)", + "scenario(3, j6_p, j6_n, p_speed, p_tool, i_tool)", + "scenario(4, j1_hold, j1_hold, p_speed, p_tool, i_tool)", + "scenario(4, j2_hold, j2_hold, p_speed, p_tool, i_tool)", + "scenario(4, j3_hold, j3_hold, p_speed, p_tool, i_tool)", + "scenario(4, j4_hold, j4_hold, p_speed, p_tool, i_tool)", + "scenario(4, j5_hold, j5_hold, p_speed, p_tool, i_tool)", + "scenario(4, j6_hold, j6_hold, p_speed, p_tool, i_tool)", ] + c_inertia = [ + "scenario(5, j4_p_inertia, j4_n_inertia, p_speed, p_tool, i_tool)", + "scenario(5, j5_p_inertia, j5_n_inertia, p_speed, p_tool, i_tool)", + "scenario(5, j6_p_inertia, j6_n_inertia, p_speed, p_tool, i_tool)", + ] + disc_regular = { + 0: ['一轴', []], 1: ['二轴', []], 2: ['三轴', []], 3: ['四轴', []], 4: ['五轴', []], 5: ['六轴', []], + 6: ['场景一', []], 7: ['场景二', []], 8: ['场景三', []], 9: ['一轴保持', []], 10: ['二轴保持', []], + 11: ['三轴保持', []], 12: ['四轴保持', []], 13: ['五轴保持', []], 14: ['六轴保持', []] + } + disc_inertia = {0: ['四轴惯量', []], 1: ['五轴惯量', []], 2: ['六轴惯量', []]} + if loadsel == 'tool100': + conditions = c_regular + disc = disc_regular + elif loadsel == 'inertia': + conditions = c_inertia + disc = disc_inertia + for condition in conditions: - disc = { - 0: ['一轴', []], 1: ['二轴', []], 2: ['三轴', []], 3: ['四轴', []], 4: ['五轴', []], 5: ['六轴', []], - 6: ['场景一', []], 7: ['场景二', []], 8: ['场景三', []] - } number = conditions.index(condition) w2t(f"正在执行{disc[number][0]}测试......", 0, 0, 'purple', 'Automatic Test') @@ -283,6 +340,7 @@ def regular_load(path, hr, md, loadsel, w2t): w2t("未收到机器人的运行信号,需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test') # 5. 打开诊断曲线,并执行采集 + sleep(7) # 保证程序已经运行起来,其实主要是为了保持电流的采集而设定 _response = execution('diagnosis.open', hr, w2t, open=True, display_open=True) display_pdo_params = [ {"name": "hw_joint_vel_feedback", "channel": 0}, @@ -300,7 +358,8 @@ def regular_load(path, hr, md, loadsel, w2t): {"name": "device_safety_estop", "channel": 0}, ] _response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params) - if number < 6: + scenario_time = 0 + if number < 6 or number > 8: sleep(15) else: _t_start = time() @@ -315,14 +374,15 @@ def regular_load(path, hr, md, loadsel, w2t): else: sleep(5) sleep(1) # 一定要延迟一秒再读一次scenario time寄存器,因为一开始读取的数值不准确 - sleep(float(md.read_scenario_time())*0.2) # 再运行周期的20%即可 + scenario_time = md.read_scenario_time() + sleep(float(scenario_time)*0.2) # 再运行周期的20%即可 # 6. 关闭诊断曲线,停止程序运行,下电并且换成手动模式 _response = execution('diagnosis.open', hr, w2t, open=False, display_open=False) _response = execution('rl_task.stop', hr, w2t, tasks=['current']) _response = execution('state.switch_motor_off', hr, w2t) _response = execution('state.switch_manual', hr, w2t) - sleep(2) # 保证所有数据均已返回 + sleep(1) # 保证所有数据均已返回 # 7. 保留数据并处理输出 for _msg in hr.c_msg: if 'diagnosis.result' in _msg: @@ -333,27 +393,19 @@ def regular_load(path, hr, md, loadsel, w2t): if 'diagnosis.result' in _msg: _index = hr.c_msg.index(_msg) break - del hr.c_msg[_index-10:] + del hr.c_msg[_index:] hr.c_msg_xs.clear() - gen_result_file(path, loadsel, disc, number) + gen_result_file(path, loadsel, disc, number, scenario_time) else: - w2t("满载情况下,单轴和场景电机电流采集完毕。", 0, 0, 'green', 'Automatic Test') - - -def inertia_load(path, hr, md, loadsel, w2t): - pass - - -def run_rl(path, hr, md, loadsel, w2t): - if loadsel == 'tool100': - regular_load(path, hr, md, loadsel, w2t) - elif loadsel == 'inertia': - inertia_load(path, hr, md, loadsel, w2t) + if loadsel == 'tool100': + w2t("单轴和场景电机电流采集完毕,如需采集惯量负载,须切换负载类型,并更换惯量负载,重新执行。", 0, 0, 'green', 'Automatic Test') + elif loadsel == 'inertia': + w2t("惯量负载电机电流采集完毕,如需采集单轴/场景/保持电机电流,须切换负载类型,并更换偏置负载,重新执行。", 0, 0, 'green', 'Automatic Test') def main(path, hr, md, loadsel, w2t): data_dirs, data_files = traversal_files(path, w2t) - config_file, current_file, prj_file = check_files(path, data_dirs, data_files, w2t) + config_file, current_file, prj_file = check_files(path, loadsel, data_dirs, data_files, w2t) prj_to_xcore(prj_file) run_rl(path, hr, md, loadsel, w2t)