[t_change_ui: aio.py/brake.py/current.py] 整体修改了操作界面,删除了大部分的配置输入框,改用 configs.xlsx 配置文件替代,并优化了max/avg功能中写入结果数据的方式

This commit is contained in:
2024-06-28 09:47:59 +08:00
parent 79797a3bdd
commit 61fa840e53
7 changed files with 253 additions and 227 deletions

View File

@ -27,78 +27,46 @@ class GetThreadResult(Thread):
return None
def data_process(result_file, raw_data_dirs, av, rr, axis, vel, trq, w2t, estop):
# 功能:完成一个结果文件的数据处理
# 参数:结果文件,数据目录,以及预读取的参数
# 返回值:-
file_name = result_file.split('\\')[-1]
w2t(f"正在打开文件 {file_name} 需要 1min 左右", 1, 0, 'orange')
def traversal_files(path, w2t):
# 功能:以列表的形式分别返回指定路径下的文件和文件夹,不包含子目录
# 参数:路径
# 返回值:路径下的文件夹列表 路径下的文件列表
if not exists(path):
msg = f'数据文件{path}不存在,请确认后重试......'
w2t(msg, 0, 1, 'red')
else:
dirs = []
files = []
for item in scandir(path):
if item.is_dir():
dirs.append(item.path)
elif item.is_file():
files.append(item.path)
global stop
stop = 0
t_excel = GetThreadResult(load_workbook, args=(result_file, ))
t_wait = Thread(target=w2t_local, args=('.', 1, w2t))
t_excel.start()
t_wait.start()
t_excel.join()
wb_result = t_excel.get_result()
stop = 1
sleep(1.1)
w2t('')
prefix = result_file.split('\\')[-1].split('_')[0]
for raw_data_dir in raw_data_dirs:
if raw_data_dir.split('\\')[-1].split('_')[0] == prefix:
now_doing_msg(raw_data_dir, 'start', w2t)
_, data_files = traversal_files(raw_data_dir, w2t)
# 数据文件串行处理模式---------------------------------
# count = 1
# for data_file in data_files:
# now_doing_msg(data_file, 'start', w2t)
# single_file_process(data_file, wb_result, count, av, rr, axis, vel, trq, w2t, estop)
# count += 1
# now_doing_msg(data_file, 'done', w2t)
# ---------------------------------------------------
# 数据文件并行处理模式---------------------------------
threads = [Thread(target=single_file_process, args=(data_files[0], wb_result, 1, av, rr, axis, vel, trq, w2t, estop)),
Thread(target=single_file_process, args=(data_files[1], wb_result, 2, av, rr, axis, vel, trq, w2t, estop)),
Thread(target=single_file_process, args=(data_files[2], wb_result, 3, av, rr, axis, vel, trq, w2t, estop))]
[t.start() for t in threads]
[t.join() for t in threads]
# ---------------------------------------------------
now_doing_msg(raw_data_dir, 'done', w2t)
now_doing_msg(result_file, 'done', w2t)
w2t(f"正在保存文件 {file_name} 需要 1min 左右", 1, 0, 'orange')
stop = 0
t_excel = Thread(target=wb_result.save, args=(result_file, ))
t_wait = Thread(target=w2t_local, args=('.', 1, w2t))
t_excel.start()
t_wait.start()
t_excel.join()
stop = 1
sleep(1.1)
w2t('\n')
return dirs, files
def check_files(raw_data_dirs, result_files, w2t):
def check_files(path, raw_data_dirs, result_files, w2t):
# 功能:检查数据文件以及结果文件的合规性
# 参数:数据文件夹,结果文件
# 返回值:-
if len(result_files) != 3:
msg = "结果文件数目错误,结果文件有且只有三个,请确认!"
if len(result_files) != 4:
for result_file in result_files:
w2t(result_file)
w2t(msg, 0, 2, 'red')
w2t("需要有四个文件,包括三个结果文件,以及一个配置文件,请确认!", 0, 2, 'red')
for result_file in result_files:
if result_file.endswith('configs.xlsx'):
result_files.remove(result_file)
break
else:
w2t("未找到配置文件,请确认!", 0, 8, 'red')
prefix = []
for result_file in result_files:
prefix.append(result_file.split('\\')[-1].split('_')[0])
if not sorted(prefix) == sorted(['reach33', 'reach66', 'reach100']):
wd = result_files[0].split('\\')
del wd[-1]
wd = '\\'.join(wd)
msg = f"""请关闭所有相关数据文件,并检查工作目录 {wd} 下,有且只允许有类似如下三个文件:
msg = f"""请关闭所有相关数据文件,并检查工作目录 {path} 下,有且只允许有类似如下三个文件:
1. reach33_XXX制动性能测试.xlsx
2. reach66_XXX制动性能测试.xlsx
3. reach100_XX制动性能测试.xlsx"""
@ -127,6 +95,20 @@ def check_files(raw_data_dirs, result_files, w2t):
w2t("数据目录合规性检查结束,未发现问题......")
def get_configs(configfile, w2t):
axis = configfile.split('\\')[-2][-1]
if axis not in ['1', '2', '3']:
w2t("被处理的根文件夹命名必须是 [Jj][123] 的格式", 0, 9, 'red')
else:
axis = int(axis)
_wb = load_workbook(configfile, read_only=True)
_ws = _wb['Target']
rr = float(_ws.cell(row=2, column=axis+1).value)
av = float(_ws.cell(row=3, column=axis+1).value)
return av, rr
def now_doing_msg(docs, flag, w2t):
# 功能:输出正在处理的文件或目录
# 参数文件或目录start 或 done 标识
@ -153,20 +135,6 @@ def w2t_local(msg, wait, w2t):
break
def single_file_process(data_file, wb_result, count, av, rr, axis, vel, trq, w2t, estop):
# 功能:完成单个数据文件的处理
# 参数:如上
# 返回值:-
df = read_csv(data_file, sep='\t')
conditions = sorted(data_file.split('\\')[-2].split('_')) # ['loadxx', 'reachxx', 'speedxx']
result_sheet_name = find_result_sheet_name(conditions, count)
ws_result = wb_result[result_sheet_name]
row_start, row_end = find_row_start(data_file, df, conditions, av, rr, axis, vel, w2t, estop)
copy_data_to_result(df, ws_result, row_start, row_end, vel, trq, estop)
def copy_data_to_result(df, ws_result, row_start, row_end, vel, trq, estop):
# 功能:将数据文件中有效数据拷贝至结果文件对应的 sheet
# 参数:如上
@ -193,26 +161,13 @@ def copy_data_to_result(df, ws_result, row_start, row_end, vel, trq, estop):
ws_result.cell(row=_row, column=3).value = None
def find_result_sheet_name(conditions, count):
# 功能获取结果文件准确的sheet页名称
# 参数:臂展和速度的列表
# 返回值结果文件对应的sheet name
# 33%负载_33%速度_1 - ['loadxx', 'reachxx', 'speedxx']
load = conditions[0].removeprefix('load')
speed = conditions[2].removeprefix('speed')
result_sheet_name = f"{load}%负载_{speed}%速度_{count}"
return result_sheet_name
def find_row_start(data_file, df, conditions, av, rr, axis, vel, w2t, estop):
def find_row_start(data_file, df, conditions, av, rr, vel, estop, w2t):
# 功能:查找数据文件中有效数据的行号,也即最后一个速度下降的点位
# 参数:如上
# 返回值:速度下降点位,最后的数据点位
ratio = float(conditions[2].removeprefix('speed'))/100
av_max = av * ratio
row_max = df.index[-1]
# threshold = 30 if axis == 2 and conditions[0].removeprefix('load') == '100' else 10
threshold = 0.95
for _row in range(row_max, -1, -1):
@ -238,26 +193,89 @@ def find_row_start(data_file, df, conditions, av, rr, axis, vel, w2t, estop):
return row_start, row_end
def traversal_files(path, w2t):
# 功能:以列表的形式分别返回指定路径下的文件和文件夹,不包含子目录
# 参数:路径
# 返回值:路径下的文件夹列表 路径下的文件列表
if not exists(path):
msg = f'数据文件夹{path}不存在,请确认后重试......'
w2t(msg, 0, 1, 'red')
else:
dirs = []
files = []
for item in scandir(path):
if item.is_dir():
dirs.append(item.path)
elif item.is_file():
files.append(item.path)
def find_result_sheet_name(conditions, count):
# 功能:获取结果文件准确的sheet页名称
# 参数:臂展和速度的列表
# 返回值:结果文件对应的sheet name
# 33%负载_33%速度_1 - ['loadxx', 'reachxx', 'speedxx']
load = conditions[0].removeprefix('load')
speed = conditions[2].removeprefix('speed')
result_sheet_name = f"{load}%负载_{speed}%速度_{count}"
return dirs, files
return result_sheet_name
def main(path, av, rr, axis, vel, trq, estop, w2t):
def single_file_process(data_file, wb_result, count, av, rr, vel, trq, estop, w2t):
# 功能:完成单个数据文件的处理
# 参数:如上
# 返回值:-
df = read_csv(data_file, sep='\t')
conditions = sorted(data_file.split('\\')[-2].split('_')) # ['loadxx', 'reachxx', 'speedxx']
result_sheet_name = find_result_sheet_name(conditions, count)
ws_result = wb_result[result_sheet_name]
row_start, row_end = find_row_start(data_file, df, conditions, av, rr, vel, estop, w2t)
copy_data_to_result(df, ws_result, row_start, row_end, vel, trq, estop)
def data_process(result_file, raw_data_dirs, av, rr, vel, trq, estop, w2t):
# 功能:完成一个结果文件的数据处理
# 参数:结果文件,数据目录,以及预读取的参数
# 返回值:-
file_name = result_file.split('\\')[-1]
w2t(f"正在打开文件 {file_name} 需要 1min 左右", 1, 0, 'orange')
global stop
stop = 0
t_excel = GetThreadResult(load_workbook, args=(result_file, ))
t_wait = Thread(target=w2t_local, args=('.', 1, w2t))
t_excel.start()
t_wait.start()
t_excel.join()
wb_result = t_excel.get_result()
stop = 1
sleep(1.1)
w2t('')
prefix = result_file.split('\\')[-1].split('_')[0]
for raw_data_dir in raw_data_dirs:
if raw_data_dir.split('\\')[-1].split('_')[0] == prefix:
now_doing_msg(raw_data_dir, 'start', w2t)
_, data_files = traversal_files(raw_data_dir, w2t)
# 数据文件串行处理模式---------------------------------
# count = 1
# for data_file in data_files:
# now_doing_msg(data_file, 'start', w2t)
# single_file_process(data_file, wb_result, count, av, rr, vel, trq, estop, w2t)
# count += 1
# now_doing_msg(data_file, 'done', w2t)
# ---------------------------------------------------
# 数据文件并行处理模式---------------------------------
threads = [
Thread(target=single_file_process, args=(data_files[0], wb_result, 1, av, rr, vel, trq, estop, w2t)),
Thread(target=single_file_process, args=(data_files[1], wb_result, 2, av, rr, vel, trq, estop, w2t)),
Thread(target=single_file_process, args=(data_files[2], wb_result, 3, av, rr, vel, trq, estop, w2t))
]
[t.start() for t in threads]
[t.join() for t in threads]
# ---------------------------------------------------
now_doing_msg(raw_data_dir, 'done', w2t)
now_doing_msg(result_file, 'done', w2t)
w2t(f"正在保存文件 {file_name} 需要 1min 左右", 1, 0, 'orange')
stop = 0
t_excel = Thread(target=wb_result.save, args=(result_file, ))
t_wait = Thread(target=w2t_local, args=('.', 1, w2t))
t_excel.start()
t_wait.start()
t_excel.join()
stop = 1
sleep(1.1)
w2t('\n')
def main(path, vel, trq, estop, w2t):
# 功能:执行处理所有数据文件
# 参数initialization函数的返回值
# 返回值:-
@ -266,7 +284,8 @@ def main(path, av, rr, axis, vel, trq, estop, w2t):
try:
# threads = []
check_files(raw_data_dirs, result_files, w2t)
check_files(path, raw_data_dirs, result_files, w2t)
av, rr = get_configs(path + '\\configs.xlsx', w2t)
prefix = []
for raw_data_dir in raw_data_dirs:
@ -277,8 +296,8 @@ def main(path, av, rr, axis, vel, trq, estop, w2t):
continue
else:
now_doing_msg(result_file, 'start', w2t)
data_process(result_file, raw_data_dirs, av, rr, axis, vel, trq, w2t, estop)
# threads.append(Thread(target=data_process, args=(result_file, raw_data_dirs, AV, RR, RC, AXIS)))
data_process(result_file, raw_data_dirs, av, rr, vel, trq, estop, w2t)
# threads.append(Thread(target=data_process, args=(result_file, raw_data_dirs, av, rr, vel, trq, estop, w2t)))
# [t.start() for t in threads]
# [t.join() for t in threads]
except Exception as Err:
@ -295,4 +314,4 @@ def main(path, av, rr, axis, vel, trq, estop, w2t):
if __name__ == "__main__":
stop = 0
main(path=argv[1], av=argv[2], rr=argv[3], axis=argv[4], vel=argv[5], trq=argv[6], estop=argv[7], w2t=argv[8])
main(*argv[1:])

View File

@ -63,19 +63,18 @@ def initialization(path, sub, w2t):
for data_file in data_files:
filename = data_file.split('\\')[-1]
if sub != 'cycle':
if data_file.endswith('configs.xlsx'):
count += 1
elif sub == 'cycle' and data_file.endswith('.xlsx'):
count += 1
else:
if not (match('j[1-7].*\\.data', filename) or match('j[1-7].*\\.csv', filename)):
print(f"不合规 {data_file}")
msg = f"所有文件必须以 jx_ 开头,以 .data/csv 结尾x取值1-7请检查后重新运行。"
w2t(msg, 0, 6, 'red')
else:
if filename.endswith('.xlsx'):
count += 1
elif not (match('j[1-7].*\\.data', filename) or match('j[1-7].*\\.csv', filename)):
msg = f"所有文件必须以 jx_ 开头,以 .data/csv 结尾x取值1-7请检查后重新运行。"
w2t(msg, 0, 7, 'red')
if sub == 'cycle' and count != 1:
w2t("未找到电机电流数据处理excel表格,确认后重新运行!", 0, 5, 'red')
if not ((sub == 'cycle' and count == 2) or (sub != 'cycle' and count == 1)):
w2t("使用max/avg功能时需要有配置文件表格使用cycle功能时需要有电机电流数据处理和配置文件两个表格,确认后重新运行!", 0, 5, 'red')
return data_files
@ -87,7 +86,10 @@ def current_max(data_files, rcs, trqh, w2t):
df = read_csv(data_file, sep='\t')
elif data_file.endswith('.csv'):
df = read_csv(data_file, sep=',', encoding='gbk', header=8)
else:
continue
cols = len(df.columns)
axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j'))
rca = rcs[axis-1]
@ -100,9 +102,9 @@ def current_max(data_files, rcs, trqh, w2t):
w2t(f"{data_file}: {_:.4f}")
with open(data_file, 'a+') as f_data:
csv_writer = writer(f_data)
csv_writer.writerow([''] * 4)
csv_writer.writerow([_])
sep = '\t' if data_file.endswith('.data') else ','
csv_writer = writer(f_data, delimiter=sep)
csv_writer.writerow([''] * (cols-1) + [_])
for axis, cur in current.items():
if not cur:
@ -123,7 +125,10 @@ def current_avg(data_files, rcs, trqh, w2t):
df = read_csv(data_file, sep='\t')
elif data_file.endswith('.csv'):
df = read_csv(data_file, sep=',', encoding='gbk', header=8)
else:
continue
cols = len(df.columns)
axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j'))
rca = rcs[axis-1]
@ -137,9 +142,9 @@ def current_avg(data_files, rcs, trqh, w2t):
w2t(f"{data_file}: {_:.4f}")
with open(data_file, 'a+') as f_data:
csv_writer = writer(f_data)
csv_writer.writerow([''] * 4)
csv_writer.writerow([_])
sep = '\t' if data_file.endswith('.data') else ','
csv_writer = writer(f_data, delimiter=sep)
csv_writer.writerow([''] * (cols-1) + [_])
for axis, cur in current.items():
if not cur:
@ -153,17 +158,17 @@ def current_avg(data_files, rcs, trqh, w2t):
return current
def current_cycle(dur, data_files, rcs, vel, trq, trqh, rpm, w2t):
def current_cycle(dur, data_files, rcs, vel, trq, trqh, rpms, w2t):
result = None
hold = []
single = []
for data_file in data_files:
filename = data_file.split('\\')[-1]
if data_file.endswith('.xlsx'):
if data_file.endswith('.xlsx') and not data_file.endswith('configs.xlsx'):
result = data_file
elif match('j[1-7]_hold_.*\\.data', filename) or match('j[1-7]_hold_.*\\.csv', filename):
hold.append(data_file)
else:
elif match('j[1-7]_.*\\.data', filename) or match('j[1-7]_.*\\.csv', filename):
single.append(data_file)
w2t(f"正在打开文件 {result},需要 10s 左右", 1, 0, 'orange')
@ -189,9 +194,9 @@ def current_cycle(dur, data_files, rcs, vel, trq, trqh, rpm, w2t):
pass
if dur == 0:
p_single(wb, single, vel, trq, rpm, w2t)
p_single(wb, single, vel, trq, rpms, w2t)
else:
p_scenario(wb, single, vel, trq, rpm, dur, w2t)
p_scenario(wb, single, vel, trq, rpms, dur, w2t)
w2t(f"正在保存文件 {result},需要 10s 左右", 1, 0, 'orange')
stop = 0
@ -232,15 +237,15 @@ def find_point(data_file, pos, flag, df, _row_s, _row_e, w2t, exitcode, threshol
w2t(f"[{pos}] {data_file}数据有误,需要检查,无法找到有效起始点或结束点...", 0, exitcode, 'red')
def p_single(wb, single, vel, trq, rpm, w2t):
def p_single(wb, single, vel, trq, rpms, w2t):
# 1. 先找到第一个速度为零的点,数据从后往前找,一开始就是零的情况不予考虑
# 2. 记录第一个点的位置,继续向前查找第二个速度为零的点,同理,一开始为零的点不予考虑
# 3. 记录第二个点的位置,并将其中的数据拷贝至对应位置
for data_file in single:
rpm = 1 if rpm == 0 else rpm
scale = 1000 if data_file.endswith('.csv') else 1
axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j'))
shtname = f"J{axis}"
rpm = rpms[axis-1] if data_file.endswith('.csv') else 1
scale = 1000 if data_file.endswith('.csv') else 1
ws = wb[shtname]
addition = 1
set_option("display.precision", 2)
@ -313,13 +318,13 @@ def p_single(wb, single, vel, trq, rpm, w2t):
cell.value = None
def p_scenario(wb, single, vel, trq, rpm, dur, w2t):
def p_scenario(wb, single, vel, trq, rpms, dur, w2t):
for data_file in single:
cycle = 0.001
rpm = 1 if rpm == 0 else rpm
scale = 1000 if data_file.endswith('.csv') else 1
axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j'))
shtname = f"J{axis}"
rpm = rpms[axis-1] if data_file.endswith('.csv') else 1
scale = 1000 if data_file.endswith('.csv') else 1
ws = wb[shtname]
addition = 1
set_option("display.precision", 2)
@ -365,17 +370,34 @@ def p_scenario(wb, single, vel, trq, rpm, dur, w2t):
cell.value = None
# =======================================
def get_configs(configfile, w2t):
_wb = load_workbook(configfile, read_only=True)
_ws = _wb['Target']
rcs = []
rpms = []
for i in range(2, 9):
try:
rpms.append(float(_ws.cell(row=4, column=i).value))
except:
rpms.append(0.0)
try:
rcs.append(float(_ws.cell(row=6, column=i).value))
except:
rcs.append(0.0)
return rpms, rcs
def main(path, sub, rcs, vel, trq, trqh, dur, rpm, w2t):
def main(path, sub, dur, vel, trq, trqh, w2t):
data_files = initialization(path, sub, w2t)
rpms, rcs = get_configs(path + '\\configs.xlsx', w2t)
if sub == 'max':
current_max(data_files, rcs, trqh, w2t)
elif sub == 'avg':
current_avg(data_files, rcs, trqh, w2t)
elif sub == 'cycle':
current_cycle(dur, data_files, rcs, vel, trq, trqh, rpm, w2t)
current_cycle(dur, data_files, rcs, vel, trq, trqh, rpms, w2t)
else:
pass