v0.0.6(2024/05/23) 1. 为了调整多功能框架,aio.py文件将会作为入口程序存在,不实现具体功能,功能的实现将由具体的功能脚本实现,aio.py只负责条件调用 2. 新增了自动化处理电流数据(电机/过载)的功能
104 lines
3.1 KiB
Python
104 lines
3.1 KiB
Python
import openpyxl
|
||
import os
|
||
import sys
|
||
import pandas
|
||
|
||
|
||
def warn_pause_exit(msg, pause_num, exit_num):
|
||
# 功能:打印告警信息,并推出程序
|
||
# 参数:告警信息,暂停的次数,退出的值
|
||
# 返回值:-
|
||
print(msg + '\n')
|
||
for i in range(pause_num):
|
||
_ = input("Press ENTER to continue......\n")
|
||
sys.exit(exit_num)
|
||
|
||
|
||
def traversal_files(path):
|
||
# 功能:以列表的形式分别返回指定路径下的文件和文件夹,不包含子目录
|
||
# 参数:路径
|
||
# 返回值:路径下的文件夹列表 路径下的文件列表
|
||
if not os.path.exists(path):
|
||
msg = f'数据文件夹{path}不存在,请确认后重试......'
|
||
warn_pause_exit(msg, 1, 11)
|
||
else:
|
||
dirs = []
|
||
files = []
|
||
for item in os.scandir(path):
|
||
if item.is_dir():
|
||
dirs.append(item.path)
|
||
elif item.is_file():
|
||
files.append(item.path)
|
||
|
||
return dirs, files
|
||
|
||
|
||
def initialization():
|
||
try:
|
||
# read init configurations from config file
|
||
wb_conf = openpyxl.load_workbook('./configs.xlsx', read_only=True)
|
||
ws_conf = wb_conf['current']
|
||
except Exception as Err:
|
||
msg = "无法在当前路径下找到或打开【configs.xlsx】文件,请确认!"
|
||
warn_pause_exit(msg, 1, 2)
|
||
|
||
FUNC = ws_conf.cell(row=2, column=2).value
|
||
DATA_DIR = ws_conf.cell(row=3, column=2).value
|
||
AXIS = int(ws_conf.cell(row=4, column=2).value)
|
||
RC = ws_conf.cell(row=5, column=2).value.split(',')
|
||
COLUMN = int(ws_conf.cell(row=6, column=2).value)
|
||
wb_conf.close()
|
||
|
||
_, data_files = traversal_files(DATA_DIR)
|
||
for data_file in data_files:
|
||
if not data_file.endswith('.data'):
|
||
msg = f"所有文件必须以 .data 结尾,请检查后重新运行。"
|
||
warn_pause_exit(msg, 1, 1)
|
||
|
||
return data_files, FUNC, RC, COLUMN, AXIS
|
||
|
||
|
||
def current_max(*args):
|
||
data_files, FUNC, RC, COLUMN, AXIS = args
|
||
for data_file in data_files:
|
||
df = pandas.read_csv(data_file, sep='\t')
|
||
col = df.columns.values[COLUMN-1]
|
||
c_max = df[col].max()
|
||
print(f"{data_file}: {c_max/1000*RC[AXIS-1]:.3f}")
|
||
|
||
msg = f"数据处理完毕......"
|
||
warn_pause_exit(msg, 1, 0)
|
||
|
||
|
||
def current_avg(*args):
|
||
data_files, FUNC, RC, COLUMN, AXIS = args
|
||
for data_file in data_files:
|
||
df = pandas.read_csv(data_file, sep='\t')
|
||
col = df.columns.values[COLUMN-1]
|
||
c_std = df[col].std()
|
||
c_avg = df[col].mean()
|
||
|
||
axis = int(data_file.split('\\')[-1].split('_')[0].replace('j', ''))
|
||
print(f"{data_file}: {(abs(c_avg)+c_std)/1000*float(RC[axis-1]):.3f}")
|
||
|
||
msg = f"数据处理完毕......"
|
||
warn_pause_exit(msg, 1, 0)
|
||
|
||
|
||
def full_cycle(*args):
|
||
pass
|
||
|
||
|
||
def main():
|
||
args = initialization()
|
||
if args[1] == 'max':
|
||
current_max(*args)
|
||
elif args[1] == 'avg':
|
||
current_avg(*args)
|
||
elif args[1] == 'cycle':
|
||
full_cycle(*args)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|