998 lines
56 KiB
Python
998 lines
56 KiB
Python
# from tkinter import colorchooser
|
||
# from tkinter import font
|
||
|
||
import time
|
||
import customtkinter as ctk
|
||
from tkinter import messagebox
|
||
from tkinter import filedialog
|
||
from tkinter import ttk
|
||
import tkinter as tk
|
||
from urllib import request
|
||
from PIL import Image
|
||
import webbrowser
|
||
import sqlite3
|
||
from datetime import datetime
|
||
import os
|
||
from common import clibs, openapi
|
||
from data_process import current, brake, iso, wavelogger
|
||
import threading
|
||
import re
|
||
|
||
|
||
class App:
|
||
def __init__(self):
|
||
# ========================================================================
|
||
self.root = ctk.CTk()
|
||
self.__set_root()
|
||
# ========================================================================
|
||
self.style = ttk.Style()
|
||
self.style.configure("tv.Treeview", font=self.f_treeview, rowheight=30)
|
||
self.style.configure("Treeview.Heading", font=self.f_normal, rowheight=30)
|
||
self.entry_path_dpv = ctk.StringVar()
|
||
self.entry_path_dpv.set("数据文件夹路径")
|
||
self.entry_path_atv = ctk.StringVar()
|
||
self.entry_path_atv.set("数据文件夹路径")
|
||
self.entry_ip_atv = ctk.StringVar()
|
||
self.entry_ip_atv.set("192.168.0.160")
|
||
self.label_pages_logs = ctk.StringVar()
|
||
self.label_pages_logs.set("-.-.-.-.-.-")
|
||
self.entry_tips_v = None
|
||
self.server_vers = None
|
||
self.tv_cols = {1: ["ID", 1], 2: ["time", 160], 3: ["level", 25], 4: ["module", 30], 5: ["content", 700]}
|
||
|
||
# ========================================================================
|
||
self.frame_left = ctk.CTkFrame(self.root, width=80, corner_radius=0, fg_color="#E9E9E9")
|
||
# -------
|
||
self.tabview_top = ctk.CTkTabview(self.root, width=900, height=180, fg_color="#E9E9E9", segmented_button_selected_color="#008B8B", segmented_button_selected_hover_color="#2F4F4F", border_width=2, anchor="w", command=self.__switch_tab_top)
|
||
self.tabview_top.add("数据处理")
|
||
self.tabview_top.add("自动测试")
|
||
# -------
|
||
self.tabview_bottom = ctk.CTkTabview(self.root, fg_color="#E9E9E9", segmented_button_selected_color="#008B8B", segmented_button_selected_hover_color="#2F4F4F", border_width=2, anchor="w", command=self.__switch_tab_bottom)
|
||
self.tabview_bottom.add("输出")
|
||
self.tabview_bottom.add("日志")
|
||
# -------
|
||
self.frame_status = ctk.CTkFrame(self.root, height=30)
|
||
# ========================================================================
|
||
self.label_logo = ctk.CTkLabel(self.frame_left, text="Rokae AIO", font=self.f_logo, text_color="#4F4F4F")
|
||
self.btn_start = ctk.CTkButton(self.frame_left, text="开始运行", font=self.f_normal, fg_color="#4F4F4F", command=lambda: self.__thread_it(self.__program_start))
|
||
self.btn_reset_state = ctk.CTkButton(self.frame_left, text="重置状态", font=self.f_normal, fg_color="#4F4F4F", command=self.__reset_state)
|
||
# ========================================================================
|
||
self.om_main_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=160, dynamic_resizing=False, values=["brake", "current", "iso", "wavelogger"], font=self.f_common, text_color="#3C3C3C", button_color="#7B6B5B", fg_color="#8D8D8D", command=self.__main_switch_dp)
|
||
self.om_sub_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=160, dynamic_resizing=False, values=["cycle", "max", "avg"], font=self.f_common, text_color="#3C3C3C", button_color="#7B6B5B", fg_color="#8D8D8D")
|
||
self.label_path_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=40, anchor="e", text="Path", font=self.f_common)
|
||
self.entry_path_dp = ctk.CTkEntry(self.tabview_top.tab("数据处理"), width=80, state="disabled", textvariable=self.entry_path_dpv, font=self.f_entry, text_color="#818181")
|
||
self.label_vel_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=40, anchor="e", text="Vel", font=self.f_common)
|
||
self.om_vel_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=80, values=["1", "2", "3", "4", "5"], font=self.f_common, button_color="#7B6B5B", fg_color="#8D8D8D", text_color_disabled="#808000")
|
||
self.label_trq_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=40, anchor="e", text="Trq", font=self.f_common)
|
||
self.om_trq_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=80, values=["1", "2", "3", "4", "5"], font=self.f_common, button_color="#7B6B5B", fg_color="#8D8D8D", text_color_disabled="#808000")
|
||
self.label_trqh_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=40, anchor="e", text="TrqH", font=self.f_common)
|
||
self.om_trqh_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=80, values=["1", "2", "3", "4", "5"], font=self.f_common, button_color="#7B6B5B", fg_color="#8D8D8D", text_color_disabled="#808000")
|
||
self.label_sensor_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=60, anchor="e", text="Sensor", font=self.f_common)
|
||
self.om_sensor_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=80, values=["1", "2", "3", "4", "5"], font=self.f_common, button_color="#7B6B5B", fg_color="#8D8D8D", text_color_disabled="#808000")
|
||
self.label_estop_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=60, anchor="e", text="Estop", font=self.f_common)
|
||
self.om_estop_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=80, values=["1", "2", "3", "4", "5"], font=self.f_common, button_color="#7B6B5B", fg_color="#8D8D8D", text_color_disabled="#808000")
|
||
# ========================================================================
|
||
self.om_main_at = ctk.CTkOptionMenu(self.tabview_top.tab("自动测试"), width=160, dynamic_resizing=False, values=["brake", "current"], font=self.f_common, text_color="#3C3C3C", button_color="#7B6B5B", fg_color="#8D8D8D")
|
||
self.om_sub_at = ctk.CTkOptionMenu(self.tabview_top.tab("自动测试"), width=160, dynamic_resizing=False, values=["tool33", "tool66", "tool100", "inertia"], font=self.f_common, text_color="#3C3C3C", button_color="#7B6B5B", fg_color="#8D8D8D")
|
||
self.label_ip_at = ctk.CTkLabel(self.tabview_top.tab("自动测试"), anchor="e", text="IP", font=self.f_common)
|
||
self.entry_ip_at = ctk.CTkEntry(self.tabview_top.tab("自动测试"), width=160, textvariable=self.entry_ip_atv, font=self.f_entry, text_color="#818181")
|
||
self.btn_conn = ctk.CTkButton(self.tabview_top.tab("自动测试"), text="连接", width=60, font=self.f_segbtn, fg_color="#979DA2", corner_radius=0)
|
||
# self.progressbar_at = ctk.CTkProgressBar(self.tabview_top.tab("自动测试"), width=160, mode="indeterminate")
|
||
self.label_path_at = ctk.CTkLabel(self.tabview_top.tab("自动测试"), width=50, anchor="e", text="Path", font=self.f_common)
|
||
self.entry_path_at = ctk.CTkEntry(self.tabview_top.tab("自动测试"), width=80, state="disabled", textvariable=self.entry_path_atv, font=self.f_entry, text_color="#818181")
|
||
self.frame_top = ctk.CTkFrame(self.tabview_top.tab("自动测试"), width=120, height=10, fg_color="#E9E9E9")
|
||
self.btn_robot_state = ctk.CTkButton(self.frame_top, width=100, text="机器信息", font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=self.__robot_info)
|
||
self.btn_robot_init = ctk.CTkButton(self.frame_top, width=100, text="初始操作", font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=self.__robot_init)
|
||
self.btn_trig_estop = ctk.CTkButton(self.frame_top, width=100, text="触发急停", font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=self.__trig_estop)
|
||
self.btn_reset_estop = ctk.CTkButton(self.frame_top, width=100, text="恢复急停", font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=self.__reset_estop)
|
||
self.popupmenu_ip = tk.Menu(self.entry_ip_at, tearoff=False)
|
||
self.popupmenu_ip.add_command(label="复制", accelerator="Ctrl+C", font=self.f_treeview, command=lambda: self.__copy(self.entry_ip_at))
|
||
self.popupmenu_ip.add_command(label="剪切", accelerator="Ctrl+X", font=self.f_treeview, command=lambda: self.__cut(self.entry_ip_at))
|
||
self.popupmenu_ip.add_command(label="粘贴", accelerator="Ctrl+V", font=self.f_treeview, command=lambda: self.__paste(self.entry_ip_at))
|
||
self.popupmenu_ip.add_command(label="全选", accelerator="Ctrl+A", font=self.f_treeview, command=lambda: self.__select(self.entry_ip_at))
|
||
# ========================================================================
|
||
self.text_output = ctk.CTkTextbox(self.tabview_bottom.tab("输出"), height=10, corner_radius=0, wrap="none", font=self.f_text)
|
||
# ========================================================================
|
||
self.label_logs = ctk.CTkLabel(self.tabview_bottom.tab("日志"), width=80, font=self.f_pager, textvariable=self.label_pages_logs, text_color="blue", bg_color="#DCDCDC")
|
||
self.btn_previous = ctk.CTkButton(self.tabview_bottom.tab("日志"), width=60, text="上一页", font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=self.__get_previous_log)
|
||
self.btn_realtime = ctk.CTkButton(self.tabview_bottom.tab("日志"), width=60, text="实时", font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=self.__get_realtime_log)
|
||
self.btn_next = ctk.CTkButton(self.tabview_bottom.tab("日志"), width=60, text="下一页", font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=self.__get_next_log)
|
||
self.btn_load = ctk.CTkButton(self.tabview_bottom.tab("日志"), width=60, text="加载", font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=self.__load_log_db)
|
||
self.btn_find = ctk.CTkButton(self.tabview_bottom.tab("日志"), text="查找", width=60, font=self.f_segbtn, fg_color="#979DA2", corner_radius=0)
|
||
self.entry_keyword = ctk.CTkEntry(self.tabview_bottom.tab("日志"), placeholder_text="[id/level/module] 查找内容", width=60, font=self.f_entry)
|
||
self.treeview_logs = ttk.Treeview(self.tabview_bottom.tab("日志"), height=1, columns=("id", "time", "level", "module", "content"), style="tv.Treeview", show="headings")
|
||
self.y_scrollbar_logs = tk.Scrollbar(self.tabview_bottom.tab("日志"), width=30, command=self.treeview_logs.yview)
|
||
self.popupmenu_kw = tk.Menu(self.entry_keyword, tearoff=False)
|
||
self.popupmenu_kw.add_command(label="复制", accelerator="Ctrl+C", font=self.f_treeview, command=lambda: self.__copy(self.entry_keyword))
|
||
self.popupmenu_kw.add_command(label="剪切", accelerator="Ctrl+X", font=self.f_treeview, command=lambda: self.__cut(self.entry_keyword))
|
||
self.popupmenu_kw.add_command(label="粘贴", accelerator="Ctrl+V", font=self.f_treeview, command=lambda: self.__paste(self.entry_keyword))
|
||
self.popupmenu_kw.add_command(label="全选", accelerator="Ctrl+A", font=self.f_treeview, command=lambda: self.__select(self.entry_keyword))
|
||
# ========================================================================
|
||
with open(f"{clibs.PREFIX}/files/version/vers", mode="r", encoding="utf-8") as f:
|
||
local_vers = f.read().strip()
|
||
_version, _update = local_vers.split("@")
|
||
_text = f" v{_version} Update@{_update}"
|
||
self.label_vers = ctk.CTkLabel(self.frame_status, text=_text, bg_color="#C9C9C9", font=self.f_status, anchor="w", text_color="#4F4F4F")
|
||
|
||
var_tips = ctk.StringVar()
|
||
self.label_tips = ctk.CTkLabel(self.frame_status, textvariable=var_tips, compound="left", bg_color="#C9C9C9", font=self.f_status, anchor="e")
|
||
self.__auth_and_vercheck()
|
||
if local_vers == self.server_vers:
|
||
image = ctk.CTkImage(Image.open(f"{clibs.PREFIX}/media/updated.png"), size=(16, 16))
|
||
var_tips.set(" 当前是最新版本,继续保持! ")
|
||
self.label_tips.configure(text_color="#0D8A3D", image=image)
|
||
else:
|
||
if self.server_vers is None:
|
||
return
|
||
image = ctk.CTkImage(Image.open(f"{clibs.PREFIX}/media/upgrade.png"), size=(16, 16))
|
||
var_tips.set(f" {self.server_vers.split("@")[0]}已经发布,尽快更新至最新版本! ")
|
||
self.label_tips.configure(text_color="#D81E06", image=image, cursor="hand2")
|
||
self.label_tips.bind("<Button-1>", self.__goto_update)
|
||
|
||
# ========================================================================
|
||
self.__draw()
|
||
|
||
def __detect_network(self):
|
||
while True:
|
||
try:
|
||
if clibs.c_hr.status:
|
||
self.btn_conn.configure(fg_color="#2E8B57")
|
||
else:
|
||
self.btn_conn.configure(fg_color="#979DA2")
|
||
except Exception as Err:
|
||
self.btn_conn.configure(fg_color="#979DA2")
|
||
time.sleep(3)
|
||
|
||
def __robot_info(self):
|
||
...
|
||
|
||
def __robot_init(self):
|
||
...
|
||
|
||
def __trig_estop(self):
|
||
...
|
||
|
||
def __reset_estop(self):
|
||
...
|
||
|
||
def __thread_it(self, func, *args):
|
||
""" 将函数打包进线程,必须使用 lambda """
|
||
self.myThread = threading.Thread(target=func, args=args)
|
||
self.myThread.daemon = True # 主线程退出就直接让子线程跟随退出,不论是否运行完成。
|
||
self.myThread.start()
|
||
|
||
def __set_geometry(self, widget, width, height):
|
||
x = self.root.winfo_x()
|
||
y = self.root.winfo_y()
|
||
w_width = self.root.winfo_width()
|
||
w_height = self.root.winfo_height()
|
||
x_offset = x+(w_width-width)//2
|
||
y_offset = y+(w_height-height)//2
|
||
widget.geometry(f"{width}x{height}+{x_offset}+{y_offset}")
|
||
|
||
def __toplevel_progress(self, title):
|
||
toplevel = tk.Toplevel(self.root)
|
||
toplevel.title(title)
|
||
self.__set_geometry(toplevel, 400, 50)
|
||
self.root.attributes("-disabled", 1)
|
||
toplevel.protocol("WM_DELETE_WINDOW", print)
|
||
# toplevel.resizable(False, False)
|
||
toplevel.iconbitmap(f"{clibs.PREFIX}/media/icon.ico")
|
||
toplevel.transient(self.root)
|
||
|
||
toplevel.grid_rowconfigure(0, weight=1)
|
||
toplevel.grid_columnconfigure(0, weight=1)
|
||
pb = ttk.Progressbar(toplevel, length=10, mode="indeterminate", orient="horizontal")
|
||
pb.grid(row=0, column=0, sticky="we")
|
||
pb.start(10)
|
||
while clibs.stop:
|
||
time.sleep(0.1)
|
||
self.text_output.update()
|
||
pb.update()
|
||
self.root.update()
|
||
else:
|
||
self.__w2t("\n")
|
||
self.root.attributes("-disabled", 0)
|
||
toplevel.destroy()
|
||
|
||
def __w2t(self, text, color="black", desc=None):
|
||
self.text_output.tag_config(tagName=color, foreground=color)
|
||
self.text_output.insert(ctk.END, text, tags=color)
|
||
self.text_output.see(ctk.END)
|
||
if desc:
|
||
clibs.running = False
|
||
raise Exception(desc)
|
||
|
||
def __program_start(self):
|
||
def get_data_dp():
|
||
data = {
|
||
"_main": self.om_main_dp.get(),
|
||
"_sub": self.om_sub_dp.get(),
|
||
"_path": self.entry_path_dp.get(),
|
||
"_vel": self.om_vel_dp.get(),
|
||
"_trq": self.om_trq_dp.get(),
|
||
"_trqh": self.om_trqh_dp.get(),
|
||
"_estop": self.om_estop_dp.get(),
|
||
"_sensor": self.om_sensor_dp.get(),
|
||
}
|
||
return data
|
||
|
||
def get_data_at():
|
||
data = {
|
||
"_main": self.om_main_at.get(),
|
||
"_sub": self.om_sub_at.get(),
|
||
"_path": self.entry_path_at.get()
|
||
}
|
||
return data
|
||
|
||
def init_op():
|
||
self.text_output.delete("1.0", ctk.END)
|
||
self.tabview_bottom.set("输出")
|
||
if clibs.db_state != "readwrite":
|
||
self.text_output.delete("1.0", ctk.END)
|
||
clibs.cursor.close()
|
||
clibs.conn.close()
|
||
clibs.conn = self.conn
|
||
clibs.cursor = self.cursor
|
||
clibs.db_state = "readwrite"
|
||
self.btn_load.configure(fg_color="#979DA2")
|
||
self.__get_realtime_log()
|
||
clibs.tl_prg = self.__toplevel_progress
|
||
# TBD: something signifies status of program and terminate some thread ect.
|
||
|
||
def exec_function():
|
||
init_op()
|
||
if self.tabview_top.get() == "数据处理":
|
||
if not clibs.running:
|
||
clibs.running = True
|
||
clibs.data_dp = get_data_dp()
|
||
eval(clibs.data_dp["_main"] + ".main()")
|
||
clibs.running = False
|
||
else:
|
||
messagebox.showinfo(title="进行中...", message="当前有程序正在运行!")
|
||
elif self.tabview_top.get() == "自动测试":
|
||
clibs.running = True
|
||
clibs.data_at = get_data_at()
|
||
eval(clibs.data_at["_main"] + ".main()")
|
||
clibs.running = False
|
||
|
||
exec_function()
|
||
|
||
def __reset_state(self):
|
||
def reset_methods():
|
||
self.btn_load.configure(fg_color="#979DA2")
|
||
self.__reset_entry_keyword()
|
||
self.entry_ip_atv.set("192.168.0.160")
|
||
self.entry_path_atv.set("数据文件夹路径")
|
||
self.entry_path_atv.set("数据文件夹路径")
|
||
|
||
if clibs.db_state == "readwrite":
|
||
res = messagebox.askyesno(title="状态重置", message="这将清空本次所有的输出以及日志记录,且不可恢复,请确认!", default=messagebox.NO, icon=messagebox.WARNING)
|
||
if res:
|
||
self.text_output.delete("1.0", ctk.END)
|
||
try:
|
||
clibs.lock.acquire(True)
|
||
clibs.cursor.execute("delete from logs")
|
||
clibs.cursor.execute("delete from sqlite_sequence") # 主键归零
|
||
finally:
|
||
clibs.lock.release()
|
||
|
||
self.treeview_logs.delete(*self.treeview_logs.get_children())
|
||
self.label_pages_logs.set("-.-.-.-.-.-")
|
||
reset_methods()
|
||
# TBD: something signifies status of program and terminate some thread ect.
|
||
elif clibs.db_state == "readonly":
|
||
res = messagebox.askyesno(title="状态重置", message="这将清空本次所有的输出以及恢复本次的日志记录,请确认!", default=messagebox.NO, icon=messagebox.INFO)
|
||
if res:
|
||
self.text_output.delete("1.0", ctk.END)
|
||
clibs.cursor.close()
|
||
clibs.conn.close()
|
||
clibs.conn = self.conn
|
||
clibs.cursor = self.cursor
|
||
clibs.db_state = "readwrite"
|
||
self.__get_realtime_log()
|
||
reset_methods()
|
||
# TBD: something signifies status of program and terminate some thread ect.
|
||
|
||
@clibs.db_lock
|
||
def __get_realtime_log(self):
|
||
clibs.cursor.execute("select id from logs")
|
||
len_records = len(clibs.cursor.fetchall())
|
||
pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1
|
||
self.label_pages_logs.set(f"{pages_all} / {pages_all}")
|
||
|
||
self.treeview_logs.delete(*self.treeview_logs.get_children())
|
||
remainder = len_records % 100 if len_records % 100 != 0 else 100
|
||
end = len_records
|
||
start = len_records - remainder + 1 if len_records - remainder > 0 else 0
|
||
|
||
# clibs.cursor.execute("select * from logs order by id desc limit 100")
|
||
clibs.cursor.execute(f"select * from logs where id between {start} and {end}")
|
||
records = clibs.cursor.fetchall()
|
||
for record in records:
|
||
self.treeview_logs.insert("", "end", values=record, tags=record[2])
|
||
self.treeview_logs.yview_moveto(1)
|
||
|
||
self.__reset_entry_keyword()
|
||
|
||
@clibs.db_lock
|
||
def __get_previous_log(self):
|
||
if self.btn_find.cget("fg_color") == "#979DA2":
|
||
try:
|
||
row = self.treeview_logs.get_children()
|
||
first_id = self.treeview_logs.item(row[0], "values")[0]
|
||
end = int(first_id)-1 if int(first_id)-1 > 0 else None
|
||
start = int(first_id)-100 if int(first_id)-100 > 0 else 0
|
||
if end is not None:
|
||
clibs.cursor.execute(f"select * from logs where id between {start} and {end}")
|
||
records = clibs.cursor.fetchall()
|
||
self.treeview_logs.delete(*self.treeview_logs.get_children())
|
||
for record in records:
|
||
self.treeview_logs.insert("", "end", values=record, tags=record[2])
|
||
# self.treeview_logs.yview_moveto(1)
|
||
|
||
clibs.cursor.execute("select * from logs order by id desc")
|
||
records = clibs.cursor.fetchall()
|
||
pages_all = len(records) // 100 if len(records) % 100 == 0 else len(records) // 100 + 1
|
||
current_page = int(end) // 100 if int(end) % 100 == 0 else int(end) // 100 + 1
|
||
self.label_pages_logs.set(f"{current_page} / {pages_all}")
|
||
except Exception as ERR:
|
||
...
|
||
else:
|
||
pages_all = self.label_pages_logs.get().split("/")[1].strip()
|
||
current_page = int(self.label_pages_logs.get().split("/")[0].strip())
|
||
if current_page-1 > 0:
|
||
row = self.treeview_logs.get_children()
|
||
first_one = list(self.treeview_logs.item(row[0], "values"))
|
||
first_one[0] = int(first_one[0])
|
||
index_end = clibs.f_records.index(tuple(first_one))
|
||
self.treeview_logs.delete(*self.treeview_logs.get_children())
|
||
if index_end <= 100:
|
||
for record in clibs.f_records[:index_end]:
|
||
self.treeview_logs.insert("", "end", values=record, tags=record[2])
|
||
else:
|
||
for record in clibs.f_records[index_end-100:index_end]:
|
||
self.treeview_logs.insert("", "end", values=record, tags=record[2])
|
||
|
||
previous_page = current_page - 1
|
||
self.label_pages_logs.set(f"{previous_page} / {pages_all}")
|
||
|
||
def __get_next_log(self):
|
||
@clibs.db_lock
|
||
def get_next_page():
|
||
if self.btn_find.cget("fg_color") == "#979DA2":
|
||
# noinspection PyBroadException
|
||
try:
|
||
clibs.cursor.execute("select id from logs")
|
||
len_records = len(clibs.cursor.fetchall())
|
||
|
||
row = self.treeview_logs.get_children()
|
||
last_id = self.treeview_logs.item(row[-1], "values")[0]
|
||
start = int(last_id) + 1
|
||
end = int(last_id) + 100
|
||
if int(start) <= len_records:
|
||
clibs.cursor.execute(f"select * from logs where id between {start} and {end}")
|
||
records = clibs.cursor.fetchall()
|
||
self.treeview_logs.delete(*self.treeview_logs.get_children())
|
||
for record in records:
|
||
self.treeview_logs.insert("", "end", values=record, tags=record[2])
|
||
# self.treeview_logs.yview_moveto(1)
|
||
|
||
pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1
|
||
current_page = int(start) // 100 if int(start) % 100 == 0 else int(start) // 100 + 1
|
||
self.label_pages_logs.set(f"{current_page} / {pages_all}")
|
||
except Exception as Err:
|
||
print(f"get_next_page-if: {Err}")
|
||
else:
|
||
len_records = len(clibs.f_records)
|
||
pages_all = int(self.label_pages_logs.get().split("/")[1].strip())
|
||
current_page = int(self.label_pages_logs.get().split("/")[0].strip())
|
||
if current_page < pages_all:
|
||
row = self.treeview_logs.get_children()
|
||
last_one = list(self.treeview_logs.item(row[-1], "values"))
|
||
last_one[0] = int(last_one[0])
|
||
index_start = clibs.f_records.index(tuple(last_one))+1
|
||
self.treeview_logs.delete(*self.treeview_logs.get_children())
|
||
if index_start+100 <= len_records:
|
||
for record in clibs.f_records[index_start:index_start+100]:
|
||
self.treeview_logs.insert("", "end", values=record, tags=record[2])
|
||
else:
|
||
for record in clibs.f_records[index_start:]:
|
||
self.treeview_logs.insert("", "end", values=record, tags=record[2])
|
||
|
||
next_page = current_page + 1
|
||
self.label_pages_logs.set(f"{next_page} / {pages_all}")
|
||
|
||
current_page = int(self.label_pages_logs.get().split("/")[0].strip())
|
||
pages_all = int(self.label_pages_logs.get().split("/")[1].strip())
|
||
if current_page == pages_all:
|
||
if self.btn_find.cget("fg_color") == "#979DA2":
|
||
self.__get_realtime_log()
|
||
return
|
||
|
||
get_next_page()
|
||
|
||
def __load_log_db(self):
|
||
db_file = filedialog.askopenfilename(title="加载数据库文件", defaultextension=".db", initialdir=f"{clibs.PREFIX}/logs")
|
||
if not db_file:
|
||
return
|
||
self.conn = clibs.conn
|
||
self.cursor = clibs.cursor
|
||
try:
|
||
clibs.conn = sqlite3.connect(f"file:{db_file}?mode=ro", uri=True, isolation_level=None, check_same_thread=False, cached_statements=256)
|
||
clibs.cursor = clibs.conn.cursor()
|
||
clibs.db_state = "readonly"
|
||
self.__get_realtime_log()
|
||
self.btn_load.configure(fg_color="#000080")
|
||
except Exception as Err:
|
||
clibs.insert_logdb("ERROR", "aio", f"加载数据库失败,需确认 {db_file} 是否是有效数据库文件!")
|
||
messagebox.showerror(title="加载数据库失败", message=f"需确认 {db_file} 是否是有效数据库文件!")
|
||
|
||
def __main_switch_dp(self, event):
|
||
widgets = [self.om_sub_dp, self.om_vel_dp, self.om_trq_dp, self.om_trqh_dp, self.om_estop_dp, self.om_sensor_dp]
|
||
for widgets in widgets:
|
||
widgets.configure(state="disabled")
|
||
|
||
_func = self.om_main_dp.get()
|
||
if _func == "brake":
|
||
self.om_vel_dp.configure(state="normal")
|
||
self.om_trq_dp.configure(state="normal")
|
||
self.om_estop_dp.configure(state="normal")
|
||
elif _func == "current":
|
||
self.om_sub_dp.configure(state="normal")
|
||
self.om_vel_dp.configure(state="normal")
|
||
self.om_trq_dp.configure(state="normal")
|
||
self.om_trqh_dp.configure(state="normal")
|
||
self.om_sensor_dp.configure(state="normal")
|
||
elif _func == "iso" or _func == "wavelogger":
|
||
...
|
||
|
||
@staticmethod
|
||
def __goto_update():
|
||
webbrowser.open("www.baidu.com")
|
||
|
||
def __auth_and_vercheck(self):
|
||
url_vers = "http://10.2.23.150:10008/version"
|
||
try:
|
||
self.server_vers = request.urlopen(url_vers, timeout=2).read().decode("utf-8").strip()
|
||
except Exception as Err:
|
||
messagebox.showwarning(title="退出", message="无法连接至服务器.....")
|
||
clibs.cursor.close()
|
||
clibs.conn.close()
|
||
self.root.destroy()
|
||
|
||
def __set_root(self):
|
||
self.root.title("AIO - All in one automatic toolbox")
|
||
self.root.iconbitmap(f"{clibs.PREFIX}/media/icon.ico")
|
||
self.root.configure(fg_color="#E9E9E9")
|
||
self.root.geometry(f"1120x550+10+10")
|
||
self.root.minsize(1120, 550)
|
||
# self.root.resizable(False, False)
|
||
self.root.protocol("WM_DELETE_WINDOW", self.__close_root)
|
||
self.f_logo = ("Segoe Script Bold", 28, "bold")
|
||
self.f_normal = ("Consolas", 20, "bold")
|
||
self.f_segbtn = ("楷体", 20, "bold")
|
||
self.f_common = ("Consolas", 18, "bold")
|
||
self.f_entry = ("Consolas", 16, "bold")
|
||
self.f_text = ("仿宋", 16, "normal")
|
||
self.f_treeview = ("Consolas", 16, "normal")
|
||
self.f_pager = ("Times", 16, "normal")
|
||
self.f_toplevel = ("Consolas", 14, "normal")
|
||
self.f_status = ("Consolas", 12, "bold")
|
||
|
||
def __quit_preparation(self):
|
||
os.chdir(clibs.log_path)
|
||
t = datetime.now().strftime("%Y%m%d%H%M%S")
|
||
disk_conn = sqlite3.connect(f"log_{t}.db", isolation_level=None, check_same_thread=False, cached_statements=256)
|
||
disk_cursor = disk_conn.cursor()
|
||
|
||
if clibs.db_state == "readonly":
|
||
clibs.cursor.close()
|
||
clibs.conn.close()
|
||
clibs.conn = self.conn
|
||
clibs.cursor = self.cursor
|
||
clibs.db_state = "readwrite"
|
||
|
||
self.__reset_entry_keyword()
|
||
self.btn_load.configure(fg_color="#979DA2")
|
||
clibs.conn.backup(target=disk_conn, pages=1, progress=None)
|
||
|
||
_, logs = clibs.traversal_files(".", self.__w2t)
|
||
logs.sort()
|
||
while len(logs) > 10:
|
||
_ = logs.pop(0)
|
||
os.remove(_)
|
||
|
||
disk_cursor.close()
|
||
disk_conn.close()
|
||
clibs.cursor.close()
|
||
clibs.conn.close()
|
||
self.root.destroy()
|
||
|
||
def __reset_entry_keyword(self):
|
||
self.btn_find.configure(fg_color="#979DA2")
|
||
self.entry_keyword.delete(0, "end")
|
||
self.entry_keyword.configure(placeholder_text="[id/level/module] 查找内容")
|
||
self.root.focus_set()
|
||
|
||
def __close_root(self):
|
||
msg = "相关数据可能未保存,正在运行程序时有概率会损坏数据文件,确定要终止程序运行吗?"
|
||
res = messagebox.askyesno(title="退出程序", message=msg, default=messagebox.NO, icon=messagebox.WARNING)
|
||
if res:
|
||
self.__quit_preparation()
|
||
|
||
def __switch_tab_top(self):
|
||
...
|
||
|
||
def __switch_tab_bottom(self):
|
||
if self.tabview_bottom.get() == "日志":
|
||
self.__get_realtime_log()
|
||
|
||
@staticmethod
|
||
def __cut(widget):
|
||
try:
|
||
selected = widget.selection_get()
|
||
all_text = widget.get()
|
||
widget.clipboard_clear()
|
||
widget.clipboard_append(selected)
|
||
index_start = all_text.find(selected)
|
||
index_end = index_start + len(selected)
|
||
widget.delete(index_start, index_end)
|
||
except Exception as Err:
|
||
...
|
||
|
||
@staticmethod
|
||
def __paste(widget):
|
||
try:
|
||
copy_text = widget.clipboard_get()
|
||
widget.insert("insert", copy_text)
|
||
selected = widget.selection_get()
|
||
all_text = widget.get()
|
||
index_start = all_text.find(selected)
|
||
index_end = index_start + len(selected)
|
||
widget.delete(index_start, index_end)
|
||
except Exception as Err:
|
||
...
|
||
|
||
@staticmethod
|
||
def __copy(widget):
|
||
try:
|
||
widget.clipboard_clear()
|
||
copy_text = widget.selection_get()
|
||
widget.clipboard_append(copy_text)
|
||
except Exception as Err:
|
||
...
|
||
|
||
@staticmethod
|
||
def __select(widget):
|
||
widget.select_range(0, "end")
|
||
# self.entry_keyword.select_range(0, "end")
|
||
|
||
def __draw(self):
|
||
def select_path(event):
|
||
t_name = self.tabview_top.get()
|
||
dir_path = filedialog.askdirectory()
|
||
if t_name == "数据处理":
|
||
c_origin = self.entry_path_dpv.get()
|
||
if dir_path:
|
||
self.entry_path_dpv.set(dir_path)
|
||
else:
|
||
self.entry_path_dpv.set(c_origin)
|
||
elif t_name == "自动测试":
|
||
c_origin = self.entry_path_atv.get()
|
||
if dir_path:
|
||
self.entry_path_atv.set(dir_path)
|
||
else:
|
||
self.entry_path_atv.set(c_origin)
|
||
|
||
def esc_quit_log_tl(event):
|
||
widget_toplevel = event.widget
|
||
self.root.attributes("-disabled", 0)
|
||
widget_toplevel.destroy()
|
||
|
||
def esc_quit_log_text(event):
|
||
widget_toplevel = event.widget.master.master
|
||
self.root.attributes("-disabled", 0)
|
||
widget_toplevel.destroy()
|
||
|
||
def double_click(event):
|
||
# noinspection PyBroadException
|
||
try:
|
||
e = event.widget
|
||
iid = e.identify("item", event.x, event.y)
|
||
_id, _ts, _level, _module, _content = e.item(iid, "values")
|
||
toplevel = tk.Toplevel(self.root)
|
||
toplevel.title(f"LogID: {_id}")
|
||
self.__set_geometry(toplevel, 1000, 400)
|
||
toplevel.resizable(False, False)
|
||
toplevel.iconbitmap(f"{clibs.PREFIX}/media/icon.ico")
|
||
# self.root.after(200, lambda: toplevel.iconbitmap(f"{clibs.PREFIX}/media/icon.ico"))
|
||
toplevel.protocol("WM_DELETE_WINDOW", lambda tl=toplevel: close_toplevel(tl))
|
||
toplevel.bind("<Escape>", esc_quit_log_tl)
|
||
toplevel.transient(self.root)
|
||
toplevel.focus()
|
||
_text = f"TS: {_ts} | Log level: {_level} | Module: {_module}"
|
||
label_info = ctk.CTkLabel(toplevel, text=_text, width=10, fg_color="#5F9EA0", corner_radius=5, font=self.f_treeview)
|
||
text_content = ctk.CTkTextbox(toplevel, width=10, font=self.f_toplevel)
|
||
text_content.insert(ctk.END, repr(_content))
|
||
text_content.configure(state="disabled")
|
||
text_content.bind("<Escape>", esc_quit_log_text)
|
||
|
||
toplevel.grid_rowconfigure(1, weight=1)
|
||
toplevel.columnconfigure(0, weight=1)
|
||
label_info.grid(row=0, column=0, padx=10, pady=10, sticky="we")
|
||
text_content.grid(row=1, column=0, padx=10, pady=(0, 10), sticky="news")
|
||
self.root.attributes("-disabled", 1)
|
||
except Exception:
|
||
return
|
||
|
||
@clibs.db_lock
|
||
def jump2page():
|
||
try:
|
||
number = int(self.entry_tips_v)
|
||
if number > 0:
|
||
start = number * 100 - 99
|
||
end = number * 100
|
||
clibs.cursor.execute("select id from logs")
|
||
len_records = len(clibs.cursor.fetchall())
|
||
if start <= len_records:
|
||
clibs.cursor.execute(f"select * from logs where id between {start} and {end}")
|
||
records = clibs.cursor.fetchall()
|
||
self.treeview_logs.delete(*self.treeview_logs.get_children())
|
||
for record in records:
|
||
self.treeview_logs.insert("", "end", values=record, tags=record[2])
|
||
self.treeview_logs.yview_moveto(1)
|
||
|
||
pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1
|
||
self.label_pages_logs.set(f"{number} / {pages_all}")
|
||
except Exception as Err:
|
||
...
|
||
|
||
def close_toplevel(tl):
|
||
self.entry_tips_v = None
|
||
self.root.attributes("-disabled", 0)
|
||
tl.destroy()
|
||
|
||
def enter_quit(event):
|
||
widget_entry = event.widget
|
||
widget_toplevel = event.widget.master.master
|
||
self.entry_tips_v = widget_entry.get()
|
||
self.root.attributes("-disabled", 0)
|
||
widget_toplevel.destroy()
|
||
jump2page()
|
||
|
||
def esc_quit_page(event):
|
||
widget_toplevel = event.widget
|
||
self.entry_tips_v = None
|
||
self.root.attributes("-disabled", 0)
|
||
widget_toplevel.destroy()
|
||
|
||
def select_page(event):
|
||
self.root.attributes("-disabled", 1)
|
||
toplevel = tk.Toplevel(self.root)
|
||
toplevel.title("跳转")
|
||
self.__set_geometry(toplevel, 500, 150)
|
||
toplevel.resizable(False, False)
|
||
toplevel.iconbitmap(f"{clibs.PREFIX}/media/icon.ico")
|
||
toplevel.protocol("WM_DELETE_WINDOW", lambda tl=toplevel: close_toplevel(tl))
|
||
toplevel.bind("<Escape>", esc_quit_page)
|
||
toplevel.focus()
|
||
|
||
label_tips = ctk.CTkLabel(toplevel, width=100, text="页面号码", font=self.f_segbtn)
|
||
entry_tips = ctk.CTkEntry(toplevel, width=200, placeholder_text="Fill and press enter", font=self.f_toplevel)
|
||
entry_tips.bind("<Return>", enter_quit, add="+")
|
||
|
||
toplevel.rowconfigure(0, weight=1)
|
||
toplevel.columnconfigure([0, 1], weight=1)
|
||
label_tips.grid(row=0, column=0, padx=10, pady=10, sticky="w")
|
||
entry_tips.grid(row=0, column=1, padx=(0, 10), pady=10, sticky="w")
|
||
|
||
@clibs.db_lock
|
||
def find_log(event):
|
||
def find_error(cdt):
|
||
clibs.insert_logdb("WARNING", "aio", f"查询条件 [{cdt}] 书写规则错误!")
|
||
msg = "可单独查询 ID/Level/Module,也可以指定 ID 范围内查询指定的 Levels(多个条件使用冒号分割,仅限debug, info, warning, error的其中之一或组合]) 和(或) Modules(多个条件使用冒号分割),且当同时查询 Levels 和 Modules 时,Levels 位置在前!\n"
|
||
messagebox.showwarning(title="查询条件错误", message=msg)
|
||
return
|
||
|
||
kw = self.entry_keyword.get().strip()
|
||
if not kw:
|
||
return
|
||
match = re.search("^\\[.*]", kw)
|
||
if match:
|
||
condition = match.group().removeprefix("[").removesuffix("]").strip()
|
||
f_text_isnull = kw.removeprefix(match.group()).strip()
|
||
f_text = f"%{f_text_isnull}%"
|
||
if "-" in condition:
|
||
# PreDOs
|
||
conditions = condition.strip().split("-")
|
||
start = conditions[0].strip()
|
||
end = conditions[1].strip()
|
||
if not (start.isdigit() and end.isdigit()):
|
||
clibs.insert_logdb("WARNING", "aio", f"查询条件 {condition} 书写规则错误:指定 ID 范围参数非数字,需核对!")
|
||
messagebox.showwarning(title="查询条件错误", message="指定 ID 范围参数非数字,需核对!")
|
||
return
|
||
|
||
if len(conditions) == 2: # id range only
|
||
if not f_text_isnull: # without content
|
||
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)}")
|
||
else: # with content
|
||
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and content like '{f_text}'")
|
||
|
||
elif len(conditions) == 3: # with id range, and level or module part
|
||
c_unknown = conditions[2].strip()
|
||
if ":" in c_unknown: # multi conditions
|
||
is_level = True
|
||
c_unknowns = c_unknown.split(":")
|
||
for c_unknown in c_unknowns:
|
||
if c_unknown.strip().upper() not in clibs.levels:
|
||
is_level = False
|
||
break
|
||
|
||
if is_level: # levels
|
||
levels = tuple(level.strip().upper() for level in c_unknowns)
|
||
if not f_text_isnull: # without content
|
||
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level in {levels}")
|
||
else: # with content
|
||
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level in {levels} and content like '{f_text}'")
|
||
else: # modules
|
||
modules = tuple(module.strip() for module in c_unknowns)
|
||
if not f_text_isnull: # without content
|
||
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and module in {modules}")
|
||
else: # with content
|
||
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and module in {modules} and content like '{f_text}'")
|
||
|
||
else: # single condition
|
||
if c_unknown.upper() in clibs.levels: # the third one is level
|
||
level = c_unknown.upper()
|
||
if not f_text_isnull: # without content
|
||
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level = '{level}'")
|
||
else: # with content
|
||
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level = '{level}' and content like '{f_text}'")
|
||
else: # the third one is module
|
||
module = c_unknown
|
||
if not f_text_isnull: # without content
|
||
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and module = '{module}'")
|
||
else: # with content
|
||
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and module = '{module}' and content like '{f_text}'")
|
||
|
||
elif len(conditions) == 4: # with id range, level and module
|
||
_levels = conditions[2].strip()
|
||
_modules = conditions[3].strip()
|
||
if ":" in _levels and ":" in _modules:
|
||
levels = tuple(level.strip().upper() for level in _levels.split(":"))
|
||
modules = tuple(module.strip() for module in _modules.split(":"))
|
||
for level in levels:
|
||
if level not in clibs.levels:
|
||
find_error(condition)
|
||
break
|
||
|
||
if not f_text_isnull: # without content
|
||
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level in {levels} and module in {modules}")
|
||
else: # with content
|
||
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level in {levels} and module in {modules} and content like '{f_text}'")
|
||
|
||
elif ":" in _levels and ":" not in _modules:
|
||
levels = tuple(level.strip().upper() for level in _levels.split(":"))
|
||
module = _modules
|
||
for level in levels:
|
||
if level not in clibs.levels:
|
||
find_error(condition)
|
||
break
|
||
|
||
if not f_text_isnull: # without content
|
||
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level in {levels} and module = '{module}'")
|
||
else: # with content
|
||
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level in {levels} and module = '{module}' and content like '{f_text}'")
|
||
|
||
elif ":" not in _levels and ":" in _modules:
|
||
level = _levels.upper()
|
||
modules = tuple(module.strip() for module in _modules.split(":"))
|
||
if level not in clibs.levels:
|
||
find_error(condition)
|
||
|
||
if not f_text_isnull: # without content
|
||
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level = '{level}' and module in {modules}")
|
||
else: # with content
|
||
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level = '{level}' and module in {modules} and content like '{f_text}'")
|
||
|
||
elif ":" not in _levels and ":" not in _modules:
|
||
level = _levels.upper()
|
||
module = _modules
|
||
if level not in clibs.levels:
|
||
find_error(condition)
|
||
|
||
if not f_text_isnull: # without content
|
||
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level = '{level}' and module = '{module}'")
|
||
else: # with content
|
||
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level = '{level}' and module = '{module}' and content like '{f_text}'")
|
||
|
||
else:
|
||
find_error(condition)
|
||
|
||
elif condition.isdigit():
|
||
if not f_text_isnull:
|
||
clibs.cursor.execute(f"select * from logs where id = {int(condition)}")
|
||
else:
|
||
clibs.cursor.execute(f"select * from logs where id = {int(condition)} and content like '{f_text}'")
|
||
|
||
elif condition.upper() in clibs.levels:
|
||
if not f_text_isnull:
|
||
clibs.cursor.execute(f"select * from logs where level = '{condition.upper()}'")
|
||
else:
|
||
clibs.cursor.execute(f"select * from logs where level = '{condition.upper()}' and content like '{f_text}'")
|
||
|
||
else: # 模块名
|
||
if not f_text_isnull:
|
||
clibs.cursor.execute(f"select * from logs where module = '{condition}'")
|
||
else:
|
||
clibs.cursor.execute(f"select * from logs where module = '{condition}' and content like '{f_text}'")
|
||
|
||
else:
|
||
f_text = f"%{kw}%"
|
||
clibs.cursor.execute(f"select * from logs where content like '{f_text}'")
|
||
|
||
clibs.f_records = clibs.cursor.fetchall()
|
||
len_records = len(clibs.f_records)
|
||
pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1
|
||
self.label_pages_logs.set(f"{pages_all} / {pages_all}")
|
||
|
||
self.treeview_logs.delete(*self.treeview_logs.get_children())
|
||
remainder = len_records % 100
|
||
records = clibs.f_records[-1*remainder:]
|
||
for record in records:
|
||
self.treeview_logs.insert("", "end", values=record, tags=record[2])
|
||
self.treeview_logs.yview_moveto(1)
|
||
self.btn_find.configure(fg_color="#000080")
|
||
|
||
def show_popupmenu_kw(event):
|
||
self.popupmenu_kw.post(event.x_root, event.y_root)
|
||
|
||
def show_popupmenu_ip(event):
|
||
self.popupmenu_ip.post(event.x_root, event.y_root)
|
||
|
||
def conn_change(event):
|
||
def conn_or_disconn():
|
||
if self.btn_conn.cget("fg_color") == "#979DA2":
|
||
self.btn_conn.configure(state="disabled")
|
||
clibs.ip_addr = self.entry_ip_atv.get().strip()
|
||
ip_pattern = re.compile(r'(([1-9]?\d|1\d\d|2[0-4]\d|25[0-5])\.){3}([1-9]?\d|1\d\d|2[0-4]\d|25[0-5])')
|
||
if not ip_pattern.fullmatch(clibs.ip_addr):
|
||
messagebox.showerror(title="非法地址", message=f"{clibs.ip_addr} 不是一个有效的 IP 地址")
|
||
clibs.insert_logdb("ERROR", "aio", f"connection: {clibs.ip_addr} 不是一个有效的 IP 地址")
|
||
return
|
||
try:
|
||
# clibs.c_md = openapi.ModbusRequest(clibs.ip_addr, clibs.modbus_port)
|
||
clibs.c_hr = openapi.HmiRequest(clibs.ip_addr, clibs.socket_port, clibs.xService_port)
|
||
self.btn_conn.configure(state="normal", fg_color="#2E8B57")
|
||
except Exception as Err:
|
||
self.btn_conn.configure(state="normal", fg_color="#979DA2")
|
||
elif self.btn_conn.cget("fg_color") == "#2E8B57":
|
||
try:
|
||
# clibs.c_md.close()
|
||
clibs.c_hr.close()
|
||
finally:
|
||
self.btn_conn.configure(fg_color="#979DA2")
|
||
t = threading.Thread(target=conn_or_disconn)
|
||
t.daemon = True
|
||
t.start()
|
||
# ========================================================================
|
||
self.root.rowconfigure(0, weight=1)
|
||
self.root.rowconfigure(1, weight=99)
|
||
self.root.columnconfigure(0, weight=1)
|
||
self.root.columnconfigure(1, weight=99)
|
||
self.frame_left.grid(row=0, column=0, rowspan=2, sticky="news")
|
||
self.tabview_top.grid(row=0, column=1, padx=10, pady=5, sticky="news")
|
||
self.tabview_bottom.grid(row=1, column=1, padx=10, pady=(0, 10), sticky="news")
|
||
self.frame_status.grid(row=2, column=0, columnspan=2, sticky="news")
|
||
# ========================================================================
|
||
self.frame_left.rowconfigure(list(range(30)), weight=1)
|
||
self.frame_left.columnconfigure(0, weight=1)
|
||
self.label_logo.grid(row=0, column=0, sticky="new", padx=20, pady=(20, 40))
|
||
self.btn_start.grid(row=1, column=0, sticky="new", padx=15, pady=1, ipady=4)
|
||
self.btn_reset_state.grid(row=2, column=0, sticky="new", padx=15, pady=(0, 1), ipady=4)
|
||
# ========================================================================
|
||
# self.tabview_top.tab("数据处理").grid_rowconfigure([0, 1], weight=1)
|
||
self.tabview_top.tab("数据处理").grid_columnconfigure(11, weight=1)
|
||
self.om_main_dp.grid(row=0, column=0, padx=10)
|
||
self.om_sub_dp.grid(row=1, column=0, padx=10, pady=(0, 10))
|
||
self.label_path_dp.grid(row=0, column=1, padx=(0, 5), pady=10)
|
||
self.entry_path_dp.grid(row=0, column=2, columnspan=10, padx=(0, 10), pady=10, sticky="we")
|
||
self.label_vel_dp.grid(row=1, column=1, padx=(0, 5), pady=(0, 10))
|
||
self.om_vel_dp.grid(row=1, column=2, padx=(0, 5), pady=(0, 10))
|
||
self.label_trq_dp.grid(row=1, column=3, padx=(0, 5), pady=(0, 10))
|
||
self.om_trq_dp.grid(row=1, column=4, padx=(0, 5), pady=(0, 10))
|
||
self.label_trqh_dp.grid(row=1, column=5, padx=(0, 5), pady=(0, 10))
|
||
self.om_trqh_dp.grid(row=1, column=6, padx=(0, 5), pady=(0, 10))
|
||
self.label_estop_dp.grid(row=1, column=7, padx=(0, 5), pady=(0, 10))
|
||
self.om_estop_dp.grid(row=1, column=8, padx=(0, 5), pady=(0, 10))
|
||
self.label_sensor_dp.grid(row=1, column=9, padx=(0, 5), pady=(0, 10))
|
||
self.om_sensor_dp.grid(row=1, column=10, padx=(0, 10), pady=(0, 10))
|
||
|
||
self.entry_path_dp.bind("<Button-1>", select_path)
|
||
self.om_vel_dp.set("1")
|
||
self.om_trq_dp.set("2")
|
||
self.om_trqh_dp.set("2")
|
||
self.om_estop_dp.set("3")
|
||
self.om_sensor_dp.set("3")
|
||
# ========================================================================
|
||
# self.tabview_top.tab("自动测试").grid_rowconfigure([0, 1, 2], weight=1)
|
||
self.tabview_top.tab("自动测试").grid_columnconfigure(4, weight=1)
|
||
self.om_main_at.grid(row=0, column=0, padx=10, pady=10, sticky="w")
|
||
self.om_sub_at.grid(row=1, column=0, padx=10, pady=(0, 10), sticky="w")
|
||
self.label_ip_at.grid(row=0, column=1, padx=(0, 10), pady=10, sticky="e")
|
||
self.entry_ip_at.grid(row=0, column=2, padx=(0, 10), pady=10)
|
||
self.btn_conn.grid(row=0, column=3, padx=(0, 10), pady=10)
|
||
# self.progressbar_at.grid(row=0, column=4, padx=(0, 10), pady=10, sticky="e")
|
||
self.label_path_at.grid(row=1, column=1, padx=(0, 10), pady=(0, 10), sticky="e")
|
||
self.entry_path_at.grid(row=1, column=2, columnspan=3, padx=(0, 10), pady=(0, 10), sticky="we")
|
||
self.frame_top.grid(row=2, column=0, columnspan=5, padx=0, pady=0, sticky="we")
|
||
self.btn_robot_state.grid(row=0, column=0, padx=10, pady=0)
|
||
self.btn_robot_init.grid(row=0, column=1, padx=(0, 10), pady=0)
|
||
self.btn_trig_estop.grid(row=0, column=2, padx=(0, 10), pady=0)
|
||
self.btn_reset_estop.grid(row=0, column=3, padx=(0, 10), pady=0)
|
||
|
||
# self.progressbar_at.start()
|
||
# self.progressbar_at.configure(progress_color="red", fg_color="gray")
|
||
self.entry_path_at.bind("<Button-1>", select_path)
|
||
self.entry_ip_at.bind("<Button-3>", show_popupmenu_ip, add="+")
|
||
self.btn_conn.bind("<Button-1>", conn_change)
|
||
# ========================================================================
|
||
self.tabview_bottom.tab("输出").grid_rowconfigure(0, weight=1)
|
||
self.tabview_bottom.tab("输出").grid_columnconfigure(0, weight=1)
|
||
self.text_output.grid(row=0, column=0, sticky="news")
|
||
# ========================================================================
|
||
self.tabview_bottom.tab("日志").grid_rowconfigure(0, weight=1)
|
||
self.tabview_bottom.tab("日志").grid_columnconfigure(6, weight=1)
|
||
self.treeview_logs.grid(row=0, column=0, columnspan=7, padx=(10, 0), pady=10, sticky="news")
|
||
self.y_scrollbar_logs.grid(row=0, column=7, pady=10, sticky="ns")
|
||
self.label_logs.grid(row=1, column=0, padx=(0, 10), pady=5, sticky="we")
|
||
self.btn_previous.grid(row=1, column=1, padx=(0, 10), pady=5, sticky="we")
|
||
self.btn_realtime.grid(row=1, column=2, padx=(0, 10), pady=5, sticky="we")
|
||
self.btn_next.grid(row=1, column=3, padx=(0, 10), pady=5, sticky="we")
|
||
self.btn_load.grid(row=1, column=4, padx=(0, 10), pady=5, sticky="we")
|
||
self.btn_find.grid(row=1, column=5, padx=(0, 10), pady=5, sticky="we")
|
||
self.entry_keyword.grid(row=1, column=6, columnspan=2, padx=0, pady=5, sticky="we")
|
||
|
||
self.treeview_logs.configure(yscrollcommand=self.y_scrollbar_logs.set)
|
||
self.treeview_logs.tag_configure("DEBUG", background="#708090")
|
||
self.treeview_logs.tag_configure("INFO", background="#F5F5F5") # #43CD80
|
||
self.treeview_logs.tag_configure("WARNING", background="#EEE8AA")
|
||
self.treeview_logs.tag_configure("ERROR", background="#CD5C5C")
|
||
for k, v in self.tv_cols.items():
|
||
self.treeview_logs.heading(f"#{k}", text=v[0].title(), anchor="w")
|
||
self.treeview_logs.column(f"#{k}", anchor="w", width=v[1])
|
||
self.treeview_logs.bind("<Double-1>", double_click)
|
||
self.label_logs.bind("<Button-1>", select_page)
|
||
self.btn_find.bind("<Button-1>", find_log)
|
||
self.entry_keyword.bind("<Return>", find_log)
|
||
self.entry_keyword.bind("<Button-3>", show_popupmenu_kw, add="+")
|
||
|
||
# ========================================================================
|
||
self.frame_status.rowconfigure(0, weight=1)
|
||
self.frame_status.columnconfigure([0, 1], weight=1)
|
||
self.label_vers.grid(row=0, column=0, sticky="news")
|
||
self.label_tips.grid(row=0, column=1, sticky="news")
|
||
# ========================================================================
|
||
# widgets = [self.om_sub_dp, self.om_vel_dp, self.om_trq_dp, self.om_trqh_dp, self.om_estop_dp, self.om_sensor_dp]
|
||
# for widgets in widgets:
|
||
# widgets.configure(state="disabled")
|
||
self.om_sub_dp.configure(state="disabled")
|
||
self.om_trqh_dp.configure(state="disabled")
|
||
self.om_sensor_dp.configure(state="disabled")
|
||
# ========================================================================
|
||
clibs.w2t = self.__w2t
|
||
# clibs.insert_logdb("INFO", "aio", "AIO starts running......")
|
||
|
||
def show(self):
|
||
if self.server_vers:
|
||
t = threading.Thread(target=self.__detect_network)
|
||
t.daemon = True
|
||
t.start()
|
||
self.root.mainloop()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
aio = App()
|
||
aio.show()
|