import json import socket from inspect import currentframe import threading from paramiko import SSHClient, AutoAddPolicy from pymodbus.client.tcp import ModbusTcpClient import selectors import time import os.path from ctypes import * import hashlib import struct from datetime import datetime from PySide6.QtCore import Signal, QThread from codes.common import clibs class ModbusRequest(QThread): def __init__(self, ip, port, /): super().__init__() self.ip = ip self.port = port self.c = None def net_conn(self): clibs.logger("INFO", "openapi", f"Modbus 正在连接中,需要配置设备,这可能需要一点时间......", "blue") RobotInit.modbus_init() self.c = ModbusTcpClient(host=self.ip, port=self.port) if self.c.connect(): clibs.logger("INFO", "openapi", f"Modbus connection({clibs.ip_addr}:{clibs.modbus_port}) success!", "green") else: clibs.logger("ERROR", "openapi", f"Modbus connection({clibs.ip_addr}:{clibs.modbus_port}) failed!", "red") def close(self): self.c.close() clibs.logger("INFO", "openapi", f"modbus: 关闭 Modbus 连接成功", "green") def __reg_high_pulse(self, addr: int) -> None: self.c.write_register(addr, 0) time.sleep(clibs.INTERVAL) self.c.write_register(addr, 1) time.sleep(clibs.INTERVAL+1) self.c.write_register(addr, 0) def r_clear_alarm(self): # OK self.__reg_high_pulse(40000) clibs.logger("DEBUG", "openapi", "modbus: 40000-010 执行清除告警信息") def r_reset_estop(self): # OK self.__reg_high_pulse(40001) clibs.logger("DEBUG", "openapi", "modbus: 40001-010 执行复位急停状态(非软急停)") def r_reset_estop_clear_alarm(self): # OK self.__reg_high_pulse(40002) clibs.logger("DEBUG", "openapi", "modbus: 40002-010 执行复位急停状态(非软急停),并清除告警信息") def r_motor_off(self): # OK self.__reg_high_pulse(40003) clibs.logger("DEBUG", "openapi", "modbus: 40003-010 执行机器人下电") def r_motor_on(self): # OK self.__reg_high_pulse(40004) clibs.logger("DEBUG", "openapi", "modbus: 40004-010 执行机器人上电") def r_motoron_pp2main_start(self): # OK self.__reg_high_pulse(40005) clibs.logger("DEBUG", "openapi", "modbus: 40005-010 执行机器人上电/pp2main/开始运行程序,需自动模式执行,若运行失败,可清除告警后再次尝试") def r_motoron_start(self): # OK self.__reg_high_pulse(40006) clibs.logger("DEBUG", "openapi", "modbus: 40006-010 执行机器人上电/开始运行程序,需自动模式执行,若运行失败,可清除告警、执行pp2main后再次尝试") def r_pulse_motoroff(self): # OK self.__reg_high_pulse(40007) clibs.logger("DEBUG", "openapi", "modbus: 40007-010 执行机器人停止,并下电,手动模式下可停止程序运行,但不能下电,若运行失败,可清除告警后再次尝试") def r_pp2main(self): # OK self.__reg_high_pulse(40008) clibs.logger("DEBUG", "openapi", "modbus: 40008-010 执行机器人 pp2main,需自动模式执行,若运行失败,可清除告警后再次尝试") def r_program_start(self): # OK self.__reg_high_pulse(40009) clibs.logger("DEBUG", "openapi", "modbus: 40009-010 执行机器人默认程序运行,需有 pp2main 前置操作,若运行失败,可清除告警后再次尝试") def r_program_stop(self): # OK self.__reg_high_pulse(40010) clibs.logger("DEBUG", "openapi", "modbus: 40010-010 执行机器人默认程序停止,需有 pp2main 前置操作,若运行失败,可清除告警后再次尝试") def r_reduced_mode(self, action: int): # OK self.c.write_register(40011, action) actions = "进入" if action == 1 else "退出" clibs.logger("DEBUG", "openapi", f"modbus: 40011-{action} 执行机器人{actions}缩减模式") time.sleep(clibs.INTERVAL) def r_soft_estop(self, action: int): # OK self.c.write_register(40012, action) actions = "解除" if action == 1 else "触发" clibs.logger("DEBUG", "openapi", f"modbus: 40012-{action} 执行{actions}急停动作") time.sleep(clibs.INTERVAL) def r_switch_auto_motoron(self): # OK self.__reg_high_pulse(40013) clibs.logger("DEBUG", "openapi", "modbus: 40013-010 执行切换为自动模式,并上电,初始状态 !!不能是!! 手动上电模式") def r_switch_auto(self): # OK self.__reg_high_pulse(40014) clibs.logger("DEBUG", "openapi", "modbus: 40014-010 执行切换为自动模式") def r_switch_manual(self): # OK self.__reg_high_pulse(40015) clibs.logger("DEBUG", "openapi", "modbus: 40015-010 执行切换为手动模式") def r_switch_safe_region01(self, action: bool): # OK | 上升沿打开,下降沿关闭 if action: self.c.write_register(40016, False) time.sleep(clibs.INTERVAL) self.c.write_register(40016, True) else: self.c.write_register(40016, True) time.sleep(clibs.INTERVAL) self.c.write_register(40016, False) actions = "打开" if action else "关闭" clibs.logger("DEBUG", "openapi", f"modbus: 40016-{action} 执行{actions}安全区 safe region 01") time.sleep(clibs.INTERVAL) def r_switch_safe_region02(self, action: bool): # OK | 上升沿打开,下降沿关闭 if action: self.c.write_register(40017, False) time.sleep(clibs.INTERVAL) self.c.write_register(40017, True) else: self.c.write_register(40017, True) time.sleep(clibs.INTERVAL) self.c.write_register(40017, False) actions = "打开" if action else "关闭" clibs.logger("DEBUG", "openapi", f"modbus: 40017-{action} 执行{actions}安全区 safe region 02") time.sleep(clibs.INTERVAL) def r_switch_safe_region03(self, action: bool): # OK | 上升沿打开,下降沿关闭 if action: self.c.write_register(40018, False) time.sleep(clibs.INTERVAL) self.c.write_register(40018, True) else: self.c.write_register(40018, True) time.sleep(clibs.INTERVAL) self.c.write_register(40018, False) actions = "打开" if action else "关闭" clibs.logger("DEBUG", "openapi", f"modbus: 40018-{action} 执行{actions}安全区 safe region 03") time.sleep(clibs.INTERVAL) def write_act(self, number): self.c.write_register(40100, number) clibs.logger("DEBUG", "openapi", f"modbus: 40100 将 {number} 写入") def write_probe(self, probe): self.c.write_register(40101, probe) clibs.logger("DEBUG", "openapi", f"modbus: 40101 将 {probe} 写入") def write_pon(self, pon): self.c.write_register(40102, pon) clibs.logger("DEBUG", "openapi", f"modbus: 40102 将 {pon} 写入") def write_axis(self, axis): result = self.c.convert_to_registers(int(axis), self.c.DATATYPE.INT32, "little") self.c.write_registers(40103, result) clibs.logger("DEBUG", "openapi", f"modbus: 40103 将 {axis} 写入") def write_speed_max(self, speed): result = self.c.convert_to_registers(float(speed), self.c.DATATYPE.FLOAT32, "little") self.c.write_registers(40105, result) clibs.logger("DEBUG", "openapi", f"modbus: 40105 将 {speed} 写入") def r_write_signals(self, addr: int, value): # OK | 40100 - 40109: signal_0 ~ signal_9 if -1 < addr < 10 and addr.is_integer(): self.c.write_register(40100+addr, value) clibs.logger("DEBUG", "openapi", f"modbus: {40100+addr}-{value} 将寄存器 signal_{addr} 赋值为 {value}") else: clibs.logger("INFO", "openapi", f"modbus: {40100+addr}-{value} 地址错误,无法赋值!", "red") @property def w_alarm_state(self): # OK res = self.c.read_holding_registers(40500, count=1).registers[0] clibs.logger("DEBUG", "openapi", f"modbus: 40500 获取告警状态,结果为 {res} :--: 0 表示无告警,,1 表示有告警") return res @property def w_collision_alarm_state(self): # OK res = self.c.read_holding_registers(40501, count=1).registers[0] clibs.logger("DEBUG", "openapi", f"modbus: 40501 获取碰撞告警状态,结果为 {res} :--: 0 表示未触发,1 表示已触发") return res @property def w_collision_open_state(self): # OK res = self.c.read_holding_registers(40502, count=1).registers[0] clibs.logger("DEBUG", "openapi", f"modbus: 40502 获取碰撞检测开启状态,结果为 {res} :--: 0 表示关闭,1 表示开启") return res @property def w_controller_is_running(self): # OK res = self.c.read_holding_registers(40503, count=1).registers[0] clibs.logger("DEBUG", "openapi", f"modbus: 40503 获取控制器运行状态,结果为 {res} :--: 0 表示运行异常,1 表示运行正常") return res @property def w_encoder_low_battery(self): # OK res = self.c.read_holding_registers(40504, count=1).registers[0] clibs.logger("DEBUG", "openapi", f"modbus: 40504 获取编码器低电压状态,结果为 {res} :--: 0 表示非低电压,1 表示低电压 需关注") return res @property def w_estop_state(self): # OK res = self.c.read_holding_registers(40505, count=1).registers[0] clibs.logger("DEBUG", "openapi", f"modbus: 40505 获取机器人急停状态(非软急停),结果为 {res} :--: 0 表示未触发,1 表示已触发") return res @property def w_motor_state(self): # OK res = self.c.read_holding_registers(40506, count=1).registers[0] clibs.logger("DEBUG", "openapi", f"modbus: 40506 获取机器人上电状态,结果为 {res} :--: 0 表示未上电,1 表示已上电") return res @property def w_operation_mode(self): # OK res = self.c.read_holding_registers(40507, count=1).registers[0] clibs.logger("DEBUG", "openapi", f"modbus: 40507 获取机器人操作模式,结果为 {res} :--: 0 表示手动模式,1 表示自动模式") return res @property def w_program_state(self): # OK res = self.c.read_holding_registers(40508, count=1).registers[0] clibs.logger("DEBUG", "openapi", f"modbus: 40508 获取程序的运行状态,结果为 {res} :--: 0 表示未运行,1 表示正在运行") return res @property def w_program_not_run(self): # OK res = self.c.read_holding_registers(40509, count=1).registers[0] clibs.logger("DEBUG", "openapi", f"modbus: 40509 判定程序为未运行状态,结果为 {res} :--: 0 表示正在运行,1 表示未运行") return res @property def w_program_reset(self): # OK res = self.c.read_holding_registers(40510, count=1).registers[0] clibs.logger("DEBUG", "openapi", f"modbus: 40510 判定程序指针为 pp2main 状态,结果为 {res} :--: 0 表示指针不在 main 函数,1 表示指针在 main 函数") return res @property def w_reduce_mode_state(self): # OK res = self.c.read_holding_registers(40511, count=1).registers[0] clibs.logger("DEBUG", "openapi", f"modbus: 40511 获取机器人缩减模式状态,结果为 {res} :--: 0 表示非缩减模式,1 表示缩减模式") return res @property def w_robot_is_busy(self): # OK res = self.c.read_holding_registers(40512, count=1).registers[0] clibs.logger("DEBUG", "openapi", f"modbus: 40512 获取机器人是否处于 busy 状态,结果为 {res} :--: 0 表示未处于 busy 状态,1 表示处于 busy 状态") return res @property def w_robot_is_moving(self): # OK res = self.c.read_holding_registers(40513, count=1).registers[0] clibs.logger("DEBUG", "openapi", f"modbus: 40513 获取机器人是否处于运动状态,结果为 {res} :--: 0 表示未运动,1 表示正在运动") return res @property def w_safe_door_state(self): # OK res = self.c.read_holding_registers(40514, count=1).registers[0] clibs.logger("DEBUG", "openapi", f"modbus: 40514 获取机器人是否处于安全门打开状态,需自动模式下执行,结果为 {res} :--: 0 表示未触发安全门,1 表示已触发安全门") return res @property def w_safe_region01_trig_state(self): # OK res = self.c.read_holding_registers(40515, count=1).registers[0] clibs.logger("DEBUG", "openapi", f"modbus: 40515 获取安全区域 safe region01 的触发状态,结果为 {res} :--: 0 表示未触发,1 表示已触发") return res @property def w_safe_region02_trig_state(self): # OK res = self.c.read_holding_registers(40516, count=1).registers[0] clibs.logger("DEBUG", "openapi", f"modbus: 40516 获取安全区域 safe region02 的触发状态,结果为 {res} :--: 0 表示未触发,1 表示已触发") return res @property def w_safe_region03_trig_state(self): # OK res = self.c.read_holding_registers(40517, count=1).registers[0] clibs.logger("DEBUG", "openapi", f"modbus: 40517 获取安全区域 safe region03 的触发状态,结果为 {res} :--: 0 表示未触发,1 表示已触发") return res @property def w_soft_estop_state(self): # OK res = self.c.read_holding_registers(40518, count=1).registers[0] clibs.logger("DEBUG", "openapi", f"modbus: 40518 获取机器人软急停状态,结果为 {res} :--: 0 表示未触发软急停,1 表示已触发软急停") return res def io_write_coils(self, addr, action): # OK | 名字叫写线圈,其实是写 modbus 的 discrete inputs(DI) # e.g. io_write_coils(0, 1) # e.g. io_write_coils(1, 1) # e.g. io_write_coils(0, [1, 1, 1]) self.c.write_coils(addr, action) clibs.logger("DEBUG", "openapi", f"modbus: 执行给 DI 地址 {addr} 赋值为 {action},可根据情况传递列表,实现一次性赋值多个") time.sleep(clibs.INTERVAL) def io_read_coils(self): # OK | 读 modbus 的 16 个 discrete inputs(DI) res = self.c.read_coils(0, count=16).bits clibs.logger("DEBUG", "openapi", f"modbus: 执行读取所有 DI 的结果为 {res}") return res def io_read_discretes(self): # OK | 读 modbus 的 coil outputs(DO) res = self.c.read_discrete_inputs(0, count=16).bits clibs.logger("DEBUG", "openapi", f"modbus: 执行读取所有 DO 的结果为 {res}") return res def read_ready_to_go(self): result = self.c.read_holding_registers(40600, count=1) return result.registers[0] def read_scenario_time(self): results = self.c.read_holding_registers(40601, count=2) result = self.c.convert_from_registers(registers=results.registers, data_type=self.c.DATATYPE.FLOAT32, word_order="little") return result def read_capture_start(self): result = self.c.read_holding_registers(40603, count=1) return result.registers[0] class HmiRequest(QThread): output = Signal(str, str) def __init__(self, ip, port, port_xs, /): super().__init__() self.__ip = ip self.__port = port self.__port_xs = port_xs self.__close_hmi = False self.__index = 0 self.__previous_data = b"" self.__valid_data_length = 0 self.__leftovers = 0 self.__response = b"" self.__response_xs = "" self.__half_frm = b"" self.__half_frm_flag = -1 self.__half_pkg = b"" self.__half_pkg_flag = False self.__is_first_frame = True self.__is_debug = True def net_conn(self): self.__socket_conn() self.__t_unpackage = threading.Thread(target=self.__unpackage, args=(self.c,)) self.__t_unpackage.daemon = True self.__t_unpackage.start() self.__t_unpackage_xs = threading.Thread(target=self.__unpackage_xs, args=(self.c_xs,)) self.__t_unpackage_xs.daemon = True self.__t_unpackage_xs.start() def close(self): self.c.close() self.c_xs.close() clibs.logger("INFO", "openapi", f"hmi: 关闭 Socket 连接成功", "green") def __socket_conn(self): # self.close() try: self.c = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.c.settimeout(clibs.INTERVAL*5) self.c.connect((self.__ip, self.__port)) self.c_xs = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.c_xs.settimeout(clibs.INTERVAL*5) self.c_xs.connect((self.__ip, self.__port_xs)) # 尝试连续三次发送心跳包,确认建联成功 state = None for i in range(3): _ = self.execution("controller.heart") time.sleep(clibs.INTERVAL/4) clibs.status["hmi"] = 1 clibs.logger("INFO", "openapi", "hmi: HMI connection success...", "green") except Exception: clibs.logger("ERROR", "openapi", f"hmi: HMI connection failed...", "red") @staticmethod def package(cmd): frm_value = (len(cmd) + 6).to_bytes(length=2, byteorder="big") pkg_value = len(cmd).to_bytes(length=4, byteorder="big") protocol = int(2).to_bytes(length=1, byteorder="big") reserved = int(0).to_bytes(length=1, byteorder="big") return frm_value + pkg_value + protocol + reserved + cmd.encode() def __unpackage(self, sock): def to_read(conn, mask): data = conn.recv(clibs.MAX_FRAME_SIZE) if data: # print(f"data = {data}") # with open(f"{clibs.log_path}/logs.txt", mode="a", encoding="utf-8") as f_logs: # f_logs.write(str(data) + "\n") self.__get_response(data) else: sel.unregister(conn) conn.close() try: sel = selectors.DefaultSelector() sel.register(sock, selectors.EVENT_READ, to_read) while clibs.status["hmi"]: events = sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask) except Exception as err: clibs.logger("DEBUG", "openapi", f"hmi: 老协议解包报错 err = {err}") def __get_headers(self, index, data): if index + 8 < len(data): frm_value = int.from_bytes(data[index:index + 2], byteorder="big") pkg_value = int.from_bytes(data[index + 2:index + 6], byteorder="big") protocol = int.from_bytes(data[index + 6:index + 7], byteorder="big") reserved = int.from_bytes(data[index + 7:index + 8], byteorder="big") if reserved == 0 and protocol == 2: return index + 8, frm_value, pkg_value else: # print("hr-get_headers: 解包数据有误,需要确认!") # print(data) clibs.logger("ERROR", "openapi", f"hmi: 解包数据有误,需要确认,最后一个数据包如下 {data}", "red") else: self.__half_pkg = data[index:] self.__half_pkg_flag = True return -1, 0, 0 def __get_response(self, data): frm_value, pkg_value, self.__index = 0, 0, 0 while self.__index < len(data): if self.__is_first_frame: if self.__half_pkg_flag: len_one = len(self.__half_pkg) len_another = 8 - len_one headers = self.__half_pkg + data[:len_another] self.__index = len_another frm_value = int.from_bytes(headers[0:2], byteorder="big") pkg_value = int.from_bytes(headers[2:6], byteorder="big") self.__half_pkg_flag = False else: self.__index, frm_value, pkg_value = self.__get_headers(self.__index, data) if self.__half_pkg_flag: break if frm_value - pkg_value == 6: if len(data[self.__index:]) >= pkg_value: self.__response = data[self.__index:self.__index + pkg_value] self.__index += pkg_value clibs.logger("DEBUG", "openapi", str(json.loads(self.__response.decode()))) self.__response = b"" self.__leftovers = 0 self.__is_first_frame = True elif len(data[self.__index:]) < pkg_value: self.__response = data[self.__index:] self.__leftovers = pkg_value - len(data[self.__index:]) self.__index += clibs.MAX_FRAME_SIZE # whatever self.__is_first_frame = False elif frm_value < pkg_value: self.__valid_data_length = pkg_value if len(data[self.__index:]) >= frm_value - 6: self.__is_first_frame = False self.__leftovers = 0 self.__response = data[self.__index:self.__index + frm_value - 6] self.__index += (frm_value - 6) self.__valid_data_length -= (frm_value - 6) if len(data[self.__index:]) == 2: self.__half_frm = data[self.__index:] self.__index += clibs.MAX_FRAME_SIZE self.__half_frm_flag = 2 elif len(data[self.__index:]) == 1: self.__half_frm = data[self.__index:] self.__index += clibs.MAX_FRAME_SIZE self.__half_frm_flag = 1 elif len(data[self.__index:]) == 0: self.__index += clibs.MAX_FRAME_SIZE self.__half_frm_flag = 0 else: self.__half_frm_flag = -1 elif len(data[self.__index:]) < frm_value - 6: self.__response = data[self.__index:] self.__leftovers = frm_value - 6 - len(data[self.__index:]) self.__valid_data_length -= len(data[self.__index:]) self.__index += clibs.MAX_FRAME_SIZE self.__is_first_frame = False elif not self.__is_first_frame: # 不是首帧 if self.__leftovers > 0 and self.__valid_data_length > 0: if len(data) >= self.__leftovers: self.__is_first_frame = False self.__response += data[:self.__leftovers] self.__index = self.__leftovers self.__valid_data_length -= self.__leftovers self.__leftovers = 0 if self.__valid_data_length == 0: # with open(f"{clibs.log_path}/response.txt", mode="a", encoding="utf-8") as f_res: # f_res.write(f"{json.loads(self.__response.decode())}" + "\n") clibs.logger("DEBUG", "openapi", str(json.loads(self.__response.decode()))) self.__response = b"" self.__is_first_frame = True continue # 此时应该重新 get_headers if len(data[self.__index:]) == 2: self.__half_frm = data[self.__index:] self.__index += clibs.MAX_FRAME_SIZE self.__half_frm_flag = 2 elif len(data[self.__index:]) == 1: self.__half_frm = data[self.__index:] self.__index += clibs.MAX_FRAME_SIZE self.__half_frm_flag = 1 elif len(data[self.__index:]) == 0: self.__index += clibs.MAX_FRAME_SIZE self.__half_frm_flag = 0 else: self.__half_frm_flag = -1 elif len(data) < self.__leftovers: self.__response += data self.__leftovers -= len(data) self.__valid_data_length -= len(data) self.__index += clibs.MAX_FRAME_SIZE elif self.__leftovers > 0 and self.__valid_data_length == 0: if len(data) >= self.__leftovers: self.__response += data[:self.__leftovers] self.__index = self.__leftovers self.__leftovers = 0 clibs.logger("DEBUG", "openapi", str(json.loads(self.__response.decode()))) self.__response = b"" self.__is_first_frame = True elif len(data) < self.__leftovers: self.__response += data self.__leftovers -= len(data) self.__index += clibs.MAX_FRAME_SIZE elif self.__leftovers == 0 and self.__valid_data_length > 0: # 1. len(data[self.__index:]) > 2 # 2. len(data[self.__index:]) = 2 # 3. len(data[self.__index:]) = 1 # 4. len(data[self.__index:]) = 0 if self.__half_frm_flag != -1: if self.__half_frm_flag == 2: frm_value = int.from_bytes(self.__half_frm) if len(data) >= frm_value: self.__response += data[:frm_value] self.__leftovers = 0 self.__valid_data_length -= len(data[:frm_value]) self.__index = frm_value elif len(data) < frm_value: self.__response += data self.__leftovers = frm_value - len(data) self.__valid_data_length -= len(data) self.__index += clibs.MAX_FRAME_SIZE self.__half_frm_flag = -1 elif self.__half_frm_flag == 1: frm_value = int.from_bytes(self.__half_frm + data[0:1]) if len(data[1:]) >= frm_value: self.__response += data[1:frm_value+1] self.__leftovers = 0 self.__valid_data_length -= len(data[1:frm_value+1]) self.__index = frm_value + 1 elif len(data[1:]) < frm_value: self.__response += data[1:] self.__leftovers = frm_value - len(data[1:]) self.__valid_data_length -= len(data[1:]) self.__index += clibs.MAX_FRAME_SIZE self.__half_frm_flag = -1 elif self.__half_frm_flag == 0: frm_value = int.from_bytes(data[0:2]) if len(data[2:]) >= frm_value: self.__response += data[2:frm_value+2] self.__leftovers = 0 self.__valid_data_length -= len(data[2:frm_value+2]) self.__index = frm_value + 2 elif len(data[2:]) < frm_value: self.__response += data[2:] self.__leftovers = frm_value - len(data[2:]) self.__valid_data_length -= len(data[2:]) self.__index += clibs.MAX_FRAME_SIZE self.__half_frm_flag = -1 if self.__valid_data_length == 0: clibs.logger("DEBUG", "openapi", str(json.loads(self.__response.decode()))) self.__response = b"" self.__is_first_frame = True continue if len(data[self.__index:]) == 2: self.__half_frm = data[self.__index:] self.__index += clibs.MAX_FRAME_SIZE self.__half_frm_flag = 2 elif len(data[self.__index:]) == 1: self.__half_frm = data[self.__index:] self.__index += clibs.MAX_FRAME_SIZE self.__half_frm_flag = 1 elif len(data[self.__index:]) == 0: self.__index += clibs.MAX_FRAME_SIZE self.__half_frm_flag = 0 else: self.__half_frm_flag = -1 else: frm_value = int.from_bytes(data[self.__index:self.__index + 2]) self.__index += 2 if len(data[self.__index:]) >= frm_value: self.__leftovers = 0 self.__response += data[self.__index:self.__index + frm_value] self.__index += frm_value self.__valid_data_length -= frm_value if self.__valid_data_length == 0: clibs.logger("DEBUG", "openapi", str(json.loads(self.__response.decode()))) self.__response = b"" self.__is_first_frame = True continue if len(data[self.__index:]) == 2: self.__index += clibs.MAX_FRAME_SIZE self.__half_frm = data[self.__index:] self.__half_frm_flag = 2 elif len(data[self.__index:]) == 1: self.__index += clibs.MAX_FRAME_SIZE self.__half_frm = data[self.__index:] self.__half_frm_flag = 1 elif len(data[self.__index:]) == 0: self.__half_frm_flag = 0 else: self.__half_frm_flag = -1 elif len(data[self.__index:]) < frm_value: self.__response += data[self.__index:] self.__leftovers = frm_value - len(data[self.__index:]) self.__valid_data_length -= len(data[self.__index:]) self.__index += clibs.MAX_FRAME_SIZE else: # DEBUG INFO # if self.__is_debug: # print(f"12 index = {self.__index}") # print(f"12 frm_value = {frm_value}") # print(f"12 leftovers = {self.__leftovers}") # print(f"12 valid_data_length = {self.__valid_data_length}") # print(f"12 is_first_frame = {self.__is_first_frame}") # if self.__valid_data_length < 0 or self.__leftovers > 1024: # print(f"data = {data}") # raise Exception("DataError") clibs.logger("ERROR", "openapi", "hmi: will never be here", "red") @staticmethod def package_xs(cmd): return "".join([json.dumps(cmd, separators=(",", ":")), "\r"]).encode() def __unpackage_xs(self, sock): def to_read(conn, mask): data = conn.recv(1024) # Should be ready if data: # print(data) self.get_response_xs(data) else: sel.unregister(conn) conn.close() try: sel = selectors.DefaultSelector() sel.register(sock, selectors.EVENT_READ, to_read) while clibs.status["hmi"]: events = sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask) except Exception as err: clibs.logger("DEBUG", "openapi", f"hmi: xService解包报错 err = {err}") def get_response_xs(self, data): char, response = "", self.__response_xs for char in data.decode(): if char != "\r": response = "".join([response, char]) else: clibs.logger("DEBUG", "openapi", response) self.response_xs = response response = "" else: self.__response_xs = response def get_from_id(self, msg_id): f_text, flag, records, time_delay, ts = f"%{msg_id}%", False, "Null", clibs.INTERVAL*10, msg_id.split("@")[-1] t = ts.replace("T", " ") t_ts = datetime.timestamp(datetime.strptime(t, "%Y-%m-%d %H:%M:%S.%f")) t_send = datetime.strftime(datetime.fromtimestamp(t_ts-time_delay), '%Y-%m-%d %H:%M:%S.%f') t_receive = datetime.strftime(datetime.fromtimestamp(t_ts+time_delay), '%Y-%m-%d %H:%M:%S.%f') for idx in range(time_delay): time.sleep(clibs.INTERVAL) try: clibs.lock.acquire(True) clibs.cursor.execute(f"SELECT content FROM logs WHERE timestamp BETWEEN '{t_send}' AND '{t_receive}' AND content LIKE '{f_text}'") records = clibs.cursor.fetchall() flag = True if len(records) == 2 else False finally: clibs.lock.release() if flag is True: return records else: clibs.logger("ERROR", "openapi", f"hmi: {time_delay}s内无法找到请求 {msg_id} 的响应!", "red") def execution(self, command, **kwargs): req = None try: with open(f"{clibs.PREFIX}/files/protocols/hmi/{command}.json", encoding="utf-8", mode="r") as f_json: req = json.load(f_json) t = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f") req["id"] = f"{command}@{t}" flag = req["p_type"] del req["p_type"] except Exception as err: clibs.logger("ERROR", "openapi", f"hmi: 暂不支持 {command} 功能,或确认该功能存在...
err = {err}", "red") match command: case "state.set_tp_mode" | "overview.set_autoload" | "overview.reload" | "rl_task.pp_to_main" | "rl_task.run" | "rl_task.stop" | "rl_task.set_run_params" | "diagnosis.set_params" | "diagnosis.open" | "drag.set_params" | "controller.set_params" | "collision.set_state" | "collision.set_params" | "move.set_quickstop_distance" | "move.set_params" | "move.set_monitor_cfg" | "modbus.get_values" | "modbus.set_params" | "system_io.update_configuration" | "diagnosis.get_params" | "jog.set_params" | "jog.start" | "move.stop" | "move.quick_turn" | "move.set_quickturn_pos" | "soft_limit.set_params" | "fieldbus_device.set_params" | "socket.set_params" | "diagnosis.save" | "register.set_value": req["data"].update(kwargs) case "safety.safety_area.signal_enable" | "safety.safety_area.overall_enable" | "safety.safety_area.safety_area_enable" | "safety.safety_area.set_param": req["c"].update(kwargs) case "log_code.data.code_list": req["s"]["log_code.data"]["code_list"] = kwargs["code_list"] case _: pass if flag == 0: cmd = json.dumps(req, separators=(",", ":")) try: self.c.send(self.package(cmd)) time.sleep(clibs.INTERVAL/4) # 这里一定是要等几百毫秒的,避免多指令同一时间发送,导致xCore不响应 clibs.logger("DEBUG", "openapi", f"hmi: 老协议请求发送成功 {cmd}") except Exception as err: if "controller.heart" in cmd: raise Exception() clibs.logger("ERROR", "openapi", f"hmi: 老协议请求发送失败 {cmd},报错信息 {err}", "red") elif flag == 1: try: self.c_xs.send(self.package_xs(req)) time.sleep(clibs.INTERVAL/4) clibs.logger("DEBUG", "openapi", f"hmi: xService请求发送成功 {req}") except Exception as err: clibs.logger("ERROR", "openapi", f"hr: xService请求发送失败 {req} 报错信息 {err}", "red") return req["id"] # =================================== ↓↓↓ specific functions ↓↓↓ =================================== def switch_motor_state(self, state: str): # OK """ 切换上电/下电的状态 :param state: on/off :return: None """ match state: case "on": self.execution("state.switch_motor_on") case "off": self.execution("state.switch_motor_off") case _: clibs.logger("ERROR", "openapi", f"hmi: switch_motor_state 参数错误 {state}, 非法参数,只接受 on/off", "red") def switch_operation_mode(self, mode: str): # OK """ 切换自动/手动操作模式 :param mode: auto/manual :return: None """ match mode: case "auto": self.execution("state.switch_auto") case "manual": self.execution("state.switch_manual") case _: clibs.logger("ERROR", "openapi", f"hmi: switch_operation_mode 参数错误 {mode},非法参数,只接受 auto/manual", "red") def reload_project(self, prj_name: str, tasks: list): # OK """ 重新加载指定工程 :param prj_name: 工程名,也即 zip 文件的名字 :param tasks: 要加载的任务列表 :return: None """ prj_path = f"{prj_name}/_build/{prj_name}.prj" self.execution("overview.reload", prj_path=prj_path, tasks=tasks) def set_project_auto_reload(self, prj_name: str): # OK """ 将指定工程设置为开机自动加载,也即默认工程 :param prj_name: 工程名,也即 zip 文件的名字 :return: None """ autoload_prj_path = f"{prj_name}/_build/{prj_name}.prj" self.execution("overview.set_autoload", autoload_prj_path=autoload_prj_path) def pp_to_main(self, tasks: list): # OK """ 将指定的任务列表的指针,指向 main 函数 :param tasks: 任务列表 :return: None """ self.execution("rl_task.pp_to_main", tasks=tasks) def program_start(self, tasks: list): # OK """ 开始执行程序任务,必须是自动模式下执行 :param tasks: 任务列表 :return: None """ self.execution("rl_task.run", tasks=tasks) def program_stop(self, tasks: list): # OK """ 停止执行程序任务 :param tasks: 人物列表 :return: None """ self.execution("rl_task.stop", tasks=tasks) def set_program_loop_speed(self, loop_mode: bool = True, override: float = 0.5): # OK """ :param loop_mode: True为循环模式,False为单次模式 :param override: HMI 左下方的速度滑块,取值范围 [0, 1] :return: None """ self.execution("rl_task.set_run_params", loop_mode=loop_mode, override=override) def clear_alarm(self): # OK """ 清除伺服告警 :return: None """ self.execution("servo.clear_alarm") def reboot_robot(self): # OK """ 重启控制器 :return: None """ self.execution("controller.reboot") clibs.logger("INFO", "openapi", f"hmi: 控制器重启中,重连预计需要等待 100s 左右...") ts = time.time() time.sleep(30) while True: time.sleep(5) te = time.time() if te - ts > 180: self.__silence = False self.__sth_wrong("3min 内未能完成重新连接,需要查看后台控制器是否正常启动,或者 ip/port 是否正确") break for _ in range(3): if not clibs.status["hmi"]: break time.sleep(2) else: clibs.logger("INFO", "openapi", "hr: HMI 重新连接成功...", "green") break def reload_io(self): """ 触发控制器重新加载 IO 设备列表 :return: None """ self.execution("io_device.load_cfg") @property def get_quickturn_pos(self): # OK """ 获取机器人的home位姿、拖动位姿和发货位姿,轴关节角度,end_posture 取值如下: 0 法兰平面与地面平行 1 工具坐标系X轴与地面垂直,正向朝下 2 工具坐标系X轴与地面垂直,正向朝上 3 工具坐标系Y轴与地面垂直,正向朝下 4 工具坐标系Y轴与地面垂直,正向朝上 5 工具坐标系Z轴与地面垂直,正向朝下 6 工具坐标系Z轴与地面垂直,正向朝上 :return: as below { "enable_home": false, // 是否开启 home 点快速调整 "enable_drag": false, // 是否开启拖动位姿点快速调整 "enable_transport": false, // 是否开启发货位姿点快速调整 "joint_home": [0.0,0.0,0.0,0.0,0.0,0.0,0.0], // home 位姿的关节角度 "joint_drag": [0.0,0.0,0.0,0.0,0.0,0.0,0.0], // 拖动位姿的关节角度 "joint_transport": [0.0,0.0,0.0,0.0,0.0,0.0,0.0], // 发货位姿的关节角度 "end_posture":0, // 末端姿态的调整方式,取值 0-6 "home_error_range":[0.0,0.0,0.0,0.0,0.0,0.0,0.0] // home点误差范围 } """ return self.__get_data(currentframe().f_code.co_name, "move.get_quickturn_pos") def set_quickturn_pos(self, enable_home: bool = False, enable_drag: bool = False, enable_transport: bool = False, end_posture: int = 0): # OK """ 设置机器人的home位姿、拖动位姿、发货位姿,轴关节角度,Home点误差范围,详见上一个 get_quickturn_pos 功能实现 :param enable_home: 是否开启 home 点快速调整 :param enable_drag: 是否开启拖动位姿点快速调整 :param enable_transport:是否开启发货位姿点快速调整 :param end_posture: 末端姿态的调整方式,取值 0-6,详见 get_quickturn_pos 注释 :return: None """ self.execution("move.set_quickturn_pos", enable_home=enable_home, enable_drag=enable_drag, enable_transport=enable_transport, end_posture=end_posture) def move2quickturn(self, name: str): # OK """ 运动到指定的快速调整位姿 :param name: 指定快速调整的名称,home/drag/transport :return: None """ self.execution("move.quick_turn", name=name) def stop_move(self, stoptype=0): # OK """ 停止运动 TS_READY | TS_JOG | TS_LOADIDENTIFY | TS_DYNAMICIDENTIFY | TS_DRAG | TS_PROGRAM | TS_DEMO | TS_RCI | TS_DEBUG | TS_FRICTIONIDENTIFY :param stoptype: 对应控制器的任务空间类型TaskSpace的枚举值,0-7 :return: None """ self.execution("move.stop", stoptype=stoptype) @property def get_jog_params(self): # OK """ 获取JOG的参数 世界坐标系 WORLD_COORDINATE 0 法兰盘坐标系 FLANGE_COORDINATE 1 基坐标系 BASE_COORDINATE 2 工具坐标系 TOOL_COORDINATE 3 工件坐标系 FRAME_COORDINATE 4 关节空间 JOINT_SPACE 5 :return: { "step": 1000 [1000-连续] [10/1/0.1/0.001-点动] "override": 0.2 速度比率 "space": 5 JOG的空间 } """ return self.__get_data(currentframe().f_code.co_name, "jog.get_params") def set_jog_params(self, step, override, space): # OK """ 设置JOG的参数,包含步长,空间,速度倍率 :param step: [1000-连续] [10/1/0.1/0.001-点动] :param override: 速度比率 :param space: JOG的空间 :return: None """ self.execution("jog.set_params", step=step, override=override, space=space) def start_jog(self, index: int, direction: bool = False, is_ext: bool = False): # OK """ 开始 JOG 运动 :param index: 0-6,若选轴空间,则 0-6 对应 1-7 轴,若选笛卡尔空间,则 0-6 对应 xyzabc elb :param direction: True 正方向,False 反方向 :param is_ext: 是否是外部轴 jog :return: None """ self.execution("jog.start", index=index, direction=direction, is_ext=is_ext) @property def get_socket_params(self): # OK """ 获取socket参数 :return: { "auto_connect": true, // True 开机启动,False 不开机启动 "disconnection_detection_time": 10, // 链接断开检测周期(s) "disconnection_triggering_behavior": 0, // 断开连接触发行为 0无动作 1暂停程序 2暂停并下电 "enable": true, // True 开启或者 False 关闭 "ip": "", // 仅限于客户端,用于指定服务端 IP;当作为服务端时,该参数设置为空字符串,否则会报错!!! "name": "name", // 连接名称 "port": "8080", // 连接端口 "reconnect_flag": true, // True 自动重连,False 不自动重连 "suffix": "\r", // 指定发送分隔符 "type": 1 // 连接类型 0 client | 1 server } """ return self.__get_data(currentframe().f_code.co_name, "socket.get_params") def set_socket_params(self, enable: bool, ip: str, port: str, suffix: str, type: int = 1, **kwargs): # OK """ 设置 socket 参数,一般作为服务器使用 :param enable: True 开启或者 False 关闭 :param ip: 连接ip :param port: 连接端口 :param suffix: 指定发送分隔符 :param type: 0 client | 1 server :return: None """ data = self.get_socket_params keys = data.keys() kwargs.update({"enable": enable, "ip": ip, "port": port, "suffix": suffix, "type": type}) for _ in keys: if _ in kwargs.keys(): data[_] = kwargs[_] self.execution("socket.set_params", data=data) @property def get_diagnosis_params(self, version="1.4.1"): # OK """ 获取诊断功能开启状态,以及相应其他信息 :param version: 指定诊断工具版本 :return: { "delay_motion": false, // - "display_open": false, // 诊断显示功能开启状态 "ecat_diagnosis_state": false, // - "overrun": false, // 是否开启实时线程超时监控上报 "pdo_params": [...], // 指定版本支持的所有曲线信息 "state": true, // 诊断功能的开启状态 "turn_area": false // 转弯区是否上报 } """ return self.__get_data(currentframe().f_code.co_name, "diagnosis.get_params", version=version) def set_diagnosis_params(self, display_pdo_params: list, frequency: int = 50, version: str = "1.4.1"): # OK """ 设置诊断功能显示参数 [{"name": "hw_joint_vel_feedback", "channel": 0}, ] :param display_pdo_params: 指定要采集的曲线名称,具体可通过 get_diagnosis_params 函数功能获取所有目前已支持的曲线 :param frequency: 采样频率,默认 50ms :param version: xDiagnose的版本号 :return: None """ self.execution("diagnosis.set_params", display_pdo_params=display_pdo_params, frequency=frequency, version=version) def open_diagnosis(self, open: bool, display_open: bool, overrun: bool = False, turn_area: bool = False, delay_motion: bool = False): # OK """ 打开或者关闭诊断曲线,并定义其他功能的开关(调试相关功能,比如是否开启线程超时监控和上报,转弯区以及运动延迟等) :param open: 诊断功能,控制HMI->日志->诊断设置->私服诊断开关,一般设置成 True :param display_open: 诊断显示功能,指的是在线诊断插件中的打开 switch 的状态,需要诊断数据的情况,设置成 True :param overrun: 实时线程超时监控上报 :param turn_area: 转弯区上报 :param delay_motion: 延迟运动 :return: None """ self.execution("diagnosis.open", open=open, display_open=display_open, overrun=overrun, turn_area=turn_area, delay_motion=delay_motion) def save_diagnosis(self, save: bool = True): # OK """ 保存诊断数据,也就是主动写诊断动作,HMI日志->诊断设置->保存诊断数据 :param save: 保存数据开关 :return: None """ self.execution("diagnosis.save", save=save) @property def qurry_system_io_configuration(self): # OK """ 系统IO配置的查询协议,trigger 参数取值参照如下: FLANKS 0, //边缘触发 POS_FLANK 1, //上升沿 NEG_FLANK 2, //下降沿 HIGH_LEVEL 3, //高电平 LOW_LEVEL 4 //低电平 :return: { "input_system_io": { "motor_on": { "signal":"DI0_0", "trigger":1 }, "motor_off": { "signal":"DI0_0", "trigger":2 } }, "output_system_io": { "sta_motor_on": { "signal":"DO0_0" }, "sta_robot_running": { "signal":"DO0_1" } } } """ return self.__get_data(currentframe().f_code.co_name, "system_io.query_configuration") @property def qurry_system_io_event_configuration(self): # OK """ 查询当前系统支持的系统IO事件列表,包括事件key、名称、支持的触发方式等配置 :return: { "input_system_event": [ { "key": "ctrl_motor_on", "name": "上电", "trigger_types": [ 1, 2 ] }, { "key": "ctrl_motor_off", "name": "下电", "trigger_types": [ 1, 2 ] } ], "output_system_event": [ { "key": "sta_motor_on", "name": "上下电状态" }, { "key": "sta_program", "name": "运行状态" } ], "input_mutex_event": [ { "key0": "ctrl_motor_on", "key1": "ctrl_motor_off" }, { "key0": "ctrl_switch_auto", "key1": "ctrl_switch_manu" } ] } """ return self.__get_data(currentframe().f_code.co_name, "system_io.query_configuration") def update_system_io_configuration(self, i_funcs: list, o_funcs: list, i_signals: list, o_signals: list, trig_types: list): # OK """ 配置系统 IO :param i_funcs: 输入,只写功能码列表 :param o_funcs: 输出,只读功能码列表 :param i_signals: DI 信号列表 :param o_signals: DO 信号列表 :param trig_types: 触发条件列表,可参考 qurry_system_io_configuration 中的触发条件 :return: None """ input_system_io, output_system_io = {}, {} for i_func in i_funcs: _index = i_funcs.index(i_func) input_system_io[i_func] = {"signal": i_signals[_index], "trigger": trig_types[_index]} for o_func in o_funcs: _index = o_funcs.index(o_func) output_system_io[o_func] = {"signal": o_signals[_index]} self.execution("system_io.update_configuration", input_system_io=input_system_io, output_system_io=output_system_io) @property def get_fieldbus_device_params(self): # OK """ 获取的是 HMI通讯->总线设备列表的信息,以及开关状态 :return: { "device_list": [ { "device_name": "modbus_1", "enable": true }, { "device_name": "modbus_2", "enable": false } ] } """ return self.__get_data(currentframe().f_code.co_name, "fieldbus_device.get_params") def set_fieldbus_device_params(self, device_name: str, enable: bool): # OK """ 定义开关设备的协议,一次只能打开一个设备 :param device_name: 设备列表中的名称 :param enable: 是否开启,这里操作的是 HMI通信->IO设备里面的开关状态 :return: None """ self.execution("fieldbus_device.set_params", device_name=device_name, enable=enable) def reload_fieldbus(self): # OK """ 触发控制器重新加载总线设备 :return: None """ self.execution("fieldbus_device.load_cfg") @property def get_modbus_params(self): # OK """ 获取modbus参数 :return: { "connect_state": true, "enable_master": false, "enable_slave": false, "ip": "192.168.0.160", "is_convert": true, "port": 502, "slave_id": 1 } """ return self.__get_data(currentframe().f_code.co_name, "modbus.get_params") def set_modbus_params(self, enable_slave: bool, ip: str, port: int, slave_id: int, enable_master: bool): # OK """ 设置modbus参数,相当于新建 :param enable_slave: Modbus从站是否自动开启 :param ip: ip 地址 :param port: 端口 :param slave_id: 从站 ID :param enable_master: Modbus主站是否自动开启 :return: """ self.execution("modbus.set_params", enable_slave=enable_slave, ip=ip, port=port, slave_id=slave_id, enable_master=enable_master) def reload_registers(self): # OK """ 触发控制器重新加载寄存器列表 :return: None """ self.execution("modbus.load_cfg") def get_modbus_values(self, mode: str = "all"): # OK """ 用于获取 modbus 寄存器变量值的更新信息,展示在状态监控界面 :param mode: all/change :return: """ return self.__get_data(currentframe().f_code.co_name, "modbus.get_values", mode=mode) @property def get_soft_limit_params(self): # OK """ 获取软限位参数 :return: { "enable":true "upper":[0,0,0,0,0,0,0], "lower":[0,0,0,0,0,0,0], "reduced_upper":[0,0,0,0,0,0,0], "reduced_lower":[0,0,0,0,0,0,0] } """ return self.__get_data(currentframe().f_code.co_name, "soft_limit.get_params") def set_soft_limit_params(self, **kwargs): # OK """ 设定软限位参数 enable: bool, upper: list, lower: list, reduced_upper: list, reduced_lower: list :enable: 是否启用软限位 :upper: 软限位上限 :lower: 软限位下限 :reduced_upper: 缩减模式软限位上限 :reduced_lower: 缩减模式软限位下限 :return: None """ data = self.get_soft_limit_params keys = data.keys() for _ in keys: if _ in kwargs.keys(): data[_] = kwargs[_] self.execution("soft_limit.set_params", data=data) @property def get_device_params(self): # OK """ 获取设备信息 :return: """ return self.__get_data(currentframe().f_code.co_name, "device.get_params") @property def get_cart_pos(self): # OK """ 获取机器人的当前位姿:包括轴关节角度,笛卡尔关节角度,四元数,欧拉角(臂角) :return: { "joint":[0.0,0.0,0.0,0.0,0.0,0.0], "position":[0.0,0.0,0.0,0.0,0.0,0.0], "euler":[0.0,0.0,0.0], "quaternion"[0.0,0.0,0.0,0.0], "elb":0.0, // 可缺省 "ext_joint":[0.0,0.0,0.0,0.0,0.0] } """ return self.__get_data(currentframe().f_code.co_name, "move.get_pos") @property def get_joint_pos(self): # OK """ 获取机器人的当前关节角度:包括内部轴和外部轴 :return: { "inner_pos": [0,0,0,0,0,0,0], "extern_pos": [0,0,0,0,0,0,0] } """ return self.__get_data(currentframe().f_code.co_name, "move.get_joint_pos") @property def get_monitor_cfg(self): # OK """ 获取机器人的监控配置参数,RefCoordinateType 类型数据,表示当前控制器位置监测的相对坐标系 基坐标系 REF_COORDINATE_BASE 0 世界坐标系 REF_COORDINATE_WORLD 1 工件坐标系 REF_COORDINATE_WOBJ 2 :return: { "ref_coordinate": 0 } """ return self.__get_data(currentframe().f_code.co_name, "move.get_monitor_cfg") def set_monitor_cfg(self, ref_coordinate): # OK """ 设置机器人的监控配置参数 :ref_coordinate: RefCoordinateType类型数据,用来设置当前控制器位置监测的相对坐标系 :return: None """ self.execution("move.set_monitor_cfg", ref_coordinate=ref_coordinate) @property def get_move_params(self): # OK """ 获取机器人的运动参数:包括减速比、耦合比、最大速度、加速度、加加速度、acc ramp time、规划步长等信息 :return: { "MOTION": { "ACC_RAMPTIME_JOG": 0.5, "ACC_RAMPTIME_STOP": 0.5, "DEFAULT_ACC_PARAMS": [1.0, 0.5], "JERK_LIMIT_CART": 0, "JERK_LIMIT_JOINT": 0, "JERK_LIMIT_ROT": 0, "JOINT_MAX_ACC": [500, 500, 1000, 1000, 1000], "JOINT_MAX_JERK": [2000, 2000, 2000, 4000, 4000, 4000], "JOINT_MAX_SPEED": [120.0, 120.0, 180.0, 180.0, 180.0, 180.0], "MAX_ACC_PARAMS": [1.0, 1], "MIN_ACC_PARAMS": [0.3, 0.05], "TCP_MAX_ACC": 5000, "TCP_MAX_JERK": 10000, "TCP_MAX_SPEED": 1000, "TCP_ROTATE_MAX_ACC": 1800, "TCP_ROTATE_MAX_JERK": 3600, "TCP_ROTATE_MAX_SPEED": 180, "VEL_SMOOTH_FACTOR": 1.0, "VEL_SMOOTH_FACTOR_RANGE": [1.0, 10.0] } } """ return self.__get_data(currentframe().f_code.co_name, "move.get_params") def set_move_params(self, **kwargs): # OK """ 设置机器人的运动参数:轴最大速度、轴最大加加速度、速度、加加速度、加速度、加加速度、acc ramp time、规划步长等信息 可选参数:参考 get_move_params 函数返回值 MOTION 里面的选项 :return: None """ data = self.get_move_params["MOTION"] print(f"res = {data}") keys = data.keys() for _ in keys: if _ in kwargs.keys(): data[_] = kwargs[_] self.execution("move.set_params", data=data) @property def get_quick_stop_distance(self): # OK """ 获取机器人 search 指令最大停止距离 :return: { "distance":2.0 } """ return self.__get_data(currentframe().f_code.co_name, "move.get_quickstop_distance") def set_quick_stop_distance(self, distance: float): # OK """ 设置机器人 search 指令最大停止距离 :param distance: 停止距离,单位 mm :return: None """ self.execution("move.set_quickstop_distance", distance=distance) @property def get_collision_params(self): # OK """ 获取碰撞检测相关参数 :return: { "action": 1, // 触发行为:1-安全停止;2-触发暂停;3-柔顺停止 "coeff": [100, 100, 100, 100, 100, 100], // 0-整机灵敏度百分比,1-单轴灵敏度百分比,2-单轴和整机灵敏度百分比 "coeff_level": 0, // 灵敏度等级:0-低,1-中,2-高 "compliance": 0, // 柔顺功能比例系数,[0-1] "enable": true, // 功能使能开关 "fallback_distance": 3, // 回退距离 "mode": 0, // 力传感器系数,0 整机 1 单轴 "percent": 100, // 0-200,整机灵敏度百分比 "percent_axis": [100, 100, 100, 100, 100, 100], // 0-200,单轴灵敏度百分比 "reduced_percent": 100, // 0-200,整机缩减模式灵敏度百分比 "reduced_percent_axis": [100, 100, 100, 100, 100, 100] // 0-200,单轴缩减模式灵敏度百分比 } """ return self.__get_data(currentframe().f_code.co_name, "collision.get_params") def set_collision_params(self, enable, mode, action, percent, **kwargs): # OK """ 设置碰撞检测相关参数 :param enable: 功能使能开关 :param mode: 力传感器系数,0 整机 1 单轴 :param action: 触发行为:1-安全停止;2-触发暂停;3-柔顺停止 :param percent: 0-200,整机灵敏度百分比 :return: """ data = self.get_collision_params keys = data.keys() kwargs.update({"enable": enable, "mode": mode, "action": action, "percent": percent}) for _ in keys: if _ in kwargs.keys(): data[_] = kwargs[_] self.execution("collision.set_params", data=data) def set_collision_state(self, collision_state: bool): # NG """ 开启或者关闭虚拟墙碰撞检测,测试该函数功能无效!!! :param collision_state: 碰撞检测的开关状态 :return: None """ self.execution("collision.set_state", collision_state=collision_state) @property def get_robot_state(self): # OK """ { "rc_state":"normal", # "fatal" 、"error"、"block"、"normal" "engine":"on", # "fatal" 、"error"、"GStop"、"EStop"、"on"、"off" "servo_mode":"position", # "torque"、"position" "operate": "auto", # "auto"、"manual" "task_space": "program", # "jog"、"drag"、"ready"、"load_identify"、"demo"、"rci"、"dynamic_identify"、"program"、"debug" "robot_action": "idle", # "idle"、"busy" "safety_mode": "collision" # "normal"、"collision"、"collaboration" } """ return self.__get_data(currentframe().f_code.co_name, "state.get_state") def set_controller_params(self, robot_time: str): # OK """ 设置控制器系统时间 :param robot_time: 系统时间,"2020-02-28 15:28:30" :return: None """ self.execution("controller.set_params", time=robot_time) @property def get_robot_params(self): # OK """ "alias": "", "auth_state": "controller_type": "XBC_XMATE", "controller_types": ["XBC_3", "XBC_5", "XBC_XMATE"], "disk_serial_number": "2338020401535", "mac_addr": "34:df:20:03:1b:45", "model":, "nic_list": ["enp1s0", "enp2s0"], "occupied_addr": "192.168.2.123:49269", "period": 0.002, "period_types": [0.001, 0.002, 0.003, 0.004], "robot_template": 10, "robot_type": "XMC12-R1300-W7G3B1C", "robot_types": ["XMC12-R1300-B7S3B0C", "XMC12-R1300-W7G3B1C", "XMC17_5-R1900-W7G3B1C", "XMC20-R1650-B7G3Z0C"], "security_type": "ROKAE_RSC", "security_types": ["ROKAE_MINI", "ROKAE_RSC"], "time": "2024-09-13 12:36:38", "version": "2.3.0.4" """ return self.__get_data(currentframe().f_code.co_name, "controller.get_params") def switch_tp_mode(self, mode: str): # OK """ 切换示教器模式 :param mode: with/without :return: None """ match mode: case "with": self.execution("state.set_tp_mode", tp_mode="with") case "without": self.execution("state.set_tp_mode", tp_mode="without") case _: clibs.logger("ERROR", "openapi", f"hmi: switch_tp_mode 参数错误{mode}, 非法参数,只接受 with/without", "red") @property def get_tp_mode(self): # OK """ 获取示教器连接状态 :return: { "tp_mode":"with" } """ return self.__get_data(currentframe().f_code.co_name, "state.get_tp_mode") @property def get_drag_params(self): # OK """ 获取拖动模式参数 :return: { "enable": true, "space": 0, "type": 0 } """ return self.__get_data(currentframe().f_code.co_name, "drag.get_params") def set_drag_params(self, enable: bool, space: int = 1, type: int = 2): # OK """ 设置拖动模式开关以及参数 :param enable: 是否启用拖动 :param space: 拖动空间 - 0 代表关节 1 代表笛卡尔 :param type: 拖动类型 - 0 只平移 1 只旋转 2 自由拖动 :return: None """ self.execution("drag.set_params", enable=enable, space=space, type=type) def set_safety_area_signal(self, signal: bool): # OK """ 设置安全区域信号控制使能开关 :param signal: True 打开 False 关闭 :return: None """ self.execution("safety.safety_area.signal_enable", protocol_flag=1, signal=signal) def set_safety_area_overall(self, enable: bool): # OK """ 设置安全区域整体控制使能开关 :param enable: True 打开 False 关闭 :return: None """ self.execution("safety.safety_area.overall_enable", protocol_flag=1, enable=enable) @property def get_safety_area_params(self): # OK """ 获取安全区所有的配置信息 :return: "g": { "safety_area_data": { "overall_enable": true, "safety_area_setting": [ { "box": { "Lx": 100.0, "Ly": 100.0, "Lz": 100.0, "direction": false, "ori": { "euler": { "a": 179.9963851353547, "b": -0.006653792532429416, "c": 179.9934560302729 }, "quaternion": { "q1": 0.0, "q2": 0.0, "q3": 0.0, "q4": 0.0 } }, "pos": { "x": 0.0, "y": 0.0, "z": 0.0 } }, "enable": false, "id": 0, "name": "region1", "plane": { "direction": true, "point": { "x": 0.0, "y": 0.0, "z": 0.0 }, "vector": { "x": 0.0, "y": 0.0, "z": 0.0 } }, "shape": 0, "shared_bind_di": "", "shared_bind_do": "", "sphere": { "ori": { "euler": { "a": 0.0, "b": 0.0, "c": 0.0 }, "quaternion": { "q1": 0.0, "q2": 0.0, "q3": 0.0, "q4": 0.0 } }, "pos": { "x": 0.0, "y": 0.0, "z": 0.0 }, "radius": 0.0 }, "state": true, "trigger": 0, "type": 0, "vertebral": { "high": 0.0, "ori": { "euler": { "a": 0.0, "b": 0.0, "c": 0.0 }, "quaternion": { "q1": 0.0, "q2": 0.0, "q3": 0.0, "q4": 0.0 } }, "pos": { "x": 0.0, "y": 0.0, "z": 0.0 }, "radius": 0.0 } }, ... // 剩余 9 个安全区域的配置信息 ], "signal_enable": true } } } """ return self.__get_data(currentframe().f_code.co_name, "safety_area_data", flag=1) def set_safety_area_enable(self, id: int, enable: bool): # OK """ 设置每个安全区域的开关 :param id: 安全区域开关,0-9 :param enable: True 打开,False 关闭 :return: None """ self.execution("safety.safety_area.safety_area_enable", protocol_flag=1, id=id, enable=enable) def set_safety_area_param(self, id: int, enable: bool, **kwargs): # OK """ 设定单独安全区的参数 :param id: 安全区 id :param enable: 是否开启 :param kwargs: 其他参数,参考 get_safety_area_params 的返回值形式 :return: None """ data = self.get_safety_area_params["g"]["safety_area_data"]["safety_area_setting"][id] keys = data.keys() kwargs.update({"id": id, "enable": enable}) for _ in keys: if _ in kwargs.keys(): data[_] = kwargs[_] self.execution("safety.safety_area.set_param", protocol_flag=1, data=data) self.execution("safety.safety_area.safety_area_enable", protocol_flag=1, id=id, enable=enable) @property def get_filtered_error_code(self): # OK """ 获取已设定的错误码过滤列表 :return: { "g": { "log_code.data": { "code_list": [ { "id": 100, "title": "DEMO\\u610f\\u5916\\u505c\\u6b62" }, { "id": 10000, "title": "HMI\\u8bf7\\u6c42\\u5305\\u89e3\\u6790\\u9519\\u8bef" }, { "id": 10002, "title": "\\u5feb\\u901f\\u8c03\\u6574\\u542f\\u52a8\\u5931\\u8d25" } ], "version": 1 } } } """ return self.__get_data(currentframe().f_code.co_name, "log_code.data", flag=1) def set_filtered_error_code(self, action: str, code_list: list): # OK """ 清空,增加,删除错误过滤码 :param action: 支持 add/remove/clear,当为 clear 时,code_list 可为任意列表 :param code_list: 需要添加/删除的过滤码列表 :return: None """ origin_code_list = self.get_filtered_error_code["g"]["log_code.data"]["code_list"] # [{"id": 10000, "title": "HMI请求包解析错误"}, {"id": 10002, "title": "快速调整启动失败"}] if action == "clear": code_list = [] elif action == "add": for error_code in code_list: for item in origin_code_list: if error_code == item["id"]: break else: origin_code_list.append({"id": error_code, "title": ""}) code_list = origin_code_list elif action == "remove": for error_code in code_list: for item in origin_code_list: if error_code == item["id"]: origin_code_list.remove(item) code_list = origin_code_list self.execution("log_code.data.code_list", protocol_flag=1, code_list=code_list) def __get_data(self, upper_func, command, flag=0, **kwargs): msg_id, state = self.execution(command, protocol_flag=flag, **kwargs) records = clibs.c_hr.get_from_id(msg_id) for record in records: if "请求发送成功" not in record[0]: data = eval(record[0])["data"] return data # =================================== ↑↑↑ specific functions ↑↑↑ =================================== class ExternalCommunication(QThread): def __init__(self, ip, port, /): super().__init__() self.c = None self.ip = ip self.port = int(port) self.suffix = "\r" self.exec_desc = " :--: 返回 true 表示执行成功,false 失败" def net_conn(self): clibs.c_hr.execution("socket.set_params", enable=False, ip="0.0.0.0", port=str(self.port), suffix="\r", type=1) time.sleep(clibs.INTERVAL) clibs.c_hr.execution("socket.set_params", enable=True, ip="0.0.0.0", port=str(self.port), suffix="\r", type=1) # time.sleep(clibs.INTERVAL*2) self.c = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.c.settimeout(clibs.INTERVAL*3) try: self.c.connect((self.ip, self.port)) clibs.logger("INFO", "openapi", f"ec: 外部通信连接成功...", "green") # return self.c except Exception as err: clibs.logger("ERROR", "openapi", f"ec: 外部通信连接失败... {err}", "red") def close(self): self.c.close() clibs.logger("INFO", "openapi", f"ec: 关闭外部通信连接成功", "green") @clibs.db_lock def sr_string(self, directive, interval=clibs.INTERVAL/2): self.s_string(directive) time.sleep(interval) result = self.r_string(directive) return result def s_string(self, directive): order = "".join([directive, self.suffix]) self.c.send(order.encode()) def r_string(self, directive): result, char = "", "" while char != self.suffix: try: char = self.c.recv(1).decode(encoding="unicode_escape") except Exception as err: result = [f"ec: 获取请求指令 {directive} 的返回数据超时,需确认指令发送格式以及内容正确!具体报错信息如下 {err}", "error"] return result result = "".join([result, char]) return result # =================================== ↓↓↓ specific functions ↓↓↓ =================================== def motor_on(self): # OK return self.__exec_cmd("motor_on", "电机上电", self.exec_desc) def motor_off(self): # OK return self.__exec_cmd("motor_off", "电机下电", self.exec_desc) def pp_to_main(self): # OK return self.__exec_cmd("pp_to_main", "程序指针到", self.exec_desc) def program_start(self): # OK return self.__exec_cmd("start", "程序启动(可能需先清告警)", self.exec_desc) def program_stop(self): # OK return self.__exec_cmd("stop", "程序停止", self.exec_desc) def clear_alarm(self): # OK return self.__exec_cmd("clear_alarm", "清除伺服报警", self.exec_desc) def switch_operation_auto(self): # OK return self.__exec_cmd("switch_mode:auto", "切换到自动模式", self.exec_desc) def switch_operation_manual(self): # OK return self.__exec_cmd("switch_mode:manual", "切换到手动模式", self.exec_desc) def open_drag_mode(self): # OK return self.__exec_cmd("open_drag", "打开拖动", self.exec_desc) def close_drag_mode(self): # OK return self.__exec_cmd("close_drag", "关闭拖动", self.exec_desc) def get_program_list(self): # OK return self.__exec_cmd("list_prog", "获取工程列表") def get_current_program(self): # OK return self.__exec_cmd("current_prog", "当前工程") def load_program(self, program_name): # OK return self.__exec_cmd(f"load_prog:{program_name}", "加载工程", self.exec_desc) def estop_reset(self): # OK | 复原外部 IO 急停和安全门急停,前提是硬件已复原 return self.__exec_cmd("estop_reset", "急停复位", self.exec_desc) def estopreset_and_clearalarm(self): # OK | 外部 IO/安全门急停/安全碰撞告警都可以清除,前提是硬件已复原 return self.__exec_cmd("estopreset_and_clearalarm", "急停复位并清除报警", self.exec_desc) def motoron_pp2main_start(self): # OK return self.__exec_cmd("motoron_pptomain_start", "依次执行上电,程序指针到main,启动程序(可以是手动模式,必要时需清告警)", self.exec_desc) def motoron_start(self): # OK return self.__exec_cmd("motoron_start", "依次执行上电,启动程序(可以是手动模式,必要时需清告警)", self.exec_desc) def pause_motoroff(self): # OK return self.__exec_cmd("pause_motoroff", "暂停程序并下电", self.exec_desc) def set_program_speed(self, speed: int): # OK | 1-100 return self.__exec_cmd(f"set_program_speed:{speed}", "设置程序运行速率(滑块)", self.exec_desc) def set_soft_estop(self, enable: str): # OK return self.__exec_cmd(f"set_soft_estop:{enable}", "触发(true)/解除(false)机器人软急停", self.exec_desc) def switch_auto_motoron(self): # OK return self.__exec_cmd("switch_auto_motoron", "切换自动模式并上电", self.exec_desc) def open_safe_region(self, number: int): # OK | 1-10 return self.__exec_cmd(f"open_safe_region:{number}", f"打开第 {number} 个安全区域(1-10,信号控制开关需打开,不限操作模式)", self.exec_desc) def close_safe_region(self, number: int): # OK | 1-10 return self.__exec_cmd(f"close_safe_region:{number}", f"关闭第 {number} 个安全区域(1-10,信号控制开关需打开,不限操作模式)", self.exec_desc) def open_reduced_mode(self): # OK return self.__exec_cmd("open_reduced_mode", "开启缩减模式(不限操作模式)", self.exec_desc) def close_reduced_mode(self): # OK return self.__exec_cmd("close_reduced_mode", "关闭缩减模式(不限操作模式)", self.exec_desc) def setdo_value(self, do_name: str, do_value: str): # OK | do_value 为 true/false return self.__exec_cmd(f"setdo:{do_name},{do_value}", f"设置 {do_name} 的值为 {do_value} ", self.exec_desc) def modify_system_time(self, robot_time): # OK return self.__exec_cmd(f"set_robot_time:{robot_time}", f"修改控制器和示教器的时间为 {robot_time} 的", self.exec_desc) # -------------------------------------------------------------------------------------------------- @property def motor_on_state(self): # OK return self.__exec_cmd("motor_on_state", "获取上电状态", " :--: 返回 true 表示已上电,false 已下电") @property def robot_running_state(self): # OK return self.__exec_cmd("robot_running_state", "获取程序运行状态", " :--: 返回 true 表示正在运行,false 未运行") @property def estop_state(self): # OK | 只表示外部急停,安全门触发不会返回 true,只要有急停标识 E 字母,就会返回 true return self.__exec_cmd("estop_state", "急停状态", " :--: 返回 true 表示处于急停状态,false 非急停") @property def operation_mode(self): # OK return self.__exec_cmd("operating_mode", "获取工作模式", " :--: 返回 true 表示自动模式,false 手动模式") @property def home_state(self): # OK | 需要设置一下 "HMI 设置->快速调整" return self.__exec_cmd("home_state", "获取 HOME 输出状态", " :--: 返回 true 表示法兰中心处于 HOME 点,false 未处于 HOME 点") @property def fault_state(self): # OK return self.__exec_cmd("fault_state", "获取 故障状态", " :--: 返回 true 表示处于故障状态,false 非故障状态") @property def collision_state(self): # OK | 但是触发后,无法清除? return self.__exec_cmd("collision_state", "获取碰撞触发状态", " :--: 返回 true 表示碰撞已触发,false 未触发") @property def task_state(self): # OK return self.__exec_cmd("task_state", "获取机器人运行任务状态", " :--: 返回 program 表示任务正在运行,ready 未运行") @property def get_cart_pos(self): # OK | cart_pos/cart_pos_name 都可以正常返回,区别在返回的前缀,可测试辨别 return self.__exec_cmd("cart_pos", "获取笛卡尔位置") @property def get_joint_pos(self): # OK | jnt_pos/jnt_pos_name 都可以正常返回,区别在返回的前缀,可测试辨别 return self.__exec_cmd("jnt_pos", "获取轴位置") @property def get_axis_vel(self): # OK | jnt_vel/jnt_vel_name 都可以正常返回,区别在返回的前缀,可测试辨别 return self.__exec_cmd("jnt_vel", "获取轴速度") @property def get_axis_trq(self): # OK | jnt_trq/jnt_trq_name 都可以正常返回,区别在返回的前缀,可测试辨别 return self.__exec_cmd("jnt_trq", "获取轴力矩") @property def reduced_mode_state(self): # OK return self.__exec_cmd("reduced_mode_state", "获取缩减模式状态", " :--: 返回 true 表示缩减模式,false 非缩减模式") def get_io_state(self, io_list: str): # OK | DO0_0,DI1_3,DO2_5,不能有空格 return self.__exec_cmd(f"io_state:{io_list}", "获取 IO 状态值") @property def alarm_state(self): # OK return self.__exec_cmd("alarm_state", "获取报警状态", " :--: 返回 true 表示当前有告警,false 没有告警") @property def collision_alarm_state(self): # OK return self.__exec_cmd("collision_alarm_state", "获取碰撞报警状态", " :--: 返回 true 表示有碰撞告警,false 没有碰撞告警") @property def collision_open_state(self): # OK return self.__exec_cmd("collision_open_state", "获取碰撞检测开启状态", " :--: 返回 true 表示已开启碰撞检测,false 未开启") @property def controller_is_running(self): # OK return self.__exec_cmd("controller_is_running", "判断控制器是否开机", " :--: 返回 true 表示控制器正在运行,false 未运行") @property def encoder_low_battery_state(self): # OK return self.__exec_cmd("encoder_low_battery_state", "编码器低电压报警状态", " :--: 返回 true 表示编码器处于低电压状态,false 电压正常") @property def robot_error_code(self): # OK return self.__exec_cmd("robot_error_code", "获取机器人错误码") @property def rl_pause_state(self): # OK # 0 -- 初始化状态,刚开机上电时 # 1 -- RL 运行中 # 2 -- HMI 暂停 # 3 -- 系统 IO 暂停 # 4 -- 寄存器功能码暂停 # 5 -- 外部通讯暂停 # 6 -- # 7 -- Pause 指令暂停 # 8 -- # 9 -- # 10 -- 外部 IO 急停 # 11 -- 安全门急停 # 12 -- 其他因素停止,比如碰撞检测 return self.__exec_cmd("program_full", "获取 RL 的暂停状态", " :--: 返回值含义详见功能定义") @property def program_reset_state(self): # OK return self.__exec_cmd("program_reset_state", "获取程序复位状态", " :--: 返回 true 表示指针指向 main,false 未指向 main") @property def program_speed_value(self): # OK | 速度滑块 return self.__exec_cmd("program_speed", "获取程序运行速度") @property def robot_is_busy(self): # OK | 触发条件为 pp2main/重载工程/推送到控制器,最好测试工程大一些,比较容易触发 return self.__exec_cmd("robot_is_busy", "获取程序忙碌状态", " :--: 返回 1 表示控制器忙碌,0 非忙碌状态") @property def robot_is_moving(self): # OK return self.__exec_cmd("robot_is_moving", "获取程序运行状态", " :--: 返回 true 表示机器人正在运动,false 未运动") @property def safe_door_state(self): # OK return self.__exec_cmd("safe_door_state", "获取安全门状态", " :--: 返回 true 表示安全门已触发,false 未触发") @property def soft_estop_state(self): # OK return self.__exec_cmd("soft_estop_state", "获取软急停状态", " :--: 返回 true 表示软急停已触发,false 未触发") @property def get_cart_vel(self): # OK return self.__exec_cmd("cart_vel", "获取笛卡尔速度") @property def get_tcp_pos(self): # OK return self.__exec_cmd("tcp_pose", "获取 TCP 位姿") @property def get_tcp_vel(self): # OK return self.__exec_cmd("tcp_vel", "获取 TCP 速度") @property def get_tcp_vel_mag(self): # OK return self.__exec_cmd("tcp_vel_mag", "获取 TCP 合成线速度") @property def ext_estop_state(self): # OK return self.__exec_cmd("ext_estop_state", "获取外部轴急停状态", " :--: 返回 true 表示外部轴急停已触发,false 未触发") @property def hand_estop_state(self): # OK return self.__exec_cmd("hand_estop_state", "获取手持急停状态", " :--: 返回 true 表示手持急停已触发,false 未触发") @property def collaboration_state(self): # OK return self.__exec_cmd("collaboration_state", "获取协作模式状态(其实就是缩减模式)", " :--: 返回 true 表示协作模式,false 非协作模式") def __exec_cmd(self, directive, description, more_desc=""): self.s_string(directive) time.sleep(clibs.INTERVAL/2) result = self.r_string(directive).strip() clibs.logger("DEBUG", "openapi", f"ec: 执行{description}指令是 {directive},返回值为 {result}{more_desc}") return result class PreDos(object): def __init__(self, ip, ssh_port, username, password): self.__ssh = None self.__sftp = None self.ip = ip self.ssh_port = ssh_port self.username = username self.password = password def __ssh2server(self): try: self.__ssh = SSHClient() self.__ssh.set_missing_host_key_policy(AutoAddPolicy()) self.__ssh.connect(hostname=self.ip, port=self.ssh_port, username=self.username, password=self.password) self.__sftp = self.__ssh.open_sftp() except Exception as err: clibs.logger("ERROR", "openapi", f"predos: SSH 无法连接到 {self.ip}:{self.ssh_port},需检查网络连通性或者登录信息是否正确 {err}", "red") def push_prj_to_server(self, prj_file): # prj_file:本地工程完整路径 self.__ssh2server() prj_name = prj_file.split("/")[-1].split(".")[0] self.__sftp.put(prj_file, f"/tmp/{prj_name}.zip") cmd = f"cd /tmp; mkdir {prj_name}; unzip -d {prj_name} -q {prj_name}.zip; rm -rf /tmp/{prj_name}.zip; " cmd += f"sudo rm -rf /home/luoshi/bin/controller/projects/{prj_name}; " cmd += f"sudo mv /tmp/{prj_name}/ /home/luoshi/bin/controller/projects/" stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True) stdin.write(self.password + "\n") stdout.read().decode() # 需要read一下才能正常执行 stderr.read().decode() self.__ssh.close() def pull_prj_from_server(self, prj_name, local_prj_path): # NG | 可以拉取文件,但是导入之后,有问题 # prj_name:要拉取的服务端工程名 # local_prj_path:本地工程文件的完整路径 self.__ssh2server() cmd = f"cd /tmp/; sudo rm -rf {prj_name}*; sudo cp -rf /home/luoshi/bin/controller/projects/{prj_name} .; " cmd += f"sudo zip -q -r {prj_name}.zip {prj_name}/; sudo rm -rf {prj_name}" stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True) stdin.write(self.password + "\n") print(stdout.read().decode()) # 需要read一下才能正常执行 print(stderr.read().decode()) self.__sftp.get(f"/tmp/{prj_name}.zip", local_prj_path) cmd = f"sudo rm -rf /tmp/{prj_name}.zip" stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True) stdin.write(self.password + "\n") print(stdout.read().decode()) # 需要read一下才能正常执行 print(stderr.read().decode()) self.__ssh.close() def push_file_to_server(self, local_file, server_file): # local_file:本地文件完整路径 # server_file:服务端文件完整路径 self.__ssh2server() filename = local_file.split("\\")[-1].split("/")[-1] self.__sftp.put(local_file, f"/tmp/{filename}") cmd = f"sudo mv /tmp/{filename} {server_file}" stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True) stdin.write(self.password + "\n") stdout.read().decode() # 需要read一下才能正常执行 stderr.read().decode() self.__ssh.close() def pull_file_from_server(self, server_file, local_file): # local_file:本地文件完整路径 # server_file:服务端文件完整路径 self.__ssh2server() cmd = f"sudo cp {server_file} /tmp/" stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True) stdin.write(self.password + "\n") stdout.read().decode() # 需要read一下才能正常执行 stderr.read().decode() filename = server_file.split("/")[-1] self.__sftp.get(f"/tmp/{filename}", f"{local_file}") cmd = f"sudo rm -rf /tmp/{filename}" stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True) stdin.write(self.password + "\n") stdout.read().decode() # 需要read一下才能正常执行 stderr.read().decode() self.__ssh.close() class RobotInit(object): @staticmethod def modbus_init(): clibs.c_pd = PreDos(clibs.ip_addr, clibs.ssh_port, clibs.username, clibs.password) # 推送配置文件 msg_id = clibs.c_hr.execution("controller.get_params") records = clibs.c_hr.get_from_id(msg_id) for record in records: if "请求发送成功" not in record[0]: robot_params = eval(record[0]) robot_type = robot_params["data"]["robot_type"] security_type = robot_params["data"]["security_type"] controller_type = robot_params["data"]["controller_type"] io_device_file = "_".join(["io_device", controller_type, security_type, "1"]) user_settings = "/home/luoshi/bin/controller/user_settings" interactive_data = f"/home/luoshi/bin/controller/interactive_data/{robot_type}" config_files = [ f"{clibs.PREFIX}/files/projects/fieldbus_device.json", f"{clibs.PREFIX}/files/projects/registers.json", f"{clibs.PREFIX}/files/projects/registers.xml" ] for config_file in config_files: filename = config_file.split("/")[-1] clibs.c_pd.push_file_to_server(config_file, f"{user_settings}/{filename}") clibs.c_pd.push_file_to_server(config_file, f"{interactive_data}/{filename}") io_device_autotest = {"ai_num": 0, "ao_num": 0, "di_num": 16, "do_num": 16, "extend_attr": {"mode": "slaver", "name": "autotest", "type": "MODBUS"}, "id": 7, "name": "autotest", "type": 6} io_device_file_local = f"{io_device_file}" io_device_file_local_tmp = f"{io_device_file}_tmp" io_device_file_remote = f"{user_settings}/{io_device_file}" clibs.c_pd.pull_file_from_server(io_device_file_remote, io_device_file_local) with open(io_device_file_local, mode="r", encoding="utf-8") as f: data = json.load(f) for _ in data["device_list"]: if _["extend_attr"].get("name", None) == "autotest": break else: data["device_list"].append(io_device_autotest) with open(io_device_file_local_tmp, mode="w", encoding="utf-8") as f_tmp: json.dump(data, f_tmp, indent=4) clibs.c_pd.push_file_to_server(io_device_file_local_tmp, f"{user_settings}/{io_device_file}") clibs.c_pd.push_file_to_server(io_device_file_local_tmp, f"{interactive_data}/{io_device_file}") try: os.remove(io_device_file_local) os.remove(io_device_file_local_tmp) except: ... clibs.c_hr.execution("fieldbus_device.load_cfg") clibs.c_hr.execution("fieldbus_device.set_params", device_name="autotest", enable=True) clibs.c_hr.execution("io_device.load_cfg") clibs.c_hr.execution("modbus.load_cfg") def robot_init(self): pd = PreDos(clibs.ip_addr, clibs.ssh_port, clibs.username, clibs.password) # 推送配置文件 print("init: 推送配置文件 fieldbus_device.json/registers.json/registers.xml 到控制器,并配置 IO 设备,设备号为 7...") robot_params = clibs.c_hr.get_robot_params robot_type = robot_params["robot_type"] security_type = robot_params["security_type"] controller_type = robot_params["controller_type"] io_device_file = "_".join(["io_device", controller_type, security_type, "1"]) user_settings = "/home/luoshi/bin/controller/user_settings" interactive_data = f"/home/luoshi/bin/controller/interactive_data/{robot_type}" config_files = [ "..\\assets\\configs\\fieldbus_device.json", "..\\assets\\configs\\registers.json", "..\\assets\\configs\\registers.xml" ] for config_file in config_files: filename = config_file.split("\\")[-1] pd.push_file_to_server(config_file, f"{user_settings}/{filename}") pd.push_file_to_server(config_file, f"{interactive_data}/{filename}") io_device_autotest = {"ai_num": 0, "ao_num": 0, "di_num": 16, "do_num": 16, "extend_attr": {"mode": "slaver", "name": "autotest", "type": "MODBUS"}, "id": 7, "name": "autotest", "type": 6} io_device_file_local = f"..\\assets\\configs\\{io_device_file}" io_device_file_local_tmp = f"..\\assets\\configs\\{io_device_file}_tmp" io_device_file_remote = f"{user_settings}/{io_device_file}" pd.pull_file_from_server(io_device_file_remote, io_device_file_local) with open(io_device_file_local, mode="r", encoding="utf-8") as f: data = json.load(f) for _ in data["device_list"]: if _["extend_attr"].get("name", None) == "autotest": break else: data["device_list"].append(io_device_autotest) with open(io_device_file_local_tmp, mode="w", encoding="utf-8") as f_tmp: json.dump(data, f_tmp, indent=4) pd.push_file_to_server(io_device_file_local_tmp, f"{user_settings}/{io_device_file}") pd.push_file_to_server(io_device_file_local_tmp, f"{interactive_data}/{io_device_file}") clibs.c_hr.reload_io() clibs.c_hr.reload_registers() clibs.c_hr.reload_fieldbus() clibs.c_hr.set_fieldbus_device_params("autotest", True) # 触发急停并恢复 clibs.c_md.r_soft_estop(0) clibs.c_md.r_soft_estop(1) # 断开示教器连接 print("init: 断开示教器连接...") clibs.c_hr.switch_tp_mode("without") # 清空 system IO 配置 print("init: 清空所有的 System IO 功能配置...") clibs.c_hr.update_system_io_configuration([], [], [], [], []) # 关闭缩减模式 clibs.c_md.r_reduced_mode(0) # 打开软限位 print("init: 打开软限位开关...") clibs.c_hr.set_soft_limit_params(enable=True) # 关闭安全区域 print("init: 正在关闭所有的安全区,并关闭总使能开关...") clibs.c_hr.set_safety_area_overall(False) clibs.c_hr.set_safety_area_signal(False) for i in range(10): clibs.c_hr.set_safety_area_enable(i, False) # 打开外部通信,并设置控制器时间同步 print("init: 配置并打开外部通信,默认服务器,8080端口,后缀为 \"\\r\"...") clibs.c_hr.set_socket_params(True, "8080", "\r", 1) clibs.c_ec.modify_system_time(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))) # 关闭拖动 if robot_type.upper()[:2] not in ["XB", "NB"]: print("init: 关闭拖动模式...") clibs.c_hr.set_drag_params(False, 1, 2) # 关闭碰撞检测 print("init: 关闭碰撞检测...") clibs.c_hr.set_collision_params(False, 0, 1, 100) # 清除所有过滤错误码 print("init: 清除所有过滤错误码设定...") clibs.c_hr.set_filtered_error_code("clear", []) # 回拖动位姿 print("init: 正在回拖动位姿...") clibs.c_hr.switch_operation_mode("manual") clibs.c_hr.switch_motor_state("on") clibs.c_hr.set_quickturn_pos(enable_drag=True) clibs.c_hr.move2quickturn("drag") while True: if clibs.c_md.w_robot_is_moving: time.sleep(1) else: break clibs.c_hr.stop_move(1) clibs.c_hr.switch_motor_state("off") clibs.c_hr.close() # 清除所有告警 clibs.c_md.r_clear_alarm() def fw_updater(self, local_file_path): fw_size = os.path.getsize(local_file_path) if fw_size > 10000000: # get previous version of xCore version_previous = clibs.hr.get_robot_params["version"] # var def remote_file_path = "./upgrade/lircos.zip" fw_content = bytearray() package_data = bytearray() package_data_with_head = bytearray() # socket connect print(f"update firmware: 正在连接 {clibs.ip_addr}:{clibs.upgrade_port}...") try: tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) tcp_socket.connect((clibs.ip_addr, clibs.upgrade_port)) tcp_socket.setblocking(True) except Exception as Err: print(f"update firmware: {Err} | 连接 {clibs.ip_addr}:{clibs.upgrade_port} 失败...") raise Exception("UpgradeFailed") # get firmware content of binary format print(f"update firmware: 正在读取 {local_file_path} 文件内容...") with open(local_file_path, "rb") as f_fw: fw_content += f_fw.read() # construct package data: http://confluence.i.rokae.com/pages/viewpage.action?pageId=15634148 print("update firmware: 正在构造数据包,以及包头...") # 1 protocol id protocol_id = c_ushort(socket.htons(0)) # 2 bytes package_data += protocol_id # 2 write type write_type = c_ubyte(0) package_data += bytes(write_type) # 3 md5 md5_hash = hashlib.md5() md5_hash.update(fw_content) fw_md5 = md5_hash.hexdigest() i = 0 while i < len(fw_md5): _ = (fw_md5[i:i + 2]) package_data += bytes.fromhex(_) i += 2 # 4 remote path len remote_file_path_len = c_ushort(socket.htons(len(remote_file_path))) package_data += remote_file_path_len # 5 remote path package_data += remote_file_path.encode("ascii") # 6 file package_data += fw_content # construct communication protocol: http://confluence.i.rokae.com/pages/viewpage.action?pageId=15634045 # 1 get package data with header package_size = c_uint(socket.htonl(len(package_data))) package_type = c_ubyte(1) # 0-无协议 1-文件 2-json package_reserve = c_ubyte(0) # 预留位 package_data_with_head += package_size package_data_with_head += package_type package_data_with_head += package_reserve package_data_with_head += package_data # 2 send data to server print("update firmware: 正在发送数据到 xCore,升级控制器无需重启,升级配置文件会自动软重启...") start = 0 len_of_package = len(package_data_with_head) while start < len_of_package: end = 10240 + start if end > len_of_package: end = len_of_package sent = tcp_socket.send((package_data_with_head[start:end])) time.sleep(0.01) if sent == 0: raise RuntimeError("socket connection broken") else: start += sent waited = 5 if fw_size > 10000000 else 25 time.sleep(waited) if fw_size > 10000000: # get current version of xCore version_current = clibs.c_hr.get_robot_params["version"] print(f"update firmware: 控制器升级成功:from {version_previous} to {version_current} :)") else: print(f"update firmware: 配置文件升级成功 :)") tcp_socket.close() class UpgradeJsonCmd(object): def __init__(self): self.c = None self.__sock_conn() def __sock_conn(self): # socket connect print(f"正在连接 {clibs.ip_addr}:{clibs.upgrade_port}...") try: self.c = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.c.connect((clibs.ip_addr, clibs.upgrade_port)) self.c.setblocking(True) self.c.settimeout(3) except Exception as Err: print(f"upgrade: {Err} | 连接 {clibs.ip_addr}:{clibs.upgrade_port} 失败...") raise Exception("UpgradePortConnFailed") @staticmethod def __do_package(cmd): package_size = struct.pack("!I", len(cmd)) package_type = struct.pack("B", 2) reserved_byte = struct.pack("B", 0) return package_size + package_type + reserved_byte + cmd def __recv_result(self, cmd): time.sleep(clibs.INTERVAL) try: res = self.c.recv(10240).decode() except Exception as err: print(f"upgrade: 请求命令 {cmd.decode()} 的返回错误 {err}") raise Exception("ResponseError") time.sleep(clibs.INTERVAL) return res def __exec(self, command: dict): try: self.c.recv(10240) except Exception: pass cmd = json.dumps(command, separators=(",", ":")).encode("utf-8") self.c.sendall(self.__do_package(cmd)) res = self.__recv_result(cmd) print(f"upgrade: 请求命令 {cmd.decode()} 的返回信息:{res}") def erase_cfg(self): # 一键抹除机器人配置(.rc_cfg)、交互数据配置(interactive_data),但保留用户日志 # 场景:如果xCore版本升级跨度过大,配置文件可能不兼容导致无法启动,可以使用该功能抹除配置,重新生成配置。 # 机器人参数、零点等会丢失! self.__exec({"cmd": "erase"}) def clear_rubbish(self): self.__exec({"cmd": "clearRubbish"}) def soft_reboot(self): self.__exec({"cmd": "soft_reboot"}) def version_query(self): self.__exec({"query": "version"}) def robot_reboot(self): self.__exec({"cmd": "reboot"}) def reset_passwd(self): # 不生效,有问题 self.__exec({"cmd": "passwd"}) def backup_origin(self): # xCore # .rc_cfg / # interactive_data / # module / # demo_project / # robot_cfg / # dev_eni.xml # ecat_license # libemllI8254x.so & libemllI8254x_v3.so # set_network_parameters self.__exec({"cmd": "backup_origin"}) def origin_recovery(self): self.__exec({"cmd": "recover"})