电机电流完善

This commit is contained in:
2025-01-06 12:26:49 +08:00
parent d9f898715e
commit ca4fa4980a
50 changed files with 241671 additions and 222 deletions

5
.gitignore vendored
View File

@ -1,5 +1,8 @@
test.py
.idea/
.venv/
assets/logdata/
assets/logs/
package/
code/common/__pycache__/
code/data_process/__pycache__/
code/automatic_test/__pycache__/

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

Binary file not shown.

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "controller.heart"
}

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "device.get_params"
}

View File

@ -0,0 +1,8 @@
{
"id": "xxxxxxxxxxx",
"module": "robot",
"command": "diagnosis.get_params",
"data": {
"version": "1.4.1"
}
}

View File

@ -0,0 +1,12 @@
{
"id": "xxxxxxxxxxx",
"module": "robot",
"command": "diagnosis.open",
"data": {
"open": false,
"display_open": false,
"overrun": false,
"turn_area": false,
"delay_motion": false
}
}

View File

@ -0,0 +1,8 @@
{
"id": "xxxxxxxxxxx",
"module": "robot",
"command": "diagnosis.save",
"data": {
"save": true
}
}

View File

@ -0,0 +1,10 @@
{
"id": "xxxxxxxxxxx",
"module": "robot",
"command": "diagnosis.set_params",
"data": {
"display_pdo_params": [],
"frequency": 50,
"version": "1.4.1"
}
}

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "project",
"command": "overview.get_autoload"
}

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "project",
"command": "overview.get_cur_prj"
}

View File

@ -0,0 +1,9 @@
{
"id": "xxxxxxxxxxx",
"module": "project",
"command": "overview.reload",
"data": {
"prj_path": "",
"tasks": []
}
}

View File

@ -0,0 +1,8 @@
{
"id": "xxxxxxxxxxx",
"module": "project",
"command": "overview.set_autoload",
"data": {
"autoload_prj_path": ""
}
}

View File

@ -0,0 +1,11 @@
{
"id": "xxxxxxxxxxx",
"module": "fieldbus",
"command": "register.set_value",
"data": {
"name": "",
"type": "bool",
"bias": 0,
"value": 0
}
}

View File

@ -0,0 +1,8 @@
{
"id": "xxxxxxxxxxx",
"module": "project",
"command": "rl_task.pp_to_main",
"data": {
"tasks": []
}
}

View File

@ -0,0 +1,8 @@
{
"id": "xxxxxxxxxxx",
"module": "project",
"command": "rl_task.run",
"data": {
"tasks": []
}
}

View File

@ -0,0 +1,9 @@
{
"id": "xxxxxxxxxxx",
"module": "project",
"command": "rl_task.set_run_params",
"data": {
"loop_mode": true,
"override": 1.0
}
}

View File

@ -0,0 +1,8 @@
{
"id": "xxxxxxxxxxx",
"module": "project",
"command": "rl_task.stop",
"data": {
"tasks": []
}
}

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "state.get_state"
}

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "state.get_tp_mode"
}

View File

@ -0,0 +1,8 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "state.set_tp_mode",
"data": {
"tp_mode": "with"
}
}

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "state.switch_auto"
}

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "state.switch_manual"
}

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "state.switch_motor_off"
}

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "state.switch_motor_on"
}

BIN
assets/media/updated.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.0 KiB

BIN
assets/media/upgrade.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.8 KiB

924
code/aio.py Normal file
View File

@ -0,0 +1,924 @@
# from tkinter import colorchooser
# from tkinter import font
import time
import customtkinter as ctk
from tkinter import messagebox
from tkinter import filedialog
from tkinter import ttk
import tkinter as tk
from urllib import request
from PIL import Image
import webbrowser
import sqlite3
from datetime import datetime
import os
from common import clibs
from data_process import current, brake, iso, wavelogger
import threading
import re
class App:
def __init__(self):
# ========================================================================
self.root = ctk.CTk()
self.__set_root()
# ========================================================================
self.style = ttk.Style()
self.style.configure("tv.Treeview", font=self.f_treeview, rowheight=30)
self.style.configure("Treeview.Heading", font=self.f_normal, rowheight=30)
self.entry_path_dpv = ctk.StringVar()
self.entry_path_dpv.set("数据文件夹路径")
self.entry_path_atv = ctk.StringVar()
self.entry_path_atv.set("数据文件夹路径")
self.entry_ip_atv = ctk.StringVar()
self.entry_ip_atv.set("192.168.0.160")
self.label_pages_logs = ctk.StringVar()
self.label_pages_logs.set("-.-.-.-.-.-")
self.entry_tips_v = None
self.server_vers = None
self.tv_cols = {1: ["ID", 1], 2: ["time", 160], 3: ["level", 25], 4: ["module", 30], 5: ["content", 700]}
# ========================================================================
self.frame_left = ctk.CTkFrame(self.root, width=80, corner_radius=0, fg_color="#E9E9E9")
# -------
self.tabview_top = ctk.CTkTabview(self.root, width=900, height=180, fg_color="#E9E9E9", segmented_button_selected_color="#008B8B", segmented_button_selected_hover_color="#2F4F4F", border_width=2, anchor="w", command=self.__switch_tab_top)
self.tabview_top.add("数据处理")
self.tabview_top.add("自动测试")
# -------
self.tabview_bottom = ctk.CTkTabview(self.root, fg_color="#E9E9E9", segmented_button_selected_color="#008B8B", segmented_button_selected_hover_color="#2F4F4F", border_width=2, anchor="w", command=self.__switch_tab_bottom)
self.tabview_bottom.add("输出")
self.tabview_bottom.add("日志")
# -------
self.frame_status = ctk.CTkFrame(self.root, height=30)
# ========================================================================
self.label_logo = ctk.CTkLabel(self.frame_left, text="Rokae AIO", font=self.f_logo, text_color="#4F4F4F")
self.btn_start = ctk.CTkButton(self.frame_left, text="开始运行", font=self.f_normal, fg_color="#4F4F4F", command=lambda: self.__thread_it(self.__program_start))
self.btn_reset_state = ctk.CTkButton(self.frame_left, text="重置状态", font=self.f_normal, fg_color="#4F4F4F", command=self.__reset_state)
# ========================================================================
self.om_main_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=160, dynamic_resizing=False, values=["brake", "current", "iso", "wavelogger"], font=self.f_common, text_color="#3C3C3C", button_color="#7B6B5B", fg_color="#8D8D8D", command=self.__main_switch_dp)
self.om_sub_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=160, dynamic_resizing=False, values=["cycle", "max", "avg"], font=self.f_common, text_color="#3C3C3C", button_color="#7B6B5B", fg_color="#8D8D8D")
self.label_path_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=40, anchor="e", text="Path", font=self.f_common)
self.entry_path_dp = ctk.CTkEntry(self.tabview_top.tab("数据处理"), width=80, state="disabled", textvariable=self.entry_path_dpv, font=self.f_entry, text_color="#818181")
self.label_vel_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=40, anchor="e", text="Vel", font=self.f_common)
self.om_vel_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=80, values=["1", "2", "3", "4", "5"], font=self.f_common, button_color="#7B6B5B", fg_color="#8D8D8D", text_color_disabled="#808000")
self.label_trq_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=40, anchor="e", text="Trq", font=self.f_common)
self.om_trq_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=80, values=["1", "2", "3", "4", "5"], font=self.f_common, button_color="#7B6B5B", fg_color="#8D8D8D", text_color_disabled="#808000")
self.label_trqh_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=40, anchor="e", text="TrqH", font=self.f_common)
self.om_trqh_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=80, values=["1", "2", "3", "4", "5"], font=self.f_common, button_color="#7B6B5B", fg_color="#8D8D8D", text_color_disabled="#808000")
self.label_sensor_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=60, anchor="e", text="Sensor", font=self.f_common)
self.om_sensor_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=80, values=["1", "2", "3", "4", "5"], font=self.f_common, button_color="#7B6B5B", fg_color="#8D8D8D", text_color_disabled="#808000")
self.label_estop_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=60, anchor="e", text="Estop", font=self.f_common)
self.om_estop_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=80, values=["1", "2", "3", "4", "5"], font=self.f_common, button_color="#7B6B5B", fg_color="#8D8D8D", text_color_disabled="#808000")
# ========================================================================
self.om_main_at = ctk.CTkOptionMenu(self.tabview_top.tab("自动测试"), width=160, dynamic_resizing=False, values=["brake", "current"], font=self.f_common, text_color="#3C3C3C", button_color="#7B6B5B", fg_color="#8D8D8D")
self.om_sub_at = ctk.CTkOptionMenu(self.tabview_top.tab("自动测试"), width=160, dynamic_resizing=False, values=["tool33", "tool66", "tool100", "inertia"], font=self.f_common, text_color="#3C3C3C", button_color="#7B6B5B", fg_color="#8D8D8D")
self.label_ip_at = ctk.CTkLabel(self.tabview_top.tab("自动测试"), anchor="e", text="IP", font=self.f_common)
self.entry_ip_at = ctk.CTkEntry(self.tabview_top.tab("自动测试"), width=160, textvariable=self.entry_ip_atv, font=self.f_entry, text_color="#818181")
self.btn_conn = ctk.CTkButton(self.tabview_top.tab("自动测试"), text="连接", width=60, font=self.f_segbtn, fg_color="#979DA2", corner_radius=0)
self.progressbar_at = ctk.CTkProgressBar(self.tabview_top.tab("自动测试"), width=160, mode="indeterminate")
self.label_path_at = ctk.CTkLabel(self.tabview_top.tab("自动测试"), width=50, anchor="e", text="Path", font=self.f_common)
self.entry_path_at = ctk.CTkEntry(self.tabview_top.tab("自动测试"), width=80, state="disabled", textvariable=self.entry_path_atv, font=self.f_entry, text_color="#818181")
self.frame_top = ctk.CTkFrame(self.tabview_top.tab("自动测试"), width=120, height=10, fg_color="#E9E9E9")
self.btn_trig_estop = ctk.CTkButton(self.frame_top, width=100, text="触发急停", font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=print)
self.btn_reset_estop = ctk.CTkButton(self.frame_top, width=100, text="恢复急停", font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=print)
self.btn_robot_state = ctk.CTkButton(self.frame_top, width=100, text="机器信息", font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=self.__robot_info)
self.popupmenu_ip = tk.Menu(self.entry_ip_at, tearoff=False)
self.popupmenu_ip.add_command(label="复制", accelerator="Ctrl+C", font=self.f_treeview, command=lambda: self.__copy(self.entry_ip_at))
self.popupmenu_ip.add_command(label="剪切", accelerator="Ctrl+X", font=self.f_treeview, command=lambda: self.__cut(self.entry_ip_at))
self.popupmenu_ip.add_command(label="粘贴", accelerator="Ctrl+V", font=self.f_treeview, command=lambda: self.__paste(self.entry_ip_at))
self.popupmenu_ip.add_command(label="全选", accelerator="Ctrl+A", font=self.f_treeview, command=lambda: self.__select(self.entry_ip_at))
# ========================================================================
self.text_output = ctk.CTkTextbox(self.tabview_bottom.tab("输出"), height=10, corner_radius=0, wrap="none", font=self.f_text)
# ========================================================================
self.label_logs = ctk.CTkLabel(self.tabview_bottom.tab("日志"), width=80, font=self.f_pager, textvariable=self.label_pages_logs, text_color="blue", bg_color="#DCDCDC")
self.btn_previous = ctk.CTkButton(self.tabview_bottom.tab("日志"), width=60, text="上一页", font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=self.__get_previous_log)
self.btn_realtime = ctk.CTkButton(self.tabview_bottom.tab("日志"), width=60, text="实时", font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=self.__get_realtime_log)
self.btn_next = ctk.CTkButton(self.tabview_bottom.tab("日志"), width=60, text="下一页", font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=self.__get_next_log)
self.btn_load = ctk.CTkButton(self.tabview_bottom.tab("日志"), width=60, text="加载", font=self.f_segbtn, fg_color="#979DA2", corner_radius=0, command=self.__load_log_db)
self.btn_find = ctk.CTkButton(self.tabview_bottom.tab("日志"), text="查找", width=60, font=self.f_segbtn, fg_color="#979DA2", corner_radius=0)
self.entry_keyword = ctk.CTkEntry(self.tabview_bottom.tab("日志"), placeholder_text="[id/level/module] 查找内容", width=60, font=self.f_entry)
self.treeview_logs = ttk.Treeview(self.tabview_bottom.tab("日志"), height=1, columns=("id", "time", "level", "module", "content"), style="tv.Treeview", show="headings")
self.y_scrollbar_logs = tk.Scrollbar(self.tabview_bottom.tab("日志"), width=30, command=self.treeview_logs.yview)
self.popupmenu_kw = tk.Menu(self.entry_keyword, tearoff=False)
self.popupmenu_kw.add_command(label="复制", accelerator="Ctrl+C", font=self.f_treeview, command=lambda: self.__copy(self.entry_keyword))
self.popupmenu_kw.add_command(label="剪切", accelerator="Ctrl+X", font=self.f_treeview, command=lambda: self.__cut(self.entry_keyword))
self.popupmenu_kw.add_command(label="粘贴", accelerator="Ctrl+V", font=self.f_treeview, command=lambda: self.__paste(self.entry_keyword))
self.popupmenu_kw.add_command(label="全选", accelerator="Ctrl+A", font=self.f_treeview, command=lambda: self.__select(self.entry_keyword))
# ========================================================================
with open(f"{clibs.PREFIX}/files/version/vers", mode="r", encoding="utf-8") as f:
local_vers = f.read().strip()
_version, _update = local_vers.split("@")
_text = f" v{_version} Update@{_update}"
self.label_vers = ctk.CTkLabel(self.frame_status, text=_text, bg_color="#C9C9C9", font=self.f_status, anchor="w", text_color="#4F4F4F")
var_tips = ctk.StringVar()
self.label_tips = ctk.CTkLabel(self.frame_status, textvariable=var_tips, compound="left", bg_color="#C9C9C9", font=self.f_status, anchor="e")
self.__auth_and_vercheck()
if local_vers == self.server_vers:
image = ctk.CTkImage(Image.open(f"{clibs.PREFIX}/media/updated.png"), size=(16, 16))
var_tips.set(" 当前是最新版本,继续保持! ")
self.label_tips.configure(text_color="#0D8A3D", image=image)
else:
if self.server_vers is None:
return
image = ctk.CTkImage(Image.open(f"{clibs.PREFIX}/media/upgrade.png"), size=(16, 16))
var_tips.set(f" {self.server_vers.split("@")[0]}已经发布,尽快更新至最新版本! ")
self.label_tips.configure(text_color="#D81E06", image=image, cursor="hand2")
self.label_tips.bind("<Button-1>", self.__goto_update)
# ========================================================================
self.__draw()
clibs.insert_logdb("INFO", "aio", "AIO starts running......")
def __robot_info(self):
clibs.cursor.execute("select * from logs")
records = clibs.cursor.fetchall()
self.text_output.insert(ctk.END, records)
def __thread_it(self, func, *args):
""" 将函数打包进线程 """
self.myThread = threading.Thread(target=func, args=args)
self.myThread.daemon = True # 主线程退出就直接让子线程跟随退出,不论是否运行完成。
self.myThread.start()
def __set_geometry(self, widget, width, height):
x = self.root.winfo_x()
y = self.root.winfo_y()
w_width = self.root.winfo_width()
w_height = self.root.winfo_height()
x_offset = x+(w_width-width)//2
y_offset = y+(w_height-height)//2
widget.geometry(f"{width}x{height}+{x_offset}+{y_offset}")
def __toplevel_progress(self, title):
toplevel = tk.Toplevel(self.root)
toplevel.title(title)
self.__set_geometry(toplevel, 400, 50)
self.root.attributes("-disabled", 1)
toplevel.protocol("WM_DELETE_WINDOW", print)
# toplevel.resizable(False, False)
toplevel.iconbitmap(f"{clibs.PREFIX}/media/icon.ico")
toplevel.transient(self.root)
toplevel.grid_rowconfigure(0, weight=1)
toplevel.grid_columnconfigure(0, weight=1)
pb = ttk.Progressbar(toplevel, length=10, mode="indeterminate", orient="horizontal")
pb.grid(row=0, column=0, sticky="we")
pb.start(10)
while clibs.stop:
time.sleep(0.1)
self.text_output.update()
pb.update()
self.root.update()
else:
self.__w2t("\n")
self.root.attributes("-disabled", 0)
toplevel.destroy()
def __w2t(self, text, color="black", desc=None):
self.text_output.tag_config(tagName=color, foreground=color)
self.text_output.insert(ctk.END, text, tags=color)
self.text_output.see(ctk.END)
if desc:
clibs.running = False
raise Exception(desc)
def __program_start(self):
def get_data_dp():
data = {
"_main": self.om_main_dp.get(),
"_sub": self.om_sub_dp.get(),
"_path": self.entry_path_dp.get(),
"_vel": self.om_vel_dp.get(),
"_trq": self.om_trq_dp.get(),
"_trqh": self.om_trqh_dp.get(),
"_estop": self.om_estop_dp.get(),
"_sensor": self.om_sensor_dp.get(),
}
return data
def get_data_at():
data = {
"_main": self.om_main_at.get(),
"_sub": self.om_sub_at.get(),
"_path": self.entry_path_at.get()
}
return data
def init_op():
clibs.w2t = self.__w2t
self.text_output.delete("1.0", ctk.END)
self.tabview_bottom.set("输出")
if clibs.db_state != "readwrite":
self.text_output.delete("1.0", ctk.END)
clibs.cursor.close()
clibs.conn.close()
clibs.conn = self.conn
clibs.cursor = self.cursor
clibs.db_state = "readwrite"
self.btn_load.configure(fg_color="#979DA2")
self.__get_realtime_log()
clibs.tl_prg = self.__toplevel_progress
# TBD: something signifies status of program and terminate some thread ect.
def exec_function():
init_op()
if self.tabview_top.get() == "数据处理":
if not clibs.running:
clibs.running = True
clibs.data_dp = get_data_dp()
eval(clibs.data_dp["_main"] + ".main()")
clibs.running = False
else:
messagebox.showinfo(title="进行中...", message="当前有程序正在运行!")
elif self.tabview_top.get() == "自动测试":
clibs.running = True
clibs.data_at = get_data_at()
eval(clibs.data_at["_main"] + ".main()")
clibs.running = False
exec_function()
def __reset_state(self):
def reset_methods():
self.btn_load.configure(fg_color="#979DA2")
self.__reset_entry_keyword()
self.entry_ip_atv.set("192.168.0.160")
self.entry_path_atv.set("数据文件夹路径")
self.entry_path_atv.set("数据文件夹路径")
if clibs.db_state == "readwrite":
res = messagebox.askyesno(title="状态重置", message="这将清空本次所有的输出以及日志记录,且不可恢复,请确认!", default=messagebox.NO, icon=messagebox.WARNING)
if res:
self.text_output.delete("1.0", ctk.END)
clibs.cursor.execute("delete from logs")
clibs.cursor.execute("delete from sqlite_sequence") # 主键归零
self.treeview_logs.delete(*self.treeview_logs.get_children())
self.label_pages_logs.set("-.-.-.-.-.-")
reset_methods()
# TBD: something signifies status of program and terminate some thread ect.
elif clibs.db_state == "readonly":
res = messagebox.askyesno(title="状态重置", message="这将清空本次所有的输出以及恢复本次的日志记录,请确认!", default=messagebox.NO, icon=messagebox.INFO)
if res:
self.text_output.delete("1.0", ctk.END)
clibs.cursor.close()
clibs.conn.close()
clibs.conn = self.conn
clibs.cursor = self.cursor
clibs.db_state = "readwrite"
self.__get_realtime_log()
reset_methods()
# TBD: something signifies status of program and terminate some thread ect.
def __get_realtime_log(self):
clibs.cursor.execute("select * from logs order by id desc")
len_records = len(clibs.cursor.fetchall())
pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1
self.label_pages_logs.set(f"{pages_all} / {pages_all}")
self.treeview_logs.delete(*self.treeview_logs.get_children())
remainder = len_records % 100
end = len_records
start = len_records-remainder+1 if len_records-remainder > 0 else 0
# clibs.cursor.execute("select * from logs order by id desc limit 100")
clibs.cursor.execute(f"select * from logs where id between {start} and {end}")
records = clibs.cursor.fetchall()
for record in records:
self.treeview_logs.insert("", "end", values=record, tags=record[2])
self.treeview_logs.yview_moveto(1)
self.__reset_entry_keyword()
def __get_previous_log(self):
if self.btn_find.cget("fg_color") == "#979DA2":
try:
row = self.treeview_logs.get_children()
first_id = self.treeview_logs.item(row[0], "values")[0]
end = int(first_id)-1 if int(first_id)-1 > 0 else None
start = int(first_id)-100 if int(first_id)-100 > 0 else 0
if end is not None:
clibs.cursor.execute(f"select * from logs where id between {start} and {end}")
records = clibs.cursor.fetchall()
self.treeview_logs.delete(*self.treeview_logs.get_children())
for record in records:
self.treeview_logs.insert("", "end", values=record, tags=record[2])
# self.treeview_logs.yview_moveto(1)
clibs.cursor.execute("select * from logs order by id desc")
records = clibs.cursor.fetchall()
pages_all = len(records) // 100 if len(records) % 100 == 0 else len(records) // 100 + 1
current_page = int(end) // 100 if int(end) % 100 == 0 else int(end) // 100 + 1
self.label_pages_logs.set(f"{current_page} / {pages_all}")
except Exception as ERR:
...
else:
pages_all = self.label_pages_logs.get().split("/")[1].strip()
current_page = int(self.label_pages_logs.get().split("/")[0].strip())
if current_page-1 > 0:
row = self.treeview_logs.get_children()
first_one = list(self.treeview_logs.item(row[0], "values"))
first_one[0] = int(first_one[0])
index_end = clibs.f_records.index(tuple(first_one))
self.treeview_logs.delete(*self.treeview_logs.get_children())
if index_end <= 100:
for record in clibs.f_records[:index_end]:
self.treeview_logs.insert("", "end", values=record, tags=record[2])
else:
for record in clibs.f_records[index_end-100:index_end]:
self.treeview_logs.insert("", "end", values=record, tags=record[2])
previous_page = current_page - 1
self.label_pages_logs.set(f"{previous_page} / {pages_all}")
def __get_next_log(self):
if self.btn_find.cget("fg_color") == "#979DA2":
try:
clibs.cursor.execute("select * from logs order by id desc")
len_records = len(clibs.cursor.fetchall())
row = self.treeview_logs.get_children()
last_id = self.treeview_logs.item(row[-1], "values")[0]
start = int(last_id) + 1
end = int(last_id) + 100
if int(start) <= len_records:
clibs.cursor.execute(f"select * from logs where id between {start} and {end}")
records = clibs.cursor.fetchall()
self.treeview_logs.delete(*self.treeview_logs.get_children())
for record in records:
self.treeview_logs.insert("", "end", values=record, tags=record[2])
# self.treeview_logs.yview_moveto(1)
pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1
current_page = int(start) // 100 if int(start) % 100 == 0 else int(start) // 100 + 1
self.label_pages_logs.set(f"{current_page} / {pages_all}")
except Exception as ERR:
...
else:
len_records = len(clibs.f_records)
pages_all = int(self.label_pages_logs.get().split("/")[1].strip())
current_page = int(self.label_pages_logs.get().split("/")[0].strip())
if current_page < pages_all:
row = self.treeview_logs.get_children()
last_one = list(self.treeview_logs.item(row[-1], "values"))
last_one[0] = int(last_one[0])
index_start = clibs.f_records.index(tuple(last_one))+1
self.treeview_logs.delete(*self.treeview_logs.get_children())
if index_start+100 <= len_records:
for record in clibs.f_records[index_start:index_start+100]:
self.treeview_logs.insert("", "end", values=record, tags=record[2])
else:
for record in clibs.f_records[index_start:]:
self.treeview_logs.insert("", "end", values=record, tags=record[2])
next_page = current_page + 1
self.label_pages_logs.set(f"{next_page} / {pages_all}")
def __load_log_db(self):
db_file = filedialog.askopenfilename(title="加载数据库文件", defaultextension=".db", initialdir=f"{clibs.PREFIX}/logs")
if not db_file:
return
self.conn = clibs.conn
self.cursor = clibs.cursor
try:
clibs.conn = sqlite3.connect(f"file:{db_file}?mode=ro", uri=True, isolation_level=None, check_same_thread=False, cached_statements=256)
clibs.cursor = clibs.conn.cursor()
clibs.db_state = "readonly"
self.__get_realtime_log()
self.btn_load.configure(fg_color="#000080")
except Exception as Err:
clibs.insert_logdb("ERROR", "aio", f"加载数据库失败,需确认 {db_file} 是否是有效数据库文件!")
messagebox.showerror(title="加载数据库失败", message=f"需确认 {db_file} 是否是有效数据库文件!")
def __main_switch_dp(self, event):
widgets = [self.om_sub_dp, self.om_vel_dp, self.om_trq_dp, self.om_trqh_dp, self.om_estop_dp, self.om_sensor_dp]
for widgets in widgets:
widgets.configure(state="disabled")
_func = self.om_main_dp.get()
if _func == "brake":
self.om_vel_dp.configure(state="normal")
self.om_trq_dp.configure(state="normal")
self.om_estop_dp.configure(state="normal")
elif _func == "current":
self.om_sub_dp.configure(state="normal")
self.om_vel_dp.configure(state="normal")
self.om_trq_dp.configure(state="normal")
self.om_trqh_dp.configure(state="normal")
self.om_sensor_dp.configure(state="normal")
elif _func == "iso" or _func == "wavelogger":
...
@staticmethod
def __goto_update():
webbrowser.open("www.baidu.com")
def __auth_and_vercheck(self):
url_vers = "http://10.2.23.150:10008/version"
try:
self.server_vers = request.urlopen(url_vers, timeout=2).read().decode("utf-8").strip()
except Exception as Err:
messagebox.showwarning(title="退出", message="无法连接至服务器.....")
clibs.cursor.close()
clibs.conn.close()
self.root.destroy()
def __set_root(self):
self.root.title("AIO - All in one automatic toolbox")
self.root.iconbitmap(f"{clibs.PREFIX}/media/icon.ico")
self.root.configure(fg_color="#E9E9E9")
self.root.geometry(f"1120x550+10+10")
self.root.minsize(1120, 550)
# self.root.resizable(False, False)
self.root.protocol("WM_DELETE_WINDOW", self.__close_root)
self.f_logo = ("Segoe Script Bold", 28, "bold")
self.f_normal = ("Consolas", 20, "bold")
self.f_segbtn = ("楷体", 20, "bold")
self.f_common = ("Consolas", 18, "bold")
self.f_entry = ("Consolas", 16, "bold")
self.f_text = ("仿宋", 16, "normal")
self.f_treeview = ("Consolas", 16, "normal")
self.f_pager = ("Times", 16, "normal")
self.f_toplevel = ("Consolas", 14, "normal")
self.f_status = ("Consolas", 12, "bold")
def __quit_preparation(self):
os.chdir(clibs.log_path)
t = datetime.now().strftime("%Y%m%d%H%M%S")
disk_conn = sqlite3.connect(f"log_{t}.db", isolation_level=None, check_same_thread=False, cached_statements=256)
disk_cursor = disk_conn.cursor()
if clibs.db_state == "readonly":
clibs.cursor.close()
clibs.conn.close()
clibs.conn = self.conn
clibs.cursor = self.cursor
clibs.db_state = "readwrite"
self.__reset_entry_keyword()
self.btn_load.configure(fg_color="#979DA2")
clibs.conn.backup(target=disk_conn, pages=1, progress=None)
_, logs = clibs.traversal_files(".", self.__w2t)
logs.sort()
while len(logs) > 10:
_ = logs.pop(0)
os.remove(_)
disk_cursor.close()
disk_conn.close()
clibs.cursor.close()
clibs.conn.close()
self.root.destroy()
def __reset_entry_keyword(self):
self.btn_find.configure(fg_color="#979DA2")
self.entry_keyword.delete(0, "end")
self.entry_keyword.configure(placeholder_text="[id/level/module] 查找内容")
self.root.focus_set()
def __close_root(self):
msg = "相关数据可能未保存,正在运行程序时有概率会损坏数据文件,确定要终止程序运行吗?"
res = messagebox.askyesno(title="退出程序", message=msg, default=messagebox.NO, icon=messagebox.WARNING)
if res:
self.__quit_preparation()
def __switch_tab_top(self):
...
def __switch_tab_bottom(self):
if self.tabview_bottom.get() == "日志":
self.__get_realtime_log()
@staticmethod
def __cut(widget):
try:
selected = widget.selection_get()
all_text = widget.get()
widget.clipboard_clear()
widget.clipboard_append(selected)
index_start = all_text.find(selected)
index_end = index_start + len(selected)
widget.delete(index_start, index_end)
except Exception as Err:
...
@staticmethod
def __paste(widget):
try:
copy_text = widget.clipboard_get()
widget.insert("insert", copy_text)
selected = widget.selection_get()
all_text = widget.get()
index_start = all_text.find(selected)
index_end = index_start + len(selected)
widget.delete(index_start, index_end)
except Exception as Err:
...
@staticmethod
def __copy(widget):
try:
widget.clipboard_clear()
copy_text = widget.selection_get()
widget.clipboard_append(copy_text)
except Exception as Err:
...
@staticmethod
def __select(widget):
widget.select_range(0, "end")
# self.entry_keyword.select_range(0, "end")
def __draw(self):
def select_path(event):
t_name = self.tabview_top.get()
dir_path = filedialog.askdirectory()
if t_name == "数据处理":
c_origin = self.entry_path_dpv.get()
if dir_path:
self.entry_path_dpv.set(dir_path)
else:
self.entry_path_dpv.set(c_origin)
elif t_name == "自动测试":
c_origin = self.entry_path_atv.get()
if dir_path:
self.entry_path_atv.set(dir_path)
else:
self.entry_path_atv.set(c_origin)
def esc_quit_log_tl(event):
widget_toplevel = event.widget
self.root.attributes("-disabled", 0)
widget_toplevel.destroy()
def esc_quit_log_text(event):
widget_toplevel = event.widget.master.master
self.root.attributes("-disabled", 0)
widget_toplevel.destroy()
def double_click(event):
try:
e = event.widget
iid = e.identify("item", event.x, event.y)
_id, _ts, _level, _module, _content = e.item(iid, "values")
toplevel = tk.Toplevel(self.root)
toplevel.title(f"LogID: {_id}")
self.__set_geometry(toplevel, 1000, 400)
toplevel.resizable(False, False)
toplevel.iconbitmap(f"{clibs.PREFIX}/media/icon.ico")
# self.root.after(200, lambda: toplevel.iconbitmap(f"{clibs.PREFIX}/media/icon.ico"))
toplevel.protocol("WM_DELETE_WINDOW", lambda tl=toplevel: close_toplevel(tl))
toplevel.bind("<Escape>", esc_quit_log_tl)
toplevel.transient(self.root)
toplevel.focus()
_text = f"TS: {_ts} | Log level: {_level} | Module: {_module}"
label_info = ctk.CTkLabel(toplevel, text=_text, width=10, fg_color="#5F9EA0", corner_radius=5, font=self.f_treeview)
text_content = ctk.CTkTextbox(toplevel, width=10, font=self.f_toplevel)
text_content.insert(ctk.END, repr(_content))
text_content.configure(state="disabled")
text_content.bind("<Escape>", esc_quit_log_text)
toplevel.grid_rowconfigure(1, weight=1)
toplevel.columnconfigure(0, weight=1)
label_info.grid(row=0, column=0, padx=10, pady=10, sticky="we")
text_content.grid(row=1, column=0, padx=10, pady=(0, 10), sticky="news")
self.root.attributes("-disabled", 1)
except Exception as ERR:
...
def jump2page():
try:
number = int(self.entry_tips_v)
if number > 0:
start = number * 100 - 99
end = number * 100
clibs.cursor.execute("select * from logs order by id desc")
len_records = len(clibs.cursor.fetchall())
if start <= len_records:
clibs.cursor.execute(f"select * from logs where id between {start} and {end}")
records = clibs.cursor.fetchall()
self.treeview_logs.delete(*self.treeview_logs.get_children())
for record in records:
self.treeview_logs.insert("", "end", values=record, tags=record[2])
self.treeview_logs.yview_moveto(1)
pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1
self.label_pages_logs.set(f"{number} / {pages_all}")
except Exception as Err:
...
def close_toplevel(tl):
self.entry_tips_v = None
self.root.attributes("-disabled", 0)
tl.destroy()
def enter_quit(event):
widget_entry = event.widget
widget_toplevel = event.widget.master.master
self.entry_tips_v = widget_entry.get()
self.root.attributes("-disabled", 0)
widget_toplevel.destroy()
jump2page()
def esc_quit_page(event):
widget_toplevel = event.widget
self.entry_tips_v = None
self.root.attributes("-disabled", 0)
widget_toplevel.destroy()
def select_page(event):
self.root.attributes("-disabled", 1)
toplevel = tk.Toplevel(self.root)
toplevel.title("跳转")
self.__set_geometry(toplevel, 500, 150)
toplevel.resizable(False, False)
toplevel.iconbitmap(f"{clibs.PREFIX}/media/icon.ico")
toplevel.protocol("WM_DELETE_WINDOW", lambda tl=toplevel: close_toplevel(tl))
toplevel.bind("<Escape>", esc_quit_page)
toplevel.focus()
label_tips = ctk.CTkLabel(toplevel, width=100, text="页面号码", font=self.f_segbtn)
entry_tips = ctk.CTkEntry(toplevel, width=200, placeholder_text="Fill and press enter", font=self.f_toplevel)
entry_tips.bind("<Return>", enter_quit, add="+")
toplevel.rowconfigure(0, weight=1)
toplevel.columnconfigure([0, 1], weight=1)
label_tips.grid(row=0, column=0, padx=10, pady=10, sticky="w")
entry_tips.grid(row=0, column=1, padx=(0, 10), pady=10, sticky="w")
def find_log(event):
def find_error(cdt):
clibs.insert_logdb("WARNING", "aio", f"查询条件 [{cdt}] 书写规则错误!")
msg = "可单独查询 ID/Level/Module也可以指定 ID 范围内查询指定的 Levels(多个条件使用冒号分割仅限debug, info, warning, error的其中之一或组合]) 和(或) Modules(多个条件使用冒号分割),且当同时查询 Levels 和 Modules 时Levels 位置在前!\n"
messagebox.showwarning(title="查询条件错误", message=msg)
return
kw = self.entry_keyword.get().strip()
if not kw:
return
match = re.search("^\\[.*]", kw)
if match:
condition = match.group().removeprefix("[").removesuffix("]").strip()
f_text_isnull = kw.removeprefix(match.group()).strip()
f_text = f"%{f_text_isnull}%"
if "-" in condition:
# PreDOs
conditions = condition.strip().split("-")
start = conditions[0].strip()
end = conditions[1].strip()
if not (start.isdigit() and end.isdigit()):
clibs.insert_logdb("WARNING", "aio", f"查询条件 {condition} 书写规则错误:指定 ID 范围参数非数字,需核对!")
messagebox.showwarning(title="查询条件错误", message="指定 ID 范围参数非数字,需核对!")
return
if len(conditions) == 2: # id range only
if not f_text_isnull: # without content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)}")
else: # with content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and content like '{f_text}'")
elif len(conditions) == 3: # with id range, and level or module part
c_unknown = conditions[2].strip()
if ":" in c_unknown: # multi conditions
is_level = True
c_unknowns = c_unknown.split(":")
for c_unknown in c_unknowns:
if c_unknown.strip().upper() not in clibs.levels:
is_level = False
break
if is_level: # levels
levels = tuple(level.strip().upper() for level in c_unknowns)
if not f_text_isnull: # without content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level in {levels}")
else: # with content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level in {levels} and content like '{f_text}'")
else: # modules
modules = tuple(module.strip() for module in c_unknowns)
if not f_text_isnull: # without content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and module in {modules}")
else: # with content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and module in {modules} and content like '{f_text}'")
else: # single condition
if c_unknown.upper() in clibs.levels: # the third one is level
level = c_unknown.upper()
if not f_text_isnull: # without content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level = '{level}'")
else: # with content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level = '{level}' and content like '{f_text}'")
else: # the third one is module
module = c_unknown
if not f_text_isnull: # without content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and module = '{module}'")
else: # with content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and module = '{module}' and content like '{f_text}'")
elif len(conditions) == 4: # with id range, level and module
_levels = conditions[2].strip()
_modules = conditions[3].strip()
if ":" in _levels and ":" in _modules:
levels = tuple(level.strip().upper() for level in _levels.split(":"))
modules = tuple(module.strip() for module in _modules.split(":"))
for level in levels:
if level not in clibs.levels:
find_error(condition)
break
if not f_text_isnull: # without content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level in {levels} and module in {modules}")
else: # with content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level in {levels} and module in {modules} and content like '{f_text}'")
elif ":" in _levels and ":" not in _modules:
levels = tuple(level.strip().upper() for level in _levels.split(":"))
module = _modules
for level in levels:
if level not in clibs.levels:
find_error(condition)
break
if not f_text_isnull: # without content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level in {levels} and module = '{module}'")
else: # with content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level in {levels} and module = '{module}' and content like '{f_text}'")
elif ":" not in _levels and ":" in _modules:
level = _levels.upper()
modules = tuple(module.strip() for module in _modules.split(":"))
if level not in clibs.levels:
find_error(condition)
if not f_text_isnull: # without content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level = '{level}' and module in {modules}")
else: # with content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level = '{level}' and module in {modules} and content like '{f_text}'")
elif ":" not in _levels and ":" not in _modules:
level = _levels.upper()
module = _modules
if level not in clibs.levels:
find_error(condition)
if not f_text_isnull: # without content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level = '{level}' and module = '{module}'")
else: # with content
clibs.cursor.execute(f"select * from logs where id between {int(start)} and {int(end)} and level = '{level}' and module = '{module}' and content like '{f_text}'")
else:
find_error(condition)
elif condition.isdigit():
if not f_text_isnull:
clibs.cursor.execute(f"select * from logs where id = {int(condition)}")
else:
clibs.cursor.execute(f"select * from logs where id = {int(condition)} and content like '{f_text}'")
elif condition.upper() in clibs.levels:
if not f_text_isnull:
clibs.cursor.execute(f"select * from logs where level = '{condition.upper()}'")
else:
clibs.cursor.execute(f"select * from logs where level = '{condition.upper()}' and content like '{f_text}'")
else: # 模块名
if not f_text_isnull:
clibs.cursor.execute(f"select * from logs where module = '{condition}'")
else:
clibs.cursor.execute(f"select * from logs where module = '{condition}' and content like '{f_text}'")
else:
f_text = f"%{kw}%"
clibs.cursor.execute(f"select * from logs where content like '{f_text}'")
clibs.f_records = clibs.cursor.fetchall()
len_records = len(clibs.f_records)
pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1
self.label_pages_logs.set(f"{pages_all} / {pages_all}")
self.treeview_logs.delete(*self.treeview_logs.get_children())
remainder = len_records % 100
records = clibs.f_records[-1*remainder:]
for record in records:
self.treeview_logs.insert("", "end", values=record, tags=record[2])
self.treeview_logs.yview_moveto(1)
self.btn_find.configure(fg_color="#000080")
def show_popupmenu_kw(event):
self.popupmenu_kw.post(event.x_root, event.y_root)
def show_popupmenu_ip(event):
self.popupmenu_ip.post(event.x_root, event.y_root)
# ========================================================================
self.root.rowconfigure(0, weight=1)
self.root.rowconfigure(1, weight=99)
self.root.columnconfigure(0, weight=1)
self.root.columnconfigure(1, weight=99)
self.frame_left.grid(row=0, column=0, rowspan=2, sticky="news")
self.tabview_top.grid(row=0, column=1, padx=10, pady=5, sticky="news")
self.tabview_bottom.grid(row=1, column=1, padx=10, pady=(0, 10), sticky="news")
self.frame_status.grid(row=2, column=0, columnspan=2, sticky="news")
# ========================================================================
self.frame_left.rowconfigure(list(range(30)), weight=1)
self.frame_left.columnconfigure(0, weight=1)
self.label_logo.grid(row=0, column=0, sticky="new", padx=20, pady=(20, 40))
self.btn_start.grid(row=1, column=0, sticky="new", padx=15, pady=1, ipady=4)
self.btn_reset_state.grid(row=2, column=0, sticky="new", padx=15, pady=(0, 1), ipady=4)
# ========================================================================
# self.tabview_top.tab("数据处理").grid_rowconfigure([0, 1], weight=1)
self.tabview_top.tab("数据处理").grid_columnconfigure(11, weight=1)
self.om_main_dp.grid(row=0, column=0, padx=10)
self.om_sub_dp.grid(row=1, column=0, padx=10, pady=(0, 10))
self.label_path_dp.grid(row=0, column=1, padx=(0, 5), pady=10)
self.entry_path_dp.grid(row=0, column=2, columnspan=10, padx=(0, 10), pady=10, sticky="we")
self.label_vel_dp.grid(row=1, column=1, padx=(0, 5), pady=(0, 10))
self.om_vel_dp.grid(row=1, column=2, padx=(0, 5), pady=(0, 10))
self.label_trq_dp.grid(row=1, column=3, padx=(0, 5), pady=(0, 10))
self.om_trq_dp.grid(row=1, column=4, padx=(0, 5), pady=(0, 10))
self.label_trqh_dp.grid(row=1, column=5, padx=(0, 5), pady=(0, 10))
self.om_trqh_dp.grid(row=1, column=6, padx=(0, 5), pady=(0, 10))
self.label_estop_dp.grid(row=1, column=7, padx=(0, 5), pady=(0, 10))
self.om_estop_dp.grid(row=1, column=8, padx=(0, 5), pady=(0, 10))
self.label_sensor_dp.grid(row=1, column=9, padx=(0, 5), pady=(0, 10))
self.om_sensor_dp.grid(row=1, column=10, padx=(0, 10), pady=(0, 10))
self.entry_path_dp.bind("<Button-1>", select_path)
self.om_vel_dp.set("1")
self.om_trq_dp.set("2")
self.om_trqh_dp.set("2")
self.om_estop_dp.set("3")
self.om_sensor_dp.set("3")
# ========================================================================
# self.tabview_top.tab("自动测试").grid_rowconfigure([0, 1, 2], weight=1)
self.tabview_top.tab("自动测试").grid_columnconfigure(4, weight=1)
self.om_main_at.grid(row=0, column=0, padx=10, pady=10, sticky="w")
self.om_sub_at.grid(row=1, column=0, padx=10, pady=(0, 10), sticky="w")
self.label_ip_at.grid(row=0, column=1, padx=(0, 10), pady=10, sticky="e")
self.entry_ip_at.grid(row=0, column=2, padx=(0, 10), pady=10)
self.btn_conn.grid(row=0, column=3, padx=(0, 10), pady=10)
self.progressbar_at.grid(row=0, column=4, padx=(0, 10), pady=10, sticky="e")
self.label_path_at.grid(row=1, column=1, padx=(0, 10), pady=(0, 10), sticky="e")
self.entry_path_at.grid(row=1, column=2, columnspan=3, padx=(0, 10), pady=(0, 10), sticky="we")
self.frame_top.grid(row=2, column=0, columnspan=5, padx=0, pady=0, sticky="we")
self.btn_robot_state.grid(row=0, column=0, padx=10, pady=0)
self.btn_trig_estop.grid(row=0, column=1, padx=(0, 10), pady=0)
self.btn_reset_estop.grid(row=0, column=2, padx=(0, 10), pady=0)
self.progressbar_at.start()
self.progressbar_at.configure(progress_color="red", fg_color="gray")
self.entry_path_at.bind("<Button-1>", select_path)
self.entry_ip_at.bind("<Button-3>", show_popupmenu_ip, add="+")
# ========================================================================
self.tabview_bottom.tab("输出").grid_rowconfigure(0, weight=1)
self.tabview_bottom.tab("输出").grid_columnconfigure(0, weight=1)
self.text_output.grid(row=0, column=0, sticky="news")
# ========================================================================
self.tabview_bottom.tab("日志").grid_rowconfigure(0, weight=1)
self.tabview_bottom.tab("日志").grid_columnconfigure(6, weight=1)
self.treeview_logs.grid(row=0, column=0, columnspan=7, padx=(10, 0), pady=10, sticky="news")
self.y_scrollbar_logs.grid(row=0, column=7, pady=10, sticky="ns")
self.label_logs.grid(row=1, column=0, padx=(0, 10), pady=5, sticky="we")
self.btn_previous.grid(row=1, column=1, padx=(0, 10), pady=5, sticky="we")
self.btn_realtime.grid(row=1, column=2, padx=(0, 10), pady=5, sticky="we")
self.btn_next.grid(row=1, column=3, padx=(0, 10), pady=5, sticky="we")
self.btn_load.grid(row=1, column=4, padx=(0, 10), pady=5, sticky="we")
self.btn_find.grid(row=1, column=5, padx=(0, 10), pady=5, sticky="we")
self.entry_keyword.grid(row=1, column=6, columnspan=2, padx=0, pady=5, sticky="we")
self.treeview_logs.configure(yscrollcommand=self.y_scrollbar_logs.set)
self.treeview_logs.tag_configure("DEBUG", background="#708090")
self.treeview_logs.tag_configure("INFO", background="#F5F5F5") # #43CD80
self.treeview_logs.tag_configure("WARNING", background="#EEE8AA")
self.treeview_logs.tag_configure("ERROR", background="#CD5C5C")
for k, v in self.tv_cols.items():
self.treeview_logs.heading(f"#{k}", text=v[0].title(), anchor="w")
self.treeview_logs.column(f"#{k}", anchor="w", width=v[1])
self.treeview_logs.bind("<Double-1>", double_click)
self.label_logs.bind("<Button-1>", select_page)
self.btn_find.bind("<Button-1>", find_log)
self.entry_keyword.bind("<Return>", find_log)
self.entry_keyword.bind("<Button-3>", show_popupmenu_kw, add="+")
# ========================================================================
self.frame_status.rowconfigure(0, weight=1)
self.frame_status.columnconfigure([0, 1], weight=1)
self.label_vers.grid(row=0, column=0, sticky="news")
self.label_tips.grid(row=0, column=1, sticky="news")
# ========================================================================
# widgets = [self.om_sub_dp, self.om_vel_dp, self.om_trq_dp, self.om_trqh_dp, self.om_estop_dp, self.om_sensor_dp]
# for widgets in widgets:
# widgets.configure(state="disabled")
self.om_sub_dp.configure(state="disabled")
self.om_trqh_dp.configure(state="disabled")
self.om_sensor_dp.configure(state="disabled")
def show(self):
if self.server_vers:
self.root.mainloop()
if __name__ == "__main__":
aio = App()
aio.show()

View File

@ -1,215 +0,0 @@
# from tkinter import colorchooser
# from tkinter import font
# import tkinter as tk
# import threading
# import logging
# import time
# import os
import customtkinter as ctk
from tkinter import messagebox
from tkinter import filedialog
from tkinter import ttk
class App:
def __init__(self):
# ========================================================================
self.root = ctk.CTk()
self.__set_root()
# ========================================================================
# self.style = ttk.Style()
# self.style.configure("nb.TNotebook", font=("Consolas", 28, "normal"))
self.entry_path_dpv = ctk.StringVar()
self.entry_path_dpv.set("数据文件夹路径")
self.entry_path_atv = ctk.StringVar()
self.entry_path_atv.set("数据文件夹路径")
self.entry_ip_atv = ctk.StringVar()
self.entry_ip_atv.set("192.168.0.160")
# ========================================================================
self.frame_left = ctk.CTkFrame(self.root, width=80, corner_radius=0, fg_color="#E9E9E9")
# -------
self.tabview_top = ctk.CTkTabview(self.root, width=900, height=180, fg_color="#E9E9E9", border_width=2, anchor="w", command=self.__switch_tab)
self.tabview_top.add("数据处理")
self.tabview_top.add("自动测试")
# -------
self.tabview_bottom = ctk.CTkTabview(self.root, fg_color="#E9E9E9", border_width=2, anchor="w")
self.tabview_bottom.add("输出")
self.tabview_bottom.add("日志")
# -------
self.frame_status = ctk.CTkFrame(self.root, height=30, bg_color="purple")
# ========================================================================
self.label_logo = ctk.CTkLabel(self.frame_left, text="Rokae AIO", font=self.f_logo, text_color="#4F4F4F")
self.btn_start = ctk.CTkButton(self.frame_left, text="开始运行", font=self.f_normal, fg_color='#4F4F4F', command=lambda: print("---"))
self.btn_save_log = ctk.CTkButton(self.frame_left, text="保存日志", font=self.f_normal, fg_color='#4F4F4F', command=lambda: print("---"))
# ========================================================================
self.om_main_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=120, dynamic_resizing=False, values=["INIT", "brake", "current", "iso", "wavelog"], font=self.f_normal, text_color='#3C3C3C', button_color='#7B6B5B', fg_color="#8D8D8D", command=print)
self.om_sub_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=120, dynamic_resizing=False, values=["cycle", "max", "avg"], font=self.f_normal, text_color='#3C3C3C', button_color='#7B6B5B', fg_color="#8D8D8D", command=print)
self.label_path_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=50, anchor="e", text="Path", font=self.f_normal)
self.entry_path_dp = ctk.CTkEntry(self.tabview_top.tab("数据处理"), width=80, state="disabled", textvariable=self.entry_path_dpv, font=self.f_entry, text_color="#818181")
self.label_vel_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=50, anchor="e", text="Vel", font=self.f_normal)
self.om_vel_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=80, values=["1", "2", "3", "4", "5"], font=self.f_normal)
self.label_trq_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=50, anchor="e", text="Trq", font=self.f_normal)
self.om_trq_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=80, values=["1", "2", "3", "4", "5"], font=self.f_normal)
self.label_sensor_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=50, anchor="e", text="Sensor", font=self.f_normal)
self.om_sensor_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=80, values=["1", "2", "3", "4", "5"], font=self.f_normal)
self.label_estop_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=50, anchor="e", text="Estop", font=self.f_normal)
self.om_estop_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=80, values=["1", "2", "3", "4", "5"], font=self.f_normal)
self.label_trqh_dp = ctk.CTkLabel(self.tabview_top.tab("数据处理"), width=50, anchor="e", text="TrqH", font=self.f_normal)
self.om_trqh_dp = ctk.CTkOptionMenu(self.tabview_top.tab("数据处理"), width=80, values=["1", "2", "3", "4", "5"], font=self.f_normal)
# ========================================================================
self.om_main_at = ctk.CTkOptionMenu(self.tabview_top.tab("自动测试"), width=120, dynamic_resizing=False, values=["INIT", "brake", "current"], font=self.f_normal, text_color='#3C3C3C', button_color='#7B6B5B', fg_color="#8D8D8D", command=print)
self.om_sub_at = ctk.CTkOptionMenu(self.tabview_top.tab("自动测试"), width=120, dynamic_resizing=False, values=["负载信息", "tool33", "tool66", "tool100", "inertia"], font=self.f_normal, text_color='#3C3C3C', button_color='#7B6B5B', fg_color="#8D8D8D", command=print)
self.label_ip_at = ctk.CTkLabel(self.tabview_top.tab("自动测试"), anchor="e", text="IP", font=self.f_normal)
self.entry_ip_at = ctk.CTkEntry(self.tabview_top.tab("自动测试"), width=160, textvariable=self.entry_ip_atv, font=self.f_entry, text_color="#818181")
self.btn_conn = ctk.CTkButton(self.tabview_top.tab("自动测试"), text="连接", width=60, font=self.f_segbtn)
self.progressbar_at = ctk.CTkProgressBar(self.tabview_top.tab("自动测试"), width=160, mode="indeterminate")
self.label_path_at = ctk.CTkLabel(self.tabview_top.tab("自动测试"), width=50, anchor="e", text="Path", font=self.f_normal)
self.entry_path_at = ctk.CTkEntry(self.tabview_top.tab("自动测试"), width=80, state="disabled", textvariable=self.entry_path_atv, font=self.f_entry, text_color="#818181")
self.segbutton_at = ctk.CTkSegmentedButton(self.tabview_top.tab("自动测试"), width=120, dynamic_resizing=False, values=["功能切换", "触发急停", "恢复急停", "待定功能", "功能待定", "机器状态", "告警信息"], font=self.f_segbtn)
# ========================================================================
self.text_output = ctk.CTkTextbox(self.tabview_bottom.tab("输出"), height=10, font=self.f_text)
# ========================================================================
self.segbutton_logs = ctk.CTkSegmentedButton(self.tabview_bottom.tab("日志"), width=200, dynamic_resizing=False, values=["上一页", "实时", "下一页"], font=self.f_segbtn)
self.btn_find = ctk.CTkButton(self.tabview_bottom.tab("日志"), text="查找", width=60, font=self.f_segbtn)
self.entry_keyword = ctk.CTkEntry(self.tabview_bottom.tab("日志"), placeholder_text="[id-level] 查找内容", width=60, font=self.f_segbtn)
self.label_loglevel = ctk.CTkLabel(self.tabview_bottom.tab("日志"), width=60, anchor="e", text="等级", font=self.f_normal)
self.om_loglevel = ctk.CTkOptionMenu(self.tabview_bottom.tab("日志"), width=60, dynamic_resizing=False, values=["DEBUG", "INFO", "WARNING", "ERROR"], font=self.f_normal, text_color='#3C3C3C', button_color='#7B6B5B', fg_color="#8D8D8D", command=print)
self.treeview_logs = ttk.Treeview(self.tabview_bottom.tab("日志"), height=10, columns=("time", "level", "module", "content"))
# ========================================================================
with open("../../assets/version/vers", mode="r", encoding="utf-8") as f_vers:
_version, _update = f_vers.read().split("@")
_text = f" v{_version} Update@{_update}"
self.label_vers = ctk.CTkLabel(self.frame_status, text=_text, bg_color="#C9C9C9", font=self.f_status, anchor="w")
var_tips = ctk.StringVar()
var_tips.set("当前是最新版本!")
self.label_tips = ctk.CTkLabel(self.frame_status, textvariable=var_tips, bg_color="#ABCDEF", font=self.f_status, anchor="e")
# ========================================================================
self.__draw()
def __set_root(self):
self.root.title("AIO - All in one automatic toolbox")
self.root.iconbitmap("../../assets/media/icon.ico")
self.root.configure(fg_color="#E9E9E9")
self.root.geometry(f"1100x500+10+10")
# self.root.resizable(False, False)
# self.root.protocol("WM_DELETE_WINDOW", self.__close_root)
self.f_normal = ("Consolas", 20, "bold")
self.f_entry = ("Consolas", 18, "bold")
self.f_segbtn = ("楷体", 20, "bold")
self.f_logo = ("Segoe Script Bold", 28, "bold")
self.f_text = ("仿宋", 16, "normal")
self.f_status = ("Consolas", 12, "bold")
def __close_root(self):
msg = "相关数据可能未保存,正在运行程序时有概率会损坏数据文件,确定要终止程序运行吗?"
res = messagebox.askyesno(title="退出程序", message=msg, default=messagebox.NO, icon=messagebox.WARNING)
if res:
self.root.destroy()
def __switch_tab(self):
t_name = self.tabview_top.get()
if t_name == "数据处理":
self.tabview_top.configure(height=180)
elif t_name == "自动测试":
self.tabview_top.configure(height=180)
def __draw(self):
def select_path(event):
t_name = self.tabview_top.get()
dir_path = filedialog.askdirectory()
if t_name == "数据处理":
c_origin = self.entry_path_dpv.get()
if dir_path:
self.entry_path_dpv.set(dir_path)
else:
self.entry_path_dpv.set(c_origin)
elif t_name == "自动测试":
c_origin = self.entry_path_atv.get()
if dir_path:
self.entry_path_atv.set(dir_path)
else:
self.entry_path_atv.set(c_origin)
# ========================================================================
self.root.rowconfigure(0, weight=1)
self.root.rowconfigure(1, weight=99)
self.root.columnconfigure(0, weight=1)
self.root.columnconfigure(1, weight=99)
self.frame_left.grid(row=0, column=0, rowspan=2, sticky="news")
self.tabview_top.grid(row=0, column=1, padx=10, pady=5, sticky="news")
self.tabview_bottom.grid(row=1, column=1, padx=10, pady=(0, 10), sticky="news")
self.frame_status.grid(row=2, column=0, columnspan=2, sticky="news")
# ========================================================================
self.frame_left.rowconfigure(list(range(30)), weight=1)
self.frame_left.columnconfigure(0, weight=1)
self.label_logo.grid(row=0, column=0, sticky="new", padx=20, pady=(20, 40))
self.btn_start.grid(row=1, column=0, sticky="new", padx=15, ipady=4)
self.btn_save_log.grid(row=2, column=0, sticky="new", padx=15, ipady=4)
# ========================================================================
# self.tabview_top.tab("数据处理").grid_rowconfigure([0, 1], weight=1)
self.tabview_top.tab("数据处理").grid_columnconfigure(11, weight=1)
self.om_main_dp.grid(row=0, column=0, padx=10)
self.om_sub_dp.grid(row=1, column=0, padx=10, pady=(0, 10))
self.label_path_dp.grid(row=0, column=1, padx=(0, 5), pady=10)
self.entry_path_dp.grid(row=0, column=2, columnspan=10, padx=(0, 10), pady=10, sticky="we")
self.label_vel_dp.grid(row=1, column=1, padx=(0, 5), pady=(0, 10))
self.om_vel_dp.grid(row=1, column=2, padx=(0, 5), pady=(0, 10))
self.label_trq_dp.grid(row=1, column=3, padx=(0, 5), pady=(0, 10))
self.om_trq_dp.grid(row=1, column=4, padx=(0, 5), pady=(0, 10))
self.label_trqh_dp.grid(row=1, column=5, padx=(0, 5), pady=(0, 10))
self.om_trqh_dp.grid(row=1, column=6, padx=(0, 5), pady=(0, 10))
self.label_estop_dp.grid(row=1, column=7, padx=(0, 5), pady=(0, 10))
self.om_estop_dp.grid(row=1, column=8, padx=(0, 5), pady=(0, 10))
self.label_sensor_dp.grid(row=1, column=9, padx=(0, 5), pady=(0, 10))
self.om_sensor_dp.grid(row=1, column=10, padx=(0, 10), pady=(0, 10))
self.entry_path_dp.bind("<Button-1>", select_path)
self.om_main_dp.set("INIT")
self.om_vel_dp.set("1")
self.om_trq_dp.set("2")
self.om_trqh_dp.set("2")
self.om_estop_dp.set("3")
self.om_sensor_dp.set("3")
# ========================================================================
# self.tabview_top.tab("自动测试").grid_rowconfigure([0, 1, 2], weight=1)
self.tabview_top.tab("自动测试").grid_columnconfigure(4, weight=1)
self.om_main_at.grid(row=0, column=0, padx=10, pady=10, sticky="w")
self.om_sub_at.grid(row=1, column=0, padx=10, pady=(0, 10), sticky="w")
self.label_ip_at.grid(row=0, column=1, padx=(0, 10), sticky="e")
self.entry_ip_at.grid(row=0, column=2)
self.btn_conn.grid(row=0, column=3, padx=10)
self.progressbar_at.grid(row=0, column=4, sticky="e")
self.label_path_at.grid(row=1, column=1, padx=(0, 10), sticky="e")
self.entry_path_at.grid(row=1, column=2, columnspan=3, sticky="we")
self.segbutton_at.grid(row=2, column=0, columnspan=5, sticky="we")
self.progressbar_at.start()
self.entry_path_at.bind("<Button-1>", select_path)
# ========================================================================
self.tabview_bottom.tab("输出").grid_rowconfigure(0, weight=1)
self.tabview_bottom.tab("输出").grid_columnconfigure(0, weight=1)
self.text_output.grid(row=0, column=0, padx=0, pady=1, sticky="news")
# ========================================================================
self.tabview_bottom.tab("日志").grid_rowconfigure(1, weight=1)
self.tabview_bottom.tab("日志").grid_columnconfigure(2, weight=1)
self.segbutton_logs.grid(row=0, column=0, padx=10, pady=(0, 10), sticky="w")
self.btn_find.grid(row=0, column=1, padx=(0, 10), pady=(0, 10), sticky="w")
self.entry_keyword.grid(row=0, column=2, padx=0, pady=(0, 10), sticky="we")
self.treeview_logs.grid(row=1, column=0, columnspan=3, padx=0, pady=1, sticky="news")
self.segbutton_logs.set("实时")
# ========================================================================
self.frame_status.rowconfigure(0, weight=1)
self.frame_status.columnconfigure([0, 1], weight=1)
self.label_vers.grid(row=0, column=0, sticky="news")
self.label_tips.grid(row=0, column=1, sticky="news")
# ========================================================================
def show(self):
self.root.mainloop()
if __name__ == "__main__":
aio = App()
aio.show()

93
code/common/clibs.py Normal file
View File

@ -0,0 +1,93 @@
import os
import os.path
import sqlite3
import threading
def traversal_files(path, w2t):
# 功能:以列表的形式分别返回指定路径下的文件和文件夹,不包含子目录
# 参数:路径
# 返回值:路径下的文件夹列表 路径下的文件列表
if not os.path.exists(path):
w2t(f"数据文件夹{path}不存在,请确认后重试......", "red", "PathNotExistError")
else:
dirs, files = [], []
for item in os.scandir(path):
if item.is_dir():
dirs.append(item.path.replace("\\", "/"))
elif item.is_file():
files.append(item.path.replace("\\", "/"))
return dirs, files
def init_logdb(connect, cur):
connect = sqlite3.connect(":memory:", isolation_level=None, check_same_thread=False, cached_statements=256)
# connect = sqlite3.connect("log.db", isolation_level=None, check_same_thread=False, cached_statements=256)
# time text default (datetime('now', 'localtime')),
cur = connect.cursor()
cur.execute(
"""
create table if not exists logs(
id integer primary key autoincrement,
time DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime')),
level text,
module text,
content text
)
"""
)
return connect, cur
def insert_logdb(_level, _module, _content):
if db_state == "readwrite":
global conn, cursor
data = [_level, _module, repr(_content)]
cursor.execute("insert into logs (level, module, content) values (?, ?, ?)", data)
class GetThreadResult(threading.Thread):
def __init__(self, func, args=()):
super(GetThreadResult, self).__init__()
self.func = func
self.args = args
self.result = None
def run(self):
self.result = self.func(*self.args)
def get_result(self):
threading.Thread.join(self) # 等待线程执行完毕
try:
return self.result
except Exception as Err:
return None
# PREFIX = 'assets' # for pyinstaller packaging
PREFIX = '../assets' # for source code testing and debug
log_path = f"{PREFIX}/logs"
conn = None
cursor = None
levels = ["DEBUG", "INFO", "WARNING", "ERROR"]
db_state = "readwrite"
data_dp = {}
data_at = {}
w2t = None
running = False
stop = True
tl_prg = None
f_records = None
conn, cursor = init_logdb(conn, cursor)
for i in range(100):
insert_logdb("DEBUG", "clibs", 'this is a DEBUG log -0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.00021840051467521813,\n\t\t\t\t-0.0\n\t\t\t]\n\t\t},\n\t\t{\n\t\t\t"channel" : 4,\n\t\t\t"name" : "hw_joint_vel_feedback",\n\t\t\t"value" : \n\t\t\t[\n\t\t\t\t-0.0,\n\t\t\t\t-0.0\x04\x00,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n')
insert_logdb("INFO", "clibs", 'this is a INFO log -0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.00021840051467521813,\n\t\t\t\t-0.0\n\t\t\t]\n\t\t},\n\t\t{\n\t\t\t"channel" : 4,\n\t\t\t"name" : "hw_joint_vel_feedback",\n\t\t\t"value" : \n\t\t\t[\n\t\t\t\t-0.0,\n\t\t\t\t-0.0\x04\x00,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n')
insert_logdb("WARNING", "clibs", 'this is a WARNING log -0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.00021840051467521813,\n\t\t\t\t-0.0\n\t\t\t]\n\t\t},\n\t\t{\n\t\t\t"channel" : 4,\n\t\t\t"name" : "hw_joint_vel_feedback",\n\t\t\t"value" : \n\t\t\t[\n\t\t\t\t-0.0,\n\t\t\t\t-0.0\x04\x00,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n')
insert_logdb("ERROR", "clibs", 'this is a ERROR log -0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.00021840051467521813,\n\t\t\t\t-0.0\n\t\t\t]\n\t\t},\n\t\t{\n\t\t\t"channel" : 4,\n\t\t\t"name" : "hw_joint_vel_feedback",\n\t\t\t"value" : \n\t\t\t[\n\t\t\t\t-0.0,\n\t\t\t\t-0.0\x04\x00,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n\t\t\t\t-0.0,\n')
insert_logdb("DEBUG", "clibs", 'running')
insert_logdb("INFO", "clibs", 'running')
insert_logdb("WARNING", "clibs", 'running')
insert_logdb("ERROR", "clibs", 'running')

View File

@ -0,0 +1,5 @@
from common import clibs
def main():
print("brake")

View File

@ -0,0 +1,423 @@
import threading
import openpyxl
import pandas
import re
import csv
from common import clibs
def initialization(path, w2t, insert_logdb):
_, data_files = clibs.traversal_files(path, w2t)
count = 0
for data_file in data_files:
filename = data_file.split("/")[-1]
if filename == "configs.xlsx":
count += 1
elif filename == "T_电机电流.xlsx":
...
else:
if not re.match("j[1-7].*\\.data", filename):
msg = f"不合规 {data_file}\n"
msg += "所有数据文件必须以 j[1-7]_ 开头,以 .data 结尾比如j1_abcdef.data\n配置文件需要命名为\"configs.xlsx\",结果文件需要命名为\"T_电机电流.xlsx\"\n"
msg += "需要有配置文件\"configs.xlsx\"表格,以及数据处理文件\"T_电机电流.xlsx\"表格,请检查整改后重新运行\n"
w2t(msg, "red", "FilenameIllegal")
if count != 1:
msg = "需要有配置文件\"configs.xlsx\"表格,以及数据处理文件\"T_电机电流.xlsx\"表格,请检查整改后重新运行\n"
w2t(msg, "red", "FilenameIllegal")
insert_logdb("INFO", "current", f"current: 获取必要文件:{data_files}")
return data_files
def current_max(data_files, rcs, trq, w2t, insert_logdb):
insert_logdb("INFO", "current", "MAX: 正在处理最大电流值逻辑...")
current = {1: [], 2: [], 3: [], 4: [], 5: [], 6: []}
for data_file in data_files:
if data_file.endswith(".data"):
df = pandas.read_csv(data_file, sep="\t")
else:
continue
insert_logdb("INFO", "current", f"MAX: 正在处理 {data_file}")
cols = len(df.columns)
axis = int(data_file.split("/")[-1].split("_")[0].removeprefix("j"))
rca = rcs[axis-1]
insert_logdb("INFO", "current", f"MAX: 最大列数为 {cols}{axis} 轴的额定电流为 {rca}")
col = df.columns.values[trq-1] # 获取 "device_servo_trq_feedback"
c_max = df[col].abs().max()
scale = 1000
_ = abs(c_max/scale*rca)
current[axis].append(_)
w2t(f"{data_file}: {_:.4f}\n")
insert_logdb("INFO", "current", f"MAX: 获取到的列名为 {col},最大电流为 {_}")
with open(data_file, "a+") as f_data:
csv_writer = csv.writer(f_data, delimiter="\t")
csv_writer.writerow([""] * (cols-1) + [_])
for axis, cur in current.items():
if not cur:
continue
else:
w2t(f"{axis}轴数据:")
for value in cur:
w2t(f"{value:.4f} ")
w2t("\n")
w2t("\n【MAX】数据处理完毕......")
insert_logdb("INFO", "current", f"MAX: 获取最大电流值结束 current_max = {current}")
return current
def current_avg(data_files, rcs, trq, w2t, insert_logdb):
insert_logdb("INFO", "current", "AVG: 正在处理平均电流值逻辑...")
current = {1: [], 2: [], 3: [], 4: [], 5: [], 6: []}
for data_file in data_files:
if data_file.endswith(".data"):
df = pandas.read_csv(data_file, sep="\t")
else:
continue
insert_logdb("INFO", "current", f"AVG: 正在处理 {data_file}")
cols = len(df.columns)
axis = int(data_file.split("/")[-1].split("_")[0].removeprefix("j"))
rca = rcs[axis-1]
insert_logdb("INFO", "current", f"AVG: 最大列数为 {cols}{axis} 轴的额定电流为 {rca}")
col = df.columns.values[trq-1]
c_std = df[col].std()
c_avg = df[col].mean()
scale = 1000
_ = (abs(c_avg)+c_std*3)/scale*rca
current[axis].append(_)
w2t(f"{data_file}: {_:.4f}\n")
insert_logdb("INFO", "current", f"AVG: 获取到的列名为 {col},平均电流为 {_}")
with open(data_file, "a+") as f_data:
csv_writer = csv.writer(f_data, delimiter="\t")
csv_writer.writerow([""] * (cols-1) + [_])
for axis, cur in current.items():
if not cur:
continue
else:
w2t(f"{axis}轴数据:")
for value in cur:
w2t(f"{value:.4f} ")
w2t("\n")
w2t("\n【AVG】数据处理完毕......\n")
insert_logdb("INFO", "current", f"AVG: 获取平均电流值结束 current_avg = {current}")
return current
def current_cycle(data_files, vel, trq, trqh, rrs, rcs, rpms, w2t, insert_logdb):
result, hold, single, scenario, dur_time = None, [], [], [], 0
for data_file in data_files:
filename = data_file.split("/")[-1]
if filename == "T_电机电流.xlsx":
result = data_file
elif re.match("j[1-7]_hold_.*\\.data", filename):
hold.append(data_file)
elif re.match("j[1-7]_s_.*\\.data", filename):
scenario.append(data_file)
dur_time = float(filename.split("_")[3])
elif re.match("j[1-7]_.*\\.data", filename):
single.append(data_file)
clibs.stop = True
w2t(f"正在打开文件 {result},需要 10s 左右......\n")
t_excel = clibs.GetThreadResult(openpyxl.load_workbook, args=(result, ))
t_excel.daemon = True
t_excel.start()
t_progress = threading.Thread(target=clibs.tl_prg, args=("Processing......", ))
t_progress.daemon = True
t_progress.start()
wb = t_excel.get_result()
if hold:
avg = current_avg(hold, rcs, trqh, w2t, insert_logdb)
for axis, cur_value in avg.items():
sht_name = f"J{axis}"
wb[sht_name]["O4"].value = float(cur_value[0])
if dur_time == 0:
p_single(wb, single, vel, rrs, w2t, insert_logdb)
else:
p_scenario(wb, scenario, vel, rrs, dur_time, w2t, insert_logdb)
clibs.stop = True
w2t(f"正在保存文件 {result},需要 10s 左右......\n")
t_excel = threading.Thread(target=wb.save, args=(result, ))
t_excel.daemon = True
t_excel.start()
t_excel.join()
clibs.stop = False
t_progress.join()
w2t("----------------------------------------------------------\n")
w2t("全部处理完毕")
def find_point(data_file, df, flag, row_s, row_e, threshold, step, end_point, skip_scale, axis, seq, w2t, insert_logdb):
if flag == "lt":
while row_e > end_point:
speed_avg = df.iloc[row_s:row_e].abs().mean()
if speed_avg < threshold:
row_e -= step
row_s -= step
continue
else:
# one more time如果连续两次 200 个点的平均值都大于 2说明已经到了临界点了其实也不一定只不过相对遇到一次就判定临界点更安全一点点
# 从实际数据看,这开逻辑很小概率能触发到
speed_avg = df.iloc[row_s-end_point*skip_scale:row_e-end_point*skip_scale].abs().mean()
if speed_avg < threshold:
insert_logdb("WARNING", "current", f"【lt】{axis} 轴第 {seq} 次查找数据有异常row_s = {row_s}, row_e = {row_e}")
row_e -= end_point*skip_scale
row_s -= end_point*skip_scale
continue
else:
return row_s, row_e
else:
w2t(f"{data_file} 数据有误,需要检查,无法找到第 {seq} 个有效点...", "red", "AnchorNotFound")
elif flag == "gt":
while row_e > end_point:
speed_avg = df.iloc[row_s:row_e].abs().mean()
# if axis == 1 and seq == 1:
# insert_logdb("DEBUG", "current", f"【gt】{axis} 轴speed_avg = {speed_avg}row_s = {row_s}, row_e = {row_e}")
if speed_avg > threshold:
row_e -= step
row_s -= step
continue
else:
# one more time如果连续两次 200 个点的平均值都小于 2说明已经到了临界点了其实也不一定只不过相对遇到一次就判定临界点更安全一点点
# 从实际数据看,这开逻辑很小概率能触发到
speed_avg = df.iloc[row_s-end_point*skip_scale:row_e-end_point*skip_scale].abs().mean()
if speed_avg > threshold:
insert_logdb("WARNING", "current", f"【gt】{axis} 轴第 {seq} 次查找数据有异常row_s = {row_s}, row_e = {row_e}")
row_e -= end_point*skip_scale
row_s -= end_point*skip_scale
continue
else:
return row_s, row_e
else:
w2t(f"{data_file} 数据有误,需要检查,无法找到第 {seq} 个有效点...", "red", "AnchorNotFound")
def get_row_number(threshold, flag, df, row_s, row_e, axis, insert_logdb):
count_1, count_2 = 0, 0
if flag == "start" or flag == "end":
for number in df.iloc[row_s:row_e].abs():
count_2 += 1
if number > threshold:
count_1 += 1
if count_1 == 10:
return row_s + count_2 - 10
else:
count_1 = 0
elif flag == "middle":
for number in df.iloc[row_s:row_e].abs():
count_2 += 1
if number < threshold: # 唯一的区别
count_1 += 1
if count_1 == 10:
return row_s + count_2 - 10
else:
count_1 = 0
places = {"start": "起点", "middle": "中间点", "end": "终点"}
insert_logdb("WARNING", "current", f"{axis} 轴获取{places[flag]}数据 {row_e} 可能有异常,需关注!")
return row_e
def p_single(wb, single, vel, rrs, w2t, insert_logdb):
# 1. 先找到第一个速度为零的点,数据从后往前找,一开始就是零的情况不予考虑
# 2. 记录第一个点的位置,继续向前查找第二个速度为零的点,同理,一开始为零的点不予考虑
# 3. 记录第二个点的位置,并将其中的数据拷贝至对应位置
for data_file in single:
axis = int(data_file.split("/")[-1].split("_")[0].removeprefix("j"))
sht_name = f"J{axis}"
ws = wb[sht_name]
pandas.set_option("display.precision", 2)
df_origin = pandas.read_csv(data_file, sep="\t")
rr = rrs[axis-1]
addition = 180 / 3.1415926 * 60 / 360 * rr
col_names = list(df_origin.columns)
df = df_origin[col_names[vel-1]].multiply(addition)
step = 50 # 步进值
end_point = 200 # 有效数值的数目
threshold = 2 # 200个点的平均阈值线
skip_scale = 2
row_start, row_middle, row_end = 0, 0, 0
row_e = df.index[-1]
row_s = row_e - end_point
speed_avg = df.iloc[row_s:row_e].abs().mean()
if speed_avg < 2:
# 第一次过滤:消除速度为零的数据,找到速度即将大于零的上升临界点
row_s, row_e = find_point(data_file, df, "lt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 0, w2t, insert_logdb)
row_e -= end_point*skip_scale
row_s -= end_point*skip_scale
# 第二次过滤:消除速度大于零的数据,找到速度即将趋近于零的下降临界点
row_s, row_e = find_point(data_file, df, "gt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 0, w2t, insert_logdb)
row_e -= end_point*skip_scale
row_s -= end_point*skip_scale
# 第三次过滤:消除速度为零的数据,找到速度即将大于零的上升临界点
row_s, row_e = find_point(data_file, df, "lt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 0, w2t, insert_logdb)
row_e -= end_point*skip_scale
row_s -= end_point*skip_scale
# 正式第一次采集:消除速度大于零的数据,找到速度即将趋近于零的下降临界点
row_s, row_e = find_point(data_file, df, "gt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 1, w2t, insert_logdb)
row_end = get_row_number(threshold, "end", df, row_s, row_e, axis, insert_logdb)
row_e -= end_point*skip_scale
row_s -= end_point*skip_scale
# 正式第二次采集:消除速度为零的数据,找到速度即将大于零的上升临界点
row_s, row_e = find_point(data_file, df, "lt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 2, w2t, insert_logdb)
row_middle = get_row_number(threshold, "middle", df, row_s, row_e, axis, insert_logdb)
row_e -= end_point*skip_scale
row_s -= end_point*skip_scale
# 正式第三次采集:消除速度大于零的数据,找到速度即将趋近于零的下降临界点
row_s, row_e = find_point(data_file, df, "gt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 3, w2t, insert_logdb)
row_start = get_row_number(threshold, "start", df, row_s, row_e, axis, insert_logdb)
elif speed_avg > 2:
# 第一次过滤:消除速度大于零的数据,找到速度即将趋近于零的下降临界点
row_s, row_e = find_point(data_file, df, "gt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 0, w2t, insert_logdb)
row_e -= end_point*skip_scale
row_s -= end_point*skip_scale
# 第二次过滤:消除速度为零的数据,找到速度即将大于零的上升临界点
row_s, row_e = find_point(data_file, df, "lt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 0, w2t, insert_logdb)
row_e -= end_point*skip_scale
row_s -= end_point*skip_scale
# 第一次正式采集:消除速度大于零的数据,找到速度即将趋近于零的下降临界点
row_s, row_e = find_point(data_file, df, "gt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 1, w2t, insert_logdb)
row_end = get_row_number(threshold, "end", df, row_s, row_e, axis, insert_logdb)
row_e -= end_point*skip_scale
row_s -= end_point*skip_scale
# 第二次正式采集:消除速度为零的数据,找到速度即将大于零的上升临界点
row_s, row_e = find_point(data_file, df, "lt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 2, w2t, insert_logdb)
row_middle = get_row_number(threshold, "middle", df, row_s, row_e, axis, insert_logdb)
row_e -= end_point*skip_scale
row_s -= end_point*skip_scale
# 第三次正式采集:消除速度大于零的数据,找到速度即将趋近于零的下降临界点
row_s, row_e = find_point(data_file, df, "gt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 3, w2t, insert_logdb)
row_start = get_row_number(threshold, "start", df, row_s, row_e, axis, insert_logdb)
insert_logdb("INFO", "current", f"{axis} 轴起点:{row_start}")
insert_logdb("INFO", "current", f"{axis} 轴中间点:{row_middle}")
insert_logdb("INFO", "current", f"{axis} 轴终点:{row_end}")
insert_logdb("INFO", "current", f"{axis} 轴数据非零段点数:{row_middle-row_start+1}")
insert_logdb("INFO", "current", f"{axis} 轴数据为零段点数:{row_end-row_middle+1}")
if abs(row_end+row_start-2*row_middle) > 1000:
insert_logdb("WARNING", "current", f"{axis} 轴数据占空比异常!")
data = []
for row in range(row_start, row_end+1):
data.append(df_origin.iloc[row, 0])
data.append(df_origin.iloc[row, 1])
data.append(df_origin.iloc[row, 2])
i = 0
for row in ws.iter_rows(min_row=2, min_col=2, max_row=150000, max_col=4):
for cell in row:
try:
if i % 3 == 0:
ws.cell((i//3)+2, 1).value = float(((i//3)+1)/1000)
_ = f"{data[i]:.2f}"
cell.value = float(_)
i += 1
except Exception as Err:
if i % 3 == 0:
ws.cell((i//3)+2, 1).value = None
cell.value = None
i += 1
def p_scenario(wb, scenario, vel, rrs, dur_time, w2t, insert_logdb):
for data_file in scenario:
cycle = 0.001
axis = int(data_file.split("/")[-1].split("_")[0].removeprefix("j"))
sht_name = f"J{axis}"
ws = wb[sht_name]
pandas.set_option("display.precision", 2)
df_origin = pandas.read_csv(data_file, sep="\t")
rr = rrs[axis-1]
addition = 180 / 3.1415926 * 60 / 360 * rr
col_names = list(df_origin.columns)
df = df_origin[col_names[vel-1]].multiply(addition)
row_start = 3000
row_end = row_start + int(dur_time/cycle)
if row_end > df.index[-1]:
w2t(f"位置超限:{data_file} 共有 {df.index[-1]} 条数据,无法取到第 {row_end} 条数据,需要确认场景周期时间...", "red", "DataOverLimit")
data = []
for row in range(row_start, row_end+1):
data.append(df_origin.iloc[row, 0])
data.append(df_origin.iloc[row, 1])
data.append(df_origin.iloc[row, 2])
i = 0
for row in ws.iter_rows(min_row=2, min_col=2, max_row=250000, max_col=4):
for cell in row:
try:
if i % 3 == 0:
ws.cell((i//3)+2, 1).value = float(((i//3)+1)/1000)
_ = f"{data[i]:.2f}"
cell.value = float(_)
i += 1
except Exception as Err:
cell.value = None
if i % 3 == 0:
ws.cell((i//3)+2, 1).value = None
i += 1
def get_configs(configfile, w2t, insert_logdb):
try:
wb = openpyxl.load_workbook(configfile, read_only=True)
ws = wb["Target"]
except Exception as Err:
insert_logdb("ERROR", "current", f"无法打开 {configfile},获取配置文件参数错误 {Err}")
w2t(f"无法打开 {configfile}", color="red", desc="OpenFileError")
# 最大角速度,额定电流,减速比,额定转速
rrs, avs, rcs, rpms = [], [], [], []
for i in range(2, 8):
rrs.append(abs(float(ws.cell(row=2, column=i).value)))
avs.append(abs(float(ws.cell(row=3, column=i).value)))
rpms.append(abs(float(ws.cell(row=4, column=i).value)))
rcs.append(abs(float(ws.cell(row=6, column=i).value)))
insert_logdb("INFO", "current", f"current: 获取减速比:{rrs}")
insert_logdb("INFO", "current", f"current: 获取角速度:{avs}")
insert_logdb("INFO", "current", f"current: 获取额定电流:{rcs}")
insert_logdb("INFO", "current", f"current: 获取额定转速:{rpms}")
return rrs, avs, rcs, rpms
def main():
sub = clibs.data_dp["_sub"]
path = clibs.data_dp["_path"]
vel = int(clibs.data_dp["_vel"])
trq = int(clibs.data_dp["_trq"])
trqh = int(clibs.data_dp["_trqh"])
w2t = clibs.w2t
insert_logdb = clibs.insert_logdb
insert_logdb("INFO", "current", "current: 参数初始化成功")
data_files = initialization(path, w2t, insert_logdb)
rrs, avs, rcs, rpms = get_configs(path + "\\configs.xlsx", w2t, insert_logdb)
if sub == "max":
current_max(data_files, rcs, trq, w2t, insert_logdb)
elif sub == "avg":
current_avg(data_files, rcs, trq, w2t, insert_logdb)
elif sub == "cycle":
current_cycle(data_files, vel, trq, trqh, rrs, rcs, rpms, w2t, insert_logdb)
if __name__ == '__main__':
main()

5
code/data_process/iso.py Normal file
View File

@ -0,0 +1,5 @@
from common import clibs
def main():
print("iso")

View File

@ -0,0 +1,5 @@
from common import clibs
def main():
print("wavelogger")

View File

@ -1,6 +1,43 @@
#### 其他
# 珞石测试部自动化工具
## 一、处理逻辑以及原理
### 1. 数据处理
#### A. 制动数据
#### B. 电机电流
#### C. 激光
#### D. 基恩士数据
## 二、自动测试
### 1. 协议封包解包
### 2. 制动测试
### 3. 电机电流测试
## 三、注意事项
## 四、发版记录
## 五、其他
### 1. 打包命令
打包时,只需要修改 clibs.py 中的 PREFIX 即可,调试时再修改回来
```
pyinstaller --noconfirm --onedir --windowed --optimize 2 --contents-directory . --upx-dir "D:/Syncthing/common/A_Program/upx-4.2.4-win64/" --add-data "../.venv/Lib/site-packages/customtkinter;customtkinter/" --add-data "../assets:assets" --version-file ../assets/files/version/file_version_info.txt -i ../assets/media/icon.ico ../code/aio.py -p ../code/data_process/brake.py -p ../code/data_process/iso.py -p ../code/data_process/current.py -p ../code/data_process/wavelogger.py -p ../code/commons/clibs.py
```
### 2. tabview 组件字体修改
customtkinter的tabview组件不支持修改字体大小可以参考 [Changing Font of a Tabview](https://github.com/TomSchimansky/CustomTkinter/issues/2296) 进行手动修改源码实现:
a. 运行 `pip show customtkinter`,获取到库的路径
b. 修改.../windows/widgets/ctk_tabview.py
c. 增加 from .font.ctk_font import CTkFont
d. 在大概 78 行的位置,增加 font=CTkFont(family="Consolas", size=18, weight='bold')
a. 运行 `pip show customtkinter`,获取到库的路径
b. 修改.../windows/widgets/ctk_tabview.py
c. 增加 from .font.ctk_font import CTkFont
d. 在大概 78 行的位置,增加 font=CTkFont(family="Consolas", size=18, weight='bold')