Compare commits

...

2 Commits

2 changed files with 70 additions and 32 deletions

View File

@ -66,6 +66,10 @@ def initialization():
# 关闭缩减模式
md.r_reduced_mode(0)
# 打开软限位
clibs.logger.info("打开软限位开关...")
hr.set_soft_limit_params(enable=True)
# 关闭安全区域
clibs.logger.info("正在关闭所有的安全区,并关闭总使能开关...")
hr.set_safety_area_overall(False)
@ -73,9 +77,11 @@ def initialization():
for i in range(10):
hr.set_safety_area_enable(i, False)
# 打开外部通信
# 打开外部通信,并设置控制器时间同步
clibs.logger.info("配置并打开外部通信默认服务器8080端口后缀为 \"\\r\"...")
hr.set_socket_params(True, "8080", "\r", 1)
ec = openapi.ExternalCommunication()
ec.modify_system_time(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())))
# 关闭拖动
clibs.logger.info("关闭拖动模式...")

View File

@ -8,6 +8,8 @@ from time import time, sleep, strftime, localtime
from pymodbus.client.tcp import ModbusTcpClient
# from pymodbus.payload import BinaryPayloadDecoder, BinaryPayloadBuilder
# from pymodbus.constants import Endian
from functools import wraps
from inspect import getcallargs
from paramiko import SSHClient, AutoAddPolicy
import clibs
@ -338,7 +340,8 @@ class HmiRequest(object):
# 尝试连续三次发送心跳包,确认建联成功
self.__error_code = 0
for i in range(3):
self.execution("controller.heart")
self.execution.__wrapped__(self, "controller.heart")
# self.execution("controller.heart")
sleep(clibs.interval/5)
if not self.__error_code:
self.__is_connected = True
@ -412,12 +415,12 @@ class HmiRequest(object):
for _ in range(3):
res = find_response(clibs.log_data_debug)
if res is not None:
return res["data"]
return res.get("data", f"{msg_id} has no data item")
sleep(clibs.interval*2)
else: # 尝试在上一次分割的日志中查找,只做一次
res = find_response("".join([clibs.log_path, listdir(clibs.log_path)[0]]))
res = find_response("".join([clibs.log_path, sorted(listdir(clibs.log_path))[-3]]))
if res is not None:
return res["data"]
return res.get("data", f"{msg_id} has no data item")
elif flag == 1:
for _ in range(3):
res = find_response_xs(clibs.log_data_debug)
@ -425,13 +428,14 @@ class HmiRequest(object):
return res
sleep(clibs.interval*2)
else:
res = find_response_xs("".join([clibs.log_path, listdir(clibs.log_path)[0]]))
res = find_response_xs("".join([clibs.log_path, sorted(listdir(clibs.log_path))[-3]]))
if res is not None:
return res
self.__sth_wrong(11, f"无法找到请求 {msg_id} 的返回结果")
def __msg_storage(self, response, flag=0):
# response是解码后的字符串
if "move.monitor" not in response:
clibs.logger.debug(f"{loads(response)}")
messages = self.c_msg if flag == 0 else self.c_msg_xs
if "move.monitor" in response:
@ -644,6 +648,17 @@ class HmiRequest(object):
except Exception as Err:
self.__sth_wrong(8, f"错误信息:{Err}")
@staticmethod
def validate_req(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
all_args = getcallargs(func, self, *args, **kwargs)
_id = func(self, *args, **kwargs)
self.get_from_id(_id, flag=all_args["protocol_flag"])
return _id
return wrapper
@validate_req
def execution(self, command, protocol_flag=0, **kwargs):
def log_reqs(request):
with open(clibs.log_data_reqs, mode="a", encoding="utf-8") as f_log:
@ -1052,7 +1067,7 @@ class HmiRequest(object):
def set_diagnosis_params(self, display_pdo_params: list, frequency: int = 50, version: str = "1.4.1"): # OK
"""
设置诊断功能显示参数
设置诊断功能显示参数 [{"name": "hw_joint_vel_feedback", "channel": 0}, ]
:param display_pdo_params: 指定要采集的曲线名称,具体可通过 get_diagnosis_params 函数功能获取所有目前已支持的曲线
:param frequency: 采样频率,默认 50ms
:param version: xDiagnose的版本号
@ -1799,10 +1814,14 @@ class ExternalCommunication(object):
self.__c.send(order.encode())
sleep(clibs.interval)
def r_string(self):
def r_string(self, directive):
result, char = "", ""
while char != self.suffix:
try:
char = self.__c.recv(1).decode(encoding="unicode_escape")
except TimeoutError:
clibs.logger.error(f"获取请求指令 {directive} 的返回数据超时,需确认指令发送格式以及内容正确!")
exit(101)
result = "".join([result, char])
sleep(clibs.interval)
return result
@ -1847,10 +1866,10 @@ class ExternalCommunication(object):
def load_program(self, program_name): # OK
return self.__exec_cmd(f"load_prog:{program_name}", "加载工程", self.exec_desc)
def estop_reset(self):
def estop_reset(self): # OK | 复原外部 IO 急停和安全门急停,前提是硬件已复原
return self.__exec_cmd("estop_reset", "急停复位", self.exec_desc)
def estopreset_and_clearalarm(self):
def estopreset_and_clearalarm(self): # OK | 外部 IO/安全门急停/安全碰撞告警都可以清除,前提是硬件已复原
return self.__exec_cmd("estopreset_and_clearalarm", "急停复位并清除报警", self.exec_desc)
def motoron_pp2main_start(self): # OK
@ -1862,7 +1881,7 @@ class ExternalCommunication(object):
def pause_motoroff(self): # OK
return self.__exec_cmd("pause_motoroff", "暂停程序并下电", self.exec_desc)
def set_program_speed(self, speed: int): # OK
def set_program_speed(self, speed: int): # OK | 1-100
return self.__exec_cmd(f"set_program_speed:{speed}", "设置程序运行速率(滑块)", self.exec_desc)
def set_soft_estop(self, enable: str): # OK
@ -1871,10 +1890,10 @@ class ExternalCommunication(object):
def switch_auto_motoron(self): # OK
return self.__exec_cmd("switch_auto_motoron", "切换自动模式并上电", self.exec_desc)
def open_safe_region(self, number: int): # OK
def open_safe_region(self, number: int): # OK | 1-10
return self.__exec_cmd(f"open_safe_region:{number}", f"打开第 {number} 个安全区域(1-10信号控制开关需打开不限操作模式)", self.exec_desc)
def close_safe_region(self, number: int): # OK
def close_safe_region(self, number: int): # OK | 1-10
return self.__exec_cmd(f"close_safe_region:{number}", f"关闭第 {number} 个安全区域(1-10信号控制开关需打开不限操作模式)", self.exec_desc)
def open_reduced_mode(self): # OK
@ -1883,11 +1902,11 @@ class ExternalCommunication(object):
def close_reduced_mode(self): # OK
return self.__exec_cmd("close_reduced_mode", "关闭缩减模式(不限操作模式)", self.exec_desc)
def setdo_value(self, do_name, do_value):
def setdo_value(self, do_name: str, do_value: str): # NG | do_value 为 true/false
return self.__exec_cmd(f"setdo:{do_name},{do_value}", f"设置 {do_name} 的值为 {do_value}", self.exec_desc)
def modify_system_time(self, robot_time):
return self.__exec_cmd(f"set_robot_time:{robot_time}", f"修改控制器和示教器的时间为 {robot_time}", self.exec_desc)
def modify_system_time(self, robot_time): # OK
return self.__exec_cmd(f"set_robot_time:{robot_time}", f"修改控制器和示教器的时间为 {robot_time}", self.exec_desc)
# --------------------------------------------------------------------------------------------------
@property
@ -1899,7 +1918,7 @@ class ExternalCommunication(object):
return self.__exec_cmd("robot_running_state", "获取程序运行状态", " :--: 返回 true 表示正在运行false 未运行")
@property
def estop_state(self):
def estop_state(self): # OK | 只表示外部急停,安全门触发不会返回 true只要有急停标识 E 字母,就会返回 true
return self.__exec_cmd("estop_state", "急停状态", " :--: 返回 true 表示处于急停状态false 非急停")
@property
@ -1907,16 +1926,16 @@ class ExternalCommunication(object):
return self.__exec_cmd("operating_mode", "获取工作模式", " :--: 返回 true 表示自动模式false 手动模式")
@property
def home_state(self): # NG
return self.__exec_cmd("home_state", "获取 HOME 输出状态", " :--: 返回 true 表示处于 HOME 点false 未处于 HOME 点")
def home_state(self): # OK | 需要设置一下 "HMI 设置->快速调整"
return self.__exec_cmd("home_state", "获取 HOME 输出状态", " :--: 返回 true 表示法兰中心处于 HOME 点false 未处于 HOME 点")
@property
def fault_state(self): # OK
return self.__exec_cmd("fault_state", "获取 故障状态", " :--: 返回 true 表示处于故障状态false 非故障状态")
@property
def collision_state(self): # NG
return self.__exec_cmd("collision_state", "获取碰撞检测状态", " :--: 返回 true 表示碰撞检测打开false 未打开")
def collision_state(self): # OK | 但是触发后,无法清除?
return self.__exec_cmd("collision_state", "获取碰撞触发状态", " :--: 返回 true 表示碰撞已触发false 未触发")
@property
def task_state(self): # OK
@ -1950,8 +1969,8 @@ class ExternalCommunication(object):
return self.__exec_cmd("alarm_state", "获取报警状态", " :--: 返回 true 表示当前有告警false 没有告警")
@property
def collision_alarm_state(self):
return self.__exec_cmd("collision_alarm_state", "获取碰撞检测报警状态", " :--: 返回 true 表示碰撞检测已触发false 未触发")
def collision_alarm_state(self): # OK
return self.__exec_cmd("collision_alarm_state", "获取碰撞报警状态", " :--: 返回 true 表示碰撞告警false 没有碰撞告警")
@property
def collision_open_state(self): # OK
@ -1970,8 +1989,21 @@ class ExternalCommunication(object):
return self.__exec_cmd("robot_error_code", "获取机器人错误码")
@property
def rl_pause_state(self):
return self.__exec_cmd("program_full", "获取 RL 的暂停状态")
def rl_pause_state(self): # OK
# 0 -- 初始化状态,刚开机上电时
# 1 -- RL 运行中
# 2 -- HMI 暂停
# 3 -- 系统 IO 暂停
# 4 -- 寄存器功能码暂停
# 5 -- 外部通讯暂停
# 6 --
# 7 -- Pause 指令暂停
# 8 --
# 9 --
# 10 -- 外部 IO 急停
# 11 -- 安全门急停
# 12 -- 其他因素停止,比如碰撞检测
return self.__exec_cmd("program_full", "获取 RL 的暂停状态", " :--: 返回值含义详见功能定义")
@property
def program_reset_state(self): # OK
@ -1982,15 +2014,15 @@ class ExternalCommunication(object):
return self.__exec_cmd("program_speed", "获取程序运行速度")
@property
def robot_is_busy(self):
return self.__exec_cmd("robot_is_busy", "获取程序忙碌状态", " :--: 返回 true 表示控制器忙碌,false 非忙碌状态")
def robot_is_busy(self): # OK | 触发条件为 pp2main/重载工程/推送到控制器,最好测试工程大一些,比较容易触发
return self.__exec_cmd("robot_is_busy", "获取程序忙碌状态", " :--: 返回 1 表示控制器忙碌,0 非忙碌状态")
@property
def robot_is_moving(self): # OK
return self.__exec_cmd("robot_is_moving", "获取程序运行状态", " :--: 返回 true 表示机器人正在运动false 未运动")
@property
def safe_door_state(self):
def safe_door_state(self): # OK
return self.__exec_cmd("safe_door_state", "获取安全门状态", " :--: 返回 true 表示安全门已触发false 未触发")
@property
@ -2027,7 +2059,7 @@ class ExternalCommunication(object):
def __exec_cmd(self, directive, description, more_desc=""):
self.s_string(directive)
result = self.r_string().strip()
result = self.r_string(directive).strip()
clibs.logger.info(f"执行{description}指令 {directive},返回值为 {result}{more_desc}")
return result