[transform] from scripts to independent repo

This commit is contained in:
gitea 2024-06-01 07:49:50 +08:00
parent 5876306549
commit 0fd8a4e8b5
8 changed files with 937 additions and 0 deletions

126
aio/README.md Normal file
View File

@ -0,0 +1,126 @@
程序功能自动化处理制动性能采集的数据减少人工处理时长目前测试单轴可从原来的4-6h减少到15min
使用方法:修改 configs.xlsx 配置文件中的一些参数(数据文件路径/减速比/最大角速度/额定电流),然后直接执行即可
第三方库pandas/pywin32/openpyxl
pip3 install pandas -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn
pip3 install openpyxl -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn
pip3 install pywin32 -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn
pip3 install Pillow -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn
python.exe -m pip install --upgrade pip -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn
pip3 install --upgrade --force-reinstall numpy -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn
打包方法pyinstaller.exe -F --version-file file_version_info.txt -i .\icon.ico .\aio.py
最好不用虚拟环境
pyinstaller.exe -F --version-file file_version_info.txt -i .\icon.ico .\aio.py -p .\brake.py -p .\current.py
https://customtkinter.tomschimansky.com/documentation/packaging
pyinstaller --noconfirm --onedir --windowed --add-data "C:/Users/Administrator/AppData/Local/Programs/Python/Python312/Lib/site-packages/customtkinter;customtkinter/" --version-file file_version_info.txt -i .\icon.ico .\aio.py -p .\brake.py
注意事项:
1. 数据文件存储存储规则
所谓数据文件,就是我们拍急停的时候,采集到的 .data 文件,正方向拍三次急停,会采集到三个 .data 文件,存储在同一个文件夹内,即每组(三个 .data 文件)文件必须存储在同一个文件夹内,数据文件的命名无要求,
2. 文件夹命名规则
虽然对采集到的 .data 文件没有命名要求,但是对于文件夹的命名是有要求的,必须是如下格式:
loadXX_speedXX_reachXX 或者 loadXX_reachXX_speedXX
这里的XX代表不同条件下的测试数值比如
load100_speed33_reach66指的是负载100%速度33%臂展66%
3. 结果文件命名规则
所谓结果文件,就是处理数据的那个 excle 文件,该文件名字的前缀必须是 loadXX_XXXXXXXXX.xlsx比如
load33_自研_制动性能测试.xlsx
load66_自研_制动性能测试.xlsx
load100_自研_制动性能测试.xlsx
!!结果文件可以是没有数据的,也可以是之前有数据的,只要保证第 6 点中的那几个数据准确即可
4. 数据存储的组织结
..../j1/load100_speed33_reach100
..../j1/load100_speed66_reach100
....
..../j1/load100_speed100_reach100
..../j1/load100_speed33_reach100/2024_05_16_09_18_52.data
..../j1/load100_speed33_reach100/2024_05_16_09_19_52.data
..../j1/load100_speed33_reach100/2024_05_16_09_20_52.data
..../j1/load33_自研_制动性能测试.xlsx
..../j1/load66_自研_制动性能测试.xlsx
..../j1/load100_自研_制动性能测试.xlsx
5. 文件的打开与关闭
a. 在执行程序之前,需要关闭所有相关 excle 文件
b. 在执行程序之中,不允许打开相关 excle 文件
c. 在执行程序之后,需要逐个打开结果文件,并保存一次
6. 参数一致性检查
执行程序前,需要确定 configs.xlsx 中设定的减速比/最大角速度/额定电流的值是正确的
7. 数据准确性检查
执行完程序之后需要对结果文件的数据准确性做核对通过我自己的数据观察误差基本在10ms以内也即10个数据点误差较大的情况可自行调整
8. .data 数据顺序
.data 文件的第一列和第二列必须分别是速度和电流
9. 其他
程序运行主要的耗时集中在打开,保存和关闭结果文件,第一次打开的时候会比较慢,是因为 excel 在做首次公式的计算保存关闭之后再打开会比较快一些另外如果在运行出错并重复运行程序的时候无响应或者出现异常请打开任务管理器关闭一切和excel相关的进程重新运行即可
RELEASE CHANGES
已知问题:
1. office套件下运行好像有问题WPS无问题集中在just_open函数的实现上
v0.0.1(2024/05/18)
Draft
v0.0.2(2024/05/20)
1. 功能模块化,为后面其他功能奠定一个基本的框架
2. 使用了多线程提高效率
3. 优化了准备工作中的细节
4. 运行初始化时自动删除 raw_data_dir 中的 .xlsx 文件
5. 优化了输出格式
6. 使用 pyinstaller 库进行代码冻结并调试成功
v0.0.3(2024/05/21)
1. just_open函数打开失败的信息中添加文件名
2. 删除global变量函数全部通过传参实现
3. configuration.xlsx配置文件增加AXIS常量表示那个轴取值为 j1/j2/j3/j4/j5/j6/j7
4. [bugfix] 增加get_threshold_step函数用来获取在计算row_start时合适的阈值和步长主要是解决了二轴最差工况下最大速度是个尖端的问题
a. load100_speed100_reachxxx 二轴 threshold = 50 step = 20
b. 其他 threshold = 50 step = 100
如上是一个比较保守的设定因为设定的step比较小找到点之后要往后延200最好
5. 在find_row_start_dp函数中新增一个参数data_file方便后续调试
v0.0.4(2024/05/22)
1. 重新标定了get_threshold_step函数让处理更加准确
2. 新定义了now_doing_msg函数实时输出处理信息
3. 修改了find_row_start和find_row_start_dp函数增加的部分相同处理数据的时候先判断是否是空值或者是0此时可以加快步进
4. 修改了just_open函数不在做重试
v0.0.5(2024/05/23)
1. 完善了函数注释
2. 调整了阈值和步长
3. 删除了just_open函数以及对应的win32com库Thank GOD!终于可以不用这个库了)
4. 重写了获取开始点位的代码直接使用speed来判断而不用角度所以find_row_start_dp以及copy_data_to_excel_file函数也被一并删除
5. 修改了配置文件configs.xlsx的初始参数顺序及结构使程序通用性更强
6. 将initialazation中的预定义变量赋值调整到try...except...之外,更方便排查问题
7. 修改结束时间的格式,精确到秒
v0.0.6(2024/05/23)
1. 为了调整多功能框架aio.py文件将会作为入口程序存在不实现具体功能功能的实现将由具体的功能脚本实现aio.py只负责条件调用
2. 新增了自动化处理电流数据(电机/过载)的功能
v0.1.0(2024/05/29)
1. 修改为customtkinter图形化界面
2. 支持工业机器人制动数据处理(理论上支持,测试数据有问题,待验证)
3. 删除configs.xlsx配置表格直接在界面配置新增layout.xlsx文件存储customtkinter布局
4. 电流尚未支持
v0.1.1(2024/05/30)
1. 增加版本检测功能
2. 修改了无效数据下的动作
3. textbox只在开始和结束时改变状态而不是每次写入都更改
4. 调整了brake的结构
5. 重新在write2textbox中添加exitcode参数并补齐相关逻辑和修改brake中的调用方式
6. 修复参数检查无效的情况
7. 屏蔽电流相关的功能

338
aio/aio.py Normal file
View File

@ -0,0 +1,338 @@
from os.path import exists
from os import getcwd
from threading import Thread
import tkinter.messagebox
import customtkinter
import brake
import current
from time import time, strftime, localtime
from urllib.request import urlopen
import socket
customtkinter.set_appearance_mode("System") # Modes: "System" (standard), "Dark", "Light"
customtkinter.set_default_color_theme("blue") # Themes: "blue" (standard), "green", "dark-blue"
customtkinter.set_widget_scaling(1.1) # widget dimensions and text size
customtkinter.set_window_scaling(1.1) # window geometry dimensions
socket.setdefaulttimeout(10)
class App(customtkinter.CTk):
def __init__(self):
super().__init__()
self.my_font = customtkinter.CTkFont(family="Consolas", size=16, weight="bold")
self.w_param = 96
# =====================================================================
# configure window
self.title("AIO - All in one automatic toolbox")
# self.iconbitmap('./icon.ico')
screen_width = self.winfo_screenwidth()
screen_height = self.winfo_screenheight()
width = screen_width - 200
height = screen_height - 200
self.geometry(f"{width}x{height}+{100}+{100}")
self.protocol("WM_DELETE_WINDOW", self.func_end_call_back)
self.grid_rowconfigure(3, weight=1)
self.grid_columnconfigure((1, 2, 3, 4, 5, 6, 7, 8, 9), weight=1)
self.minsize(1100, 500)
# =====================================================================
# create frame sidebar(left)
self.frame_func = customtkinter.CTkFrame(self, width=120, corner_radius=0)
self.frame_func.grid(row=0, column=0, rowspan=7, sticky='nsew')
# create AIO logo
self.label_logo = customtkinter.CTkLabel(self.frame_func, text="Rokae AIO", height=60, font=customtkinter.CTkFont(family="Segoe Script Bold", size=24, weight="bold"), text_color="DarkSlateGray")
self.label_logo.grid(row=0, column=0, padx=15, pady=15)
# create start button
self.btn_start = customtkinter.CTkButton(self.frame_func, corner_radius=10, text='开始运行', font=self.my_font, command=lambda: self.thread_it(self.func_start_callback))
self.btn_start.grid(row=1, column=0, sticky='new', padx=10, pady=10, ipadx=5, ipady=5)
# create param check button
self.btn_check = customtkinter.CTkButton(self.frame_func, corner_radius=10, text='检查参数', font=self.my_font, command=lambda: self.thread_it(self.func_check_callback))
self.btn_check.grid(row=2, column=0, sticky='new', padx=10, pady=10, ipadx=5, ipady=5)
# create start button
self.btn_log = customtkinter.CTkButton(self.frame_func, corner_radius=10, text='保存日志', font=self.my_font, command=lambda: self.thread_it(self.func_log_callback))
self.btn_log.grid(row=3, column=0, sticky='new', padx=10, pady=10, ipadx=5, ipady=5)
# create start button
self.btn_end = customtkinter.CTkButton(self.frame_func, corner_radius=10, text='结束运行', font=self.my_font, command=lambda: self.thread_it(self.func_end_call_back))
self.btn_end.grid(row=4, column=0, sticky='new', padx=10, pady=10, ipadx=5, ipady=5)
# create version info
self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.1.1\nDate: 05/30/2024", font=self.my_font, text_color="DarkCyan")
self.frame_func.rowconfigure(6, weight=1)
self.label_version.grid(row=6, column=0, padx=20, pady=20, sticky='s')
# =====================================================================
# create frame
self.frame_param = customtkinter.CTkFrame(self, height=240, corner_radius=10)
self.frame_param.grid(row=0, column=1, rowspan=3, columnspan=9, sticky='new', ipadx=20, ipady=10, padx=10, pady=10)
# create main menu
self.menu_main = customtkinter.CTkOptionMenu(self.frame_param, values=["INIT", "brake", "current"], font=self.my_font, command=self.func_main_callback)
self.menu_main.grid(row=0, column=1, sticky='we', padx=(20, 10), pady=(10, 0))
self.menu_main.set("Start Here!")
# create sub menu
self.menu_sub = customtkinter.CTkOptionMenu(self.frame_param)
# create path related
self.label_path = customtkinter.CTkLabel(self.frame_param, width=self.w_param//10, text="Path", font=self.my_font)
self.label_path.grid(row=0, column=2, sticky='e', pady=(10, 5))
self.entry_path = customtkinter.CTkEntry(self.frame_param, width=680, placeholder_text="数据文件夹路径", font=self.my_font)
self.entry_path.grid(row=0, column=3, columnspan=7, padx=(5, 10), pady=(10, 5), sticky='we')
self.entry_path.configure(state='disabled')
# create av related
self.label_av = customtkinter.CTkLabel(self.frame_param, width=self.w_param//10, text="AV", font=self.my_font)
self.label_av.grid(row=1, column=2, sticky='e', pady=(5, 5))
self.entry_av = customtkinter.CTkEntry(self.frame_param, width=self.w_param, placeholder_text=f"角速度", font=self.my_font)
self.entry_av.grid(row=1, column=3, padx=(5, 20), pady=(5, 5), sticky='w')
self.entry_av.configure(state='disabled')
# create rc related
self.label_rc = customtkinter.CTkLabel(self.frame_param, width=self.w_param//10, text="RC", font=self.my_font)
self.label_rc.grid(row=1, column=4, sticky='e', pady=(5, 5))
self.entry_rc = customtkinter.CTkEntry(self.frame_param, width=self.w_param, placeholder_text=f"额定电流", font=self.my_font)
self.entry_rc.grid(row=1, column=5, padx=(5, 20), pady=(5, 5), sticky='w')
self.entry_rc.configure(state='disabled')
# create rpm related
self.label_rpm = customtkinter.CTkLabel(self.frame_param, width=self.w_param//10, text="RPM", font=self.my_font)
self.label_rpm.grid(row=1, column=6, sticky='e', pady=(5, 5))
self.entry_rpm = customtkinter.CTkEntry(self.frame_param, width=self.w_param, placeholder_text=f"额定转速", font=self.my_font)
self.entry_rpm.grid(row=1, column=7, padx=(5, 20), pady=(5, 5), sticky='w')
self.entry_rpm.configure(state='disabled')
# create rr related
self.label_rr = customtkinter.CTkLabel(self.frame_param, width=self.w_param//10, text="RR", font=self.my_font)
self.label_rr.grid(row=1, column=8, sticky='e', pady=(5, 5))
self.entry_rr = customtkinter.CTkEntry(self.frame_param, width=self.w_param, placeholder_text=f"减速比", font=self.my_font)
self.entry_rr.grid(row=1, column=9, padx=(5, 20), pady=(5, 5), sticky='w')
self.entry_rr.configure(state='disabled')
# create axis related
self.label_axis = customtkinter.CTkLabel(self.frame_param, width=self.w_param//10, text="AXIS", font=self.my_font)
self.label_axis.grid(row=2, column=2, sticky='e', pady=(5, 5))
self.option_axis = customtkinter.CTkOptionMenu(self.frame_param, values=["1", "2", "3", "4", "5", "6", "7"], width=self.w_param, font=self.my_font)
self.option_axis.grid(row=2, column=3, padx=(5, 20), pady=(5, 5), sticky='w')
self.option_axis.configure(state='disabled')
# create vel related
self.label_vel = customtkinter.CTkLabel(self.frame_param, width=self.w_param//10, text="Vel", font=self.my_font)
self.label_vel.grid(row=2, column=4, sticky='e', pady=(5, 5))
self.option_vel = customtkinter.CTkOptionMenu(self.frame_param, values=["1", "2", "3", "4", "5", "6", "7"], width=self.w_param, font=self.my_font)
self.option_vel.grid(row=2, column=5, padx=(5, 20), pady=(5, 5), sticky='w')
self.option_vel.configure(state='disabled')
# create trq related
self.label_trq = customtkinter.CTkLabel(self.frame_param, width=self.w_param//10, text="Trq", font=self.my_font)
self.label_trq.grid(row=2, column=6, sticky='e', pady=(5, 5))
self.option_trq = customtkinter.CTkOptionMenu(self.frame_param, values=["1", "2", "3", "4", "5", "6", "7"], width=self.w_param, font=self.my_font)
self.option_trq.grid(row=2, column=7, padx=(5, 20), pady=(5, 5), sticky='w')
self.option_trq.configure(state='disabled')
# =====================================================================
# create textbox
self.textbox = customtkinter.CTkTextbox(self, height=640, wrap='none', font=customtkinter.CTkFont(family="consolas", size=14), text_color="blue")
self.textbox.grid(row=3, column=1, rowspan=5, columnspan=13, ipadx=10, ipady=10, padx=10, pady=10, sticky='nsew')
self.textbox.configure(state='disabled')
# =====================================================================
# version check
cur_vers = self.label_version.cget("text").replace('\n', ' @ ').replace("Vers: ", '').replace("Date: ", '')
url_vers = 'http://10.2.23.150:10008/vers'
try:
new_vers = urlopen(url_vers).read().decode('utf-8')
if cur_vers.strip() != new_vers.strip():
msg = f"""当前版本:{cur_vers}\n更新版本:{new_vers}\n\n请及时更新 http://10.2.23.150:10003/s/jRfM"""
tkinter.messagebox.showwarning(title="版本更新", message=msg)
except:
tkinter.messagebox.showwarning(title="版本更新", message="连接服务器失败,无法确认当前是否是最新版本......")
def thread_it(self, func, *args):
""" 将函数打包进线程 """
self.myThread = Thread(target=func, args=args)
self.myThread.daemon = True # 主线程退出就直接让子线程跟随退出,不论是否运行完成。
self.myThread.start()
def initialization(self):
self.label_path.configure(text="Path", text_color="black")
self.label_av.configure(text="AV", text_color="black")
self.label_rc.configure(text="RC", text_color="black")
self.label_rpm.configure(text="RPM", text_color="black")
self.label_rr.configure(text="RR", text_color="black")
self.label_axis.configure(text="AXIS", text_color="black")
self.label_vel.configure(text="Vel", text_color="black")
self.label_trq.configure(text="Trq", text_color="black")
self.entry_path.configure(placeholder_text="数据文件夹路径", state="disabled")
self.entry_av.configure(placeholder_text="角速度", state="disabled")
self.entry_rc.configure(placeholder_text="额定电流", state="disabled")
self.entry_rpm.configure(placeholder_text="额定转速", state="disabled")
self.entry_rr.configure(placeholder_text="减速比", state="disabled")
self.option_axis.configure(state="disabled")
self.option_vel.configure(state="disabled")
self.option_trq.configure(state="disabled")
self.menu_sub.grid_forget()
self.textbox.delete(index1='1.0', index2='end')
self.textbox.configure(state='disabled')
def func_main_callback(self, func_name):
self.initialization()
if func_name == 'brake':
self.menu_sub = customtkinter.CTkOptionMenu(self.frame_param, values=["industrial", "cobot"], font=self.my_font, command=self.func_sub_callback)
self.menu_sub.grid(row=1, column=1, sticky='we', padx=(20, 10), pady=(5, 5))
self.menu_sub.set("--select--")
self.label_path.configure(text="*Path", text_color='red')
self.label_av.configure(text="*AV", text_color='red')
self.label_rr.configure(text="*RR", text_color='red')
self.label_axis.configure(text="*AXIS", text_color='red')
self.label_vel.configure(text="*Vel", text_color='red')
self.label_trq.configure(text="*Trq", text_color='red')
self.entry_path.configure(state="normal")
self.entry_av.configure(state="normal")
self.entry_rr.configure(state="normal")
self.option_axis.configure(state="normal")
self.option_vel.configure(state="normal")
self.option_trq.configure(state="normal")
elif func_name == 'current':
self.menu_sub = customtkinter.CTkOptionMenu(self.frame_param, values=["max", "avg"], font=self.my_font, command=self.func_sub_callback)
self.menu_sub.grid(row=1, column=1, sticky='we', padx=(20, 10), pady=(5, 5))
self.menu_sub.set("--select--")
self.label_path.configure(text="*Path", text_color='red')
self.label_rc.configure(text="*RC", text_color='red')
self.label_trq.configure(text="*Trq", text_color='red')
self.entry_path.configure(state="normal")
self.entry_rc.configure(state="normal")
self.option_trq.configure(state="normal")
else:
self.initialization()
self.menu_main.set("Start Here!")
def func_sub_callback(self, func_name):
if func_name == "industrial":
self.label_rpm.configure(text="*RPM", text_color='red')
self.entry_rpm.configure(state="normal")
elif func_name == "cobot":
self.label_rpm.configure(text="RPM", text_color='black')
self.entry_rpm.configure(state="disabled")
elif func_name == "max":
pass
elif func_name == 'avg':
pass
def write2textbox(self, text, wait=0, exitcode=0):
if wait != 0:
self.textbox.configure(text_color='blue')
self.textbox.insert(index='end', text=text)
self.textbox.update()
self.textbox.see('end')
elif exitcode != 0:
self.textbox.configure(text_color='red')
self.textbox.insert(index='end', text=text + '\n')
self.textbox.update()
self.textbox.see('end')
raise Exception(f"Error code: {exitcode}")
else:
self.textbox.configure(text_color='blue')
self.textbox.insert(index='end', text=text + '\n')
self.textbox.update()
self.textbox.see('end')
def check_param(self):
func_name = self.menu_main.get()
if func_name == 'brake':
path = self.entry_path.get().strip(' ')
av = self.entry_av.get().strip('- ')
rr = self.entry_rr.get().strip('- ')
axis = self.option_axis.get()
vel = self.option_vel.get()
trq = self.option_trq.get()
sub_func = self.menu_sub.get()
c1 = exists(path)
c2 = av.isdigit()
c3 = rr.isdigit()
c4 = rpm = 1
c5 = sub_func in ['industrial', 'cobot']
if self.menu_sub.get() == 'industrial':
rpm = self.entry_rpm.get().strip('- ')
c4 = rpm.isdigit()
elif self.menu_sub.get() == 'cobot':
pass
else:
pass
if c1 and c2 and c3 and c4 and c5:
return 1, path, int(av), int(rr), int(rpm), int(axis), int(vel), int(trq)
else:
return 0, 0
elif func_name == 'current':
path = self.entry_path.get()
rc = self.entry_rc.get()
vel = self.option_vel.get()
trq = self.option_trq.get()
sub_func = self.menu_sub.get()
c1 = exists(path)
c2 = sub_func in ['max', 'avg']
try:
_ = float(rc)
c3 = True
except ValueError:
c3 = False
if c1 and c2 and c3:
return 2, path, float(rc), int(vel), int(trq), sub_func
else:
return 0, 0
else:
return 0, 0
def func_start_callback(self):
self.textbox.configure(state='normal')
self.textbox.delete(index1='1.0', index2='end')
flag, *args = self.check_param()
func_dict = {1: brake.main, 2: current.main}
if flag == 1:
func_dict[flag](path=args[0], av=args[1], rr=args[2], rpm=args[3], axis=args[4], vel=args[5], trq=args[6], w2t=self.write2textbox)
elif flag == 2:
tkinter.messagebox.showinfo(title="TBD", message="功能待实现......")
# func_dict[flag](path=args[0], rc=args[1], sub_func=args[2])
else:
tkinter.messagebox.showerror(title="参数错误", message="请检查对应参数是否填写正确!", )
self.textbox.configure(state='disabled')
def func_check_callback(self):
flag, *args = self.check_param()
if flag:
tkinter.messagebox.showinfo(title="参数正确", message="所有参数形式上填写无误,可以开始运行!")
else:
tkinter.messagebox.showerror(title="参数错误", message="需要检查对应参数是否填写正确!", )
def func_log_callback(self):
content = self.textbox.get(index1='1.0', index2='end')
if len(content) > 1:
try:
now = strftime('%Y%m%d%H%M%S', localtime(time()))
log_name = f"{now}_aio.log"
with open(f'{log_name}', 'w', encoding='utf-8') as objlog:
objlog.write(content)
tkinter.messagebox.showinfo(title="保存成功", message=f'{log_name}已被保存存至↓↓↓\n{getcwd()}')
except Exception as Err:
print(Err)
tkinter.messagebox.showerror(title="保存失败", message="未能保存本次日志或未能完整保存请准备好相关数据联系fanmingfu@rokae.com查看详细信息", )
else:
tkinter.messagebox.showwarning(title="未能保存", message="日志数据为空,不可保存!")
def func_end_call_back(self):
if tkinter.messagebox.askyesno(title="关闭程序", message="相关数据可能未保存,正在运行程序时有概率会损坏数据文件,确定要终止程序运行吗?"):
self.destroy()
else:
pass
pass
if __name__ == "__main__":
aio = App()
aio.mainloop()

326
aio/brake.py Normal file
View File

@ -0,0 +1,326 @@
# coding: utf-8
from os import scandir
from os.path import isfile, exists
from sys import argv
from openpyxl import load_workbook
from time import time, sleep, strftime, localtime
from threading import Thread
from pandas import read_csv
class GetThreadResult(Thread):
def __init__(self, func, args=()):
super(GetThreadResult, self).__init__()
self.func = func
self.args = args
self.result = 0
def run(self):
sleep(1)
self.result = self.func(*self.args)
def get_result(self):
Thread.join(self) # 等待线程执行完毕
try:
return self.result
except Exception as Err:
return None
def data_process(result_file, raw_data_dirs, av, rr, axis, vel, trq, w2t, rpm):
# 功能:完成一个结果文件的数据处理
# 参数:结果文件,数据目录,以及预读取的参数
# 返回值:-
file_name = result_file.split('\\')[-1]
w2t(f"打开文件 {file_name} 需要 1min 左右", 1)
global stop
stop = 0
t_excel = GetThreadResult(load_workbook, args=(result_file, ))
t_wait = Thread(target=w2t_local, args=('.', 1, w2t))
t_excel.start()
t_wait.start()
t_excel.join()
wb_result = t_excel.get_result()
stop = 1
sleep(1.1)
w2t('')
prefix = result_file.split('\\')[-1].split('_')[0]
for raw_data_dir in raw_data_dirs:
if raw_data_dir.split('\\')[-1].split('_')[0] == prefix:
now_doing_msg(raw_data_dir, 'start', w2t)
_, data_files = traversal_files(raw_data_dir, w2t)
# 数据文件串行处理模式---------------------------------
# count = 1
# for data_file in data_files:
# now_doing_msg(data_file, 'start', w2t)
# single_file_process(data_file, wb_result, count, av, rr, axis, vel, trq, w2t, rpm)
# count += 1
# now_doing_msg(data_file, 'done', w2t)
# ---------------------------------------------------
# 数据文件并行处理模式---------------------------------
threads = [Thread(target=single_file_process, args=(data_files[0], wb_result, 1, av, rr, axis, vel, trq, w2t, rpm)),
Thread(target=single_file_process, args=(data_files[1], wb_result, 2, av, rr, axis, vel, trq, w2t, rpm)),
Thread(target=single_file_process, args=(data_files[2], wb_result, 3, av, rr, axis, vel, trq, w2t, rpm))]
[t.start() for t in threads]
[t.join() for t in threads]
# ---------------------------------------------------
now_doing_msg(raw_data_dir, 'done', w2t)
now_doing_msg(result_file, 'done', w2t)
w2t(f"保存文件 {file_name} 需要 1min 左右", 1)
stop = 0
t_excel = Thread(target=wb_result.save, args=(result_file, ))
t_wait = Thread(target=w2t_local, args=('.', 1, w2t))
t_excel.start()
t_wait.start()
t_excel.join()
stop = 1
sleep(1.1)
w2t('\n')
def check_files(raw_data_dirs, result_files, w2t):
# 功能:检查数据文件以及结果文件的合规性
# 参数:数据文件夹,结果文件
# 返回值:-
if len(result_files) != 3:
msg = "结果文件数目错误,结果文件有且只有三个,请确认!"
for result_file in result_files:
w2t(result_file)
w2t(msg, 0, 2)
prefix = []
for result_file in result_files:
prefix.append(result_file.split('\\')[-1].split('_')[0])
if not sorted(prefix) == sorted(['load33', 'load66', 'load100']):
wd = result_files[0].split('\\')
del wd[-1]
wd = '\\'.join(wd)
msg = f"""请关闭所有相关数据文件,并检查工作目录 {wd} 下,有且只允许有类似如下三个文件:
1. load33_自研_制动性能测试.xlsx
2. load66_自研_制动性能测试.xlsx
3. load100_自研_制动性能测试.xlsx"""
w2t(msg, 0, 3)
for raw_data_dir in raw_data_dirs:
components = raw_data_dir.split('\\')[-1].split('_')
sorted(components)
if components[0] not in ['load33', 'load66', 'load100'] or \
components[1] not in ['speed33', 'speed66', 'speed100'] or \
components[2] not in ['reach33', 'reach66', 'reach100']:
msg = f"报错信息:数据目录 {raw_data_dir} 命名不合规,请参考如下形式\n" \
f"命名规则:\n 1. loadAA_speedBB_reachCC\n 2. loadAA_reachBB_speedCC\n" \
f"规则解释AA/BB/CC 指的是负载/速度/臂展的比例\n" \
f"load66_speed100_reach3366% 负载100% 速度以及 33% 臂展情况下的测试结果文件夹"
w2t(msg, 0, 4)
_, raw_data_files = traversal_files(raw_data_dir, w2t)
if len(raw_data_files) != 3:
msg = f"数据目录 {raw_data_dir} 下数据文件个数错误,每个数据目录下有且只能有三个以 .data 为后缀的数据文件"
w2t(msg, 0, 5)
for raw_data_file in raw_data_files:
if not (raw_data_file.split('\\')[-1].endswith('.data') or raw_data_file.split('\\')[-1].endswith('.csv')):
msg = f"数据文件 {raw_data_file} 后缀错误,每个数据目录下有且只能有三个以 .data/csv 为后缀的数据文件"
w2t(msg, 0, 6)
w2t("数据目录合规性检查结束,未发现问题......")
def now_doing_msg(docs, flag, w2t):
# 功能:输出正在处理的文件或目录
# 参数文件或目录start 或 done 标识
# 返回值:-
now = strftime('%Y-%m-%d %H:%M:%S', localtime(time()))
file_type = 'file' if isfile(docs) else 'dir'
if flag == 'start' and file_type == 'dir':
w2t(f"[{now}] 正在处理目录 {docs} 中的数据......")
elif flag == 'start' and file_type == 'file':
w2t(f"[{now}] 正在处理文件 {docs} 中的数据......")
elif flag == 'done' and file_type == 'dir':
w2t(f"[{now}] 目录 {docs} 数据文件已处理完毕......")
elif flag == 'done' and file_type == 'file':
w2t(f"[{now}] 文件 {docs} 数据已处理完毕......")
def w2t_local(msg, wait, w2t):
while True:
global stop
if stop == 0 and wait != 0:
sleep(1)
w2t(msg, wait)
else:
break
def single_file_process(data_file, wb_result, count, av, rr, axis, vel, trq, w2t, rpm):
# 功能:完成单个数据文件的处理
# 参数:如上
# 返回值:-
if data_file.endswith('.data'):
sep = '\t'
df = read_csv(data_file, sep=sep)
elif data_file.endswith('.csv'):
sep = ','
df = read_csv(data_file, sep=sep, encoding='gbk', header=8)
conditions = sorted(data_file.split('\\')[-2].split('_')[1:])
result_sheet_name = find_result_sheet_name(conditions, count)
ws_result = wb_result[result_sheet_name]
row_max, row_start, flag = find_row_start(data_file, df, conditions, av, rr, axis, vel, w2t, rpm)
copy_data_to_result(flag, df, ws_result, row_max, row_start, vel, trq, rpm)
ws_result["C2"] = int(2)
ws_result["G2"] = int(10+4)
def copy_data_to_result(flag, df, ws_result, row_max, row_start, vel, trq, rpm):
# 功能:将数据文件中有效数据拷贝至结果文件对应的 sheet
# 参数:如上
# 返回值:-
# 结果文件数据清零
for row in ws_result.iter_rows(min_row=2, min_col=1, max_row=2000, max_col=2):
for cell in row:
cell.value = None
if flag == 1:
# 将合适的数据复制到结果文件
row_max = row_start + 399 if row_max-row_start > 400 else row_max
rc = 1 if rpm == 1 else 1000
data = []
for i in range(row_start, row_max+1):
data.append(df.iloc[i, vel-1] * rpm)
data.append(df.iloc[i, trq-1] * rc)
i = 0
for row in ws_result.iter_rows(min_row=2, min_col=1, max_row=row_max - row_start + 2, max_col=2):
for cell in row:
cell.value = data[i]
i = i + 1
else:
pass
def find_result_sheet_name(conditions, count):
# 功能获取结果文件准确的sheet页名称
# 参数:臂展和速度的列表
# 返回值结果文件对应的sheet name
# 33%臂展_33%速度_正1
reach = conditions[0].removeprefix('reach')
speed = conditions[1].removeprefix('speed')
result_sheet_name = f"{reach}%臂展_{speed}%速度_正{count}"
return result_sheet_name
def find_row_start(data_file, df, conditions, av, rr, axis, vel, w2t, rpm):
# 功能:查找数据文件中有效数据的行号,也即最后一个速度下降的点位
# 参数:如上
# 返回值:速度下降点位,最后的数据点位
ratio = float(conditions[1].removeprefix('speed'))/100
speed_max = av * ratio * rr / 6
row_max = row_start = df.index[-1]
threshold, step = get_threshold_step(data_file, axis)
while row_start > step+1:
speed = abs(df.iloc[row_start, vel-1] * rpm)
if int(speed) < 1:
row_start -= 50
continue
_ = []
for i in range(row_start, row_start-step+1, -1):
_.append(df.iloc[i, vel-1] * rpm)
speed_avg = abs(sum(_))/len(_)
if abs(speed_avg-speed_max) < threshold:
row_start = row_start - 10
flag = 1
break
else:
row_start -= step
else:
msg = f"可能是{data_file}这个文件数据采集有问题,比如未采集理论速度值,也有可能是程序步长设定问题,请检查......"
w2t(msg)
flag = 0
return row_max, row_start, flag
def get_threshold_step(excel_file, axis):
# 功能负载和速度100%且是j2的时候做特殊处理
# 参数新生成的excel轴号
# 返回值:速度差阈值,处理步长
conditions = sorted(excel_file.split('\\')[-2].split('_'))
# 只有负载和速度是100%时才会启用更敏感的step
flg = 1 if conditions[0][-3:] == '100' and conditions[2][-3:] == '100' else 0
if flg == 1 and axis == 2:
threshold = 30
step = 5
else:
threshold = 10
step = 5
return threshold, step
def traversal_files(path, w2t):
# 功能:以列表的形式分别返回指定路径下的文件和文件夹,不包含子目录
# 参数:路径
# 返回值:路径下的文件夹列表 路径下的文件列表
if not exists(path):
msg = f'数据文件夹{path}不存在,请确认后重试......'
w2t(msg, 0, 1)
else:
dirs = []
files = []
for item in scandir(path):
if item.is_dir():
dirs.append(item.path)
elif item.is_file():
files.append(item.path)
return dirs, files
def main(path, av, rr, rpm, axis, vel, trq, w2t):
# 功能:执行处理所有数据文件
# 参数initialization函数的返回值
# 返回值:-
time_start = time()
raw_data_dirs, result_files = traversal_files(path, w2t)
prefix = []
for raw_data_dir in raw_data_dirs:
prefix.append(raw_data_dir.split('\\')[-1].split("_")[0])
try:
# threads = []
check_files(raw_data_dirs, result_files, w2t)
for result_file in result_files:
if result_file.split('\\')[-1].split('_')[0] not in set(prefix):
continue
else:
now_doing_msg(result_file, 'start', w2t)
data_process(result_file, raw_data_dirs, av, rr, axis, vel, trq, w2t, rpm)
# threads.append(Thread(target=data_process, args=(result_file, raw_data_dirs, AV, RR, RC, AXIS)))
# [t.start() for t in threads]
# [t.join() for t in threads]
except Exception as Err:
msg = f"出现错误:{Err}\n程序运行错误,请检查配置文件是否准确设定,以及数据文件组织是否正确,也有可能是结果文件损坏,尝试重新复制一份,再运行!"
w2t(msg)
w2t("----------------------------------------------------------")
w2t("全部处理完毕")
time_end = time()
time_total = time_end - time_start
msg = f"数据处理时间:{time_total // 3600:02.0f} h {time_total % 3600 // 60:02.0f} m {time_total % 60:02.0f} s"
w2t(msg)
if __name__ == "__main__":
stop = 0
main(path=argv[1], av=argv[2], rr=argv[3], rpm=argv[4], axis=argv[5], vel=argv[6], trq=argv[7], w2t=argv[8])

103
aio/current.py Normal file
View File

@ -0,0 +1,103 @@
import openpyxl
import os
import sys
import pandas
def warn_pause_exit(msg, pause_num, exit_num):
# 功能:打印告警信息,并推出程序
# 参数:告警信息,暂停的次数,退出的值
# 返回值:-
print(msg + '\n')
for i in range(pause_num):
_ = input("Press ENTER to continue......\n")
sys.exit(exit_num)
def traversal_files(path):
# 功能:以列表的形式分别返回指定路径下的文件和文件夹,不包含子目录
# 参数:路径
# 返回值:路径下的文件夹列表 路径下的文件列表
if not os.path.exists(path):
msg = f'数据文件夹{path}不存在,请确认后重试......'
warn_pause_exit(msg, 1, 11)
else:
dirs = []
files = []
for item in os.scandir(path):
if item.is_dir():
dirs.append(item.path)
elif item.is_file():
files.append(item.path)
return dirs, files
def initialization():
try:
# read init configurations from config file
wb_conf = openpyxl.load_workbook('./configs.xlsx', read_only=True)
ws_conf = wb_conf['current']
except Exception as Err:
msg = "无法在当前路径下找到或打开【configs.xlsx】文件请确认"
warn_pause_exit(msg, 1, 2)
FUNC = ws_conf.cell(row=2, column=2).value
DATA_DIR = ws_conf.cell(row=3, column=2).value
AXIS = int(ws_conf.cell(row=4, column=2).value)
RC = ws_conf.cell(row=5, column=2).value.split(',')
COLUMN = int(ws_conf.cell(row=6, column=2).value)
wb_conf.close()
_, data_files = traversal_files(DATA_DIR)
for data_file in data_files:
if not data_file.endswith('.data'):
msg = f"所有文件必须以 .data 结尾,请检查后重新运行。"
warn_pause_exit(msg, 1, 1)
return data_files, FUNC, RC, COLUMN, AXIS
def current_max(*args):
data_files, FUNC, RC, COLUMN, AXIS = args
for data_file in data_files:
df = pandas.read_csv(data_file, sep='\t')
col = df.columns.values[COLUMN-1]
c_max = df[col].max()
print(f"{data_file}: {c_max/1000*RC[AXIS-1]:.3f}")
msg = f"数据处理完毕......"
warn_pause_exit(msg, 1, 0)
def current_avg(*args):
data_files, FUNC, RC, COLUMN, AXIS = args
for data_file in data_files:
df = pandas.read_csv(data_file, sep='\t')
col = df.columns.values[COLUMN-1]
c_std = df[col].std()
c_avg = df[col].mean()
axis = int(data_file.split('\\')[-1].split('_')[0].replace('j', ''))
print(f"{data_file}: {(abs(c_avg)+c_std)/1000*float(RC[axis-1]):.3f}")
msg = f"数据处理完毕......"
warn_pause_exit(msg, 1, 0)
def full_cycle(*args):
pass
def main():
args = initialization()
if args[1] == 'max':
current_max(*args)
elif args[1] == 'avg':
current_avg(*args)
elif args[1] == 'cycle':
full_cycle(*args)
if __name__ == '__main__':
main()

43
aio/file_version_info.txt Normal file
View File

@ -0,0 +1,43 @@
# UTF-8
#
# For more details about fixed file info 'ffi' see:
# http://msdn.microsoft.com/en-us/library/ms646997.aspx
VSVersionInfo(
ffi=FixedFileInfo(
# filevers and prodvers should be always a tuple with four items: (1, 2, 3, 4)
# Set not needed items to zero 0.
filevers=(0, 1, 1, 0),
prodvers=(0, 1, 1, 0),
# Contains a bitmask that specifies the valid bits 'flags'r
mask=0x3f,
# Contains a bitmask that specifies the Boolean attributes of the file.
flags=0x0,
# The operating system for which this file was designed.
# 0x4 - NT and there is no need to change it.
OS=0x4,
# The general type of file.
# 0x1 - the file is an application.
fileType=0x1,
# The function of the file.
# 0x0 - the function is not defined for this fileType
subtype=0x0,
# Creation date and time stamp.
date=(0, 0)
),
kids=[
StringFileInfo(
[
StringTable(
'040904b0',
[StringStruct('CompanyName', 'Rokae - https://www.rokae.com/'),
StringStruct('FileDescription', 'All in one automatic toolbox'),
StringStruct('FileVersion', '0.1.1 (2024-05-24)'),
StringStruct('InternalName', 'AIO.exe'),
StringStruct('LegalCopyright', '© 2024-2024 Manford Fan'),
StringStruct('OriginalFilename', 'AIO.exe'),
StringStruct('ProductName', 'AIO'),
StringStruct('ProductVersion', '0.1.1 (2024-05-29)')])
]),
VarFileInfo([VarStruct('Translation', [1033, 1200])])
]
)

BIN
aio/icon.ico Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 162 KiB

BIN
aio/layout.xlsx Normal file

Binary file not shown.

1
aio/vers Normal file
View File

@ -0,0 +1 @@
1.1.1 @ 05/30/2024