diff --git a/aio/README.md b/aio/README.md
index 6b60d99..e0110fd 100644
--- a/aio/README.md
+++ b/aio/README.md
@@ -402,3 +402,22 @@ v0.1.7.4(2024/07/02)
3[APIs: aio.py]
- 引入modbus实例化,并以参数的形式,传递给相应的tabview
- 新增pre_warning函数,在做自动化测试之前,确保所有条件皆具备
+
+v0.1.7.5(2024/07/03)
+1. [APIs: aio.py]
+ - 增加触发急停和恢复急停功能逻辑
+2. [APIs: do_current.py]
+ - 重新调整运行顺序,增加数据处理的逻辑(惯量负载逻辑暂不实现,等待软件部解决了修改工程之后不生效的问题再考虑)
+3. [APIs: btn_functions.py]
+ - 增加触发急停和恢复急停的modbus实现,仅适用于自动化测试
+
+v0.1.7.6(2024/07/04)
+1. [APIs: aio.py]
+ - Automatic Test逻辑中增加选择current时,需要选负载类型的逻辑
+2. [APIs: do_current.py]
+ - 单轴/场景电机电流的采集已完成
+3. [APIs: openapi.py]
+ - 增加了modbus读取浮点数的功能
+ - 优化了get_from_id的逻辑
+4. [autotest.xml]: 新增了scenario_time只写寄存器
+
diff --git a/aio/assets/autotest.xml b/aio/assets/autotest.xml
index 12abf31..a16e7c4 100644
--- a/aio/assets/autotest.xml
+++ b/aio/assets/autotest.xml
@@ -891,35 +891,56 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
-
+
-
+
-
+
-
-
+
+
diff --git a/aio/code/aio.py b/aio/code/aio.py
index 54b4e2f..9b60a32 100644
--- a/aio/code/aio.py
+++ b/aio/code/aio.py
@@ -116,7 +116,7 @@ class App(customtkinter.CTk):
# create buttons
self.seg_button = customtkinter.CTkSegmentedButton(self.tabview.tab('Automatic Test'), dynamic_resizing=False, font=customtkinter.CTkFont(size=16, weight='bold'), command=lambda value='机器状态': self.thread_it(self.segmented_button_callback))
self.seg_button.grid(row=1, column=2, columnspan=12, padx=(20, 10), pady=(10, 10), sticky="ew")
- self.seg_button.configure(dynamic_resizing=False, values=["功能切换", "触发急停", "停止运动", "继续运动", "零点位姿", "机器状态", "告警信息"])
+ self.seg_button.configure(dynamic_resizing=False, values=["功能切换", "触发急停", "恢复急停", "待定功能", "功能待定", "机器状态", "告警信息"])
self.seg_button.set("功能切换")
# create progress bar
self.progressbar = customtkinter.CTkProgressBar(self.tabview.tab('Automatic Test'))
@@ -132,7 +132,7 @@ class App(customtkinter.CTk):
widgits_at[widgit]['entry'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=5, sticky='we')
widgits_at[widgit]['entry'].configure(state='disabled')
elif widgit in ['loadsel', ]:
- widgits_at[widgit]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Automatic Test'), button_color='#708090', fg_color='#778899', values=["tool33", "tool66", "tool100"], width=self.w_param, font=self.my_font)
+ widgits_at[widgit]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Automatic Test'), button_color='#708090', fg_color='#778899', values=["tool33", "tool66", "tool100", "inertia"], width=self.w_param, font=self.my_font)
widgits_at[widgit]['optionmenu'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col'], padx=5, pady=5, sticky='we')
widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
widgits_at[widgit]['optionmenu'].configure(state='disabled')
@@ -161,7 +161,7 @@ class App(customtkinter.CTk):
self.myThread.start()
def segmented_button_callback(self):
- _btn_funcs = {'get_state': '机器状态', 'warning_info': '告警信息', '3': '4', '5': '6', '7': '8'}
+ _btn_funcs = {'trigger_estop': '触发急停', 'reset_estop': '恢复急停', 'get_state': '机器状态', 'warning_info': '告警信息'}
value = self.seg_button.get()
self.seg_button.configure(state='disabled')
@@ -175,7 +175,7 @@ class App(customtkinter.CTk):
else:
for _func in _btn_funcs:
if _btn_funcs[_func] == value:
- btn_functions.main(self.hr, _func, self.write2textbox)
+ btn_functions.main(self.hr, self.md_at, _func, self.write2textbox)
break
self.seg_button.configure(state='normal')
@@ -196,7 +196,7 @@ class App(customtkinter.CTk):
if c_state == '0':
# self.textbox.delete(index1='1.0', index2='end')
self.hr.t_bool = False
- sleep(4)
+ sleep(3)
del self.hr
self.hr = openapi.HmiRequest(self.write2textbox)
sleep(3)
@@ -288,7 +288,7 @@ class App(customtkinter.CTk):
widgits_at[widgit]['entry'].configure(state='normal')
elif widgit in ['loadsel', ]:
widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
- widgits_at[widgit]['optionmenu'].configure(text_color='black', state='disabled')
+ widgits_at[widgit]['optionmenu'].configure(state='normal', text_color='red')
else:
self.initialization()
self.menu_main_at.set("Start Here!")
@@ -434,8 +434,11 @@ class App(customtkinter.CTk):
return 0, 0
elif func_name == 'current':
path = widgits_at['path']['entry'].get().strip()
- if exists(path):
- return 6, path
+ loadsel = widgits_at['loadsel']['optionmenu'].get()
+ c1 = exists(path)
+ c2 = loadsel in ['tool100', 'inertia']
+ if c1 and c2:
+ return 6, path, loadsel
else:
return 0, 0
else:
@@ -459,7 +462,7 @@ class App(customtkinter.CTk):
func_dict[flag](path=args[0], hr=self.hr, loadsel=args[1], w2t=self.write2textbox)
elif flag == 6:
self.pre_warning()
- func_dict[flag](path=args[0], hr=self.hr, md=self.md_at, w2t=self.write2textbox)
+ func_dict[flag](path=args[0], hr=self.hr, md=self.md_at, loadsel=args[1], w2t=self.write2textbox)
else:
tkinter.messagebox.showerror(title="参数错误", message="请检查对应参数是否填写正确!", )
diff --git a/aio/code/automatic_test/btn_functions.py b/aio/code/automatic_test/btn_functions.py
index 9b3c705..884a2b0 100644
--- a/aio/code/automatic_test/btn_functions.py
+++ b/aio/code/automatic_test/btn_functions.py
@@ -1,4 +1,4 @@
-import json
+from json import loads
from sys import argv
@@ -20,11 +20,21 @@ def execution(cmd, hr, w2t, **kwargs):
if not _msg:
w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name='Automatic Test')
else:
- _response = json.loads(_msg)
+ _response = loads(_msg)
validate_resp(_id, _response, w2t)
return _response
+def trigger_estop(md, w2t):
+ md.trigger_estop()
+ w2t("触发急停成功,可点击机器状态验证。", 0, 0, 'green', 'Automatic Test')
+
+
+def reset_estop(md, w2t):
+ md.reset_estop()
+ w2t("恢复急停成功,可点击机器状态验证。", 0, 0, 'green', 'Automatic Test')
+
+
def get_state(hr, w2t):
# 获取机器状态
_response = execution('state.get_state', hr, w2t)
@@ -48,17 +58,21 @@ def get_state(hr, w2t):
def warning_info(hr, w2t):
for msg in hr.c_msg:
if 'alarm' in msg.lower():
- w2t(msg, tab_name='Automatic Test')
+ w2t(str(loads(msg)), tab_name='Automatic Test')
for msg in hr.c_msg_xs:
if 'alarm' in msg.lower():
- w2t(msg, tab_name='Automatic Test')
+ w2t(str(loads(msg)), tab_name='Automatic Test')
-def main(hr, func, w2t):
+def main(hr, md, func, w2t):
if hr is None:
w2t("无法连接机器人,检查是否已经使用Robot Assist软件连接机器,重试中...", 0, 49, 'red', tab_name='Automatic Test')
# func: get_state/
match func:
+ case 'trigger_estop':
+ trigger_estop(md, w2t)
+ case 'reset_estop':
+ reset_estop(md, w2t)
case 'get_state':
get_state(hr, w2t)
case 'warning_info':
@@ -68,21 +82,3 @@ def main(hr, func, w2t):
if __name__ == '__main__':
main(*argv[1:])
-
-# 一、设置/检测机器人状态:
-# 1. 上电
-# 2. 软限位打开
-# 3. 示教器断开
-# 4. 操作模式/机器人类型
-# 5. 控制器状态/工作任务控件/机器人动态
-
-# 二、加载RL程序开始运行
-# 1. 怎么触发急停
-# 2. 怎么恢复急停
-# 3. 怎么采集曲线
-# 4.
-
-# 三、运行过程中,收集数据,并处理出结果
-
-# 四
-
diff --git a/aio/code/automatic_test/do_current.py b/aio/code/automatic_test/do_current.py
index 0b28ecf..1ddf512 100644
--- a/aio/code/automatic_test/do_current.py
+++ b/aio/code/automatic_test/do_current.py
@@ -1,9 +1,11 @@
-from time import sleep
+import os
+from time import sleep, time
from sys import argv
from os import scandir
from os.path import exists
from paramiko import SSHClient, AutoAddPolicy
from json import loads
+import pandas
def traversal_files(path, w2t):
@@ -22,7 +24,7 @@ def traversal_files(path, w2t):
return dirs, files
-def check_files(data_dirs, data_files, w2t):
+def check_files(path, data_dirs, data_files, w2t):
if len(data_dirs) != 0 or len(data_files) != 3:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下三个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
w2t(' 1. configs.xlsx\n 2. T_电机电流.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test')
@@ -42,6 +44,12 @@ def check_files(data_dirs, data_files, w2t):
if config_file and current_file and prj_file:
w2t("数据目录合规性检查结束,未发现问题......", tab_name='Automatic Test')
+ os.mkdir(f"{path}\\single")
+ os.mkdir(f"{path}\\s_1")
+ os.mkdir(f"{path}\\s_2")
+ os.mkdir(f"{path}\\s_3")
+ os.mkdir(f"{path}\\inertia")
+ os.mkdir(f"{path}\\hold")
return config_file, current_file, prj_file
else:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下三个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
@@ -56,11 +64,8 @@ def prj_to_xcore(prj_file):
# stdin, stdout, stderr = ssh.exec_command('rm /tmp/target.zip')
# ssh.exec_command('rm /tmp/target.zip')
sftp.put(prj_file, '/tmp/target.zip')
- cmd = 'cd /tmp; '
- cmd += 'rm -rf target/; '
- cmd += 'mkdir target; '
- cmd += 'unzip -d target/ -q target.zip; '
- cmd += 'rm target.zip; '
+ cmd = 'cd /tmp; rm -rf target/; mkdir target; unzip -d target/ -q target.zip; '
+ cmd += 'chmod 777 -R target/; rm target.zip'
ssh.exec_command(cmd)
cmd = 'sudo rm -rf /home/luoshi/bin/controller/projects/target; '
@@ -72,8 +77,8 @@ def prj_to_xcore(prj_file):
print(stderr.read().decode()) # 顺便也执行以下stderr
_prj_name = prj_file.split('\\')[-1].removesuffix('.zip')
- cmd = f'cd /home/luoshi/bin/controller/; '
- cmd += f'sudo mv projects/target/_build/{_prj_name}.prj projects/target/_build/target.prj '
+ cmd = 'cd /home/luoshi/bin/controller/; '
+ cmd += f'sudo mv projects/target/_build/{_prj_name}.prj projects/target/_build/target.prj'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
@@ -98,15 +103,125 @@ def execution(cmd, hr, w2t, **kwargs):
_id = hr.execution(cmd, **kwargs)
_msg = hr.get_from_id(_id)
if not _msg:
- w2t(f"无法获取{_id}请求的响应信息", 0, 0, 'red', tab_name='Automatic Test')
+ w2t(f"无法获取{_id}请求的响应信息", 0, 7, 'red', tab_name='Automatic Test')
else:
_response = loads(_msg)
validate_resp(_id, _response, w2t)
return _response
-def run_rl(hr, md, w2t):
- # need to run
+def data_proc(path, filename, channel):
+ if channel in list(range(6)):
+ with open(filename, 'r', encoding='utf-8') as f_obj:
+ lines = f_obj.readlines()
+ _d2d_vel = {'hw_joint_vel_feedback': []}
+ _d2d_trq = {'device_servo_trq_feedback': []}
+ for line in lines:
+ data = eval(line.strip())['data']
+ for item in data:
+ if item.get('channel', None) == channel and item.get('name', None) == 'hw_joint_vel_feedback':
+ _d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
+ elif item.get('channel', None) == channel and item.get('name', None) == 'device_servo_trq_feedback':
+ _d2d_trq['device_servo_trq_feedback'].extend(item['value'])
+
+ df1 = pandas.DataFrame.from_dict(_d2d_vel)
+ df2 = pandas.DataFrame.from_dict(_d2d_trq)
+ df = pandas.concat([df1, df2], axis=1)
+ _filename = f'{path}\\single\\j{channel+1}_single.data'
+ df.to_csv(_filename, sep='\t', index=False)
+ elif channel in list(range(6, 9)):
+ with open(filename, 'r', encoding='utf-8') as f_obj:
+ lines = f_obj.readlines()
+ _d2d_vel_0 = {'hw_joint_vel_feedback': []}
+ _d2d_trq_0 = {'device_servo_trq_feedback': []}
+ _d2d_vel_1 = {'hw_joint_vel_feedback': []}
+ _d2d_trq_1 = {'device_servo_trq_feedback': []}
+ _d2d_vel_2 = {'hw_joint_vel_feedback': []}
+ _d2d_trq_2 = {'device_servo_trq_feedback': []}
+ _d2d_vel_3 = {'hw_joint_vel_feedback': []}
+ _d2d_trq_3 = {'device_servo_trq_feedback': []}
+ _d2d_vel_4 = {'hw_joint_vel_feedback': []}
+ _d2d_trq_4 = {'device_servo_trq_feedback': []}
+ _d2d_vel_5 = {'hw_joint_vel_feedback': []}
+ _d2d_trq_5 = {'device_servo_trq_feedback': []}
+ for line in lines:
+ data = eval(line.strip())['data']
+ for item in data:
+ if item.get('channel', None) == 0 and item.get('name', None) == 'hw_joint_vel_feedback':
+ _d2d_vel_0['hw_joint_vel_feedback'].extend(item['value'])
+ elif item.get('channel', None) == 0 and item.get('name', None) == 'device_servo_trq_feedback':
+ _d2d_trq_0['device_servo_trq_feedback'].extend(item['value'])
+ elif item.get('channel', None) == 1 and item.get('name', None) == 'hw_joint_vel_feedback':
+ _d2d_vel_1['hw_joint_vel_feedback'].extend(item['value'])
+ elif item.get('channel', None) == 1 and item.get('name', None) == 'device_servo_trq_feedback':
+ _d2d_trq_1['device_servo_trq_feedback'].extend(item['value'])
+ elif item.get('channel', None) == 2 and item.get('name', None) == 'hw_joint_vel_feedback':
+ _d2d_vel_2['hw_joint_vel_feedback'].extend(item['value'])
+ elif item.get('channel', None) == 2 and item.get('name', None) == 'device_servo_trq_feedback':
+ _d2d_trq_2['device_servo_trq_feedback'].extend(item['value'])
+ elif item.get('channel', None) == 3 and item.get('name', None) == 'hw_joint_vel_feedback':
+ _d2d_vel_3['hw_joint_vel_feedback'].extend(item['value'])
+ elif item.get('channel', None) == 3 and item.get('name', None) == 'device_servo_trq_feedback':
+ _d2d_trq_3['device_servo_trq_feedback'].extend(item['value'])
+ elif item.get('channel', None) == 4 and item.get('name', None) == 'hw_joint_vel_feedback':
+ _d2d_vel_4['hw_joint_vel_feedback'].extend(item['value'])
+ elif item.get('channel', None) == 4 and item.get('name', None) == 'device_servo_trq_feedback':
+ _d2d_trq_4['device_servo_trq_feedback'].extend(item['value'])
+ elif item.get('channel', None) == 5 and item.get('name', None) == 'hw_joint_vel_feedback':
+ _d2d_vel_5['hw_joint_vel_feedback'].extend(item['value'])
+ elif item.get('channel', None) == 5 and item.get('name', None) == 'device_servo_trq_feedback':
+ _d2d_trq_5['device_servo_trq_feedback'].extend(item['value'])
+
+ df_01 = pandas.DataFrame.from_dict(_d2d_vel_0)
+ df_02 = pandas.DataFrame.from_dict(_d2d_trq_0)
+ df = pandas.concat([df_01, df_02], axis=1)
+ _filename = f'{path}\\s_{channel-5}\\j1_s_1.data'
+ df.to_csv(_filename, sep='\t', index=False)
+
+ df_01 = pandas.DataFrame.from_dict(_d2d_vel_1)
+ df_02 = pandas.DataFrame.from_dict(_d2d_trq_1)
+ df = pandas.concat([df_01, df_02], axis=1)
+ _filename = f'{path}\\s_{channel-5}\\j2_s_1.data'
+ df.to_csv(_filename, sep='\t', index=False)
+
+ df_01 = pandas.DataFrame.from_dict(_d2d_vel_2)
+ df_02 = pandas.DataFrame.from_dict(_d2d_trq_2)
+ df = pandas.concat([df_01, df_02], axis=1)
+ _filename = f'{path}\\s_{channel-5}\\j3_s_1.data'
+ df.to_csv(_filename, sep='\t', index=False)
+
+ df_01 = pandas.DataFrame.from_dict(_d2d_vel_3)
+ df_02 = pandas.DataFrame.from_dict(_d2d_trq_3)
+ df = pandas.concat([df_01, df_02], axis=1)
+ _filename = f'{path}\\s_{channel-5}\\j4_s_1.data'
+ df.to_csv(_filename, sep='\t', index=False)
+
+ df_01 = pandas.DataFrame.from_dict(_d2d_vel_4)
+ df_02 = pandas.DataFrame.from_dict(_d2d_trq_4)
+ df = pandas.concat([df_01, df_02], axis=1)
+ _filename = f'{path}\\s_{channel-5}\\j5_s_1.data'
+ df.to_csv(_filename, sep='\t', index=False)
+
+ df_01 = pandas.DataFrame.from_dict(_d2d_vel_5)
+ df_02 = pandas.DataFrame.from_dict(_d2d_trq_5)
+ df = pandas.concat([df_01, df_02], axis=1)
+ _filename = f'{path}\\s_{channel-5}\\j6_s_1.data'
+ df.to_csv(_filename, sep='\t', index=False)
+
+
+def gen_result_file(path, loadsel, disc, number):
+ filename = path + f'\\data.txt'
+ with open(filename, 'w', encoding='utf-8') as f_obj:
+ for line in disc[number][1]:
+ f_obj.write(str(line)+'\n')
+
+ if loadsel == 'tool100':
+ data_proc(path, filename, number)
+ elif loadsel == 'inertia':
+ data_proc(path, filename, 'all')
+
+
+def regular_load(path, hr, md, loadsel, w2t):
conditions = [
" scenario(0, j1_p, j1_n, p_speed, p_tool)",
" scenario(0, j2_p, j2_n, p_speed, p_tool)",
@@ -118,45 +233,44 @@ def run_rl(hr, md, w2t):
" scenario(2, j5_p, j5_n, p_speed, p_tool)",
" scenario(3, j5_p, j5_n, p_speed, p_tool)",
]
-
for condition in conditions:
- disc = {0: '一轴', 1: '二轴', 2: '三轴', 3: '四轴', 4: '五轴', 5: '六轴', 6: '场景一', 7: '场景二', 8: '场景三'}
+ disc = {
+ 0: ['一轴', []], 1: ['二轴', []], 2: ['三轴', []], 3: ['四轴', []], 4: ['五轴', []], 5: ['六轴', []],
+ 6: ['场景一', []], 7: ['场景二', []], 8: ['场景三', []]
+ }
number = conditions.index(condition)
- w2t(f"正在执行{disc[number]}测试......", 0, 0, 'purple', 'Automatic Test')
+ w2t(f"正在执行{disc[number][0]}测试......", 0, 0, 'purple', 'Automatic Test')
- # 1. 关闭诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来
+ # 1. 关闭诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来,切手动模式并下电
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
md.trigger_estop()
md.reset_estop()
_response = execution('state.switch_manual', hr, w2t)
+ _response = execution('state.switch_motor_off', hr, w2t)
- # 3. 修改未要执行的场景
+ # 2. 修改未要执行的场景
ssh = SSHClient()
ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019')
- cmd = f'cd /home/luoshi/bin/controller/; '
- cmd += f'sudo sed -i "/scenario/d" projects/target/current/main.mod; '
- cmd += f'sudo sed -i "/DONOTDELETE/i {condition}" projects/target/current/main.mod; '
+ cmd = 'cd /home/luoshi/bin/controller/; '
+ cmd += 'sudo sed -i "/scenario/d" projects/target/_build/current/main.mod; '
+ cmd += f'sudo sed -i "/DONOTDELETE/i {condition}" projects/target/_build/current/main.mod'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout,才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
- # 4. reload工程后,pp2main
+ # 3. reload工程后,pp2main,并且自动模式和上电
prj_path = 'target/_build/target.prj'
_response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['current'])
_response = execution('overview.get_cur_prj', hr, w2t)
_response = execution('rl_task.pp_to_main', hr, w2t, tasks=['current'])
- print(f"set pp2main of prj: {_response}")
-
- # 5. 切换自动并上电
_response = execution('state.switch_auto', hr, w2t)
_response = execution('state.switch_motor_on', hr, w2t)
- # 6. 开始运行程序,单轴运行15s
+ # 4. 开始运行程序,单轴运行15s
_response = execution('rl_task.run', hr, w2t, tasks=['current'])
- print(f"run prj: {_response}")
for i in range(3):
if md.read_ready_to_go() == 1:
md.write_act(True)
@@ -165,10 +279,10 @@ def run_rl(hr, md, w2t):
break
else:
sleep(1)
- continue
else:
w2t("未收到机器人的运行信号,需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test')
- # 7. 打开诊断曲线,并执行采集
+
+ # 5. 打开诊断曲线,并执行采集
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
display_pdo_params = [
{"name": "hw_joint_vel_feedback", "channel": 0},
@@ -186,32 +300,62 @@ def run_rl(hr, md, w2t):
{"name": "device_safety_estop", "channel": 0},
]
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
- sleep(8)
- # 8. 关闭诊断曲线,停止程序运行,下电并且换成手动模式
+ if number < 6:
+ sleep(15)
+ else:
+ _t_start = time()
+ while True:
+ scenario_time = md.read_scenario_time()
+ if float(scenario_time) > 1:
+ w2t(f"场景{number-5}的周期时间:{scenario_time}", 0, 0, 'green', 'Automatic Test')
+ break
+ else:
+ if (time()-_t_start)//60 > 3:
+ w2t(f"未收到场景{number-5}的周期时间,需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test')
+ else:
+ sleep(5)
+ sleep(1) # 一定要延迟一秒再读一次scenario time寄存器,因为一开始读取的数值不准确
+ sleep(float(md.read_scenario_time())*0.2) # 再运行周期的20%即可
+
+ # 6. 关闭诊断曲线,停止程序运行,下电并且换成手动模式
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
_response = execution('rl_task.stop', hr, w2t, tasks=['current'])
_response = execution('state.switch_motor_off', hr, w2t)
_response = execution('state.switch_manual', hr, w2t)
- sleep(1)
- # 9. 处理数据并输出文件
- # for _msg in hr.c_msg:
- # if 'diagnosis.result' in _msg:
- # print(_msg)
- # _msg = json.loads(_msg)
- # if 'channel' in _msg and 'name' in _msg:
- # if int(_msg['channel']) == 0 and _msg['name'] == 'device_servo_trq_feedback':
- # print(f"diagnosis.result: {_msg}")
- # count += 1
- # if count * 50 > 5 * 1000:
- # break
+ sleep(2) # 保证所有数据均已返回
+ # 7. 保留数据并处理输出
+ for _msg in hr.c_msg:
+ if 'diagnosis.result' in _msg:
+ disc[number][1].insert(0, loads(_msg))
+ else:
+ _index = 210
+ for _msg in hr.c_msg:
+ if 'diagnosis.result' in _msg:
+ _index = hr.c_msg.index(_msg)
+ break
+ del hr.c_msg[_index-10:]
+ hr.c_msg_xs.clear()
+ gen_result_file(path, loadsel, disc, number)
+ else:
+ w2t("满载情况下,单轴和场景电机电流采集完毕。", 0, 0, 'green', 'Automatic Test')
-def main(path, hr, md, w2t):
+def inertia_load(path, hr, md, loadsel, w2t):
+ pass
+
+def run_rl(path, hr, md, loadsel, w2t):
+ if loadsel == 'tool100':
+ regular_load(path, hr, md, loadsel, w2t)
+ elif loadsel == 'inertia':
+ inertia_load(path, hr, md, loadsel, w2t)
+
+
+def main(path, hr, md, loadsel, w2t):
data_dirs, data_files = traversal_files(path, w2t)
- config_file, current_file, prj_file = check_files(data_dirs, data_files, w2t)
+ config_file, current_file, prj_file = check_files(path, data_dirs, data_files, w2t)
prj_to_xcore(prj_file)
- run_rl(hr, md, w2t)
+ run_rl(path, hr, md, loadsel, w2t)
if __name__ == '__main__':
diff --git a/aio/code/automatic_test/openapi.py b/aio/code/automatic_test/openapi.py
index a7f4418..4ed67bf 100644
--- a/aio/code/automatic_test/openapi.py
+++ b/aio/code/automatic_test/openapi.py
@@ -4,8 +4,9 @@ from threading import Thread
import selectors
from time import time, sleep
from os.path import dirname
-from binascii import b2a_hex, a2b_hex
from pymodbus.client.tcp import ModbusTcpClient
+from pymodbus.payload import BinaryPayloadDecoder
+from pymodbus.constants import Endian
MAX_FRAME_SIZE = 1024
setdefaulttimeout(2)
@@ -113,6 +114,16 @@ class ModbusRequest(object):
self.w2t(f"{Err}")
self.w2t("无法读取准备信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
+ def read_scenario_time(self):
+ try:
+ results = self.c.read_holding_registers(41002, 2)
+ result = BinaryPayloadDecoder.fromRegisters(results.registers, byteorder=Endian.BIG, wordorder=Endian.LITTLE)
+ result = f"{result.decode_32bit_float():.3f}"
+ return result
+ except Exception as Err:
+ self.w2t(f"{Err}")
+ self.w2t("无法读取准备信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
+
class HmiRequest(object):
def __init__(self, w2t):
@@ -185,10 +196,10 @@ class HmiRequest(object):
return index + 8, _frame_size, _pkg_size
else:
print(data)
- print(f"index = {index}")
- print(f"reserve = {_reserved}")
- print(f"protocol = {_protocol}")
- print("head check 数据有误,需要确认")
+ # print(f"index = {index}")
+ # print(f"reserve = {_reserved}")
+ # print(f"protocol = {_protocol}")
+ # print("head check 数据有误,需要确认")
self.w2t("Header Check: 解包数据有误,需要确认!", 0, 1, 'red', tab_name=self.tab_name)
else:
self.half_length = len(data) - index
@@ -208,12 +219,12 @@ class HmiRequest(object):
_flag = '0' if self.get_from_id(_id) is None else '1'
print(f"hb = {_flag}", end=' ')
print(f"len(c_msg) = {len(self.c_msg)}", end=' ')
- print(f"len(c_msg_xs) = {len(self.c_msg_xs)}", end=' ')
+ print(f"len(c_msg_xs) = {len(self.c_msg_xs)}", end='\n')
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
f_hb.write(_flag)
if _flag == '0':
self.w2t(f"{_id} 心跳丢失,连接失败,重新连接中...", 0, 7, 'red', tab_name=self.tab_name)
- sleep(1)
+ sleep(2)
# with open(f"{current_path}/../../assets/templates/c_msg.log", "w", encoding='utf-8') as f:
# for msg in self.c_msg:
# f.write(str(loads(msg)) + '\n')
@@ -457,8 +468,8 @@ class HmiRequest(object):
self.flag_xs = 1
def get_from_id(self, msg_id, flag=0):
- messages = self.c_msg if flag == 0 else self.c_msg_xs
for i in range(3):
+ messages = self.c_msg if flag == 0 else self.c_msg_xs
for msg in messages:
if msg_id is None:
return None
@@ -552,7 +563,7 @@ class HmiRequest(object):
pass
req['id'] = self.gen_id(command)
- print(f"req = {req}")
+ # print(f"req = {req}")
cmd = dumps(req, separators=(',', ':'))
try:
self.c.send(self.package(cmd))