This repository has been archived on 2025-02-25. You can view files and clone it, but cannot push or open issues or pull requests.
gitea fc3d5482f8 v0.1.7.2(2024/06/30)
1. 初步完成NB4h_R580_3BH7.zip工程的设计
2. 重新研究了解包操作,重新实现了一版
3. 修改openapi.py中excution为execution函数
4. 增减了解包原理性文档
2024-06-30 20:29:49 +08:00

182 lines
7.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from time import sleep
from sys import argv
from os import scandir
from os.path import exists
from paramiko import SSHClient, AutoAddPolicy
from json import loads
def traversal_files(path, w2t):
    """List the immediate sub-directories and files found in ``path``.

    The scan is not recursive; only direct children of ``path`` are inspected.

    Args:
        path: Directory to scan.
        w2t: Reporting callback. When ``path`` does not exist it is called
            with the error text, ``0``, ``1``, ``'red'`` and
            ``tab_name='Automatic Test'``.

    Returns:
        A ``(dirs, files)`` tuple of two lists holding the entry paths in
        ``scandir`` order. If ``path`` is missing and ``w2t`` returns
        normally, nothing is returned (``None``).
        NOTE(review): presumably ``w2t`` aborts on a non-zero third
        argument -- confirm against its implementation.
    """
    if not exists(path):
        w2t(f'数据文件夹{path}不存在,请确认后重试......', 0, 1, 'red', tab_name='Automatic Test')
        return None

    # Materialise the listing once so both passes see the same entries.
    entries = list(scandir(path))
    sub_dirs = [entry.path for entry in entries if entry.is_dir()]
    plain_files = [
        entry.path
        for entry in entries
        if not entry.is_dir() and entry.is_file()
    ]
    return sub_dirs, plain_files
def check_files(data_dirs, data_files, w2t):
    """Check that the data directory holds exactly the three expected files.

    The expected layout is: no sub-directories, and exactly three files named
    ``configs.xlsx``, ``T_电机电流.xlsx`` and any ``*.zip`` archive.

    Args:
        data_dirs: Paths of sub-directories found in the data directory.
        data_files: Paths of files found in the data directory. Both ``\\``
            and ``/`` are accepted as path separators.
        w2t: Reporting callback used for the error and success messages.

    Returns:
        ``(config_file, current_file, prj_file)`` when all three files were
        identified. If the layout is invalid and ``w2t`` returns normally,
        ``None`` is returned.
        NOTE(review): presumably ``w2t`` aborts on a non-zero third
        argument -- confirm against its implementation.
    """
    # Local import keeps this block self-contained. PureWindowsPath treats
    # both '\\' and '/' as separators on every platform, unlike the previous
    # hard-coded split on '\\', which never matched '/'-separated paths.
    from pathlib import PureWindowsPath

    def report_invalid_layout():
        # Two-part message: the rule first, then the list of allowed files.
        w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下三个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
        w2t(' 1. configs.xlsx\n 2. T_电机电流.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test')

    if data_dirs or len(data_files) != 3:
        report_invalid_layout()

    config_file = current_file = prj_file = None
    for data_file in data_files:
        filename = PureWindowsPath(data_file).name
        if filename == 'configs.xlsx':
            config_file = data_file
        elif filename == 'T_电机电流.xlsx':
            current_file = data_file
        elif filename.endswith('.zip'):
            prj_file = data_file
        else:
            # Any file outside the three expected ones is a layout violation.
            report_invalid_layout()

    if config_file and current_file and prj_file:
        w2t("数据目录合规性检查结束,未发现问题......", tab_name='Automatic Test')
        return config_file, current_file, prj_file

    report_invalid_layout()
    return None
def prj_to_xcore(prj_file, host='192.168.0.160', port=22, username='luoshi', password='luoshi2019'):
    """Upload a project archive to the controller and install it as ``target``.

    Steps performed over SSH/SFTP:
      1. Upload ``prj_file`` to ``/tmp/target.zip``.
      2. Unpack it into a fresh ``/tmp/target/`` and delete the archive.
      3. With sudo, replace ``/home/luoshi/bin/controller/projects/target``
         by the unpacked directory.
      4. With sudo, rename ``projects/target/_build/*.prj`` to ``target.prj``.

    Args:
        prj_file: Local path of the project ``.zip`` archive.
        host: Controller address.
        port: SSH port.
        username: SSH login name.
        password: SSH password; also written to the sudo prompt.

    NOTE(security): the default credentials are hard-coded and unknown host
    keys are accepted automatically (``AutoAddPolicy``). Kept for backward
    compatibility -- review before using outside a trusted network.
    """

    def run_with_sudo(client, command):
        # A pseudo-terminal is requested so sudo can prompt for the password,
        # which is then written to the command's stdin.
        stdin, stdout, stderr = client.exec_command(command, get_pty=True)
        stdin.write(password + '\n')
        stdin.flush()
        print(stdout.read().decode())  # stdout must be read for sudo to execute correctly
        print(stderr.read().decode())  # read stderr as well

    ssh = SSHClient()
    ssh.set_missing_host_key_policy(AutoAddPolicy())
    try:
        ssh.connect(host, port, username=username, password=password)

        sftp = ssh.open_sftp()
        try:
            sftp.put(prj_file, '/tmp/target.zip')
        finally:
            sftp.close()

        unpack_cmd = (
            'cd /tmp; '
            'rm -rf target/; '
            'mkdir target; '
            'unzip -d target/ -q target.zip; '
            'rm target.zip; '
        )
        _, unpack_out, _ = ssh.exec_command(unpack_cmd)
        # exec_command returns immediately; wait for the unpack to finish so
        # the following move does not race against a half-extracted tree.
        unpack_out.channel.recv_exit_status()

        run_with_sudo(
            ssh,
            'sudo rm -rf /home/luoshi/bin/controller/projects/target; '
            'sudo mv /tmp/target/ /home/luoshi/bin/controller/projects/',
        )
        run_with_sudo(
            ssh,
            'cd /home/luoshi/bin/controller/; sudo mv projects/target/_build/*.prj projects/target/_build/target.prj ',
        )
    finally:
        # Always release the connection, even when a step above fails.
        ssh.close()
def modify_prj():
    """Placeholder: no project modification is implemented yet."""
    return None
def validate_resp(_id, response, w2t):
    """Report known error markers and empty responses for a request.

    Args:
        _id: Request identifier. It may instead be one of the error marker
            strings ``'DATA ERR'``, ``'DATA READ ERR'`` or ``'NOT SUPPORT'``.
        response: Decoded response for the request; any falsy value is
            reported as a missing response.
        w2t: Reporting callback, called as
            ``w2t(text, 0, code, 'red', tab_name='Automatic Test')``.
    """
    # (marker, message, code passed to w2t)
    known_errors = (
        ('DATA ERR', "数据处理错误,需要确认", 4),
        ('DATA READ ERR', "无法读取数据,需要确认", 3),
        ('NOT SUPPORT', "不支持的功能,需要确认", 2),
    )
    for marker, text, code in known_errors:
        if _id == marker:
            w2t(text, 0, code, 'red', tab_name='Automatic Test')
            break

    if not response:
        # Fixed: the message previously formatted the builtin `id` function
        # instead of the request identifier `_id`.
        w2t(f"无法获取{_id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
def execution(cmd, hr, w2t, **kwargs):
    """Send a command through ``hr`` and return its decoded JSON response.

    Args:
        cmd: Command name forwarded to ``hr.execution``.
        hr: Client object providing ``execution(cmd, **kwargs)`` (returns a
            request id) and ``get_from_id(request_id)`` (returns the raw
            response message).
        w2t: Reporting callback used for error messages.
        **kwargs: Command parameters forwarded unchanged to ``hr.execution``.

    Returns:
        The response decoded with ``json.loads``. ``None`` if no message was
        obtained for the request and ``w2t`` returned normally.
    """
    _id = hr.execution(cmd, **kwargs)
    _msg = hr.get_from_id(_id)
    if not _msg:
        w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name='Automatic Test')
        # Explicit result for the failure path: there is no response to
        # decode, and `_response` is only bound on the success path.
        return None

    _response = loads(_msg)
    validate_resp(_id, _response, w2t)
    return _response
def run_rl(hr, w2t):
    """Reload the ``target`` project, run the ``brake`` task and dump diagnosis messages.

    Sequence: reload the project, query the current project, move the
    program pointer to main, switch to auto with motors on, open diagnosis,
    configure the sampled parameters, run the task, wait 10 seconds, close
    diagnosis, switch motors off and back to manual. Finally every message
    in ``hr.c_msg`` that contains ``'diagnosis.result'`` is printed.

    Args:
        hr: Client object passed to ``execution``; its ``c_msg`` attribute
            is iterated for collected messages.
        w2t: Reporting callback passed to ``execution``.
    """

    def call(command, **params):
        # Shorthand so every request goes through the same hr/w2t pair.
        return execution(command, hr, w2t, **params)

    reply = call(
        'overview.reload',
        prj_path='target/_build/target.prj',
        tasks=['brake', 'current'],
    )
    print(f"reload prj: {reply}")

    reply = call('overview.get_cur_prj')
    print(f"get cur prj name: {reply}")

    reply = call('rl_task.pp_to_main', tasks=['brake'])
    print(f"set pp2main of prj: {reply}")

    call('state.switch_auto')
    call('state.switch_motor_on')

    reply = call('diagnosis.open', open=True, display_open=True)
    print(f"打开诊断: {reply}")

    # Sampled parameters: velocity and torque feedback on channels 0-5,
    # followed by six identical estop entries on channel 0.
    # NOTE(review): the estop entry is repeated six times with channel 0,
    # exactly as in the original list -- confirm whether that is intended.
    display_pdo_params = [
        {"name": signal, "channel": channel}
        for signal in ("hw_joint_vel_feedback", "device_servo_trq_feedback")
        for channel in range(6)
    ]
    display_pdo_params += [
        {"name": "device_safety_estop", "channel": 0} for _ in range(6)
    ]

    reply = call('diagnosis.set_params', display_pdo_params=display_pdo_params)
    print(f"执行采样: {reply}")

    reply = call('rl_task.run', tasks=['brake'])
    print(f"run prj: {reply}")

    sleep(10)  # let the task run while diagnosis data is collected

    reply = call('diagnosis.open', open=False, display_open=False)
    print(f"关闭诊断: {reply}")

    call('state.switch_motor_off')
    call('state.switch_manual')

    # Dump every collected diagnosis result message unfiltered.
    # NOTE: a 'diagnosis.get_params' status query and a filter on
    # channel 0 / 'device_servo_trq_feedback' were previously sketched
    # here as commented-out code; neither is active.
    for message in hr.c_msg:
        if 'diagnosis.result' in message:
            print(message)
def main(path, hr, w2t):
    """Run the whole flow: validate the data directory, deploy the project, run it.

    Args:
        path: Data directory expected to hold the config workbook, the motor
            current workbook and the project archive.
        hr: Client object forwarded to ``run_rl``.
        w2t: Reporting callback forwarded to every step that reports.
    """
    found_dirs, found_files = traversal_files(path, w2t)
    # Only the project archive is used here; the two workbooks are validated
    # for presence but not consumed by this function.
    _config, _current, project_archive = check_files(found_dirs, found_files, w2t)
    prj_to_xcore(project_archive)
    run_rl(hr, w2t)
if __name__ == '__main__':
    # Forward the command-line arguments (script name excluded) to main().
    # NOTE(review): main() calls methods on `hr` and invokes `w2t`, which
    # plain argv strings cannot satisfy -- confirm how this entry point is
    # meant to be used.
    cli_args = argv[1:]
    main(*cli_args)