From 6604f0ba067959b699229ed2b97b5e4ba5b4ad36 Mon Sep 17 00:00:00 2001 From: gitea Date: Mon, 24 Jun 2024 19:22:56 +0800 Subject: [PATCH] =?UTF-8?q?20240624=2011.=20[openapi.py]=20=E5=BB=BA?= =?UTF-8?q?=E8=81=94=E9=83=A8=E5=88=86=E5=81=9A=E5=AE=B9=E9=94=99=E5=A4=84?= =?UTF-8?q?=E7=90=86=EF=BC=8C=E5=B9=B6=E5=B0=86=E8=AF=BB=E5=86=99=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E5=81=9A=E8=87=AA=E9=80=82=E5=BA=94=E5=A4=84=E7=90=86?= =?UTF-8?q?=2012.=20[aio.py]=20=E5=B0=86=E8=AF=BB=E5=86=99=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E5=81=9A=E8=87=AA=E9=80=82=E5=BA=94=E5=A4=84=E7=90=86?= =?UTF-8?q?=EF=BC=8C=E5=BC=95=E5=85=A5openapi=E6=A8=A1=E5=9D=97=E5=B9=B6?= =?UTF-8?q?=E7=94=9F=E6=88=90=E5=AE=9E=E4=BE=8B=EF=BC=8C=E5=81=9A=E5=BF=83?= =?UTF-8?q?=E8=B7=B3=E6=A3=80=E6=B5=8B=EF=BC=8C=E5=B0=86socket=E8=B6=85?= =?UTF-8?q?=E6=97=B6=E6=97=B6=E9=97=B4=E4=BF=AE=E6=94=B9=E4=B8=BA3s?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 4 ++- aio/README.md | 2 ++ aio/code/aio.py | 15 ++++++----- aio/code/automatic_test/openapi.py | 41 ++++++++++++++++++++---------- 4 files changed, 42 insertions(+), 20 deletions(-) diff --git a/.gitignore b/.gitignore index 8c99859..c3bd9aa 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,6 @@ aio/.idea/ aio/code/__pycache__/ aio/package/ aio/venv -aio/__pycache__/ \ No newline at end of file +aio/__pycache__/ +aio/code/automatic_test/__pycache__/ +aio/code/data_process/__pycache__/ \ No newline at end of file diff --git a/aio/README.md b/aio/README.md index 6e4110b..7954e1a 100644 --- a/aio/README.md +++ b/aio/README.md @@ -262,6 +262,8 @@ v0.1.7.0(2024/06/29) 8. [openapi.py] 增加心跳检测函数,并开启线程执行;取消在该文件中生成实例 9. [aio.py] 完成detect_network,并在main函数开启线程 10. 将templates文件夹移动到assets内 +11. [openapi.py] 建联部分做容错逻辑,并将读写文件做自适应处理 +12. [aio.py] 将读写文件做自适应处理,引入openapi模块并生成实例,做心跳检测,将socket超时时间修改为3s > **关于HMI接口** > - 封包解包顺序:帧长度二字节/包长度四字节/协议二字节/预留二字节,\x04\x00:\x00\x00\tR:\x02:\x00 diff --git a/aio/code/aio.py b/aio/code/aio.py index cab3ac4..0951ac3 100644 --- a/aio/code/aio.py +++ b/aio/code/aio.py @@ -1,5 +1,5 @@ import tkinter -from os.path import exists +from os.path import exists, dirname from os import getcwd from threading import Thread import tkinter.messagebox @@ -11,12 +11,13 @@ import data_process.brake as brake import data_process.current as current import data_process.iso as iso import data_process.wavelogger as wavelogger +import automatic_test.openapi as openapi customtkinter.set_appearance_mode("System") # Modes: "System" (standard), "Dark", "Light" customtkinter.set_default_color_theme("blue") # Themes: "blue" (standard), "green", "dark-blue" customtkinter.set_widget_scaling(1.1) # widget dimensions and text size customtkinter.set_window_scaling(1.1) # window geometry dimensions -setdefaulttimeout(10) +setdefaulttimeout(3) # global vars btns_func = { 'start': {'btn': '', 'row': 1, 'text': '开始运行'}, @@ -146,7 +147,7 @@ class App(customtkinter.CTk): self.seg_button.configure(values=["无效功能", "触发急停", "停止运动", "继续运动", "零点位姿", "机器状态", "告警信息"]) self.seg_button.set("无效功能") # self.seg_button.configure(state="disabled") - + # create progress bar self.progressbar = customtkinter.CTkProgressBar(self.tabview.tab('Automatic Test')) self.progressbar.grid(row=5, column=1, padx=5, pady=5, sticky="ew") self.progressbar.configure(mode="determinnate") @@ -206,8 +207,9 @@ class App(customtkinter.CTk): pass def detect_network(self): + current_path = dirname(__file__) while True: - with open('./automatic_test/templates/heartbeat', 'r', encoding='utf-8') as f_h: + with open(f'{current_path}/../assets/templates/heartbeat', 'r', encoding='utf-8') as f_h: pb_color = 'green' if f_h.read().strip() == '1' else 'red' self.progressbar.configure(progress_color=pb_color) sleep(3) @@ -490,8 +492,9 @@ class App(customtkinter.CTk): if __name__ == "__main__": - with open("./automatic_test/templates/heartbeat", "w", encoding='utf-8') as f_h: - f_h.write('0') + t_hr = Thread(target=openapi.HmiRequest) + t_hr.daemon = True + t_hr.start() aio = App() aio.net_detect = Thread(target=aio.detect_network) diff --git a/aio/code/automatic_test/openapi.py b/aio/code/automatic_test/openapi.py index b65d548..7ca1b25 100644 --- a/aio/code/automatic_test/openapi.py +++ b/aio/code/automatic_test/openapi.py @@ -3,24 +3,36 @@ import socket import threading import selectors import time +import os MAX_FRAME_SIZE = 1024 +socket.setdefaulttimeout(3) class HmiRequest(object): - def __init__(self): super().__init__() - self.c = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - # self.c.connect(('192.168.0.160', 5050)) - self.c.connect(('192.168.84.129', 5050)) - self.c.setblocking(False) - self.c_msg = [] - self.c_xs = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - # self.c_xs.connect(('192.168.0.160', 6666)) - self.c_xs.connect(('192.168.84.129', 6666)) - self.c_xs.setblocking(False) - self.c_msg_xs = [] + while True: + try: + self.c = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + # self.c.connect(('192.168.0.160', 5050)) + self.c.connect(('192.168.84.129', 5050)) + self.c.setblocking(False) + self.c_msg = [] + self.c_xs = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + # self.c_xs.connect(('192.168.0.160', 6666)) + self.c_xs.connect(('192.168.84.129', 6666)) + self.c_xs.setblocking(False) + self.c_msg_xs = [] + break + except Exception as Err: + current_path = os.path.dirname(__file__) + with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_h: + f_h.write('0') + + print("Connection failed, will try again after 2 seconds...") + time.sleep(2) + self.t_heartbeat = threading.Thread(target=self.__heartbeat) self.t_heartbeat.daemon = True self.t_heartbeat.start() @@ -54,10 +66,11 @@ class HmiRequest(object): exit(10) def __heartbeat(self): + current_path = os.path.dirname(__file__) while True: _id = self.excution('controller.heart') _flag = 1 if self.get_from_id(_id) else 0 - with open("./templates/heartbeat", "w", encoding='utf-8') as f_h: + with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_h: f_h.write(str(_flag)) time.sleep(10) @@ -222,10 +235,12 @@ class HmiRequest(object): return _id def excution(self, command, flag=0, **kwargs): + current_path = os.path.dirname(__file__) + if flag == 0: # for old protocols req = None try: - with open(f'./templates/{command}.json', encoding='utf-8', mode='r') as f_json: + with open(f'{current_path}/../../assets/templates/{command}.json', encoding='utf-8', mode='r') as f_json: req = json.load(f_json) except: print(f"暂不支持 {command} 功能,或确认该功能存在...")