AIO/codes/common/openapi.py
2025-03-28 13:43:25 +08:00

2492 lines
107 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import socket
from inspect import currentframe
import threading
from paramiko import SSHClient, AutoAddPolicy
from pymodbus.client.tcp import ModbusTcpClient
import selectors
import time
import os.path
from ctypes import *
import hashlib
import struct
from datetime import datetime
from PySide6.QtCore import Signal, QThread
from codes.common import clibs
class ModbusRequest(QThread):
output = Signal(str, str)
def __init__(self, ip, port, /):
super().__init__()
self.ip = ip
self.port = port
def net_conn(self):
self.logger("INFO", "openapi", f"Modbus 正在连接中,需要配置设备,这可能需要一点时间......", "blue")
RobotInit.modbus_init()
self.c = ModbusTcpClient(host=self.ip, port=self.port)
if self.c.connect():
self.logger("INFO", "openapi", f"Modbus connection({clibs.ip_addr}:{clibs.modbus_port}) success!", "green")
else:
self.logger("ERROR", "openapi", f"Modbus connection({clibs.ip_addr}:{clibs.modbus_port}) failed!", "red", "MdConnFailed")
def logger(self, level, module, content, color="black", error="", flag="both"):
flag = "cursor" if level.upper() == "DEBUG" else "both"
clibs.logger(level, module, content, color, flag, signal=self.output)
if level.upper() == "ERROR":
raise Exception(f"{error} | {content}")
def close(self):
if self.c.connect():
try:
self.c.close()
self.logger("INFO", "openapi", f"modbus: 关闭 Modbus 连接成功", "green")
except Exception as err:
self.logger("ERROR", "openapi", f"modbus: 关闭 Modbus 连接失败:{err}", "red", "MdCloseFailed")
def __reg_high_pulse(self, addr: int) -> None:
self.c.write_register(addr, 0)
time.sleep(clibs.INTERVAL)
self.c.write_register(addr, 1)
time.sleep(clibs.INTERVAL+1)
self.c.write_register(addr, 0)
def r_clear_alarm(self): # OK
self.__reg_high_pulse(40000)
self.logger("DEBUG", "openapi", "modbus: 40000-010 执行清除告警信息")
def r_reset_estop(self): # OK
self.__reg_high_pulse(40001)
self.logger("DEBUG", "openapi", "modbus: 40001-010 执行复位急停状态(非软急停)")
def r_reset_estop_clear_alarm(self): # OK
self.__reg_high_pulse(40002)
self.logger("DEBUG", "openapi", "modbus: 40002-010 执行复位急停状态(非软急停),并清除告警信息")
def r_motor_off(self): # OK
self.__reg_high_pulse(40003)
self.logger("DEBUG", "openapi", "modbus: 40003-010 执行机器人下电")
def r_motor_on(self): # OK
self.__reg_high_pulse(40004)
self.logger("DEBUG", "openapi", "modbus: 40004-010 执行机器人上电")
def r_motoron_pp2main_start(self): # OK
self.__reg_high_pulse(40005)
self.logger("DEBUG", "openapi", "modbus: 40005-010 执行机器人上电/pp2main/开始运行程序,需自动模式执行,若运行失败,可清除告警后再次尝试")
def r_motoron_start(self): # OK
self.__reg_high_pulse(40006)
self.logger("DEBUG", "openapi", "modbus: 40006-010 执行机器人上电/开始运行程序需自动模式执行若运行失败可清除告警、执行pp2main后再次尝试")
def r_pulse_motoroff(self): # OK
self.__reg_high_pulse(40007)
self.logger("DEBUG", "openapi", "modbus: 40007-010 执行机器人停止,并下电,手动模式下可停止程序运行,但不能下电,若运行失败,可清除告警后再次尝试")
def r_pp2main(self): # OK
self.__reg_high_pulse(40008)
self.logger("DEBUG", "openapi", "modbus: 40008-010 执行机器人 pp2main需自动模式执行若运行失败可清除告警后再次尝试")
def r_program_start(self): # OK
self.__reg_high_pulse(40009)
self.logger("DEBUG", "openapi", "modbus: 40009-010 执行机器人默认程序运行,需有 pp2main 前置操作,若运行失败,可清除告警后再次尝试")
def r_program_stop(self): # OK
self.__reg_high_pulse(40010)
self.logger("DEBUG", "openapi", "modbus: 40010-010 执行机器人默认程序停止,需有 pp2main 前置操作,若运行失败,可清除告警后再次尝试")
def r_reduced_mode(self, action: int): # OK
self.c.write_register(40011, action)
actions = "进入" if action == 1 else "退出"
self.logger("DEBUG", "openapi", f"modbus: 40011-{action} 执行机器人{actions}缩减模式")
time.sleep(clibs.INTERVAL)
def r_soft_estop(self, action: int): # OK
self.c.write_register(40012, action)
actions = "解除" if action == 1 else "触发"
self.logger("DEBUG", "openapi", f"modbus: 40012-{action} 执行{actions}急停动作")
time.sleep(clibs.INTERVAL)
def r_switch_auto_motoron(self): # OK
self.__reg_high_pulse(40013)
self.logger("DEBUG", "openapi", "modbus: 40013-010 执行切换为自动模式,并上电,初始状态 !!不能是!! 手动上电模式")
def r_switch_auto(self): # OK
self.__reg_high_pulse(40014)
self.logger("DEBUG", "openapi", "modbus: 40014-010 执行切换为自动模式")
def r_switch_manual(self): # OK
self.__reg_high_pulse(40015)
self.logger("DEBUG", "openapi", "modbus: 40015-010 执行切换为手动模式")
def r_switch_safe_region01(self, action: bool): # OK | 上升沿打开,下降沿关闭
if action:
self.c.write_register(40016, False)
time.sleep(clibs.INTERVAL)
self.c.write_register(40016, True)
else:
self.c.write_register(40016, True)
time.sleep(clibs.INTERVAL)
self.c.write_register(40016, False)
actions = "打开" if action else "关闭"
self.logger("DEBUG", "openapi", f"modbus: 40016-{action} 执行{actions}安全区 safe region 01")
time.sleep(clibs.INTERVAL)
def r_switch_safe_region02(self, action: bool): # OK | 上升沿打开,下降沿关闭
if action:
self.c.write_register(40017, False)
time.sleep(clibs.INTERVAL)
self.c.write_register(40017, True)
else:
self.c.write_register(40017, True)
time.sleep(clibs.INTERVAL)
self.c.write_register(40017, False)
actions = "打开" if action else "关闭"
self.logger("DEBUG", "openapi", f"modbus: 40017-{action} 执行{actions}安全区 safe region 02")
time.sleep(clibs.INTERVAL)
def r_switch_safe_region03(self, action: bool): # OK | 上升沿打开,下降沿关闭
if action:
self.c.write_register(40018, False)
time.sleep(clibs.INTERVAL)
self.c.write_register(40018, True)
else:
self.c.write_register(40018, True)
time.sleep(clibs.INTERVAL)
self.c.write_register(40018, False)
actions = "打开" if action else "关闭"
self.logger("DEBUG", "openapi", f"modbus: 40018-{action} 执行{actions}安全区 safe region 03")
time.sleep(clibs.INTERVAL)
def write_act(self, number):
self.c.write_register(40100, number)
self.logger("DEBUG", "openapi", f"modbus: 40100 将 {number} 写入")
def write_probe(self, probe):
self.c.write_register(40101, probe)
self.logger("DEBUG", "openapi", f"modbus: 40101 将 {probe} 写入")
def write_pon(self, pon):
self.c.write_register(40102, pon)
self.logger("DEBUG", "openapi", f"modbus: 40102 将 {pon} 写入")
def write_axis(self, axis):
result = self.c.convert_to_registers(int(axis), self.c.DATATYPE.INT32, "little")
self.c.write_registers(40103, result)
self.logger("DEBUG", "openapi", f"modbus: 40103 将 {axis} 写入")
def write_speed_max(self, speed):
result = self.c.convert_to_registers(float(speed), self.c.DATATYPE.FLOAT32, "little")
self.c.write_registers(40105, result)
self.logger("DEBUG", "openapi", f"modbus: 40105 将 {speed} 写入")
def r_write_signals(self, addr: int, value): # OK | 40100 - 40109: signal_0 ~ signal_9
if -1 < addr < 10 and addr.is_integer():
self.c.write_register(40100+addr, value)
self.logger("DEBUG", "openapi", f"modbus: {40100+addr}-{value} 将寄存器 signal_{addr} 赋值为 {value}")
else:
self.logger("INFO", "openapi", f"modbus: {40100+addr}-{value} 地址错误,无法赋值!", "red")
@property
def w_alarm_state(self): # OK
res = self.c.read_holding_registers(40500, count=1).registers[0]
self.logger("DEBUG", "openapi", f"modbus: 40500 获取告警状态,结果为 {res} :--: 0 表示无告警,1 表示有告警")
return res
@property
def w_collision_alarm_state(self): # OK
res = self.c.read_holding_registers(40501, count=1).registers[0]
self.logger("DEBUG", "openapi", f"modbus: 40501 获取碰撞告警状态,结果为 {res} :--: 0 表示未触发1 表示已触发")
return res
@property
def w_collision_open_state(self): # OK
res = self.c.read_holding_registers(40502, count=1).registers[0]
self.logger("DEBUG", "openapi", f"modbus: 40502 获取碰撞检测开启状态,结果为 {res} :--: 0 表示关闭1 表示开启")
return res
@property
def w_controller_is_running(self): # OK
res = self.c.read_holding_registers(40503, count=1).registers[0]
self.logger("DEBUG", "openapi", f"modbus: 40503 获取控制器运行状态,结果为 {res} :--: 0 表示运行异常1 表示运行正常")
return res
@property
def w_encoder_low_battery(self): # OK
res = self.c.read_holding_registers(40504, count=1).registers[0]
self.logger("DEBUG", "openapi", f"modbus: 40504 获取编码器低电压状态,结果为 {res} :--: 0 表示非低电压1 表示低电压 需关注")
return res
@property
def w_estop_state(self): # OK
res = self.c.read_holding_registers(40505, count=1).registers[0]
self.logger("DEBUG", "openapi", f"modbus: 40505 获取机器人急停状态(非软急停),结果为 {res} :--: 0 表示未触发1 表示已触发")
return res
@property
def w_motor_state(self): # OK
res = self.c.read_holding_registers(40506, count=1).registers[0]
self.logger("DEBUG", "openapi", f"modbus: 40506 获取机器人上电状态,结果为 {res} :--: 0 表示未上电1 表示已上电")
return res
@property
def w_operation_mode(self): # OK
res = self.c.read_holding_registers(40507, count=1).registers[0]
self.logger("DEBUG", "openapi", f"modbus: 40507 获取机器人操作模式,结果为 {res} :--: 0 表示手动模式1 表示自动模式")
return res
@property
def w_program_state(self): # OK
res = self.c.read_holding_registers(40508, count=1).registers[0]
self.logger("DEBUG", "openapi", f"modbus: 40508 获取程序的运行状态,结果为 {res} :--: 0 表示未运行1 表示正在运行")
return res
@property
def w_program_not_run(self): # OK
res = self.c.read_holding_registers(40509, count=1).registers[0]
self.logger("DEBUG", "openapi", f"modbus: 40509 判定程序为未运行状态,结果为 {res} :--: 0 表示正在运行1 表示未运行")
return res
@property
def w_program_reset(self): # OK
res = self.c.read_holding_registers(40510, count=1).registers[0]
self.logger("DEBUG", "openapi", f"modbus: 40510 判定程序指针为 pp2main 状态,结果为 {res} :--: 0 表示指针不在 main 函数1 表示指针在 main 函数")
return res
@property
def w_reduce_mode_state(self): # OK
res = self.c.read_holding_registers(40511, count=1).registers[0]
self.logger("DEBUG", "openapi", f"modbus: 40511 获取机器人缩减模式状态,结果为 {res} :--: 0 表示非缩减模式1 表示缩减模式")
return res
@property
def w_robot_is_busy(self): # OK
res = self.c.read_holding_registers(40512, count=1).registers[0]
self.logger("DEBUG", "openapi", f"modbus: 40512 获取机器人是否处于 busy 状态,结果为 {res} :--: 0 表示未处于 busy 状态1 表示处于 busy 状态")
return res
@property
def w_robot_is_moving(self): # OK
res = self.c.read_holding_registers(40513, count=1).registers[0]
self.logger("DEBUG", "openapi", f"modbus: 40513 获取机器人是否处于运动状态,结果为 {res} :--: 0 表示未运动1 表示正在运动")
return res
@property
def w_safe_door_state(self): # OK
res = self.c.read_holding_registers(40514, count=1).registers[0]
self.logger("DEBUG", "openapi", f"modbus: 40514 获取机器人是否处于安全门打开状态,需自动模式下执行,结果为 {res} :--: 0 表示未触发安全门1 表示已触发安全门")
return res
@property
def w_safe_region01_trig_state(self): # OK
res = self.c.read_holding_registers(40515, count=1).registers[0]
self.logger("DEBUG", "openapi", f"modbus: 40515 获取安全区域 safe region01 的触发状态,结果为 {res} :--: 0 表示未触发1 表示已触发")
return res
@property
def w_safe_region02_trig_state(self): # OK
res = self.c.read_holding_registers(40516, count=1).registers[0]
self.logger("DEBUG", "openapi", f"modbus: 40516 获取安全区域 safe region02 的触发状态,结果为 {res} :--: 0 表示未触发1 表示已触发")
return res
@property
def w_safe_region03_trig_state(self): # OK
res = self.c.read_holding_registers(40517, count=1).registers[0]
self.logger("DEBUG", "openapi", f"modbus: 40517 获取安全区域 safe region03 的触发状态,结果为 {res} :--: 0 表示未触发1 表示已触发")
return res
@property
def w_soft_estop_state(self): # OK
res = self.c.read_holding_registers(40518, count=1).registers[0]
self.logger("DEBUG", "openapi", f"modbus: 40518 获取机器人软急停状态,结果为 {res} :--: 0 表示未触发软急停1 表示已触发软急停")
return res
def io_write_coils(self, addr, action): # OK | 名字叫写线圈,其实是写 modbus 的 discrete inputs(DI)
# e.g. io_write_coils(0, 1)
# e.g. io_write_coils(1, 1)
# e.g. io_write_coils(0, [1, 1, 1])
self.c.write_coils(addr, action)
self.logger("DEBUG", "openapi", f"modbus: 执行给 DI 地址 {addr} 赋值为 {action},可根据情况传递列表,实现一次性赋值多个")
time.sleep(clibs.INTERVAL)
def io_read_coils(self): # OK | 读 modbus 的 16 个 discrete inputs(DI)
res = self.c.read_coils(0, count=16).bits
self.logger("DEBUG", "openapi", f"modbus: 执行读取所有 DI 的结果为 {res}")
return res
def io_read_discretes(self): # OK | 读 modbus 的 coil outputs(DO)
res = self.c.read_discrete_inputs(0, count=16).bits
self.logger("DEBUG", "openapi", f"modbus: 执行读取所有 DO 的结果为 {res}")
return res
def read_ready_to_go(self):
result = self.c.read_holding_registers(40600, count=1)
return result.registers[0]
def read_scenario_time(self):
results = self.c.read_holding_registers(40601, count=2)
result = self.c.convert_from_registers(registers=results.registers, data_type=self.c.DATATYPE.FLOAT32, word_order="little")
return result
def read_capture_start(self):
result = self.c.read_holding_registers(40603, count=1)
return result.registers[0]
class HmiRequest(QThread):
output = Signal(str, str)
def __init__(self, ip, port, port_xs, /):
super().__init__()
self.__ip = ip
self.__port = port
self.__port_xs = port_xs
self.__close_hmi = False
self.__is_connected = False
self.__index = 0
self.__previous_data = b""
self.__valid_data_length = 0
self.__leftovers = 0
self.__response = b""
self.__response_xs = ""
self.__half_frm = b""
self.__half_frm_flag = -1
self.__half_pkg = b""
self.__half_pkg_flag = False
self.__is_first_frame = True
self.__is_debug = True
def net_conn(self):
self.__socket_conn()
self.__t_heartbeat = threading.Thread(target=self.__heartbeat)
self.__t_heartbeat.daemon = True
self.__t_heartbeat.start()
self.__t_unpackage = threading.Thread(target=self.__unpackage, args=(self.c,))
self.__t_unpackage.daemon = True
self.__t_unpackage.start()
self.__t_unpackage_xs = threading.Thread(target=self.__unpackage_xs, args=(self.c_xs,))
self.__t_unpackage_xs.daemon = True
self.__t_unpackage_xs.start()
def logger(self, level, module, content, color="black", error="", flag="both"):
flag = "cursor" if level.upper() == "DEBUG" else "both"
clibs.logger(level, module, content, color, flag, signal=self.output)
if level.upper() == "ERROR":
raise Exception(f"{error} | {content}")
@property
def status(self):
return self.__is_connected
def close(self):
if self.__is_connected:
try:
self.__is_connected = False
time.sleep(clibs.INTERVAL/2)
self.c.close()
self.c_xs.close()
clibs.status["hmi"] = 0
self.logger("INFO", "openapi", f"hmi: 关闭 Socket 连接成功", "green")
except Exception as err:
self.logger("ERROR", "openapi", f"hmi: 关闭 Socket 连接失败 {err}", "red", "HmiCloseFailed")
def __socket_conn(self):
self.close()
try:
self.c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.c.settimeout(clibs.INTERVAL*5)
self.c.connect((self.__ip, self.__port))
self.c_xs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.c_xs.settimeout(clibs.INTERVAL*5)
self.c_xs.connect((self.__ip, self.__port_xs))
# 尝试连续三次发送心跳包,确认建联成功
state = None
for i in range(3):
_ = self.execution("controller.heart")
time.sleep(clibs.INTERVAL/2)
self.__is_connected = True
self.logger("INFO", "openapi", "hmi: HMI connection success...", "green")
except Exception as err:
self.logger("ERROR", "openapi", f"hmi: HMI connection timeout...", "red", "HmiConnTimeout")
def __heartbeat(self):
while self.__is_connected:
self.execution("controller.heart")
time.sleep(clibs.INTERVAL*2)
@staticmethod
def package(cmd):
frm_value = (len(cmd) + 6).to_bytes(length=2, byteorder="big")
pkg_value = len(cmd).to_bytes(length=4, byteorder="big")
protocol = int(2).to_bytes(length=1, byteorder="big")
reserved = int(0).to_bytes(length=1, byteorder="big")
return frm_value + pkg_value + protocol + reserved + cmd.encode()
def __unpackage(self, sock):
def to_read(conn, mask):
data = conn.recv(clibs.MAX_FRAME_SIZE)
if data:
# print(f"data = {data}")
# with open(f"{clibs.log_path}/logs.txt", mode="a", encoding="utf-8") as f_logs:
# f_logs.write(str(data) + "\n")
self.__get_response(data)
else:
sel.unregister(conn)
conn.close()
try:
sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ, to_read)
while self.__is_connected:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
except Exception as Err:
self.logger("DEBUG", "openapi", f"hmi: 老协议解包报错 {Err}", "red", "UnpackageFailed")
def __get_headers(self, index, data):
if index + 8 < len(data):
frm_value = int.from_bytes(data[index:index + 2], byteorder="big")
pkg_value = int.from_bytes(data[index + 2:index + 6], byteorder="big")
protocol = int.from_bytes(data[index + 6:index + 7], byteorder="big")
reserved = int.from_bytes(data[index + 7:index + 8], byteorder="big")
if reserved == 0 and protocol == 2:
return index + 8, frm_value, pkg_value
else:
# print("hr-get_headers: 解包数据有误,需要确认!")
# print(data)
self.logger("ERROR", "openapi", f"hmi: 解包数据有误,需要确认,最后一个数据包如下 {data}")
else:
self.__half_pkg = data[index:]
self.__half_pkg_flag = True
return -1, 0, 0
def __get_response(self, data):
frm_value, pkg_value, self.__index = 0, 0, 0
while self.__index < len(data):
if self.__is_first_frame:
if self.__half_pkg_flag:
len_one = len(self.__half_pkg)
len_another = 8 - len_one
headers = self.__half_pkg + data[:len_another]
self.__index = len_another
frm_value = int.from_bytes(headers[0:2], byteorder="big")
pkg_value = int.from_bytes(headers[2:6], byteorder="big")
self.__half_pkg_flag = False
else:
self.__index, frm_value, pkg_value = self.__get_headers(self.__index, data)
if self.__half_pkg_flag:
break
if frm_value - pkg_value == 6:
if len(data[self.__index:]) >= pkg_value:
self.__response = data[self.__index:self.__index + pkg_value]
self.__index += pkg_value
self.logger("DEBUG", "openapi", str(json.loads(self.__response.decode())))
self.__response = b""
self.__leftovers = 0
self.__is_first_frame = True
elif len(data[self.__index:]) < pkg_value:
self.__response = data[self.__index:]
self.__leftovers = pkg_value - len(data[self.__index:])
self.__index += clibs.MAX_FRAME_SIZE # whatever
self.__is_first_frame = False
elif frm_value < pkg_value:
self.__valid_data_length = pkg_value
if len(data[self.__index:]) >= frm_value - 6:
self.__is_first_frame = False
self.__leftovers = 0
self.__response = data[self.__index:self.__index + frm_value - 6]
self.__index += (frm_value - 6)
self.__valid_data_length -= (frm_value - 6)
if len(data[self.__index:]) == 2:
self.__half_frm = data[self.__index:]
self.__index += clibs.MAX_FRAME_SIZE
self.__half_frm_flag = 2
elif len(data[self.__index:]) == 1:
self.__half_frm = data[self.__index:]
self.__index += clibs.MAX_FRAME_SIZE
self.__half_frm_flag = 1
elif len(data[self.__index:]) == 0:
self.__index += clibs.MAX_FRAME_SIZE
self.__half_frm_flag = 0
else:
self.__half_frm_flag = -1
elif len(data[self.__index:]) < frm_value - 6:
self.__response = data[self.__index:]
self.__leftovers = frm_value - 6 - len(data[self.__index:])
self.__valid_data_length -= len(data[self.__index:])
self.__index += clibs.MAX_FRAME_SIZE
self.__is_first_frame = False
elif not self.__is_first_frame: # 不是首帧
if self.__leftovers > 0 and self.__valid_data_length > 0:
if len(data) >= self.__leftovers:
self.__is_first_frame = False
self.__response += data[:self.__leftovers]
self.__index = self.__leftovers
self.__valid_data_length -= self.__leftovers
self.__leftovers = 0
if self.__valid_data_length == 0:
# with open(f"{clibs.log_path}/response.txt", mode="a", encoding="utf-8") as f_res:
# f_res.write(f"{json.loads(self.__response.decode())}" + "\n")
self.logger("DEBUG", "openapi", str(json.loads(self.__response.decode())))
self.__response = b""
self.__is_first_frame = True
continue # 此时应该重新 get_headers
if len(data[self.__index:]) == 2:
self.__half_frm = data[self.__index:]
self.__index += clibs.MAX_FRAME_SIZE
self.__half_frm_flag = 2
elif len(data[self.__index:]) == 1:
self.__half_frm = data[self.__index:]
self.__index += clibs.MAX_FRAME_SIZE
self.__half_frm_flag = 1
elif len(data[self.__index:]) == 0:
self.__index += clibs.MAX_FRAME_SIZE
self.__half_frm_flag = 0
else:
self.__half_frm_flag = -1
elif len(data) < self.__leftovers:
self.__response += data
self.__leftovers -= len(data)
self.__valid_data_length -= len(data)
self.__index += clibs.MAX_FRAME_SIZE
elif self.__leftovers > 0 and self.__valid_data_length == 0:
if len(data) >= self.__leftovers:
self.__response += data[:self.__leftovers]
self.__index = self.__leftovers
self.__leftovers = 0
self.logger("DEBUG", "openapi", str(json.loads(self.__response.decode())))
self.__response = b""
self.__is_first_frame = True
elif len(data) < self.__leftovers:
self.__response += data
self.__leftovers -= len(data)
self.__index += clibs.MAX_FRAME_SIZE
elif self.__leftovers == 0 and self.__valid_data_length > 0:
# 1. len(data[self.__index:]) > 2
# 2. len(data[self.__index:]) = 2
# 3. len(data[self.__index:]) = 1
# 4. len(data[self.__index:]) = 0
if self.__half_frm_flag != -1:
if self.__half_frm_flag == 2:
frm_value = int.from_bytes(self.__half_frm)
if len(data) >= frm_value:
self.__response += data[:frm_value]
self.__leftovers = 0
self.__valid_data_length -= len(data[:frm_value])
self.__index = frm_value
elif len(data) < frm_value:
self.__response += data
self.__leftovers = frm_value - len(data)
self.__valid_data_length -= len(data)
self.__index += clibs.MAX_FRAME_SIZE
self.__half_frm_flag = -1
elif self.__half_frm_flag == 1:
frm_value = int.from_bytes(self.__half_frm + data[0:1])
if len(data[1:]) >= frm_value:
self.__response += data[1:frm_value+1]
self.__leftovers = 0
self.__valid_data_length -= len(data[1:frm_value+1])
self.__index = frm_value + 1
elif len(data[1:]) < frm_value:
self.__response += data[1:]
self.__leftovers = frm_value - len(data[1:])
self.__valid_data_length -= len(data[1:])
self.__index += clibs.MAX_FRAME_SIZE
self.__half_frm_flag = -1
elif self.__half_frm_flag == 0:
frm_value = int.from_bytes(data[0:2])
if len(data[2:]) >= frm_value:
self.__response += data[2:frm_value+2]
self.__leftovers = 0
self.__valid_data_length -= len(data[2:frm_value+2])
self.__index = frm_value + 2
elif len(data[2:]) < frm_value:
self.__response += data[2:]
self.__leftovers = frm_value - len(data[2:])
self.__valid_data_length -= len(data[2:])
self.__index += clibs.MAX_FRAME_SIZE
self.__half_frm_flag = -1
if self.__valid_data_length == 0:
self.logger("DEBUG", "openapi", str(json.loads(self.__response.decode())))
self.__response = b""
self.__is_first_frame = True
continue
if len(data[self.__index:]) == 2:
self.__half_frm = data[self.__index:]
self.__index += clibs.MAX_FRAME_SIZE
self.__half_frm_flag = 2
elif len(data[self.__index:]) == 1:
self.__half_frm = data[self.__index:]
self.__index += clibs.MAX_FRAME_SIZE
self.__half_frm_flag = 1
elif len(data[self.__index:]) == 0:
self.__index += clibs.MAX_FRAME_SIZE
self.__half_frm_flag = 0
else:
self.__half_frm_flag = -1
else:
frm_value = int.from_bytes(data[self.__index:self.__index + 2])
self.__index += 2
if len(data[self.__index:]) >= frm_value:
self.__leftovers = 0
self.__response += data[self.__index:self.__index + frm_value]
self.__index += frm_value
self.__valid_data_length -= frm_value
if self.__valid_data_length == 0:
self.logger("DEBUG", "openapi", str(json.loads(self.__response.decode())))
self.__response = b""
self.__is_first_frame = True
continue
if len(data[self.__index:]) == 2:
self.__index += clibs.MAX_FRAME_SIZE
self.__half_frm = data[self.__index:]
self.__half_frm_flag = 2
elif len(data[self.__index:]) == 1:
self.__index += clibs.MAX_FRAME_SIZE
self.__half_frm = data[self.__index:]
self.__half_frm_flag = 1
elif len(data[self.__index:]) == 0:
self.__half_frm_flag = 0
else:
self.__half_frm_flag = -1
elif len(data[self.__index:]) < frm_value:
self.__response += data[self.__index:]
self.__leftovers = frm_value - len(data[self.__index:])
self.__valid_data_length -= len(data[self.__index:])
self.__index += clibs.MAX_FRAME_SIZE
else:
# DEBUG INFO
# if self.__is_debug:
# print(f"12 index = {self.__index}")
# print(f"12 frm_value = {frm_value}")
# print(f"12 leftovers = {self.__leftovers}")
# print(f"12 valid_data_length = {self.__valid_data_length}")
# print(f"12 is_first_frame = {self.__is_first_frame}")
# if self.__valid_data_length < 0 or self.__leftovers > 1024:
# print(f"data = {data}")
# raise Exception("DataError")
self.logger("ERROR", "openapi", "hmi: Will never be here", "red", "WillNeverBeHere")
@staticmethod
def package_xs(cmd):
return "".join([json.dumps(cmd, separators=(",", ":")), "\r"]).encode()
def __unpackage_xs(self, sock):
def to_read(conn, mask):
data = conn.recv(1024) # Should be ready
if data:
# print(data)
self.get_response_xs(data)
else:
sel.unregister(conn)
conn.close()
try:
sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ, to_read)
while self.__is_connected:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
except Exception as err:
self.logger("DEBUG", "openapi", f"hmi: xService解包报错 {err}", "red", "XsUnpackageFailed")
def get_response_xs(self, data):
char, response = "", self.__response_xs
for char in data.decode():
if char != "\r":
response = "".join([response, char])
else:
self.logger("DEBUG", "openapi", response)
self.response_xs = response
response = ""
else:
self.__response_xs = response
@clibs.handle_exception
def get_from_id(self, msg_id):
f_text, flag, records, time_delay, ts = f"%{msg_id}%", False, "Null", clibs.INTERVAL*10, msg_id.split("@")[-1]
t = ts.replace("T", " ")
t_ts = datetime.timestamp(datetime.strptime(t, "%Y-%m-%d %H:%M:%S.%f"))
t_send = datetime.strftime(datetime.fromtimestamp(t_ts-time_delay), '%Y-%m-%d %H:%M:%S.%f')
t_receive = datetime.strftime(datetime.fromtimestamp(t_ts+time_delay), '%Y-%m-%d %H:%M:%S.%f')
for idx in range(time_delay):
time.sleep(clibs.INTERVAL)
try:
clibs.lock.acquire(True)
clibs.cursor.execute(f"SELECT content FROM logs WHERE timestamp BETWEEN '{t_send}' AND '{t_receive}' AND content LIKE '{f_text}'")
records = clibs.cursor.fetchall()
flag = True if len(records) == 2 else False
finally:
clibs.lock.release()
if flag is True:
return records
else:
self.logger("ERROR", "openapi", f"hmi: {time_delay}s内无法找到请求 {msg_id} 的响应!", "red", "ResponseNotFound")
def execution(self, command, **kwargs):
req = None
try:
with open(f"{clibs.PREFIX}/files/protocols/hmi/{command}.json", encoding="utf-8", mode="r") as f_json:
req = json.load(f_json)
t = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")
req["id"] = f"{command}@{t}"
flag = req["p_type"]
del req["p_type"]
except Exception as err:
self.logger("ERROR", "openapi", f"hmi: 暂不支持 {command} 功能,或确认该功能存在... {err}", "red", "CommandNotSupport")
match command:
case "state.set_tp_mode" | "overview.set_autoload" | "overview.reload" | "rl_task.pp_to_main" | "rl_task.run" | "rl_task.stop" | "rl_task.set_run_params" | "diagnosis.set_params" | "diagnosis.open" | "drag.set_params" | "controller.set_params" | "collision.set_state" | "collision.set_params" | "move.set_quickstop_distance" | "move.set_params" | "move.set_monitor_cfg" | "modbus.get_values" | "modbus.set_params" | "system_io.update_configuration" | "diagnosis.get_params" | "jog.set_params" | "jog.start" | "move.stop" | "move.quick_turn" | "move.set_quickturn_pos" | "soft_limit.set_params" | "fieldbus_device.set_params" | "socket.set_params" | "diagnosis.save" | "register.set_value":
req["data"].update(kwargs)
case "safety.safety_area.signal_enable" | "safety.safety_area.overall_enable" | "safety.safety_area.safety_area_enable" | "safety.safety_area.set_param":
req["c"].update(kwargs)
case "log_code.data.code_list":
req["s"]["log_code.data"]["code_list"] = kwargs["code_list"]
case _:
pass
if flag == 0:
cmd = json.dumps(req, separators=(",", ":"))
try:
self.c.send(self.package(cmd))
time.sleep(clibs.INTERVAL/4)
self.logger("DEBUG", "openapi", f"hmi: 老协议请求发送成功 {cmd}")
except Exception as err:
self.logger("ERROR", "openapi", f"hmi: 老协议请求发送失败 {cmd},报错信息 {err}", "red", "CommandSendFailed")
elif flag == 1:
try:
self.c_xs.send(self.package_xs(req))
time.sleep(clibs.INTERVAL/4)
self.logger("DEBUG", "openapi", f"hmi: xService请求发送成功 {req}")
except Exception as Err:
self.logger("ERROR", "openapi", f"hr: xService请求发送失败 {req} 报错信息 {Err}", "red", "CommandSendFailed")
return req["id"]
# =================================== ↓↓↓ specific functions ↓↓↓ ===================================
def switch_motor_state(self, state: str): # OK
"""
切换上电/下电的状态
:param state: on/off
:return: None
"""
match state:
case "on":
self.execution("state.switch_motor_on")
case "off":
self.execution("state.switch_motor_off")
case _:
self.logger("ERROR", "openapi", f"hmi: switch_motor_state 参数错误 {state} 非法参数,只接受 on/off", "red", "ArgumentError")
def switch_operation_mode(self, mode: str): # OK
"""
切换自动/手动操作模式
:param mode: auto/manual
:return: None
"""
match mode:
case "auto":
self.execution("state.switch_auto")
case "manual":
self.execution("state.switch_manual")
case _:
self.logger("ERROR", "openapi", f"hmi: switch_operation_mode 参数错误 {mode},非法参数,只接受 auto/manual", "red", "ArgumentError")
def reload_project(self, prj_name: str, tasks: list): # OK
"""
重新加载指定工程
:param prj_name: 工程名,也即 zip 文件的名字
:param tasks: 要加载的任务列表
:return: None
"""
prj_path = f"{prj_name}/_build/{prj_name}.prj"
self.execution("overview.reload", prj_path=prj_path, tasks=tasks)
def set_project_auto_reload(self, prj_name: str): # OK
"""
将指定工程设置为开机自动加载,也即默认工程
:param prj_name: 工程名,也即 zip 文件的名字
:return: None
"""
autoload_prj_path = f"{prj_name}/_build/{prj_name}.prj"
self.execution("overview.set_autoload", autoload_prj_path=autoload_prj_path)
def pp_to_main(self, tasks: list): # OK
"""
将指定的任务列表的指针,指向 main 函数
:param tasks: 任务列表
:return: None
"""
self.execution("rl_task.pp_to_main", tasks=tasks)
def program_start(self, tasks: list): # OK
"""
开始执行程序任务,必须是自动模式下执行
:param tasks: 任务列表
:return: None
"""
self.execution("rl_task.run", tasks=tasks)
def program_stop(self, tasks: list): # OK
"""
停止执行程序任务
:param tasks: 人物列表
:return: None
"""
self.execution("rl_task.stop", tasks=tasks)
def set_program_loop_speed(self, loop_mode: bool = True, override: float = 0.5): # OK
"""
:param loop_mode: True为循环模式False为单次模式
:param override: HMI 左下方的速度滑块,取值范围 [0, 1]
:return: None
"""
self.execution("rl_task.set_run_params", loop_mode=loop_mode, override=override)
def clear_alarm(self): # OK
"""
清除伺服告警
:return: None
"""
self.execution("servo.clear_alarm")
def reboot_robot(self): # OK
"""
重启控制器
:return: None
"""
self.execution("controller.reboot")
self.logger("INFO", "openapi", f"hmi: 控制器重启中,重连预计需要等待 100s 左右...")
ts = time.time()
time.sleep(30)
while True:
time.sleep(5)
te = time.time()
if te - ts > 180:
self.__silence = False
self.__sth_wrong("3min 内未能完成重新连接,需要查看后台控制器是否正常启动,或者 ip/port 是否正确")
break
for _ in range(3):
if not self.__is_connected:
break
time.sleep(2)
else:
self.logger("INFO", "openapi", "hr: HMI 重新连接成功...", "green")
break
def reload_io(self):
"""
触发控制器重新加载 IO 设备列表
:return: None
"""
self.execution("io_device.load_cfg")
@property
def get_quickturn_pos(self): # OK
"""
获取机器人的home位姿、拖动位姿和发货位姿轴关节角度end_posture 取值如下:
0 法兰平面与地面平行
1 工具坐标系X轴与地面垂直正向朝下
2 工具坐标系X轴与地面垂直正向朝上
3 工具坐标系Y轴与地面垂直正向朝下
4 工具坐标系Y轴与地面垂直正向朝上
5 工具坐标系Z轴与地面垂直正向朝下
6 工具坐标系Z轴与地面垂直正向朝上
:return: as below
{
"enable_home": false, // 是否开启 home 点快速调整
"enable_drag": false, // 是否开启拖动位姿点快速调整
"enable_transport": false, // 是否开启发货位姿点快速调整
"joint_home": [0.0,0.0,0.0,0.0,0.0,0.0,0.0], // home 位姿的关节角度
"joint_drag": [0.0,0.0,0.0,0.0,0.0,0.0,0.0], // 拖动位姿的关节角度
"joint_transport": [0.0,0.0,0.0,0.0,0.0,0.0,0.0], // 发货位姿的关节角度
"end_posture":0, // 末端姿态的调整方式,取值 0-6
"home_error_range":[0.0,0.0,0.0,0.0,0.0,0.0,0.0] // home点误差范围
}
"""
return self.__get_data(currentframe().f_code.co_name, "move.get_quickturn_pos")
def set_quickturn_pos(self, enable_home: bool = False, enable_drag: bool = False, enable_transport: bool = False, end_posture: int = 0): # OK
"""
设置机器人的home位姿、拖动位姿、发货位姿轴关节角度Home点误差范围详见上一个 get_quickturn_pos 功能实现
:param enable_home: 是否开启 home 点快速调整
:param enable_drag: 是否开启拖动位姿点快速调整
:param enable_transport:是否开启发货位姿点快速调整
:param end_posture: 末端姿态的调整方式,取值 0-6详见 get_quickturn_pos 注释
:return: None
"""
self.execution("move.set_quickturn_pos", enable_home=enable_home, enable_drag=enable_drag, enable_transport=enable_transport, end_posture=end_posture)
def move2quickturn(self, name: str): # OK
"""
运动到指定的快速调整位姿
:param name: 指定快速调整的名称home/drag/transport
:return: None
"""
self.execution("move.quick_turn", name=name)
def stop_move(self, stoptype=0): # OK
"""
停止运动
TS_READY | TS_JOG | TS_LOADIDENTIFY | TS_DYNAMICIDENTIFY | TS_DRAG | TS_PROGRAM | TS_DEMO | TS_RCI | TS_DEBUG | TS_FRICTIONIDENTIFY
:param stoptype: 对应控制器的任务空间类型TaskSpace的枚举值0-7
:return: None
"""
self.execution("move.stop", stoptype=stoptype)
@property
def get_jog_params(self): # OK
"""
获取JOG的参数
世界坐标系 WORLD_COORDINATE 0
法兰盘坐标系 FLANGE_COORDINATE 1
基坐标系 BASE_COORDINATE 2
工具坐标系 TOOL_COORDINATE 3
工件坐标系 FRAME_COORDINATE 4
关节空间 JOINT_SPACE 5
:return:
{
"step": 1000 [1000-连续] [10/1/0.1/0.001-点动]
"override": 0.2 速度比率
"space": 5 JOG的空间
}
"""
return self.__get_data(currentframe().f_code.co_name, "jog.get_params")
def set_jog_params(self, step, override, space): # OK
"""
设置JOG的参数包含步长空间速度倍率
:param step: [1000-连续] [10/1/0.1/0.001-点动]
:param override: 速度比率
:param space: JOG的空间
:return: None
"""
self.execution("jog.set_params", step=step, override=override, space=space)
def start_jog(self, index: int, direction: bool = False, is_ext: bool = False): # OK
"""
开始 JOG 运动
:param index: 0-6若选轴空间则 0-6 对应 1-7 轴,若选笛卡尔空间,则 0-6 对应 xyzabc elb
:param direction: True 正方向False 反方向
:param is_ext: 是否是外部轴 jog
:return: None
"""
self.execution("jog.start", index=index, direction=direction, is_ext=is_ext)
@property
def get_socket_params(self): # OK
"""
获取socket参数
:return:
{
"auto_connect": true, // True 开机启动False 不开机启动
"disconnection_detection_time": 10, // 链接断开检测周期(s)
"disconnection_triggering_behavior": 0, // 断开连接触发行为 0无动作 1暂停程序 2暂停并下电
"enable": true, // True 开启或者 False 关闭
"ip": "", // 仅限于客户端,用于指定服务端 IP当作为服务端时该参数设置为空字符串否则会报错
"name": "name", // 连接名称
"port": "8080", // 连接端口
"reconnect_flag": true, // True 自动重连False 不自动重连
"suffix": "\r", // 指定发送分隔符
"type": 1 // 连接类型 0 client | 1 server
}
"""
return self.__get_data(currentframe().f_code.co_name, "socket.get_params")
def set_socket_params(self, enable: bool, ip: str, port: str, suffix: str, type: int = 1, **kwargs): # OK
"""
设置 socket 参数,一般作为服务器使用
:param enable: True 开启或者 False 关闭
:param ip: 连接ip
:param port: 连接端口
:param suffix: 指定发送分隔符
:param type: 0 client | 1 server
:return: None
"""
data = self.get_socket_params
keys = data.keys()
kwargs.update({"enable": enable, "ip": ip, "port": port, "suffix": suffix, "type": type})
for _ in keys:
if _ in kwargs.keys():
data[_] = kwargs[_]
self.execution("socket.set_params", data=data)
@property
def get_diagnosis_params(self, version="1.4.1"): # OK
"""
获取诊断功能开启状态,以及相应其他信息
:param version: 指定诊断工具版本
:return:
{
"delay_motion": false, // -
"display_open": false, // 诊断显示功能开启状态
"ecat_diagnosis_state": false, // -
"overrun": false, // 是否开启实时线程超时监控上报
"pdo_params": [...], // 指定版本支持的所有曲线信息
"state": true, // 诊断功能的开启状态
"turn_area": false // 转弯区是否上报
}
"""
return self.__get_data(currentframe().f_code.co_name, "diagnosis.get_params", version=version)
def set_diagnosis_params(self, display_pdo_params: list, frequency: int = 50, version: str = "1.4.1"): # OK
"""
设置诊断功能显示参数 [{"name": "hw_joint_vel_feedback", "channel": 0}, ]
:param display_pdo_params: 指定要采集的曲线名称,具体可通过 get_diagnosis_params 函数功能获取所有目前已支持的曲线
:param frequency: 采样频率,默认 50ms
:param version: xDiagnose的版本号
:return: None
"""
self.execution("diagnosis.set_params", display_pdo_params=display_pdo_params, frequency=frequency, version=version)
def open_diagnosis(self, open: bool, display_open: bool, overrun: bool = False, turn_area: bool = False, delay_motion: bool = False): # OK
"""
打开或者关闭诊断曲线,并定义其他功能的开关(调试相关功能,比如是否开启线程超时监控和上报,转弯区以及运动延迟等)
:param open: 诊断功能控制HMI->日志->诊断设置->私服诊断开关,一般设置成 True
:param display_open: 诊断显示功能,指的是在线诊断插件中的打开 switch 的状态,需要诊断数据的情况,设置成 True
:param overrun: 实时线程超时监控上报
:param turn_area: 转弯区上报
:param delay_motion: 延迟运动
:return: None
"""
self.execution("diagnosis.open", open=open, display_open=display_open, overrun=overrun, turn_area=turn_area, delay_motion=delay_motion)
def save_diagnosis(self, save: bool = True): # OK
"""
保存诊断数据也就是主动写诊断动作HMI日志->诊断设置->保存诊断数据
:param save: 保存数据开关
:return: None
"""
self.execution("diagnosis.save", save=save)
@property
def qurry_system_io_configuration(self): # OK
"""
系统IO配置的查询协议trigger 参数取值参照如下:
FLANKS 0, //边缘触发
POS_FLANK 1, //上升沿
NEG_FLANK 2, //下降沿
HIGH_LEVEL 3, //高电平
LOW_LEVEL 4 //低电平
:return:
{
"input_system_io": {
"motor_on": {
"signal":"DI0_0",
"trigger":1
},
"motor_off": {
"signal":"DI0_0",
"trigger":2
}
},
"output_system_io": {
"sta_motor_on": {
"signal":"DO0_0"
},
"sta_robot_running": {
"signal":"DO0_1"
}
}
}
"""
return self.__get_data(currentframe().f_code.co_name, "system_io.query_configuration")
@property
def qurry_system_io_event_configuration(self): # OK
"""
查询当前系统支持的系统IO事件列表包括事件key、名称、支持的触发方式等配置
:return:
{
"input_system_event": [
{
"key": "ctrl_motor_on",
"name": "上电",
"trigger_types": [
1,
2
]
},
{
"key": "ctrl_motor_off",
"name": "下电",
"trigger_types": [
1,
2
]
}
],
"output_system_event": [
{
"key": "sta_motor_on",
"name": "上下电状态"
},
{
"key": "sta_program",
"name": "运行状态"
}
],
"input_mutex_event": [
{
"key0": "ctrl_motor_on",
"key1": "ctrl_motor_off"
},
{
"key0": "ctrl_switch_auto",
"key1": "ctrl_switch_manu"
}
]
}
"""
return self.__get_data(currentframe().f_code.co_name, "system_io.query_configuration")
def update_system_io_configuration(self, i_funcs: list, o_funcs: list, i_signals: list, o_signals: list, trig_types: list): # OK
"""
配置系统 IO
:param i_funcs: 输入,只写功能码列表
:param o_funcs: 输出,只读功能码列表
:param i_signals: DI 信号列表
:param o_signals: DO 信号列表
:param trig_types: 触发条件列表,可参考 qurry_system_io_configuration 中的触发条件
:return: None
"""
input_system_io, output_system_io = {}, {}
for i_func in i_funcs:
_index = i_funcs.index(i_func)
input_system_io[i_func] = {"signal": i_signals[_index], "trigger": trig_types[_index]}
for o_func in o_funcs:
_index = o_funcs.index(o_func)
output_system_io[o_func] = {"signal": o_signals[_index]}
self.execution("system_io.update_configuration", input_system_io=input_system_io, output_system_io=output_system_io)
@property
def get_fieldbus_device_params(self): # OK
"""
获取的是 HMI通讯->总线设备列表的信息,以及开关状态
:return:
{
"device_list": [
{
"device_name": "modbus_1",
"enable": true
}, {
"device_name": "modbus_2",
"enable": false
}
]
}
"""
return self.__get_data(currentframe().f_code.co_name, "fieldbus_device.get_params")
def set_fieldbus_device_params(self, device_name: str, enable: bool): # OK
"""
定义开关设备的协议,一次只能打开一个设备
:param device_name: 设备列表中的名称
:param enable: 是否开启,这里操作的是 HMI通信->IO设备里面的开关状态
:return: None
"""
self.execution("fieldbus_device.set_params", device_name=device_name, enable=enable)
def reload_fieldbus(self): # OK
"""
触发控制器重新加载总线设备
:return: None
"""
self.execution("fieldbus_device.load_cfg")
@property
def get_modbus_params(self): # OK
"""
获取modbus参数
:return:
{
"connect_state": true,
"enable_master": false,
"enable_slave": false,
"ip": "192.168.0.160",
"is_convert": true,
"port": 502,
"slave_id": 1
}
"""
return self.__get_data(currentframe().f_code.co_name, "modbus.get_params")
def set_modbus_params(self, enable_slave: bool, ip: str, port: int, slave_id: int, enable_master: bool): # OK
"""
设置modbus参数相当于新建
:param enable_slave: Modbus从站是否自动开启
:param ip: ip 地址
:param port: 端口
:param slave_id: 从站 ID
:param enable_master: Modbus主站是否自动开启
:return:
"""
self.execution("modbus.set_params", enable_slave=enable_slave, ip=ip, port=port, slave_id=slave_id, enable_master=enable_master)
def reload_registers(self): # OK
"""
触发控制器重新加载寄存器列表
:return: None
"""
self.execution("modbus.load_cfg")
def get_modbus_values(self, mode: str = "all"): # OK
"""
用于获取 modbus 寄存器变量值的更新信息,展示在状态监控界面
:param mode: all/change
:return:
"""
return self.__get_data(currentframe().f_code.co_name, "modbus.get_values", mode=mode)
@property
def get_soft_limit_params(self): # OK
"""
获取软限位参数
:return:
{
"enable":true
"upper":[0,0,0,0,0,0,0],
"lower":[0,0,0,0,0,0,0],
"reduced_upper":[0,0,0,0,0,0,0],
"reduced_lower":[0,0,0,0,0,0,0]
}
"""
return self.__get_data(currentframe().f_code.co_name, "soft_limit.get_params")
def set_soft_limit_params(self, **kwargs): # OK
"""
设定软限位参数 enable: bool, upper: list, lower: list, reduced_upper: list, reduced_lower: list
:enable: 是否启用软限位
:upper: 软限位上限
:lower: 软限位下限
:reduced_upper: 缩减模式软限位上限
:reduced_lower: 缩减模式软限位下限
:return: None
"""
data = self.get_soft_limit_params
keys = data.keys()
for _ in keys:
if _ in kwargs.keys():
data[_] = kwargs[_]
self.execution("soft_limit.set_params", data=data)
@property
def get_device_params(self): # OK
"""
获取设备信息
:return:
"""
return self.__get_data(currentframe().f_code.co_name, "device.get_params")
@property
def get_cart_pos(self): # OK
"""
获取机器人的当前位姿:包括轴关节角度,笛卡尔关节角度,四元数,欧拉角(臂角)
:return:
{
"joint":[0.0,0.0,0.0,0.0,0.0,0.0],
"position":[0.0,0.0,0.0,0.0,0.0,0.0],
"euler":[0.0,0.0,0.0],
"quaternion"[0.0,0.0,0.0,0.0],
"elb":0.0, // 可缺省
"ext_joint":[0.0,0.0,0.0,0.0,0.0]
}
"""
return self.__get_data(currentframe().f_code.co_name, "move.get_pos")
@property
def get_joint_pos(self): # OK
"""
获取机器人的当前关节角度:包括内部轴和外部轴
:return:
{
"inner_pos": [0,0,0,0,0,0,0],
"extern_pos": [0,0,0,0,0,0,0]
}
"""
return self.__get_data(currentframe().f_code.co_name, "move.get_joint_pos")
@property
def get_monitor_cfg(self): # OK
"""
获取机器人的监控配置参数RefCoordinateType 类型数据,表示当前控制器位置监测的相对坐标系
基坐标系 REF_COORDINATE_BASE 0
世界坐标系 REF_COORDINATE_WORLD 1
工件坐标系 REF_COORDINATE_WOBJ 2
:return:
{
"ref_coordinate": 0
}
"""
return self.__get_data(currentframe().f_code.co_name, "move.get_monitor_cfg")
def set_monitor_cfg(self, ref_coordinate): # OK
"""
设置机器人的监控配置参数
:ref_coordinate: RefCoordinateType类型数据用来设置当前控制器位置监测的相对坐标系
:return: None
"""
self.execution("move.set_monitor_cfg", ref_coordinate=ref_coordinate)
@property
def get_move_params(self): # OK
"""
获取机器人的运动参数包括减速比、耦合比、最大速度、加速度、加加速度、acc ramp time、规划步长等信息
:return:
{
"MOTION": {
"ACC_RAMPTIME_JOG": 0.5,
"ACC_RAMPTIME_STOP": 0.5,
"DEFAULT_ACC_PARAMS": [1.0, 0.5],
"JERK_LIMIT_CART": 0,
"JERK_LIMIT_JOINT": 0,
"JERK_LIMIT_ROT": 0,
"JOINT_MAX_ACC": [500, 500, 1000, 1000, 1000],
"JOINT_MAX_JERK": [2000, 2000, 2000, 4000, 4000, 4000],
"JOINT_MAX_SPEED": [120.0, 120.0, 180.0, 180.0, 180.0, 180.0],
"MAX_ACC_PARAMS": [1.0, 1],
"MIN_ACC_PARAMS": [0.3, 0.05],
"TCP_MAX_ACC": 5000,
"TCP_MAX_JERK": 10000,
"TCP_MAX_SPEED": 1000,
"TCP_ROTATE_MAX_ACC": 1800,
"TCP_ROTATE_MAX_JERK": 3600,
"TCP_ROTATE_MAX_SPEED": 180,
"VEL_SMOOTH_FACTOR": 1.0,
"VEL_SMOOTH_FACTOR_RANGE": [1.0, 10.0]
}
}
"""
return self.__get_data(currentframe().f_code.co_name, "move.get_params")
def set_move_params(self, **kwargs): # OK
"""
设置机器人的运动参数轴最大速度、轴最大加加速度、速度、加加速度、加速度、加加速度、acc ramp time、规划步长等信息
可选参数:参考 get_move_params 函数返回值 MOTION 里面的选项
:return: None
"""
data = self.get_move_params["MOTION"]
print(f"res = {data}")
keys = data.keys()
for _ in keys:
if _ in kwargs.keys():
data[_] = kwargs[_]
self.execution("move.set_params", data=data)
@property
def get_quick_stop_distance(self): # OK
"""
获取机器人 search 指令最大停止距离
:return:
{
"distance":2.0
}
"""
return self.__get_data(currentframe().f_code.co_name, "move.get_quickstop_distance")
def set_quick_stop_distance(self, distance: float): # OK
"""
设置机器人 search 指令最大停止距离
:param distance: 停止距离,单位 mm
:return: None
"""
self.execution("move.set_quickstop_distance", distance=distance)
@property
def get_collision_params(self): # OK
"""
获取碰撞检测相关参数
:return:
{
"action": 1, // 触发行为1-安全停止2-触发暂停3-柔顺停止
"coeff": [100, 100, 100, 100, 100, 100], // 0-整机灵敏度百分比1-单轴灵敏度百分比2-单轴和整机灵敏度百分比
"coeff_level": 0, // 灵敏度等级0-低1-中2-高
"compliance": 0, // 柔顺功能比例系数,[0-1]
"enable": true, // 功能使能开关
"fallback_distance": 3, // 回退距离
"mode": 0, // 力传感器系数0 整机 1 单轴
"percent": 100, // 0-200整机灵敏度百分比
"percent_axis": [100, 100, 100, 100, 100, 100], // 0-200单轴灵敏度百分比
"reduced_percent": 100, // 0-200整机缩减模式灵敏度百分比
"reduced_percent_axis": [100, 100, 100, 100, 100, 100] // 0-200单轴缩减模式灵敏度百分比
}
"""
return self.__get_data(currentframe().f_code.co_name, "collision.get_params")
def set_collision_params(self, enable, mode, action, percent, **kwargs): # OK
"""
设置碰撞检测相关参数
:param enable: 功能使能开关
:param mode: 力传感器系数0 整机 1 单轴
:param action: 触发行为1-安全停止2-触发暂停3-柔顺停止
:param percent: 0-200整机灵敏度百分比
:return:
"""
data = self.get_collision_params
keys = data.keys()
kwargs.update({"enable": enable, "mode": mode, "action": action, "percent": percent})
for _ in keys:
if _ in kwargs.keys():
data[_] = kwargs[_]
self.execution("collision.set_params", data=data)
def set_collision_state(self, collision_state: bool): # NG
"""
开启或者关闭虚拟墙碰撞检测,测试该函数功能无效!!!
:param collision_state: 碰撞检测的开关状态
:return: None
"""
self.execution("collision.set_state", collision_state=collision_state)
@property
def get_robot_state(self): # OK
"""
{
"rc_state":"normal", # "fatal""error""block""normal"
"engine":"on", # "fatal""error""GStop""EStop""on""off"
"servo_mode":"position", # "torque""position"
"operate": "auto", # "auto""manual"
"task_space": "program", # "jog""drag""ready""load_identify""demo""rci""dynamic_identify""program""debug"
"robot_action": "idle", # "idle""busy"
"safety_mode": "collision" # "normal""collision""collaboration"
}
"""
return self.__get_data(currentframe().f_code.co_name, "state.get_state")
def set_controller_params(self, robot_time: str): # OK
"""
设置控制器系统时间
:param robot_time: 系统时间,"2020-02-28 15:28:30"
:return: None
"""
self.execution("controller.set_params", time=robot_time)
@property
def get_robot_params(self): # OK
"""
"alias": "",
"auth_state":
"controller_type": "XBC_XMATE",
"controller_types": ["XBC_3", "XBC_5", "XBC_XMATE"],
"disk_serial_number": "2338020401535",
"mac_addr": "34:df:20:03:1b:45",
"model":,
"nic_list": ["enp1s0", "enp2s0"],
"occupied_addr": "192.168.2.123:49269",
"period": 0.002,
"period_types": [0.001, 0.002, 0.003, 0.004],
"robot_template": 10,
"robot_type": "XMC12-R1300-W7G3B1C",
"robot_types": ["XMC12-R1300-B7S3B0C", "XMC12-R1300-W7G3B1C", "XMC17_5-R1900-W7G3B1C", "XMC20-R1650-B7G3Z0C"],
"security_type": "ROKAE_RSC",
"security_types": ["ROKAE_MINI", "ROKAE_RSC"],
"time": "2024-09-13 12:36:38",
"version": "2.3.0.4"
"""
return self.__get_data(currentframe().f_code.co_name, "controller.get_params")
def switch_tp_mode(self, mode: str): # OK
"""
切换示教器模式
:param mode: with/without
:return: None
"""
match mode:
case "with":
self.execution("state.set_tp_mode", tp_mode="with")
case "without":
self.execution("state.set_tp_mode", tp_mode="without")
case _:
self.logger("ERROR", "openapi", f"hmi: switch_tp_mode 参数错误{mode} 非法参数,只接受 with/without", "red", "ArgumentError")
@property
def get_tp_mode(self): # OK
"""
获取示教器连接状态
:return:
{
"tp_mode":"with"
}
"""
return self.__get_data(currentframe().f_code.co_name, "state.get_tp_mode")
@property
def get_drag_params(self): # OK
"""
获取拖动模式参数
:return:
{
"enable": true,
"space": 0,
"type": 0
}
"""
return self.__get_data(currentframe().f_code.co_name, "drag.get_params")
def set_drag_params(self, enable: bool, space: int = 1, type: int = 2): # OK
"""
设置拖动模式开关以及参数
:param enable: 是否启用拖动
:param space: 拖动空间 - 0 代表关节 1 代表笛卡尔
:param type: 拖动类型 - 0 只平移 1 只旋转 2 自由拖动
:return: None
"""
self.execution("drag.set_params", enable=enable, space=space, type=type)
def set_safety_area_signal(self, signal: bool): # OK
"""
设置安全区域信号控制使能开关
:param signal: True 打开 False 关闭
:return: None
"""
self.execution("safety.safety_area.signal_enable", protocol_flag=1, signal=signal)
def set_safety_area_overall(self, enable: bool): # OK
"""
设置安全区域整体控制使能开关
:param enable: True 打开 False 关闭
:return: None
"""
self.execution("safety.safety_area.overall_enable", protocol_flag=1, enable=enable)
@property
def get_safety_area_params(self): # OK
"""
获取安全区所有的配置信息
:return:
"g": {
"safety_area_data": {
"overall_enable": true,
"safety_area_setting": [
{
"box": {
"Lx": 100.0,
"Ly": 100.0,
"Lz": 100.0,
"direction": false,
"ori": {
"euler": {
"a": 179.9963851353547,
"b": -0.006653792532429416,
"c": 179.9934560302729
},
"quaternion": {
"q1": 0.0,
"q2": 0.0,
"q3": 0.0,
"q4": 0.0
}
},
"pos": {
"x": 0.0,
"y": 0.0,
"z": 0.0
}
},
"enable": false,
"id": 0,
"name": "region1",
"plane": {
"direction": true,
"point": {
"x": 0.0,
"y": 0.0,
"z": 0.0
},
"vector": {
"x": 0.0,
"y": 0.0,
"z": 0.0
}
},
"shape": 0,
"shared_bind_di": "",
"shared_bind_do": "",
"sphere": {
"ori": {
"euler": {
"a": 0.0,
"b": 0.0,
"c": 0.0
},
"quaternion": {
"q1": 0.0,
"q2": 0.0,
"q3": 0.0,
"q4": 0.0
}
},
"pos": {
"x": 0.0,
"y": 0.0,
"z": 0.0
},
"radius": 0.0
},
"state": true,
"trigger": 0,
"type": 0,
"vertebral": {
"high": 0.0,
"ori": {
"euler": {
"a": 0.0,
"b": 0.0,
"c": 0.0
},
"quaternion": {
"q1": 0.0,
"q2": 0.0,
"q3": 0.0,
"q4": 0.0
}
},
"pos": {
"x": 0.0,
"y": 0.0,
"z": 0.0
},
"radius": 0.0
}
},
... // 剩余 9 个安全区域的配置信息
],
"signal_enable": true
}
}
}
"""
return self.__get_data(currentframe().f_code.co_name, "safety_area_data", flag=1)
def set_safety_area_enable(self, id: int, enable: bool): # OK
"""
设置每个安全区域的开关
:param id: 安全区域开关0-9
:param enable: True 打开False 关闭
:return: None
"""
self.execution("safety.safety_area.safety_area_enable", protocol_flag=1, id=id, enable=enable)
def set_safety_area_param(self, id: int, enable: bool, **kwargs): # OK
"""
设定单独安全区的参数
:param id: 安全区 id
:param enable: 是否开启
:param kwargs: 其他参数,参考 get_safety_area_params 的返回值形式
:return: None
"""
data = self.get_safety_area_params["g"]["safety_area_data"]["safety_area_setting"][id]
keys = data.keys()
kwargs.update({"id": id, "enable": enable})
for _ in keys:
if _ in kwargs.keys():
data[_] = kwargs[_]
self.execution("safety.safety_area.set_param", protocol_flag=1, data=data)
self.execution("safety.safety_area.safety_area_enable", protocol_flag=1, id=id, enable=enable)
@property
def get_filtered_error_code(self): # OK
"""
获取已设定的错误码过滤列表
:return:
{
"g": {
"log_code.data": {
"code_list": [
{
"id": 100,
"title": "DEMO\\u610f\\u5916\\u505c\\u6b62"
},
{
"id": 10000,
"title": "HMI\\u8bf7\\u6c42\\u5305\\u89e3\\u6790\\u9519\\u8bef"
},
{
"id": 10002,
"title": "\\u5feb\\u901f\\u8c03\\u6574\\u542f\\u52a8\\u5931\\u8d25"
}
],
"version": 1
}
}
}
"""
return self.__get_data(currentframe().f_code.co_name, "log_code.data", flag=1)
def set_filtered_error_code(self, action: str, code_list: list): # OK
"""
清空,增加,删除错误过滤码
:param action: 支持 add/remove/clear当为 clear 时code_list 可为任意列表
:param code_list: 需要添加/删除的过滤码列表
:return: None
"""
origin_code_list = self.get_filtered_error_code["g"]["log_code.data"]["code_list"]
# [{"id": 10000, "title": "HMI请求包解析错误"}, {"id": 10002, "title": "快速调整启动失败"}]
if action == "clear":
code_list = []
elif action == "add":
for error_code in code_list:
for item in origin_code_list:
if error_code == item["id"]:
break
else:
origin_code_list.append({"id": error_code, "title": ""})
code_list = origin_code_list
elif action == "remove":
for error_code in code_list:
for item in origin_code_list:
if error_code == item["id"]:
origin_code_list.remove(item)
code_list = origin_code_list
self.execution("log_code.data.code_list", protocol_flag=1, code_list=code_list)
def __get_data(self, upper_func, command, flag=0, **kwargs):
msg_id, state = self.execution(command, protocol_flag=flag, **kwargs)
records = clibs.c_hr.get_from_id(msg_id)
for record in records:
if "请求发送成功" not in record[0]:
data = eval(record[0])["data"]
return data
# =================================== ↑↑↑ specific functions ↑↑↑ ===================================
class ExternalCommunication(QThread):
output = Signal(str, str)
def __init__(self, ip, port, /):
super().__init__()
self.c = None
self.ip = ip
self.port = int(port)
self.suffix = "\r"
self.exec_desc = " :--: 返回 true 表示执行成功false 失败"
def logger(self, level, module, content, color="black", error="", flag="both"):
flag = "cursor" if level.upper() == "DEBUG" else "both"
clibs.logger(level, module, content, color, flag, signal=self.output)
if level.upper() == "ERROR":
raise Exception(f"{error} | {content}")
def net_conn(self):
# clibs.c_hr.execution("socket.set_params", enable=False, ip="0.0.0.0", port=str(self.port), suffix="\r", type=1)
# time.sleep(clibs.INTERVAL)
clibs.c_hr.execution("socket.set_params", enable=True, ip="0.0.0.0", port=str(self.port), suffix="\r", type=1)
time.sleep(clibs.INTERVAL*2)
self.c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.c.settimeout(clibs.INTERVAL*5)
try:
self.c.connect((self.ip, self.port))
self.logger("INFO", "openapi", f"ec: 外部通信连接成功...", "green")
return self.c
except Exception as err:
self.logger("ERROR", "openapi", f"ec: 外部通信连接失败... {err}", "red", "EcConnFailed")
def close(self):
if clibs.status["ec"]:
try:
self.c.close()
self.logger("INFO", "openapi", f"ec: 关闭外部通信连接成功", "green")
except Exception as err:
self.logger("ERROR", "openapi", f"ec: 关闭外部通信连接失败:{err}", "red", "EcCloseFailed")
def s_string(self, directive):
order = "".join([directive, self.suffix])
self.c.send(order.encode())
def r_string(self, directive):
result, char = "", ""
while char != self.suffix:
try:
char = self.c.recv(1).decode(encoding="unicode_escape")
except Exception as err:
self.logger("ERROR", "openapi", f"ec: 获取请求指令 {directive} 的返回数据超时,需确认指令发送格式以及内容正确!具体报错信息如下 {err}", "red", "RecvMsgFailed")
result = "".join([result, char])
return result
# =================================== ↓↓↓ specific functions ↓↓↓ ===================================
def motor_on(self): # OK
return self.__exec_cmd("motor_on", "电机上电", self.exec_desc)
def motor_off(self): # OK
return self.__exec_cmd("motor_off", "电机下电", self.exec_desc)
def pp_to_main(self): # OK
return self.__exec_cmd("pp_to_main", "程序指针到", self.exec_desc)
def program_start(self): # OK
return self.__exec_cmd("start", "程序启动(可能需先清告警)", self.exec_desc)
def program_stop(self): # OK
return self.__exec_cmd("stop", "程序停止", self.exec_desc)
def clear_alarm(self): # OK
return self.__exec_cmd("clear_alarm", "清除伺服报警", self.exec_desc)
def switch_operation_auto(self): # OK
return self.__exec_cmd("switch_mode:auto", "切换到自动模式", self.exec_desc)
def switch_operation_manual(self): # OK
return self.__exec_cmd("switch_mode:manual", "切换到手动模式", self.exec_desc)
def open_drag_mode(self): # OK
return self.__exec_cmd("open_drag", "打开拖动", self.exec_desc)
def close_drag_mode(self): # OK
return self.__exec_cmd("close_drag", "关闭拖动", self.exec_desc)
def get_program_list(self): # OK
return self.__exec_cmd("list_prog", "获取工程列表")
def get_current_program(self): # OK
return self.__exec_cmd("current_prog", "当前工程")
def load_program(self, program_name): # OK
return self.__exec_cmd(f"load_prog:{program_name}", "加载工程", self.exec_desc)
def estop_reset(self): # OK | 复原外部 IO 急停和安全门急停,前提是硬件已复原
return self.__exec_cmd("estop_reset", "急停复位", self.exec_desc)
def estopreset_and_clearalarm(self): # OK | 外部 IO/安全门急停/安全碰撞告警都可以清除,前提是硬件已复原
return self.__exec_cmd("estopreset_and_clearalarm", "急停复位并清除报警", self.exec_desc)
def motoron_pp2main_start(self): # OK
return self.__exec_cmd("motoron_pptomain_start", "依次执行上电程序指针到main启动程序(可以是手动模式,必要时需清告警)", self.exec_desc)
def motoron_start(self): # OK
return self.__exec_cmd("motoron_start", "依次执行上电,启动程序(可以是手动模式,必要时需清告警)", self.exec_desc)
def pause_motoroff(self): # OK
return self.__exec_cmd("pause_motoroff", "暂停程序并下电", self.exec_desc)
def set_program_speed(self, speed: int): # OK | 1-100
return self.__exec_cmd(f"set_program_speed:{speed}", "设置程序运行速率(滑块)", self.exec_desc)
def set_soft_estop(self, enable: str): # OK
return self.__exec_cmd(f"set_soft_estop:{enable}", "触发(true)/解除(false)机器人软急停", self.exec_desc)
def switch_auto_motoron(self): # OK
return self.__exec_cmd("switch_auto_motoron", "切换自动模式并上电", self.exec_desc)
def open_safe_region(self, number: int): # OK | 1-10
return self.__exec_cmd(f"open_safe_region:{number}", f"打开第 {number} 个安全区域(1-10信号控制开关需打开不限操作模式)", self.exec_desc)
def close_safe_region(self, number: int): # OK | 1-10
return self.__exec_cmd(f"close_safe_region:{number}", f"关闭第 {number} 个安全区域(1-10信号控制开关需打开不限操作模式)", self.exec_desc)
def open_reduced_mode(self): # OK
return self.__exec_cmd("open_reduced_mode", "开启缩减模式(不限操作模式)", self.exec_desc)
def close_reduced_mode(self): # OK
return self.__exec_cmd("close_reduced_mode", "关闭缩减模式(不限操作模式)", self.exec_desc)
def setdo_value(self, do_name: str, do_value: str): # OK | do_value 为 true/false
return self.__exec_cmd(f"setdo:{do_name},{do_value}", f"设置 {do_name} 的值为 {do_value} ", self.exec_desc)
def modify_system_time(self, robot_time): # OK
return self.__exec_cmd(f"set_robot_time:{robot_time}", f"修改控制器和示教器的时间为 {robot_time}", self.exec_desc)
# --------------------------------------------------------------------------------------------------
@property
def motor_on_state(self): # OK
return self.__exec_cmd("motor_on_state", "获取上电状态", " :--: 返回 true 表示已上电false 已下电")
@property
def robot_running_state(self): # OK
return self.__exec_cmd("robot_running_state", "获取程序运行状态", " :--: 返回 true 表示正在运行false 未运行")
@property
def estop_state(self): # OK | 只表示外部急停,安全门触发不会返回 true只要有急停标识 E 字母,就会返回 true
return self.__exec_cmd("estop_state", "急停状态", " :--: 返回 true 表示处于急停状态false 非急停")
@property
def operation_mode(self): # OK
return self.__exec_cmd("operating_mode", "获取工作模式", " :--: 返回 true 表示自动模式false 手动模式")
@property
def home_state(self): # OK | 需要设置一下 "HMI 设置->快速调整"
return self.__exec_cmd("home_state", "获取 HOME 输出状态", " :--: 返回 true 表示法兰中心处于 HOME 点false 未处于 HOME 点")
@property
def fault_state(self): # OK
return self.__exec_cmd("fault_state", "获取 故障状态", " :--: 返回 true 表示处于故障状态false 非故障状态")
@property
def collision_state(self): # OK | 但是触发后,无法清除?
return self.__exec_cmd("collision_state", "获取碰撞触发状态", " :--: 返回 true 表示碰撞已触发false 未触发")
@property
def task_state(self): # OK
return self.__exec_cmd("task_state", "获取机器人运行任务状态", " :--: 返回 program 表示任务正在运行ready 未运行")
@property
def get_cart_pos(self): # OK | cart_pos/cart_pos_name 都可以正常返回,区别在返回的前缀,可测试辨别
return self.__exec_cmd("cart_pos", "获取笛卡尔位置")
@property
def get_joint_pos(self): # OK | jnt_pos/jnt_pos_name 都可以正常返回,区别在返回的前缀,可测试辨别
return self.__exec_cmd("jnt_pos", "获取轴位置")
@property
def get_axis_vel(self): # OK | jnt_vel/jnt_vel_name 都可以正常返回,区别在返回的前缀,可测试辨别
return self.__exec_cmd("jnt_vel", "获取轴速度")
@property
def get_axis_trq(self): # OK | jnt_trq/jnt_trq_name 都可以正常返回,区别在返回的前缀,可测试辨别
return self.__exec_cmd("jnt_trq", "获取轴力矩")
@property
def reduced_mode_state(self): # OK
return self.__exec_cmd("reduced_mode_state", "获取缩减模式状态", " :--: 返回 true 表示缩减模式false 非缩减模式")
def get_io_state(self, io_list: str): # OK | DO0_0,DI1_3,DO2_5不能有空格
return self.__exec_cmd(f"io_state:{io_list}", "获取 IO 状态值")
@property
def alarm_state(self): # OK
return self.__exec_cmd("alarm_state", "获取报警状态", " :--: 返回 true 表示当前有告警false 没有告警")
@property
def collision_alarm_state(self): # OK
return self.__exec_cmd("collision_alarm_state", "获取碰撞报警状态", " :--: 返回 true 表示有碰撞告警false 没有碰撞告警")
@property
def collision_open_state(self): # OK
return self.__exec_cmd("collision_open_state", "获取碰撞检测开启状态", " :--: 返回 true 表示已开启碰撞检测false 未开启")
@property
def controller_is_running(self): # OK
return self.__exec_cmd("controller_is_running", "判断控制器是否开机", " :--: 返回 true 表示控制器正在运行false 未运行")
@property
def encoder_low_battery_state(self): # OK
return self.__exec_cmd("encoder_low_battery_state", "编码器低电压报警状态", " :--: 返回 true 表示编码器处于低电压状态false 电压正常")
@property
def robot_error_code(self): # OK
return self.__exec_cmd("robot_error_code", "获取机器人错误码")
@property
def rl_pause_state(self): # OK
# 0 -- 初始化状态,刚开机上电时
# 1 -- RL 运行中
# 2 -- HMI 暂停
# 3 -- 系统 IO 暂停
# 4 -- 寄存器功能码暂停
# 5 -- 外部通讯暂停
# 6 --
# 7 -- Pause 指令暂停
# 8 --
# 9 --
# 10 -- 外部 IO 急停
# 11 -- 安全门急停
# 12 -- 其他因素停止,比如碰撞检测
return self.__exec_cmd("program_full", "获取 RL 的暂停状态", " :--: 返回值含义详见功能定义")
@property
def program_reset_state(self): # OK
return self.__exec_cmd("program_reset_state", "获取程序复位状态", " :--: 返回 true 表示指针指向 mainfalse 未指向 main")
@property
def program_speed_value(self): # OK | 速度滑块
return self.__exec_cmd("program_speed", "获取程序运行速度")
@property
def robot_is_busy(self): # OK | 触发条件为 pp2main/重载工程/推送到控制器,最好测试工程大一些,比较容易触发
return self.__exec_cmd("robot_is_busy", "获取程序忙碌状态", " :--: 返回 1 表示控制器忙碌0 非忙碌状态")
@property
def robot_is_moving(self): # OK
return self.__exec_cmd("robot_is_moving", "获取程序运行状态", " :--: 返回 true 表示机器人正在运动false 未运动")
@property
def safe_door_state(self): # OK
return self.__exec_cmd("safe_door_state", "获取安全门状态", " :--: 返回 true 表示安全门已触发false 未触发")
@property
def soft_estop_state(self): # OK
return self.__exec_cmd("soft_estop_state", "获取软急停状态", " :--: 返回 true 表示软急停已触发false 未触发")
@property
def get_cart_vel(self): # OK
return self.__exec_cmd("cart_vel", "获取笛卡尔速度")
@property
def get_tcp_pos(self): # OK
return self.__exec_cmd("tcp_pose", "获取 TCP 位姿")
@property
def get_tcp_vel(self): # OK
return self.__exec_cmd("tcp_vel", "获取 TCP 速度")
@property
def get_tcp_vel_mag(self): # OK
return self.__exec_cmd("tcp_vel_mag", "获取 TCP 合成线速度")
@property
def ext_estop_state(self): # OK
return self.__exec_cmd("ext_estop_state", "获取外部轴急停状态", " :--: 返回 true 表示外部轴急停已触发false 未触发")
@property
def hand_estop_state(self): # OK
return self.__exec_cmd("hand_estop_state", "获取手持急停状态", " :--: 返回 true 表示手持急停已触发false 未触发")
@property
def collaboration_state(self): # OK
return self.__exec_cmd("collaboration_state", "获取协作模式状态(其实就是缩减模式)", " :--: 返回 true 表示协作模式false 非协作模式")
def __exec_cmd(self, directive, description, more_desc=""):
self.s_string(directive)
time.sleep(clibs.INTERVAL)
result = self.r_string(directive).strip()
self.logger("DEBUG", "openapi", f"ec: 执行{description}指令是 {directive},返回值为 {result}{more_desc}")
return result
class PreDos(object):
def __init__(self, ip, ssh_port, username, password):
self.__ssh = None
self.__sftp = None
self.ip = ip
self.ssh_port = ssh_port
self.username = username
self.password = password
def __ssh2server(self):
try:
self.__ssh = SSHClient()
self.__ssh.set_missing_host_key_policy(AutoAddPolicy())
self.__ssh.connect(hostname=self.ip, port=self.ssh_port, username=self.username, password=self.password)
self.__sftp = self.__ssh.open_sftp()
except Exception as err:
print(f"predos: SSH 无法连接到 {self.ip}:{self.ssh_port},需检查网络连通性或者登录信息是否正确 {err}")
raise Exception("SshConnFailed")
@clibs.handle_exception
def push_prj_to_server(self, prj_file):
# prj_file本地工程完整路径
self.__ssh2server()
prj_name = prj_file.split("/")[-1].split(".")[0]
self.__sftp.put(prj_file, f"/tmp/{prj_name}.zip")
cmd = f"cd /tmp; mkdir {prj_name}; unzip -d {prj_name} -q {prj_name}.zip; rm -rf /tmp/{prj_name}.zip; "
cmd += f"sudo rm -rf /home/luoshi/bin/controller/projects/{prj_name}; "
cmd += f"sudo mv /tmp/{prj_name}/ /home/luoshi/bin/controller/projects/"
stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True)
stdin.write(self.password + "\n")
stdout.read().decode() # 需要read一下才能正常执行
stderr.read().decode()
self.__ssh.close()
def pull_prj_from_server(self, prj_name, local_prj_path): # NG | 可以拉取文件,但是导入之后,有问题
# prj_name要拉取的服务端工程名
# local_prj_path本地工程文件的完整路径
self.__ssh2server()
cmd = f"cd /tmp/; sudo rm -rf {prj_name}*; sudo cp -rf /home/luoshi/bin/controller/projects/{prj_name} .; "
cmd += f"sudo zip -q -r {prj_name}.zip {prj_name}/; sudo rm -rf {prj_name}"
stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True)
stdin.write(self.password + "\n")
print(stdout.read().decode()) # 需要read一下才能正常执行
print(stderr.read().decode())
self.__sftp.get(f"/tmp/{prj_name}.zip", local_prj_path)
cmd = f"sudo rm -rf /tmp/{prj_name}.zip"
stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True)
stdin.write(self.password + "\n")
print(stdout.read().decode()) # 需要read一下才能正常执行
print(stderr.read().decode())
self.__ssh.close()
def push_file_to_server(self, local_file, server_file):
# local_file本地文件完整路径
# server_file服务端文件完整路径
self.__ssh2server()
filename = local_file.split("\\")[-1].split("/")[-1]
self.__sftp.put(local_file, f"/tmp/{filename}")
cmd = f"sudo mv /tmp/{filename} {server_file}"
stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True)
stdin.write(self.password + "\n")
stdout.read().decode() # 需要read一下才能正常执行
stderr.read().decode()
self.__ssh.close()
def pull_file_from_server(self, server_file, local_file):
# local_file本地文件完整路径
# server_file服务端文件完整路径
self.__ssh2server()
cmd = f"sudo cp {server_file} /tmp/"
stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True)
stdin.write(self.password + "\n")
stdout.read().decode() # 需要read一下才能正常执行
stderr.read().decode()
filename = server_file.split("/")[-1]
self.__sftp.get(f"/tmp/{filename}", f"{local_file}")
cmd = f"sudo rm -rf /tmp/{filename}"
stdin, stdout, stderr = self.__ssh.exec_command(cmd, get_pty=True)
stdin.write(self.password + "\n")
stdout.read().decode() # 需要read一下才能正常执行
stderr.read().decode()
self.__ssh.close()
class RobotInit(object):
@staticmethod
@clibs.handle_exception
def modbus_init():
clibs.c_pd = PreDos(clibs.ip_addr, clibs.ssh_port, clibs.username, clibs.password)
# 推送配置文件
msg_id = clibs.c_hr.execution("controller.get_params")
records = clibs.c_hr.get_from_id(msg_id)
for record in records:
if "请求发送成功" not in record[0]:
robot_params = eval(record[0])
robot_type = robot_params["data"]["robot_type"]
security_type = robot_params["data"]["security_type"]
controller_type = robot_params["data"]["controller_type"]
io_device_file = "_".join(["io_device", controller_type, security_type, "1"])
user_settings = "/home/luoshi/bin/controller/user_settings"
interactive_data = f"/home/luoshi/bin/controller/interactive_data/{robot_type}"
config_files = [
f"{clibs.PREFIX}/files/projects/fieldbus_device.json",
f"{clibs.PREFIX}/files/projects/registers.json",
f"{clibs.PREFIX}/files/projects/registers.xml"
]
for config_file in config_files:
filename = config_file.split("/")[-1]
clibs.c_pd.push_file_to_server(config_file, f"{user_settings}/{filename}")
clibs.c_pd.push_file_to_server(config_file, f"{interactive_data}/{filename}")
io_device_autotest = {"ai_num": 0, "ao_num": 0, "di_num": 16, "do_num": 16, "extend_attr": {"mode": "slaver", "name": "autotest", "type": "MODBUS"}, "id": 7, "name": "autotest", "type": 6}
io_device_file_local = f"{io_device_file}"
io_device_file_local_tmp = f"{io_device_file}_tmp"
io_device_file_remote = f"{user_settings}/{io_device_file}"
clibs.c_pd.pull_file_from_server(io_device_file_remote, io_device_file_local)
with open(io_device_file_local, mode="r", encoding="utf-8") as f:
data = json.load(f)
for _ in data["device_list"]:
if _["extend_attr"].get("name", None) == "autotest":
break
else:
data["device_list"].append(io_device_autotest)
with open(io_device_file_local_tmp, mode="w", encoding="utf-8") as f_tmp:
json.dump(data, f_tmp, indent=4)
clibs.c_pd.push_file_to_server(io_device_file_local_tmp, f"{user_settings}/{io_device_file}")
clibs.c_pd.push_file_to_server(io_device_file_local_tmp, f"{interactive_data}/{io_device_file}")
try:
os.remove(io_device_file_local)
os.remove(io_device_file_local_tmp)
except:
...
clibs.c_hr.execution("fieldbus_device.load_cfg")
clibs.c_hr.execution("fieldbus_device.set_params", device_name="autotest", enable=True)
clibs.c_hr.execution("io_device.load_cfg")
clibs.c_hr.execution("modbus.load_cfg")
def robot_init(self):
pd = PreDos(clibs.ip_addr, clibs.ssh_port, clibs.username, clibs.password)
# 推送配置文件
print("init: 推送配置文件 fieldbus_device.json/registers.json/registers.xml 到控制器,并配置 IO 设备,设备号为 7...")
robot_params = clibs.c_hr.get_robot_params
robot_type = robot_params["robot_type"]
security_type = robot_params["security_type"]
controller_type = robot_params["controller_type"]
io_device_file = "_".join(["io_device", controller_type, security_type, "1"])
user_settings = "/home/luoshi/bin/controller/user_settings"
interactive_data = f"/home/luoshi/bin/controller/interactive_data/{robot_type}"
config_files = [
"..\\assets\\configs\\fieldbus_device.json",
"..\\assets\\configs\\registers.json",
"..\\assets\\configs\\registers.xml"
]
for config_file in config_files:
filename = config_file.split("\\")[-1]
pd.push_file_to_server(config_file, f"{user_settings}/{filename}")
pd.push_file_to_server(config_file, f"{interactive_data}/{filename}")
io_device_autotest = {"ai_num": 0, "ao_num": 0, "di_num": 16, "do_num": 16, "extend_attr": {"mode": "slaver", "name": "autotest", "type": "MODBUS"}, "id": 7, "name": "autotest", "type": 6}
io_device_file_local = f"..\\assets\\configs\\{io_device_file}"
io_device_file_local_tmp = f"..\\assets\\configs\\{io_device_file}_tmp"
io_device_file_remote = f"{user_settings}/{io_device_file}"
pd.pull_file_from_server(io_device_file_remote, io_device_file_local)
with open(io_device_file_local, mode="r", encoding="utf-8") as f:
data = json.load(f)
for _ in data["device_list"]:
if _["extend_attr"].get("name", None) == "autotest":
break
else:
data["device_list"].append(io_device_autotest)
with open(io_device_file_local_tmp, mode="w", encoding="utf-8") as f_tmp:
json.dump(data, f_tmp, indent=4)
pd.push_file_to_server(io_device_file_local_tmp, f"{user_settings}/{io_device_file}")
pd.push_file_to_server(io_device_file_local_tmp, f"{interactive_data}/{io_device_file}")
clibs.c_hr.reload_io()
clibs.c_hr.reload_registers()
clibs.c_hr.reload_fieldbus()
clibs.c_hr.set_fieldbus_device_params("autotest", True)
# 触发急停并恢复
clibs.c_md.r_soft_estop(0)
clibs.c_md.r_soft_estop(1)
# 断开示教器连接
print("init: 断开示教器连接...")
clibs.c_hr.switch_tp_mode("without")
# 清空 system IO 配置
print("init: 清空所有的 System IO 功能配置...")
clibs.c_hr.update_system_io_configuration([], [], [], [], [])
# 关闭缩减模式
clibs.c_md.r_reduced_mode(0)
# 打开软限位
print("init: 打开软限位开关...")
clibs.c_hr.set_soft_limit_params(enable=True)
# 关闭安全区域
print("init: 正在关闭所有的安全区,并关闭总使能开关...")
clibs.c_hr.set_safety_area_overall(False)
clibs.c_hr.set_safety_area_signal(False)
for i in range(10):
clibs.c_hr.set_safety_area_enable(i, False)
# 打开外部通信,并设置控制器时间同步
print("init: 配置并打开外部通信默认服务器8080端口后缀为 \"\\r\"...")
clibs.c_hr.set_socket_params(True, "8080", "\r", 1)
clibs.c_ec.modify_system_time(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())))
# 关闭拖动
if robot_type.upper()[:2] not in ["XB", "NB"]:
print("init: 关闭拖动模式...")
clibs.c_hr.set_drag_params(False, 1, 2)
# 关闭碰撞检测
print("init: 关闭碰撞检测...")
clibs.c_hr.set_collision_params(False, 0, 1, 100)
# 清除所有过滤错误码
print("init: 清除所有过滤错误码设定...")
clibs.c_hr.set_filtered_error_code("clear", [])
# 回拖动位姿
print("init: 正在回拖动位姿...")
clibs.c_hr.switch_operation_mode("manual")
clibs.c_hr.switch_motor_state("on")
clibs.c_hr.set_quickturn_pos(enable_drag=True)
clibs.c_hr.move2quickturn("drag")
while True:
if clibs.c_md.w_robot_is_moving:
time.sleep(1)
else:
break
clibs.c_hr.stop_move(1)
clibs.c_hr.switch_motor_state("off")
clibs.c_hr.close()
# 清除所有告警
clibs.c_md.r_clear_alarm()
def fw_updater(self, local_file_path):
fw_size = os.path.getsize(local_file_path)
if fw_size > 10000000:
# get previous version of xCore
version_previous = clibs.hr.get_robot_params["version"]
# var def
remote_file_path = "./upgrade/lircos.zip"
fw_content = bytearray()
package_data = bytearray()
package_data_with_head = bytearray()
# socket connect
print(f"update firmware: 正在连接 {clibs.ip_addr}:{clibs.upgrade_port}...")
try:
tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_socket.connect((clibs.ip_addr, clibs.upgrade_port))
tcp_socket.setblocking(True)
except Exception as Err:
print(f"update firmware: {Err} | 连接 {clibs.ip_addr}:{clibs.upgrade_port} 失败...")
raise Exception("UpgradeFailed")
# get firmware content of binary format
print(f"update firmware: 正在读取 {local_file_path} 文件内容...")
with open(local_file_path, "rb") as f_fw:
fw_content += f_fw.read()
# construct package data: http://confluence.i.rokae.com/pages/viewpage.action?pageId=15634148
print("update firmware: 正在构造数据包,以及包头...")
# 1 protocol id
protocol_id = c_ushort(socket.htons(0)) # 2 bytes
package_data += protocol_id
# 2 write type
write_type = c_ubyte(0)
package_data += bytes(write_type)
# 3 md5
md5_hash = hashlib.md5()
md5_hash.update(fw_content)
fw_md5 = md5_hash.hexdigest()
i = 0
while i < len(fw_md5):
_ = (fw_md5[i:i + 2])
package_data += bytes.fromhex(_)
i += 2
# 4 remote path len
remote_file_path_len = c_ushort(socket.htons(len(remote_file_path)))
package_data += remote_file_path_len
# 5 remote path
package_data += remote_file_path.encode("ascii")
# 6 file
package_data += fw_content
# construct communication protocol: http://confluence.i.rokae.com/pages/viewpage.action?pageId=15634045
# 1 get package data with header
package_size = c_uint(socket.htonl(len(package_data)))
package_type = c_ubyte(1) # 0-无协议 1-文件 2-json
package_reserve = c_ubyte(0) # 预留位
package_data_with_head += package_size
package_data_with_head += package_type
package_data_with_head += package_reserve
package_data_with_head += package_data
# 2 send data to server
print("update firmware: 正在发送数据到 xCore升级控制器无需重启升级配置文件会自动软重启...")
start = 0
len_of_package = len(package_data_with_head)
while start < len_of_package:
end = 10240 + start
if end > len_of_package:
end = len_of_package
sent = tcp_socket.send((package_data_with_head[start:end]))
time.sleep(0.01)
if sent == 0:
raise RuntimeError("socket connection broken")
else:
start += sent
waited = 5 if fw_size > 10000000 else 25
time.sleep(waited)
if fw_size > 10000000:
# get current version of xCore
version_current = clibs.c_hr.get_robot_params["version"]
print(f"update firmware: 控制器升级成功from {version_previous} to {version_current} :)")
else:
print(f"update firmware: 配置文件升级成功 :)")
tcp_socket.close()
class UpgradeJsonCmd(object):
def __init__(self):
self.c = None
self.__sock_conn()
def __sock_conn(self):
# socket connect
print(f"正在连接 {clibs.ip_addr}:{clibs.upgrade_port}...")
try:
self.c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.c.connect((clibs.ip_addr, clibs.upgrade_port))
self.c.setblocking(True)
self.c.settimeout(3)
except Exception as Err:
print(f"upgrade: {Err} | 连接 {clibs.ip_addr}:{clibs.upgrade_port} 失败...")
raise Exception("UpgradePortConnFailed")
@staticmethod
def __do_package(cmd):
package_size = struct.pack("!I", len(cmd))
package_type = struct.pack("B", 2)
reserved_byte = struct.pack("B", 0)
return package_size + package_type + reserved_byte + cmd
def __recv_result(self, cmd):
time.sleep(clibs.INTERVAL)
try:
res = self.c.recv(10240).decode()
except Exception as err:
print(f"upgrade: 请求命令 {cmd.decode()} 的返回错误 {err}")
raise Exception("ResponseError")
time.sleep(clibs.INTERVAL)
return res
def __exec(self, command: dict):
try:
self.c.recv(10240)
except Exception:
pass
cmd = json.dumps(command, separators=(",", ":")).encode("utf-8")
self.c.sendall(self.__do_package(cmd))
res = self.__recv_result(cmd)
print(f"upgrade: 请求命令 {cmd.decode()} 的返回信息:{res}")
def erase_cfg(self):
# 一键抹除机器人配置(.rc_cfg)、交互数据配置(interactive_data),但保留用户日志
# 场景如果xCore版本升级跨度过大配置文件可能不兼容导致无法启动可以使用该功能抹除配置重新生成配置。
# 机器人参数、零点等会丢失!
self.__exec({"cmd": "erase"})
def clear_rubbish(self):
self.__exec({"cmd": "clearRubbish"})
def soft_reboot(self):
self.__exec({"cmd": "soft_reboot"})
def version_query(self):
self.__exec({"query": "version"})
def robot_reboot(self):
self.__exec({"cmd": "reboot"})
def reset_passwd(self):
# 不生效,有问题
self.__exec({"cmd": "passwd"})
def backup_origin(self):
# xCore
# .rc_cfg /
# interactive_data /
# module /
# demo_project /
# robot_cfg /
# dev_eni.xml
# ecat_license
# libemllI8254x.so & libemllI8254x_v3.so
# set_network_parameters
self.__exec({"cmd": "backup_origin"})
def origin_recovery(self):
self.__exec({"cmd": "recover"})