import json import socket import time import os import selectors import threading from datetime import datetime import hashlib from typing import Callable, Dict from loguru import logger from pymodbus.client.tcp import ModbusTcpClient from paramiko import SSHClient, AutoAddPolicy from codes import clibs class HmiRequest: def __init__(self): self.c = self.c_xs = self.t_rx_pkg = self.t_rx_pkg_xs = self.t_s_heart = None self.data = self.data_xs = b"" self.is_connected = True self.sock_conn() def sock_conn(self): count = 0 while True: try: self.c = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.c.settimeout(clibs.interval * 3) self.c.connect((clibs.ip_addr, clibs.socket_port)) self.c.setblocking(False) self.c_xs = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.c_xs.settimeout(clibs.interval * 3) self.c_xs.connect((clibs.ip_addr, clibs.xService_port)) self.c_xs.setblocking(False) for _ in range(3): _ = self.exec_cmd("controller.heart") time.sleep(clibs.interval/4) count += 1 if count > 3: logger.success("hr: HMI connection success") break except Exception as e: clibs.to_logdb("error", "HmiRequest", str(e)) logger.warning(f"hr: HMI connection failed, will retry after {clibs.retry_time}s") try: self.c.close() self.c_xs.close() except Exception as e: clibs.to_logdb("error", "HmiRequest", str(e)) count = 0 time.sleep(clibs.retry_time) self.t_rx_pkg = threading.Thread(target=self.rx_pkg, args=(self.c,)) self.t_rx_pkg.daemon = True self.t_rx_pkg.start() self.t_rx_pkg_xs = threading.Thread(target=self.rx_pkg, args=(self.c_xs,)) self.t_rx_pkg_xs.daemon = True self.t_rx_pkg_xs.start() self.t_s_heart = threading.Thread(target=self.s_heart) self.t_s_heart.daemon = True self.t_s_heart.start() def close(self): self.is_connected = False time.sleep(clibs.interval) try: self.c.close() self.c_xs.close() except Exception as e: clibs.to_logdb("error", "HmiRequest", str(e)) finally: logger.info("hr: HMI connection has been disconnected") def s_heart(self): while self.is_connected: self.exec_cmd("controller.heart") time.sleep(clibs.interval) @staticmethod def tx_pkg(cmd): frm = (len(cmd) + 6).to_bytes(length=2) pkg = len(cmd).to_bytes(length=4) protocol = int(2).to_bytes() reserved = int(0).to_bytes() return frm + pkg + protocol + reserved + cmd.encode() @staticmethod def tx_pkg_xs(cmd): return "".join([cmd, "\r"]).encode() def rx_pkg(self, sock): def to_read(conn, mask): data = conn.recv(clibs.MAX_RECEIVED_SIZE) if data: ip, port = sock.getpeername() func_dict[port](data) else: sel.unregister(conn) conn.close() func_dict: Dict[int, Callable[[bytes], None]] = {clibs.socket_port: self.get_response, clibs.xService_port: self.get_response_xs} try: sel = selectors.DefaultSelector() sel.register(sock, selectors.EVENT_READ, to_read) while self.is_connected: events = sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask) except Exception as e: clibs.to_logdb("error", "HmiRequest", str(e)) def get_response(self, data: bytes) -> None: def get_valid_data(): nonlocal record, frm_number, pkg_number valid_data, is_first_frame = b"", True while len(record) != 0: if is_first_frame is True: is_first_frame = False valid_data += record[8:8 + frm_number - 6] record = record[8 + frm_number - 6:] frm_number = int.from_bytes(record[:2]) valid_data += record[2:2 + frm_number] record = record[2 + frm_number:] else: if len(valid_data) != pkg_number: logger.error("hr: 数据校验失败,解包有误,需要检查!") raise Exception() return str(json.loads(valid_data.decode())) self.data += data frm_number = int.from_bytes(self.data[0:2]) pkg_number = int.from_bytes(self.data[2:6]) protocol = int.from_bytes(self.data[6:7]) reserved = int.from_bytes(self.data[7:8]) if frm_number == pkg_number + 6 and protocol == 2 and reserved == 0: record_len = pkg_number+8 record, self.data = self.data[:record_len], self.data[record_len:] clibs.to_logdb("debug", "HmiRequest", str(json.loads(record[8:].decode()))) elif frm_number == clibs.max_frame_size and pkg_number > clibs.max_frame_size - 8 and protocol == 2 and reserved == 0: one_or_zero = 1 if (pkg_number - 1018) % clibs.max_frame_size > 0 else 0 record_len = pkg_number + 8 + ((pkg_number - 1018) // clibs.max_frame_size + one_or_zero) * 2 if len(self.data) < record_len: return record, self.data = self.data[:record_len], self.data[record_len:] clibs.to_logdb("debug", "HmiRequest", get_valid_data()) else: logger.critical(f"hr: 其他未考虑到的情况:data = {self.data}") raise Exception() def get_response_xs(self, data: bytes) -> None: self.data_xs += data res = self.data_xs.split(b"\r", 1) if len(res) != 2: return record, self.data_xs = res clibs.to_logdb("debug", "HmiRequest", str(json.loads(record))) @staticmethod def get_from_id(msg_id: str) -> str: f_text, flag, records, time_delay = f"%{msg_id}%", False, "Null", clibs.interval*10 cmd, ts = msg_id.split("@") t = ts.replace("T", " ") t_ts = datetime.timestamp(datetime.strptime(t, "%Y-%m-%d %H:%M:%S.%f")) t_send = datetime.strftime(datetime.fromtimestamp(t_ts-time_delay), '%Y-%m-%d %H:%M:%S.%f') t_receive = datetime.strftime(datetime.fromtimestamp(t_ts+time_delay), '%Y-%m-%d %H:%M:%S.%f') for idx in range(time_delay): time.sleep(clibs.interval) try: clibs.lock.acquire(True) clibs.cursor.execute(f"SELECT content FROM logs WHERE timestamp BETWEEN '{t_send}' AND '{t_receive}' AND content LIKE '{f_text}'") records = clibs.cursor.fetchall() flag = True if len(records) == 1 else False finally: clibs.lock.release() if flag is True: return records[0][0] else: logger.error(f"hr: {time_delay}s内无法找到请求 {msg_id} 的响应!") # raise Exception() def exec_cmd(self, cmd, **kwargs): try: with open(f"{clibs.base_path}/assets/protocols/{cmd}.json", mode="r", encoding="utf-8") as f_cmd: req = json.load(f_cmd) t = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f") req["id"] = f"{cmd}@{t}" p_type = req["p_type"] del req["p_type"] except FileNotFoundError as err: logger.error(f"hr: 暂不支持 {cmd} 功能,或确认该功能存在,err = {err}") raise Exception() match cmd: case "state.set_tp_mode" | "overview.set_autoload" | "overview.reload" | "rl_task.pp_to_main" | "rl_task.run" | "rl_task.stop" | "rl_task.set_run_params" | "diagnosis.set_params" | "diagnosis.open" | "drag.set_params" | "controller.set_params" | "collision.set_params" | "move.set_quickstop_distance" | "move.set_params" | "move.set_monitor_cfg" | "modbus.get_values" | "modbus.set_params" | "system_io.update_configuration" | "diagnosis.get_params" | "jog.set_params" | "jog.start" | "move.stop" | "move.quick_turn" | "move.set_quickturn_pos" | "soft_limit.set_params" | "fieldbus_device.set_params" | "socket.set_params" | "diagnosis.save" | "register.set_value": req["data"].update(kwargs) case "safety.safety_area.signal_enable" | "safety.safety_area.overall_enable" | "safety.safety_area.safety_area_enable" | "safety.safety_area.set_param": req["c"].update(kwargs) case "log_code.data.code_list": req["s"]["log_code.data"]["code_list"] = kwargs["code_list"] case _: pass cmd = json.dumps(req, separators=(",", ":")) if p_type == 0: self.c.send(self.tx_pkg(cmd)) elif p_type == 1: self.c_xs.send(self.tx_pkg_xs(cmd)) else: raise Exception("Unexpected CMD Input") return req["id"] class ModbusRequest: def __init__(self): self.c = None self.sock_conn() def sock_conn(self): self.c = ModbusTcpClient(host=clibs.ip_addr, port=clibs.modbus_port) if self.c.connect(): logger.success(f"md: Modbus connection({clibs.ip_addr}:{clibs.modbus_port}) success!") else: logger.error(f"md: Modbus connection({clibs.ip_addr}:{clibs.modbus_port}) failed!") raise Exception() def __reg_high_pulse(self, addr: int, desc: str) -> None: self.c.write_register(addr, 0) time.sleep(clibs.interval/4) self.c.write_register(addr, 1) time.sleep(clibs.interval/4) self.c.write_register(addr, 0) logger.info(f"md: {addr}-010 执行{desc}的操作") def r_clear_alarm(self): self.__reg_high_pulse(40000, "清除告警信息") def r_reset_estop(self): self.__reg_high_pulse(40001, "复位外部急停状态") def r_reset_estop_clear_alarm(self): self.__reg_high_pulse(40002, "复位外部急停状态,并清除告警信息") def r_motor_off(self): self.__reg_high_pulse(40004, "上电") def r_motor_on(self): self.__reg_high_pulse(40005, "下电") def r_motor_on_off(self, action: bool): self.c.write_register(40006, action) desc = {False: "下电", True: "上电"} logger.info(f"md: 40006-{action} 执行{desc[action]}的操作") time.sleep(clibs.interval) def r_motoron_pp2main_start(self): self.__reg_high_pulse(40007, "上电/pp2main/开始运行程序") def r_motoron_start(self): self.__reg_high_pulse(40008, "上电/开始运行程序") def r_pulse_motoroff(self): self.__reg_high_pulse(40009, "停止并下电") def r_pp2main(self): self.__reg_high_pulse(40010, "pp2main") def r_program_start(self): self.__reg_high_pulse(40011, "启动程序运行") def r_program_stop(self): self.__reg_high_pulse(40012, "停止程序运行") def r_program_start_stop(self, action: bool): self.c.write_register(40013, action) desc = {False: "停止程序运行", True: "启动程序运行"} logger.info(f"md: 40013-{action} 执行{desc[action]}的操作") time.sleep(clibs.interval) def r_set_program_speed(self, speed: int): result = self.c.convert_to_registers(int(speed), self.c.DATATYPE.INT32, "little") self.c.write_registers(40014, result) logger.info(f"md: 40014 设置程序运行速度为程序给定速度的{speed}%") def r_soft_estop(self, action: bool): self.c.write_register(40015, action) desc = {False: "触发软急停状态", True: "解除软急停状态"} logger.info(f"md: 40015-{action} 执行{desc[action]}的操作") time.sleep(clibs.interval) def r_switch_auto_motoron(self): self.__reg_high_pulse(40016, "切换为自动模式并上电") def r_switch_auto(self): self.__reg_high_pulse(40017, "切换为自动模式") def r_switch_manual(self): self.__reg_high_pulse(40018, "切换为手动模式") def r_switch_auto_manual(self, action: bool): self.c.write_register(40019, action) desc = {False: "切换为手动模式", True: "切换为自动模式"} logger.info(f"md: 40019-{action} 执行{desc[action]}的操作") time.sleep(clibs.interval) def r_reduced_mode(self, action: bool): self.c.write_register(40020, action) desc = {False: "退出缩减模式", True: "进入缩减模式"} logger.info(f"md: 40020-{action} 执行{desc[action]}的操作") time.sleep(clibs.interval) def r_switch_safe_region(self, idx: int, enable: bool): addr = 40020 + idx if enable: self.c.write_register(addr, False) time.sleep(clibs.interval) self.c.write_register(addr, True) else: self.c.write_register(addr, True) time.sleep(clibs.interval) self.c.write_register(addr, False) desc = {False: "关闭安全区域", True: "打开安全区域"} logger.info(f"md: {addr}-{enable} 执行{desc[enable]}[{idx}]的操作") def __get_state(self, addr: int, count: int = 1, **kwargs): res = self.c.read_holding_registers(addr, count=count).registers[0] logger.info(f"md: {addr}-{kwargs['func']},结果为{res} {kwargs['desc']}") return res @property def w_alarm_state(self): return self.__get_state(41000, func="告警状态", desc=":--: 0表示无告警,1表示有告警") @property def w_sta_collision(self): return self.__get_state(41001, func="碰撞检测状态", desc=":--: 0表示无碰撞,1表示检测到碰撞") @property def w_sta_collision_alarm(self): return self.__get_state(41002, func="碰撞检测报警", desc=":--: 0表示无碰撞,1表示检测到碰撞") @property def w_sta_collision_open(self): return self.__get_state(41003, func="碰撞检测开启状态", desc=":--: 0表示未开启,1表示已开启") @property def w_controller_is_running(self): return self.__get_state(41004, func="控制器运行状态", desc=":--: 0表示未运行,1表示已运行") @property def w_encoder_low_battery(self): return self.__get_state(41005, func="编码器低电压报警状态", desc=":--: 0表示正常,1表示异常") @property def w_sta_error_code(self): result = self.c.read_holding_registers(41006, count=2) error_code = self.c.convert_from_registers(registers=result.registers, data_type=self.c.DATATYPE.INT16, word_order="little")[0] return error_code+30000 @property def w_sta_estop(self): return self.__get_state(41007, func="外部急停状态", desc=":--: 0表示非急停,1表示急停") @property def w_sta_heartbeat(self): return self.__get_state(41008, func="心跳电平状态", desc=":--: 0表示低电平,1表示高电平") @property def w_sta_home(self): return self.__get_state(41009, func="当前点是否是home点", desc=":--: 0表示非home点,1表示home点") @property def w_sta_motor(self): return self.__get_state(41011, func="上电状态", desc=":--: 0表示下电,1表示上电") @property def w_sta_operation_mode(self): return self.__get_state(41012, func="运行模式", desc=":--: 0表示手动,1表示自动") @property def w_sta_program(self): return self.__get_state(41013, func="程序运行状态", desc=":--: 0表示未运行,1表示正在运行") @property def w_sta_program_full(self): result = self.c.read_holding_registers(41014, count=2) return self.c.convert_from_registers(registers=result.registers, data_type=self.c.DATATYPE.INT16, word_order="little")[0] @property def w_sta_program_not_run(self): return self.__get_state(41015, func="程序未运行状态", desc=":--: 0表示正在运行,1表示未运行") @property def w_sta_program_reset(self): return self.__get_state(41016, func="程序指针复位状态", desc=":--: 0表示未复位,1表示已复位") @property def w_sta_program_speed(self): result = self.c.read_holding_registers(41017, count=2) return self.c.convert_from_registers(registers=result.registers, data_type=self.c.DATATYPE.INT16, word_order="little")[0] @property def w_sta_reduce_mode(self): return self.__get_state(41018, func="缩减模式状态", desc=":--: 0表示非缩减模式,1表示缩减模式") @property def w_sta_robot_is_busy(self): return self.__get_state(41019, func="机器人忙碌状态", desc=":--: 0表示空闲,1表示忙碌") @property def w_sta_robot_is_moving(self): return self.__get_state(41020, func="机器人运动状态", desc=":--: 0表示非运动状态,1表示正在运动") @property def w_safe_door_state(self): return self.__get_state(41021, func="安全门状态", desc=":--: 0表示未触发,1表示已触发") @property def w_sta_safe_jnt_pos(self): res = [self.c.read_holding_registers(41022+idx, count=1).registers[0] for idx in range(8)] logger.info(f"md: 41022-41029-安全位置触发状态 结果为{res}") return res @property def w_sta_soft_estop(self): return self.__get_state(41030, func="软急停状态", desc=":--: 0表示未触发,1表示已触发") @property def w_sta_safe_region(self): res = [self.c.read_holding_registers(41031+idx, count=1).registers[0] for idx in range(10)] logger.info(f"md: 41031-41040-安全区域触发状态 结果为{res} :--: 0表示未触发,1表示已触发") return res @property def w_sta_on_path(self): return self.__get_state(41030, func="当前是否在预设轨迹上", desc=":--: 0表示偏离预设轨迹,1表示在规划轨迹上") @property def w_sta_near_path(self): return self.__get_state(41030, func="当前是否在预设轨迹附近", desc=":--: 0表示偏离预设轨迹附近的标准,1表示在规划轨迹附近") @property def w_sta_cart_pose(self) -> list: res = self.c.read_holding_registers(41043, count=14) return self.c.convert_from_registers(registers=res.registers, data_type=self.c.DATATYPE.FLOAT32, word_order="little") @property def w_sta_cart_vel(self) -> list: res = self.c.read_holding_registers(41057, count=14) return self.c.convert_from_registers(registers=res.registers, data_type=self.c.DATATYPE.FLOAT32, word_order="little") @property def w_sta_jnt_pose(self) -> list: res = self.c.read_holding_registers(41071, count=14) return self.c.convert_from_registers(registers=res.registers, data_type=self.c.DATATYPE.FLOAT32, word_order="little") @property def w_sta_jnt_vel(self) -> list: res = self.c.read_holding_registers(41085, count=14) return self.c.convert_from_registers(registers=res.registers, data_type=self.c.DATATYPE.FLOAT32, word_order="little") @property def w_sta_tcp_pose(self) -> list: res = self.c.read_holding_registers(41113, count=14) return self.c.convert_from_registers(registers=res.registers, data_type=self.c.DATATYPE.FLOAT32, word_order="little") @property def w_sta_tcp_vel(self) -> list: res = self.c.read_holding_registers(41127, count=14) return self.c.convert_from_registers(registers=res.registers, data_type=self.c.DATATYPE.FLOAT32, word_order="little") @property def w_sta_tcp_vel_mag(self): res = self.c.read_holding_registers(41141, count=2) return self.c.convert_from_registers(registers=res.registers, data_type=self.c.DATATYPE.FLOAT32, word_order="little") def io_write_coils(self, addr, value: bool): # e.g. io_write_coils(0, 1) 给0号DI赋值为1 self.c.write_coils(addr, [value, ]) logger.info(f"md: 执行给DI地址{addr}赋值为{value}") def io_read_coils(self): res = self.c.read_coils(0, count=16).bits logger.info(f"md: 执行读取所有DI的结果为{res}") return res def io_read_discretes(self): res = self.c.read_discrete_inputs(0, count=16).bits logger.info(f"md: 执行读取所有DO的结果为{res}") return res class EcRequest: def __init__(self): self.c = None self.exec_desc = " :--: true表示执行成功,false表示执行失败" self.sock_conn() def sock_conn(self): try: self.c = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.c.setblocking(False) self.c.settimeout(clibs.interval*3) self.c.connect((clibs.ip_addr, clibs.external_port)) logger.success(f"ec: 外部通信连接成功") except Exception as e: logger.success(f"ec: 外部通信连接失败 -- {e}") raise Exception() def sr_string(self, directive, description, more_desc=""): self.s_string(directive) time.sleep(clibs.interval/2) result = self.r_string() logger.info(f"ec: 执行{description}指令是{directive},返回值为{result}{more_desc}") return result def s_string(self, directive: str): cmd = "".join([directive, clibs.suffix]) self.c.send(cmd.encode()) def r_string(self): result = self.c.recv(10240).decode().strip() return result # =================================== ↓↓↓ specific functions ↓↓↓ =================================== def motor_on(self): # auto | manual return self.sr_string("motor_on", "机器人上电", self.exec_desc) def motor_off(self): # auto | manual return self.sr_string("motor_off", "机器人下电", self.exec_desc) def pp_to_main(self): # auto return self.sr_string("pp_to_main", "程序pptomain", self.exec_desc) def program_start(self): # auto return self.sr_string("start", "程序启动(必要时需清告警)", self.exec_desc) def program_stop(self): # auto return self.sr_string("stop", "程序停止", self.exec_desc) def clear_alarm(self): # auto | manual return self.sr_string("clear_alarm", "清除伺服报警", self.exec_desc) def switch_operation_auto(self): # auto | manual return self.sr_string("switch_mode:auto", "切换到自动模式", self.exec_desc) def switch_operation_manual(self): # auto | manual return self.sr_string("switch_mode:manual", "切换到手动模式", self.exec_desc) def open_drag_mode(self): # manual return self.sr_string("open_drag", "打开拖动", self.exec_desc) def close_drag_mode(self): # manual return self.sr_string("close_drag", "关闭拖动", self.exec_desc) def get_project_list(self): # auto | manual return self.sr_string("list_prog", "获取工程列表") def get_current_project(self): # auto | manual return self.sr_string("current_prog", "当前工程") def load_project(self, project_name): # auto | manual return self.sr_string(f"load_prog:{project_name}", "加载工程", self.exec_desc) def estop_reset(self): # auto | manual | 复原外部 IO 急停和安全门急停,前提是硬件已复原 return self.sr_string("estop_reset", "急停复位", self.exec_desc) def estopreset_and_clearalarm(self): # auto | manual | 外部 IO/安全门急停/安全碰撞告警都可以清除,前提是硬件已复原 return self.sr_string("estopreset_and_clearalarm", "急停复位并清除报警", self.exec_desc) def motoron_pp2main_start(self): # auto | manual return self.sr_string("motoron_pptomain_start", "依次执行上电,程序指针到main,启动程序(必要时需清告警)", self.exec_desc) def motoron_start(self): # auto | manual return self.sr_string("motoron_start", "依次执行上电,启动程序(必要时需清告警)", self.exec_desc) def pause_motoroff(self): # auto | manual return self.sr_string("pause_motoroff", "暂停程序并下电", self.exec_desc) def set_program_speed(self, speed: int = 20): # auto | manual | 1-100 return self.sr_string(f"set_program_speed:{speed}", "设置程序运行速率(滑块)", self.exec_desc) def set_soft_estop(self, enable: str): # auto | manual return self.sr_string(f"set_soft_estop:{enable}", "触发(true)/解除(false)机器人软急停", self.exec_desc) def switch_auto_motoron(self): # auto | manual return self.sr_string("switch_auto_motoron", "切换自动模式并上电", self.exec_desc) def open_safe_region(self, number: int): # auto | manual | 1-10 return self.sr_string(f"open_safe_region:{number}", f"打开第 {number} 个安全区域(信号控制开关需打开)", self.exec_desc) def close_safe_region(self, number: int): # auto | manual | 1-10 return self.sr_string(f"close_safe_region:{number}", f"关闭第 {number} 个安全区域(信号控制开关需打开)", self.exec_desc) def open_reduced_mode(self): # auto | manual return self.sr_string("open_reduced_mode", "开启缩减模式", self.exec_desc) def close_reduced_mode(self): # auto | manual return self.sr_string("close_reduced_mode", "关闭缩减模式", self.exec_desc) def setdo_value(self, do_name: str, do_value: str): # - | do_value 为 true/false return self.sr_string(f"setdo:{do_name},{do_value}", f"设置 {do_name} 的值为 {do_value} ", self.exec_desc) def set_robot_time(self, robot_time: str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())): # auto | manual | YY-mm-dd HH:MM:SS return self.sr_string(f"set_robot_time:{robot_time}", f"修改控制器和示教器的时间为 {robot_time} 的", self.exec_desc) # -------------------------------------------------------------------------------------------------- @property def motor_on_state(self): # auto | manual return self.sr_string("motor_on_state", "获取上电状态", " :--: 返回true表示已上电,false表示已下电") @property def robot_running_state(self): # auto | manual | 只针对运动任务和常规任务 return self.sr_string("robot_running_state", "获取程序运行状态", " :--: 返回true表示正在运行,false表示未运行") @property def estop_state(self): # auto | manual | 只表示外部急停,安全门触发不会返回true,只要有急停标识E字母,且非软急停就会返回true return self.sr_string("estop_state", "急停状态", " :--: 返回true表示处于急停状态,false表示非急停") @property def operation_mode(self): # auto | manual return self.sr_string("operating_mode", "获取工作模式", " :--: 返回true表示自动模式,false手动模式") @property def home_state(self): # # auto | manual | 需要设置一下 "HMI 设置->快速调整" return self.sr_string("home_state", "获取HOME输出状态", " :--: 返回true表示法兰中心处于HOME点,false 未处于HOME点") @property def fault_state(self): # - return self.sr_string("fault_state", "获取 故障状态", " :--: 返回true表示处于故障状态,false表示非故障状态") @property def collision_state(self): # - return self.sr_string("collision_state", "获取碰撞触发状态", " :--: 返回true表示碰撞已触发,false表示未触发") @property def task_state(self): # auto | manual | 只适用于运动任务和常规任务 return self.sr_string("task_state", "获取机器人运行任务状态", " :--: 返回program表示任务正在运行,ready表示未运行") @property def get_cart_pos(self): # auto | manual | cart_pos/cart_pos_name都可以正常返回,区别在于前者直接返回value,后者返回的是"cart_pos" : value的形式 return self.sr_string("cart_pos", "获取笛卡尔位置") @property def get_joint_pos(self): # auto | manual | jnt_pos/jnt_pos_name都可以正常返回,区别在于前者直接返回value,后者返回的是"joint_pos" : value的形式 return self.sr_string("jnt_pos", "获取轴位置") @property def get_joint_vel(self): # auto | manual | jnt_vel/jnt_vel_name都可以正常返回,区别在于前者直接返回value,后者返回的是"jnt_vel" : value的形式 return self.sr_string("jnt_vel", "获取轴速度") @property def get_joint_trq(self): # auto | manual | jnt_trq/jnt_trq_name都可以正常返回,区别在于前者直接返回value,后者返回的是"jnt_trq" : value的形式 return self.sr_string("jnt_trq", "获取轴力矩") @property def reduced_mode_state(self): # auto | manual return self.sr_string("reduced_mode_state", "获取缩减模式状态", " :--: 返回true表示缩减模式,false表示非缩减模式") def get_io_state(self, io_list: str): # - | DO0_0,DI1_3,DO2_5,不能有空格 return self.sr_string(f"io_state:{io_list}", "获取 IO 状态值") @property def alarm_state(self): # auto | manual return self.sr_string("alarm_state", "获取报警状态", " :--: 返回true表示当前有告警,false 没有告警") @property def collision_alarm_state(self): # - return self.sr_string("collision_alarm_state", "获取碰撞报警状态", " :--: 返回true表示有碰撞告警,false表示没有碰撞告警") @property def collision_open_state(self): # auto | manual return self.sr_string("collision_open_state", "获取碰撞检测开启状态", " :--: 返回true表示已开启碰撞检测,false表示未开启") @property def controller_is_running(self): # auto | manual return self.sr_string("controller_is_running", "判断控制器是否开机", " :--: 返回true表示控制器正在运行,false表示未运行") @property def encoder_low_battery_state(self): # auto | manual return self.sr_string("encoder_low_battery_state", "编码器低电压报警状态", " :--: 返回true表示编码器处于低电压状态,false表示电压正常") @property def robot_error_code(self): # auto | manual return self.sr_string("robot_error_code", "获取机器人错误码") @property def rl_pause_state(self): # - # 0 -- 初始化状态,刚开机上电时 # 1 -- RL 运行中 # 2 -- HMI 暂停 # 3 -- 系统 IO 暂停 # 4 -- 寄存器功能码暂停 # 5 -- 外部通讯暂停 # 6 -- # 7 -- Pause 指令暂停 # 8 -- # 9 -- # 10 -- 外部 IO 急停 # 11 -- 安全门急停 # 12 -- 其他因素停止,比如碰撞检测 return self.sr_string("program_full", "获取RL的暂停状态", " :--: 返回值含义详见功能定义") @property def program_reset_state(self): # auto | manual return self.sr_string("program_reset_state", "获取程序复位状态", " :--: 返回true表示指针指向main,false表示未指向main") @property def program_speed_value(self): # auto | manual | 速度滑块 return self.sr_string("program_speed", "获取程序运行速度") @property def robot_is_busy(self): # auto | manual | 触发条件为 pp2main/重载工程/推送到控制器,最好测试工程大一些,比较容易触发 return self.sr_string("robot_is_busy", "获取程序忙碌状态", " :--: 返回1表示控制器忙碌,0表示非忙碌状态") @property def robot_is_moving(self): # auto | manual return self.sr_string("robot_is_moving", "获取程序运行状态", " :--: 返回true表示机器人正在运动,false未运动") @property def safe_door_state(self): # - return self.sr_string("safe_door_state", "获取安全门状态", " :--: 返回true表示安全门已触发,false表示未触发") @property def soft_estop_state(self): # auto | manual return self.sr_string("soft_estop_state", "获取软急停状态", " :--: 返回true表示软急停已触发,false表示未触发") @property def get_cart_vel(self): # auto | manual return self.sr_string("cart_vel", "获取笛卡尔速度") @property def get_tcp_pos(self): # auto | manual return self.sr_string("tcp_pose", "获取TCP位姿") @property def get_tcp_vel(self): # auto | manual return self.sr_string("tcp_vel", "获取TCP速度") @property def get_tcp_vel_mag(self): # auto | manual return self.sr_string("tcp_vel_mag", "获取TCP合成线速度") @property def ext_estop_state(self): # - return self.sr_string("ext_estop_state", "获取外部轴急停状态", " :--: 返回true表示外部轴急停已触发,false表示未触发") @property def hand_estop_state(self): # - return self.sr_string("hand_estop_state", "获取手持急停状态", " :--: 返回true表示手持急停已触发,false表示未触发") @property def sta_on_path(self): # auto | manual return self.sr_string("sta_on_path", "获取当前坐标是否在路径上", " :--: 返回true表示在规划路径上,false表示不在规划路径上") @property def sta_near_path(self): # - return self.sr_string("sta_near_path", "获取当前坐标是否在路径附近", " :--: 返回true表示在规划路径附近,false表示不在规划路径附近") class PreDos(object): def __init__(self): self.__ssh = None self.__sftp = None def __ssh2server(self): try: self.__ssh = SSHClient() self.__ssh.set_missing_host_key_policy(AutoAddPolicy()) self.__ssh.connect(hostname=clibs.ip_addr, port=clibs.ssh_port, username=clibs.username, password=clibs.password) self.__sftp = self.__ssh.open_sftp() # logger.success("pd: SSH连接成功") except Exception as e: logger.error(f"pd: SSH无法连接,需检查网络连通性或者登录信息是否正确 -- {e}") raise Exception() def push_prj_to_server(self, prj_file): # prj_file:工程的本地完整路径 self.__ssh2server() prj_name, postfix = os.path.basename(prj_file).split(".") self.__sftp.put(prj_file, f"/tmp/{prj_name}.zip") cmd = f"cd /tmp; mkdir {prj_name}; unzip -q {prj_name}.zip; rm -rf /tmp/{prj_name}.zip; " cmd += f"sudo rm -rf /home/luoshi/bin/controller/projects/{prj_name}; " cmd += f"sudo mv /tmp/{prj_name}/ /home/luoshi/bin/controller/projects/" stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True) stdin.write(clibs.password + "\n") _ = stdout.read().decode() # 需要read一下才能正常执行 _ = stderr.read().decode() self.__ssh.close() def pull_prj_from_server(self, prj_name, local_prj_path): # prj_name:要拉取的服务端工程名 # local_prj_path:存放工程文件的本地路径 self.__ssh2server() cmd = f"cd /tmp/; sudo rm -rf {prj_name}*; sudo cp -rf /home/luoshi/bin/controller/projects/{prj_name} .; " cmd += f"sudo zip -q -r {prj_name}.zip {prj_name}/; sudo rm -rf {prj_name}" stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True) stdin.write(clibs.password + "\n") _ = stdout.read().decode() _ = stderr.read().decode() self.__sftp.get(f"/tmp/{prj_name}.zip", f"{local_prj_path}/{prj_name}.zip") cmd = f"sudo rm -rf /tmp/{prj_name}.zip" stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True) stdin.write(clibs.password + "\n") _ = stdout.read().decode() _ = stderr.read().decode() self.__ssh.close() def push_file_to_server(self, local_file, server_file): # local_file:本地文件完整路径 # server_file:服务端文件完整路径 self.__ssh2server() filename = os.path.basename(local_file) self.__sftp.put(local_file, f"/tmp/{filename}") cmd = f"sudo mv /tmp/{filename} {server_file}" stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True) stdin.write(clibs.password + "\n") _ = stdout.read().decode() # 需要read一下才能正常执行 _ = stderr.read().decode() self.__ssh.close() def pull_file_from_server(self, server_file, local_file): # local_file:本地文件完整路径 # server_file:服务端文件完整路径 self.__ssh2server() cmd = f"sudo cp {server_file} /tmp/" stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True) stdin.write(clibs.password + "\n") _ = stdout.read().decode() # 需要read一下才能正常执行 _ = stderr.read().decode() filename = server_file.split("/")[-1] self.__sftp.get(f"/tmp/{filename}", f"{local_file}") cmd = f"sudo rm -rf /tmp/{filename}" stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True) stdin.write(clibs.password + "\n") _ = stdout.read().decode() # 需要read一下才能正常执行 _ = stderr.read().decode() self.__ssh.close() class RobotInit(object): def __init__(self): self.pd = PreDos() self.hr = HmiRequest() def robot_init(self): # 推送配置文件 msg_id = self.hr.exec_cmd("controller.get_params") record = self.hr.get_from_id(msg_id) robot_params = eval(record) robot_type = robot_params["data"]["robot_type"] security_type = robot_params["data"]["security_type"] controller_type = robot_params["data"]["controller_type"] io_device_file = "_".join(["io_device", controller_type, security_type, "1"]) user_settings = "/home/luoshi/bin/controller/user_settings" interactive_data = f"/home/luoshi/bin/controller/interactive_data/{robot_type}" config_files = [ f"{clibs.base_path}/assets/confs/fieldbus_device.json", f"{clibs.base_path}/assets/confs/registers.json", f"{clibs.base_path}/assets/confs/registers.xml" ] for config_file in config_files: filename = os.path.basename(config_file) self.pd.push_file_to_server(config_file, f"{user_settings}/{filename}") self.pd.push_file_to_server(config_file, f"{interactive_data}/{filename}") io_device_autotest = {"ai_num": 0, "ao_num": 0, "di_num": 16, "do_num": 16, "extend_attr": {"mode": "slaver", "name": "autotest", "type": "MODBUS"}, "id": 7, "name": "autotest", "type": 6} io_device_file_local = f"{io_device_file}" io_device_file_local_tmp = f"{io_device_file}_tmp" io_device_file_server = f"{user_settings}/{io_device_file}" self.pd.pull_file_from_server(io_device_file_server, io_device_file_local) with open(io_device_file_local, mode="r", encoding="utf-8") as f: data = json.load(f) for _ in data["device_list"]: if _["extend_attr"].get("name", None) == "autotest": break else: data["device_list"].append(io_device_autotest) with open(io_device_file_local_tmp, mode="w", encoding="utf-8") as f_tmp: json.dump(data, f_tmp, separators=(",", ":"), indent=4) self.pd.push_file_to_server(io_device_file_local_tmp, f"{user_settings}/{io_device_file}") self.pd.push_file_to_server(io_device_file_local_tmp, f"{interactive_data}/{io_device_file}") try: os.remove(io_device_file_local) os.remove(io_device_file_local_tmp) except FileNotFoundError: pass self.hr.exec_cmd("fieldbus_device.load_cfg") self.hr.exec_cmd("fieldbus_device.set_params", device_name="autotest", enable=True) self.hr.exec_cmd("io_device.load_cfg") self.hr.exec_cmd("modbus.load_cfg") def fw_updater(self, fw_file): """ :param fw_file: xCore升级文件/机型文件的绝对路径 :return: None """ def get_xcore_version(): msg_id = self.hr.exec_cmd("controller.get_params") record = self.hr.get_from_id(msg_id) self.hr.close() return eval(record)["data"]["version"] conn = None try: conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) conn.setblocking(False) conn.settimeout(clibs.interval * 3) conn.connect((clibs.ip_addr, clibs.upgrade_port)) logger.success(f"ri: 升级服务连接成功") except Exception as e: logger.error(f"ri: 升级服务连接失败\n{e}") previous_version = get_xcore_version() fw_size = os.path.getsize(fw_file) # Byte remote_file_path = "./upgrade/lircos.zip" fw_content = package_data = package_data_with_head = b"" # construct package data: http://confluence.i.rokae.com/pages/viewpage.action?pageId=15634148 # 1 protocol id protocol_id = int(0).to_bytes(length=2) # 2 write type write_type = int(0).to_bytes(length=1) # 6 file with open(fw_file, "rb") as f_fw: fw_content += f_fw.read() # 3 md5 md5_hash = hashlib.md5() md5_hash.update(fw_content) fw_md5 = bytes.fromhex(md5_hash.hexdigest()) # 4 remote file path len remote_file_path_len = len(remote_file_path).to_bytes(length=2) # 5 remote path remote_path = remote_file_path.encode() # 7 组包,注意顺序 package_data += (protocol_id + write_type + fw_md5 + remote_file_path_len + remote_path + fw_content) # construct communication protocol: http://confluence.i.rokae.com/pages/viewpage.action?pageId=15634045 # 1 get package data with header package_size = len(package_data).to_bytes(length=4) package_type = int(1).to_bytes() # 0-无协议 1-文件 2-json package_reserve = int(0).to_bytes() # 预留位 package_data_with_head += (package_size + package_type + package_reserve + package_data) # 2 send data to server logger.info("ri: 正在发送数据到xCore......") start = 0 len_of_package = len(package_data_with_head) while start < len_of_package: end = 10240 + start if end > len_of_package: end = len_of_package sent = conn.send((package_data_with_head[start:end])) # time.sleep(0.01) if sent == 0: raise RuntimeError("socket connection broken") else: start += sent if fw_size > 1000000: # 1MB self.hr = HmiRequest() current_version = get_xcore_version() logger.success(f"update firmware: 控制器升级成功:from {previous_version} to {current_version} :)") else: logger.success(f"update firmware: 配置文件升级成功 :)") conn.close() class UpgradeRequest: """ http://confluence.i.rokae.com/pages/viewpage.action?pageId=21531754 """ def __init__(self): self.c = None self.sock_conn() def sock_conn(self): try: self.c = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.c.setblocking(False) self.c.settimeout(clibs.interval*3) self.c.connect((clibs.ip_addr, clibs.upgrade_port)) logger.success(f"ur: 升级协议通信连接成功") except Exception as e: logger.error(f"ur: 升级协议通信连接失败 -- {e}") raise Exception() def s_pkg(self, command: dict): cmd = json.dumps(command, separators=(",", ":")).encode("utf-8") package_size = len(cmd).to_bytes(length=4) package_type = int(2).to_bytes() reserved_byte = int(0).to_bytes() pkg = package_size + package_type + reserved_byte + cmd self.c.send(pkg) def r_pkg(self): try: res = self.c.recv(10240)[8:].decode() except Exception as e: res = e return res def sr_pkg(self, command: dict): self.s_pkg(command) time.sleep(clibs.interval/2) res = self.r_pkg() logger.info(f"upgrade: 请求命令 {str(command)} 的返回信息:\n{res}") def erase_cfg(self): # OK 抹除配置,重启后生效 self.sr_pkg({"cmd": "erase"}) def clear_rubbish(self): # OK 清理垃圾 self.sr_pkg({"cmd": "clearRubbish"}) def soft_reboot(self): # OK 重启xCore self.sr_pkg({"cmd": "soft_reboot"}) def version_query(self): # OK 查询升级程序版本信息 self.sr_pkg({"query": "version"}) def robot_reboot(self): # OK 重启系统 self.sr_pkg({"cmd": "reboot"}) def backup_origin(self): # OK 生成备份 self.sr_pkg({"cmd": "backup_origin"}) def origin_recovery(self): # OK 恢复备份,恢复后手动设置,并重启生效 self.sr_pkg({"cmd": "recover"})