import json import socket import threading import selectors import time import binascii import sys class HmiRequest(object): def __init__(self): self.c = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.c.connect(('192.168.0.160', 5050)) # self.c.connect(('192.168.84.129', 5050)) self.c.setblocking(False) self.c_msg = [] self.c_xs = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.c_xs.connect(('192.168.0.160', 6666)) # self.c_xs.connect(('192.168.84.129', 6666)) self.c_xs.setblocking(False) self.c_msg_xs = [] self.t_unpackage = threading.Thread(target=self.__unpackage, args=(self.c, )) self.t_unpackage.daemon = True self.t_unpackage.start() self.t_unpackage_xs = threading.Thread(target=self.__unpackage_xs, args=(self.c_xs, )) self.t_unpackage_xs.daemon = True self.t_unpackage_xs.start() # self.t = threading.Thread(target=self.__heartbeat) # self.t.daemon = True # self.t.start() self.flag = 0 self.response = '' self.leftover = 0 self.flag_xs = 0 self.response_xs = '' self.leftover_xs = 0 def __header_check(self, index, data): try: _frame_size = int(binascii.b2a_hex(data[index:index+2]).decode(), 16) _pkg_size = int(binascii.b2a_hex(data[index+2:index+6]).decode(), 16) _protocol = int(binascii.b2a_hex(data[index+6:index+7]).decode(), 16) _reserved = int(binascii.b2a_hex(data[index+7:index+8]).decode(), 16) if _reserved == 0 and _protocol == 2: return index+8, _frame_size, _pkg_size else: print("数据有误,需要确认") exit(9) except Exception as Err: print(f"Err = {Err}") print("无法读取数据,需要确认") exit(10) def __msg_storage(self, response, flag=0): messages = self.c_msg if flag == 0 else self.c_msg_xs if len(messages) < 1000: messages.insert(0, response) else: messages.insert(0, response) while len(messages) > 1000: messages.pop() def __get_response(self, data): _index = 0 while _index < len(data): if self.flag == 0: _index, _frame_size, _pkg_size = self.__header_check(_index, data) if _pkg_size <= len(data) - _index: # 说明剩余部分的数据正好就是完整的包数据 self.response = data[_index:_index+_pkg_size].decode() self.__msg_storage(flag=0, response=self.response) _index += _pkg_size self.flag = 0 self.response = '' self.leftover = 0 elif _pkg_size > len(data) - _index: # 说有有分包的情况发生了,需要flag=1的处理 self.flag = 1 self.response = data[_index:].decode() self.leftover = _frame_size - 6 - (len(data) - _index) # 其实就是常量 2 _index += _pkg_size # 也可以换成 break,效果都是退出循环 elif self.flag == 1: # 处理完之后,将flag重置为0 _index = self.leftover self.response += data[:_index].decode() _index += 2 try: _frame_size = int.from_bytes(data[_index-2:_index]) except: self.__msg_storage(flag=0, response=self.response) self.flag = 0 self.response = '' self.leftover = 0 break if _frame_size == 1024: self.leftover = 1024 - (len(data) - _index) self.response += data[_index:].decode() break else: if _index+_frame_size <= 1024: self.response += data[_index:_index+_frame_size].decode() self.__msg_storage(flag=0, response=self.response) self.flag = 0 self.response = '' self.leftover = 0 break else: self.response += data[_index:].decode() self.leftover = _index + _frame_size - 1024 def __get_response_xs(self, data): if self.flag_xs == 0: if data[-1].decode() == '\r': _responses = data.decode().split('\r') for _response in _responses: self.__msg_storage(flag=1, response=_response) else: _responses = data.decode().split('\r') for _response in _responses[:-1]: if not _response: break self.__msg_storage(flag=1, response=_response) self.response_xs = _responses[-1] self.flag_xs = 1 else: if data[-1].decode() == '\r': _responses = (self.response_xs.encode() + data).decode().split('\r') for _response in _responses: self.__msg_storage(flag=1, response=_response) self.response_xs = '' self.flag_xs = 0 else: _responses = (self.response_xs.encode() + data).decode().split('\r') for _response in _responses[:-1]: if not _response: break self.__msg_storage(flag=1, response=_response) self.response_xs = _responses[-1] self.flag_xs = 1 def get_from_id(self, msg_id, flag=0): messages = self.c_msg if flag == 0 else self.c_msg_xs for i in range(3): for msg in messages: if msg_id in msg: return msg time.sleep(1) else: print(f'无法查询到{msg_id}对应的响应') def __package(self, cmd): pkg_size = str(hex(len(cmd)+6))[2:].rjust(4, '0') package = binascii.a2b_hex(pkg_size) # 包的长度 reserved = chr(0) + chr(0) # 保留字段 frame_size = str(hex(len(cmd)))[2:].rjust(4, '0') frame = binascii.a2b_hex(frame_size) # 帧的长度 protocol = chr(2) + chr(0) # 协议类型 return package + reserved.encode() + frame + protocol.encode() + cmd.encode() def __package_xs(self, cmd): return f"{json.dumps(cmd, separators=(',', ':'))}\r".encode() def __unpackage(self, sock): def to_read(conn): data = conn.recv(1024) # Should be ready if data: print(data) self.__get_response(data) else: print('closing', sock) sel.unregister(conn) conn.close() sel = selectors.DefaultSelector() sel.register(sock, selectors.EVENT_READ, to_read) while True: events = sel.select() for key, mask in events: callback = key.data callback(key.fileobj) def __unpackage_xs(self, sock): def to_read(conn): data = conn.recv(1024) # Should be ready if data: # print(data) self.__get_response_xs(data) else: print('closing', sock) sel.unregister(conn) conn.close() sel = selectors.DefaultSelector() sel.register(sock, selectors.EVENT_READ, to_read) while True: events = sel.select() for key, mask in events: callback = key.data callback(key.fileobj) def __gen_id(self, command): _now = time.time() _id = f"{command}-{_now}" return _id def excution(self, command, flag=0, **kwargs): if flag == 0: # for old protocols req = None try: with open(f'./templates/{command}.json', encoding='utf-8', mode='r') as f_json: req = json.load(f_json) except: print(f"暂不支持 {command} 功能,或确认该功能存在...") exit(1) match command: case 'state.set_tp_mode': req['data']['tp_mode'] = kwargs['tp_mode'] case 1: pass req['id'] = self.__gen_id(command) print(f"req = {req}") cmd = json.dumps(req, separators=(',', ':')) self.c.send(self.__package(cmd)) time.sleep(2) return req['id'] else: # for xService pass hr = HmiRequest()