import binascii
import json
import threading
import time
from socket import *

# Endpoint used when HmiRequest() is constructed without arguments.
DEFAULT_HOST = '192.168.84.129'
DEFAULT_COMMAND_PORT = 5050
DEFAULT_XS_PORT = 6666


class HmiRequest(object):
    """Client that sends framed JSON commands to a controller over TCP.

    Two non-blocking TCP connections are opened on construction (``self.c``
    and ``self.c_xs``), and a daemon thread sends a heartbeat command on
    ``self.c`` every 10 seconds.
    """

    # Largest value that fits in the 2-byte length fields of the header.
    _MAX_LENGTH = 0xFFFF
    # The packet length is the frame length plus 6.
    # NOTE(review): 6 presumably counts the three 2-byte fields that follow
    # the packet-length field (reserved, frame length, protocol type) —
    # confirm against the protocol specification.
    _PACKET_OVERHEAD = 6
    _RESERVED = b'\x00\x00'       # reserved field
    _PROTOCOL_TYPE = b'\x02\x00'  # protocol type

    def __init__(self, host=DEFAULT_HOST, port=DEFAULT_COMMAND_PORT,
                 xs_port=DEFAULT_XS_PORT):
        """Connect both sockets and start the heartbeat thread.

        Args:
            host: Address of the controller.
            port: TCP port for the command connection (``self.c``).
            xs_port: TCP port for the second connection (``self.c_xs``).

        Raises:
            OSError: If either connection cannot be established.
        """
        self.c = socket(AF_INET, SOCK_STREAM)
        self.c.connect((host, port))
        self.c_xs = socket(AF_INET, SOCK_STREAM)
        self.c_xs.connect((host, xs_port))
        # Switch to non-blocking only after the (blocking) connects succeed.
        self.c.setblocking(False)
        self.c_xs.setblocking(False)
        # Daemon thread: it must not keep the interpreter alive on exit.
        self.t = threading.Thread(target=self.__heartbeat_detection)
        self.t.daemon = True
        self.t.start()

    def __handle_command(self, cmd):
        """Wrap a command string in the 8-byte binary header.

        Layout of the returned bytes (length fields are big-endian):
        packet length (2) | reserved 0x0000 (2) | frame length (2) |
        protocol type 0x0200 (2) | UTF-8 encoded command.

        Args:
            cmd: Command text, normally a compact JSON document.

        Returns:
            The framed command as ``bytes``.

        Raises:
            ValueError: If the encoded command is too long for the 2-byte
                length fields.
        """
        # Measure the encoded payload, not the text: the header must describe
        # the bytes that are actually put on the wire.
        payload = cmd.encode()
        len_frame = len(payload)
        len_pkg = len_frame + self._PACKET_OVERHEAD
        if len_pkg > self._MAX_LENGTH:
            raise ValueError(
                'command too long to frame: %d bytes (limit %d)'
                % (len_frame, self._MAX_LENGTH - self._PACKET_OVERHEAD)
            )
        return b''.join((
            len_pkg.to_bytes(2, 'big'),    # packet length
            self._RESERVED,                # reserved field
            len_frame.to_bytes(2, 'big'),  # frame length
            self._PROTOCOL_TYPE,           # protocol type
            payload,
        ))

    def __heartbeat_detection(self):
        """Send a heartbeat command on ``self.c`` every 10 seconds, forever.

        Runs in the daemon thread started by ``__init__``. The ``id`` field
        carries a counter that starts at 1 and grows by one per heartbeat.
        """
        data = {
            "id": "system.controller.heart_0",
            "module": "system",
            "command": "controller.heart",
        }
        _id = 1
        while True:
            data["id"] = f"#system.controller.heart_{_id}"
            cmd = json.dumps(data, separators=(',', ':'))
            self.c.send(self.__handle_command(cmd))
            time.sleep(10)
            _id += 1

    def motor_on(self):
        """Send the HMI motor power-on command and print the raw reply.

        Waits 2 seconds after sending, then reads whatever is available on
        ``self.c``.

        Raises:
            BlockingIOError: If nothing has arrived when the read is
                attempted (the socket is non-blocking).
        """
        data = {
            "command": "state.switch_motor_on",
            "id": str(1234),
            "module": "system"
        }
        cmd = json.dumps(data, separators=(',', ':'))
        self.c.send(self.__handle_command(cmd))
        time.sleep(2)
        response = self.c.recv(102400)
        print(response)
        print(type(response))
        # The reply may contain bytes that are not valid UTF-8; replace them
        # instead of crashing while printing.
        print(response.decode(errors='replace'))
        # NOTE: parsing the reply as JSON (decoded as 'utf-16-be') was tried
        # previously and is intentionally left disabled.


if __name__ == "__main__":
    hr = HmiRequest()
    hr.motor_on()