This repository has been archived on 2025-03-27. You can view files and clone it, but cannot push or open issues or pull requests.
2025-03-10 16:06:27 +08:00

1234 lines
69 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import customtkinter as ctk
from tkinter import messagebox
from tkinter import filedialog
from tkinter import ttk
import tkinter as tk
from urllib import request
from PIL import Image
import webbrowser
import sqlite3
from datetime import datetime
import os
from common import clibs, openapi
from data_process import current, brake, iso, wavelogger
from automatic_test import do_current, do_brake
from durable_docs import factory_test, create_plot
import threading
import re
class App:
def __init__(self):
# ========================================================================
self.root = ctk.CTk()
self.__set_root()
self.__t = datetime.now().strftime("%Y%m%d%H%M%S")
# ========================================================================
self.style = ttk.Style()
self.style.configure("tv.Treeview", font=self.f_treeview, rowheight=25)
self.style.configure("Treeview.Heading", font=self.f_normal, rowheight=30)
self.entry_path_dpv = ctk.StringVar()
self.entry_path_dpv.set("数据文件夹路径")
self.entry_path_atv = ctk.StringVar()
self.entry_path_atv.set("数据文件夹路径")
self.entry_ip_atv = ctk.StringVar()
self.entry_ip_atv.set("192.168.0.160")
self.label_pages_logs = ctk.StringVar()
self.label_pages_logs.set("-.-.-.-.-.-")
self.entry_tips_v = None
self.server_vers = None
self.tv_cols = {1: ["ID", 1], 2: ["time", 160], 3: ["level", 25], 4: ["module", 30], 5: ["content", 700]}
self.chk_box_v = ctk.BooleanVar()
self.chk_box_v.set(False)
self.entry_path_ddv = ctk.StringVar()
self.entry_path_ddv.set("数据文件夹路径")
# ========================================================================
self.frame_left = ctk.CTkFrame(self.root, width=80, corner_radius=0, fg_color="#E9E9E9")
# -------
self.tabview_top = ctk.CTkTabview(self.root, width=900, height=180, fg_color="#E9E9E9", segmented_button_selected_color="#008B8B", segmented_button_selected_hover_color="#2F4F4F", border_width=2, anchor="w", command=self.__switch_tab_top)
self.tabview_top._segmented_button.configure(font=ctk.CTkFont(family="Consolas", size=18, weight="bold"))
self.tabview_top.add("数据处理")
self.tabview_top.add("自动测试")
self.tabview_top.add("耐久记录")
# -------
self.tabview_bottom = ctk.CTkTabview(self.root, fg_color="#E9E9E9", segmented_button_selected_color="#008B8B", segmented_button_selected_hover_color="#2F4F4F", border_width=2, anchor="w", command=self.__switch_tab_bottom)
self.tabview_bottom._segmented_button.configure(font=ctk.CTkFont(family="Consolas", size=18, weight="bold"))
self.tabview_bottom.add("输出")
self.tabview_bottom.add("日志")
# -------
self.frame_status = ctk.CTkFrame(self.root, height=30)
# ========================================================================
self.label_logo = ctk.CTkLabel(self.frame_left, text="Rokae AIO", font=self.f_logo, text_color="#4F4F4F")
self.btn_start = ctk.CTkButton(self.frame_left, text="开始运行", font=self.f_normal, fg_color="#4F4F4F", command=lambda: self.__thread_it(self.__program_start))
self.btn_reset_state = ctk.CTkButton(self.frame_left, text="重置状态", font=self.f_normal, fg_color="#4F4F4F", command=self.__reset_state)
# ========================================================================
self.om_main_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=160, dynamic_resizing=False, values=["brake", "current", "iso", "wavelogger"], font=self.f_common, text_color="#3C3C3C", button_color="#7B6B5B", fg_color="#8D8D8D", button_hover_color="#708090", command=self.__main_switch_dp)
self.om_sub_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=160, dynamic_resizing=False, values=["cycle", "max", "avg"], font=self.f_common, text_color="#3C3C3C", button_color="#7B6B5B", fg_color="#8D8D8D", button_hover_color="#708090")
self.label_path_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=40, anchor="e", text="Path", font=self.f_common)
self.entry_path_dp = ctk.CTkEntry(self.tabview_top.tab("数据处理"), width=80, state="disabled", textvariable=self.entry_path_dpv, font=self.f_entry, text_color="#818181")
self.label_vel_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=40, anchor="e", text="Vel", font=self.f_common)
self.om_vel_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=80, values=["1", "2", "3", "4", "5"], font=self.f_common, button_color="#7B6B5B", fg_color="#8D8D8D", button_hover_color="#708090", text_color_disabled="#808000")
self.label_trq_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=40, anchor="e", text="Trq", font=self.f_common)
self.om_trq_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=80, values=["1", "2", "3", "4", "5"], font=self.f_common, button_color="#7B6B5B", fg_color="#8D8D8D", button_hover_color="#708090", text_color_disabled="#808000")
self.label_trqh_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=40, anchor="e", text="TrqH", font=self.f_common)
self.om_trqh_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=80, values=["1", "2", "3", "4", "5"], font=self.f_common, button_color="#7B6B5B", fg_color="#8D8D8D", button_hover_color="#708090", text_color_disabled="#808000")
self.label_sensor_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=60, anchor="e", text="Sensor", font=self.f_common)
self.om_sensor_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=80, values=["1", "2", "3", "4", "5"], font=self.f_common, button_color="#7B6B5B", fg_color="#8D8D8D", button_hover_color="#708090", text_color_disabled="#808000")
self.label_estop_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=60, anchor="e", text="Estop", font=self.f_common)
self.om_estop_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=80, values=["1", "2", "3", "4", "5"], font=self.f_common, button_color="#7B6B5B", fg_color="#8D8D8D", button_hover_color="#708090", text_color_disabled="#808000")
self.popupmenu_path_dp = tk.Menu(self.entry_path_dp, tearoff=False)
self.popupmenu_path_dp.add_command(label="复制", accelerator="Ctrl+C", font=self.f_treeview, command=lambda: self.__copy_path(self.entry_path_dp))
# ========================================================================
self.om_main_at = ctk.CTkOptionMenu(self.tabview_top.tab("自动测试"), width=160, dynamic_resizing=False, values=["brake", "current"], font=self.f_common, text_color="#3C3C3C", button_color="#7B6B5B", fg_color="#8D8D8D", button_hover_color="#708090")
self.om_sub_at = ctk.CTkOptionMenu(self.tabview_top.tab("自动测试"), width=160, dynamic_resizing=False, values=["tool33", "tool66", "tool100", "inertia"], font=self.f_common, text_color="#3C3C3C", button_color="#7B6B5B", fg_color="#8D8D8D", button_hover_color="#708090")
self.label_ip_at = ctk.CTkLabel(self.tabview_top.tab("自动测试"), anchor="e", text="IP", font=self.f_common)
self.entry_ip_at = ctk.CTkEntry(self.tabview_top.tab("自动测试"), width=160, textvariable=self.entry_ip_atv, font=self.f_entry, text_color="#818181")
self.btn_conn = ctk.CTkButton(self.tabview_top.tab("自动测试"), text="连接", width=60, font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, cursor="hand2")
self.label_path_at = ctk.CTkLabel(self.tabview_top.tab("自动测试"), width=50, anchor="e", text="Path", font=self.f_common)
self.entry_path_at = ctk.CTkEntry(self.tabview_top.tab("自动测试"), width=80, state="disabled", textvariable=self.entry_path_atv, font=self.f_entry, text_color="#818181")
self.frame_top = ctk.CTkFrame(self.tabview_top.tab("自动测试"), width=120, height=10, fg_color="#E9E9E9")
self.btn_robot_info = ctk.CTkButton(self.frame_top, width=100, text="机器信息", font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=self.__robot_info)
self.btn_trig_estop = ctk.CTkButton(self.frame_top, width=100, text="触发急停", font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=self.__trig_estop)
self.btn_reset_estop = ctk.CTkButton(self.frame_top, width=100, text="恢复急停", font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=self.__reset_estop)
self.popupmenu_ip = tk.Menu(self.entry_ip_at, tearoff=False)
self.popupmenu_ip.add_command(label="复制", accelerator="Ctrl+C", font=self.f_treeview, command=lambda: self.__copy(self.entry_ip_at))
self.popupmenu_ip.add_command(label="剪切", accelerator="Ctrl+X", font=self.f_treeview, command=lambda: self.__cut(self.entry_ip_at))
self.popupmenu_ip.add_command(label="粘贴", accelerator="Ctrl+V", font=self.f_treeview, command=lambda: self.__paste(self.entry_ip_at))
self.popupmenu_ip.add_command(label="全选", accelerator="Ctrl+A", font=self.f_treeview, command=lambda: self.__select(self.entry_ip_at))
self.popupmenu_path_at = tk.Menu(self.entry_path_at, tearoff=False)
self.popupmenu_path_at.add_command(label="复制", accelerator="Ctrl+C", font=self.f_treeview, command=lambda: self.__copy_path(self.entry_path_at))
# ========================================================================
self.scrollable_frame = ctk.CTkScrollableFrame(self.tabview_top.tab("耐久记录"), width=360, height=60, label_text="选择曲线", label_font=self.f_segbtn)
self.switch_1 = ctk.CTkSwitch(self.scrollable_frame, text="hw_joint_vel_feedback", font=self.f_treeview, onvalue=True, offvalue=False)
self.switch_2 = ctk.CTkSwitch(self.scrollable_frame, text="device_servo_trq_feedback", font=self.f_treeview, onvalue=True, offvalue=False)
self.label_path_dd = ctk.CTkLabel(self.tabview_top.tab("耐久记录"), width=80, anchor="e", text="指定路径", font=self.f_segbtn_l)
self.entry_path_dd = ctk.CTkEntry(self.tabview_top.tab("耐久记录"), width=80, state="disabled", textvariable=self.entry_path_ddv, font=self.f_entry, text_color="#818181")
self.label_interval = ctk.CTkLabel(self.tabview_top.tab("耐久记录"), text="间隔时间", width=80, font=self.f_segbtn_l)
self.entry_interval = ctk.CTkEntry(self.tabview_top.tab("耐久记录"), placeholder_text="采样间隔时间,默认(最小)300s", font=self.f_entry)
self.btn_plot = ctk.CTkButton(self.tabview_top.tab("耐久记录"), text="绘图", width=80, font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=self.__create_plot)
self.chk_box = ctk.CTkCheckBox(self.tabview_top.tab("耐久记录"), text="全部打开/关闭", checkbox_width=15, checkbox_height=15, corner_radius=0, onvalue=True, offvalue=False, variable=self.chk_box_v, width=20, font=self.f_segbtn_l, command=self.__all_or_none)
self.popupmenu_path_dd = tk.Menu(self.entry_path_dd, tearoff=False)
self.popupmenu_path_dd.add_command(label="复制", accelerator="Ctrl+C", font=self.f_treeview, command=lambda: self.__copy_path(self.entry_path_dd))
# ========================================================================
self.text_output = ctk.CTkTextbox(self.tabview_bottom.tab("输出"), height=10, corner_radius=0, wrap="none", font=self.f_text)
# ========================================================================
self.label_logs = ctk.CTkLabel(self.tabview_bottom.tab("日志"), width=100, font=self.f_pager, textvariable=self.label_pages_logs, text_color="blue", bg_color="#DCDCDC", cursor="hand2")
self.btn_previous = ctk.CTkButton(self.tabview_bottom.tab("日志"), width=60, text="上一页", font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=self.__get_previous_log)
self.btn_realtime = ctk.CTkButton(self.tabview_bottom.tab("日志"), width=60, text="实时", font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=self.__get_realtime_log)
self.btn_next = ctk.CTkButton(self.tabview_bottom.tab("日志"), width=60, text="下一页", font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=self.__get_next_log)
self.btn_load = ctk.CTkButton(self.tabview_bottom.tab("日志"), width=60, text="加载", font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=self.__load_log_db)
self.btn_find = ctk.CTkButton(self.tabview_bottom.tab("日志"), text="查找", width=60, font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, cursor="hand2")
self.entry_keyword = ctk.CTkEntry(self.tabview_bottom.tab("日志"), placeholder_text="[id/level/module] 查找内容", width=10, font=self.f_entry)
self.treeview_logs = ttk.Treeview(self.tabview_bottom.tab("日志"), height=1, columns=("id", "time", "level", "module", "content"), style="tv.Treeview", show="headings")
self.y_scrollbar_logs = tk.Scrollbar(self.tabview_bottom.tab("日志"), width=30, command=self.treeview_logs.yview)
self.popupmenu_kw = tk.Menu(self.entry_keyword, tearoff=False)
self.popupmenu_kw.add_command(label="复制", accelerator="Ctrl+C", font=self.f_treeview, command=lambda: self.__copy(self.entry_keyword))
self.popupmenu_kw.add_command(label="剪切", accelerator="Ctrl+X", font=self.f_treeview, command=lambda: self.__cut(self.entry_keyword))
self.popupmenu_kw.add_command(label="粘贴", accelerator="Ctrl+V", font=self.f_treeview, command=lambda: self.__paste(self.entry_keyword))
self.popupmenu_kw.add_command(label="全选", accelerator="Ctrl+A", font=self.f_treeview, command=lambda: self.__select(self.entry_keyword))
self.popupmenu_output = tk.Menu(self.text_output, tearoff=False)
self.popupmenu_output.add_command(label="复制", accelerator="Ctrl+C", font=self.f_treeview, command=lambda: self.__copy(self.text_output))
self.popupmenu_output.add_command(label="剪切", accelerator="Ctrl+X", font=self.f_treeview, command=lambda: self.__cut(self.text_output))
self.popupmenu_output.add_command(label="粘贴", accelerator="Ctrl+V", font=self.f_treeview, command=lambda: self.__paste(self.text_output))
# ========================================================================
with open(f"{clibs.PREFIX}/files/version/version", mode="r", encoding="utf-8") as f:
local_vers = f.read().strip()
_version, _update = local_vers.split("@")
_text = f" v{_version} Update@{_update}"
self.label_vers = ctk.CTkLabel(self.frame_status, text=_text, bg_color="#C9C9C9", font=self.f_status, anchor="w", text_color="#4F4F4F")
var_tips = ctk.StringVar()
self.label_tips = ctk.CTkLabel(self.frame_status, textvariable=var_tips, compound="left", bg_color="#C9C9C9", font=self.f_status, anchor="e")
self.__auth_and_vercheck()
if local_vers == self.server_vers:
# if True:
image = ctk.CTkImage(Image.open(f"{clibs.PREFIX}/media/updated.png"), size=(16, 16))
var_tips.set(" 当前是最新版本,继续保持! ")
self.label_tips.configure(text_color="#0D8A3D", image=image)
else:
if self.server_vers is None:
return
image = ctk.CTkImage(Image.open(f"{clibs.PREFIX}/media/upgrade.png"), size=(16, 16))
var_tips.set(f" {self.server_vers.split('@')[0]}已经发布,尽快更新至最新版本! ")
self.label_tips.configure(text_color="#D81E06", image=image, cursor="hand2")
self.label_tips.bind("<Button-1>", self.__goto_update)
# ========================================================================
self.__draw()
@staticmethod
def __do_nothing(event):
...
@staticmethod
def __create_plot():
create_plot.main()
def __all_or_none(self):
if self.chk_box_v.get():
for widget in self.scrollable_frame.winfo_children():
widget.select()
else:
for widget in self.scrollable_frame.winfo_children():
widget.deselect()
def __get_curve_names(self):
curves = []
for widget in self.scrollable_frame.winfo_children():
if widget.get():
curves.append(widget.cget("text"))
return curves
def __copy_path(self, widget):
self.root.clipboard_clear()
self.root.clipboard_append(widget.get())
def __detect_network(self):
def func_access(state):
self.btn_robot_info.configure(state=state)
self.btn_trig_estop.configure(state=state)
self.btn_reset_estop.configure(state=state)
while True:
try:
if clibs.c_hr.status:
self.btn_conn.configure(fg_color="#2E8B57")
func_access("normal")
else:
self.btn_conn.configure(fg_color="#979DA2")
func_access("disabled")
except Exception:
self.btn_conn.configure(fg_color="#979DA2")
func_access("disabled")
time.sleep(2)
def __robot_info(self):
def get_robot_info():
self.tabview_bottom.set("输出")
# self.text_output.delete("1.0", "end")
self.__w2t("正在获取机器信息,请稍后...\n\n")
infos = {0: {}, 1: {}, 2: {}, 3: {}, 4: {}, 5: {}, 6: {}, 7: {}, 8: {}, 9: {}, 10: {}, 11: {}, 12: {}, 13: {}, 14: {}, 15: {}, 16: {}, 17: {}, 18: {}, 19: {}, 20: {}}
msg_id, state = clibs.c_hr.execution("controller.get_params")
records = clibs.c_hr.get_from_id(msg_id, state)
for record in records:
if "请求发送成功" not in record[0]:
data, activate = eval(record[0])["data"], []
infos[0]["robot_type"] = data["robot_type"]
infos[1]["controller_type"] = data["controller_type"]
infos[2]["disk_serial_number"] = data["disk_serial_number"]
infos[3]["mac_addr"] = data["mac_addr"]
infos[4]["security_type"] = data["security_type"]
infos[5]["xcore_version"] = data["version"]
auth_states = data["auth_state"]["function"]
for auth_state in auth_states:
if auth_state["auth"] == "activate":
activate.append(auth_state["name"])
infos[6]["activated"] = ", ".join(activate)
msg_id, state = clibs.c_hr.execution("device.get_params")
records = clibs.c_hr.get_from_id(msg_id, state)
for record in records:
if "请求发送成功" not in record[0]:
data = eval(record[0])["data"]["devices"]
for item in data:
if item["type"] == 2:
infos[7]["security_version"] = item["version"]
elif item["type"] == 4:
infos[8]["communication_version"] = item["version"]
elif item["type"] == 6:
infos[9]["motion_control_version"] = item["version"]
elif item["type"] == 9:
infos[10]["end_firmware_version"] = item["version"]
elif item["type"] == 10:
infos[11]["robot_cfg_version"] = item["version"]
elif item["type"] == 11:
infos[12]["env_package_version"] = item["version"]
msg_id, state = clibs.c_hr.execution("state.get_state")
records = clibs.c_hr.get_from_id(msg_id, state)
for record in records:
if "请求发送成功" not in record[0]:
data = eval(record[0])["data"]
infos[13]["rc_state"] = data["rc_state"]
infos[14]["engine"] = data["engine"]
infos[15]["servo_mode"] = data["servo_mode"]
infos[16]["operate"] = data["operate"]
infos[17]["task_space"] = data["task_space"]
infos[18]["robot_action"] = data["robot_action"]
infos[19]["safety_mode"] = data["safety_mode"]
msg_id, state = clibs.c_hr.execution("state.get_tp_mode")
records = clibs.c_hr.get_from_id(msg_id, state)
for record in records:
if "请求发送成功" not in record[0]:
data = eval(record[0])["data"]
infos[20]["tp_mode"] = data["tp_mode"]
for idx in sorted(infos.keys()):
for k, v in infos[idx].items():
self.__w2t(f"{k}: {v}\n")
self.__thread_it(get_robot_info)
def __trig_estop(self):
def trig_estop():
self.tabview_bottom.set("输出")
t = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
clibs.c_md.r_soft_estop(0)
self.__w2t(f"{t} - 触发软急停信号已发送...\n")
except Exception as Err:
clibs.insert_logdb("WARNING", "aio", f"触发软急停失败 - {Err}")
return
try:
clibs.c_hr.execution("diagnosis.open", open=False, display_open=False, overrun=True, turn_area=True, delay_motion=False)
clibs.c_hr.execution("diagnosis.set_params", display_pdo_params=[], frequency=50, version="1.4.1")
except Exception as Err:
clibs.insert_logdb("WARNING", "aio", f"关闭诊断曲线失败 - {Err}")
if clibs.running > 20:
self.__w2t("程序已停止运行,!!执行过程中停止时需要清零后台数据!!:\n- 制动/电机电流测试大约一分钟左右\n- 耐久数据采集需至多等待一个轮次的时间\n\n下次提示出现之后才可重新运行,否则有可能会出现数据错乱的情况!!!\n", "red", "TerminateProgram")
clibs.running = 0
self.__thread_it(trig_estop)
def __reset_estop(self):
def reset_estop():
self.tabview_bottom.set("输出")
t = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
clibs.c_md.r_soft_estop(1)
clibs.c_md.r_clear_alarm()
self.__w2t(f"{t} - 解除软急停信号已发送...\n")
except Exception as Err:
clibs.insert_logdb("WARNING", "aio", f"触发解除软急停失败 - {Err}")
self.__thread_it(reset_estop)
@staticmethod
def __thread_it(func, *args):
""" 将函数打包进线程,必须使用 lambda """
t = threading.Thread(target=func, args=args)
t.daemon = True # 主线程退出就直接让子线程跟随退出,不论是否运行完成。
t.start()
def __set_geometry(self, widget, width, height):
x = self.root.winfo_x()
y = self.root.winfo_y()
w_width = self.root.winfo_width()
w_height = self.root.winfo_height()
x_offset = x+(w_width-width)//2
y_offset = y+(w_height-height)//2
widget.geometry(f"{width}x{height}+{x_offset}+{y_offset}")
def __toplevel_progress(self, title):
toplevel = tk.Toplevel(self.root)
toplevel.title(title)
self.__set_geometry(toplevel, 400, 50)
self.root.attributes("-disabled", 1)
toplevel.protocol("WM_DELETE_WINDOW", print)
toplevel.resizable(False, False)
toplevel.iconbitmap(f"{clibs.PREFIX}/media/icon.ico")
toplevel.transient(self.root)
toplevel.grid_rowconfigure(0, weight=1)
toplevel.grid_columnconfigure(0, weight=1)
pb = ttk.Progressbar(toplevel, length=10, mode="indeterminate", orient="horizontal")
pb.grid(row=0, column=0, sticky="we")
pb.start(10)
while clibs.stop:
time.sleep(0.1)
self.text_output.update()
pb.update()
self.root.update()
else:
self.__w2t("\n")
self.root.attributes("-disabled", 0)
toplevel.destroy()
def __w2t(self, text, color="black", desc=None):
self.text_output.tag_config(tagName=color, foreground=color)
self.text_output.insert(ctk.END, text, tags=color)
self.text_output.see(ctk.END)
if desc:
clibs.running = 0
raise Exception(desc)
@staticmethod
def __is_running(operation):
if clibs.running > 0:
messagebox.showerror(title="处理中", message=f"有程序正在运行,需等待结束后,在执行{operation}操作!")
return "running"
def __program_start(self):
def get_data_dp():
data = {
"_main": self.om_main_dp.get(),
"_sub": self.om_sub_dp.get(),
"_path": self.entry_path_dp.get(),
"_vel": self.om_vel_dp.get(),
"_trq": self.om_trq_dp.get(),
"_trqh": self.om_trqh_dp.get(),
"_estop": self.om_estop_dp.get(),
"_sensor": self.om_sensor_dp.get(),
}
return data
def get_data_at():
data = {
"_main": self.om_main_at.get(),
"_sub": self.om_sub_at.get(),
"_path": self.entry_path_at.get()
}
return data
def get_data_dd():
data = {
"path": self.entry_path_ddv.get(),
"interval": self.entry_interval.get(),
"curves": self.__get_curve_names()
}
return data
def init_op():
if self.__is_running("开始") == "running":
return "running"
self.text_output.delete("1.0", ctk.END)
self.tabview_bottom.set("输出")
clibs.tl_prg = self.__toplevel_progress
if clibs.db_state != "readwrite":
clibs.cursor.close()
clibs.conn.close()
clibs.conn = self.conn
clibs.cursor = self.cursor
clibs.db_state = "readwrite"
self.btn_load.configure(fg_color="#979DA2")
self.__get_realtime_log()
def exec_function():
if init_op() == "running":
return
try:
if self.tabview_top.get() == "数据处理":
clibs.data_dp = get_data_dp()
eval(clibs.data_dp["_main"] + ".main()")
elif self.tabview_top.get() == "自动测试":
clibs.data_at = get_data_at()
eval("do_" + clibs.data_at["_main"] + ".main()")
elif self.tabview_top.get() == "耐久记录":
clibs.data_dd = get_data_dd()
factory_test.main()
except Exception as Err:
self.__w2t(f"程序执行过程中出现异常,{Err}\n", "red")
finally:
clibs.running = 0
clibs.stop = False
exec_function()
def __reset_state(self):
def reset_methods():
self.btn_load.configure(fg_color="#979DA2")
self.__reset_entry_keyword()
self.entry_ip_atv.set("192.168.0.160")
self.entry_path_atv.set("数据文件夹路径")
self.entry_path_atv.set("数据文件夹路径")
try:
clibs.c_hr.close()
clibs.c_md.close()
except Exception:
...
def do_reset():
if self.__is_running("重置") == "running":
return
if clibs.db_state == "readwrite":
res = messagebox.askyesno(title="状态重置", message="这将清空本次所有的输出以及日志记录,且不可恢复,请确认!", default=messagebox.NO, icon=messagebox.WARNING)
if res:
self.text_output.delete("1.0", ctk.END)
try:
clibs.lock.acquire(True)
clibs.cursor.execute("delete from logs")
clibs.cursor.execute("delete from sqlite_sequence") # 主键归零
except Exception:
...
finally:
clibs.lock.release()
self.treeview_logs.delete(*self.treeview_logs.get_children())
self.label_pages_logs.set("-.-.-.-.-.-")
reset_methods()
elif clibs.db_state == "readonly":
res = messagebox.askyesno(title="状态重置", message="这将清空本次所有的输出以及恢复本次的日志记录,请确认!", default=messagebox.NO, icon=messagebox.INFO)
if res:
self.text_output.delete("1.0", ctk.END)
clibs.cursor.close()
clibs.conn.close()
clibs.conn = self.conn
clibs.cursor = self.cursor
clibs.db_state = "readwrite"
self.__get_realtime_log()
reset_methods()
self.__thread_it(do_reset)
@clibs.db_lock
def __get_realtime_log(self):
clibs.cursor.execute("select count(id) from logs")
len_records = clibs.cursor.fetchone()[0]
pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1
self.label_pages_logs.set(f"{pages_all} / {pages_all}")
self.treeview_logs.delete(*self.treeview_logs.get_children())
remainder = len_records % 100 if len_records % 100 != 0 else 100
end = len_records
start = len_records - remainder + 1 if len_records - remainder > 0 else 0
# clibs.cursor.execute("select * from logs order by id desc limit 100")
clibs.cursor.execute(f"select * from logs where id between {start} and {end}")
records = clibs.cursor.fetchall()
for record in records:
self.treeview_logs.insert("", "end", values=record, tags=record[2])
self.treeview_logs.yview_moveto(1)
self.__reset_entry_keyword()
@clibs.db_lock
def __get_previous_log(self):
if self.btn_find.cget("fg_color") == "#979DA2":
try:
row = self.treeview_logs.get_children()
first_id = self.treeview_logs.item(row[0], "values")[0]
end = int(first_id)-1 if int(first_id)-1 > 0 else None
start = int(first_id)-100 if int(first_id)-100 > 0 else 0
if end is not None:
clibs.cursor.execute(f"select * from logs where id between {start} and {end}")
records = clibs.cursor.fetchall()
self.treeview_logs.delete(*self.treeview_logs.get_children())
for record in records:
self.treeview_logs.insert("", "end", values=record, tags=record[2])
self.treeview_logs.yview_moveto(0)
clibs.cursor.execute("select count(id) from logs")
len_records = clibs.cursor.fetchone()[0]
pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1
current_page = int(end) // 100 if int(end) % 100 == 0 else int(end) // 100 + 1
self.label_pages_logs.set(f"{current_page} / {pages_all}")
except Exception:
...
else:
pages_all = self.label_pages_logs.get().split("/")[1].strip()
current_page = int(self.label_pages_logs.get().split("/")[0].strip())
if current_page-1 > 0:
row = self.treeview_logs.get_children()
first_one = list(self.treeview_logs.item(row[0], "values"))
first_one[0] = int(first_one[0])
index_end = clibs.f_records.index(tuple(first_one))
self.treeview_logs.delete(*self.treeview_logs.get_children())
if index_end <= 100:
for record in clibs.f_records[:index_end]:
self.treeview_logs.insert("", "end", values=record, tags=record[2])
self.treeview_logs.yview_moveto(0)
else:
for record in clibs.f_records[index_end-100:index_end]:
self.treeview_logs.insert("", "end", values=record, tags=record[2])
previous_page = current_page - 1
self.label_pages_logs.set(f"{previous_page} / {pages_all}")
def __get_next_log(self):
@clibs.db_lock
def get_next_page():
if self.btn_find.cget("fg_color") == "#979DA2":
try:
clibs.cursor.execute("select count(id) from logs")
len_records = clibs.cursor.fetchone()[0]
row = self.treeview_logs.get_children()
last_id = self.treeview_logs.item(row[-1], "values")[0]
start = int(last_id) + 1
end = int(last_id) + 100
if int(start) <= len_records:
clibs.cursor.execute(f"select * from logs where id between {start} and {end}")
records = clibs.cursor.fetchall()
self.treeview_logs.delete(*self.treeview_logs.get_children())
for record in records:
self.treeview_logs.insert("", "end", values=record, tags=record[2])
self.treeview_logs.yview_moveto(0)
_pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1
_current_page = int(start) // 100 if int(start) % 100 == 0 else int(start) // 100 + 1
self.label_pages_logs.set(f"{_current_page} / {_pages_all}")
except Exception:
...
else:
len_records = len(clibs.f_records)
_pages_all = int(self.label_pages_logs.get().split("/")[1].strip())
_current_page = int(self.label_pages_logs.get().split("/")[0].strip())
if _current_page < _pages_all:
row = self.treeview_logs.get_children()
last_one = list(self.treeview_logs.item(row[-1], "values"))
last_one[0] = int(last_one[0])
index_start = clibs.f_records.index(tuple(last_one))+1
self.treeview_logs.delete(*self.treeview_logs.get_children())
if index_start+100 <= len_records:
for record in clibs.f_records[index_start:index_start+100]:
self.treeview_logs.insert("", "end", values=record, tags=record[2])
self.treeview_logs.yview_moveto(0)
else:
for record in clibs.f_records[index_start:]:
self.treeview_logs.insert("", "end", values=record, tags=record[2])
self.treeview_logs.yview_moveto(0)
_next_page = _current_page + 1
self.label_pages_logs.set(f"{_next_page} / {_pages_all}")
current_page = int(self.label_pages_logs.get().split("/")[0].strip())
pages_all = int(self.label_pages_logs.get().split("/")[1].strip())
if current_page == pages_all:
if self.btn_find.cget("fg_color") == "#979DA2":
self.__get_realtime_log()
return
get_next_page()
def __load_log_db(self):
if self.__is_running("加载") == "running":
return
db_file = filedialog.askopenfilename(title="加载数据库文件", defaultextension=".db", initialdir=f"{clibs.PREFIX}/logs")
if not db_file:
return
self.conn = clibs.conn
self.cursor = clibs.cursor
try:
clibs.conn = sqlite3.connect(f"file:{db_file}?mode=ro", uri=True, isolation_level=None, check_same_thread=False, cached_statements=2048)
clibs.cursor = clibs.conn.cursor()
clibs.cursor.execute("PRAGMA synchronous=normal")
clibs.cursor.execute("PRAGMA temp_store=memory")
clibs.cursor.execute("PRAGMA mmap_size=30000000000")
clibs.cursor.execute("PRAGMA cache_size=200000")
clibs.db_state = "readonly"
self.__get_realtime_log()
self.btn_load.configure(fg_color="#000080")
except Exception as Err:
clibs.insert_logdb("ERROR", "aio", f"load_log_db: 加载数据库失败,需确认 {db_file} 是否是有效数据库文件!\n{Err}")
messagebox.showerror(title="加载数据库失败", message=f"需确认 {db_file} 是否是有效数据库文件!")
def __main_switch_dp(self, event):
widgets = [self.om_sub_dp, self.om_vel_dp, self.om_trq_dp, self.om_trqh_dp, self.om_estop_dp, self.om_sensor_dp]
for widgets in widgets:
widgets.configure(state="disabled")
_func = self.om_main_dp.get()
if _func == "brake":
self.om_vel_dp.configure(state="normal")
self.om_trq_dp.configure(state="normal")
self.om_estop_dp.configure(state="normal")
elif _func == "current":
self.om_sub_dp.configure(state="normal")
self.om_vel_dp.configure(state="normal")
self.om_trq_dp.configure(state="normal")
self.om_trqh_dp.configure(state="normal")
self.om_sensor_dp.configure(state="normal")
elif _func == "iso" or _func == "wavelogger":
...
@staticmethod
def __goto_update(event):
webbrowser.open("https://10.2.20.216:10008/aio.zip")
def __auth_and_vercheck(self):
url_vers = "http://10.2.20.216:10008/version"
try:
self.server_vers = request.urlopen(url_vers, timeout=2).read().decode("utf-8").strip()
except Exception:
messagebox.showwarning(title="退出", message="无法连接至服务器.....")
clibs.cursor.close()
clibs.conn.close()
self.root.destroy()
def __set_root(self):
self.root.title("AIO - All in one automatic toolbox")
self.root.iconbitmap(f"{clibs.PREFIX}/media/icon.ico")
self.root.configure(fg_color="#E9E9E9")
self.root.geometry(f"1120x550+10+10")
self.root.minsize(1120, 550)
# self.root.resizable(False, False)
self.root.protocol("WM_DELETE_WINDOW", self.__close_root)
self.f_logo = ("Segoe Script Bold", 28, "bold")
self.f_normal = ("Consolas", 20, "bold")
self.f_segbtn = ("楷体", 20, "bold")
self.f_segbtn_l = ("楷体", 18, "bold")
self.f_common = ("Consolas", 18, "bold")
self.f_entry = ("Consolas", 16, "bold")
self.f_text = ("仿宋", 16, "normal")
self.f_treeview = ("Consolas", 16, "normal")
self.f_pager = ("Times", 16, "normal")
self.f_toplevel = ("Consolas", 14, "normal")
self.f_status = ("Consolas", 12, "bold")
@clibs.db_lock
def __quit_preparation(self):
os.chdir(clibs.log_path)
disk_conn = sqlite3.connect(f"log_{self.__t}.db", isolation_level=None, check_same_thread=False, cached_statements=256)
disk_cursor = disk_conn.cursor()
if clibs.db_state == "readonly":
clibs.cursor.close()
clibs.conn.close()
clibs.conn = self.conn
clibs.cursor = self.cursor
clibs.db_state = "readwrite"
self.__reset_entry_keyword()
self.btn_load.configure(fg_color="#979DA2")
clibs.conn.backup(target=disk_conn, pages=1, progress=None)
_, logs = clibs.traversal_files(".", self.__w2t)
logs.sort()
while len(logs) > 10:
_ = logs.pop(0)
os.remove(_)
disk_cursor.close()
disk_conn.close()
clibs.cursor.close()
clibs.conn.close()
self.root.destroy()
def __reset_entry_keyword(self):
self.btn_find.configure(fg_color="#979DA2")
self.entry_keyword.delete(0, "end")
self.entry_keyword.configure(placeholder_text="[id/level/module] 查找内容")
self.root.focus_set()
def __close_root(self):
msg = "相关数据可能未保存,正在运行程序时有概率会损坏数据文件,确定要终止程序运行吗?"
res = messagebox.askyesno(title="退出程序", message=msg, default=messagebox.NO, icon=messagebox.WARNING)
if res:
self.__quit_preparation()
def __switch_tab_top(self):
tab_name = self.tabview_top.get()
if tab_name == "数据处理" or tab_name == "自动测试":
self.tabview_top.configure(height=180)
elif tab_name == "耐久记录":
self.tabview_top.configure(height=330)
def __switch_tab_bottom(self):
if self.tabview_bottom.get() == "日志":
self.__get_realtime_log()
elif self.tabview_bottom.get() == "输出":
self.text_output.see(ctk.END)
def __cut(self, widget):
try:
selected = widget.selection_get()
all_text = widget.get()
widget.clipboard_clear()
widget.clipboard_append(selected)
index_start = all_text.find(selected)
index_end = index_start + len(selected)
widget.delete(index_start, index_end)
except Exception:
self.__copy(widget)
widget.delete(ctk.SEL_FIRST, ctk.SEL_LAST)
@staticmethod
def __paste(widget):
try:
copy_text = widget.clipboard_get()
widget.insert("insert", copy_text)
selected = widget.selection_get()
all_text = widget.get()
index_start = all_text.find(selected)
index_end = index_start + len(selected)
widget.delete(index_start, index_end)
except Exception:
...
@staticmethod
def __copy(widget):
try:
widget.clipboard_clear()
copy_text = widget.selection_get()
widget.clipboard_append(copy_text)
except Exception:
...
@staticmethod
def __select(widget):
widget.select_range(0, "end")
def __draw(self):
def select_path(event):
t_name = self.tabview_top.get()
dir_path = filedialog.askdirectory()
if t_name == "数据处理":
c_origin = self.entry_path_dpv.get()
if dir_path:
self.entry_path_dpv.set(dir_path)
else:
self.entry_path_dpv.set(c_origin)
elif t_name == "自动测试":
c_origin = self.entry_path_atv.get()
if dir_path:
self.entry_path_atv.set(dir_path)
else:
self.entry_path_atv.set(c_origin)
elif t_name == "耐久记录":
c_origin = self.entry_path_ddv.get()
if dir_path:
self.entry_path_ddv.set(dir_path)
else:
self.entry_path_ddv.set(c_origin)
def esc_quit_log_tl(event):
widget_toplevel = event.widget
self.root.attributes("-disabled", 0)
widget_toplevel.destroy()
def esc_quit_log_text(event):
widget_toplevel = event.widget.master.master
self.root.attributes("-disabled", 0)
widget_toplevel.destroy()
def double_click(event):
try:
e = event.widget
iid = e.identify("item", event.x, event.y)
_id, _ts, _level, _module, _content = e.item(iid, "values")
toplevel = tk.Toplevel(self.root)
toplevel.title(f"LogID: {_id}")
self.__set_geometry(toplevel, 1000, 400)
toplevel.resizable(False, False)
toplevel.iconbitmap(f"{clibs.PREFIX}/media/icon.ico")
# self.root.after(200, lambda: toplevel.iconbitmap(f"{clibs.PREFIX}/media/icon.ico"))
toplevel.protocol("WM_DELETE_WINDOW", lambda tl=toplevel: close_toplevel(tl))
toplevel.bind("<Escape>", esc_quit_log_tl)
toplevel.transient(self.root)
toplevel.focus()
_text = f"{_level} log in {_module} @ {_ts}"
label_info = ctk.CTkLabel(toplevel, text=_text, width=10, fg_color="#5F9EA0", corner_radius=5, font=self.f_treeview)
text_content = ctk.CTkTextbox(toplevel, width=10, font=self.f_toplevel)
text_content.insert(ctk.END, repr(_content))
text_content.configure(state="disabled")
text_content.bind("<Escape>", esc_quit_log_text)
toplevel.grid_rowconfigure(1, weight=1)
toplevel.columnconfigure(0, weight=1)
label_info.grid(row=0, column=0, padx=10, pady=10, sticky="we")
text_content.grid(row=1, column=0, padx=10, pady=(0, 10), sticky="news")
self.root.attributes("-disabled", 1)
except Exception:
return
@clibs.db_lock
def jump2page():
try:
number = int(self.entry_tips_v)
if number > 0:
start = number * 100 - 99
end = number * 100
clibs.cursor.execute("select count(id) from logs")
len_records = clibs.cursor.fetchone()[0]
if start <= len_records:
clibs.cursor.execute(f"select * from logs where id between {start} and {end}")
records = clibs.cursor.fetchall()
self.treeview_logs.delete(*self.treeview_logs.get_children())
for record in records:
self.treeview_logs.insert("", "end", values=record, tags=record[2])
self.treeview_logs.yview_moveto(0)
pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1
self.label_pages_logs.set(f"{number} / {pages_all}")
except Exception:
...
def close_toplevel(tl):
self.entry_tips_v = None
self.root.attributes("-disabled", 0)
tl.destroy()
def enter_quit(event):
widget_entry = event.widget
widget_toplevel = event.widget.master.master
self.entry_tips_v = widget_entry.get()
self.root.attributes("-disabled", 0)
widget_toplevel.destroy()
jump2page()
def esc_quit_page(event):
widget_toplevel = event.widget
self.entry_tips_v = None
self.root.attributes("-disabled", 0)
widget_toplevel.destroy()
def select_page(event):
self.root.attributes("-disabled", 1)
toplevel = tk.Toplevel(self.root)
toplevel.title("跳转")
self.__set_geometry(toplevel, 500, 150)
toplevel.resizable(False, False)
toplevel.iconbitmap(f"{clibs.PREFIX}/media/icon.ico")
toplevel.protocol("WM_DELETE_WINDOW", lambda tl=toplevel: close_toplevel(tl))
toplevel.bind("<Escape>", esc_quit_page)
toplevel.focus()
label_tips = ctk.CTkLabel(toplevel, width=100, text="页面号码", font=self.f_segbtn)
entry_tips = ctk.CTkEntry(toplevel, width=200, placeholder_text="Fill and press enter", font=self.f_toplevel)
entry_tips.bind("<Return>", enter_quit, add="+")
toplevel.rowconfigure(0, weight=1)
toplevel.columnconfigure([0, 1], weight=1)
label_tips.grid(row=0, column=0, padx=10, pady=10, sticky="w")
entry_tips.grid(row=0, column=1, padx=(0, 10), pady=10, sticky="w")
@clibs.db_lock
def find_log(event):
def find_error(cdt):
clibs.insert_logdb("WARNING", "aio", f"查询条件 [{cdt}] 书写规则错误!")
msg = "可单独查询 ID/Level/Module也可以指定 ID 范围内查询指定的 Levels(多个条件使用冒号分割仅限debug, info, warning, error的其中之一或组合]) 和(或) Modules(多个条件使用冒号分割),且当同时查询 Levels 和 Modules 时Levels 位置在前!\n"
messagebox.showwarning(title="查询条件错误", message=msg)
return
kw = self.entry_keyword.get().strip()
if not kw:
return
match = re.search("^\\[.*]", kw)
if match:
condition = match.group().removeprefix("[").removesuffix("]").strip()
f_text_isnull = kw.removeprefix(match.group()).strip()
f_text = f"%{f_text_isnull}%"
if "-" in condition:
# PreDOs
conditions = condition.strip().split("-")
start = conditions[0].strip()
end = conditions[1].strip()
if not (start.isdigit() and end.isdigit()):
clibs.insert_logdb("WARNING", "aio", f"查询条件 {condition} 书写规则错误:指定 ID 范围参数非数字,需核对!")
messagebox.showwarning(title="查询条件错误", message="指定 ID 范围参数非数字,需核对!")
return
if len(conditions) == 2: # id range only
if not f_text_isnull: # without content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)}")
else: # with content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and content like '{f_text}'")
elif len(conditions) == 3: # with id range, and level or module part
c_unknown = conditions[2].strip()
if ":" in c_unknown: # multi conditions
is_level = True
c_unknowns = c_unknown.split(":")
for c_unknown in c_unknowns:
if c_unknown.strip().upper() not in clibs.levels:
is_level = False
break
if is_level: # levels
levels = tuple(level.strip().upper() for level in c_unknowns)
if not f_text_isnull: # without content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level in {levels}")
else: # with content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level in {levels} and content like '{f_text}'")
else: # modules
modules = tuple(module.strip() for module in c_unknowns)
if not f_text_isnull: # without content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and module in {modules}")
else: # with content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and module in {modules} and content like '{f_text}'")
else: # single condition
if c_unknown.upper() in clibs.levels: # the third one is level
level = c_unknown.upper()
if not f_text_isnull: # without content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level = '{level}'")
else: # with content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level = '{level}' and content like '{f_text}'")
else: # the third one is module
module = c_unknown
if not f_text_isnull: # without content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and module = '{module}'")
else: # with content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and module = '{module}' and content like '{f_text}'")
elif len(conditions) == 4: # with id range, level and module
_levels = conditions[2].strip()
_modules = conditions[3].strip()
if ":" in _levels and ":" in _modules:
levels = tuple(level.strip().upper() for level in _levels.split(":"))
modules = tuple(module.strip() for module in _modules.split(":"))
for level in levels:
if level not in clibs.levels:
find_error(condition)
break
if not f_text_isnull: # without content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level in {levels} and module in {modules}")
else: # with content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level in {levels} and module in {modules} and content like '{f_text}'")
elif ":" in _levels and ":" not in _modules:
levels = tuple(level.strip().upper() for level in _levels.split(":"))
module = _modules
for level in levels:
if level not in clibs.levels:
find_error(condition)
break
if not f_text_isnull: # without content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level in {levels} and module = '{module}'")
else: # with content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level in {levels} and module = '{module}' and content like '{f_text}'")
elif ":" not in _levels and ":" in _modules:
level = _levels.upper()
modules = tuple(module.strip() for module in _modules.split(":"))
if level not in clibs.levels:
find_error(condition)
if not f_text_isnull: # without content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level = '{level}' and module in {modules}")
else: # with content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level = '{level}' and module in {modules} and content like '{f_text}'")
elif ":" not in _levels and ":" not in _modules:
level = _levels.upper()
module = _modules
if level not in clibs.levels:
find_error(condition)
if not f_text_isnull: # without content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level = '{level}' and module = '{module}'")
else: # with content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level = '{level}' and module = '{module}' and content like '{f_text}'")
else:
find_error(condition)
elif condition.isdigit():
if not f_text_isnull:
clibs.cursor.execute(f"select * from logs where id = {int(condition)}")
else:
clibs.cursor.execute(f"select * from logs where id = {int(condition)} and content like '{f_text}'")
elif condition.upper() in clibs.levels:
if not f_text_isnull:
clibs.cursor.execute(f"select * from logs where level = '{condition.upper()}'")
else:
clibs.cursor.execute(f"select * from logs where level = '{condition.upper()}' and content like '{f_text}'")
else: # 模块名
if not f_text_isnull:
clibs.cursor.execute(f"select * from logs where module = '{condition}'")
else:
clibs.cursor.execute(f"select * from logs where module = '{condition}' and content like '{f_text}'")
else:
f_text = f"%{kw}%"
clibs.cursor.execute(f"select * from logs where content like '{f_text}'")
clibs.f_records = clibs.cursor.fetchall()
len_records = len(clibs.f_records)
pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1
self.label_pages_logs.set(f"{pages_all} / {pages_all}")
self.treeview_logs.delete(*self.treeview_logs.get_children())
remainder = len_records % 100
records = clibs.f_records[-1*remainder:]
for record in records:
self.treeview_logs.insert("", "end", values=record, tags=record[2])
self.treeview_logs.yview_moveto(1)
self.btn_find.configure(fg_color="#000080")
def show_popupmenu_kw(event):
self.popupmenu_kw.post(event.x_root, event.y_root)
def show_popupmenu_ip(event):
self.popupmenu_ip.post(event.x_root, event.y_root)
def show_popupmenu_output(event):
self.popupmenu_output.post(event.x_root, event.y_root)
def show_popupmenu_path(event):
if self.tabview_top.get() == "数据处理":
self.popupmenu_path_dp.post(event.x_root, event.y_root)
elif self.tabview_top.get() == "自动测试":
self.popupmenu_path_at.post(event.x_root, event.y_root)
elif self.tabview_top.get() == "耐久记录":
self.popupmenu_path_dd.post(event.x_root, event.y_root)
def conn_change(event):
def conn_or_disconn():
if self.btn_conn.cget("fg_color") == "#979DA2":
self.btn_conn.configure(state="disabled")
clibs.ip_addr = self.entry_ip_atv.get().strip()
ip_pattern = re.compile(r"(([1-9]?\d|1\d\d|2[0-4]\d|25[0-5])\.){3}([1-9]?\d|1\d\d|2[0-4]\d|25[0-5])")
if not ip_pattern.fullmatch(clibs.ip_addr):
messagebox.showerror(title="非法地址", message=f"{clibs.ip_addr} 不是一个有效的 IP 地址")
clibs.insert_logdb("ERROR", "aio", f"connection: {clibs.ip_addr} 不是一个有效的 IP 地址")
return
try:
clibs.c_md = openapi.ModbusRequest(clibs.ip_addr, clibs.modbus_port)
clibs.c_hr = openapi.HmiRequest(clibs.ip_addr, clibs.socket_port, clibs.xService_port)
clibs.c_pd = openapi.PreDos(clibs.ip_addr, clibs.ssh_port, clibs.username, clibs.password)
self.btn_conn.configure(state="normal", fg_color="#2E8B57")
except Exception:
self.btn_conn.configure(state="normal", fg_color="#979DA2")
elif self.btn_conn.cget("fg_color") == "#2E8B57":
self.btn_conn.configure(state="disabled")
try:
clibs.c_md.close()
clibs.c_hr.close()
except Exception:
...
finally:
self.btn_conn.configure(state="normal", fg_color="#979DA2")
self.__thread_it(conn_or_disconn)
def pending_a_while(event):
def pending():
self.btn_start.configure(state="disabled")
time.sleep(2)
self.btn_start.configure(state="normal")
self.__thread_it(pending)
def change_sub_at(event):
def detect_change():
if self.om_main_at.get() == "current":
self.om_sub_at.set("tool100")
self.root.after(200, detect_change)
# ========================================================================
self.root.rowconfigure(0, weight=1)
self.root.rowconfigure(1, weight=99)
self.root.columnconfigure(0, weight=1)
self.root.columnconfigure(1, weight=99)
self.frame_left.grid(row=0, column=0, rowspan=2, sticky="news")
self.tabview_top.grid(row=0, column=1, padx=10, pady=5, sticky="news")
self.tabview_bottom.grid(row=1, column=1, padx=10, pady=(0, 10), sticky="news")
self.frame_status.grid(row=2, column=0, columnspan=2, sticky="news")
# ========================================================================
self.frame_left.rowconfigure(list(range(30)), weight=1)
self.frame_left.columnconfigure(0, weight=1)
self.label_logo.grid(row=0, column=0, sticky="new", padx=20, pady=(20, 40))
self.btn_start.grid(row=1, column=0, sticky="new", padx=15, pady=1, ipady=4)
self.btn_reset_state.grid(row=2, column=0, sticky="new", padx=15, pady=(0, 1), ipady=4)
self.btn_start.bind("<Button-1>", pending_a_while)
# ========================================================================
# self.tabview_top.tab("数据处理").grid_rowconfigure([0, 1], weight=1)
self.tabview_top.tab("数据处理").grid_columnconfigure(11, weight=1)
self.om_main_dp.grid(row=0, column=0, padx=10)
self.om_sub_dp.grid(row=1, column=0, padx=10, pady=(0, 10))
self.label_path_dp.grid(row=0, column=1, padx=(0, 5), pady=10)
self.entry_path_dp.grid(row=0, column=2, columnspan=10, padx=(0, 10), pady=10, sticky="we")
self.label_vel_dp.grid(row=1, column=1, padx=(0, 5), pady=(0, 10))
self.om_vel_dp.grid(row=1, column=2, padx=(0, 5), pady=(0, 10))
self.label_trq_dp.grid(row=1, column=3, padx=(0, 5), pady=(0, 10))
self.om_trq_dp.grid(row=1, column=4, padx=(0, 5), pady=(0, 10))
self.label_trqh_dp.grid(row=1, column=5, padx=(0, 5), pady=(0, 10))
self.om_trqh_dp.grid(row=1, column=6, padx=(0, 5), pady=(0, 10))
self.label_estop_dp.grid(row=1, column=7, padx=(0, 5), pady=(0, 10))
self.om_estop_dp.grid(row=1, column=8, padx=(0, 5), pady=(0, 10))
self.label_sensor_dp.grid(row=1, column=9, padx=(0, 5), pady=(0, 10))
self.om_sensor_dp.grid(row=1, column=10, padx=(0, 10), pady=(0, 10))
self.entry_path_dp.bind("<Button-1>", select_path)
self.entry_path_dp.bind("<Button-3>", show_popupmenu_path, add="+")
self.om_vel_dp.set("1")
self.om_trq_dp.set("2")
self.om_trqh_dp.set("2")
self.om_estop_dp.set("3")
self.om_sensor_dp.set("3")
# ========================================================================
# self.tabview_top.tab("自动测试").grid_rowconfigure([0, 1, 2], weight=1)
self.tabview_top.tab("自动测试").grid_columnconfigure(4, weight=1)
self.om_main_at.grid(row=0, column=0, padx=10, pady=10, sticky="w")
self.om_sub_at.grid(row=1, column=0, padx=10, pady=(0, 10), sticky="w")
self.label_ip_at.grid(row=0, column=1, padx=(0, 10), pady=10, sticky="e")
self.entry_ip_at.grid(row=0, column=2, padx=(0, 10), pady=10)
self.btn_conn.grid(row=0, column=3, padx=(0, 10), pady=10)
self.label_path_at.grid(row=1, column=1, padx=(0, 10), pady=(0, 10), sticky="e")
self.entry_path_at.grid(row=1, column=2, columnspan=3, padx=(0, 10), pady=(0, 10), sticky="we")
self.frame_top.grid(row=2, column=0, columnspan=5, padx=0, pady=0, sticky="we")
self.btn_robot_info.grid(row=0, column=0, padx=10, pady=0)
self.btn_trig_estop.grid(row=0, column=1, padx=(0, 10), pady=0)
self.btn_reset_estop.grid(row=0, column=2, padx=(0, 10), pady=0)
self.om_main_at.bind("<Button-1>", change_sub_at)
self.entry_path_at.bind("<Button-1>", select_path)
self.entry_path_at.bind("<Button-3>", show_popupmenu_path, add="+")
self.entry_ip_at.bind("<Button-3>", show_popupmenu_ip, add="+")
self.btn_conn.bind("<Button-1>", conn_change)
self.btn_conn.bind("<Double-1>", self.__do_nothing, add="+")
# ========================================================================
self.tabview_top.tab("耐久记录").grid_columnconfigure(2, weight=1)
self.scrollable_frame.grid(row=0, column=0, rowspan=30, padx=10, pady=0)
self.switch_1.grid(row=0, column=0, padx=10, sticky="w")
self.switch_2.grid(row=1, column=0, padx=10, sticky="w")
self.label_path_dd.grid(row=0, column=1, padx=10, pady=0, sticky="w")
self.entry_path_dd.grid(row=0, column=2, padx=(0, 10), pady=0, sticky="we")
self.label_interval.grid(row=1, column=1, padx=10, pady=0, sticky="w")
self.entry_interval.grid(row=1, column=2, padx=(0, 10), pady=0, sticky="we")
self.btn_plot.grid(row=2, column=1, padx=10, pady=(5, 0), sticky="w")
self.chk_box.grid(row=2, column=2, padx=(0, 10), pady=(5, 0), sticky="e")
self.entry_path_dd.bind("<Button-1>", select_path)
self.entry_path_dd.bind("<Button-3>", show_popupmenu_path, add="+")
# ========================================================================
self.tabview_bottom.tab("输出").grid_rowconfigure(0, weight=1)
self.tabview_bottom.tab("输出").grid_columnconfigure(0, weight=1)
self.text_output.grid(row=0, column=0, sticky="news")
self.text_output.bind("<Button-3>", show_popupmenu_output, add="+")
# ========================================================================
self.tabview_bottom.tab("日志").grid_rowconfigure(0, weight=1)
self.tabview_bottom.tab("日志").grid_columnconfigure(6, weight=1)
self.treeview_logs.grid(row=0, column=0, columnspan=7, padx=(10, 0), pady=10, sticky="news")
self.y_scrollbar_logs.grid(row=0, column=7, pady=10, sticky="ns")
self.label_logs.grid(row=1, column=0, padx=(0, 10), pady=5, sticky="we")
self.btn_previous.grid(row=1, column=1, padx=(0, 10), pady=5, sticky="we")
self.btn_realtime.grid(row=1, column=2, padx=(0, 10), pady=5, sticky="we")
self.btn_next.grid(row=1, column=3, padx=(0, 10), pady=5, sticky="we")
self.btn_load.grid(row=1, column=4, padx=(0, 10), pady=5, sticky="we")
self.btn_find.grid(row=1, column=5, padx=(0, 10), pady=5, sticky="we")
self.entry_keyword.grid(row=1, column=6, columnspan=2, padx=0, pady=5, sticky="we")
self.treeview_logs.configure(yscrollcommand=self.y_scrollbar_logs.set)
self.treeview_logs.tag_configure("DEBUG", background="#708090")
self.treeview_logs.tag_configure("INFO", background="#D8F9D2") # #43CD80 F5F5F5
self.treeview_logs.tag_configure("WARNING", background="#EEE8AA")
self.treeview_logs.tag_configure("ERROR", background="#CD5C5C")
for k, v in self.tv_cols.items():
self.treeview_logs.heading(f"#{k}", text=v[0].title(), anchor="w")
self.treeview_logs.column(f"#{k}", anchor="w", width=v[1])
self.treeview_logs.bind("<Double-1>", double_click)
self.label_logs.bind("<Button-1>", select_page)
self.btn_find.bind("<Button-1>", find_log)
self.entry_keyword.bind("<Return>", find_log)
self.entry_keyword.bind("<Button-3>", show_popupmenu_kw, add="+")
# ========================================================================
self.frame_status.rowconfigure(0, weight=1)
self.frame_status.columnconfigure([0, 1], weight=1)
self.label_vers.grid(row=0, column=0, sticky="news")
self.label_tips.grid(row=0, column=1, sticky="news")
self.om_sub_dp.configure(state="disabled")
self.om_trqh_dp.configure(state="disabled")
self.om_sensor_dp.configure(state="disabled")
# ========================================================================
clibs.w2t = self.__w2t
clibs.insert_logdb("INFO", "aio", "AIO starts running......")
def show(self):
if self.server_vers:
# if True:
self.__thread_it(self.__detect_network)
self.root.mainloop()
if __name__ == "__main__":
aio = App()
aio.show()