v0.1.9.1(2024/07/12)
1. [APIs: do_brake.py] - 修改正负方向拍急停的逻辑,基本原理为:运行之前发送正负方向信号pon给RL,RL根据信号以及速度正负号运作 - 由于上述修改,正负方向急停准确率可达100% 2. [APIs: aio.py] - 修改write2textbox的输出逻辑,实现更加灵活的自定义输出,同时修改相关部分 3. [APIs: openapi.py] - modbus类新增指示政府方向急停的信号pon,将modbus类入参中的tab_name删除,并修改tab_name的值为'openapi' - socket类种修改tab_name的值为'openapi'
This commit is contained in:
@ -43,8 +43,7 @@ class App(customtkinter.CTk):
|
||||
self.my_font = customtkinter.CTkFont(family="Consolas", size=16, weight="bold")
|
||||
self.w_param = 84
|
||||
self.hr = None
|
||||
self.md_at = None
|
||||
self.md_dp = None
|
||||
self.md = None
|
||||
# =====================================================================
|
||||
# configure window
|
||||
self.title("AIO - All in one automatic toolbox")
|
||||
@ -71,7 +70,7 @@ class App(customtkinter.CTk):
|
||||
btns_func['log']['btn'].configure(command=lambda: self.thread_it(self.func_log_callback))
|
||||
btns_func['end']['btn'].configure(command=lambda: self.thread_it(self.func_end_callback))
|
||||
# create version info
|
||||
self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.1.9.0\nDate: 07/10/2024", font=self.my_font, text_color="#4F4F4F")
|
||||
self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.1.9.1\nDate: 07/12/2024", font=self.my_font, text_color="#4F4F4F")
|
||||
self.frame_func.rowconfigure(6, weight=1)
|
||||
self.label_version.grid(row=6, column=0, padx=20, pady=20, sticky='s')
|
||||
# =====================================================================
|
||||
@ -175,7 +174,7 @@ class App(customtkinter.CTk):
|
||||
else:
|
||||
for _func in _btn_funcs:
|
||||
if _btn_funcs[_func] == value:
|
||||
btn_functions.main(self.hr, self.md_at, _func, self.write2textbox)
|
||||
btn_functions.main(self.hr, self.md, _func, self.write2textbox)
|
||||
break
|
||||
|
||||
self.seg_button.configure(state='normal')
|
||||
@ -185,8 +184,7 @@ class App(customtkinter.CTk):
|
||||
with open(f"{current_path}/../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
|
||||
f_hb.write('0')
|
||||
self.hr = openapi.HmiRequest(self.write2textbox)
|
||||
self.md_at = openapi.ModbusRequest(self, 'Automatic Test')
|
||||
self.md_dp = openapi.ModbusRequest(self, 'Data Process')
|
||||
self.md = openapi.ModbusRequest(self.write2textbox)
|
||||
|
||||
while True:
|
||||
with open(f'{current_path}/../assets/templates/heartbeat', 'r', encoding='utf-8') as f_hb:
|
||||
@ -194,7 +192,6 @@ class App(customtkinter.CTk):
|
||||
pb_color = 'green' if c_state == '1' else 'red'
|
||||
self.progressbar.configure(progress_color=pb_color)
|
||||
if c_state == '0':
|
||||
# self.textbox.delete(index1='1.0', index2='end')
|
||||
self.hr.t_bool = False
|
||||
sleep(3)
|
||||
del self.hr
|
||||
@ -203,7 +200,6 @@ class App(customtkinter.CTk):
|
||||
|
||||
def tabview_click(self):
|
||||
self.initialization()
|
||||
# self.textbox.delete(index1='1.0', index2='end')
|
||||
|
||||
tab_name = self.tabview.get()
|
||||
if tab_name == 'Data Process':
|
||||
@ -351,7 +347,21 @@ class App(customtkinter.CTk):
|
||||
self.textbox.insert(index='end', text=text + '\n', tags=color)
|
||||
self.textbox.update()
|
||||
self.textbox.see('end')
|
||||
|
||||
elif tab_name == 'openapi' and tab_name_cur == 'Automatic Test':
|
||||
if wait != 0:
|
||||
self.textbox.insert(index='end', text=text, tags=color)
|
||||
self.textbox.update()
|
||||
self.textbox.see('end')
|
||||
elif exitcode != 0:
|
||||
self.textbox.insert(index='end', text=text + '\n', tags=color)
|
||||
self.textbox.update()
|
||||
self.textbox.see('end')
|
||||
raise Exception(f"Error code: {exitcode}")
|
||||
else:
|
||||
self.textbox.insert(index='end', text=text + '\n', tags=color)
|
||||
self.textbox.update()
|
||||
self.textbox.see('end')
|
||||
|
||||
def is_float(self, flag, *args):
|
||||
for item in args:
|
||||
try:
|
||||
@ -459,10 +469,10 @@ class App(customtkinter.CTk):
|
||||
func_dict[flag](path=args[0], w2t=self.write2textbox)
|
||||
elif flag == 5:
|
||||
self.pre_warning()
|
||||
func_dict[flag](path=args[0], hr=self.hr, md=self.md_at, loadsel=args[1], w2t=self.write2textbox)
|
||||
func_dict[flag](path=args[0], hr=self.hr, md=self.md, loadsel=args[1], w2t=self.write2textbox)
|
||||
elif flag == 6:
|
||||
self.pre_warning()
|
||||
func_dict[flag](path=args[0], hr=self.hr, md=self.md_at, loadsel=args[1], w2t=self.write2textbox)
|
||||
func_dict[flag](path=args[0], hr=self.hr, md=self.md, loadsel=args[1], w2t=self.write2textbox)
|
||||
else:
|
||||
tkinter.messagebox.showerror(title="参数错误", message="请检查对应参数是否填写正确!", )
|
||||
|
||||
|
@ -168,6 +168,13 @@ def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
|
||||
]
|
||||
wb = load_workbook(config_file, read_only=True)
|
||||
ws = wb['Target']
|
||||
if ws.cell(row=1, column=1).value == 'positive':
|
||||
md.write_pon(True)
|
||||
elif ws.cell(row=1, column=1).value == 'negative':
|
||||
md.write_pon(False)
|
||||
else:
|
||||
w2t("configs.xlsx中Target页面A1单元格填写不正确,检查后重新运行...", 0, 111, 'red', 'Automatic Test')
|
||||
|
||||
for condition in result_dirs:
|
||||
_reach = condition.split('_')[0].removeprefix('reach')
|
||||
_load = condition.split('_')[1].removeprefix('load')
|
||||
@ -196,17 +203,17 @@ def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
|
||||
ssh.set_missing_host_key_policy(AutoAddPolicy())
|
||||
ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019')
|
||||
if ws.cell(row=1, column=1).value == 'positive':
|
||||
_rl_cmd = f"brake_E(j{axis}_{_reach}_n, j{axis}_{_reach}_p, p_speed, p_tool)"
|
||||
elif ws.cell(row=1, column=1).value == 'negative':
|
||||
_rl_cmd = f"brake_E(j{axis}_{_reach}_p, j{axis}_{_reach}_n, p_speed, p_tool)"
|
||||
elif ws.cell(row=1, column=1).value == 'negative':
|
||||
_rl_cmd = f"brake_E(j{axis}_{_reach}_n, j{axis}_{_reach}_p, p_speed, p_tool)"
|
||||
else:
|
||||
w2t("configs.xlsx中Target页面A1单元格填写不正确,检查后重新运行...", 0, 111, 'red', 'Automatic Test')
|
||||
_rl_speed = f"VelSet {_speed}"
|
||||
cmd = 'cd /home/luoshi/bin/controller/; '
|
||||
cmd += 'sudo sed -i "/brake_E/d" projects/target/_build/brake/main.mod; '
|
||||
cmd += f'sudo sed -i "/DONOTDELETE/i {_rl_cmd}" projects/target/_build/brake/main.mod; '
|
||||
cmd += f'sudo sed -i "/VelSet/d" projects/target/_build/brake/main.mod; '
|
||||
cmd += f'sudo sed -i "/MoveAbsJ/i {_rl_speed}" projects/target/_build/brake/main.mod; '
|
||||
cmd += 'sudo sed -i "/VelSet/d" projects/target/_build/brake/main.mod; '
|
||||
cmd += f'sudo sed -i "/MoveAbsJ/i {_rl_speed}" projects/target/_build/brake/main.mod'
|
||||
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
|
||||
stdin.write('luoshi2019' + '\n')
|
||||
stdin.flush()
|
||||
@ -233,7 +240,7 @@ def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
|
||||
if count == 1:
|
||||
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
|
||||
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
|
||||
sleep(8) # 前八秒获取实际最大速度
|
||||
sleep(10) # 前10秒获取实际最大速度
|
||||
|
||||
md.trigger_estop()
|
||||
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
|
||||
@ -277,7 +284,7 @@ def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
|
||||
|
||||
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
|
||||
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
|
||||
# sleep(randint(3, 6))
|
||||
sleep(randint(3, 6))
|
||||
md.write_probe(True)
|
||||
_t_start = time()
|
||||
while True:
|
||||
@ -288,8 +295,12 @@ def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
|
||||
sleep(1) # 保证所有数据均已返回
|
||||
break
|
||||
else:
|
||||
if (time() - _t_start) // 60 > 1:
|
||||
w2t(f"规定时间内未找到合适的点触发急停,需要确认RL/Python程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test')
|
||||
if (time() - _t_start) > 30:
|
||||
w2t(f"30s内未触发急停,该条数据无效,需要确认RL/Python程序编写正确并正常执行,或者判别是否是机器本体问题...", 0, 0, 'red', 'Automatic Test')
|
||||
md.write_probe(False)
|
||||
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
|
||||
sleep(1) # 保证所有数据均已返回
|
||||
break
|
||||
else:
|
||||
sleep(1)
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
from json import load, dumps, loads
|
||||
from json import load, dumps
|
||||
from socket import socket, setdefaulttimeout, AF_INET, SOCK_STREAM
|
||||
from threading import Thread
|
||||
import selectors
|
||||
@ -14,10 +14,10 @@ current_path = dirname(__file__)
|
||||
|
||||
|
||||
class ModbusRequest(object):
|
||||
def __init__(self, w2t, tab_name):
|
||||
def __init__(self, w2t):
|
||||
super().__init__()
|
||||
self.w2t = w2t
|
||||
self.tab_name = tab_name
|
||||
self.tab_name = 'openapi'
|
||||
self.host = '192.168.0.160'
|
||||
self.port = 502
|
||||
self.c = ModbusTcpClient(self.host, self.port)
|
||||
@ -167,6 +167,13 @@ class ModbusRequest(object):
|
||||
self.w2t(f"{Err}")
|
||||
self.w2t("无法写入速度探测信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
|
||||
def write_pon(self, pon): # positive or negative
|
||||
try:
|
||||
self.c.write_register(41011, pon)
|
||||
except Exception as Err:
|
||||
self.w2t(f"{Err}")
|
||||
self.w2t("无法写入正负方向信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
|
||||
|
||||
|
||||
class HmiRequest(object):
|
||||
def __init__(self, w2t):
|
||||
@ -182,7 +189,7 @@ class HmiRequest(object):
|
||||
self.flag_xs = 0
|
||||
self.response_xs = ''
|
||||
self.t_bool = True
|
||||
self.tab_name = 'Automatic Test'
|
||||
self.tab_name = 'openapi'
|
||||
self.pkg_size = 0
|
||||
self.broke = 0
|
||||
self.half = 0
|
||||
@ -219,7 +226,7 @@ class HmiRequest(object):
|
||||
self.w2t("Connection success", 0, 0, 'green', tab_name=self.tab_name)
|
||||
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
|
||||
f_hb.write('1')
|
||||
md = ModbusRequest(self.w2t, self.tab_name)
|
||||
md = ModbusRequest(self.w2t)
|
||||
md.reset_estop()
|
||||
md.clear_alarm()
|
||||
md.write_act(False)
|
||||
|
Reference in New Issue
Block a user