APIs/code/common.py

311 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os.path
import socket
import time
import openapi
import json
import clibs
from socket import *
from ctypes import *
import hashlib
import struct
def initialization():
hr = openapi.HmiRequest()
pd = openapi.PreDos()
# 推送配置文件
clibs.logger.info("推送配置文件 fieldbus_device.json/registers.json/registers.xml 到控制器,并配置 IO 设备,设备号为 7...")
robot_params = hr.get_robot_params
robot_type = robot_params["robot_type"]
security_type = robot_params["security_type"]
controller_type = robot_params["controller_type"]
io_device_file = "_".join(["io_device", controller_type, security_type, "1"])
user_settings = "/home/luoshi/bin/controller/user_settings"
interactive_data = f"/home/luoshi/bin/controller/interactive_data/{robot_type}"
config_files = [
"..\\assets\\configs\\fieldbus_device.json",
"..\\assets\\configs\\registers.json",
"..\\assets\\configs\\registers.xml"
]
for config_file in config_files:
filename = config_file.split("\\")[-1]
pd.push_file_to_server(config_file, f"{user_settings}/{filename}")
pd.push_file_to_server(config_file, f"{interactive_data}/{filename}")
io_device_autotest = {"ai_num": 0, "ao_num": 0, "di_num": 16, "do_num": 16, "extend_attr": {"mode": "slaver", "name": "autotest", "type": "MODBUS"}, "id": 7, "name": "autotest", "type": 6}
io_device_file_local = f"..\\assets\\configs\\{io_device_file}"
io_device_file_local_tmp = f"..\\assets\\configs\\{io_device_file}_tmp"
io_device_file_remote = f"{user_settings}/{io_device_file}"
pd.pull_file_from_server(io_device_file_remote, io_device_file_local)
with open(io_device_file_local, mode="r", encoding="utf-8") as f:
data = json.load(f)
for _ in data["device_list"]:
if _["extend_attr"].get("name", None) == "autotest":
break
else:
data["device_list"].append(io_device_autotest)
with open(io_device_file_local_tmp, mode="w", encoding="utf-8") as f_tmp:
json.dump(data, f_tmp, indent=4)
pd.push_file_to_server(io_device_file_local_tmp, f"{user_settings}/{io_device_file}")
pd.push_file_to_server(io_device_file_local_tmp, f"{interactive_data}/{io_device_file}")
hr.reload_io()
hr.reload_registers()
hr.reload_fieldbus()
hr.set_fieldbus_device_params("autotest", True)
md = openapi.ModbusRequest()
# 触发急停并恢复
md.r_soft_estop(0)
md.r_soft_estop(1)
# 断开示教器连接
clibs.logger.info("断开示教器连接...")
hr.switch_tp_mode("without")
# 清空 system IO 配置
clibs.logger.info("清空所有的 System IO 功能配置...")
hr.update_system_io_configuration([], [], [], [], [])
# 关闭缩减模式
md.r_reduced_mode(0)
# 打开软限位
clibs.logger.info("打开软限位开关...")
hr.set_soft_limit_params(enable=True)
# 关闭安全区域
clibs.logger.info("正在关闭所有的安全区,并关闭总使能开关...")
hr.set_safety_area_overall(False)
hr.set_safety_area_signal(False)
for i in range(10):
hr.set_safety_area_enable(i, False)
# 打开外部通信,并设置控制器时间同步
clibs.logger.info("配置并打开外部通信默认服务器8080端口后缀为 \"\\r\"...")
hr.set_socket_params(True, "8080", "\r", 1)
ec = openapi.ExternalCommunication()
ec.modify_system_time(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())))
# 关闭拖动
if robot_type.upper()[:2] not in ["XB", "NB"]:
clibs.logger.info("关闭拖动模式...")
hr.set_drag_params(False, 1, 2)
# 关闭碰撞检测
clibs.logger.info("关闭碰撞检测...")
hr.set_collision_params(False, 0, 1, 100)
# 清除所有过滤错误码
clibs.logger.info("清除所有过滤错误码设定...")
hr.set_filtered_error_code("clear", [])
# 回拖动位姿
clibs.logger.info("正在回拖动位姿...")
hr.switch_operation_mode("manual")
hr.switch_motor_state("on")
hr.set_quickturn_pos(enable_drag=True)
hr.move2quickturn("drag")
while True:
if md.w_robot_is_moving:
time.sleep(1)
else:
break
hr.stop_move(1)
hr.switch_motor_state("off")
hr.close()
# 清除所有告警
md.r_clear_alarm()
def fw_updater(local_file_path):
fw_size = os.path.getsize(local_file_path)
if fw_size > 10000000:
# get previous version of xCore
hr = openapi.HmiRequest()
version_previous = hr.get_robot_params["version"]
hr.close()
# var def
remote_file_path = './upgrade/lircos.zip'
fw_content = bytearray()
package_data = bytearray()
package_data_with_head = bytearray()
# socket connect
clibs.logger.info(f"正在连接 {clibs.ip_addr}:{clibs.upgrade_port}...")
try:
tcp_socket = socket(AF_INET, SOCK_STREAM)
tcp_socket.connect((clibs.ip_addr, clibs.upgrade_port))
tcp_socket.setblocking(True)
except Exception as Err:
clibs.logger.error(f"{Err} | 连接 {clibs.ip_addr}:{clibs.upgrade_port} 失败...")
exit(1)
# get firmware content of binary format
clibs.logger.info(f"正在读取 {local_file_path} 文件内容...")
with open(local_file_path, 'rb') as f_fw:
fw_content += f_fw.read()
# construct package data: http://confluence.i.rokae.com/pages/viewpage.action?pageId=15634148
clibs.logger.info("正在构造数据包,以及包头...")
# 1 protocol id
protocol_id = c_ushort(htons(0)) # 2 bytes
package_data += protocol_id
# 2 write type
write_type = c_ubyte(0)
package_data += bytes(write_type)
# 3 md5
md5_hash = hashlib.md5()
md5_hash.update(fw_content)
fw_md5 = md5_hash.hexdigest()
i = 0
while i < len(fw_md5):
_ = (fw_md5[i:i + 2])
package_data += bytes.fromhex(_)
i += 2
# 4 remote path len
remote_file_path_len = c_ushort(htons(len(remote_file_path)))
package_data += remote_file_path_len
# 5 remote path
package_data += remote_file_path.encode("ascii")
# 6 file
package_data += fw_content
# construct communication protocol: http://confluence.i.rokae.com/pages/viewpage.action?pageId=15634045
# 1 get package data with header
package_size = c_uint(htonl(len(package_data)))
package_type = c_ubyte(1) # 0-无协议 1-文件 2-json
package_reserve = c_ubyte(0) # 预留位
package_data_with_head += package_size
package_data_with_head += package_type
package_data_with_head += package_reserve
package_data_with_head += package_data
# 2 send data to server
clibs.logger.info("正在发送数据到 xCore升级控制器无需重启升级配置文件会自动软重启...")
start = 0
len_of_package = len(package_data_with_head)
while start < len_of_package:
end = 10240 + start
if end > len_of_package:
end = len_of_package
sent = tcp_socket.send((package_data_with_head[start:end]))
time.sleep(0.01)
if sent == 0:
raise RuntimeError("socket connection broken")
else:
start += sent
waited = 5 if fw_size > 10000000 else 25
time.sleep(waited)
if fw_size > 10000000:
# get current version of xCore
hr = openapi.HmiRequest()
version_current = hr.get_robot_params["version"]
hr.close()
clibs.logger.info(f"控制器升级成功from {version_previous} to {version_current} :)")
else:
clibs.logger.info(f"配置文件升级成功 :)")
tcp_socket.close()
class UpgradeJsonCmd(object):
def __init__(self):
self.__c = None
self.__sock_conn()
def __sock_conn(self):
# socket connect
clibs.logger.info(f"正在连接 {clibs.ip_addr}:{clibs.upgrade_port}...")
try:
self.__c = socket(AF_INET, SOCK_STREAM)
self.__c.connect((clibs.ip_addr, clibs.upgrade_port))
self.__c.setblocking(True)
self.__c.settimeout(3)
except Exception as Err:
clibs.logger.error(f"{Err} | 连接 {clibs.ip_addr}:{clibs.upgrade_port} 失败...")
exit(1)
@staticmethod
def __do_package(cmd):
package_size = struct.pack('!I', len(cmd))
package_type = struct.pack('B', 2)
reserved_byte = struct.pack('B', 0)
return package_size + package_type + reserved_byte + cmd
def __recv_result(self, cmd):
time.sleep(2)
try:
res = self.__c.recv(10240).decode()
except timeout:
res = "ResponseNone"
clibs.logger.info(f"请求命令 {cmd.decode()} 的返回信息:\n{res[8:]}")
self.__c.close()
time.sleep(2)
def __exec(self, command: dict):
try:
self.__c.recv(10240)
except timeout:
pass
cmd = json.dumps(command, separators=(",", ":")).encode("utf-8")
self.__c.sendall(self.__do_package(cmd))
self.__recv_result(cmd)
def erase_cfg(self):
# 一键抹除机器人配置(.rc_cfg)、交互数据配置(interactive_data),但保留用户日志
# 场景如果xCore版本升级跨度过大配置文件可能不兼容导致无法启动可以使用该功能抹除配置重新生成配置。
# 机器人参数、零点等会丢失!
self.__exec({"cmd": "erase"})
def clear_rubbish(self):
self.__exec({"cmd": "clearRubbish"})
def soft_reboot(self):
self.__exec({"cmd": "soft_reboot"})
def version_query(self):
self.__exec({"query": "version"})
def robot_reboot(self):
self.__exec({"cmd": "reboot"})
def reset_passwd(self):
# 不生效,有问题
self.__exec({"cmd": "passwd"})
def backup_origin(self):
# xCore
# .rc_cfg /
# interactive_data /
# module /
# demo_project /
# robot_cfg /
# dev_eni.xml
# ecat_license
# libemllI8254x.so & libemllI8254x_v3.so
# set_network_parameters
self.__exec({"cmd": "backup_origin"})
def origin_recovery(self):
self.__exec({"cmd": "recover"})
if __name__ == "__main__":
# initialization()
# fw_updater()
pass