This repository has been archived on 2025-02-25. You can view files and clone it, but cannot push or open issues or pull requests.
gitea 295894a843 2024-06-12
4. [openapi.py] 使用 int.to_bytes 和 int.from_bytes 替换 binascii 模块的功能
5. [aio.py] 修改了Data Process中初始化的动作,使得初始化时的状态统一成程序刚启动时的样子
6. [aio.py] 增加了tabview的点击行为函数,每次点击tab都会初始化
7. [aio.py] 增加了Automatic Test界面元素,包括如下,并完成了功能框架的搭建
- 标签:文件/角速度/减速比
- 按钮:急停及恢复
- 输入框:文件路径/角速度/减速比
- OptionMenu:负载
- 进度条
2024-06-21 16:53:13 +08:00

245 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import socket
import threading
import selectors
import time
MAX_FRAME_SIZE = 1024
class HmiRequest(object):
def __init__(self):
self.c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.c.connect(('192.168.0.160', 5050))
# self.c.connect(('192.168.84.129', 5050))
self.c.setblocking(False)
self.c_msg = []
self.c_xs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.c_xs.connect(('192.168.0.160', 6666))
# self.c_xs.connect(('192.168.84.129', 6666))
self.c_xs.setblocking(False)
self.c_msg_xs = []
self.t_unpackage = threading.Thread(target=self.__unpackage, args=(self.c, ))
self.t_unpackage.daemon = True
self.t_unpackage.start()
self.t_unpackage_xs = threading.Thread(target=self.__unpackage_xs, args=(self.c_xs, ))
self.t_unpackage_xs.daemon = True
self.t_unpackage_xs.start()
# self.t = threading.Thread(target=self.__heartbeat)
# self.t.daemon = True
# self.t.start()
self.flag = 0
self.response = ''
self.leftover = 0
self.flag_xs = 0
self.response_xs = ''
def __header_check(self, index, data):
try:
_frame_size = int.from_bytes(data[index:index+2], byteorder='big')
_pkg_size = int.from_bytes(data[index+2:index+6], byteorder='big')
_protocol = int.from_bytes(data[index+6:index+7], byteorder='big')
_reserved = int.from_bytes(data[index+7:index+8], byteorder='big')
if _reserved == 0 and _protocol == 2:
return index+8, _frame_size, _pkg_size
else:
print("数据有误,需要确认")
exit(9)
except Exception as Err:
print(f"Err = {Err}")
print("无法读取数据,需要确认")
exit(10)
def __msg_storage(self, response, flag=0):
messages = self.c_msg if flag == 0 else self.c_msg_xs
if len(messages) < 1000:
messages.insert(0, response)
else:
messages.insert(0, response)
while len(messages) > 1000:
messages.pop()
def __get_response(self, data):
_index = 0
while _index < len(data):
if self.flag == 0:
_index, _frame_size, _pkg_size = self.__header_check(_index, data)
if _pkg_size <= len(data) - _index:
# 说明剩余部分的数据正好就是完整的包数据
self.response = data[_index:_index+_pkg_size].decode()
self.__msg_storage(flag=0, response=self.response)
_index += _pkg_size
self.flag = 0
self.response = ''
self.leftover = 0
elif _pkg_size > len(data) - _index:
# 说有有分包的情况发生了需要flag=1的处理
self.flag = 1
self.response = data[_index:].decode()
self.leftover = _frame_size - 6 - (len(data) - _index) # 其实就是常量 2其中 6 就是六个字节的包头
break
elif self.flag == 1:
# 处理完之后将flag重置为0
_index = self.leftover
self.response += data[:_index].decode()
_index += 2
_frame_size = int.from_bytes(data[_index - 2:_index], byteorder='big')
if _frame_size == 0:
self.__msg_storage(flag=0, response=self.response)
self.flag = 0
self.response = ''
self.leftover = 0
break
if _frame_size == MAX_FRAME_SIZE:
self.leftover = MAX_FRAME_SIZE - (len(data) - _index)
self.response += data[_index:].decode()
break
else:
if _index+_frame_size <= MAX_FRAME_SIZE:
self.response += data[_index:_index+_frame_size].decode()
self.__msg_storage(flag=0, response=self.response)
self.flag = 0
self.response = ''
self.leftover = 0
break
else:
self.response += data[_index:].decode()
self.leftover = _index + _frame_size - MAX_FRAME_SIZE
break
def __get_response_xs(self, data):
if self.flag_xs == 0:
if data[-1].decode() == '\r':
_responses = data.decode().split('\r')
for _response in _responses:
self.__msg_storage(flag=1, response=_response)
else:
_responses = data.decode().split('\r')
for _response in _responses[:-1]:
if not _response:
break
self.__msg_storage(flag=1, response=_response)
self.response_xs = _responses[-1]
self.flag_xs = 1
else:
if data[-1].decode() == '\r':
_responses = (self.response_xs.encode() + data).decode().split('\r')
for _response in _responses:
self.__msg_storage(flag=1, response=_response)
self.response_xs = ''
self.flag_xs = 0
else:
_responses = (self.response_xs.encode() + data).decode().split('\r')
for _response in _responses[:-1]:
if not _response:
break
self.__msg_storage(flag=1, response=_response)
self.response_xs = _responses[-1]
self.flag_xs = 1
def get_from_id(self, msg_id, flag=0):
messages = self.c_msg if flag == 0 else self.c_msg_xs
for i in range(3):
for msg in messages:
if msg_id in msg:
return msg
time.sleep(1)
else:
print(f'无法查询到{msg_id}对应的响应')
def __package(self, cmd):
_frame_head = (len(cmd)+6).to_bytes(length=2, byteorder='big')
_pkg_head = len(cmd).to_bytes(length=4, byteorder='big')
_protocol = int(2).to_bytes(length=1, byteorder='big')
_reserved = int(0).to_bytes(length=1, byteorder='big')
return _frame_head + _pkg_head + _protocol + _reserved + cmd.encode()
def __package_xs(self, cmd):
return f"{json.dumps(cmd, separators=(',', ':'))}\r".encode()
def __unpackage(self, sock):
def to_read(conn):
data = conn.recv(MAX_FRAME_SIZE)
if data:
# print(data)
self.__get_response(data)
else:
print('closing', sock)
sel.unregister(conn)
conn.close()
sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ, to_read)
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj)
def __unpackage_xs(self, sock):
def to_read(conn):
data = conn.recv(1024) # Should be ready
if data:
# print(data)
self.__get_response_xs(data)
else:
print('closing', sock)
sel.unregister(conn)
conn.close()
sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ, to_read)
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj)
def __gen_id(self, command):
_now = time.time()
_id = f"{command}-{_now}"
return _id
def excution(self, command, flag=0, **kwargs):
if flag == 0: # for old protocols
req = None
try:
with open(f'./templates/{command}.json', encoding='utf-8', mode='r') as f_json:
req = json.load(f_json)
except:
print(f"暂不支持 {command} 功能,或确认该功能存在...")
exit(1)
match command:
case 'state.set_tp_mode':
req['data']['tp_mode'] = kwargs['tp_mode']
case 1:
pass
req['id'] = self.__gen_id(command)
print(f"req = {req}")
cmd = json.dumps(req, separators=(',', ':'))
self.c.send(self.__package(cmd))
time.sleep(2)
return req['id']
else: # for xService
pass
hr = HmiRequest()