1. 获取类功能都添加 @property 装饰器

2. 修复 Modbus 安全区相关的功能
3. 重新调整了建联的逻辑
This commit is contained in:
gitea 2024-09-24 21:30:35 +08:00
parent b381ee92f5
commit 5a52f6262d
3 changed files with 187 additions and 59 deletions

View File

@ -0,0 +1,93 @@
{
"s": {
"box": {
"Lx": 0.0,
"Ly": 0.0,
"Lz": 0.0,
"direction": true,
"ori": {
"euler": {
"a": 0.0,
"b": 0.0,
"c": 0.0
},
"quaternion": {
"q1": 0.0,
"q2": 0.0,
"q3": 0.0,
"q4": 0.0
}
},
"pos": {
"x": 0.0,
"y": 0.0,
"z": 0.0
}
},
"enable": false,
"id": 0,
"name": "region1",
"plane": {
"direction": true,
"point": {
"x": 0.0,
"y": 0.0,
"z": 0.0
},
"vector": {
"x": 0.0,
"y": 0.0,
"z": 0.0
}
},
"shape": 0,
"shared_bind_di": "",
"shared_bind_do": "",
"sphere": {
"ori": {
"euler": {
"a": 0.0,
"b": 0.0,
"c": 0.0
},
"quaternion": {
"q1": 0.0,
"q2": 0.0,
"q3": 0.0,
"q4": 0.0
}
},
"pos": {
"x": 0.0,
"y": 0.0,
"z": 0.0
},
"radius": 0.0
},
"state": true,
"trigger": 0,
"type": 0,
"vertebral": {
"high": 0.0,
"ori": {
"euler": {
"a": 0.0,
"b": 0.0,
"c": 0.0
},
"quaternion": {
"q1": 0.0,
"q2": 0.0,
"q3": 0.0,
"q4": 0.0
}
},
"pos": {
"x": 0.0,
"y": 0.0,
"z": 0.0
},
"radius": 0.0
}
}
}

View File

@ -1,13 +1,14 @@
import time
import openapi
import json
import clibs
def initialization():
hr = openapi.HmiRequest()
pd = openapi.PreDos()
# 推送配置文件
robot_params = hr.get_robot_params()
robot_params = hr.get_robot_params
robot_type = robot_params["robot_type"]
security_type = robot_params["security_type"]
controller_type = robot_params["controller_type"]
@ -86,12 +87,13 @@ def initialization():
hr.set_quickturn_pos(enable_drag=True)
hr.move2quickturn("drag")
while True:
if md.w_robot_moving():
if md.w_robot_is_moving:
time.sleep(1)
else:
break
hr.stop_move(1)
hr.switch_motor_state("off")
hr.close()
# 清除所有告警
md.r_clear_alarm()

View File

@ -1,4 +1,4 @@
from json import load, dumps, loads
from json import load, dumps, loads, dump
from os import listdir
from inspect import currentframe
from socket import socket, AF_INET, SOCK_STREAM
@ -99,142 +99,157 @@ class ModbusRequest(object):
self.__reg_high_pulse(40015)
clibs.logger.info(f"40015-010 执行切换为手动模式")
def r_safe_region01(self, action): # NG
clibs.logger.critical("[NG]-40016-01 该指令暂时有问题,无法实现指定功能,待修复...")
# self.__c.write_register(40016, action)
# actions = "打开" if action == 1 else "关闭"
# clibs.logger.info(f"[NG]-40016-{action} 执行{actions}安全区 safe region 01")
# sleep(clibs.interval)
def r_switch_safe_region01(self, action: bool): # OK | 上升沿打开,下降沿关闭
if action:
self.__c.write_register(40016, False)
sleep(clibs.interval)
self.__c.write_register(40016, True)
else:
self.__c.write_register(40016, True)
sleep(clibs.interval)
self.__c.write_register(40016, False)
actions = "打开" if action else "关闭"
clibs.logger.info(f"40016-{action} 执行{actions}安全区 safe region 01")
sleep(clibs.interval)
def r_safe_region02(self, action):
clibs.logger.critical("[NG]-40017-01 该指令暂时有问题,无法实现指定功能,待修复...")
# self.__c.write_register(40017, action)
# actions = "打开" if action == 1 else "关闭"
# clibs.logger.info(f"[NG]-40017-{action} 执行{actions}安全区 safe region 02")
# sleep(clibs.interval)
def r_switch_safe_region02(self, action: bool): # OK | 上升沿打开,下降沿关闭
if action:
self.__c.write_register(40017, False)
sleep(clibs.interval)
self.__c.write_register(40017, True)
else:
self.__c.write_register(40017, True)
sleep(clibs.interval)
self.__c.write_register(40017, False)
actions = "打开" if action else "关闭"
clibs.logger.info(f"40017-{action} 执行{actions}安全区 safe region 02")
sleep(clibs.interval)
def r_safe_region03(self, action):
clibs.logger.critical("[NG]-40018-01 该指令暂时有问题,无法实现指定功能,待修复...")
# self.__c.write_register(40018, action)
# actions = "打开" if action == 1 else "关闭"
# clibs.logger.info(f"[NG]-40018-{action} 执行{actions}安全区 safe region 03")
# sleep(clibs.interval)
def r_switch_safe_region03(self, action: bool): # OK | 上升沿打开,下降沿关闭
if action:
self.__c.write_register(40018, False)
sleep(clibs.interval)
self.__c.write_register(40018, True)
else:
self.__c.write_register(40018, True)
sleep(clibs.interval)
self.__c.write_register(40018, False)
actions = "打开" if action else "关闭"
clibs.logger.info(f"40018-{action} 执行{actions}安全区 safe region 03")
sleep(clibs.interval)
@property
def w_alarm_state(self): # OK
res = self.__c.read_holding_registers(40500, 1).registers[0]
clibs.logger.info(f"40500 获取告警状态,结果为 {res} :--: 0 表示无告警, 1 表示有告警")
clibs.logger.info(f"40500 获取告警状态,结果为 {res} :--: 0 表示无告警,1 表示有告警")
return res
@property
def w_collision_alarm_state(self): # OK
res = self.__c.read_holding_registers(40501, 1).registers[0]
clibs.logger.info(f"40501 获取碰撞告警状态,结果为 {res} :--: 0 表示未触发 1 表示已触发")
clibs.logger.info(f"40501 获取碰撞告警状态,结果为 {res} :--: 0 表示未触发1 表示已触发")
return res
@property
def w_collision_open_state(self): # OK
res = self.__c.read_holding_registers(40502, 1).registers[0]
clibs.logger.info(f"40502 获取碰撞检测开启状态,结果为 {res} :--: 0 表示关闭 1 表示开启")
clibs.logger.info(f"40502 获取碰撞检测开启状态,结果为 {res} :--: 0 表示关闭1 表示开启")
return res
@property
def w_controller_is_running(self): # OK
res = self.__c.read_holding_registers(40503, 1).registers[0]
clibs.logger.info(f"40503 获取控制器运行状态,结果为 {res} :--: 0 表示运行异常 1 表示运行正常")
clibs.logger.info(f"40503 获取控制器运行状态,结果为 {res} :--: 0 表示运行异常1 表示运行正常")
return res
@property
def w_encoder_low_battery(self): # OK
res = self.__c.read_holding_registers(40504, 1).registers[0]
clibs.logger.info(f"40504 获取编码器低电压状态,结果为 {res} :--: 0 表示非低电压 1 表示低电压 需关注")
clibs.logger.info(f"40504 获取编码器低电压状态,结果为 {res} :--: 0 表示非低电压1 表示低电压 需关注")
return res
@property
def w_estop_state(self): # OK
res = self.__c.read_holding_registers(40505, 1).registers[0]
clibs.logger.info(f"40505 获取机器人急停状态(非软急停),结果为 {res} :--: 0 表示未触发 1 表示已触发")
clibs.logger.info(f"40505 获取机器人急停状态(非软急停),结果为 {res} :--: 0 表示未触发1 表示已触发")
return res
@property
def w_motor_state(self): # OK
res = self.__c.read_holding_registers(40506, 1).registers[0]
clibs.logger.info(f"40506 获取机器人上电状态,结果为 {res} :--: 0 表示未上电 1 表示已上电")
clibs.logger.info(f"40506 获取机器人上电状态,结果为 {res} :--: 0 表示未上电1 表示已上电")
return res
@property
def w_operation_mode(self): # OK
res = self.__c.read_holding_registers(40507, 1).registers[0]
clibs.logger.info(f"40507 获取机器人操作模式,结果为 {res} :--: 0 表示手动模式 1 表示自动模式")
clibs.logger.info(f"40507 获取机器人操作模式,结果为 {res} :--: 0 表示手动模式1 表示自动模式")
return res
@property
def w_program_state(self): # OK
res = self.__c.read_holding_registers(40508, 1).registers[0]
clibs.logger.info(f"40508 获取程序的运行状态,结果为 {res} :--: 0 表示未运行 1 表示正在运行")
clibs.logger.info(f"40508 获取程序的运行状态,结果为 {res} :--: 0 表示未运行1 表示正在运行")
return res
@property
def w_program_not_run(self): # OK
res = self.__c.read_holding_registers(40509, 1).registers[0]
clibs.logger.info(f"40509 判定程序为未运行状态,结果为 {res} :--: 0 表示正在运行 1 表示未运行")
clibs.logger.info(f"40509 判定程序为未运行状态,结果为 {res} :--: 0 表示正在运行1 表示未运行")
return res
@property
def w_program_reset(self): # OK
res = self.__c.read_holding_registers(40510, 1).registers[0]
clibs.logger.info(f"40510 判定程序指针为 pp2main 状态,结果为 {res} :--: 0 表示指针不在 main 函数 1 表示指针在 main 函数")
clibs.logger.info(f"40510 判定程序指针为 pp2main 状态,结果为 {res} :--: 0 表示指针不在 main 函数1 表示指针在 main 函数")
return res
@property
def w_reduce_mode_state(self): # OK
res = self.__c.read_holding_registers(40511, 1).registers[0]
clibs.logger.info(f"40511 获取机器人缩减模式状态,结果为 {res} :--: 0 表示非缩减模式 1 表示缩减模式")
clibs.logger.info(f"40511 获取机器人缩减模式状态,结果为 {res} :--: 0 表示非缩减模式1 表示缩减模式")
return res
@property
def w_robot_is_busy(self): # OK
res = self.__c.read_holding_registers(40512, 1).registers[0]
clibs.logger.info(f"40512 获取机器人是否处于 busy 状态,结果为 {res} :--: 0 表示未处于 busy 状态 1 表示处于 busy 状态")
clibs.logger.info(f"40512 获取机器人是否处于 busy 状态,结果为 {res} :--: 0 表示未处于 busy 状态1 表示处于 busy 状态")
return res
@property
def w_robot_is_moving(self): # OK
res = self.__c.read_holding_registers(40513, 1).registers[0]
clibs.logger.info(f"40513 获取机器人是否处于运动状态,结果为 {res} :--: 0 表示为运动 1 表示正在运动")
clibs.logger.info(f"40513 获取机器人是否处于运动状态,结果为 {res} :--: 0 表示为运动1 表示正在运动")
return res
@property
def w_safe_door_state(self): # OK
res = self.__c.read_holding_registers(40514, 1).registers[0]
clibs.logger.info(f"40514 获取机器人是否处于安全门打开状态,需自动模式下执行,结果为 {res} :--: 0 表示未触发安全门 1 表示已触发安全门")
clibs.logger.info(f"40514 获取机器人是否处于安全门打开状态,需自动模式下执行,结果为 {res} :--: 0 表示未触发安全门1 表示已触发安全门")
return res
@property
def w_safe_region01(self): # NG
clibs.logger.critical(f"40515 获取安全区域 safe region01 功能咱不可用,待修复...")
# res = self.__c.read_holding_registers(40515, 1).registers[0]
# clibs.logger.info(f"40515 获取安全区域 safe region01 是否处于打开状态,结果为 {res}")
# return res
def w_safe_region01_trig_state(self): # OK
res = self.__c.read_holding_registers(40515, 1).registers[0]
clibs.logger.info(f"40515 获取安全区域 safe region01 的触发状态,结果为 {res} :--: 0 表示未触发1 表示已触发")
return res
@property
def w_safe_region02(self): # NG
clibs.logger.critical(f"40516 获取安全区域 safe region02 功能咱不可用,待修复...")
# res = self.__c.read_holding_registers(40516, 1).registers[0]
# clibs.logger.info(f"40516 获取安全区域 safe region02 是否处于打开状态,结果为 {res}")
# return res
def w_safe_region02_trig_state(self): # OK
res = self.__c.read_holding_registers(40516, 1).registers[0]
clibs.logger.info(f"40516 获取安全区域 safe region02 的触发状态,结果为 {res} :--: 0 表示未触发1 表示已触发")
return res
@property
def w_safe_region03(self): # NG
clibs.logger.critical(f"40517 获取安全区域 safe region03 功能咱不可用,待修复...")
# res = self.__c.read_holding_registers(40517, 1).registers[0]
# clibs.logger.info(f"40517 获取安全区域 safe region03 是否处于打开状态,结果为 {res}")
# return res
def w_safe_region03_trig_state(self): # OK
res = self.__c.read_holding_registers(40517, 1).registers[0]
clibs.logger.info(f"40517 获取安全区域 safe region03 的触发状态,结果为 {res} :--: 0 表示未触发1 表示已触发")
return res
@property
def w_soft_estop_state(self): # OK
res = self.__c.read_holding_registers(40518, 1).registers[0]
clibs.logger.info(f"40518 获取机器人软急停状态,结果为 {res} :--: 0 表示未触发软急停 1 表示已触发软急停")
clibs.logger.info(f"40518 获取机器人软急停状态,结果为 {res} :--: 0 表示未触发软急停1 表示已触发软急停")
return res
def io_write_coils(self, addr, action): # OK | 名字叫写线圈,其实是写 modbus 的 discrete inputs(DI)
@ -281,6 +296,7 @@ class HmiRequest(object):
self.__t_is_alive = Thread(target=self.__is_alive)
self.__t_is_alive.daemon = False
self.__t_is_alive.start()
sleep(1) # 很重要,必须有,因为涉及到建联成功与否
def __is_alive(self):
first_time = True
@ -301,7 +317,6 @@ class HmiRequest(object):
self.__t_unpackage_xs = Thread(target=self.__unpackage_xs, args=(self.__c_xs,))
self.__t_unpackage_xs.daemon = True
self.__t_unpackage_xs.start()
else:
self.__silence = False
first_time = True
@ -1727,14 +1742,32 @@ class HmiRequest(object):
"""
return self.__get_data(currentframe().f_code.co_name, "safety_area_data", flag=1)
def set_safety_area_enable(self, id: int, enable: bool, **kwargs):
def set_safety_area_enable(self, id: int, enable: bool):
"""
设置每个安全区域的开关
:param id: 安全区域开关0-9
:param enable: True 打开False 关闭
:return: None
"""
self.execution("safety.safety_area.safety_area_enable", protocol_flag=1, id=id, enable=enable, **kwargs)
self.execution("safety.safety_area.safety_area_enable", protocol_flag=1, id=id, enable=enable)
def set_safety_area_param(self, id: int, enable: bool, **kwargs): # 不生效,待确认
"""
设定单独安全区的参数
:param id: 安全区 id
:param enable: 是否开启
:param kwargs: 其他参数参考 get_safety_area_params 的返回值形式
:return: None
"""
res = self.get_safety_area_params["g"]["safety_area_data"]["safety_area_setting"][id]
keys = res.keys()
kwargs.update({"id": id, "enable": enable})
for _ in keys:
if _ in kwargs.keys():
res[_] = kwargs[_]
with open(f"{clibs.PREFIX}/json/safety_area.set_param.json", mode="w", encoding="utf-8") as f:
dump({"s": res}, f, indent=4)
self.execution("safety_area.set_param", protocol_flag=1)
@property
def get_filtered_error_code(self):
@ -1766,7 +1799,7 @@ class HmiRequest(object):
return self.__get_data(currentframe().f_code.co_name, "log_code.data", flag=1)
def set_filtered_error_code(self, action: str, code_list: list):
origin_code_list = self.get_filtered_error_code()["g"]["log_code.data"]["code_list"]
origin_code_list = self.get_filtered_error_code["g"]["log_code.data"]["code_list"]
# [{'id': 10000, 'title': 'HMI请求包解析错误'}, {'id': 10002, 'title': '快速调整启动失败'}]
if action == "clear":
code_list = []
@ -1899,7 +1932,7 @@ class ExternalCommunication(object):
return self.__exec_cmd(f"open_safe_region:{number}", f"打开第 {number} 个安全区域")
def close_safe_region(self, number: int):
return self.__exec_cmd(f"close_safe_region:{number}", f"打开{number} 个安全区域")
return self.__exec_cmd(f"close_safe_region:{number}", f"关闭{number} 个安全区域")
def open_reduced_mode(self):
return self.__exec_cmd("open_reduced_mode", "开启缩减模式")