20 Commits

Author SHA1 Message Date
ea56dfce52 last commit 2025-02-25 10:51:22 +08:00
88776c6794 fix: 电机电流异常情况修复 2024-12-30 09:04:24 +08:00
14f6d43027 1. [main: do_brake.py] 修改了 SSH 的固定 IP 为 clibs 中读取的内容,并删除了每次都 reload 工程的动作,改为只在修改 RL 工程时 reload 一次,旨在减少最近频繁出现的“无法获取overview.reload-xxxxxx”请求的响应,初步判断是 xCore 的问题,非 AIO 问题,已反馈待版本修复
2. [main: wavelogger.py] 新增异常数据校验功能
2024-12-16 13:51:04 +08:00
4b6f78dd7e update source files 2024-12-05 16:24:54 +08:00
4d297118e0 1. [current: do_current.py] 增加了 hw_sensor_trq_feedback 曲线的采集
2. [current: current.py] 增加了 hw_sensor_trq_feedback 曲线数据的处理,以及修改了之前数据处理的逻辑
3. [current: clibs.py] 新增可手动修改连接 IP 地址的功能,存储在 assets/templates/ipaddr.txt 中,默认是 192.168.0.160
2024-12-05 16:14:59 +08:00
5c5168442f update configs.xlsx 2024-10-25 14:21:21 +08:00
c9fa3a4473 modify current max senario time from 150s to 250s 2024-10-12 21:49:16 +08:00
880964f675 重新修改制动结束延迟时间洛杉矶 2024-10-12 16:09:58 +08:00
3481d3b496 每次制动完成之后,pending时间修改为3s 2024-10-10 09:19:43 +08:00
bebaf292ac 修复制动性能测试,采集的数据速度未降为0的问题 2024-10-09 16:00:47 +08:00
9f78b0e563 minor modification of UI for current data process 2024-09-20 14:13:54 +08:00
59711d9c65 4. [main: openapi.py]:新增 rl_task.set_run_params 指令支持,可设定速度滑块以及是否重复运行
5. [main: do_brake/do_current/factory_test.py]:在初始化运动时增加 `clibs.execution('rl_task.set_run_params', hr, w2t, tab_name, loop_mode=True, override=1.0)`
2024-08-20 18:03:44 +08:00
edafd91567 v0.2.0.8(2024/08/20)
1. [t_change_ui: clibs.py]
   - 从外部拷贝 icon.ico 文件到 templates 目录
   - 在 assets 目录新建 logs 目录,存放日志文件,并增加了相应的逻辑保证正常执行
2. [t_change_ui: aio.py]:增加 App 窗口图标代码
3. [t_change_ui: openapi.py]:将重复输出的网络错误提示,从 textbox 中转移到 debug.log 日志文件中
2024-08-20 11:13:45 +08:00
03b15751c2 add exception handle for openapi-selector 2024-08-17 09:28:33 +08:00
29bd4185c4 version change 2024-08-16 17:48:08 +08:00
97071d231f Merge branch 't_change_ui' 2024-08-16 17:23:25 +08:00
2d12c160b9 v0.2.0.7(2024/08/16)
1. [t_change_ui: clibs.py]:修改了 hmi.log 的日志等级为 WARNING
2. [t_change_ui: openapi.py]:根据第一步的修改,将此模块日志记录等级调整至 warning
3. [current: current.py]
   - README新增了整机自动化测试的前置条件,即滑块需要滑动到最右端
   - current修改了文件校验的逻辑
4. [t_change_ui: aio.py]
   - 修改变量命名,widgit -> widget
   - 根据第 5 点变动,同步修改代码实现
   - 调整 UI 界面代码顺序,使之符合 layout.xlsx 描述
   - 将版本检查的部分单独封装成一个函数,在 detect_network 线程初始化时调用一次,并且程序启动也不会受到阻塞
5. [t_change_ui: layout.xlsx]:修改了组件布局方式
2024-08-16 17:22:52 +08:00
62e5e6ab50 README中增加整机自动化测试前需要调整速度滑块的提醒,完善current数据预处理的逻辑 2024-08-16 16:05:31 +08:00
8f342832b2 fix merge while merging from main 2024-08-16 15:57:00 +08:00
addb123a8a [current: current.py]: 在find_point函数种,当无法找到正确点位时,继续执行,而不是直接终止执行 2024-07-13 13:32:57 +08:00
22 changed files with 438 additions and 287 deletions

4
.gitignore vendored
View File

@ -7,10 +7,8 @@ aio/venv
aio/__pycache__/
aio/code/automatic_test/__pycache__/
aio/code/data_process/__pycache__/
aio/assets/templates/c_msg.log*
aio/code/durable_action/__pycache__/
aio/assets/templates/durable/
aio/assets/templates/.__c_msg.lock
aio/code/commons/__pycache__/
aio/assets/templates/debug.log
aio/assets/templates/logs/
dial_gauge/results.xlsx

View File

@ -1,3 +0,0 @@
# rokae
测试内容自动化处理

View File

@ -34,7 +34,7 @@
打包时,只需要修改 clibs.py 中的 PREFIX 即可,调试时再修改回来
```
pyinstaller --noconfirm --onedir --windowed --optimize 2 --contents-directory . --upx-dir "D:/Syncthing/common/A_Program/upx-4.2.4-win64/" --add-data "C:/Users/Administrator/AppData/Local/Programs/Python/Python312/Lib/site-packages/customtkinter;customtkinter/" --add-data "D:/Syncthing/company/D-测试工作/X-自动化测试/01-AIO/rokae/aio/assets/templates:templates" --version-file ../assets/file_version_info.txt -i ../assets/icon.ico ../code/aio.py -p ../code/data_process/brake.py -p ../code/data_process/iso.py -p ../code/data_process/current.py -p ../code/data_process/wavelogger.py -p ../code/commons/openapi.py -p ../code/commons/clibs.py -p ../code/automatic_test/btn_functions.py -p ../code/automatic_test/do_current.py -p ../code/automatic_test/do_brake.py -p ../code/durable_action/factory_test.py
pyinstaller --noconfirm --onedir --windowed --optimize 2 --contents-directory . --upx-dir "D:/Syncthing/common/A_Program/upx-4.2.4-win64/" --add-data "C:/Users/Administrator/AppData/Local/Programs/Python/Python312/Lib/site-packages/customtkinter;customtkinter/" --add-data "D:\Syncthing\company\D-测试工作\X-自动化测试\01-Gitea\aio\aio\assets\templates:templates" --version-file ../assets/file_version_info.txt -i ../assets/templates/icon.ico ../code/aio.py -p ../code/data_process/brake.py -p ../code/data_process/iso.py -p ../code/data_process/current.py -p ../code/data_process/wavelogger.py -p ../code/commons/openapi.py -p ../code/commons/clibs.py -p ../code/automatic_test/btn_functions.py -p ../code/automatic_test/do_current.py -p ../code/automatic_test/do_brake.py -p ../code/durable_action/factory_test.py
```
---
@ -146,6 +146,7 @@ pyinstaller --noconfirm --onedir --windowed --optimize 2 --contents-directory .
10. 由于xCore系统问题运行过程中可能会出现机器人宕机问题如果遇到可以手动重启控制柜重新运行
11. 务必正确填写configs.xlsx中的Target页面A1单元格可以选择正负方向急停
12. 工程文件可以手动重命名,按照机型存档,或者导出用于自动化测试
13. 可废弃但未验证自动化测试前需要将HMI程序速度设置为100%并同步至控制器,也即左下方速度滑条滑动至最大
#### 6) 电机电流自动化测试
@ -274,7 +275,7 @@ v0.1.5.1(2024/06/12)
5. [requirements.txt] 新增必要库配置文件
v0.1.5.2(2024/06/13)
1. [brake.py/aio.py]: 将sto修改为estop
1. [brake.py/aio.py] 将sto修改为estop
2. [brake.py] 修改了速度计算逻辑新版本的vel列数据遵循如下规则av = vel * 180 / pi根据av再计算speed
3. [brake.py] 将threshold修改为常量50
4. [brake.py] 提高了输出提示语的明确性,删除了不必要的省略号
@ -448,18 +449,18 @@ v0.1.7.6(2024/07/04)
3. [APIs: openapi.py]
- 增加了modbus读取浮点数的功能
- 优化了get_from_id的逻辑
4. [autotest.xml]: 新增了scenario_time只写寄存器
4. [autotest.xml] 新增了scenario_time只写寄存器
v0.1.8.0(2024/07/04)
1. [APIs: do_current.py]: 完成了堵转电流和惯量负载电机电流的采集和处理,至此,电机电流的自动化工作基本完成
1. [APIs: do_current.py] 完成了堵转电流和惯量负载电机电流的采集和处理,至此,电机电流的自动化工作基本完成
v0.1.8.1(2024/07/05)
1. [APIs: do_brake.py]: 完成了制动性能测试框架的搭建,可以顺利执行完整的测试程序,但是未实现急停和数据处理
2. [APIs: aio.py]: 修改了do_brake主函数的参数
1. [APIs: do_brake.py] 完成了制动性能测试框架的搭建,可以顺利执行完整的测试程序,但是未实现急停和数据处理
2. [APIs: aio.py] 修改了do_brake主函数的参数
3. 增加工程文件target.zip
v0.1.8.2(2024/07/08)
1. [APIs: do_brake.py]: 完成了制动性能测试逻辑只不过制动信号传递生效延迟不可控暂时pending
1. [APIs: do_brake.py] 完成了制动性能测试逻辑只不过制动信号传递生效延迟不可控暂时pending
2. [APIs: do_current.py]: 修改曲线数据时序主要是value data取反即可解决了波形锯齿明细的问题
3. [APIs: openapi.py]: modbus新增了触发急停信号的寄存器 stop0_signal并重写了解除急停socket新增了register.set_value协议
@ -590,8 +591,8 @@ v0.2.0.5(2024/07/31)
- 保持电流,只取最后 15s
- 优化 ssh 输入密码的部分
6. [t_change_ui: all the part]: 引入 commons 包,并定制了 logging 输出,后续持续优化
7. [APIs: btn_functions.py]: 重写了告警输出函数,从日志中拿数据
8. [APIs: aio.py]: 将日志框输出的内容,也保存至日志文件
7. [APIs: btn_functions.py] 重写了告警输出函数,从日志中拿数据
8. [APIs: aio.py] 将日志框输出的内容,也保存至日志文件
9. [APIs: do_brake.py]
- 修改获取初始速度的逻辑只获取configs文件中配置的时间内的速度
- 新增 configs 参数 single_brake可针对特定条件做测试
@ -601,3 +602,42 @@ v0.2.0.6(2024/08/09)
1. [t_change_ui: all files]
- 修改了 logger 的实现
- 尤其是 clibs.py使用日志字典重写了日志记录的功能
v0.2.0.7(2024/08/16)
1. [t_change_ui: clibs.py]:修改了 hmi.log 的日志等级为 WARNING
2. [t_change_ui: openapi.py]:根据第一步的修改,将此模块日志记录等级调整至 warning
3. [current: current.py]
- README新增了整机自动化测试的前置条件即滑块需要滑动到最右端
- current修改了文件校验的逻辑
4. [t_change_ui: aio.py]
- 修改变量命名widgit -> widget
- 根据第 5 点变动,同步修改代码实现
- 调整 UI 界面代码顺序,使之符合 layout.xlsx 描述
- 将版本检查的部分单独封装成一个函数,在 detect_network 线程初始化时调用一次,并且程序启动也不会受到阻塞
5. [t_change_ui: layout.xlsx]:修改了组件布局方式
> 前两个修改点,修复的是网络提示的颜色不正确问题,因为日志将 textbox 中的内容也作为 debug 信息写入 hmi.log 了
v0.2.0.8(2024/08/20)
1. [t_change_ui: clibs.py]
- 从外部拷贝 icon.ico 文件到 templates 目录
- 在 assets 目录新建 logs 目录,存放日志文件,并增加了相应的逻辑保证正常执行
2. [t_change_ui: aio.py]:增加 App 窗口图标代码
3. [t_change_ui: openapi.py]:将重复输出的网络错误提示,从 textbox 中转移到 debug.log 日志文件中
4. [main: openapi.py]:新增 rl_task.set_run_params 指令支持,可设定速度滑块以及是否重复运行
5. [main: do_brake/do_current/factory_test.py]:在初始化运动时增加 `clibs.execution('rl_task.set_run_params', hr, w2t, tab_name, loop_mode=True, override=1.0)`
v0.2.0.9(2024/10/09)
1. [main: do_brake.py] 采集完成后pending 3s使速度完全将为 0
v0.2.1.0(2024/12/05)
1. [current: do_current.py] 增加了 hw_sensor_trq_feedback 曲线的采集
2. [current: current.py] 增加了 hw_sensor_trq_feedback 曲线数据的处理,以及修改了之前数据处理的逻辑
3. [current: clibs.py] 新增可手动修改连接 IP 地址的功能,存储在 assets/templates/ipaddr.txt 中,默认是 192.168.0.160
v0.2.1.1(2024/12/16)
1. [main: do_brake.py] 修改了 SSH 的固定 IP 为 clibs 中读取的内容,并删除了每次都 reload 工程的动作,改为只在修改 RL 工程时 reload 一次旨在减少最近频繁出现的“无法获取overview.reload-xxxxxx”请求的响应初步判断是 xCore 的问题,非 AIO 问题,已反馈待版本修复
2. [main: wavelogger.py] 新增异常数据校验功能

Binary file not shown.

View File

@ -6,8 +6,8 @@ VSVersionInfo(
ffi=FixedFileInfo(
# filevers and prodvers should be always a tuple with four items: (1, 2, 3, 4)
# Set not needed items to zero 0.
filevers=(0, 2, 0, 5),
prodvers=(0, 2, 0, 5),
filevers=(0, 2, 1, 1),
prodvers=(0, 2, 1, 1),
# Contains a bitmask that specifies the valid bits 'flags'r
mask=0x3f,
# Contains a bitmask that specifies the Boolean attributes of the file.
@ -31,12 +31,12 @@ VSVersionInfo(
'040904b0',
[StringStruct('CompanyName', 'Rokae - https://www.rokae.com/'),
StringStruct('FileDescription', 'All in one automatic toolbox'),
StringStruct('FileVersion', '0.2.0.5 (2024-08-02)'),
StringStruct('FileVersion', '0.2.1.1 (2024-12-16)'),
StringStruct('InternalName', 'AIO.exe'),
StringStruct('LegalCopyright', '© 2024-2024 Manford Fan'),
StringStruct('OriginalFilename', 'AIO.exe'),
StringStruct('ProductName', 'AIO'),
StringStruct('ProductVersion', '0.2.0.5 (2024-08-02)')])
StringStruct('ProductVersion', '0.2.1.1 (2024-12-16)')])
]),
VarFileInfo([VarStruct('Translation', [1033, 1200])])
]

Binary file not shown.

Binary file not shown.

View File

@ -1 +1 @@
1
0

View File

Before

Width:  |  Height:  |  Size: 162 KiB

After

Width:  |  Height:  |  Size: 162 KiB

View File

@ -0,0 +1 @@
192.168.0.160

View File

@ -5,8 +5,8 @@
"data": {
"open": false,
"display_open": false,
"overrun": false,
"turn_area": false,
"overrun": true,
"turn_area": true,
"delay_motion": false
}
}

View File

@ -0,0 +1,9 @@
{
"id": "xxxxxxxxxxx",
"module": "project",
"command": "rl_task.set_run_params",
"data": {
"loop_mode": true,
"override": 1.0
}
}

View File

@ -1 +1 @@
0.2.0.5 @ 08/02/2024
0.2.1.1 @ 12/16/2024

View File

@ -37,21 +37,21 @@ btns_func = {
'log': {'btn': '', 'row': 3, 'text': '保存日志'},
'end': {'btn': '', 'row': 4, 'text': '结束运行'},
}
widgits_dp = {
'path': {'label': '', 'entry': '', 'row': 1, 'col': 2, 'text': '数据文件夹路径'},
'dur': {'label': '', 'entry': '', 'row': 2, 'col': 2, 'text': '周期时间'},
'vel': {'label': '', 'optionmenu': '', 'row': 2, 'col': 4, 'text': ''},
'trq': {'label': '', 'optionmenu': '', 'row': 2, 'col': 6, 'text': ''},
'trqh': {'label': '', 'optionmenu': '', 'row': 2, 'col': 8, 'text': ''},
'estop': {'label': '', 'optionmenu': '', 'row': 2, 'col': 10, 'text': ''},
widgets_dp = {
'path': {'label': '', 'entry': '', 'row': 0, 'col': 1, 'text': '数据文件夹路径'},
'dur': {'label': '', 'entry': '', 'row': 1, 'col': 9, 'text': '周期时间'},
'vel': {'label': '', 'optionmenu': '', 'row': 1, 'col': 1, 'text': ''},
'trq': {'label': '', 'optionmenu': '', 'row': 1, 'col': 3, 'text': ''},
'trqh': {'label': '', 'optionmenu': '', 'row': 1, 'col': 5, 'text': ''},
'estop': {'label': '', 'optionmenu': '', 'row': 1, 'col': 7, 'text': ''},
}
widgits_at = {
'path': {'label': '', 'entry': '', 'row': 2, 'col': 2, 'text': '数据文件夹路径'},
'loadsel': {'label': '', 'optionmenu': '', 'row': 2, 'col': 1, 'text': '负载信息'},
widgets_at = {
'path': {'label': '', 'entry': '', 'row': 1, 'col': 1, 'text': '数据文件夹路径'},
'loadsel': {'label': '', 'optionmenu': '', 'row': 1, 'col': 0, 'text': '负载信息'},
}
widgits_da = {
'path': {'label': '', 'entry': '', 'row': 1, 'col': 2, 'text': '数据文件夹路径'},
'curvesel': {'label': '', 'optionmenu': '', 'row': 1, 'col': 1, 'text': '指标选择'},
widgets_da = {
'path': {'label': '', 'entry': '', 'row': 0, 'col': 1, 'text': '数据文件夹路径'},
'curvesel': {'label': '', 'optionmenu': '', 'row': 0, 'col': 0, 'text': '指标选择'},
}
@ -70,21 +70,24 @@ class App(customtkinter.CTk):
# =====================================================================
# configure window
self.title("AIO - All in one automatic toolbox")
# self.iconbitmap('./icon.ico')
self.wm_iconbitmap(clibs.app_icon)
self.geometry("1200x550+30+30")
self.protocol("WM_DELETE_WINDOW", self.func_end_callback)
self.config(bg='#E9E9E9')
self.grid_rowconfigure(6, weight=1)
self.grid_columnconfigure((1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13), weight=1)
self.grid_rowconfigure(0, weight=1)
self.grid_rowconfigure(1, weight=19)
self.grid_columnconfigure(0, weight=1)
self.grid_columnconfigure(1, weight=19)
self.minsize(1200, 550)
# =====================================================================
# create frame sidebar(left)
self.frame_func = customtkinter.CTkFrame(self, width=120, corner_radius=0, fg_color='#E9E9E9')
self.frame_func.grid(row=0, column=0, rowspan=7, sticky='nsew')
# create AIO logo
# 1. create frame sidebar(left)
# =====================================================================
self.frame_func = customtkinter.CTkFrame(self, width=200, corner_radius=0, fg_color='#E9E9E9')
self.frame_func.grid(row=0, column=0, rowspan=2, sticky='nsew')
# 1.1 create AIO logo
self.label_logo = customtkinter.CTkLabel(self.frame_func, text="Rokae AIO", height=60, font=customtkinter.CTkFont(family="Segoe Script Bold", size=24, weight="bold"), text_color="#4F4F4F")
self.label_logo.grid(row=0, column=0, padx=15, pady=15)
# create buttons
# 1.2 create buttons
for func in btns_func:
btns_func[func]['btn'] = customtkinter.CTkButton(self.frame_func, corner_radius=10, text=btns_func[func]['text'], fg_color='#4F4F4F', font=self.my_font)
btns_func[func]['btn'].grid(row=btns_func[func]['row'], column=0, sticky='new', padx=10, pady=10, ipadx=5, ipady=5)
@ -92,95 +95,104 @@ class App(customtkinter.CTk):
btns_func['check']['btn'].configure(command=lambda: self.thread_it(self.func_check_callback))
btns_func['log']['btn'].configure(command=lambda: self.thread_it(self.func_log_callback))
btns_func['end']['btn'].configure(command=lambda: self.thread_it(self.func_end_callback))
# create version info
self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.2.0.5\nDate: 08/02/2024", font=self.my_font, text_color="#4F4F4F")
# 1.3 create version info
self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.2.1.1\nDate: 12/16/2024", font=self.my_font, text_color="#4F4F4F")
self.frame_func.rowconfigure(6, weight=1)
self.label_version.grid(row=6, column=0, padx=20, pady=20, sticky='s')
# =====================================================================
# create tabviews
self.tabview = customtkinter.CTkTabview(self, width=10000, height=100, anchor='w', fg_color='#E9E9E9', border_width=2, border_color='#CDCDCD', command=self.tabview_click)
# 2. create tabviews
# =====================================================================
self.tabview = customtkinter.CTkTabview(self, anchor='w', fg_color='#E9E9E9', width=1000, height=45, border_width=2, border_color='#CDCDCD', command=self.tabview_click)
self.tabview.grid(row=0, column=1, padx=10, pady=5, sticky="nsew")
self.tabview.add("Data Process")
self.tabview.add("Automatic Test")
self.tabview.add("Durable Action")
# create main menu for data process
# 2.1 create widgets of tab "Data Process"
# 2.1.1 create main menu
self.menu_main_dp = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'), values=["init", "brake", "current", "iso", "wavelogger"], font=self.my_font, text_color='yellow', button_color='red', fg_color='green', command=self.func_main_callback)
self.menu_main_dp.grid(row=1, column=1, sticky='we', padx=5, pady=10)
self.menu_main_dp.grid(row=0, column=0, sticky='we', padx=5, pady=10)
self.menu_main_dp.set("Start Here!")
# create sub menu for data process
# 2.2.2 create sub menu
self.menu_sub_dp = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'))
# create main menu for automatic test
# 2.2.3 create labels, entries and option menus
for widget in widgets_dp:
if widget == 'path':
self.tabview.tab('Data Process').columnconfigure(12, weight=1)
widgets_dp[widget]['label'] = customtkinter.CTkLabel(self.tabview.tab('Data Process'), text=f'{widget.upper()}', font=self.my_font)
widgets_dp[widget]['label'].grid(row=widgets_dp[widget]['row'], column=widgets_dp[widget]['col'], sticky='e', pady=10)
widgets_dp[widget]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Data Process'), placeholder_text=widgets_dp[widget]['text'], font=self.my_font)
widgets_dp[widget]['entry'].grid(row=widgets_dp[widget]['row'], column=widgets_dp[widget]['col']+1, columnspan=11, padx=(5, 10), pady=5, sticky='we')
widgets_dp[widget]['entry'].configure(state='disabled')
elif widget in ['dur']:
widgets_dp[widget]['label'] = customtkinter.CTkLabel(self.tabview.tab('Data Process'), text=f"{widget.upper()}", font=self.my_font)
widgets_dp[widget]['label'].grid(row=widgets_dp[widget]['row'], column=widgets_dp[widget]['col'], sticky='e', pady=5)
widgets_dp[widget]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Data Process'), width=self.w_param, placeholder_text=f"{widgets_dp[widget]['text']}", font=self.my_font)
widgets_dp[widget]['entry'].grid(row=widgets_dp[widget]['row'], column=widgets_dp[widget]['col']+1, padx=(5, 10), pady=5, sticky='w')
widgets_dp[widget]['entry'].configure(state='disabled')
elif widget in ['vel', 'trq', 'trqh', 'estop']:
widgets_dp[widget]['label'] = customtkinter.CTkLabel(self.tabview.tab('Data Process'), text=f"{widget.upper()}", font=self.my_font)
widgets_dp[widget]['label'].grid(row=widgets_dp[widget]['row'], column=widgets_dp[widget]['col'], sticky='e', pady=5)
widgets_dp[widget]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'), button_color='#708090', fg_color='#778899', values=["1", "2", "3", "4", "5", "6", "7"], width=self.w_param, font=self.my_font)
widgets_dp[widget]['optionmenu'].grid(row=widgets_dp[widget]['row'], column=widgets_dp[widget]['col']+1, padx=(5, 10), pady=5, sticky='w')
widgets_dp[widget]['optionmenu'].configure(state='disabled')
# =====================================================================
# 2.2 create widgets of tab "Automatic Test"
# 2.2.1 create main menu
self.menu_main_at = customtkinter.CTkOptionMenu(self.tabview.tab('Automatic Test'), values=["init", "brake", "current"], font=self.my_font, text_color='yellow', button_color='red', fg_color='green', command=self.func_main_callback)
self.menu_main_at.grid(row=1, column=1, sticky='we', padx=5, pady=5)
self.menu_main_at.grid(row=0, column=0, sticky='we', padx=5, pady=5)
self.menu_main_at.set("Start Here!")
# For data process tab START =====================================================================
# create widgits_dp
for widgit in widgits_dp:
if widgit == 'path':
widgits_dp[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Data Process'), text=f'{widgit.upper()}', font=self.my_font)
widgits_dp[widgit]['label'].grid(row=widgits_dp[widgit]['row'], column=widgits_dp[widgit]['col'], sticky='e', pady=10)
widgits_dp[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Data Process'), width=670, placeholder_text=widgits_dp[widgit]['text'], font=self.my_font)
widgits_dp[widgit]['entry'].grid(row=widgits_dp[widgit]['row'], column=widgits_dp[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=5, sticky='we')
widgits_dp[widgit]['entry'].configure(state='disabled')
elif widgit in ['dur']:
widgits_dp[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Data Process'), text=f"{widgit.upper()}", font=self.my_font)
widgits_dp[widgit]['label'].grid(row=widgits_dp[widgit]['row'], column=widgits_dp[widgit]['col'], sticky='e', pady=5)
widgits_dp[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Data Process'), width=self.w_param, placeholder_text=f"{widgits_dp[widgit]['text']}", font=self.my_font)
widgits_dp[widgit]['entry'].grid(row=widgits_dp[widgit]['row'], column=widgits_dp[widgit]['col']+1, padx=(5, 10), pady=5, sticky='w')
widgits_dp[widgit]['entry'].configure(state='disabled')
elif widgit in ['vel', 'trq', 'trqh', 'estop']:
widgits_dp[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Data Process'), text=f"{widgit.upper()}", font=self.my_font)
widgits_dp[widgit]['label'].grid(row=widgits_dp[widgit]['row'], column=widgits_dp[widgit]['col'], sticky='e', pady=5)
widgits_dp[widgit]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'), button_color='#708090', fg_color='#778899', values=["1", "2", "3", "4", "5", "6", "7"], width=self.w_param, font=self.my_font)
widgits_dp[widgit]['optionmenu'].grid(row=widgits_dp[widgit]['row'], column=widgits_dp[widgit]['col']+1, padx=(5, 10), pady=5, sticky='w')
widgits_dp[widgit]['optionmenu'].configure(state='disabled')
# For data process tab END =====================================================================
# For automatic test tab START =====================================================================
# create buttons
# 2.2.2 create segment buttons
self.seg_button = customtkinter.CTkSegmentedButton(self.tabview.tab('Automatic Test'), dynamic_resizing=False, font=customtkinter.CTkFont(size=16, weight='bold'), command=lambda value='机器状态': self.thread_it(self.segmented_button_callback))
self.seg_button.grid(row=1, column=2, columnspan=12, padx=(65, 10), pady=(10, 10), sticky="ew")
self.seg_button.grid(row=0, column=1, columnspan=12, padx=(65, 10), pady=(10, 10), sticky="ew")
self.seg_button.configure(dynamic_resizing=False, values=["功能切换", "触发急停", "恢复急停", "待定功能", "功能待定", "机器状态", "告警信息"])
self.seg_button.set("功能切换")
# create progress bar
# 2.2.3 create progress bar
self.progressbar = customtkinter.CTkProgressBar(self.tabview.tab('Automatic Test'))
self.progressbar.grid(row=5, column=1, padx=5, pady=5, sticky="ew")
self.progressbar.configure(mode="determinnate", width=10)
self.progressbar.grid(row=2, column=0, padx=5, pady=5, sticky="ew")
self.progressbar.configure(mode="determinnate", width=self.w_param)
self.progressbar.start()
# create widgits_at
for widgit in widgits_at:
if widgit == 'path':
widgits_at[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Automatic Test'), text=f'{widgit.upper()}', font=self.my_font)
widgits_at[widgit]['label'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col'], sticky='e', padx=(20, 5), pady=5)
widgits_at[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Automatic Test'), width=670, placeholder_text=widgits_at[widgit]['text'], font=self.my_font)
widgits_at[widgit]['entry'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=5, sticky='we')
widgits_at[widgit]['entry'].configure(state='disabled')
elif widgit in ['loadsel', ]:
widgits_at[widgit]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Automatic Test'), button_color='#708090', fg_color='#778899', values=["tool33", "tool66", "tool100", "inertia"], width=self.w_param, font=self.my_font)
widgits_at[widgit]['optionmenu'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col'], padx=5, pady=5, sticky='we')
widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
widgits_at[widgit]['optionmenu'].configure(state='disabled')
# For automatic test tab END =====================================================================
# For durable_action tab START =====================================================================
# create progress bar
# 2.2.4 create labels, entries and option menus
for widget in widgets_at:
if widget == 'path':
self.tabview.tab('Automatic Test').columnconfigure(12, weight=1)
widgets_at[widget]['label'] = customtkinter.CTkLabel(self.tabview.tab('Automatic Test'), text=f'{widget.upper()}', font=self.my_font)
widgets_at[widget]['label'].grid(row=widgets_at[widget]['row'], column=widgets_at[widget]['col'], sticky='e', padx=(20, 5), pady=5)
widgets_at[widget]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Automatic Test'), placeholder_text=widgets_at[widget]['text'], font=self.my_font)
widgets_at[widget]['entry'].grid(row=widgets_at[widget]['row'], column=widgets_at[widget]['col']+1, columnspan=11, padx=(5, 10), pady=5, sticky='we')
widgets_at[widget]['entry'].configure(state='disabled')
elif widget in ['loadsel', ]:
widgets_at[widget]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Automatic Test'), button_color='#708090', fg_color='#778899', values=["tool33", "tool66", "tool100", "inertia"], width=self.w_param, font=self.my_font)
widgets_at[widget]['optionmenu'].grid(row=widgets_at[widget]['row'], column=widgets_at[widget]['col'], padx=5, pady=5, sticky='we')
widgets_at[widget]['optionmenu'].set(widgets_at[widget]['text'])
widgets_at[widget]['optionmenu'].configure(state='disabled')
# =====================================================================
# 2.3 create widgets of tab "Durable Action"
# 2.3.1 create progress bar
self.progressbar_da = customtkinter.CTkProgressBar(self.tabview.tab('Durable Action'))
self.progressbar_da.grid(row=2, column=1, padx=5, pady=5, sticky="ew")
self.progressbar_da.grid(row=1, column=0, padx=5, pady=5, sticky="ew")
self.progressbar_da.configure(mode="determinnate", width=self.w_param)
self.progressbar_da.start()
for widgit in widgits_da:
if widgit == 'path':
widgits_da[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Durable Action'), text=f'{widgit.upper()}', font=self.my_font)
widgits_da[widgit]['label'].grid(row=widgits_da[widgit]['row'], column=widgits_da[widgit]['col'], sticky='e', padx=(20, 5), pady=10)
widgits_da[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Durable Action'), width=670, placeholder_text=widgits_da[widgit]['text'], font=self.my_font)
widgits_da[widgit]['entry'].grid(row=widgits_da[widgit]['row'], column=widgits_da[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=10, sticky='we')
elif widgit in ['curvesel']:
widgits_da[widgit]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Durable Action'), dynamic_resizing=False, button_color='#708090', fg_color='#778899', values=['device_servo_trq_feedback', '[max] device_servo_trq_feedback'], font=self.my_font)
widgits_da[widgit]['optionmenu'].grid(row=widgits_da[widgit]['row'], column=widgits_da[widgit]['col'], padx=5, pady=10, sticky='we')
widgits_da[widgit]['optionmenu'].set(widgits_da[widgit]['text'])
# For durable_action tab END =====================================================================
# create textbox
# 2.3.2 create labels, entries and option menus
for widget in widgets_da:
if widget == 'path':
self.tabview.tab('Durable Action').columnconfigure(12, weight=1)
widgets_da[widget]['label'] = customtkinter.CTkLabel(self.tabview.tab('Durable Action'), text=f'{widget.upper()}', font=self.my_font)
widgets_da[widget]['label'].grid(row=widgets_da[widget]['row'], column=widgets_da[widget]['col'], sticky='e', padx=(20, 5), pady=10)
widgets_da[widget]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Durable Action'), placeholder_text=widgets_da[widget]['text'], font=self.my_font)
widgets_da[widget]['entry'].grid(row=widgets_da[widget]['row'], column=widgets_da[widget]['col']+1, columnspan=11, padx=(5, 10), pady=10, sticky='we')
elif widget in ['curvesel']:
widgets_da[widget]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Durable Action'), dynamic_resizing=False, button_color='#708090', fg_color='#778899', values=['device_servo_trq_feedback', '[max] device_servo_trq_feedback'], font=self.my_font)
widgets_da[widget]['optionmenu'].grid(row=widgets_da[widget]['row'], column=widgets_da[widget]['col'], padx=5, pady=10, sticky='we')
widgets_da[widget]['optionmenu'].set(widgets_da[widget]['text'])
# =====================================================================
# 3. create textbox
# =====================================================================
self.textbox = customtkinter.CTkTextbox(self, wrap='none', font=customtkinter.CTkFont(family="consolas", size=14), text_color="blue", fg_color='#E9E9E9', border_width=2, border_color='#CDCDCD', border_spacing=5)
self.textbox.grid(row=6, column=1, columnspan=13, ipadx=10, ipady=10, padx=10, pady=(5, 10), sticky='nsew')
self.textbox.grid(row=1, column=1, columnspan=13, ipadx=10, ipady=10, padx=10, pady=(5, 10), sticky='nsew')
self.textbox.configure(state='normal')
# functions below ↓ ----------------------------------------------------------------------------------------
def version_check(self):
# =====================================================================
# version check
cur_vers = self.label_version.cget("text").replace('\n', ' @ ').replace("Vers: ", '').replace("Date: ", '')
@ -192,13 +204,12 @@ class App(customtkinter.CTk):
tkinter.messagebox.showwarning(title="版本更新", message=msg)
except:
tkinter.messagebox.showwarning(title="版本更新", message="连接服务器失败,无法确认当前是否是最新版本......")
# functions below ↓ ----------------------------------------------------------------------------------------
def create_canvas(self, _figure):
self.canvas = FigureCanvasTkAgg(_figure, self.tabview.tab('Durable Action'))
self.canvas.draw()
self.canvas.get_tk_widget().configure(height=600)
self.canvas.get_tk_widget().grid(row=3, column=1, rowspan=3, columnspan=13, padx=20, pady=10, sticky="nsew")
self.canvas.get_tk_widget().grid(row=2, column=0, columnspan=13, padx=10, pady=10, sticky="nsew")
def create_plot(self):
rcParams['font.sans-serif'] = ['SimHei']
@ -207,7 +218,7 @@ class App(customtkinter.CTk):
rcParams['font.size'] = 14
rcParams['lines.marker'] = 'o'
curvesel = widgits_da['curvesel']['optionmenu'].get()
curvesel = widgets_da['curvesel']['optionmenu'].get()
while True:
if not self.hr.durable_lock:
self.hr.durable_lock = 1
@ -228,7 +239,7 @@ class App(customtkinter.CTk):
if not df.equals(self.df_copy) or self.flg == 0 or curvesel != self.old_curve:
self.flg = 1
self.df_copy = df.copy()
self.old_curve = widgits_da['curvesel']['optionmenu'].get()
self.old_curve = widgets_da['curvesel']['optionmenu'].get()
close('all')
_figure = figure(frameon=True, facecolor='#E9E9E9')
subplots_adjust(left=0.04, right=0.98, bottom=0.1, top=0.95)
@ -276,6 +287,7 @@ class App(customtkinter.CTk):
# self.tabview.configure(state='normal')
def detect_network(self):
self.version_check()
df = DataFrame(clibs.durable_data_current)
df.to_excel(clibs.durable_data_current_xlsx, index=False)
df = DataFrame(clibs.durable_data_current_max)
@ -319,130 +331,130 @@ class App(customtkinter.CTk):
tab_name = self.tabview.get()
self.textbox.delete(index1='1.0', index2='end')
if tab_name == 'Data Process':
for widgit in widgits_dp:
if widgit in ['path', 'dur']:
widgits_dp[widgit]['label'].configure(text=f'{widgit.upper()}', text_color='black')
widgits_dp[widgit]['entry'].delete(0, tkinter.END)
widgits_dp[widgit]['entry'].configure(placeholder_text=widgits_dp[widgit]['text'], state='normal')
widgits_dp[widgit]['entry'].configure(state='disabled')
elif widgit in ['vel', 'trq', 'trqh', 'estop']:
widgits_dp[widgit]['label'].configure(text=f'{widgit.upper()}', text_color="black")
widgits_dp[widgit]['optionmenu'].configure(state='normal')
widgits_dp[widgit]['optionmenu'].set('1')
widgits_dp[widgit]['optionmenu'].configure(state='disabled')
for widget in widgets_dp:
if widget in ['path', 'dur']:
widgets_dp[widget]['label'].configure(text=f'{widget.upper()}', text_color='black')
widgets_dp[widget]['entry'].delete(0, tkinter.END)
widgets_dp[widget]['entry'].configure(placeholder_text=widgets_dp[widget]['text'], state='normal')
widgets_dp[widget]['entry'].configure(state='disabled')
elif widget in ['vel', 'trq', 'trqh', 'estop']:
widgets_dp[widget]['label'].configure(text=f'{widget.upper()}', text_color="black")
widgets_dp[widget]['optionmenu'].configure(state='normal')
widgets_dp[widget]['optionmenu'].set('1')
widgets_dp[widget]['optionmenu'].configure(state='disabled')
self.menu_sub_dp.grid_forget()
elif tab_name == 'Automatic Test':
for widgit in widgits_at:
if widgit in ['path', ]:
widgits_at[widgit]['label'].configure(text=f'{widgit.upper()}', text_color='black')
widgits_at[widgit]['entry'].delete(0, tkinter.END)
widgits_at[widgit]['entry'].configure(placeholder_text=widgits_at[widgit]['text'], state='normal')
widgits_at[widgit]['entry'].configure(state='disabled')
elif widgit in ['loadsel']:
widgits_at[widgit]['optionmenu'].configure(state='normal')
widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
widgits_at[widgit]['optionmenu'].configure(state='disabled')
for widget in widgets_at:
if widget in ['path', ]:
widgets_at[widget]['label'].configure(text=f'{widget.upper()}', text_color='black')
widgets_at[widget]['entry'].delete(0, tkinter.END)
widgets_at[widget]['entry'].configure(placeholder_text=widgets_at[widget]['text'], state='normal')
widgets_at[widget]['entry'].configure(state='disabled')
elif widget in ['loadsel']:
widgets_at[widget]['optionmenu'].configure(state='normal')
widgets_at[widget]['optionmenu'].set(widgets_at[widget]['text'])
widgets_at[widget]['optionmenu'].configure(state='disabled')
self.seg_button.set("功能切换")
elif tab_name == 'Durable Action':
for widgit in widgits_da:
if widgit in ['path', ]:
widgits_da[widgit]['label'].configure(text=f'{widgit.upper()}', text_color='black')
widgits_da[widgit]['entry'].delete(0, tkinter.END)
widgits_da[widgit]['entry'].configure(placeholder_text=widgits_at[widgit]['text'], state='normal')
elif widgit in ['curvesel']:
widgits_da[widgit]['optionmenu'].configure(state='normal')
widgits_da[widgit]['optionmenu'].set(widgits_da[widgit]['text'])
for widget in widgets_da:
if widget in ['path', ]:
widgets_da[widget]['label'].configure(text=f'{widget.upper()}', text_color='black')
widgets_da[widget]['entry'].delete(0, tkinter.END)
widgets_da[widget]['entry'].configure(placeholder_text=widgets_at[widget]['text'], state='normal')
elif widget in ['curvesel']:
widgets_da[widget]['optionmenu'].configure(state='normal')
widgets_da[widget]['optionmenu'].set(widgets_da[widget]['text'])
def func_main_callback(self, func_name):
self.initialization()
tab_name = self.tabview.get()
if tab_name == 'Data Process':
if func_name == 'brake':
for widgit in widgits_dp:
if widgit in ['path']:
widgits_dp[widgit]['label'].configure(text_color='red')
widgits_dp[widgit]['entry'].configure(state='normal')
elif widgit in ['vel', 'trq', 'estop']:
widgits_dp[widgit]['label'].configure(text_color="red")
widgits_dp[widgit]['optionmenu'].configure(state='normal')
for widget in widgets_dp:
if widget in ['path']:
widgets_dp[widget]['label'].configure(text_color='red')
widgets_dp[widget]['entry'].configure(state='normal')
elif widget in ['vel', 'trq', 'estop']:
widgets_dp[widget]['label'].configure(text_color="red")
widgets_dp[widget]['optionmenu'].configure(state='normal')
elif func_name == 'current':
self.menu_sub_dp = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'), values=["max", "avg", "cycle"], font=self.my_font, button_color='red', fg_color='green', command=self.func_sub_callback)
self.menu_sub_dp.grid(row=2, column=1, sticky='we', padx=5, pady=5)
self.menu_sub_dp.grid(row=1, column=0, sticky='we', padx=5, pady=5)
self.menu_sub_dp.set("--select--")
self.menu_sub_dp.configure(text_color='yellow')
elif func_name == 'iso' or func_name == 'wavelogger':
for widgit in widgits_dp:
if widgit in ['path']:
widgits_dp[widgit]['label'].configure(text_color='red')
widgits_dp[widgit]['entry'].configure(state='normal')
for widget in widgets_dp:
if widget in ['path']:
widgets_dp[widget]['label'].configure(text_color='red')
widgets_dp[widget]['entry'].configure(state='normal')
else:
self.initialization()
self.menu_main_dp.set("Start Here!")
elif tab_name == 'Automatic Test':
if func_name == 'brake':
for widgit in widgits_at:
if widgit in ['path',]:
widgits_at[widgit]['label'].configure(text_color='red')
widgits_at[widgit]['entry'].delete(0, tkinter.END)
widgits_at[widgit]['entry'].configure(placeholder_text=widgits_at[widgit]['text'], state='normal')
widgits_at[widgit]['entry'].configure(state='normal')
elif widgit in ['loadsel', ]:
widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
widgits_at[widgit]['optionmenu'].configure(state='normal', text_color='red')
for widget in widgets_at:
if widget in ['path',]:
widgets_at[widget]['label'].configure(text_color='red')
widgets_at[widget]['entry'].delete(0, tkinter.END)
widgets_at[widget]['entry'].configure(placeholder_text=widgets_at[widget]['text'], state='normal')
widgets_at[widget]['entry'].configure(state='normal')
elif widget in ['loadsel', ]:
widgets_at[widget]['optionmenu'].set(widgets_at[widget]['text'])
widgets_at[widget]['optionmenu'].configure(state='normal', text_color='red')
elif func_name == 'current':
for widgit in widgits_at:
if widgit in ['path',]:
widgits_at[widgit]['label'].configure(text_color='red')
widgits_at[widgit]['entry'].delete(0, tkinter.END)
widgits_at[widgit]['entry'].configure(placeholder_text=widgits_at[widgit]['text'], state='normal')
widgits_at[widgit]['entry'].configure(state='normal')
elif widgit in ['loadsel', ]:
widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
widgits_at[widgit]['optionmenu'].configure(state='normal', text_color='red')
for widget in widgets_at:
if widget in ['path',]:
widgets_at[widget]['label'].configure(text_color='red')
widgets_at[widget]['entry'].delete(0, tkinter.END)
widgets_at[widget]['entry'].configure(placeholder_text=widgets_at[widget]['text'], state='normal')
widgets_at[widget]['entry'].configure(state='normal')
elif widget in ['loadsel', ]:
widgets_at[widget]['optionmenu'].set(widgets_at[widget]['text'])
widgets_at[widget]['optionmenu'].configure(state='normal', text_color='red')
else:
self.initialization()
self.menu_main_at.set("Start Here!")
def func_sub_callback(self, func_name):
if func_name == "max" or func_name == "avg":
for widgit in widgits_dp:
if widgit in ['path']:
widgits_dp[widgit]['label'].configure(text_color='red')
widgits_dp[widgit]['entry'].delete(0, tkinter.END)
widgits_dp[widgit]['entry'].configure(placeholder_text=widgits_dp[widgit]['text'], state='normal')
widgits_dp[widgit]['entry'].configure(state='normal')
elif widgit in ['dur']:
widgits_dp[widgit]['label'].configure(text_color='black')
widgits_dp[widgit]['entry'].delete(0, tkinter.END)
widgits_dp[widgit]['entry'].configure(placeholder_text=widgits_dp[widgit]['text'], state='normal')
widgits_dp[widgit]['entry'].configure(state='disabled')
elif widgit in ['vel', 'trqh', 'estop']:
widgits_dp[widgit]['label'].configure(text_color='black')
widgits_dp[widgit]['optionmenu'].set('1')
widgits_dp[widgit]['optionmenu'].configure(state='disabled')
elif widgit in ['trq']:
widgits_dp[widgit]['label'].configure(text_color='red')
widgits_dp[widgit]['optionmenu'].set('1')
widgits_dp[widgit]['optionmenu'].configure(state='normal')
for widget in widgets_dp:
if widget in ['path']:
widgets_dp[widget]['label'].configure(text_color='red')
widgets_dp[widget]['entry'].delete(0, tkinter.END)
widgets_dp[widget]['entry'].configure(placeholder_text=widgets_dp[widget]['text'], state='normal')
widgets_dp[widget]['entry'].configure(state='normal')
elif widget in ['dur']:
widgets_dp[widget]['label'].configure(text_color='black')
widgets_dp[widget]['entry'].delete(0, tkinter.END)
widgets_dp[widget]['entry'].configure(placeholder_text=widgets_dp[widget]['text'], state='normal')
widgets_dp[widget]['entry'].configure(state='disabled')
elif widget in ['vel', 'trqh', 'estop']:
widgets_dp[widget]['label'].configure(text_color='black')
widgets_dp[widget]['optionmenu'].set('1')
widgets_dp[widget]['optionmenu'].configure(state='disabled')
elif widget in ['trq']:
widgets_dp[widget]['label'].configure(text_color='red')
widgets_dp[widget]['optionmenu'].set('1')
widgets_dp[widget]['optionmenu'].configure(state='normal')
elif func_name == 'cycle':
for widgit in widgits_dp:
if widgit in ['path', 'dur']:
color = 'blue' if widgit == 'dur' else 'red'
widgits_dp[widgit]['label'].configure(text_color=color)
widgits_dp[widgit]['entry'].delete(0, tkinter.END)
widgits_dp[widgit]['entry'].configure(placeholder_text=widgits_dp[widgit]['text'], state='normal')
widgits_dp[widgit]['entry'].configure(state='normal')
elif widgit in ['vel', 'trq', 'trqh']:
color = 'blue' if widgit == 'trqh' else 'red'
widgits_dp[widgit]['label'].configure(text_color=color)
widgits_dp[widgit]['optionmenu'].set('1')
widgits_dp[widgit]['optionmenu'].configure(state='normal')
elif widgit in ['estop']:
widgits_dp[widgit]['label'].configure(text_color="black")
widgits_dp[widgit]['optionmenu'].set('1')
widgits_dp[widgit]['optionmenu'].configure(state='disabled')
for widget in widgets_dp:
if widget in ['path', 'dur']:
color = 'blue' if widget == 'dur' else 'red'
widgets_dp[widget]['label'].configure(text_color=color)
widgets_dp[widget]['entry'].delete(0, tkinter.END)
widgets_dp[widget]['entry'].configure(placeholder_text=widgets_dp[widget]['text'], state='normal')
widgets_dp[widget]['entry'].configure(state='normal')
elif widget in ['vel', 'trq', 'trqh']:
color = 'blue' if widget == 'trqh' else 'red'
widgets_dp[widget]['label'].configure(text_color=color)
widgets_dp[widget]['optionmenu'].set('1')
widgets_dp[widget]['optionmenu'].configure(state='normal')
elif widget in ['estop']:
widgets_dp[widget]['label'].configure(text_color="black")
widgets_dp[widget]['optionmenu'].set('1')
widgets_dp[widget]['optionmenu'].configure(state='disabled')
def write2textbox(self, text, wait=0, exitcode=0, color='blue', tab_name='Data Process'):
self.textbox.tag_add(color, 'insert', 'end')
@ -497,10 +509,10 @@ class App(customtkinter.CTk):
if tab_name == 'Data Process':
func_name = self.menu_main_dp.get()
if func_name == 'brake':
path = widgits_dp['path']['entry'].get().strip()
vel = widgits_dp['vel']['optionmenu'].get()
trq = widgits_dp['trq']['optionmenu'].get()
estop = widgits_dp['estop']['optionmenu'].get()
path = widgets_dp['path']['entry'].get().strip()
vel = widgets_dp['vel']['optionmenu'].get()
trq = widgets_dp['trq']['optionmenu'].get()
estop = widgets_dp['estop']['optionmenu'].get()
c1 = exists(path)
c2 = True if len({vel, trq, estop}) == 3 else False
@ -510,11 +522,11 @@ class App(customtkinter.CTk):
return 0, 0
# =======================================================
elif func_name == 'current':
path = widgits_dp['path']['entry'].get().strip()
dur = widgits_dp['dur']['entry'].get().strip()
vel = widgits_dp['vel']['optionmenu'].get()
trq = widgits_dp['trq']['optionmenu'].get()
trqh = widgits_dp['trqh']['optionmenu'].get()
path = widgets_dp['path']['entry'].get().strip()
dur = widgets_dp['dur']['entry'].get().strip()
vel = widgets_dp['vel']['optionmenu'].get()
trq = widgets_dp['trq']['optionmenu'].get()
trqh = widgets_dp['trqh']['optionmenu'].get()
sub = self.menu_sub_dp.get()
c1 = exists(path)
@ -533,14 +545,14 @@ class App(customtkinter.CTk):
return 0, 0
# =======================================================
elif func_name == 'iso':
path = widgits_dp['path']['entry'].get().strip()
path = widgets_dp['path']['entry'].get().strip()
if exists(path):
return 3, path
else:
return 0, 0
# =======================================================
elif func_name == 'wavelogger':
path = widgits_dp['path']['entry'].get().strip()
path = widgets_dp['path']['entry'].get().strip()
if exists(path):
return 4, path
else:
@ -551,8 +563,8 @@ class App(customtkinter.CTk):
elif tab_name == 'Automatic Test':
func_name = self.menu_main_at.get()
if func_name == 'brake':
path = widgits_at['path']['entry'].get().strip()
loadsel = widgits_at['loadsel']['optionmenu'].get()
path = widgets_at['path']['entry'].get().strip()
loadsel = widgets_at['loadsel']['optionmenu'].get()
c1 = exists(path)
c2 = loadsel in ['tool100', 'tool66', 'tool33']
if c1 and c2:
@ -560,8 +572,8 @@ class App(customtkinter.CTk):
else:
return 0, 0
elif func_name == 'current':
path = widgits_at['path']['entry'].get().strip()
loadsel = widgits_at['loadsel']['optionmenu'].get()
path = widgets_at['path']['entry'].get().strip()
loadsel = widgets_at['loadsel']['optionmenu'].get()
c1 = exists(path)
c2 = loadsel in ['tool100', 'inertia']
if c1 and c2:
@ -571,8 +583,8 @@ class App(customtkinter.CTk):
else:
return 0, 0
elif tab_name == 'Durable Action':
path = widgits_da['path']['entry'].get().strip()
curvesel = widgits_da['curvesel']['optionmenu'].get()
path = widgets_da['path']['entry'].get().strip()
curvesel = widgets_da['curvesel']['optionmenu'].get()
c1 = exists(path)
c2 = curvesel in ['device_servo_trq_feedback', '[max] device_servo_trq_feedback']
if c1 and c2:

View File

@ -155,10 +155,10 @@ def run_rl(path, loadsel, hr, md, config_file, result_dirs, w2t):
sleep(write_diagnosis) # 软急停超差后等待写诊断时间可通过configs.xlsx配置
while count == 1:
# 2. 修改要执行的场景
# 2. 修改要执行的场景
ssh = SSHClient()
ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019')
ssh.connect(hostname=clibs.ip_addr, port=22, username='luoshi', password='luoshi2019')
if ws.cell(row=1, column=1).value == 'positive':
_rl_cmd = f"brake_E(j{axis}_{_reach}_p, j{axis}_{_reach}_n, p_speed, p_tool)"
elif ws.cell(row=1, column=1).value == 'negative':
@ -184,6 +184,7 @@ def run_rl(path, loadsel, hr, md, config_file, result_dirs, w2t):
clibs.execution('rl_task.pp_to_main', hr, w2t, tab_name, tasks=['brake', 'stop0_related'])
clibs.execution('state.switch_auto', hr, w2t, tab_name)
clibs.execution('state.switch_motor_on', hr, w2t, tab_name)
clibs.execution('rl_task.set_run_params', hr, w2t, tab_name, loop_mode=True, override=1.0)
clibs.execution('rl_task.run', hr, w2t, tab_name, tasks=['brake', 'stop0_related'])
_t_start = time()
while True:
@ -237,7 +238,7 @@ def run_rl(path, loadsel, hr, md, config_file, result_dirs, w2t):
md.reset_estop() # 其实没必要
md.clear_alarm()
clibs.execution('overview.reload', hr, w2t, tab_name, prj_path=prj_path, tasks=['brake', 'stop0_related'])
# clibs.execution('overview.reload', hr, w2t, tab_name, prj_path=prj_path, tasks=['brake', 'stop0_related'])
clibs.execution('rl_task.pp_to_main', hr, w2t, tab_name, tasks=['brake', 'stop0_related'])
clibs.execution('state.switch_auto', hr, w2t, tab_name)
clibs.execution('state.switch_motor_on', hr, w2t, tab_name)
@ -256,7 +257,7 @@ def run_rl(path, loadsel, hr, md, config_file, result_dirs, w2t):
_t_start = time()
while True:
if md.read_brake_done() == 1:
sleep(1) # 保证速度归零
sleep(4) # 保证速度归零
md.write_probe(0)
break
else:

View File

@ -21,6 +21,12 @@ display_pdo_params = [
{"name": "device_servo_trq_feedback", "channel": 3},
{"name": "device_servo_trq_feedback", "channel": 4},
{"name": "device_servo_trq_feedback", "channel": 5},
{"name": "hw_sensor_trq_feedback", "channel": 0},
{"name": "hw_sensor_trq_feedback", "channel": 1},
{"name": "hw_sensor_trq_feedback", "channel": 2},
{"name": "hw_sensor_trq_feedback", "channel": 3},
{"name": "hw_sensor_trq_feedback", "channel": 4},
{"name": "hw_sensor_trq_feedback", "channel": 5},
]
@ -63,6 +69,7 @@ def data_proc_regular(path, filename, channel, scenario_time):
lines = f_obj.readlines()
_d2d_vel = {'hw_joint_vel_feedback': []}
_d2d_trq = {'device_servo_trq_feedback': []}
_d2d_sensor = {'hw_sensor_trq_feedback': []}
for line in lines[-500:]: # 保留最后25s的数据
data = eval(line.strip())['data']
for item in data:
@ -74,10 +81,13 @@ def data_proc_regular(path, filename, channel, scenario_time):
_d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == channel and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == channel and item.get('name', None) == 'hw_sensor_trq_feedback':
_d2d_sensor['hw_sensor_trq_feedback'].extend(item['value'])
df1 = DataFrame.from_dict(_d2d_vel)
df2 = DataFrame.from_dict(_d2d_trq)
df = concat([df1, df2], axis=1)
df3 = DataFrame.from_dict(_d2d_sensor)
df = concat([df1, df2, df3], axis=1)
_filename = f'{path}\\single\\j{channel+1}_single_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
elif channel in list(range(6, 9)):
@ -85,16 +95,22 @@ def data_proc_regular(path, filename, channel, scenario_time):
lines = f_obj.readlines()
_d2d_vel_0 = {'hw_joint_vel_feedback': []}
_d2d_trq_0 = {'device_servo_trq_feedback': []}
_d2d_sensor_0 = {'hw_sensor_trq_feedback': []}
_d2d_vel_1 = {'hw_joint_vel_feedback': []}
_d2d_trq_1 = {'device_servo_trq_feedback': []}
_d2d_sensor_1 = {'hw_sensor_trq_feedback': []}
_d2d_vel_2 = {'hw_joint_vel_feedback': []}
_d2d_trq_2 = {'device_servo_trq_feedback': []}
_d2d_sensor_2 = {'hw_sensor_trq_feedback': []}
_d2d_vel_3 = {'hw_joint_vel_feedback': []}
_d2d_trq_3 = {'device_servo_trq_feedback': []}
_d2d_sensor_3 = {'hw_sensor_trq_feedback': []}
_d2d_vel_4 = {'hw_joint_vel_feedback': []}
_d2d_trq_4 = {'device_servo_trq_feedback': []}
_d2d_sensor_4 = {'hw_sensor_trq_feedback': []}
_d2d_vel_5 = {'hw_joint_vel_feedback': []}
_d2d_trq_5 = {'device_servo_trq_feedback': []}
_d2d_sensor_5 = {'hw_sensor_trq_feedback': []}
for line in lines:
data = eval(line.strip())['data']
for item in data:
@ -106,60 +122,78 @@ def data_proc_regular(path, filename, channel, scenario_time):
_d2d_vel_0['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 0 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_0['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 0 and item.get('name', None) == 'hw_sensor_trq_feedback':
_d2d_sensor_0['hw_sensor_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 1 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_1['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 1 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_1['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 1 and item.get('name', None) == 'hw_sensor_trq_feedback':
_d2d_sensor_1['hw_sensor_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 2 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_2['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 2 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_2['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 3 and item.get('name', None) == 'hw_sensor_trq_feedback':
_d2d_sensor_2['hw_sensor_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 3 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_3['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 3 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_3['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 3 and item.get('name', None) == 'hw_sensor_trq_feedback':
_d2d_sensor_3['hw_sensor_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 4 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_4['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 4 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_4['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 4 and item.get('name', None) == 'hw_sensor_trq_feedback':
_d2d_sensor_4['hw_sensor_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 5 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_5['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 5 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_5['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 5 and item.get('name', None) == 'hw_sensor_trq_feedback':
_d2d_sensor_5['hw_sensor_trq_feedback'].extend(item['value'])
df_01 = DataFrame.from_dict(_d2d_vel_0)
df_02 = DataFrame.from_dict(_d2d_trq_0)
df = concat([df_01, df_02], axis=1)
df_03 = DataFrame.from_dict(_d2d_sensor_0)
df = concat([df_01, df_02, df_03], axis=1)
_filename = f'{path}\\s_{channel-5}\\j1_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = DataFrame.from_dict(_d2d_vel_1)
df_02 = DataFrame.from_dict(_d2d_trq_1)
df = concat([df_01, df_02], axis=1)
df_03 = DataFrame.from_dict(_d2d_sensor_1)
df = concat([df_01, df_02, df_03], axis=1)
_filename = f'{path}\\s_{channel-5}\\j2_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = DataFrame.from_dict(_d2d_vel_2)
df_02 = DataFrame.from_dict(_d2d_trq_2)
df = concat([df_01, df_02], axis=1)
df_03 = DataFrame.from_dict(_d2d_sensor_2)
df = concat([df_01, df_02, df_03], axis=1)
_filename = f'{path}\\s_{channel-5}\\j3_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = DataFrame.from_dict(_d2d_vel_3)
df_02 = DataFrame.from_dict(_d2d_trq_3)
df = concat([df_01, df_02], axis=1)
df_03 = DataFrame.from_dict(_d2d_sensor_3)
df = concat([df_01, df_02, df_03], axis=1)
_filename = f'{path}\\s_{channel-5}\\j4_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = DataFrame.from_dict(_d2d_vel_4)
df_02 = DataFrame.from_dict(_d2d_trq_4)
df = concat([df_01, df_02], axis=1)
df_03 = DataFrame.from_dict(_d2d_sensor_4)
df = concat([df_01, df_02, df_03], axis=1)
_filename = f'{path}\\s_{channel-5}\\j5_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = DataFrame.from_dict(_d2d_vel_5)
df_02 = DataFrame.from_dict(_d2d_trq_5)
df = concat([df_01, df_02], axis=1)
df_03 = DataFrame.from_dict(_d2d_sensor_5)
df = concat([df_01, df_02, df_03], axis=1)
_filename = f'{path}\\s_{channel-5}\\j6_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
elif channel in list(range(9, 15)):
@ -167,6 +201,7 @@ def data_proc_regular(path, filename, channel, scenario_time):
lines = f_obj.readlines()
_d2d_vel = {'hw_joint_vel_feedback': []}
_d2d_trq = {'device_servo_trq_feedback': []}
_d2d_sensor = {'hw_sensor_trq_feedback': []}
for line in lines[-300:]: # 保留最后15s的数据
data = eval(line.strip())['data']
for item in data:
@ -178,10 +213,13 @@ def data_proc_regular(path, filename, channel, scenario_time):
_d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == channel-9 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == channel-9 and item.get('name', None) == 'hw_sensor_trq_feedback':
_d2d_sensor['hw_sensor_trq_feedback'].extend(item['value'])
df1 = DataFrame.from_dict(_d2d_vel)
df2 = DataFrame.from_dict(_d2d_trq)
df = concat([df1, df2], axis=1)
df3 = DataFrame.from_dict(_d2d_sensor)
df = concat([df1, df2, df3], axis=1)
_filename = f'{path}\\single\\j{channel-8}_hold_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
@ -191,6 +229,7 @@ def data_proc_inertia(path, filename, channel):
lines = f_obj.readlines()
_d2d_vel = {'hw_joint_vel_feedback': []}
_d2d_trq = {'device_servo_trq_feedback': []}
_d2d_sensor = {'hw_sensor_trq_feedback': []}
for line in lines:
data = eval(line.strip())['data']
for item in data:
@ -202,10 +241,13 @@ def data_proc_inertia(path, filename, channel):
_d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == channel+3 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == channel+3 and item.get('name', None) == 'hw_sensor_trq_feedback':
_d2d_trq['hw_sensor_trq_feedback'].extend(item['value'])
df1 = DataFrame.from_dict(_d2d_vel)
df2 = DataFrame.from_dict(_d2d_trq)
df = concat([df1, df2], axis=1)
df3 = DataFrame.from_dict(_d2d_sensor)
df = concat([df1, df2, df3], axis=1)
_filename = f'{path}\\inertia\\j{channel+4}_inertia_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
@ -290,6 +332,7 @@ def run_rl(path, hr, md, loadsel, w2t):
clibs.execution('state.switch_motor_on', hr, w2t, tab_name)
# 3. 开始运行程序单轴运行35s
clibs.execution('rl_task.set_run_params', hr, w2t, tab_name, loop_mode=True, override=1.0)
clibs.execution('rl_task.run', hr, w2t, tab_name, tasks=['current'])
_t_start = time()
while True:

View File

@ -1,4 +1,4 @@
from os import scandir
from os import scandir, mkdir
from threading import Thread
from time import sleep
from os.path import exists
@ -8,17 +8,18 @@ from logging import getLogger
from logging.config import dictConfig
import concurrent_log_handler
ip_addr = '192.168.0.160' # for product
# ip_addr = '192.168.84.129' # for test
RADIAN = 57.3 # 180 / 3.1415926
MAX_FRAME_SIZE = 1024
TIMEOUT = 5
setdefaulttimeout(TIMEOUT)
tab_names = {'dp': 'Data Process', 'at': 'Automatic Test', 'da': 'Duration Action', 'op': 'openapi'}
# PREFIX = '' # for pyinstaller packaging
PREFIX = '../assets/' # for source code debug
log_data_hmi = f'{PREFIX}templates/c_msg.log'
log_data_debug = f'{PREFIX}templates/debug.log'
PREFIX = '../assets/' # for source code testing and debug
app_icon = f'{PREFIX}templates/icon.ico'
ip_file = f'{PREFIX}templates/ipaddr.txt'
log_path = f'{PREFIX}templates/logs/'
log_data_hmi = f'{PREFIX}templates/logs/c_msg.log'
log_data_debug = f'{PREFIX}templates/logs/debug.log'
heartbeat = f'{PREFIX}templates/heartbeat'
durable_data_current_xlsx = f'{PREFIX}templates/durable/durable_data_current.xlsx'
durable_data_current_max_xlsx = f'{PREFIX}templates/durable/durable_data_current_max.xlsx'
@ -40,6 +41,17 @@ durable_data_current_max = {
'axis5': [0 for _ in range(18)],
'axis6': [0 for _ in range(18)],
}
try:
with open(ip_file, mode="r", encoding="utf-8") as f_ipaddr:
ip_addr = f_ipaddr.read().strip()
except:
ip_addr = '192.168.0.160'
# ip_addr = '192.168.0.160' # for product
# ip_addr = '192.168.84.129' # for test
if not exists(log_path):
mkdir(log_path)
# version表示版本该键值为从1开始的整数。该key必选除此之外其它key都是可选。
# formatters日志格式化器其value值为一个字典该字典的每个键值对都代表一个Formatter键值对中key代表Formatter ID(自定义ID)value为字典描述如何配置相应的Formatter实例。默认格式为 %(message)s
@ -78,7 +90,7 @@ log_dicts = {
'formatter': 'test',
},
'hmi.log': {
'level': 'DEBUG',
'level': 'WARNING',
'class': 'concurrent_log_handler.ConcurrentRotatingFileHandler',
'filename': log_data_hmi,
'maxBytes': 1024*1024*50,

View File

@ -19,7 +19,7 @@ class ModbusRequest(object):
self.host = clibs.ip_addr
self.port = 502
self.interval = 0.3
self.c = ModbusTcpClient(self.host, self.port)
self.c = ModbusTcpClient(host=self.host, port=self.port)
self.c.connect()
def motor_off(self):
@ -203,7 +203,7 @@ class HmiRequest(object):
self.c_xs.connect((clibs.ip_addr, 6666))
self.c_xs.setblocking(False)
self.w2t("Connection success", 0, 0, 'green', tab_name=self.tab_name)
logger.info("Connection success...")
with open(clibs.heartbeat, "w", encoding='utf-8') as f_hb:
f_hb.write('1')
md = ModbusRequest(self.w2t)
@ -213,7 +213,7 @@ class HmiRequest(object):
md.write_probe(False)
md.write_axis(1)
except Exception as Err:
self.w2t("Connection failed...", 0, 0, 'red', tab_name=self.tab_name)
logger.info("Connection failed...")
with open(clibs.heartbeat, "w", encoding='utf-8') as f_hb:
f_hb.write('0')
@ -255,13 +255,13 @@ class HmiRequest(object):
with open(clibs.heartbeat, "w", encoding='utf-8') as f_hb:
f_hb.write(_flag)
if _flag == '0':
self.w2t(f"{_id} 心跳丢失,连接失败,重新连接中...", 0, 7, 'red', tab_name=self.tab_name)
logger.info(f"{_id} 心跳丢失,连接失败,重新连接中...")
sleep(2)
def msg_storage(self, response, flag=0):
# response是解码后的字符串
messages = self.c_msg if flag == 0 else self.c_msg_xs
logger.debug(f"{loads(response)}")
logger.warning(f"{loads(response)}")
if 'move.monitor' in response:
pass
elif len(messages) < 10000:
@ -539,6 +539,7 @@ class HmiRequest(object):
sel.unregister(conn)
conn.close()
try:
sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ, to_read)
@ -547,6 +548,8 @@ class HmiRequest(object):
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
except Exception as Err:
logger.warning(Err)
def unpackage_xs(self, sock):
def to_read(conn, mask):
@ -559,6 +562,7 @@ class HmiRequest(object):
sel.unregister(conn)
conn.close()
try:
sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ, to_read)
@ -567,6 +571,8 @@ class HmiRequest(object):
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
except Exception as Err:
logger.warning(Err)
def gen_id(self, command):
_now = time()
@ -593,6 +599,9 @@ class HmiRequest(object):
req['data']['tasks'] = kwargs['tasks']
case 'rl_task.pp_to_main' | 'rl_task.run' | 'rl_task.stop':
req['data']['tasks'] = kwargs['tasks']
case 'rl_task.set_run_params':
req['data']['loop_mode'] = kwargs['loop_mode']
req['data']['override'] = kwargs['override']
case 'diagnosis.set_params':
req['data']['display_pdo_params'] = kwargs['display_pdo_params']
case 'diagnosis.open':
@ -615,7 +624,8 @@ class HmiRequest(object):
self.c.send(self.package(cmd))
sleep(0.5)
except Exception as Err:
self.w2t(f"{cmd}: 请求发送失败...{Err}", 0, 0, 'red', tab_name=self.tab_name)
# self.w2t(f"{cmd}: 请求发送失败...{Err}", 0, 0, 'red', tab_name=self.tab_name)
logger.info(f"{cmd}: 请求发送失败...{Err}")
return req['id']

View File

@ -28,16 +28,17 @@ def initialization(path, sub, w2t):
filename = data_file.split('\\')[-1]
if data_file.endswith('configs.xlsx'):
count += 1
elif sub == 'cycle' and data_file.endswith('.xlsx'):
elif sub == 'cycle' and data_file.endswith('T_电机电流.xlsx'):
count += 1
else:
if not (match('j[1-7].*\\.data', filename) or match('j[1-7].*\\.csv', filename)):
msg = f"不合规 {data_file}\n"
msg += f"所有文件必须以 jx_ 开头,以 .data/csv 结尾x取值1-7请检查后重新运行。"
msg += f"所有数据文件必须以 jx_ 开头,以 .data/csv 结尾x取值1-7配置文件需要命名为\"configs.xlsx\",结果文件需要命名为\"T_电机电流.xlsx\"请检查后重新运行。\n"
msg += "使用max/avg功能时需要有配置文件表格\"configs.xlsx\"使用cycle功能时需要有电机电流数据处理\"T_电机电流.xlsx\"和配置文件\"configs.xlsx\"两个表格,确认后重新运行!"
w2t(msg, 0, 6, 'red')
if not ((sub == 'cycle' and count == 2) or (sub != 'cycle' and count == 1)):
w2t("使用max/avg功能时需要有配置文件表格使用cycle功能时需要有电机电流数据处理和配置文件两个表格,确认后重新运行!", 0, 5, 'red')
w2t("使用max/avg功能时需要有配置文件表格\"configs.xlsx\"使用cycle功能时需要有电机电流数据处理\"T_电机电流.xlsx\"和配置文件\"configs.xlsx\"两个表格,确认后重新运行!", 0, 5, 'red')
return data_files
@ -186,7 +187,7 @@ def find_point(data_file, pos, flag, df, _row_s, _row_e, w2t, exitcode, threshol
else:
return _row_s, _row_e
else:
w2t(f"[{pos}] {data_file}数据有误,需要检查,无法找到第{exitcode}个有效点...", 0, exitcode, 'red')
w2t(f"[{pos}] {data_file}数据有误,需要检查,无法找到第{exitcode}个有效点...", 0, 0, 'red')
elif flag == 'gt':
while _row_e > end_point:
speed_avg = df.iloc[_row_s:_row_e, 0].abs().mean()
@ -197,7 +198,7 @@ def find_point(data_file, pos, flag, df, _row_s, _row_e, w2t, exitcode, threshol
else:
return _row_s, _row_e
else:
w2t(f"[{pos}] {data_file}数据有误,需要检查,无法找到有效起始点或结束点...", 0, exitcode, 'red')
w2t(f"[{pos}] {data_file}数据有误,需要检查,无法找到有效起始点或结束点...", 0, 0, 'red')
def p_single(wb, single, vel, trq, rpms, rrs, w2t):
@ -232,6 +233,7 @@ def p_single(wb, single, vel, trq, rpms, rrs, w2t):
df_1 = df[col_names[vel-1]].multiply(rpm*addition)
df_2 = df[col_names[trq-1]].multiply(scale)
# print(df_1.abs().max())
df_origin = df
df = concat([df_1, df_2], axis=1)
_step = 5 if data_file.endswith('.csv') else 50
@ -247,32 +249,40 @@ def p_single(wb, single, vel, trq, rpms, rrs, w2t):
row_end = _row_e - _adjust
_row_e -= _end_point
_row_s -= _end_point
_row_s, _row_e = find_point(data_file, 'a2', 'gt', df, _row_s, _row_e, w2t, 3, threshold=5, step=_step, end_point=_end_point)
_row_s, _row_e = find_point(data_file, 'a2', 'gt', df, _row_s, _row_e, w2t, 2, threshold=5, step=_step, end_point=_end_point)
# 速度已经快要降为零了,继续寻找下一个速度上升点
_row_middle = _row_s
_row_e -= _end_point
_row_s -= _end_point
_row_s, _row_e = find_point(data_file, 'a3', 'lt', df, _row_s, _row_e, w2t, 3, threshold=5, step=_step, end_point=_end_point)
if abs((_row_s-_row_middle)-(_row_middle-row_end)) > 1000: # 两段相差太大判定后者较小很罕见的情况所以只假设之前遇到的情况所以可能有bug
row_end = _row_middle
_row_e -= _end_point
_row_s -= _end_point
_row_s, _row_e = find_point(data_file, 'a4', 'gt', df, _row_s, _row_e, w2t, 4, threshold=5, step=_step, end_point=_end_point)
elif speed_avg > 2:
# 过滤尾部非零无效数据
_row_s, _row_e = find_point(data_file, 'b1', 'gt', df, _row_s, _row_e, w2t, 2, threshold=5, step=_step, end_point=_end_point)
_row_s, _row_e = find_point(data_file, 'b1', 'gt', df, _row_s, _row_e, w2t, 1, threshold=5, step=_step, end_point=_end_point)
# 找到第一个起始点 row_end继续找到有数据的部分后面有一段零数据区
row_end = _row_e - _adjust
_row_e -= _end_point
_row_s -= _end_point
_row_s, _row_e = find_point(data_file, 'b2', 'lt', df, _row_s, _row_e, w2t, 4, threshold=5, step=_step, end_point=_end_point)
_row_s, _row_e = find_point(data_file, 'b2', 'lt', df, _row_s, _row_e, w2t, 2, threshold=5, step=_step, end_point=_end_point)
# 目前已经有一点的速度值了,继续往前搜寻下一个速度为零的点
_row_e -= _end_point
_row_s -= _end_point
_row_s, _row_e = find_point(data_file, 'b3', 'gt', df, _row_s, _row_e, w2t, 4, threshold=5, step=_step, end_point=_end_point)
_row_s, _row_e = find_point(data_file, 'b3', 'gt', df, _row_s, _row_e, w2t, 3, threshold=5, step=_step, end_point=_end_point)
row_start = _row_s + _adjust
data = []
for row in range(row_start, row_end):
data.append(df.iloc[row, 0])
data.append(df.iloc[row, 1])
data.append(df_origin.iloc[row, 0])
data.append(df_origin.iloc[row, 1])
data.append(df_origin.iloc[row, 2])
i = 0
for row in ws.iter_rows(min_row=2, min_col=2, max_row=150000, max_col=3):
for row in ws.iter_rows(min_row=2, min_col=2, max_row=150000, max_col=4):
for cell in row:
try:
_ = f"{data[i]:.2f}"
@ -311,6 +321,7 @@ def p_scenario(wb, single, vel, trq, rpms, rrs, dur, w2t):
col_names = list(df.columns)
df_1 = df[col_names[vel-1]].multiply(rpm*addition)
df_2 = df[col_names[trq-1]].multiply(scale)
df_origin = df
df = concat([df_1, df_2], axis=1)
row_start = 300
@ -320,11 +331,11 @@ def p_scenario(wb, single, vel, trq, rpms, rrs, dur, w2t):
data = []
for row in range(row_start, row_end):
data.append(df.iloc[row, 0])
data.append(df.iloc[row, 1])
data.append(df_origin.iloc[row, 0])
data.append(df_origin.iloc[row, 1])
i = 0
for row in ws.iter_rows(min_row=2, min_col=2, max_row=150000, max_col=3):
for row in ws.iter_rows(min_row=2, min_col=2, max_row=250000, max_col=3):
for cell in row:
try:
_ = f"{data[i]:.2f}"

View File

@ -13,7 +13,7 @@ def find_point(bof, step, pos, data_file, flag, df, row, w2t):
# flag: greater than or lower than
if flag == 'gt':
while 0 < row < df.index[-1]-100:
_value = df.iloc[row, 2]
_value = float(df.iloc[row, 2])
if _value > 2:
if bof == 'backward':
row -= step
@ -33,7 +33,7 @@ def find_point(bof, step, pos, data_file, flag, df, row, w2t):
row_target = row + 100
elif flag == 'lt':
while 0 < row < df.index[-1]-100:
_value = df.iloc[row, 2]
_value = float(df.iloc[row, 2])
if _value < 2:
if bof == 'backward':
row -= step
@ -61,14 +61,19 @@ def get_cycle_info(data_file, df, row, step, w2t):
# 1. 从最后读取数据无论是大于1还是小于1都舍弃找到相反的值的起始点
# 2. 从起始点,继续往前寻找,找到与之数值相反的中间点
# 3. 从中间点,继续往前寻找,找到与之数值相反的结束点,至此,得到了高低数值的时间区间以及一轮的周期时间
if df.iloc[row, 2] < 2:
# print(f"row = {row}")
# print(df.iloc[row, 2])
if float(df.iloc[row, 2]) < 2:
row = find_point('backward', step, 'a1', data_file, 'lt', df, row, w2t)
_row = find_point('backward', step, 'a2', data_file, 'gt', df, row, w2t)
_row = find_point('backward', step, 'a3', data_file, 'lt', df, _row, w2t)
row_end = find_point('backward', step, 'a4', data_file, 'gt', df, _row, w2t)
# print(f"row_end = {row_end}")
row_middle = find_point('backward', step, 'a5', data_file, 'lt', df, row_end, w2t)
# print(f"row_middle = {row_middle}")
row_start = find_point('backward', step, 'a6', data_file, 'gt', df, row_middle, w2t)
# print(f"row_start = {row_start}")
return row_end-row_middle, row_middle-row_start, row_end-row_start
@ -95,13 +100,14 @@ def preparation(data_file, wb, w2t):
begin = int(row[1])
break
df = read_csv(data_file, sep=',', encoding='gbk', skip_blank_lines=False, header=begin - 1, on_bad_lines='warn')
# print(f"df = {df}")
low, high, cycle = get_cycle_info(data_file, df, df.index[-1]-110, 5, w2t)
return ws, df, low, high, cycle
def single_file_proc(ws, data_file, df, low, high, cycle, w2t):
_row = _row_lt = _row_gt = count = 1
_row = _row_lt = _row_gt = count = count_i = 1
_step = 5
_data = {}
row_max = df.index[-1]-100
@ -110,19 +116,29 @@ def single_file_proc(ws, data_file, df, low, high, cycle, w2t):
if count not in _data.keys():
_data[count] = []
_value = df.iloc[_row, 2]
_value = float(df.iloc[_row, 2])
if _value < 2:
_row_lt = find_point('forward', _step, 'c'+str(_row), data_file, 'lt', df, _row, w2t)
# print(f"_row_lt = {_row_lt}")
_start = int(_row_gt + (_row_lt - _row_gt - 50) / 2)
# print(f"_start = {_start}")
_end = _start + 50
# print(f"_end = {_end}")
# print("========================================\n")
value = df.iloc[_start:_end, 2].mean() + 3 * df.iloc[_start:_end, 2].std()
if value > 1:
w2t(f"{data_file} 文件第 {count} 轮 第 {count_i} 个数据可能有问题,需人工手动确认,确认有问题可删除,无问题则保留", 0, 0, 'orange')
_data[count].append(value)
count_i += 1
else:
_row_gt = find_point('forward', _step, 'c'+str(_row), data_file, 'gt', df, _row, w2t)
# print(f"_row_gt = {_row_gt}")
if _row_gt - _row_lt > cycle * 2:
count += 1
count_i = 1
_row = max(_row_gt, _row_lt)
# print(f"_row = {_row}")
for i in range(2, 10):
ws.cell(row=1, column=i).value = f"{i-1}次测试"

View File

@ -61,6 +61,7 @@ def run_rl(path, config_file, data_all, hr, md, w2t):
clibs.execution('state.switch_motor_on', hr, w2t, tab_name)
# 3. 开始运行程序
clibs.execution('rl_task.set_run_params', hr, w2t, tab_name, loop_mode=True, override=1.0)
clibs.execution('rl_task.run', hr, w2t, tab_name, tasks=['current'])
_t_start = time()
while True:

0
main.py Normal file
View File