v0.1.7.5(2024/07/03)

1. [APIs: aio.py]
   - 增加触发急停和恢复急停功能逻辑
2. [APIs: do_current.py]
   - 重新调整运行顺序,增加数据处理的逻辑(惯量负载逻辑暂不实现,等待软件部解决了修改工程之后不生效的问题再考虑)
3. [APIs: btn_functions.py]
   - 增加触发急停和恢复急停的modbus实现,仅适用于自动化测试

v0.1.7.6(2024/07/04)
1. [APIs: aio.py]
   - Automatic Test逻辑中增加选择current时,需要选负载类型的逻辑
2. [APIs: do_current.py]
   - 单轴/场景电机电流的采集已完成
3. [APIs: openapi.py]
   - 增加了modbus读取浮点数的功能
   - 优化了get_from_id的逻辑
4. [autotest.xml]: 新增了scenario_time只写寄存器
This commit is contained in:
gitea 2024-07-04 13:34:04 +08:00
parent aedac4c90c
commit 524af160d8
6 changed files with 287 additions and 93 deletions

View File

@ -402,3 +402,22 @@ v0.1.7.4(2024/07/02)
3[APIs: aio.py] 3[APIs: aio.py]
- 引入modbus实例化并以参数的形式传递给相应的tabview - 引入modbus实例化并以参数的形式传递给相应的tabview
- 新增pre_warning函数在做自动化测试之前确保所有条件皆具备 - 新增pre_warning函数在做自动化测试之前确保所有条件皆具备
v0.1.7.5(2024/07/03)
1. [APIs: aio.py]
- 增加触发急停和恢复急停功能逻辑
2. [APIs: do_current.py]
- 重新调整运行顺序,增加数据处理的逻辑(惯量负载逻辑暂不实现,等待软件部解决了修改工程之后不生效的问题再考虑)
3. [APIs: btn_functions.py]
- 增加触发急停和恢复急停的modbus实现仅适用于自动化测试
v0.1.7.6(2024/07/04)
1. [APIs: aio.py]
- Automatic Test逻辑中增加选择current时需要选负载类型的逻辑
2. [APIs: do_current.py]
- 单轴/场景电机电流的采集已完成
3. [APIs: openapi.py]
- 增加了modbus读取浮点数的功能
- 优化了get_from_id的逻辑
4. [autotest.xml]: 新增了scenario_time只写寄存器

View File

@ -891,35 +891,56 @@
<c name="addr_2nd" type="2" value="0"/> <c name="addr_2nd" type="2" value="0"/>
<c name="bit_bias" type="2" value="0"/> <c name="bit_bias" type="2" value="0"/>
<c name="byte_bias" type="4" value="0"/> <c name="byte_bias" type="4" value="0"/>
<c name="description" type="10" value="robot to pc"/> <c name="description" type="10" value="pc to robot"/>
<c name="dev_name" type="10" value="autotest"/> <c name="dev_name" type="10" value="autotest"/>
<c name="dev_type" type="10" value="MODBUS"/> <c name="dev_type" type="10" value="MODBUS"/>
<c name="end_addr" type="2" value="41000"/> <c name="end_addr" type="2" value="41000"/>
<c name="function" type="10" value=""/> <c name="function" type="10" value=""/>
<c name="len" type="2" value="1"/> <c name="len" type="2" value="1"/>
<c name="name" type="10" value="act"/>
<c name="retain" type="1" value="false"/>
<c name="rw" type="10" value="rd"/>
<c name="type" type="10" value="bool"/>
<c name="value"/>
<c name="value_single" type="10" value=""/>
<c name="bias" type="2" value="0"/>
</l>
<l>
<c name="addr" type="2" value="41001"/>
<c name="addr_1st" type="2" value="0"/>
<c name="addr_2nd" type="2" value="0"/>
<c name="bit_bias" type="2" value="0"/>
<c name="byte_bias" type="4" value="0"/>
<c name="description" type="10" value="robot to pc"/>
<c name="dev_name" type="10" value="autotest"/>
<c name="dev_type" type="10" value="MODBUS"/>
<c name="end_addr" type="2" value="41001"/>
<c name="function" type="10" value=""/>
<c name="len" type="2" value="1"/>
<c name="name" type="10" value="ready_to_go"/> <c name="name" type="10" value="ready_to_go"/>
<c name="retain" type="1" value="false"/> <c name="retain" type="1" value="false"/>
<c name="rw" type="10" value="rdwr"/> <c name="rw" type="10" value="rdwr"/>
<c name="type" type="10" value="bool"/> <c name="type" type="10" value="bool"/>
<c name="value"/> <c name="value"/>
<c name="value_single" type="10" value=""/> <c name="value_single" type="10" value=""/>
<c name="bias" type="2" value="0"/>
</l> </l>
<l> <l>
<c name="addr" type="2" value="41001"/> <c name="addr" type="2" value="41002"/>
<c name="addr_1st" type="2" value="0"/> <c name="addr_1st" type="2" value="0"/>
<c name="addr_2nd" type="2" value="0"/> <c name="addr_2nd" type="2" value="0"/>
<c name="bit_bias" type="2" value="0"/> <c name="bit_bias" type="2" value="0"/>
<c name="byte_bias" type="4" value="0"/> <c name="byte_bias" type="4" value="0"/>
<c name="description" type="10" value="pc to robot"/> <c name="description" type="10" value="robot to pc"/>
<c name="dev_name" type="10" value="autotest"/> <c name="dev_name" type="10" value="autotest"/>
<c name="dev_type" type="10" value="MODBUS"/> <c name="dev_type" type="10" value="MODBUS"/>
<c name="end_addr" type="2" value="41001"/> <c name="end_addr" type="2" value="41003"/>
<c name="function" type="10" value=""/> <c name="function" type="10" value=""/>
<c name="len" type="2" value="1"/> <c name="len" type="2" value="1"/>
<c name="name" type="10" value="act"/> <c name="name" type="10" value="scenario_time"/>
<c name="retain" type="1" value="false"/> <c name="retain" type="1" value="false"/>
<c name="rw" type="10" value="rd"/> <c name="rw" type="10" value="rdwr"/>
<c name="type" type="10" value="int16"/> <c name="type" type="10" value="float"/>
<c name="value"/> <c name="value"/>
<c name="value_single" type="10" value=""/> <c name="value_single" type="10" value=""/>
</l> </l>

View File

@ -116,7 +116,7 @@ class App(customtkinter.CTk):
# create buttons # create buttons
self.seg_button = customtkinter.CTkSegmentedButton(self.tabview.tab('Automatic Test'), dynamic_resizing=False, font=customtkinter.CTkFont(size=16, weight='bold'), command=lambda value='机器状态': self.thread_it(self.segmented_button_callback)) self.seg_button = customtkinter.CTkSegmentedButton(self.tabview.tab('Automatic Test'), dynamic_resizing=False, font=customtkinter.CTkFont(size=16, weight='bold'), command=lambda value='机器状态': self.thread_it(self.segmented_button_callback))
self.seg_button.grid(row=1, column=2, columnspan=12, padx=(20, 10), pady=(10, 10), sticky="ew") self.seg_button.grid(row=1, column=2, columnspan=12, padx=(20, 10), pady=(10, 10), sticky="ew")
self.seg_button.configure(dynamic_resizing=False, values=["功能切换", "触发急停", "止运动", "继续运动", "零点位姿", "机器状态", "告警信息"]) self.seg_button.configure(dynamic_resizing=False, values=["功能切换", "触发急停", "恢复急", "待定功能", "功能待定", "机器状态", "告警信息"])
self.seg_button.set("功能切换") self.seg_button.set("功能切换")
# create progress bar # create progress bar
self.progressbar = customtkinter.CTkProgressBar(self.tabview.tab('Automatic Test')) self.progressbar = customtkinter.CTkProgressBar(self.tabview.tab('Automatic Test'))
@ -132,7 +132,7 @@ class App(customtkinter.CTk):
widgits_at[widgit]['entry'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=5, sticky='we') widgits_at[widgit]['entry'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=5, sticky='we')
widgits_at[widgit]['entry'].configure(state='disabled') widgits_at[widgit]['entry'].configure(state='disabled')
elif widgit in ['loadsel', ]: elif widgit in ['loadsel', ]:
widgits_at[widgit]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Automatic Test'), button_color='#708090', fg_color='#778899', values=["tool33", "tool66", "tool100"], width=self.w_param, font=self.my_font) widgits_at[widgit]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Automatic Test'), button_color='#708090', fg_color='#778899', values=["tool33", "tool66", "tool100", "inertia"], width=self.w_param, font=self.my_font)
widgits_at[widgit]['optionmenu'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col'], padx=5, pady=5, sticky='we') widgits_at[widgit]['optionmenu'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col'], padx=5, pady=5, sticky='we')
widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text']) widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
widgits_at[widgit]['optionmenu'].configure(state='disabled') widgits_at[widgit]['optionmenu'].configure(state='disabled')
@ -161,7 +161,7 @@ class App(customtkinter.CTk):
self.myThread.start() self.myThread.start()
def segmented_button_callback(self): def segmented_button_callback(self):
_btn_funcs = {'get_state': '机器状态', 'warning_info': '告警信息', '3': '4', '5': '6', '7': '8'} _btn_funcs = {'trigger_estop': '触发急停', 'reset_estop': '恢复急停', 'get_state': '机器状态', 'warning_info': '告警信息'}
value = self.seg_button.get() value = self.seg_button.get()
self.seg_button.configure(state='disabled') self.seg_button.configure(state='disabled')
@ -175,7 +175,7 @@ class App(customtkinter.CTk):
else: else:
for _func in _btn_funcs: for _func in _btn_funcs:
if _btn_funcs[_func] == value: if _btn_funcs[_func] == value:
btn_functions.main(self.hr, _func, self.write2textbox) btn_functions.main(self.hr, self.md_at, _func, self.write2textbox)
break break
self.seg_button.configure(state='normal') self.seg_button.configure(state='normal')
@ -196,7 +196,7 @@ class App(customtkinter.CTk):
if c_state == '0': if c_state == '0':
# self.textbox.delete(index1='1.0', index2='end') # self.textbox.delete(index1='1.0', index2='end')
self.hr.t_bool = False self.hr.t_bool = False
sleep(4) sleep(3)
del self.hr del self.hr
self.hr = openapi.HmiRequest(self.write2textbox) self.hr = openapi.HmiRequest(self.write2textbox)
sleep(3) sleep(3)
@ -288,7 +288,7 @@ class App(customtkinter.CTk):
widgits_at[widgit]['entry'].configure(state='normal') widgits_at[widgit]['entry'].configure(state='normal')
elif widgit in ['loadsel', ]: elif widgit in ['loadsel', ]:
widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text']) widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
widgits_at[widgit]['optionmenu'].configure(text_color='black', state='disabled') widgits_at[widgit]['optionmenu'].configure(state='normal', text_color='red')
else: else:
self.initialization() self.initialization()
self.menu_main_at.set("Start Here!") self.menu_main_at.set("Start Here!")
@ -434,8 +434,11 @@ class App(customtkinter.CTk):
return 0, 0 return 0, 0
elif func_name == 'current': elif func_name == 'current':
path = widgits_at['path']['entry'].get().strip() path = widgits_at['path']['entry'].get().strip()
if exists(path): loadsel = widgits_at['loadsel']['optionmenu'].get()
return 6, path c1 = exists(path)
c2 = loadsel in ['tool100', 'inertia']
if c1 and c2:
return 6, path, loadsel
else: else:
return 0, 0 return 0, 0
else: else:
@ -459,7 +462,7 @@ class App(customtkinter.CTk):
func_dict[flag](path=args[0], hr=self.hr, loadsel=args[1], w2t=self.write2textbox) func_dict[flag](path=args[0], hr=self.hr, loadsel=args[1], w2t=self.write2textbox)
elif flag == 6: elif flag == 6:
self.pre_warning() self.pre_warning()
func_dict[flag](path=args[0], hr=self.hr, md=self.md_at, w2t=self.write2textbox) func_dict[flag](path=args[0], hr=self.hr, md=self.md_at, loadsel=args[1], w2t=self.write2textbox)
else: else:
tkinter.messagebox.showerror(title="参数错误", message="请检查对应参数是否填写正确!", ) tkinter.messagebox.showerror(title="参数错误", message="请检查对应参数是否填写正确!", )

View File

@ -1,4 +1,4 @@
import json from json import loads
from sys import argv from sys import argv
@ -20,11 +20,21 @@ def execution(cmd, hr, w2t, **kwargs):
if not _msg: if not _msg:
w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name='Automatic Test') w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name='Automatic Test')
else: else:
_response = json.loads(_msg) _response = loads(_msg)
validate_resp(_id, _response, w2t) validate_resp(_id, _response, w2t)
return _response return _response
def trigger_estop(md, w2t):
md.trigger_estop()
w2t("触发急停成功,可点击机器状态验证。", 0, 0, 'green', 'Automatic Test')
def reset_estop(md, w2t):
md.reset_estop()
w2t("恢复急停成功,可点击机器状态验证。", 0, 0, 'green', 'Automatic Test')
def get_state(hr, w2t): def get_state(hr, w2t):
# 获取机器状态 # 获取机器状态
_response = execution('state.get_state', hr, w2t) _response = execution('state.get_state', hr, w2t)
@ -48,17 +58,21 @@ def get_state(hr, w2t):
def warning_info(hr, w2t): def warning_info(hr, w2t):
for msg in hr.c_msg: for msg in hr.c_msg:
if 'alarm' in msg.lower(): if 'alarm' in msg.lower():
w2t(msg, tab_name='Automatic Test') w2t(str(loads(msg)), tab_name='Automatic Test')
for msg in hr.c_msg_xs: for msg in hr.c_msg_xs:
if 'alarm' in msg.lower(): if 'alarm' in msg.lower():
w2t(msg, tab_name='Automatic Test') w2t(str(loads(msg)), tab_name='Automatic Test')
def main(hr, func, w2t): def main(hr, md, func, w2t):
if hr is None: if hr is None:
w2t("无法连接机器人检查是否已经使用Robot Assist软件连接机器重试中...", 0, 49, 'red', tab_name='Automatic Test') w2t("无法连接机器人检查是否已经使用Robot Assist软件连接机器重试中...", 0, 49, 'red', tab_name='Automatic Test')
# func: get_state/ # func: get_state/
match func: match func:
case 'trigger_estop':
trigger_estop(md, w2t)
case 'reset_estop':
reset_estop(md, w2t)
case 'get_state': case 'get_state':
get_state(hr, w2t) get_state(hr, w2t)
case 'warning_info': case 'warning_info':
@ -68,21 +82,3 @@ def main(hr, func, w2t):
if __name__ == '__main__': if __name__ == '__main__':
main(*argv[1:]) main(*argv[1:])
# 一、设置/检测机器人状态:
# 1. 上电
# 2. 软限位打开
# 3. 示教器断开
# 4. 操作模式/机器人类型
# 5. 控制器状态/工作任务控件/机器人动态
# 二、加载RL程序开始运行
# 1. 怎么触发急停
# 2. 怎么恢复急停
# 3. 怎么采集曲线
# 4.
# 三、运行过程中,收集数据,并处理出结果
# 四

View File

@ -1,9 +1,11 @@
from time import sleep import os
from time import sleep, time
from sys import argv from sys import argv
from os import scandir from os import scandir
from os.path import exists from os.path import exists
from paramiko import SSHClient, AutoAddPolicy from paramiko import SSHClient, AutoAddPolicy
from json import loads from json import loads
import pandas
def traversal_files(path, w2t): def traversal_files(path, w2t):
@ -22,7 +24,7 @@ def traversal_files(path, w2t):
return dirs, files return dirs, files
def check_files(data_dirs, data_files, w2t): def check_files(path, data_dirs, data_files, w2t):
if len(data_dirs) != 0 or len(data_files) != 3: if len(data_dirs) != 0 or len(data_files) != 3:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下三个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test') w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下三个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
w2t(' 1. configs.xlsx\n 2. T_电机电流.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test') w2t(' 1. configs.xlsx\n 2. T_电机电流.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test')
@ -42,6 +44,12 @@ def check_files(data_dirs, data_files, w2t):
if config_file and current_file and prj_file: if config_file and current_file and prj_file:
w2t("数据目录合规性检查结束,未发现问题......", tab_name='Automatic Test') w2t("数据目录合规性检查结束,未发现问题......", tab_name='Automatic Test')
os.mkdir(f"{path}\\single")
os.mkdir(f"{path}\\s_1")
os.mkdir(f"{path}\\s_2")
os.mkdir(f"{path}\\s_3")
os.mkdir(f"{path}\\inertia")
os.mkdir(f"{path}\\hold")
return config_file, current_file, prj_file return config_file, current_file, prj_file
else: else:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下三个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test') w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下三个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
@ -56,11 +64,8 @@ def prj_to_xcore(prj_file):
# stdin, stdout, stderr = ssh.exec_command('rm /tmp/target.zip') # stdin, stdout, stderr = ssh.exec_command('rm /tmp/target.zip')
# ssh.exec_command('rm /tmp/target.zip') # ssh.exec_command('rm /tmp/target.zip')
sftp.put(prj_file, '/tmp/target.zip') sftp.put(prj_file, '/tmp/target.zip')
cmd = 'cd /tmp; ' cmd = 'cd /tmp; rm -rf target/; mkdir target; unzip -d target/ -q target.zip; '
cmd += 'rm -rf target/; ' cmd += 'chmod 777 -R target/; rm target.zip'
cmd += 'mkdir target; '
cmd += 'unzip -d target/ -q target.zip; '
cmd += 'rm target.zip; '
ssh.exec_command(cmd) ssh.exec_command(cmd)
cmd = 'sudo rm -rf /home/luoshi/bin/controller/projects/target; ' cmd = 'sudo rm -rf /home/luoshi/bin/controller/projects/target; '
@ -72,8 +77,8 @@ def prj_to_xcore(prj_file):
print(stderr.read().decode()) # 顺便也执行以下stderr print(stderr.read().decode()) # 顺便也执行以下stderr
_prj_name = prj_file.split('\\')[-1].removesuffix('.zip') _prj_name = prj_file.split('\\')[-1].removesuffix('.zip')
cmd = f'cd /home/luoshi/bin/controller/; ' cmd = 'cd /home/luoshi/bin/controller/; '
cmd += f'sudo mv projects/target/_build/{_prj_name}.prj projects/target/_build/target.prj ' cmd += f'sudo mv projects/target/_build/{_prj_name}.prj projects/target/_build/target.prj'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True) stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n') stdin.write('luoshi2019' + '\n')
stdin.flush() stdin.flush()
@ -98,15 +103,125 @@ def execution(cmd, hr, w2t, **kwargs):
_id = hr.execution(cmd, **kwargs) _id = hr.execution(cmd, **kwargs)
_msg = hr.get_from_id(_id) _msg = hr.get_from_id(_id)
if not _msg: if not _msg:
w2t(f"无法获取{_id}请求的响应信息", 0, 0, 'red', tab_name='Automatic Test') w2t(f"无法获取{_id}请求的响应信息", 0, 7, 'red', tab_name='Automatic Test')
else: else:
_response = loads(_msg) _response = loads(_msg)
validate_resp(_id, _response, w2t) validate_resp(_id, _response, w2t)
return _response return _response
def run_rl(hr, md, w2t): def data_proc(path, filename, channel):
# need to run if channel in list(range(6)):
with open(filename, 'r', encoding='utf-8') as f_obj:
lines = f_obj.readlines()
_d2d_vel = {'hw_joint_vel_feedback': []}
_d2d_trq = {'device_servo_trq_feedback': []}
for line in lines:
data = eval(line.strip())['data']
for item in data:
if item.get('channel', None) == channel and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == channel and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq['device_servo_trq_feedback'].extend(item['value'])
df1 = pandas.DataFrame.from_dict(_d2d_vel)
df2 = pandas.DataFrame.from_dict(_d2d_trq)
df = pandas.concat([df1, df2], axis=1)
_filename = f'{path}\\single\\j{channel+1}_single.data'
df.to_csv(_filename, sep='\t', index=False)
elif channel in list(range(6, 9)):
with open(filename, 'r', encoding='utf-8') as f_obj:
lines = f_obj.readlines()
_d2d_vel_0 = {'hw_joint_vel_feedback': []}
_d2d_trq_0 = {'device_servo_trq_feedback': []}
_d2d_vel_1 = {'hw_joint_vel_feedback': []}
_d2d_trq_1 = {'device_servo_trq_feedback': []}
_d2d_vel_2 = {'hw_joint_vel_feedback': []}
_d2d_trq_2 = {'device_servo_trq_feedback': []}
_d2d_vel_3 = {'hw_joint_vel_feedback': []}
_d2d_trq_3 = {'device_servo_trq_feedback': []}
_d2d_vel_4 = {'hw_joint_vel_feedback': []}
_d2d_trq_4 = {'device_servo_trq_feedback': []}
_d2d_vel_5 = {'hw_joint_vel_feedback': []}
_d2d_trq_5 = {'device_servo_trq_feedback': []}
for line in lines:
data = eval(line.strip())['data']
for item in data:
if item.get('channel', None) == 0 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_0['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 0 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_0['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 1 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_1['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 1 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_1['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 2 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_2['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 2 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_2['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 3 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_3['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 3 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_3['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 4 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_4['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 4 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_4['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 5 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_5['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 5 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_5['device_servo_trq_feedback'].extend(item['value'])
df_01 = pandas.DataFrame.from_dict(_d2d_vel_0)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_0)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j1_s_1.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = pandas.DataFrame.from_dict(_d2d_vel_1)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_1)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j2_s_1.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = pandas.DataFrame.from_dict(_d2d_vel_2)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_2)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j3_s_1.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = pandas.DataFrame.from_dict(_d2d_vel_3)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_3)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j4_s_1.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = pandas.DataFrame.from_dict(_d2d_vel_4)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_4)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j5_s_1.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = pandas.DataFrame.from_dict(_d2d_vel_5)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_5)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j6_s_1.data'
df.to_csv(_filename, sep='\t', index=False)
def gen_result_file(path, loadsel, disc, number):
filename = path + f'\\data.txt'
with open(filename, 'w', encoding='utf-8') as f_obj:
for line in disc[number][1]:
f_obj.write(str(line)+'\n')
if loadsel == 'tool100':
data_proc(path, filename, number)
elif loadsel == 'inertia':
data_proc(path, filename, 'all')
def regular_load(path, hr, md, loadsel, w2t):
conditions = [ conditions = [
" scenario(0, j1_p, j1_n, p_speed, p_tool)", " scenario(0, j1_p, j1_n, p_speed, p_tool)",
" scenario(0, j2_p, j2_n, p_speed, p_tool)", " scenario(0, j2_p, j2_n, p_speed, p_tool)",
@ -118,45 +233,44 @@ def run_rl(hr, md, w2t):
" scenario(2, j5_p, j5_n, p_speed, p_tool)", " scenario(2, j5_p, j5_n, p_speed, p_tool)",
" scenario(3, j5_p, j5_n, p_speed, p_tool)", " scenario(3, j5_p, j5_n, p_speed, p_tool)",
] ]
for condition in conditions: for condition in conditions:
disc = {0: '一轴', 1: '二轴', 2: '三轴', 3: '四轴', 4: '五轴', 5: '六轴', 6: '场景一', 7: '场景二', 8: '场景三'} disc = {
0: ['一轴', []], 1: ['二轴', []], 2: ['三轴', []], 3: ['四轴', []], 4: ['五轴', []], 5: ['六轴', []],
6: ['场景一', []], 7: ['场景二', []], 8: ['场景三', []]
}
number = conditions.index(condition) number = conditions.index(condition)
w2t(f"正在执行{disc[number]}测试......", 0, 0, 'purple', 'Automatic Test') w2t(f"正在执行{disc[number][0]}测试......", 0, 0, 'purple', 'Automatic Test')
# 1. 关闭诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来 # 1. 关闭诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来,切手动模式并下电
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False) _response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
md.trigger_estop() md.trigger_estop()
md.reset_estop() md.reset_estop()
_response = execution('state.switch_manual', hr, w2t) _response = execution('state.switch_manual', hr, w2t)
_response = execution('state.switch_motor_off', hr, w2t)
# 3. 修改未要执行的场景 # 2. 修改未要执行的场景
ssh = SSHClient() ssh = SSHClient()
ssh.set_missing_host_key_policy(AutoAddPolicy()) ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019') ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019')
cmd = f'cd /home/luoshi/bin/controller/; ' cmd = 'cd /home/luoshi/bin/controller/; '
cmd += f'sudo sed -i "/scenario/d" projects/target/current/main.mod; ' cmd += 'sudo sed -i "/scenario/d" projects/target/_build/current/main.mod; '
cmd += f'sudo sed -i "/DONOTDELETE/i {condition}" projects/target/current/main.mod; ' cmd += f'sudo sed -i "/DONOTDELETE/i {condition}" projects/target/_build/current/main.mod'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True) stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n') stdin.write('luoshi2019' + '\n')
stdin.flush() stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr print(stderr.read().decode()) # 顺便也执行以下stderr
# 4. reload工程后pp2main # 3. reload工程后pp2main并且自动模式和上电
prj_path = 'target/_build/target.prj' prj_path = 'target/_build/target.prj'
_response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['current']) _response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['current'])
_response = execution('overview.get_cur_prj', hr, w2t) _response = execution('overview.get_cur_prj', hr, w2t)
_response = execution('rl_task.pp_to_main', hr, w2t, tasks=['current']) _response = execution('rl_task.pp_to_main', hr, w2t, tasks=['current'])
print(f"set pp2main of prj: {_response}")
# 5. 切换自动并上电
_response = execution('state.switch_auto', hr, w2t) _response = execution('state.switch_auto', hr, w2t)
_response = execution('state.switch_motor_on', hr, w2t) _response = execution('state.switch_motor_on', hr, w2t)
# 6. 开始运行程序单轴运行15s # 4. 开始运行程序单轴运行15s
_response = execution('rl_task.run', hr, w2t, tasks=['current']) _response = execution('rl_task.run', hr, w2t, tasks=['current'])
print(f"run prj: {_response}")
for i in range(3): for i in range(3):
if md.read_ready_to_go() == 1: if md.read_ready_to_go() == 1:
md.write_act(True) md.write_act(True)
@ -165,10 +279,10 @@ def run_rl(hr, md, w2t):
break break
else: else:
sleep(1) sleep(1)
continue
else: else:
w2t("未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test') w2t("未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test')
# 7. 打开诊断曲线,并执行采集
# 5. 打开诊断曲线,并执行采集
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True) _response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
display_pdo_params = [ display_pdo_params = [
{"name": "hw_joint_vel_feedback", "channel": 0}, {"name": "hw_joint_vel_feedback", "channel": 0},
@ -186,32 +300,62 @@ def run_rl(hr, md, w2t):
{"name": "device_safety_estop", "channel": 0}, {"name": "device_safety_estop", "channel": 0},
] ]
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params) _response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
sleep(8) if number < 6:
# 8. 关闭诊断曲线,停止程序运行,下电并且换成手动模式 sleep(15)
else:
_t_start = time()
while True:
scenario_time = md.read_scenario_time()
if float(scenario_time) > 1:
w2t(f"场景{number-5}的周期时间:{scenario_time}", 0, 0, 'green', 'Automatic Test')
break
else:
if (time()-_t_start)//60 > 3:
w2t(f"未收到场景{number-5}的周期时间需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test')
else:
sleep(5)
sleep(1) # 一定要延迟一秒再读一次scenario time寄存器因为一开始读取的数值不准确
sleep(float(md.read_scenario_time())*0.2) # 再运行周期的20%即可
# 6. 关闭诊断曲线,停止程序运行,下电并且换成手动模式
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False) _response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
_response = execution('rl_task.stop', hr, w2t, tasks=['current']) _response = execution('rl_task.stop', hr, w2t, tasks=['current'])
_response = execution('state.switch_motor_off', hr, w2t) _response = execution('state.switch_motor_off', hr, w2t)
_response = execution('state.switch_manual', hr, w2t) _response = execution('state.switch_manual', hr, w2t)
sleep(1) sleep(2) # 保证所有数据均已返回
# 9. 处理数据并输出文件 # 7. 保留数据并处理输出
# for _msg in hr.c_msg: for _msg in hr.c_msg:
# if 'diagnosis.result' in _msg: if 'diagnosis.result' in _msg:
# print(_msg) disc[number][1].insert(0, loads(_msg))
# _msg = json.loads(_msg) else:
# if 'channel' in _msg and 'name' in _msg: _index = 210
# if int(_msg['channel']) == 0 and _msg['name'] == 'device_servo_trq_feedback': for _msg in hr.c_msg:
# print(f"diagnosis.result: {_msg}") if 'diagnosis.result' in _msg:
# count += 1 _index = hr.c_msg.index(_msg)
# if count * 50 > 5 * 1000: break
# break del hr.c_msg[_index-10:]
hr.c_msg_xs.clear()
gen_result_file(path, loadsel, disc, number)
else:
w2t("满载情况下,单轴和场景电机电流采集完毕。", 0, 0, 'green', 'Automatic Test')
def main(path, hr, md, w2t): def inertia_load(path, hr, md, loadsel, w2t):
pass
def run_rl(path, hr, md, loadsel, w2t):
if loadsel == 'tool100':
regular_load(path, hr, md, loadsel, w2t)
elif loadsel == 'inertia':
inertia_load(path, hr, md, loadsel, w2t)
def main(path, hr, md, loadsel, w2t):
data_dirs, data_files = traversal_files(path, w2t) data_dirs, data_files = traversal_files(path, w2t)
config_file, current_file, prj_file = check_files(data_dirs, data_files, w2t) config_file, current_file, prj_file = check_files(path, data_dirs, data_files, w2t)
prj_to_xcore(prj_file) prj_to_xcore(prj_file)
run_rl(hr, md, w2t) run_rl(path, hr, md, loadsel, w2t)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -4,8 +4,9 @@ from threading import Thread
import selectors import selectors
from time import time, sleep from time import time, sleep
from os.path import dirname from os.path import dirname
from binascii import b2a_hex, a2b_hex
from pymodbus.client.tcp import ModbusTcpClient from pymodbus.client.tcp import ModbusTcpClient
from pymodbus.payload import BinaryPayloadDecoder
from pymodbus.constants import Endian
MAX_FRAME_SIZE = 1024 MAX_FRAME_SIZE = 1024
setdefaulttimeout(2) setdefaulttimeout(2)
@ -113,6 +114,16 @@ class ModbusRequest(object):
self.w2t(f"{Err}") self.w2t(f"{Err}")
self.w2t("无法读取准备信号连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) self.w2t("无法读取准备信号连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def read_scenario_time(self):
try:
results = self.c.read_holding_registers(41002, 2)
result = BinaryPayloadDecoder.fromRegisters(results.registers, byteorder=Endian.BIG, wordorder=Endian.LITTLE)
result = f"{result.decode_32bit_float():.3f}"
return result
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法读取准备信号连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
class HmiRequest(object): class HmiRequest(object):
def __init__(self, w2t): def __init__(self, w2t):
@ -185,10 +196,10 @@ class HmiRequest(object):
return index + 8, _frame_size, _pkg_size return index + 8, _frame_size, _pkg_size
else: else:
print(data) print(data)
print(f"index = {index}") # print(f"index = {index}")
print(f"reserve = {_reserved}") # print(f"reserve = {_reserved}")
print(f"protocol = {_protocol}") # print(f"protocol = {_protocol}")
print("head check 数据有误,需要确认") # print("head check 数据有误,需要确认")
self.w2t("Header Check: 解包数据有误,需要确认!", 0, 1, 'red', tab_name=self.tab_name) self.w2t("Header Check: 解包数据有误,需要确认!", 0, 1, 'red', tab_name=self.tab_name)
else: else:
self.half_length = len(data) - index self.half_length = len(data) - index
@ -208,12 +219,12 @@ class HmiRequest(object):
_flag = '0' if self.get_from_id(_id) is None else '1' _flag = '0' if self.get_from_id(_id) is None else '1'
print(f"hb = {_flag}", end=' ') print(f"hb = {_flag}", end=' ')
print(f"len(c_msg) = {len(self.c_msg)}", end=' ') print(f"len(c_msg) = {len(self.c_msg)}", end=' ')
print(f"len(c_msg_xs) = {len(self.c_msg_xs)}", end=' ') print(f"len(c_msg_xs) = {len(self.c_msg_xs)}", end='\n')
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb: with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
f_hb.write(_flag) f_hb.write(_flag)
if _flag == '0': if _flag == '0':
self.w2t(f"{_id} 心跳丢失,连接失败,重新连接中...", 0, 7, 'red', tab_name=self.tab_name) self.w2t(f"{_id} 心跳丢失,连接失败,重新连接中...", 0, 7, 'red', tab_name=self.tab_name)
sleep(1) sleep(2)
# with open(f"{current_path}/../../assets/templates/c_msg.log", "w", encoding='utf-8') as f: # with open(f"{current_path}/../../assets/templates/c_msg.log", "w", encoding='utf-8') as f:
# for msg in self.c_msg: # for msg in self.c_msg:
# f.write(str(loads(msg)) + '\n') # f.write(str(loads(msg)) + '\n')
@ -457,8 +468,8 @@ class HmiRequest(object):
self.flag_xs = 1 self.flag_xs = 1
def get_from_id(self, msg_id, flag=0): def get_from_id(self, msg_id, flag=0):
messages = self.c_msg if flag == 0 else self.c_msg_xs
for i in range(3): for i in range(3):
messages = self.c_msg if flag == 0 else self.c_msg_xs
for msg in messages: for msg in messages:
if msg_id is None: if msg_id is None:
return None return None
@ -552,7 +563,7 @@ class HmiRequest(object):
pass pass
req['id'] = self.gen_id(command) req['id'] = self.gen_id(command)
print(f"req = {req}") # print(f"req = {req}")
cmd = dumps(req, separators=(',', ':')) cmd = dumps(req, separators=(',', ':'))
try: try:
self.c.send(self.package(cmd)) self.c.send(self.package(cmd))