This repository has been archived on 2025-02-25. You can view files and clone it, but cannot push or open issues or pull requests.
gitea b01f8dc19c v0.2.0.3(2024/07/27)
1. [APIs: do_brake.py]: 精简程序,解决 OOM 问题
2. [APIs: do_current.py]: 精简程序,解决 OOM 问题
3. [APIs: factory_test.py]: 精简程序,解决 OOM 问题
4. [APIsL openapi.py]
   - 心跳修改为 1 s,因为 OOM 问题的解决依赖于长久的打开曲线开关,此时对于 hr.c_msg 的定时清理是个挑战,将心跳缩短,有利于清理日志后,避免丢失心跳
   - 新增 diagnosis.save 命令,但是执行时,有问题,待解决
2024-07-27 21:31:09 +08:00

410 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
from time import sleep, time
from sys import argv
from os import scandir
from os.path import exists
from paramiko import SSHClient, AutoAddPolicy
from json import loads
import pandas
display_pdo_params = [
{"name": "hw_joint_vel_feedback", "channel": 0},
{"name": "hw_joint_vel_feedback", "channel": 1},
{"name": "hw_joint_vel_feedback", "channel": 2},
{"name": "hw_joint_vel_feedback", "channel": 3},
{"name": "hw_joint_vel_feedback", "channel": 4},
{"name": "hw_joint_vel_feedback", "channel": 5},
{"name": "device_servo_trq_feedback", "channel": 0},
{"name": "device_servo_trq_feedback", "channel": 1},
{"name": "device_servo_trq_feedback", "channel": 2},
{"name": "device_servo_trq_feedback", "channel": 3},
{"name": "device_servo_trq_feedback", "channel": 4},
{"name": "device_servo_trq_feedback", "channel": 5},
]
def traversal_files(path, w2t):
if not exists(path):
msg = f'数据文件夹{path}不存在,请确认后重试......'
w2t(msg, 0, 1, 'red', tab_name='Automatic Test')
else:
dirs = []
files = []
for item in scandir(path):
if item.is_dir():
dirs.append(item.path)
elif item.is_file():
files.append(item.path)
return dirs, files
def check_files(path, loadsel, data_dirs, data_files, w2t):
if len(data_dirs) != 0 or len(data_files) != 3:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下三个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
w2t(' 1. configs.xlsx\n 2. T_电机电流.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test')
config_file = current_file = prj_file = None
for data_file in data_files:
filename = data_file.split('\\')[-1]
if filename == 'configs.xlsx':
config_file = data_file
elif filename == 'T_电机电流.xlsx':
current_file = data_file
elif filename.endswith('.zip'):
prj_file = data_file
else:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下三个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
w2t(' 1. configs.xlsx\n 2. T_电机电流.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test')
if config_file and current_file and prj_file:
w2t("数据目录合规性检查结束,未发现问题......", tab_name='Automatic Test')
if loadsel == 'tool100':
os.mkdir(f"{path}\\single")
os.mkdir(f"{path}\\s_1")
os.mkdir(f"{path}\\s_2")
os.mkdir(f"{path}\\s_3")
elif loadsel == 'inertia':
os.mkdir(f"{path}\\inertia")
return config_file, current_file, prj_file
else:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下三个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
w2t(' 1. configs.xlsx\n 2. T_电机电流.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test')
def prj_to_xcore(prj_file):
ssh = SSHClient()
ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019')
sftp = ssh.open_sftp()
sftp.put(prj_file, '/tmp/target.zip')
cmd = 'cd /tmp; rm -rf target/; mkdir target; unzip -d target/ -q target.zip; '
cmd += 'chmod 777 -R target/; rm target.zip'
ssh.exec_command(cmd)
cmd = 'sudo rm -rf /home/luoshi/bin/controller/projects/target; '
cmd += 'sudo mv /tmp/target/ /home/luoshi/bin/controller/projects/'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
cmd = 'cd /home/luoshi/bin/controller/; '
cmd += 'sudo mv projects/target/_build/*.prj projects/target/_build/target.prj'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
ssh.close()
def execution(cmd, hr, w2t, **kwargs):
_id = hr.execution(cmd, **kwargs)
_msg = hr.get_from_id(_id)
if not _msg:
w2t(f"无法获取{_id}请求的响应信息", 0, 7, 'red', tab_name='Automatic Test')
else:
_response = loads(_msg)
if not _response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
return _response
def data_proc_regular(path, filename, channel, scenario_time):
if channel in list(range(6)):
with open(filename, 'r', encoding='utf-8') as f_obj:
lines = f_obj.readlines()
_d2d_vel = {'hw_joint_vel_feedback': []}
_d2d_trq = {'device_servo_trq_feedback': []}
for line in lines:
data = eval(line.strip())['data']
for item in data:
try:
item['value'].reverse()
except KeyError:
continue
if item.get('channel', None) == channel and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == channel and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq['device_servo_trq_feedback'].extend(item['value'])
df1 = pandas.DataFrame.from_dict(_d2d_vel)
df2 = pandas.DataFrame.from_dict(_d2d_trq)
df = pandas.concat([df1, df2], axis=1)
_filename = f'{path}\\single\\j{channel+1}_single_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
elif channel in list(range(6, 9)):
with open(filename, 'r', encoding='utf-8') as f_obj:
lines = f_obj.readlines()
_d2d_vel_0 = {'hw_joint_vel_feedback': []}
_d2d_trq_0 = {'device_servo_trq_feedback': []}
_d2d_vel_1 = {'hw_joint_vel_feedback': []}
_d2d_trq_1 = {'device_servo_trq_feedback': []}
_d2d_vel_2 = {'hw_joint_vel_feedback': []}
_d2d_trq_2 = {'device_servo_trq_feedback': []}
_d2d_vel_3 = {'hw_joint_vel_feedback': []}
_d2d_trq_3 = {'device_servo_trq_feedback': []}
_d2d_vel_4 = {'hw_joint_vel_feedback': []}
_d2d_trq_4 = {'device_servo_trq_feedback': []}
_d2d_vel_5 = {'hw_joint_vel_feedback': []}
_d2d_trq_5 = {'device_servo_trq_feedback': []}
for line in lines:
data = eval(line.strip())['data']
for item in data:
try:
item['value'].reverse()
except KeyError:
continue
if item.get('channel', None) == 0 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_0['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 0 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_0['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 1 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_1['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 1 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_1['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 2 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_2['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 2 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_2['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 3 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_3['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 3 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_3['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 4 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_4['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 4 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_4['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 5 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_5['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 5 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_5['device_servo_trq_feedback'].extend(item['value'])
df_01 = pandas.DataFrame.from_dict(_d2d_vel_0)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_0)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j1_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = pandas.DataFrame.from_dict(_d2d_vel_1)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_1)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j2_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = pandas.DataFrame.from_dict(_d2d_vel_2)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_2)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j3_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = pandas.DataFrame.from_dict(_d2d_vel_3)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_3)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j4_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = pandas.DataFrame.from_dict(_d2d_vel_4)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_4)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j5_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = pandas.DataFrame.from_dict(_d2d_vel_5)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_5)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j6_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
elif channel in list(range(9, 15)):
with open(filename, 'r', encoding='utf-8') as f_obj:
lines = f_obj.readlines()
_d2d_vel = {'hw_joint_vel_feedback': []}
_d2d_trq = {'device_servo_trq_feedback': []}
for line in lines:
data = eval(line.strip())['data']
for item in data:
try:
item['value'].reverse()
except KeyError:
continue
if item.get('channel', None) == channel-9 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == channel-9 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq['device_servo_trq_feedback'].extend(item['value'])
df1 = pandas.DataFrame.from_dict(_d2d_vel)
df2 = pandas.DataFrame.from_dict(_d2d_trq)
df = pandas.concat([df1, df2], axis=1)
_filename = f'{path}\\single\\j{channel-8}_hold_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
def data_proc_inertia(path, filename, channel):
with open(filename, 'r', encoding='utf-8') as f_obj:
lines = f_obj.readlines()
_d2d_vel = {'hw_joint_vel_feedback': []}
_d2d_trq = {'device_servo_trq_feedback': []}
for line in lines:
data = eval(line.strip())['data']
for item in data:
try:
item['value'].reverse()
except KeyError:
continue
if item.get('channel', None) == channel+3 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == channel+3 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq['device_servo_trq_feedback'].extend(item['value'])
df1 = pandas.DataFrame.from_dict(_d2d_vel)
df2 = pandas.DataFrame.from_dict(_d2d_trq)
df = pandas.concat([df1, df2], axis=1)
_filename = f'{path}\\inertia\\j{channel+4}_inertia_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
def gen_result_file(path, loadsel, disc, number, scenario_time):
filename = path + f'\\data.txt'
with open(filename, 'w', encoding='utf-8') as f_obj:
for line in disc[number][1]:
f_obj.write(str(line)+'\n')
if loadsel == 'tool100':
data_proc_regular(path, filename, number, scenario_time)
elif loadsel == 'inertia':
data_proc_inertia(path, filename, number)
def run_rl(path, hr, md, loadsel, w2t):
c_regular = [
"scenario(0, j1_p, j1_n, p_speed, p_tool, i_tool)",
"scenario(0, j2_p, j2_n, p_speed, p_tool, i_tool)",
"scenario(0, j3_p, j3_n, p_speed, p_tool, i_tool)",
"scenario(0, j4_p, j4_n, p_speed, p_tool, i_tool)",
"scenario(0, j5_p, j5_n, p_speed, p_tool, i_tool)",
"scenario(0, j6_p, j6_n, p_speed, p_tool, i_tool)",
"scenario(1, j6_p, j6_n, p_speed, p_tool, i_tool)",
"scenario(2, j6_p, j6_n, p_speed, p_tool, i_tool)",
"scenario(3, j6_p, j6_n, p_speed, p_tool, i_tool)",
"scenario(4, j1_hold, j1_hold, p_speed, p_tool, i_tool)",
"scenario(4, j2_hold, j2_hold, p_speed, p_tool, i_tool)",
"scenario(4, j3_hold, j3_hold, p_speed, p_tool, i_tool)",
"scenario(4, j4_hold, j4_hold, p_speed, p_tool, i_tool)",
"scenario(4, j5_hold, j5_hold, p_speed, p_tool, i_tool)",
"scenario(4, j6_hold, j6_hold, p_speed, p_tool, i_tool)",
]
c_inertia = [
"scenario(5, j4_p_inertia, j4_n_inertia, p_speed, p_tool, i_tool)",
"scenario(5, j5_p_inertia, j5_n_inertia, p_speed, p_tool, i_tool)",
"scenario(5, j6_p_inertia, j6_n_inertia, p_speed, p_tool, i_tool)",
]
disc_regular = {
0: ['一轴', []], 1: ['二轴', []], 2: ['三轴', []], 3: ['四轴', []], 4: ['五轴', []], 5: ['六轴', []],
6: ['场景一', []], 7: ['场景二', []], 8: ['场景三', []], 9: ['一轴保持', []], 10: ['二轴保持', []],
11: ['三轴保持', []], 12: ['四轴保持', []], 13: ['五轴保持', []], 14: ['六轴保持', []]
}
disc_inertia = {0: ['四轴惯量', []], 1: ['五轴惯量', []], 2: ['六轴惯量', []]}
if loadsel == 'tool100':
conditions = c_regular
disc = disc_regular
elif loadsel == 'inertia':
conditions = c_inertia
disc = disc_inertia
# preparation 触发软急停,并解除,目的是让可能正在运行着的机器停下来
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
# _response = execution('diagnosis.save', hr, w2t, save=True) # 这条命令有问题
md.trigger_estop()
md.reset_estop()
for condition in conditions:
number = conditions.index(condition)
w2t(f"正在执行{disc[number][0]}测试......", 0, 0, 'purple', 'Automatic Test')
# 1. 将act重置为False并修改未要执行的场景
md.write_act(False)
ssh = SSHClient()
ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019')
cmd = 'cd /home/luoshi/bin/controller/; '
cmd += 'sudo sed -i "/scenario/d" projects/target/_build/current/main.mod; '
cmd += f'sudo sed -i "/DONOTDELETE/i {condition}" projects/target/_build/current/main.mod'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
# 2. reload工程后pp2main并且自动模式和上电
prj_path = 'target/_build/target.prj'
_response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['current'])
_response = execution('rl_task.pp_to_main', hr, w2t, tasks=['current'])
_response = execution('state.switch_auto', hr, w2t)
_response = execution('state.switch_motor_on', hr, w2t)
# 3. 开始运行程序单轴运行35s
_response = execution('rl_task.run', hr, w2t, tasks=['current'])
_t_start = time()
while True:
if md.read_ready_to_go() == 1:
md.write_act(True)
break
else:
if (time() - _t_start) // 20 > 1:
w2t("20s内未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test')
else:
sleep(1)
# 4. 打开诊断曲线,并执行采集
sleep(10) # 保证程序已经运行起来,其实主要是为了保持电流的采集而设定
scenario_time = 0
if number < 6:
sleep(35)
elif number > 8:
sleep(15)
else:
_t_start = time()
while True:
scenario_time = md.read_scenario_time()
if float(scenario_time) > 1:
w2t(f"场景{number-5}的周期时间:{scenario_time}", 0, 0, 'green', 'Automatic Test')
break
else:
if (time()-_t_start)//60 > 3:
w2t(f"未收到场景{number-5}的周期时间需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test')
else:
sleep(5)
sleep(1) # 一定要延迟一秒再读一次scenario time寄存器因为一开始读取的数值不准确
scenario_time = md.read_scenario_time()
sleep(float(scenario_time)*0.2) # 再运行周期的20%即可
# 5.停止程序运行,保留数据并处理输出
_response = execution('rl_task.stop', hr, w2t, tasks=['current'])
_c_msg = hr.c_msg.copy()
for _msg in _c_msg:
if 'diagnosis.result' in _msg:
disc[number][1].insert(0, loads(_msg))
else:
hr.c_msg_xs.clear()
if len(hr.c_msg) > 240:
del hr.c_msg[240:]
gen_result_file(path, loadsel, disc, number, scenario_time)
else:
if loadsel == 'tool100':
w2t("单轴和场景电机电流采集完毕,如需采集惯量负载,须切换负载类型,并更换惯量负载,重新执行。", 0, 0, 'green', 'Automatic Test')
elif loadsel == 'inertia':
w2t("惯量负载电机电流采集完毕,如需采集单轴/场景/保持电机电流,须切换负载类型,并更换偏置负载,重新执行。", 0, 0, 'green', 'Automatic Test')
def main(path, hr, md, loadsel, w2t):
data_dirs, data_files = traversal_files(path, w2t)
config_file, current_file, prj_file = check_files(path, loadsel, data_dirs, data_files, w2t)
prj_to_xcore(prj_file)
run_rl(path, hr, md, loadsel, w2t)
if __name__ == '__main__':
main(*argv[1:])