diff --git a/code/clibs.py b/code/clibs.py index 8b2d188..c059e2e 100644 --- a/code/clibs.py +++ b/code/clibs.py @@ -25,19 +25,21 @@ setdefaulttimeout(TIMEOUT) PREFIX = "../assets" log_path = f"{PREFIX}/logs/" log_data_debug = f"{PREFIX}/logs/debug.log" +log_data_reqs = f"{PREFIX}/logs/reqs.log" if not exists(log_path): mkdir(log_path) else: for _ in listdir(log_path): remove("".join([log_path, _])) + logger.remove() logger.add(stdout, level="INFO") logger.add( sink=log_data_debug, level="DEBUG", format="{time: YYYY-MM-DD HH:mm:ss} | {level} | {message}", - rotation="10 KB", + rotation="50 MB", encoding="utf-8", enqueue=True, diagnose=True, diff --git a/code/openapi.py b/code/openapi.py index 76cc699..6d0343f 100644 --- a/code/openapi.py +++ b/code/openapi.py @@ -4,11 +4,12 @@ from inspect import currentframe from socket import socket, AF_INET, SOCK_STREAM from threading import Thread import selectors -from time import time, sleep +from time import time, sleep, strftime, localtime from pymodbus.client.tcp import ModbusTcpClient # from pymodbus.payload import BinaryPayloadDecoder, BinaryPayloadBuilder # from pymodbus.constants import Endian from paramiko import SSHClient, AutoAddPolicy +from datetime import datetime import clibs @@ -249,24 +250,54 @@ class HmiRequest(object): self.__leftover = 0 self.__flag_xs = 0 self.__response_xs = "" - self.__t_bool = True + self.__is_connected = False + self.__silence = False self.__pkg_size = 0 self.__broke = 0 self.__half = 0 self.__half_length = 0 self.__index = 0 self.__reset_index = 0 + self.__close_hmi = False - self.__sock_conn() - self.__t_heartbeat = Thread(target=self.__heartbeat) - self.__t_heartbeat.daemon = True - self.__t_heartbeat.start() - self.__t_unpackage = Thread(target=self.__unpackage, args=(self.__c,)) - self.__t_unpackage.daemon = True - self.__t_unpackage.start() - self.__t_unpackage_xs = Thread(target=self.__unpackage_xs, args=(self.__c_xs,)) - self.__t_unpackage_xs.daemon = True - self.__t_unpackage_xs.start() + # self.__sock_conn() + self.__t_is_alive = Thread(target=self.__is_alive) + self.__t_is_alive.daemon = False + self.__t_is_alive.start() + + def __is_alive(self): + first_time = True + while not self.__close_hmi: + if not self.__is_connected: + if not first_time: + clibs.logger.info("重新连接中...") + first_time = False + self.__silence = True + self.__sock_conn() + if self.__is_connected: + self.__t_heartbeat = Thread(target=self.__heartbeat) + self.__t_heartbeat.daemon = True + self.__t_heartbeat.start() + self.__t_unpackage = Thread(target=self.__unpackage, args=(self.__c,)) + self.__t_unpackage.daemon = True + self.__t_unpackage.start() + self.__t_unpackage_xs = Thread(target=self.__unpackage_xs, args=(self.__c_xs,)) + self.__t_unpackage_xs.daemon = True + self.__t_unpackage_xs.start() + else: + self.__silence = False + first_time = True + + sleep(clibs.interval*10) + + @property + def connection_state(self): + return self.__is_connected + + def close(self): + self.__close_hmi = True + self.__is_connected = False + self.__silence = True def __sock_conn(self): try: @@ -281,18 +312,19 @@ class HmiRequest(object): for _ in range(3): self.execution("controller.heart") sleep(clibs.interval) - - clibs.logger.success("HMI connection success...") + else: + self.__is_connected = True + clibs.logger.success("HMI connection success...") except Exception as Err: - clibs.logger.error(f"HMI connection failed...{Err}") - self.__sth_wrong(9) + self.__sth_wrong(9, f"HMI connection failed...{Err}") - def __sth_wrong(self, ex_code): - self.__t_bool = False - clibs.logger.error(f"[{ex_code}] Something wrong happened!!! " - f"可能是HMI无法连接到 {clibs.ip_addr}:{clibs.socket_port},也可能是其他问题... " - f"确认无问题后,可尝试重新运行!") - exit(ex_code) + def __sth_wrong(self, ex_code, err_desc=""): + self.__is_connected = False + if not self.__silence: + clibs.logger.error(f"[{ex_code}] 可能是 HMI 无法连接到 {clibs.ip_addr}:{clibs.socket_port}...") + if err_desc != "": + clibs.logger.error(f"[{ex_code}] {err_desc}") + # exit(ex_code) def __header_check(self, index, data): if index + 8 < len(data): @@ -305,8 +337,7 @@ class HmiRequest(object): return index + 8, _frame_size, _pkg_size else: print(data) - clibs.logger.critical("Header Check: 解包数据有误,需要确认!") - self.__sth_wrong(6) + self.__sth_wrong(6, "Header Check: 解包数据有误,需要确认!") else: self.__half_length = len(data) - index self.__half = data[index:] @@ -315,7 +346,7 @@ class HmiRequest(object): return index, 0, 0 def __heartbeat(self): - while self.__t_bool: + while self.__is_connected: self.execution("controller.heart") sleep(clibs.heartbeat_interval) @@ -336,37 +367,37 @@ class HmiRequest(object): if not error_code: return eval(line.split("|")[-1].strip()) else: - clibs.logger.error(f"请求 {msg_id} 的返回错误码为 {error_code}") - self.__sth_wrong(7) + self.__sth_wrong(7, f"请求 {msg_id} 的返回错误码为 {error_code}") + + def find_response_xs(log_data): + with open(log_data, mode="r", encoding="utf-8") as f_log: + for line in reversed(f_log.readlines()): + if line.split("|")[1].strip() != "DEBUG": + continue + if msg_id in line.strip(): + return eval(line.split("|")[-1].strip()) if flag == 0: for _ in range(3): res = find_response(clibs.log_data_debug) if res is not None: return res["data"] - sleep(clibs.interval) + sleep(clibs.interval*2) else: # 尝试在上一次分割的日志中查找,只做一次 res = find_response("".join([clibs.log_path, listdir(clibs.log_path)[0]])) if res is not None: return res["data"] elif flag == 1: - sleep(1) - with open(clibs.log_data_debug, mode="r", encoding="utf-8") as f_log: - for line in reversed(f_log.readlines()): - if line.split("|")[1].strip() != "DEBUG": - continue - if msg_id in line.strip(): - return eval(line.split("|")[-1].strip()) - else: - with open("".join([clibs.log_path, listdir(clibs.log_path)[0]]), mode="r", encoding="utf-8") as f_log: - for line in reversed(f_log.readlines()): - if line.split("|")[1].strip() != "DEBUG": - continue - if msg_id in line.strip(): - return eval(line.split("|")[-1].strip()) - else: - clibs.logger.error(f"无法找到 xService 请求 {msg_id} 的返回结果") - self.__sth_wrong(11) + for _ in range(3): + res = find_response_xs(clibs.log_data_debug) + if res is not None: + return res + sleep(clibs.interval*2) + else: + res = find_response_xs("".join([clibs.log_path, listdir(clibs.log_path)[0]])) + if res is not None: + return res + self.__sth_wrong(11, f"无法找到请求 {msg_id} 的返回结果") def __msg_storage(self, response, flag=0): # response是解码后的字符串 @@ -395,8 +426,7 @@ class HmiRequest(object): _reserved = int.from_bytes(_full[7:8], byteorder="big") if _reserved != 0 or _protocol != 2: print(data) - clibs.logger.critical("in get_response: 解包数据有误,需要确认!") - self.__sth_wrong(10) + self.__sth_wrong(10, "in get_response: 解包数据有误,需要确认!") self.__pkg_size = _pkg_size self.__index = 8 - self.__half_length @@ -552,14 +582,13 @@ class HmiRequest(object): sel = selectors.DefaultSelector() sel.register(sock, selectors.EVENT_READ, to_read) - while self.__t_bool: + while self.__is_connected: events = sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask) except Exception as Err: - clibs.logger.error(Err) - self.__sth_wrong(3) + self.__sth_wrong(3, f"错误信息:{Err}") def __unpackage_xs(self, sock): def to_read(conn, mask): @@ -576,24 +605,27 @@ class HmiRequest(object): sel = selectors.DefaultSelector() sel.register(sock, selectors.EVENT_READ, to_read) - while self.__t_bool: + while self.__is_connected: events = sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask) except Exception as Err: - clibs.logger.error(Err) - self.__sth_wrong(8) + self.__sth_wrong(8, f"错误信息:{Err}") def execution(self, command, protocol_flag=0, **kwargs): + def log_reqs(request): + with open(clibs.log_data_reqs, mode="a", encoding="utf-8") as f_log: + log_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + f_log.write(" ".join([log_time, dumps(req), "\n"])) + if protocol_flag == 0: # for old protocols req = None try: with open(f"{clibs.PREFIX}/json/{command}.json", encoding="utf-8", mode="r") as f_json: req = load(f_json) except Exception as Err: - clibs.logger.info(f"暂不支持 {command} 功能,或确认该功能存在...\n{Err}") - self.__sth_wrong(5) + self.__sth_wrong(5, f"暂不支持 {command} 功能,或确认该功能存在... {Err}") match command: case "state.set_tp_mode": @@ -716,11 +748,11 @@ class HmiRequest(object): req["id"] = self.__gen_id(command) cmd = dumps(req, separators=(",", ":")) try: + log_reqs(req) self.__c.send(self.__package(cmd)) - sleep(clibs.interval*2) + sleep(clibs.interval) except Exception as Err: - clibs.logger.error(f"{Err} 请求发送失败:{cmd}") - self.__sth_wrong(4) + self.__sth_wrong(4, f"{Err} 请求发送失败:{cmd}") return req["id"] elif protocol_flag == 1: # for xService req = None @@ -728,8 +760,7 @@ class HmiRequest(object): with open(f"{clibs.PREFIX}/json/{command}.json", encoding="utf-8", mode="r") as f_json: req = load(f_json) except Exception as Err: - clibs.logger.info(f"暂不支持 {command} 功能,或确认该功能存在...\n{Err}") - self.__sth_wrong(1) + self.__sth_wrong(1, f"暂不支持 {command} 功能,或确认该功能存在... {Err}") match command: case "safety.safety_area.signal_enable": @@ -743,12 +774,13 @@ class HmiRequest(object): req["c"]["safety.safety_area.safety_area_enable"]["enable"] = kwargs["enable"] case _: pass + try: + log_reqs(req) self.__c_xs.send(self.__package_xs(req)) - sleep(clibs.interval*2) + sleep(clibs.interval) except Exception as Err: - clibs.logger.error(f"{Err} 请求发送失败:{req}") - self.__sth_wrong(2) + self.__sth_wrong(2, f"{Err} 请求发送失败:{req}") return command # =================================== ↓↓↓ specific functions ↓↓↓ =================================== @@ -845,6 +877,23 @@ class HmiRequest(object): :return: None """ self.execution("controller.reboot") + clibs.logger.info(f"控制器重启中,重连预计需要等待 100s 左右...") + ts = time() + sleep(30) + while True: + sleep(5) + te = time() + if te - ts > 180: + self.__silence = False + self.__sth_wrong("3min 内未能完成重新连接,需要查看后台控制器是否正常启动,或者 ip/port 是否正确") + break + for _ in range(3): + if not self.connection_state: + break + sleep(2) + else: + clibs.logger.info("HMI 重新连接成功...") + break def reload_io(self): """ @@ -1930,7 +1979,7 @@ class ExternalCommunication(object): def __exec_cmd(self, directive, description): self.s_string(directive) - result = self.r_string() + result = self.r_string().strip() clibs.logger.info(f"外部通信:执行{description} {directive} 指令,返回值为 {result}") return result