diff --git a/aio/README.md b/aio/README.md index b5e70d6..1289907 100644 --- a/aio/README.md +++ b/aio/README.md @@ -480,3 +480,13 @@ v0.1.9.2(2024/07/13) - 整体梳理了trq/trqh的传递路径,现已修正完毕 - 减速比rr数据源修改为configs.xlsx 4. 在current工程main函数增加 VelSet 100语句 + +v0.1.9.3(2024/07/15) +1. [APIs: openapi.py] + - 修改modbus连接失败报错输出形式,使之只在automatic test页面显示 + - 将该文件移动至toplevel,为后面扩展做准备 + - 修改heartbeat文件路径,使后续打包的时候更方便 +2. [APIs: aio.py]: + - 修改heartbeat文件路径,使后续打包的时候更方便 + - 修改write2textbox函数的打印逻辑,先判断网络相关 + diff --git a/aio/code/aio.py b/aio/code/aio.py index 094f2b2..fb6c984 100644 --- a/aio/code/aio.py +++ b/aio/code/aio.py @@ -9,8 +9,9 @@ from urllib.request import urlopen from socket import setdefaulttimeout from data_process import * from automatic_test import * +import openapi -current_path = dirname(__file__) +heartbeat = f'{dirname(__file__)}/../assets/templates/heartbeat' customtkinter.set_appearance_mode("System") # Modes: "System" (standard), "Dark", "Light" customtkinter.set_default_color_theme("blue") # Themes: "blue" (standard), "green", "dark-blue" customtkinter.set_widget_scaling(1.1) # widget dimensions and text size @@ -166,7 +167,7 @@ class App(customtkinter.CTk): self.seg_button.configure(state='disabled') # self.tabview.configure(state='disabled') self.textbox.delete(index1='1.0', index2='end') - with open(f'{current_path}/../assets/templates/heartbeat', 'r', encoding='utf-8') as f_h: + with open(heartbeat, 'r', encoding='utf-8') as f_h: c_state = f_h.read().strip() if c_state == '0' and value != '功能切换': @@ -181,13 +182,13 @@ class App(customtkinter.CTk): # self.tabview.configure(state='normal') def detect_network(self): - with open(f"{current_path}/../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb: + with open(heartbeat, "w", encoding='utf-8') as f_hb: f_hb.write('0') self.hr = openapi.HmiRequest(self.write2textbox) self.md = openapi.ModbusRequest(self.write2textbox) while True: - with open(f'{current_path}/../assets/templates/heartbeat', 'r', encoding='utf-8') as f_hb: + with open(heartbeat, 'r', encoding='utf-8') as f_hb: c_state = f_hb.read().strip() pb_color = 'green' if c_state == '1' else 'red' self.progressbar.configure(progress_color=pb_color) @@ -333,7 +334,7 @@ class App(customtkinter.CTk): self.textbox.tag_config(tagName=color, foreground=color) tab_name_cur = self.tabview.get() - if tab_name == tab_name_cur: + if tab_name == 'openapi' and tab_name_cur == 'Automatic Test': if wait != 0: self.textbox.insert(index='end', text=text, tags=color) self.textbox.update() @@ -347,7 +348,7 @@ class App(customtkinter.CTk): self.textbox.insert(index='end', text=text + '\n', tags=color) self.textbox.update() self.textbox.see('end') - elif tab_name == 'openapi' and tab_name_cur == 'Automatic Test': + elif tab_name == tab_name_cur: if wait != 0: self.textbox.insert(index='end', text=text, tags=color) self.textbox.update() diff --git a/aio/code/automatic_test/__init__.py b/aio/code/automatic_test/__init__.py index f60ee0b..5c1ee4d 100644 --- a/aio/code/automatic_test/__init__.py +++ b/aio/code/automatic_test/__init__.py @@ -1 +1 @@ -__all__ = ['openapi', 'btn_functions', 'do_brake', 'do_current'] \ No newline at end of file +__all__ = ['btn_functions', 'do_brake', 'do_current'] \ No newline at end of file diff --git a/aio/code/automatic_test/openapi.py b/aio/code/openapi.py similarity index 84% rename from aio/code/automatic_test/openapi.py rename to aio/code/openapi.py index f688694..042c4b5 100644 --- a/aio/code/automatic_test/openapi.py +++ b/aio/code/openapi.py @@ -11,6 +11,7 @@ from pymodbus.constants import Endian MAX_FRAME_SIZE = 1024 setdefaulttimeout(2) current_path = dirname(__file__) +heartbeat = f'{current_path}/../assets/templates/heartbeat' class ModbusRequest(object): @@ -27,22 +28,19 @@ class ModbusRequest(object): try: self.c.write_register(40002, 1) except Exception as Err: - self.w2t(f"{Err}") - self.w2t("无法正常下电,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) + self.w2t(f"{Err}\n无法正常下电,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) def motor_on(self): try: self.c.write_register(40003, 1) except Exception as Err: - self.w2t(f"{Err}") - self.w2t("无法正常上电,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) + self.w2t(f"{Err}\n无法正常上电,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) def trigger_estop(self): try: self.c.write_register(40012, 0) except Exception as Err: - self.w2t(f"{Err}") - self.w2t("无法触发软急停,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) + self.w2t(f"{Err}\n无法触发软急停,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) def reset_estop(self): try: @@ -54,66 +52,50 @@ class ModbusRequest(object): sleep(0.2) self.c.write_register(40001, 0) except Exception as Err: - self.w2t(f"{Err}") - self.w2t("无法重置软急停,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) + self.w2t(f"{Err}\n无法重置软急停,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) def clear_alarm(self): try: self.c.write_register(40000, 1) except Exception as Err: - self.w2t(f"{Err}") - self.w2t("无法清除告警,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) + self.w2t(f"{Err}\n无法清除告警,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) def get_cart_vel(self): try: results = self.c.read_holding_registers(40537, 7) - print(f"cart vel: {results.registers}") - except Exception as Err: - self.w2t(f"{Err}") - self.w2t("无法读取笛卡尔速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) + self.w2t(f"{Err}\n无法读取笛卡尔速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) def get_jnt_vel(self): try: results = self.c.read_holding_registers(40579, 7) - print(f"joint vel: {results.registers}") - except Exception as Err: - self.w2t(f"{Err}") - self.w2t("无法读取关节速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) + self.w2t(f"{Err}\n无法读取关节速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) def get_tcp_vel(self): try: results = self.c.read_holding_registers(40607, 7) - print(f"tcp vel: {results.registers}") - except Exception as Err: - self.w2t(f"{Err}") - self.w2t("无法读取TCP速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) + self.w2t(f"{Err}\n无法读取TCP速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) def get_tcp_mag_vel(self): try: results = self.c.read_holding_registers(40621, 1) - print(f"tcp mag: {results.registers}") - except Exception as Err: - self.w2t(f"{Err}") - self.w2t("无法读取TCP合成速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) + self.w2t(f"{Err}\n无法读取TCP合成速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) def write_act(self, number): try: self.c.write_register(41000, number) except Exception as Err: - self.w2t(f"{Err}") - self.w2t("无法发送执行信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) + self.w2t(f"{Err}\n无法发送执行信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) def read_ready_to_go(self): try: results = self.c.read_holding_registers(41001, 1) return results.registers[0] except Exception as Err: - self.w2t(f"{Err}") - self.w2t("无法读取准备信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) + self.w2t(f"{Err}\n无法读取准备信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) def read_scenario_time(self): try: @@ -122,15 +104,13 @@ class ModbusRequest(object): result = f"{result.decode_32bit_float():.3f}" return result except Exception as Err: - self.w2t(f"{Err}") - self.w2t("无法读取准备信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) + self.w2t(f"{Err}\n无法读取准备信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) def write_stop0(self, number): try: self.c.write_register(41004, number) except Exception as Err: - self.w2t(f"{Err}") - self.w2t("无法通过IO操作stop0急停,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) + self.w2t(f"{Err}\n无法通过IO操作stop0急停,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) def write_speed_max(self, speed): try: @@ -139,16 +119,14 @@ class ModbusRequest(object): payload = builder.build() self.c.write_registers(41005, payload, skip_encode=True) except Exception as Err: - self.w2t(f"{Err}") - self.w2t("无法写入速度值,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) + self.w2t(f"{Err}\n无法写入速度值,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) def read_brake_done(self): try: results = self.c.read_holding_registers(41007, 1) return results.registers[0] except Exception as Err: - self.w2t(f"{Err}") - self.w2t("无法读取制动已执行信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) + self.w2t(f"{Err}\n无法读取制动已执行信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) def write_axis(self, axis): try: @@ -157,22 +135,19 @@ class ModbusRequest(object): payload = builder.to_registers() self.c.write_registers(41008, payload) except Exception as Err: - self.w2t(f"{Err}") - self.w2t("无法写入速度值,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) + self.w2t(f"{Err}\n无法写入速度值,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) def write_probe(self, probe): try: self.c.write_register(41010, probe) except Exception as Err: - self.w2t(f"{Err}") - self.w2t("无法写入速度探测信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) + self.w2t(f"{Err}\n无法写入速度探测信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) def write_pon(self, pon): # positive or negative try: self.c.write_register(41011, pon) except Exception as Err: - self.w2t(f"{Err}") - self.w2t("无法写入正负方向信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) + self.w2t(f"{Err}\n无法写入正负方向信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name) class HmiRequest(object): @@ -210,7 +185,7 @@ class HmiRequest(object): def sock_conn(self): # while True: - with open(f"{current_path}/../../assets/templates/heartbeat", "r", encoding='utf-8') as f_hb: + with open(heartbeat, "r", encoding='utf-8') as f_hb: c_state = f_hb.read().strip() if c_state == '0': try: @@ -224,7 +199,7 @@ class HmiRequest(object): self.c_xs.setblocking(False) self.w2t("Connection success", 0, 0, 'green', tab_name=self.tab_name) - with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb: + with open(heartbeat, "w", encoding='utf-8') as f_hb: f_hb.write('1') md = ModbusRequest(self.w2t) md.reset_estop() @@ -234,7 +209,7 @@ class HmiRequest(object): md.write_axis(1) except Exception as Err: self.w2t("Connection failed...", 0, 0, 'red', tab_name=self.tab_name) - with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb: + with open(heartbeat, "w", encoding='utf-8') as f_hb: f_hb.write('0') def header_check(self, index, data): @@ -272,12 +247,12 @@ class HmiRequest(object): print(f"hb = {_flag}", end=' ') print(f"len(c_msg) = {len(self.c_msg)}", end=' ') print(f"len(c_msg_xs) = {len(self.c_msg_xs)}", end='\n') - with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb: + with open(heartbeat, "w", encoding='utf-8') as f_hb: f_hb.write(_flag) if _flag == '0': self.w2t(f"{_id} 心跳丢失,连接失败,重新连接中...", 0, 7, 'red', tab_name=self.tab_name) sleep(1.5) - # with open(f"{current_path}/../../assets/templates/c_msg.log", "w", encoding='utf-8') as f: + # with open(f"{current_path}/../assets/templates/c_msg.log", "w", encoding='utf-8') as f: # for msg in self.c_msg: # f.write(str(loads(msg)) + '\n') @@ -590,7 +565,7 @@ class HmiRequest(object): if flg == 0: # for old protocols req = None try: - with open(f'{current_path}/../../assets/templates/{command}.json', encoding='utf-8', + with open(f'{current_path}/../assets/templates/{command}.json', encoding='utf-8', mode='r') as f_json: req = load(f_json) except: