This repository has been archived on 2025-02-25. You can view files and clone it, but cannot push or open issues or pull requests.
gitea aedac4c90c v0.1.7.4(2024/07/02)
1. [APIs: openapi.py]
   - 增加了modbus的python实现
   - heartbeat函数修改发送间隔为1s
   - 清除了绝大部分调试性输出,发现太多的这种输出也会导致心跳丢包...,不清楚这个原理是什么
   - 在get_response函数中的while self.pkg > 0循环中,删除了else语句,因为它永不会被执行到
   - 在get_response函数中,修复一个bug,在flag==0的else语句中,补齐了index==6的情况
2. [APIs: do_current.py]
   - 完成了六个轴的电机电流动作的执行,以及数据采集
   - 完成了对应的RL程序的编写
3[APIs: aio.py]
   - 引入modbus实例化,并以参数的形式,传递给相应的tabview
   - 新增pre_warning函数,在做自动化测试之前,确保所有条件皆具备
2024-07-02 21:48:00 +08:00

219 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from time import sleep
from sys import argv
from os import scandir
from os.path import exists
from paramiko import SSHClient, AutoAddPolicy
from json import loads
def traversal_files(path, w2t):
if not exists(path):
msg = f'数据文件夹{path}不存在,请确认后重试......'
w2t(msg, 0, 1, 'red', tab_name='Automatic Test')
else:
dirs = []
files = []
for item in scandir(path):
if item.is_dir():
dirs.append(item.path)
elif item.is_file():
files.append(item.path)
return dirs, files
def check_files(data_dirs, data_files, w2t):
if len(data_dirs) != 0 or len(data_files) != 3:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下三个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
w2t(' 1. configs.xlsx\n 2. T_电机电流.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test')
config_file = current_file = prj_file = None
for data_file in data_files:
filename = data_file.split('\\')[-1]
if filename == 'configs.xlsx':
config_file = data_file
elif filename == 'T_电机电流.xlsx':
current_file = data_file
elif filename.endswith('.zip'):
prj_file = data_file
else:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下三个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
w2t(' 1. configs.xlsx\n 2. T_电机电流.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test')
if config_file and current_file and prj_file:
w2t("数据目录合规性检查结束,未发现问题......", tab_name='Automatic Test')
return config_file, current_file, prj_file
else:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下三个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
w2t(' 1. configs.xlsx\n 2. T_电机电流.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test')
def prj_to_xcore(prj_file):
ssh = SSHClient()
ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019')
sftp = ssh.open_sftp()
# stdin, stdout, stderr = ssh.exec_command('rm /tmp/target.zip')
# ssh.exec_command('rm /tmp/target.zip')
sftp.put(prj_file, '/tmp/target.zip')
cmd = 'cd /tmp; '
cmd += 'rm -rf target/; '
cmd += 'mkdir target; '
cmd += 'unzip -d target/ -q target.zip; '
cmd += 'rm target.zip; '
ssh.exec_command(cmd)
cmd = 'sudo rm -rf /home/luoshi/bin/controller/projects/target; '
cmd += 'sudo mv /tmp/target/ /home/luoshi/bin/controller/projects/'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
_prj_name = prj_file.split('\\')[-1].removesuffix('.zip')
cmd = f'cd /home/luoshi/bin/controller/; '
cmd += f'sudo mv projects/target/_build/{_prj_name}.prj projects/target/_build/target.prj '
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
ssh.close()
def validate_resp(_id, response, w2t):
match _id:
case 'DATA ERR':
w2t(f"数据处理错误,需要确认", 0, 4, 'red', tab_name='Automatic Test')
case 'DATA READ ERR':
w2t(f"无法读取数据,需要确认", 0, 3, 'red', tab_name='Automatic Test')
case 'NOT SUPPORT':
w2t(f"不支持的功能,需要确认", 0, 2, 'red', tab_name='Automatic Test')
if not response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
def execution(cmd, hr, w2t, **kwargs):
_id = hr.execution(cmd, **kwargs)
_msg = hr.get_from_id(_id)
if not _msg:
w2t(f"无法获取{_id}请求的响应信息", 0, 0, 'red', tab_name='Automatic Test')
else:
_response = loads(_msg)
validate_resp(_id, _response, w2t)
return _response
def run_rl(hr, md, w2t):
# need to run
conditions = [
" scenario(0, j1_p, j1_n, p_speed, p_tool)",
" scenario(0, j2_p, j2_n, p_speed, p_tool)",
" scenario(0, j3_p, j3_n, p_speed, p_tool)",
" scenario(0, j4_p, j4_n, p_speed, p_tool)",
" scenario(0, j5_p, j5_n, p_speed, p_tool)",
" scenario(0, j6_p, j6_n, p_speed, p_tool)",
" scenario(1, j5_p, j5_n, p_speed, p_tool)",
" scenario(2, j5_p, j5_n, p_speed, p_tool)",
" scenario(3, j5_p, j5_n, p_speed, p_tool)",
]
for condition in conditions:
disc = {0: '一轴', 1: '二轴', 2: '三轴', 3: '四轴', 4: '五轴', 5: '六轴', 6: '场景一', 7: '场景二', 8: '场景三'}
number = conditions.index(condition)
w2t(f"正在执行{disc[number]}测试......", 0, 0, 'purple', 'Automatic Test')
# 1. 关闭诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
md.trigger_estop()
md.reset_estop()
_response = execution('state.switch_manual', hr, w2t)
# 3. 修改未要执行的场景
ssh = SSHClient()
ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019')
cmd = f'cd /home/luoshi/bin/controller/; '
cmd += f'sudo sed -i "/scenario/d" projects/target/current/main.mod; '
cmd += f'sudo sed -i "/DONOTDELETE/i {condition}" projects/target/current/main.mod; '
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
# 4. reload工程后pp2main
prj_path = 'target/_build/target.prj'
_response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['current'])
_response = execution('overview.get_cur_prj', hr, w2t)
_response = execution('rl_task.pp_to_main', hr, w2t, tasks=['current'])
print(f"set pp2main of prj: {_response}")
# 5. 切换自动并上电
_response = execution('state.switch_auto', hr, w2t)
_response = execution('state.switch_motor_on', hr, w2t)
# 6. 开始运行程序单轴运行15s
_response = execution('rl_task.run', hr, w2t, tasks=['current'])
print(f"run prj: {_response}")
for i in range(3):
if md.read_ready_to_go() == 1:
md.write_act(True)
sleep(1)
md.write_act(False)
break
else:
sleep(1)
continue
else:
w2t("未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test')
# 7. 打开诊断曲线,并执行采集
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
display_pdo_params = [
{"name": "hw_joint_vel_feedback", "channel": 0},
{"name": "hw_joint_vel_feedback", "channel": 1},
{"name": "hw_joint_vel_feedback", "channel": 2},
{"name": "hw_joint_vel_feedback", "channel": 3},
{"name": "hw_joint_vel_feedback", "channel": 4},
{"name": "hw_joint_vel_feedback", "channel": 5},
{"name": "device_servo_trq_feedback", "channel": 0},
{"name": "device_servo_trq_feedback", "channel": 1},
{"name": "device_servo_trq_feedback", "channel": 2},
{"name": "device_servo_trq_feedback", "channel": 3},
{"name": "device_servo_trq_feedback", "channel": 4},
{"name": "device_servo_trq_feedback", "channel": 5},
{"name": "device_safety_estop", "channel": 0},
]
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
sleep(8)
# 8. 关闭诊断曲线,停止程序运行,下电并且换成手动模式
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
_response = execution('rl_task.stop', hr, w2t, tasks=['current'])
_response = execution('state.switch_motor_off', hr, w2t)
_response = execution('state.switch_manual', hr, w2t)
sleep(1)
# 9. 处理数据并输出文件
# for _msg in hr.c_msg:
# if 'diagnosis.result' in _msg:
# print(_msg)
# _msg = json.loads(_msg)
# if 'channel' in _msg and 'name' in _msg:
# if int(_msg['channel']) == 0 and _msg['name'] == 'device_servo_trq_feedback':
# print(f"diagnosis.result: {_msg}")
# count += 1
# if count * 50 > 5 * 1000:
# break
def main(path, hr, md, w2t):
data_dirs, data_files = traversal_files(path, w2t)
config_file, current_file, prj_file = check_files(data_dirs, data_files, w2t)
prj_to_xcore(prj_file)
run_rl(hr, md, w2t)
if __name__ == '__main__':
main(*argv[1:])