from PySide6.QtCore import (
    QThread,
    Signal,
    QRunnable,
    QThreadPool,
    QMetaObject,
    Q_ARG,
    QMetaType,
    Qt,
)
from typing import Callable, Any


class Worker(QThread):
    """QThread subclass whose ``run`` calls a stored function and reports via signals.

    Signals:
        result: emitted with ``{"result": <return value>}`` when the function returns.
        error: emitted with ``{"error": <str(exception)>}`` when an ``Exception``
            is raised while calling the function or emitting ``result``.
    """

    result = Signal(dict)
    error = Signal(dict)

    def __init__(self, func, *args, **kwargs):
        """Remember the callable and the positional/keyword arguments to call it with."""
        super().__init__()
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def run(self):
        """Call ``self.func(*self.args, **self.kwargs)`` and emit the outcome."""
        try:
            payload = {"result": self.func(*self.args, **self.kwargs)}
            # Emitting stays inside the ``try`` so a failure here is also
            # reported through ``error``.
            self.result.emit(payload)
        except Exception as exc:
            # Only the exception message is forwarded, as a string.
            self.error.emit({"error": str(exc)})


# The launch function must be defined and called in the main process.
# def launch(self, func, on_anything: Callable[..., Any] = print, *args, **kwargs):
#     self.thread = Worker(func, *args, **kwargs)
#     self.thread.started.connect(lambda: on_anything({"started": True}))
#     self.thread.result.connect(on_anything)
#     self.thread.error.connect(on_anything)
#     self.thread.finished.connect(lambda: on_anything({"finished": True}))
#     self.thread.start()
#
#
# def on_anything(results):
#     if "started" in results:
#         print("Run started:", results["started"])
#     if "result" in results:
#         print("Finished normally:", results["result"])
#     if "error" in results:
#         print(f"An exception occurred:", results["error"])
#     if "finished" in results:
#         print("Run finished:", results["finished"])