@ -8,6 +8,8 @@ from time import time, sleep, strftime, localtime
from pymodbus . client . tcp import ModbusTcpClient
from pymodbus . client . tcp import ModbusTcpClient
# from pymodbus.payload import BinaryPayloadDecoder, BinaryPayloadBuilder
# from pymodbus.payload import BinaryPayloadDecoder, BinaryPayloadBuilder
# from pymodbus.constants import Endian
# from pymodbus.constants import Endian
from functools import wraps
from inspect import getcallargs
from paramiko import SSHClient , AutoAddPolicy
from paramiko import SSHClient , AutoAddPolicy
import clibs
import clibs
@ -338,7 +340,8 @@ class HmiRequest(object):
# 尝试连续三次发送心跳包,确认建联成功
# 尝试连续三次发送心跳包,确认建联成功
self . __error_code = 0
self . __error_code = 0
for i in range ( 3 ) :
for i in range ( 3 ) :
self . execution ( " controller.heart " )
self . execution . __wrapped__ ( self , " controller.heart " )
# self.execution("controller.heart")
sleep ( clibs . interval / 5 )
sleep ( clibs . interval / 5 )
if not self . __error_code :
if not self . __error_code :
self . __is_connected = True
self . __is_connected = True
@ -412,12 +415,12 @@ class HmiRequest(object):
for _ in range ( 3 ) :
for _ in range ( 3 ) :
res = find_response ( clibs . log_data_debug )
res = find_response ( clibs . log_data_debug )
if res is not None :
if res is not None :
return res [ " data " ]
return res . get ( " data " , f " { msg_id } has no data item " )
sleep ( clibs . interval * 2 )
sleep ( clibs . interval * 2 )
else : # 尝试在上一次分割的日志中查找,只做一次
else : # 尝试在上一次分割的日志中查找,只做一次
res = find_response ( " " . join ( [ clibs . log_path , listdir ( clibs . log_path ) [ 0 ] ] ) )
res = find_response ( " " . join ( [ clibs . log_path , sorted ( listdir ( clibs . log_path ) ) [ - 3 ] ] ) )
if res is not None :
if res is not None :
return res [ " data " ]
return res . get ( " data " , f " { msg_id } has no data item " )
elif flag == 1 :
elif flag == 1 :
for _ in range ( 3 ) :
for _ in range ( 3 ) :
res = find_response_xs ( clibs . log_data_debug )
res = find_response_xs ( clibs . log_data_debug )
@ -425,13 +428,14 @@ class HmiRequest(object):
return res
return res
sleep ( clibs . interval * 2 )
sleep ( clibs . interval * 2 )
else :
else :
res = find_response_xs ( " " . join ( [ clibs . log_path , listdir ( clibs . log_path ) [ 0 ] ] ) )
res = find_response_xs ( " " . join ( [ clibs . log_path , sorted ( listdir ( clibs . log_path ) ) [ - 3 ] ] ) )
if res is not None :
if res is not None :
return res
return res
self . __sth_wrong ( 11 , f " 无法找到请求 { msg_id } 的返回结果 " )
self . __sth_wrong ( 11 , f " 无法找到请求 { msg_id } 的返回结果 " )
def __msg_storage ( self , response , flag = 0 ) :
def __msg_storage ( self , response , flag = 0 ) :
# response是解码后的字符串
# response是解码后的字符串
if " move.monitor " not in response :
clibs . logger . debug ( f " { loads ( response ) } " )
clibs . logger . debug ( f " { loads ( response ) } " )
messages = self . c_msg if flag == 0 else self . c_msg_xs
messages = self . c_msg if flag == 0 else self . c_msg_xs
if " move.monitor " in response :
if " move.monitor " in response :
@ -644,6 +648,17 @@ class HmiRequest(object):
except Exception as Err :
except Exception as Err :
self . __sth_wrong ( 8 , f " 错误信息: { Err } " )
self . __sth_wrong ( 8 , f " 错误信息: { Err } " )
@staticmethod
def validate_req ( func ) :
@wraps ( func )
def wrapper ( self , * args , * * kwargs ) :
all_args = getcallargs ( func , self , * args , * * kwargs )
_id = func ( self , * args , * * kwargs )
self . get_from_id ( _id , flag = all_args [ " protocol_flag " ] )
return _id
return wrapper
@validate_req
def execution ( self , command , protocol_flag = 0 , * * kwargs ) :
def execution ( self , command , protocol_flag = 0 , * * kwargs ) :
def log_reqs ( request ) :
def log_reqs ( request ) :
with open ( clibs . log_data_reqs , mode = " a " , encoding = " utf-8 " ) as f_log :
with open ( clibs . log_data_reqs , mode = " a " , encoding = " utf-8 " ) as f_log :
@ -1052,7 +1067,7 @@ class HmiRequest(object):
def set_diagnosis_params ( self , display_pdo_params : list , frequency : int = 50 , version : str = " 1.4.1 " ) : # OK
def set_diagnosis_params ( self , display_pdo_params : list , frequency : int = 50 , version : str = " 1.4.1 " ) : # OK
"""
"""
设置诊断功能显示参数
设置诊断功能显示参数 [ { " name " : " hw_joint_vel_feedback " , " channel " : 0}, ]
:param display_pdo_params: 指定要采集的曲线名称,具体可通过 get_diagnosis_params 函数功能获取所有目前已支持的曲线
:param display_pdo_params: 指定要采集的曲线名称,具体可通过 get_diagnosis_params 函数功能获取所有目前已支持的曲线
:param frequency: 采样频率,默认 50ms
:param frequency: 采样频率,默认 50ms
:param version: xDiagnose的版本号
:param version: xDiagnose的版本号
@ -1799,10 +1814,14 @@ class ExternalCommunication(object):
self . __c . send ( order . encode ( ) )
self . __c . send ( order . encode ( ) )
sleep ( clibs . interval )
sleep ( clibs . interval )
def r_string ( self ) :
def r_string ( self , directive ):
result , char = " " , " "
result , char = " " , " "
while char != self . suffix :
while char != self . suffix :
try :
char = self . __c . recv ( 1 ) . decode ( encoding = " unicode_escape " )
char = self . __c . recv ( 1 ) . decode ( encoding = " unicode_escape " )
except TimeoutError :
clibs . logger . error ( f " 获取请求指令 { directive } 的返回数据超时,需确认指令发送格式以及内容正确! " )
exit ( 101 )
result = " " . join ( [ result , char ] )
result = " " . join ( [ result , char ] )
sleep ( clibs . interval )
sleep ( clibs . interval )
return result
return result
@ -1847,10 +1866,10 @@ class ExternalCommunication(object):
def load_program ( self , program_name ) : # OK
def load_program ( self , program_name ) : # OK
return self . __exec_cmd ( f " load_prog: { program_name } " , " 加载工程 " , self . exec_desc )
return self . __exec_cmd ( f " load_prog: { program_name } " , " 加载工程 " , self . exec_desc )
def estop_reset ( self ) :
def estop_reset ( self ) : # OK | 复原外部 IO 急停和安全门急停,前提是硬件已复原
return self . __exec_cmd ( " estop_reset " , " 急停复位 " , self . exec_desc )
return self . __exec_cmd ( " estop_reset " , " 急停复位 " , self . exec_desc )
def estopreset_and_clearalarm ( self ) :
def estopreset_and_clearalarm ( self ) : # OK | 外部 IO/安全门急停/安全碰撞告警都可以清除,前提是硬件已复原
return self . __exec_cmd ( " estopreset_and_clearalarm " , " 急停复位并清除报警 " , self . exec_desc )
return self . __exec_cmd ( " estopreset_and_clearalarm " , " 急停复位并清除报警 " , self . exec_desc )
def motoron_pp2main_start ( self ) : # OK
def motoron_pp2main_start ( self ) : # OK
@ -1862,7 +1881,7 @@ class ExternalCommunication(object):
def pause_motoroff ( self ) : # OK
def pause_motoroff ( self ) : # OK
return self . __exec_cmd ( " pause_motoroff " , " 暂停程序并下电 " , self . exec_desc )
return self . __exec_cmd ( " pause_motoroff " , " 暂停程序并下电 " , self . exec_desc )
def set_program_speed ( self , speed : int ) : # OK
def set_program_speed ( self , speed : int ) : # OK | 1-100
return self . __exec_cmd ( f " set_program_speed: { speed } " , " 设置程序运行速率(滑块) " , self . exec_desc )
return self . __exec_cmd ( f " set_program_speed: { speed } " , " 设置程序运行速率(滑块) " , self . exec_desc )
def set_soft_estop ( self , enable : str ) : # OK
def set_soft_estop ( self , enable : str ) : # OK
@ -1871,10 +1890,10 @@ class ExternalCommunication(object):
def switch_auto_motoron ( self ) : # OK
def switch_auto_motoron ( self ) : # OK
return self . __exec_cmd ( " switch_auto_motoron " , " 切换自动模式并上电 " , self . exec_desc )
return self . __exec_cmd ( " switch_auto_motoron " , " 切换自动模式并上电 " , self . exec_desc )
def open_safe_region ( self , number : int ) : # OK
def open_safe_region ( self , number : int ) : # OK | 1-10
return self . __exec_cmd ( f " open_safe_region: { number } " , f " 打开第 { number } 个安全区域(1-10, 信号控制开关需打开, 不限操作模式) " , self . exec_desc )
return self . __exec_cmd ( f " open_safe_region: { number } " , f " 打开第 { number } 个安全区域(1-10, 信号控制开关需打开, 不限操作模式) " , self . exec_desc )
def close_safe_region ( self , number : int ) : # OK
def close_safe_region ( self , number : int ) : # OK | 1-10
return self . __exec_cmd ( f " close_safe_region: { number } " , f " 关闭第 { number } 个安全区域(1-10, 信号控制开关需打开, 不限操作模式) " , self . exec_desc )
return self . __exec_cmd ( f " close_safe_region: { number } " , f " 关闭第 { number } 个安全区域(1-10, 信号控制开关需打开, 不限操作模式) " , self . exec_desc )
def open_reduced_mode ( self ) : # OK
def open_reduced_mode ( self ) : # OK
@ -1883,11 +1902,11 @@ class ExternalCommunication(object):
def close_reduced_mode ( self ) : # OK
def close_reduced_mode ( self ) : # OK
return self . __exec_cmd ( " close_reduced_mode " , " 关闭缩减模式(不限操作模式) " , self . exec_desc )
return self . __exec_cmd ( " close_reduced_mode " , " 关闭缩减模式(不限操作模式) " , self . exec_desc )
def setdo_value ( self , do_name , do_value ) :
def setdo_value ( self , do_name : str , do_value : str ) : # NG | do_value 为 true/false
return self . __exec_cmd ( f " setdo: { do_name } , { do_value } " , f " 设置 { do_name } 的值为 { do_value } " , self . exec_desc )
return self . __exec_cmd ( f " setdo: { do_name } , { do_value } " , f " 设置 { do_name } 的值为 { do_value } " , self . exec_desc )
def modify_system_time ( self , robot_time ) :
def modify_system_time ( self , robot_time ) : # OK
return self . __exec_cmd ( f " set_robot_time: { robot_time } " , f " 修改控制器和示教器的时间为 { robot_time } " , self . exec_desc )
return self . __exec_cmd ( f " set_robot_time: { robot_time } " , f " 修改控制器和示教器的时间为 { robot_time } 的 ", self . exec_desc )
# --------------------------------------------------------------------------------------------------
# --------------------------------------------------------------------------------------------------
@property
@property
@ -1899,7 +1918,7 @@ class ExternalCommunication(object):
return self . __exec_cmd ( " robot_running_state " , " 获取程序运行状态 " , " :--: 返回 true 表示正在运行, false 未运行 " )
return self . __exec_cmd ( " robot_running_state " , " 获取程序运行状态 " , " :--: 返回 true 表示正在运行, false 未运行 " )
@property
@property
def estop_state ( self ) :
def estop_state ( self ) : # OK | 只表示外部急停,安全门触发不会返回 true, 只要有急停标识 E 字母,就会返回 true
return self . __exec_cmd ( " estop_state " , " 急停状态 " , " :--: 返回 true 表示处于急停状态, false 非急停 " )
return self . __exec_cmd ( " estop_state " , " 急停状态 " , " :--: 返回 true 表示处于急停状态, false 非急停 " )
@property
@property
@ -1907,16 +1926,16 @@ class ExternalCommunication(object):
return self . __exec_cmd ( " operating_mode " , " 获取工作模式 " , " :--: 返回 true 表示自动模式, false 手动模式 " )
return self . __exec_cmd ( " operating_mode " , " 获取工作模式 " , " :--: 返回 true 表示自动模式, false 手动模式 " )
@property
@property
def home_state ( self ) : # NG
def home_state ( self ) : # OK | 需要设置一下 "HMI 设置->快速调整"
return self . __exec_cmd ( " home_state " , " 获取 HOME 输出状态 " , " :--: 返回 true 表示处于 HOME 点, false 未处于 HOME 点 " )
return self . __exec_cmd ( " home_state " , " 获取 HOME 输出状态 " , " :--: 返回 true 表示法兰中心 处于 HOME 点, false 未处于 HOME 点 " )
@property
@property
def fault_state ( self ) : # OK
def fault_state ( self ) : # OK
return self . __exec_cmd ( " fault_state " , " 获取 故障状态 " , " :--: 返回 true 表示处于故障状态, false 非故障状态 " )
return self . __exec_cmd ( " fault_state " , " 获取 故障状态 " , " :--: 返回 true 表示处于故障状态, false 非故障状态 " )
@property
@property
def collision_state ( self ) : # NG
def collision_state ( self ) : # OK | 但是触发后,无法清除?
return self . __exec_cmd ( " collision_state " , " 获取碰撞检测 状态 " , " :--: 返回 true 表示碰撞检测打开 , false 未打开 " )
return self . __exec_cmd ( " collision_state " , " 获取碰撞触发 状态 " , " :--: 返回 true 表示碰撞已触发 , false 未触发 " )
@property
@property
def task_state ( self ) : # OK
def task_state ( self ) : # OK
@ -1950,8 +1969,8 @@ class ExternalCommunication(object):
return self . __exec_cmd ( " alarm_state " , " 获取报警状态 " , " :--: 返回 true 表示当前有告警, false 没有告警 " )
return self . __exec_cmd ( " alarm_state " , " 获取报警状态 " , " :--: 返回 true 表示当前有告警, false 没有告警 " )
@property
@property
def collision_alarm_state ( self ) :
def collision_alarm_state ( self ) : # OK
return self . __exec_cmd ( " collision_alarm_state " , " 获取碰撞检测 报警状态 " , " :--: 返回 true 表示碰撞检测已触发 , false 未触发 " )
return self . __exec_cmd ( " collision_alarm_state " , " 获取碰撞报警状态 " , " :--: 返回 true 表示有 碰撞告警 , false 没有碰撞告警 " )
@property
@property
def collision_open_state ( self ) : # OK
def collision_open_state ( self ) : # OK
@ -1970,8 +1989,21 @@ class ExternalCommunication(object):
return self . __exec_cmd ( " robot_error_code " , " 获取机器人错误码 " )
return self . __exec_cmd ( " robot_error_code " , " 获取机器人错误码 " )
@property
@property
def rl_pause_state ( self ) :
def rl_pause_state ( self ) : # OK
return self . __exec_cmd ( " program_full " , " 获取 RL 的暂停状态 " )
# 0 -- 初始化状态,刚开机上电时
# 1 -- RL 运行中
# 2 -- HMI 暂停
# 3 -- 系统 IO 暂停
# 4 -- 寄存器功能码暂停
# 5 -- 外部通讯暂停
# 6 --
# 7 -- Pause 指令暂停
# 8 --
# 9 --
# 10 -- 外部 IO 急停
# 11 -- 安全门急停
# 12 -- 其他因素停止,比如碰撞检测
return self . __exec_cmd ( " program_full " , " 获取 RL 的暂停状态 " , " :--: 返回值含义详见功能定义 " )
@property
@property
def program_reset_state ( self ) : # OK
def program_reset_state ( self ) : # OK
@ -1982,15 +2014,15 @@ class ExternalCommunication(object):
return self . __exec_cmd ( " program_speed " , " 获取程序运行速度 " )
return self . __exec_cmd ( " program_speed " , " 获取程序运行速度 " )
@property
@property
def robot_is_busy ( self ) :
def robot_is_busy ( self ) : # OK | 触发条件为 pp2main/重载工程/推送到控制器,最好测试工程大一些,比较容易触发
return self . __exec_cmd ( " robot_is_busy " , " 获取程序忙碌状态 " , " :--: 返回 true 表示控制器忙碌,false 非忙碌状态 " )
return self . __exec_cmd ( " robot_is_busy " , " 获取程序忙碌状态 " , " :--: 返回 1 表示控制器忙碌,0 非忙碌状态 " )
@property
@property
def robot_is_moving ( self ) : # OK
def robot_is_moving ( self ) : # OK
return self . __exec_cmd ( " robot_is_moving " , " 获取程序运行状态 " , " :--: 返回 true 表示机器人正在运动, false 未运动 " )
return self . __exec_cmd ( " robot_is_moving " , " 获取程序运行状态 " , " :--: 返回 true 表示机器人正在运动, false 未运动 " )
@property
@property
def safe_door_state ( self ) :
def safe_door_state ( self ) : # OK
return self . __exec_cmd ( " safe_door_state " , " 获取安全门状态 " , " :--: 返回 true 表示安全门已触发, false 未触发 " )
return self . __exec_cmd ( " safe_door_state " , " 获取安全门状态 " , " :--: 返回 true 表示安全门已触发, false 未触发 " )
@property
@property
@ -2027,7 +2059,7 @@ class ExternalCommunication(object):
def __exec_cmd ( self , directive , description , more_desc = " " ) :
def __exec_cmd ( self , directive , description , more_desc = " " ) :
self . s_string ( directive )
self . s_string ( directive )
result = self . r_string ( ) . strip ( )
result = self . r_string ( directive ) . strip ( )
clibs . logger . info ( f " 执行 { description } 指令 { directive } ,返回值为 { result } { more_desc } " )
clibs . logger . info ( f " 执行 { description } 指令 { directive } ,返回值为 { result } { more_desc } " )
return result
return result