v0.1.4(2024/06/06)

1. AV/RR支持小数
2. 可处理电机电流单轴以及多轴数据,可根据需要进行参数设定处理不同轴的数据
3. 界面初始位置修改,以及删除所有entry的长度设定,因为设定无效
4. 修改了layout.xlsx布局,增加了duration/trqH/STO字段,以及额外的RC行,整体扩展了区域
5. 更改label/entry/optionmenu等控件的生成方式,使用循环实现,更加简洁和容易维护(暂未实现)
6. 支持工业/协作两条产品线的电机电流数据处理,包括单轴,场景,max/avg计算
This commit is contained in:
gitea 2024-06-06 19:04:55 +08:00
parent 97ba8ffc51
commit a0404c0003
4 changed files with 172 additions and 76 deletions

View File

@ -167,10 +167,10 @@ v0.1.2(2024/06/01)
2. 重新修改了README.md 2. 重新修改了README.md
3. 单独将rokae拉出来作为一个独立的repo进行维护与scripts分离 3. 单独将rokae拉出来作为一个独立的repo进行维护与scripts分离
v0.1.3(预计2024/06/06)-准备实现如下 v0.1.4(2024/06/06)
1. AV/RR支持小数 1. AV/RR支持小数
2. 可处理电机电流单轴以及多轴数据,可根据需要进行参数设定处理不同轴的数据 2. 可处理电机电流单轴以及多轴数据,可根据需要进行参数设定处理不同轴的数据
3. 界面初始位置修改以及删除所有entry的长度设定因为设定无效 3. 界面初始位置修改以及删除所有entry的长度设定因为设定无效
4. 修改了layout.xlsx布局增加了duration字段以及额外的RC行整体扩展了区域 4. 修改了layout.xlsx布局增加了duration/trqH/STO字段以及额外的RC行整体扩展了区域
5. 更改label/entry/optionmenu等控件的生成方式使用循环实现更加简洁和容易维护 5. 更改label/entry/optionmenu等控件的生成方式使用循环实现更加简洁和容易维护(暂未实现)
6. 支持工业/协作两条产品线的数据处理 6. 支持工业/协作两条产品线的电机电流数据处理包括单轴场景max/avg计算

View File

@ -24,7 +24,7 @@ class App(customtkinter.CTk):
# configure window # configure window
self.title("AIO - All in one automatic toolbox") self.title("AIO - All in one automatic toolbox")
# self.iconbitmap('./icon.ico') # self.iconbitmap('./icon.ico')
self.geometry("1180x550+100+100") self.geometry("1180x550+30+30")
self.protocol("WM_DELETE_WINDOW", self.func_end_call_back) self.protocol("WM_DELETE_WINDOW", self.func_end_call_back)
self.grid_rowconfigure(4, weight=1) self.grid_rowconfigure(4, weight=1)
@ -136,31 +136,37 @@ class App(customtkinter.CTk):
self.label_rc_1.grid(row=3, column=2, sticky='e', pady=(5, 5)) self.label_rc_1.grid(row=3, column=2, sticky='e', pady=(5, 5))
self.entry_rc_1 = customtkinter.CTkEntry(self.frame_param, width=self.w_param, placeholder_text=f"optional", font=self.my_font) self.entry_rc_1 = customtkinter.CTkEntry(self.frame_param, width=self.w_param, placeholder_text=f"optional", font=self.my_font)
self.entry_rc_1.grid(row=3, column=3, padx=(5, 10), pady=(5, 5), sticky='w') self.entry_rc_1.grid(row=3, column=3, padx=(5, 10), pady=(5, 5), sticky='w')
self.entry_rc_1.configure(state='disabled')
self.label_rc_2 = customtkinter.CTkLabel(self.frame_param, text="RC2", font=self.my_font) self.label_rc_2 = customtkinter.CTkLabel(self.frame_param, text="RC2", font=self.my_font)
self.label_rc_2.grid(row=3, column=4, sticky='e', pady=(5, 5)) self.label_rc_2.grid(row=3, column=4, sticky='e', pady=(5, 5))
self.entry_rc_2 = customtkinter.CTkEntry(self.frame_param, width=self.w_param, placeholder_text=f"optional", font=self.my_font) self.entry_rc_2 = customtkinter.CTkEntry(self.frame_param, width=self.w_param, placeholder_text=f"optional", font=self.my_font)
self.entry_rc_2.grid(row=3, column=5, padx=(5, 10), pady=(5, 5), sticky='w') self.entry_rc_2.grid(row=3, column=5, padx=(5, 10), pady=(5, 5), sticky='w')
self.entry_rc_2.configure(state='disabled')
self.label_rc_3 = customtkinter.CTkLabel(self.frame_param, text="RC3", font=self.my_font) self.label_rc_3 = customtkinter.CTkLabel(self.frame_param, text="RC3", font=self.my_font)
self.label_rc_3.grid(row=3, column=6, sticky='e', pady=(5, 5)) self.label_rc_3.grid(row=3, column=6, sticky='e', pady=(5, 5))
self.entry_rc_3 = customtkinter.CTkEntry(self.frame_param, width=self.w_param, placeholder_text=f"optional", font=self.my_font) self.entry_rc_3 = customtkinter.CTkEntry(self.frame_param, width=self.w_param, placeholder_text=f"optional", font=self.my_font)
self.entry_rc_3.grid(row=3, column=7, padx=(5, 10), pady=(5, 5), sticky='w') self.entry_rc_3.grid(row=3, column=7, padx=(5, 10), pady=(5, 5), sticky='w')
self.entry_rc_3.configure(state='disabled')
self.label_rc_4 = customtkinter.CTkLabel(self.frame_param, text="RC4", font=self.my_font) self.label_rc_4 = customtkinter.CTkLabel(self.frame_param, text="RC4", font=self.my_font)
self.label_rc_4.grid(row=3, column=8, sticky='e', pady=(5, 5)) self.label_rc_4.grid(row=3, column=8, sticky='e', pady=(5, 5))
self.entry_rc_4 = customtkinter.CTkEntry(self.frame_param, width=self.w_param, placeholder_text=f"optional", font=self.my_font) self.entry_rc_4 = customtkinter.CTkEntry(self.frame_param, width=self.w_param, placeholder_text=f"optional", font=self.my_font)
self.entry_rc_4.grid(row=3, column=9, padx=(5, 10), pady=(5, 5), sticky='w') self.entry_rc_4.grid(row=3, column=9, padx=(5, 10), pady=(5, 5), sticky='w')
self.entry_rc_4.configure(state='disabled')
self.label_rc_5 = customtkinter.CTkLabel(self.frame_param, text="RC5", font=self.my_font) self.label_rc_5 = customtkinter.CTkLabel(self.frame_param, text="RC5", font=self.my_font)
self.label_rc_5.grid(row=3, column=10, sticky='e', pady=(5, 5)) self.label_rc_5.grid(row=3, column=10, sticky='e', pady=(5, 5))
self.entry_rc_5 = customtkinter.CTkEntry(self.frame_param, width=self.w_param, placeholder_text=f"optional", font=self.my_font) self.entry_rc_5 = customtkinter.CTkEntry(self.frame_param, width=self.w_param, placeholder_text=f"optional", font=self.my_font)
self.entry_rc_5.grid(row=3, column=11, padx=(5, 10), pady=(5, 5), sticky='w') self.entry_rc_5.grid(row=3, column=11, padx=(5, 10), pady=(5, 5), sticky='w')
self.entry_rc_5.configure(state='disabled')
self.label_rc_6 = customtkinter.CTkLabel(self.frame_param, text="RC6", font=self.my_font) self.label_rc_6 = customtkinter.CTkLabel(self.frame_param, text="RC6", font=self.my_font)
self.label_rc_6.grid(row=3, column=12, sticky='e', pady=(5, 5)) self.label_rc_6.grid(row=3, column=12, sticky='e', pady=(5, 5))
self.entry_rc_6 = customtkinter.CTkEntry(self.frame_param, width=self.w_param, placeholder_text=f"optional", font=self.my_font) self.entry_rc_6 = customtkinter.CTkEntry(self.frame_param, width=self.w_param, placeholder_text=f"optional", font=self.my_font)
self.entry_rc_6.grid(row=3, column=13, padx=(5, 10), pady=(5, 5), sticky='w') self.entry_rc_6.grid(row=3, column=13, padx=(5, 10), pady=(5, 5), sticky='w')
self.entry_rc_6.configure(state='disabled')
# ===================================================================== # =====================================================================
# create textbox # create textbox
self.textbox = customtkinter.CTkTextbox(self, wrap='none', font=customtkinter.CTkFont(family="consolas", size=14), text_color="blue") self.textbox = customtkinter.CTkTextbox(self, wrap='none', font=customtkinter.CTkFont(family="consolas", size=14), text_color="blue")
@ -287,6 +293,8 @@ class App(customtkinter.CTk):
self.label_rpm.configure(text="RPM", text_color='black') self.label_rpm.configure(text="RPM", text_color='black')
self.entry_rpm.configure(state="disabled") self.entry_rpm.configure(state="disabled")
elif func_name == "max": elif func_name == "max":
self.label_rpm.configure(text="RPM", text_color='black')
self.entry_rpm.configure(state="disabled", placeholder_text='额定转速')
self.label_dur.configure(text="Dur", text_color='black') self.label_dur.configure(text="Dur", text_color='black')
self.entry_dur.configure(state="disabled") self.entry_dur.configure(state="disabled")
self.label_vel.configure(text="Vel", text_color='black') self.label_vel.configure(text="Vel", text_color='black')
@ -294,6 +302,8 @@ class App(customtkinter.CTk):
self.label_trq.configure(text="Trq", text_color='black') self.label_trq.configure(text="Trq", text_color='black')
self.option_trq.configure(state="disabled") self.option_trq.configure(state="disabled")
elif func_name == 'avg': elif func_name == 'avg':
self.label_rpm.configure(text="RPM", text_color='black')
self.entry_rpm.configure(state="disabled", placeholder_text='额定转速')
self.label_dur.configure(text="Dur", text_color='black') self.label_dur.configure(text="Dur", text_color='black')
self.entry_dur.configure(state="disabled") self.entry_dur.configure(state="disabled")
self.label_vel.configure(text="Vel", text_color='black') self.label_vel.configure(text="Vel", text_color='black')
@ -301,6 +311,8 @@ class App(customtkinter.CTk):
self.label_trq.configure(text="Trq", text_color='black') self.label_trq.configure(text="Trq", text_color='black')
self.option_trq.configure(state="disabled") self.option_trq.configure(state="disabled")
elif func_name == 'cycle': elif func_name == 'cycle':
self.label_rpm.configure(text="RPM", text_color='blue')
self.entry_rpm.configure(state="normal", placeholder_text='cycle')
self.label_dur.configure(text="Dur", text_color='blue') self.label_dur.configure(text="Dur", text_color='blue')
self.entry_dur.configure(state="normal", placeholder_text='scenario') self.entry_dur.configure(state="normal", placeholder_text='scenario')
self.label_vel.configure(text="Vel", text_color='red') self.label_vel.configure(text="Vel", text_color='red')
@ -308,7 +320,6 @@ class App(customtkinter.CTk):
self.label_trq.configure(text="Trq", text_color='red') self.label_trq.configure(text="Trq", text_color='red')
self.option_trq.configure(state="normal") self.option_trq.configure(state="normal")
def write2textbox(self, text, wait=0, exitcode=0): def write2textbox(self, text, wait=0, exitcode=0):
if wait != 0: if wait != 0:
self.textbox.configure(text_color='blue') self.textbox.configure(text_color='blue')
@ -337,8 +348,8 @@ class App(customtkinter.CTk):
elif flag == 'required': elif flag == 'required':
_ = float(item) _ = float(item)
except Exception as Err: except Exception as Err:
self.write2textbox(str(Err)) tkinter.messagebox.showerror(title="参数错误", message="请检查对应参数是否填写正确!", )
self.write2textbox("参数数据缺失,或者数据类型错误,更正后重新运行...", 0, 3) self.write2textbox(f"错误信息:{Err}\n参数数据缺失,或者数据类型错误,更正后重新运行...", 0, 3)
return True return True
def check_param(self): def check_param(self):
@ -353,7 +364,7 @@ class App(customtkinter.CTk):
sub_func = self.menu_sub.get() sub_func = self.menu_sub.get()
c1 = exists(path) c1 = exists(path)
c2 = self.is_float(av, rr) c2 = self.is_float('required', av, rr)
c3 = rpm = 1 c3 = rpm = 1
c4 = sub_func in ['industrial', 'cobot'] c4 = sub_func in ['industrial', 'cobot']
c5 = True if vel != trq else False c5 = True if vel != trq else False
@ -374,6 +385,7 @@ class App(customtkinter.CTk):
elif func_name == 'current': elif func_name == 'current':
path = self.entry_path.get() path = self.entry_path.get()
rc = self.entry_rc.get() rc = self.entry_rc.get()
rpm = self.entry_rpm.get()
dur = self.entry_dur.get() dur = self.entry_dur.get()
vel = self.option_vel.get() vel = self.option_vel.get()
trq = self.option_trq.get() trq = self.option_trq.get()
@ -386,9 +398,10 @@ class App(customtkinter.CTk):
rc5 = self.entry_rc_5.get() rc5 = self.entry_rc_5.get()
rc6 = self.entry_rc_6.get() rc6 = self.entry_rc_6.get()
c1 = exists(path) c0 = exists(path)
c2 = sub in ['max', 'avg', 'cycle'] c1 = sub in ['max', 'avg', 'cycle']
c3 = self.is_float('optional', rc) c2 = self.is_float('optional', rc)
c3 = self.is_float('optional', rpm)
_ = [rc1, rc2, rc3, rc4, rc5, rc6] _ = [rc1, rc2, rc3, rc4, rc5, rc6]
c4 = self.is_float('required', *_) c4 = self.is_float('required', *_)
@ -397,12 +410,15 @@ class App(customtkinter.CTk):
c5 = True if vel != trq else False c5 = True if vel != trq else False
c6 = self.is_float('optional', dur) c6 = self.is_float('optional', dur)
if c1 and c2 and c3 and c4 and c5 and c6: if c0 and c1 and c2 and c3 and c4 and c5 and c6:
rcs = [] rcs = []
for x in _: for x in _:
rcs.append(float(x)) rcs.append(float(x))
rc = 0 if rc == '' else rc
dur = 0 if sub != 'cycle' or dur == '' else dur dur = 0 if sub != 'cycle' or dur == '' else dur
return 2, path, sub, rcs, int(vel), int(trq), int(trqh), float(dur) rpm = 0 if sub != 'cycle' or rpm == '' else rpm
rcs.append(float(rc))
return 2, path, sub, rcs, int(vel), int(trq), int(trqh), float(dur), float(rpm)
else: else:
return 0, 0 return 0, 0
# ======================================================= # =======================================================
@ -427,7 +443,7 @@ class App(customtkinter.CTk):
func_dict[flag](path=args[0], av=args[1], rr=args[2], rpm=args[3], axis=args[4], vel=args[5], trq=args[6], w2t=self.write2textbox) func_dict[flag](path=args[0], av=args[1], rr=args[2], rpm=args[3], axis=args[4], vel=args[5], trq=args[6], w2t=self.write2textbox)
elif flag == 2: elif flag == 2:
# tkinter.messagebox.showinfo(title="TBD", message="功能待实现......") # tkinter.messagebox.showinfo(title="TBD", message="功能待实现......")
func_dict[flag](path=args[0], sub=args[1], rcs=args[2], vel=args[3], trq=args[4], trqh=args[5], dur=args[6], w2t=self.write2textbox) func_dict[flag](path=args[0], sub=args[1], rcs=args[2], vel=args[3], trq=args[4], trqh=args[5], dur=args[6], rpm=args[7], w2t=self.write2textbox)
elif flag == 3: elif flag == 3:
func_dict[flag](path=args[0], w2t=self.write2textbox) func_dict[flag](path=args[0], w2t=self.write2textbox)
else: else:
@ -436,11 +452,14 @@ class App(customtkinter.CTk):
self.textbox.configure(state='disabled') self.textbox.configure(state='disabled')
def func_check_callback(self): def func_check_callback(self):
self.textbox.configure(state='normal')
self.textbox.delete(index1='1.0', index2='end')
flag, *args = self.check_param() flag, *args = self.check_param()
if flag: if flag:
tkinter.messagebox.showinfo(title="参数正确", message="所有参数形式上填写无误,可以开始运行!") tkinter.messagebox.showinfo(title="参数正确", message="所有参数形式上填写无误,可以开始运行!")
else: else:
tkinter.messagebox.showerror(title="参数错误", message="需要检查对应参数是否填写正确!", ) tkinter.messagebox.showerror(title="参数错误", message="需要检查对应参数是否填写正确!", )
self.textbox.configure(state='disabled')
def func_log_callback(self): def func_log_callback(self):
content = self.textbox.get(index1='1.0', index2='end') content = self.textbox.get(index1='1.0', index2='end')
@ -453,7 +472,7 @@ class App(customtkinter.CTk):
tkinter.messagebox.showinfo(title="保存成功", message=f'{log_name}已被保存存至↓↓↓\n{getcwd()}') tkinter.messagebox.showinfo(title="保存成功", message=f'{log_name}已被保存存至↓↓↓\n{getcwd()}')
except Exception as Err: except Exception as Err:
print(Err) self.write2textbox(Err)
tkinter.messagebox.showerror(title="保存失败", message="未能保存本次日志或未能完整保存请准备好相关数据联系fanmingfu@rokae.com查看详细信息", ) tkinter.messagebox.showerror(title="保存失败", message="未能保存本次日志或未能完整保存请准备好相关数据联系fanmingfu@rokae.com查看详细信息", )
else: else:
tkinter.messagebox.showwarning(title="未能保存", message="日志数据为空,不可保存!") tkinter.messagebox.showwarning(title="未能保存", message="日志数据为空,不可保存!")

View File

@ -2,11 +2,11 @@ from openpyxl import load_workbook
from os import scandir from os import scandir
from os.path import exists from os.path import exists
from sys import argv from sys import argv
from pandas import read_csv from pandas import read_csv, concat, set_option
from re import match from re import match
from threading import Thread from threading import Thread
from time import sleep from time import sleep
from csv import reader
class GetThreadResult(Thread): class GetThreadResult(Thread):
def __init__(self, func, args=()): def __init__(self, func, args=()):
super(GetThreadResult, self).__init__() super(GetThreadResult, self).__init__()
@ -42,7 +42,7 @@ def traversal_files(path, w2t):
# 返回值:路径下的文件夹列表 路径下的文件列表 # 返回值:路径下的文件夹列表 路径下的文件列表
if not exists(path): if not exists(path):
msg = f'数据文件夹{path}不存在,请确认后重试......' msg = f'数据文件夹{path}不存在,请确认后重试......'
w2t(msg, 0, 1) w2t(msg, 0, 8)
else: else:
dirs = [] dirs = []
files = [] files = []
@ -64,16 +64,16 @@ def initialization(path, sub, w2t):
if sub != 'cycle': if sub != 'cycle':
if not (match('j[1-7].*\\.data', filename) or match('j[1-7].*\\.csv', filename)): if not (match('j[1-7].*\\.data', filename) or match('j[1-7].*\\.csv', filename)):
msg = f"所有文件必须以 jx_ 开头,以 .data/csv 结尾x取值1-7请检查后重新运行。" msg = f"所有文件必须以 jx_ 开头,以 .data/csv 结尾x取值1-7请检查后重新运行。"
w2t(msg, 0, 2) w2t(msg, 0, 6)
else: else:
if filename.endswith('.xlsx'): if filename.endswith('.xlsx'):
count += 1 count += 1
elif not (match('j[1-7].*\\.data', filename) or match('j[1-7].*\\.csv', filename)): elif not (match('j[1-7].*\\.data', filename) or match('j[1-7].*\\.csv', filename)):
msg = f"所有文件必须以 jx_ 开头,以 .data/csv 结尾x取值1-7请检查后重新运行。" msg = f"所有文件必须以 jx_ 开头,以 .data/csv 结尾x取值1-7请检查后重新运行。"
w2t(msg, 0, 3) w2t(msg, 0, 7)
if sub == 'cycle' and count != 1: if sub == 'cycle' and count != 1:
w2t("未找到电机电流数据处理excel表格确认后重新运行", 0, 4) w2t("未找到电机电流数据处理excel表格确认后重新运行", 0, 5)
return data_files return data_files
@ -125,7 +125,7 @@ def current_avg(data_files, rcs, trqh, w2t):
return current return current
def current_cycle(dur, data_files, rcs, vel, trq, trqh, w2t): def current_cycle(dur, data_files, rcs, vel, trq, trqh, rpm, w2t):
result = None result = None
hold = [] hold = []
single = [] single = []
@ -162,9 +162,9 @@ def current_cycle(dur, data_files, rcs, vel, trq, trqh, w2t):
pass pass
if dur == 0: if dur == 0:
p_single(wb, single, vel, trq, w2t) p_single(wb, single, vel, trq, rpm, w2t)
else: else:
p_scenario() p_scenario(wb, single, vel, trq, rpm, dur, w2t)
w2t(f"正在保存文件 {result},需要 10s 左右", 1) w2t(f"正在保存文件 {result},需要 10s 左右", 1)
stop = 0 stop = 0
@ -180,94 +180,171 @@ def current_cycle(dur, data_files, rcs, vel, trq, trqh, w2t):
w2t("全部处理完毕") w2t("全部处理完毕")
def p_single(wb, single, vel, trq, w2t): def find_point(flag, df, _row_s, _row_e, w2t, exitcode, threshold, step, end_point):
if flag == 'lt':
while _row_e > end_point:
speed_avg = df.iloc[_row_s:_row_e, 0].abs().mean()
if speed_avg < threshold:
_row_e -= step
_row_s -= step
continue
else:
return _row_s, _row_e
else:
w2t(f"数据有误,需要检查,无法找到第{exitcode}个有效点...", 0, exitcode)
elif flag == 'gt':
while _row_e > end_point:
speed_avg = df.iloc[_row_s:_row_e, 0].abs().mean()
if speed_avg > threshold:
_row_e -= step
_row_s -= step
continue
else:
return _row_s, _row_e
else:
w2t(f"数据有误,需要检查,无法找到有效起始点或结束点...", 0, exitcode)
def p_single(wb, single, vel, trq, rpm, w2t):
# 1. 先找到第一个速度为零的点,数据从后往前找,一开始就是零的情况不予考虑 # 1. 先找到第一个速度为零的点,数据从后往前找,一开始就是零的情况不予考虑
# 2. 记录第一个点的位置,继续向前查找第二个速度为零的点,同理,一开始为零的点不予考虑 # 2. 记录第一个点的位置,继续向前查找第二个速度为零的点,同理,一开始为零的点不予考虑
# 3. 记录第二个点的位置,并将其中的数据拷贝至对应位置 # 3. 记录第二个点的位置,并将其中的数据拷贝至对应位置
for data_file in single: for data_file in single:
rpm = 1 if rpm == 0 else rpm
scale = 1000 if data_file.endswith('.csv') else 1
axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j')) axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j'))
shtname = f"J{axis}" shtname = f"J{axis}"
ws = wb[shtname] ws = wb[shtname]
set_option("display.precision", 2)
if data_file.endswith('.data'): if data_file.endswith('.data'):
df = read_csv(data_file, sep='\t') df = read_csv(data_file, sep='\t')
elif data_file.endswith('.csv'): elif data_file.endswith('.csv'):
df = read_csv(data_file, sep=',', encoding='gbk', header=8) df = read_csv(data_file, sep=',', encoding='gbk', header=8)
csv_reader = reader(open(data_file))
i = 0
cycle = 0.001
for row in csv_reader:
i += 1
if i == 3:
cycle = float(row[0].split(':')[1].split('ms')[0]) / 1000
break
ws["H11"] = cycle
# 过滤尾部无效数据 col_names = list(df.columns)
df_1 = df[col_names[vel-1]].multiply(rpm)
df_2 = df[col_names[trq-1]].multiply(scale)
df = concat([df_1, df_2], axis=1)
_step = 5 if data_file.endswith('.csv') else 50
_end_point = 30 if data_file.endswith('.csv') else 200
_adjust = 0 if data_file.endswith('.csv') else 150
_row_e = df.index[-1] _row_e = df.index[-1]
_row_s = _row_e - 200 _row_s = _row_e - _end_point
while _row_e > 200: speed_avg = df.iloc[_row_s:_row_e, 0].abs().mean()
speed_avg = df.iloc[_row_s:_row_e, vel-1].abs().mean() if speed_avg < 2:
if speed_avg > 2: # 过滤尾部为零无效数据
_row_e -= 50 _row_s, _row_e = find_point('lt', df, _row_s, _row_e, w2t, 1, threshold=2, step=_step, end_point=_end_point)
_row_s -= 50 # 找到第一个起始点 row_end继续找到有数据的部分后面有一段有效数据区
continue row_end = _row_e - _adjust
else: _row_e -= _end_point
break _row_s -= _end_point
else: _row_s, _row_e = find_point('gt', df, _row_s, _row_e, w2t, 3, threshold=2, step=_step, end_point=_end_point)
w2t("数据有误,需要检查,无法找到第一个有效起始点...", 0, 1) # 速度已经快要降为零了,继续寻找下一个速度上升点
_row_e -= _end_point
_row_s -= _end_point
_row_s, _row_e = find_point('lt', df, _row_s, _row_e, w2t, 3, threshold=2, step=_step, end_point=_end_point)
elif speed_avg > 2:
# 过滤尾部非零无效数据
_row_s, _row_e = find_point('gt', df, _row_s, _row_e, w2t, 2, threshold=2, step=_step, end_point=_end_point)
# 找到第一个起始点 row_end继续找到有数据的部分后面有一段零数据区
row_end = _row_e - _adjust
_row_e -= _end_point
_row_s -= _end_point
_row_s, _row_e = find_point('lt', df, _row_s, _row_e, w2t, 4, threshold=2, step=_step, end_point=_end_point)
#目前已经有一点的速度值了,继续往前搜寻下一个速度为零的点
_row_e -= _end_point
_row_s -= _end_point
_row_s, _row_e = find_point('gt', df, _row_s, _row_e, w2t, 4, threshold=2, step=_step, end_point=_end_point)
# 找到第一个起始点 row_end继续找到有数据的部分 row_start = _row_s + _adjust
row_end = _row_e - 100
_row_e -= 200
_row_s -= 200
while _row_e > 200:
speed_avg = df.iloc[_row_s:_row_e, vel-1].abs().mean()
if speed_avg < 2:
_row_e -= 50
_row_s -= 50
continue
else:
break
else:
w2t("数据有误,需要检查,无法找到第二个有效起始点...", 0, 2)
# 目前已经有一点的速度值了,继续往前搜寻下一个速度为零的点
_row_e -= 200
_row_s -= 200
while _row_e > 200:
speed_avg = df.iloc[_row_s:_row_e, vel-1].abs().mean()
if speed_avg > 2:
_row_e -= 50
_row_s -= 50
continue
else:
break
else:
w2t("数据有误,需要检查,无法找到第三个有效起始点...", 0, 3)
row_start = _row_s + 180
data = [] data = []
for row in range(row_start, row_end): for row in range(row_start, row_end):
data.append(df.iloc[row, vel-1]) data.append(df.iloc[row, 0])
data.append(df.iloc[row, trq-1]) data.append(df.iloc[row, 1])
i = 0 i = 0
for row in ws.iter_rows(min_row=2, min_col=2, max_row=15000, max_col=3): for row in ws.iter_rows(min_row=2, min_col=2, max_row=15000, max_col=3):
for cell in row: for cell in row:
try: try:
cell.value = data[i] _ = f"{data[i]:.2f}"
cell.value = float(_)
i += 1 i += 1
except: except:
cell.value = None cell.value = None
def p_scenario(): def p_scenario(wb, single, vel, trq, rpm, dur, w2t):
pass for data_file in single:
cycle = 0.001
rpm = 1 if rpm == 0 else rpm
scale = 1000 if data_file.endswith('.csv') else 1
axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j'))
shtname = f"J{axis}"
ws = wb[shtname]
set_option("display.precision", 2)
if data_file.endswith('.data'):
df = read_csv(data_file, sep='\t')
elif data_file.endswith('.csv'):
df = read_csv(data_file, sep=',', encoding='gbk', header=8)
csv_reader = reader(open(data_file))
i = 0
for row in csv_reader:
i += 1
if i == 3:
cycle = float(row[0].split(':')[1].split('ms')[0]) / 1000
break
ws["H11"] = cycle
col_names = list(df.columns)
df_1 = df[col_names[vel-1]].multiply(rpm)
df_2 = df[col_names[trq-1]].multiply(scale)
df = concat([df_1, df_2], axis=1)
row_start = 300
row_end = row_start + int(dur/cycle)
if row_end > df.index[-1]:
w2t(f"位置超限:{data_file} 共有 {df.index[-1]} 条数据,无法取到第 {row_end} 条数据...", 0, 9)
data = []
for row in range(row_start, row_end):
data.append(df.iloc[row, 0])
data.append(df.iloc[row, 1])
i = 0
for row in ws.iter_rows(min_row=2, min_col=2, max_row=15000, max_col=3):
for cell in row:
try:
_ = f"{data[i]:.2f}"
cell.value = float(_)
i += 1
except:
cell.value = None
# ======================================= # =======================================
def main(path, sub, rcs, vel, trq, trqh, dur, w2t): def main(path, sub, rcs, vel, trq, trqh, dur, rpm, w2t):
data_files = initialization(path, sub, w2t) data_files = initialization(path, sub, w2t)
if sub == 'max': if sub == 'max':
current_max(data_files, rcs, trqh, w2t) current_max(data_files, rcs, trqh, w2t)
elif sub == 'avg': elif sub == 'avg':
current_avg(data_files, rcs, trqh, w2t) current_avg(data_files, rcs, trqh, w2t)
elif sub == 'cycle': elif sub == 'cycle':
current_cycle(dur, data_files, rcs, vel, trq, trqh, w2t) current_cycle(dur, data_files, rcs, vel, trq, trqh, rpm, w2t)
else: else:
pass pass

View File

@ -1 +1 @@
0.1.2 @ 06/01/2024 0.1.4 @ 06/06/2024