完善reboot指令,hr的实例化需要手动关闭,并且会自动重连

This commit is contained in:
gitea 2024-09-21 18:17:24 +08:00
parent 57d9da7bc7
commit 50f150d685
2 changed files with 116 additions and 65 deletions

View File

@ -25,19 +25,21 @@ setdefaulttimeout(TIMEOUT)
PREFIX = "../assets"
log_path = f"{PREFIX}/logs/"
log_data_debug = f"{PREFIX}/logs/debug.log"
log_data_reqs = f"{PREFIX}/logs/reqs.log"
if not exists(log_path):
mkdir(log_path)
else:
for _ in listdir(log_path):
remove("".join([log_path, _]))
logger.remove()
logger.add(stdout, level="INFO")
logger.add(
sink=log_data_debug,
level="DEBUG",
format="{time: YYYY-MM-DD HH:mm:ss} | {level} | {message}",
rotation="10 KB",
rotation="50 MB",
encoding="utf-8",
enqueue=True,
diagnose=True,

View File

@ -4,11 +4,12 @@ from inspect import currentframe
from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread
import selectors
from time import time, sleep
from time import time, sleep, strftime, localtime
from pymodbus.client.tcp import ModbusTcpClient
# from pymodbus.payload import BinaryPayloadDecoder, BinaryPayloadBuilder
# from pymodbus.constants import Endian
from paramiko import SSHClient, AutoAddPolicy
from datetime import datetime
import clibs
@ -249,15 +250,31 @@ class HmiRequest(object):
self.__leftover = 0
self.__flag_xs = 0
self.__response_xs = ""
self.__t_bool = True
self.__is_connected = False
self.__silence = False
self.__pkg_size = 0
self.__broke = 0
self.__half = 0
self.__half_length = 0
self.__index = 0
self.__reset_index = 0
self.__close_hmi = False
# self.__sock_conn()
self.__t_is_alive = Thread(target=self.__is_alive)
self.__t_is_alive.daemon = False
self.__t_is_alive.start()
def __is_alive(self):
first_time = True
while not self.__close_hmi:
if not self.__is_connected:
if not first_time:
clibs.logger.info("重新连接中...")
first_time = False
self.__silence = True
self.__sock_conn()
if self.__is_connected:
self.__t_heartbeat = Thread(target=self.__heartbeat)
self.__t_heartbeat.daemon = True
self.__t_heartbeat.start()
@ -267,6 +284,20 @@ class HmiRequest(object):
self.__t_unpackage_xs = Thread(target=self.__unpackage_xs, args=(self.__c_xs,))
self.__t_unpackage_xs.daemon = True
self.__t_unpackage_xs.start()
else:
self.__silence = False
first_time = True
sleep(clibs.interval*10)
@property
def connection_state(self):
return self.__is_connected
def close(self):
self.__close_hmi = True
self.__is_connected = False
self.__silence = True
def __sock_conn(self):
try:
@ -281,18 +312,19 @@ class HmiRequest(object):
for _ in range(3):
self.execution("controller.heart")
sleep(clibs.interval)
else:
self.__is_connected = True
clibs.logger.success("HMI connection success...")
except Exception as Err:
clibs.logger.error(f"HMI connection failed...{Err}")
self.__sth_wrong(9)
self.__sth_wrong(9, f"HMI connection failed...{Err}")
def __sth_wrong(self, ex_code):
self.__t_bool = False
clibs.logger.error(f"[{ex_code}] Something wrong happened!!! "
f"可能是HMI无法连接到 {clibs.ip_addr}:{clibs.socket_port},也可能是其他问题... "
f"确认无问题后,可尝试重新运行!")
exit(ex_code)
def __sth_wrong(self, ex_code, err_desc=""):
self.__is_connected = False
if not self.__silence:
clibs.logger.error(f"[{ex_code}] 可能是 HMI 无法连接到 {clibs.ip_addr}:{clibs.socket_port}...")
if err_desc != "":
clibs.logger.error(f"[{ex_code}] {err_desc}")
# exit(ex_code)
def __header_check(self, index, data):
if index + 8 < len(data):
@ -305,8 +337,7 @@ class HmiRequest(object):
return index + 8, _frame_size, _pkg_size
else:
print(data)
clibs.logger.critical("Header Check: 解包数据有误,需要确认!")
self.__sth_wrong(6)
self.__sth_wrong(6, "Header Check: 解包数据有误,需要确认!")
else:
self.__half_length = len(data) - index
self.__half = data[index:]
@ -315,7 +346,7 @@ class HmiRequest(object):
return index, 0, 0
def __heartbeat(self):
while self.__t_bool:
while self.__is_connected:
self.execution("controller.heart")
sleep(clibs.heartbeat_interval)
@ -336,37 +367,37 @@ class HmiRequest(object):
if not error_code:
return eval(line.split("|")[-1].strip())
else:
clibs.logger.error(f"请求 {msg_id} 的返回错误码为 {error_code}")
self.__sth_wrong(7)
self.__sth_wrong(7, f"请求 {msg_id} 的返回错误码为 {error_code}")
def find_response_xs(log_data):
with open(log_data, mode="r", encoding="utf-8") as f_log:
for line in reversed(f_log.readlines()):
if line.split("|")[1].strip() != "DEBUG":
continue
if msg_id in line.strip():
return eval(line.split("|")[-1].strip())
if flag == 0:
for _ in range(3):
res = find_response(clibs.log_data_debug)
if res is not None:
return res["data"]
sleep(clibs.interval)
sleep(clibs.interval*2)
else: # 尝试在上一次分割的日志中查找,只做一次
res = find_response("".join([clibs.log_path, listdir(clibs.log_path)[0]]))
if res is not None:
return res["data"]
elif flag == 1:
sleep(1)
with open(clibs.log_data_debug, mode="r", encoding="utf-8") as f_log:
for line in reversed(f_log.readlines()):
if line.split("|")[1].strip() != "DEBUG":
continue
if msg_id in line.strip():
return eval(line.split("|")[-1].strip())
for _ in range(3):
res = find_response_xs(clibs.log_data_debug)
if res is not None:
return res
sleep(clibs.interval*2)
else:
with open("".join([clibs.log_path, listdir(clibs.log_path)[0]]), mode="r", encoding="utf-8") as f_log:
for line in reversed(f_log.readlines()):
if line.split("|")[1].strip() != "DEBUG":
continue
if msg_id in line.strip():
return eval(line.split("|")[-1].strip())
else:
clibs.logger.error(f"无法找到 xService 请求 {msg_id} 的返回结果")
self.__sth_wrong(11)
res = find_response_xs("".join([clibs.log_path, listdir(clibs.log_path)[0]]))
if res is not None:
return res
self.__sth_wrong(11, f"无法找到请求 {msg_id} 的返回结果")
def __msg_storage(self, response, flag=0):
# response是解码后的字符串
@ -395,8 +426,7 @@ class HmiRequest(object):
_reserved = int.from_bytes(_full[7:8], byteorder="big")
if _reserved != 0 or _protocol != 2:
print(data)
clibs.logger.critical("in get_response: 解包数据有误,需要确认!")
self.__sth_wrong(10)
self.__sth_wrong(10, "in get_response: 解包数据有误,需要确认!")
self.__pkg_size = _pkg_size
self.__index = 8 - self.__half_length
@ -552,14 +582,13 @@ class HmiRequest(object):
sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ, to_read)
while self.__t_bool:
while self.__is_connected:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
except Exception as Err:
clibs.logger.error(Err)
self.__sth_wrong(3)
self.__sth_wrong(3, f"错误信息:{Err}")
def __unpackage_xs(self, sock):
def to_read(conn, mask):
@ -576,24 +605,27 @@ class HmiRequest(object):
sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ, to_read)
while self.__t_bool:
while self.__is_connected:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
except Exception as Err:
clibs.logger.error(Err)
self.__sth_wrong(8)
self.__sth_wrong(8, f"错误信息:{Err}")
def execution(self, command, protocol_flag=0, **kwargs):
def log_reqs(request):
with open(clibs.log_data_reqs, mode="a", encoding="utf-8") as f_log:
log_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
f_log.write(" ".join([log_time, dumps(req), "\n"]))
if protocol_flag == 0: # for old protocols
req = None
try:
with open(f"{clibs.PREFIX}/json/{command}.json", encoding="utf-8", mode="r") as f_json:
req = load(f_json)
except Exception as Err:
clibs.logger.info(f"暂不支持 {command} 功能,或确认该功能存在...\n{Err}")
self.__sth_wrong(5)
self.__sth_wrong(5, f"暂不支持 {command} 功能,或确认该功能存在... {Err}")
match command:
case "state.set_tp_mode":
@ -716,11 +748,11 @@ class HmiRequest(object):
req["id"] = self.__gen_id(command)
cmd = dumps(req, separators=(",", ":"))
try:
log_reqs(req)
self.__c.send(self.__package(cmd))
sleep(clibs.interval*2)
sleep(clibs.interval)
except Exception as Err:
clibs.logger.error(f"{Err} 请求发送失败:{cmd}")
self.__sth_wrong(4)
self.__sth_wrong(4, f"{Err} 请求发送失败:{cmd}")
return req["id"]
elif protocol_flag == 1: # for xService
req = None
@ -728,8 +760,7 @@ class HmiRequest(object):
with open(f"{clibs.PREFIX}/json/{command}.json", encoding="utf-8", mode="r") as f_json:
req = load(f_json)
except Exception as Err:
clibs.logger.info(f"暂不支持 {command} 功能,或确认该功能存在...\n{Err}")
self.__sth_wrong(1)
self.__sth_wrong(1, f"暂不支持 {command} 功能,或确认该功能存在... {Err}")
match command:
case "safety.safety_area.signal_enable":
@ -743,12 +774,13 @@ class HmiRequest(object):
req["c"]["safety.safety_area.safety_area_enable"]["enable"] = kwargs["enable"]
case _:
pass
try:
log_reqs(req)
self.__c_xs.send(self.__package_xs(req))
sleep(clibs.interval*2)
sleep(clibs.interval)
except Exception as Err:
clibs.logger.error(f"{Err} 请求发送失败:{req}")
self.__sth_wrong(2)
self.__sth_wrong(2, f"{Err} 请求发送失败:{req}")
return command
# =================================== ↓↓↓ specific functions ↓↓↓ ===================================
@ -845,6 +877,23 @@ class HmiRequest(object):
:return: None
"""
self.execution("controller.reboot")
clibs.logger.info(f"控制器重启中,重连预计需要等待 100s 左右...")
ts = time()
sleep(30)
while True:
sleep(5)
te = time()
if te - ts > 180:
self.__silence = False
self.__sth_wrong("3min 内未能完成重新连接,需要查看后台控制器是否正常启动,或者 ip/port 是否正确")
break
for _ in range(3):
if not self.connection_state:
break
sleep(2)
else:
clibs.logger.info("HMI 重新连接成功...")
break
def reload_io(self):
"""
@ -1930,7 +1979,7 @@ class ExternalCommunication(object):
def __exec_cmd(self, directive, description):
self.s_string(directive)
result = self.r_string()
result = self.r_string().strip()
clibs.logger.info(f"外部通信:执行{description} {directive} 指令,返回值为 {result}")
return result