This repository has been archived on 2025-02-25. You can view files and clone it, but cannot push or open issues or pull requests.

255 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import socket
import threading
import selectors
import time
import binascii
import sys
import inspect
class HmiRequest(object):
def __init__(self):
self.c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# self.c.connect(('192.168.0.160', 5050))
self.c.connect(('192.168.84.129', 5050))
self.c.setblocking(False)
self.c_msg = []
self.c_xs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# self.c_xs.connect(('192.168.0.160', 6666))
self.c_xs.connect(('192.168.84.129', 6666))
self.c_xs.setblocking(False)
self.c_msg_xs = []
self.t_unpackage = threading.Thread(target=self.__unpackage, args=(self.c, ))
self.t_unpackage.daemon = True
self.t_unpackage.start()
self.t_unpackage_xs = threading.Thread(target=self.__unpackage_xs, args=(self.c_xs, ))
self.t_unpackage_xs.daemon = True
self.t_unpackage_xs.start()
# self.t = threading.Thread(target=self.__heartbeat)
# self.t.daemon = True
# self.t.start()
self.flag = 0
self.response = ''
self.leftover = 0
self.flag_xs = 0
self.response_xs = ''
self.leftover_xs = 0
def __header_check(self, index, data):
try:
_pkg_size = int(binascii.b2a_hex(data[index:index+2]), 16)
_reserved = int(binascii.b2a_hex(data[index+2:index+4]), 16)
_frame_size = int(binascii.b2a_hex(data[index+4:index+6]), 16)
_protocol = int(binascii.b2a_hex(data[index+6:index+8]), 16)
if _reserved == 0 and _protocol == 512 and _pkg_size - _frame_size >= 6:
return index+8, _frame_size, _pkg_size
else:
print("数据有误,需要确认")
exit(9)
except:
print("无法读取数据,需要确认")
exit(10)
def __get_response(self, data):
_index = 0
while _index < len(data):
if self.flag == 0:
_index, _frame_size, _pkg_size = self.__header_check(_index, data)
if len(data) - _index >= _frame_size:
# 说明剩余部分的数据正好就是完整的包数据
self.response = data[_index:_index+_frame_size].decode()
if len(self.c_msg) < 1000:
self.c_msg.insert(0, self.response)
else:
self.c_msg.insert(0, self.response)
while len(self.c_msg) > 1000:
self.c_msg.pop()
_index += _frame_size
self.flag = 0
self.response = ''
self.leftover = 0
elif len(data) - _index < _frame_size:
# 说有有分包的情况发生了需要flag=1的处理
self.flag = 1
self.response = data[_index:].decode()
self.leftover = _frame_size - (len(data) - _index)
_index += _frame_size
elif self.flag == 1:
# 处理完之后将flag重置为0
if self.leftover <= len(data):
self.response += data[:self.leftover].decode()
if len(self.c_msg) < 1000:
self.c_msg.insert(0, self.response)
else:
self.c_msg.insert(0, self.response)
while len(self.c_msg) > 1000:
self.c_msg.pop()
_index += self.leftover
self.flag = 0
self.response = ''
self.leftover = 0
else:
_index += self.leftover
self.flag = 1
self.response += data.decode()
self.leftover -= len(data)
def __get_response_xs(self, data):
if self.flag_xs == 0:
if data[-1].decode() == '\r':
_responses = data.decode().split('\r')
for _response in _responses:
if len(self.c_msg_xs) < 1000:
self.c_msg_xs.insert(0, _response)
else:
self.c_msg_xs.insert(0, _response)
while len(self.c_msg_xs) > 1000:
self.c_msg_xs.pop()
else:
_responses = data.decode().split('\r')
for _response in _responses[:-1]:
if not _response:
break
if len(self.c_msg_xs) < 1000:
self.c_msg_xs.insert(0, _response)
else:
self.c_msg_xs.insert(0, _response)
while len(self.c_msg_xs) > 1000:
self.c_msg_xs.pop()
self.response_xs = _responses[-1]
self.flag_xs = 1
else:
if data[-1].decode() == '\r':
_responses = (self.response_xs.encode() + data).decode().split('\r')
for _response in _responses:
if len(self.c_msg_xs) < 1000:
self.c_msg_xs.insert(0, _response)
else:
self.c_msg_xs.insert(0, _response)
while len(self.c_msg_xs) > 1000:
self.c_msg_xs.pop()
self.response_xs = ''
self.flag_xs = 0
else:
_responses = (self.response_xs.encode() + data).decode().split('\r')
for _response in _responses[:-1]:
if not _response:
break
if len(self.c_msg_xs) < 1000:
self.c_msg_xs.insert(0, _response)
else:
self.c_msg_xs.insert(0, _response)
while len(self.c_msg_xs) > 1000:
self.c_msg_xs.pop()
self.response_xs = _responses[-1]
self.flag_xs = 1
def get_from_id(self, msg_id, flag=0):
messages = self.c_msg if flag == 0 else self.c_msg_xs
for i in range(3):
for msg in messages:
if msg_id in msg:
return msg
time.sleep(1)
else:
print(f'无法查询到{msg_id}对应的响应')
def __package(self, cmd):
pkg_size = str(hex(len(cmd)+6))[2:].rjust(4, '0')
package = binascii.a2b_hex(pkg_size) # 包的长度
reserved = chr(0) + chr(0) # 保留字段
frame_size = str(hex(len(cmd)))[2:].rjust(4, '0')
frame = binascii.a2b_hex(frame_size) # 帧的长度
protocol = chr(2) + chr(0) # 协议类型
return package + reserved.encode() + frame + protocol.encode() + cmd.encode()
def __package_xs(self, cmd):
return f"{json.dumps(cmd, separators=(',', ':'))}\r".encode()
def __unpackage(self, sock):
def to_read(conn):
data = conn.recv(512) # Should be ready
if data:
print(data)
self.__get_response(data)
else:
print('closing', sock)
sel.unregister(conn)
conn.close()
sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ, to_read)
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj)
def __unpackage_xs(self, sock):
def to_read(conn):
data = conn.recv(1024) # Should be ready
if data:
print(data)
self.__get_response_xs(data)
else:
print('closing', sock)
sel.unregister(conn)
conn.close()
sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ, to_read)
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj)
def __gen_id(self, command):
_now = time.time()
_id = f"{command}-{_now}"
return _id
def excution(self, command, **kwargs):
req = None
try:
with open(f'./templates/{command}.json', encoding='utf-8', mode='r') as f_json:
req = json.load(f_json)
except:
print(f"暂不支持 {command} 功能,或确认该功能存在...")
exit(1)
match command:
case 0:
pass
case 1:
pass
req['id'] = self.__gen_id(command)
cmd = json.dumps(req, separators=(',', ':'))
self.c.send(self.__package(cmd))
time.sleep(2)
return req['id']
hr = HmiRequest()
id_test = hr.excution('device.get_params')
time.sleep(2)
print(hr.c_msg)
print(hr.get_from_id(id_test))
# hr.excution('state.switch_manual')
# time.sleep(2)
# hr.excution('state.switch_motor_on')
# time.sleep(2)
# hr.excution('state.switch_motor_off')