import datetime import json import os import re import sqlite3 import sys import threading import time from urllib import request import os.path import matplotlib import matplotlib.pyplot as plt import pandas from matplotlib.widgets import Slider matplotlib.use('QtAgg') from PySide6.QtCore import Qt, QThread, Signal, QObject, QTimer from PySide6.QtGui import QTextCursor, QFont, QPixmap, QColor, QBrush, QIcon from PySide6.QtWidgets import QMessageBox, QCheckBox, QSplashScreen, QApplication, QFrame, QLabel, QTreeWidgetItem, QFileDialog, QHeaderView, QDialog, QVBoxLayout, QPlainTextEdit import codes.common.clibs as clibs import codes.common.openapi as openapi import codes.ui.main_window as main_window from codes.analysis import brake, current, wavelogger, iso from codes.autotest import do_current, do_brake from codes.durable import factory_test class ContentDialog(QDialog): def __init__(self, content, parent=None): super().__init__(parent) self.setWindowTitle("日志详情") self.setGeometry(100, 100, 700, 300) # 设置对话框在屏幕正中间 self.center_on_screen() # 创建布局 layout = QVBoxLayout(self) # 创建 QPlainTextEdit 并设置为只读 self.plain_text_edit = QPlainTextEdit() self.plain_text_edit.setReadOnly(True) self.plain_text_edit.setPlainText(content) # 添加到布局 layout.addWidget(self.plain_text_edit) def center_on_screen(self): # 获取屏幕尺寸 screen_geometry = QApplication.primaryScreen().geometry() screen_width = screen_geometry.width() screen_height = screen_geometry.height() # 获取对话框尺寸 dialog_width = self.width() dialog_height = self.height() # 计算位置 x = (screen_width - dialog_width) // 2 y = (screen_height - dialog_height) // 2 # 设置位置 self.move(x, y) class RunProg(QObject): completed = Signal(tuple) def __init__(self): super().__init__() def program(self, action): # 0: prog 1: idx prog, idx, reserved = action if idx in range(7): run = prog.processing elif idx == -1: run = prog.net_conn elif idx == -97: run = prog.do_draw elif idx == -98 or idx == -99: run = prog try: ret = run() self.completed.emit((True, prog, ret, "", idx, reserved)) # 运行是否成功/返回值/报错信息/idx except Exception as err: self.completed.emit((False, None, None, err, idx, reserved)) # 运行是否成功/返回值/报错信息/idx class MainWindow(main_window.Ui_MainWindow): action = Signal(tuple) def __init__(self): super(MainWindow, self).__init__() self.is_searching = False self.setupUi(self) self.pre_does() def pre_does(self): # ========================= treeview init ========================= self.cb_data_current.setDisabled(True) self.setup_statusbar() self.setWindowIcon(QIcon(f"{clibs.PREFIX}/media/icon.ico")) self.treew_log.header().setFont(QFont("Arial", 12, QFont.Weight.Bold)) header = self.treew_log.header() for i in range(self.treew_log.columnCount()): header.setSectionResizeMode(i, QHeaderView.ResizeMode.ResizeToContents) self.md_btn.setStyleSheet("background-color: DarkGrey; color: white;") self.ec_btn.setStyleSheet("background-color: DarkGrey; color: white;") # ========================= stylesheet ========================= with open(f"{clibs.PREFIX}/qss/style.qss", mode="r", encoding="utf-8") as f_qss: style_sheet = f_qss.read() self.setStyleSheet(style_sheet) # ========================= clibs ========================= logger_handler = clibs.LoggerHandler() logger_handler.signal.connect(self.w2t) clibs.logger = logger_handler.logger # ========================= detection ========================= self.detection_thread = StateDetection() self.detection_thread.finished.connect(self.prog_done_detection) self.detection_thread.start() # ========================= debug ========================= # for i in range(105): # clibs.logger("DEBUG", "aio", "debug testing log for aio", color="gray") # clibs.logger("INFO", "clibs", "info testing log for clibs", color="green") # clibs.logger("WARNING", "iso", "warning testing log for aio", color="yellow") # # clibs.logger("ERROR", "openapi", "error testing log for aio", color="red") def run_program_thread(self, prog, idx, prog_done, reserved): if idx != -98 and idx != -97: self.tw_docs.setCurrentIndex(0) self.t = QThread(self) self.run = RunProg() self.run.moveToThread(self.t) self.run.completed.connect(prog_done) self.action.connect(self.run.program) self.t.start() self.action.emit((prog, idx, reserved)) def w2t(self, msg, color="black"): self.pte_output.appendHtml(f"{msg}") cursor = self.pte_output.textCursor() cursor.movePosition(QTextCursor.End) self.pte_output.setTextCursor(cursor) self.pte_output.ensureCursorVisible() self.update() def get_checkbox_states(self): cb_durable_states = {} container = self.sa_durable.widget() for checkbox in container.findChildren(QCheckBox): cb_durable_states[checkbox.text()] = checkbox.isChecked() return cb_durable_states def prog_start(self): def prog_done(results): flag, result, ret, error, idx, network = results clibs.running[idx] = 0 # if flag is False: # self.w2t(f"{clibs.functions[idx]}运行失败:{error}", "red") # elif flag is True: # ... if sum(clibs.running) > 0: if sum(clibs.running) == 1: QMessageBox.critical(self, "运行中", f"{clibs.functions[clibs.running.index(1)]}正在执行中,不可同时运行两个处理/测试程序!") return else: self.w2t(f"clibs.running = {clibs.running}", "red") self.w2t(f"clibs.functions = {clibs.functions}", "red") QMessageBox.critical(self, "严重错误", "理论上不允许同时运行两个处理程序,需要检查!") return if self.tw_funcs.currentIndex() == 0 and self.cb_data_func.currentIndex() == 0: # tab: 数据处理 | func: 制动数据处理 self.run_program_thread(brake.BrakeDataProcess(self.le_data_path.text()), 0, prog_done, None) elif self.tw_funcs.currentIndex() == 0 and self.cb_data_func.currentIndex() == 1: # tab: 数据处理 | func: 转矩数据处理 self.run_program_thread(current.CurrentDataProcess(self.le_data_path.text(), self.cb_data_current.currentText()), 1, prog_done, None) elif self.tw_funcs.currentIndex() == 0 and self.cb_data_func.currentIndex() == 2: # tab: 数据处理 | func: 激光数据处理 self.run_program_thread(iso.IsoDataProcess(self.le_data_path.text()), 2, prog_done, None) elif self.tw_funcs.currentIndex() == 0 and self.cb_data_func.currentIndex() == 3: # tab: 数据处理 | func: 精度数据处理 self.run_program_thread(wavelogger.WaveloggerDataProcess(self.le_data_path.text()), 3, prog_done, None) elif self.tw_funcs.currentIndex() == 1 and self.cb_unit_func.currentIndex() == 0: # tab: 自动测试 | func: 制动测试 self.run_program_thread(do_brake.DoBrakeTest(self.le_unit_path.text(), self.cb_unit_tool.currentText()), 4, prog_done, None) elif self.tw_funcs.currentIndex() == 1 and self.cb_unit_func.currentIndex() == 1: # tab: 自动测试 | func: 转矩测试 self.run_program_thread(do_current.DoCurrentTest(self.le_unit_path.text(), self.cb_unit_tool.currentText()), 5, prog_done, None) elif self.tw_funcs.currentIndex() == 2: # tab: 耐久采集 | func: 场景数据持久化 self.run_program_thread(factory_test.DoFactoryTest(self.le_durable_path.text(), self.le_durable_interval.text(), self.get_checkbox_states()), 6, prog_done, None) def prog_stop(self): if sum(clibs.running) == 0: return idx = clibs.running.index(1) clibs.running[idx] = 0 clibs.logger("INFO", "aio", f"{clibs.functions[idx]}程序已经停止,涉及Excel文件读写时可能会损坏该文件!", "red") # QMessageBox.warning(self, "停止运行", "运行过程中不建议停止运行,可能会损坏文件,如果确实需要停止运行,可以直接关闭窗口!") def prog_reset(self): self.tw_docs.setCurrentIndex(0) self.pte_output.clear() def file_browser(self): idx_dict = {0: self.le_data_path, 1: self.le_unit_path, 2: self.le_durable_path} dir_path = QFileDialog.getExistingDirectory() tab_index = self.tw_funcs.currentIndex() if dir_path: idx_dict[tab_index].setText(dir_path) def curve_draw(self): procs = self.get_checkbox_states() dir_path = self.le_durable_path.text() curve_map = { "周期内平均转矩": ["device_servo_trq_feedback", ], "周期内最大速度": ["hw_joint_vel_feedback", ], } ylabels = {"周期内最大速度": "速度(rad/s)", "周期内平均转矩": "转矩(Nm)"} plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False plt.rcParams['figure.dpi'] = 100 plt.rcParams['font.size'] = 14 plt.rcParams['lines.marker'] = 'o' plt.rcParams["figure.autolayout"] = True if not os.path.exists(dir_path): self.w2t(f"数据文件夹{dir_path}不存在,请确认后重试......", "red") return for proc_name, is_enabled in procs.items(): if is_enabled: break else: self.w2t(f"需要选择至少一个功能,才能继续绘图......", "red") return _, files = clibs.traversal_files(dir_path) csv_files = [] for file in files: if file.endswith(".csv"): csv_files.append(file) if len(csv_files) == 0: self.w2t("程序未开始运行,暂无数据可以展示......", "red") return for proc_name, is_enabled in procs.items(): if not is_enabled: continue title = proc_name ylabel = ylabels[proc_name] fig, axes = plt.subplots(figsize=(10, 4.5), dpi=100) for curve in curve_map[proc_name]: cols = [f"{curve}_{i}" for i in range(6)] cols.insert(0, "time") try: df = pandas.read_csv(f"{dir_path}/{proc_name}.csv") except Exception as err: self.w2t(f"获取{dir_path}/{proc_name}.csv文件数据失败,需确认是否存在该文件......", "red") self.w2t(f"报错信息:{err}", "red") return for i in range((len(cols) - 1) // 6): plt.plot(df[cols[1]], label=f"一轴{curve_map[proc_name][i]}") plt.plot(df[cols[2]], label=f"二轴{curve_map[proc_name][i]}") plt.plot(df[cols[3]], label=f"三轴{curve_map[proc_name][i]}") plt.plot(df[cols[4]], label=f"四轴{curve_map[proc_name][i]}") plt.plot(df[cols[5]], label=f"五轴{curve_map[proc_name][i]}") plt.plot(df[cols[6]], label=f"六轴{curve_map[proc_name][i]}") axes.set_title(title) axes.set_ylabel(ylabel) axes.legend(loc="upper right") slider_position = plt.axes((0.1, 0.01, 0.8, 0.05), facecolor="blue") # (left, bottom, width, height) scrollbar = Slider(slider_position, 'Time', 1, int(len(df)), valstep=1) def update(val): pos = scrollbar.val axes.set_xlim([pos, pos + 10]) fig.canvas.draw_idle() scrollbar.on_changed(update) fig.tight_layout(rect=(0, 0.02, 0.96, 1)) # tuple (left, bottom, right, top) plt.show() def durable_cb_change(self): stat = True if self.cb_durable_total.isChecked() else False for cb in self.sa_durable.widget().findChildren(QCheckBox): cb.setChecked(stat) def display_tree_content(self, records): for record in records: item = QTreeWidgetItem(self.treew_log) for i in range(self.treew_log.columnCount()): item.setText(i, str(record[i])) item.setTextAlignment(i, Qt.AlignmentFlag.AlignLeft | Qt.AlignmentFlag.AlignVCenter) self.treew_log.addTopLevelItem(item) color_map = {"DEBUG": QColor(220, 220, 220), "INFO": QColor(144, 238, 144), "WARNING": QColor(255, 240, 210), "ERROR": QColor(255, 220, 220)} brush = QBrush(color_map[record[2].upper()]) for i in range(self.treew_log.columnCount()): item.setBackground(i, brush) last_item = self.treew_log.topLevelItem(self.treew_log.topLevelItemCount() - 1) self.treew_log.scrollToItem(last_item) def prog_done_pre(self, results): flag, result, ret, error, idx, network = results if ret is None: return pages_all, current_page, records = ret self.treew_log.clear() self.display_tree_content(records) self.label_page.setText(f"{current_page}/{pages_all}") def pre_page(self): @clibs.db_lock def do_pre(): if self.is_searching is False: first_id = int(self.treew_log.topLevelItem(0).text(0)) end = first_id-1 if first_id-1 > 0 else None if end is None: return start = end-100+1 if end-100+1 > 0 else 1 clibs.cursor.execute(f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end}") records = clibs.cursor.fetchall() clibs.cursor.execute("SELECT COUNT(id) FROM logs") len_records = clibs.cursor.fetchone()[0] pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1 current_page = int(end) // 100 if int(end) % 100 == 0 else int(end) // 100 + 1 return pages_all, current_page, records elif self.is_searching is True: current_page, pages_all = self.label_page.text().split("/") if int(current_page)-1 == 0: return current_page = int(current_page) - 1 first_row = [] for i in range(self.treew_log.columnCount()): if i == 0: first_row.append(int(self.treew_log.topLevelItem(0).text(i))) else: first_row.append(self.treew_log.topLevelItem(0).text(i)) index_end = clibs.search_records.index(tuple(first_row)) if index_end == 0: return elif index_end <= 100: records = clibs.search_records[:index_end] else: records = clibs.search_records[index_end-100:index_end] return pages_all, current_page, records self.run_program_thread(do_pre, -98, self.prog_done_pre, None) def prog_done_realtime(self, results): flag, result, ret, error, idx, network = results pages_all, records = ret self.label_page.setText(f"{pages_all}/{pages_all}") self.treew_log.clear() self.display_tree_content(records) self.is_searching = False clibs.search_records = None def realtime_page(self): @clibs.db_lock def do_realtime(): clibs.cursor.execute("SELECT COUNT(id) from logs") len_records = clibs.cursor.fetchone()[0] pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1 remainder = len_records % 100 if len_records % 100 != 0 else 100 end = len_records start = len_records - remainder + 1 if len_records - remainder > 0 else 0 clibs.cursor.execute(f"select * from logs where id between {start} and {end}") records = clibs.cursor.fetchall() return pages_all, records self.run_program_thread(do_realtime, -98, self.prog_done_realtime, None) def prog_done_next(self, results): flag, result, ret, error, idx, network = results if ret is None: return pages_all, current_page, records = ret self.treew_log.clear() self.display_tree_content(records) self.label_page.setText(f"{current_page}/{pages_all}") def next_page(self): @clibs.db_lock def do_next(): clibs.cursor.execute("select count(id) from logs") len_records = clibs.cursor.fetchone()[0] if self.is_searching is False: line_number = self.treew_log.topLevelItemCount() last_id = int(self.treew_log.topLevelItem(line_number-1).text(0)) start = last_id + 1 if last_id % 100 == 0 else last_id - last_id % 100 + 1 end = int(start) + 100 - 1 if start <= len_records: clibs.cursor.execute(f"select * from logs where id between {start} and {end}") records = clibs.cursor.fetchall() pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1 current_page = start // 100 if start % 100 == 0 else start // 100 + 1 return pages_all, current_page, records elif self.is_searching is True: current_page, pages_all = self.label_page.text().split("/") if pages_all == current_page: return current_page = int(current_page) + 1 last_row = [] for i in range(self.treew_log.columnCount()): if i == 0: last_row.append(int(self.treew_log.topLevelItem(self.treew_log.topLevelItemCount()-1).text(i))) else: last_row.append(self.treew_log.topLevelItem(self.treew_log.topLevelItemCount()-1).text(i)) index_start = clibs.search_records.index(tuple(last_row)) + 1 if index_start > len(clibs.search_records): return elif index_start + 100 > len(clibs.search_records): records = clibs.search_records[index_start:] else: records = clibs.search_records[index_start:index_start+100] return pages_all, current_page, records self.run_program_thread(do_next, -98, self.prog_done_next, None) def show_item_content(self, item, column): content = " | ".join([item.text(i) for i in range(self.treew_log.columnCount())]) dialog = ContentDialog(content, self) dialog.exec() def switch_log_tab(self, index): if index == 1: # 切换到日志tab页时,自动执行实时日志功能 self.realtime_page() def prog_done_search(self, results): msg = "可单独查询 ID/Level/Module,也可以按照顺序组合查询指定条件,ID 范围使用 - 分隔,多个 Level 或者 Module 使用 : 分割,所有查询条件以 / 分隔并包含在 [] 内;如有关键字查询,则置于条件查询之后!" flag, result, ret, error, idx, network = results if ret is None: QMessageBox.warning(self, "查询条件错误", msg) self.realtime_page() return if ret == "nothing": return pages_all, records = ret self.treew_log.clear() self.display_tree_content(records) self.label_page.setText(f"{pages_all}/{pages_all}") self.is_searching = True def search_keyword(self): @clibs.db_lock def do_search(): one_more = "!@#123asd" kw = self.le_docs_search.text().strip() if kw == "": return "nothing" match = re.search("^\\[.*]", kw) if match: condition = match.group().removeprefix("[").removesuffix("]").strip() f_text = f"%{kw.removeprefix(match.group()).strip()}%" if condition == "" and f_text == "%%": return elif condition == "" and f_text != "%%": clibs.cursor.execute(f"SELECT * FROM logs WHERE content LIKE '{f_text}'") elif condition != "" and f_text == "%%": conditions = condition.split("/") if len(conditions) == 1: # 可能是id/level(s)/module(s),任意其一 c1 = conditions[0] if c1.isdigit() and int(c1) > 0: # 单独一个数字 clibs.cursor.execute(f"SELECT * FROM logs WHERE id = {int(c1)}") elif "-" in c1 and len(c1.split("-")) == 2 and c1.split("-")[0].isdigit() and c1.split("-")[1].isdigit() and int(c1.split("-")[1]) - int(c1.split("-")[0]) > 0 and int(c1.split("-")[0]) > 0: # 1-4 这种格式 start = int(c1.split("-")[0]) end = int(c1.split("-")[1]) clibs.cursor.execute(f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end}") else: for level in c1.split(":"): if level.strip().upper() not in clibs.levels: is_level = False break else: is_level = True if is_level: # 是告警等级 levels = tuple(level.strip().upper() for level in c1.split(":")) levels = (*levels, one_more) clibs.cursor.execute(f"SELECT * FROM logs WHERE level IN {levels}") else: # 是模块 modules = tuple(module.strip() for module in c1.split(":")) modules = (*modules, one_more) clibs.cursor.execute(f"SELECT * FROM logs WHERE module IN {modules}") elif len(conditions) == 2: # 可能是id/level(s)/module(s),任意两个的组合,有顺序 c1, c2 = conditions if c1.isdigit() and int(c1) > 0: # 单独一个数字 for level in c2.split(":"): if level.strip().upper() not in clibs.levels: is_level = False break else: is_level = True if is_level: # 是告警等级 levels = tuple(level.strip().upper() for level in c2.split(":")) levels = (*levels, one_more) clibs.cursor.execute(f"SELECT * FROM logs WHERE id = {int(c1)} AND level IN {levels}") else: # 是模块 modules = tuple(module.strip() for module in c2.split(":")) modules = (*modules, one_more) clibs.cursor.execute(f"SELECT * FROM logs WHERE id = {int(c1)} AND module IN {modules}") elif "-" in c1 and len(c1.split("-")) == 2 and c1.split("-")[0].isdigit() and c1.split("-")[1].isdigit() and int(c1.split("-")[1]) - int(c1.split("-")[0]) > 0 and int(c1.split("-")[0]) > 0: # 1-4 这种格式 start = int(c1.split("-")[0]) end = int(c1.split("-")[1]) for level in c2.split(":"): if level.strip().upper() not in clibs.levels: is_level = False break else: is_level = True if is_level: # 是告警等级 levels = tuple(level.strip().upper() for level in c2.split(":")) levels = (*levels, one_more) clibs.cursor.execute(f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end} AND level IN {levels}") else: # 是模块 modules = tuple(module.strip() for module in c2.split(":")) modules = (*modules, one_more) clibs.cursor.execute(f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end} AND module IN {modules}") else: # c1是等级,c2是模块 for level in c1.split(":"): if level.strip().upper() not in clibs.levels: return levels = tuple(level.strip().upper() for level in c1.split(":")) levels = (*levels, one_more) modules = tuple(module.strip() for module in c2.split(":")) modules = (*modules, one_more) clibs.cursor.execute(f"SELECT * FROM logs WHERE level IN {levels} AND module IN {modules}") elif len(conditions) == 3: # 依次为id/level(s)/module(s) c1, c2, c3 = conditions for level in c2.split(":"): if level.strip().upper() not in clibs.levels: return levels = tuple(level.strip().upper() for level in c2.split(":")) levels = (*levels, one_more) modules = tuple(module.strip() for module in c3.split(":")) modules = (*modules, one_more) if c1.isdigit() and int(c1) > 0: # 单独一个数字 clibs.cursor.execute(f"SELECT * FROM logs WHERE id = {int(c1)} AND level IN {levels} AND module IN {modules}") elif "-" in c1 and len(c1.split("-")) == 2 and c1.split("-")[0].isdigit() and c1.split("-")[1].isdigit() and int(c1.split("-")[1]) - int(c1.split("-")[0]) > 0 and int(c1.split("-")[0]) > 0: # 1-4 这种格式 start = int(c1.split("-")[0]) end = int(c1.split("-")[1]) clibs.cursor.execute(f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end} AND level IN {levels} AND module IN {modules}") else: return else: return elif condition != "" and f_text != "%%": # 和上面基本一样,只不过加一个f_text的条件筛选 conditions = condition.split("/") if len(conditions) == 1: # 可能是id/level(s)/module(s),任意其一 c1 = conditions[0] if c1.isdigit() and int(c1) > 0: # 单独一个数字 clibs.cursor.execute(f"SELECT * FROM logs WHERE id = {int(c1)} AND content LIKE '{f_text}'") elif "-" in c1 and len(c1.split("-")) == 2 and c1.split("-")[0].isdigit() and c1.split("-")[ 1].isdigit() and int(c1.split("-")[1]) - int(c1.split("-")[0]) > 0 and int( c1.split("-")[0]) > 0: # 1-4 这种格式 start = int(c1.split("-")[0]) end = int(c1.split("-")[1]) clibs.cursor.execute(f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end} AND content LIKE '{f_text}'") else: for level in c1.split(":"): if level.strip().upper() not in clibs.levels: is_level = False break else: is_level = True if is_level: # 是告警等级 levels = tuple(level.strip().upper() for level in c1.split(":")) levels = (*levels, one_more) clibs.cursor.execute(f"SELECT * FROM logs WHERE level IN {levels} AND content LIKE '{f_text}'") else: # 是模块 modules = tuple(module.strip() for module in c1.split(":")) modules = (*modules, one_more) clibs.cursor.execute(f"SELECT * FROM logs WHERE module IN {modules} AND content LIKE '{f_text}'") elif len(conditions) == 2: # 可能是id/level(s)/module(s),任意两个的组合,有顺序 c1, c2 = conditions if c1.isdigit() and int(c1) > 0: # 单独一个数字 for level in c2.split(":"): if level.strip().upper() not in clibs.levels: is_level = False break else: is_level = True if is_level: # 是告警等级 levels = tuple(level.strip().upper() for level in c2.split(":")) levels = (*levels, one_more) clibs.cursor.execute(f"SELECT * FROM logs WHERE id = {int(c1)} AND level IN {levels} AND content LIKE '{f_text}'") else: # 是模块 modules = tuple(module.strip() for module in c2.split(":")) modules = (*modules, one_more) clibs.cursor.execute(f"SELECT * FROM logs WHERE id = {int(c1)} AND module IN {modules} AND content LIKE '{f_text}'") elif "-" in c1 and len(c1.split("-")) == 2 and c1.split("-")[0].isdigit() and c1.split("-")[ 1].isdigit() and int(c1.split("-")[1]) - int(c1.split("-")[0]) > 0 and int( c1.split("-")[0]) > 0: # 1-4 这种格式 start = int(c1.split("-")[0]) end = int(c1.split("-")[1]) for level in c2.split(":"): if level.strip().upper() not in clibs.levels: is_level = False break else: is_level = True if is_level: # 是告警等级 levels = tuple(level.strip().upper() for level in c2.split(":")) levels = (*levels, one_more) clibs.cursor.execute( f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end} AND level IN {levels} AND content LIKE '{f_text}'") else: # 是模块 modules = tuple(module.strip() for module in c2.split(":")) modules = (*modules, one_more) clibs.cursor.execute( f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end} AND module IN {modules} AND content LIKE '{f_text}'") else: # c1是等级,c2是模块 for level in c1.split(":"): if level.strip().upper() not in clibs.levels: return levels = tuple(level.strip().upper() for level in c1.split(":")) levels = (*levels, one_more) modules = tuple(module.strip() for module in c2.split(":")) modules = (*modules, one_more) clibs.cursor.execute(f"SELECT * FROM logs WHERE level IN {levels} AND module IN {modules} AND content LIKE '{f_text}'") elif len(conditions) == 3: # 依次为id/level(s)/module(s) c1, c2, c3 = conditions for level in c2.split(":"): if level.strip().upper() not in clibs.levels: return levels = tuple(level.strip().upper() for level in c2.split(":")) levels = (*levels, one_more) modules = tuple(module.strip() for module in c3.split(":")) modules = (*modules, one_more) if c1.isdigit() and int(c1) > 0: # 单独一个数字 clibs.cursor.execute( f"SELECT * FROM logs WHERE id = {int(c1)} AND level IN {levels} AND module IN {modules} AND content LIKE '{f_text}'") elif "-" in c1 and len(c1.split("-")) == 2 and c1.split("-")[0].isdigit() and c1.split("-")[ 1].isdigit() and int(c1.split("-")[1]) - int(c1.split("-")[0]) > 0 and int( c1.split("-")[0]) > 0: # 1-4 这种格式 start = int(c1.split("-")[0]) end = int(c1.split("-")[1]) clibs.cursor.execute( f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end} AND level IN {levels} AND module IN {modules} AND content LIKE '{f_text}'") else: return else: return else: f_text = f"%{kw}%" clibs.cursor.execute(f"SELECT * FROM logs WHERE content LIKE '{f_text}'") clibs.search_records = clibs.cursor.fetchall() len_records = len(clibs.search_records) pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1 remainder = len_records % 100 if len_records % 100 != 0 else 100 records = clibs.search_records[-1 * remainder:] return pages_all, records self.run_program_thread(do_search, -98, self.prog_done_search, None) def prog_done_conn(self, results): self.btn_hmi_conn.setDisabled(False) self.btn_md_conn.setDisabled(False) self.btn_ec_conn.setDisabled(False) flag, result, ret, error, idx, network = results if flag is False: self.w2t(f"{network.upper()}连接失败", "red") elif flag is True: clibs.status[network] = 1 if network == "hmi": self.btn_hmi_conn.setText("断开") self.hmi_state.setText(f'HR  ') clibs.c_hr = result elif network == "md": self.btn_md_conn.setText("断开") self.md_state.setText(f'MD  ') clibs.c_md = result elif network == "ec": self.btn_ec_conn.setText("断开") self.ec_state.setText(f'EC  ') clibs.c_ec = result def prog_done_disconn(self, results): self.btn_hmi_conn.setDisabled(False) self.btn_md_conn.setDisabled(False) self.btn_ec_conn.setDisabled(False) flag, result, ret, error, idx, network = results if flag is False: self.w2t(f"{network.upper()}断开连接失败", "red") elif flag is True: clibs.status[network] = 0 if network == "hmi": self.btn_hmi_conn.setText("连接") self.hmi_state.setText(f'HR  ') clibs.c_hr = None elif network == "md": self.btn_md_conn.setText("连接") self.md_state.setText(f'MD  ') clibs.c_md = None elif network == "ec": self.btn_ec_conn.setText("连接") self.ec_state.setText(f'EC  ') clibs.c_ec = None def hmi_conn(self): self.btn_hmi_conn.setDisabled(True) if self.btn_hmi_conn.text() == "连接": clibs.ip_addr = self.le_hmi_ip.text().strip() ip_pattern = re.compile(r"(([1-9]?\d|1\d\d|2[0-4]\d|25[0-5])\.){3}([1-9]?\d|1\d\d|2[0-4]\d|25[0-5])") if not ip_pattern.fullmatch(clibs.ip_addr): self.w2t(f"{clibs.ip_addr} 不是一个有效的 IP 地址", "red") return self.run_program_thread(openapi.HmiRequest(clibs.ip_addr, clibs.socket_port, clibs.xService_port), -1, self.prog_done_conn, "hmi") elif self.btn_hmi_conn.text() == "断开": self.run_program_thread(clibs.c_hr.close, -99, self.prog_done_disconn, "hmi") def md_conn(self): if clibs.status["hmi"] == 0: QMessageBox.warning(self, "告警", "操作 Modbus 连接之前,需要先打开 HMI 连接!") return self.btn_md_conn.setDisabled(True) if self.btn_md_conn.text() == "连接": clibs.modbus_port = self.le_md_port.text().strip() self.run_program_thread(openapi.ModbusRequest(clibs.ip_addr, clibs.modbus_port), -1, self.prog_done_conn, "md") elif self.btn_md_conn.text() == "断开": self.run_program_thread(clibs.c_md.close, -99, self.prog_done_disconn, "md") def ec_conn(self): if clibs.status["hmi"] == 0: QMessageBox.warning(self, "告警", "操作 EC 连接之前,需要先打开 HMI 连接!") return self.btn_ec_conn.setDisabled(True) if self.btn_ec_conn.text() == "连接": clibs.external_port = self.le_ec_port.text().strip() self.run_program_thread(openapi.ExternalCommunication(clibs.ip_addr, clibs.external_port), -1, self.prog_done_conn, "ec") elif self.btn_ec_conn.text() == "断开": self.run_program_thread(clibs.c_ec.close, -99, self.prog_done_disconn, "ec") def hmi_page(self): self.pte_md_send.clear() self.pte_md_recv.clear() self.pte_ec_send.clear() self.pte_ec_recv.clear() self.tw_funcs.setCurrentIndex(3) self.sw_network.setCurrentIndex(0) self.hmi_btn.setStyleSheet("background-color: DarkCyan; color: white;") self.md_btn.setStyleSheet("background-color: DarkGrey; color: white;") self.ec_btn.setStyleSheet("background-color: DarkGrey; color: white;") def md_page(self): self.pte_hmi_send.clear() self.pte_hmi_recv.clear() self.pte_ec_send.clear() self.pte_ec_recv.clear() self.tw_funcs.setCurrentIndex(3) self.sw_network.setCurrentIndex(1) self.hmi_btn.setStyleSheet("background-color: DarkGrey; color: white;") self.md_btn.setStyleSheet("background-color: DarkCyan; color: white;") self.ec_btn.setStyleSheet("background-color: DarkGrey; color: white;") def ec_page(self): self.pte_md_send.clear() self.pte_md_recv.clear() self.pte_hmi_send.clear() self.pte_hmi_recv.clear() self.tw_funcs.setCurrentIndex(3) self.sw_network.setCurrentIndex(2) self.hmi_btn.setStyleSheet("background-color: DarkGrey; color: white;") self.md_btn.setStyleSheet("background-color: DarkGrey; color: white;") self.ec_btn.setStyleSheet("background-color: DarkCyan; color: white;") def prog_done_hmi_send(self, results): _, result, ret, error, idx, (msg_id, flag) = results if _ is False: self.btn_hmi_send.setDisabled(False) clibs.logger("INFO", "aio", f"hmi: [send] 请求发送失败 {msg_id}", "red") return records = clibs.c_hr.get_from_id(msg_id) for record in records: if "请求发送成功" not in record[0]: self.pte_hmi_recv.clear() response = eval(record[0]) if flag == 0 else json.loads(record[0]) self.pte_hmi_recv.appendPlainText(json.dumps(response, indent=4, separators=(",", ":"))) else: self.btn_hmi_send.setDisabled(False) def hmi_send(self): def hmi_send_thread(): nonlocal hmi_dict, cmd_json, flag if flag == 0: clibs.c_hr.c.send(clibs.c_hr.package(cmd_json)) clibs.c_hr.logger("DEBUG", "aio", f"hmi: [send] 老协议请求发送成功 {cmd_json}") elif flag == 1: clibs.c_hr.c_xs.send(clibs.c_hr.package_xs(hmi_dict)) clibs.c_hr.logger("DEBUG", "aio", f"hmi: xService请求发送成功 {cmd_json}") if clibs.status["hmi"] == 0: QMessageBox.critical(self, "错误", "使用该功能之前,需要先打开 HMI 连接!") return if self.pte_hmi_send.toPlainText() == "": return self.btn_hmi_send.setDisabled(True) hmi_dict = json.loads(self.pte_hmi_send.toPlainText()) t = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f") msg_id = hmi_dict["id"] = "@".join([hmi_dict["id"].split("@")[0], t]) self.pte_hmi_send.clear() self.pte_hmi_send.appendPlainText(json.dumps(hmi_dict, indent=4, separators=(",", ":"))) flag = hmi_dict["p_type"] del hmi_dict["p_type"] cmd_json = json.dumps(hmi_dict, separators=(",", ":")) self.run_program_thread(hmi_send_thread, -99, self.prog_done_hmi_send, (msg_id, flag)) def md_send(self): if clibs.status["md"] == 0: QMessageBox.critical(self, "连接错误", "使用该功能之前,需要先打开 Modbus 连接!") return if self.pte_md_send.toPlainText() == "": return self.pte_md_recv.clear() content = self.pte_md_send.toPlainText().split("\n") if content[0].strip().startswith("sta"): for item in content[1:]: addr = int(item.split(":")[0].strip()) count = int(item.split(":")[1].strip()) value_type = item.split(":")[2].strip() if value_type == "bool": try: result = clibs.c_md.c.read_holding_registers(addr, count=count).registers[0] self.pte_md_recv.appendPlainText(str(result)) except Exception as err: self.pte_md_recv.appendPlainText(f"获取失败:{err}") return elif content[0].strip().startswith("ctrl"): for item in content[1:]: addr = int(item.split(":")[0].strip()) value = int(item.split(":")[1].strip()) try: clibs.c_md.c.write_register(addr, value) time.sleep(clibs.INTERVAL/4) except Exception as err: self.pte_md_recv.appendPlainText(f"操作失败:{err}") return else: self.pte_md_recv.appendPlainText("操作成功!") else: QMessageBox.critical(self, "格式错误", "非法的发送内容,自定义发送需参考已有的格式!") def ec_send(self): if clibs.status["ec"] == 0: QMessageBox.critical(self, "错误", "使用该功能之前,需要先打开 EC 连接!") return if self.pte_ec_send.toPlainText() == "": return self.pte_ec_recv.clear() cmd = self.pte_ec_send.toPlainText().strip() try: result = clibs.c_ec.sr_string(cmd) self.pte_ec_recv.appendPlainText(str(result)) except Exception as err: self.pte_ec_recv.appendPlainText(f"操作失败:{err}") def hmi_cb_change(self): cmd = self.cb_hmi_cmd.currentText() self.pte_hmi_send.clear() self.pte_hmi_recv.clear() with open(f"{clibs.PREFIX}/files/protocols/hmi/{cmd}.json", mode="r", encoding="utf-8") as f_hmi: hmi_dict = json.load(f_hmi) t = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f") hmi_dict["id"] = "@".join([cmd, t]) self.pte_hmi_send.appendPlainText(json.dumps(hmi_dict, indent=4, separators=(",", ":"))) def md_cb_change(self): cmd = self.cb_md_cmd.currentText() self.pte_md_send.clear() self.pte_md_recv.clear() self.pte_md_send.appendPlainText(cmd) with open(f"{clibs.PREFIX}/files/protocols/md/{cmd}.txt", mode="r", encoding="utf-8") as f_md: c_send = f_md.read() self.pte_md_send.appendPlainText(c_send) def ec_cb_change(self): cmd = self.cb_ec_cmd.currentText() self.pte_ec_send.clear() self.pte_ec_recv.clear() with open(f"{clibs.PREFIX}/files/protocols/ec/{cmd}.txt", mode="r", encoding="utf-8") as f_md: c_send = f_md.read() self.pte_ec_send.appendPlainText(c_send) def data_cb_change(self): proc = self.cb_data_func.currentText() if proc != "转矩": self.cb_data_current.setDisabled(True) else: self.cb_data_current.setDisabled(False) def check_interval(self): try: interval = float(self.le_durable_interval.text()) interval = clibs.CYCLE if interval < clibs.CYCLE else int(interval) except Exception: interval = clibs.CYCLE self.le_durable_interval.setText(str(interval)) def prog_done_detection(self, finished): if clibs.status["hmi"] == 0: self.btn_hmi_conn.setText("连接") self.hmi_state.setText(f'HR  ') if clibs.status["md"] == 0: self.btn_md_conn.setText("连接") self.md_state.setText(f'MD  ') if clibs.status["ec"] == 0: self.btn_ec_conn.setText("连接") self.ec_state.setText(f'EC  ') # ============= Program running status ============= if sum(clibs.running) == 1: self.run_state.setText(f'BUSY') else: self.run_state.setText(f'IDLE') def closeEvent(self, event): idx = -1 if clibs.running.count(1) == 0 else clibs.running.index(1) info_text = "当前无程序正在运行,可放心退出!" if idx == -1 else f"当前正在运行{clibs.functions[idx]},确认退出?" reply = QMessageBox.question(self, "退出", info_text) if reply == QMessageBox.Yes: os.chdir(clibs.log_path) t = datetime.datetime.now().strftime("%Y%m%d%H%M%S") disk_conn = sqlite3.connect(f"log_{t}.db", isolation_level=None, check_same_thread=False, cached_statements=256) clibs.conn.backup(target=disk_conn, pages=1, progress=None) _, logs = clibs.traversal_files(".") logs.sort() while len(logs) > 10: _ = logs.pop(0) os.remove(_) if clibs.status["md"] == 1: self.run_program_thread(clibs.c_md.close, -99, self.prog_done_disconn, "md") if clibs.status["ec"] == 1: self.run_program_thread(clibs.c_ec.close, -99, self.prog_done_disconn, "ec") if clibs.status["hmi"] == 1: self.run_program_thread(clibs.c_hr.close, -99, self.prog_done_disconn, "hmi") event.accept() else: event.ignore() def setup_statusbar(self): with open(f"{clibs.PREFIX}/files/version/local_vers", mode="r", encoding="utf-8") as f_local: local_vers = f_local.read().strip() l_version, update = local_vers.split("@") vers_info = f" v{l_version} Update@{update}" with open(f"{clibs.PREFIX}/files/version/server_vers", mode="r", encoding="utf-8") as f_server: server_vers = f_server.read().strip() self.update_label, self.hmi_state, self.md_state, self.ec_state, self.run_state, self.version_label = QLabel(), ClickableLabel(), ClickableLabel(), ClickableLabel(), QLabel(), QLabel() self.statusbar.addWidget(self.version_label, 0) self.statusbar.addWidget(self.hmi_state, 0) self.statusbar.addWidget(self.md_state, 0) self.statusbar.addWidget(self.ec_state, 0) self.statusbar.addWidget(self.run_state, 0) self.statusbar.addPermanentWidget(self.update_label, 0) # 添加到右侧 self.hmi_state.clicked.connect(self.hmi_page) self.md_state.clicked.connect(self.md_page) self.ec_state.clicked.connect(self.ec_page) if local_vers == server_vers: self.update_label.setText(f' Current is the latest version!') elif local_vers > server_vers: pass elif local_vers < server_vers: self.update_label.setText(f''' v{server_vers.split('@')[0]} has been released, update ASAP!''') self.version_label.setText(f'{vers_info}  ') self.hmi_state.setText(f'HR  ') self.md_state.setText(f'MD  ') self.ec_state.setText(f'EC  ') self.run_state.setText(f'IDLE') self.update_label.setOpenExternalLinks(True) # 允许超链接在浏览器中打开 class StateDetection(QThread): finished = Signal(bool) def __init__(self, /): super().__init__() def run(self): while True: time.sleep(clibs.INTERVAL*2) # ============= HMI connection status ============= try: clibs.c_hr.execution("controller.heart") except: clibs.status["hmi"] = 0 # ============= MD connection status ============= try: clibs.c_md.c.read_holding_registers(40503, count=1).registers[0] except: clibs.status["md"] = 0 # ============= EC connection status ============= try: clibs.c_ec.sr_string("controller_is_running") except: clibs.status["ec"] = 0 self.finished.emit(True) class ClickableLabel(QLabel): def __init__(self, parent=None): super().__init__(parent) def mousePressEvent(self, event): self.clicked.emit() clicked = Signal() class InitWork(QThread): completed = Signal(str) def program(self, action): url_vers, server_vers = "https://www.rustle.cc/server_vers", "" try: headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:23.0) Gecko/20100101 Firefox/23.0'} req = request.Request(url_vers, headers=headers) response = request.urlopen(req, timeout=clibs.INTERVAL * 10) server_vers = response.read().decode("utf-8").strip() with open(f"{clibs.PREFIX}/files/version/server_vers", mode="w", encoding="utf-8") as f_server: f_server.write(server_vers) self.completed.emit("true") except Exception as err: print(f"err = {err}") clibs.cursor.close() clibs.conn.close() self.completed.emit("false") class SplashScreen(QApplication): action = Signal(int) def __init__(self, argv): super().__init__(argv) self.window = None pixmap = QPixmap(f"{clibs.PREFIX}/media/splash.png") self.splash = QSplashScreen(pixmap, Qt.WindowType.WindowStaysOnTopHint) scaled_pixmap = pixmap.scaled(800, 400, Qt.AspectRatioMode.KeepAspectRatio, Qt.TransformationMode.SmoothTransformation) self.splash.setPixmap(scaled_pixmap) self.splash.setEnabled(False) # 禁用交互 self.splash.setFont(QFont("Arial", 12)) self.splash.show() self.splash.showMessage("正在加载资源.....", Qt.AlignmentFlag.AlignBottom | Qt.AlignmentFlag.AlignHCenter, Qt.GlobalColor.white) # self.t = QThread(self) # self.run = InitWork() # self.run.moveToThread(self.t) # self.run.completed.connect(self.prog_done) # self.action.connect(self.run.program) # self.t.start() # self.action.emit(1) # without validation of server version self.prog_done("true") def prog_done(self, result): if result == "false" or result == "": self.splash.showMessage("网络错误,无法连接至服务器,确认当前网络环境是否可以访问www.rustle.cc......", Qt.AlignmentFlag.AlignBottom | Qt.AlignmentFlag.AlignHCenter, Qt.GlobalColor.white) time.sleep(clibs.INTERVAL*2) self.splash.close() sys.exit() elif result == "true": self.window = MainWindow() self.splash.showMessage("初始化完成,即将进入主界面......", Qt.AlignmentFlag.AlignBottom | Qt.AlignmentFlag.AlignHCenter, Qt.GlobalColor.white) QTimer.singleShot(1000, lambda: (self.splash.finish(self.window), self.window.show())) if __name__ == '__main__': app = SplashScreen(sys.argv) sys.exit(app.exec())