v0.1.7.0(2024/06/26)-初步可用
1. [aio.py] 在detect_network函数中需改查询时间间隔是1s,在tabview_click中增加textbox配置normal的语句 2. [do_brake.py -> btn_functions.py] 新增执行相应函数,并在get_state函数中设置无示教器模式 3. [openapi.py] 新增sock_conn函数,并做连接时的异常处理,新增类参数w2t 4. [aio.py] 修改customtkinter库中C:\Users\Administrator\AppData\Local\Programs\Python\Python312\Lib\site-packages\customtkinter\windows\widgets\ctk_tabview.py文件,参考https://github.com/TomSchimansky/CustomTkinter/issues/2296,实现修改tabview组件的字体大小,使用原生字体,同时将segmented button字体修改为原生,为了解决segmented button在禁用和启用时,屏幕抖动的问题,并将大小修改为16 5. [aio.py] 修改了segmented_button_callback的实现逻辑,使代码更简洁 6. [aio.py] 修改了在tabview_click函数中对于实例化openapi的动作,使每次切换标签都会重新实例化,也就是每次都会重新连接,修复显示不正确的问题 7. [openapi.py] 新增了socket关闭的函数,并增加msg_id为None的处理逻辑 8. [btn_functions.py] 完善了状态获取的功能,新增告警获取以及功能切换的逻辑
This commit is contained in:
		| @@ -1 +1 @@ | ||||
| __all__ = ['openapi', 'do_brake'] | ||||
| __all__ = ['openapi', 'btn_functions'] | ||||
| @@ -1,4 +1,5 @@ | ||||
| import json | ||||
| import socket | ||||
| from os.path import dirname | ||||
| from sys import argv | ||||
| 
 | ||||
| @@ -17,40 +18,55 @@ def validate_resp(_id, response, w2t): | ||||
|         w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red') | ||||
| 
 | ||||
| 
 | ||||
| def execution(cmd, hr, w2t, **kwargs): | ||||
|     _id = hr.excution(cmd, **kwargs) | ||||
|     _msg = hr.get_from_id(_id) | ||||
|     if not _msg: | ||||
|         w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red') | ||||
|     else: | ||||
|         _response = json.loads(_msg) | ||||
|         validate_resp(_id, _response, w2t) | ||||
|         return _response | ||||
| 
 | ||||
| 
 | ||||
| def get_state(hr, w2t): | ||||
|     # 获取机器状态 | ||||
|     _id = hr.excution('state.get_state') | ||||
|     _msg = hr.get_from_id(_id) | ||||
|     if not _msg: | ||||
|         w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red') | ||||
|     else: | ||||
|         _response = json.loads(_msg)['data'] | ||||
|         validate_resp(_id, _response, w2t) | ||||
| 
 | ||||
|     _response = execution('state.get_state', hr, w2t) | ||||
|     stat_desc = {'engine': '上电状态', 'operate': '操作模式', 'rc_state': '控制器状态', 'robot_action': '机器人动作', 'safety_mode': '安全模式', 'servo_mode': '伺服工作模式', 'task_space': '工作任务空间'} | ||||
|     for component, state in _response.items(): | ||||
|     for component, state in _response['data'].items(): | ||||
|         w2t(f"{stat_desc[component]}: {state}") | ||||
| 
 | ||||
|     _id = hr.excution('device.get_params') | ||||
|     _msg = hr.get_from_id(_id) | ||||
|     if not _msg: | ||||
|         w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red') | ||||
|     else: | ||||
|         _response = json.loads(_msg)['data']['devices'] | ||||
|         validate_resp(_id, _response, w2t) | ||||
|     # 获取设备伺服信息 | ||||
|     _response = execution('device.get_params', hr, w2t) | ||||
|     dev_desc = {0: '伺服版本', 1: '伺服参数', 2: '安全板固件', 3: '控制器', 4: '通讯总线', 5: '解释器', 6: '运动控制', 8: '力控版本', 9: '末端固件', 10: '机型文件', 11: '环境包'} | ||||
|     dev_vers = {} | ||||
|     for device in _response: | ||||
|     for device in _response['data']['devices']: | ||||
|         dev_vers[device['type']] = device['version'] | ||||
|     for i in sorted(dev_desc.keys()): | ||||
|         w2t(f"{dev_desc[i]}: {dev_vers[i]}") | ||||
| 
 | ||||
|     # 设置示教器模式 | ||||
|     _response = execution('state.set_tp_mode', hr, w2t, tp_mode='without') | ||||
| 
 | ||||
| 
 | ||||
| def warning_info(hr, w2t): | ||||
|     for msg in hr.c_msg: | ||||
|         if 'alarm' in msg.lower(): | ||||
|             w2t(msg) | ||||
|     for msg in hr.c_msg_xs: | ||||
|         if 'alarm' in msg.lower(): | ||||
|             w2t(msg) | ||||
| 
 | ||||
| 
 | ||||
| def main(hr, func, w2t): | ||||
|     if hr is None: | ||||
|         w2t("无法连接机器人,检查是否已经使用Robot Assist软件连接机器,重试中...", 0, 49, 'red') | ||||
|     # func: get_state/ | ||||
|     match func: | ||||
|         case 'get_state': | ||||
|             get_state(hr, w2t) | ||||
|         case 'warning_info': | ||||
|             warning_info(hr, w2t) | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
| @@ -3,31 +3,49 @@ import socket | ||||
| import threading | ||||
| import selectors | ||||
| import time | ||||
| from os.path import dirname | ||||
| import os | ||||
|  | ||||
| MAX_FRAME_SIZE = 1024 | ||||
| socket.setdefaulttimeout(3) | ||||
| current_path = dirname(__file__) | ||||
| socket.setdefaulttimeout(2) | ||||
| current_path = os.path.dirname(__file__) | ||||
|  | ||||
|  | ||||
| class HmiRequest(object): | ||||
|     def __init__(self): | ||||
|     def __init__(self, w2t): | ||||
|         super().__init__() | ||||
|         try: | ||||
|             self.c = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||||
|             self.c.connect(('192.168.0.160', 5050)) | ||||
|             # self.c.connect(('192.168.84.129', 5050)) | ||||
|             self.c.setblocking(False) | ||||
|             self.c_xs = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||||
|             self.c_xs.connect(('192.168.0.160', 6666)) | ||||
|             # self.c_xs.connect(('192.168.84.129', 6666)) | ||||
|             self.c_xs.setblocking(False) | ||||
|             with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_h: | ||||
|                 f_h.write('1') | ||||
|         except Exception as Err: | ||||
|             with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_h: | ||||
|                 f_h.write('0') | ||||
|             print("Connection failed, please try to re-connect via switching tabs...") | ||||
|         self.w2t = w2t | ||||
|         self.c = None | ||||
|         self.c_xs = None | ||||
|  | ||||
|         def sock_conn(): | ||||
|             # while True: | ||||
|             with open(f"{current_path}/../../assets/templates/heartbeat", "r", encoding='utf-8') as f_h: | ||||
|                 connection_state = f_h.read().strip() | ||||
|             if connection_state == '0': | ||||
|                 try: | ||||
|                     c = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||||
|                     c.connect(('192.168.0.160', 5050)) | ||||
|                     # c.connect(('192.168.84.129', 5050)) | ||||
|                     c.setblocking(False) | ||||
|                     c_xs = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||||
|                     c_xs.connect(('192.168.0.160', 6666)) | ||||
|                     # c_xs.connect(('192.168.84.129', 6666)) | ||||
|                     c_xs.setblocking(False) | ||||
|  | ||||
|                     self.w2t("Connection success", 0, 0, 'green') | ||||
|                     with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_h: | ||||
|                         f_h.write('1') | ||||
|                     return c, c_xs | ||||
|  | ||||
|                 except Exception as Err: | ||||
|                     with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_h: | ||||
|                         f_h.write('0') | ||||
|                     self.w2t("Connection failed...", 0, 0, 'red') | ||||
|                     return None, None | ||||
|  | ||||
|         self.c, self.c_xs = sock_conn() | ||||
|         if self.c is None or self.c_xs is None: | ||||
|             self.w2t("Aborting! Try to switch tabs to re-connect!", 0, 1, 'red') | ||||
|  | ||||
|         self.c_msg = [] | ||||
|         self.c_msg_xs = [] | ||||
| @@ -47,6 +65,10 @@ class HmiRequest(object): | ||||
|         self.t_unpackage_xs.daemon = True | ||||
|         self.t_unpackage_xs.start() | ||||
|  | ||||
|     def close_sock(self): | ||||
|         self.c.close() | ||||
|         self.c_xs.close() | ||||
|  | ||||
|     def header_check(self, index, data): | ||||
|         try: | ||||
|             _frame_size = int.from_bytes(data[index:index+2], byteorder='big') | ||||
| @@ -67,7 +89,7 @@ class HmiRequest(object): | ||||
|     def heartbeat(self): | ||||
|         while True: | ||||
|             _id = self.excution('controller.heart') | ||||
|             _flag = 1 if self.get_from_id(_id) else 0 | ||||
|             _flag = 0 if self.get_from_id(_id) is None else 1 | ||||
|             with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_h: | ||||
|                 f_h.write(str(_flag)) | ||||
|             time.sleep(3) | ||||
| @@ -170,6 +192,8 @@ class HmiRequest(object): | ||||
|         messages = self.c_msg if flag == 0 else self.c_msg_xs | ||||
|         for i in range(3): | ||||
|             for msg in messages: | ||||
|                 if msg_id is None: | ||||
|                     self.w2t("未能成功获取到 message id...", 0, 10, 'red') | ||||
|                 if msg_id in msg: | ||||
|                     return msg | ||||
|             time.sleep(1) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user