146 lines
4.9 KiB
Python
146 lines
4.9 KiB
Python
import sqlite3
|
|
import time
|
|
from inspect import currentframe
|
|
from functools import singledispatch
|
|
|
|
from codes.common import clibs
|
|
|
|
|
|
def db_init():
|
|
if clibs.db_file.exists():
|
|
return
|
|
|
|
conn = sqlite3.connect(clibs.db_file, isolation_level=None, check_same_thread=False, cached_statements=2048, timeout=10.0)
|
|
cursor = conn.cursor()
|
|
cursor.execute("PRAGMA journal_mode=wal")
|
|
cursor.execute("PRAGMA wal_checkpoint=TRUNCATE")
|
|
cursor.execute("PRAGMA synchronous=normal")
|
|
cursor.execute("PRAGMA temp_store=memory")
|
|
cursor.execute("PRAGMA mmap_size=30000000000")
|
|
cursor.execute("PRAGMA cache_size=200000")
|
|
cursor.execute(
|
|
"""
|
|
create table if not exists logs(
|
|
id integer primary key autoincrement,
|
|
timestamp DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime')),
|
|
level text,
|
|
module text,
|
|
content text
|
|
)
|
|
"""
|
|
)
|
|
cursor.execute(
|
|
"""
|
|
create table if not exists users(
|
|
id integer primary key autoincrement,
|
|
timestamp DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime')),
|
|
username text not null unique,
|
|
password text not null,
|
|
salt text not null
|
|
)
|
|
"""
|
|
)
|
|
cursor.execute(f"INSERT INTO logs (level, module, content) VALUES (?, ?, ?)", ("info", "login_ui", "数据库初始化成功!"))
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
def db_lock(func):
|
|
def wrapper(*args, **kwargs):
|
|
try:
|
|
clibs.lock.acquire(True)
|
|
ret = func(*args, **kwargs)
|
|
except Exception as e:
|
|
print(f"db operation error: {e}")
|
|
ret = None
|
|
finally:
|
|
clibs.lock.release()
|
|
return ret
|
|
return wrapper
|
|
|
|
def db_backup():
|
|
t = time.strftime("%Y%m%d%H%M%S", time.localtime())
|
|
db_file_backup = clibs.base_path / f"assets/database/toolbox.{t}.db"
|
|
db_file_backup.write_bytes(clibs.db_file.read_bytes())
|
|
db_dir = clibs.base_path / "assets/database"
|
|
db_list = [db for db in db_dir.glob("*.db")]
|
|
for db in sorted(db_list)[:-clibs.config["maximum_db_number"]]:
|
|
db.unlink()
|
|
|
|
def db_conn():
|
|
# import traceback, inspect
|
|
# print("[Conn] 被调用", traceback.format_stack()[-2])
|
|
# print("[Conn] conn=", clibs.conn, "cursor=", clibs.cursor)
|
|
|
|
if clibs.conn is not None:
|
|
return
|
|
|
|
clibs.conn = sqlite3.connect(clibs.db_file, isolation_level=None, check_same_thread=False, cached_statements=2048, timeout=3.0)
|
|
clibs.cursor = clibs.conn.cursor()
|
|
clibs.cursor.execute("PRAGMA journal_mode=wal")
|
|
clibs.cursor.execute("PRAGMA wal_checkpoint=TRUNCATE")
|
|
clibs.cursor.execute("PRAGMA synchronous=normal")
|
|
clibs.cursor.execute("PRAGMA temp_store=memory")
|
|
clibs.cursor.execute("PRAGMA mmap_size=30000000000")
|
|
clibs.cursor.execute("PRAGMA cache_size=200000")
|
|
|
|
@db_lock
|
|
def db_close():
|
|
if clibs.cursor is not None:
|
|
clibs.cursor.close()
|
|
if clibs.conn is not None:
|
|
clibs.conn.close()
|
|
clibs.conn, clibs.cursor = None, None
|
|
|
|
@db_lock
|
|
def db_write_logs(content, module="", level="info"):
|
|
if module == "":
|
|
frame = currentframe().f_back
|
|
module_name = frame.f_globals["__name__"]
|
|
line_no = frame.f_lineno
|
|
module = f"{module_name}.{line_no}"
|
|
|
|
if level.lower() not in ["info", "warning", "error", "exception"]:
|
|
level = "unknown"
|
|
|
|
clibs.cursor.execute(f"INSERT INTO logs (level, module, content) VALUES (?, ?, ?)", (level, module, content))
|
|
|
|
@singledispatch
|
|
@db_lock
|
|
def db_query_logs(dummy: bool = True):
|
|
clibs.cursor.execute(f"SELECT * FROM logs")
|
|
records = clibs.cursor.fetchall()
|
|
len_records = len(records)
|
|
return records, len_records
|
|
|
|
@db_query_logs.register
|
|
def _(levels: list):
|
|
placeholders = ",".join("?" * len(levels))
|
|
clibs.cursor.execute(f"SELECT * FROM logs WHERE level IN ({placeholders})", (*levels, ))
|
|
records = clibs.cursor.fetchall()
|
|
len_records = len(records)
|
|
return records, len_records
|
|
|
|
@db_query_logs.register
|
|
def _(search_text: str, records: list):
|
|
ids = [_[0] for _ in records]
|
|
placeholder = ",".join("?" * len(ids))
|
|
clibs.cursor.execute(f"SELECT * FROM logs WHERE id IN ({placeholder}) and content like ?", (ids + [f"%{search_text}%", ]))
|
|
records = clibs.cursor.fetchall()
|
|
len_records = len(records)
|
|
return records, len_records
|
|
|
|
@db_lock
|
|
def db_write_users(username, password_encrypted, salt):
|
|
clibs.cursor.execute("INSERT INTO users (username, password, salt) VALUES (?, ?, ?)", (username, password_encrypted, salt))
|
|
|
|
@db_lock
|
|
def db_delete_users(username):
|
|
# clibs.cursor.execute("INSERT INTO users (username, password, salt) VALUES (?, ?, ?)", (username, password_encrypted, salt))
|
|
...
|
|
|
|
@db_lock
|
|
def db_query_users(username):
|
|
clibs.cursor.execute(f""" SELECT * FROM users where username = "{username}" """)
|
|
record = clibs.cursor.fetchall()
|
|
return record
|