from sys import argv from os import scandir from os.path import exists import paramiko import json def traversal_files(path, w2t): # 功能:以列表的形式分别返回指定路径下的文件和文件夹,不包含子目录 # 参数:路径 # 返回值:路径下的文件夹列表 路径下的文件列表 if not exists(path): msg = f'数据文件夹{path}不存在,请确认后重试......' w2t(msg, 0, 1, 'red') else: dirs = [] files = [] for item in scandir(path): if item.is_dir(): dirs.append(item.path) elif item.is_file(): files.append(item.path) return dirs, files def check_files(data_dirs, data_files, w2t): if len(data_dirs) != 0 or len(data_files) != 5: w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red') w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 1, 'red') config_file = reach33 = reach66 = reach100 = prj_file = None for data_file in data_files: filename = data_file.split('\\')[-1] if filename == 'configs.xlsx': config_file = data_file elif filename.startswith('reach33_') and filename.endswith('.xlsx'): reach33 = data_file elif filename.startswith('reach66_') and filename.endswith('.xlsx'): reach66 = data_file elif filename.startswith('reach100_') and filename.endswith('.xlsx'): reach100 = data_file elif filename.endswith('.zip'): prj_file = data_file else: w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red') w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 2, 'red') if config_file and reach33 and reach66 and reach100 and prj_file: w2t("数据目录合规性检查结束,未发现问题......") return config_file, reach33, reach66, reach100, prj_file else: w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red') w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 1, 'red') def prj_to_xcore(prj_file): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019') sftp = ssh.open_sftp() # stdin, stdout, stderr = ssh.exec_command('rm /tmp/target.zip') # ssh.exec_command('rm /tmp/target.zip') sftp.put(prj_file, '/tmp/target.zip') cmd = 'cd /tmp; ' cmd += 'rm -rf target/; ' cmd += 'mkdir target; ' cmd += 'unzip -d target/ -q target.zip; ' cmd += 'rm target.zip; ' ssh.exec_command(cmd) cmd = 'sudo rm -rf /home/luoshi/bin/controller/projects/target; ' cmd += 'sudo mv /tmp/target/ /home/luoshi/bin/controller/projects/' stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True) stdin.write('luoshi2019' + '\n') stdin.flush() print(stdout.read().decode()) # 必须得输出一下stdout,才能正确执行sudo print(stderr.read().decode()) # 顺便也执行以下stderr ssh.close() def modify_prj(): pass def validate_resp(_id, response, w2t): match _id: case 'DATA ERR': w2t(f"数据处理错误,需要确认", 0, 4, 'red') case 'DATA READ ERR': w2t(f"无法读取数据,需要确认", 0, 3, 'red') case 'NOT SUPPORT': w2t(f"不支持的功能,需要确认", 0, 2, 'red') if not response: w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red') def execution(cmd, hr, w2t, **kwargs): _id = hr.excution(cmd, **kwargs) _msg = hr.get_from_id(_id) if not _msg: w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red') else: _response = json.loads(_msg) validate_resp(_id, _response, w2t) return _response def run_rl(hr, w2t): prj_path = '/home/luoshi/bin/controller/projects/target' _response = execution('overview.set_autoload', hr, w2t, autoload_prj_path=prj_path) print(f"set prj auto load: {_response}") _response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['Durable_Test_Com', 'Mechanical_Test_Com']) print(f"reload prj: {_response}") _response = execution('overview.get_cur_prj', hr, w2t) print(f"get prj name: {_response}") _response = execution('overview.get_autoload', hr, w2t) print(f"get auto load: {_response}") def main(path, hr, loadsel, w2t): data_dirs, data_files = traversal_files(path, w2t) config_file, reach33, reach66, reach100, prj_file = check_files(data_dirs, data_files, w2t) prj_to_xcore(prj_file) run_rl(hr, w2t) if __name__ == '__main__': main(*argv[1:])