This repository has been archived on 2025-02-25. You can view files and clone it, but cannot push or open issues or pull requests.
gitea 3010cb8931 v0.2.0.0(2024/07/17)
1. [profile: aio.py]
   - 增加velocity相关逻辑
   - 修改负载信息为曲线信息
2. [profile: factory_test.py]
   - 增加velocity相关逻辑
3. [profile: current.py]
   - 修正减速比获取的规则
4. [profile: openapi.py]
   - HmiRequest模块:日志取消记录move.monitor相关
   - HmiRequest模块:增加了durable_lock变量,控制文件读写互斥
2024-07-17 14:17:00 +08:00

415 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from openpyxl import load_workbook
from os import scandir
from os.path import exists
from sys import argv
from pandas import read_csv, concat, set_option
from re import match
from threading import Thread
from time import sleep
from csv import reader, writer
class GetThreadResult(Thread):
def __init__(self, func, args=()):
super(GetThreadResult, self).__init__()
self.func = func
self.args = args
self.result = 0
def run(self):
sleep(1)
self.result = self.func(*self.args)
def get_result(self):
Thread.join(self) # 等待线程执行完毕
try:
return self.result
except Exception as Err:
return None
def w2t_local(msg, wait, w2t):
while True:
global stop
if stop == 0 and wait != 0:
sleep(1)
w2t(msg, wait, 0, 'orange')
else:
break
def traversal_files(path, w2t):
# 功能:以列表的形式分别返回指定路径下的文件和文件夹,不包含子目录
# 参数:路径
# 返回值:路径下的文件夹列表 路径下的文件列表
if not exists(path):
msg = f'数据文件夹{path}不存在,请确认后重试......'
w2t(msg, 0, 8, 'red')
else:
dirs = []
files = []
for item in scandir(path):
if item.is_dir():
dirs.append(item.path)
elif item.is_file():
files.append(item.path)
return dirs, files
def initialization(path, sub, w2t):
_, data_files = traversal_files(path, w2t)
count = 0
for data_file in data_files:
filename = data_file.split('\\')[-1]
if data_file.endswith('configs.xlsx'):
count += 1
elif sub == 'cycle' and data_file.endswith('.xlsx'):
count += 1
else:
if not (match('j[1-7].*\\.data', filename) or match('j[1-7].*\\.csv', filename)):
print(f"不合规 {data_file}")
msg = f"所有文件必须以 jx_ 开头,以 .data/csv 结尾x取值1-7请检查后重新运行。"
w2t(msg, 0, 6, 'red')
if not ((sub == 'cycle' and count == 2) or (sub != 'cycle' and count == 1)):
w2t("使用max/avg功能时需要有配置文件表格使用cycle功能时需要有电机电流数据处理和配置文件两个表格确认后重新运行", 0, 5, 'red')
return data_files
def current_max(data_files, rcs, trq, w2t):
current = {1: [], 2: [], 3: [], 4: [], 5: [], 6: [], 7: []}
for data_file in data_files:
if data_file.endswith('.data'):
df = read_csv(data_file, sep='\t')
elif data_file.endswith('.csv'):
df = read_csv(data_file, sep=',', encoding='gbk', header=8)
else:
continue
cols = len(df.columns)
axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j'))
rca = rcs[axis-1]
col = df.columns.values[trq-1]
c_max = df[col].abs().max()
scale = 1 if data_file.endswith('.csv') else 1000
_ = abs(c_max/scale*rca)
current[axis].append(_)
w2t(f"{data_file}: {_:.4f}")
with open(data_file, 'a+') as f_data:
sep = '\t' if data_file.endswith('.data') else ','
csv_writer = writer(f_data, delimiter=sep)
csv_writer.writerow([''] * (cols-1) + [_])
for axis, cur in current.items():
if not cur:
continue
else:
w2t(f"{axis}轴数据:", 1, 0, 'purple')
for value in cur:
w2t(f"{value:.4f} ", 1, 0, 'purple')
w2t('')
w2t("\n【MAX】数据处理完毕......")
return current
def current_avg(data_files, rcs, trq, w2t):
current = {1: [], 2: [], 3: [], 4: [], 5: [], 6: [], 7: []}
for data_file in data_files:
if data_file.endswith('.data'):
df = read_csv(data_file, sep='\t')
elif data_file.endswith('.csv'):
df = read_csv(data_file, sep=',', encoding='gbk', header=8)
else:
continue
cols = len(df.columns)
axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j'))
rca = rcs[axis-1]
col = df.columns.values[trq-1]
c_std = df[col].std()
c_avg = df[col].mean()
scale = 1 if data_file.endswith('.csv') else 1000
_ = (abs(c_avg)+c_std*3)/scale*rca
current[axis].append(_)
w2t(f"{data_file}: {_:.4f}")
with open(data_file, 'a+') as f_data:
sep = '\t' if data_file.endswith('.data') else ','
csv_writer = writer(f_data, delimiter=sep)
csv_writer.writerow([''] * (cols-1) + [_])
for axis, cur in current.items():
if not cur:
continue
else:
w2t(f"{axis}轴数据:", 1, 0, 'purple')
for value in cur:
w2t(f"{value:.4f} ", 1, 0, 'purple')
w2t('')
w2t("\n【AVG】数据处理完毕......")
return current
def current_cycle(dur, data_files, rcs, rrs, vel, trq, trqh, rpms, w2t):
result = None
hold = []
single = []
for data_file in data_files:
filename = data_file.split('\\')[-1]
if data_file.endswith('.xlsx') and not data_file.endswith('configs.xlsx'):
result = data_file
elif match('j[1-7]_hold_.*\\.data', filename) or match('j[1-7]_hold_.*\\.csv', filename):
hold.append(data_file)
elif match('j[1-7]_.*\\.data', filename) or match('j[1-7]_.*\\.csv', filename):
single.append(data_file)
w2t(f"正在打开文件 {result},需要 10s 左右", 1, 0, 'orange')
global stop
stop = 0
t_excel = GetThreadResult(load_workbook, args=(result, ))
t_wait = Thread(target=w2t_local, args=('.', 1, w2t))
t_excel.start()
t_wait.start()
t_excel.join()
wb = t_excel.get_result()
stop = 1
sleep(1.1)
w2t('')
if hold != []:
avg = current_avg(hold, rcs, trqh, w2t)
for axis, cur_value in avg.items():
try:
shtname = f"J{axis}"
wb[shtname]["J4"].value = float(cur_value)
except:
pass
if dur == 0:
p_single(wb, single, vel, trq, rpms, rrs, w2t)
else:
p_scenario(wb, single, vel, trq, rpms, rrs, dur, w2t)
w2t(f"正在保存文件 {result},需要 10s 左右", 1, 0, 'orange')
stop = 0
t_excel = Thread(target=wb.save, args=(result, ))
t_wait = Thread(target=w2t_local, args=('.', 1, w2t))
t_excel.start()
t_wait.start()
t_excel.join()
stop = 1
sleep(1.1)
w2t('\n')
w2t("----------------------------------------------------------")
w2t("全部处理完毕")
def find_point(data_file, pos, flag, df, _row_s, _row_e, w2t, exitcode, threshold, step, end_point):
if flag == 'lt':
while _row_e > end_point:
speed_avg = df.iloc[_row_s:_row_e, 0].abs().mean()
if speed_avg < threshold:
_row_e -= step
_row_s -= step
continue
else:
return _row_s, _row_e
else:
w2t(f"[{pos}] {data_file}数据有误,需要检查,无法找到第{exitcode}个有效点...", 0, exitcode, 'red')
elif flag == 'gt':
while _row_e > end_point:
speed_avg = df.iloc[_row_s:_row_e, 0].abs().mean()
if speed_avg > threshold:
_row_e -= step
_row_s -= step
continue
else:
return _row_s, _row_e
else:
w2t(f"[{pos}] {data_file}数据有误,需要检查,无法找到有效起始点或结束点...", 0, exitcode, 'red')
def p_single(wb, single, vel, trq, rpms, rrs, w2t):
# 1. 先找到第一个速度为零的点,数据从后往前找,一开始就是零的情况不予考虑
# 2. 记录第一个点的位置,继续向前查找第二个速度为零的点,同理,一开始为零的点不予考虑
# 3. 记录第二个点的位置,并将其中的数据拷贝至对应位置
for data_file in single:
axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j'))
shtname = f"J{axis}"
rpm = rpms[axis-1] if data_file.endswith('.csv') else 1
scale = 1000 if data_file.endswith('.csv') else 1
ws = wb[shtname]
addition = 1
set_option("display.precision", 2)
if data_file.endswith('.data'):
df = read_csv(data_file, sep='\t')
rr = rrs[axis-1]
addition = 180 / 3.1415926 * 60 / 360 * rr
elif data_file.endswith('.csv'):
df = read_csv(data_file, sep=',', encoding='gbk', header=8)
csv_reader = reader(open(data_file))
i = 0
cycle = 0.001
for row in csv_reader:
i += 1
if i == 3:
cycle = float(row[0].split(':')[1].split('ms')[0]) / 1000
break
ws["H11"] = cycle
col_names = list(df.columns)
df_1 = df[col_names[vel-1]].multiply(rpm*addition)
df_2 = df[col_names[trq-1]].multiply(scale)
print(df_1.abs().max())
df = concat([df_1, df_2], axis=1)
_step = 5 if data_file.endswith('.csv') else 50
_end_point = 30 if data_file.endswith('.csv') else 200
_adjust = 0 if data_file.endswith('.csv') else 150
_row_e = df.index[-1]
_row_s = _row_e - _end_point
speed_avg = df.iloc[_row_s:_row_e, 0].abs().mean()
if speed_avg < 2:
# 过滤尾部为零无效数据
_row_s, _row_e = find_point(data_file, 'a1', 'lt', df, _row_s, _row_e, w2t, 1, threshold=5, step=_step, end_point=_end_point)
# 找到第一个起始点 row_end继续找到有数据的部分后面有一段有效数据区
row_end = _row_e - _adjust
_row_e -= _end_point
_row_s -= _end_point
_row_s, _row_e = find_point(data_file, 'a2', 'gt', df, _row_s, _row_e, w2t, 3, threshold=5, step=_step, end_point=_end_point)
# 速度已经快要降为零了,继续寻找下一个速度上升点
_row_e -= _end_point
_row_s -= _end_point
_row_s, _row_e = find_point(data_file, 'a3', 'lt', df, _row_s, _row_e, w2t, 3, threshold=5, step=_step, end_point=_end_point)
elif speed_avg > 2:
# 过滤尾部非零无效数据
_row_s, _row_e = find_point(data_file, 'b1', 'gt', df, _row_s, _row_e, w2t, 2, threshold=5, step=_step, end_point=_end_point)
# 找到第一个起始点 row_end继续找到有数据的部分后面有一段零数据区
row_end = _row_e - _adjust
_row_e -= _end_point
_row_s -= _end_point
_row_s, _row_e = find_point(data_file, 'b2', 'lt', df, _row_s, _row_e, w2t, 4, threshold=5, step=_step, end_point=_end_point)
# 目前已经有一点的速度值了,继续往前搜寻下一个速度为零的点
_row_e -= _end_point
_row_s -= _end_point
_row_s, _row_e = find_point(data_file, 'b3', 'gt', df, _row_s, _row_e, w2t, 4, threshold=5, step=_step, end_point=_end_point)
row_start = _row_s + _adjust
data = []
for row in range(row_start, row_end):
data.append(df.iloc[row, 0])
data.append(df.iloc[row, 1])
i = 0
for row in ws.iter_rows(min_row=2, min_col=2, max_row=150000, max_col=3):
for cell in row:
try:
_ = f"{data[i]:.2f}"
cell.value = float(_)
i += 1
except:
cell.value = None
def p_scenario(wb, single, vel, trq, rpms, rrs, dur, w2t):
for data_file in single:
cycle = 0.001
axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j'))
shtname = f"J{axis}"
rpm = rpms[axis-1] if data_file.endswith('.csv') else 1
scale = 1000 if data_file.endswith('.csv') else 1
ws = wb[shtname]
addition = 1
set_option("display.precision", 2)
if data_file.endswith('.data'):
df = read_csv(data_file, sep='\t')
rr = rrs[axis-1]
addition = 180 / 3.1415926 * 60 / 360 * rr
elif data_file.endswith('.csv'):
df = read_csv(data_file, sep=',', encoding='gbk', header=8)
csv_reader = reader(open(data_file))
i = 0
for row in csv_reader:
i += 1
if i == 3:
cycle = float(row[0].split(':')[1].split('ms')[0]) / 1000
break
ws["H11"] = cycle
col_names = list(df.columns)
df_1 = df[col_names[vel-1]].multiply(rpm*addition)
df_2 = df[col_names[trq-1]].multiply(scale)
df = concat([df_1, df_2], axis=1)
row_start = 300
row_end = row_start + int(dur/cycle)
if row_end > df.index[-1]:
w2t(f"位置超限:{data_file} 共有 {df.index[-1]} 条数据,无法取到第 {row_end} 条数据,需要确认场景周期时间...", 0, 9, 'red')
data = []
for row in range(row_start, row_end):
data.append(df.iloc[row, 0])
data.append(df.iloc[row, 1])
i = 0
for row in ws.iter_rows(min_row=2, min_col=2, max_row=150000, max_col=3):
for cell in row:
try:
_ = f"{data[i]:.2f}"
cell.value = float(_)
i += 1
except:
cell.value = None
def get_configs(configfile, w2t):
_wb = load_workbook(configfile, read_only=True)
_ws = _wb['Target']
rcs = []
rrs = []
rpms = []
for i in range(2, 9):
try:
rpms.append(float(_ws.cell(row=4, column=i).value))
except:
rpms.append(0.0)
try:
rcs.append(float(_ws.cell(row=6, column=i).value))
except:
rcs.append(0.0)
try:
rrs.append(float(_ws.cell(row=2, column=i).value))
except:
rrs.append(0.0)
return rpms, rcs, rrs
def main(path, sub, dur, vel, trq, trqh, w2t):
data_files = initialization(path, sub, w2t)
rpms, rcs, rrs = get_configs(path + '\\configs.xlsx', w2t)
if sub == 'max':
current_max(data_files, rcs, trq, w2t)
elif sub == 'avg':
current_avg(data_files, rcs, trq, w2t)
elif sub == 'cycle':
current_cycle(dur, data_files, rcs, rrs, vel, trq, trqh, rpms, w2t)
else:
pass
if __name__ == '__main__':
stop = 0
main(*argv[1:])