new project for personal toolbox
This commit is contained in:
0
gui/codes/common/__init__.py
Normal file
0
gui/codes/common/__init__.py
Normal file
6
gui/codes/common/clibs.py
Normal file
6
gui/codes/common/clibs.py
Normal file
@@ -0,0 +1,6 @@
|
||||
from pathlib import Path
|
||||
|
||||
base_path = Path(__file__).resolve().parent.parent.parent
|
||||
code_dict = [4, 11, 4, 31, 22, 12, 19, 23, 7, 16, 7, 23, 1, 8, 7, 18, 27, 32, 28, 25, 7, 32, 9, 15, 2, 32, 0, 12, 26, 15, 14, 17]
|
||||
salt = ""
|
||||
max_db_number = 10
|
64
gui/codes/common/db_operation.py
Normal file
64
gui/codes/common/db_operation.py
Normal file
@@ -0,0 +1,64 @@
|
||||
import sqlite3
|
||||
from threading import Lock
|
||||
import time
|
||||
from codes.common import clibs
|
||||
|
||||
|
||||
def db_init(db_file):
|
||||
conn = sqlite3.connect(db_file, isolation_level=None, check_same_thread=False, cached_statements=2048, timeout=10.0)
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("PRAGMA journal_mode=wal")
|
||||
cursor.execute("PRAGMA wal_checkpoint=TRUNCATE")
|
||||
cursor.execute("PRAGMA synchronous=normal")
|
||||
cursor.execute("PRAGMA temp_store=memory")
|
||||
cursor.execute("PRAGMA mmap_size=30000000000")
|
||||
cursor.execute("PRAGMA cache_size=200000")
|
||||
cursor.execute(
|
||||
"""
|
||||
create table if not exists logs(
|
||||
id integer primary key autoincrement,
|
||||
timestamp DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime')),
|
||||
level text,
|
||||
module text,
|
||||
content text
|
||||
)
|
||||
"""
|
||||
)
|
||||
cursor.close()
|
||||
conn.close()
|
||||
|
||||
def db_lock(func):
|
||||
def wrapper(*args, **kwargs):
|
||||
try:
|
||||
Lock().acquire(True)
|
||||
ret = func(*args, **kwargs)
|
||||
finally:
|
||||
Lock().release()
|
||||
return ret
|
||||
return wrapper
|
||||
|
||||
def db_backup():
|
||||
t = time.strftime("%Y%m%d%H%M%S", time.localtime())
|
||||
db_file = clibs.base_path / "assets/database/toolbox.db"
|
||||
db_file_backup = clibs.base_path / f"assets/database/toolbox.{t}.db"
|
||||
if not (db_file.exists() and db_file.is_file()):
|
||||
db_init(db_file)
|
||||
else:
|
||||
db_file_backup.write_bytes(db_file.read_bytes())
|
||||
db_dir = clibs.base_path / "assets/database"
|
||||
db_list = [db for db in db_dir.glob("*.db")]
|
||||
print(f"db_list: {sorted(db_list)}")
|
||||
for db in sorted(db_list)[:-clibs.max_db_number]:
|
||||
db.unlink()
|
||||
|
||||
def db_conn():
|
||||
db_file = clibs.base_path / "assets/database/toolbox.db"
|
||||
conn = sqlite3.connect(db_file, isolation_level=None, check_same_thread=False, cached_statements=2048, timeout=10.0)
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("PRAGMA journal_mode=wal")
|
||||
cursor.execute("PRAGMA wal_checkpoint=TRUNCATE")
|
||||
cursor.execute("PRAGMA synchronous=normal")
|
||||
cursor.execute("PRAGMA temp_store=memory")
|
||||
cursor.execute("PRAGMA mmap_size=30000000000")
|
||||
cursor.execute("PRAGMA cache_size=200000")
|
||||
return conn, cursor
|
37
gui/codes/common/secure_encrypt.py
Normal file
37
gui/codes/common/secure_encrypt.py
Normal file
@@ -0,0 +1,37 @@
|
||||
import base64
|
||||
import hashlib
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto import Random
|
||||
from Crypto.Util.Padding import pad, unpad
|
||||
|
||||
|
||||
class PassCipher:
|
||||
def __init__(self, salt):
|
||||
salt = salt.encode("utf-8")
|
||||
self.key = hashlib.sha256(salt).digest()
|
||||
|
||||
def encrypt(self, plaintext):
|
||||
plaintext = plaintext.encode("utf-8")
|
||||
iv = Random.new().read(AES.block_size)
|
||||
cipher = AES.new(self.key, AES.MODE_CBC, iv)
|
||||
ciphertext = cipher.encrypt(pad(plaintext, AES.block_size))
|
||||
return base64.b64encode(iv + ciphertext).decode("utf-8")
|
||||
|
||||
def decrypt(self, ciphertext):
|
||||
ciphertext = base64.b64decode(ciphertext)
|
||||
iv = ciphertext[:AES.block_size]
|
||||
ciphertext = ciphertext[AES.block_size:]
|
||||
cipher = AES.new(self.key, AES.MODE_CBC, iv)
|
||||
plaintext = unpad(cipher.decrypt(ciphertext), AES.block_size)
|
||||
return plaintext.decode("utf-8")
|
||||
|
||||
|
||||
# if __name__ == "__main__":
|
||||
# salt = "my_secret_salt_string"
|
||||
# cipher = PassCipher(salt)
|
||||
# original_text = "这是一段需要加密的敏感信息"
|
||||
# encrypted_text = cipher.encrypt(original_text)
|
||||
# print(f"加密后的文本: {encrypted_text}")
|
||||
# decrypted_text = cipher.decrypt(encrypted_text)
|
||||
# print(f"解密后的文本: {decrypted_text}")
|
||||
# print(f"加解密是否成功: {original_text == decrypted_text}")
|
26
gui/codes/common/ui2py.py
Normal file
26
gui/codes/common/ui2py.py
Normal file
@@ -0,0 +1,26 @@
|
||||
import subprocess
|
||||
import sys
|
||||
from codes.common import clibs
|
||||
from pathlib import Path
|
||||
|
||||
UIC_CMD = "pyside6-uic"
|
||||
|
||||
|
||||
def single_uic(ui_path: str, py_path: str):
|
||||
for file in Path(ui_path).rglob("*.ui"):
|
||||
file_name = file.stem
|
||||
ui_file = file
|
||||
py_file = Path(py_path) / f"{file_name}.py"
|
||||
cmd = [UIC_CMD, "-o", py_file, ui_file]
|
||||
|
||||
print(f"Processing {ui_file} -> {py_file}")
|
||||
try:
|
||||
subprocess.run(cmd, check=True, capture_output=True, text=True)
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"转换失败: {ui_file}\n{e.stderr}", file=sys.stderr)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
ui_path = clibs.base_path / "assets" / "ui"
|
||||
py_path = clibs.base_path / "codes" / "ui"
|
||||
single_uic(str(ui_path), str(py_path))
|
40
gui/codes/common/worker.py
Normal file
40
gui/codes/common/worker.py
Normal file
@@ -0,0 +1,40 @@
|
||||
from PySide6.QtCore import QThread, Signal, QRunnable, QThreadPool, QMetaObject, Q_ARG, QMetaType, Qt
|
||||
from typing import Callable, Any
|
||||
|
||||
|
||||
class Worker(QThread):
|
||||
result = Signal(dict)
|
||||
error = Signal(dict)
|
||||
|
||||
def __init__(self, func, *args, **kwargs):
|
||||
super().__init__()
|
||||
self.func = func
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
result = self.func(*self.args, **self.kwargs)
|
||||
self.result.emit({"result": result})
|
||||
except Exception as error:
|
||||
self.error.emit({"error": str(error)})
|
||||
|
||||
# launch函数必须在主进程中定义调用
|
||||
# def launch(self, func, on_anything: Callable[..., Any] = print, *args, **kwargs):
|
||||
# self.thread = Worker(func, *args, **kwargs)
|
||||
# self.thread.started.connect(lambda: on_anything({"started": True}))
|
||||
# self.thread.result.connect(on_anything)
|
||||
# self.thread.error.connect(on_anything)
|
||||
# self.thread.finished.connect(lambda: on_anything({"finished": True}))
|
||||
# self.thread.start()
|
||||
#
|
||||
#
|
||||
# def on_anything(results):
|
||||
# if "started" in results:
|
||||
# print("运行开始:", results["started"])
|
||||
# if "result" in results:
|
||||
# print("正常结束:", results["result"])
|
||||
# if "error" in results:
|
||||
# print(f"有异常发生:", results["error"])
|
||||
# if "finished" in results:
|
||||
# print("运行结束:", results["finished"])
|
Reference in New Issue
Block a user