This repository has been archived on 2025-02-25. You can view files and clone it, but cannot push or open issues or pull requests.
gitea 25fc43be81 优化 ssh 输入密码的部分
create_plot 函数中增加 close('all'),解决循环画图不销毁占用内存的问题
2024-07-31 11:21:24 +08:00

130 lines
4.7 KiB
Python

from os import scandir
from threading import Thread
from time import sleep
from os.path import exists
from paramiko import SSHClient, AutoAddPolicy
from socket import setdefaulttimeout
from logging import DEBUG, INFO, WARNING, ERROR, CRITICAL, Formatter, StreamHandler, basicConfig
from concurrent_log_handler import ConcurrentRotatingFileHandler
ip_addr = '192.168.0.160'
RADIAN = 57.3 # 180 / 3.1415926
MAX_FRAME_SIZE = 1024
TIMEOUT = 5
setdefaulttimeout(TIMEOUT)
tab_names = {'dp': 'Data Process', 'at': 'Automatic Test', 'da': 'Duration Action', 'op': 'openapi'}
# PREFIX = '' # for pyinstaller packaging
PREFIX = '../assets/' # for source code debug
log_data = f'{PREFIX}templates/c_msg.log'
heartbeat = f'{PREFIX}templates/heartbeat'
durable_data_current_xlsx = f'{PREFIX}templates/durable/durable_data_current.xlsx'
durable_data_current_max_xlsx = f'{PREFIX}templates/durable/durable_data_current_max.xlsx'
durable_data_current = {
'time': list(range(1, 19)),
'axis1': [0 for _ in range(18)],
'axis2': [0 for _ in range(18)],
'axis3': [0 for _ in range(18)],
'axis4': [0 for _ in range(18)],
'axis5': [0 for _ in range(18)],
'axis6': [0 for _ in range(18)],
}
durable_data_current_max = {
'time': list(range(1, 19)),
'axis1': [0 for _ in range(18)],
'axis2': [0 for _ in range(18)],
'axis3': [0 for _ in range(18)],
'axis4': [0 for _ in range(18)],
'axis5': [0 for _ in range(18)],
'axis6': [0 for _ in range(18)],
}
fmt = Formatter('%(asctime)s # %(levelname)s-%(filename)s-%(funcName)s # %(message)s')
# file_handler = logging.FileHandler(log_data)
# file_handler = RotatingFileHandler(filename=log_data, backupCount=10, maxBytes=50*1024*1024, encoding='utf-8')
file_handler = ConcurrentRotatingFileHandler(filename=log_data, backupCount=10, maxBytes=50*1024*1024, encoding='utf-8')
file_handler.setFormatter(fmt)
file_handler.setLevel(INFO)
console_handler = StreamHandler()
console_handler.setFormatter(fmt)
console_handler.setLevel(ERROR)
# basicConfig(level=WARNING, # for product
basicConfig(level=WARNING,
datefmt='%Y-%m-%dT%H:%M:%S',
# handlers=[file_handler]) # for product
handlers=[file_handler, console_handler])
class GetThreadResult(Thread):
def __init__(self, func, args=()):
super(GetThreadResult, self).__init__()
self.func = func
self.args = args
self.result = 0
def run(self):
sleep(1)
self.result = self.func(*self.args)
def get_result(self):
Thread.join(self) # 等待线程执行完毕
try:
return self.result
except Exception as Err:
return None
def traversal_files(path, w2t):
# 功能:以列表的形式分别返回指定路径下的文件和文件夹,不包含子目录
# 参数:路径
# 返回值:路径下的文件夹列表 路径下的文件列表
if not exists(path):
msg = f'数据文件夹{path}不存在,请确认后重试......'
w2t(msg, 0, 1, 'red')
else:
dirs = []
files = []
for item in scandir(path):
if item.is_dir():
dirs.append(item.path)
elif item.is_file():
files.append(item.path)
return dirs, files
def prj_to_xcore(prj_file):
ssh = SSHClient()
ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect(ip_addr, 22, username='luoshi', password='luoshi2019')
sftp = ssh.open_sftp()
sftp.put(prj_file, '/tmp/target.zip')
cmd = 'cd /tmp; rm -rf target/; mkdir target; unzip -d target/ -q target.zip; '
cmd += 'chmod 777 -R target/; rm target.zip'
ssh.exec_command(cmd)
cmd = 'sudo rm -rf /home/luoshi/bin/controller/projects/target; '
cmd += 'sudo mv /tmp/target/ /home/luoshi/bin/controller/projects/'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdout.read().decode() # 需要read一下才能正常执行
stderr.read().decode()
cmd = 'cd /home/luoshi/bin/controller/; '
cmd += 'sudo chmod -R 755 projects; rm /tmp/*.prj; sudo mv projects/target/_build/*.prj /tmp; cd /tmp; '
cmd += 'prj=($(ls *.prj)); sudo mv ${prj[0]} /home/luoshi/bin/controller/projects/target/_build/target.prj; '
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdout.read().decode() # 需要read一下才能正常执行
stderr.read().decode()
ssh.close()
def execution(cmd, hr, w2t, tab_name, **kwargs):
_id = hr.execution(cmd, **kwargs)
_msg = hr.get_from_id(_id)
if not _msg:
w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name)
else:
return eval(_msg.split('#')[2])