5 Commits

Author SHA1 Message Date
40ddef1c39 v0.1.9.0(2024/07/10)
1. 完成了制动性能的自动化采集
2. 完善了modbus浮点数读写相关的功能
3. 修改了target.zip工程,该工程目前适配电机电流和制动性能
2024-07-10 19:18:53 +08:00
fa5a9f0f89 Surprisingly I made it! 完成了制动性能测试程序 2024-07-10 18:55:18 +08:00
9fa42fb3e1 尝试在RL里判断最大速度,尝试失败 2024-07-10 14:32:50 +08:00
2fbe500d1d v0.1.8.2(2024/07/08)
1. [APIs: do_brake.py]: 完成了制动性能测试逻辑,只不过制动信号传递生效延迟不可控,暂时pending
2. [APIs: do_current.py]: 修改曲线数据时序,主要是value data取反即可,解决了波形锯齿明细的问题
3. [APIs: openapi.py]: modbus新增了触发急停信号的寄存器 stop0_signal,并重写了解除急停,socket新增了register.set_value协议
2024-07-08 19:50:21 +08:00
7b25b91c37 v0.1.8.1(2024/07/05)
1. [APIs: do_brake.py]: 完成了制动性能测试框架的搭建,可以顺利执行完整的测试程序,但是未实现急停和数据处理
2. [APIs: aio.py]: 修改了do_brake主函数的参数
3. 增加工程文件target.zip
2024-07-05 15:41:12 +08:00
10 changed files with 323 additions and 80 deletions

View File

@ -130,6 +130,15 @@ pyinstaller --noconfirm --onedir --windowed --add-data "C:/Users/Administrator/A
2. excel 制动结果处理文件 2. excel 制动结果处理文件
3. excel configs.xlsx 配置文件 3. excel configs.xlsx 配置文件
> **需要注意的点**
1. 使用之前需要手动修改点位信息,确保所有点位不会发生撞击之后,再进行自动化测试
2. 工程文件不能手动重命名需要重命名存档可以导入HMI然后另存为
3. 务必正确填写configs.xlsx中的Target页面A1单元格可以选择正负方向急停但不完全保证100%大概有95%左右的准确度
4. 由于xCore系统问题运行过程中可能会出现机器人宕机问题如果遇到可以手动重启控制柜重新运行
5. 运行过程中,如果是因为机器问题无法达到额定百分比速度,会在日志输出框提示,注意观察
6. 运行自动化程序之前,确保机器处于正常状态,无故障,未触发急停
#### 6) 电机电流自动化测试 #### 6) 电机电流自动化测试
只需要提前将如下文件放在指定路径下即可: 只需要提前将如下文件放在指定路径下即可:
@ -137,6 +146,9 @@ pyinstaller --noconfirm --onedir --windowed --add-data "C:/Users/Administrator/A
2. excel 电机电流结果处理文件,一份模板即可 2. excel 电机电流结果处理文件,一份模板即可
3. excel configs.xlsx 配置文件 3. excel configs.xlsx 配置文件
> **需要注意的点**
基本同第五点
#### 其他 #### 其他
customtkinter的tabview组件不支持修改字体大小可以参考 [Changing Font of a Tabview](https://github.com/TomSchimansky/CustomTkinter/issues/2296) 进行手动修改源码实现: customtkinter的tabview组件不支持修改字体大小可以参考 [Changing Font of a Tabview](https://github.com/TomSchimansky/CustomTkinter/issues/2296) 进行手动修改源码实现:
@ -424,3 +436,17 @@ v0.1.7.6(2024/07/04)
v0.1.8.0(2024/07/04) v0.1.8.0(2024/07/04)
1. [APIs: do_current.py]: 完成了堵转电流和惯量负载电机电流的采集和处理,至此,电机电流的自动化工作基本完成 1. [APIs: do_current.py]: 完成了堵转电流和惯量负载电机电流的采集和处理,至此,电机电流的自动化工作基本完成
v0.1.8.1(2024/07/05)
1. [APIs: do_brake.py]: 完成了制动性能测试框架的搭建,可以顺利执行完整的测试程序,但是未实现急停和数据处理
2. [APIs: aio.py]: 修改了do_brake主函数的参数
3. 增加工程文件target.zip
v0.1.8.2(2024/07/08)
1. [APIs: do_brake.py]: 完成了制动性能测试逻辑只不过制动信号传递生效延迟不可控暂时pending
2. [APIs: do_current.py]: 修改曲线数据时序主要是value data取反即可解决了波形锯齿明细的问题
3. [APIs: openapi.py]: modbus新增了触发急停信号的寄存器 stop0_signal并重写了解除急停socket新增了register.set_value协议
v0.1.9.0(2024/07/10)
1. 完成了制动性能的自动化采集
2. 完善了modbus浮点数读写相关的功能
3. 修改了target.zip工程该工程目前适配电机电流和制动性能

Binary file not shown.

View File

@ -6,8 +6,8 @@ VSVersionInfo(
ffi=FixedFileInfo( ffi=FixedFileInfo(
# filevers and prodvers should be always a tuple with four items: (1, 2, 3, 4) # filevers and prodvers should be always a tuple with four items: (1, 2, 3, 4)
# Set not needed items to zero 0. # Set not needed items to zero 0.
filevers=(0, 1, 8, 0), filevers=(0, 1, 9, 0),
prodvers=(0, 1, 8, 0), prodvers=(0, 1, 9, 0),
# Contains a bitmask that specifies the valid bits 'flags'r # Contains a bitmask that specifies the valid bits 'flags'r
mask=0x3f, mask=0x3f,
# Contains a bitmask that specifies the Boolean attributes of the file. # Contains a bitmask that specifies the Boolean attributes of the file.
@ -31,12 +31,12 @@ VSVersionInfo(
'040904b0', '040904b0',
[StringStruct('CompanyName', 'Rokae - https://www.rokae.com/'), [StringStruct('CompanyName', 'Rokae - https://www.rokae.com/'),
StringStruct('FileDescription', 'All in one automatic toolbox'), StringStruct('FileDescription', 'All in one automatic toolbox'),
StringStruct('FileVersion', '0.1.8.0 (2024-07-04)'), StringStruct('FileVersion', '0.1.9.0 (2024-07-10)'),
StringStruct('InternalName', 'AIO.exe'), StringStruct('InternalName', 'AIO.exe'),
StringStruct('LegalCopyright', '© 2024-2024 Manford Fan'), StringStruct('LegalCopyright', '© 2024-2024 Manford Fan'),
StringStruct('OriginalFilename', 'AIO.exe'), StringStruct('OriginalFilename', 'AIO.exe'),
StringStruct('ProductName', 'AIO'), StringStruct('ProductName', 'AIO'),
StringStruct('ProductVersion', '0.1.8.0 (2024-07-04)')]) StringStruct('ProductVersion', '0.1.9.0 (2024-07-10)')])
]), ]),
VarFileInfo([VarStruct('Translation', [1033, 1200])]) VarFileInfo([VarStruct('Translation', [1033, 1200])])
] ]

BIN
aio/assets/target.zip Normal file

Binary file not shown.

View File

@ -0,0 +1,11 @@
{
"id": "xxxxxxxxxxx",
"module": "fieldbus",
"command": "register.set_value",
"data": {
"name": "",
"type": "bool",
"bias": 0,
"value": 0
}
}

View File

@ -1 +1 @@
0.1.8.0 @ 07/04/2024 0.1.9.0 @ 07/10/2024

View File

@ -459,7 +459,7 @@ class App(customtkinter.CTk):
func_dict[flag](path=args[0], w2t=self.write2textbox) func_dict[flag](path=args[0], w2t=self.write2textbox)
elif flag == 5: elif flag == 5:
self.pre_warning() self.pre_warning()
func_dict[flag](path=args[0], hr=self.hr, loadsel=args[1], w2t=self.write2textbox) func_dict[flag](path=args[0], hr=self.hr, md=self.md_at, loadsel=args[1], w2t=self.write2textbox)
elif flag == 6: elif flag == 6:
self.pre_warning() self.pre_warning()
func_dict[flag](path=args[0], hr=self.hr, md=self.md_at, loadsel=args[1], w2t=self.write2textbox) func_dict[flag](path=args[0], hr=self.hr, md=self.md_at, loadsel=args[1], w2t=self.write2textbox)

View File

@ -1,15 +1,17 @@
from time import sleep from random import randint
from time import sleep, time
from sys import argv from sys import argv
from os import scandir from os import scandir, mkdir
from os.path import exists from os.path import exists
from paramiko import SSHClient, AutoAddPolicy from paramiko import SSHClient, AutoAddPolicy
from json import loads from json import loads
from openpyxl import load_workbook
import pandas
RADIAN = 57.3 # 180 / 3.1415926
def traversal_files(path, w2t): def traversal_files(path, w2t):
# 功能:以列表的形式分别返回指定路径下的文件和文件夹,不包含子目录
# 参数:路径
# 返回值:路径下的文件夹列表 路径下的文件列表
if not exists(path): if not exists(path):
msg = f'数据文件夹{path}不存在,请确认后重试......' msg = f'数据文件夹{path}不存在,请确认后重试......'
w2t(msg, 0, 1, 'red', tab_name='Automatic Test') w2t(msg, 0, 1, 'red', tab_name='Automatic Test')
@ -25,10 +27,10 @@ def traversal_files(path, w2t):
return dirs, files return dirs, files
def check_files(data_dirs, data_files, w2t): def check_files(path, loadsel, data_dirs, data_files, w2t):
if len(data_dirs) != 0 or len(data_files) != 5: if len(data_dirs) != 0 or len(data_files) != 5:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test') w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test') w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test')
config_file = reach33 = reach66 = reach100 = prj_file = None config_file = reach33 = reach66 = reach100 = prj_file = None
for data_file in data_files: for data_file in data_files:
@ -48,8 +50,23 @@ def check_files(data_dirs, data_files, w2t):
w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 2, 'red', tab_name='Automatic Test') w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 2, 'red', tab_name='Automatic Test')
if config_file and reach33 and reach66 and reach100 and prj_file: if config_file and reach33 and reach66 and reach100 and prj_file:
result_dirs = []
mkdir(f"{path}\\j1")
mkdir(f"{path}\\j2")
mkdir(f"{path}\\j3")
for _reach in ['reach33', 'reach66', 'reach100']:
for _load in [f'load{loadsel.removeprefix("tool")}']:
for _speed in ['speed33', 'speed66', 'speed100']:
dir_name = '_'.join([_reach, _load, _speed])
result_dirs.append(dir_name)
mkdir(f"{path}\\j1\\{dir_name}")
mkdir(f"{path}\\j2\\{dir_name}")
if _reach == 'reach100':
mkdir(f"{path}\\j3\\{dir_name}")
w2t("数据目录合规性检查结束,未发现问题......", tab_name='Automatic Test') w2t("数据目录合规性检查结束,未发现问题......", tab_name='Automatic Test')
return config_file, reach33, reach66, reach100, prj_file return config_file, reach33, reach66, reach100, prj_file, result_dirs
else: else:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test') w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test') w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test')
@ -60,14 +77,9 @@ def prj_to_xcore(prj_file):
ssh.set_missing_host_key_policy(AutoAddPolicy()) ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019') ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019')
sftp = ssh.open_sftp() sftp = ssh.open_sftp()
# stdin, stdout, stderr = ssh.exec_command('rm /tmp/target.zip')
# ssh.exec_command('rm /tmp/target.zip')
sftp.put(prj_file, '/tmp/target.zip') sftp.put(prj_file, '/tmp/target.zip')
cmd = 'cd /tmp; ' cmd = 'cd /tmp; rm -rf target/; mkdir target; unzip -d target/ -q target.zip; '
cmd += 'rm -rf target/; ' cmd += 'chmod 777 -R target/; rm target.zip'
cmd += 'mkdir target; '
cmd += 'unzip -d target/ -q target.zip; '
cmd += 'rm target.zip; '
ssh.exec_command(cmd) ssh.exec_command(cmd)
cmd = 'sudo rm -rf /home/luoshi/bin/controller/projects/target; ' cmd = 'sudo rm -rf /home/luoshi/bin/controller/projects/target; '
@ -78,20 +90,17 @@ def prj_to_xcore(prj_file):
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr print(stderr.read().decode()) # 顺便也执行以下stderr
cmd = 'cd /home/luoshi/bin/controller/; sudo mv projects/target/_build/*.prj projects/target/_build/target.prj ' _prj_name = prj_file.split('\\')[-1].removesuffix('.zip')
cmd = 'cd /home/luoshi/bin/controller/; '
cmd += f'sudo mv projects/target/_build/{_prj_name}.prj projects/target/_build/target.prj'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True) stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n') stdin.write('luoshi2019' + '\n')
stdin.flush() stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr print(stderr.read().decode()) # 顺便也执行以下stderr
ssh.close() ssh.close()
def modify_prj():
pass
def validate_resp(_id, response, w2t): def validate_resp(_id, response, w2t):
match _id: match _id:
case 'DATA ERR': case 'DATA ERR':
@ -105,7 +114,7 @@ def validate_resp(_id, response, w2t):
def execution(cmd, hr, w2t, **kwargs): def execution(cmd, hr, w2t, **kwargs):
_id = hr.excution(cmd, **kwargs) _id = hr.execution(cmd, **kwargs)
_msg = hr.get_from_id(_id) _msg = hr.get_from_id(_id)
if not _msg: if not _msg:
w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name='Automatic Test') w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name='Automatic Test')
@ -115,60 +124,201 @@ def execution(cmd, hr, w2t, **kwargs):
return _response return _response
def run_rl(hr, w2t): def gen_result_file(path, curve_data, axis, _reach, _load, _speed, count):
# prj_path = 'target/_build/target.prj' _d2d_vel = {'hw_joint_vel_feedback': []}
# _response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['Durable_Test_Com', 'Mechanical_Test_Com']) _d2d_trq = {'device_servo_trq_feedback': []}
# print(f"reload prj: {_response}") _d2d_stop = {'device_safety_estop': []}
# # _response = execution('overview.get_cur_prj', hr, w2t) for data in curve_data:
# # print(f"get cur prj name: {_response}") dict_results = data['data']
# _response = execution('rl_task.pp_to_main', hr, w2t, tasks=['Mechanical_Test_Com']) # dict_results.reverse()
# print(f"set pp2main of prj: {_response}") for item in dict_results:
# item['value'].reverse()
# _response = execution('state.switch_auto', hr, w2t) if item.get('channel', None) == axis-1 and item.get('name', None) == 'hw_joint_vel_feedback':
# _response = execution('state.switch_motor_on', hr, w2t) _d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
# elif item.get('channel', None) == axis-1 and item.get('name', None) == 'device_servo_trq_feedback':
# _response = execution('rl_task.run', hr, w2t, tasks=['Mechanical_Test_Com']) _d2d_trq['device_servo_trq_feedback'].extend(item['value'])
# print(f"run prj: {_response}") elif item.get('channel', None) == 0 and item.get('name', None) == 'device_safety_estop':
# sleep(10) _d2d_stop['device_safety_estop'].extend(item['value'])
#
# _response = execution('state.switch_motor_off', hr, w2t)
# _response = execution('state.switch_manual', hr, w2t)
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True) df1 = pandas.DataFrame.from_dict(_d2d_vel)
print(f"打开诊断: {_response}") df2 = pandas.DataFrame.from_dict(_d2d_trq)
# _response = execution('diagnosis.get_params', hr, w2t) df3 = pandas.DataFrame.from_dict(_d2d_stop)
# print(f"显示诊断状态: {_response}") df = pandas.concat([df1, df2, df3], axis=1)
_filename = f"{path}\\j{axis}\\reach{_reach}_load{_load}_speed{_speed}\\reach{_reach}_load{_load}_speed{_speed}_{count}.data"
df.to_csv(_filename, sep='\t', index=False)
def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
_count = 0
speed_max = 0
display_pdo_params = [ display_pdo_params = [
{"name": "device_servo_trq_feedback", "channel": 0} {"name": "hw_joint_vel_feedback", "channel": 0},
{"name": "hw_joint_vel_feedback", "channel": 1},
{"name": "hw_joint_vel_feedback", "channel": 2},
{"name": "hw_joint_vel_feedback", "channel": 3},
{"name": "hw_joint_vel_feedback", "channel": 4},
{"name": "hw_joint_vel_feedback", "channel": 5},
{"name": "device_servo_trq_feedback", "channel": 0},
{"name": "device_servo_trq_feedback", "channel": 1},
{"name": "device_servo_trq_feedback", "channel": 2},
{"name": "device_servo_trq_feedback", "channel": 3},
{"name": "device_servo_trq_feedback", "channel": 4},
{"name": "device_servo_trq_feedback", "channel": 5},
{"name": "device_safety_estop", "channel": 0},
] ]
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params) wb = load_workbook(config_file, read_only=True)
print(f"执行采样: {_response}") ws = wb['Target']
sleep(5) for condition in result_dirs:
_reach = condition.split('_')[0].removeprefix('reach')
_load = condition.split('_')[1].removeprefix('load')
_speed = condition.split('_')[2].removeprefix('speed')
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False) for axis in range(1, 4):
print(f"关闭诊断: {_response}") md.write_axis(axis)
# sleep(1) speed_max = 0
for _msg in hr.c_msg: if axis == 3 and _reach != '100':
if 'diagnosis.result' in _msg: continue
print(_msg) for count in range(1, 4):
# _msg = json.loads(_msg) _count += 1
# if 'channel' in _msg and 'name' in _msg: w2t(f"[{_count}/63-{count}] 正在执行{axis}{condition}的制动测试......", 0, 0, 'purple', 'Automatic Test')
# if int(_msg['channel']) == 0 and _msg['name'] == 'device_servo_trq_feedback':
# print(f"diagnosis.result: {_msg}")
# count += 1
# if count * 50 > 5 * 1000:
# break
# 1. 关闭诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来,切手动模式并下电
md.trigger_estop()
md.reset_estop()
md.write_act(False)
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
sleep(1) # 让曲线彻底关闭
_response = execution('state.switch_manual', hr, w2t)
_response = execution('state.switch_motor_off', hr, w2t)
# 2. 修改未要执行的场景
ssh = SSHClient()
ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019')
if ws.cell(row=1, column=1).value == 'positive':
_rl_cmd = f"brake_E(j{axis}_{_reach}_n, j{axis}_{_reach}_p, p_speed, p_tool)"
elif ws.cell(row=1, column=1).value == 'negative':
_rl_cmd = f"brake_E(j{axis}_{_reach}_p, j{axis}_{_reach}_n, p_speed, p_tool)"
else:
w2t("configs.xlsx中Target页面A1单元格填写不正确检查后重新运行...", 0, 111, 'red', 'Automatic Test')
_rl_speed = f"VelSet {_speed}"
cmd = 'cd /home/luoshi/bin/controller/; '
cmd += 'sudo sed -i "/brake_E/d" projects/target/_build/brake/main.mod; '
cmd += f'sudo sed -i "/DONOTDELETE/i {_rl_cmd}" projects/target/_build/brake/main.mod; '
cmd += f'sudo sed -i "/VelSet/d" projects/target/_build/brake/main.mod; '
cmd += f'sudo sed -i "/MoveAbsJ/i {_rl_speed}" projects/target/_build/brake/main.mod; '
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
# 3. reload工程后pp2main并且自动模式和上电最后运行程序
prj_path = 'target/_build/target.prj'
_response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['brake', 'stop0_related'])
_response = execution('rl_task.pp_to_main', hr, w2t, tasks=['brake', 'stop0_related'])
_response = execution('state.switch_auto', hr, w2t)
_response = execution('state.switch_motor_on', hr, w2t)
_response = execution('rl_task.run', hr, w2t, tasks=['brake', 'stop0_related'])
for i in range(3):
if md.read_ready_to_go() == 1:
md.write_act(True)
break
else:
sleep(1)
else:
w2t("未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test')
# 4. 第一次打开诊断曲线并执行采集8s之后触发软急停关闭曲线采集找出最大速度传递给RL程序最后清除相关记录
if count == 1:
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
sleep(8) # 前八秒获取实际最大速度
md.trigger_estop()
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
# 找出最大速度
for _msg in hr.c_msg:
if 'diagnosis.result' in _msg:
dict_results = loads(_msg)['data']
for item in dict_results:
if item.get('channel', None) == axis-1 and item.get('name', None) == 'hw_joint_vel_feedback':
_ = abs(RADIAN*sum(item['value'])/len(item['value']))
speed_max = max(_, speed_max)
speed_target = float(ws.cell(row=3, column=axis+1).value) * float(_speed) / 100
if speed_max < speed_target*0.95 or speed_max > speed_target*1.05:
w2t(f"Axis: {axis}-{count} | Speed: {speed_max} | Shouldbe: {speed_target}", 0, 0, 'indigo', 'Automatic Test')
md.write_speed_max(speed_max)
sleep(1)
for _msg in hr.c_msg:
if 'diagnosis.result' in _msg:
_index = hr.c_msg.index(_msg)
del hr.c_msg[_index:]
hr.c_msg_xs.clear()
break
# 5. 清除软急停重新运行程序并打开曲线发送继续运动信号当速度达到最大值时通过DO触发急停
md.reset_estop()
_response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['brake', 'stop0_related'])
_response = execution('rl_task.pp_to_main', hr, w2t, tasks=['brake', 'stop0_related'])
_response = execution('state.switch_auto', hr, w2t)
_response = execution('state.switch_motor_on', hr, w2t)
_response = execution('rl_task.run', hr, w2t, tasks=['brake', 'stop0_related'])
for i in range(3):
if md.read_ready_to_go() == 1:
md.write_act(True)
break
else:
sleep(1)
else:
w2t("未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test')
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
# sleep(randint(3, 6))
md.write_probe(True)
_t_start = time()
while True:
if md.read_brake_done() == 1:
sleep(1) # 保证所有数据均已返回
md.write_probe(False)
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
sleep(1) # 保证所有数据均已返回
break
else:
if (time() - _t_start) // 60 > 1:
w2t(f"规定时间内未找到合适的点触发急停需要确认RL/Python程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test')
else:
sleep(1)
# 6. 保留数据并处理输出
curve_data = []
for _msg in hr.c_msg:
if 'diagnosis.result' in _msg:
curve_data.insert(0, loads(_msg))
else:
for _msg in hr.c_msg:
if 'diagnosis.result' in _msg:
_index = hr.c_msg.index(_msg)
del hr.c_msg[_index:]
hr.c_msg_xs.clear()
break
gen_result_file(path, curve_data, axis, _reach, _load, _speed, count)
else: else:
sleep(1) w2t(f"\n{loadsel.removeprefix('tool')}%负载的制动性能测试执行完毕,如需采集其他负载,须切换负载类型,并更换其他负载,重新执行。", 0, 0, 'green', 'Automatic Test')
def main(path, hr, loadsel, w2t): def main(path, hr, md, loadsel, w2t):
_s_time = time()
data_dirs, data_files = traversal_files(path, w2t) data_dirs, data_files = traversal_files(path, w2t)
config_file, reach33, reach66, reach100, prj_file = check_files(data_dirs, data_files, w2t) config_file, reach33, reach66, reach100, prj_file, result_dirs = check_files(path, loadsel, data_dirs, data_files, w2t)
# prj_to_xcore(prj_file) prj_to_xcore(prj_file)
run_rl(hr, w2t) run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t)
_e_time = time()
time_total = _e_time - _s_time
w2t(f"处理总时长:{time_total // 3600:02.0f} h {time_total % 3600 // 60:02.0f} m {time_total % 60:02.0f} s", 0, 0, 'green', 'Automatic Test')
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -27,7 +27,7 @@ def traversal_files(path, w2t):
def check_files(path, loadsel, data_dirs, data_files, w2t): def check_files(path, loadsel, data_dirs, data_files, w2t):
if len(data_dirs) != 0 or len(data_files) != 3: if len(data_dirs) != 0 or len(data_files) != 3:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下三个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test') w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下三个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
w2t(' 1. configs.xlsx\n 2. T_电机电流.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test') w2t(' 1. configs.xlsx\n 2. T_电机电流.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test')
config_file = current_file = prj_file = None config_file = current_file = prj_file = None
for data_file in data_files: for data_file in data_files:
@ -118,9 +118,12 @@ def data_proc_regular(path, filename, channel, scenario_time):
for line in lines: for line in lines:
data = eval(line.strip())['data'] data = eval(line.strip())['data']
for item in data: for item in data:
item['value'].reverse()
if item.get('channel', None) == channel and item.get('name', None) == 'hw_joint_vel_feedback': if item.get('channel', None) == channel and item.get('name', None) == 'hw_joint_vel_feedback':
item['value'].reverse()
_d2d_vel['hw_joint_vel_feedback'].extend(item['value']) _d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == channel and item.get('name', None) == 'device_servo_trq_feedback': elif item.get('channel', None) == channel and item.get('name', None) == 'device_servo_trq_feedback':
item['value'].reverse()
_d2d_trq['device_servo_trq_feedback'].extend(item['value']) _d2d_trq['device_servo_trq_feedback'].extend(item['value'])
df1 = pandas.DataFrame.from_dict(_d2d_vel) df1 = pandas.DataFrame.from_dict(_d2d_vel)
@ -146,6 +149,7 @@ def data_proc_regular(path, filename, channel, scenario_time):
for line in lines: for line in lines:
data = eval(line.strip())['data'] data = eval(line.strip())['data']
for item in data: for item in data:
item['value'].reverse()
if item.get('channel', None) == 0 and item.get('name', None) == 'hw_joint_vel_feedback': if item.get('channel', None) == 0 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_0['hw_joint_vel_feedback'].extend(item['value']) _d2d_vel_0['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 0 and item.get('name', None) == 'device_servo_trq_feedback': elif item.get('channel', None) == 0 and item.get('name', None) == 'device_servo_trq_feedback':
@ -214,6 +218,7 @@ def data_proc_regular(path, filename, channel, scenario_time):
for line in lines: for line in lines:
data = eval(line.strip())['data'] data = eval(line.strip())['data']
for item in data: for item in data:
item['value'].reverse()
if item.get('channel', None) == channel-9 and item.get('name', None) == 'hw_joint_vel_feedback': if item.get('channel', None) == channel-9 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel['hw_joint_vel_feedback'].extend(item['value']) _d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == channel-9 and item.get('name', None) == 'device_servo_trq_feedback': elif item.get('channel', None) == channel-9 and item.get('name', None) == 'device_servo_trq_feedback':
@ -234,6 +239,7 @@ def data_proc_inertia(path, filename, channel):
for line in lines: for line in lines:
data = eval(line.strip())['data'] data = eval(line.strip())['data']
for item in data: for item in data:
item['value'].reverse()
if item.get('channel', None) == channel+3 and item.get('name', None) == 'hw_joint_vel_feedback': if item.get('channel', None) == channel+3 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel['hw_joint_vel_feedback'].extend(item['value']) _d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == channel+3 and item.get('name', None) == 'device_servo_trq_feedback': elif item.get('channel', None) == channel+3 and item.get('name', None) == 'device_servo_trq_feedback':

View File

@ -5,7 +5,7 @@ import selectors
from time import time, sleep from time import time, sleep
from os.path import dirname from os.path import dirname
from pymodbus.client.tcp import ModbusTcpClient from pymodbus.client.tcp import ModbusTcpClient
from pymodbus.payload import BinaryPayloadDecoder from pymodbus.payload import BinaryPayloadDecoder, BinaryPayloadBuilder
from pymodbus.constants import Endian from pymodbus.constants import Endian
MAX_FRAME_SIZE = 1024 MAX_FRAME_SIZE = 1024
@ -47,11 +47,12 @@ class ModbusRequest(object):
def reset_estop(self): def reset_estop(self):
try: try:
self.c.write_register(40012, 1) self.c.write_register(40012, 1)
# self.c.write_register(40001, 1) sleep(0.2)
# sleep(0.2) self.c.write_register(40001, 0)
# self.c.write_register(40001, 1) sleep(0.2)
# sleep(0.2) self.c.write_register(40001, 1)
# self.c.write_register(40001, 0) sleep(0.2)
self.c.write_register(40001, 0)
except Exception as Err: except Exception as Err:
self.w2t(f"{Err}") self.w2t(f"{Err}")
self.w2t("无法重置软急停连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) self.w2t("无法重置软急停连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
@ -124,6 +125,48 @@ class ModbusRequest(object):
self.w2t(f"{Err}") self.w2t(f"{Err}")
self.w2t("无法读取准备信号连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) self.w2t("无法读取准备信号连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def write_stop0(self, number):
try:
self.c.write_register(41004, number)
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法通过IO操作stop0急停连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def write_speed_max(self, speed):
try:
builder = BinaryPayloadBuilder(byteorder=Endian.BIG, wordorder=Endian.LITTLE)
builder.add_32bit_float(float(speed))
payload = builder.build()
self.c.write_registers(41005, payload, skip_encode=True)
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法写入速度值连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def read_brake_done(self):
try:
results = self.c.read_holding_registers(41007, 1)
return results.registers[0]
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法读取制动已执行信号连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def write_axis(self, axis):
try:
builder = BinaryPayloadBuilder(byteorder=Endian.BIG, wordorder=Endian.LITTLE)
builder.add_32bit_int(int(axis))
payload = builder.to_registers()
self.c.write_registers(41008, payload)
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法写入速度值连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def write_probe(self, probe):
try:
self.c.write_register(41010, probe)
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法写入速度探测信号连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
class HmiRequest(object): class HmiRequest(object):
def __init__(self, w2t): def __init__(self, w2t):
@ -180,6 +223,8 @@ class HmiRequest(object):
md.reset_estop() md.reset_estop()
md.clear_alarm() md.clear_alarm()
md.write_act(False) md.write_act(False)
md.write_probe(False)
md.write_axis(1)
except Exception as Err: except Exception as Err:
self.w2t("Connection failed...", 0, 0, 'red', tab_name=self.tab_name) self.w2t("Connection failed...", 0, 0, 'red', tab_name=self.tab_name)
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb: with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
@ -224,7 +269,7 @@ class HmiRequest(object):
f_hb.write(_flag) f_hb.write(_flag)
if _flag == '0': if _flag == '0':
self.w2t(f"{_id} 心跳丢失,连接失败,重新连接中...", 0, 7, 'red', tab_name=self.tab_name) self.w2t(f"{_id} 心跳丢失,连接失败,重新连接中...", 0, 7, 'red', tab_name=self.tab_name)
sleep(2) sleep(1.5)
# with open(f"{current_path}/../../assets/templates/c_msg.log", "w", encoding='utf-8') as f: # with open(f"{current_path}/../../assets/templates/c_msg.log", "w", encoding='utf-8') as f:
# for msg in self.c_msg: # for msg in self.c_msg:
# f.write(str(loads(msg)) + '\n') # f.write(str(loads(msg)) + '\n')
@ -559,6 +604,11 @@ class HmiRequest(object):
case 'diagnosis.open': case 'diagnosis.open':
req['data']['open'] = kwargs['open'] req['data']['open'] = kwargs['open']
req['data']['display_open'] = kwargs['display_open'] req['data']['display_open'] = kwargs['display_open']
case 'register.set_value':
req['data']['name'] = kwargs['name']
req['data']['type'] = kwargs['type']
req['data']['bias'] = kwargs['bias']
req['data']['value'] = kwargs['value']
case _: case _:
pass pass