1039 lines
53 KiB
Python
1039 lines
53 KiB
Python
import datetime
|
||
import json
|
||
import re
|
||
import sys
|
||
import time
|
||
from urllib import request
|
||
|
||
from PySide6.QtCore import Qt, QThread, Signal, QObject, QTimer
|
||
from PySide6.QtGui import QTextCursor, QFont, QPixmap, QColor, QBrush, QIcon
|
||
from PySide6.QtWidgets import QMessageBox, QCheckBox, QSplashScreen, QApplication, QLabel, QTreeWidgetItem, QFileDialog, QHeaderView, QDialog, QVBoxLayout, QPlainTextEdit
|
||
|
||
import codes.common.clibs as clibs
|
||
import codes.common.openapi as openapi
|
||
import codes.ui.main_window as main_window
|
||
from codes.analysis import brake, current, wavelogger, iso
|
||
from codes.autotest import do_current, do_brake
|
||
from codes.durable import factory_test, curves_draw
|
||
|
||
|
||
class ContentDialog(QDialog):
|
||
def __init__(self, content, parent=None):
|
||
super().__init__(parent)
|
||
self.setWindowTitle("日志详情")
|
||
self.setGeometry(100, 100, 700, 300)
|
||
self.center_on_screen()
|
||
layout = QVBoxLayout(self)
|
||
self.plain_text_edit = QPlainTextEdit()
|
||
self.plain_text_edit.setReadOnly(True)
|
||
self.plain_text_edit.setPlainText(content)
|
||
layout.addWidget(self.plain_text_edit)
|
||
|
||
def center_on_screen(self):
|
||
screen_geometry = QApplication.primaryScreen().geometry()
|
||
screen_width = screen_geometry.width()
|
||
screen_height = screen_geometry.height()
|
||
dialog_width = self.width()
|
||
dialog_height = self.height()
|
||
x = (screen_width - dialog_width) // 2
|
||
y = (screen_height - dialog_height) // 2
|
||
self.move(x, y)
|
||
|
||
|
||
class RunProg(QObject):
|
||
completed = Signal(tuple)
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
|
||
def program(self, action): # 0: prog 1: idx
|
||
prog, idx, reserved = action
|
||
if idx in range(7):
|
||
run = prog.processing
|
||
elif idx == -1:
|
||
run = prog.net_conn
|
||
elif idx == -97:
|
||
run = prog.do_draw
|
||
elif idx == -98 or idx == -99:
|
||
run = prog
|
||
|
||
try:
|
||
ret = run()
|
||
self.completed.emit((True, prog, ret, "", idx, reserved)) # 运行是否成功/返回值/报错信息/idx
|
||
except Exception as err:
|
||
self.completed.emit((False, None, None, err, idx, reserved)) # 运行是否成功/返回值/报错信息/idx
|
||
|
||
|
||
class MainWindow(main_window.Ui_MainWindow):
|
||
action = Signal(tuple)
|
||
|
||
def __init__(self):
|
||
super(MainWindow, self).__init__()
|
||
self.is_searching = False
|
||
self.setupUi(self)
|
||
self.pre_does()
|
||
|
||
def pre_does(self):
|
||
# ========================= treeview init =========================
|
||
self.cb_data_current.setDisabled(True)
|
||
self.setup_statusbar()
|
||
self.setWindowIcon(QIcon(f"{clibs.PREFIX}/media/icon.ico"))
|
||
self.treew_log.header().setFont(QFont("Arial", 12, QFont.Weight.Bold))
|
||
header = self.treew_log.header()
|
||
for i in range(self.treew_log.columnCount()):
|
||
header.setSectionResizeMode(i, QHeaderView.ResizeMode.ResizeToContents)
|
||
self.md_btn.setStyleSheet("background-color: DarkGrey; color: white;")
|
||
self.ec_btn.setStyleSheet("background-color: DarkGrey; color: white;")
|
||
# ========================= stylesheet =========================
|
||
with open(f"{clibs.PREFIX}/qss/style.qss", mode="r", encoding="utf-8") as f_qss:
|
||
style_sheet = f_qss.read()
|
||
self.setStyleSheet(style_sheet)
|
||
# ========================= clibs =========================
|
||
logger_handler = clibs.LoggerHandler()
|
||
logger_handler.signal.connect(self.w2t)
|
||
clibs.logger = logger_handler.logger
|
||
# ========================= detection =========================
|
||
self.detection_thread = StateDetection()
|
||
self.detection_thread.finished.connect(self.prog_done_detection)
|
||
self.detection_thread.start()
|
||
# ========================= debug =========================
|
||
# for i in range(105):
|
||
# clibs.logger("DEBUG", "aio", "debug testing log for aio", color="gray")
|
||
# clibs.logger("INFO", "clibs", "info testing log for clibs", color="green")
|
||
# clibs.logger("WARNING", "iso", "warning testing log for aio", color="yellow")
|
||
# # clibs.logger("ERROR", "openapi", "error testing log for aio", color="red")
|
||
|
||
def run_program_thread(self, prog, idx, prog_done, reserved):
|
||
if idx != -98 and idx != -97:
|
||
self.tw_docs.setCurrentIndex(0)
|
||
|
||
self.t = QThread(self)
|
||
self.run = RunProg()
|
||
self.run.moveToThread(self.t)
|
||
self.run.completed.connect(prog_done)
|
||
try:
|
||
self.action.disconnect()
|
||
except TypeError:
|
||
pass
|
||
self.action.connect(self.run.program)
|
||
self.t.start()
|
||
self.action.emit((prog, idx, reserved))
|
||
|
||
def w2t(self, msg, color="black"):
|
||
self.pte_output.appendHtml(f"<span style='color:{color};'>{msg}</span>")
|
||
cursor = self.pte_output.textCursor()
|
||
cursor.movePosition(QTextCursor.End)
|
||
self.pte_output.setTextCursor(cursor)
|
||
self.pte_output.ensureCursorVisible()
|
||
self.update()
|
||
|
||
def get_checkbox_states(self):
|
||
cb_durable_states = {}
|
||
container = self.sa_durable.widget()
|
||
for checkbox in container.findChildren(QCheckBox):
|
||
cb_durable_states[checkbox.text()] = checkbox.isChecked()
|
||
return cb_durable_states
|
||
|
||
def prog_start(self):
|
||
def prog_done(results):
|
||
flag, result, ret, error, idx, network = results
|
||
clibs.running[idx] = 0
|
||
# if flag is False:
|
||
# self.w2t(f"{clibs.functions[idx]}运行失败:{error}", "red")
|
||
# elif flag is True:
|
||
# ...
|
||
|
||
if sum(clibs.running) > 0:
|
||
if sum(clibs.running) == 1:
|
||
QMessageBox.critical(self, "运行中", f"{clibs.functions[clibs.running.index(1)]}正在执行中,不可同时运行两个处理/测试程序!")
|
||
return
|
||
else:
|
||
self.w2t(f"clibs.running = {clibs.running}", "red")
|
||
self.w2t(f"clibs.functions = {clibs.functions}", "red")
|
||
QMessageBox.critical(self, "严重错误", "理论上不允许同时运行两个处理程序,需要检查!")
|
||
return
|
||
|
||
if self.tw_funcs.currentIndex() == 0 and self.cb_data_func.currentIndex() == 0: # tab: 数据处理 | func: 制动数据处理
|
||
self.run_program_thread(brake.BrakeDataProcess(self.le_data_path.text()), 0, prog_done, None)
|
||
elif self.tw_funcs.currentIndex() == 0 and self.cb_data_func.currentIndex() == 1: # tab: 数据处理 | func: 转矩数据处理
|
||
self.run_program_thread(current.CurrentDataProcess(self.le_data_path.text(), self.cb_data_current.currentText()), 1, prog_done, None)
|
||
elif self.tw_funcs.currentIndex() == 0 and self.cb_data_func.currentIndex() == 2: # tab: 数据处理 | func: 激光数据处理
|
||
self.run_program_thread(iso.IsoDataProcess(self.le_data_path.text()), 2, prog_done, None)
|
||
elif self.tw_funcs.currentIndex() == 0 and self.cb_data_func.currentIndex() == 3: # tab: 数据处理 | func: 精度数据处理
|
||
self.run_program_thread(wavelogger.WaveloggerDataProcess(self.le_data_path.text()), 3, prog_done, None)
|
||
elif self.tw_funcs.currentIndex() == 1 and self.cb_unit_func.currentIndex() == 0: # tab: 自动测试 | func: 制动测试
|
||
self.run_program_thread(do_brake.DoBrakeTest(self.le_unit_path.text(), self.cb_unit_tool.currentText()), 4, prog_done, None)
|
||
elif self.tw_funcs.currentIndex() == 1 and self.cb_unit_func.currentIndex() == 1: # tab: 自动测试 | func: 转矩测试
|
||
self.run_program_thread(do_current.DoCurrentTest(self.le_unit_path.text(), self.cb_unit_tool.currentText()), 5, prog_done, None)
|
||
elif self.tw_funcs.currentIndex() == 2: # tab: 耐久采集 | func: 场景数据持久化
|
||
self.run_program_thread(factory_test.DoFactoryTest(self.le_durable_path.text(), self.le_durable_interval.text(), self.get_checkbox_states()), 6, prog_done, None)
|
||
else:
|
||
QMessageBox.warning(self, "运行错误", "当前无可运行的功能!")
|
||
|
||
def prog_stop(self):
|
||
if sum(clibs.running) == 0:
|
||
return
|
||
if clibs.stop_flag:
|
||
QMessageBox.warning(self, "停止运行", "当前程序正在停止中,请勿重复执行!")
|
||
return
|
||
|
||
idx = clibs.running.index(1)
|
||
if idx in [0, 1, 2, 3]:
|
||
QMessageBox.warning(self, "停止运行", "数据处理功能不支持中途停止,可直接关闭窗口以强制终止程序的执行,但有文件损坏的风险!")
|
||
return
|
||
if idx in [4, 5, 6]:
|
||
try:
|
||
clibs.c_hr.execution("diagnosis.open", open=False, display_open=False)
|
||
clibs.c_hr.execution("diagnosis.set_params", display_pdo_params=[])
|
||
clibs.c_md.r_soft_estop(0)
|
||
except:
|
||
...
|
||
|
||
clibs.running[idx], clibs.stop_flag = 0, True
|
||
clibs.logger("INFO", "aio", f"{clibs.functions[idx]}程序正在停止中,需要一些时间清理后台数据,等待程序运行状态为 IDLE 时可重新运行其他程序!", "red")
|
||
|
||
def prog_reset(self):
|
||
self.tw_docs.setCurrentIndex(0)
|
||
self.pte_output.clear()
|
||
|
||
def file_browser(self):
|
||
idx_dict = {0: self.le_data_path, 1: self.le_unit_path, 2: self.le_durable_path}
|
||
dir_path = QFileDialog.getExistingDirectory()
|
||
tab_index = self.tw_funcs.currentIndex()
|
||
if dir_path:
|
||
idx_dict[tab_index].setText(dir_path)
|
||
|
||
def curve_draw(self):
|
||
window = curves_draw.ChartWindow(self.le_durable_path.text(), self.get_checkbox_states())
|
||
window.setWindowModality(Qt.WindowModality.ApplicationModal)
|
||
window.show()
|
||
|
||
def durable_cb_change(self):
|
||
stat = True if self.cb_durable_total.isChecked() else False
|
||
for cb in self.sa_durable.widget().findChildren(QCheckBox):
|
||
cb.setChecked(stat)
|
||
|
||
def display_tree_content(self, records):
|
||
for record in records:
|
||
item = QTreeWidgetItem(self.treew_log)
|
||
for i in range(self.treew_log.columnCount()):
|
||
item.setText(i, str(record[i]))
|
||
item.setTextAlignment(i, Qt.AlignmentFlag.AlignLeft | Qt.AlignmentFlag.AlignVCenter)
|
||
self.treew_log.addTopLevelItem(item)
|
||
color_map = {"DEBUG": QColor(220, 220, 220), "INFO": QColor(144, 238, 144), "WARNING": QColor(255, 240, 210), "ERROR": QColor(255, 220, 220)}
|
||
brush = QBrush(color_map[record[2].upper()])
|
||
for i in range(self.treew_log.columnCount()):
|
||
item.setBackground(i, brush)
|
||
|
||
last_item = self.treew_log.topLevelItem(self.treew_log.topLevelItemCount() - 1)
|
||
self.treew_log.scrollToItem(last_item)
|
||
|
||
def prog_done_pre(self, results):
|
||
flag, result, ret, error, idx, network = results
|
||
if ret is None:
|
||
return
|
||
|
||
pages_all, current_page, records = ret
|
||
self.treew_log.clear()
|
||
self.display_tree_content(records)
|
||
self.label_page.setText(f"{current_page}/{pages_all}")
|
||
|
||
def pre_page(self):
|
||
@clibs.db_lock
|
||
def do_pre():
|
||
if self.is_searching is False:
|
||
first_id = int(self.treew_log.topLevelItem(0).text(0))
|
||
end = first_id-1 if first_id-1 > 0 else None
|
||
if end is None:
|
||
return
|
||
start = end-100+1 if end-100+1 > 0 else 1
|
||
|
||
clibs.cursor.execute(f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end}")
|
||
records = clibs.cursor.fetchall()
|
||
clibs.cursor.execute("SELECT COUNT(id) FROM logs")
|
||
len_records = clibs.cursor.fetchone()[0]
|
||
pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1
|
||
current_page = int(end) // 100 if int(end) % 100 == 0 else int(end) // 100 + 1
|
||
return pages_all, current_page, records
|
||
|
||
elif self.is_searching is True:
|
||
current_page, pages_all = self.label_page.text().split("/")
|
||
if int(current_page)-1 == 0:
|
||
return
|
||
|
||
current_page = int(current_page) - 1
|
||
first_row = []
|
||
for i in range(self.treew_log.columnCount()):
|
||
if i == 0:
|
||
first_row.append(int(self.treew_log.topLevelItem(0).text(i)))
|
||
else:
|
||
first_row.append(self.treew_log.topLevelItem(0).text(i))
|
||
|
||
index_end = clibs.search_records.index(tuple(first_row))
|
||
if index_end == 0:
|
||
return
|
||
elif index_end <= 100:
|
||
records = clibs.search_records[:index_end]
|
||
else:
|
||
records = clibs.search_records[index_end-100:index_end]
|
||
return pages_all, current_page, records
|
||
|
||
self.run_program_thread(do_pre, -98, self.prog_done_pre, None)
|
||
|
||
def prog_done_realtime(self, results):
|
||
flag, result, ret, error, idx, network = results
|
||
pages_all, records = ret
|
||
|
||
self.label_page.setText(f"{pages_all}/{pages_all}")
|
||
self.treew_log.clear()
|
||
self.display_tree_content(records)
|
||
self.is_searching = False
|
||
clibs.search_records = None
|
||
|
||
def realtime_page(self):
|
||
@clibs.db_lock
|
||
def do_realtime():
|
||
clibs.cursor.execute("SELECT COUNT(id) from logs")
|
||
len_records = clibs.cursor.fetchone()[0]
|
||
pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1
|
||
remainder = len_records % 100 if len_records % 100 != 0 else 100
|
||
end = len_records
|
||
start = len_records - remainder + 1 if len_records - remainder > 0 else 0
|
||
clibs.cursor.execute(f"select * from logs where id between {start} and {end}")
|
||
records = clibs.cursor.fetchall()
|
||
return pages_all, records
|
||
|
||
self.run_program_thread(do_realtime, -98, self.prog_done_realtime, None)
|
||
|
||
def prog_done_next(self, results):
|
||
flag, result, ret, error, idx, network = results
|
||
if ret is None:
|
||
return
|
||
pages_all, current_page, records = ret
|
||
self.treew_log.clear()
|
||
self.display_tree_content(records)
|
||
self.label_page.setText(f"{current_page}/{pages_all}")
|
||
|
||
def next_page(self):
|
||
@clibs.db_lock
|
||
def do_next():
|
||
clibs.cursor.execute("select count(id) from logs")
|
||
len_records = clibs.cursor.fetchone()[0]
|
||
if self.is_searching is False:
|
||
line_number = self.treew_log.topLevelItemCount()
|
||
last_id = int(self.treew_log.topLevelItem(line_number-1).text(0))
|
||
start = last_id + 1 if last_id % 100 == 0 else last_id - last_id % 100 + 1
|
||
end = int(start) + 100 - 1
|
||
if start <= len_records:
|
||
clibs.cursor.execute(f"select * from logs where id between {start} and {end}")
|
||
records = clibs.cursor.fetchall()
|
||
pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1
|
||
current_page = start // 100 if start % 100 == 0 else start // 100 + 1
|
||
return pages_all, current_page, records
|
||
|
||
elif self.is_searching is True:
|
||
current_page, pages_all = self.label_page.text().split("/")
|
||
if pages_all == current_page:
|
||
return
|
||
|
||
current_page = int(current_page) + 1
|
||
last_row = []
|
||
for i in range(self.treew_log.columnCount()):
|
||
if i == 0:
|
||
last_row.append(int(self.treew_log.topLevelItem(self.treew_log.topLevelItemCount()-1).text(i)))
|
||
else:
|
||
last_row.append(self.treew_log.topLevelItem(self.treew_log.topLevelItemCount()-1).text(i))
|
||
index_start = clibs.search_records.index(tuple(last_row)) + 1
|
||
if index_start > len(clibs.search_records):
|
||
return
|
||
elif index_start + 100 > len(clibs.search_records):
|
||
records = clibs.search_records[index_start:]
|
||
else:
|
||
records = clibs.search_records[index_start:index_start+100]
|
||
return pages_all, current_page, records
|
||
|
||
self.run_program_thread(do_next, -98, self.prog_done_next, None)
|
||
|
||
def show_item_content(self, item, column):
|
||
content = "\n".join([item.text(i) for i in range(self.treew_log.columnCount())])
|
||
dialog = ContentDialog(content, self)
|
||
dialog.exec()
|
||
|
||
def switch_log_tab(self, index):
|
||
if index == 1: # 切换到日志tab页时,自动执行实时日志功能
|
||
self.realtime_page()
|
||
|
||
def prog_done_search(self, results):
|
||
msg = "可单独查询 ID/Level/Module,也可以按照顺序组合查询指定条件,ID 范围使用 - 分隔,多个 Level 或者 Module 使用 : 分割,所有查询条件以 / 分隔并包含在 [] 内;如有关键字查询,则置于条件查询之后!"
|
||
flag, result, ret, error, idx, network = results
|
||
if ret is None:
|
||
QMessageBox.warning(self, "查询条件错误", msg)
|
||
self.realtime_page()
|
||
return
|
||
if ret == "nothing":
|
||
return
|
||
|
||
pages_all, records = ret
|
||
self.treew_log.clear()
|
||
self.display_tree_content(records)
|
||
self.label_page.setText(f"{pages_all}/{pages_all}")
|
||
self.is_searching = True
|
||
|
||
def search_keyword(self):
|
||
@clibs.db_lock
|
||
def do_search():
|
||
one_more = "!@#123asd"
|
||
kw = self.le_docs_search.text().strip()
|
||
if kw == "":
|
||
return "nothing"
|
||
|
||
match = re.search("^\\[.*]", kw)
|
||
if match:
|
||
condition = match.group().removeprefix("[").removesuffix("]").strip()
|
||
f_text = f"%{kw.removeprefix(match.group()).strip()}%"
|
||
if condition == "" and f_text == "%%":
|
||
return
|
||
elif condition == "" and f_text != "%%":
|
||
clibs.cursor.execute(f"SELECT * FROM logs WHERE content LIKE '{f_text}'")
|
||
elif condition != "" and f_text == "%%":
|
||
conditions = condition.split("/")
|
||
if len(conditions) == 1: # 可能是id/level(s)/module(s),任意其一
|
||
c1 = conditions[0]
|
||
if c1.isdigit() and int(c1) > 0: # 单独一个数字
|
||
clibs.cursor.execute(f"SELECT * FROM logs WHERE id = {int(c1)}")
|
||
elif "-" in c1 and len(c1.split("-")) == 2 and c1.split("-")[0].isdigit() and c1.split("-")[1].isdigit() and int(c1.split("-")[1]) - int(c1.split("-")[0]) > 0 and int(c1.split("-")[0]) > 0: # 1-4 这种格式
|
||
start = int(c1.split("-")[0])
|
||
end = int(c1.split("-")[1])
|
||
clibs.cursor.execute(f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end}")
|
||
else:
|
||
for level in c1.split(":"):
|
||
if level.strip().upper() not in clibs.levels:
|
||
is_level = False
|
||
break
|
||
else:
|
||
is_level = True
|
||
|
||
if is_level: # 是告警等级
|
||
levels = tuple(level.strip().upper() for level in c1.split(":"))
|
||
levels = (*levels, one_more)
|
||
clibs.cursor.execute(f"SELECT * FROM logs WHERE level IN {levels}")
|
||
else: # 是模块
|
||
modules = tuple(module.strip() for module in c1.split(":"))
|
||
modules = (*modules, one_more)
|
||
clibs.cursor.execute(f"SELECT * FROM logs WHERE module IN {modules}")
|
||
|
||
elif len(conditions) == 2: # 可能是id/level(s)/module(s),任意两个的组合,有顺序
|
||
c1, c2 = conditions
|
||
if c1.isdigit() and int(c1) > 0: # 单独一个数字
|
||
for level in c2.split(":"):
|
||
if level.strip().upper() not in clibs.levels:
|
||
is_level = False
|
||
break
|
||
else:
|
||
is_level = True
|
||
|
||
if is_level: # 是告警等级
|
||
levels = tuple(level.strip().upper() for level in c2.split(":"))
|
||
levels = (*levels, one_more)
|
||
clibs.cursor.execute(f"SELECT * FROM logs WHERE id = {int(c1)} AND level IN {levels}")
|
||
else: # 是模块
|
||
modules = tuple(module.strip() for module in c2.split(":"))
|
||
modules = (*modules, one_more)
|
||
clibs.cursor.execute(f"SELECT * FROM logs WHERE id = {int(c1)} AND module IN {modules}")
|
||
elif "-" in c1 and len(c1.split("-")) == 2 and c1.split("-")[0].isdigit() and c1.split("-")[1].isdigit() and int(c1.split("-")[1]) - int(c1.split("-")[0]) > 0 and int(c1.split("-")[0]) > 0: # 1-4 这种格式
|
||
start = int(c1.split("-")[0])
|
||
end = int(c1.split("-")[1])
|
||
for level in c2.split(":"):
|
||
if level.strip().upper() not in clibs.levels:
|
||
is_level = False
|
||
break
|
||
else:
|
||
is_level = True
|
||
|
||
if is_level: # 是告警等级
|
||
levels = tuple(level.strip().upper() for level in c2.split(":"))
|
||
levels = (*levels, one_more)
|
||
clibs.cursor.execute(f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end} AND level IN {levels}")
|
||
else: # 是模块
|
||
modules = tuple(module.strip() for module in c2.split(":"))
|
||
modules = (*modules, one_more)
|
||
clibs.cursor.execute(f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end} AND module IN {modules}")
|
||
else: # c1是等级,c2是模块
|
||
for level in c1.split(":"):
|
||
if level.strip().upper() not in clibs.levels:
|
||
return
|
||
|
||
levels = tuple(level.strip().upper() for level in c1.split(":"))
|
||
levels = (*levels, one_more)
|
||
modules = tuple(module.strip() for module in c2.split(":"))
|
||
modules = (*modules, one_more)
|
||
clibs.cursor.execute(f"SELECT * FROM logs WHERE level IN {levels} AND module IN {modules}")
|
||
|
||
elif len(conditions) == 3: # 依次为id/level(s)/module(s)
|
||
c1, c2, c3 = conditions
|
||
for level in c2.split(":"):
|
||
if level.strip().upper() not in clibs.levels:
|
||
return
|
||
levels = tuple(level.strip().upper() for level in c2.split(":"))
|
||
levels = (*levels, one_more)
|
||
modules = tuple(module.strip() for module in c3.split(":"))
|
||
modules = (*modules, one_more)
|
||
|
||
if c1.isdigit() and int(c1) > 0: # 单独一个数字
|
||
clibs.cursor.execute(f"SELECT * FROM logs WHERE id = {int(c1)} AND level IN {levels} AND module IN {modules}")
|
||
elif "-" in c1 and len(c1.split("-")) == 2 and c1.split("-")[0].isdigit() and c1.split("-")[1].isdigit() and int(c1.split("-")[1]) - int(c1.split("-")[0]) > 0 and int(c1.split("-")[0]) > 0: # 1-4 这种格式
|
||
start = int(c1.split("-")[0])
|
||
end = int(c1.split("-")[1])
|
||
clibs.cursor.execute(f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end} AND level IN {levels} AND module IN {modules}")
|
||
else:
|
||
return
|
||
else:
|
||
return
|
||
elif condition != "" and f_text != "%%": # 和上面基本一样,只不过加一个f_text的条件筛选
|
||
conditions = condition.split("/")
|
||
if len(conditions) == 1: # 可能是id/level(s)/module(s),任意其一
|
||
c1 = conditions[0]
|
||
if c1.isdigit() and int(c1) > 0: # 单独一个数字
|
||
clibs.cursor.execute(f"SELECT * FROM logs WHERE id = {int(c1)} AND content LIKE '{f_text}'")
|
||
elif "-" in c1 and len(c1.split("-")) == 2 and c1.split("-")[0].isdigit() and c1.split("-")[
|
||
1].isdigit() and int(c1.split("-")[1]) - int(c1.split("-")[0]) > 0 and int(
|
||
c1.split("-")[0]) > 0: # 1-4 这种格式
|
||
start = int(c1.split("-")[0])
|
||
end = int(c1.split("-")[1])
|
||
clibs.cursor.execute(f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end} AND content LIKE '{f_text}'")
|
||
else:
|
||
for level in c1.split(":"):
|
||
if level.strip().upper() not in clibs.levels:
|
||
is_level = False
|
||
break
|
||
else:
|
||
is_level = True
|
||
|
||
if is_level: # 是告警等级
|
||
levels = tuple(level.strip().upper() for level in c1.split(":"))
|
||
levels = (*levels, one_more)
|
||
clibs.cursor.execute(f"SELECT * FROM logs WHERE level IN {levels} AND content LIKE '{f_text}'")
|
||
else: # 是模块
|
||
modules = tuple(module.strip() for module in c1.split(":"))
|
||
modules = (*modules, one_more)
|
||
clibs.cursor.execute(f"SELECT * FROM logs WHERE module IN {modules} AND content LIKE '{f_text}'")
|
||
|
||
elif len(conditions) == 2: # 可能是id/level(s)/module(s),任意两个的组合,有顺序
|
||
c1, c2 = conditions
|
||
if c1.isdigit() and int(c1) > 0: # 单独一个数字
|
||
for level in c2.split(":"):
|
||
if level.strip().upper() not in clibs.levels:
|
||
is_level = False
|
||
break
|
||
else:
|
||
is_level = True
|
||
|
||
if is_level: # 是告警等级
|
||
levels = tuple(level.strip().upper() for level in c2.split(":"))
|
||
levels = (*levels, one_more)
|
||
clibs.cursor.execute(f"SELECT * FROM logs WHERE id = {int(c1)} AND level IN {levels} AND content LIKE '{f_text}'")
|
||
else: # 是模块
|
||
modules = tuple(module.strip() for module in c2.split(":"))
|
||
modules = (*modules, one_more)
|
||
clibs.cursor.execute(f"SELECT * FROM logs WHERE id = {int(c1)} AND module IN {modules} AND content LIKE '{f_text}'")
|
||
elif "-" in c1 and len(c1.split("-")) == 2 and c1.split("-")[0].isdigit() and c1.split("-")[
|
||
1].isdigit() and int(c1.split("-")[1]) - int(c1.split("-")[0]) > 0 and int(
|
||
c1.split("-")[0]) > 0: # 1-4 这种格式
|
||
start = int(c1.split("-")[0])
|
||
end = int(c1.split("-")[1])
|
||
for level in c2.split(":"):
|
||
if level.strip().upper() not in clibs.levels:
|
||
is_level = False
|
||
break
|
||
else:
|
||
is_level = True
|
||
|
||
if is_level: # 是告警等级
|
||
levels = tuple(level.strip().upper() for level in c2.split(":"))
|
||
levels = (*levels, one_more)
|
||
clibs.cursor.execute(
|
||
f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end} AND level IN {levels} AND content LIKE '{f_text}'")
|
||
else: # 是模块
|
||
modules = tuple(module.strip() for module in c2.split(":"))
|
||
modules = (*modules, one_more)
|
||
clibs.cursor.execute(
|
||
f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end} AND module IN {modules} AND content LIKE '{f_text}'")
|
||
else: # c1是等级,c2是模块
|
||
for level in c1.split(":"):
|
||
if level.strip().upper() not in clibs.levels:
|
||
return
|
||
|
||
levels = tuple(level.strip().upper() for level in c1.split(":"))
|
||
levels = (*levels, one_more)
|
||
modules = tuple(module.strip() for module in c2.split(":"))
|
||
modules = (*modules, one_more)
|
||
clibs.cursor.execute(f"SELECT * FROM logs WHERE level IN {levels} AND module IN {modules} AND content LIKE '{f_text}'")
|
||
|
||
elif len(conditions) == 3: # 依次为id/level(s)/module(s)
|
||
c1, c2, c3 = conditions
|
||
for level in c2.split(":"):
|
||
if level.strip().upper() not in clibs.levels:
|
||
return
|
||
levels = tuple(level.strip().upper() for level in c2.split(":"))
|
||
levels = (*levels, one_more)
|
||
modules = tuple(module.strip() for module in c3.split(":"))
|
||
modules = (*modules, one_more)
|
||
|
||
if c1.isdigit() and int(c1) > 0: # 单独一个数字
|
||
clibs.cursor.execute(
|
||
f"SELECT * FROM logs WHERE id = {int(c1)} AND level IN {levels} AND module IN {modules} AND content LIKE '{f_text}'")
|
||
elif "-" in c1 and len(c1.split("-")) == 2 and c1.split("-")[0].isdigit() and c1.split("-")[
|
||
1].isdigit() and int(c1.split("-")[1]) - int(c1.split("-")[0]) > 0 and int(
|
||
c1.split("-")[0]) > 0: # 1-4 这种格式
|
||
start = int(c1.split("-")[0])
|
||
end = int(c1.split("-")[1])
|
||
clibs.cursor.execute(
|
||
f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end} AND level IN {levels} AND module IN {modules} AND content LIKE '{f_text}'")
|
||
else:
|
||
return
|
||
else:
|
||
return
|
||
else:
|
||
f_text = f"%{kw}%"
|
||
clibs.cursor.execute(f"SELECT * FROM logs WHERE content LIKE '{f_text}'")
|
||
|
||
clibs.search_records = clibs.cursor.fetchall()
|
||
len_records = len(clibs.search_records)
|
||
pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1
|
||
remainder = len_records % 100 if len_records % 100 != 0 else 100
|
||
records = clibs.search_records[-1 * remainder:]
|
||
|
||
return pages_all, records
|
||
|
||
self.run_program_thread(do_search, -98, self.prog_done_search, None)
|
||
|
||
def prog_done_conn(self, results):
|
||
self.btn_hmi_conn.setDisabled(False)
|
||
self.btn_md_conn.setDisabled(False)
|
||
self.btn_ec_conn.setDisabled(False)
|
||
|
||
flag, result, ret, error, idx, network = results
|
||
if flag is False:
|
||
self.w2t(f"{network.upper()}连接失败", "red")
|
||
elif flag is True:
|
||
clibs.status[network] = 1
|
||
if network == "hmi":
|
||
self.btn_hmi_conn.setText("断开")
|
||
self.hmi_state.setText(f'<img src="{clibs.PREFIX}/media/green.png" width="10" height="10" /><font face="consolas" size="4"><b>HR </b></font>')
|
||
clibs.c_hr = result
|
||
elif network == "md":
|
||
self.btn_md_conn.setText("断开")
|
||
self.md_state.setText(f'<img src="{clibs.PREFIX}/media/green.png" width="10" height="10" /><font face="consolas" size="4"><b>MD </b></font>')
|
||
clibs.c_md = result
|
||
elif network == "ec":
|
||
self.btn_ec_conn.setText("断开")
|
||
self.ec_state.setText(f'<img src="{clibs.PREFIX}/media/green.png" width="10" height="10" /><font face="consolas" size="4"><b>EC </b></font>')
|
||
clibs.c_ec = result
|
||
|
||
def prog_done_disconn(self, results):
|
||
self.btn_hmi_conn.setDisabled(False)
|
||
self.btn_md_conn.setDisabled(False)
|
||
self.btn_ec_conn.setDisabled(False)
|
||
flag, result, ret, error, idx, network = results
|
||
if flag is False:
|
||
self.w2t(f"{network.upper()}断开连接失败", "red")
|
||
elif flag is True:
|
||
clibs.status[network] = 0
|
||
if network == "hmi":
|
||
self.btn_hmi_conn.setText("连接")
|
||
self.hmi_state.setText(f'<img src="{clibs.PREFIX}/media/red.png" width="10" height="10" /><font face="consolas" size="4"><b>HR </b></font>')
|
||
clibs.c_hr = None
|
||
elif network == "md":
|
||
self.btn_md_conn.setText("连接")
|
||
self.md_state.setText(f'<img src="{clibs.PREFIX}/media/red.png" width="10" height="10" /><font face="consolas" size="4"><b>MD </b></font>')
|
||
clibs.c_md = None
|
||
elif network == "ec":
|
||
self.btn_ec_conn.setText("连接")
|
||
self.ec_state.setText(f'<img src="{clibs.PREFIX}/media/red.png" width="10" height="10" /><font face="consolas" size="4"><b>EC </b></font>')
|
||
clibs.c_ec = None
|
||
|
||
def hmi_conn(self):
|
||
self.btn_hmi_conn.setDisabled(True)
|
||
if self.btn_hmi_conn.text() == "连接":
|
||
clibs.ip_addr = self.le_hmi_ip.text().strip()
|
||
ip_pattern = re.compile(r"(([1-9]?\d|1\d\d|2[0-4]\d|25[0-5])\.){3}([1-9]?\d|1\d\d|2[0-4]\d|25[0-5])")
|
||
if not ip_pattern.fullmatch(clibs.ip_addr):
|
||
self.w2t(f"{clibs.ip_addr} 不是一个有效的 IP 地址", "red")
|
||
return
|
||
self.run_program_thread(openapi.HmiRequest(clibs.ip_addr, clibs.socket_port, clibs.xService_port), -1, self.prog_done_conn, "hmi")
|
||
elif self.btn_hmi_conn.text() == "断开":
|
||
self.run_program_thread(clibs.c_hr.close, -99, self.prog_done_disconn, "hmi")
|
||
|
||
def md_conn(self):
|
||
if clibs.status["hmi"] == 0:
|
||
QMessageBox.warning(self, "告警", "操作 Modbus 连接之前,需要先打开 HMI 连接!")
|
||
return
|
||
|
||
self.btn_md_conn.setDisabled(True)
|
||
if self.btn_md_conn.text() == "连接":
|
||
clibs.modbus_port = self.le_md_port.text().strip()
|
||
self.run_program_thread(openapi.ModbusRequest(clibs.ip_addr, clibs.modbus_port), -1, self.prog_done_conn, "md")
|
||
elif self.btn_md_conn.text() == "断开":
|
||
self.run_program_thread(clibs.c_md.close, -99, self.prog_done_disconn, "md")
|
||
|
||
def ec_conn(self):
|
||
if clibs.status["hmi"] == 0:
|
||
QMessageBox.warning(self, "告警", "操作 EC 连接之前,需要先打开 HMI 连接!")
|
||
return
|
||
|
||
self.btn_ec_conn.setDisabled(True)
|
||
if self.btn_ec_conn.text() == "连接":
|
||
clibs.external_port = self.le_ec_port.text().strip()
|
||
self.run_program_thread(openapi.ExternalCommunication(clibs.ip_addr, clibs.external_port), -1, self.prog_done_conn, "ec")
|
||
elif self.btn_ec_conn.text() == "断开":
|
||
self.run_program_thread(clibs.c_ec.close, -99, self.prog_done_disconn, "ec")
|
||
|
||
def hmi_page(self):
|
||
self.pte_md_send.clear()
|
||
self.pte_md_recv.clear()
|
||
self.pte_ec_send.clear()
|
||
self.pte_ec_recv.clear()
|
||
self.tw_funcs.setCurrentIndex(3)
|
||
self.sw_network.setCurrentIndex(0)
|
||
self.hmi_btn.setStyleSheet("background-color: DarkCyan; color: white;")
|
||
self.md_btn.setStyleSheet("background-color: DarkGrey; color: white;")
|
||
self.ec_btn.setStyleSheet("background-color: DarkGrey; color: white;")
|
||
|
||
def md_page(self):
|
||
self.pte_hmi_send.clear()
|
||
self.pte_hmi_recv.clear()
|
||
self.pte_ec_send.clear()
|
||
self.pte_ec_recv.clear()
|
||
self.tw_funcs.setCurrentIndex(3)
|
||
self.sw_network.setCurrentIndex(1)
|
||
self.hmi_btn.setStyleSheet("background-color: DarkGrey; color: white;")
|
||
self.md_btn.setStyleSheet("background-color: DarkCyan; color: white;")
|
||
self.ec_btn.setStyleSheet("background-color: DarkGrey; color: white;")
|
||
|
||
def ec_page(self):
|
||
self.pte_md_send.clear()
|
||
self.pte_md_recv.clear()
|
||
self.pte_hmi_send.clear()
|
||
self.pte_hmi_recv.clear()
|
||
self.tw_funcs.setCurrentIndex(3)
|
||
self.sw_network.setCurrentIndex(2)
|
||
self.hmi_btn.setStyleSheet("background-color: DarkGrey; color: white;")
|
||
self.md_btn.setStyleSheet("background-color: DarkGrey; color: white;")
|
||
self.ec_btn.setStyleSheet("background-color: DarkCyan; color: white;")
|
||
|
||
def prog_done_hmi_send(self, results):
|
||
_, result, ret, error, idx, (msg_id, flag) = results
|
||
if _ is False:
|
||
self.btn_hmi_send.setDisabled(False)
|
||
clibs.logger("INFO", "aio", f"hmi: [send] 请求发送失败 {msg_id}", "red")
|
||
return
|
||
|
||
records = clibs.c_hr.get_from_id(msg_id)
|
||
for record in records:
|
||
if "请求发送成功" not in record[0]:
|
||
self.pte_hmi_recv.clear()
|
||
response = eval(record[0]) if flag == 0 else json.loads(record[0])
|
||
self.pte_hmi_recv.appendPlainText(json.dumps(response, indent=4, separators=(",", ":")))
|
||
else:
|
||
self.btn_hmi_send.setDisabled(False)
|
||
|
||
def hmi_send(self):
|
||
def hmi_send_thread():
|
||
nonlocal hmi_dict, cmd_json, flag
|
||
if flag == 0:
|
||
clibs.c_hr.c.send(clibs.c_hr.package(cmd_json))
|
||
clibs.c_hr.logger("DEBUG", "aio", f"hmi: [send] 老协议请求发送成功 {cmd_json}")
|
||
elif flag == 1:
|
||
clibs.c_hr.c_xs.send(clibs.c_hr.package_xs(hmi_dict))
|
||
clibs.c_hr.logger("DEBUG", "aio", f"hmi: xService请求发送成功 {cmd_json}")
|
||
|
||
if clibs.status["hmi"] == 0:
|
||
QMessageBox.critical(self, "错误", "使用该功能之前,需要先打开 HMI 连接!")
|
||
return
|
||
if self.pte_hmi_send.toPlainText() == "":
|
||
return
|
||
|
||
self.btn_hmi_send.setDisabled(True)
|
||
hmi_dict = json.loads(self.pte_hmi_send.toPlainText())
|
||
t = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")
|
||
msg_id = hmi_dict["id"] = "@".join([hmi_dict["id"].split("@")[0], t])
|
||
self.pte_hmi_send.clear()
|
||
self.pte_hmi_send.appendPlainText(json.dumps(hmi_dict, indent=4, separators=(",", ":")))
|
||
flag = hmi_dict["p_type"]
|
||
del hmi_dict["p_type"]
|
||
cmd_json = json.dumps(hmi_dict, separators=(",", ":"))
|
||
|
||
self.run_program_thread(hmi_send_thread, -99, self.prog_done_hmi_send, (msg_id, flag))
|
||
|
||
def md_send(self):
|
||
if clibs.status["md"] == 0:
|
||
QMessageBox.critical(self, "连接错误", "使用该功能之前,需要先打开 Modbus 连接!")
|
||
return
|
||
if self.pte_md_send.toPlainText() == "":
|
||
return
|
||
|
||
self.pte_md_recv.clear()
|
||
content = self.pte_md_send.toPlainText().split("\n")
|
||
if content[0].strip().startswith("sta"):
|
||
for item in content[1:]:
|
||
addr = int(item.split(":")[0].strip())
|
||
count = int(item.split(":")[1].strip())
|
||
value_type = item.split(":")[2].strip()
|
||
if value_type == "bool":
|
||
try:
|
||
result = clibs.c_md.c.read_holding_registers(addr, count=count).registers[0]
|
||
self.pte_md_recv.appendPlainText(str(result))
|
||
except Exception as err:
|
||
self.pte_md_recv.appendPlainText(f"获取失败:{err}")
|
||
return
|
||
elif content[0].strip().startswith("ctrl"):
|
||
for item in content[1:]:
|
||
try:
|
||
addr = int(item.split(":")[0].strip())
|
||
value = int(item.split(":")[1].strip())
|
||
clibs.c_md.c.write_register(addr, value)
|
||
time.sleep(clibs.INTERVAL/4)
|
||
except Exception as err:
|
||
self.pte_md_recv.appendPlainText(f"操作失败:{err}")
|
||
return
|
||
else:
|
||
self.pte_md_recv.appendPlainText("操作成功!")
|
||
else:
|
||
QMessageBox.critical(self, "格式错误", "非法的发送内容,自定义发送需参考已有的格式!")
|
||
|
||
def prog_done_ec_send(self, results):
|
||
flag, result, ret, error, idx, cmd = results
|
||
print(f"res = {results}")
|
||
if ret[1] == "error":
|
||
clibs.logger("ERROR", "openapi", f"{ret[0]}", "red")
|
||
else:
|
||
self.pte_ec_recv.appendPlainText(str(ret))
|
||
|
||
def ec_send(self):
|
||
def ec_send_thread():
|
||
return clibs.c_ec.sr_string(cmd)
|
||
|
||
if clibs.status["ec"] == 0:
|
||
QMessageBox.critical(self, "错误", "使用该功能之前,需要先打开 EC 连接!")
|
||
return
|
||
if self.pte_ec_send.toPlainText() == "":
|
||
return
|
||
|
||
cmd = self.pte_ec_send.toPlainText().strip()
|
||
self.pte_ec_recv.clear()
|
||
self.run_program_thread(ec_send_thread, -99, self.prog_done_ec_send, cmd)
|
||
|
||
def hmi_cb_change(self):
|
||
cmd = self.cb_hmi_cmd.currentText()
|
||
self.pte_hmi_send.clear()
|
||
self.pte_hmi_recv.clear()
|
||
with open(f"{clibs.PREFIX}/files/protocols/hmi/{cmd}.json", mode="r", encoding="utf-8") as f_hmi:
|
||
hmi_dict = json.load(f_hmi)
|
||
t = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")
|
||
hmi_dict["id"] = "@".join([cmd, t])
|
||
self.pte_hmi_send.appendPlainText(json.dumps(hmi_dict, indent=4, separators=(",", ":")))
|
||
|
||
def md_cb_change(self):
|
||
cmd = self.cb_md_cmd.currentText()
|
||
self.pte_md_send.clear()
|
||
self.pte_md_recv.clear()
|
||
self.pte_md_send.appendPlainText(cmd)
|
||
with open(f"{clibs.PREFIX}/files/protocols/md/{cmd}.txt", mode="r", encoding="utf-8") as f_md:
|
||
c_send = f_md.read()
|
||
self.pte_md_send.appendPlainText(c_send)
|
||
|
||
def ec_cb_change(self):
|
||
cmd = self.cb_ec_cmd.currentText()
|
||
self.pte_ec_send.clear()
|
||
self.pte_ec_recv.clear()
|
||
with open(f"{clibs.PREFIX}/files/protocols/ec/{cmd}.txt", mode="r", encoding="utf-8") as f_md:
|
||
c_send = f_md.read()
|
||
self.pte_ec_send.appendPlainText(c_send)
|
||
|
||
def data_cb_change(self):
|
||
proc = self.cb_data_func.currentText()
|
||
if proc != "转矩":
|
||
self.cb_data_current.setDisabled(True)
|
||
else:
|
||
self.cb_data_current.setDisabled(False)
|
||
|
||
def check_interval(self):
|
||
try:
|
||
interval = float(self.le_durable_interval.text())
|
||
interval = clibs.CYCLE if interval < clibs.CYCLE else int(interval)
|
||
except Exception:
|
||
interval = clibs.CYCLE
|
||
self.le_durable_interval.setText(str(interval))
|
||
|
||
def prog_done_detection(self, finished):
|
||
if clibs.status["hmi"] == 0:
|
||
self.btn_hmi_conn.setText("连接")
|
||
self.hmi_state.setText(f'<img src="{clibs.PREFIX}/media/red.png" width="10" height="10" /><font face="consolas" size="4"><b>HR </b></font>')
|
||
if clibs.status["md"] == 0:
|
||
self.btn_md_conn.setText("连接")
|
||
self.md_state.setText(f'<img src="{clibs.PREFIX}/media/red.png" width="10" height="10" /><font face="consolas" size="4"><b>MD </b></font>')
|
||
if clibs.status["ec"] == 0:
|
||
self.btn_ec_conn.setText("连接")
|
||
self.ec_state.setText(f'<img src="{clibs.PREFIX}/media/red.png" width="10" height="10" /><font face="consolas" size="4"><b>EC </b></font>')
|
||
# ============= Program running status =============
|
||
if sum(clibs.running) == 1 or clibs.stop_flag:
|
||
self.run_state.setText(f'<img src="{clibs.PREFIX}/media/red.png" width="10" height="10" /><font face="consolas" size="4"><b>BUSY</b></font>')
|
||
else:
|
||
self.run_state.setText(f'<img src="{clibs.PREFIX}/media/green.png" width="10" height="10" /><font face="consolas" size="4"><b>IDLE</b></font>')
|
||
|
||
def closeEvent(self, event):
|
||
idx = -1 if clibs.running.count(1) == 0 else clibs.running.index(1)
|
||
info_text = "当前无程序正在运行,可放心退出!" if idx == -1 else f"当前正在运行{clibs.functions[idx]},确认退出?"
|
||
info_text = "当前有程序正在停止中,确认退出?" if clibs.stop_flag else info_text
|
||
reply = QMessageBox.question(self, "退出", info_text)
|
||
if reply == QMessageBox.Yes:
|
||
if clibs.status["md"] == 1:
|
||
self.run_program_thread(clibs.c_md.close, -99, self.prog_done_disconn, "md")
|
||
if clibs.status["ec"] == 1:
|
||
self.run_program_thread(clibs.c_ec.close, -99, self.prog_done_disconn, "ec")
|
||
if clibs.status["hmi"] == 1:
|
||
self.run_program_thread(clibs.c_hr.close, -99, self.prog_done_disconn, "hmi")
|
||
|
||
event.accept()
|
||
else:
|
||
event.ignore()
|
||
|
||
def setup_statusbar(self):
|
||
with open(f"{clibs.PREFIX}/files/version/local_vers", mode="r", encoding="utf-8") as f_local:
|
||
local_vers = f_local.read().strip()
|
||
l_version, update = local_vers.split("@")
|
||
vers_info = f" v{l_version} Update@{update}"
|
||
with open(f"{clibs.PREFIX}/files/version/server_vers", mode="r", encoding="utf-8") as f_server:
|
||
server_vers = f_server.read().strip()
|
||
|
||
self.update_label, self.hmi_state, self.md_state, self.ec_state, self.run_state, self.version_label = QLabel(), ClickableLabel(), ClickableLabel(), ClickableLabel(), QLabel(), QLabel()
|
||
self.statusbar.addWidget(self.version_label, 0)
|
||
self.statusbar.addWidget(self.hmi_state, 0)
|
||
self.statusbar.addWidget(self.md_state, 0)
|
||
self.statusbar.addWidget(self.ec_state, 0)
|
||
self.statusbar.addWidget(self.run_state, 0)
|
||
self.statusbar.addPermanentWidget(self.update_label, 0) # 添加到右侧
|
||
self.hmi_state.clicked.connect(self.hmi_page)
|
||
self.md_state.clicked.connect(self.md_page)
|
||
self.ec_state.clicked.connect(self.ec_page)
|
||
|
||
if local_vers == server_vers:
|
||
self.update_label.setText(f'<img src="{clibs.PREFIX}/media/updated.png" width="14" height="14" style="vertical-align: middle;" /><span style="vertical-align: middle; font-size: 14px; font-weight: bold; color: #0D8A3D;"> Current is the latest version!</span>')
|
||
elif local_vers > server_vers:
|
||
self.update_label.setText(f'<img src="{clibs.PREFIX}/media/updated.png" width="14" height="14" style="vertical-align: middle;" /><span style="vertical-align: middle; font-size: 14px; font-weight: bold; color: #0D8A3D;"> Current is the latest version!</span>')
|
||
elif local_vers < server_vers:
|
||
self.update_label.setText(f'''<a href="https://www.rustle.cc/aio.zip" style="text-decoration: none;"><img src="{clibs.PREFIX}/media/upgrade.png" width="14" height="14" style="vertical-align: middle;" /><span style="vertical-align: middle; font-size: 14px; font-weight: bold; color: #D81E06;"> v{server_vers.split('@')[0]} has been released, update ASAP!</span></a>''')
|
||
|
||
self.version_label.setText(f'<font color="black" face="consolas" size="4"><b>{vers_info} </b></font>')
|
||
self.hmi_state.setText(f'<img src="{clibs.PREFIX}/media/red.png" width="10" height="10" /><font face="consolas" size="4"><b>HR </b></font>')
|
||
self.md_state.setText(f'<img src="{clibs.PREFIX}/media/red.png" width="10" height="10" /><font face="consolas" size="4"><b>MD </b></font>')
|
||
self.ec_state.setText(f'<img src="{clibs.PREFIX}/media/red.png" width="10" height="10" /><font face="consolas" size="4"><b>EC </b></font>')
|
||
self.run_state.setText(f'<img src="{clibs.PREFIX}/media/green.png" width="10" height="10" /><font face="consolas" size="4"><b>IDLE</b></font>')
|
||
self.update_label.setOpenExternalLinks(True) # 允许超链接在浏览器中打开
|
||
|
||
|
||
class StateDetection(QThread):
|
||
finished = Signal(bool)
|
||
|
||
def __init__(self, /):
|
||
super().__init__()
|
||
|
||
def run(self):
|
||
while True:
|
||
time.sleep(clibs.INTERVAL*2)
|
||
# ============= HMI connection status =============
|
||
try:
|
||
clibs.c_hr.execution("controller.heart")
|
||
except:
|
||
clibs.status["hmi"] = 0
|
||
# ============= MD connection status =============
|
||
try:
|
||
clibs.c_md.c.read_holding_registers(40503, count=1).registers[0]
|
||
except:
|
||
clibs.status["md"] = 0
|
||
# ============= EC connection status =============
|
||
try:
|
||
clibs.c_ec.sr_string("controller_is_running")
|
||
except:
|
||
clibs.status["ec"] = 0
|
||
|
||
self.finished.emit(True)
|
||
|
||
|
||
class ClickableLabel(QLabel):
|
||
def __init__(self, parent=None):
|
||
super().__init__(parent)
|
||
|
||
def mousePressEvent(self, event):
|
||
self.clicked.emit()
|
||
|
||
clicked = Signal()
|
||
|
||
|
||
class InitWork(QThread):
|
||
completed = Signal(str)
|
||
|
||
def program(self, action):
|
||
url_vers, server_vers = "https://www.rustle.cc/server_vers", ""
|
||
try:
|
||
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:23.0) Gecko/20100101 Firefox/23.0'}
|
||
req = request.Request(url_vers, headers=headers)
|
||
response = request.urlopen(req, timeout=clibs.INTERVAL * 10)
|
||
server_vers = response.read().decode("utf-8").strip()
|
||
with open(f"{clibs.PREFIX}/files/version/server_vers", mode="w", encoding="utf-8") as f_server:
|
||
f_server.write(server_vers)
|
||
self.completed.emit("true")
|
||
except Exception as err:
|
||
print(f"err = {err}")
|
||
clibs.cursor.close()
|
||
clibs.conn.close()
|
||
self.completed.emit("false")
|
||
|
||
|
||
class SplashScreen(QApplication):
|
||
action = Signal(int)
|
||
|
||
def __init__(self, argv):
|
||
super().__init__(argv)
|
||
self.window = None
|
||
|
||
pixmap = QPixmap(f"{clibs.PREFIX}/media/splash.png")
|
||
self.splash = QSplashScreen(pixmap, Qt.WindowType.WindowStaysOnTopHint)
|
||
scaled_pixmap = pixmap.scaled(800, 400, Qt.AspectRatioMode.KeepAspectRatio, Qt.TransformationMode.SmoothTransformation)
|
||
self.splash.setPixmap(scaled_pixmap)
|
||
self.splash.setEnabled(False) # 禁用交互
|
||
self.splash.setFont(QFont("Arial", 12))
|
||
self.splash.show()
|
||
self.splash.showMessage("正在加载资源.....", Qt.AlignmentFlag.AlignBottom | Qt.AlignmentFlag.AlignHCenter, Qt.GlobalColor.white)
|
||
|
||
with open(f"{clibs.PREFIX}/files/version/IS_VALIDATE", mode="r", encoding="utf-8") as f_validate:
|
||
is_validate = f_validate.read()
|
||
if is_validate == "1":
|
||
# with validation of server version
|
||
self.t = QThread(self)
|
||
self.run = InitWork()
|
||
self.run.moveToThread(self.t)
|
||
self.run.completed.connect(self.prog_done)
|
||
self.action.connect(self.run.program)
|
||
self.t.start()
|
||
self.action.emit(1)
|
||
else:
|
||
# without validation of server version
|
||
self.prog_done("true")
|
||
|
||
def prog_done(self, result):
|
||
if result == "false" or result == "":
|
||
self.splash.showMessage("网络错误,无法连接至服务器,确认当前网络环境是否可以访问www.rustle.cc......", Qt.AlignmentFlag.AlignBottom | Qt.AlignmentFlag.AlignHCenter, Qt.GlobalColor.white)
|
||
time.sleep(clibs.INTERVAL*2)
|
||
self.splash.close()
|
||
sys.exit()
|
||
elif result == "true":
|
||
self.window = MainWindow()
|
||
self.splash.showMessage("初始化完成,即将进入主界面......", Qt.AlignmentFlag.AlignBottom | Qt.AlignmentFlag.AlignHCenter, Qt.GlobalColor.white)
|
||
QTimer.singleShot(1000, lambda: (self.splash.finish(self.window), self.window.show()))
|
||
|
||
|
||
if __name__ == '__main__':
|
||
app = SplashScreen(sys.argv)
|
||
sys.exit(app.exec())
|