@ -1,5 +1,4 @@
from random import randint
from time import sleep , time
from time import sleep , time , strftime , localtime
from sys import argv
from os import scandir , mkdir
from os . path import exists
@ -9,12 +8,13 @@ from openpyxl import load_workbook
import pandas
RADIAN = 57.3 # 180 / 3.1415926
tab_name = ' Automatic Test '
def traversal_files ( path , w2t ) :
if not exists ( path ) :
msg = f ' 数据文件夹 { path } 不存在,请确认后重试...... '
w2t ( msg , 0 , 1 , ' red ' , tab_name = ' Automatic Test ' )
w2t ( msg , 0 , 1 , ' red ' , tab_name )
else :
dirs = [ ]
files = [ ]
@ -29,8 +29,8 @@ def traversal_files(path, w2t):
def check_files ( path , loadsel , data_dirs , data_files , w2t ) :
if len ( data_dirs ) != 0 or len ( data_files ) != 5 :
w2t ( ' 初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行! ' , 0 , 0 , ' red ' , tab_name = ' Automatic Test ' )
w2t ( ' 1. configs.xlsx \n 2. reach33/reach66/reach100_xxxx.xlsx \n 3. xxxx.zip ' , 0 , 1 , ' red ' , tab_name = ' Automatic Test ' )
w2t ( ' 初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行! ' , 0 , 0 , ' red ' , tab_name )
w2t ( ' 1. configs.xlsx \n 2. reach33/reach66/reach100_xxxx.xlsx \n 3. xxxx.zip ' , 0 , 1 , ' red ' , tab_name )
config_file = reach33 = reach66 = reach100 = prj_file = None
for data_file in data_files :
@ -46,8 +46,8 @@ def check_files(path, loadsel, data_dirs, data_files, w2t):
elif filename . endswith ( ' .zip ' ) :
prj_file = data_file
else :
w2t ( ' 初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行! ' , 0 , 0 , ' red ' , tab_name = ' Automatic Test ' )
w2t ( ' 1. configs.xlsx \n 2. reach33/reach66/reach100_xxxx.xlsx \n 3. xxxx.zip ' , 0 , 2 , ' red ' , tab_name = ' Automatic Test ' )
w2t ( ' 初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行! ' , 0 , 0 , ' red ' , tab_name )
w2t ( ' 1. configs.xlsx \n 2. reach33/reach66/reach100_xxxx.xlsx \n 3. xxxx.zip ' , 0 , 2 , ' red ' , tab_name )
if config_file and reach33 and reach66 and reach100 and prj_file :
result_dirs = [ ]
@ -65,11 +65,11 @@ def check_files(path, loadsel, data_dirs, data_files, w2t):
if _reach == ' reach100 ' :
mkdir ( f " { path } \\ j3 \\ { dir_name } " )
w2t ( " 数据目录合规性检查结束,未发现问题...... " , tab_name = ' Automatic Test ' )
w2t ( " 数据目录合规性检查结束,未发现问题...... " , 0 , 0 , ' blue ' , tab_name )
return config_file , reach33 , reach66 , reach100 , prj_file , result_dirs
else :
w2t ( ' 初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行! ' , 0 , 0 , ' red ' , tab_name = ' Automatic Test ' )
w2t ( ' 1. configs.xlsx \n 2. reach33/reach66/reach100_xxxx.xlsx \n 3. xxxx.zip ' , 0 , 1 , ' red ' , tab_name = ' Automatic Test ' )
w2t ( ' 初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行! ' , 0 , 0 , ' red ' , tab_name )
w2t ( ' 1. configs.xlsx \n 2. reach33/reach66/reach100_xxxx.xlsx \n 3. xxxx.zip ' , 0 , 1 , ' red ' , tab_name )
def prj_to_xcore ( prj_file ) :
@ -90,9 +90,9 @@ def prj_to_xcore(prj_file):
print ( stdout . read ( ) . decode ( ) ) # 必须得输出一下stdout, 才能正确执行sudo
print ( stderr . read ( ) . decode ( ) ) # 顺便也执行以下stderr
_prj_name = prj_file. split ( ' \\ ' ) [ - 1 ] . removesuffix ( ' .zip ' )
# _prj_name = prj_file.split('\\')[-1].removesuffix('.zip' )
cmd = ' cd /home/luoshi/bin/controller/; '
cmd + = f ' sudo mv projects/target/_build/{ _prj_name } .prj projects/target/_build/target.prj '
cmd + = f ' sudo mv projects/target/_build/* .prj projects/target/_build/target.prj '
stdin , stdout , stderr = ssh . exec_command ( cmd , get_pty = True )
stdin . write ( ' luoshi2019 ' + ' \n ' )
stdin . flush ( )
@ -105,11 +105,11 @@ def execution(cmd, hr, w2t, **kwargs):
_id = hr . execution ( cmd , * * kwargs )
_msg = hr . get_from_id ( _id )
if not _msg :
w2t ( f " 无法获取 { _id } 请求的响应信息 " , 0 , 6 , ' red ' , tab_name = ' Automatic Test ' )
w2t ( f " 无法获取 { _id } 请求的响应信息 " , 0 , 6 , ' red ' , tab_name )
else :
_response = loads ( _msg )
if not _response :
w2t ( f " 无法获取 { id } 请求的响应信息 " , 0 , 1 , ' red ' , tab_name = ' Automatic Test ' )
w2t ( f " 无法获取 { id } 请求的响应信息 " , 0 , 1 , ' red ' , tab_name )
return _response
@ -119,7 +119,6 @@ def gen_result_file(path, curve_data, axis, _reach, _load, _speed, count):
_d2d_stop = { ' device_safety_estop ' : [ ] }
for data in curve_data :
dict_results = data [ ' data ' ]
# dict_results.reverse()
for item in dict_results :
item [ ' value ' ] . reverse ( )
if item . get ( ' channel ' , None ) == axis - 1 and item . get ( ' name ' , None ) == ' hw_joint_vel_feedback ' :
@ -137,9 +136,8 @@ def gen_result_file(path, curve_data, axis, _reach, _load, _speed, count):
df . to_csv ( _filename , sep = ' \t ' , index = False )
def run_rl ( path , loadsel , hr , md , config_file , prj_file , result_dirs , w2t ) :
def run_rl ( path , loadsel , hr , md , config_file , result_dirs , w2t ) :
_count = 0
speed_max = 0
display_pdo_params = [
{ " name " : " hw_joint_vel_feedback " , " channel " : 0 } ,
{ " name " : " hw_joint_vel_feedback " , " channel " : 1 } ,
@ -157,82 +155,94 @@ def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
]
wb = load_workbook ( config_file , read_only = True )
ws = wb [ ' Target ' ]
write_diagnosis = float ( ws . cell ( row = 3 , column = 10 ) . value )
get_init_speed = float ( ws . cell ( row = 4 , column = 10 ) . value )
if ws . cell ( row = 1 , column = 1 ) . value == ' positive ' :
md . write_pon ( True )
md . write_pon ( 1 )
elif ws . cell ( row = 1 , column = 1 ) . value == ' negative ' :
md . write_pon ( False )
md . write_pon ( 0 )
else :
w2t ( " configs.xlsx中Target页面A1单元格填写不正确, 检查后重新运行... " , 0 , 111 , ' red ' , ' Automatic Test ' )
w2t ( " configs.xlsx中Target页面A1单元格填写不正确, 检查后重新运行... " , 0 , 111 , ' red ' , tab_name )
for condition in result_dirs :
_reach = condition . split ( ' _ ' ) [ 0 ] . removeprefix ( ' reach ' )
_load = condition . split ( ' _ ' ) [ 1 ] . removeprefix ( ' load ' )
_speed = condition . split ( ' _ ' ) [ 2 ] . removeprefix ( ' speed ' )
# if _speed != '100' or _reach != '100':
# continue
for axis in range ( 1 , 4 ) :
md . write_axis ( axis )
speed_max = 0
if axis == 3 and _reach != ' 100 ' :
continue
w2t ( f " - " * 90 , 0 , 0 , ' purple ' , tab_name )
for count in range ( 1 , 4 ) :
_count + = 1
w2t ( f " [ { _count } /63- { count } ] 正在执行 { axis } 轴 { condition } 的制动测试...... " , 0 , 0 , ' purple ' , ' Automatic Test ' )
this_time = strftime ( " % Y- % m- %d % H: % M: % S " , localtime ( time ( ) ) )
prj_path = ' target/_build/target.prj '
w2t ( f " [ { this_time } | { _count } /63] 正在执行 { axis } 轴 { condition } 的第 { count } 次制动测试... " , 0 , 0 , ' purple ' , tab_name )
# 1. 关闭诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来,切手动模式并下电
md . trigger_estop ( )
md . reset_estop ( )
md . write_act ( False )
md . clear_alarm ( )
md . write_act ( 0 )
_response = execution ( ' diagnosis.open ' , hr , w2t , open = True , display_open = True )
sleep ( write_diagnosis ) # 软急停超差后, 等待写诊断时间, 可通过configs.xlsx配置
_response = execution ( ' diagnosis.open ' , hr , w2t , open = False , display_open = False )
sleep ( 1 ) # 让曲线彻底关闭
_response = execution ( ' state.switch_manual ' , hr , w2t )
_response = execution ( ' state.switch_motor_off ' , hr , w2t )
# 2. 修改未要执行的场景
ssh = SSHClient ( )
ssh . set_missing_host_key_policy ( AutoAddPolicy ( ) )
ssh . connect ( ' 192.168.0.160 ' , 22 , username = ' luoshi ' , password = ' luoshi2019 ' )
if ws . cell ( row = 1 , column = 1 ) . value = = ' positive ' :
_rl_cmd = f " brake_E(j { axis } _ { _reach } _p, j { axis } _ { _reach } _n, p_speed, p_tool) "
elif ws . cell ( row = 1 , column = 1 ) . value == ' negative ' :
_rl_cmd = f " brake_E(j { axis } _ { _reach } _n, j { axis } _ { _reach } _p, p_speed, p_tool) "
else :
w2t ( " configs.xlsx中Target页面A1单元格填写不正确, 检查后重新运行... " , 0 , 111 , ' red ' , ' Automatic Test ' )
_rl_speed = f " VelSet { _speed } "
cmd = ' cd /home/luoshi/bin/controller/; '
cmd + = ' sudo sed -i " /brake_E/d " projects/target/_build/brake/main.mod; '
cmd + = f ' sudo sed -i " /DONOTDELETE/i { _rl_cmd } " projects/target/_build/brake/main.mod; '
cmd + = ' sudo sed -i " /VelSet/d " projects/target/_build/brake/main.mod; '
cmd + = f ' sudo sed -i " /MoveAbsJ/i { _rl_speed } " projects/target/_build/brake/main.mod '
stdin , stdout , stderr = ssh . exec_command ( cmd , get_pty = True )
stdin . write ( ' luoshi2019 ' + ' \n ' )
stdin . flush ( )
print ( stdout . read ( ) . decode ( ) ) # 必须得输出一下stdout, 才能正确执行sudo
print ( stderr . read ( ) . decode ( ) ) # 顺便也执行以下stderr
# 3. reload工程后, pp2main, 并且自动模式和上电, 最后运行程序
prj_path = ' target/_build/target.prj '
_response = execution ( ' overview.reload ' , hr , w2t , prj_path = prj_path , tasks = [ ' brake ' , ' stop0_related ' ] )
_response = execution ( ' rl_task.pp_to_main ' , hr , w2t , tasks = [ ' brake ' , ' stop0_related ' ] )
_response = execution ( ' state.switch_auto ' , hr , w2t )
_response = execution ( ' state.switch_motor_on ' , hr , w2t )
_response = execution ( ' rl_task.run ' , hr , w2t , tasks = [ ' brake ' , ' stop0_related ' ] )
_t_start = time ( )
while True :
if md . read_ready_to_go ( ) == 1 :
md . write_act ( True )
break
while count == 1 :
# 2. 修改未要执行的场景
ssh = SSHClient ( )
ssh . set_missing_host_key_policy ( AutoAddPolicy ( ) )
ssh . connect ( ' 192.168.0.160 ' , 22 , username = ' luoshi ' , password = ' luoshi2019 ' )
if ws . cell ( row = 1 , column = 1 ) . value == ' positive ' :
_rl_cmd = f " brake_E(j { axis } _ { _reach } _p, j { axis } _ { _reach } _n, p_speed, p_tool) "
elif ws . cell ( row = 1 , column = 1 ) . value == ' negative ' :
_rl_cmd = f " brake_E(j { axis } _ { _reach } _n, j { axis } _ { _reach } _p, p_speed, p_tool) "
else :
if ( time ( ) - _t_start ) / / 20 > 1 :
w2t ( " 20s内未收到机器人的运行信号, 需要确认RL程序编写正确并正常执行... " , 0 , 111 , ' red ' , ' Automatic Test ' )
w2t ( " configs.xlsx中Target页面A1单元格填写不正确, 检查后重新运行... " , 0 , 111 , ' red ' , tab_name )
_rl_speed = f " VelSet { _speed } "
_rl_tool = f " tool p_tool = tool { loadsel . removeprefix ( ' tool ' ) } "
cmd = ' cd /home/luoshi/bin/controller/; '
cmd + = ' sudo sed -i " /brake_E/d " projects/target/_build/brake/main.mod; '
cmd + = f ' sudo sed -i " /DONOTDELETE/i { _rl_cmd } " projects/target/_build/brake/main.mod; '
cmd + = ' sudo sed -i " /VelSet/d " projects/target/_build/brake/main.mod; '
cmd + = f ' sudo sed -i " /MoveAbsJ/i { _rl_speed } " projects/target/_build/brake/main.mod; '
cmd + = ' sudo sed -i " /tool p_tool/d " projects/target/_build/brake/main.mod; '
cmd + = f ' sudo sed -i " /VelSet/i { _rl_tool } " projects/target/_build/brake/main.mod; '
stdin , stdout , stderr = ssh . exec_command ( cmd , get_pty = True )
stdin . write ( ' luoshi2019 ' + ' \n ' )
stdin . flush ( )
print ( stdout . read ( ) . decode ( ) ) # 必须得输出一下stdout, 才能正确执行sudo
print ( stderr . read ( ) . decode ( ) ) # 顺便也执行以下stderr
# 3. reload工程后, pp2main, 并且自动模式和上电, 最后运行程序
_response = execution ( ' overview.reload ' , hr , w2t , prj_path = prj_path , tasks = [ ' brake ' , ' stop0_related ' ] )
_response = execution ( ' rl_task.pp_to_main ' , hr , w2t , tasks = [ ' brake ' , ' stop0_related ' ] )
_response = execution ( ' state.switch_auto ' , hr , w2t )
_response = execution ( ' state.switch_motor_on ' , hr , w2t )
_response = execution ( ' rl_task.run ' , hr , w2t , tasks = [ ' brake ' , ' stop0_related ' ] )
_t_start = time ( )
while True :
if md . read_ready_to_go ( ) == 1 :
md . write_act ( True )
break
else :
sleep ( 1 )
# 4. 第一次打开诊断曲线, 并执行采集8s, 之后触发软急停, 关闭曲线采集, 找出最大速度, 传递给RL程序, 最后清除相关记录
if count == 1 :
if ( time ( ) - _t_start ) / / 20 > 1 :
w2t ( " 20s内未收到机器人的运行信号, 需要确认RL程序编写正确并正常执行... " , 0 , 111 , ' red ' , tab_name )
else :
sleep ( 1 )
# 4. 打开诊断曲线, 并执行采集, 之后触发软急停, 关闭曲线采集, 找出最大速度, 传递给RL程序, 最后清除相关记录
_response = execution ( ' diagnosis.open ' , hr , w2t , open = True , display_open = True )
_response = execution ( ' diagnosis.set_params ' , hr , w2t , display_pdo_params = display_pdo_params )
sleep ( 10 ) # 前10秒 获取实际最大速度
md . trigger_esto p( )
sleep ( get_init_speed ) # 获取实际最大速度, 可通过configs.xlsx配置
_response = execution ( ' rl_task.stop ' , hr , w2t , tasks = [ ' brake ' ] )
slee p( 1 )
_response = execution ( ' state.switch_motor_off ' , hr , w2t )
_response = execution ( ' state.switch_manual ' , hr , w2t )
_response = execution ( ' diagnosis.open ' , hr , w2t , open = False , display_open = False )
# 找出最大速度
for _msg in hr . c_msg :
@ -240,11 +250,16 @@ def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
dict_results = loads ( _msg ) [ ' data ' ]
for item in dict_results :
if item . get ( ' channel ' , None ) == axis - 1 and item . get ( ' name ' , None ) == ' hw_joint_vel_feedback ' :
_ = abs ( RADIAN * sum ( item [ ' value ' ] ) / len ( item [ ' value ' ] ) )
speed_max = max ( _ , speed_max )
_ = RADIAN * sum ( item [ ' value ' ] ) / len ( item [ ' value ' ] )
if ws . cell ( row = 1 , column = 1 ) . value == ' positive ' :
speed_max = max ( _ , speed_max )
elif ws . cell ( row = 1 , column = 1 ) . value == ' negative ' :
speed_max = min ( _ , speed_max )
print ( f " speed max = { speed_max } " )
speed_max = abs ( speed_max )
speed_target = float ( ws . cell ( row = 3 , column = axis + 1 ) . value ) * float ( _speed ) / 100
if speed_max < speed_target * 0.95 or speed_max > speed_target * 1.05 :
w2t ( f " Axis: { axis } - { count } | Speed: { speed_max } | Shouldbe: { speed_target } " , 0 , 0 , ' indigo ' , ' Automatic Test ' )
w2t ( f " Axis: { axis } - { count } | Speed: { speed_max } | Shouldbe: { speed_target } " , 0 , 0 , ' indigo ' , tab_name )
md . write_speed_max ( speed_max )
sleep ( 1 )
@ -256,8 +271,17 @@ def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
hr . c_msg_xs . clear ( )
break
if speed_max < 10 :
md . clear_alarm ( )
w2t ( " 未获取到正确的速度,即将重新获取... " , 0 , 0 , ' red ' , tab_name )
continue
else :
break
# 5. 清除软急停, 重新运行程序, 并打开曲线发送继续运动信号, 当速度达到最大值时, 通过DO触发急停
md . reset_estop ( )
md . reset_estop ( ) # 其实没必要
md . clear_alarm ( )
_response = execution ( ' overview.reload ' , hr , w2t , prj_path = prj_path , tasks = [ ' brake ' , ' stop0_related ' ] )
_response = execution ( ' rl_task.pp_to_main ' , hr , w2t , tasks = [ ' brake ' , ' stop0_related ' ] )
_response = execution ( ' state.switch_auto ' , hr , w2t )
@ -265,31 +289,29 @@ def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
_response = execution ( ' rl_task.run ' , hr , w2t , tasks = [ ' brake ' , ' stop0_related ' ] )
for i in range ( 3 ) :
if md . read_ready_to_go ( ) == 1 :
md . write_act ( True )
md . write_act ( 1 )
break
else :
sleep ( 1 )
else :
w2t ( " 未收到机器人的运行信号, 需要确认RL程序编写正确并正常执行... " , 0 , 111 , ' red ' , ' Automatic Test ' )
w2t ( " 未收到机器人的运行信号, 需要确认RL程序编写正确并正常执行... " , 0 , 111 , ' red ' , tab_name )
_response = execution ( ' diagnosis.open ' , hr , w2t , open = True , display_open = True )
_response = execution ( ' diagnosis.set_params ' , hr , w2t , display_pdo_params = display_pdo_params )
sleep ( randint ( 3 , 6 ) )
md . write_probe ( True )
sleep ( 10 ) # 排除从其他位姿到零点位姿,再到轴极限位姿的时间
md . write_probe ( 1 )
_t_start = time ( )
while True :
if md . read_brake_done ( ) == 1 :
sleep ( 1 ) # 保证所有数据均已返回
md . write_probe ( False )
sleep ( 1 ) # 保证速度归零
md . write_probe ( 0 )
_response = execution ( ' diagnosis.open ' , hr , w2t , open = False , display_open = False )
sleep ( 1 ) # 保证所有数据均已返回
break
else :
if ( time ( ) - _t_start ) > 30 :
w2t ( f " 30s内未触发急停, 该条数据无效, 需要确认RL/Python程序编写正确并正常执行, 或者判别是否是机器本体问题... " , 0 , 0 , ' red ' , ' Automatic Test ' )
md . write_probe ( False )
w2t ( f " 30s内未触发急停, 该条数据无效, 需要确认RL/Python程序编写正确并正常执行, 或者判别是否是机器本体问题,比如正负方向速度是否一致 ... " , 0 , 0 , ' red ' , tab_name )
md . write_probe ( 0 )
_response = execution ( ' diagnosis.open ' , hr , w2t , open = False , display_open = False )
sleep ( 1 ) # 保证所有数据均已返回
break
else :
sleep ( 1 )
@ -308,7 +330,7 @@ def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
break
gen_result_file ( path , curve_data , axis , _reach , _load , _speed , count )
else :
w2t ( f " \n { loadsel . removeprefix ( ' tool ' ) } %负载的制动性能测试执行完毕,如需采集其他负载,须切换负载类型,并更换其他负载,重新执行。 " , 0 , 0 , ' green ' , ' Automatic Test ' )
w2t ( f " \n { loadsel . removeprefix ( ' tool ' ) } %负载的制动性能测试执行完毕,如需采集其他负载,须切换负载类型,并更换其他负载,重新执行。 " , 0 , 0 , ' green ' , tab_name )
def main ( path , hr , md , loadsel , w2t ) :
@ -316,10 +338,10 @@ def main(path, hr, md, loadsel, w2t):
data_dirs , data_files = traversal_files ( path , w2t )
config_file , reach33 , reach66 , reach100 , prj_file , result_dirs = check_files ( path , loadsel , data_dirs , data_files , w2t )
prj_to_xcore ( prj_file )
run_rl ( path , loadsel , hr , md , config_file , prj_file , result_dirs , w2t )
run_rl ( path , loadsel , hr , md , config_file , result_dirs , w2t )
_e_time = time ( )
time_total = _e_time - _s_time
w2t ( f " 处理总时长: { time_total / / 3600 : 02.0f } h { time_total % 3600 / / 60 : 02.0f } m { time_total % 60 : 02.0f } s " , 0 , 0 , ' green ' , ' Automatic Test ' )
w2t ( f " 处理总时长: { time_total / / 3600 : 02.0f } h { time_total % 3600 / / 60 : 02.0f } m { time_total % 60 : 02.0f } s " , 0 , 0 , ' green ' , tab_name )
if __name__ == ' __main__ ' :