create my toolbox -- first commit

This commit is contained in:
2025-09-26 08:39:05 +08:00
parent 34c74bf3d0
commit bb3ae1e65a
273 changed files with 903 additions and 1052 deletions

View File

View File

@@ -0,0 +1,20 @@
from pathlib import Path
from threading import Lock
base_path = Path(__file__).resolve().parent.parent.parent
lock = Lock()
account = None
code_dict = [4, 11, 4, 31, 22, 12, 19, 23, 7, 16, 7, 23, 1, 8, 7, 18, 27, 32, 28, 25, 7, 32, 9, 15, 2, 32, 0, 12, 26, 15, 14, 17]
username, password = "", ""
avatar = f"{base_path}/assets/media/avatar.jpg"
proverb = "佛曰Time will say~"
bg = f"{base_path}/assets/media/bg.jpg"
win_width, win_height = 1100, 500
def delete_files_in_directory(directory):
path = Path(directory)
if path.exists() and path.is_dir():
for child in path.iterdir():
if child.is_file():
child.unlink()
elif child.is_dir():
delete_files_in_directory(child)

View File

@@ -0,0 +1,80 @@
import sqlite3
import time
from codes.common import clibs
from pathlib import Path
def db_init(db_file):
conn = sqlite3.connect(db_file, isolation_level=None, check_same_thread=False, cached_statements=2048, timeout=10.0)
cursor = conn.cursor()
cursor.execute("PRAGMA journal_mode=wal")
cursor.execute("PRAGMA wal_checkpoint=TRUNCATE")
cursor.execute("PRAGMA synchronous=normal")
cursor.execute("PRAGMA temp_store=memory")
cursor.execute("PRAGMA mmap_size=30000000000")
cursor.execute("PRAGMA cache_size=200000")
cursor.execute(
"""
create table if not exists logs(
id integer primary key autoincrement,
timestamp DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime')),
level text,
module text,
content text
)
"""
)
cursor.execute(
"""
create table if not exists users(
id integer primary key autoincrement,
timestamp DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime')),
username text not null unique,
password text not null,
salt text not null
)
"""
)
cursor.close()
conn.close()
def db_lock(func):
def wrapper(*args, **kwargs):
try:
clibs.lock.acquire(True)
ret = func(*args, **kwargs)
finally:
clibs.lock.release()
return ret
return wrapper
def db_backup():
t = time.strftime("%Y%m%d%H%M%S", time.localtime())
db_file = clibs.base_path / "assets/database/toolbox.db"
db_file_backup = clibs.base_path / f"assets/database/toolbox.{t}.db"
if not (db_file.exists() and db_file.is_file()):
db_init(db_file)
else:
db_file_backup.write_bytes(db_file.read_bytes())
db_dir = clibs.base_path / "assets/database"
db_list = [db for db in db_dir.glob("*.db")]
for db in sorted(db_list)[:-clibs.account["maximum_db_number"]]:
db.unlink()
def db_conn():
db_file = clibs.base_path / "assets/database/toolbox.db"
conn = sqlite3.connect(db_file, isolation_level=None, check_same_thread=False, cached_statements=2048, timeout=3.0)
cursor = conn.cursor()
cursor.execute("PRAGMA journal_mode=wal")
cursor.execute("PRAGMA wal_checkpoint=TRUNCATE")
cursor.execute("PRAGMA synchronous=normal")
cursor.execute("PRAGMA temp_store=memory")
cursor.execute("PRAGMA mmap_size=30000000000")
cursor.execute("PRAGMA cache_size=200000")
return conn, cursor
@db_lock
def db_close(conn, cursor):
cursor.close()
conn.close()

View File

@@ -0,0 +1,37 @@
import base64
import hashlib
from Crypto.Cipher import AES
from Crypto import Random
from Crypto.Util.Padding import pad, unpad
from codes.common import clibs
class PassCipher:
def __init__(self, salt):
salt = salt.encode("utf-8")
self.key = hashlib.sha256(salt).digest()
def encrypt(self, plaintext):
plaintext = plaintext.encode("utf-8")
iv = Random.new().read(AES.block_size)
cipher = AES.new(self.key, AES.MODE_CBC, iv)
ciphertext = cipher.encrypt(pad(plaintext, AES.block_size))
return base64.b64encode(iv + ciphertext).decode("utf-8")
def decrypt(self, ciphertext):
ciphertext = base64.b64decode(ciphertext)
iv = ciphertext[:AES.block_size]
ciphertext = ciphertext[AES.block_size:]
cipher = AES.new(self.key, AES.MODE_CBC, iv)
plaintext = unpad(cipher.decrypt(ciphertext), AES.block_size)
return plaintext.decode("utf-8")
@staticmethod
def gen_salt(text: str):
key = ""
passwd = {idx: char for idx, char in enumerate(text * 4)}
for idx in range(32):
char_i = 0 if ord(passwd[idx]) - clibs.code_dict[idx] < 0 else ord(passwd[idx]) - clibs.code_dict[idx]
key += chr(char_i)
salt = base64.urlsafe_b64encode(key.encode()).decode()
return salt

View File

@@ -0,0 +1,28 @@
import subprocess
import sys
from codes.common import clibs
from pathlib import Path
UIC_CMD = "pyside6-uic"
def single_uic(ui_path: str, py_path: str):
for file in Path(ui_path).rglob("*.ui"):
file_name = file.stem
ui_file = file
py_file = Path(py_path) / f"{file_name}.py"
cmd = [UIC_CMD, "-o", py_file, ui_file]
print(f"Processing {ui_file} -> {py_file}")
try:
subprocess.run(cmd, check=True, capture_output=True, text=True)
except subprocess.CalledProcessError as e:
print(f"转换失败: {ui_file}\n{e.stderr}", file=sys.stderr)
def main():
ui_path = clibs.base_path / "assets" / "ui"
py_path = clibs.base_path / "codes" / "ui"
single_uic(str(ui_path), str(py_path))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
from PySide6.QtCore import QThread, Signal, QRunnable, QThreadPool, QMetaObject, Q_ARG, QMetaType, Qt
from typing import Callable, Any
class Worker(QThread):
result = Signal(dict)
error = Signal(dict)
def __init__(self, func, *args, **kwargs):
super().__init__()
self.func = func
self.args = args
self.kwargs = kwargs
def run(self):
try:
result = self.func(*self.args, **self.kwargs)
self.result.emit({"result": result})
except Exception as error:
self.error.emit({"error": str(error)})
# launch函数必须在主进程中定义调用
# def launch(self, func, on_anything: Callable[..., Any] = print, *args, **kwargs):
# self.thread = Worker(func, *args, **kwargs)
# self.thread.started.connect(lambda: on_anything({"started": True}))
# self.thread.result.connect(on_anything)
# self.thread.error.connect(on_anything)
# self.thread.finished.connect(lambda: on_anything({"finished": True}))
# self.thread.start()
#
#
# def on_anything(results):
# if "started" in results:
# print("运行开始:", results["started"])
# if "result" in results:
# print("正常结束:", results["result"])
# if "error" in results:
# print(f"有异常发生:", results["error"])
# if "finished" in results:
# print("运行结束:", results["finished"])