AIO/aio.py
2025-03-27 19:05:02 +08:00

1023 lines
49 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import datetime
import json
import os
import re
import sqlite3
import sys
import time
from urllib import request
import os.path
import matplotlib
import matplotlib.pyplot as plt
import pandas
from matplotlib.widgets import Slider
matplotlib.use('QtAgg')
from PySide6.QtCore import Qt, QThread, Signal, QObject, QTimer
from PySide6.QtGui import QTextCursor, QFont, QPixmap, QColor, QBrush
from PySide6.QtWidgets import QMessageBox, QCheckBox, QSplashScreen, QApplication, QFrame, QLabel, QTreeWidgetItem, QFileDialog, QHeaderView, QDialog, QVBoxLayout, QPlainTextEdit
import codes.common.clibs as clibs
import codes.common.openapi as openapi
import codes.ui.main_window as main_window
from codes.analysis import brake, current, wavelogger, iso
from codes.autotest import do_current, do_brake
from codes.durable import factory_test
class MultiWindows:
login_window = None
reset_window = None
main_window = None
class ContentDialog(QDialog):
def __init__(self, content, parent=None):
super().__init__(parent)
self.setWindowTitle("日志详情")
self.setGeometry(100, 100, 700, 300)
# 设置对话框在屏幕正中间
self.center_on_screen()
# 创建布局
layout = QVBoxLayout(self)
# 创建 QPlainTextEdit 并设置为只读
self.plain_text_edit = QPlainTextEdit()
self.plain_text_edit.setReadOnly(True)
self.plain_text_edit.setPlainText(content)
# 添加到布局
layout.addWidget(self.plain_text_edit)
def center_on_screen(self):
# 获取屏幕尺寸
screen_geometry = QApplication.primaryScreen().geometry()
screen_width = screen_geometry.width()
screen_height = screen_geometry.height()
# 获取对话框尺寸
dialog_width = self.width()
dialog_height = self.height()
# 计算位置
x = (screen_width - dialog_width) // 2
y = (screen_height - dialog_height) // 2
# 设置位置
self.move(x, y)
class RunProg(QObject):
completed = Signal(tuple)
def __init__(self):
super().__init__()
def program(self, action): # 0: prog 1: idx
prog, idx, reserved = action
if idx in range(7):
run = prog.processing
elif idx == -1:
run = prog.net_conn
elif idx == -97:
run = prog.do_draw
elif idx == -98 or idx == -99:
run = prog
try:
ret = run()
self.completed.emit((True, prog, ret, "", idx, reserved)) # 运行是否成功/返回值/报错信息/idx
except Exception as err:
self.completed.emit((False, None, None, err, idx, reserved)) # 运行是否成功/返回值/报错信息/idx
class MainWindow(main_window.Ui_MainWindow):
action = Signal(tuple)
def __init__(self):
super(MainWindow, self).__init__()
self.close_on_net_error = None
self.is_searching = False
self.setupUi(self)
self.predoes()
def predoes(self):
# ========================= treeview init =========================
self.treew_log.header().setFont(QFont("Arial", 12, QFont.Weight.Bold))
header = self.treew_log.header()
for i in range(self.treew_log.columnCount()):
header.setSectionResizeMode(i, QHeaderView.ResizeMode.ResizeToContents)
# ========================= clibs =========================
self.setup_statusbar()
# ========================= styleSheet =========================
tws = [self.tw_funcs, self.tw_docs]
for tw in tws:
tw.setStyleSheet("""
QTabBar::tab:selected {
background: #0078D4;
color: white;
border-radius: 4px;
}
QTabBar::tab:!selected {
background: #F0F0F0;
color: #333;
}
QTabWidget::pane {
border: 1px solid #CCCCCC;
}
""")
# ============================↓↓↓debug↓↓↓============================
# print(f"self.cb_data_func.currentIndex() = {self.cb_data_func.currentIndex()}")
def run_program_thread(self, prog, idx, prog_done, reserved):
if idx != -98 and idx != -97:
self.tw_docs.setCurrentIndex(0)
if idx != -99 and idx != -98:
prog.output.connect(self.w2t)
self.t = QThread(self)
self.run = RunProg()
self.run.moveToThread(self.t)
self.run.completed.connect(prog_done)
self.action.connect(self.run.program)
self.t.start()
self.action.emit((prog, idx, reserved))
def w2t(self, msg, color="black"):
self.pte_output.appendHtml(f"<span style='color:{color};'>{msg}</span>")
cursor = self.pte_output.textCursor()
cursor.movePosition(QTextCursor.End)
self.pte_output.setTextCursor(cursor)
self.pte_output.ensureCursorVisible()
self.update()
def get_checkbox_states(self):
cb_durable_states = {}
container = self.sa_durable.widget()
for checkbox in container.findChildren(QCheckBox):
cb_durable_states[checkbox.text()] = checkbox.isChecked()
return cb_durable_states
def prog_start(self):
def prog_done(results):
flag, result, ret, error, idx, network = results
clibs.running[idx] = 0
# if flag is False:
# self.w2t(f"{clibs.functions[idx]}运行失败:{error}", "red")
# elif flag is True:
# ...
if sum(clibs.running) > 0:
if sum(clibs.running) == 1:
QMessageBox.critical(self, "运行中", f"{clibs.functions[clibs.running.index(1)]}正在执行中,不可同时运行两个处理/测试程序!")
return
else:
self.w2t(f"clibs.running = {clibs.running}", "red")
self.w2t(f"clibs.functions = {clibs.functions}", "red")
QMessageBox.critical(self, "严重错误", "理论上不允许同时运行两个处理程序,需要检查!")
return
if self.tw_funcs.currentIndex() == 0 and self.cb_data_func.currentIndex() == 0: # tab: 数据处理 | func: 制动数据处理
self.run_program_thread(brake.BrakeDataProcess(self.le_data_path.text()), 0, prog_done, None)
elif self.tw_funcs.currentIndex() == 0 and self.cb_data_func.currentIndex() == 1: # tab: 数据处理 | func: 转矩数据处理
self.run_program_thread(current.CurrentDataProcess(self.le_data_path.text(), self.cb_data_current.currentText()), 1, prog_done, None)
elif self.tw_funcs.currentIndex() == 0 and self.cb_data_func.currentIndex() == 2: # tab: 数据处理 | func: 激光数据处理
self.run_program_thread(iso.IsoDataProcess(self.le_data_path.text()), 2, prog_done, None)
elif self.tw_funcs.currentIndex() == 0 and self.cb_data_func.currentIndex() == 3: # tab: 数据处理 | func: 精度数据处理
self.run_program_thread(wavelogger.WaveloggerDataProcess(self.le_data_path.text()), 3, prog_done, None)
elif self.tw_funcs.currentIndex() == 1 and self.cb_unit_func.currentIndex() == 0: # tab: 自动测试 | func: 制动测试
self.run_program_thread(do_brake.DoBrakeTest(self.le_unit_path.text(), self.cb_unit_tool.currentText()), 4, prog_done, None)
elif self.tw_funcs.currentIndex() == 1 and self.cb_unit_func.currentIndex() == 1: # tab: 自动测试 | func: 转矩测试
self.run_program_thread(do_current.DoCurrentTest(self.le_unit_path.text(), self.cb_unit_tool.currentText()), 5, prog_done, None)
elif self.tw_funcs.currentIndex() == 2: # tab: 耐久采集 | func: 场景数据持久化
self.run_program_thread(factory_test.DoFactoryTest(self.le_durable_path.text(), self.le_durable_interval.text(), self.get_checkbox_states()), 6, prog_done, None)
def prog_stop(self):
QMessageBox.warning(self, "停止运行", "运行过程中不建议停止运行,可能会损坏文件,如果确实需要停止运行,可以直接关闭窗口!")
def prog_reset(self):
self.pte_output.clear()
def file_browser(self):
idx_dict = {0: self.le_data_path, 1: self.le_unit_path, 2: self.le_durable_path}
dir_path = QFileDialog.getExistingDirectory()
tab_index = self.tw_funcs.currentIndex()
if dir_path:
idx_dict[tab_index].setText(dir_path)
def curve_draw(self):
output = Signal(str, str)
procs = self.get_checkbox_states()
dir_path = self.le_durable_path.text()
curve_map = {
"周期内平均转矩": ["device_servo_trq_feedback", ],
"周期内最大速度": ["hw_joint_vel_feedback", ],
}
ylabels = {"周期内最大速度": "速度(rad/s)", "周期内平均转矩": "转矩(Nm)"}
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['figure.dpi'] = 100
plt.rcParams['font.size'] = 14
plt.rcParams['lines.marker'] = 'o'
plt.rcParams["figure.autolayout"] = True
if not os.path.exists(dir_path):
self.w2t(f"数据文件夹{dir_path}不存在,请确认后重试......", "red")
return
for proc_name, is_enabled in procs.items():
if is_enabled:
break
else:
self.w2t(f"需要选择至少一个功能,才能继续绘图......", "red")
return
_, files = clibs.traversal_files(dir_path, output)
csv_files = []
for file in files:
if file.endswith(".csv"):
csv_files.append(file)
if len(csv_files) == 0:
self.w2t("程序未开始运行,暂无数据可以展示......", "red")
return
for proc_name, is_enabled in procs.items():
if not is_enabled:
continue
title = proc_name
ylabel = ylabels[proc_name]
fig, axes = plt.subplots(figsize=(10, 4.5), dpi=100)
for curve in curve_map[proc_name]:
cols = [f"{curve}_{i}" for i in range(6)]
cols.insert(0, "time")
try:
df = pandas.read_csv(f"{dir_path}/{proc_name}.csv")
except Exception as err:
self.w2t(f"获取{dir_path}/{proc_name}.csv文件数据失败需确认是否存在该文件......", "red")
self.w2t(f"报错信息:{err}", "red")
return
for i in range((len(cols) - 1) // 6):
plt.plot(df[cols[1]], label=f"一轴{curve_map[proc_name][i]}")
plt.plot(df[cols[2]], label=f"二轴{curve_map[proc_name][i]}")
plt.plot(df[cols[3]], label=f"三轴{curve_map[proc_name][i]}")
plt.plot(df[cols[4]], label=f"四轴{curve_map[proc_name][i]}")
plt.plot(df[cols[5]], label=f"五轴{curve_map[proc_name][i]}")
plt.plot(df[cols[6]], label=f"六轴{curve_map[proc_name][i]}")
axes.set_title(title)
axes.set_ylabel(ylabel)
axes.legend(loc="upper right")
slider_position = plt.axes((0.1, 0.01, 0.8, 0.05), facecolor="blue") # (left, bottom, width, height)
scrollbar = Slider(slider_position, 'Time', 1, int(len(df)), valstep=1)
def update(val):
pos = scrollbar.val
axes.set_xlim([pos, pos + 10])
fig.canvas.draw_idle()
scrollbar.on_changed(update)
fig.tight_layout(rect=(0, 0.02, 0.96, 1)) # tuple (left, bottom, right, top)
plt.show()
def durable_cb_change(self):
stat = True if self.cb_durable_total.isChecked() else False
for cb in self.sa_durable.widget().findChildren(QCheckBox):
cb.setChecked(stat)
def display_tree_content(self, records):
for record in records:
item = QTreeWidgetItem(self.treew_log)
for i in range(self.treew_log.columnCount()):
item.setText(i, str(record[i]))
self.treew_log.addTopLevelItem(item)
color_map = {"DEBUG": QColor(220, 220, 220), "INFO": QColor(144, 238, 144), "WARNING": QColor(255, 240, 210), "ERROR": QColor(255, 220, 220)}
brush = QBrush(color_map[record[2].upper()])
for i in range(self.treew_log.columnCount()):
item.setBackground(i, brush)
last_item = self.treew_log.topLevelItem(self.treew_log.topLevelItemCount() - 1)
self.treew_log.scrollToItem(last_item)
def prog_done_pre(self, results):
flag, result, ret, error, idx, network = results
if ret is None:
return
pages_all, current_page, records = ret
self.treew_log.clear()
self.display_tree_content(records)
self.label_page.setText(f"{current_page}/{pages_all}")
def pre_page(self):
@clibs.db_lock
def do_pre():
if self.is_searching is False:
first_id = self.treew_log.topLevelItem(0).text(0)
end = int(first_id)-1 if int(first_id)-1 > 0 else None
start = int(first_id)-100 if int(first_id)-100 > 0 else 0
if end is None:
return
clibs.cursor.execute(f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end}")
records = clibs.cursor.fetchall()
clibs.cursor.execute("SELECT COUNT(id) FROM logs")
len_records = clibs.cursor.fetchone()[0]
pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1
current_page = int(end) // 100 if int(end) % 100 == 0 else int(end) // 100 + 1
return pages_all, current_page, records
elif self.is_searching is True:
current_page, pages_all = self.label_page.text().split("/")
if int(current_page)-1 == 0:
return
current_page = int(current_page) - 1
first_row = []
for i in range(self.treew_log.columnCount()):
if i == 0:
first_row.append(int(self.treew_log.topLevelItem(0).text(i)))
else:
first_row.append(self.treew_log.topLevelItem(0).text(i))
index_end = clibs.search_records.index(tuple(first_row))
if index_end == 0:
return
elif index_end <= 100:
records = clibs.search_records[:index_end]
else:
records = clibs.search_records[index_end-100:index_end]
return pages_all, current_page, records
self.run_program_thread(do_pre, -98, self.prog_done_pre, None)
def prog_done_realtime(self, results):
flag, result, ret, error, idx, network = results
pages_all, records = ret
self.label_page.setText(f"{pages_all}/{pages_all}")
self.treew_log.clear()
self.display_tree_content(records)
self.is_searching = False
clibs.search_records = None
def realtime_page(self):
@clibs.db_lock
def do_realtime():
clibs.cursor.execute("SELECT COUNT(id) from logs")
len_records = clibs.cursor.fetchone()[0]
pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1
remainder = len_records % 100 if len_records % 100 != 0 else 100
end = len_records
start = len_records - remainder + 1 if len_records - remainder > 0 else 0
clibs.cursor.execute(f"select * from logs where id between {start} and {end}")
records = clibs.cursor.fetchall()
return pages_all, records
self.run_program_thread(do_realtime, -98, self.prog_done_realtime, None)
def prog_done_next(self, results):
flag, result, ret, error, idx, network = results
if ret is None:
return
pages_all, current_page, records = ret
self.treew_log.clear()
self.display_tree_content(records)
self.label_page.setText(f"{current_page}/{pages_all}")
def next_page(self):
@clibs.db_lock
def do_next():
clibs.cursor.execute("select count(id) from logs")
len_records = clibs.cursor.fetchone()[0]
if self.is_searching is False:
line_number = self.treew_log.topLevelItemCount()
last_id = int(self.treew_log.topLevelItem(line_number-1).text(0))
start = last_id + 1 if last_id % 100 == 0 else last_id - last_id % 100 + 1
end = int(last_id) + 100
if int(start) <= len_records:
clibs.cursor.execute(f"select * from logs where id between {start} and {end}")
records = clibs.cursor.fetchall()
pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1
current_page = int(start) // 100 if int(start) % 100 == 0 else int(start) // 100 + 1
return pages_all, current_page, records
elif self.is_searching is True:
current_page, pages_all = self.label_page.text().split("/")
if pages_all == current_page:
return
current_page = int(current_page) + 1
last_row = []
for i in range(self.treew_log.columnCount()):
if i == 0:
last_row.append(int(self.treew_log.topLevelItem(self.treew_log.topLevelItemCount()-1).text(i)))
else:
last_row.append(self.treew_log.topLevelItem(self.treew_log.topLevelItemCount()-1).text(i))
index_start = clibs.search_records.index(tuple(last_row)) + 1
if index_start > len(clibs.search_records):
return
elif index_start + 100 > len(clibs.search_records):
records = clibs.search_records[index_start:]
else:
records = clibs.search_records[index_start:index_start+100]
return pages_all, current_page, records
self.run_program_thread(do_next, -98, self.prog_done_next, None)
def show_item_content(self, item, column):
content = " | ".join([item.text(i) for i in range(self.treew_log.columnCount())])
dialog = ContentDialog(content, self)
dialog.exec()
def switch_log_tab(self, index):
if index == 1: # 切换到日志tab页时自动执行实时日志功能
self.realtime_page()
def prog_done_search(self, results):
msg = "可单独查询 ID/Level/Module也可以按照顺序组合查询指定条件ID 范围使用 - 分隔,多个 Level 或者 Module 使用 : 分割,所有查询条件以 / 分隔并包含在 [] 内;如有关键字查询,则置于条件查询之后!"
flag, result, ret, error, idx, network = results
if ret is None:
QMessageBox.warning(self, "查询条件错误", msg)
self.realtime_page()
return
if ret == "nothing":
return
pages_all, records = ret
self.treew_log.clear()
self.display_tree_content(records)
self.label_page.setText(f"{pages_all}/{pages_all}")
self.is_searching = True
def search_keyword(self):
@clibs.db_lock
def do_search():
one_more = "!@#123asd"
kw = self.le_docs_search.text().strip()
if kw == "":
return "nothing"
match = re.search("^\\[.*]", kw)
if match:
condition = match.group().removeprefix("[").removesuffix("]").strip()
f_text = f"%{kw.removeprefix(match.group()).strip()}%"
if condition == "" and f_text == "%%":
return
elif condition == "" and f_text != "%%":
clibs.cursor.execute(f"SELECT * FROM logs WHERE content LIKE '{f_text}'")
elif condition != "" and f_text == "%%":
conditions = condition.split("/")
if len(conditions) == 1: # 可能是id/level(s)/module(s),任意其一
c1 = conditions[0]
if c1.isdigit() and int(c1) > 0: # 单独一个数字
clibs.cursor.execute(f"SELECT * FROM logs WHERE id = {int(c1)}")
elif "-" in c1 and len(c1.split("-")) == 2 and c1.split("-")[0].isdigit() and c1.split("-")[1].isdigit() and int(c1.split("-")[1]) - int(c1.split("-")[0]) > 0 and int(c1.split("-")[0]) > 0: # 1-4 这种格式
start = int(c1.split("-")[0])
end = int(c1.split("-")[1])
clibs.cursor.execute(f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end}")
else:
for level in c1.split(":"):
if level.strip().upper() not in clibs.levels:
is_level = False
break
else:
is_level = True
if is_level: # 是告警等级
levels = tuple(level.strip().upper() for level in c1.split(":"))
levels = (*levels, one_more)
clibs.cursor.execute(f"SELECT * FROM logs WHERE level IN {levels}")
else: # 是模块
modules = tuple(module.strip() for module in c1.split(":"))
modules = (*modules, one_more)
clibs.cursor.execute(f"SELECT * FROM logs WHERE module IN {modules}")
elif len(conditions) == 2: # 可能是id/level(s)/module(s),任意两个的组合,有顺序
c1, c2 = conditions
if c1.isdigit() and int(c1) > 0: # 单独一个数字
for level in c2.split(":"):
if level.strip().upper() not in clibs.levels:
is_level = False
break
else:
is_level = True
if is_level: # 是告警等级
levels = tuple(level.strip().upper() for level in c2.split(":"))
levels = (*levels, one_more)
clibs.cursor.execute(f"SELECT * FROM logs WHERE id = {int(c1)} AND level IN {levels}")
else: # 是模块
modules = tuple(module.strip() for module in c2.split(":"))
modules = (*modules, one_more)
clibs.cursor.execute(f"SELECT * FROM logs WHERE id = {int(c1)} AND module IN {modules}")
elif "-" in c1 and len(c1.split("-")) == 2 and c1.split("-")[0].isdigit() and c1.split("-")[1].isdigit() and int(c1.split("-")[1]) - int(c1.split("-")[0]) > 0 and int(c1.split("-")[0]) > 0: # 1-4 这种格式
start = int(c1.split("-")[0])
end = int(c1.split("-")[1])
for level in c2.split(":"):
if level.strip().upper() not in clibs.levels:
is_level = False
break
else:
is_level = True
if is_level: # 是告警等级
levels = tuple(level.strip().upper() for level in c2.split(":"))
levels = (*levels, one_more)
clibs.cursor.execute(f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end} AND level IN {levels}")
else: # 是模块
modules = tuple(module.strip() for module in c2.split(":"))
modules = (*modules, one_more)
clibs.cursor.execute(f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end} AND module IN {modules}")
else: # c1是等级c2是模块
for level in c1.split(":"):
if level.strip().upper() not in clibs.levels:
return
levels = tuple(level.strip().upper() for level in c1.split(":"))
levels = (*levels, one_more)
modules = tuple(module.strip() for module in c2.split(":"))
modules = (*modules, one_more)
clibs.cursor.execute(f"SELECT * FROM logs WHERE level IN {levels} AND module IN {modules}")
elif len(conditions) == 3: # 依次为id/level(s)/module(s)
c1, c2, c3 = conditions
for level in c2.split(":"):
if level.strip().upper() not in clibs.levels:
return
levels = tuple(level.strip().upper() for level in c2.split(":"))
levels = (*levels, one_more)
modules = tuple(module.strip() for module in c3.split(":"))
modules = (*modules, one_more)
if c1.isdigit() and int(c1) > 0: # 单独一个数字
clibs.cursor.execute(f"SELECT * FROM logs WHERE id = {int(c1)} AND level IN {levels} AND module IN {modules}")
elif "-" in c1 and len(c1.split("-")) == 2 and c1.split("-")[0].isdigit() and c1.split("-")[1].isdigit() and int(c1.split("-")[1]) - int(c1.split("-")[0]) > 0 and int(c1.split("-")[0]) > 0: # 1-4 这种格式
start = int(c1.split("-")[0])
end = int(c1.split("-")[1])
clibs.cursor.execute(f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end} AND level IN {levels} AND module IN {modules}")
else:
return
else:
return
elif condition != "" and f_text != "%%": # 和上面基本一样只不过加一个f_text的条件筛选
conditions = condition.split("/")
if len(conditions) == 1: # 可能是id/level(s)/module(s),任意其一
c1 = conditions[0]
if c1.isdigit() and int(c1) > 0: # 单独一个数字
clibs.cursor.execute(f"SELECT * FROM logs WHERE id = {int(c1)} AND content LIKE '{f_text}'")
elif "-" in c1 and len(c1.split("-")) == 2 and c1.split("-")[0].isdigit() and c1.split("-")[
1].isdigit() and int(c1.split("-")[1]) - int(c1.split("-")[0]) > 0 and int(
c1.split("-")[0]) > 0: # 1-4 这种格式
start = int(c1.split("-")[0])
end = int(c1.split("-")[1])
clibs.cursor.execute(f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end} AND content LIKE '{f_text}'")
else:
for level in c1.split(":"):
if level.strip().upper() not in clibs.levels:
is_level = False
break
else:
is_level = True
if is_level: # 是告警等级
levels = tuple(level.strip().upper() for level in c1.split(":"))
levels = (*levels, one_more)
clibs.cursor.execute(f"SELECT * FROM logs WHERE level IN {levels} AND content LIKE '{f_text}'")
else: # 是模块
modules = tuple(module.strip() for module in c1.split(":"))
modules = (*modules, one_more)
clibs.cursor.execute(f"SELECT * FROM logs WHERE module IN {modules} AND content LIKE '{f_text}'")
elif len(conditions) == 2: # 可能是id/level(s)/module(s),任意两个的组合,有顺序
c1, c2 = conditions
if c1.isdigit() and int(c1) > 0: # 单独一个数字
for level in c2.split(":"):
if level.strip().upper() not in clibs.levels:
is_level = False
break
else:
is_level = True
if is_level: # 是告警等级
levels = tuple(level.strip().upper() for level in c2.split(":"))
levels = (*levels, one_more)
clibs.cursor.execute(f"SELECT * FROM logs WHERE id = {int(c1)} AND level IN {levels} AND content LIKE '{f_text}'")
else: # 是模块
modules = tuple(module.strip() for module in c2.split(":"))
modules = (*modules, one_more)
clibs.cursor.execute(f"SELECT * FROM logs WHERE id = {int(c1)} AND module IN {modules} AND content LIKE '{f_text}'")
elif "-" in c1 and len(c1.split("-")) == 2 and c1.split("-")[0].isdigit() and c1.split("-")[
1].isdigit() and int(c1.split("-")[1]) - int(c1.split("-")[0]) > 0 and int(
c1.split("-")[0]) > 0: # 1-4 这种格式
start = int(c1.split("-")[0])
end = int(c1.split("-")[1])
for level in c2.split(":"):
if level.strip().upper() not in clibs.levels:
is_level = False
break
else:
is_level = True
if is_level: # 是告警等级
levels = tuple(level.strip().upper() for level in c2.split(":"))
levels = (*levels, one_more)
clibs.cursor.execute(
f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end} AND level IN {levels} AND content LIKE '{f_text}'")
else: # 是模块
modules = tuple(module.strip() for module in c2.split(":"))
modules = (*modules, one_more)
clibs.cursor.execute(
f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end} AND module IN {modules} AND content LIKE '{f_text}'")
else: # c1是等级c2是模块
for level in c1.split(":"):
if level.strip().upper() not in clibs.levels:
return
levels = tuple(level.strip().upper() for level in c1.split(":"))
levels = (*levels, one_more)
modules = tuple(module.strip() for module in c2.split(":"))
modules = (*modules, one_more)
clibs.cursor.execute(f"SELECT * FROM logs WHERE level IN {levels} AND module IN {modules} AND content LIKE '{f_text}'")
elif len(conditions) == 3: # 依次为id/level(s)/module(s)
c1, c2, c3 = conditions
for level in c2.split(":"):
if level.strip().upper() not in clibs.levels:
return
levels = tuple(level.strip().upper() for level in c2.split(":"))
levels = (*levels, one_more)
modules = tuple(module.strip() for module in c3.split(":"))
modules = (*modules, one_more)
if c1.isdigit() and int(c1) > 0: # 单独一个数字
clibs.cursor.execute(
f"SELECT * FROM logs WHERE id = {int(c1)} AND level IN {levels} AND module IN {modules} AND content LIKE '{f_text}'")
elif "-" in c1 and len(c1.split("-")) == 2 and c1.split("-")[0].isdigit() and c1.split("-")[
1].isdigit() and int(c1.split("-")[1]) - int(c1.split("-")[0]) > 0 and int(
c1.split("-")[0]) > 0: # 1-4 这种格式
start = int(c1.split("-")[0])
end = int(c1.split("-")[1])
clibs.cursor.execute(
f"SELECT * FROM logs WHERE id BETWEEN {start} AND {end} AND level IN {levels} AND module IN {modules} AND content LIKE '{f_text}'")
else:
return
else:
return
else:
f_text = f"%{kw}%"
clibs.cursor.execute(f"SELECT * FROM logs WHERE content LIKE '{f_text}'")
clibs.search_records = clibs.cursor.fetchall()
len_records = len(clibs.search_records)
pages_all = len_records // 100 if len_records % 100 == 0 else len_records // 100 + 1
remainder = len_records % 100
records = clibs.search_records[-1 * remainder:]
return pages_all, records
self.run_program_thread(do_search, -98, self.prog_done_search, None)
def prog_done_conn(self, results):
self.btn_hmi_conn.setDisabled(False)
self.btn_md_conn.setDisabled(False)
self.btn_ec_conn.setDisabled(False)
flag, result, ret, error, idx, network = results
if flag is False:
...
# self.w2t(f"{network.upper()}连接失败", "red")
elif flag is True:
clibs.status[network] = 1
if network == "hmi":
self.btn_hmi_conn.setText("断开")
clibs.c_hr = result
elif network == "md":
self.btn_md_conn.setText("断开")
clibs.c_md = result
elif network == "ec":
self.btn_ec_conn.setText("断开")
clibs.c_ec = result
def prog_done_disconn(self, results):
self.btn_hmi_conn.setDisabled(False)
self.btn_md_conn.setDisabled(False)
self.btn_ec_conn.setDisabled(False)
flag, result, ret, error, idx, network = results
if flag is False:
self.w2t(f"{network.upper()}断开连接失败", "red")
elif flag is True:
clibs.status[network] = 0
if network == "hmi":
self.btn_hmi_conn.setText("连接")
clibs.c_hr = result
elif network == "md":
self.btn_md_conn.setText("连接")
clibs.c_md = result
elif network == "ec":
self.btn_ec_conn.setText("连接")
clibs.c_ec = result
def hmi_conn(self):
self.btn_hmi_conn.setDisabled(True)
if self.btn_hmi_conn.text() == "连接":
clibs.ip_addr = self.le_hmi_ip.text().strip()
ip_pattern = re.compile(r"(([1-9]?\d|1\d\d|2[0-4]\d|25[0-5])\.){3}([1-9]?\d|1\d\d|2[0-4]\d|25[0-5])")
if not ip_pattern.fullmatch(clibs.ip_addr):
self.w2t(f"{clibs.ip_addr} 不是一个有效的 IP 地址", "red")
return
self.run_program_thread(openapi.HmiRequest(clibs.ip_addr, clibs.socket_port, clibs.xService_port), -1, self.prog_done_conn, "hmi")
elif self.btn_hmi_conn.text() == "断开":
self.run_program_thread(clibs.c_hr.close, -99, self.prog_done_disconn, "hmi")
def md_conn(self):
if clibs.status["hmi"] == 0:
QMessageBox.warning(self, "告警", "操作Modbus连接之前需要先打开HMI连接")
return
self.btn_md_conn.setDisabled(True)
if self.btn_md_conn.text() == "连接":
clibs.modbus_port = self.le_md_port.text().strip()
self.run_program_thread(openapi.ModbusRequest(clibs.ip_addr, clibs.modbus_port), -1, self.prog_done_conn, "md")
elif self.btn_md_conn.text() == "断开":
self.run_program_thread(clibs.c_md.close, -99, self.prog_done_disconn, "md")
def ec_conn(self):
if clibs.status["hmi"] == 0:
QMessageBox.warning(self, "告警", "操作外部通信连接之前需要先打开HMI连接")
return
self.btn_ec_conn.setDisabled(True)
if self.btn_ec_conn.text() == "连接":
clibs.external_port = self.le_ec_port.text().strip()
self.run_program_thread(openapi.ExternalCommunication(clibs.ip_addr, clibs.external_port), -1, self.prog_done_conn, "ec")
elif self.btn_ec_conn.text() == "断开":
self.run_program_thread(clibs.c_ec.close, -99, self.prog_done_disconn, "ec")
def hmi_page(self):
self.sw_network.setCurrentIndex(0)
def md_page(self):
self.sw_network.setCurrentIndex(1)
def ec_page(self):
self.sw_network.setCurrentIndex(2)
def prog_done_hmi_send(self, results):
_, result, ret, error, idx, (msg_id, flag) = results
records = clibs.c_hr.get_from_id(msg_id)
for record in records:
if "请求发送成功" not in record[0]:
self.pte_him_recv.clear()
response = eval(record[0]) if flag == 0 else json.loads(record[0])
self.pte_him_recv.appendPlainText(json.dumps(response, indent=4, separators=(",", ":")))
else:
self.btn_hmi_send.setDisabled(False)
def hmi_send(self):
def hmi_send_thread():
nonlocal hmi_dict, cmd_json, flag
if flag == 0:
clibs.c_hr.c.send(clibs.c_hr.package(cmd_json))
clibs.c_hr.logger("DEBUG", "aio", f"hmi: [send] 老协议请求发送成功 {cmd_json}")
elif flag == 1:
clibs.c_hr.c_xs.send(clibs.c_hr.package_xs(hmi_dict))
clibs.c_hr.logger("DEBUG", "aio", f"hmi: xService请求发送成功 {cmd_json}")
if clibs.status["hmi"] == 0:
QMessageBox.critical(self, "错误", "使用该功能之前需要先打开HMI连接")
return
if self.pte_hmi_send.toPlainText() == "":
return
self.btn_hmi_send.setDisabled(True)
hmi_dict = json.loads(self.pte_hmi_send.toPlainText())
t = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")
msg_id = hmi_dict["id"] = "@".join([hmi_dict["id"].split("@")[0], t])
self.pte_hmi_send.clear()
self.pte_hmi_send.appendPlainText(json.dumps(hmi_dict, indent=4, separators=(",", ":")))
flag = hmi_dict["p_type"]
del hmi_dict["p_type"]
cmd_json = json.dumps(hmi_dict, separators=(",", ":"))
self.run_program_thread(hmi_send_thread, -99, self.prog_done_hmi_send, (msg_id, flag))
def md_send(self):
if clibs.status["md"] == 0:
QMessageBox.critical(self, "连接错误", "使用该功能之前,需要先打开 Modbus 连接!")
return
if self.pte_md_send.toPlainText() == "":
return
self.pte_md_recv.clear()
content = self.pte_md_send.toPlainText().split("\n")
if content[0].strip().startswith("sta"):
for item in content[1:]:
addr = int(item.split(":")[0].strip())
count = int(item.split(":")[1].strip())
value_type = item.split(":")[2].strip()
if value_type == "bool":
try:
result = clibs.c_md.c.read_holding_registers(addr, count=count).registers[0]
self.pte_md_recv.appendPlainText(str(result))
except Exception as err:
self.pte_md_recv.appendPlainText(f"获取失败:{err}")
return
elif content[0].strip().startswith("ctrl"):
for item in content[1:]:
addr = int(item.split(":")[0].strip())
value = int(item.split(":")[1].strip())
try:
clibs.c_md.c.write_register(addr, value)
time.sleep(clibs.INTERVAL/4)
except Exception as err:
self.pte_md_recv.appendPlainText(f"操作失败:{err}")
return
else:
self.pte_md_recv.appendPlainText("操作成功!")
else:
QMessageBox.critical(self, "格式错误", "非法的发送内容,自定义发送需参考已有的格式!")
def ec_send(self):
if clibs.status["ec"] == 0:
QMessageBox.critical(self, "错误", "使用该功能之前需要先打开MD连接")
return
if self.pte_ec_send.toPlainText() == "":
return
self.pte_ec_recv.clear()
cmd = self.pte_ec_send.toPlainText().strip()
try:
clibs.c_ec.s_string(cmd)
time.sleep(clibs.INTERVAL/2)
result = clibs.c_ec.r_string(cmd)
self.pte_ec_recv.appendPlainText(str(result))
except Exception as err:
self.pte_ec_recv.appendPlainText(f"操作失败:{err}")
def hmi_cb_change(self):
cmd = self.cb_hmi_cmd.currentText()
self.pte_hmi_send.clear()
self.pte_him_recv.clear()
with open(f"assets/files/protocols/hmi/{cmd}.json", mode="r", encoding="utf-8") as f_hmi:
hmi_dict = json.load(f_hmi)
t = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")
hmi_dict["id"] = "@".join([cmd, t])
self.pte_hmi_send.appendPlainText(json.dumps(hmi_dict, indent=4, separators=(",", ":")))
def md_cb_change(self):
cmd = self.cb_md_cmd.currentText()
self.pte_md_send.clear()
self.pte_md_recv.clear()
self.pte_md_send.appendPlainText(cmd)
with open(f"assets/files/protocols/md/{cmd}.txt", mode="r", encoding="utf-8") as f_md:
c_send = f_md.read()
self.pte_md_send.appendPlainText(c_send)
def ec_cb_change(self):
cmd = self.cb_ec_cmd.currentText()
self.pte_ec_send.clear()
self.pte_ec_recv.clear()
with open(f"assets/files/protocols/ec/{cmd}.txt", mode="r", encoding="utf-8") as f_md:
c_send = f_md.read()
self.pte_ec_send.appendPlainText(c_send)
def check_interval(self):
try:
interval = float(self.le_durable_interval.text())
interval = 300 if interval < 300 else int(interval)
except Exception:
interval = 300
self.le_durable_interval.setText(str(interval))
def state_detection(self):
while True:
time.sleep(clibs.INTERVAL)
if clibs.status["hmi"] == 0 and self.btn_hmi_conn.text() == "断开":
self.btn_hmi_conn.setText("连接")
elif clibs.status["hmi"] == 1 and self.btn_hmi_conn.text() == "连接":
self.btn_hmi_conn.setText("断开")
def closeEvent(self, event):
idx = -1 if clibs.running.count(1) == 0 else clibs.running.index(1)
info_text = "当前无程序正在运行,可放心退出!" if idx == -1 else f"当前正在运行{clibs.functions[idx]},确认退出?"
reply = QMessageBox.question(self, "退出", info_text)
if reply == QMessageBox.Yes:
os.chdir(clibs.log_path)
t = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
disk_conn = sqlite3.connect(f"log_{t}.db", isolation_level=None, check_same_thread=False, cached_statements=256)
clibs.conn.backup(target=disk_conn, pages=1, progress=None)
_, logs = clibs.traversal_files(".", None)
logs.sort()
while len(logs) > 10:
_ = logs.pop(0)
os.remove(_)
if clibs.status["md"] == 1:
self.run_program_thread(clibs.c_md.close, -99, self.prog_done_disconn, "md")
if clibs.status["ec"] == 1:
self.run_program_thread(clibs.c_ec.close, -99, self.prog_done_disconn, "ec")
if clibs.status["hmi"] == 1:
self.run_program_thread(clibs.c_hr.close, -99, self.prog_done_disconn, "hmi")
event.accept()
else:
event.ignore()
def setup_statusbar(self):
with open(f"assets/files/version/local_vers", mode="r", encoding="utf-8") as f_local:
local_vers = f_local.read().strip()
l_version, update = local_vers.split("@")
vers_info = f" v{l_version} Update@{update}"
with open(f"assets/files/version/server_vers", mode="r", encoding="utf-8") as f_server:
server_vers = f_server.read().strip()
update_label = QLabel()
version_label = QLabel()
self.statusbar.addWidget(version_label, 0)
self.statusbar.addPermanentWidget(update_label, 0) # 添加到右侧
if local_vers == server_vers:
update_label.setText('<img src="assets/media/updated.png" width="12" height="12" /><font color="#0D8A3D" face="consolas" size="4"><b>&nbsp;当前是最新版本,继续保持!&nbsp;</b></font>')
elif local_vers > server_vers:
pass
elif local_vers < server_vers:
update_label.setText(f'''<a href="https://www.rustle.cc/aio.zip" style="text-decoration: none;"><img src="assets/media/upgrade.png" width="12" height="12" /><font color="#D81E06" face="consolas" size="4"><b>&nbsp;v{server_vers.split('@')[0]}已经发布,尽快更新至最新版本!&nbsp;</b></font></a>''')
version_label.setText(f'<font color="black" face="consolas" size="4"><b>&nbsp;{vers_info}</b></font>')
update_label.setOpenExternalLinks(True) # 允许超链接在浏览器中打开
# update_label.setAlignment(Qt.AlignmentFlag.AlignVCenter | Qt.AlignmentFlag.AlignRight)
update_label.setStyleSheet("border: none;")
update_label.setFrameShape(QFrame.Shape.NoFrame)
update_label.setFrameShadow(QFrame.Shadow.Plain)
self.close_on_net_error = False
class InitWork(QThread):
completed = Signal(str)
def program(self, action):
url_vers, server_vers = "https://www.rustle.cc/server_vers", ""
try:
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:23.0) Gecko/20100101 Firefox/23.0'}
req = request.Request(url_vers, headers=headers)
response = request.urlopen(req, timeout=clibs.INTERVAL * 10)
server_vers = response.read().decode("utf-8").strip()
with open("assets/files/version/server_vers", mode="w", encoding="utf-8") as f_server:
f_server.write(server_vers)
self.completed.emit("true")
except Exception as err:
print(f"err = {err}")
clibs.cursor.close()
clibs.conn.close()
self.completed.emit("false")
class SplashScreen(QApplication):
action = Signal(int)
def __init__(self, argv):
super().__init__(argv)
self.window = None
pixmap = QPixmap("./assets/media/splash.png")
self.splash = QSplashScreen(pixmap, Qt.WindowType.WindowStaysOnTopHint)
scaled_pixmap = pixmap.scaled(800, 400, Qt.AspectRatioMode.KeepAspectRatio, Qt.TransformationMode.SmoothTransformation)
self.splash.setPixmap(scaled_pixmap)
self.splash.setEnabled(False) # 禁用交互
self.splash.setFont(QFont("Arial", 12))
self.splash.show()
self.splash.showMessage("正在加载资源.....", Qt.AlignmentFlag.AlignBottom | Qt.AlignmentFlag.AlignHCenter, Qt.GlobalColor.white)
self.t = QThread(self)
self.run = InitWork()
self.run.moveToThread(self.t)
self.run.completed.connect(self.prog_done)
self.action.connect(self.run.program)
self.t.start()
self.action.emit(1)
def prog_done(self, result):
if result == "false" or result == "":
self.splash.showMessage("网络错误无法连接至服务器确认当前网络环境是否可以访问www.rustle.cc......", Qt.AlignmentFlag.AlignBottom | Qt.AlignmentFlag.AlignHCenter, Qt.GlobalColor.white)
time.sleep(clibs.INTERVAL*2)
self.splash.close()
sys.exit()
elif result == "true":
self.window = MainWindow()
self.splash.showMessage("初始化完成,即将进入主界面......", Qt.AlignmentFlag.AlignBottom | Qt.AlignmentFlag.AlignHCenter, Qt.GlobalColor.white)
QTimer.singleShot(1000, lambda: (self.splash.finish(self.window), self.window.show()))
if __name__ == '__main__':
app = SplashScreen(sys.argv)
sys.exit(app.exec())