import os.path
import socket
import time
import openapi
import json
import clibs
# NOTE: this star import rebinds the name ``socket`` from the module to the
# ``socket.socket`` class and brings ``AF_INET``, ``SOCK_STREAM`` and
# ``timeout`` into scope; the code below relies on that, so the import order
# must be kept.
from socket import *
from ctypes import *
import hashlib
import struct
import sys

# Files larger than this many bytes are handled as xCore controller firmware
# (version is queried before/after); anything else is handled as a
# configuration package.
_FIRMWARE_SIZE_THRESHOLD = 10000000

# Maximum number of bytes handed to ``send()`` per call while uploading.
_SEND_CHUNK_SIZE = 10240


def _push_config_files(pd, remote_dirs):
    """Push the fieldbus/register configuration files into every remote directory."""
    config_files = [
        "..\\assets\\configs\\fieldbus_device.json",
        "..\\assets\\configs\\registers.json",
        "..\\assets\\configs\\registers.xml",
    ]
    for config_file in config_files:
        filename = config_file.split("\\")[-1]
        for remote_dir in remote_dirs:
            pd.push_file_to_server(config_file, f"{remote_dir}/{filename}")


def _ensure_autotest_io_device(pd, io_device_file, user_settings, interactive_data):
    """Pull the IO device list, add the "autotest" MODBUS device if absent, push it back."""
    io_device_autotest = {
        "ai_num": 0,
        "ao_num": 0,
        "di_num": 16,
        "do_num": 16,
        "extend_attr": {"mode": "slaver", "name": "autotest", "type": "MODBUS"},
        "id": 7,
        "name": "autotest",
        "type": 6,
    }
    io_device_file_local = f"..\\assets\\configs\\{io_device_file}"
    io_device_file_local_tmp = f"..\\assets\\configs\\{io_device_file}_tmp"
    io_device_file_remote = f"{user_settings}/{io_device_file}"

    pd.pull_file_from_server(io_device_file_remote, io_device_file_local)
    with open(io_device_file_local, mode="r", encoding="utf-8") as f:
        data = json.load(f)

    # Only append when no existing entry is already named "autotest".
    already_present = any(
        device["extend_attr"].get("name", None) == "autotest"
        for device in data["device_list"]
    )
    if not already_present:
        data["device_list"].append(io_device_autotest)

    # The (possibly unchanged) list is written to a temp file and pushed
    # regardless of whether the device had to be added.
    with open(io_device_file_local_tmp, mode="w", encoding="utf-8") as f_tmp:
        json.dump(data, f_tmp, indent=4)
    pd.push_file_to_server(io_device_file_local_tmp, f"{user_settings}/{io_device_file}")
    pd.push_file_to_server(io_device_file_local_tmp, f"{interactive_data}/{io_device_file}")


def initialization():
    """Push the autotest configuration to the controller and reset its settings.

    In order: pushes the fieldbus/register files and the IO device list
    (adding the "autotest" device), reloads IO/registers/fieldbus, toggles the
    soft e-stop, detaches the teach pendant, clears the System IO
    configuration, turns off reduced mode, enables soft limits, disables all
    safety areas, enables external communication and sets the controller
    time, disables drag mode (except for XB/NB robot types) and collision
    detection, clears filtered error codes, moves to the drag quick-turn pose,
    powers the motors off and finally clears alarms.

    The HMI session is closed even if one of the steps raises.
    """
    hr = openapi.HmiRequest()
    try:
        pd = openapi.PreDos()

        # Push configuration files
        clibs.logger.info("推送配置文件 fieldbus_device.json/registers.json/registers.xml 到控制器,并配置 IO 设备,设备号为 7...")
        robot_params = hr.get_robot_params
        robot_type = robot_params["robot_type"]
        security_type = robot_params["security_type"]
        controller_type = robot_params["controller_type"]
        io_device_file = "_".join(["io_device", controller_type, security_type, "1"])
        user_settings = "/home/luoshi/bin/controller/user_settings"
        interactive_data = f"/home/luoshi/bin/controller/interactive_data/{robot_type}"

        _push_config_files(pd, (user_settings, interactive_data))
        _ensure_autotest_io_device(pd, io_device_file, user_settings, interactive_data)

        hr.reload_io()
        hr.reload_registers()
        hr.reload_fieldbus()
        hr.set_fieldbus_device_params("autotest", True)

        md = openapi.ModbusRequest()
        # Trigger the soft emergency stop, then release it
        md.r_soft_estop(0)
        md.r_soft_estop(1)

        # Disconnect the teach pendant
        clibs.logger.info("断开示教器连接...")
        hr.switch_tp_mode("without")

        # Clear the System IO configuration
        clibs.logger.info("清空所有的 System IO 功能配置...")
        hr.update_system_io_configuration([], [], [], [], [])

        # Turn off reduced mode
        md.r_reduced_mode(0)

        # Enable soft limits
        clibs.logger.info("打开软限位开关...")
        hr.set_soft_limit_params(enable=True)

        # Disable safety areas
        clibs.logger.info("正在关闭所有的安全区,并关闭总使能开关...")
        hr.set_safety_area_overall(False)
        hr.set_safety_area_signal(False)
        for i in range(10):
            hr.set_safety_area_enable(i, False)

        # Enable external communication and synchronise the controller time
        clibs.logger.info("配置并打开外部通信,默认服务器,8080端口,后缀为 \"\\r\"...")
        hr.set_socket_params(True, "8080", "\r", 1)
        ec = openapi.ExternalCommunication()
        ec.modify_system_time(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())))

        # Disable drag mode (skipped for robot types starting with XB / NB)
        if robot_type.upper()[:2] not in ["XB", "NB"]:
            clibs.logger.info("关闭拖动模式...")
            hr.set_drag_params(False, 1, 2)

        # Disable collision detection
        clibs.logger.info("关闭碰撞检测...")
        hr.set_collision_params(False, 0, 1, 100)

        # Clear all filtered error codes
        clibs.logger.info("清除所有过滤错误码设定...")
        hr.set_filtered_error_code("clear", [])

        # Return to the drag pose
        clibs.logger.info("正在回拖动位姿...")
        hr.switch_operation_mode("manual")
        hr.switch_motor_state("on")
        hr.set_quickturn_pos(enable_drag=True)
        hr.move2quickturn("drag")
        # Poll once per second until the robot reports it is no longer moving.
        while md.w_robot_is_moving:
            time.sleep(1)
        hr.stop_move(1)
        hr.switch_motor_state("off")
    finally:
        hr.close()

    # Clear all alarms
    md.r_clear_alarm()


def _open_upgrade_socket(timeout_s=None):
    """Connect to the controller's upgrade port; log and exit(1) on failure.

    ``timeout_s`` is applied with ``settimeout``; ``None`` means blocking mode.
    """
    clibs.logger.info(f"正在连接 {clibs.ip_addr}:{clibs.upgrade_port}...")
    try:
        conn = socket(AF_INET, SOCK_STREAM)
        conn.connect((clibs.ip_addr, clibs.upgrade_port))
        conn.settimeout(timeout_s)
    except Exception as Err:
        clibs.logger.error(f"{Err} | 连接 {clibs.ip_addr}:{clibs.upgrade_port} 失败...")
        # sys.exit instead of the site-provided exit(), which is not
        # guaranteed to exist outside an interactive interpreter.
        sys.exit(1)
    return conn


def _query_xcore_version():
    """Return the "version" entry of the robot parameters via a short-lived HMI session."""
    hr = openapi.HmiRequest()
    try:
        return hr.get_robot_params["version"]
    finally:
        hr.close()


def _build_upgrade_packet(fw_content, remote_file_path):
    """Return the framed upload packet (header + file-transfer payload) as bytes.

    All multi-byte integers are written in network (big-endian) byte order.
    """
    remote_path = remote_file_path.encode("ascii")
    # Payload layout: http://confluence.i.rokae.com/pages/viewpage.action?pageId=15634148
    payload_parts = (
        struct.pack("!H", 0),                 # 1 protocol id, 2 bytes
        struct.pack("B", 0),                  # 2 write type
        hashlib.md5(fw_content).digest(),     # 3 md5 of the file, 16 raw bytes
        struct.pack("!H", len(remote_path)),  # 4 remote path length
        remote_path,                          # 5 remote path
        fw_content,                           # 6 file content
    )
    payload_size = sum(len(part) for part in payload_parts)
    # Frame header: http://confluence.i.rokae.com/pages/viewpage.action?pageId=15634045
    # payload size (4 bytes), package type (0-no protocol, 1-file, 2-json),
    # one reserved byte.
    header = struct.pack("!IBB", payload_size, 1, 0)
    return b"".join((header,) + payload_parts)


def _send_in_chunks(conn, packet):
    """Send ``packet`` in chunks of at most ``_SEND_CHUNK_SIZE`` bytes.

    Pauses 10 ms after every ``send()`` call and raises ``RuntimeError`` when
    ``send()`` reports that zero bytes were written.
    """
    view = memoryview(packet)  # slice without copying the (possibly large) packet
    total = len(view)
    start = 0
    while start < total:
        sent = conn.send(view[start:start + _SEND_CHUNK_SIZE])
        time.sleep(0.01)
        if sent == 0:
            raise RuntimeError("socket connection broken")
        start += sent


def fw_updater(local_file_path):
    """Upload a firmware or configuration package to the controller.

    Files larger than ``_FIRMWARE_SIZE_THRESHOLD`` bytes are treated as xCore
    firmware: the controller version is read before and after the upload and
    both are logged. Smaller files are treated as configuration packages.
    The file is always uploaded to ``./upgrade/lircos.zip`` on the controller.

    After sending, the function just waits (5 s for firmware, 25 s otherwise)
    and then logs success; it does not check a reply from the controller.

    Args:
        local_file_path: Path of the local file to upload.

    Raises:
        RuntimeError: If the socket stops accepting data during the upload.
        SystemExit: If the upgrade port cannot be connected.
    """
    fw_size = os.path.getsize(local_file_path)
    is_firmware = fw_size > _FIRMWARE_SIZE_THRESHOLD
    if is_firmware:
        # get previous version of xCore
        version_previous = _query_xcore_version()

    remote_file_path = './upgrade/lircos.zip'

    # The socket is closed on every exit path, including errors.
    with _open_upgrade_socket() as tcp_socket:
        # get firmware content of binary format
        clibs.logger.info(f"正在读取 {local_file_path} 文件内容...")
        with open(local_file_path, 'rb') as f_fw:
            fw_content = f_fw.read()

        clibs.logger.info("正在构造数据包,以及包头...")
        packet = _build_upgrade_packet(fw_content, remote_file_path)

        clibs.logger.info("正在发送数据到 xCore,升级控制器无需重启,升级配置文件会自动软重启...")
        _send_in_chunks(tcp_socket, packet)

        waited = 5 if is_firmware else 25
        time.sleep(waited)

        if is_firmware:
            # get current version of xCore
            version_current = _query_xcore_version()
            clibs.logger.info(f"控制器升级成功:from {version_previous} to {version_current} :)")
        else:
            clibs.logger.info("配置文件升级成功 :)")


class UpgradeJsonCmd(object):
    """Send JSON maintenance commands to the controller's upgrade port.

    A connection is opened on construction. Each command closes the
    connection once its reply has been logged; a later command on the same
    instance transparently opens a new connection.
    """

    def __init__(self):
        self.__c = None
        self.__sock_conn()

    def __sock_conn(self):
        """Open the command connection with a 3 second socket timeout."""
        self.__c = _open_upgrade_socket(3)

    @staticmethod
    def __do_package(cmd):
        """Prefix ``cmd`` (bytes) with the 6-byte frame header for a JSON package."""
        package_size = struct.pack('!I', len(cmd))
        package_type = struct.pack('B', 2)
        reserved_byte = struct.pack('B', 0)
        return package_size + package_type + reserved_byte + cmd

    def __recv_result(self, cmd):
        """Wait, read and log the reply to ``cmd``, then close the connection."""
        time.sleep(2)
        try:
            try:
                # errors="replace": the text is only logged, so a reply that is
                # not valid UTF-8 should not abort the command.
                # NOTE(review): the first 8 characters are dropped below,
                # presumably a frame header -- confirm against the protocol.
                res = self.__c.recv(10240).decode(errors="replace")
            except timeout:
                res = "ResponseNone"
            clibs.logger.info(f"请求命令 {cmd.decode()} 的返回信息:\n{res[8:]}")
        finally:
            self.__c.close()
            # Mark as closed so the next command reconnects instead of
            # failing on a closed socket.
            self.__c = None
        time.sleep(2)

    def __exec(self, command: dict):
        """Serialise ``command`` to compact JSON, send it and log the reply."""
        if self.__c is None:
            self.__sock_conn()
        # Discard anything the server sent before our request; with nothing
        # pending this waits for the socket timeout and carries on.
        try:
            self.__c.recv(10240)
        except timeout:
            pass
        cmd = json.dumps(command, separators=(",", ":")).encode("utf-8")
        self.__c.sendall(self.__do_package(cmd))
        self.__recv_result(cmd)

    def erase_cfg(self):
        """Send the "erase" command.

        One-click erase of the robot configuration (.rc_cfg) and the
        interactive data configuration (interactive_data); user logs are kept.
        Use case: when an xCore upgrade spans too many versions the
        configuration files may be incompatible and prevent start-up; erasing
        lets the controller regenerate them.
        WARNING: robot parameters, zero points, etc. will be lost!
        """
        self.__exec({"cmd": "erase"})

    def clear_rubbish(self):
        """Send the "clearRubbish" command."""
        self.__exec({"cmd": "clearRubbish"})

    def soft_reboot(self):
        """Send the "soft_reboot" command."""
        self.__exec({"cmd": "soft_reboot"})

    def version_query(self):
        """Send the "version" query; the reply is written to the log."""
        self.__exec({"query": "version"})

    def robot_reboot(self):
        """Send the "reboot" command."""
        self.__exec({"cmd": "reboot"})

    def reset_passwd(self):
        """Send the "passwd" command.

        Known issue (from the original author): does not take effect.
        """
        self.__exec({"cmd": "passwd"})

    def backup_origin(self):
        """Send the "backup_origin" command.

        Items listed by the original author for this command:
          - xCore
          - .rc_cfg /
          - interactive_data /
          - module /
          - demo_project /
          - robot_cfg /
          - dev_eni.xml
          - ecat_license
          - libemllI8254x.so & libemllI8254x_v3.so
          - set_network_parameters
        """
        self.__exec({"cmd": "backup_origin"})

    def origin_recovery(self):
        """Send the "recover" command."""
        self.__exec({"cmd": "recover"})


if __name__ == "__main__":
    # initialization()
    # fw_updater()
    pass