From 14f269b570c2aa5038e7ad6e877a61f57f9d21d3 Mon Sep 17 00:00:00 2001 From: gitea Date: Thu, 20 Jun 2024 17:15:54 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=AD=E9=97=B4=E7=89=88=E6=9C=AC=EF=BC=8C?= =?UTF-8?q?=E5=AF=B9=E4=BA=8E=E8=A7=A3=E5=B0=81=E8=B6=85=E8=BF=871024?= =?UTF-8?q?=E7=9A=84=E6=B6=88=E6=81=AF=E6=9C=89=E9=97=AE=E9=A2=98=EF=BC=8C?= =?UTF-8?q?=E6=9A=82=E5=AD=98=E5=8E=86=E5=8F=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- aio/code/automatic_test/do_brake.py | 83 +++++----------- aio/code/automatic_test/openapi.py | 95 +++++++++++-------- .../templates/state.get_state.json | 5 + .../templates/state.get_tp_mode.json | 5 + .../templates/state.set_tp_mode.json | 8 ++ 5 files changed, 95 insertions(+), 101 deletions(-) create mode 100644 aio/code/automatic_test/templates/state.get_state.json create mode 100644 aio/code/automatic_test/templates/state.get_tp_mode.json create mode 100644 aio/code/automatic_test/templates/state.set_tp_mode.json diff --git a/aio/code/automatic_test/do_brake.py b/aio/code/automatic_test/do_brake.py index 052262d..f1aef39 100644 --- a/aio/code/automatic_test/do_brake.py +++ b/aio/code/automatic_test/do_brake.py @@ -1,67 +1,28 @@ -import json -from socket import * -import threading -import time -import binascii +import openapi + +hr = openapi.hr + +# 一、设置/检测机器人状态: +# 1. 上电 +# 2. 软限位打开 +# 3. 示教器断开 +# 4. 操作模式 +# 5. 控制器状态/工作任务控件/机器人动态 + +# 二、加载RL程序开始运行 -class HmiRequest(object): +# 三、运行过程中,收集数据,并处理出结果 - def __init__(self): - self.c = socket(AF_INET, SOCK_STREAM) - self.c.connect(('192.168.84.129', 5050)) - self.c_xs = socket(AF_INET, SOCK_STREAM) - self.c_xs.connect(('192.168.84.129', 6666)) - self.c.setblocking(False) - self.c_xs.setblocking(False) - self.t = threading.Thread(target=self.__heartbeat_detection) - self.t.daemon = True - self.t.start() - - def __handle_command(self, cmd): - len_frame, len_pkg = len(cmd), len(cmd) + 6 - pkg_head = str(hex(len_pkg))[2:].rjust(4, '0') - frame_head = str(hex(len_frame))[2:].rjust(4, '0') - str0 = binascii.unhexlify(pkg_head) # 报文 - str1 = chr(0) + chr(0) # 保留字段 - str2 = binascii.unhexlify(frame_head) # 帧 - str3 = chr(2) + chr(0) # 协议类型 - return str0 + str1.encode() + str2 + str3.encode() + cmd.encode() - - def __heartbeat_detection(self): - data = { - "id": "system.controller.heart_0", - "module": "system", - "command": "controller.heart", - } - _id = 1 - while True: - data["id"] = f"#system.controller.heart_{_id}" - cmd = json.dumps(data, separators=(',', ':')) - self.c.send(self.__handle_command(cmd)) - time.sleep(10) - _id += 1 - - def motor_on(self): - """HMI上电""" - data = { - "command": "state.switch_motor_on", - "id": str(1234), - "module": "system" - } - cmd = json.dumps(data, separators=(',', ':')) - self.c.send(self.__handle_command(cmd)) - time.sleep(2) - response = self.c.recv(102400) - print(response) - print(type(response)) - print(response.decode()) - - # response = json.loads(self.c.recv(102400).decode('utf-16-be')) - # print(response) +# 四 -hr = HmiRequest() -hr.motor_on() - +# _id = hr.excution("state.get_state") +# print(hr.get_from_id(_id)) +# _id = hr.excution('state.set_tp_mode', tp_mode='without') +# print(hr.get_from_id(_id)) +_id = hr.excution('device.get_params') +print(hr.get_from_id(_id)) +# _id = hr.excution('state.switch_motor_on') +# print(hr.get_from_id(_id)) diff --git a/aio/code/automatic_test/openapi.py b/aio/code/automatic_test/openapi.py index b197a2c..9b7f196 100644 --- a/aio/code/automatic_test/openapi.py +++ b/aio/code/automatic_test/openapi.py @@ -12,13 +12,13 @@ class HmiRequest(object): def __init__(self): self.c = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - # self.c.connect(('192.168.0.160', 5050)) - self.c.connect(('192.168.84.129', 5050)) + self.c.connect(('192.168.0.160', 5050)) + # self.c.connect(('192.168.84.129', 5050)) self.c.setblocking(False) self.c_msg = [] self.c_xs = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - # self.c_xs.connect(('192.168.0.160', 6666)) - self.c_xs.connect(('192.168.84.129', 6666)) + self.c_xs.connect(('192.168.0.160', 6666)) + # self.c_xs.connect(('192.168.84.129', 6666)) self.c_xs.setblocking(False) self.c_msg_xs = [] self.t_unpackage = threading.Thread(target=self.__unpackage, args=(self.c, )) @@ -39,16 +39,27 @@ class HmiRequest(object): def __header_check(self, index, data): try: - _pkg_size = int(binascii.b2a_hex(data[index:index+2]), 16) - _reserved = int(binascii.b2a_hex(data[index+2:index+4]), 16) - _frame_size = int(binascii.b2a_hex(data[index+4:index+6]), 16) - _protocol = int(binascii.b2a_hex(data[index+6:index+8]), 16) - if _reserved == 0 and _protocol == 512 and _pkg_size - _frame_size >= 6: + print(binascii.b2a_hex(data[index:index+2]).decode()) + print(binascii.b2a_hex(data[index+2:index+6]).decode()) + print(binascii.b2a_hex(data[index+6:index+7]).decode()) + print(binascii.b2a_hex(data[index+7:index+8]).decode()) + _frame_size = int(binascii.b2a_hex(data[index:index+2]).decode(), 16) + _pkg_size = int(binascii.b2a_hex(data[index+2:index+6]).decode(), 16) + print('--------') + _protocol = 2 # int(binascii.b2a_hex(data[index+6:index+7]).decode(), 16) + print('--------') + _reserved = int(binascii.b2a_hex(data[index+7:index+8]).decode(), 16) + print(f"frame size = {_frame_size}") + print(f"pkg size = {_pkg_size}") + print(f"protocol = {_protocol}") + print(f"reserved = {_reserved}") + if _reserved == 0 and _protocol == 2 and True: return index+8, _frame_size, _pkg_size else: print("数据有误,需要确认") exit(9) - except: + except Exception as Err: + print(f"Err = {Err}") print("无法读取数据,需要确认") exit(10) @@ -56,26 +67,27 @@ class HmiRequest(object): _index = 0 while _index < len(data): if self.flag == 0: + print(f"===========index = {_index}") _index, _frame_size, _pkg_size = self.__header_check(_index, data) - if len(data) - _index >= _frame_size: + if len(data) - _index >= _pkg_size: # 说明剩余部分的数据正好就是完整的包数据 - self.response = data[_index:_index+_frame_size].decode() + self.response = data[_index:_index+_pkg_size].decode() if len(self.c_msg) < 1000: self.c_msg.insert(0, self.response) else: self.c_msg.insert(0, self.response) while len(self.c_msg) > 1000: self.c_msg.pop() - _index += _frame_size + _index += _pkg_size self.flag = 0 self.response = '' self.leftover = 0 - elif len(data) - _index < _frame_size: + elif len(data) - _index < _pkg_size: # 说有有分包的情况发生了,需要flag=1的处理 self.flag = 1 self.response = data[_index:].decode() - self.leftover = _frame_size - (len(data) - _index) - _index += _frame_size + self.leftover = _pkg_size - (len(data) - _index) + _index += _pkg_size elif self.flag == 1: # 处理完之后,将flag重置为0 @@ -173,7 +185,7 @@ class HmiRequest(object): def __unpackage(self, sock): def to_read(conn): - data = conn.recv(512) # Should be ready + data = conn.recv(1024) # Should be ready if data: print(data) self.__get_response(data) @@ -216,33 +228,36 @@ class HmiRequest(object): _id = f"{command}-{_now}" return _id - def excution(self, command, **kwargs): - req = None - try: - with open(f'./templates/{command}.json', encoding='utf-8', mode='r') as f_json: - req = json.load(f_json) - except: - print(f"暂不支持 {command} 功能,或确认该功能存在...") - exit(1) + def excution(self, command, flag=0, **kwargs): + if flag == 0: # for old protocols + req = None + try: + with open(f'./templates/{command}.json', encoding='utf-8', mode='r') as f_json: + req = json.load(f_json) + except: + print(f"暂不支持 {command} 功能,或确认该功能存在...") + exit(1) - match command: - case 0: - pass - case 1: - pass - - req['id'] = self.__gen_id(command) - cmd = json.dumps(req, separators=(',', ':')) - self.c.send(self.__package(cmd)) - time.sleep(2) - return req['id'] + match command: + case 'state.set_tp_mode': + req['data']['tp_mode'] = kwargs['tp_mode'] + case 1: + pass + req['id'] = self.__gen_id(command) + print(f"req = {req}") + cmd = json.dumps(req, separators=(',', ':')) + self.c.send(self.__package(cmd)) + time.sleep(2) + return req['id'] + else: # for xService + pass hr = HmiRequest() -id_test = hr.excution('device.get_params') -time.sleep(2) -print(hr.c_msg) -print(hr.get_from_id(id_test)) +# id_test = hr.excution('device.get_params') +# time.sleep(2) +# print(hr.c_msg) +# print(hr.get_from_id(id_test)) # hr.excution('state.switch_manual') # time.sleep(2) # hr.excution('state.switch_motor_on') diff --git a/aio/code/automatic_test/templates/state.get_state.json b/aio/code/automatic_test/templates/state.get_state.json new file mode 100644 index 0000000..1a2b981 --- /dev/null +++ b/aio/code/automatic_test/templates/state.get_state.json @@ -0,0 +1,5 @@ +{ + "id": "xxxxxxxxxxx", + "module": "system", + "command": "state.get_state" +} \ No newline at end of file diff --git a/aio/code/automatic_test/templates/state.get_tp_mode.json b/aio/code/automatic_test/templates/state.get_tp_mode.json new file mode 100644 index 0000000..5b59ed0 --- /dev/null +++ b/aio/code/automatic_test/templates/state.get_tp_mode.json @@ -0,0 +1,5 @@ +{ + "id": "xxxxxxxxxxx", + "module": "system", + "command": "state.get_tp_mode" +} \ No newline at end of file diff --git a/aio/code/automatic_test/templates/state.set_tp_mode.json b/aio/code/automatic_test/templates/state.set_tp_mode.json new file mode 100644 index 0000000..608dd02 --- /dev/null +++ b/aio/code/automatic_test/templates/state.set_tp_mode.json @@ -0,0 +1,8 @@ +{ + "id": "xxxxxxxxxxx", + "module": "system", + "command": "state.set_tp_mode", + "data": { + "tp_mode": "with" + } +} \ No newline at end of file