This repository has been archived on 2025-02-25. You can view files and clone it, but cannot push or open issues or pull requests.
gitea 611d848b41 v0.1.7.1(2024/06/29)
1. [APIs: aio.py]
   - 修改detect_network函数中sleep语句放到最后,重新生成HmiRequest实例中增加sleep(4),这个停顿时间一定是比openapi中heartbeat函数的sleep要长1s以上才能正常工作
   - 修改write2textbox函数,新增默认参数tab_name,只有当该值与当前tab一致时,函数才会有输出
   - 第二条改动影响到了automatic_test文件夹下所有的文件
2. [APIs: openapi.py]
   - 规定了所有的网络异常均由heartbeat函数来定义,其他异常不做中断处理
   - execution函数中合并了case条件
   - 增加了N多指令,多为诊断曲线和rl程序相关
3. [APIs: do_brake.py]
   - 实现自动推送工程到xCore并自动运行
   - 初步实现了Modbus发送消息和检测状态
4. [APIs: do_current.py]
   - 将do_brake.py的内容完全拷贝到此文件,待修改
2024-06-29 20:40:17 +08:00

176 lines
7.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from time import sleep
from sys import argv
from os import scandir
from os.path import exists
from paramiko import SSHClient, AutoAddPolicy
from json import loads
def traversal_files(path, w2t):
# 功能:以列表的形式分别返回指定路径下的文件和文件夹,不包含子目录
# 参数:路径
# 返回值:路径下的文件夹列表 路径下的文件列表
if not exists(path):
msg = f'数据文件夹{path}不存在,请确认后重试......'
w2t(msg, 0, 1, 'red', tab_name='Automatic Test')
else:
dirs = []
files = []
for item in scandir(path):
if item.is_dir():
dirs.append(item.path)
elif item.is_file():
files.append(item.path)
return dirs, files
def check_files(data_dirs, data_files, w2t):
if len(data_dirs) != 0 or len(data_files) != 5:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test')
config_file = reach33 = reach66 = reach100 = prj_file = None
for data_file in data_files:
filename = data_file.split('\\')[-1]
if filename == 'configs.xlsx':
config_file = data_file
elif filename.startswith('reach33_') and filename.endswith('.xlsx'):
reach33 = data_file
elif filename.startswith('reach66_') and filename.endswith('.xlsx'):
reach66 = data_file
elif filename.startswith('reach100_') and filename.endswith('.xlsx'):
reach100 = data_file
elif filename.endswith('.zip'):
prj_file = data_file
else:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 2, 'red', tab_name='Automatic Test')
if config_file and reach33 and reach66 and reach100 and prj_file:
w2t("数据目录合规性检查结束,未发现问题......", tab_name='Automatic Test')
return config_file, reach33, reach66, reach100, prj_file
else:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test')
def prj_to_xcore(prj_file):
ssh = SSHClient()
ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019')
sftp = ssh.open_sftp()
# stdin, stdout, stderr = ssh.exec_command('rm /tmp/target.zip')
# ssh.exec_command('rm /tmp/target.zip')
sftp.put(prj_file, '/tmp/target.zip')
cmd = 'cd /tmp; '
cmd += 'rm -rf target/; '
cmd += 'mkdir target; '
cmd += 'unzip -d target/ -q target.zip; '
cmd += 'rm target.zip; '
ssh.exec_command(cmd)
cmd = 'sudo rm -rf /home/luoshi/bin/controller/projects/target; '
cmd += 'sudo mv /tmp/target/ /home/luoshi/bin/controller/projects/'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
cmd = 'cd /home/luoshi/bin/controller/; sudo mv projects/target/_build/*.prj projects/target/_build/target.prj '
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
ssh.close()
def modify_prj():
pass
def validate_resp(_id, response, w2t):
match _id:
case 'DATA ERR':
w2t(f"数据处理错误,需要确认", 0, 4, 'red', tab_name='Automatic Test')
case 'DATA READ ERR':
w2t(f"无法读取数据,需要确认", 0, 3, 'red', tab_name='Automatic Test')
case 'NOT SUPPORT':
w2t(f"不支持的功能,需要确认", 0, 2, 'red', tab_name='Automatic Test')
if not response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
def execution(cmd, hr, w2t, **kwargs):
_id = hr.excution(cmd, **kwargs)
_msg = hr.get_from_id(_id)
if not _msg:
w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name='Automatic Test')
else:
_response = loads(_msg)
validate_resp(_id, _response, w2t)
return _response
def run_rl(hr, w2t):
# prj_path = 'target/_build/target.prj'
# _response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['Durable_Test_Com', 'Mechanical_Test_Com'])
# print(f"reload prj: {_response}")
# # _response = execution('overview.get_cur_prj', hr, w2t)
# # print(f"get cur prj name: {_response}")
# _response = execution('rl_task.pp_to_main', hr, w2t, tasks=['Mechanical_Test_Com'])
# print(f"set pp2main of prj: {_response}")
#
# _response = execution('state.switch_auto', hr, w2t)
# _response = execution('state.switch_motor_on', hr, w2t)
#
# _response = execution('rl_task.run', hr, w2t, tasks=['Mechanical_Test_Com'])
# print(f"run prj: {_response}")
# sleep(10)
#
# _response = execution('state.switch_motor_off', hr, w2t)
# _response = execution('state.switch_manual', hr, w2t)
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
print(f"打开诊断: {_response}")
# _response = execution('diagnosis.get_params', hr, w2t)
# print(f"显示诊断状态: {_response}")
display_pdo_params = [
{"name": "device_servo_trq_feedback", "channel": 0}
]
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
print(f"执行采样: {_response}")
sleep(5)
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
print(f"关闭诊断: {_response}")
# sleep(1)
for _msg in hr.c_msg:
if 'diagnosis.result' in _msg:
print(_msg)
# _msg = json.loads(_msg)
# if 'channel' in _msg and 'name' in _msg:
# if int(_msg['channel']) == 0 and _msg['name'] == 'device_servo_trq_feedback':
# print(f"diagnosis.result: {_msg}")
# count += 1
# if count * 50 > 5 * 1000:
# break
else:
sleep(1)
def main(path, hr, loadsel, w2t):
data_dirs, data_files = traversal_files(path, w2t)
config_file, reach33, reach66, reach100, prj_file = check_files(data_dirs, data_files, w2t)
# prj_to_xcore(prj_file)
run_rl(hr, w2t)
if __name__ == '__main__':
main(*argv[1:])