v0.1.9.3(2024/07/15)
1. [APIs: openapi.py] - иÄodbusl½Ó§°ܱ¨´í³ö½£¬ʹֻ֮Ôautomatic testҳÃÏʾ - 将该文件移动至toplevel,为后面扩展做准备 - 修改heartbeat文件路径,使后续打包的时候更方便 2. [APIs: aio.py]: - 修改heartbeat文件路径,使后续打包的时候更方便 - 修改write2textbox函数的打印逻辑,先判断网络相关
This commit is contained in:
parent
5ed38b4b2a
commit
cdbe1c40c6
@ -480,3 +480,13 @@ v0.1.9.2(2024/07/13)
|
||||
- 整体梳理了trq/trqh的传递路径,现已修正完毕
|
||||
- 减速比rr数据源修改为configs.xlsx
|
||||
4. 在current工程main函数增加 VelSet 100语句
|
||||
|
||||
v0.1.9.3(2024/07/15)
|
||||
1. [APIs: openapi.py]
|
||||
- 修改modbus连接失败报错输出形式,使之只在automatic test页面显示
|
||||
- 将该文件移动至toplevel,为后面扩展做准备
|
||||
- 修改heartbeat文件路径,使后续打包的时候更方便
|
||||
2. [APIs: aio.py]:
|
||||
- 修改heartbeat文件路径,使后续打包的时候更方便
|
||||
- 修改write2textbox函数的打印逻辑,先判断网络相关
|
||||
|
||||
|
@ -9,8 +9,9 @@ from urllib.request import urlopen
|
||||
from socket import setdefaulttimeout
|
||||
from data_process import *
|
||||
from automatic_test import *
|
||||
import openapi
|
||||
|
||||
current_path = dirname(__file__)
|
||||
heartbeat = f'{dirname(__file__)}/../assets/templates/heartbeat'
|
||||
customtkinter.set_appearance_mode("System") # Modes: "System" (standard), "Dark", "Light"
|
||||
customtkinter.set_default_color_theme("blue") # Themes: "blue" (standard), "green", "dark-blue"
|
||||
customtkinter.set_widget_scaling(1.1) # widget dimensions and text size
|
||||
@ -166,7 +167,7 @@ class App(customtkinter.CTk):
|
||||
self.seg_button.configure(state='disabled')
|
||||
# self.tabview.configure(state='disabled')
|
||||
self.textbox.delete(index1='1.0', index2='end')
|
||||
with open(f'{current_path}/../assets/templates/heartbeat', 'r', encoding='utf-8') as f_h:
|
||||
with open(heartbeat, 'r', encoding='utf-8') as f_h:
|
||||
c_state = f_h.read().strip()
|
||||
|
||||
if c_state == '0' and value != '功能切换':
|
||||
@ -181,13 +182,13 @@ class App(customtkinter.CTk):
|
||||
# self.tabview.configure(state='normal')
|
||||
|
||||
def detect_network(self):
|
||||
with open(f"{current_path}/../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
|
||||
with open(heartbeat, "w", encoding='utf-8') as f_hb:
|
||||
f_hb.write('0')
|
||||
self.hr = openapi.HmiRequest(self.write2textbox)
|
||||
self.md = openapi.ModbusRequest(self.write2textbox)
|
||||
|
||||
while True:
|
||||
with open(f'{current_path}/../assets/templates/heartbeat', 'r', encoding='utf-8') as f_hb:
|
||||
with open(heartbeat, 'r', encoding='utf-8') as f_hb:
|
||||
c_state = f_hb.read().strip()
|
||||
pb_color = 'green' if c_state == '1' else 'red'
|
||||
self.progressbar.configure(progress_color=pb_color)
|
||||
@ -333,7 +334,7 @@ class App(customtkinter.CTk):
|
||||
self.textbox.tag_config(tagName=color, foreground=color)
|
||||
tab_name_cur = self.tabview.get()
|
||||
|
||||
if tab_name == tab_name_cur:
|
||||
if tab_name == 'openapi' and tab_name_cur == 'Automatic Test':
|
||||
if wait != 0:
|
||||
self.textbox.insert(index='end', text=text, tags=color)
|
||||
self.textbox.update()
|
||||
@ -347,7 +348,7 @@ class App(customtkinter.CTk):
|
||||
self.textbox.insert(index='end', text=text + '\n', tags=color)
|
||||
self.textbox.update()
|
||||
self.textbox.see('end')
|
||||
elif tab_name == 'openapi' and tab_name_cur == 'Automatic Test':
|
||||
elif tab_name == tab_name_cur:
|
||||
if wait != 0:
|
||||
self.textbox.insert(index='end', text=text, tags=color)
|
||||
self.textbox.update()
|
||||
|
@ -1 +1 @@
|
||||
__all__ = ['openapi', 'btn_functions', 'do_brake', 'do_current']
|
||||
__all__ = ['btn_functions', 'do_brake', 'do_current']
|
@ -11,6 +11,7 @@ from pymodbus.constants import Endian
|
||||
MAX_FRAME_SIZE = 1024
|
||||
setdefaulttimeout(2)
|
||||
current_path = dirname(__file__)
|
||||
heartbeat = f'{current_path}/../assets/templates/heartbeat'
|
||||
|
||||
|
||||
class ModbusRequest(object):
|
||||
@ -27,22 +28,19 @@ class ModbusRequest(object):
|
||||
try:
|
||||
self.c.write_register(40002, 1)
|
||||
except Exception as Err:
|
||||
self.w2t(f"{Err}")
|
||||
self.w2t("无法正常下电,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
self.w2t(f"{Err}\n无法正常下电,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
|
||||
def motor_on(self):
|
||||
try:
|
||||
self.c.write_register(40003, 1)
|
||||
except Exception as Err:
|
||||
self.w2t(f"{Err}")
|
||||
self.w2t("无法正常上电,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
self.w2t(f"{Err}\n无法正常上电,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
|
||||
def trigger_estop(self):
|
||||
try:
|
||||
self.c.write_register(40012, 0)
|
||||
except Exception as Err:
|
||||
self.w2t(f"{Err}")
|
||||
self.w2t("无法触发软急停,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
self.w2t(f"{Err}\n无法触发软急停,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
|
||||
def reset_estop(self):
|
||||
try:
|
||||
@ -54,66 +52,50 @@ class ModbusRequest(object):
|
||||
sleep(0.2)
|
||||
self.c.write_register(40001, 0)
|
||||
except Exception as Err:
|
||||
self.w2t(f"{Err}")
|
||||
self.w2t("无法重置软急停,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
self.w2t(f"{Err}\n无法重置软急停,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
|
||||
def clear_alarm(self):
|
||||
try:
|
||||
self.c.write_register(40000, 1)
|
||||
except Exception as Err:
|
||||
self.w2t(f"{Err}")
|
||||
self.w2t("无法清除告警,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
self.w2t(f"{Err}\n无法清除告警,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
|
||||
def get_cart_vel(self):
|
||||
try:
|
||||
results = self.c.read_holding_registers(40537, 7)
|
||||
print(f"cart vel: {results.registers}")
|
||||
|
||||
except Exception as Err:
|
||||
self.w2t(f"{Err}")
|
||||
self.w2t("无法读取笛卡尔速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
self.w2t(f"{Err}\n无法读取笛卡尔速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
|
||||
def get_jnt_vel(self):
|
||||
try:
|
||||
results = self.c.read_holding_registers(40579, 7)
|
||||
print(f"joint vel: {results.registers}")
|
||||
|
||||
except Exception as Err:
|
||||
self.w2t(f"{Err}")
|
||||
self.w2t("无法读取关节速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
self.w2t(f"{Err}\n无法读取关节速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
|
||||
def get_tcp_vel(self):
|
||||
try:
|
||||
results = self.c.read_holding_registers(40607, 7)
|
||||
print(f"tcp vel: {results.registers}")
|
||||
|
||||
except Exception as Err:
|
||||
self.w2t(f"{Err}")
|
||||
self.w2t("无法读取TCP速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
self.w2t(f"{Err}\n无法读取TCP速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
|
||||
def get_tcp_mag_vel(self):
|
||||
try:
|
||||
results = self.c.read_holding_registers(40621, 1)
|
||||
print(f"tcp mag: {results.registers}")
|
||||
|
||||
except Exception as Err:
|
||||
self.w2t(f"{Err}")
|
||||
self.w2t("无法读取TCP合成速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
self.w2t(f"{Err}\n无法读取TCP合成速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
|
||||
def write_act(self, number):
|
||||
try:
|
||||
self.c.write_register(41000, number)
|
||||
except Exception as Err:
|
||||
self.w2t(f"{Err}")
|
||||
self.w2t("无法发送执行信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
self.w2t(f"{Err}\n无法发送执行信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
|
||||
def read_ready_to_go(self):
|
||||
try:
|
||||
results = self.c.read_holding_registers(41001, 1)
|
||||
return results.registers[0]
|
||||
except Exception as Err:
|
||||
self.w2t(f"{Err}")
|
||||
self.w2t("无法读取准备信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
self.w2t(f"{Err}\n无法读取准备信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
|
||||
def read_scenario_time(self):
|
||||
try:
|
||||
@ -122,15 +104,13 @@ class ModbusRequest(object):
|
||||
result = f"{result.decode_32bit_float():.3f}"
|
||||
return result
|
||||
except Exception as Err:
|
||||
self.w2t(f"{Err}")
|
||||
self.w2t("无法读取准备信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
self.w2t(f"{Err}\n无法读取准备信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
|
||||
def write_stop0(self, number):
|
||||
try:
|
||||
self.c.write_register(41004, number)
|
||||
except Exception as Err:
|
||||
self.w2t(f"{Err}")
|
||||
self.w2t("无法通过IO操作stop0急停,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
self.w2t(f"{Err}\n无法通过IO操作stop0急停,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
|
||||
def write_speed_max(self, speed):
|
||||
try:
|
||||
@ -139,16 +119,14 @@ class ModbusRequest(object):
|
||||
payload = builder.build()
|
||||
self.c.write_registers(41005, payload, skip_encode=True)
|
||||
except Exception as Err:
|
||||
self.w2t(f"{Err}")
|
||||
self.w2t("无法写入速度值,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
self.w2t(f"{Err}\n无法写入速度值,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
|
||||
def read_brake_done(self):
|
||||
try:
|
||||
results = self.c.read_holding_registers(41007, 1)
|
||||
return results.registers[0]
|
||||
except Exception as Err:
|
||||
self.w2t(f"{Err}")
|
||||
self.w2t("无法读取制动已执行信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
self.w2t(f"{Err}\n无法读取制动已执行信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
|
||||
def write_axis(self, axis):
|
||||
try:
|
||||
@ -157,22 +135,19 @@ class ModbusRequest(object):
|
||||
payload = builder.to_registers()
|
||||
self.c.write_registers(41008, payload)
|
||||
except Exception as Err:
|
||||
self.w2t(f"{Err}")
|
||||
self.w2t("无法写入速度值,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
self.w2t(f"{Err}\n无法写入速度值,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
|
||||
def write_probe(self, probe):
|
||||
try:
|
||||
self.c.write_register(41010, probe)
|
||||
except Exception as Err:
|
||||
self.w2t(f"{Err}")
|
||||
self.w2t("无法写入速度探测信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
self.w2t(f"{Err}\n无法写入速度探测信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
|
||||
def write_pon(self, pon): # positive or negative
|
||||
try:
|
||||
self.c.write_register(41011, pon)
|
||||
except Exception as Err:
|
||||
self.w2t(f"{Err}")
|
||||
self.w2t("无法写入正负方向信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
self.w2t(f"{Err}\n无法写入正负方向信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
|
||||
|
||||
class HmiRequest(object):
|
||||
@ -210,7 +185,7 @@ class HmiRequest(object):
|
||||
|
||||
def sock_conn(self):
|
||||
# while True:
|
||||
with open(f"{current_path}/../../assets/templates/heartbeat", "r", encoding='utf-8') as f_hb:
|
||||
with open(heartbeat, "r", encoding='utf-8') as f_hb:
|
||||
c_state = f_hb.read().strip()
|
||||
if c_state == '0':
|
||||
try:
|
||||
@ -224,7 +199,7 @@ class HmiRequest(object):
|
||||
self.c_xs.setblocking(False)
|
||||
|
||||
self.w2t("Connection success", 0, 0, 'green', tab_name=self.tab_name)
|
||||
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
|
||||
with open(heartbeat, "w", encoding='utf-8') as f_hb:
|
||||
f_hb.write('1')
|
||||
md = ModbusRequest(self.w2t)
|
||||
md.reset_estop()
|
||||
@ -234,7 +209,7 @@ class HmiRequest(object):
|
||||
md.write_axis(1)
|
||||
except Exception as Err:
|
||||
self.w2t("Connection failed...", 0, 0, 'red', tab_name=self.tab_name)
|
||||
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
|
||||
with open(heartbeat, "w", encoding='utf-8') as f_hb:
|
||||
f_hb.write('0')
|
||||
|
||||
def header_check(self, index, data):
|
||||
@ -272,12 +247,12 @@ class HmiRequest(object):
|
||||
print(f"hb = {_flag}", end=' ')
|
||||
print(f"len(c_msg) = {len(self.c_msg)}", end=' ')
|
||||
print(f"len(c_msg_xs) = {len(self.c_msg_xs)}", end='\n')
|
||||
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
|
||||
with open(heartbeat, "w", encoding='utf-8') as f_hb:
|
||||
f_hb.write(_flag)
|
||||
if _flag == '0':
|
||||
self.w2t(f"{_id} 心跳丢失,连接失败,重新连接中...", 0, 7, 'red', tab_name=self.tab_name)
|
||||
sleep(1.5)
|
||||
# with open(f"{current_path}/../../assets/templates/c_msg.log", "w", encoding='utf-8') as f:
|
||||
# with open(f"{current_path}/../assets/templates/c_msg.log", "w", encoding='utf-8') as f:
|
||||
# for msg in self.c_msg:
|
||||
# f.write(str(loads(msg)) + '\n')
|
||||
|
||||
@ -590,7 +565,7 @@ class HmiRequest(object):
|
||||
if flg == 0: # for old protocols
|
||||
req = None
|
||||
try:
|
||||
with open(f'{current_path}/../../assets/templates/{command}.json', encoding='utf-8',
|
||||
with open(f'{current_path}/../assets/templates/{command}.json', encoding='utf-8',
|
||||
mode='r') as f_json:
|
||||
req = load(f_json)
|
||||
except:
|
Reference in New Issue
Block a user