v0.1.8.2(2024/07/08)

1. [APIs: do_brake.py]: 完成了制动性能测试逻辑,只不过制动信号传递生效延迟不可控,暂时pending
2. [APIs: do_current.py]: 修改曲线数据时序,主要是value data取反即可,解决了波形锯齿明细的问题
3. [APIs: openapi.py]: modbus新增了触发急停信号的寄存器 stop0_signal,并重写了解除急停,socket新增了register.set_value协议
This commit is contained in:
gitea 2024-07-08 19:50:21 +08:00
parent 7b25b91c37
commit 2fbe500d1d
5 changed files with 189 additions and 49 deletions

View File

@ -429,3 +429,7 @@ v0.1.8.1(2024/07/05)
2. [APIs: aio.py]: 修改了do_brake主函数的参数
3. 增加工程文件target.zip
v0.1.8.2(2024/07/08)
1. [APIs: do_brake.py]: 完成了制动性能测试逻辑只不过制动信号传递生效延迟不可控暂时pending
2. [APIs: do_current.py]: 修改曲线数据时序主要是value data取反即可解决了波形锯齿明细的问题
3. [APIs: openapi.py]: modbus新增了触发急停信号的寄存器 stop0_signal并重写了解除急停socket新增了register.set_value协议

View File

@ -0,0 +1,11 @@
{
"id": "xxxxxxxxxxx",
"module": "fieldbus",
"command": "register.set_value",
"data": {
"name": "",
"type": "bool",
"bias": 0,
"value": 0
}
}

View File

@ -1,10 +1,13 @@
import os
from time import sleep
from time import sleep, time
from sys import argv
from os import scandir
from os import scandir, mkdir
from os.path import exists
from paramiko import SSHClient, AutoAddPolicy
from json import loads
from openpyxl import load_workbook
import pandas
RADIAN = 57.3 # 180 / 3.1415926
def traversal_files(path, w2t):
@ -26,7 +29,7 @@ def traversal_files(path, w2t):
def check_files(path, loadsel, data_dirs, data_files, w2t):
if len(data_dirs) != 0 or len(data_files) != 5:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test')
w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test')
config_file = reach33 = reach66 = reach100 = prj_file = None
for data_file in data_files:
@ -47,19 +50,19 @@ def check_files(path, loadsel, data_dirs, data_files, w2t):
if config_file and reach33 and reach66 and reach100 and prj_file:
result_dirs = []
os.mkdir(f"{path}\\j1")
os.mkdir(f"{path}\\j2")
os.mkdir(f"{path}\\j3")
mkdir(f"{path}\\j1")
mkdir(f"{path}\\j2")
mkdir(f"{path}\\j3")
for _reach in ['reach33', 'reach66', 'reach100']:
for _load in [f'load{loadsel.removeprefix("tool")}']:
for _speed in ['speed33', 'speed66', 'speed100']:
dir_name = '_'.join([_reach, _load, _speed])
result_dirs.append(dir_name)
os.mkdir(f"{path}\\j1\\{dir_name}")
os.mkdir(f"{path}\\j2\\{dir_name}")
mkdir(f"{path}\\j1\\{dir_name}")
mkdir(f"{path}\\j2\\{dir_name}")
if _reach == 'reach100':
os.mkdir(f"{path}\\j3\\{dir_name}")
mkdir(f"{path}\\j3\\{dir_name}")
w2t("数据目录合规性检查结束,未发现问题......", tab_name='Automatic Test')
return config_file, reach33, reach66, reach100, prj_file, result_dirs
@ -120,6 +123,30 @@ def execution(cmd, hr, w2t, **kwargs):
return _response
def gen_result_file(path, curve_data, axis, _reach, _load, _speed, count):
_d2d_vel = {'hw_joint_vel_feedback': []}
_d2d_trq = {'device_servo_trq_feedback': []}
_d2d_stop = {'device_safety_estop': []}
for data in curve_data:
dict_results = data['data']
# dict_results.reverse()
for item in dict_results:
item['value'].reverse()
if item.get('channel', None) == axis-1 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == axis-1 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 0 and item.get('name', None) == 'device_safety_estop':
_d2d_stop['device_safety_estop'].extend(item['value'])
df1 = pandas.DataFrame.from_dict(_d2d_vel)
df2 = pandas.DataFrame.from_dict(_d2d_trq)
df3 = pandas.DataFrame.from_dict(_d2d_stop)
df = pandas.concat([df1, df2, df3], axis=1)
_filename = f"{path}\\j{axis}\\reach{_reach}_load{_load}_speed{_speed}\\reach{_reach}_load{_load}_speed{_speed}_{count}.data"
df.to_csv(_filename, sep='\t', index=False)
def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
_count = 0
for condition in result_dirs:
@ -128,16 +155,18 @@ def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
_speed = condition.split('_')[2].removeprefix('speed')
for axis in range(1, 4):
_get_speed_max = 0
if axis == 3 and _reach != '100':
continue
for count in range(1, 4):
_count += 1
w2t(f"[{_count/64}]正在执行{axis}{condition}[{count}]的制动测试......", 0, 0, 'purple', 'Automatic Test')
w2t(f"[{_count}/64-{count}] 正在执行{axis}{condition}的制动测试......", 0, 0, 'purple', 'Automatic Test')
# 1. 关闭诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来,切手动模式并下电
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
md.trigger_estop()
md.set_stop0(1)
md.reset_estop()
sleep(2) # 让曲线彻底关闭
_response = execution('state.switch_manual', hr, w2t)
_response = execution('state.switch_motor_off', hr, w2t)
@ -160,14 +189,77 @@ def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
# 3. reload工程后pp2main并且自动模式和上电
prj_path = 'target/_build/target.prj'
_response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['brake'])
_response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['brake', 'stop0_related'])
_response = execution('overview.get_cur_prj', hr, w2t)
_response = execution('rl_task.pp_to_main', hr, w2t, tasks=['brake'])
_response = execution('rl_task.pp_to_main', hr, w2t, tasks=['brake', 'stop0_related'])
_response = execution('state.switch_auto', hr, w2t)
_response = execution('state.switch_motor_on', hr, w2t)
# 4. 开始运行程序每种情况运行15s
_response = execution('rl_task.run', hr, w2t, tasks=['brake'])
# 4. 第一次开始运行程序,运行两轮
_response = execution('rl_task.run', hr, w2t, tasks=['brake', 'stop0_related'])
for i in range(3):
if md.read_ready_to_go() == 1:
md.write_act(True)
sleep(1)
md.write_act(False)
break
else:
sleep(1)
else:
w2t("未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test')
if _get_speed_max == 0:
# 5. 第一次打开诊断曲线,并执行采集
_get_speed_max = 1
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
display_pdo_params = [
{"name": "hw_joint_vel_feedback", "channel": 0},
{"name": "hw_joint_vel_feedback", "channel": 1},
{"name": "hw_joint_vel_feedback", "channel": 2},
{"name": "hw_joint_vel_feedback", "channel": 3},
{"name": "hw_joint_vel_feedback", "channel": 4},
{"name": "hw_joint_vel_feedback", "channel": 5},
{"name": "device_servo_trq_feedback", "channel": 0},
{"name": "device_servo_trq_feedback", "channel": 1},
{"name": "device_servo_trq_feedback", "channel": 2},
{"name": "device_servo_trq_feedback", "channel": 3},
{"name": "device_servo_trq_feedback", "channel": 4},
{"name": "device_servo_trq_feedback", "channel": 5},
{"name": "device_safety_estop", "channel": 0},
]
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
sleep(8) # 前八秒获取实际最大速度
# 6. 关闭诊断曲线,并处理数据,找出最大速度
_response = execution('rl_task.stop', hr, w2t, tasks=['brake'])
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
# 找出最大速度
speed_max = 0
for _msg in hr.c_msg:
if 'diagnosis.result' in _msg:
dict_results = loads(_msg)['data']
for item in dict_results:
if item.get('channel', None) == axis-1 and item.get('name', None) == 'hw_joint_vel_feedback':
_ = abs(RADIAN*sum(item['value'])/len(item['value']))
speed_max = max(_, speed_max)
wb = load_workbook(config_file, read_only=True)
ws = wb['Target']
speed_target = float(ws.cell(row=3, column=axis+1).value)
if speed_max < speed_target*0.95:
w2t(f"Axis: {axis}-{count} | Speed: {speed_max*RADIAN} | Shouldbe: {speed_target}", 0, 0, 'indigo')
# 7. 清零c_msg重新运行程序并打开曲线发送继续运动信号开始运动当速度达到最大值时通过DO触发急停
for _msg in hr.c_msg:
if 'diagnosis.result' in _msg:
_index = hr.c_msg.index(_msg)
del hr.c_msg[_index:]
hr.c_msg_xs.clear()
break
_response = execution('rl_task.pp_to_main', hr, w2t, tasks=['brake', 'stop0_related'])
_response = execution('state.switch_auto', hr, w2t)
_response = execution('state.switch_motor_on', hr, w2t)
_response = execution('rl_task.run', hr, w2t, tasks=['brake', 'stop0_related'])
for i in range(3):
if md.read_ready_to_go() == 1:
md.write_act(True)
@ -179,56 +271,69 @@ def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
else:
w2t("未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test')
# 5. 打开诊断曲线,并执行采集
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
display_pdo_params = [
{"name": "hw_joint_vel_feedback", "channel": 0},
{"name": "hw_joint_vel_feedback", "channel": 1},
{"name": "hw_joint_vel_feedback", "channel": 2},
{"name": "hw_joint_vel_feedback", "channel": 3},
{"name": "hw_joint_vel_feedback", "channel": 4},
{"name": "hw_joint_vel_feedback", "channel": 5},
{"name": "device_servo_trq_feedback", "channel": 0},
{"name": "device_servo_trq_feedback", "channel": 1},
{"name": "device_servo_trq_feedback", "channel": 2},
{"name": "device_servo_trq_feedback", "channel": 3},
{"name": "device_servo_trq_feedback", "channel": 4},
{"name": "device_servo_trq_feedback", "channel": 5},
{"name": "device_safety_estop", "channel": 0},
]
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
sleep(15)
sleep(4)
_flg = 1
_t_start = time()
while _flg:
sleep(0.05)
for _msg in hr.c_msg:
if 'diagnosis.result' in _msg:
if _flg == 0:
break
dict_results = loads(_msg)['data']
for item in dict_results:
if item.get('channel', None) == axis-1 and item.get('name', None) == 'hw_joint_vel_feedback':
_ = abs(RADIAN*sum(item['value'])/len(item['value']))
if abs(_-speed_max) < 5:
print(f"speed actual = {_}")
print(f"speed max = {speed_max}")
print(f"speed target = {speed_target*float(_speed)/100}")
print(f"abs minus = {abs(_-speed_max)}")
for _i in range(5):
# md.set_stop0(0) # 触发IO急停
_response = execution('register.set_value', hr, w2t, name='stop0_signal', type='bool', bias=0, value=0)
sleep(0.02)
_flg = 0
break
else:
if (time() - _t_start) // 60 > 1:
w2t(f"规定时间内未找到合适的点触发急停需要确认RL/Python程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test')
else:
break
# 6. 关闭诊断曲线,停止程序运行,下电并且换成手动模式
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
_response = execution('rl_task.stop', hr, w2t, tasks=['brake'])
_response = execution('state.switch_motor_off', hr, w2t)
_response = execution('state.switch_manual', hr, w2t)
# 8. 关闭诊断曲线,且换成手动模式
sleep(1) # 保证所有数据均已返回
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
_response = execution('state.switch_manual', hr, w2t)
# 7. 保留数据并处理输出
# 9. 保留数据并处理输出
curve_data = []
for _msg in hr.c_msg:
if 'diagnosis.result' in _msg:
curve_data.insert(0, loads(_msg))
else:
_index = 210
for _msg in hr.c_msg:
if 'diagnosis.result' in _msg:
_index = hr.c_msg.index(_msg)
del hr.c_msg[_index:]
hr.c_msg_xs.clear()
break
del hr.c_msg[_index:]
hr.c_msg_xs.clear()
# gen_result_file(path, loadsel, disc, number, scenario_time)
gen_result_file(path, curve_data, axis, _reach, _load, _speed, count)
else:
w2t(f"\n{loadsel.removeprefix('tool')}%负载的制动性能测试执行完毕,如需采集其他负载,须切换负载类型,并更换其他负载,重新执行。", 0, 0, 'green', 'Automatic Test')
def main(path, hr, md, loadsel, w2t):
_s_time = time()
data_dirs, data_files = traversal_files(path, w2t)
config_file, reach33, reach66, reach100, prj_file, result_dirs = check_files(path, loadsel, data_dirs, data_files, w2t)
prj_to_xcore(prj_file)
run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t)
_e_time = time()
time_total = _e_time - _s_time
w2t(f"处理总时长:{time_total // 3600:02.0f} h {time_total % 3600 // 60:02.0f} m {time_total % 60:02.0f} s")
if __name__ == '__main__':

View File

@ -27,7 +27,7 @@ def traversal_files(path, w2t):
def check_files(path, loadsel, data_dirs, data_files, w2t):
if len(data_dirs) != 0 or len(data_files) != 3:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下三个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
w2t(' 1. configs.xlsx\n 2. T_电机电流.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test')
w2t(' 1. configs.xlsx\n 2. T_电机电流.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test')
config_file = current_file = prj_file = None
for data_file in data_files:
@ -118,9 +118,12 @@ def data_proc_regular(path, filename, channel, scenario_time):
for line in lines:
data = eval(line.strip())['data']
for item in data:
item['value'].reverse()
if item.get('channel', None) == channel and item.get('name', None) == 'hw_joint_vel_feedback':
item['value'].reverse()
_d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == channel and item.get('name', None) == 'device_servo_trq_feedback':
item['value'].reverse()
_d2d_trq['device_servo_trq_feedback'].extend(item['value'])
df1 = pandas.DataFrame.from_dict(_d2d_vel)
@ -146,6 +149,7 @@ def data_proc_regular(path, filename, channel, scenario_time):
for line in lines:
data = eval(line.strip())['data']
for item in data:
item['value'].reverse()
if item.get('channel', None) == 0 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_0['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 0 and item.get('name', None) == 'device_servo_trq_feedback':
@ -214,6 +218,7 @@ def data_proc_regular(path, filename, channel, scenario_time):
for line in lines:
data = eval(line.strip())['data']
for item in data:
item['value'].reverse()
if item.get('channel', None) == channel-9 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == channel-9 and item.get('name', None) == 'device_servo_trq_feedback':
@ -234,6 +239,7 @@ def data_proc_inertia(path, filename, channel):
for line in lines:
data = eval(line.strip())['data']
for item in data:
item['value'].reverse()
if item.get('channel', None) == channel+3 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == channel+3 and item.get('name', None) == 'device_servo_trq_feedback':

View File

@ -47,11 +47,12 @@ class ModbusRequest(object):
def reset_estop(self):
try:
self.c.write_register(40012, 1)
# self.c.write_register(40001, 1)
# sleep(0.2)
# self.c.write_register(40001, 1)
# sleep(0.2)
# self.c.write_register(40001, 0)
sleep(0.2)
self.c.write_register(40001, 0)
sleep(0.2)
self.c.write_register(40001, 1)
sleep(0.2)
self.c.write_register(40001, 0)
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法重置软急停连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
@ -124,6 +125,13 @@ class ModbusRequest(object):
self.w2t(f"{Err}")
self.w2t("无法读取准备信号连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def set_stop0(self, number):
try:
self.c.write_register(41004, number)
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法通过IO操作stop0急停连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
class HmiRequest(object):
def __init__(self, w2t):
@ -180,6 +188,7 @@ class HmiRequest(object):
md.reset_estop()
md.clear_alarm()
md.write_act(False)
md.set_stop0(1)
except Exception as Err:
self.w2t("Connection failed...", 0, 0, 'red', tab_name=self.tab_name)
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
@ -559,6 +568,11 @@ class HmiRequest(object):
case 'diagnosis.open':
req['data']['open'] = kwargs['open']
req['data']['display_open'] = kwargs['display_open']
case 'register.set_value':
req['data']['name'] = kwargs['name']
req['data']['type'] = kwargs['type']
req['data']['bias'] = kwargs['bias']
req['data']['value'] = kwargs['value']
case _:
pass