@ -0,0 +1,289 @@
import os
import threading
import time
import paramiko
import pandas
from PySide6 . QtCore import Signal , QThread
from codes . common import clibs
class DoCurrentTest ( QThread ) :
def __init__ ( self , dir_path , tool , / ) :
super ( ) . __init__ ( )
self . dir_path = dir_path
self . tool = tool
self . idx = 5
def initialization ( self , data_dirs , data_files ) :
def check_files ( ) :
msg = " 初始路径下不允许有文件夹,初始路径下只能存在如下两个文件,且文件为关闭状态,确认后重新运行!<br> "
msg + = " 1. T_电机电流.xlsx<br>2. xxxx.zip "
if len ( data_dirs ) != 0 or len ( data_files ) != 2 :
clibs . logger ( " ERROR " , " do_current " , msg , " red " )
prj_file , count = None , 0
for data_file in data_files :
filename = data_file . split ( " / " ) [ - 1 ]
if filename == " T_电机电流.xlsx " :
count + = 1
elif filename . endswith ( " .zip " ) :
count + = 1
prj_file = data_file
else :
clibs . logger ( " ERROR " , " do_current " , msg , " red " )
if count != 2 :
clibs . logger ( " ERROR " , " do_current " , msg , " red " )
if self . tool == " tool100 " :
os . mkdir ( f " { self . dir_path } /single " )
os . mkdir ( f " { self . dir_path } /s_1 " )
os . mkdir ( f " { self . dir_path } /s_2 " )
os . mkdir ( f " { self . dir_path } /s_3 " )
elif self . tool == " inertia " :
os . mkdir ( f " { self . dir_path } /inertia " )
else :
clibs . logger ( " ERROR " , " do_current " , " 负载选择错误,电机电流测试只能选择 tool100/inertia 规格! " , " red " )
return prj_file
def get_configs ( ) :
robot_type = None
msg_id = clibs . c_hr . execution ( " controller.get_params " )
records = clibs . c_hr . get_from_id ( msg_id )
for record in records :
if " 请求发送成功 " not in record [ 0 ] :
robot_type = eval ( record [ 0 ] ) [ " data " ] [ " robot_type " ]
server_file = f " /home/luoshi/bin/controller/robot_cfg/ { robot_type } / { robot_type } .cfg "
local_file = self . dir_path + f " / { robot_type } .cfg "
clibs . c_pd . pull_file_from_server ( server_file , local_file )
clibs . logger ( " INFO " , " do_current " , " 正在做初始化校验和配置,这可能需要一点时间...... " , " green " )
_prj_file = check_files ( )
get_configs ( )
clibs . logger ( " INFO " , " do_current " , " 数据目录合规性检查结束,未发现问题...... " , " green " )
return _prj_file
def single_axis_proc ( self , records , number ) :
text = " single " if number < 6 else " hold "
number = number if number < 6 else number - 6
d_vel , d_trq , d_sensor , d_trans , d_predict_trq , d_real_trq = [ ] , [ ] , [ ] , [ ] , [ ] , [ ]
for record in records :
data = eval ( record [ 0 ] ) [ " data " ]
for item in data :
d_item = reversed ( item [ " value " ] )
if item . get ( " channel " , None ) == number and item . get ( " name " , None ) == " hw_joint_vel_feedback " :
d_vel . extend ( d_item )
elif item . get ( " channel " , None ) == number and item . get ( " name " , None ) == " device_servo_trq_feedback " :
d_trq . extend ( d_item )
elif item . get ( " channel " , None ) == number and item . get ( " name " , None ) == " hw_sensor_trq_feedback " :
d_sensor . extend ( d_item )
elif item . get ( " channel " , None ) == number and item . get ( " name " , None ) == " hw_estimate_trans_trq_res " :
d_trans . extend ( d_item )
elif item . get ( " channel " , None ) == number and item . get ( " name " , None ) == " hw_predict_trq_res " :
d_predict_trq . extend ( d_item )
elif item . get ( " channel " , None ) == number and item . get ( " name " , None ) == " hw_real_trq_res " :
d_real_trq . extend ( d_item )
df1 = pandas . DataFrame . from_dict ( { " hw_joint_vel_feedback " : d_vel } )
df2 = pandas . DataFrame . from_dict ( { " device_servo_trq_feedback " : d_trq } )
df3 = pandas . DataFrame . from_dict ( { " hw_sensor_trq_feedback " : d_sensor } )
df4 = pandas . DataFrame . from_dict ( { " hw_estimate_trans_trq_res " : d_trans } )
df5 = pandas . DataFrame . from_dict ( { " hw_predict_trq_res " : d_predict_trq } )
df6 = pandas . DataFrame . from_dict ( { " hw_real_trq_res " : d_real_trq } )
df = pandas . concat ( [ df1 , df2 , df3 , df4 , df5 , df6 ] , axis = 1 )
filename = f " { self . dir_path } /single/j { number + 1 } _ { text } _ { time . time ( ) } .data "
df . to_csv ( filename , sep = " \t " , index = False )
def scenario_proc ( self , records , number , scenario_time ) :
# d_vel, d_trq, d_sensor, d_trans, d_predict_trq, d_real_trq = [[], [], [], [], [], []], [[], [], [], [], [], []], [[], [], [], [], [], []], [[], [], [], [], [], []], [[], [], [], [], [], []], [[], [], [], [], [], []]
d_vel , d_trq , d_sensor , d_trans , d_predict_trq , d_real_trq = [ [ [ ] , [ ] , [ ] , [ ] , [ ] , [ ] ] for _ in range ( 6 ) ]
for record in records :
data = eval ( record [ 0 ] ) [ " data " ]
for item in data :
d_item = reversed ( item [ " value " ] )
for axis in range ( 6 ) :
if item . get ( " channel " , None ) == axis and item . get ( " name " , None ) == " hw_joint_vel_feedback " :
d_vel [ axis ] . extend ( d_item )
elif item . get ( " channel " , None ) == axis and item . get ( " name " , None ) == " device_servo_trq_feedback " :
d_trq [ axis ] . extend ( d_item )
elif item . get ( " channel " , None ) == axis and item . get ( " name " , None ) == " hw_sensor_trq_feedback " :
d_sensor [ axis ] . extend ( d_item )
elif item . get ( " channel " , None ) == axis and item . get ( " name " , None ) == " hw_estimate_trans_trq_res " :
d_trans [ axis ] . extend ( d_item )
elif item . get ( " channel " , None ) == axis and item . get ( " name " , None ) == " hw_predict_trq_res " :
d_predict_trq [ axis ] . extend ( d_item )
elif item . get ( " channel " , None ) == axis and item . get ( " name " , None ) == " hw_real_trq_res " :
d_real_trq [ axis ] . extend ( d_item )
for axis in range ( 6 ) :
df1 = pandas . DataFrame . from_dict ( { " hw_joint_vel_feedback " : d_vel [ axis ] } )
df2 = pandas . DataFrame . from_dict ( { " device_servo_trq_feedback " : d_trq [ axis ] } )
df3 = pandas . DataFrame . from_dict ( { " hw_sensor_trq_feedback " : d_sensor [ axis ] } )
df4 = pandas . DataFrame . from_dict ( { " hw_estimate_trans_trq_res " : d_trans [ axis ] } )
df5 = pandas . DataFrame . from_dict ( { " hw_predict_trq_res " : d_predict_trq [ axis ] } )
df6 = pandas . DataFrame . from_dict ( { " hw_real_trq_res " : d_real_trq [ axis ] } )
df = pandas . concat ( [ df1 , df2 , df3 , df4 , df5 , df6 ] , axis = 1 )
filename = f " { self . dir_path } /s_ { number - 11 } /j { axis + 1 } _s_ { number - 11 } _ { scenario_time } _ { time . time ( ) } .data "
df . to_csv ( filename , sep = " \t " , index = False )
def gen_result_file ( self , number , start_time , end_time , scenario_time ) :
def get_records ( ) :
s_time = time . strftime ( " % Y- % m- %d % H: % M: % S " , time . localtime ( start_time ) )
e_time = time . strftime ( " % Y- % m- %d % H: % M: % S " , time . localtime ( end_time + clibs . INTERVAL ) )
try :
clibs . lock . acquire ( True )
clibs . cursor . execute ( f " SELECT content FROM logs WHERE timestamp BETWEEN ' { s_time } ' AND ' { e_time } ' AND content LIKE ' %diagnosis.result% ' ORDER BY id ASC " )
return clibs . cursor . fetchall ( )
finally :
clibs . lock . release ( )
if number < 12 :
records = get_records ( )
t = threading . Thread ( target = self . single_axis_proc , args = ( records , number ) )
t . daemon = True
t . start ( )
elif number < 15 :
records = get_records ( )
t = threading . Thread ( target = self . scenario_proc , args = ( records , number , scenario_time ) )
t . daemon = True
t . start ( )
@staticmethod
def change_curve_state ( stat ) :
curves = [ " hw_joint_vel_feedback " , " device_servo_trq_feedback " , " hw_sensor_trq_feedback " , " hw_estimate_trans_trq_res " , " hw_predict_trq_res " , " hw_real_trq_res " ]
curves = [ " hw_joint_vel_feedback " , " device_servo_trq_feedback " , " hw_sensor_trq_feedback " , " hw_estimate_trans_trq_res " ]
display_pdo_params = [ ] if not stat else [ { " name " : curve , " channel " : chl } for curve in curves for chl in range ( 6 ) ]
clibs . c_hr . execution ( " diagnosis.open " , open = stat , display_open = stat )
clibs . c_hr . execution ( " diagnosis.set_params " , display_pdo_params = display_pdo_params )
def run_rl ( self , prj_file ) :
prj_name = " . " . join ( prj_file . split ( " / " ) [ - 1 ] . split ( " . " ) [ : - 1 ] )
c_regular = [
" scenario(0, j1_p, j1_n, p_speed, p_tool, i_tool) " ,
" scenario(0, j2_p, j2_n, p_speed, p_tool, i_tool) " ,
" scenario(0, j3_p, j3_n, p_speed, p_tool, i_tool) " ,
" scenario(0, j4_p, j4_n, p_speed, p_tool, i_tool) " ,
" scenario(0, j5_p, j5_n, p_speed, p_tool, i_tool) " ,
" scenario(0, j6_p, j6_n, p_speed, p_tool, i_tool) " ,
" scenario(4, j1_hold, j1_hold, p_speed, p_tool, i_tool) " ,
" scenario(4, j2_hold, j2_hold, p_speed, p_tool, i_tool) " ,
" scenario(4, j3_hold, j3_hold, p_speed, p_tool, i_tool) " ,
" scenario(4, j4_hold, j4_hold, p_speed, p_tool, i_tool) " ,
" scenario(4, j5_hold, j5_hold, p_speed, p_tool, i_tool) " ,
" scenario(4, j6_hold, j6_hold, p_speed, p_tool, i_tool) " ,
" scenario(1, j6_p, j6_n, p_speed, p_tool, i_tool) " ,
" scenario(2, j6_p, j6_n, p_speed, p_tool, i_tool) " ,
" scenario(3, j6_p, j6_n, p_speed, p_tool, i_tool) " ,
]
c_inertia = [
" scenario(5, j4_p_inertia, j4_n_inertia, p_speed, p_tool, i_tool) " ,
" scenario(5, j5_p_inertia, j5_n_inertia, p_speed, p_tool, i_tool) " ,
" scenario(5, j6_p_inertia, j6_n_inertia, p_speed, p_tool, i_tool) " ,
]
disc_regular = [ " 一轴 " , " 二轴 " , " 三轴 " , " 四轴 " , " 五轴 " , " 六轴 " , " 一轴保持 " , " 二轴保持 " , " 三轴保持 " , " 四轴保持 " , " 五轴保持 " , " 六轴保持 " , " 场景一 " , " 场景二 " , " 场景三 " ]
disc_inertia = [ " 四轴惯量 " , " 五轴惯量 " , " 六轴惯量 " ]
conditions , disc = [ ] , [ ]
if self . tool == " tool100 " :
conditions , disc = c_regular , disc_regular
elif self . tool == " inertia " :
conditions , disc = c_inertia , disc_inertia
# 打开诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来
clibs . c_md . r_soft_estop ( 0 )
clibs . c_md . r_soft_estop ( 1 )
clibs . c_md . r_clear_alarm ( )
for condition in conditions :
if clibs . stop_flag :
clibs . logger ( " ERROR " , " do_current " , " 后台数据清零完成,现在可以重新运行其他程序。 " , " green " )
number = conditions . index ( condition )
# for testing
# if number < 12:
# continue
clibs . logger ( " INFO " , " do_current " , f " 正在执行 { disc [ number ] } 测试...... " )
# 1. 将act重置为False, 并修改将要执行的场景
clibs . c_md . write_act ( False )
ssh = paramiko . SSHClient ( )
ssh . set_missing_host_key_policy ( paramiko . AutoAddPolicy ( ) )
ssh . connect ( clibs . ip_addr , clibs . ssh_port , username = clibs . username , password = clibs . password )
cmd = " cd /home/luoshi/bin/controller/; "
cmd + = f ' sudo sed -i " /scenario/d " projects/ { prj_name } /_build/current/main.mod; '
cmd + = f ' sudo sed -i " /DONOTDELETE/i { condition } " projects/ { prj_name } /_build/current/main.mod '
stdin , stdout , stderr = ssh . exec_command ( cmd , get_pty = True )
stdin . write ( clibs . password + " \n " )
stdout . read ( ) . decode ( ) # 需要read一下才能正常执行
stderr . read ( ) . decode ( )
# 2. reload工程后, pp2main, 并且自动模式和上电
prj_path = f " { prj_name } /_build/ { prj_name } .prj "
clibs . c_hr . execution ( " overview.reload " , prj_path = prj_path , tasks = [ " current " ] )
clibs . c_hr . execution ( " rl_task.pp_to_main " , tasks = [ " current " ] )
clibs . c_hr . execution ( " state.switch_auto " )
clibs . c_hr . execution ( " state.switch_motor_on " )
# 3. 开始运行程序
clibs . c_hr . execution ( " rl_task.set_run_params " , loop_mode = True , override = 1.0 )
clibs . c_hr . execution ( " rl_task.run " , tasks = [ " current " ] )
t_start = time . time ( )
while True :
if clibs . c_md . read_ready_to_go ( ) == 1 :
clibs . c_md . write_act ( True )
break
else :
time . sleep ( 1 )
if ( time . time ( ) - t_start ) > 15 :
clibs . logger ( " ERROR " , " do_current " , " 15s 内未收到机器人的运行信号, 需要确认RL程序和工具通信是否正常执行... " , " red " )
# 4. 执行采集
time . sleep ( 10 ) # 消除前 10s 的不稳定数据
self . change_curve_state ( True )
start_time = time . time ( )
single_time , stall_time , scenario_time = 40 , 10 , 0
if number < 6 : # 单轴
time . sleep ( single_time )
elif number < 12 : # 堵转
time . sleep ( stall_time )
else : # 场景
t_start = time . time ( )
while True :
scenario_time = float ( f " { float ( clibs . c_md . read_scenario_time ( ) ) : .2f } " )
if float ( scenario_time ) != 0 :
clibs . logger ( " INFO " , " do_current " , f " 场景 { number - 11 } 的周期时间: { scenario_time } " )
break
else :
time . sleep ( 1 )
if ( time . time ( ) - t_start ) > 180 :
clibs . logger ( " ERROR " , " do_current " , f " 180s 内未收到场景 { number - 11 } 的周期时间, 需要确认RL程序和工具通信交互是否正常执行... " , " red " )
time . sleep ( 20 )
# 5.停止程序运行,保留数据并处理输出
end_time = time . time ( )
clibs . c_hr . execution ( " rl_task.stop " , tasks = [ " current " ] )
self . change_curve_state ( False )
time . sleep ( 2 ) # 确保数据都入库
self . gen_result_file ( number , start_time , end_time , scenario_time )
else :
if self . tool == " tool100 " :
clibs . logger ( " INFO " , " do_current " , " 单轴和场景电机电流采集完毕,如需采集惯量负载,须切换负载类型,并更换惯量负载,重新执行 " , " green " )
elif self . tool == " inertia " :
clibs . logger ( " INFO " , " do_current " , " 惯量负载电机电流采集完毕,如需采集单轴/场景/保持电机电流,须切换负载类型,并更换偏置负载,重新执行 " , " green " )
def processing ( self ) :
time_start = time . time ( )
clibs . running [ self . idx ] = 1
if clibs . status [ " hmi " ] != 1 or clibs . status [ " md " ] != 1 :
clibs . logger ( " ERROR " , " do_current " , " processing: 需要在网络设置中连接HMI以及Modbus通信! " , " red " )
data_dirs , data_files = clibs . traversal_files ( self . dir_path )
prj_file = self . initialization ( data_dirs , data_files )
clibs . c_pd . push_prj_to_server ( prj_file )
self . run_rl ( prj_file )
clibs . logger ( " INFO " , " do_current " , " - " * 60 + " <br>全部处理完毕<br> " , " purple " )
time_total = time . time ( ) - time_start
msg = f " 处理时间: { time_total / / 3600 : 02.0f } h { time_total % 3600 / / 60 : 02.0f } m { time_total % 60 : 02.0f } s "
clibs . logger ( " INFO " , " do_current " , msg )