This repository has been archived on 2025-02-25. You can view files and clone it, but cannot push or open issues or pull requests.
gitea ff4b0cc04c 20240621
2. [openapi.py] 修改了封包的规则,使之更加明晰,封包操作没有实现分包功能,目前看实际场景用不到
3. [openapi.py] 定义 MAX_FRAME_SIZE 常量(1024),替换socket接收以及响应数据处理相关部分
4. [openapi.py] 使用 int.to_bytes 和 int.from_bytes 替换 binascii 模块功能
2024-06-21 08:13:51 +08:00

247 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import socket
import threading
import selectors
import time
import sys
# Number of bytes requested per recv() on the framed socket; the continuation
# logic of the framed-response parser also compares frame sizes against it.
MAX_FRAME_SIZE = 1024
class HmiRequest(object):
    """Client that talks to a controller over two TCP connections.

    * ``self.c`` carries length-prefixed frames (8-byte header, see
      ``__package`` / ``__header_check``); decoded responses are collected
      in ``self.c_msg``.
    * ``self.c_xs`` carries carriage-return terminated JSON text (called
      "xService" in this file); responses are collected in
      ``self.c_msg_xs``.

    Each connection is drained by its own daemon thread. Both message lists
    are ordered newest first and capped at ``MAX_STORED_MESSAGES`` entries.
    """

    # Upper bound on buffered responses per channel; the oldest are dropped.
    MAX_STORED_MESSAGES = 1000

    def __init__(self, host='192.168.0.160', port=5050, port_xs=6666):
        """Connect both channels and start their reader threads.

        Args:
            host: Address of the controller.
            port: TCP port of the framed channel.
            port_xs: TCP port of the carriage-return delimited channel.

        Raises:
            OSError: If either connection cannot be established. Sockets
                opened so far are closed before the error propagates.
        """
        # Parser state must exist before the reader threads start; otherwise
        # data arriving right after connect would hit missing attributes.
        self.c_msg = []
        self.c_msg_xs = []
        self.flag = 0          # 1 while a framed response is still incomplete
        self.response = ''     # partial framed response collected so far
        self.leftover = 0      # bytes still expected from the next chunk
        self.flag_xs = 0       # 1 while an xs response is still incomplete
        self.response_xs = ''  # partial xs response collected so far

        self.c = self.__connect(host, port)
        try:
            self.c_xs = self.__connect(host, port_xs)
        except OSError:
            self.c.close()
            raise

        self.t_unpackage = threading.Thread(
            target=self.__unpackage, args=(self.c,), daemon=True)
        self.t_unpackage.start()
        self.t_unpackage_xs = threading.Thread(
            target=self.__unpackage_xs, args=(self.c_xs,), daemon=True)
        self.t_unpackage_xs.start()
        # NOTE: a heartbeat thread used to be sketched here but was disabled;
        # no heartbeat is sent.

    @staticmethod
    def __connect(host, port):
        """Return a connected, non-blocking TCP socket to ``(host, port)``."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((host, port))
        except OSError:
            sock.close()
            raise
        sock.setblocking(False)
        return sock

    def __header_check(self, index, data):
        """Parse and validate the 8-byte frame header at ``data[index]``.

        Header layout (big-endian): 2-byte frame size, 4-byte package size,
        1-byte protocol (must be 2), 1-byte reserved (must be 0).

        Returns:
            Tuple ``(payload_start, frame_size, pkg_size)``.

        Raises:
            SystemExit: Code 9 if protocol/reserved are unexpected, code 10
                if the header fields cannot be read at all.
        """
        try:
            _frame_size = int.from_bytes(data[index:index + 2], byteorder='big')
            _pkg_size = int.from_bytes(data[index + 2:index + 6], byteorder='big')
            _protocol = int.from_bytes(data[index + 6:index + 7], byteorder='big')
            _reserved = int.from_bytes(data[index + 7:index + 8], byteorder='big')
        except Exception as Err:
            print(f"Err = {Err}")
            print("无法读取数据,需要确认")
            sys.exit(10)
        if _reserved == 0 and _protocol == 2:
            return index + 8, _frame_size, _pkg_size
        print("数据有误,需要确认")
        sys.exit(9)

    def __msg_storage(self, response, flag=0):
        """Store ``response`` as the newest message of the chosen channel.

        Args:
            response: Decoded response text.
            flag: 0 selects ``self.c_msg``; anything else ``self.c_msg_xs``.
        """
        messages = self.c_msg if flag == 0 else self.c_msg_xs
        messages.insert(0, response)
        # Drop the oldest entries beyond the cap.
        del messages[self.MAX_STORED_MESSAGES:]

    def __flush_response(self):
        """Store the assembled framed response and reset the parser state."""
        self.__msg_storage(flag=0, response=self.response)
        self.flag = 0
        self.response = ''
        self.leftover = 0

    def __get_response(self, data):
        """Feed one received chunk of the framed channel into the parser.

        Complete packages are decoded and stored in ``self.c_msg``. When a
        package is split across chunks, the partial text and the number of
        outstanding bytes are kept in ``self.response`` / ``self.leftover``
        and ``self.flag`` is set to 1 until the package is complete.
        """
        _index = 0
        while _index < len(data):
            if self.flag == 0:
                _index, _frame_size, _pkg_size = self.__header_check(_index, data)
                _available = len(data) - _index
                if _pkg_size <= _available:
                    # The chunk contains the whole package payload.
                    self.response = data[_index:_index + _pkg_size].decode()
                    _index += _pkg_size
                    self.__flush_response()
                else:
                    # The package continues in the next chunk.
                    self.flag = 1
                    self.response = data[_index:].decode()
                    # The 6 subtracted here is the header part counted in
                    # the frame size.
                    self.leftover = _frame_size - 6 - _available
                    break
            elif self.flag == 1:
                # The first `leftover` bytes finish what the previous chunk
                # was missing; the next two bytes are read as the size of a
                # following frame (0 when the chunk ends there).
                # NOTE(review): every path below leaves the loop, so bytes
                # after a completed package in this chunk are not parsed -
                # confirm against the protocol specification.
                _index = self.leftover
                self.response += data[:_index].decode()
                _index += 2
                _frame_size = int.from_bytes(data[_index - 2:_index], byteorder='big')
                if _frame_size == 0:
                    self.__flush_response()
                    break
                if _frame_size == MAX_FRAME_SIZE:
                    self.leftover = MAX_FRAME_SIZE - (len(data) - _index)
                    self.response += data[_index:].decode()
                    break
                if _index + _frame_size <= MAX_FRAME_SIZE:
                    self.response += data[_index:_index + _frame_size].decode()
                    self.__flush_response()
                    break
                self.response += data[_index:].decode()
                self.leftover = _index + _frame_size - MAX_FRAME_SIZE
                break

    def __get_response_xs(self, data):
        """Feed one received chunk of the '\\r'-delimited channel.

        Every complete, non-empty response is stored in ``self.c_msg_xs``.
        Text after the last '\\r' is kept in ``self.response_xs`` (with
        ``self.flag_xs`` set to 1) and prepended to the next chunk.
        """
        # Indexing bytes yields an int, so the terminator is located by
        # splitting the decoded text instead of inspecting ``data[-1]``.
        *_complete, _tail = (self.response_xs + data.decode()).split('\r')
        for _response in _complete:
            if _response:  # skip empty fragments such as "\r\r"
                self.__msg_storage(flag=1, response=_response)
        self.response_xs = _tail
        self.flag_xs = 1 if _tail else 0

    def get_from_id(self, msg_id, flag=0, retries=3, interval=1):
        """Return the newest stored response containing ``msg_id``.

        Args:
            msg_id: Text to look for inside each stored response.
            flag: 0 searches ``self.c_msg``; anything else ``self.c_msg_xs``.
            retries: Number of search passes before giving up.
            interval: Seconds to sleep after each unsuccessful pass.

        Returns:
            The matching response text, or ``None`` (after printing a
            notice) when nothing matched within ``retries`` passes.
        """
        messages = self.c_msg if flag == 0 else self.c_msg_xs
        for _ in range(retries):
            for msg in messages:
                if msg_id in msg:
                    return msg
            time.sleep(interval)
        print(f'无法查询到{msg_id}对应的响应')
        return None

    def __package(self, cmd):
        """Wrap ``cmd`` (str) into one frame for the framed channel.

        The sizes are computed from the encoded payload so that non-ASCII
        text produces a header matching the bytes actually sent.

        Raises:
            OverflowError: If the payload plus 6 does not fit the 2-byte
                frame-size field (splitting into several frames is not
                implemented).
        """
        _payload = cmd.encode()
        _frame_head = (len(_payload) + 6).to_bytes(length=2, byteorder='big')
        _pkg_head = len(_payload).to_bytes(length=4, byteorder='big')
        _protocol = int(2).to_bytes(length=1, byteorder='big')
        _reserved = int(0).to_bytes(length=1, byteorder='big')
        return _frame_head + _pkg_head + _protocol + _reserved + _payload

    def __package_xs(self, cmd):
        """Serialize ``cmd`` as compact JSON terminated by '\\r' (bytes)."""
        return f"{json.dumps(cmd, separators=(',', ':'))}\r".encode()

    def __read_loop(self, sock, handler, bufsize):
        """Pass data received on ``sock`` to ``handler`` until it closes.

        Returns once the peer has closed the connection (the socket is then
        unregistered and closed), instead of waiting on a selector that has
        nothing registered.
        """
        def to_read(conn):
            try:
                data = conn.recv(bufsize)
            except BlockingIOError:
                # Nothing to read after all on this non-blocking socket.
                return
            except ConnectionError:
                # A reset/aborted connection is handled like a close.
                data = b''
            if data:
                handler(data)
                return
            print('closing', sock)
            sel.unregister(conn)
            conn.close()

        with selectors.DefaultSelector() as sel:
            sel.register(sock, selectors.EVENT_READ, to_read)
            while sel.get_map():
                for key, _mask in sel.select():
                    callback = key.data
                    callback(key.fileobj)

    def __unpackage(self, sock):
        """Reader-thread body for the framed channel."""
        self.__read_loop(sock, self.__get_response, MAX_FRAME_SIZE)

    def __unpackage_xs(self, sock):
        """Reader-thread body for the '\\r'-delimited channel."""
        self.__read_loop(sock, self.__get_response_xs, 1024)

    def __gen_id(self, command):
        """Return a request id of the form ``"<command>-<time.time()>"``."""
        _now = time.time()
        return f"{command}-{_now}"

    def excution(self, command, flag=0, **kwargs):
        """Send the request template named ``command`` and return its id.

        The request body is loaded from ``./templates/<command>.json``, given
        a fresh ``id`` and sent over the framed channel, followed by a
        2-second pause.

        Args:
            command: Template / command name.
            flag: 0 uses the framed channel; any other value is the
                not-yet-implemented xService path and returns ``None``.
            **kwargs: Command-specific values; ``state.set_tp_mode`` requires
                ``tp_mode``.

        Returns:
            The generated request id, or ``None`` when ``flag`` is not 0.

        Raises:
            SystemExit: Code 1 if the template cannot be opened or parsed.
            KeyError: If a required keyword argument is missing.
        """
        if flag != 0:  # for xService: not implemented
            return None

        # for old protocols
        try:
            with open(f'./templates/{command}.json', encoding='utf-8', mode='r') as f_json:
                req = json.load(f_json)
        except (OSError, ValueError):
            # Missing/unreadable file or invalid JSON / encoding.
            print(f"暂不支持 {command} 功能,或确认该功能存在...")
            sys.exit(1)

        if command == 'state.set_tp_mode':
            req['data']['tp_mode'] = kwargs['tp_mode']

        req['id'] = self.__gen_id(command)
        print(f"req = {req}")
        cmd = json.dumps(req, separators=(',', ':'))
        # sendall: a partial write must not silently truncate the frame.
        self.c.sendall(self.__package(cmd))
        time.sleep(2)
        return req['id']
# Module-level client instance: constructing it connects both sockets and
# starts the reader threads as a side effect of running/importing this file.
hr = HmiRequest()