11. [openapi.py] 建联部分做容错处理,并将读写文件做自适应处理
12. [aio.py] 将读写文件做自适应处理,引入openapi模块并生成实例,做心跳检测,将socket超时时间修改为3s
This commit is contained in:
gitea 2024-06-24 19:22:56 +08:00
parent a4009eb17c
commit 6604f0ba06
4 changed files with 42 additions and 20 deletions

4
.gitignore vendored
View File

@ -4,4 +4,6 @@ aio/.idea/
aio/code/__pycache__/ aio/code/__pycache__/
aio/package/ aio/package/
aio/venv aio/venv
aio/__pycache__/ aio/__pycache__/
aio/code/automatic_test/__pycache__/
aio/code/data_process/__pycache__/

View File

@ -262,6 +262,8 @@ v0.1.7.0(2024/06/29)
8. [openapi.py] 增加心跳检测函数,并开启线程执行;取消在该文件中生成实例 8. [openapi.py] 增加心跳检测函数,并开启线程执行;取消在该文件中生成实例
9. [aio.py] 完成detect_network并在main函数开启线程 9. [aio.py] 完成detect_network并在main函数开启线程
10. 将templates文件夹移动到assets内 10. 将templates文件夹移动到assets内
11. [openapi.py] 建联部分做容错逻辑,并将读写文件做自适应处理
12. [aio.py] 将读写文件做自适应处理引入openapi模块并生成实例做心跳检测将socket超时时间修改为3s
> **关于HMI接口** > **关于HMI接口**
> - 封包解包顺序:帧长度二字节/包长度四字节/协议二字节/预留二字节,\x04\x00:\x00\x00\tR:\x02:\x00 > - 封包解包顺序:帧长度二字节/包长度四字节/协议二字节/预留二字节,\x04\x00:\x00\x00\tR:\x02:\x00

View File

@ -1,5 +1,5 @@
import tkinter import tkinter
from os.path import exists from os.path import exists, dirname
from os import getcwd from os import getcwd
from threading import Thread from threading import Thread
import tkinter.messagebox import tkinter.messagebox
@ -11,12 +11,13 @@ import data_process.brake as brake
import data_process.current as current import data_process.current as current
import data_process.iso as iso import data_process.iso as iso
import data_process.wavelogger as wavelogger import data_process.wavelogger as wavelogger
import automatic_test.openapi as openapi
customtkinter.set_appearance_mode("System") # Modes: "System" (standard), "Dark", "Light" customtkinter.set_appearance_mode("System") # Modes: "System" (standard), "Dark", "Light"
customtkinter.set_default_color_theme("blue") # Themes: "blue" (standard), "green", "dark-blue" customtkinter.set_default_color_theme("blue") # Themes: "blue" (standard), "green", "dark-blue"
customtkinter.set_widget_scaling(1.1) # widget dimensions and text size customtkinter.set_widget_scaling(1.1) # widget dimensions and text size
customtkinter.set_window_scaling(1.1) # window geometry dimensions customtkinter.set_window_scaling(1.1) # window geometry dimensions
setdefaulttimeout(10) setdefaulttimeout(3)
# global vars # global vars
btns_func = { btns_func = {
'start': {'btn': '', 'row': 1, 'text': '开始运行'}, 'start': {'btn': '', 'row': 1, 'text': '开始运行'},
@ -146,7 +147,7 @@ class App(customtkinter.CTk):
self.seg_button.configure(values=["无效功能", "触发急停", "停止运动", "继续运动", "零点位姿", "机器状态", "告警信息"]) self.seg_button.configure(values=["无效功能", "触发急停", "停止运动", "继续运动", "零点位姿", "机器状态", "告警信息"])
self.seg_button.set("无效功能") self.seg_button.set("无效功能")
# self.seg_button.configure(state="disabled") # self.seg_button.configure(state="disabled")
# create progress bar
self.progressbar = customtkinter.CTkProgressBar(self.tabview.tab('Automatic Test')) self.progressbar = customtkinter.CTkProgressBar(self.tabview.tab('Automatic Test'))
self.progressbar.grid(row=5, column=1, padx=5, pady=5, sticky="ew") self.progressbar.grid(row=5, column=1, padx=5, pady=5, sticky="ew")
self.progressbar.configure(mode="determinnate") self.progressbar.configure(mode="determinnate")
@ -206,8 +207,9 @@ class App(customtkinter.CTk):
pass pass
def detect_network(self): def detect_network(self):
current_path = dirname(__file__)
while True: while True:
with open('./automatic_test/templates/heartbeat', 'r', encoding='utf-8') as f_h: with open(f'{current_path}/../assets/templates/heartbeat', 'r', encoding='utf-8') as f_h:
pb_color = 'green' if f_h.read().strip() == '1' else 'red' pb_color = 'green' if f_h.read().strip() == '1' else 'red'
self.progressbar.configure(progress_color=pb_color) self.progressbar.configure(progress_color=pb_color)
sleep(3) sleep(3)
@ -490,8 +492,9 @@ class App(customtkinter.CTk):
if __name__ == "__main__": if __name__ == "__main__":
with open("./automatic_test/templates/heartbeat", "w", encoding='utf-8') as f_h: t_hr = Thread(target=openapi.HmiRequest)
f_h.write('0') t_hr.daemon = True
t_hr.start()
aio = App() aio = App()
aio.net_detect = Thread(target=aio.detect_network) aio.net_detect = Thread(target=aio.detect_network)

View File

@ -3,24 +3,36 @@ import socket
import threading import threading
import selectors import selectors
import time import time
import os
MAX_FRAME_SIZE = 1024 MAX_FRAME_SIZE = 1024
socket.setdefaulttimeout(3)
class HmiRequest(object): class HmiRequest(object):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.c = socket.socket(socket.AF_INET, socket.SOCK_STREAM) while True:
# self.c.connect(('192.168.0.160', 5050)) try:
self.c.connect(('192.168.84.129', 5050)) self.c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.c.setblocking(False) # self.c.connect(('192.168.0.160', 5050))
self.c_msg = [] self.c.connect(('192.168.84.129', 5050))
self.c_xs = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.c.setblocking(False)
# self.c_xs.connect(('192.168.0.160', 6666)) self.c_msg = []
self.c_xs.connect(('192.168.84.129', 6666)) self.c_xs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.c_xs.setblocking(False) # self.c_xs.connect(('192.168.0.160', 6666))
self.c_msg_xs = [] self.c_xs.connect(('192.168.84.129', 6666))
self.c_xs.setblocking(False)
self.c_msg_xs = []
break
except Exception as Err:
current_path = os.path.dirname(__file__)
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_h:
f_h.write('0')
print("Connection failed, will try again after 2 seconds...")
time.sleep(2)
self.t_heartbeat = threading.Thread(target=self.__heartbeat) self.t_heartbeat = threading.Thread(target=self.__heartbeat)
self.t_heartbeat.daemon = True self.t_heartbeat.daemon = True
self.t_heartbeat.start() self.t_heartbeat.start()
@ -54,10 +66,11 @@ class HmiRequest(object):
exit(10) exit(10)
def __heartbeat(self): def __heartbeat(self):
current_path = os.path.dirname(__file__)
while True: while True:
_id = self.excution('controller.heart') _id = self.excution('controller.heart')
_flag = 1 if self.get_from_id(_id) else 0 _flag = 1 if self.get_from_id(_id) else 0
with open("./templates/heartbeat", "w", encoding='utf-8') as f_h: with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_h:
f_h.write(str(_flag)) f_h.write(str(_flag))
time.sleep(10) time.sleep(10)
@ -222,10 +235,12 @@ class HmiRequest(object):
return _id return _id
def excution(self, command, flag=0, **kwargs): def excution(self, command, flag=0, **kwargs):
current_path = os.path.dirname(__file__)
if flag == 0: # for old protocols if flag == 0: # for old protocols
req = None req = None
try: try:
with open(f'./templates/{command}.json', encoding='utf-8', mode='r') as f_json: with open(f'{current_path}/../../assets/templates/{command}.json', encoding='utf-8', mode='r') as f_json:
req = json.load(f_json) req = json.load(f_json)
except: except:
print(f"暂不支持 {command} 功能,或确认该功能存在...") print(f"暂不支持 {command} 功能,或确认该功能存在...")