2 Commits

Author SHA1 Message Date
802ccd8e97 v0.1.7.1(2024/06/29)
1. [APIs: aio.py]
   - 对于automatic test删除了输入框,使用configs.xlsx配置文件作为参数输入
   - 完善initialization/param_check/func_start_callback函数中对于automatic test的处理
   - 将textbox组件一直设置为normal状态,不再频繁切换disabled
   - 将所有的f_h文件对象修改为f_hb,并将connection_state修改为c_state
   - 在detect_network函数中,实例化HmiRequest,并在无限循环中检测心跳是否正常,如异常,则销毁hr,重新生成
   - 取消在tabview切换时,检测心跳的逻辑,这样做无法保证实时性
2. [APIs: openapi.py]
   - 将sock_conn函数移出__init__,单独作为连接函数存在
   - 新增全局变量self.t_bool,控制所有的线程中无限循环的启停,也就是可以人为的退出线程
   - 移除close_sock函数
   - heartbeat函数中新增打印所有消息的代码,调试时打开,平常关闭
   - execution函数中,新增对overview.set_autoload和overview.reload的支持
   - execution函数中,对send动作增加异常处理逻辑
3. [APIs: do_brake.py]
   - 新增文件,处理制动测试流程,建立连接,导入project,pp2main,run,采集并处理曲线数据,本地修改RL程序,推送至控制器等
   - 目前完成:
     - 文件合规性检查
     - 导入工程并设置为运行工程
4. [APIs: current.py] 修改scenario/single电机电流最大长度为150s
5. 在本文件中更新关于制动自动化测试的相关内容
2024-06-29 07:48:54 +08:00
79797a3bdd 20240626
9. [aio.py] 修改了版本
10. [current.py] max/avg功能结束之前会将结果数据追加写入源文件,avg算法更改为average+3×std
11. [wavelogger.py] 算法更改为 average+3×std
2024-06-26 21:38:21 +08:00
14 changed files with 319 additions and 125 deletions

View File

@ -6,6 +6,8 @@
2. 电机电流数据,全部轴数据处理 1min 以内 2. 电机电流数据,全部轴数据处理 1min 以内
3. ISO 激光数据整理1min 以内 3. ISO 激光数据整理1min 以内
4. wavelogger 波形处理,几乎不花费时间 4. wavelogger 波形处理,几乎不花费时间
5. 制动自动化测试
--- ---
@ -112,6 +114,14 @@ pyinstaller --noconfirm --onedir --windowed --add-data "C:/Users/Administrator/A
3. 运行结束后,会生成 result.xlsx 文件,结果按照 .csv 文件名存放 3. 运行结束后,会生成 result.xlsx 文件,结果按照 .csv 文件名存放
4. 采集数据时,不同轮次数据时间间隔最好大于 2 倍的周期时间,否则会出现采集的轮数不正确的情况,但数据是完整的 4. 采集数据时,不同轮次数据时间间隔最好大于 2 倍的周期时间,否则会出现采集的轮数不正确的情况,但数据是完整的
#### 5) 制动自动化测试
只需要提前将如下文件放在指定路径下即可:
1. zip 工程文件
2. excel 制动结果处理文件
3. excel configs.xlsx 配置文件
#### 其他 #### 其他
customtkinter的tabview组件不支持修改字体大小可以参考 [Changing Font of a Tabview](https://github.com/TomSchimansky/CustomTkinter/issues/2296) 进行手动修改源码实现: customtkinter的tabview组件不支持修改字体大小可以参考 [Changing Font of a Tabview](https://github.com/TomSchimansky/CustomTkinter/issues/2296) 进行手动修改源码实现:
a. 运行 `pip show customtkinter`,获取到库的路径 a. 运行 `pip show customtkinter`,获取到库的路径
@ -125,7 +135,7 @@ RELEASE CHANGES
已知问题: 已知问题:
1. office套件下运行好像有问题WPS无问题集中在just_open函数的实现上 1. -
v0.0.1(2024/05/18) v0.0.1(2024/05/18)
Draft Draft
@ -301,6 +311,32 @@ v0.1.7.0(2024/06/26)-初步可用
3. [openapi.py] 新增sock_conn函数并做连接时的异常处理新增类参数w2t 3. [openapi.py] 新增sock_conn函数并做连接时的异常处理新增类参数w2t
4. [aio.py] 修改customtkinter库中C:\Users\Administrator\AppData\Local\Programs\Python\Python312\Lib\site-packages\customtkinter\windows\widgets\ctk_tabview.py文件参考https://github.com/TomSchimansky/CustomTkinter/issues/2296实现修改tabview组件的字体大小使用原生字体同时将segmented button字体修改为原生为了解决segmented button在禁用和启用时屏幕抖动的问题并将大小修改为16 4. [aio.py] 修改customtkinter库中C:\Users\Administrator\AppData\Local\Programs\Python\Python312\Lib\site-packages\customtkinter\windows\widgets\ctk_tabview.py文件参考https://github.com/TomSchimansky/CustomTkinter/issues/2296实现修改tabview组件的字体大小使用原生字体同时将segmented button字体修改为原生为了解决segmented button在禁用和启用时屏幕抖动的问题并将大小修改为16
5. [aio.py] 修改了segmented_button_callback的实现逻辑使代码更简洁 5. [aio.py] 修改了segmented_button_callback的实现逻辑使代码更简洁
6. [aio.py] 修改了在tabview_click函数中对于实例化openapi的动作,使每次切换标签都会重新实例化,也就是每次都会重新连接,修复显示不正确的问题 6. [aio.py] 修改了在tabview_click函数中对于实例化HmiRequest的动作,使每次切换标签都会重新实例化,也就是每次都会重新连接,修复显示不正确的问题
7. [openapi.py] 新增了socket关闭的函数并增加msg_id为None的处理逻辑 7. [openapi.py] 新增了socket关闭的函数并增加msg_id为None的处理逻辑
8. [btn_functions.py] 完善了状态获取的功能,新增告警获取以及功能切换的逻辑 8. [btn_functions.py] 完善了状态获取的功能,新增告警获取以及功能切换的逻辑
9. [aio.py] 修改了版本
10. [current.py] max/avg功能结束之前会将结果数据追加写入源文件avg算法更改为average+3×std
11. [wavelogger.py] 算法更改为 average+3×std
v0.1.7.1(2024/06/29)
1. [APIs: aio.py]
- 对于automatic test删除了输入框使用configs.xlsx配置文件作为参数输入
- 完善initialization/param_check/func_start_callback函数中对于automatic test的处理
- 将textbox组件一直设置为normal状态不再频繁切换disabled
- 将所有的f_h文件对象修改为f_hb并将connection_state修改为c_state
- 在detect_network函数中实例化HmiRequest并在无限循环中检测心跳是否正常如异常则销毁hr重新生成
- 取消在tabview切换时检测心跳的逻辑这样做无法保证实时性
2. [APIs: openapi.py]
- 将sock_conn函数移出__init__单独作为连接函数存在
- 新增全局变量self.t_bool控制所有的线程中无限循环的启停也就是可以人为的退出线程
- 移除close_sock函数
- heartbeat函数中新增打印所有消息的代码调试时打开平常关闭
- execution函数中新增对overview.set_autoload和overview.reload的支持
- execution函数中对send动作增加异常处理逻辑
3. [APIs: do_brake.py]
- 新增文件处理制动测试流程建立连接导入projectpp2mainrun采集并处理曲线数据本地修改RL程序推送至控制器等
- 目前完成:
- 文件合规性检查
- 导入工程并设置为运行工程
4. [APIs: current.py] 修改scenario/single电机电流最大长度为150s
5. 在本文件中更新关于制动自动化测试的相关内容

BIN
aio/assets/configs.xlsx Normal file

Binary file not shown.

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "project",
"command": "overview.get_autoload"
}

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "project",
"command": "overview.get_cur_prj"
}

View File

@ -0,0 +1,9 @@
{
"id": "xxxxxxxxxxx",
"module": "project",
"command": "overview.reload",
"data": {
"prj_path": "",
"tasks": []
}
}

View File

@ -0,0 +1,8 @@
{
"id": "xxxxxxxxxxx",
"module": "project",
"command": "overview.set_autoload",
"data": {
"autoload_prj_path": ""
}
}

View File

@ -1,4 +1,3 @@
import sys
import tkinter import tkinter
from os.path import exists, dirname from os.path import exists, dirname
from os import getcwd from os import getcwd
@ -46,24 +45,6 @@ widgits_dp = {
widgits_at = { widgits_at = {
'path': {'label': '', 'entry': '', 'row': 2, 'col': 2, 'text': '数据文件夹路径'}, 'path': {'label': '', 'entry': '', 'row': 2, 'col': 2, 'text': '数据文件夹路径'},
'loadsel': {'label': '', 'optionmenu': '', 'row': 2, 'col': 1, 'text': '负载信息'}, 'loadsel': {'label': '', 'optionmenu': '', 'row': 2, 'col': 1, 'text': '负载信息'},
'av1': {'label': '', 'entry': '', 'row': 3, 'col': 2, 'text': '角速度'},
'av2': {'label': '', 'entry': '', 'row': 3, 'col': 4, 'text': '角速度'},
'av3': {'label': '', 'entry': '', 'row': 3, 'col': 6, 'text': '角速度'},
'av4': {'label': '', 'entry': '', 'row': 3, 'col': 8, 'text': '角速度'},
'av5': {'label': '', 'entry': '', 'row': 3, 'col': 10, 'text': '角速度'},
'av6': {'label': '', 'entry': '', 'row': 3, 'col': 12, 'text': '角速度'},
'rc1': {'label': '', 'entry': '', 'row': 4, 'col': 2, 'text': '额定电流'},
'rc2': {'label': '', 'entry': '', 'row': 4, 'col': 4, 'text': '额定电流'},
'rc3': {'label': '', 'entry': '', 'row': 4, 'col': 6, 'text': '额定电流'},
'rc4': {'label': '', 'entry': '', 'row': 4, 'col': 8, 'text': '额定电流'},
'rc5': {'label': '', 'entry': '', 'row': 4, 'col': 10, 'text': '额定电流'},
'rc6': {'label': '', 'entry': '', 'row': 4, 'col': 12, 'text': '额定电流'},
'rr1': {'label': '', 'entry': '', 'row': 5, 'col': 2, 'text': '额定转速'},
'rr2': {'label': '', 'entry': '', 'row': 5, 'col': 4, 'text': '额定转速'},
'rr3': {'label': '', 'entry': '', 'row': 5, 'col': 6, 'text': '额定转速'},
'rr4': {'label': '', 'entry': '', 'row': 5, 'col': 8, 'text': '额定转速'},
'rr5': {'label': '', 'entry': '', 'row': 5, 'col': 10, 'text': '额定转速'},
'rr6': {'label': '', 'entry': '', 'row': 5, 'col': 12, 'text': '额定转速'},
} }
@ -99,7 +80,7 @@ class App(customtkinter.CTk):
btns_func['log']['btn'].configure(command=lambda: self.thread_it(self.func_log_callback)) btns_func['log']['btn'].configure(command=lambda: self.thread_it(self.func_log_callback))
btns_func['end']['btn'].configure(command=lambda: self.thread_it(self.func_end_callback)) btns_func['end']['btn'].configure(command=lambda: self.thread_it(self.func_end_callback))
# create version info # create version info
self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.1.6.3\nDate: 06/18/2024", font=self.my_font, text_color="#4F4F4F") self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.1.7.0\nDate: 06/26/2024", font=self.my_font, text_color="#4F4F4F")
self.frame_func.rowconfigure(6, weight=1) self.frame_func.rowconfigure(6, weight=1)
self.label_version.grid(row=6, column=0, padx=20, pady=20, sticky='s') self.label_version.grid(row=6, column=0, padx=20, pady=20, sticky='s')
# ===================================================================== # =====================================================================
@ -159,12 +140,6 @@ class App(customtkinter.CTk):
widgits_at[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Automatic Test'), width=670, placeholder_text=widgits_at[widgit]['text'], font=self.my_font) widgits_at[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Automatic Test'), width=670, placeholder_text=widgits_at[widgit]['text'], font=self.my_font)
widgits_at[widgit]['entry'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=5, sticky='we') widgits_at[widgit]['entry'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=5, sticky='we')
widgits_at[widgit]['entry'].configure(state='disabled') widgits_at[widgit]['entry'].configure(state='disabled')
elif widgit in ['av1', 'av2', 'av3', 'av4', 'av5', 'av6', 'rc1', 'rc2', 'rc3', 'rc4', 'rc5', 'rc6', 'rr1', 'rr2', 'rr3', 'rr4', 'rr5', 'rr6']:
widgits_at[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Automatic Test'), text=f"{widgit.upper()}", font=self.my_font)
widgits_at[widgit]['label'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col'], sticky='e', pady=5)
widgits_at[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Automatic Test'), width=self.w_param, placeholder_text=f"{widgits_at[widgit]['text']}", font=self.my_font)
widgits_at[widgit]['entry'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col']+1, padx=(5, 10), pady=5, sticky='w')
widgits_at[widgit]['entry'].configure(state='disabled')
elif widgit in ['loadsel', ]: elif widgit in ['loadsel', ]:
widgits_at[widgit]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Automatic Test'), button_color='#708090', fg_color='#778899', values=["tool33", "tool66", "tool100"], width=self.w_param, font=self.my_font) widgits_at[widgit]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Automatic Test'), button_color='#708090', fg_color='#778899', values=["tool33", "tool66", "tool100"], width=self.w_param, font=self.my_font)
widgits_at[widgit]['optionmenu'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col'], padx=5, pady=5, sticky='we') widgits_at[widgit]['optionmenu'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col'], padx=5, pady=5, sticky='we')
@ -174,7 +149,7 @@ class App(customtkinter.CTk):
# create textbox # create textbox
self.textbox = customtkinter.CTkTextbox(self, wrap='none', font=customtkinter.CTkFont(family="consolas", size=14), text_color="blue", fg_color='#E9E9E9', border_width=2, border_color='#CDCDCD', border_spacing=5) self.textbox = customtkinter.CTkTextbox(self, wrap='none', font=customtkinter.CTkFont(family="consolas", size=14), text_color="blue", fg_color='#E9E9E9', border_width=2, border_color='#CDCDCD', border_spacing=5)
self.textbox.grid(row=6, column=1, columnspan=13, ipadx=10, ipady=10, padx=10, pady=(5, 10), sticky='nsew') self.textbox.grid(row=6, column=1, columnspan=13, ipadx=10, ipady=10, padx=10, pady=(5, 10), sticky='nsew')
self.textbox.configure(state='disabled') self.textbox.configure(state='normal')
# ===================================================================== # =====================================================================
# version check # version check
cur_vers = self.label_version.cget("text").replace('\n', ' @ ').replace("Vers: ", '').replace("Date: ", '') cur_vers = self.label_version.cget("text").replace('\n', ' @ ').replace("Vers: ", '').replace("Date: ", '')
@ -199,13 +174,12 @@ class App(customtkinter.CTk):
value = self.seg_button.get() value = self.seg_button.get()
self.seg_button.configure(state='disabled') self.seg_button.configure(state='disabled')
self.textbox.configure(state='normal')
# self.tabview.configure(state='disabled') # self.tabview.configure(state='disabled')
self.textbox.delete(index1='1.0', index2='end') self.textbox.delete(index1='1.0', index2='end')
with open(f'{current_path}/../assets/templates/heartbeat', 'r', encoding='utf-8') as f_h: with open(f'{current_path}/../assets/templates/heartbeat', 'r', encoding='utf-8') as f_h:
connection_state = f_h.read().strip() c_state = f_h.read().strip()
if connection_state == '0' and value != '功能切换': if c_state == '0' and value != '功能切换':
self.write2textbox("无法连接机器人检查是否已经使用Robot Assist软件连接机器重试中...", 0, 50, 'red') self.write2textbox("无法连接机器人检查是否已经使用Robot Assist软件连接机器重试中...", 0, 50, 'red')
else: else:
for _func in _btn_funcs: for _func in _btn_funcs:
@ -214,23 +188,29 @@ class App(customtkinter.CTk):
break break
self.seg_button.configure(state='normal') self.seg_button.configure(state='normal')
self.textbox.configure(state='disable')
# self.tabview.configure(state='normal') # self.tabview.configure(state='normal')
def detect_network(self): def detect_network(self):
with open(f"{current_path}/../assets/templates/heartbeat", "w", encoding='utf-8') as f_h: with open(f"{current_path}/../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
f_h.write('0') f_hb.write('0')
self.hr = openapi.HmiRequest(self.write2textbox)
while True: while True:
with open(f'{current_path}/../assets/templates/heartbeat', 'r', encoding='utf-8') as f_h: with open(f'{current_path}/../assets/templates/heartbeat', 'r', encoding='utf-8') as f_hb:
pb_color = 'green' if f_h.read().strip() == '1' else 'red' c_state = f_hb.read().strip()
pb_color = 'green' if c_state == '1' else 'red'
self.progressbar.configure(progress_color=pb_color) self.progressbar.configure(progress_color=pb_color)
sleep(1) sleep(1)
if c_state == '0':
self.textbox.delete(index1='1.0', index2='end')
self.hr.t_bool = False
sleep(1)
del self.hr
self.hr = openapi.HmiRequest(self.write2textbox)
def tabview_click(self): def tabview_click(self):
self.initialization() self.initialization()
self.textbox.configure(state='normal')
self.textbox.delete(index1='1.0', index2='end') self.textbox.delete(index1='1.0', index2='end')
self.textbox.configure(state='disabled')
tab_name = self.tabview.get() tab_name = self.tabview.get()
if tab_name == 'Data Process': if tab_name == 'Data Process':
@ -238,20 +218,6 @@ class App(customtkinter.CTk):
elif tab_name == 'Automatic Test': elif tab_name == 'Automatic Test':
self.menu_main_at.set("Start Here!") self.menu_main_at.set("Start Here!")
self.seg_button.configure(state='normal') self.seg_button.configure(state='normal')
with open(f"{current_path}/../assets/templates/heartbeat", "w", encoding='utf-8') as f_h:
f_h.write('0')
self.textbox.configure(state='normal')
try:
self.hr.close_sock()
self.hr = openapi.HmiRequest(self.write2textbox)
except:
self.hr = openapi.HmiRequest(self.write2textbox)
# if connection_state == '0' or self.hr is None:
# self.textbox.configure(state='normal')
# self.hr = openapi.HmiRequest(self.write2textbox)
# self.textbox.configure(state='disabled')
def initialization(self): def initialization(self):
tab_name = self.tabview.get() tab_name = self.tabview.get()
@ -270,7 +236,6 @@ class App(customtkinter.CTk):
self.menu_sub_dp.grid_forget() self.menu_sub_dp.grid_forget()
self.textbox.delete(index1='1.0', index2='end') self.textbox.delete(index1='1.0', index2='end')
self.textbox.configure(state='disabled')
elif tab_name == 'Automatic Test': elif tab_name == 'Automatic Test':
for widgit in widgits_at: for widgit in widgits_at:
if widgit in ['path', 'av1', 'av2', 'av3', 'av4', 'av5', 'av6', 'rc1', 'rc2', 'rc3', 'rc4', 'rc5', 'rc6', 'rr1', 'rr2', 'rr3', 'rr4', 'rr5', 'rr6']: if widgit in ['path', 'av1', 'av2', 'av3', 'av4', 'av5', 'av6', 'rc1', 'rc2', 'rc3', 'rc4', 'rc5', 'rc6', 'rr1', 'rr2', 'rr3', 'rr4', 'rr5', 'rr6']:
@ -319,7 +284,29 @@ class App(customtkinter.CTk):
self.initialization() self.initialization()
self.menu_main_dp.set("Start Here!") self.menu_main_dp.set("Start Here!")
elif tab_name == 'Automatic Test': elif tab_name == 'Automatic Test':
pass if func_name == 'brake':
for widgit in widgits_at:
if widgit in ['path',]:
widgits_at[widgit]['label'].configure(text_color='red')
widgits_at[widgit]['entry'].delete(0, tkinter.END)
widgits_at[widgit]['entry'].configure(placeholder_text=widgits_at[widgit]['text'], state='normal')
widgits_at[widgit]['entry'].configure(state='normal')
elif widgit in ['loadsel', ]:
widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
widgits_at[widgit]['optionmenu'].configure(state='normal', text_color='red')
elif func_name == 'current':
for widgit in widgits_at:
if widgit in ['path',]:
widgits_at[widgit]['label'].configure(text_color='red')
widgits_at[widgit]['entry'].delete(0, tkinter.END)
widgits_at[widgit]['entry'].configure(placeholder_text=widgits_at[widgit]['text'], state='normal')
widgits_at[widgit]['entry'].configure(state='normal')
elif widgit in ['loadsel', ]:
widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
widgits_at[widgit]['optionmenu'].configure(text_color='black', state='disabled')
else:
self.initialization()
self.menu_main_at.set("Start Here!")
def func_sub_callback(self, func_name): def func_sub_callback(self, func_name):
if func_name == "max": if func_name == "max":
@ -359,7 +346,6 @@ class App(customtkinter.CTk):
self.textbox.insert(index='end', text=text + '\n', tags=color) self.textbox.insert(index='end', text=text + '\n', tags=color)
self.textbox.update() self.textbox.update()
self.textbox.see('end') self.textbox.see('end')
self.textbox.configure(state='disabled')
raise Exception(f"Error code: {exitcode}") raise Exception(f"Error code: {exitcode}")
else: else:
self.textbox.insert(index='end', text=text + '\n', tags=color) self.textbox.insert(index='end', text=text + '\n', tags=color)
@ -441,16 +427,14 @@ class App(customtkinter.CTk):
# ======================================================= # =======================================================
elif func_name == 'iso': elif func_name == 'iso':
path = widgits_dp['path']['entry'].get().strip() path = widgits_dp['path']['entry'].get().strip()
c1 = exists(path) if exists(path):
if c1:
return 3, path return 3, path
else: else:
return 0, 0 return 0, 0
# ======================================================= # =======================================================
elif func_name == 'wavelogger': elif func_name == 'wavelogger':
path = widgits_dp['path']['entry'].get().strip() path = widgits_dp['path']['entry'].get().strip()
c1 = exists(path) if exists(path):
if c1:
return 4, path return 4, path
else: else:
return 0, 0 return 0, 0
@ -460,16 +444,28 @@ class App(customtkinter.CTk):
elif tab_name == 'Automatic Test': elif tab_name == 'Automatic Test':
func_name = self.menu_main_at.get() func_name = self.menu_main_at.get()
if func_name == 'brake': if func_name == 'brake':
pass path = widgits_at['path']['entry'].get().strip()
loadsel = widgits_at['loadsel']['optionmenu'].get()
c1 = exists(path)
c2 = loadsel in ['tool100', 'tool66', 'tool33']
if c1 and c2:
return 5, path, loadsel
else:
return 0, 0
elif func_name == 'current': elif func_name == 'current':
pass path = widgits_at['path']['entry'].get().strip()
if exists(path):
return 6, path
else:
return 0, 0
else:
return 0, 0
def func_start_callback(self): def func_start_callback(self):
self.textbox.configure(state='normal')
self.textbox.delete(index1='1.0', index2='end') self.textbox.delete(index1='1.0', index2='end')
flag, *args = self.check_param() flag, *args = self.check_param()
func_dict = {1: brake.main, 2: current.main, 3: iso.main, 4: wavelogger.main} func_dict = {1: brake.main, 2: current.main, 3: iso.main, 4: wavelogger.main, 5: do_brake.main}
if flag == 1: if flag == 1:
func_dict[flag](path=args[0], av=args[1], rr=args[2], axis=args[3], vel=args[4], trq=args[5], estop=args[6], w2t=self.write2textbox) func_dict[flag](path=args[0], av=args[1], rr=args[2], axis=args[3], vel=args[4], trq=args[5], estop=args[6], w2t=self.write2textbox)
elif flag == 2: elif flag == 2:
@ -478,13 +474,12 @@ class App(customtkinter.CTk):
func_dict[flag](path=args[0], w2t=self.write2textbox) func_dict[flag](path=args[0], w2t=self.write2textbox)
elif flag == 4: elif flag == 4:
func_dict[flag](path=args[0], w2t=self.write2textbox) func_dict[flag](path=args[0], w2t=self.write2textbox)
elif flag == 5:
func_dict[flag](path=args[0], hr=self.hr, loadsel=args[1], w2t=self.write2textbox)
else: else:
tkinter.messagebox.showerror(title="参数错误", message="请检查对应参数是否填写正确!", ) tkinter.messagebox.showerror(title="参数错误", message="请检查对应参数是否填写正确!", )
self.textbox.configure(state='disabled')
def func_check_callback(self): def func_check_callback(self):
self.textbox.configure(state='normal')
self.textbox.delete(index1='1.0', index2='end') self.textbox.delete(index1='1.0', index2='end')
flag, *args = self.check_param() flag, *args = self.check_param()
@ -493,8 +488,6 @@ class App(customtkinter.CTk):
else: else:
tkinter.messagebox.showerror(title="参数错误", message="需要检查对应参数是否填写正确!", ) tkinter.messagebox.showerror(title="参数错误", message="需要检查对应参数是否填写正确!", )
self.textbox.configure(state='disabled')
def func_log_callback(self): def func_log_callback(self):
content = self.textbox.get(index1='1.0', index2='end') content = self.textbox.get(index1='1.0', index2='end')
if len(content) > 1: if len(content) > 1:

View File

@ -1 +1 @@
__all__ = ['openapi', 'btn_functions'] __all__ = ['openapi', 'btn_functions', 'do_brake', 'do_current']

View File

@ -1,10 +1,6 @@
import json import json
import socket
from os.path import dirname
from sys import argv from sys import argv
current_path = dirname(__file__)
def validate_resp(_id, response, w2t): def validate_resp(_id, response, w2t):
match _id: match _id:

View File

@ -0,0 +1,129 @@
from sys import argv
from os import scandir
from os.path import exists
import paramiko
import json
def traversal_files(path, w2t):
# 功能:以列表的形式分别返回指定路径下的文件和文件夹,不包含子目录
# 参数:路径
# 返回值:路径下的文件夹列表 路径下的文件列表
if not exists(path):
msg = f'数据文件夹{path}不存在,请确认后重试......'
w2t(msg, 0, 1, 'red')
else:
dirs = []
files = []
for item in scandir(path):
if item.is_dir():
dirs.append(item.path)
elif item.is_file():
files.append(item.path)
return dirs, files
def check_files(data_dirs, data_files, w2t):
if len(data_dirs) != 0 or len(data_files) != 5:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red')
w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 1, 'red')
config_file = reach33 = reach66 = reach100 = prj_file = None
for data_file in data_files:
filename = data_file.split('\\')[-1]
if filename == 'configs.xlsx':
config_file = data_file
elif filename.startswith('reach33_') and filename.endswith('.xlsx'):
reach33 = data_file
elif filename.startswith('reach66_') and filename.endswith('.xlsx'):
reach66 = data_file
elif filename.startswith('reach100_') and filename.endswith('.xlsx'):
reach100 = data_file
elif filename.endswith('.zip'):
prj_file = data_file
else:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red')
w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 2, 'red')
if config_file and reach33 and reach66 and reach100 and prj_file:
w2t("数据目录合规性检查结束,未发现问题......")
return config_file, reach33, reach66, reach100, prj_file
else:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red')
w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 1, 'red')
def prj_to_xcore(prj_file):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019')
sftp = ssh.open_sftp()
# stdin, stdout, stderr = ssh.exec_command('rm /tmp/target.zip')
# ssh.exec_command('rm /tmp/target.zip')
sftp.put(prj_file, '/tmp/target.zip')
cmd = 'cd /tmp; '
cmd += 'rm -rf target/; '
cmd += 'mkdir target; '
cmd += 'unzip -d target/ -q target.zip; '
cmd += 'rm target.zip; '
ssh.exec_command(cmd)
cmd = 'sudo rm -rf /home/luoshi/bin/controller/projects/target; '
cmd += 'sudo mv /tmp/target/ /home/luoshi/bin/controller/projects/'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
ssh.close()
def modify_prj():
pass
def validate_resp(_id, response, w2t):
match _id:
case 'DATA ERR':
w2t(f"数据处理错误,需要确认", 0, 4, 'red')
case 'DATA READ ERR':
w2t(f"无法读取数据,需要确认", 0, 3, 'red')
case 'NOT SUPPORT':
w2t(f"不支持的功能,需要确认", 0, 2, 'red')
if not response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red')
def execution(cmd, hr, w2t, **kwargs):
_id = hr.excution(cmd, **kwargs)
_msg = hr.get_from_id(_id)
if not _msg:
w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red')
else:
_response = json.loads(_msg)
validate_resp(_id, _response, w2t)
return _response
def run_rl(hr, w2t):
prj_path = '/home/luoshi/bin/controller/projects/target'
_response = execution('overview.set_autoload', hr, w2t, autoload_prj_path=prj_path)
print(f"set prj auto load: {_response}")
_response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['Durable_Test_Com', 'Mechanical_Test_Com'])
print(f"reload prj: {_response}")
_response = execution('overview.get_cur_prj', hr, w2t)
print(f"get prj name: {_response}")
_response = execution('overview.get_autoload', hr, w2t)
print(f"get auto load: {_response}")
def main(path, hr, loadsel, w2t):
data_dirs, data_files = traversal_files(path, w2t)
config_file, reach33, reach66, reach100, prj_file = check_files(data_dirs, data_files, w2t)
prj_to_xcore(prj_file)
run_rl(hr, w2t)
if __name__ == '__main__':
main(*argv[1:])

View File

View File

@ -16,37 +16,6 @@ class HmiRequest(object):
self.w2t = w2t self.w2t = w2t
self.c = None self.c = None
self.c_xs = None self.c_xs = None
def sock_conn():
# while True:
with open(f"{current_path}/../../assets/templates/heartbeat", "r", encoding='utf-8') as f_h:
connection_state = f_h.read().strip()
if connection_state == '0':
try:
c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
c.connect(('192.168.0.160', 5050))
# c.connect(('192.168.84.129', 5050))
c.setblocking(False)
c_xs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
c_xs.connect(('192.168.0.160', 6666))
# c_xs.connect(('192.168.84.129', 6666))
c_xs.setblocking(False)
self.w2t("Connection success", 0, 0, 'green')
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_h:
f_h.write('1')
return c, c_xs
except Exception as Err:
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_h:
f_h.write('0')
self.w2t("Connection failed...", 0, 0, 'red')
return None, None
self.c, self.c_xs = sock_conn()
if self.c is None or self.c_xs is None:
self.w2t("Aborting! Try to switch tabs to re-connect!", 0, 1, 'red')
self.c_msg = [] self.c_msg = []
self.c_msg_xs = [] self.c_msg_xs = []
self.flag = 0 self.flag = 0
@ -54,7 +23,9 @@ class HmiRequest(object):
self.leftover = 0 self.leftover = 0
self.flag_xs = 0 self.flag_xs = 0
self.response_xs = '' self.response_xs = ''
self.t_bool = True
self.sock_conn()
self.t_heartbeat = threading.Thread(target=self.heartbeat) self.t_heartbeat = threading.Thread(target=self.heartbeat)
self.t_heartbeat.daemon = True self.t_heartbeat.daemon = True
self.t_heartbeat.start() self.t_heartbeat.start()
@ -65,9 +36,29 @@ class HmiRequest(object):
self.t_unpackage_xs.daemon = True self.t_unpackage_xs.daemon = True
self.t_unpackage_xs.start() self.t_unpackage_xs.start()
def close_sock(self): def sock_conn(self):
self.c.close() # while True:
self.c_xs.close() with open(f"{current_path}/../../assets/templates/heartbeat", "r", encoding='utf-8') as f_hb:
c_state = f_hb.read().strip()
if c_state == '0':
try:
self.c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.c.connect(('192.168.0.160', 5050))
# self.c.connect(('192.168.84.129', 5050))
self.c.setblocking(False)
self.c_xs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.c_xs.connect(('192.168.0.160', 6666))
# self.c_xs.connect(('192.168.84.129', 6666))
self.c_xs.setblocking(False)
self.w2t("Connection success", 0, 0, 'green')
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
f_hb.write('1')
except Exception as Err:
self.w2t("Connection failed...", 0, 0, 'red')
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
f_hb.write('0')
def header_check(self, index, data): def header_check(self, index, data):
try: try:
@ -87,12 +78,15 @@ class HmiRequest(object):
return 'DATA READ ERR' return 'DATA READ ERR'
def heartbeat(self): def heartbeat(self):
while True: while self.t_bool:
_id = self.excution('controller.heart') _id = self.excution('controller.heart')
_flag = 0 if self.get_from_id(_id) is None else 1 _flag = '0' if self.get_from_id(_id) is None else '1'
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_h: with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
f_h.write(str(_flag)) f_hb.write(_flag)
time.sleep(3) time.sleep(3)
# with open('log.txt', 'w') as f:
# for msg in self.c_msg:
# f.write(msg + '\n')
def msg_storage(self, response, flag=0): def msg_storage(self, response, flag=0):
messages = self.c_msg if flag == 0 else self.c_msg_xs messages = self.c_msg if flag == 0 else self.c_msg_xs
@ -224,7 +218,7 @@ class HmiRequest(object):
sel = selectors.DefaultSelector() sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ, to_read) sel.register(sock, selectors.EVENT_READ, to_read)
while True: while self.t_bool:
events = sel.select() events = sel.select()
for key, mask in events: for key, mask in events:
callback = key.data callback = key.data
@ -244,7 +238,7 @@ class HmiRequest(object):
sel = selectors.DefaultSelector() sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ, to_read) sel.register(sock, selectors.EVENT_READ, to_read)
while True: while self.t_bool:
events = sel.select() events = sel.select()
for key, mask in events: for key, mask in events:
callback = key.data callback = key.data
@ -268,13 +262,22 @@ class HmiRequest(object):
match command: match command:
case 'state.set_tp_mode': case 'state.set_tp_mode':
req['data']['tp_mode'] = kwargs['tp_mode'] req['data']['tp_mode'] = kwargs['tp_mode']
case 1: case 'overview.set_autoload':
pass req['data']['autoload_prj_path'] = kwargs['autoload_prj_path']
case 'overview.reload':
req['data']['prj_path'] = kwargs['prj_path']
req['data']['tasks'] = kwargs['tasks']
req['id'] = self.gen_id(command) req['id'] = self.gen_id(command)
print(f"req = {req}") print(f"req = {req}")
cmd = json.dumps(req, separators=(',', ':')) cmd = json.dumps(req, separators=(',', ':'))
self.c.send(self.package(cmd)) try:
time.sleep(2) self.c.send(self.package(cmd))
time.sleep(2)
except Exception as Err:
self.w2t(f"{cmd}\n请求发送失败...\n{Err}", 0, 11)
return req['id'] return req['id']
else: # for xService else: # for xService
pass pass

View File

@ -6,7 +6,7 @@ from pandas import read_csv, concat, set_option
from re import match from re import match
from threading import Thread from threading import Thread
from time import sleep from time import sleep
from csv import reader from csv import reader, writer
class GetThreadResult(Thread): class GetThreadResult(Thread):
@ -99,6 +99,11 @@ def current_max(data_files, rcs, trqh, w2t):
current[axis].append(_) current[axis].append(_)
w2t(f"{data_file}: {_:.4f}") w2t(f"{data_file}: {_:.4f}")
with open(data_file, 'a+') as f_data:
csv_writer = writer(f_data)
csv_writer.writerow([''] * 4)
csv_writer.writerow([_])
for axis, cur in current.items(): for axis, cur in current.items():
if not cur: if not cur:
continue continue
@ -127,10 +132,15 @@ def current_avg(data_files, rcs, trqh, w2t):
c_avg = df[col].mean() c_avg = df[col].mean()
scale = 1 if data_file.endswith('.csv') else 1000 scale = 1 if data_file.endswith('.csv') else 1000
_ = (abs(c_avg)+c_std)/scale*rca _ = (abs(c_avg)+c_std*3)/scale*rca
current[axis].append(_) current[axis].append(_)
w2t(f"{data_file}: {_:.4f}") w2t(f"{data_file}: {_:.4f}")
with open(data_file, 'a+') as f_data:
csv_writer = writer(f_data)
csv_writer.writerow([''] * 4)
csv_writer.writerow([_])
for axis, cur in current.items(): for axis, cur in current.items():
if not cur: if not cur:
continue continue
@ -293,7 +303,7 @@ def p_single(wb, single, vel, trq, rpm, w2t):
data.append(df.iloc[row, 1]) data.append(df.iloc[row, 1])
i = 0 i = 0
for row in ws.iter_rows(min_row=2, min_col=2, max_row=70000, max_col=3): for row in ws.iter_rows(min_row=2, min_col=2, max_row=150000, max_col=3):
for cell in row: for cell in row:
try: try:
_ = f"{data[i]:.2f}" _ = f"{data[i]:.2f}"
@ -345,7 +355,7 @@ def p_scenario(wb, single, vel, trq, rpm, dur, w2t):
data.append(df.iloc[row, 1]) data.append(df.iloc[row, 1])
i = 0 i = 0
for row in ws.iter_rows(min_row=2, min_col=2, max_row=70000, max_col=3): for row in ws.iter_rows(min_row=2, min_col=2, max_row=150000, max_col=3):
for cell in row: for cell in row:
try: try:
_ = f"{data[i]:.2f}" _ = f"{data[i]:.2f}"

View File

@ -136,7 +136,7 @@ def single_file_proc(ws, data_file, df, low, high, cycle, w2t):
_row_lt = find_point('forward', _step, 'c'+str(_row), data_file, 'lt', df, _row, w2t) _row_lt = find_point('forward', _step, 'c'+str(_row), data_file, 'lt', df, _row, w2t)
_start = int(_row_gt + (_row_lt - _row_gt - 50) / 2) _start = int(_row_gt + (_row_lt - _row_gt - 50) / 2)
_end = _start + 50 _end = _start + 50
value = df.iloc[_start:_end, 2].mean() + df.iloc[_start:_end, 2].std() value = df.iloc[_start:_end, 2].mean() + 3 * df.iloc[_start:_end, 2].std()
_data[count].append(value) _data[count].append(value)
else: else:
_row_gt = find_point('forward', _step, 'c'+str(_row), data_file, 'gt', df, _row, w2t) _row_gt = find_point('forward', _step, 'c'+str(_row), data_file, 'gt', df, _row, w2t)