from json import load, dumps, loads, dump from os import listdir from inspect import currentframe from socket import socket, AF_INET, SOCK_STREAM from threading import Thread import selectors from time import time, sleep, strftime, localtime from pymodbus.client.tcp import ModbusTcpClient # from pymodbus.payload import BinaryPayloadDecoder, BinaryPayloadBuilder # from pymodbus.constants import Endian from paramiko import SSHClient, AutoAddPolicy import clibs class ModbusRequest(object): def __init__(self): super().__init__() self.__c = ModbusTcpClient(host=clibs.ip_addr, port=clibs.modbus_port) try: self.__c.connect() clibs.logger.success("Modbus connection success...") except Exception as Err: clibs.logger.error(f"Modbus connection failed...\n{Err}") def __reg_high_pulse(self, addr: int) -> None: self.__c.write_register(addr, 0) sleep(clibs.interval) self.__c.write_register(addr, 1) sleep(clibs.interval+1) self.__c.write_register(addr, 0) def r_clear_alarm(self): # OK self.__reg_high_pulse(40000) clibs.logger.info("40000-010 执行清除告警信息") def r_reset_estop(self): # OK self.__reg_high_pulse(40001) clibs.logger.info("40001-010 执行复位急停状态(非软急停)") def r_reset_estop_clear_alarm(self): # OK self.__reg_high_pulse(40002) clibs.logger.info("40002-010 执行复位急停状态(非软急停),并清除告警信息") def r_motor_off(self): # OK self.__reg_high_pulse(40003) clibs.logger.info("40003-010 执行机器人下电") def r_motor_on(self): # OK self.__reg_high_pulse(40004) clibs.logger.info("40004-010 执行机器人上电") def r_motoron_pp2main_start(self): # OK self.__reg_high_pulse(40005) clibs.logger.info( "40005-010 执行机器人上电/pp2main/开始运行程序,需自动模式执行,若运行失败,可清除告警后再次尝试") def r_motoron_start(self): # OK self.__reg_high_pulse(40006) clibs.logger.info("40006-010 执行机器人上电/开始运行程序,需自动模式执行,若运行失败,可清除告警、执行pp2main后再次尝试") def r_pulse_motoroff(self): # OK self.__reg_high_pulse(40007) clibs.logger.info("40007-010 执行机器人停止,并下电,手动模式下可停止程序运行,但不能下电,若运行失败,可清除告警后再次尝试") def r_pp2main(self): # OK self.__reg_high_pulse(40008) clibs.logger.info("40008-010 执行机器人 pp2main,需自动模式执行,若运行失败,可清除告警后再次尝试") def r_program_start(self): # OK self.__reg_high_pulse(40009) clibs.logger.info("40009-010 执行机器人默认程序运行,需有 pp2main 前置操作,若运行失败,可清除告警后再次尝试") def r_program_stop(self): # OK self.__reg_high_pulse(40010) clibs.logger.info("40010-010 执行机器人默认程序停止,需有 pp2main 前置操作,若运行失败,可清除告警后再次尝试") def r_reduced_mode(self, action: int): # OK self.__c.write_register(40011, action) actions = "进入" if action == 1 else "退出" clibs.logger.info(f"40011-{action} 执行机器人{actions}缩减模式") sleep(clibs.interval) def r_soft_estop(self, action: int): # OK self.__c.write_register(40012, action) actions = "解除" if action == 1 else "触发" clibs.logger.info(f"40012-{action} 执行{actions}急停动作") sleep(clibs.interval) def r_switch_auto_motoron(self): # OK self.__reg_high_pulse(40013) clibs.logger.info(f"40013-010 执行切换为自动模式,并上电,初始状态 !!不能是!! 手动上电模式") def r_switch_auto(self): # OK self.__reg_high_pulse(40014) clibs.logger.info(f"40014-010 执行切换为自动模式") def r_switch_manual(self): # OK self.__reg_high_pulse(40015) clibs.logger.info(f"40015-010 执行切换为手动模式") def r_switch_safe_region01(self, action: bool): # OK | 上升沿打开,下降沿关闭 if action: self.__c.write_register(40016, False) sleep(clibs.interval) self.__c.write_register(40016, True) else: self.__c.write_register(40016, True) sleep(clibs.interval) self.__c.write_register(40016, False) actions = "打开" if action else "关闭" clibs.logger.info(f"40016-{action} 执行{actions}安全区 safe region 01") sleep(clibs.interval) def r_switch_safe_region02(self, action: bool): # OK | 上升沿打开,下降沿关闭 if action: self.__c.write_register(40017, False) sleep(clibs.interval) self.__c.write_register(40017, True) else: self.__c.write_register(40017, True) sleep(clibs.interval) self.__c.write_register(40017, False) actions = "打开" if action else "关闭" clibs.logger.info(f"40017-{action} 执行{actions}安全区 safe region 02") sleep(clibs.interval) def r_switch_safe_region03(self, action: bool): # OK | 上升沿打开,下降沿关闭 if action: self.__c.write_register(40018, False) sleep(clibs.interval) self.__c.write_register(40018, True) else: self.__c.write_register(40018, True) sleep(clibs.interval) self.__c.write_register(40018, False) actions = "打开" if action else "关闭" clibs.logger.info(f"40018-{action} 执行{actions}安全区 safe region 03") sleep(clibs.interval) @property def w_alarm_state(self): # OK res = self.__c.read_holding_registers(40500, 1).registers[0] clibs.logger.info(f"40500 获取告警状态,结果为 {res} :--: 0 表示无告警,,1 表示有告警") return res @property def w_collision_alarm_state(self): # OK res = self.__c.read_holding_registers(40501, 1).registers[0] clibs.logger.info(f"40501 获取碰撞告警状态,结果为 {res} :--: 0 表示未触发,1 表示已触发") return res @property def w_collision_open_state(self): # OK res = self.__c.read_holding_registers(40502, 1).registers[0] clibs.logger.info(f"40502 获取碰撞检测开启状态,结果为 {res} :--: 0 表示关闭,1 表示开启") return res @property def w_controller_is_running(self): # OK res = self.__c.read_holding_registers(40503, 1).registers[0] clibs.logger.info(f"40503 获取控制器运行状态,结果为 {res} :--: 0 表示运行异常,1 表示运行正常") return res @property def w_encoder_low_battery(self): # OK res = self.__c.read_holding_registers(40504, 1).registers[0] clibs.logger.info(f"40504 获取编码器低电压状态,结果为 {res} :--: 0 表示非低电压,1 表示低电压 需关注") return res @property def w_estop_state(self): # OK res = self.__c.read_holding_registers(40505, 1).registers[0] clibs.logger.info(f"40505 获取机器人急停状态(非软急停),结果为 {res} :--: 0 表示未触发,1 表示已触发") return res @property def w_motor_state(self): # OK res = self.__c.read_holding_registers(40506, 1).registers[0] clibs.logger.info(f"40506 获取机器人上电状态,结果为 {res} :--: 0 表示未上电,1 表示已上电") return res @property def w_operation_mode(self): # OK res = self.__c.read_holding_registers(40507, 1).registers[0] clibs.logger.info(f"40507 获取机器人操作模式,结果为 {res} :--: 0 表示手动模式,1 表示自动模式") return res @property def w_program_state(self): # OK res = self.__c.read_holding_registers(40508, 1).registers[0] clibs.logger.info(f"40508 获取程序的运行状态,结果为 {res} :--: 0 表示未运行,1 表示正在运行") return res @property def w_program_not_run(self): # OK res = self.__c.read_holding_registers(40509, 1).registers[0] clibs.logger.info(f"40509 判定程序为未运行状态,结果为 {res} :--: 0 表示正在运行,1 表示未运行") return res @property def w_program_reset(self): # OK res = self.__c.read_holding_registers(40510, 1).registers[0] clibs.logger.info(f"40510 判定程序指针为 pp2main 状态,结果为 {res} :--: 0 表示指针不在 main 函数,1 表示指针在 main 函数") return res @property def w_reduce_mode_state(self): # OK res = self.__c.read_holding_registers(40511, 1).registers[0] clibs.logger.info(f"40511 获取机器人缩减模式状态,结果为 {res} :--: 0 表示非缩减模式,1 表示缩减模式") return res @property def w_robot_is_busy(self): # OK res = self.__c.read_holding_registers(40512, 1).registers[0] clibs.logger.info(f"40512 获取机器人是否处于 busy 状态,结果为 {res} :--: 0 表示未处于 busy 状态,1 表示处于 busy 状态") return res @property def w_robot_is_moving(self): # OK res = self.__c.read_holding_registers(40513, 1).registers[0] clibs.logger.info(f"40513 获取机器人是否处于运动状态,结果为 {res} :--: 0 表示为运动,1 表示正在运动") return res @property def w_safe_door_state(self): # OK res = self.__c.read_holding_registers(40514, 1).registers[0] clibs.logger.info(f"40514 获取机器人是否处于安全门打开状态,需自动模式下执行,结果为 {res} :--: 0 表示未触发安全门,1 表示已触发安全门") return res @property def w_safe_region01_trig_state(self): # OK res = self.__c.read_holding_registers(40515, 1).registers[0] clibs.logger.info(f"40515 获取安全区域 safe region01 的触发状态,结果为 {res} :--: 0 表示未触发,1 表示已触发") return res @property def w_safe_region02_trig_state(self): # OK res = self.__c.read_holding_registers(40516, 1).registers[0] clibs.logger.info(f"40516 获取安全区域 safe region02 的触发状态,结果为 {res} :--: 0 表示未触发,1 表示已触发") return res @property def w_safe_region03_trig_state(self): # OK res = self.__c.read_holding_registers(40517, 1).registers[0] clibs.logger.info(f"40517 获取安全区域 safe region03 的触发状态,结果为 {res} :--: 0 表示未触发,1 表示已触发") return res @property def w_soft_estop_state(self): # OK res = self.__c.read_holding_registers(40518, 1).registers[0] clibs.logger.info(f"40518 获取机器人软急停状态,结果为 {res} :--: 0 表示未触发软急停,1 表示已触发软急停") return res def io_write_coils(self, addr, action): # OK | 名字叫写线圈,其实是写 modbus 的 discrete inputs(DI) # e.g. io_write_coils(0, 1) # e.g. io_write_coils(1, 1) # e.g. io_write_coils(0, [1, 1, 1]) self.__c.write_coils(addr, action) clibs.logger.info(f"Modbus 执行给 DI 地址 {addr} 赋值为 {action},可根据情况传递列表,实现一次性赋值多个") sleep(clibs.interval) def io_read_coils(self): # OK | 读 modbus 的 16 个 discrete inputs(DI) res = self.__c.read_coils(0, 16).bits clibs.logger.info(f"Modbus 执行读取所有 DI 的结果为 {res}") return res def io_read_discretes(self): # OK | 读 modbus 的 coil outputs(DO) res = self.__c.read_discrete_inputs(0, 16).bits clibs.logger.info(f"Modbus 执行读取所有 DO 的结果为 {res}") return res class HmiRequest(object): def __init__(self): super().__init__() self.__c = None self.__c_xs = None self.c_msg = [] self.c_msg_xs = [] self.__flag = 0 self.__response = "" self.__leftover = 0 self.__response_xs = "" self.__is_connected = False self.__silence = False self.__pkg_size = 0 self.__broke = 0 self.__half = 0 self.__half_length = 0 self.__index = 0 self.__reset_index = 0 self.__close_hmi = False self.__error_code = 0 self.__t_is_alive = Thread(target=self.__is_alive) self.__t_is_alive.daemon = False self.__t_is_alive.start() sleep(2) # 很重要,必须有,因为涉及到建联成功与否 def __is_alive(self): first_time = True while not self.__close_hmi: if not self.__is_connected: if not first_time: clibs.logger.info("重新连接中...") first_time = False self.__silence = True self.__sock_conn() if self.__is_connected: self.__t_heartbeat = Thread(target=self.__heartbeat) self.__t_heartbeat.daemon = True self.__t_heartbeat.start() self.__t_unpackage = Thread(target=self.__unpackage, args=(self.__c,)) self.__t_unpackage.daemon = True self.__t_unpackage.start() self.__t_unpackage_xs = Thread(target=self.__unpackage_xs, args=(self.__c_xs,)) self.__t_unpackage_xs.daemon = True self.__t_unpackage_xs.start() self.__silence = False first_time = True sleep(clibs.interval*10) def close(self): self.__close_hmi = True self.__is_connected = False self.__silence = True def __sock_conn(self): try: self.__c = socket(AF_INET, SOCK_STREAM) self.__c.connect((clibs.ip_addr, clibs.socket_port)) self.__c.setblocking(True) self.__c_xs = socket(AF_INET, SOCK_STREAM) self.__c_xs.connect((clibs.ip_addr, clibs.xService_port)) self.__c_xs.setblocking(True) # 尝试连续三次发送心跳包,确认建联成功 self.__error_code = 0 for i in range(3): self.execution("controller.heart") sleep(clibs.interval/5) if not self.__error_code: self.__is_connected = True clibs.logger.success("HMI connection success...") else: raise Exception() except Exception as Err: self.__sth_wrong(9, f"HMI connection failed...{Err}") def __sth_wrong(self, exit_code, err_desc=""): self.__is_connected = False self.__error_code = exit_code if not self.__silence: clibs.logger.error(f"[{exit_code}] 可能是 HMI 无法连接到 {clibs.ip_addr}:{clibs.socket_port}...") if err_desc != "": clibs.logger.error(f"[{exit_code}] {err_desc}") def __header_check(self, index, data): if index + 8 < len(data): _frame_size = int.from_bytes(data[index:index + 2], byteorder="big") _pkg_size = int.from_bytes(data[index + 2:index + 6], byteorder="big") _protocol = int.from_bytes(data[index + 6:index + 7], byteorder="big") _reserved = int.from_bytes(data[index + 7:index + 8], byteorder="big") if _reserved == 0 and _protocol == 2: return index + 8, _frame_size, _pkg_size else: print(data) self.__sth_wrong(6, "Header Check: 解包数据有误,需要确认!") else: self.__half_length = len(data) - index self.__half = data[index:] self.__broke = 100 index += clibs.MAX_FRAME_SIZE return index, 0, 0 def __heartbeat(self): while self.__is_connected: self.execution("controller.heart") sleep(clibs.heartbeat_interval) @staticmethod def __gen_id(command): _now = time() _id = f"{command}-{_now}" return _id def get_from_id(self, msg_id, flag=0): def find_response(log_data): with open(log_data, mode="r", encoding="utf-8") as f_log: for line in f_log: if line.split("|")[1].strip() != "DEBUG": continue if msg_id in line.strip(): error_code = int(eval(line.split("|")[-1].strip()).get("error_code", 0)) if not error_code: return eval(line.split("|")[-1].strip()) else: self.__sth_wrong(7, f"请求 {msg_id} 的返回错误码为 {error_code}") def find_response_xs(log_data): with open(log_data, mode="r", encoding="utf-8") as f_log: for line in reversed(f_log.readlines()): if line.split("|")[1].strip() != "DEBUG": continue if msg_id in line.strip(): return eval(line.split("|")[-1].strip()) if flag == 0: for _ in range(3): res = find_response(clibs.log_data_debug) if res is not None: return res["data"] sleep(clibs.interval*2) else: # 尝试在上一次分割的日志中查找,只做一次 res = find_response("".join([clibs.log_path, listdir(clibs.log_path)[0]])) if res is not None: return res["data"] elif flag == 1: for _ in range(3): res = find_response_xs(clibs.log_data_debug) if res is not None: return res sleep(clibs.interval*2) else: res = find_response_xs("".join([clibs.log_path, listdir(clibs.log_path)[0]])) if res is not None: return res self.__sth_wrong(11, f"无法找到请求 {msg_id} 的返回结果") def __msg_storage(self, response, flag=0): # response是解码后的字符串 clibs.logger.debug(f"{loads(response)}") messages = self.c_msg if flag == 0 else self.c_msg_xs if "move.monitor" in response: pass elif len(messages) < 10000: messages.insert(0, response) else: messages.insert(0, response) while len(messages) >= 10000: messages.pop() def __get_response(self, data): _frame_size = 0 # 流式获取单次请求的响应 if self.__broke == 100: _half_1 = self.__half _half_2 = data[:8 - self.__half_length] _full = _half_1 + _half_2 _frame_size = int.from_bytes(_full[:2], byteorder="big") _pkg_size = int.from_bytes(_full[2:6], byteorder="big") _protocol = int.from_bytes(_full[6:7], byteorder="big") _reserved = int.from_bytes(_full[7:8], byteorder="big") if _reserved != 0 or _protocol != 2: print(data) self.__sth_wrong(10, "in get_response: 解包数据有误,需要确认!") self.__pkg_size = _pkg_size self.__index = 8 - self.__half_length else: if self.__reset_index == 1: self.__index = 0 while self.__index < len(data): # flag 为 0,则说明是一次新的请求对应的一次新的相应,也就是需要首次解包 if self.__flag == 0: if self.__broke == 100: self.__broke = 0 else: self.__index, _frame_size, self.__pkg_size = self.__header_check(self.__index, data) if self.__index > clibs.MAX_FRAME_SIZE: break # 详见解包原理数据.txt,self.__pkg_size 永远是除了当前data之外剩余未处理的数据大小 if self.__pkg_size <= len(data) - self.__index: # 说明剩余部分的数据就在当前data内,没有被分割 self.__response = data[self.__index:self.__index + self.__pkg_size].decode() self.__msg_storage(flag=0, response=self.__response) self.__index += self.__pkg_size self.__flag = 0 self.__response = "" self.__leftover = 0 self.__pkg_size = 0 self.__reset_index = 0 if self.__index < len(data) else 1 elif self.__pkg_size > len(data) - self.__index: # 执行到这里说明该data是首包,且有有分包的情况发生了也就是该响应数据量稍微比较大 # 分散在了相邻的两个及以上的data中,需要flag=1的处理 self.__flag = 1 if self.__index + _frame_size - 6 <= len(data): self.__response = data[self.__index:self.__index + _frame_size - 6].decode() self.__index += (_frame_size - 6) self.__pkg_size -= (_frame_size - 6) # 详见解包原理数据.txt,self.__pkg_size self.__reset_index = 0 if self.__index + 2 < len(data): self.__leftover = int.from_bytes(data[self.__index:self.__index + 2], byteorder="big") self.__index += 2 self.__reset_index = 0 else: if self.__index + 2 == len(data): self.__broke = 1 self.__half = data[-2:] elif self.__index + 1 == len(data): self.__broke = 2 self.__half = data[-1:] elif self.__index == len(data): self.__broke = 3 self.__index += clibs.MAX_FRAME_SIZE self.__reset_index = 1 break # 因为 index + 2 的大小超过 clibs.MAX_FRAME_SIZE elif self.__index + _frame_size - 6 > len(data): self.__response = data[self.__index:].decode() self.__pkg_size -= (len(data) - self.__index) # 详见解包原理数据.txt,self.__pkg_size self.__leftover = (_frame_size - 6 - (len(data) - self.__index)) self.__index += clibs.MAX_FRAME_SIZE self.__reset_index = 1 elif self.__flag == 1: # 继续处理之前为接收完的数据,处理完之后将flag重置为0 # !!!需要注意的是,包头/帧头也是有可能被分割开的!!!但是目前该程序未实现此种情况!!! if self.__broke == 1: self.__index = 0 self.__leftover = int.from_bytes(self.__half, byteorder="big") self.__broke = 0 elif self.__broke == 2: self.__leftover = int.from_bytes(self.__half + data[:1], byteorder="big") self.__index = 1 self.__broke = 0 if self.__broke == 3: self.__leftover = int.from_bytes(data[:2], byteorder="big") self.__index = 2 self.__broke = 0 while self.__pkg_size > 0: if self.__index + self.__leftover <= len(data): self.__response += data[self.__index:self.__index + self.__leftover].decode() self.__pkg_size -= self.__leftover if self.__pkg_size == 0: self.__msg_storage(flag=0, response=self.__response) self.__index += self.__leftover self.__flag = 0 self.__response = "" self.__leftover = 0 self.__pkg_size = 0 self.__reset_index = 0 if self.__index < len(data) else 1 break self.__index += self.__leftover if self.__index + 2 < len(data): self.__leftover = int.from_bytes(data[self.__index:self.__index + 2], byteorder="big") self.__index += 2 self.__reset_index = 0 else: if self.__index + 2 == len(data): self.__broke = 1 self.__half = data[-2:] elif self.__index + 1 == len(data): self.__broke = 2 self.__half = data[-1:] elif self.__index == len(data): self.__broke = 3 self.__index += clibs.MAX_FRAME_SIZE self.__reset_index = 1 break # 因为 index + 2 的大小超过 clibs.MAX_FRAME_SIZE else: self.__response += data[self.__index:].decode() self.__leftover -= (len(data) - self.__index) self.__pkg_size -= (len(data) - self.__index) self.__index += clibs.MAX_FRAME_SIZE self.__reset_index = 1 break # 该data内数据已经处理完毕,需要跳出大循环,通过break和index def __get_response_xs(self, data): char, response = "", self.__response_xs for char in data.decode(): if char != "\r": response += char else: self.__msg_storage(flag=1, response=response) response = "" else: self.__response_xs = response if char != "\r" else "" @staticmethod def __package(cmd): _frame_head = (len(cmd) + 6).to_bytes(length=2, byteorder="big") _pkg_head = len(cmd).to_bytes(length=4, byteorder="big") _protocol = int(2).to_bytes(length=1, byteorder="big") _reserved = int(0).to_bytes(length=1, byteorder="big") return _frame_head + _pkg_head + _protocol + _reserved + cmd.encode() @staticmethod def __package_xs(cmd): return f"{dumps(cmd, separators=(",", ":"))}\r".encode() def __unpackage(self, sock): def to_read(conn, mask): data = conn.recv(clibs.MAX_FRAME_SIZE) if data: # print(data) self.__get_response(data) else: print("closing", conn) sel.unregister(conn) conn.close() try: sel = selectors.DefaultSelector() sel.register(sock, selectors.EVENT_READ, to_read) while self.__is_connected: events = sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask) except Exception as Err: self.__sth_wrong(3, f"错误信息:{Err}") def __unpackage_xs(self, sock): def to_read(conn, mask): data = conn.recv(1024) # Should be ready if data: # print(data) self.__get_response_xs(data) else: print("closing", conn) sel.unregister(conn) conn.close() try: sel = selectors.DefaultSelector() sel.register(sock, selectors.EVENT_READ, to_read) while self.__is_connected: events = sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask) except Exception as Err: self.__sth_wrong(8, f"错误信息:{Err}") def execution(self, command, protocol_flag=0, **kwargs): def log_reqs(request): with open(clibs.log_data_reqs, mode="a", encoding="utf-8") as f_log: log_time = strftime("%Y-%m-%d %H:%M:%S", localtime(time())) f_log.write(" ".join([log_time, dumps(request), "\n"])) if protocol_flag == 0: # for old protocols req = None try: with open(f"{clibs.PREFIX}/json/{command}.json", encoding="utf-8", mode="r") as f_json: req = load(f_json) except Exception as Err: self.__sth_wrong(5, f"暂不支持 {command} 功能,或确认该功能存在... {Err}") match command: case "state.set_tp_mode": req["data"]["tp_mode"] = kwargs["tp_mode"] case "overview.set_autoload": req["data"]["autoload_prj_path"] = kwargs["autoload_prj_path"] case "overview.reload": req["data"]["prj_path"] = kwargs["prj_path"] req["data"]["tasks"] = kwargs["tasks"] case "rl_task.pp_to_main" | "rl_task.run" | "rl_task.stop": req["data"]["tasks"] = kwargs["tasks"] case "rl_task.set_run_params": req["data"]["loop_mode"] = kwargs["loop_mode"] req["data"]["override"] = kwargs["override"] case "diagnosis.set_params": req["data"]["display_pdo_params"] = kwargs["display_pdo_params"] req["data"]["frequency"] = kwargs["frequency"] req["data"]["version"] = kwargs["version"] case "diagnosis.open": req["data"]["open"] = kwargs["open"] req["data"]["display_open"] = kwargs["display_open"] req["data"]["overrun"] = kwargs["overrun"] req["data"]["turn_area"] = kwargs["turn_area"] req["data"]["delay_motion"] = kwargs["delay_motion"] case "register.set_value": req["data"]["name"] = kwargs["name"] req["data"]["type"] = kwargs["type"] req["data"]["bias"] = kwargs["bias"] req["data"]["value"] = kwargs["value"] case "diagnosis.save": req["data"]["save"] = kwargs["save"] case "socket.set_params": req["data"]["enable"] = kwargs["enable"] req["data"]["ip"] = kwargs["ip"] req["data"]["name"] = kwargs["name"] req["data"]["port"] = kwargs["port"] req["data"]["suffix"] = kwargs["suffix"] req["data"]["type"] = kwargs["type"] req["data"]["reconnect_flag"] = kwargs["reconnect_flag"] req["data"]["disconnection_triggering_behavior"] = kwargs["disconnection_triggering_behavior"] req["data"]["disconnection_detection_time"] = kwargs["disconnection_detection_time"] case "fieldbus_device.set_params": req["data"]["device_name"] = kwargs["device_name"] req["data"]["enable"] = kwargs["enable"] case "soft_limit.set_params": req["data"]["enable"] = kwargs["enable"] req["data"]["upper"] = kwargs["upper"] req["data"]["lower"] = kwargs["lower"] req["data"]["reduced_upper"] = kwargs["reduced_upper"] req["data"]["reduced_lower"] = kwargs["reduced_lower"] case "move.set_quickturn_pos": req["data"]["enable_home"] = kwargs["enable_home"] req["data"]["enable_drag"] = kwargs["enable_drag"] req["data"]["enable_transport"] = kwargs["enable_transport"] req["data"]["end_posture"] = kwargs["end_posture"] case "move.quick_turn": req["data"]["name"] = kwargs["name"] case "move.stop": req["data"]["stoptype"] = kwargs["stoptype"] case "jog.start": req["data"]["index"] = kwargs["index"] req["data"]["direction"] = kwargs["direction"] req["data"]["is_ext"] = kwargs["is_ext"] case "jog.set_params": req["data"]["step"] = kwargs["step"] req["data"]["override"] = kwargs["override"] req["data"]["space"] = kwargs["space"] case "diagnosis.get_params": req["data"]["version"] = kwargs["version"] case "system_io.update_configuration": req["data"]["input_system_io"] = kwargs["input_system_io"] req["data"]["output_system_io"] = kwargs["output_system_io"] case "modbus.set_params": req["data"]["enable_slave"] = kwargs["enable_slave"] req["data"]["ip"] = kwargs["ip"] req["data"]["port"] = kwargs["port"] req["data"]["slave_id"] = kwargs["slave_id"] req["data"]["enable_master"] = kwargs["enable_master"] case "modbus.get_values": req["data"]["mode"] = kwargs["mode"] case "move.set_monitor_cfg": req["data"]["ref_coordinate"] = kwargs["ref_coordinate"] case "move.set_params": req["data"] = kwargs["data"] case "move.set_quickstop_distance": req["data"]["distance"] = kwargs["distance"] case "collision.set_params": req["data"] = kwargs["data"] case "collision.set_state": req["data"]["collision_state"] = kwargs["collision_state"] case "controller.set_params": req["data"]["time"] = kwargs["time"] case "drag.set_params": req["data"]["enable"] = kwargs["enable"] req["data"]["space"] = kwargs["space"] req["data"]["type"] = kwargs["type"] case _: pass req["id"] = self.__gen_id(command) cmd = dumps(req, separators=(",", ":")) try: log_reqs(req) self.__c.send(self.__package(cmd)) sleep(clibs.interval) except Exception as Err: self.__sth_wrong(4, f"{Err} 请求发送失败:{cmd}") return req["id"] elif protocol_flag == 1: # for xService req = None try: with open(f"{clibs.PREFIX}/json/{command}.json", encoding="utf-8", mode="r") as f_json: req = load(f_json) except Exception as Err: self.__sth_wrong(1, f"暂不支持 {command} 功能,或确认该功能存在... {Err}") match command: case "safety.safety_area.signal_enable": req["c"]["safety.safety_area.signal_enable"]["signal"] = kwargs["signal"] case "safety.safety_area.overall_enable": req["c"]["safety.safety_area.overall_enable"]["enable"] = kwargs["enable"] case "log_code.data.code_list": req["s"]["log_code.data"]["code_list"] = kwargs["code_list"] case "safety.safety_area.safety_area_enable": req["c"]["safety.safety_area.safety_area_enable"]["id"] = kwargs["id"] req["c"]["safety.safety_area.safety_area_enable"]["enable"] = kwargs["enable"] case _: pass try: log_reqs(req) self.__c_xs.send(self.__package_xs(req)) sleep(clibs.interval) except Exception as Err: self.__sth_wrong(2, f"{Err} 请求发送失败:{req}") return command # =================================== ↓↓↓ specific functions ↓↓↓ =================================== def switch_motor_state(self, state: str): """ 切换上电/下电的状态 :param state: on/off :return: None """ match state: case "on": self.execution("state.switch_motor_on") case "off": self.execution("state.switch_motor_off") case _: clibs.logger.error(f"switch_motor_state 参数错误 {state}: 非法参数,只接受 on/off") def switch_operation_mode(self, mode: str): """ 切换自动/手动操作模式 :param mode: auto/manual :return: None """ match mode: case "auto": self.execution("state.switch_auto") case "manual": self.execution("state.switch_manual") case _: clibs.logger.error("switch_operation_mode 参数错误 非法参数,只接受 auto/manual") def reload_project(self, prj_name: str, tasks: list): """ 重新加载指定工程 :param prj_name: 工程名,也即 zip 文件的名字 :param tasks: 要加载的任务列表 :return: None """ prj_path = f"{prj_name}/_build/{prj_name}.prj" self.execution("overview.reload", prj_path=prj_path, tasks=tasks) def set_project_auto_reload(self, prj_name: str): """ 将指定工程设置为开机自动加载,也即默认工程 :param prj_name: 工程名,也即 zip 文件的名字 :return: None """ autoload_prj_path = f"{prj_name}/_build/{prj_name}.prj" self.execution("overview.set_autoload", autoload_prj_path=autoload_prj_path) def pp_to_main(self, tasks: list): """ 将指定的任务列表的指针,指向 main 函数 :param tasks: 任务列表 :return: None """ self.execution("rl_task.pp_to_main", tasks=tasks) def program_start(self, tasks: list): """ 开始执行程序任务,必须是自动模式下执行 :param tasks: 任务列表 :return: None """ self.execution("rl_task.run", tasks=tasks) def program_stop(self, tasks: list): """ 停止执行程序任务 :param tasks: 人物列表 :return: None """ self.execution("rl_task.stop", tasks=tasks) def set_program_loop_speed(self, loop_mode: bool, override: float): """ :param loop_mode: True为循环模式,False为单次模式 :param override: HMI 左下方的速度滑块,取值范围 [0, 1] :return: None """ self.execution("rl_task.set_run_params", loop_mode=loop_mode, override=override) def clear_alarm(self): """ 清除伺服告警 :return: None """ self.execution("servo.clear_alarm") def reboot_robot(self): """ 重启控制器 :return: None """ self.execution("controller.reboot") clibs.logger.info(f"控制器重启中,重连预计需要等待 100s 左右...") ts = time() sleep(30) while True: sleep(5) te = time() if te - ts > 180: self.__silence = False self.__sth_wrong("3min 内未能完成重新连接,需要查看后台控制器是否正常启动,或者 ip/port 是否正确") break for _ in range(3): if not self.__is_connected: break sleep(2) else: clibs.logger.info("HMI 重新连接成功...") break def reload_io(self): """ 触发控制器重新加载 IO 设备列表 :return: None """ self.execution("io_device.load_cfg") @property def get_quickturn_pos(self): """ 获取机器人的home位姿、拖动位姿和发货位姿,轴关节角度,end_posture 取值如下: 0 法兰平面与地面平行 1 工具坐标系X轴与地面垂直,正向朝下 2 工具坐标系X轴与地面垂直,正向朝上 3 工具坐标系Y轴与地面垂直,正向朝下 4 工具坐标系Y轴与地面垂直,正向朝上 5 工具坐标系Z轴与地面垂直,正向朝下 6 工具坐标系Z轴与地面垂直,正向朝上 :return: as below { "enable_home": false, // 是否开启 home 点快速调整 "enable_drag": false, // 是否开启拖动位姿点快速调整 "enable_transport": false, // 是否开启发货位姿点快速调整 "joint_home": [0.0,0.0,0.0,0.0,0.0,0.0,0.0], // home 位姿的关节角度 "joint_drag": [0.0,0.0,0.0,0.0,0.0,0.0,0.0], // 拖动位姿的关节角度 "joint_transport": [0.0,0.0,0.0,0.0,0.0,0.0,0.0], // 发货位姿的关节角度 "end_posture":0, // 末端姿态的调整方式,取值 0-6 "home_error_range":[0.0,0.0,0.0,0.0,0.0,0.0,0.0] // home点误差范围 } """ return self.__get_data(currentframe().f_code.co_name, "move.get_quickturn_pos") def set_quickturn_pos(self, enable_home: bool = False, enable_drag: bool = False, enable_transport: bool = False, end_posture: int = 0): """ 设置机器人的home位姿、拖动位姿、发货位姿,轴关节角度,Home点误差范围,详见上一个 get_quickturn_pos 功能实现 :param enable_home: 是否开启 home 点快速调整 :param enable_drag: 是否开启拖动位姿点快速调整 :param enable_transport:是否开启发货位姿点快速调整 :param end_posture: 末端姿态的调整方式,取值 0-6,详见 get_quickturn_pos 注释 :return: None """ self.execution("move.set_quickturn_pos", enable_home=enable_home, enable_drag=enable_drag, enable_transport=enable_transport, end_posture=end_posture) def move2quickturn(self, name: str): """ 运动到指定的快速调整位姿 :param name: 指定快速调整的名称,home/drag/transport :return: None """ self.execution("move.quick_turn", name=name) def stop_move(self, stoptype=0): """ 停止运动 TS_READY | TS_JOG | TS_LOADIDENTIFY | TS_DYNAMICIDENTIFY | TS_DRAG | TS_PROGRAM | TS_DEMO | TS_RCI | TS_DEBUG | TS_FRICTIONIDENTIFY :param stoptype: 对应控制器的任务空间类型TaskSpace的枚举值,0-7 :return: None """ self.execution("move.stop", stoptype=stoptype) @property def get_jog_params(self): """ 获取JOG的参数 世界坐标系 WORLD_COORDINATE 0 法兰盘坐标系 FLANGE_COORDINATE 1 基坐标系 BASE_COORDINATE 2 工具坐标系 TOOL_COORDINATE 3 工件坐标系 FRAME_COORDINATE 4 关节空间 JOINT_SPACE 5 :return: { "step": 1000 [1000-连续] [10/1/0.1/0.001-点动] "override": 0.2 速度比率 "space": 5 JOG的空间 } """ return self.__get_data(currentframe().f_code.co_name, "jog.get_params") def set_jog_params(self, step, override, space): """ 设置JOG的参数,包含步长,空间,速度倍率 :param step: [1000-连续] [10/1/0.1/0.001-点动] :param override: 速度比率 :param space: JOG的空间 :return: None """ self.execution("jog.set_params", step=step, override=override, space=space) def start_jog(self, index: int, direction: bool = False, is_ext: bool = False): """ 开始 JOG 运动 :param index: 0-6,若选轴空间,则 0-6 对应 1-7 轴,若选笛卡尔空间,则 0-6 对应 xyzabc elb :param direction: True 正方向,False 反方向 :param is_ext: 是否是外部轴 jog :return: None """ self.execution("jog.start", index=index, direction=direction, is_ext=is_ext) @property def get_socket_params(self): """ 获取socket参数 :return: { "auto_connect": true, "disconnection_detection_time": 10, "disconnection_triggering_behavior": 0, "enable": true, "ip": "", "name": "name", "port": "8080", "reconnect_flag": true, "suffix": "\r", "type": 1 } """ return self.__get_data(currentframe().f_code.co_name, "socket.get_params") def set_socket_params(self, enable: bool, ip: str, name: str, port: str, suffix: str, type: int, reconnect_flag: bool, auto_connect: bool, disconnection_triggering_behavior: int, disconnection_detection_time: int): """ 设置 socket 参数 :param enable: True 开启或者 False 关闭 :param ip: 仅限于客户端,用于指定服务端 IP;当作为服务端时,该参数设置为空字符串,否则会报错!!! :param name: 连接名字 :param port: 连接端口 :param suffix: 指定发送分隔符 :param type: 0 client | 1 server :param reconnect_flag: True 自动重连,False 不自动重连 :param auto_connect: True 开机启动,False 不开机启动 :param disconnection_triggering_behavior: 断开连接触发行为 0无动作 1暂停程序 2暂停并下电 :param disconnection_detection_time: 链接断开检测周期(s) :return: None """ self.execution("socket.set_params", enable=enable, ip=ip, name=name, port=port, suffix=suffix, type=type, reconnect_flag=reconnect_flag, auto_connect=auto_connect, disconnection_triggering_behavior=disconnection_triggering_behavior, disconnection_detection_time=disconnection_detection_time) @property def get_diagnosis_params(self, version="1.4.1"): """ 获取诊断功能开启状态,以及相应其他信息 :param version: 指定诊断工具版本 :return: { "state": true, "display_open": false, "display_pdo_params": [ { "name": "count", "channel": 0 }, { "name": "servo_error_code", "channel": 5 }, { "name": "rci_data05", "channel": 11 } ], "frequency": 50, "pdo_params": [ { "name": "count", "array": 1 }, { "name": "current_A", "array": 7 }, { "name": "keypad_status", "array": 1 }, { "name": "rci_data05", "array": 12 } ], "overrun": true, "turn_area": true } """ return self.__get_data(currentframe().f_code.co_name, "diagnosis.get_params", version=version) def set_diagnosis_params(self, display_pdo_params: list, frequency: int = 50, version: str = "1.4.1"): """ 设置诊断功能显示参数 :param display_pdo_params: 指定要采集的曲线名称,具体可通过 get_diagnosis_params 函数功能获取所有目前已支持的曲线 :param frequency: 采样频率,默认 50ms :param version: xDiagnose的版本号 :return: None """ self.execution("diagnosis.set_params", display_pdo_params=display_pdo_params, frequency=frequency, version=version) def open_diagnosis(self, open: bool, display_open: bool, overrun: bool, turn_area: bool, delay_motion: bool): """ 打开或者关闭诊断曲线,并定义其他功能的开关(调试相关功能,比如是否开启线程超时监控和上报,转弯区以及运动延迟等) :param open: 诊断功能 :param display_open: 诊断显示功能 :param overrun: 实时线程超时监控上报 :param turn_area: 转弯区上报 :param delay_motion: 延迟运动 :return: None """ self.execution("diagnosis.open", open=open, display_open=display_open, overrun=overrun, turn_area=turn_area, delay_motion=delay_motion) def save_diagnosis(self, save: bool): """ 保存诊断数据 :param save: 保存数据开关,对应诊断曲线内容的 switch 开关 :return: None """ self.execution("diagnosis.save", save=save) @property def qurry_system_io_configuration(self): """ 系统IO配置的查询协议,trigger 参数取值参照如下: FLANKS 0, //边缘触发 POS_FLANK 1, //上升沿 NEG_FLANK 2, //下降沿 HIGH_LEVEL 3, //高电平 LOW_LEVEL 4 //低电平 :return: { "input_system_io": { "motor_on": { "signal":"DI0_0", "trigger":1 }, "motor_off": { "signal":"DI0_0", "trigger":2 } }, "output_system_io": { "sta_motor_on": { "signal":"DO0_0" }, "sta_robot_running": { "signal":"DO0_1" } } } """ return self.__get_data(currentframe().f_code.co_name, "system_io.query_configuration") @property def qurry_system_io_event_configuration(self): """ 查询当前系统支持的系统IO事件列表,包括事件key、名称、支持的触发方式等配置 :return: { "input_system_event": [ { "key": "ctrl_motor_on", "name": "上电", "trigger_types": [ 1, 2 ] }, { "key": "ctrl_motor_off", "name": "下电", "trigger_types": [ 1, 2 ] } ], "output_system_event": [ { "key": "sta_motor_on", "name": "上下电状态" }, { "key": "sta_program", "name": "运行状态" } ], "input_mutex_event": [ { "key0": "ctrl_motor_on", "key1": "ctrl_motor_off" }, { "key0": "ctrl_switch_auto", "key1": "ctrl_switch_manu" } ] } """ return self.__get_data(currentframe().f_code.co_name, "system_io.query_configuration") def update_system_io_configuration(self, i_funcs, o_funcs, i_signals, o_signals, trig_types): """ 配置系统 IO :param i_funcs: 输入,只写功能码列表 :param o_funcs: 输出,只读功能码列表 :param i_signals: DI 信号列表 :param o_signals: DO 信号列表 :param trig_types: 触发条件列表,可参考 qurry_system_io_configuration 中的触发条件 :return: None """ input_system_io, output_system_io = {}, {} for i_func in i_funcs: _index = i_funcs.index(i_func) input_system_io[i_func] = {"signal": i_signals[_index], "trigger": trig_types[_index]} for o_func in o_funcs: _index = o_funcs.index(o_func) output_system_io[o_func] = {"signal": o_signals[_index]} self.execution("system_io.update_configuration", input_system_io=input_system_io, output_system_io=output_system_io) @property def get_fieldbus_device_params(self): """ 获取设备列表的开关状态 :return: { "device_list": [ { "device_name": "modbus_1", "enable": true }, { "device_name": "modbus_2", "enable": false } ] } """ return self.__get_data(currentframe().f_code.co_name, "fieldbus_device.get_params") def set_fieldbus_device_params(self, device_name: str, enable: bool): """ 定义开关设备的协议,一次只能打开一个设备 :param device_name: 设备列表中的名称 :param enable: 是否开启 :return: None """ self.execution("fieldbus_device.set_params", device_name=device_name, enable=enable) def reload_fieldbus(self): """ 触发控制器重新加载总线设备 :return: None """ self.execution("fieldbus_device.load_cfg") @property def get_modbus_params(self): """ 获取modbus参数 :return: { "enable_slave":true, "ip":"192.168.0.160", "port":502, "slave_id":0, "enable_master":false } """ return self.__get_data(currentframe().f_code.co_name, "modbus.get_params") def set_modbus_params(self, enable_slave: bool, ip: str, port: int, slave_id: int, enable_master): """ 设置modbus参数 :param enable_slave: Modbus从站是否自动开启 :param ip: ip 地址 :param port: 端口 :param slave_id: 从站 ID :param enable_master: Modbus主站是否自动开启 :return: """ self.execution("modbus.set_params", enable_slave=enable_slave, ip=ip, port=port, slave_id=slave_id, enable_master=enable_master) def reload_registers(self): """ 触发控制器重新加载寄存器列表 :return: None """ self.execution("modbus.load_cfg") def get_modbus_values(self, mode): """ 用于获取 modbus 寄存器变量值的更新信息,展示在状态监控界面 :param mode: all/change :return: """ return self.__get_data(currentframe().f_code.co_name, "modbus.get_values", mode=mode) @property def get_soft_limit_params(self): """ 获取软限位参数 :return: { "enable":true "upper":[0,0,0,0,0,0,0], "lower":[0,0,0,0,0,0,0], "reduced_upper":[0,0,0,0,0,0,0], "reduced_lower":[0,0,0,0,0,0,0] } """ return self.__get_data(currentframe().f_code.co_name, "soft_limit.get_params") def set_soft_limit_params(self, enable: bool, upper: list, lower: list, reduced_upper: list, reduced_lower: list): """ 设定软限位参数 :param enable: 是否启用软限位 :param upper: 软限位上限 :param lower: 软限位下限 :param reduced_upper: 缩减模式软限位上限 :param reduced_lower: 缩减模式软限位下限 :return: None """ self.execution("soft_limit.set_params", enable=enable, upper=upper, lower=lower, reduced_upper=reduced_upper, reduced_lower=reduced_lower) @property def get_device_params(self): """ 获取设备信息 :return: """ return self.__get_data(currentframe().f_code.co_name, "device.get_params") @property def get_cart_pos(self): """ 获取机器人的当前位姿:包括轴关节角度,笛卡尔关节角度,四元数,欧拉角(臂角) :return: { "joint":[0.0,0.0,0.0,0.0,0.0,0.0], "position":[0.0,0.0,0.0,0.0,0.0,0.0], "euler":[0.0,0.0,0.0], "quaternion"[0.0,0.0,0.0,0.0], "elb":0.0, // 可缺省 "ext_joint":[0.0,0.0,0.0,0.0,0.0] } """ return self.__get_data(currentframe().f_code.co_name, "move.get_pos") @property def get_joint_pos(self): """ 获取机器人的当前关节角度:包括内部轴和外部轴 :return: { "inner_pos": [0,0,0,0,0,0,0], "extern_pos": [0,0,0,0,0,0,0] } """ return self.__get_data(currentframe().f_code.co_name, "move.get_joint_pos") @property def get_monitor_cfg(self): """ 获取机器人的监控配置参数,RefCoordinateType 类型数据,表示当前控制器位置监测的相对坐标系 基坐标系 REF_COORDINATE_BASE 0 世界坐标系 REF_COORDINATE_WORLD 1 工件坐标系 REF_COORDINATE_WOBJ 2 :return: { "ref_coordinate": 0 } """ return self.__get_data(currentframe().f_code.co_name, "move.get_monitor_cfg") def set_monitor_cfg(self, ref_coordinate): """ 设置机器人的监控配置参数 :ref_coordinate: RefCoordinateType类型数据,用来设置当前控制器位置监测的相对坐标系 :return: None """ self.execution("move.set_monitor_cfg", ref_coordinate=ref_coordinate) @property def get_move_params(self): """ 获取机器人的运动参数:包括减速比、耦合比、最大速度、加速度、加加速度、acc ramp time、规划步长等信息 :return: { "MOTION": { "ACC_RAMPTIME_JOG": 0.5, "ACC_RAMPTIME_STOP": 0.5, "DEFAULT_ACC_PARAMS": [1.0, 0.5], "JERK_LIMIT_CART": 0, "JERK_LIMIT_JOINT": 0, "JERK_LIMIT_ROT": 0, "JOINT_MAX_ACC": [500, 500, 1000, 1000, 1000], "JOINT_MAX_JERK": [2000, 2000, 2000, 4000, 4000, 4000], "JOINT_MAX_SPEED": [120.0, 120.0, 180.0, 180.0, 180.0, 180.0], "MAX_ACC_PARAMS": [1.0, 1], "MIN_ACC_PARAMS": [0.3, 0.05], "TCP_MAX_ACC": 5000, "TCP_MAX_JERK": 10000, "TCP_MAX_SPEED": 1000, "TCP_ROTATE_MAX_ACC": 1800, "TCP_ROTATE_MAX_JERK": 3600, "TCP_ROTATE_MAX_SPEED": 180, "VEL_SMOOTH_FACTOR": 1.0, "VEL_SMOOTH_FACTOR_RANGE": [1.0, 10.0] } } """ return self.__get_data(currentframe().f_code.co_name, "move.get_params") def set_move_params(self, **kwargs): """ 设置机器人的运动参数:轴最大速度、轴最大加加速度、速度、加加速度、加速度、加加速度、acc ramp time、规划步长等信息 :return: """ res = self.get_move_params keys = res.keys() for _ in keys: if _ in kwargs.keys(): res[_] = kwargs[_] self.execution("move.set_params", data=res) @property def get_quick_stop_distance(self): """ 获取机器人 search 指令最大停止距离 :return: { "distance":2.0 } """ return self.__get_data(currentframe().f_code.co_name, "move.get_quickstop_distance") def set_quick_stop_distance(self, distance): """ 设置机器人 search 指令最大停止距离 :param distance: 停止距离,单位 mm :return: None """ self.execution("move.set_quickstop_distance", distance=distance) @property def get_collision_params(self): """ 获取碰撞检测相关参数 :return: { "action": 1, // 触发行为:1-安全停止;2-触发暂停;3-柔顺停止 "coeff": [100, 100, 100, 100, 100, 100], // 0-整机灵敏度百分比,1-单轴灵敏度百分比,2-单轴和整机灵敏度百分比 "coeff_level": 0, // 灵敏度等级:0-低,1-中,2-高 "compliance": 0, // 柔顺功能比例系数,[0-1] "enable": true, // 功能使能开关 "fallback_distance": 3, // 回退距离 "mode": 0, // 力传感器系数,0 整机 1 单轴 "percent": 100, // 0-200,整机灵敏度百分比 "percent_axis": [100, 100, 100, 100, 100, 100], // 0-200,单轴灵敏度百分比 "reduced_percent": 100, // 0-200,整机缩减模式灵敏度百分比 "reduced_percent_axis": [100, 100, 100, 100, 100, 100] // 0-200,单轴缩减模式灵敏度百分比 } """ return self.__get_data(currentframe().f_code.co_name, "collision.get_params") def set_collision_params(self, enable, mode, action, percent, **kwargs): """ 设置碰撞检测相关参数 :param enable: 功能使能开关 :param mode: 力传感器系数,0 整机 1 单轴 :param action: 触发行为:1-安全停止;2-触发暂停;3-柔顺停止 :param percent: 0-200,整机灵敏度百分比 :return: """ res = self.get_collision_params keys = res.keys() kwargs.update({"enable": enable, "mode": mode, "action": action, "percent": percent}) for _ in keys: if _ in kwargs.keys(): res[_] = kwargs[_] self.execution("collision.set_params", data=res) def set_collision_state(self, collision_state: bool): """ 开启或者关闭碰撞检测,测试该函数功能无效!!! :param collision_state: 碰撞检测的开关状态 :return: None """ self.execution("collision.set_state", collision_state=collision_state) @property def get_robot_state(self): """ { "rc_state":"normal", # "fatal" 、"error"、"block"、"normal" "engine":"on", # "fatal" 、"error"、"GStop"、"EStop"、"on"、"off" "servo_mode":"position", # "torque"、"position" "operate": "auto", # "auto"、"manual" "task_space": "program", # "jog"、"drag"、"ready"、"load_identify"、"demo"、"rci"、"dynamic_identify"、"program"、"debug" "robot_action": "idle", # "idle"、"busy" "safety_mode": "collision" # "normal"、"collision"、"collaboration" } """ return self.__get_data(currentframe().f_code.co_name, "state.get_state") def set_controller_params(self, robot_time: str): """ 设置控制器系统时间 :param robot_time: 系统时间,"2020-02-28 15:28:30" :return: None """ self.execution("controller.set_params", time=robot_time) @property def get_robot_params(self): """ "alias": "", "auth_state": "controller_type": "XBC_XMATE", "controller_types": ["XBC_3", "XBC_5", "XBC_XMATE"], "disk_serial_number": "2338020401535", "mac_addr": "34:df:20:03:1b:45", "model":, "nic_list": ["enp1s0", "enp2s0"], "occupied_addr": "192.168.2.123:49269", "period": 0.002, "period_types": [0.001, 0.002, 0.003, 0.004], "robot_template": 10, "robot_type": "XMC12-R1300-W7G3B1C", "robot_types": ["XMC12-R1300-B7S3B0C", "XMC12-R1300-W7G3B1C", "XMC17_5-R1900-W7G3B1C", "XMC20-R1650-B7G3Z0C"], "security_type": "ROKAE_RSC", "security_types": ["ROKAE_MINI", "ROKAE_RSC"], "time": "2024-09-13 12:36:38", "version": "2.3.0.4" """ return self.__get_data(currentframe().f_code.co_name, "controller.get_params") def switch_tp_mode(self, mode: str): """ 切换示教器模式 :param mode: with/without :return: None """ match mode: case "with": self.execution("state.set_tp_mode", tp_mode="with") case "without": self.execution("state.set_tp_mode", tp_mode="without") case _: clibs.logger.error("switch_tp_mode 参数错误 非法参数,只接受 with/without") @property def get_tp_mode(self): """ 获取示教器连接状态 :return: { "tp_mode":"with" } """ return self.__get_data(currentframe().f_code.co_name, "state.get_tp_mode") @property def get_drag_params(self): """ 获取拖动模式参数 :return: { "enable": true, "space": 0, "type": 0 } """ return self.__get_data(currentframe().f_code.co_name, "drag.get_params") def set_drag_params(self, enable: bool, space: int, type: int): """ 设置拖动模式开关以及参数 :param enable: 是否启用拖动 :param space: 拖动空间 - 0 代表关节 1 代表笛卡尔 :param type: 拖动类型 - 0 只平移 1 只旋转 2 自由拖动 :return: None """ self.execution("drag.set_params", enable=enable, space=space, type=type) def set_safety_area_signal(self, signal: bool): """ 设置安全区域信号控制使能开关 :param signal: True 打开 False 关闭 :return: None """ self.execution("safety.safety_area.signal_enable", protocol_flag=1, signal=signal) def set_safety_area_overall(self, enable: bool): """ 设置安全区域整体控制使能开关 :param enable: True 打开 False 关闭 :return: None """ self.execution("safety.safety_area.overall_enable", protocol_flag=1, enable=enable) @property def get_safety_area_params(self): """ 获取安全区所有的配置信息 :return: "g": { "safety_area_data": { "overall_enable": true, "safety_area_setting": [ { "box": { "Lx": 100.0, "Ly": 100.0, "Lz": 100.0, "direction": false, "ori": { "euler": { "a": 179.9963851353547, "b": -0.006653792532429416, "c": 179.9934560302729 }, "quaternion": { "q1": 0.0, "q2": 0.0, "q3": 0.0, "q4": 0.0 } }, "pos": { "x": 0.0, "y": 0.0, "z": 0.0 } }, "enable": false, "id": 0, "name": "region1", "plane": { "direction": true, "point": { "x": 0.0, "y": 0.0, "z": 0.0 }, "vector": { "x": 0.0, "y": 0.0, "z": 0.0 } }, "shape": 0, "shared_bind_di": "", "shared_bind_do": "", "sphere": { "ori": { "euler": { "a": 0.0, "b": 0.0, "c": 0.0 }, "quaternion": { "q1": 0.0, "q2": 0.0, "q3": 0.0, "q4": 0.0 } }, "pos": { "x": 0.0, "y": 0.0, "z": 0.0 }, "radius": 0.0 }, "state": true, "trigger": 0, "type": 0, "vertebral": { "high": 0.0, "ori": { "euler": { "a": 0.0, "b": 0.0, "c": 0.0 }, "quaternion": { "q1": 0.0, "q2": 0.0, "q3": 0.0, "q4": 0.0 } }, "pos": { "x": 0.0, "y": 0.0, "z": 0.0 }, "radius": 0.0 } }, ... // 剩余 9 个安全区域的配置信息 ], "signal_enable": true } } } """ return self.__get_data(currentframe().f_code.co_name, "safety_area_data", flag=1) def set_safety_area_enable(self, id: int, enable: bool): """ 设置每个安全区域的开关 :param id: 安全区域开关,0-9 :param enable: True 打开,False 关闭 :return: None """ self.execution("safety.safety_area.safety_area_enable", protocol_flag=1, id=id, enable=enable) def set_safety_area_param(self, id: int, enable: bool, **kwargs): # 不生效,待确认 """ 设定单独安全区的参数 :param id: 安全区 id :param enable: 是否开启 :param kwargs: 其他参数,参考 get_safety_area_params 的返回值形式 :return: None """ res = self.get_safety_area_params["g"]["safety_area_data"]["safety_area_setting"][id] keys = res.keys() kwargs.update({"id": id, "enable": enable}) for _ in keys: if _ in kwargs.keys(): res[_] = kwargs[_] with open(f"{clibs.PREFIX}/json/safety.safety_area.set_param.json", mode="w", encoding="utf-8") as f: dump({"c": {"safety.safety_area.set_param": res}}, f, indent=4) self.execution("safety.safety_area.set_param", protocol_flag=1) self.execution("safety.safety_area.safety_area_enable", protocol_flag=1, id=id, enable=enable) @property def get_filtered_error_code(self): """ 获取已设定的错误码过滤列表 :return: { "g": { "log_code.data": { "code_list": [ { "id": 100, "title": "DEMO\\u610f\\u5916\\u505c\\u6b62" }, { "id": 10000, "title": "HMI\\u8bf7\\u6c42\\u5305\\u89e3\\u6790\\u9519\\u8bef" }, { "id": 10002, "title": "\\u5feb\\u901f\\u8c03\\u6574\\u542f\\u52a8\\u5931\\u8d25" } ], "version": 1 } } } """ return self.__get_data(currentframe().f_code.co_name, "log_code.data", flag=1) def set_filtered_error_code(self, action: str, code_list: list): origin_code_list = self.get_filtered_error_code["g"]["log_code.data"]["code_list"] # [{'id': 10000, 'title': 'HMI请求包解析错误'}, {'id': 10002, 'title': '快速调整启动失败'}] if action == "clear": code_list = [] elif action == "add": for error_code in code_list: for item in origin_code_list: if error_code == item["id"]: break else: origin_code_list.append({"id": error_code, "title": ""}) code_list = origin_code_list elif action == "remove": for error_code in code_list: for item in origin_code_list: if error_code == item["id"]: origin_code_list.remove(item) code_list = origin_code_list self.execution("log_code.data.code_list", protocol_flag=1, code_list=code_list) @staticmethod def __cannot_find_response(func_name, req_id): # 无法获取响应信息时,记录错误信息,并退出 clibs.logger.error(f"[{func_name}] 无法找到 {req_id} 的响应信息,执行失败,退出...") exit() def __get_data(self, upper_func, command, flag=0, **kwargs): r_id = self.execution(command, protocol_flag=flag, **kwargs) res = self.get_from_id(r_id, flag=flag) if res is not None: return res else: self.__cannot_find_response(upper_func, r_id) # =================================== ↑↑↑ specific functions ↑↑↑ =================================== class ExternalCommunication(object): def __init__(self): self.__c = None self.suffix = "\r" self.socket_client() def socket_client(self): self.__c = socket(AF_INET, SOCK_STREAM) try: self.__c.connect((clibs.ip_addr, clibs.external_port)) clibs.logger.success(f"外部通信连接成功...") return self.__c except Exception as Err: clibs.logger.error(f"外部通信连接失败... {Err}") def s_string(self, directive): order = "".join([directive, self.suffix]) self.__c.send(order.encode()) sleep(clibs.interval) def r_string(self): result, char = "", "" while char != self.suffix: char = self.__c.recv(1).decode(encoding="unicode_escape") result = "".join([result, char]) sleep(clibs.interval) return result # =================================== ↓↓↓ specific functions ↓↓↓ =================================== def motor_on(self): return self.__exec_cmd("motor_on", "电机上电") def motor_off(self): return self.__exec_cmd("motor_off", "电机下电") def pp_to_main(self): return self.__exec_cmd("pp_to_main", "程序指针到") def program_start(self): return self.__exec_cmd("start", "程序启动") def program_stop(self): return self.__exec_cmd("stop", "程序停止") def clear_alarm(self): return self.__exec_cmd("clear_alarm", "清除伺服报警") def switch_operation_auto(self): return self.__exec_cmd("switch_mode:auto", "切换到自动模式") def switch_operation_manual(self): return self.__exec_cmd("switch_mode:manual", "切换到手动模式") def open_drag_mode(self): return self.__exec_cmd("open_drag", "打开拖动") def close_drag_mode(self): return self.__exec_cmd("close_drag", "关闭拖动") def get_program_list(self): return self.__exec_cmd("list_prog", "获取工程列表") def get_current_program(self): return self.__exec_cmd("current_prog", "当前工程") def load_program(self, program_name): return self.__exec_cmd(f"load_prog:{program_name}", "加载工程") def estop_reset(self): return self.__exec_cmd("estop_reset", "急停复位") def estopreset_and_clearalarm(self): return self.__exec_cmd("estopreset_and_clearalarm", "急停复位并清除报警") def motoron_pp2main_start(self): return self.__exec_cmd("motoron_pptomain_start", "依次执行上电,程序指针到main,启动程序") def motoron_start(self): return self.__exec_cmd("motoron_start", "依次执行上电,启动程序") def pause_motoroff(self): return self.__exec_cmd("pause_motoroff", "暂停程序并下电") def set_program_speed(self, speed: int): return self.__exec_cmd(f"set_program_speed:{speed}", "设置程序运行速率") def set_soft_estop(self, enable: str): return self.__exec_cmd(f"set_soft_estop:{enable}", "触发(true)/解除(false)机器人软急停") def switch_auto_motoron(self): return self.__exec_cmd("switch_auto_motoron", "切换自动模式并上电") def open_safe_region(self, number: int): return self.__exec_cmd(f"open_safe_region:{number}", f"打开第 {number} 个安全区域") def close_safe_region(self, number: int): return self.__exec_cmd(f"close_safe_region:{number}", f"关闭第 {number} 个安全区域") def open_reduced_mode(self): return self.__exec_cmd("open_reduced_mode", "开启缩减模式") def close_reduced_mode(self): return self.__exec_cmd("close_reduced_mode", "关闭缩减模式") def setdo_value(self, do_name, do_value): return self.__exec_cmd(f"setdo:{do_name}, {do_value}", f"设置 {do_name} 的值为 {do_value}") def modify_system_time(self, robot_time): return self.__exec_cmd(f"set_robot_time:{robot_time}", f"修改控制器和示教器的时间为 {robot_time}") # -------------------------------------------------------------------------------------------------- @property def motor_on_state(self): return self.__exec_cmd("motor_on_state", "电机上电状态") @property def robot_running_state(self): return self.__exec_cmd("robot_running_state", "程序运行状态") @property def estop_state(self): return self.__exec_cmd("estop_state", "急停状态") @property def operation_mode(self): return self.__exec_cmd("operating_mode", "工作模式") @property def home_state(self): return self.__exec_cmd("home_state", "HOME 输出状态") @property def fault_state(self): return self.__exec_cmd("fault_state", "故障状态") @property def collision_state(self): return self.__exec_cmd("collision_state", "碰撞检测状态") @property def task_state(self): return self.__exec_cmd("task_state", "获取机器人运行任务状态") @property def get_cart_pos(self): # cart_pos/cart_pos_name 都可以正常返回,区别在返回的前缀,可测试辨别 return self.__exec_cmd("cart_pos", "获取笛卡尔位置") @property def get_joint_pos(self): # jnt_pos/jnt_pos_name 都可以正常返回,区别在返回的前缀,可测试辨别 return self.__exec_cmd("jnt_pos", "获取轴位置") @property def get_axis_vel(self): # jnt_vel/jnt_vel_name 都可以正常返回,区别在返回的前缀,可测试辨别 return self.__exec_cmd("jnt_vel", "获取轴速度") @property def get_axis_trq(self): # jnt_trq/jnt_trq_name 都可以正常返回,区别在返回的前缀,可测试辨别 return self.__exec_cmd("jnt_trq", "获取轴力矩") @property def reduced_mode_state(self): return self.__exec_cmd("reduced_mode_state", "获取缩减模式状态") def get_io_state(self, io_list: str): # DO0_0,DI1_3,DO2_5,不能有空格 return self.__exec_cmd(f"io_state:{io_list}", "获取 IO 状态值") @property def alarm_state(self): return self.__exec_cmd("alarm_state", "获取报警状态") @property def collision_alarm_state(self): return self.__exec_cmd("collision_alarm_state", "获取碰撞检测报警状态") @property def collision_open_state(self): return self.__exec_cmd("collision_open_state", "获取碰撞检测开启状态") @property def controller_is_running(self): return self.__exec_cmd("controller_is_running", "判断控制器是否开机") @property def encoder_low_battery_state(self): return self.__exec_cmd("encoder_low_battery_state", "编码器低电压报警状态") @property def robot_error_code(self): return self.__exec_cmd("robot_error_code", "获取机器人错误码") @property def get_rl_pause_state(self): return self.__exec_cmd("program_full", "获取 RL 的暂停状态") @property def program_reset_state(self): return self.__exec_cmd("program_reset_state", "获取程序复位状态") @property def program_speed(self): return self.__exec_cmd("program_speed", "获取程序运行速度") @property def robot_is_busy(self): return self.__exec_cmd("robot_is_busy", "获取程序忙碌状态") @property def robot_is_moving(self): return self.__exec_cmd("robot_is_moving", "获取程序运行状态") @property def safe_door_state(self): return self.__exec_cmd("safe_door_state", "获取安全门状态") @property def soft_estop_state(self): return self.__exec_cmd("soft_estop_state", "获取软急停状态") @property def get_cart_vel(self): return self.__exec_cmd("cart_vel", "获取笛卡尔速度") @property def get_tcp_pos(self): return self.__exec_cmd("tcp_pos", "获取 TCP 位姿") @property def get_tcp_vel(self): return self.__exec_cmd("tcp_vel", "获取 TCP 速度") @property def get_tcp_vel_mag(self): return self.__exec_cmd("tcp_vel_mag", "获取 TCP 合成线速度") @property def ext_estop_state(self): return self.__exec_cmd("ext_estop_state", "获取外部轴急停状态") @property def hand_estop_state(self): return self.__exec_cmd("hand_estop_state", "获取手持急停状态") @property def collaboration_state(self): # TBD return self.__exec_cmd("collaboration_state", "获取协作模式状态") def __exec_cmd(self, directive, description): self.s_string(directive) result = self.r_string().strip() clibs.logger.info(f"外部通信:执行{description} {directive} 指令,返回值为 {result}") return result class PreDos(object): def __init__(self): self.__ssh = None self.__sftp = None def __ssh2server(self): try: self.__ssh = SSHClient() self.__ssh.set_missing_host_key_policy(AutoAddPolicy()) self.__ssh.connect(clibs.ip_addr, clibs.ssh_port, username=clibs.username, password=clibs.password) self.__sftp = self.__ssh.open_sftp() except Exception as Err: clibs.logger.error(f"SSH 无法连接到 {clibs.ip_addr}:{clibs.ssh_port},需检查网络连通性或者登录信息是否正确 {Err}") exit(110) def push_prj_to_server(self, prj_file): # prj_file:本地工程完整路径 self.__ssh2server() prj_name = prj_file.split("\\")[-1].split("/")[-1].split(".")[0] self.__sftp.put(prj_file, f"/tmp/{prj_name}.zip") cmd = f"cd /tmp; mkdir {prj_name}; unzip -d {prj_name} -q {prj_name}.zip; rm -rf /tmp/{prj_name}.zip; " cmd += f"sudo rm -rf /home/luoshi/bin/controller/projects/{prj_name}; " cmd += f"sudo mv /tmp/{prj_name}/ /home/luoshi/bin/controller/projects/" stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True) stdin.write(clibs.password + "\n") stdout.read().decode() # 需要read一下才能正常执行 stderr.read().decode() self.__ssh.close() def pull_prj_from_server(self, prj_name, local_prj_path): # NG | 可以拉取文件,但是导入之后,有问题 # prj_name:要拉取的服务端工程名 # local_prj_path:本地工程文件的完整路径 self.__ssh2server() cmd = f"cd /tmp/; sudo rm -rf {prj_name}*; sudo cp -rf /home/luoshi/bin/controller/projects/{prj_name} .; " cmd += f"sudo zip -q -r {prj_name}.zip {prj_name}/; sudo rm -rf {prj_name}" stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True) stdin.write(clibs.password + "\n") print(stdout.read().decode()) # 需要read一下才能正常执行 print(stderr.read().decode()) self.__sftp.get(f"/tmp/{prj_name}.zip", local_prj_path) cmd = f"sudo rm -rf /tmp/{prj_name}.zip" stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True) stdin.write(clibs.password + "\n") print(stdout.read().decode()) # 需要read一下才能正常执行 print(stderr.read().decode()) self.__ssh.close() def push_file_to_server(self, local_file, server_file): # local_file:本地文件完整路径 # server_file:服务端文件完整路径 self.__ssh2server() filename = local_file.split("\\")[-1].split("/")[-1] self.__sftp.put(local_file, f"/tmp/{filename}") cmd = f"sudo mv /tmp/{filename} {server_file}" stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True) stdin.write(clibs.password + "\n") stdout.read().decode() # 需要read一下才能正常执行 stderr.read().decode() self.__ssh.close() def pull_file_from_server(self, server_file, local_file): # local_file:本地文件完整路径 # server_file:服务端文件完整路径 self.__ssh2server() cmd = f"sudo cp {server_file} /tmp/" stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True) stdin.write(clibs.password + "\n") stdout.read().decode() # 需要read一下才能正常执行 stderr.read().decode() filename = server_file.split("/")[-1] self.__sftp.get(f"/tmp/{filename}", f"{local_file}") cmd = f"sudo rm -rf /tmp/{filename}" stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True) stdin.write(clibs.password + "\n") stdout.read().decode() # 需要read一下才能正常执行 stderr.read().decode() self.__ssh.close()