This repository has been archived on 2025-02-25. You can view files and clone it, but cannot push or open issues or pull requests.
gitea a75775c869 v0.1.7.0(2024/06/25)-未发布
1. [aio.py] 取消了在本文件中开启openapi线程的做法,并修改如下:
	- 通过包的方式导入其他模块
    - 使用current_path来规避文件路径问题
    - 声名了 self.hr 变量,用来接收openapi的实例化
    - 修改了对于segment button的错误调用
    - 设定progress bar的长度是10
    - 完善了segmented_button_callback函数
    - 在detect_network函数中增加heartbeat初始化
    - tabview_click函数中新增textbox清屏功能,以及实例化openapi,并做检测
2. [openapi.py] 取消了初始化中无限循环检测,因为阻塞了aio主界面进程!!!socket也无法多次连接!!!浪费了好多时间!!!很生气!!!!
	- 通过tabview切换来实现重新连接,并保留了异常处理部分
    - 将所有的 __xxxx 函数都替换成 xxxx 函数,去掉了 __
    - 使用current_path来规避文件路径问题
3. [do_brake.py] 初步完成了机器状态收集的功能,还需要完善
    - 使用current_path来规避文件路径问题
    - 新增validate_resp函数,校验数据
    - 完善了调用接口
2024-06-25 21:40:27 +08:00

257 lines
9.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import socket
import threading
import selectors
import time
from os.path import dirname
MAX_FRAME_SIZE = 1024
socket.setdefaulttimeout(3)
current_path = dirname(__file__)
class HmiRequest(object):
def __init__(self):
super().__init__()
try:
self.c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.c.connect(('192.168.0.160', 5050))
# self.c.connect(('192.168.84.129', 5050))
self.c.setblocking(False)
self.c_xs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.c_xs.connect(('192.168.0.160', 6666))
# self.c_xs.connect(('192.168.84.129', 6666))
self.c_xs.setblocking(False)
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_h:
f_h.write('1')
except Exception as Err:
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_h:
f_h.write('0')
print("Connection failed, please try to re-connect via switching tabs...")
self.c_msg = []
self.c_msg_xs = []
self.flag = 0
self.response = ''
self.leftover = 0
self.flag_xs = 0
self.response_xs = ''
self.t_heartbeat = threading.Thread(target=self.heartbeat)
self.t_heartbeat.daemon = True
self.t_heartbeat.start()
self.t_unpackage = threading.Thread(target=self.unpackage, args=(self.c, ))
self.t_unpackage.daemon = True
self.t_unpackage.start()
self.t_unpackage_xs = threading.Thread(target=self.unpackage_xs, args=(self.c_xs, ))
self.t_unpackage_xs.daemon = True
self.t_unpackage_xs.start()
def header_check(self, index, data):
try:
_frame_size = int.from_bytes(data[index:index+2], byteorder='big')
_pkg_size = int.from_bytes(data[index+2:index+6], byteorder='big')
_protocol = int.from_bytes(data[index+6:index+7], byteorder='big')
_reserved = int.from_bytes(data[index+7:index+8], byteorder='big')
if _reserved == 0 and _protocol == 2:
return index+8, _frame_size, _pkg_size
else:
print("数据有误,需要确认")
return 'DATA ERR'
except Exception as Err:
print(f"Err = {Err}")
print("无法读取数据,需要确认")
return 'DATA READ ERR'
def heartbeat(self):
while True:
_id = self.excution('controller.heart')
_flag = 1 if self.get_from_id(_id) else 0
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_h:
f_h.write(str(_flag))
time.sleep(3)
def msg_storage(self, response, flag=0):
messages = self.c_msg if flag == 0 else self.c_msg_xs
if len(messages) < 1000:
messages.insert(0, response)
else:
messages.insert(0, response)
while len(messages) > 1000:
messages.pop()
def get_response(self, data):
_index = 0
while _index < len(data):
if self.flag == 0:
_index, _frame_size, _pkg_size = self.header_check(_index, data)
if _pkg_size <= len(data) - _index:
# 说明剩余部分的数据正好就是完整的包数据
self.response = data[_index:_index+_pkg_size].decode()
self.msg_storage(flag=0, response=self.response)
_index += _pkg_size
self.flag = 0
self.response = ''
self.leftover = 0
elif _pkg_size > len(data) - _index:
# 说有有分包的情况发生了需要flag=1的处理
self.flag = 1
self.response = data[_index:].decode()
self.leftover = _frame_size - 6 - (len(data) - _index) # 其实就是常量 2其中 6 就是六个字节的包头
break
elif self.flag == 1:
# 处理完之后将flag重置为0
_index = self.leftover
self.response += data[:_index].decode()
_index += 2
_frame_size = int.from_bytes(data[_index - 2:_index], byteorder='big')
if _frame_size == 0:
self.msg_storage(flag=0, response=self.response)
self.flag = 0
self.response = ''
self.leftover = 0
break
if _frame_size == MAX_FRAME_SIZE:
self.leftover = MAX_FRAME_SIZE - (len(data) - _index)
self.response += data[_index:].decode()
break
else:
if _index+_frame_size <= MAX_FRAME_SIZE:
self.response += data[_index:_index+_frame_size].decode()
self.msg_storage(flag=0, response=self.response)
self.flag = 0
self.response = ''
self.leftover = 0
break
else:
self.response += data[_index:].decode()
self.leftover = _index + _frame_size - MAX_FRAME_SIZE
break
def get_response_xs(self, data):
if self.flag_xs == 0:
if data[-1].decode() == '\r':
_responses = data.decode().split('\r')
for _response in _responses:
self.msg_storage(flag=1, response=_response)
else:
_responses = data.decode().split('\r')
for _response in _responses[:-1]:
if not _response:
break
self.msg_storage(flag=1, response=_response)
self.response_xs = _responses[-1]
self.flag_xs = 1
else:
if data[-1].decode() == '\r':
_responses = (self.response_xs.encode() + data).decode().split('\r')
for _response in _responses:
self.msg_storage(flag=1, response=_response)
self.response_xs = ''
self.flag_xs = 0
else:
_responses = (self.response_xs.encode() + data).decode().split('\r')
for _response in _responses[:-1]:
if not _response:
break
self.msg_storage(flag=1, response=_response)
self.response_xs = _responses[-1]
self.flag_xs = 1
def get_from_id(self, msg_id, flag=0):
messages = self.c_msg if flag == 0 else self.c_msg_xs
for i in range(3):
for msg in messages:
if msg_id in msg:
return msg
time.sleep(1)
else:
return None
def package(self, cmd):
_frame_head = (len(cmd)+6).to_bytes(length=2, byteorder='big')
_pkg_head = len(cmd).to_bytes(length=4, byteorder='big')
_protocol = int(2).to_bytes(length=1, byteorder='big')
_reserved = int(0).to_bytes(length=1, byteorder='big')
return _frame_head + _pkg_head + _protocol + _reserved + cmd.encode()
def package_xs(self, cmd):
return f"{json.dumps(cmd, separators=(',', ':'))}\r".encode()
def unpackage(self, sock):
def to_read(conn):
data = conn.recv(MAX_FRAME_SIZE)
if data:
# print(data)
self.get_response(data)
else:
print('closing', sock)
sel.unregister(conn)
conn.close()
sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ, to_read)
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj)
def unpackage_xs(self, sock):
def to_read(conn):
data = conn.recv(1024) # Should be ready
if data:
# print(data)
self.get_response_xs(data)
else:
print('closing', sock)
sel.unregister(conn)
conn.close()
sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ, to_read)
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj)
def gen_id(self, command):
_now = time.time()
_id = f"{command}-{_now}"
return _id
def excution(self, command, flg=0, **kwargs):
if flg == 0: # for old protocols
req = None
try:
with open(f'{current_path}/../../assets/templates/{command}.json', encoding='utf-8', mode='r') as f_json:
req = json.load(f_json)
except:
print(f"暂不支持 {command} 功能,或确认该功能存在...")
return 'NOT SUPPORT'
match command:
case 'state.set_tp_mode':
req['data']['tp_mode'] = kwargs['tp_mode']
case 1:
pass
req['id'] = self.gen_id(command)
print(f"req = {req}")
cmd = json.dumps(req, separators=(',', ':'))
self.c.send(self.package(cmd))
time.sleep(2)
return req['id']
else: # for xService
pass