This repository has been archived on 2025-02-25. You can view files and clone it, but cannot push or open issues or pull requests.
gitea ad4b6ae8d6 v0.1.7.3(2024/07/01)
1. [APIs: openapi.py] 继续完善封包解包操作,并优化了所有调试信息,默认打开状态,直到bug数量明显减少
2. [APIs: do_current.py] 使用原工程的工程名进行move操作,语义更加明确

> 目前看openapi.py封包解包没有任何问题了,但是所有的调试信息都默认打开,以便可以第一时间保留现场
2024-07-01 14:09:47 +08:00

463 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from json import load, dumps
from socket import socket, setdefaulttimeout, AF_INET, SOCK_STREAM
from threading import Thread
import selectors
from time import time, sleep
from os.path import dirname
from binascii import b2a_hex, a2b_hex
MAX_FRAME_SIZE = 1024
setdefaulttimeout(2)
current_path = dirname(__file__)
class HmiRequest(object):
def __init__(self, w2t):
super().__init__()
self.w2t = w2t
self.c = None
self.c_xs = None
self.c_msg = []
self.c_msg_xs = []
self.flag = 0
self.response = ''
self.leftover = 0
self.flag_xs = 0
self.response_xs = ''
self.t_bool = True
self.tab_name = 'Automatic Test'
self.pkg_size = 0
self.broke = 0
self.half = 0
self.half_length = 0
self.index = 0
self.reset_index = 0
self.sock_conn()
self.t_heartbeat = Thread(target=self.heartbeat)
self.t_heartbeat.daemon = True
self.t_heartbeat.start()
self.t_unpackage = Thread(target=self.unpackage, args=(self.c,))
self.t_unpackage.daemon = True
self.t_unpackage.start()
self.t_unpackage_xs = Thread(target=self.unpackage_xs, args=(self.c_xs,))
self.t_unpackage_xs.daemon = True
self.t_unpackage_xs.start()
def sock_conn(self):
# while True:
with open(f"{current_path}/../../assets/templates/heartbeat", "r", encoding='utf-8') as f_hb:
c_state = f_hb.read().strip()
if c_state == '0':
try:
self.c = socket(AF_INET, SOCK_STREAM)
self.c.connect(('192.168.0.160', 5050))
# self.c.connect(('192.168.84.129', 5050))
self.c.setblocking(False)
self.c_xs = socket(AF_INET, SOCK_STREAM)
self.c_xs.connect(('192.168.0.160', 6666))
# self.c_xs.connect(('192.168.84.129', 6666))
self.c_xs.setblocking(False)
self.w2t("Connection success", 0, 0, 'green', tab_name=self.tab_name)
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
f_hb.write('1')
except Exception as Err:
self.w2t("Connection failed...", 0, 0, 'red', tab_name=self.tab_name)
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
f_hb.write('0')
def header_check(self, index, data):
if index + 8 < len(data):
_frame_size = int.from_bytes(data[index:index+2], byteorder='big')
_pkg_size = int.from_bytes(data[index+2:index+6], byteorder='big')
_protocol = int.from_bytes(data[index + 6:index + 7], byteorder='big')
_reserved = int.from_bytes(data[index + 7:index + 8], byteorder='big')
if _reserved == 0 and _protocol == 2:
return index + 8, _frame_size, _pkg_size
else:
print(data)
print(f"index = {index}")
print(f"reserve = {_reserved}")
print(f"protocol = {_protocol}")
print("head check 数据有误,需要确认")
self.w2t("Header Check: 解包数据有误,需要确认!", 0, 1, 'red', tab_name=self.tab_name)
else:
self.half_length = len(data) - index
self.half = data[index:]
print(f"in head check half: {self.half}")
print(f"in head check length: {self.half_length}")
print(f"in head check data: {data}")
self.broke = 100
index += MAX_FRAME_SIZE
return index, 0, 0
def heartbeat(self):
while self.t_bool:
_id = self.execution('controller.heart')
_flag = '0' if self.get_from_id(_id) is None else '1'
print(f"hb = {_flag}", end=' ')
print(f"len(c_msg) = {len(self.c_msg)}", end=' ')
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
f_hb.write(_flag)
if _flag == '0':
self.w2t(f"心跳丢失,连接失败,重新连接中...", 0, 0, 'red', tab_name=self.tab_name)
sleep(2)
# with open(f"{current_path}/../../assets/templates/c_msg.log", "w", encoding='utf-8') as f:
# for msg in self.c_msg:
# f.write(msg + '\n')
def msg_storage(self, response, flag=0):
messages = self.c_msg if flag == 0 else self.c_msg_xs
if len(messages) < 20000:
messages.insert(0, response)
else:
messages.insert(0, response)
while len(messages) > 20000:
messages.pop()
def get_response(self, data):
# 流式获取单次请求的响应
if self.broke == 100:
print("*****************************************")
print(f"in get_response if broke == 100 half = {self.half}")
_half_1 = self.half
_half_2 = data[:8-self.half_length]
_full = _half_1 + _half_2
print(f"in get_response if broke == 100 _full = {_full}")
_frame_size = int.from_bytes(_full[:2], byteorder='big')
_pkg_size = int.from_bytes(_full[2:6], byteorder='big')
_protocol = int.from_bytes(_full[6:7], byteorder='big')
_reserved = int.from_bytes(_full[7:8], byteorder='big')
if _reserved != 0 or _protocol != 2:
print(data)
self.w2t("in get_response: 解包数据有误,需要确认!", 0, 1, 'red', tab_name=self.tab_name)
self.pkg_size = _pkg_size
self.index = 8 - self.half_length
print(f"broke == 100 index = {self.index}")
print(f"broke == 100 INIT pkg size = {self.pkg_size}")
print(f"broke == 100 data = {data}")
else:
if self.reset_index == 1:
self.index = 0
while self.index < len(data):
# flag 为 0则说明是一次新的请求对应的一次新的相应也就是需要首次解包
if self.flag == 0:
if self.broke == 100:
self.broke = 0
else:
self.index, _frame_size, self.pkg_size = self.header_check(self.index, data)
print(f"broke == 0 index = {self.index-8}")
print(f"broke == 0 INIT pkg size = {self.pkg_size}")
print(f"broke == 0 data = {data}")
if self.index > MAX_FRAME_SIZE:
break
# 详见解包原理数据.txtself.pkg_size 永远是除了当前data之外剩余未处理的数据大小
if self.pkg_size <= len(data) - self.index:
# 说明剩余部分的数据就在当前data内没有被分割
self.response = data[self.index:self.index + self.pkg_size].decode()
self.msg_storage(flag=0, response=self.response)
self.index += self.pkg_size
print(f"in flag=0 if data = {data}")
print(f"in flag=0 if index = {self.index}")
print(f"in flag=0 if pkg size = {self.pkg_size}")
print(f"in flag=0 if leftover = {self.leftover}")
self.flag = 0
self.response = ''
self.leftover = 0
self.pkg_size = 0
self.reset_index = 0 if self.index < len(data) else 1
elif self.pkg_size > len(data) - self.index:
# 执行到这里说明该data是首包且有有分包的情况发生了也就是该响应数据量稍微比较大
# 分散在了相邻的两个及以上的data中需要flag=1的处理
self.flag = 1
if self.index+_frame_size-6 < len(data):
self.response = data[self.index:self.index+_frame_size-6].decode()
self.index += (_frame_size-6)
self.pkg_size -= (_frame_size-6) # 详见解包原理数据.txtself.pkg_size
self.reset_index = 0
if self.index + 2 < len(data):
self.leftover = int.from_bytes(data[self.index:self.index + 2], byteorder='big')
self.index += 2
self.reset_index = 0
else:
if self.index + 2 == len(data):
self.broke = 1
self.half = data[-2:]
print(f"flag = 0 encounter broke == 1 - half = {self.half}")
elif self.index + 1 == len(data):
self.broke = 2
self.half = data[-1:]
print(f"flag = 0 encounter broke == 2 - half = {self.half}")
elif self.index == len(data):
print('flag = 0 encounter broke == 3')
self.broke = 3
self.index += MAX_FRAME_SIZE
self.reset_index = 1
break # 因为 index + 2 的大小超过 MAX_FRAME_SIZE
elif self.index+_frame_size-6 > len(data):
self.response = data[self.index:].decode()
self.pkg_size -= (len(data) - self.index) # 详见解包原理数据.txtself.pkg_size
self.leftover = (_frame_size-6-(len(data)-self.index))
self.index += MAX_FRAME_SIZE
self.reset_index = 1
print(f"in flag=0 else data = {data}")
print(f"in flag=0 else index = {self.index}")
print(f"in flag=0 else pkg size = {self.pkg_size}")
print(f"in flag=0 else leftover = {self.leftover}")
break
elif self.flag == 1:
# 继续处理之前为接收完的数据处理完之后将flag重置为0
# !!!需要注意的是,包头/帧头也是有可能被分割开的!!!但是目前该程序未实现此种情况!!!
if self.broke == 1:
self.index = 0
self.leftover = int.from_bytes(self.half, byteorder='big')
print(f"broke 1 leftover: {self.leftover}")
elif self.broke == 2:
self.leftover = int.from_bytes(self.half+data[:1], byteorder='big')
self.index = 1
self.broke = 0
print(f"broke 2 leftover: {self.leftover}")
if self.broke == 3:
self.leftover = int.from_bytes(data[:2], byteorder='big')
print(f"broke 3 leftover: {self.leftover}")
self.index = 2
self.broke = 0
while self.pkg_size > 0:
if self.index + self.leftover <= len(data):
print(f"in pkg size > 0 loop before if data = {data}")
print(f"in pkg size > 0 loop before if index = {self.index}")
print(f"in pkg size > 0 loop before if pkg size = {self.pkg_size}")
print(f"in pkg size > 0 loop before if leftover = {self.leftover}")
if self.leftover < 0 or self.leftover > 1024:
self.w2t("", 0, 111, 'red')
self.response += data[self.index:self.index + self.leftover].decode()
self.pkg_size -= self.leftover
if self.pkg_size == 0:
self.msg_storage(flag=0, response=self.response)
self.index += self.leftover
self.flag = 0
self.response = ''
self.leftover = 0
self.pkg_size = 0
self.reset_index = 0 if self.index < len(data) else 1
print(f"in pkg size > 0 loop break if data = {data}")
print(f"in pkg size > 0 loop break if index = {self.index}")
print(f"in pkg size > 0 loop break if pkg size = {self.pkg_size}")
print(f"in pkg size > 0 loop break if leftover = {self.leftover}")
break
self.index += self.leftover
if self.index + 2 < len(data):
self.leftover = int.from_bytes(data[self.index:self.index + 2], byteorder='big')
self.index += 2
self.reset_index = 0
else:
# self.leftover = 4096
if self.index + 2 == len(data):
self.broke = 1
self.half = data[-2:]
print(f"flag = 1 encounter broke == 1 - half = {self.half}")
elif self.index + 1 == len(data):
self.broke = 2
self.half = data[-1:]
print(f"flag = 1 encounter broke == 2 - half = {self.half}")
elif self.index == len(data):
print('flag = 1 encounter broke == 3')
self.broke = 3
self.index += MAX_FRAME_SIZE
self.reset_index = 1
break # 因为 index + 2 的大小超过 MAX_FRAME_SIZE
print(f"in pkg size > 0 loop after if data = {data}")
print(f"in pkg size > 0 loop after if index = {self.index}")
print(f"in pkg size > 0 loop after if pkg size = {self.pkg_size}")
print(f"in pkg size > 0 loop after if leftover = {self.leftover}")
if self.leftover < 0 or self.leftover > 1024:
self.w2t("", 0, 111, 'red')
else:
print(f"in pkg size > 0 loop before else data = {data}")
print(f"in pkg size > 0 loop before else index = {self.index}")
print(f"in pkg size > 0 loop before else pkg size = {self.pkg_size}")
print(f"in pkg size > 0 loop before else leftover = {self.leftover}")
if self.leftover < 0 or self.leftover > 1024:
self.w2t("", 0, 111, 'red')
self.response += data[self.index:].decode()
self.leftover -= (len(data) - self.index)
self.pkg_size -= (len(data) - self.index)
self.index += MAX_FRAME_SIZE
self.reset_index = 1
print(f"in pkg size > 0 loop after else data = {data}")
print(f"in pkg size > 0 loop after else index = {self.index}")
print(f"in pkg size > 0 loop after else pkg size = {self.pkg_size}")
print(f"in pkg size > 0 loop after else leftover = {self.leftover}")
if self.leftover < 0 or self.leftover > 1024:
self.w2t("", 0, 111, 'red')
break # 该data内数据已经处理完毕需要跳出大循环通过break和index
else:
self.msg_storage(flag=0, response=self.response)
self.flag = 0
self.response = ''
self.leftover = 0
self.pkg_size = 0
self.index -= 2
# self.reset_index = 1
self.reset_index = 0 if (len(data) > self.index > 0) else 1
def get_response_xs(self, data):
if self.flag_xs == 0:
if data[-1].decode() == '\r':
_responses = data.decode().split('\r')
for _response in _responses:
self.msg_storage(flag=1, response=_response)
else:
_responses = data.decode().split('\r')
for _response in _responses[:-1]:
if not _response:
break
self.msg_storage(flag=1, response=_response)
self.response_xs = _responses[-1]
self.flag_xs = 1
else:
if data[-1].decode() == '\r':
_responses = (self.response_xs.encode() + data).decode().split('\r')
for _response in _responses:
self.msg_storage(flag=1, response=_response)
self.response_xs = ''
self.flag_xs = 0
else:
_responses = (self.response_xs.encode() + data).decode().split('\r')
for _response in _responses[:-1]:
if not _response:
break
self.msg_storage(flag=1, response=_response)
self.response_xs = _responses[-1]
self.flag_xs = 1
def get_from_id(self, msg_id, flag=0):
messages = self.c_msg if flag == 0 else self.c_msg_xs
for i in range(3):
for msg in messages:
if msg_id is None:
self.w2t("未能成功获取到 message id...", 0, 10, 'red', tab_name=self.tab_name)
if msg_id in msg:
return msg
sleep(1)
else:
return None
def package(self, cmd):
_frame_head = (len(cmd) + 6).to_bytes(length=2, byteorder='big')
_pkg_head = len(cmd).to_bytes(length=4, byteorder='big')
_protocol = int(2).to_bytes(length=1, byteorder='big')
_reserved = int(0).to_bytes(length=1, byteorder='big')
return _frame_head + _pkg_head + _protocol + _reserved + cmd.encode()
def package_xs(self, cmd):
return f"{dumps(cmd, separators=(',', ':'))}\r".encode()
def unpackage(self, sock):
def to_read(conn):
data = conn.recv(MAX_FRAME_SIZE)
if data:
# print(data)
self.get_response(data)
else:
print('closing', sock)
sel.unregister(conn)
conn.close()
sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ, to_read)
while self.t_bool:
try:
events = sel.select()
except:
sleep(1)
continue
for key, mask in events:
callback = key.data
callback(key.fileobj)
def unpackage_xs(self, sock):
def to_read(conn):
data = conn.recv(1024) # Should be ready
if data:
# print(data)
self.get_response_xs(data)
else:
print('closing', sock)
sel.unregister(conn)
conn.close()
sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ, to_read)
while self.t_bool:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj)
def gen_id(self, command):
_now = time()
_id = f"{command}-{_now}"
return _id
def execution(self, command, flg=0, **kwargs):
if flg == 0: # for old protocols
req = None
try:
with open(f'{current_path}/../../assets/templates/{command}.json', encoding='utf-8',
mode='r') as f_json:
req = load(f_json)
except:
self.w2t(f"暂不支持 {command} 功能,或确认该功能存在...", 0, 1, 'red', tab_name=self.tab_name)
match command:
case 'state.set_tp_mode':
req['data']['tp_mode'] = kwargs['tp_mode']
case 'overview.set_autoload':
req['data']['autoload_prj_path'] = kwargs['autoload_prj_path']
case 'overview.reload':
req['data']['prj_path'] = kwargs['prj_path']
req['data']['tasks'] = kwargs['tasks']
case 'rl_task.pp_to_main' | 'rl_task.run' | 'rl_task.stop':
req['data']['tasks'] = kwargs['tasks']
case 'diagnosis.set_params':
req['data']['display_pdo_params'] = kwargs['display_pdo_params']
case 'diagnosis.open':
req['data']['open'] = kwargs['open']
req['data']['display_open'] = kwargs['display_open']
case _:
pass
req['id'] = self.gen_id(command)
print(f"req = {req}")
cmd = dumps(req, separators=(',', ':'))
try:
self.c.send(self.package(cmd))
sleep(0.5)
except Exception as Err:
self.w2t(f"{cmd}\n请求发送失败...{Err}", 0, 0, 'red', tab_name=self.tab_name)
return req['id']
else: # for xService
pass