scripts/rokae/aio/current.py
gitea e11dc60438 [modify]
v0.0.6(2024/05/23)
1. 为了调整多功能框架,aio.py文件将会作为入口程序存在,不实现具体功能,功能的实现将由具体的功能脚本实现,aio.py只负责条件调用
2. 新增了自动化处理电流数据(电机/过载)的功能
2024-05-23 17:35:34 +08:00

104 lines
3.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import openpyxl
import os
import sys
import pandas
def warn_pause_exit(msg, pause_num, exit_num):
# 功能:打印告警信息,并推出程序
# 参数:告警信息,暂停的次数,退出的值
# 返回值:-
print(msg + '\n')
for i in range(pause_num):
_ = input("Press ENTER to continue......\n")
sys.exit(exit_num)
def traversal_files(path):
# 功能:以列表的形式分别返回指定路径下的文件和文件夹,不包含子目录
# 参数:路径
# 返回值:路径下的文件夹列表 路径下的文件列表
if not os.path.exists(path):
msg = f'数据文件夹{path}不存在,请确认后重试......'
warn_pause_exit(msg, 1, 11)
else:
dirs = []
files = []
for item in os.scandir(path):
if item.is_dir():
dirs.append(item.path)
elif item.is_file():
files.append(item.path)
return dirs, files
def initialization():
try:
# read init configurations from config file
wb_conf = openpyxl.load_workbook('./configs.xlsx', read_only=True)
ws_conf = wb_conf['current']
except Exception as Err:
msg = "无法在当前路径下找到或打开【configs.xlsx】文件请确认"
warn_pause_exit(msg, 1, 2)
FUNC = ws_conf.cell(row=2, column=2).value
DATA_DIR = ws_conf.cell(row=3, column=2).value
AXIS = int(ws_conf.cell(row=4, column=2).value)
RC = ws_conf.cell(row=5, column=2).value.split(',')
COLUMN = int(ws_conf.cell(row=6, column=2).value)
wb_conf.close()
_, data_files = traversal_files(DATA_DIR)
for data_file in data_files:
if not data_file.endswith('.data'):
msg = f"所有文件必须以 .data 结尾,请检查后重新运行。"
warn_pause_exit(msg, 1, 1)
return data_files, FUNC, RC, COLUMN, AXIS
def current_max(*args):
data_files, FUNC, RC, COLUMN, AXIS = args
for data_file in data_files:
df = pandas.read_csv(data_file, sep='\t')
col = df.columns.values[COLUMN-1]
c_max = df[col].max()
print(f"{data_file}: {c_max/1000*RC[AXIS-1]:.3f}")
msg = f"数据处理完毕......"
warn_pause_exit(msg, 1, 0)
def current_avg(*args):
data_files, FUNC, RC, COLUMN, AXIS = args
for data_file in data_files:
df = pandas.read_csv(data_file, sep='\t')
col = df.columns.values[COLUMN-1]
c_std = df[col].std()
c_avg = df[col].mean()
axis = int(data_file.split('\\')[-1].split('_')[0].replace('j', ''))
print(f"{data_file}: {(abs(c_avg)+c_std)/1000*float(RC[axis-1]):.3f}")
msg = f"数据处理完毕......"
warn_pause_exit(msg, 1, 0)
def full_cycle(*args):
pass
def main():
args = initialization()
if args[1] == 'max':
current_max(*args)
elif args[1] == 'avg':
current_avg(*args)
elif args[1] == 'cycle':
full_cycle(*args)
if __name__ == '__main__':
main()