2146 lines
89 KiB
Python
2146 lines
89 KiB
Python
from json import load, dumps, loads
|
||
from os import listdir
|
||
from inspect import currentframe
|
||
from socket import socket, AF_INET, SOCK_STREAM
|
||
from threading import Thread
|
||
import selectors
|
||
from time import time, sleep, strftime, localtime
|
||
from pymodbus.client.tcp import ModbusTcpClient
|
||
# from pymodbus.payload import BinaryPayloadDecoder, BinaryPayloadBuilder
|
||
# from pymodbus.constants import Endian
|
||
from functools import wraps
|
||
from inspect import getcallargs
|
||
from paramiko import SSHClient, AutoAddPolicy
|
||
import clibs
|
||
|
||
|
||
class ModbusRequest(object):
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.__c = ModbusTcpClient(host=clibs.ip_addr, port=clibs.modbus_port)
|
||
try:
|
||
self.__c.connect()
|
||
clibs.logger.success("Modbus connection success...")
|
||
except Exception as Err:
|
||
clibs.logger.error(f"Modbus connection failed...\n{Err}")
|
||
|
||
def __reg_high_pulse(self, addr: int) -> None:
|
||
self.__c.write_register(addr, 0)
|
||
sleep(clibs.interval)
|
||
self.__c.write_register(addr, 1)
|
||
sleep(clibs.interval+1)
|
||
self.__c.write_register(addr, 0)
|
||
|
||
def r_clear_alarm(self): # OK
|
||
self.__reg_high_pulse(40000)
|
||
clibs.logger.info("40000-010 执行清除告警信息")
|
||
|
||
def r_reset_estop(self): # OK
|
||
self.__reg_high_pulse(40001)
|
||
clibs.logger.info("40001-010 执行复位急停状态(非软急停)")
|
||
|
||
def r_reset_estop_clear_alarm(self): # OK
|
||
self.__reg_high_pulse(40002)
|
||
clibs.logger.info("40002-010 执行复位急停状态(非软急停),并清除告警信息")
|
||
|
||
def r_motor_off(self): # OK
|
||
self.__reg_high_pulse(40003)
|
||
clibs.logger.info("40003-010 执行机器人下电")
|
||
|
||
def r_motor_on(self): # OK
|
||
self.__reg_high_pulse(40004)
|
||
clibs.logger.info("40004-010 执行机器人上电")
|
||
|
||
def r_motoron_pp2main_start(self): # OK
|
||
self.__reg_high_pulse(40005)
|
||
clibs.logger.info(
|
||
"40005-010 执行机器人上电/pp2main/开始运行程序,需自动模式执行,若运行失败,可清除告警后再次尝试")
|
||
|
||
def r_motoron_start(self): # OK
|
||
self.__reg_high_pulse(40006)
|
||
clibs.logger.info("40006-010 执行机器人上电/开始运行程序,需自动模式执行,若运行失败,可清除告警、执行pp2main后再次尝试")
|
||
|
||
def r_pulse_motoroff(self): # OK
|
||
self.__reg_high_pulse(40007)
|
||
clibs.logger.info("40007-010 执行机器人停止,并下电,手动模式下可停止程序运行,但不能下电,若运行失败,可清除告警后再次尝试")
|
||
|
||
def r_pp2main(self): # OK
|
||
self.__reg_high_pulse(40008)
|
||
clibs.logger.info("40008-010 执行机器人 pp2main,需自动模式执行,若运行失败,可清除告警后再次尝试")
|
||
|
||
def r_program_start(self): # OK
|
||
self.__reg_high_pulse(40009)
|
||
clibs.logger.info("40009-010 执行机器人默认程序运行,需有 pp2main 前置操作,若运行失败,可清除告警后再次尝试")
|
||
|
||
def r_program_stop(self): # OK
|
||
self.__reg_high_pulse(40010)
|
||
clibs.logger.info("40010-010 执行机器人默认程序停止,需有 pp2main 前置操作,若运行失败,可清除告警后再次尝试")
|
||
|
||
def r_reduced_mode(self, action: int): # OK
|
||
self.__c.write_register(40011, action)
|
||
actions = "进入" if action == 1 else "退出"
|
||
clibs.logger.info(f"40011-{action} 执行机器人{actions}缩减模式")
|
||
sleep(clibs.interval)
|
||
|
||
def r_soft_estop(self, action: int): # OK
|
||
self.__c.write_register(40012, action)
|
||
actions = "解除" if action == 1 else "触发"
|
||
clibs.logger.info(f"40012-{action} 执行{actions}急停动作")
|
||
sleep(clibs.interval)
|
||
|
||
def r_switch_auto_motoron(self): # OK
|
||
self.__reg_high_pulse(40013)
|
||
clibs.logger.info(f"40013-010 执行切换为自动模式,并上电,初始状态 !!不能是!! 手动上电模式")
|
||
|
||
def r_switch_auto(self): # OK
|
||
self.__reg_high_pulse(40014)
|
||
clibs.logger.info(f"40014-010 执行切换为自动模式")
|
||
|
||
def r_switch_manual(self): # OK
|
||
self.__reg_high_pulse(40015)
|
||
clibs.logger.info(f"40015-010 执行切换为手动模式")
|
||
|
||
def r_switch_safe_region01(self, action: bool): # OK | 上升沿打开,下降沿关闭
|
||
if action:
|
||
self.__c.write_register(40016, False)
|
||
sleep(clibs.interval)
|
||
self.__c.write_register(40016, True)
|
||
else:
|
||
self.__c.write_register(40016, True)
|
||
sleep(clibs.interval)
|
||
self.__c.write_register(40016, False)
|
||
actions = "打开" if action else "关闭"
|
||
clibs.logger.info(f"40016-{action} 执行{actions}安全区 safe region 01")
|
||
sleep(clibs.interval)
|
||
|
||
def r_switch_safe_region02(self, action: bool): # OK | 上升沿打开,下降沿关闭
|
||
if action:
|
||
self.__c.write_register(40017, False)
|
||
sleep(clibs.interval)
|
||
self.__c.write_register(40017, True)
|
||
else:
|
||
self.__c.write_register(40017, True)
|
||
sleep(clibs.interval)
|
||
self.__c.write_register(40017, False)
|
||
actions = "打开" if action else "关闭"
|
||
clibs.logger.info(f"40017-{action} 执行{actions}安全区 safe region 02")
|
||
sleep(clibs.interval)
|
||
|
||
def r_switch_safe_region03(self, action: bool): # OK | 上升沿打开,下降沿关闭
|
||
if action:
|
||
self.__c.write_register(40018, False)
|
||
sleep(clibs.interval)
|
||
self.__c.write_register(40018, True)
|
||
else:
|
||
self.__c.write_register(40018, True)
|
||
sleep(clibs.interval)
|
||
self.__c.write_register(40018, False)
|
||
actions = "打开" if action else "关闭"
|
||
clibs.logger.info(f"40018-{action} 执行{actions}安全区 safe region 03")
|
||
sleep(clibs.interval)
|
||
|
||
@property
|
||
def w_alarm_state(self): # OK
|
||
res = self.__c.read_holding_registers(40500, 1).registers[0]
|
||
clibs.logger.info(f"40500 获取告警状态,结果为 {res} :--: 0 表示无告警,,1 表示有告警")
|
||
return res
|
||
|
||
@property
|
||
def w_collision_alarm_state(self): # OK
|
||
res = self.__c.read_holding_registers(40501, 1).registers[0]
|
||
clibs.logger.info(f"40501 获取碰撞告警状态,结果为 {res} :--: 0 表示未触发,1 表示已触发")
|
||
return res
|
||
|
||
@property
|
||
def w_collision_open_state(self): # OK
|
||
res = self.__c.read_holding_registers(40502, 1).registers[0]
|
||
clibs.logger.info(f"40502 获取碰撞检测开启状态,结果为 {res} :--: 0 表示关闭,1 表示开启")
|
||
return res
|
||
|
||
@property
|
||
def w_controller_is_running(self): # OK
|
||
res = self.__c.read_holding_registers(40503, 1).registers[0]
|
||
clibs.logger.info(f"40503 获取控制器运行状态,结果为 {res} :--: 0 表示运行异常,1 表示运行正常")
|
||
return res
|
||
|
||
@property
|
||
def w_encoder_low_battery(self): # OK
|
||
res = self.__c.read_holding_registers(40504, 1).registers[0]
|
||
clibs.logger.info(f"40504 获取编码器低电压状态,结果为 {res} :--: 0 表示非低电压,1 表示低电压 需关注")
|
||
return res
|
||
|
||
@property
|
||
def w_estop_state(self): # OK
|
||
res = self.__c.read_holding_registers(40505, 1).registers[0]
|
||
clibs.logger.info(f"40505 获取机器人急停状态(非软急停),结果为 {res} :--: 0 表示未触发,1 表示已触发")
|
||
return res
|
||
|
||
@property
|
||
def w_motor_state(self): # OK
|
||
res = self.__c.read_holding_registers(40506, 1).registers[0]
|
||
clibs.logger.info(f"40506 获取机器人上电状态,结果为 {res} :--: 0 表示未上电,1 表示已上电")
|
||
return res
|
||
|
||
@property
|
||
def w_operation_mode(self): # OK
|
||
res = self.__c.read_holding_registers(40507, 1).registers[0]
|
||
clibs.logger.info(f"40507 获取机器人操作模式,结果为 {res} :--: 0 表示手动模式,1 表示自动模式")
|
||
return res
|
||
|
||
@property
|
||
def w_program_state(self): # OK
|
||
res = self.__c.read_holding_registers(40508, 1).registers[0]
|
||
clibs.logger.info(f"40508 获取程序的运行状态,结果为 {res} :--: 0 表示未运行,1 表示正在运行")
|
||
return res
|
||
|
||
@property
|
||
def w_program_not_run(self): # OK
|
||
res = self.__c.read_holding_registers(40509, 1).registers[0]
|
||
clibs.logger.info(f"40509 判定程序为未运行状态,结果为 {res} :--: 0 表示正在运行,1 表示未运行")
|
||
return res
|
||
|
||
@property
|
||
def w_program_reset(self): # OK
|
||
res = self.__c.read_holding_registers(40510, 1).registers[0]
|
||
clibs.logger.info(f"40510 判定程序指针为 pp2main 状态,结果为 {res} :--: 0 表示指针不在 main 函数,1 表示指针在 main 函数")
|
||
return res
|
||
|
||
@property
|
||
def w_reduce_mode_state(self): # OK
|
||
res = self.__c.read_holding_registers(40511, 1).registers[0]
|
||
clibs.logger.info(f"40511 获取机器人缩减模式状态,结果为 {res} :--: 0 表示非缩减模式,1 表示缩减模式")
|
||
return res
|
||
|
||
@property
|
||
def w_robot_is_busy(self): # OK
|
||
res = self.__c.read_holding_registers(40512, 1).registers[0]
|
||
clibs.logger.info(f"40512 获取机器人是否处于 busy 状态,结果为 {res} :--: 0 表示未处于 busy 状态,1 表示处于 busy 状态")
|
||
return res
|
||
|
||
@property
|
||
def w_robot_is_moving(self): # OK
|
||
res = self.__c.read_holding_registers(40513, 1).registers[0]
|
||
clibs.logger.info(f"40513 获取机器人是否处于运动状态,结果为 {res} :--: 0 表示未运动,1 表示正在运动")
|
||
return res
|
||
|
||
@property
|
||
def w_safe_door_state(self): # OK
|
||
res = self.__c.read_holding_registers(40514, 1).registers[0]
|
||
clibs.logger.info(f"40514 获取机器人是否处于安全门打开状态,需自动模式下执行,结果为 {res} :--: 0 表示未触发安全门,1 表示已触发安全门")
|
||
return res
|
||
|
||
@property
|
||
def w_safe_region01_trig_state(self): # OK
|
||
res = self.__c.read_holding_registers(40515, 1).registers[0]
|
||
clibs.logger.info(f"40515 获取安全区域 safe region01 的触发状态,结果为 {res} :--: 0 表示未触发,1 表示已触发")
|
||
return res
|
||
|
||
@property
|
||
def w_safe_region02_trig_state(self): # OK
|
||
res = self.__c.read_holding_registers(40516, 1).registers[0]
|
||
clibs.logger.info(f"40516 获取安全区域 safe region02 的触发状态,结果为 {res} :--: 0 表示未触发,1 表示已触发")
|
||
return res
|
||
|
||
@property
|
||
def w_safe_region03_trig_state(self): # OK
|
||
res = self.__c.read_holding_registers(40517, 1).registers[0]
|
||
clibs.logger.info(f"40517 获取安全区域 safe region03 的触发状态,结果为 {res} :--: 0 表示未触发,1 表示已触发")
|
||
return res
|
||
|
||
@property
|
||
def w_soft_estop_state(self): # OK
|
||
res = self.__c.read_holding_registers(40518, 1).registers[0]
|
||
clibs.logger.info(f"40518 获取机器人软急停状态,结果为 {res} :--: 0 表示未触发软急停,1 表示已触发软急停")
|
||
return res
|
||
|
||
def io_write_coils(self, addr, action): # OK | 名字叫写线圈,其实是写 modbus 的 discrete inputs(DI)
|
||
# e.g. io_write_coils(0, 1)
|
||
# e.g. io_write_coils(1, 1)
|
||
# e.g. io_write_coils(0, [1, 1, 1])
|
||
self.__c.write_coils(addr, action)
|
||
clibs.logger.info(f"Modbus 执行给 DI 地址 {addr} 赋值为 {action},可根据情况传递列表,实现一次性赋值多个")
|
||
sleep(clibs.interval)
|
||
|
||
def io_read_coils(self): # OK | 读 modbus 的 16 个 discrete inputs(DI)
|
||
res = self.__c.read_coils(0, 16).bits
|
||
clibs.logger.info(f"Modbus 执行读取所有 DI 的结果为 {res}")
|
||
return res
|
||
|
||
def io_read_discretes(self): # OK | 读 modbus 的 coil outputs(DO)
|
||
res = self.__c.read_discrete_inputs(0, 16).bits
|
||
clibs.logger.info(f"Modbus 执行读取所有 DO 的结果为 {res}")
|
||
return res
|
||
|
||
|
||
class HmiRequest(object):
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.__c = None
|
||
self.__c_xs = None
|
||
self.c_msg = []
|
||
self.c_msg_xs = []
|
||
self.__flag = 0
|
||
self.__response = ""
|
||
self.__leftover = 0
|
||
self.__response_xs = ""
|
||
self.__is_connected = False
|
||
self.__silence = False
|
||
self.__pkg_size = 0
|
||
self.__broke = 0
|
||
self.__half = 0
|
||
self.__half_length = 0
|
||
self.__index = 0
|
||
self.__reset_index = 0
|
||
self.__close_hmi = False
|
||
self.__error_code = 0
|
||
|
||
self.__t_is_alive = Thread(target=self.__is_alive)
|
||
self.__t_is_alive.daemon = False
|
||
self.__t_is_alive.start()
|
||
sleep(2) # 很重要,必须有,因为涉及到建联成功与否
|
||
|
||
def __is_alive(self):
|
||
first_time = True
|
||
while not self.__close_hmi:
|
||
if not self.__is_connected:
|
||
if not first_time:
|
||
clibs.logger.info("重新连接中...")
|
||
first_time = False
|
||
self.__silence = True
|
||
self.__sock_conn()
|
||
if self.__is_connected:
|
||
self.__t_heartbeat = Thread(target=self.__heartbeat)
|
||
self.__t_heartbeat.daemon = True
|
||
self.__t_heartbeat.start()
|
||
self.__t_unpackage = Thread(target=self.__unpackage, args=(self.__c,))
|
||
self.__t_unpackage.daemon = True
|
||
self.__t_unpackage.start()
|
||
self.__t_unpackage_xs = Thread(target=self.__unpackage_xs, args=(self.__c_xs,))
|
||
self.__t_unpackage_xs.daemon = True
|
||
self.__t_unpackage_xs.start()
|
||
self.__silence = False
|
||
first_time = True
|
||
|
||
sleep(clibs.interval*6)
|
||
|
||
def close(self):
|
||
self.__close_hmi = True
|
||
self.__is_connected = False
|
||
self.__silence = True
|
||
|
||
def __sock_conn(self):
|
||
try:
|
||
self.__c = socket(AF_INET, SOCK_STREAM)
|
||
self.__c.connect((clibs.ip_addr, clibs.socket_port))
|
||
self.__c.setblocking(True)
|
||
self.__c_xs = socket(AF_INET, SOCK_STREAM)
|
||
self.__c_xs.connect((clibs.ip_addr, clibs.xService_port))
|
||
self.__c_xs.setblocking(True)
|
||
|
||
# 尝试连续三次发送心跳包,确认建联成功
|
||
self.__error_code = 0
|
||
for i in range(3):
|
||
self.execution.__wrapped__(self, "controller.heart")
|
||
# self.execution("controller.heart")
|
||
sleep(clibs.interval/5)
|
||
if not self.__error_code:
|
||
self.__is_connected = True
|
||
clibs.logger.success("HMI connection success...")
|
||
else:
|
||
raise Exception()
|
||
|
||
except Exception as Err:
|
||
self.__sth_wrong(9, f"HMI connection failed...{Err}")
|
||
|
||
def __sth_wrong(self, exit_code, err_desc=""):
|
||
self.__is_connected = False
|
||
self.__error_code = exit_code
|
||
if not self.__silence:
|
||
clibs.logger.error(f"[{exit_code}] 可能是 HMI 无法连接到 {clibs.ip_addr}:{clibs.socket_port}...")
|
||
if err_desc != "":
|
||
clibs.logger.error(f"[{exit_code}] {err_desc}")
|
||
|
||
def __header_check(self, index, data):
|
||
if index + 8 < len(data):
|
||
_frame_size = int.from_bytes(data[index:index + 2], byteorder="big")
|
||
_pkg_size = int.from_bytes(data[index + 2:index + 6], byteorder="big")
|
||
_protocol = int.from_bytes(data[index + 6:index + 7], byteorder="big")
|
||
_reserved = int.from_bytes(data[index + 7:index + 8], byteorder="big")
|
||
|
||
if _reserved == 0 and _protocol == 2:
|
||
return index + 8, _frame_size, _pkg_size
|
||
else:
|
||
print(data)
|
||
self.__sth_wrong(6, "Header Check: 解包数据有误,需要确认!")
|
||
else:
|
||
self.__half_length = len(data) - index
|
||
self.__half = data[index:]
|
||
self.__broke = 100
|
||
index += clibs.MAX_FRAME_SIZE
|
||
return index, 0, 0
|
||
|
||
def __heartbeat(self):
|
||
while self.__is_connected:
|
||
self.execution("controller.heart")
|
||
sleep(clibs.heartbeat_interval)
|
||
|
||
@staticmethod
|
||
def __gen_id(command):
|
||
_now = time()
|
||
_id = f"{command}-{_now}"
|
||
return _id
|
||
|
||
def get_from_id(self, msg_id, flag=0):
|
||
def find_response(log_data):
|
||
with open(log_data, mode="r", encoding="utf-8") as f_log:
|
||
for line in f_log:
|
||
if line.split("|")[1].strip() != "DEBUG":
|
||
continue
|
||
if msg_id in line.strip():
|
||
error_code = int(eval(line.split("|")[-1].strip()).get("error_code", 0))
|
||
if not error_code:
|
||
return eval(line.split("|")[-1].strip())
|
||
else:
|
||
self.__sth_wrong(7, f"请求 {msg_id} 的返回错误码为 {error_code}")
|
||
|
||
def find_response_xs(log_data):
|
||
with open(log_data, mode="r", encoding="utf-8") as f_log:
|
||
for line in reversed(f_log.readlines()):
|
||
if line.split("|")[1].strip() != "DEBUG":
|
||
continue
|
||
if msg_id in line.strip():
|
||
return eval(line.split("|")[-1].strip())
|
||
|
||
if flag == 0:
|
||
for _ in range(3):
|
||
res = find_response(clibs.log_data_debug)
|
||
if res is not None:
|
||
return res.get("data", f"{msg_id} has no data item")
|
||
sleep(clibs.interval*2)
|
||
else: # 尝试在上一次分割的日志中查找,只做一次
|
||
res = find_response("".join([clibs.log_path, sorted(listdir(clibs.log_path))[-3]]))
|
||
if res is not None:
|
||
return res.get("data", f"{msg_id} has no data item")
|
||
elif flag == 1:
|
||
for _ in range(3):
|
||
res = find_response_xs(clibs.log_data_debug)
|
||
if res is not None:
|
||
return res
|
||
sleep(clibs.interval*2)
|
||
else:
|
||
res = find_response_xs("".join([clibs.log_path, sorted(listdir(clibs.log_path))[-3]]))
|
||
if res is not None:
|
||
return res
|
||
self.__sth_wrong(11, f"无法找到请求 {msg_id} 的返回结果")
|
||
|
||
def __msg_storage(self, response, flag=0):
|
||
# response是解码后的字符串
|
||
if "move.monitor" not in response:
|
||
clibs.logger.debug(f"{loads(response)}")
|
||
messages = self.c_msg if flag == 0 else self.c_msg_xs
|
||
if "move.monitor" in response:
|
||
pass
|
||
elif len(messages) < 10000:
|
||
messages.insert(0, response)
|
||
else:
|
||
messages.insert(0, response)
|
||
while len(messages) >= 10000:
|
||
messages.pop()
|
||
|
||
def __get_response(self, data):
|
||
_frame_size = 0
|
||
# 流式获取单次请求的响应
|
||
if self.__broke == 100:
|
||
_half_1 = self.__half
|
||
_half_2 = data[:8 - self.__half_length]
|
||
_full = _half_1 + _half_2
|
||
|
||
_frame_size = int.from_bytes(_full[:2], byteorder="big")
|
||
_pkg_size = int.from_bytes(_full[2:6], byteorder="big")
|
||
_protocol = int.from_bytes(_full[6:7], byteorder="big")
|
||
_reserved = int.from_bytes(_full[7:8], byteorder="big")
|
||
if _reserved != 0 or _protocol != 2:
|
||
print(data)
|
||
self.__sth_wrong(10, "in get_response: 解包数据有误,需要确认!")
|
||
|
||
self.__pkg_size = _pkg_size
|
||
self.__index = 8 - self.__half_length
|
||
else:
|
||
if self.__reset_index == 1:
|
||
self.__index = 0
|
||
|
||
while self.__index < len(data):
|
||
# flag 为 0,则说明是一次新的请求对应的一次新的相应,也就是需要首次解包
|
||
if self.__flag == 0:
|
||
if self.__broke == 100:
|
||
self.__broke = 0
|
||
else:
|
||
self.__index, _frame_size, self.__pkg_size = self.__header_check(self.__index, data)
|
||
if self.__index > clibs.MAX_FRAME_SIZE:
|
||
break
|
||
# 详见解包原理数据.txt,self.__pkg_size 永远是除了当前data之外剩余未处理的数据大小
|
||
if self.__pkg_size <= len(data) - self.__index:
|
||
# 说明剩余部分的数据就在当前data内,没有被分割
|
||
self.__response = data[self.__index:self.__index + self.__pkg_size].decode()
|
||
self.__msg_storage(flag=0, response=self.__response)
|
||
self.__index += self.__pkg_size
|
||
self.__flag = 0
|
||
self.__response = ""
|
||
self.__leftover = 0
|
||
self.__pkg_size = 0
|
||
self.__reset_index = 0 if self.__index < len(data) else 1
|
||
elif self.__pkg_size > len(data) - self.__index:
|
||
# 执行到这里说明该data是首包,且有有分包的情况发生了也就是该响应数据量稍微比较大
|
||
# 分散在了相邻的两个及以上的data中,需要flag=1的处理
|
||
self.__flag = 1
|
||
if self.__index + _frame_size - 6 <= len(data):
|
||
self.__response = data[self.__index:self.__index + _frame_size - 6].decode()
|
||
self.__index += (_frame_size - 6)
|
||
self.__pkg_size -= (_frame_size - 6) # 详见解包原理数据.txt,self.__pkg_size
|
||
self.__reset_index = 0
|
||
|
||
if self.__index + 2 < len(data):
|
||
self.__leftover = int.from_bytes(data[self.__index:self.__index + 2], byteorder="big")
|
||
self.__index += 2
|
||
self.__reset_index = 0
|
||
else:
|
||
if self.__index + 2 == len(data):
|
||
self.__broke = 1
|
||
self.__half = data[-2:]
|
||
elif self.__index + 1 == len(data):
|
||
self.__broke = 2
|
||
self.__half = data[-1:]
|
||
elif self.__index == len(data):
|
||
self.__broke = 3
|
||
|
||
self.__index += clibs.MAX_FRAME_SIZE
|
||
self.__reset_index = 1
|
||
break # 因为 index + 2 的大小超过 clibs.MAX_FRAME_SIZE
|
||
|
||
elif self.__index + _frame_size - 6 > len(data):
|
||
self.__response = data[self.__index:].decode()
|
||
self.__pkg_size -= (len(data) - self.__index) # 详见解包原理数据.txt,self.__pkg_size
|
||
self.__leftover = (_frame_size - 6 - (len(data) - self.__index))
|
||
self.__index += clibs.MAX_FRAME_SIZE
|
||
self.__reset_index = 1
|
||
|
||
elif self.__flag == 1:
|
||
# 继续处理之前为接收完的数据,处理完之后将flag重置为0
|
||
# !!!需要注意的是,包头/帧头也是有可能被分割开的!!!但是目前该程序未实现此种情况!!!
|
||
if self.__broke == 1:
|
||
self.__index = 0
|
||
self.__leftover = int.from_bytes(self.__half, byteorder="big")
|
||
self.__broke = 0
|
||
elif self.__broke == 2:
|
||
self.__leftover = int.from_bytes(self.__half + data[:1], byteorder="big")
|
||
self.__index = 1
|
||
self.__broke = 0
|
||
if self.__broke == 3:
|
||
self.__leftover = int.from_bytes(data[:2], byteorder="big")
|
||
self.__index = 2
|
||
self.__broke = 0
|
||
while self.__pkg_size > 0:
|
||
if self.__index + self.__leftover <= len(data):
|
||
self.__response += data[self.__index:self.__index + self.__leftover].decode()
|
||
self.__pkg_size -= self.__leftover
|
||
if self.__pkg_size == 0:
|
||
self.__msg_storage(flag=0, response=self.__response)
|
||
self.__index += self.__leftover
|
||
self.__flag = 0
|
||
self.__response = ""
|
||
self.__leftover = 0
|
||
self.__pkg_size = 0
|
||
self.__reset_index = 0 if self.__index < len(data) else 1
|
||
break
|
||
|
||
self.__index += self.__leftover
|
||
if self.__index + 2 < len(data):
|
||
self.__leftover = int.from_bytes(data[self.__index:self.__index + 2], byteorder="big")
|
||
self.__index += 2
|
||
self.__reset_index = 0
|
||
else:
|
||
if self.__index + 2 == len(data):
|
||
self.__broke = 1
|
||
self.__half = data[-2:]
|
||
elif self.__index + 1 == len(data):
|
||
self.__broke = 2
|
||
self.__half = data[-1:]
|
||
elif self.__index == len(data):
|
||
self.__broke = 3
|
||
|
||
self.__index += clibs.MAX_FRAME_SIZE
|
||
self.__reset_index = 1
|
||
break # 因为 index + 2 的大小超过 clibs.MAX_FRAME_SIZE
|
||
else:
|
||
self.__response += data[self.__index:].decode()
|
||
self.__leftover -= (len(data) - self.__index)
|
||
self.__pkg_size -= (len(data) - self.__index)
|
||
self.__index += clibs.MAX_FRAME_SIZE
|
||
self.__reset_index = 1
|
||
break # 该data内数据已经处理完毕,需要跳出大循环,通过break和index
|
||
|
||
def __get_response_xs(self, data):
|
||
char, response = "", self.__response_xs
|
||
for char in data.decode():
|
||
if char != "\r":
|
||
response += char
|
||
else:
|
||
self.__msg_storage(flag=1, response=response)
|
||
response = ""
|
||
else:
|
||
self.__response_xs = response if char != "\r" else ""
|
||
|
||
@staticmethod
|
||
def __package(cmd):
|
||
_frame_head = (len(cmd) + 6).to_bytes(length=2, byteorder="big")
|
||
_pkg_head = len(cmd).to_bytes(length=4, byteorder="big")
|
||
_protocol = int(2).to_bytes(length=1, byteorder="big")
|
||
_reserved = int(0).to_bytes(length=1, byteorder="big")
|
||
return _frame_head + _pkg_head + _protocol + _reserved + cmd.encode()
|
||
|
||
@staticmethod
|
||
def __package_xs(cmd):
|
||
return "".join([dumps(cmd, separators=(",", ":")), "\r"]).encode()
|
||
|
||
def __unpackage(self, sock):
|
||
def to_read(conn, mask):
|
||
data = conn.recv(clibs.MAX_FRAME_SIZE)
|
||
if data:
|
||
# print(data)
|
||
self.__get_response(data)
|
||
else:
|
||
print("closing", conn)
|
||
sel.unregister(conn)
|
||
conn.close()
|
||
|
||
try:
|
||
sel = selectors.DefaultSelector()
|
||
sel.register(sock, selectors.EVENT_READ, to_read)
|
||
|
||
while self.__is_connected:
|
||
events = sel.select()
|
||
for key, mask in events:
|
||
callback = key.data
|
||
callback(key.fileobj, mask)
|
||
except Exception as Err:
|
||
self.__sth_wrong(3, f"错误信息:{Err}")
|
||
|
||
def __unpackage_xs(self, sock):
|
||
def to_read(conn, mask):
|
||
data = conn.recv(1024) # Should be ready
|
||
if data:
|
||
# print(data)
|
||
self.__get_response_xs(data)
|
||
else:
|
||
print("closing", conn)
|
||
sel.unregister(conn)
|
||
conn.close()
|
||
|
||
try:
|
||
sel = selectors.DefaultSelector()
|
||
sel.register(sock, selectors.EVENT_READ, to_read)
|
||
|
||
while self.__is_connected:
|
||
events = sel.select()
|
||
for key, mask in events:
|
||
callback = key.data
|
||
callback(key.fileobj, mask)
|
||
except Exception as Err:
|
||
self.__sth_wrong(8, f"错误信息:{Err}")
|
||
|
||
@staticmethod
|
||
def validate_req(func):
|
||
@wraps(func)
|
||
def wrapper(self, *args, **kwargs):
|
||
all_args = getcallargs(func, self, *args, **kwargs)
|
||
_id = func(self, *args, **kwargs)
|
||
self.get_from_id(_id, flag=all_args["protocol_flag"])
|
||
return _id
|
||
return wrapper
|
||
|
||
@validate_req
|
||
def execution(self, command, protocol_flag=0, **kwargs):
|
||
def log_reqs(request):
|
||
with open(clibs.log_data_reqs, mode="a", encoding="utf-8") as f_log:
|
||
log_time = strftime("%Y-%m-%d %H:%M:%S", localtime(time()))
|
||
f_log.write(" ".join([log_time, dumps(request), "\n"]))
|
||
|
||
if protocol_flag == 0: # for old protocols
|
||
req = None
|
||
try:
|
||
with open(f"{clibs.PREFIX}/json/{command}.json", encoding="utf-8", mode="r") as f_json:
|
||
req = load(f_json)
|
||
except Exception as Err:
|
||
self.__sth_wrong(5, f"暂不支持 {command} 功能,或确认该功能存在... {Err}")
|
||
|
||
match command:
|
||
case "state.set_tp_mode":
|
||
req["data"]["tp_mode"] = kwargs["tp_mode"]
|
||
case "overview.set_autoload":
|
||
req["data"]["autoload_prj_path"] = kwargs["autoload_prj_path"]
|
||
case "overview.reload":
|
||
req["data"]["prj_path"] = kwargs["prj_path"]
|
||
req["data"]["tasks"] = kwargs["tasks"]
|
||
case "rl_task.pp_to_main" | "rl_task.run" | "rl_task.stop":
|
||
req["data"]["tasks"] = kwargs["tasks"]
|
||
case "rl_task.set_run_params":
|
||
req["data"]["loop_mode"] = kwargs["loop_mode"]
|
||
req["data"]["override"] = kwargs["override"]
|
||
case "diagnosis.set_params":
|
||
req["data"]["display_pdo_params"] = kwargs["display_pdo_params"]
|
||
req["data"]["frequency"] = kwargs["frequency"]
|
||
req["data"]["version"] = kwargs["version"]
|
||
case "diagnosis.open":
|
||
req["data"]["open"] = kwargs["open"]
|
||
req["data"]["display_open"] = kwargs["display_open"]
|
||
req["data"]["overrun"] = kwargs["overrun"]
|
||
req["data"]["turn_area"] = kwargs["turn_area"]
|
||
req["data"]["delay_motion"] = kwargs["delay_motion"]
|
||
case "register.set_value":
|
||
req["data"]["name"] = kwargs["name"]
|
||
req["data"]["type"] = kwargs["type"]
|
||
req["data"]["bias"] = kwargs["bias"]
|
||
req["data"]["value"] = kwargs["value"]
|
||
case "diagnosis.save":
|
||
req["data"]["save"] = kwargs["save"]
|
||
case "socket.set_params":
|
||
req["data"] = kwargs["data"]
|
||
case "fieldbus_device.set_params":
|
||
req["data"]["device_name"] = kwargs["device_name"]
|
||
req["data"]["enable"] = kwargs["enable"]
|
||
case "soft_limit.set_params":
|
||
req["data"] = kwargs["data"]
|
||
case "move.set_quickturn_pos":
|
||
req["data"]["enable_home"] = kwargs["enable_home"]
|
||
req["data"]["enable_drag"] = kwargs["enable_drag"]
|
||
req["data"]["enable_transport"] = kwargs["enable_transport"]
|
||
req["data"]["end_posture"] = kwargs["end_posture"]
|
||
case "move.quick_turn":
|
||
req["data"]["name"] = kwargs["name"]
|
||
case "move.stop":
|
||
req["data"]["stoptype"] = kwargs["stoptype"]
|
||
case "jog.start":
|
||
req["data"]["index"] = kwargs["index"]
|
||
req["data"]["direction"] = kwargs["direction"]
|
||
req["data"]["is_ext"] = kwargs["is_ext"]
|
||
case "jog.set_params":
|
||
req["data"]["step"] = kwargs["step"]
|
||
req["data"]["override"] = kwargs["override"]
|
||
req["data"]["space"] = kwargs["space"]
|
||
case "diagnosis.get_params":
|
||
req["data"]["version"] = kwargs["version"]
|
||
case "system_io.update_configuration":
|
||
req["data"]["input_system_io"] = kwargs["input_system_io"]
|
||
req["data"]["output_system_io"] = kwargs["output_system_io"]
|
||
case "modbus.set_params":
|
||
req["data"]["enable_slave"] = kwargs["enable_slave"]
|
||
req["data"]["ip"] = kwargs["ip"]
|
||
req["data"]["port"] = kwargs["port"]
|
||
req["data"]["slave_id"] = kwargs["slave_id"]
|
||
req["data"]["enable_master"] = kwargs["enable_master"]
|
||
case "modbus.get_values":
|
||
req["data"]["mode"] = kwargs["mode"]
|
||
case "move.set_monitor_cfg":
|
||
req["data"]["ref_coordinate"] = kwargs["ref_coordinate"]
|
||
case "move.set_params":
|
||
req["data"]["MOTION"] = kwargs["data"]
|
||
case "move.set_quickstop_distance":
|
||
req["data"]["distance"] = kwargs["distance"]
|
||
case "collision.set_params":
|
||
req["data"] = kwargs["data"]
|
||
case "collision.set_state":
|
||
req["data"]["collision_state"] = kwargs["collision_state"]
|
||
case "controller.set_params":
|
||
req["data"]["time"] = kwargs["time"]
|
||
case "drag.set_params":
|
||
req["data"]["enable"] = kwargs["enable"]
|
||
req["data"]["space"] = kwargs["space"]
|
||
req["data"]["type"] = kwargs["type"]
|
||
case _:
|
||
pass
|
||
|
||
req["id"] = self.__gen_id(command)
|
||
cmd = dumps(req, separators=(",", ":"))
|
||
try:
|
||
log_reqs(req)
|
||
self.__c.send(self.__package(cmd))
|
||
sleep(clibs.interval)
|
||
except Exception as Err:
|
||
self.__sth_wrong(4, f"{Err} 请求发送失败:{cmd}")
|
||
return req["id"]
|
||
elif protocol_flag == 1: # for xService
|
||
req = None
|
||
try:
|
||
with open(f"{clibs.PREFIX}/json/{command}.json", encoding="utf-8", mode="r") as f_json:
|
||
req = load(f_json)
|
||
except Exception as Err:
|
||
self.__sth_wrong(1, f"暂不支持 {command} 功能,或确认该功能存在... {Err}")
|
||
|
||
match command:
|
||
case "safety.safety_area.signal_enable":
|
||
req["c"]["safety.safety_area.signal_enable"]["signal"] = kwargs["signal"]
|
||
case "safety.safety_area.overall_enable":
|
||
req["c"]["safety.safety_area.overall_enable"]["enable"] = kwargs["enable"]
|
||
case "log_code.data.code_list":
|
||
req["s"]["log_code.data"]["code_list"] = kwargs["code_list"]
|
||
case "safety.safety_area.safety_area_enable":
|
||
req["c"]["safety.safety_area.safety_area_enable"]["id"] = kwargs["id"]
|
||
req["c"]["safety.safety_area.safety_area_enable"]["enable"] = kwargs["enable"]
|
||
case "safety.safety_area.set_param":
|
||
req["c"]["safety.safety_area.set_param"] = kwargs["data"]
|
||
case _:
|
||
pass
|
||
|
||
try:
|
||
log_reqs(req)
|
||
self.__c_xs.send(self.__package_xs(req))
|
||
sleep(clibs.interval)
|
||
except Exception as Err:
|
||
self.__sth_wrong(2, f"{Err} 请求发送失败:{req}")
|
||
return command
|
||
|
||
# =================================== ↓↓↓ specific functions ↓↓↓ ===================================
|
||
|
||
def switch_motor_state(self, state: str): # OK
|
||
"""
|
||
切换上电/下电的状态
|
||
:param state: on/off
|
||
:return: None
|
||
"""
|
||
match state:
|
||
case "on":
|
||
self.execution("state.switch_motor_on")
|
||
case "off":
|
||
self.execution("state.switch_motor_off")
|
||
case _:
|
||
clibs.logger.error(f"switch_motor_state 参数错误 {state}: 非法参数,只接受 on/off")
|
||
|
||
def switch_operation_mode(self, mode: str): # OK
|
||
"""
|
||
切换自动/手动操作模式
|
||
:param mode: auto/manual
|
||
:return: None
|
||
"""
|
||
match mode:
|
||
case "auto":
|
||
self.execution("state.switch_auto")
|
||
case "manual":
|
||
self.execution("state.switch_manual")
|
||
case _:
|
||
clibs.logger.error("switch_operation_mode 参数错误 非法参数,只接受 auto/manual")
|
||
|
||
def reload_project(self, prj_name: str, tasks: list): # OK
|
||
"""
|
||
重新加载指定工程
|
||
:param prj_name: 工程名,也即 zip 文件的名字
|
||
:param tasks: 要加载的任务列表
|
||
:return: None
|
||
"""
|
||
prj_path = f"{prj_name}/_build/{prj_name}.prj"
|
||
self.execution("overview.reload", prj_path=prj_path, tasks=tasks)
|
||
|
||
def set_project_auto_reload(self, prj_name: str): # OK
|
||
"""
|
||
将指定工程设置为开机自动加载,也即默认工程
|
||
:param prj_name: 工程名,也即 zip 文件的名字
|
||
:return: None
|
||
"""
|
||
autoload_prj_path = f"{prj_name}/_build/{prj_name}.prj"
|
||
self.execution("overview.set_autoload", autoload_prj_path=autoload_prj_path)
|
||
|
||
def pp_to_main(self, tasks: list): # OK
|
||
"""
|
||
将指定的任务列表的指针,指向 main 函数
|
||
:param tasks: 任务列表
|
||
:return: None
|
||
"""
|
||
self.execution("rl_task.pp_to_main", tasks=tasks)
|
||
|
||
def program_start(self, tasks: list): # OK
|
||
"""
|
||
开始执行程序任务,必须是自动模式下执行
|
||
:param tasks: 任务列表
|
||
:return: None
|
||
"""
|
||
self.execution("rl_task.run", tasks=tasks)
|
||
|
||
def program_stop(self, tasks: list): # OK
|
||
"""
|
||
停止执行程序任务
|
||
:param tasks: 人物列表
|
||
:return: None
|
||
"""
|
||
self.execution("rl_task.stop", tasks=tasks)
|
||
|
||
def set_program_loop_speed(self, loop_mode: bool = True, override: float = 0.5): # OK
|
||
"""
|
||
:param loop_mode: True为循环模式,False为单次模式
|
||
:param override: HMI 左下方的速度滑块,取值范围 [0, 1]
|
||
:return: None
|
||
"""
|
||
self.execution("rl_task.set_run_params", loop_mode=loop_mode, override=override)
|
||
|
||
def clear_alarm(self): # OK
|
||
"""
|
||
清除伺服告警
|
||
:return: None
|
||
"""
|
||
self.execution("servo.clear_alarm")
|
||
|
||
def reboot_robot(self): # OK
|
||
"""
|
||
重启控制器
|
||
:return: None
|
||
"""
|
||
self.execution("controller.reboot")
|
||
clibs.logger.info(f"控制器重启中,重连预计需要等待 100s 左右...")
|
||
ts = time()
|
||
sleep(30)
|
||
while True:
|
||
sleep(5)
|
||
te = time()
|
||
if te - ts > 180:
|
||
self.__silence = False
|
||
self.__sth_wrong("3min 内未能完成重新连接,需要查看后台控制器是否正常启动,或者 ip/port 是否正确")
|
||
break
|
||
for _ in range(3):
|
||
if not self.__is_connected:
|
||
break
|
||
sleep(2)
|
||
else:
|
||
clibs.logger.info("HMI 重新连接成功...")
|
||
break
|
||
|
||
def reload_io(self):
|
||
"""
|
||
触发控制器重新加载 IO 设备列表
|
||
:return: None
|
||
"""
|
||
self.execution("io_device.load_cfg")
|
||
|
||
@property
|
||
def get_quickturn_pos(self): # OK
|
||
"""
|
||
获取机器人的home位姿、拖动位姿和发货位姿,轴关节角度,end_posture 取值如下:
|
||
0 法兰平面与地面平行
|
||
1 工具坐标系X轴与地面垂直,正向朝下
|
||
2 工具坐标系X轴与地面垂直,正向朝上
|
||
3 工具坐标系Y轴与地面垂直,正向朝下
|
||
4 工具坐标系Y轴与地面垂直,正向朝上
|
||
5 工具坐标系Z轴与地面垂直,正向朝下
|
||
6 工具坐标系Z轴与地面垂直,正向朝上
|
||
:return: as below
|
||
{
|
||
"enable_home": false, // 是否开启 home 点快速调整
|
||
"enable_drag": false, // 是否开启拖动位姿点快速调整
|
||
"enable_transport": false, // 是否开启发货位姿点快速调整
|
||
"joint_home": [0.0,0.0,0.0,0.0,0.0,0.0,0.0], // home 位姿的关节角度
|
||
"joint_drag": [0.0,0.0,0.0,0.0,0.0,0.0,0.0], // 拖动位姿的关节角度
|
||
"joint_transport": [0.0,0.0,0.0,0.0,0.0,0.0,0.0], // 发货位姿的关节角度
|
||
"end_posture":0, // 末端姿态的调整方式,取值 0-6
|
||
"home_error_range":[0.0,0.0,0.0,0.0,0.0,0.0,0.0] // home点误差范围
|
||
}
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "move.get_quickturn_pos")
|
||
|
||
def set_quickturn_pos(self, enable_home: bool = False, enable_drag: bool = False, enable_transport: bool = False, end_posture: int = 0): # OK
|
||
"""
|
||
设置机器人的home位姿、拖动位姿、发货位姿,轴关节角度,Home点误差范围,详见上一个 get_quickturn_pos 功能实现
|
||
:param enable_home: 是否开启 home 点快速调整
|
||
:param enable_drag: 是否开启拖动位姿点快速调整
|
||
:param enable_transport:是否开启发货位姿点快速调整
|
||
:param end_posture: 末端姿态的调整方式,取值 0-6,详见 get_quickturn_pos 注释
|
||
:return: None
|
||
"""
|
||
self.execution("move.set_quickturn_pos", enable_home=enable_home, enable_drag=enable_drag, enable_transport=enable_transport, end_posture=end_posture)
|
||
|
||
def move2quickturn(self, name: str): # OK
|
||
"""
|
||
运动到指定的快速调整位姿
|
||
:param name: 指定快速调整的名称,home/drag/transport
|
||
:return: None
|
||
"""
|
||
self.execution("move.quick_turn", name=name)
|
||
|
||
def stop_move(self, stoptype=0): # OK
|
||
"""
|
||
停止运动
|
||
TS_READY | TS_JOG | TS_LOADIDENTIFY | TS_DYNAMICIDENTIFY | TS_DRAG | TS_PROGRAM | TS_DEMO | TS_RCI | TS_DEBUG | TS_FRICTIONIDENTIFY
|
||
:param stoptype: 对应控制器的任务空间类型TaskSpace的枚举值,0-7
|
||
:return: None
|
||
"""
|
||
self.execution("move.stop", stoptype=stoptype)
|
||
|
||
@property
|
||
def get_jog_params(self): # OK
|
||
"""
|
||
获取JOG的参数
|
||
世界坐标系 WORLD_COORDINATE 0
|
||
法兰盘坐标系 FLANGE_COORDINATE 1
|
||
基坐标系 BASE_COORDINATE 2
|
||
工具坐标系 TOOL_COORDINATE 3
|
||
工件坐标系 FRAME_COORDINATE 4
|
||
关节空间 JOINT_SPACE 5
|
||
:return:
|
||
{
|
||
"step": 1000 [1000-连续] [10/1/0.1/0.001-点动]
|
||
"override": 0.2 速度比率
|
||
"space": 5 JOG的空间
|
||
}
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "jog.get_params")
|
||
|
||
def set_jog_params(self, step, override, space): # OK
|
||
"""
|
||
设置JOG的参数,包含步长,空间,速度倍率
|
||
:param step: [1000-连续] [10/1/0.1/0.001-点动]
|
||
:param override: 速度比率
|
||
:param space: JOG的空间
|
||
:return: None
|
||
"""
|
||
self.execution("jog.set_params", step=step, override=override, space=space)
|
||
|
||
def start_jog(self, index: int, direction: bool = False, is_ext: bool = False): # OK
|
||
"""
|
||
开始 JOG 运动
|
||
:param index: 0-6,若选轴空间,则 0-6 对应 1-7 轴,若选笛卡尔空间,则 0-6 对应 xyzabc elb
|
||
:param direction: True 正方向,False 反方向
|
||
:param is_ext: 是否是外部轴 jog
|
||
:return: None
|
||
"""
|
||
self.execution("jog.start", index=index, direction=direction, is_ext=is_ext)
|
||
|
||
@property
|
||
def get_socket_params(self): # OK
|
||
"""
|
||
获取socket参数
|
||
:return:
|
||
{
|
||
"auto_connect": true, // True 开机启动,False 不开机启动
|
||
"disconnection_detection_time": 10, // 链接断开检测周期(s)
|
||
"disconnection_triggering_behavior": 0, // 断开连接触发行为 0无动作 1暂停程序 2暂停并下电
|
||
"enable": true, // True 开启或者 False 关闭
|
||
"ip": "", // 仅限于客户端,用于指定服务端 IP;当作为服务端时,该参数设置为空字符串,否则会报错!!!
|
||
"name": "name", // 连接名称
|
||
"port": "8080", // 连接端口
|
||
"reconnect_flag": true, // True 自动重连,False 不自动重连
|
||
"suffix": "\r", // 指定发送分隔符
|
||
"type": 1 // 连接类型 0 client | 1 server
|
||
}
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "socket.get_params")
|
||
|
||
def set_socket_params(self, enable: bool, port: str, suffix: str, type: int = 1, **kwargs): # OK
|
||
"""
|
||
设置 socket 参数,一般作为服务器使用
|
||
:param enable: True 开启或者 False 关闭
|
||
:param port: 连接端口
|
||
:param suffix: 指定发送分隔符
|
||
:param type: 0 client | 1 server
|
||
:return: None
|
||
"""
|
||
data = self.get_socket_params
|
||
keys = data.keys()
|
||
kwargs.update({"enable": enable, "port": port, "suffix": suffix, "type": type})
|
||
for _ in keys:
|
||
if _ in kwargs.keys():
|
||
data[_] = kwargs[_]
|
||
self.execution("socket.set_params", data=data)
|
||
|
||
@property
|
||
def get_diagnosis_params(self, version="1.4.1"): # OK
|
||
"""
|
||
获取诊断功能开启状态,以及相应其他信息
|
||
:param version: 指定诊断工具版本
|
||
:return:
|
||
{
|
||
"delay_motion": false, // -
|
||
"display_open": false, // 诊断显示功能开启状态
|
||
"ecat_diagnosis_state": false, // -
|
||
"overrun": false, // 是否开启实时线程超时监控上报
|
||
"pdo_params": [...], // 指定版本支持的所有曲线信息
|
||
"state": true, // 诊断功能的开启状态
|
||
"turn_area": false // 转弯区是否上报
|
||
}
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "diagnosis.get_params", version=version)
|
||
|
||
def set_diagnosis_params(self, display_pdo_params: list, frequency: int = 50, version: str = "1.4.1"): # OK
|
||
"""
|
||
设置诊断功能显示参数 [{"name": "hw_joint_vel_feedback", "channel": 0}, ]
|
||
:param display_pdo_params: 指定要采集的曲线名称,具体可通过 get_diagnosis_params 函数功能获取所有目前已支持的曲线
|
||
:param frequency: 采样频率,默认 50ms
|
||
:param version: xDiagnose的版本号
|
||
:return: None
|
||
"""
|
||
self.execution("diagnosis.set_params", display_pdo_params=display_pdo_params, frequency=frequency, version=version)
|
||
|
||
def open_diagnosis(self, open: bool, display_open: bool, overrun: bool = False, turn_area: bool = False, delay_motion: bool = False): # OK
|
||
"""
|
||
打开或者关闭诊断曲线,并定义其他功能的开关(调试相关功能,比如是否开启线程超时监控和上报,转弯区以及运动延迟等)
|
||
:param open: 诊断功能,控制HMI->日志->诊断设置->私服诊断开关,一般设置成 True
|
||
:param display_open: 诊断显示功能,指的是在线诊断插件中的打开 switch 的状态,需要诊断数据的情况,设置成 True
|
||
:param overrun: 实时线程超时监控上报
|
||
:param turn_area: 转弯区上报
|
||
:param delay_motion: 延迟运动
|
||
:return: None
|
||
"""
|
||
self.execution("diagnosis.open", open=open, display_open=display_open, overrun=overrun, turn_area=turn_area, delay_motion=delay_motion)
|
||
|
||
def save_diagnosis(self, save: bool = True): # OK
|
||
"""
|
||
保存诊断数据,也就是主动写诊断动作,HMI日志->诊断设置->保存诊断数据
|
||
:param save: 保存数据开关
|
||
:return: None
|
||
"""
|
||
self.execution("diagnosis.save", save=save)
|
||
|
||
@property
|
||
def qurry_system_io_configuration(self): # OK
|
||
"""
|
||
系统IO配置的查询协议,trigger 参数取值参照如下:
|
||
FLANKS 0, //边缘触发
|
||
POS_FLANK 1, //上升沿
|
||
NEG_FLANK 2, //下降沿
|
||
HIGH_LEVEL 3, //高电平
|
||
LOW_LEVEL 4 //低电平
|
||
:return:
|
||
{
|
||
"input_system_io": {
|
||
"motor_on": {
|
||
"signal":"DI0_0",
|
||
"trigger":1
|
||
},
|
||
"motor_off": {
|
||
"signal":"DI0_0",
|
||
"trigger":2
|
||
}
|
||
},
|
||
"output_system_io": {
|
||
"sta_motor_on": {
|
||
"signal":"DO0_0"
|
||
},
|
||
"sta_robot_running": {
|
||
"signal":"DO0_1"
|
||
}
|
||
}
|
||
}
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "system_io.query_configuration")
|
||
|
||
@property
|
||
def qurry_system_io_event_configuration(self): # OK
|
||
"""
|
||
查询当前系统支持的系统IO事件列表,包括事件key、名称、支持的触发方式等配置
|
||
:return:
|
||
{
|
||
"input_system_event": [
|
||
{
|
||
"key": "ctrl_motor_on",
|
||
"name": "上电",
|
||
"trigger_types": [
|
||
1,
|
||
2
|
||
]
|
||
},
|
||
{
|
||
"key": "ctrl_motor_off",
|
||
"name": "下电",
|
||
"trigger_types": [
|
||
1,
|
||
2
|
||
]
|
||
}
|
||
],
|
||
"output_system_event": [
|
||
{
|
||
"key": "sta_motor_on",
|
||
"name": "上下电状态"
|
||
},
|
||
{
|
||
"key": "sta_program",
|
||
"name": "运行状态"
|
||
}
|
||
],
|
||
"input_mutex_event": [
|
||
{
|
||
"key0": "ctrl_motor_on",
|
||
"key1": "ctrl_motor_off"
|
||
},
|
||
{
|
||
"key0": "ctrl_switch_auto",
|
||
"key1": "ctrl_switch_manu"
|
||
}
|
||
]
|
||
}
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "system_io.query_configuration")
|
||
|
||
def update_system_io_configuration(self, i_funcs: list, o_funcs: list, i_signals: list, o_signals: list, trig_types: list): # OK
|
||
"""
|
||
配置系统 IO
|
||
:param i_funcs: 输入,只写功能码列表
|
||
:param o_funcs: 输出,只读功能码列表
|
||
:param i_signals: DI 信号列表
|
||
:param o_signals: DO 信号列表
|
||
:param trig_types: 触发条件列表,可参考 qurry_system_io_configuration 中的触发条件
|
||
:return: None
|
||
"""
|
||
input_system_io, output_system_io = {}, {}
|
||
for i_func in i_funcs:
|
||
_index = i_funcs.index(i_func)
|
||
input_system_io[i_func] = {"signal": i_signals[_index], "trigger": trig_types[_index]}
|
||
for o_func in o_funcs:
|
||
_index = o_funcs.index(o_func)
|
||
output_system_io[o_func] = {"signal": o_signals[_index]}
|
||
self.execution("system_io.update_configuration", input_system_io=input_system_io, output_system_io=output_system_io)
|
||
|
||
@property
|
||
def get_fieldbus_device_params(self): # OK
|
||
"""
|
||
获取的是 HMI通讯->总线设备列表的信息,以及开关状态
|
||
:return:
|
||
{
|
||
"device_list": [
|
||
{
|
||
"device_name": "modbus_1",
|
||
"enable": true
|
||
}, {
|
||
"device_name": "modbus_2",
|
||
"enable": false
|
||
}
|
||
]
|
||
}
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "fieldbus_device.get_params")
|
||
|
||
def set_fieldbus_device_params(self, device_name: str, enable: bool): # OK
|
||
"""
|
||
定义开关设备的协议,一次只能打开一个设备
|
||
:param device_name: 设备列表中的名称
|
||
:param enable: 是否开启,这里操作的是 HMI通信->IO设备里面的开关状态
|
||
:return: None
|
||
"""
|
||
self.execution("fieldbus_device.set_params", device_name=device_name, enable=enable)
|
||
|
||
def reload_fieldbus(self): # OK
|
||
"""
|
||
触发控制器重新加载总线设备
|
||
:return: None
|
||
"""
|
||
self.execution("fieldbus_device.load_cfg")
|
||
|
||
@property
|
||
def get_modbus_params(self): # OK
|
||
"""
|
||
获取modbus参数
|
||
:return:
|
||
{
|
||
"connect_state": true,
|
||
"enable_master": false,
|
||
"enable_slave": false,
|
||
"ip": "192.168.0.160",
|
||
"is_convert": true,
|
||
"port": 502,
|
||
"slave_id": 1
|
||
}
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "modbus.get_params")
|
||
|
||
def set_modbus_params(self, enable_slave: bool, ip: str, port: int, slave_id: int, enable_master: bool): # OK
|
||
"""
|
||
设置modbus参数,相当于新建
|
||
:param enable_slave: Modbus从站是否自动开启
|
||
:param ip: ip 地址
|
||
:param port: 端口
|
||
:param slave_id: 从站 ID
|
||
:param enable_master: Modbus主站是否自动开启
|
||
:return:
|
||
"""
|
||
self.execution("modbus.set_params", enable_slave=enable_slave, ip=ip, port=port, slave_id=slave_id, enable_master=enable_master)
|
||
|
||
def reload_registers(self): # OK
|
||
"""
|
||
触发控制器重新加载寄存器列表
|
||
:return: None
|
||
"""
|
||
self.execution("modbus.load_cfg")
|
||
|
||
def get_modbus_values(self, mode: str = "all"): # OK
|
||
"""
|
||
用于获取 modbus 寄存器变量值的更新信息,展示在状态监控界面
|
||
:param mode: all/change
|
||
:return:
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "modbus.get_values", mode=mode)
|
||
|
||
@property
|
||
def get_soft_limit_params(self): # OK
|
||
"""
|
||
获取软限位参数
|
||
:return:
|
||
{
|
||
"enable":true
|
||
"upper":[0,0,0,0,0,0,0],
|
||
"lower":[0,0,0,0,0,0,0],
|
||
"reduced_upper":[0,0,0,0,0,0,0],
|
||
"reduced_lower":[0,0,0,0,0,0,0]
|
||
}
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "soft_limit.get_params")
|
||
|
||
def set_soft_limit_params(self, **kwargs): # OK
|
||
"""
|
||
设定软限位参数 enable: bool, upper: list, lower: list, reduced_upper: list, reduced_lower: list
|
||
:enable: 是否启用软限位
|
||
:upper: 软限位上限
|
||
:lower: 软限位下限
|
||
:reduced_upper: 缩减模式软限位上限
|
||
:reduced_lower: 缩减模式软限位下限
|
||
:return: None
|
||
"""
|
||
data = self.get_soft_limit_params
|
||
keys = data.keys()
|
||
for _ in keys:
|
||
if _ in kwargs.keys():
|
||
data[_] = kwargs[_]
|
||
self.execution("soft_limit.set_params", data=data)
|
||
|
||
@property
|
||
def get_device_params(self): # OK
|
||
"""
|
||
获取设备信息
|
||
:return:
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "device.get_params")
|
||
|
||
@property
|
||
def get_cart_pos(self): # OK
|
||
"""
|
||
获取机器人的当前位姿:包括轴关节角度,笛卡尔关节角度,四元数,欧拉角(臂角)
|
||
:return:
|
||
{
|
||
"joint":[0.0,0.0,0.0,0.0,0.0,0.0],
|
||
"position":[0.0,0.0,0.0,0.0,0.0,0.0],
|
||
"euler":[0.0,0.0,0.0],
|
||
"quaternion"[0.0,0.0,0.0,0.0],
|
||
"elb":0.0, // 可缺省
|
||
"ext_joint":[0.0,0.0,0.0,0.0,0.0]
|
||
}
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "move.get_pos")
|
||
|
||
@property
|
||
def get_joint_pos(self): # OK
|
||
"""
|
||
获取机器人的当前关节角度:包括内部轴和外部轴
|
||
:return:
|
||
{
|
||
"inner_pos": [0,0,0,0,0,0,0],
|
||
"extern_pos": [0,0,0,0,0,0,0]
|
||
}
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "move.get_joint_pos")
|
||
|
||
@property
|
||
def get_monitor_cfg(self): # OK
|
||
"""
|
||
获取机器人的监控配置参数,RefCoordinateType 类型数据,表示当前控制器位置监测的相对坐标系
|
||
基坐标系 REF_COORDINATE_BASE 0
|
||
世界坐标系 REF_COORDINATE_WORLD 1
|
||
工件坐标系 REF_COORDINATE_WOBJ 2
|
||
:return:
|
||
{
|
||
"ref_coordinate": 0
|
||
}
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "move.get_monitor_cfg")
|
||
|
||
def set_monitor_cfg(self, ref_coordinate): # OK
|
||
"""
|
||
设置机器人的监控配置参数
|
||
:ref_coordinate: RefCoordinateType类型数据,用来设置当前控制器位置监测的相对坐标系
|
||
:return: None
|
||
"""
|
||
self.execution("move.set_monitor_cfg", ref_coordinate=ref_coordinate)
|
||
|
||
@property
|
||
def get_move_params(self): # OK
|
||
"""
|
||
获取机器人的运动参数:包括减速比、耦合比、最大速度、加速度、加加速度、acc ramp time、规划步长等信息
|
||
:return:
|
||
{
|
||
"MOTION": {
|
||
"ACC_RAMPTIME_JOG": 0.5,
|
||
"ACC_RAMPTIME_STOP": 0.5,
|
||
"DEFAULT_ACC_PARAMS": [1.0, 0.5],
|
||
"JERK_LIMIT_CART": 0,
|
||
"JERK_LIMIT_JOINT": 0,
|
||
"JERK_LIMIT_ROT": 0,
|
||
"JOINT_MAX_ACC": [500, 500, 1000, 1000, 1000],
|
||
"JOINT_MAX_JERK": [2000, 2000, 2000, 4000, 4000, 4000],
|
||
"JOINT_MAX_SPEED": [120.0, 120.0, 180.0, 180.0, 180.0, 180.0],
|
||
"MAX_ACC_PARAMS": [1.0, 1],
|
||
"MIN_ACC_PARAMS": [0.3, 0.05],
|
||
"TCP_MAX_ACC": 5000,
|
||
"TCP_MAX_JERK": 10000,
|
||
"TCP_MAX_SPEED": 1000,
|
||
"TCP_ROTATE_MAX_ACC": 1800,
|
||
"TCP_ROTATE_MAX_JERK": 3600,
|
||
"TCP_ROTATE_MAX_SPEED": 180,
|
||
"VEL_SMOOTH_FACTOR": 1.0,
|
||
"VEL_SMOOTH_FACTOR_RANGE": [1.0, 10.0]
|
||
}
|
||
}
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "move.get_params")
|
||
|
||
def set_move_params(self, **kwargs): # OK
|
||
"""
|
||
设置机器人的运动参数:轴最大速度、轴最大加加速度、速度、加加速度、加速度、加加速度、acc ramp time、规划步长等信息
|
||
可选参数:参考 get_move_params 函数返回值 MOTION 里面的选项
|
||
:return: None
|
||
"""
|
||
data = self.get_move_params["MOTION"]
|
||
print(f"res = {data}")
|
||
keys = data.keys()
|
||
for _ in keys:
|
||
if _ in kwargs.keys():
|
||
data[_] = kwargs[_]
|
||
self.execution("move.set_params", data=data)
|
||
|
||
@property
|
||
def get_quick_stop_distance(self): # OK
|
||
"""
|
||
获取机器人 search 指令最大停止距离
|
||
:return:
|
||
{
|
||
"distance":2.0
|
||
}
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "move.get_quickstop_distance")
|
||
|
||
def set_quick_stop_distance(self, distance: float): # OK
|
||
"""
|
||
设置机器人 search 指令最大停止距离
|
||
:param distance: 停止距离,单位 mm
|
||
:return: None
|
||
"""
|
||
self.execution("move.set_quickstop_distance", distance=distance)
|
||
|
||
@property
|
||
def get_collision_params(self): # OK
|
||
"""
|
||
获取碰撞检测相关参数
|
||
:return:
|
||
{
|
||
"action": 1, // 触发行为:1-安全停止;2-触发暂停;3-柔顺停止
|
||
"coeff": [100, 100, 100, 100, 100, 100], // 0-整机灵敏度百分比,1-单轴灵敏度百分比,2-单轴和整机灵敏度百分比
|
||
"coeff_level": 0, // 灵敏度等级:0-低,1-中,2-高
|
||
"compliance": 0, // 柔顺功能比例系数,[0-1]
|
||
"enable": true, // 功能使能开关
|
||
"fallback_distance": 3, // 回退距离
|
||
"mode": 0, // 力传感器系数,0 整机 1 单轴
|
||
"percent": 100, // 0-200,整机灵敏度百分比
|
||
"percent_axis": [100, 100, 100, 100, 100, 100], // 0-200,单轴灵敏度百分比
|
||
"reduced_percent": 100, // 0-200,整机缩减模式灵敏度百分比
|
||
"reduced_percent_axis": [100, 100, 100, 100, 100, 100] // 0-200,单轴缩减模式灵敏度百分比
|
||
}
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "collision.get_params")
|
||
|
||
def set_collision_params(self, enable, mode, action, percent, **kwargs): # OK
|
||
"""
|
||
设置碰撞检测相关参数
|
||
:param enable: 功能使能开关
|
||
:param mode: 力传感器系数,0 整机 1 单轴
|
||
:param action: 触发行为:1-安全停止;2-触发暂停;3-柔顺停止
|
||
:param percent: 0-200,整机灵敏度百分比
|
||
:return:
|
||
"""
|
||
data = self.get_collision_params
|
||
keys = data.keys()
|
||
kwargs.update({"enable": enable, "mode": mode, "action": action, "percent": percent})
|
||
for _ in keys:
|
||
if _ in kwargs.keys():
|
||
data[_] = kwargs[_]
|
||
self.execution("collision.set_params", data=data)
|
||
|
||
def set_collision_state(self, collision_state: bool): # NG
|
||
"""
|
||
开启或者关闭虚拟墙碰撞检测,测试该函数功能无效!!!
|
||
:param collision_state: 碰撞检测的开关状态
|
||
:return: None
|
||
"""
|
||
self.execution("collision.set_state", collision_state=collision_state)
|
||
|
||
@property
|
||
def get_robot_state(self): # OK
|
||
"""
|
||
{
|
||
"rc_state":"normal", # "fatal" 、"error"、"block"、"normal"
|
||
"engine":"on", # "fatal" 、"error"、"GStop"、"EStop"、"on"、"off"
|
||
"servo_mode":"position", # "torque"、"position"
|
||
"operate": "auto", # "auto"、"manual"
|
||
"task_space": "program", # "jog"、"drag"、"ready"、"load_identify"、"demo"、"rci"、"dynamic_identify"、"program"、"debug"
|
||
"robot_action": "idle", # "idle"、"busy"
|
||
"safety_mode": "collision" # "normal"、"collision"、"collaboration"
|
||
}
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "state.get_state")
|
||
|
||
def set_controller_params(self, robot_time: str): # OK
|
||
"""
|
||
设置控制器系统时间
|
||
:param robot_time: 系统时间,"2020-02-28 15:28:30"
|
||
:return: None
|
||
"""
|
||
self.execution("controller.set_params", time=robot_time)
|
||
|
||
@property
|
||
def get_robot_params(self): # OK
|
||
"""
|
||
"alias": "",
|
||
"auth_state":
|
||
"controller_type": "XBC_XMATE",
|
||
"controller_types": ["XBC_3", "XBC_5", "XBC_XMATE"],
|
||
"disk_serial_number": "2338020401535",
|
||
"mac_addr": "34:df:20:03:1b:45",
|
||
"model":,
|
||
"nic_list": ["enp1s0", "enp2s0"],
|
||
"occupied_addr": "192.168.2.123:49269",
|
||
"period": 0.002,
|
||
"period_types": [0.001, 0.002, 0.003, 0.004],
|
||
"robot_template": 10,
|
||
"robot_type": "XMC12-R1300-W7G3B1C",
|
||
"robot_types": ["XMC12-R1300-B7S3B0C", "XMC12-R1300-W7G3B1C", "XMC17_5-R1900-W7G3B1C", "XMC20-R1650-B7G3Z0C"],
|
||
"security_type": "ROKAE_RSC",
|
||
"security_types": ["ROKAE_MINI", "ROKAE_RSC"],
|
||
"time": "2024-09-13 12:36:38",
|
||
"version": "2.3.0.4"
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "controller.get_params")
|
||
|
||
def switch_tp_mode(self, mode: str): # OK
|
||
"""
|
||
切换示教器模式
|
||
:param mode: with/without
|
||
:return: None
|
||
"""
|
||
match mode:
|
||
case "with":
|
||
self.execution("state.set_tp_mode", tp_mode="with")
|
||
case "without":
|
||
self.execution("state.set_tp_mode", tp_mode="without")
|
||
case _:
|
||
clibs.logger.error("switch_tp_mode 参数错误 非法参数,只接受 with/without")
|
||
|
||
@property
|
||
def get_tp_mode(self): # OK
|
||
"""
|
||
获取示教器连接状态
|
||
:return:
|
||
{
|
||
"tp_mode":"with"
|
||
}
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "state.get_tp_mode")
|
||
|
||
@property
|
||
def get_drag_params(self): # OK
|
||
"""
|
||
获取拖动模式参数
|
||
:return:
|
||
{
|
||
"enable": true,
|
||
"space": 0,
|
||
"type": 0
|
||
}
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "drag.get_params")
|
||
|
||
def set_drag_params(self, enable: bool, space: int = 1, type: int = 2): # OK
|
||
"""
|
||
设置拖动模式开关以及参数
|
||
:param enable: 是否启用拖动
|
||
:param space: 拖动空间 - 0 代表关节 1 代表笛卡尔
|
||
:param type: 拖动类型 - 0 只平移 1 只旋转 2 自由拖动
|
||
:return: None
|
||
"""
|
||
self.execution("drag.set_params", enable=enable, space=space, type=type)
|
||
|
||
def set_safety_area_signal(self, signal: bool): # OK
|
||
"""
|
||
设置安全区域信号控制使能开关
|
||
:param signal: True 打开 False 关闭
|
||
:return: None
|
||
"""
|
||
self.execution("safety.safety_area.signal_enable", protocol_flag=1, signal=signal)
|
||
|
||
def set_safety_area_overall(self, enable: bool): # OK
|
||
"""
|
||
设置安全区域整体控制使能开关
|
||
:param enable: True 打开 False 关闭
|
||
:return: None
|
||
"""
|
||
self.execution("safety.safety_area.overall_enable", protocol_flag=1, enable=enable)
|
||
|
||
@property
|
||
def get_safety_area_params(self): # OK
|
||
"""
|
||
获取安全区所有的配置信息
|
||
:return:
|
||
"g": {
|
||
"safety_area_data": {
|
||
"overall_enable": true,
|
||
"safety_area_setting": [
|
||
{
|
||
"box": {
|
||
"Lx": 100.0,
|
||
"Ly": 100.0,
|
||
"Lz": 100.0,
|
||
"direction": false,
|
||
"ori": {
|
||
"euler": {
|
||
"a": 179.9963851353547,
|
||
"b": -0.006653792532429416,
|
||
"c": 179.9934560302729
|
||
},
|
||
"quaternion": {
|
||
"q1": 0.0,
|
||
"q2": 0.0,
|
||
"q3": 0.0,
|
||
"q4": 0.0
|
||
}
|
||
},
|
||
"pos": {
|
||
"x": 0.0,
|
||
"y": 0.0,
|
||
"z": 0.0
|
||
}
|
||
},
|
||
"enable": false,
|
||
"id": 0,
|
||
"name": "region1",
|
||
"plane": {
|
||
"direction": true,
|
||
"point": {
|
||
"x": 0.0,
|
||
"y": 0.0,
|
||
"z": 0.0
|
||
},
|
||
"vector": {
|
||
"x": 0.0,
|
||
"y": 0.0,
|
||
"z": 0.0
|
||
}
|
||
},
|
||
"shape": 0,
|
||
"shared_bind_di": "",
|
||
"shared_bind_do": "",
|
||
"sphere": {
|
||
"ori": {
|
||
"euler": {
|
||
"a": 0.0,
|
||
"b": 0.0,
|
||
"c": 0.0
|
||
},
|
||
"quaternion": {
|
||
"q1": 0.0,
|
||
"q2": 0.0,
|
||
"q3": 0.0,
|
||
"q4": 0.0
|
||
}
|
||
},
|
||
"pos": {
|
||
"x": 0.0,
|
||
"y": 0.0,
|
||
"z": 0.0
|
||
},
|
||
"radius": 0.0
|
||
},
|
||
"state": true,
|
||
"trigger": 0,
|
||
"type": 0,
|
||
"vertebral": {
|
||
"high": 0.0,
|
||
"ori": {
|
||
"euler": {
|
||
"a": 0.0,
|
||
"b": 0.0,
|
||
"c": 0.0
|
||
},
|
||
"quaternion": {
|
||
"q1": 0.0,
|
||
"q2": 0.0,
|
||
"q3": 0.0,
|
||
"q4": 0.0
|
||
}
|
||
},
|
||
"pos": {
|
||
"x": 0.0,
|
||
"y": 0.0,
|
||
"z": 0.0
|
||
},
|
||
"radius": 0.0
|
||
}
|
||
},
|
||
... // 剩余 9 个安全区域的配置信息
|
||
],
|
||
"signal_enable": true
|
||
}
|
||
}
|
||
}
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "safety_area_data", flag=1)
|
||
|
||
def set_safety_area_enable(self, id: int, enable: bool): # OK
|
||
"""
|
||
设置每个安全区域的开关
|
||
:param id: 安全区域开关,0-9
|
||
:param enable: True 打开,False 关闭
|
||
:return: None
|
||
"""
|
||
self.execution("safety.safety_area.safety_area_enable", protocol_flag=1, id=id, enable=enable)
|
||
|
||
def set_safety_area_param(self, id: int, enable: bool, **kwargs): # OK
|
||
"""
|
||
设定单独安全区的参数
|
||
:param id: 安全区 id
|
||
:param enable: 是否开启
|
||
:param kwargs: 其他参数,参考 get_safety_area_params 的返回值形式
|
||
:return: None
|
||
"""
|
||
data = self.get_safety_area_params["g"]["safety_area_data"]["safety_area_setting"][id]
|
||
keys = data.keys()
|
||
kwargs.update({"id": id, "enable": enable})
|
||
for _ in keys:
|
||
if _ in kwargs.keys():
|
||
data[_] = kwargs[_]
|
||
self.execution("safety.safety_area.set_param", protocol_flag=1, data=data)
|
||
self.execution("safety.safety_area.safety_area_enable", protocol_flag=1, id=id, enable=enable)
|
||
|
||
@property
|
||
def get_filtered_error_code(self): # OK
|
||
"""
|
||
获取已设定的错误码过滤列表
|
||
:return:
|
||
{
|
||
"g": {
|
||
"log_code.data": {
|
||
"code_list": [
|
||
{
|
||
"id": 100,
|
||
"title": "DEMO\\u610f\\u5916\\u505c\\u6b62"
|
||
},
|
||
{
|
||
"id": 10000,
|
||
"title": "HMI\\u8bf7\\u6c42\\u5305\\u89e3\\u6790\\u9519\\u8bef"
|
||
},
|
||
{
|
||
"id": 10002,
|
||
"title": "\\u5feb\\u901f\\u8c03\\u6574\\u542f\\u52a8\\u5931\\u8d25"
|
||
}
|
||
],
|
||
"version": 1
|
||
}
|
||
}
|
||
}
|
||
"""
|
||
return self.__get_data(currentframe().f_code.co_name, "log_code.data", flag=1)
|
||
|
||
def set_filtered_error_code(self, action: str, code_list: list): # OK
|
||
"""
|
||
清空,增加,删除错误过滤码
|
||
:param action: 支持 add/remove/clear,当为 clear 时,code_list 可为任意列表
|
||
:param code_list: 需要添加/删除的过滤码列表
|
||
:return: None
|
||
"""
|
||
origin_code_list = self.get_filtered_error_code["g"]["log_code.data"]["code_list"]
|
||
# [{"id": 10000, "title": "HMI请求包解析错误"}, {"id": 10002, "title": "快速调整启动失败"}]
|
||
if action == "clear":
|
||
code_list = []
|
||
elif action == "add":
|
||
for error_code in code_list:
|
||
for item in origin_code_list:
|
||
if error_code == item["id"]:
|
||
break
|
||
else:
|
||
origin_code_list.append({"id": error_code, "title": ""})
|
||
code_list = origin_code_list
|
||
elif action == "remove":
|
||
for error_code in code_list:
|
||
for item in origin_code_list:
|
||
if error_code == item["id"]:
|
||
origin_code_list.remove(item)
|
||
code_list = origin_code_list
|
||
|
||
self.execution("log_code.data.code_list", protocol_flag=1, code_list=code_list)
|
||
|
||
@staticmethod
|
||
def __cannot_find_response(func_name, req_id):
|
||
# 无法获取响应信息时,记录错误信息,并退出
|
||
clibs.logger.error(f"[{func_name}] 无法找到 {req_id} 的响应信息,执行失败,退出...")
|
||
exit()
|
||
|
||
def __get_data(self, upper_func, command, flag=0, **kwargs):
|
||
r_id = self.execution(command, protocol_flag=flag, **kwargs)
|
||
res = self.get_from_id(r_id, flag=flag)
|
||
if res is not None:
|
||
return res
|
||
else:
|
||
self.__cannot_find_response(upper_func, r_id)
|
||
# =================================== ↑↑↑ specific functions ↑↑↑ ===================================
|
||
|
||
|
||
class ExternalCommunication(object):
|
||
def __init__(self):
|
||
self.__c = None
|
||
self.suffix = "\r"
|
||
self.socket_client()
|
||
self.exec_desc = " :--: 返回 true 表示执行成功,false 失败"
|
||
|
||
def socket_client(self):
|
||
self.__c = socket(AF_INET, SOCK_STREAM)
|
||
try:
|
||
self.__c.connect((clibs.ip_addr, clibs.external_port))
|
||
clibs.logger.success(f"外部通信连接成功...")
|
||
return self.__c
|
||
except Exception as Err:
|
||
clibs.logger.error(f"外部通信连接失败... {Err}")
|
||
|
||
def s_string(self, directive):
|
||
order = "".join([directive, self.suffix])
|
||
self.__c.send(order.encode())
|
||
sleep(clibs.interval)
|
||
|
||
def r_string(self, directive):
|
||
result, char = "", ""
|
||
while char != self.suffix:
|
||
try:
|
||
char = self.__c.recv(1).decode(encoding="unicode_escape")
|
||
except TimeoutError:
|
||
clibs.logger.error(f"获取请求指令 {directive} 的返回数据超时,需确认指令发送格式以及内容正确!")
|
||
exit(101)
|
||
result = "".join([result, char])
|
||
sleep(clibs.interval)
|
||
return result
|
||
|
||
# =================================== ↓↓↓ specific functions ↓↓↓ ===================================
|
||
def motor_on(self): # OK
|
||
return self.__exec_cmd("motor_on", "电机上电", self.exec_desc)
|
||
|
||
def motor_off(self): # OK
|
||
return self.__exec_cmd("motor_off", "电机下电", self.exec_desc)
|
||
|
||
def pp_to_main(self): # OK
|
||
return self.__exec_cmd("pp_to_main", "程序指针到", self.exec_desc)
|
||
|
||
def program_start(self): # OK
|
||
return self.__exec_cmd("start", "程序启动(可能需先清告警)", self.exec_desc)
|
||
|
||
def program_stop(self): # OK
|
||
return self.__exec_cmd("stop", "程序停止", self.exec_desc)
|
||
|
||
def clear_alarm(self): # OK
|
||
return self.__exec_cmd("clear_alarm", "清除伺服报警", self.exec_desc)
|
||
|
||
def switch_operation_auto(self): # OK
|
||
return self.__exec_cmd("switch_mode:auto", "切换到自动模式", self.exec_desc)
|
||
|
||
def switch_operation_manual(self): # OK
|
||
return self.__exec_cmd("switch_mode:manual", "切换到手动模式", self.exec_desc)
|
||
|
||
def open_drag_mode(self): # OK
|
||
return self.__exec_cmd("open_drag", "打开拖动", self.exec_desc)
|
||
|
||
def close_drag_mode(self): # OK
|
||
return self.__exec_cmd("close_drag", "关闭拖动", self.exec_desc)
|
||
|
||
def get_program_list(self): # OK
|
||
return self.__exec_cmd("list_prog", "获取工程列表")
|
||
|
||
def get_current_program(self): # OK
|
||
return self.__exec_cmd("current_prog", "当前工程")
|
||
|
||
def load_program(self, program_name): # OK
|
||
return self.__exec_cmd(f"load_prog:{program_name}", "加载工程", self.exec_desc)
|
||
|
||
def estop_reset(self): # OK | 复原外部 IO 急停和安全门急停,前提是硬件已复原
|
||
return self.__exec_cmd("estop_reset", "急停复位", self.exec_desc)
|
||
|
||
def estopreset_and_clearalarm(self): # OK | 外部 IO/安全门急停/安全碰撞告警都可以清除,前提是硬件已复原
|
||
return self.__exec_cmd("estopreset_and_clearalarm", "急停复位并清除报警", self.exec_desc)
|
||
|
||
def motoron_pp2main_start(self): # OK
|
||
return self.__exec_cmd("motoron_pptomain_start", "依次执行上电,程序指针到main,启动程序(可以是手动模式,必要时需清告警)", self.exec_desc)
|
||
|
||
def motoron_start(self): # OK
|
||
return self.__exec_cmd("motoron_start", "依次执行上电,启动程序(可以是手动模式,必要时需清告警)", self.exec_desc)
|
||
|
||
def pause_motoroff(self): # OK
|
||
return self.__exec_cmd("pause_motoroff", "暂停程序并下电", self.exec_desc)
|
||
|
||
def set_program_speed(self, speed: int): # OK | 1-100
|
||
return self.__exec_cmd(f"set_program_speed:{speed}", "设置程序运行速率(滑块)", self.exec_desc)
|
||
|
||
def set_soft_estop(self, enable: str): # OK
|
||
return self.__exec_cmd(f"set_soft_estop:{enable}", "触发(true)/解除(false)机器人软急停", self.exec_desc)
|
||
|
||
def switch_auto_motoron(self): # OK
|
||
return self.__exec_cmd("switch_auto_motoron", "切换自动模式并上电", self.exec_desc)
|
||
|
||
def open_safe_region(self, number: int): # OK | 1-10
|
||
return self.__exec_cmd(f"open_safe_region:{number}", f"打开第 {number} 个安全区域(1-10,信号控制开关需打开,不限操作模式)", self.exec_desc)
|
||
|
||
def close_safe_region(self, number: int): # OK | 1-10
|
||
return self.__exec_cmd(f"close_safe_region:{number}", f"关闭第 {number} 个安全区域(1-10,信号控制开关需打开,不限操作模式)", self.exec_desc)
|
||
|
||
def open_reduced_mode(self): # OK
|
||
return self.__exec_cmd("open_reduced_mode", "开启缩减模式(不限操作模式)", self.exec_desc)
|
||
|
||
def close_reduced_mode(self): # OK
|
||
return self.__exec_cmd("close_reduced_mode", "关闭缩减模式(不限操作模式)", self.exec_desc)
|
||
|
||
def setdo_value(self, do_name: str, do_value: str): # NG | do_value 为 true/false
|
||
return self.__exec_cmd(f"setdo:{do_name},{do_value}", f"设置 {do_name} 的值为 {do_value}", self.exec_desc)
|
||
|
||
def modify_system_time(self, robot_time): # OK
|
||
return self.__exec_cmd(f"set_robot_time:{robot_time}", f"修改控制器和示教器的时间为 {robot_time} 的", self.exec_desc)
|
||
|
||
# --------------------------------------------------------------------------------------------------
|
||
@property
|
||
def motor_on_state(self): # OK
|
||
return self.__exec_cmd("motor_on_state", "获取上电状态", " :--: 返回 true 表示已上电,false 已下电")
|
||
|
||
@property
|
||
def robot_running_state(self): # OK
|
||
return self.__exec_cmd("robot_running_state", "获取程序运行状态", " :--: 返回 true 表示正在运行,false 未运行")
|
||
|
||
@property
|
||
def estop_state(self): # OK | 只表示外部急停,安全门触发不会返回 true,只要有急停标识 E 字母,就会返回 true
|
||
return self.__exec_cmd("estop_state", "急停状态", " :--: 返回 true 表示处于急停状态,false 非急停")
|
||
|
||
@property
|
||
def operation_mode(self): # OK
|
||
return self.__exec_cmd("operating_mode", "获取工作模式", " :--: 返回 true 表示自动模式,false 手动模式")
|
||
|
||
@property
|
||
def home_state(self): # OK | 需要设置一下 "HMI 设置->快速调整"
|
||
return self.__exec_cmd("home_state", "获取 HOME 输出状态", " :--: 返回 true 表示法兰中心处于 HOME 点,false 未处于 HOME 点")
|
||
|
||
@property
|
||
def fault_state(self): # OK
|
||
return self.__exec_cmd("fault_state", "获取 故障状态", " :--: 返回 true 表示处于故障状态,false 非故障状态")
|
||
|
||
@property
|
||
def collision_state(self): # OK | 但是触发后,无法清除?
|
||
return self.__exec_cmd("collision_state", "获取碰撞触发状态", " :--: 返回 true 表示碰撞已触发,false 未触发")
|
||
|
||
@property
|
||
def task_state(self): # OK
|
||
return self.__exec_cmd("task_state", "获取机器人运行任务状态", " :--: 返回 program 表示任务正在运行,ready 未运行")
|
||
|
||
@property
|
||
def get_cart_pos(self): # OK | cart_pos/cart_pos_name 都可以正常返回,区别在返回的前缀,可测试辨别
|
||
return self.__exec_cmd("cart_pos", "获取笛卡尔位置")
|
||
|
||
@property
|
||
def get_joint_pos(self): # OK | jnt_pos/jnt_pos_name 都可以正常返回,区别在返回的前缀,可测试辨别
|
||
return self.__exec_cmd("jnt_pos", "获取轴位置")
|
||
|
||
@property
|
||
def get_axis_vel(self): # OK | jnt_vel/jnt_vel_name 都可以正常返回,区别在返回的前缀,可测试辨别
|
||
return self.__exec_cmd("jnt_vel", "获取轴速度")
|
||
|
||
@property
|
||
def get_axis_trq(self): # OK | jnt_trq/jnt_trq_name 都可以正常返回,区别在返回的前缀,可测试辨别
|
||
return self.__exec_cmd("jnt_trq", "获取轴力矩")
|
||
|
||
@property
|
||
def reduced_mode_state(self): # OK
|
||
return self.__exec_cmd("reduced_mode_state", "获取缩减模式状态", " :--: 返回 true 表示缩减模式,false 非缩减模式")
|
||
|
||
def get_io_state(self, io_list: str): # OK | DO0_0,DI1_3,DO2_5,不能有空格
|
||
return self.__exec_cmd(f"io_state:{io_list}", "获取 IO 状态值")
|
||
|
||
@property
|
||
def alarm_state(self): # OK
|
||
return self.__exec_cmd("alarm_state", "获取报警状态", " :--: 返回 true 表示当前有告警,false 没有告警")
|
||
|
||
@property
|
||
def collision_alarm_state(self): # OK
|
||
return self.__exec_cmd("collision_alarm_state", "获取碰撞报警状态", " :--: 返回 true 表示有碰撞告警,false 没有碰撞告警")
|
||
|
||
@property
|
||
def collision_open_state(self): # OK
|
||
return self.__exec_cmd("collision_open_state", "获取碰撞检测开启状态", " :--: 返回 true 表示已开启碰撞检测,false 未开启")
|
||
|
||
@property
|
||
def controller_is_running(self): # OK
|
||
return self.__exec_cmd("controller_is_running", "判断控制器是否开机", " :--: 返回 true 表示控制器正在运行,false 未运行")
|
||
|
||
@property
|
||
def encoder_low_battery_state(self): # OK
|
||
return self.__exec_cmd("encoder_low_battery_state", "编码器低电压报警状态", " :--: 返回 true 表示编码器处于低电压状态,false 电压正常")
|
||
|
||
@property
|
||
def robot_error_code(self): # OK
|
||
return self.__exec_cmd("robot_error_code", "获取机器人错误码")
|
||
|
||
@property
|
||
def rl_pause_state(self): # OK
|
||
# 0 -- 初始化状态,刚开机上电时
|
||
# 1 -- RL 运行中
|
||
# 2 -- HMI 暂停
|
||
# 3 -- 系统 IO 暂停
|
||
# 4 -- 寄存器功能码暂停
|
||
# 5 -- 外部通讯暂停
|
||
# 6 --
|
||
# 7 -- Pause 指令暂停
|
||
# 8 --
|
||
# 9 --
|
||
# 10 -- 外部 IO 急停
|
||
# 11 -- 安全门急停
|
||
# 12 -- 其他因素停止,比如碰撞检测
|
||
return self.__exec_cmd("program_full", "获取 RL 的暂停状态", " :--: 返回值含义详见功能定义")
|
||
|
||
@property
|
||
def program_reset_state(self): # OK
|
||
return self.__exec_cmd("program_reset_state", "获取程序复位状态", " :--: 返回 true 表示指针指向 main,false 未指向 main")
|
||
|
||
@property
|
||
def program_speed_value(self): # OK | 速度滑块
|
||
return self.__exec_cmd("program_speed", "获取程序运行速度")
|
||
|
||
@property
|
||
def robot_is_busy(self): # OK | 触发条件为 pp2main/重载工程/推送到控制器,最好测试工程大一些,比较容易触发
|
||
return self.__exec_cmd("robot_is_busy", "获取程序忙碌状态", " :--: 返回 1 表示控制器忙碌,0 非忙碌状态")
|
||
|
||
@property
|
||
def robot_is_moving(self): # OK
|
||
return self.__exec_cmd("robot_is_moving", "获取程序运行状态", " :--: 返回 true 表示机器人正在运动,false 未运动")
|
||
|
||
@property
|
||
def safe_door_state(self): # OK
|
||
return self.__exec_cmd("safe_door_state", "获取安全门状态", " :--: 返回 true 表示安全门已触发,false 未触发")
|
||
|
||
@property
|
||
def soft_estop_state(self): # OK
|
||
return self.__exec_cmd("soft_estop_state", "获取软急停状态", " :--: 返回 true 表示软急停已触发,false 未触发")
|
||
|
||
@property
|
||
def get_cart_vel(self): # OK
|
||
return self.__exec_cmd("cart_vel", "获取笛卡尔速度")
|
||
|
||
@property
|
||
def get_tcp_pos(self): # OK
|
||
return self.__exec_cmd("tcp_pose", "获取 TCP 位姿")
|
||
|
||
@property
|
||
def get_tcp_vel(self): # OK
|
||
return self.__exec_cmd("tcp_vel", "获取 TCP 速度")
|
||
|
||
@property
|
||
def get_tcp_vel_mag(self): # OK
|
||
return self.__exec_cmd("tcp_vel_mag", "获取 TCP 合成线速度")
|
||
|
||
@property
|
||
def ext_estop_state(self): # OK
|
||
return self.__exec_cmd("ext_estop_state", "获取外部轴急停状态", " :--: 返回 true 表示外部轴急停已触发,false 未触发")
|
||
|
||
@property
|
||
def hand_estop_state(self): # OK
|
||
return self.__exec_cmd("hand_estop_state", "获取手持急停状态", " :--: 返回 true 表示手持急停已触发,false 未触发")
|
||
|
||
@property
|
||
def collaboration_state(self): # OK
|
||
return self.__exec_cmd("collaboration_state", "获取协作模式状态(其实就是缩减模式)", " :--: 返回 true 表示协作模式,false 非协作模式")
|
||
|
||
def __exec_cmd(self, directive, description, more_desc=""):
|
||
self.s_string(directive)
|
||
result = self.r_string(directive).strip()
|
||
clibs.logger.info(f"执行{description}指令 {directive},返回值为 {result}{more_desc}")
|
||
return result
|
||
|
||
|
||
class PreDos(object):
|
||
def __init__(self):
|
||
self.__ssh = None
|
||
self.__sftp = None
|
||
|
||
def __ssh2server(self):
|
||
try:
|
||
self.__ssh = SSHClient()
|
||
self.__ssh.set_missing_host_key_policy(AutoAddPolicy())
|
||
self.__ssh.connect(clibs.ip_addr, clibs.ssh_port, username=clibs.username, password=clibs.password)
|
||
self.__sftp = self.__ssh.open_sftp()
|
||
except Exception as Err:
|
||
clibs.logger.error(f"SSH 无法连接到 {clibs.ip_addr}:{clibs.ssh_port},需检查网络连通性或者登录信息是否正确 {Err}")
|
||
exit(110)
|
||
|
||
def push_prj_to_server(self, prj_file):
|
||
# prj_file:本地工程完整路径
|
||
self.__ssh2server()
|
||
prj_name = prj_file.split("\\")[-1].split("/")[-1].split(".")[0]
|
||
self.__sftp.put(prj_file, f"/tmp/{prj_name}.zip")
|
||
cmd = f"cd /tmp; mkdir {prj_name}; unzip -d {prj_name} -q {prj_name}.zip; rm -rf /tmp/{prj_name}.zip; "
|
||
cmd += f"sudo rm -rf /home/luoshi/bin/controller/projects/{prj_name}; "
|
||
cmd += f"sudo mv /tmp/{prj_name}/ /home/luoshi/bin/controller/projects/"
|
||
stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True)
|
||
stdin.write(clibs.password + "\n")
|
||
stdout.read().decode() # 需要read一下才能正常执行
|
||
stderr.read().decode()
|
||
self.__ssh.close()
|
||
|
||
def pull_prj_from_server(self, prj_name, local_prj_path): # NG | 可以拉取文件,但是导入之后,有问题
|
||
# prj_name:要拉取的服务端工程名
|
||
# local_prj_path:本地工程文件的完整路径
|
||
self.__ssh2server()
|
||
cmd = f"cd /tmp/; sudo rm -rf {prj_name}*; sudo cp -rf /home/luoshi/bin/controller/projects/{prj_name} .; "
|
||
cmd += f"sudo zip -q -r {prj_name}.zip {prj_name}/; sudo rm -rf {prj_name}"
|
||
stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True)
|
||
stdin.write(clibs.password + "\n")
|
||
print(stdout.read().decode()) # 需要read一下才能正常执行
|
||
print(stderr.read().decode())
|
||
|
||
self.__sftp.get(f"/tmp/{prj_name}.zip", local_prj_path)
|
||
cmd = f"sudo rm -rf /tmp/{prj_name}.zip"
|
||
stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True)
|
||
stdin.write(clibs.password + "\n")
|
||
print(stdout.read().decode()) # 需要read一下才能正常执行
|
||
print(stderr.read().decode())
|
||
|
||
self.__ssh.close()
|
||
|
||
def push_file_to_server(self, local_file, server_file):
|
||
# local_file:本地文件完整路径
|
||
# server_file:服务端文件完整路径
|
||
self.__ssh2server()
|
||
filename = local_file.split("\\")[-1].split("/")[-1]
|
||
self.__sftp.put(local_file, f"/tmp/{filename}")
|
||
cmd = f"sudo mv /tmp/{filename} {server_file}"
|
||
stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True)
|
||
stdin.write(clibs.password + "\n")
|
||
stdout.read().decode() # 需要read一下才能正常执行
|
||
stderr.read().decode()
|
||
self.__ssh.close()
|
||
|
||
def pull_file_from_server(self, server_file, local_file):
|
||
# local_file:本地文件完整路径
|
||
# server_file:服务端文件完整路径
|
||
self.__ssh2server()
|
||
cmd = f"sudo cp {server_file} /tmp/"
|
||
stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True)
|
||
stdin.write(clibs.password + "\n")
|
||
stdout.read().decode() # 需要read一下才能正常执行
|
||
stderr.read().decode()
|
||
filename = server_file.split("/")[-1]
|
||
self.__sftp.get(f"/tmp/{filename}", f"{local_file}")
|
||
cmd = f"sudo rm -rf /tmp/{filename}"
|
||
stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True)
|
||
stdin.write(clibs.password + "\n")
|
||
stdout.read().decode() # 需要read一下才能正常执行
|
||
stderr.read().decode()
|
||
self.__ssh.close()
|