287 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			287 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from PySide6.QtWidgets import QWidget, QLabel, QMessageBox, QVBoxLayout, QTreeWidget, QHeaderView, QHBoxLayout, QPushButton, QFrame, QLineEdit, QCheckBox, QTreeWidgetItem, QDialog, QPlainTextEdit, QApplication
 | 
						|
from PySide6.QtCore import Qt, Signal, QSize
 | 
						|
from PySide6.QtGui import QColor, QIcon, QKeySequence, QIntValidator, QShortcut
 | 
						|
 | 
						|
from codes.common.signal_bus import signal_bus
 | 
						|
from codes.common import db_operation
 | 
						|
from codes.common import clibs
 | 
						|
from codes.common.qss_reloader import qss_reloader
 | 
						|
 | 
						|
 | 
						|
class LogDialog(QDialog):
    """Dialog that shows the full text of one log record in a read-only box."""

    def __init__(self, content, parent=None):
        """Build the dialog and load ``content`` into the viewer.

        Args:
            content: Text handed to ``QPlainTextEdit.setPlainText``.
            parent: Optional parent widget forwarded to ``QDialog``.
        """
        super().__init__(parent)
        self.setWindowTitle("日志详情")
        # Fix the size first so center_on_screen can read width()/height().
        self.setGeometry(100, 100, 700, 300)
        self.center_on_screen()

        self.plain_text_edit = QPlainTextEdit()
        self.plain_text_edit.setReadOnly(True)
        self.plain_text_edit.setPlainText(content)

        box = QVBoxLayout(self)
        box.addWidget(self.plain_text_edit)

    def center_on_screen(self):
        """Move the dialog so it is centred in the primary screen's geometry."""
        screen = QApplication.primaryScreen().geometry()
        left = (screen.width() - self.width()) // 2
        top = (screen.height() - self.height()) // 2
        self.move(left, top)
 | 
						|
 | 
						|
 | 
						|
class PageNumberInput(QDialog):
    """Small dialog that asks the user to type a page number.

    Enter inside the line edit accepts the dialog and Esc rejects it. After
    ``exec`` returns, read the typed value with ``get_page_number``.
    """

    def __init__(self, parent=None):
        """Create the widgets and wire up the keyboard handling.

        Args:
            parent: Optional parent widget forwarded to ``QDialog``.
        """
        super().__init__(parent)
        self.init_ui()
        self.setup_slot()

    def setup_slot(self):
        """Connect Esc to ``reject`` and Enter in the line edit to ``accept``."""
        QShortcut(QKeySequence("Esc"), self).activated.connect(self.reject)
        # get_page_number is a plain getter whose return value a signal would
        # discard, so it is intentionally not connected as a slot.
        self.le_page_number.returnPressed.connect(self.accept)

    def init_ui(self):
        """Build the fixed-size window holding a single integer line edit."""
        self.setMinimumSize(300, 100)
        self.setMaximumSize(400, 120)
        self.resize(300, 100)
        self.setWindowIcon(QIcon(clibs.icon))
        self.setWindowTitle("输入页码")

        self.le_page_number = QLineEdit(self)
        self.le_page_number.setText("1")
        # Pre-select the default so typing immediately replaces it.
        self.le_page_number.selectAll()
        self.le_page_number.setValidator(QIntValidator(0, 9999999, self))

        layout_h = QHBoxLayout()
        layout_h.addStretch(1)
        layout_h.addWidget(self.le_page_number, stretch=4)
        layout_h.addStretch(1)
        self.setLayout(layout_h)

    def get_page_number(self):
        """Return the typed page number as an ``int``, never less than 1.

        Empty, non-numeric or non-positive input falls back to page 1. The
        previous check compared the text (a ``str``) with the integer ``0``,
        which could never match, so the fallback never applied.

        Returns:
            The page number entered by the user, or 1 as a fallback.
        """
        text = self.le_page_number.text().strip()
        try:
            number = int(text)
        except ValueError:
            return 1
        return number if number > 0 else 1
 | 
						|
 | 
						|
 | 
						|
class ClickableLabel(QLabel):
    """``QLabel`` that emits ``clicked`` on a left mouse button press."""

    # Emitted with no arguments each time the label is left-clicked.
    clicked = Signal()

    def mousePressEvent(self, event):
        """Emit ``clicked`` for left-button presses, then defer to ``QLabel``."""
        pressed_left = event.button() == Qt.MouseButton.LeftButton
        if pressed_left:
            self.clicked.emit()
        super().mousePressEvent(event)
 | 
						|
 | 
						|
 | 
						|
class W08Log(QWidget):
    """Paged log viewer with per-level check boxes and a text search box.

    Rows are fetched through ``db_operation.db_query_logs`` and rendered in a
    ``QTreeWidget``, ``clibs.config["log_number_per_page"]`` rows per page.
    """

    def __init__(self, parent=None):
        """Initialise state, build the UI, then connect signals.

        Args:
            parent: Optional parent widget forwarded to ``QWidget``.
        """
        super().__init__(parent)
        self.pre_do()
        self.ui_init()
        self.post_do()

    def pre_do(self):
        """Set up the non-UI state before any widget exists."""
        # Result of the most recent query and its row count. show_page slices
        # ``records`` and does arithmetic on ``len_records``, so the
        # placeholders are an empty list and 0 rather than strings.
        self.records, self.len_records = [], 0
        # True while paging through a filtered/search result instead of
        # re-querying the full log on every page turn.
        self.is_searching = False
        self.max_item_number = clibs.config["log_number_per_page"]

    def ui_init(self):
        """Create the log table and the bottom row (paging, filters, search)."""
        self.setObjectName("W08Log")
        # Passing ``self`` installs the layout on this widget, so no separate
        # setLayout call is needed.
        layout_v = QVBoxLayout(self)

        # --- log table -----------------------------------------------------
        self.treeW = QTreeWidget()
        self.treeW.setObjectName("treeW")
        self.treeW.setHeaderLabels(["ID", "时间戳", "告警级别", "模块信息", "告警内容"])
        self.treeW.headerItem().setTextAlignment(0, Qt.AlignmentFlag.AlignRight | Qt.AlignmentFlag.AlignVCenter)
        self.treeW.setUniformRowHeights(True)
        self.header = self.treeW.header()
        self.header.setObjectName("header")
        for i in range(self.treeW.columnCount()):
            self.header.setSectionResizeMode(i, QHeaderView.ResizeMode.ResizeToContents)
        layout_v.addWidget(self.treeW, stretch=9)

        # --- paging controls -----------------------------------------------
        layout_h = QHBoxLayout()
        self.pb_previous = QPushButton("上一页")
        self.pb_previous.setObjectName("pb_previous")
        layout_h.addWidget(self.pb_previous, stretch=1)

        # The label text doubles as the page state: the page-turn handlers
        # parse the "current/total" text to find the current page.
        self.lb_page = ClickableLabel("999999/999999")
        self.lb_page.setObjectName("lb_page")
        self.lb_page.setAlignment(Qt.AlignmentFlag.AlignCenter)
        self.lb_page.setMinimumWidth(144)
        layout_h.addWidget(self.lb_page, stretch=2)

        self.pb_next = QPushButton("下一页")
        self.pb_next.setObjectName("pb_next")
        layout_h.addWidget(self.pb_next, stretch=1)

        layout_h.addStretch(9)

        # --- level filters -------------------------------------------------
        # NOTE(review): frame_checkbox is only used as the check boxes'
        # initial parent and is never added to a layout itself - confirm
        # whether it was meant to wrap layout_h_checkbox.
        self.frame_checkbox = QFrame()
        self.frame_checkbox.setObjectName("frame_checkbox")
        layout_h_checkbox = QHBoxLayout()
        self.box_info = self._add_level_checkbox(layout_h_checkbox, "box_info", "通知")
        self.box_warning = self._add_level_checkbox(layout_h_checkbox, "box_warning", "告警")
        self.box_error = self._add_level_checkbox(layout_h_checkbox, "box_error", "错误")
        self.box_exception = self._add_level_checkbox(layout_h_checkbox, "box_exception", "异常")
        self.box_unknown = self._add_level_checkbox(layout_h_checkbox, "box_unknown", "未知")
        layout_h.addLayout(layout_h_checkbox, stretch=4)

        # --- search --------------------------------------------------------
        self.le_search = QLineEdit()
        self.le_search.setObjectName("le_search")
        self.le_search.setPlaceholderText("告警内容")
        self.le_search.setMinimumWidth(300)
        layout_h.addWidget(self.le_search, stretch=5)

        self.pb_search = QPushButton("查找")
        self.pb_search.setObjectName("pb_search")
        layout_h.addWidget(self.pb_search, stretch=1)

        layout_v.addLayout(layout_h, stretch=1)

    def _add_level_checkbox(self, layout, object_name, caption):
        """Create one pre-checked level filter box, add it to ``layout``, return it."""
        box = QCheckBox(caption, parent=self.frame_checkbox)
        box.setObjectName(object_name)
        box.setChecked(True)
        layout.addWidget(box, stretch=1)
        return box

    def _level_boxes(self):
        """Return the level-name -> check box mapping in display order."""
        return {
            "info": self.box_info,
            "warning": self.box_warning,
            "error": self.box_error,
            "exception": self.box_exception,
            "unknown": self.box_unknown,
        }

    def post_do(self):
        """Register the stylesheet with the reloader and connect signals."""
        qss_reloader.register(clibs.qss_w08_log, self)
        self.setup_slot()
        self.setup_sc()

    def setup_slot(self):
        """Connect widget signals and the page-switch bus signal to handlers."""
        self.treeW.itemDoubleClicked.connect(self.show_single_log)
        self.pb_previous.clicked.connect(self.previous_page)
        self.pb_next.clicked.connect(self.next_page)
        self.pb_search.clicked.connect(self.search_page)
        self.le_search.returnPressed.connect(self.search_page)
        self.lb_page.clicked.connect(self.goto_page)
        signal_bus.stacked_page_switch_log.connect(self.show_latest_page)

    def setup_sc(self):
        """Placeholder for keyboard shortcuts; nothing is registered yet."""
        ...

    def _reload_unless_searching(self):
        """Re-run the ``db_query_logs(None)`` query unless a search is active."""
        if not self.is_searching:
            self.records, self.len_records = db_operation.db_query_logs(None)

    def _turn_page(self, step):
        """Show the page ``step`` pages away from the one named in the label."""
        self._reload_unless_searching()
        current = int(self.lb_page.text().split("/")[0])
        self.show_page(self.records, self.len_records, page_number=current + step)

    def previous_page(self):
        """Show the page before the current one (clamped by ``show_page``)."""
        self._turn_page(-1)

    def next_page(self):
        """Show the page after the current one (clamped by ``show_page``)."""
        self._turn_page(1)

    def search_page(self):
        """Query by the checked levels, then by the search text, and show the result.

        A warning box is shown and nothing is queried when every level box
        is unchecked. On success the view enters search mode, so page turns
        reuse this result instead of re-querying.
        """
        levels = [level for level, box in self._level_boxes().items() if box.isChecked()]
        if not levels:
            QMessageBox.warning(None, "警告", "至少选择一个过滤器!")
            return

        search_text = self.le_search.text().strip()
        self.records, self.len_records = db_operation.db_query_logs(levels)
        if search_text:
            # Second pass: narrow the level-filtered rows by the typed text.
            self.records, self.len_records = db_operation.db_query_logs(search_text, self.records)

        self.is_searching = True
        self.show_page(self.records, self.len_records, page_number=None)

    def goto_page(self):
        """Ask for a page number and jump to it; do nothing if cancelled."""
        dlg = PageNumberInput()
        if dlg.exec() != 1:  # 1 is the "accepted" result of QDialog.exec()
            return

        page_number = dlg.get_page_number()
        self._reload_unless_searching()
        self.show_page(self.records, self.len_records, page_number=page_number)

    def show_latest_page(self):
        """Leave search mode, reset every filter and show the last page."""
        self.records, self.len_records = db_operation.db_query_logs(None)
        self.is_searching = False
        for box in self._level_boxes().values():
            box.setChecked(True)
        self.le_search.clear()
        self.show_page(self.records, self.len_records, page_number=None)

    def show_page(self, records, len_records, page_number: int | None):
        """Render one page of ``records`` and update the page label.

        Args:
            records: Sliceable sequence of rows; every value of a row is
                converted with ``str`` and the third value selects the row
                colour.
            len_records: Row count used for the paging arithmetic.
            page_number: 1-based page to display. ``None`` selects the last
                page; other values are clamped into ``[1, total]``.
        """
        self.treeW.clear()
        if len_records == 0:
            # Keep the label in step with the now-empty table instead of
            # leaving the previous page numbers on screen.
            self.lb_page.setText("0/0")
            return

        per_page = self.max_item_number
        full_pages, leftover = divmod(len_records, per_page)
        total = full_pages + 1 if leftover else full_pages
        current = total if page_number is None else min(max(page_number, 1), total)
        self.lb_page.setText(f"{current}/{total}")

        idx_start = per_page * (current - 1)
        idx_end = min(per_page * current, len_records)

        # Built once per call rather than once per row.
        default_color = QColor(255, 255, 255)
        colors = {
            "info": QColor(255, 255, 255),  # white
            "warning": QColor(244, 164, 96),  # sandy brown
            "error": QColor(205, 92, 92),  # indian red
            "exception": QColor(70, 130, 180),  # steel blue
            "unknown": QColor(190, 190, 190),  # gray
        }
        column_count = self.treeW.columnCount()
        for record in records[idx_start:idx_end]:
            fields = [str(value) for value in record]
            # Constructing the item with the tree as parent already appends
            # it as a top-level row, so no addTopLevelItem call is needed.
            item = QTreeWidgetItem(self.treeW, fields)
            color = colors.get(fields[2], default_color)
            for col in range(column_count):
                item.setBackground(col, color)
        self.treeW.scrollToBottom()

    def show_single_log(self, item, column):
        """Open a ``LogDialog`` with every field of the double-clicked row.

        Args:
            item: The ``QTreeWidgetItem`` that was double-clicked.
            column: Clicked column index; unused, but part of the
                ``itemDoubleClicked`` signal signature.
        """
        log_id = f"id = {item.text(0)}"
        log_ts = f"ts = {item.text(1)}"
        log_level = f"level = {item.text(2)}"
        # The trailing newline leaves a blank line before the separator.
        log_module = f"module = {item.text(3)}\n"
        deco_line = "=" * 40
        log_msg = item.text(4)
        content = "\n".join([log_id, log_ts, log_level, log_module, deco_line, log_msg])
        dialog = LogDialog(content, self)
        dialog.exec()
 |