from os import scandir, mkdir
from threading import Thread
from time import sleep
from os.path import exists
from paramiko import SSHClient, AutoAddPolicy
from socket import setdefaulttimeout
from logging import getLogger
from logging.config import dictConfig
import concurrent_log_handler  # provides ConcurrentRotatingFileHandler referenced in the dictConfig below

ip_addr = '192.168.0.160'  # for production
# ip_addr = '192.168.84.129'  # for test
RADIAN = 57.3  # ~ 180 / 3.1415926, degrees per radian
MAX_FRAME_SIZE = 1024
TIMEOUT = 5
setdefaulttimeout(TIMEOUT)
tab_names = {'dp': 'Data Process', 'at': 'Automatic Test', 'da': 'Duration Action', 'op': 'openapi'}

PREFIX = ''  # for pyinstaller packaging
# PREFIX = '../assets/'  # for source-code debugging
app_icon = f'{PREFIX}templates/icon.ico'
log_path = f'{PREFIX}templates/logs/'
log_data_hmi = f'{PREFIX}templates/logs/c_msg.log'
log_data_debug = f'{PREFIX}templates/logs/debug.log'
heartbeat = f'{PREFIX}templates/heartbeat'
durable_data_current_xlsx = f'{PREFIX}templates/durable/durable_data_current.xlsx'
durable_data_current_max_xlsx = f'{PREFIX}templates/durable/durable_data_current_max.xlsx'
durable_data_current = {
    'time': list(range(1, 19)),
    'axis1': [0 for _ in range(18)],
    'axis2': [0 for _ in range(18)],
    'axis3': [0 for _ in range(18)],
    'axis4': [0 for _ in range(18)],
    'axis5': [0 for _ in range(18)],
    'axis6': [0 for _ in range(18)],
}
durable_data_current_max = {
    'time': list(range(1, 19)),
    'axis1': [0 for _ in range(18)],
    'axis2': [0 for _ in range(18)],
    'axis3': [0 for _ in range(18)],
    'axis4': [0 for _ in range(18)],
    'axis5': [0 for _ in range(18)],
    'axis6': [0 for _ in range(18)],
}

if not exists(log_path):
    mkdir(log_path)

# dictConfig schema notes:
# version: schema version, an integer starting at 1. This key is required; all other keys are optional.
# formatters: a dict in which each key/value pair defines one Formatter; the key is a user-defined
#             Formatter ID and the value is a dict describing how to configure that Formatter instance.
#             The default format is '%(message)s'.
# filters: a dict in which each key/value pair defines one Filter; the key is a user-defined Filter ID
#          and the value is a dict describing how to configure that Filter instance.
# handlers: a dict in which each key/value pair defines one Handler; the key is a user-defined Handler ID
#           and the value is a dict describing how to configure that Handler instance, including:
#           class (required): fully qualified name of the handler class
#           level (optional): minimum level this handler processes; records below it are ignored by the
#                             handler. The level may be an integer or an upper-case string; the string
#                             and numeric levels map as follows:
#                             CRITICAL 50
#                             ERROR    40
#                             WARNING  30
#                             INFO     20
#                             DEBUG    10
#                             NOTSET    0
f_complex = '%(asctime)s # %(name)s-%(levelname)s-%(module)s-%(funcName)s-%(lineno)d # %(message)s'
f_simple = '%(levelname)s-%(module)s-%(funcName)s-%(lineno)d: %(message)s'
log_dicts = {
    'version': 1,
    'disable_existing_loggers': True,
    'formatters': {
        'standard': {
            'format': f_complex,
            'style': '%',
            'datefmt': '%Y-%m-%dT%H:%M:%S',
        },
        'test': {
            'format': f_simple,
            'style': '%',
            'datefmt': '%Y-%m-%dT%H:%M:%S',
        },
    },
    'filters': {},
    'handlers': {
        'console': {
            'level': 'DEBUG',
            'class': 'logging.StreamHandler',
            'formatter': 'test',
        },
        'hmi.log': {
            'level': 'WARNING',
            'class': 'concurrent_log_handler.ConcurrentRotatingFileHandler',
            'filename': log_data_hmi,
            'maxBytes': 1024 * 1024 * 50,
            'backupCount': 10,
            'encoding': 'utf-8',
            'formatter': 'standard',
        },
        'debug.log': {
            'level': 'INFO',
            'class': 'logging.FileHandler',
            'filename': log_data_debug,
            'encoding': 'utf-8',
            'formatter': 'standard',
        },
    },
    'loggers': {
        'normal': {
            'handlers': ['hmi.log', 'debug.log'],
            'level': 'DEBUG',
            'propagate': False,
        },
        'debug': {
            'handlers': ['console'],
            'level': 'DEBUG',
            'propagate': False,
        },
        '': {
            'handlers': ['hmi.log', 'debug.log'],
            'level': 'DEBUG',
            'propagate': False,
        },
    },
}
dictConfig(log_dicts)
log_prod = getLogger('normal')
log_debug = getLogger('debug')
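
# Usage sketch (illustrative comments only, kept out of executable code so that importing
# this module does not emit extra log records): 'normal' (log_prod) sends WARNING+ records
# to c_msg.log and INFO+ records to debug.log through the handlers configured above, while
# 'debug' (log_debug) prints to the console only, e.g.:
#     log_prod.warning('project upload finished')    # -> c_msg.log and debug.log
#     log_debug.debug('raw frame: %s', frame_bytes)  # -> console only ('frame_bytes' is hypothetical)
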
class GetThreadResult(Thread):
    """Thread wrapper that captures the return value of the target callable."""

    def __init__(self, func, args=()):
        super(GetThreadResult, self).__init__()
        self.func = func
        self.args = args
        self.result = 0

    def run(self):
        sleep(1)
        self.result = self.func(*self.args)

    def get_result(self):
        self.join()  # wait for the worker thread to finish
        return self.result


def traversal_files(path, w2t):
    # Purpose: return the folders and files directly under the given path as two lists (non-recursive).
    # Parameters: the path to scan, plus the HMI's w2t() output callback.
    # Returns: (list of directories under the path, list of files under the path)
    if not exists(path):
        msg = f'The data folder {path} does not exist, please check and try again......'
        w2t(msg, 0, 1, 'red')
    else:
        dirs = []
        files = []
        for item in scandir(path):
            if item.is_dir():
                dirs.append(item.path)
            elif item.is_file():
                files.append(item.path)
        return dirs, files


def prj_to_xcore(prj_file):
    # Upload a project archive to the controller and install it as the target project.
    ssh = SSHClient()
    ssh.set_missing_host_key_policy(AutoAddPolicy())
    ssh.connect(ip_addr, 22, username='luoshi', password='luoshi2019')
    sftp = ssh.open_sftp()
    sftp.put(prj_file, '/tmp/target.zip')
    cmd = 'cd /tmp; rm -rf target/; mkdir target; unzip -d target/ -q target.zip; '
    cmd += 'chmod 777 -R target/; rm target.zip'
    ssh.exec_command(cmd)
    cmd = 'sudo rm -rf /home/luoshi/bin/controller/projects/target; '
    cmd += 'sudo mv /tmp/target/ /home/luoshi/bin/controller/projects/'
    stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
    stdin.write('luoshi2019' + '\n')
    stdout.read().decode()  # the streams must be read for the command to execute properly
    stderr.read().decode()
    cmd = 'cd /home/luoshi/bin/controller/; '
    cmd += 'sudo chmod -R 755 projects; rm /tmp/*.prj; sudo mv projects/target/_build/*.prj /tmp; cd /tmp; '
    cmd += 'prj=($(ls *.prj)); sudo mv ${prj[0]} /home/luoshi/bin/controller/projects/target/_build/target.prj; '
    stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
    stdin.write('luoshi2019' + '\n')
    stdout.read().decode()  # the streams must be read for the command to execute properly
    stderr.read().decode()
    ssh.close()


def execution(cmd, hr, w2t, tab_name, **kwargs):
    # Send a command through the HMI request handler `hr` and return the parsed payload.
    _id = hr.execution(cmd, **kwargs)
    _msg = hr.get_from_id(_id)
    if not _msg:
        w2t(f"Could not get a response for request {_id}", 0, 6, 'red', tab_name)
    else:
        return eval(_msg.split('#')[2])  # the third '#'-separated field is evaluated as a Python literal
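

# Minimal self-test sketch, only run when the module is executed directly. This is an
# assumption-laden example for local debugging: the '.' path and the _w2t stand-in below
# are hypothetical and not part of the production HMI.
if __name__ == '__main__':
    def _w2t(msg, *args):
        # Stand-in for the HMI's w2t() text-output callback: just log to the console.
        log_debug.debug('w2t%s: %s', args, msg)

    # traversal_files() returns (directories, files) directly under the given path.
    dirs, files = traversal_files('.', _w2t)
    log_debug.debug('dirs=%d files=%d', len(dirs), len(files))

    # GetThreadResult runs a callable in a background thread and exposes its return
    # value through get_result(), which joins the thread before returning.
    worker = GetThreadResult(len, (files,))
    worker.start()
    log_debug.debug('file count from worker thread: %s', worker.get_result())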