148 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			148 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import sqlite3
 | 
						|
import time
 | 
						|
from inspect import currentframe
 | 
						|
from functools import singledispatch, wraps
 | 
						|
 | 
						|
from codes.common import clibs
 | 
						|
 | 
						|
 | 
						|
def db_init():
    """Create the SQLite database file and its schema on first run.

    Does nothing when ``clibs.db_file`` already exists. Otherwise the file is
    created, the connection PRAGMAs are applied, the ``logs`` and ``users``
    tables are created and one initial row is written to ``logs``.

    The cursor and connection are closed even when a statement raises; the
    exception itself still propagates to the caller.
    """
    if clibs.db_file.exists():
        return

    conn = sqlite3.connect(clibs.db_file, isolation_level=None, check_same_thread=False, cached_statements=2048, timeout=10.0)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute("PRAGMA journal_mode=wal")
            cursor.execute("PRAGMA wal_checkpoint=TRUNCATE")
            cursor.execute("PRAGMA synchronous=normal")
            cursor.execute("PRAGMA temp_store=memory")
            cursor.execute("PRAGMA mmap_size=30000000000")
            cursor.execute("PRAGMA cache_size=200000")
            cursor.execute(
                """
        create table if not exists logs(
            id integer primary key autoincrement,
            timestamp DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime')),
            level text,
            module text,
            content text
        )
        """
            )
            cursor.execute(
                """
        create table if not exists users(
            id integer primary key autoincrement,
            timestamp DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime')),
            username text not null unique,
            password text not null,
            salt text not null
        )
        """
            )
            # First log row, recording that initialisation succeeded.
            cursor.execute("INSERT INTO logs (level, module, content) VALUES (?, ?, ?)", ("info", "login_ui", "数据库初始化成功!"))
        finally:
            cursor.close()
    finally:
        # Always release the file handle, otherwise a failed initialisation
        # would leave the database open for the rest of the process.
        conn.close()
 | 
						|
 | 
						|
def db_lock(func):
    """Decorator that runs a database helper while holding ``clibs.lock``.

    Before calling ``func`` the wrapper stores the frame of its own caller in
    ``clibs.caller_frame``. The frame is published only after the lock has
    been acquired, so a concurrent caller cannot overwrite it while ``func``
    is still reading it.

    Any ``Exception`` raised by ``func`` is printed as
    ``db operation error: ...`` and replaced by a ``None`` return value.

    Args:
        func: The callable to protect.

    Returns:
        The wrapped callable. It returns whatever ``func`` returns, or
        ``None`` when ``func`` raised.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Must be taken here: f_back of this frame is the caller of the
        # decorated function.
        caller = currentframe().f_back

        # Acquire outside the try block so the finally clause never releases
        # a lock that was not obtained.
        clibs.lock.acquire(True)
        try:
            clibs.caller_frame = caller
            return func(*args, **kwargs)
        except Exception as e:
            print(f"db operation error: {e}")
            return None
        finally:
            clibs.lock.release()
    return wrapper
 | 
						|
 | 
						|
def db_backup():
    """Write a timestamped snapshot of the database and prune old ones.

    The snapshot is stored as ``assets/database/toolbox.<YYYYmmddHHMMSS>.db``
    below ``clibs.base_path``. Afterwards the ``*.db`` files in that
    directory are sorted by name and all but the last
    ``clibs.config["maximum_db_number"]`` of them are deleted.

    The copy goes through SQLite's online backup API rather than a raw byte
    copy: the database is opened in WAL mode elsewhere in this module, so
    committed data may still live in the ``-wal`` side file and would be
    missing from a plain copy of the main file.

    Raises:
        FileNotFoundError: If ``clibs.db_file`` does not exist.
    """
    # sqlite3.connect() would silently create an empty database, so keep the
    # "missing source" failure explicit.
    if not clibs.db_file.exists():
        raise FileNotFoundError(clibs.db_file)

    db_dir = clibs.base_path / "assets/database"
    t = time.strftime("%Y%m%d%H%M%S", time.localtime())
    db_file_backup = db_dir / f"toolbox.{t}.db"

    source = sqlite3.connect(clibs.db_file, timeout=10.0)
    try:
        target = sqlite3.connect(db_file_backup)
        try:
            source.backup(target)
        finally:
            target.close()
    finally:
        source.close()

    keep = clibs.config["maximum_db_number"]
    # A slice of [:-0] is empty and a negative count is meaningless, so only
    # prune for a positive retention count.
    if keep > 0:
        for db in sorted(db_dir.glob("*.db"))[:-keep]:
            db.unlink()
 | 
						|
 | 
						|
def db_conn():
    """Open the shared connection and cursor kept on ``clibs``.

    When ``clibs.conn`` is already set the call is a no-op. Otherwise a new
    connection to ``clibs.db_file`` is stored in ``clibs.conn``, a cursor for
    it in ``clibs.cursor``, and the connection PRAGMAs are executed on that
    cursor.
    """
    if clibs.conn is None:
        clibs.conn = sqlite3.connect(
            clibs.db_file,
            isolation_level=None,
            check_same_thread=False,
            cached_statements=2048,
            timeout=3.0,
        )
        clibs.cursor = clibs.conn.cursor()

        startup_pragmas = (
            "PRAGMA journal_mode=wal",
            "PRAGMA wal_checkpoint=TRUNCATE",
            "PRAGMA synchronous=normal",
            "PRAGMA temp_store=memory",
            "PRAGMA mmap_size=30000000000",
            "PRAGMA cache_size=200000",
        )
        for statement in startup_pragmas:
            clibs.cursor.execute(statement)
 | 
						|
 | 
						|
@db_lock
def db_close():
    """Close the shared cursor and connection and reset both to ``None``.

    Either handle may already be ``None``; it is then skipped.
    """
    # Cursor first, then the connection that owns it.
    for handle in (clibs.cursor, clibs.conn):
        if handle is not None:
            handle.close()

    clibs.conn = None
    clibs.cursor = None
 | 
						|
 | 
						|
@db_lock
def db_write_logs(content, module="", level="info"):
    """Insert one row into the ``logs`` table.

    Args:
        content: Text stored in the ``content`` column.
        module: Origin label. When empty and ``clibs.caller_frame`` is set,
            it is derived from that frame as
            ``<module name>-<function name>:<line number>``.
        level: One of ``info``, ``warning``, ``error`` or ``exception``
            (case-insensitive). The value is stored in lower case; anything
            else, including a non-string, is stored as ``unknown``.
    """
    if module == "" and clibs.caller_frame is not None:
        frame = clibs.caller_frame
        # Keep only the last component of the dotted module path.
        module_name = frame.f_globals["__name__"].split(".")[-1]
        module = f"{module_name}-{frame.f_code.co_name}:{frame.f_lineno}"

    # Store the normalised level: validation is case-insensitive, so the
    # stored value must be too, or "INFO" rows would never match a filter on
    # "info".
    level = level.lower() if isinstance(level, str) else ""
    if level not in ("info", "warning", "error", "exception"):
        level = "unknown"

    clibs.cursor.execute("INSERT INTO logs (level, module, content) VALUES (?, ?, ?)", (level, module, content))
 | 
						|
 | 
						|
@singledispatch
@db_lock
def db_query_logs(dummy: object = None):
    """Fetch every row of the ``logs`` table.

    This is the default ``singledispatch`` implementation. Other
    implementations are selected by the type of the first positional
    argument, so a call must pass one positional argument (for example
    ``None``) for dispatch to work.

    Args:
        dummy: Ignored; present only to drive the dispatch.

    Returns:
        A ``(rows, count)`` tuple: all fetched rows and how many there are.
    """
    clibs.cursor.execute("SELECT * FROM logs")
    all_rows = clibs.cursor.fetchall()
    return all_rows, len(all_rows)
 | 
						|
 | 
						|
@db_query_logs.register(list)
@db_lock
def _(levels: list):
    """Fetch the ``logs`` rows whose ``level`` is one of ``levels``.

    Runs under ``db_lock`` like the default implementation, because it uses
    the same shared ``clibs.cursor``. Without the lock another thread could
    run a statement between ``execute`` and ``fetchall``.

    Args:
        levels: Level names to match; each is bound as an SQL parameter.

    Returns:
        A ``(rows, count)`` tuple, or ``None`` when the query raised (the
        ``db_lock`` wrapper prints the error and returns ``None``).
    """
    placeholders = ",".join("?" * len(levels))
    clibs.cursor.execute(f"SELECT * FROM logs WHERE level IN ({placeholders})", (*levels, ))
    records = clibs.cursor.fetchall()
    return records, len(records)
 | 
						|
 | 
						|
@db_query_logs.register(str)
@db_lock
def _(search_text: str, records: list):
    """Narrow earlier results to rows whose content contains ``search_text``.

    Runs under ``db_lock`` like the default implementation, because it uses
    the same shared ``clibs.cursor``.

    Args:
        search_text: Substring to look for; it is wrapped in ``%`` and
            matched with ``LIKE``, so ``%`` and ``_`` inside it act as
            wildcards.
        records: Rows from an earlier query; element 0 of each row is used
            as the row id.

    Returns:
        A ``(rows, count)`` tuple, or ``None`` when the query raised (the
        ``db_lock`` wrapper prints the error and returns ``None``).
    """
    ids = [row[0] for row in records]
    # NOTE(review): one bound parameter per id -- SQLite caps the number of
    # parameters per statement, so a very large ``records`` list may fail;
    # confirm expected sizes with the callers.
    placeholder = ",".join("?" * len(ids))
    clibs.cursor.execute(f"SELECT * FROM logs WHERE id IN ({placeholder}) and content like ?", (*ids, f"%{search_text}%"))
    matches = clibs.cursor.fetchall()
    return matches, len(matches)
 | 
						|
 | 
						|
@db_lock
def db_write_users(username, password_encrypted, salt):
    """Insert one row into the ``users`` table.

    Args:
        username: Value for the ``username`` column.
        password_encrypted: Value for the ``password`` column.
        salt: Value for the ``salt`` column.
    """
    row = (username, password_encrypted, salt)
    clibs.cursor.execute("INSERT INTO users (username, password, salt) VALUES (?, ?, ?)", row)
 | 
						|
 | 
						|
@db_lock
def db_delete_users(username):
    """Placeholder for removing a user; currently performs no database work.

    Args:
        username: Name of the user to remove (unused for now).
    """
    # TODO: not implemented yet -- no statement is executed.
    pass
 | 
						|
 | 
						|
@db_lock
def db_query_users(username):
    """Look up the ``users`` rows with the given username.

    The name is bound as an SQL parameter. Interpolating it into the
    statement text allowed SQL injection, and because the value sat inside
    double quotes SQLite could read it as a column name (``username`` or
    ``password``) instead of a string.

    Args:
        username: Exact username to match.

    Returns:
        The list of matching rows (empty when there is no such user), or
        ``None`` when the query raised (the ``db_lock`` wrapper prints the
        error and returns ``None``).
    """
    clibs.cursor.execute("SELECT * FROM users WHERE username = ?", (username,))
    return clibs.cursor.fetchall()
 |