@ -77,7 +77,7 @@ def initialization(path, sub, data_dirs, data_files, hr, w2t):
return avs
try :
clibs . c_hr . set_socket_params ( True , str ( clibs . external_port ) , " \r " , 1 )
clibs . c_hr . set_socket_params ( True , str ( clibs . external_port ) , " \r " , 1 , ip = " 0.0.0.0 " )
clibs . c_ec = openapi . ExternalCommunication ( clibs . ip_addr , clibs . external_port )
except Exception :
w2t ( f " { clibs . ip_addr } : { clibs . socket_port } 或者 { clibs . ip_addr } : { clibs . external_port } 不可达,需检查网络连接! \n " , color = " red " , desc = " NetworkError " )
@ -88,7 +88,7 @@ def initialization(path, sub, data_dirs, data_files, hr, w2t):
return _config_file , _prj_file , _result_dirs , _avs
def gen_result_file ( path , axis , t_end , reach , load , speed , speed_target , rounds , w2t ) :
def gen_result_file ( path , axis , t_end , reach , load , speed , speed_max , rounds , w2t ) :
d_vel , d_trq , d_stop , threshold = [ ] , [ ] , [ ] , 0.95
start_time = time . strftime ( " % Y- % m- %d % H: % M: % S " , time . localtime ( t_end - 12 ) )
@ -111,10 +111,14 @@ def gen_result_file(path, axis, t_end, reach, load, speed, speed_target, rounds,
elif item . get ( " channel " , None ) == 0 and item . get ( " name " , None ) == " device_safety_estop " :
d_stop . extend ( d_item )
idx = d_stop . index ( 0 )
idx = 0
for idx in range ( len ( d_stop ) - 10 , 0 , - 1 ) :
if d_stop [ idx ] == 1 :
break
av_estop = abs ( sum ( d_vel [ idx - 20 : idx ] ) / 20 * clibs . RADIAN )
if av_estop / speed_target < threshold :
w2t ( f " [av_estop: { av_estop : .2f } | shouldbe: { speed_target : .2f } ] 本次触发 ESTOP 时未采集到指定百分比的最大速度,即将重试! \n " , " #8A2BE2 " )
if av_estop / speed_max < threshold :
w2t ( f " [av_estop: { av_estop : .2f } | shouldbe: { speed_max : .2f } ] 处理数据时, 本次触发 ESTOP 时未采集到指定百分比的最大速度,即将重试! \n " , " #8A2BE2 " )
clibs . count + = 1
if clibs . count < 3 :
return " retry "
@ -160,6 +164,7 @@ def run_rl(path, sub, hr, md, config_file, prj_file, result_dirs, avs, w2t):
else :
w2t ( " configs.xlsx 中 Target 页面 B5 单元格填写不正确,检查后重新运行... \n " , " red " , " DirectionError " )
change_curve_state ( hr , True )
for condition in result_dirs :
reach = condition . split ( " _ " ) [ 0 ] . removeprefix ( " reach " )
load = condition . split ( " _ " ) [ 1 ] . removeprefix ( " load " )
@ -184,6 +189,7 @@ def run_rl(path, sub, hr, md, config_file, prj_file, result_dirs, avs, w2t):
md . write_axis ( axis )
w2t ( f " - " * 90 + " \n " , " purple " )
speed_max = 0
for rounds in range ( 1 , 4 ) :
count + = 1
_ = 3 if count % 3 == 0 else count % 3
@ -241,12 +247,10 @@ def run_rl(path, sub, hr, md, config_file, prj_file, result_dirs, avs, w2t):
w2t ( " 15s 内未收到机器人的运行信号,需要确认 RL 程序编写正确并正常执行... \n " , " red " , " ReadySignalTimeoutError " )
# 4. 找出最大速度, 传递给RL程序, 最后清除相关记录
time . sleep ( 5 ) # 消除前 5s 的不稳定数据
change_curve_state ( hr , True )
start_time = time . strftime ( " % Y- % m- %d % H: % M: % S " , time . localtime ( time . time ( ) ) )
time . sleep ( get_init_speed ) # 指定时间后获取实际【正|负】方向的最大速度, 可通过configs.xlsx配置
end_time = time . strftime ( " % Y- % m- %d % H: % M: % S " , time . localtime ( time . time ( ) ) )
hr . execution ( " rl_task.stop " , tasks = [ " brake " ] )
change_curve_state ( hr , False )
# 找出最大速度
@clibs.db_lock
@ -269,8 +273,8 @@ def run_rl(path, sub, hr, md, config_file, prj_file, result_dirs, avs, w2t):
speed_target = avs [ axis - 1 ] * float ( speed ) / 100
clibs . insert_logdb ( " INFO " , " do_brake " , f " axis = { axis } , direction = { pon } , max speed = { speed_max } " )
if speed_max < speed_target * 0.95 or speed_max > speed_target * 1.05 :
w2t ( f " Axis: { axis } - { count } | Speed: { speed_max } | Shouldbe: { speed_target } \n " , " indigo " )
clibs . insert_logdb ( " WARNING " , " do_brake " , f " Axis: { axis } - { count } | Speed: { speed_max } | Shouldbe: { speed_target } " )
w2t ( f " Axis: { axis } - { count } | 采集获取最大 Speed: { speed_max } | Shouldbe: { speed_target } \n " , " indigo " )
clibs . insert_logdb ( " WARNING " , " do_brake " , f " Axis: { axis } - { count } | 采集获取最大 Speed: { speed_max } | Shouldbe: { speed_target } " )
md . write_speed_max ( speed_max )
if speed_max < 10 :
@ -314,7 +318,6 @@ def run_rl(path, sub, hr, md, config_file, prj_file, result_dirs, avs, w2t):
def exec_brake ( ) :
flag , start , data , record = True , time . time ( ) , None , None
change_curve_state ( hr , True )
while flag :
time . sleep ( 0.05 )
if time . time ( ) - start > 20 :
@ -333,22 +336,24 @@ def run_rl(path, sub, hr, md, config_file, prj_file, result_dirs, avs, w2t):
continue
speed_moment = clibs . RADIAN * sum ( item [ " value " ] ) / len ( item [ " value " ] )
if abs ( speed_moment ) > speed_target * 0.95 :
if abs ( speed_moment ) > speed_max - 2 :
if ( pon == " positive " and speed_moment > 0 ) or ( pon == " negative " and speed_moment < 0 ) :
clibs . c_ec . setdo_value ( io_name , " false " )
time . sleep ( 2 )
change_curve_state ( hr , False )
flag = False
break
return time . time ( )
t_end = exec_brake ( )
# 6. 保留数据并处理输出
ret = gen_result_file ( path , axis , t_end , reach , load , speed , speed_target , rounds , w2t )
ret = gen_result_file ( path , axis , t_end , reach , load , speed , speed_max , rounds , w2t )
if ret != " retry " :
clibs . count = 0
break
else :
time . sleep ( 50 )
change_curve_state ( hr , False )
w2t ( f " \n { sub . removeprefix ( ' tool ' ) } %负载的制动性能测试执行完毕,如需采集其他负载,须切换负载类型,并更换其他负载,重新执行。 \n " , " green " )