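"""Controller bring-up and upgrade helpers.

Contains ``initialization()`` (puts the controller into the baseline state
used by the automated tests), ``fw_updater()`` (uploads a firmware or
configuration package over the upgrade port) and ``UpgradeJsonCmd`` (sends
single JSON commands to the upgrade port).
"""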
import os.path
|
||
import socket
|
||
import time
|
||
import openapi
|
||
import json
|
||
import clibs
|
||
from socket import *
|
||
from ctypes import *
|
||
import hashlib
|
||
import struct
|
||
|
||
|
||
def initialization():
    """Bring the controller into the baseline state used by the automated tests.

    The steps run strictly in this order:

    1. Push the fieldbus/register config files and make sure the ``autotest``
       MODBUS IO device (id 7) is present in the controller's IO device file.
    2. Reload IO, registers and fieldbus, then enable the ``autotest`` device.
    3. Trigger and release the soft emergency stop.
    4. Disconnect the teach pendant, clear the System IO configuration,
       turn reduced mode off and soft limits on.
    5. Disable every safety area, open external communication on port 8080
       and set the controller clock to the local time.
    6. Disable drag mode (skipped for robot types starting with XB/NB) and
       collision detection, and clear the filtered error codes.
    7. Move to the drag pose, wait until motion stops, power the motors off
       and clear all alarms.
    """
    hmi = openapi.HmiRequest()
    pre_dos = openapi.PreDos()

    # Push the configuration files.
    clibs.logger.info("推送配置文件 fieldbus_device.json/registers.json/registers.xml 到控制器,并配置 IO 设备,设备号为 7...")
    params = hmi.get_robot_params
    robot_type = params["robot_type"]
    security_type = params["security_type"]
    controller_type = params["controller_type"]
    io_device_file = "_".join(("io_device", controller_type, security_type, "1"))

    user_settings = "/home/luoshi/bin/controller/user_settings"
    interactive_data = f"/home/luoshi/bin/controller/interactive_data/{robot_type}"

    local_configs = (
        "..\\assets\\configs\\fieldbus_device.json",
        "..\\assets\\configs\\registers.json",
        "..\\assets\\configs\\registers.xml",
    )
    for local_path in local_configs:
        base_name = local_path.split("\\")[-1]
        # Every file goes to both remote directories, user_settings first.
        for remote_dir in (user_settings, interactive_data):
            pre_dos.push_file_to_server(local_path, f"{remote_dir}/{base_name}")

    _ensure_autotest_io_device(pre_dos, io_device_file, user_settings, interactive_data)

    hmi.reload_io()
    hmi.reload_registers()
    hmi.reload_fieldbus()
    hmi.set_fieldbus_device_params("autotest", True)

    # The Modbus client is created only after the autotest device is enabled.
    modbus = openapi.ModbusRequest()
    # Trigger the soft emergency stop, then release it.
    modbus.r_soft_estop(0)
    modbus.r_soft_estop(1)

    # Disconnect the teach pendant.
    clibs.logger.info("断开示教器连接...")
    hmi.switch_tp_mode("without")

    # Clear the System IO configuration.
    clibs.logger.info("清空所有的 System IO 功能配置...")
    hmi.update_system_io_configuration([], [], [], [], [])

    # Turn reduced mode off.
    modbus.r_reduced_mode(0)

    # Turn the soft limits on.
    clibs.logger.info("打开软限位开关...")
    hmi.set_soft_limit_params(enable=True)

    # Turn the safety areas off (overall switch, signal, then areas 0..9).
    clibs.logger.info("正在关闭所有的安全区,并关闭总使能开关...")
    hmi.set_safety_area_overall(False)
    hmi.set_safety_area_signal(False)
    for area_index in range(10):
        hmi.set_safety_area_enable(area_index, False)

    # Open external communication and synchronise the controller clock.
    clibs.logger.info("配置并打开外部通信,默认服务器,8080端口,后缀为 \"\\r\"...")
    hmi.set_socket_params(True, "8080", "\r", 1)
    ext_comm = openapi.ExternalCommunication()
    # strftime() without a time tuple formats the current local time.
    ext_comm.modify_system_time(time.strftime("%Y-%m-%d %H:%M:%S"))

    # Turn drag mode off (not done for robot types starting with XB / NB).
    if not robot_type.upper().startswith(("XB", "NB")):
        clibs.logger.info("关闭拖动模式...")
        hmi.set_drag_params(False, 1, 2)

    # Turn collision detection off.
    clibs.logger.info("关闭碰撞检测...")
    hmi.set_collision_params(False, 0, 1, 100)

    # Clear every filtered error code.
    clibs.logger.info("清除所有过滤错误码设定...")
    hmi.set_filtered_error_code("clear", [])

    # Return to the drag pose.
    clibs.logger.info("正在回拖动位姿...")
    hmi.switch_operation_mode("manual")
    hmi.switch_motor_state("on")
    hmi.set_quickturn_pos(enable_drag=True)
    hmi.move2quickturn("drag")
    # Poll once per second until the robot reports that it stopped moving.
    while modbus.w_robot_is_moving:
        time.sleep(1)
    hmi.stop_move(1)
    hmi.switch_motor_state("off")
    hmi.close()

    # Clear all alarms.
    modbus.r_clear_alarm()


def _ensure_autotest_io_device(pre_dos, io_device_file, user_settings, interactive_data):
    """Add the ``autotest`` IO device to the controller's IO device file.

    Pulls ``io_device_file`` from ``user_settings``, appends the autotest
    entry unless a device whose ``extend_attr.name`` is ``"autotest"`` already
    exists, writes the result to a local ``_tmp`` copy and pushes that copy to
    both ``user_settings`` and ``interactive_data``.
    """
    autotest_device = {"ai_num": 0, "ao_num": 0, "di_num": 16, "do_num": 16, "extend_attr": {"mode": "slaver", "name": "autotest", "type": "MODBUS"}, "id": 7, "name": "autotest", "type": 6}
    local_copy = f"..\\assets\\configs\\{io_device_file}"
    local_patched = f"..\\assets\\configs\\{io_device_file}_tmp"
    remote_copy = f"{user_settings}/{io_device_file}"

    pre_dos.pull_file_from_server(remote_copy, local_copy)
    with open(local_copy, mode="r", encoding="utf-8") as src:
        io_config = json.load(src)

    already_registered = any(
        device["extend_attr"].get("name", None) == "autotest"
        for device in io_config["device_list"]
    )
    if not already_registered:
        io_config["device_list"].append(autotest_device)

    with open(local_patched, mode="w", encoding="utf-8") as dst:
        json.dump(io_config, dst, indent=4)

    for remote_dir in (user_settings, interactive_data):
        pre_dos.push_file_to_server(local_patched, f"{remote_dir}/{io_device_file}")
|
||
|
||
|
||
def fw_updater(local_file_path):
    """Upload a firmware or configuration package to the controller.

    A file larger than 10,000,000 bytes is handled as a controller (xCore)
    package: the controller version is read before and after the upload and
    both values are logged. Any smaller file is handled as a configuration
    package.

    The file is wrapped in the upgrade-port file protocol (see
    ``_build_upgrade_packet``) and sent to ``clibs.ip_addr:clibs.upgrade_port``
    in chunks of at most 10240 bytes, with a 10 ms pause after each chunk.

    Args:
        local_file_path: Path of the package file on the local machine.

    Raises:
        OSError: If the local file cannot be read or sending fails.
        RuntimeError: If the socket accepts zero bytes during the upload.
        SystemExit: With code 1 if the connection cannot be established.
    """
    fw_size = os.path.getsize(local_file_path)
    is_controller_package = fw_size > 10000000

    version_previous = None
    if is_controller_package:
        # Remember the xCore version in place before the upgrade.
        version_previous = _read_xcore_version()

    remote_file_path = './upgrade/lircos.zip'

    clibs.logger.info(f"正在连接 {clibs.ip_addr}:{clibs.upgrade_port}...")
    tcp_socket = None
    try:
        try:
            tcp_socket = socket(AF_INET, SOCK_STREAM)
            tcp_socket.connect((clibs.ip_addr, clibs.upgrade_port))
            tcp_socket.setblocking(True)
        except Exception as Err:
            clibs.logger.error(f"{Err} | 连接 {clibs.ip_addr}:{clibs.upgrade_port} 失败...")
            # Same exception the builtin exit(1) raises, without relying on
            # the `site` module having installed `exit`.
            raise SystemExit(1)

        clibs.logger.info(f"正在读取 {local_file_path} 文件内容...")
        with open(local_file_path, 'rb') as f_fw:
            fw_content = f_fw.read()

        clibs.logger.info("正在构造数据包,以及包头...")
        packet = _build_upgrade_packet(fw_content, remote_file_path)

        clibs.logger.info("正在发送数据到 xCore,升级控制器无需重启,升级配置文件会自动软重启...")
        # A memoryview lets each chunk be sliced without copying the payload.
        view = memoryview(packet)
        total = len(view)
        offset = 0
        while offset < total:
            sent = tcp_socket.send(view[offset:offset + 10240])
            time.sleep(0.01)
            if sent == 0:
                raise RuntimeError("socket connection broken")
            offset += sent

        # Give the controller time to apply the package before querying it.
        waited = 5 if is_controller_package else 25
        time.sleep(waited)

        if is_controller_package:
            version_current = _read_xcore_version()
            clibs.logger.info(f"控制器升级成功:from {version_previous} to {version_current} :)")
        else:
            clibs.logger.info(f"配置文件升级成功 :)")
    finally:
        # Close the socket on every path, including failed connects/sends.
        if tcp_socket is not None:
            tcp_socket.close()


def _read_xcore_version():
    """Return ``get_robot_params["version"]`` from a short-lived HMI session."""
    hr = openapi.HmiRequest()
    try:
        return hr.get_robot_params["version"]
    finally:
        hr.close()


def _build_upgrade_packet(fw_content, remote_file_path):
    """Return the bytes to send for one file upload.

    Layout (multi-byte integers in network byte order):

    * header: payload size (4 bytes), package type 1 = file (1 byte),
      reserved 0 (1 byte);
    * payload: protocol id 0 (2 bytes), write type 0 (1 byte), MD5 digest of
      the file (16 bytes), remote path length (2 bytes), remote path (ASCII),
      file content.
    """
    path_bytes = remote_file_path.encode("ascii")
    payload_parts = (
        struct.pack("!HB", 0, 0),            # protocol id, write type
        hashlib.md5(fw_content).digest(),    # raw 16-byte digest
        struct.pack("!H", len(path_bytes)),  # remote path length
        path_bytes,
        fw_content,
    )
    payload_size = sum(len(part) for part in payload_parts)
    # package type: 0 = no protocol, 1 = file, 2 = json; last byte is reserved.
    header = struct.pack("!IBB", payload_size, 1, 0)
    return b"".join((header, *payload_parts))
|
||
|
||
|
||
class UpgradeJsonCmd(object):
    """Single-use client for JSON commands on the controller's upgrade port.

    Creating an instance connects to ``clibs.ip_addr:clibs.upgrade_port`` with
    a 3 second socket timeout; a failed connection is logged and ends the
    process with ``SystemExit(1)``. Each public method sends one JSON command,
    logs the reply and closes the socket, so an instance can run exactly one
    command — create a new instance for the next one.
    """

    def __init__(self):
        self.__c = None
        self.__sock_conn()

    def __sock_conn(self):
        """Open the TCP connection used for the single command."""
        clibs.logger.info(f"正在连接 {clibs.ip_addr}:{clibs.upgrade_port}...")
        try:
            self.__c = socket(AF_INET, SOCK_STREAM)
            self.__c.connect((clibs.ip_addr, clibs.upgrade_port))
            # settimeout() also defines the blocking mode, so no separate
            # setblocking() call is needed.
            self.__c.settimeout(3)
        except Exception as Err:
            clibs.logger.error(f"{Err} | 连接 {clibs.ip_addr}:{clibs.upgrade_port} 失败...")
            if self.__c is not None:
                self.__c.close()
            # Same exception the builtin exit(1) raises, without relying on
            # the `site` module having installed `exit`.
            raise SystemExit(1)

    @staticmethod
    def __do_package(cmd):
        """Prefix ``cmd`` (bytes) with the 6-byte header.

        Header: payload size as a 4-byte network-order integer, package type
        2 (json), and one reserved zero byte.
        """
        return struct.pack('!IBB', len(cmd), 2, 0) + cmd

    @staticmethod
    def __decode_response(raw):
        """Return the text of a raw reply with its first 8 bytes removed.

        The prefix is cut off *before* decoding so that prefix bytes which are
        not valid UTF-8 cannot raise ``UnicodeDecodeError``; undecodable bytes
        in the remainder are replaced rather than raising.
        NOTE(review): the 8-byte prefix length is taken from the original
        ``res[8:]`` slice — confirm against the protocol description.
        """
        return raw[8:].decode("utf-8", errors="replace")

    def __recv_result(self, cmd):
        """Log the reply to ``cmd`` and close the connection."""
        try:
            time.sleep(2)
            try:
                res = self.__decode_response(self.__c.recv(10240))
            except timeout:
                # No reply within the socket timeout.
                res = "None"
            clibs.logger.info(f"请求命令 {cmd.decode()} 的返回信息:\n{res}")
        finally:
            self.__c.close()
        time.sleep(2)

    def __exec(self, command: dict):
        """Serialise ``command`` to compact JSON, send it and log the reply."""
        try:
            # Discard anything already waiting on the socket before sending.
            try:
                self.__c.recv(10240)
            except timeout:
                pass
            cmd = json.dumps(command, separators=(",", ":")).encode("utf-8")
            self.__c.sendall(self.__do_package(cmd))
        except Exception:
            # Do not leak the socket when draining or sending fails.
            self.__c.close()
            raise
        self.__recv_result(cmd)

    def erase_cfg(self):
        # Erases the robot configuration (.rc_cfg) and the interactive data
        # configuration (interactive_data) in one step, keeping user logs.
        # Use case: after an xCore upgrade spanning many versions the config
        # files may be incompatible and prevent start-up; erasing them lets
        # the configuration be regenerated.
        # Robot parameters, zero points, etc. will be lost!
        self.__exec({"cmd": "erase"})

    def clear_rubbish(self):
        self.__exec({"cmd": "clearRubbish"})

    def soft_reboot(self):
        self.__exec({"cmd": "soft_reboot"})

    def version_query(self):
        self.__exec({"query": "version"})

    def robot_reboot(self):
        self.__exec({"cmd": "reboot"})

    def reset_passwd(self):
        # Does not take effect; known to be problematic.
        self.__exec({"cmd": "passwd"})

    def backup_origin(self):
        # xCore
        # .rc_cfg /
        # interactive_data /
        # module /
        # demo_project /
        # robot_cfg /
        # dev_eni.xml
        # ecat_license
        # libemllI8254x.so & libemllI8254x_v3.so
        # set_network_parameters
        self.__exec({"cmd": "backup_origin"})

    def origin_recovery(self):
        self.__exec({"cmd": "recover"})
|
||
|
||
|
||
if __name__ == "__main__":
    # Running this module directly does nothing. The entry points below are
    # kept disabled for manual use:
    #   initialization()
    #   fw_updater()
    pass
|