中间版本,对于解封超过1024的消息有问题,暂存历史

This commit is contained in:
2024-06-20 17:15:54 +08:00
parent 2917f4ae97
commit 14f269b570
5 changed files with 95 additions and 101 deletions

View File

@ -1,67 +1,28 @@
import json
from socket import *
import threading
import time
import binascii
import openapi
hr = openapi.hr
# 一、设置/检测机器人状态:
# 1. 上电
# 2. 软限位打开
# 3. 示教器断开
# 4. 操作模式
# 5. 控制器状态/工作任务控件/机器人动态
# 二、加载RL程序开始运行
class HmiRequest(object):
# 三、运行过程中,收集数据,并处理出结果
def __init__(self):
self.c = socket(AF_INET, SOCK_STREAM)
self.c.connect(('192.168.84.129', 5050))
self.c_xs = socket(AF_INET, SOCK_STREAM)
self.c_xs.connect(('192.168.84.129', 6666))
self.c.setblocking(False)
self.c_xs.setblocking(False)
self.t = threading.Thread(target=self.__heartbeat_detection)
self.t.daemon = True
self.t.start()
def __handle_command(self, cmd):
len_frame, len_pkg = len(cmd), len(cmd) + 6
pkg_head = str(hex(len_pkg))[2:].rjust(4, '0')
frame_head = str(hex(len_frame))[2:].rjust(4, '0')
str0 = binascii.unhexlify(pkg_head) # 报文
str1 = chr(0) + chr(0) # 保留字段
str2 = binascii.unhexlify(frame_head) # 帧
str3 = chr(2) + chr(0) # 协议类型
return str0 + str1.encode() + str2 + str3.encode() + cmd.encode()
def __heartbeat_detection(self):
data = {
"id": "system.controller.heart_0",
"module": "system",
"command": "controller.heart",
}
_id = 1
while True:
data["id"] = f"#system.controller.heart_{_id}"
cmd = json.dumps(data, separators=(',', ':'))
self.c.send(self.__handle_command(cmd))
time.sleep(10)
_id += 1
def motor_on(self):
"""HMI上电"""
data = {
"command": "state.switch_motor_on",
"id": str(1234),
"module": "system"
}
cmd = json.dumps(data, separators=(',', ':'))
self.c.send(self.__handle_command(cmd))
time.sleep(2)
response = self.c.recv(102400)
print(response)
print(type(response))
print(response.decode())
# response = json.loads(self.c.recv(102400).decode('utf-16-be'))
# print(response)
# 四
hr = HmiRequest()
hr.motor_on()
# _id = hr.excution("state.get_state")
# print(hr.get_from_id(_id))
# _id = hr.excution('state.set_tp_mode', tp_mode='without')
# print(hr.get_from_id(_id))
_id = hr.excution('device.get_params')
print(hr.get_from_id(_id))
# _id = hr.excution('state.switch_motor_on')
# print(hr.get_from_id(_id))