26 Commits

Author SHA1 Message Date
79797a3bdd 20240626
9. [aio.py] 修改了版本
10. [current.py] max/avg功能结束之前会将结果数据追加写入源文件,avg算法更改为average+3×std
11. [wavelogger.py] 算法更改为 average+3×std
2024-06-26 21:38:21 +08:00
7143a19fa1 v0.1.7.0(2024/06/26)-初步可用
1. [aio.py] 在detect_network函数中需改查询时间间隔是1s,在tabview_click中增加textbox配置normal的语句
2. [do_brake.py -> btn_functions.py] 新增执行相应函数,并在get_state函数中设置无示教器模式
3. [openapi.py] 新增sock_conn函数,并做连接时的异常处理,新增类参数w2t
4. [aio.py] 修改customtkinter库中C:\Users\Administrator\AppData\Local\Programs\Python\Python312\Lib\site-packages\customtkinter\windows\widgets\ctk_tabview.py文件,参考https://github.com/TomSchimansky/CustomTkinter/issues/2296,实现修改tabview组件的字体大小,使用原生字体,同时将segmented button字体修改为原生,为了解决segmented button在禁用和启用时,屏幕抖动的问题,并将大小修改为16
5. [aio.py] 修改了segmented_button_callback的实现逻辑,使代码更简洁
6. [aio.py] 修改了在tabview_click函数中对于实例化openapi的动作,使每次切换标签都会重新实例化,也就是每次都会重新连接,修复显示不正确的问题
7. [openapi.py] 新增了socket关闭的函数,并增加msg_id为None的处理逻辑
8. [btn_functions.py] 完善了状态获取的功能,新增告警获取以及功能切换的逻辑
2024-06-26 19:54:51 +08:00
a75775c869 v0.1.7.0(2024/06/25)-未发布
1. [aio.py] 取消了在本文件中开启openapi线程的做法,并修改如下:
	- 通过包的方式导入其他模块
    - 使用current_path来规避文件路径问题
    - 声名了 self.hr 变量,用来接收openapi的实例化
    - 修改了对于segment button的错误调用
    - 设定progress bar的长度是10
    - 完善了segmented_button_callback函数
    - 在detect_network函数中增加heartbeat初始化
    - tabview_click函数中新增textbox清屏功能,以及实例化openapi,并做检测
2. [openapi.py] 取消了初始化中无限循环检测,因为阻塞了aio主界面进程!!!socket也无法多次连接!!!浪费了好多时间!!!很生气!!!!
	- 通过tabview切换来实现重新连接,并保留了异常处理部分
    - 将所有的 __xxxx 函数都替换成 xxxx 函数,去掉了 __
    - 使用current_path来规避文件路径问题
3. [do_brake.py] 初步完成了机器状态收集的功能,还需要完善
    - 使用current_path来规避文件路径问题
    - 新增validate_resp函数,校验数据
    - 完善了调用接口
2024-06-25 21:40:27 +08:00
6604f0ba06 20240624
11. [openapi.py] 建联部分做容错处理,并将读写文件做自适应处理
12. [aio.py] 将读写文件做自适应处理,引入openapi模块并生成实例,做心跳检测,将socket超时时间修改为3s
2024-06-24 19:22:56 +08:00
a4009eb17c 20240623
8. [openapi.py] 增加心跳检测函数,并开启线程执行;取消在该文件中生成实例
9. [aio.py] 完成detect_network,并在main函数开启线程
10. 将templates文件夹移动到assets内
2024-06-23 20:18:41 +08:00
295894a843 2024-06-12
4. [openapi.py] 使用 int.to_bytes 和 int.from_bytes 替换 binascii 模块的功能
5. [aio.py] 修改了Data Process中初始化的动作,使得初始化时的状态统一成程序刚启动时的样子
6. [aio.py] 增加了tabview的点击行为函数,每次点击tab都会初始化
7. [aio.py] 增加了Automatic Test界面元素,包括如下,并完成了功能框架的搭建
- 标签:文件/角速度/减速比
- 按钮:急停及恢复
- 输入框:文件路径/角速度/减速比
- OptionMenu:负载
- 进度条
2024-06-21 16:53:13 +08:00
ff4b0cc04c 20240621
2. [openapi.py] 修改了封包的规则,使之更加明晰,封包操作没有实现分包功能,目前看实际场景用不到
3. [openapi.py] 定义 MAX_FRAME_SIZE 常量(1024),替换socket接收以及响应数据处理相关部分
4. [openapi.py] ʹÓ int.to_bytes ºÍint.from_bytes Ì»» binascii ģ¿é¹¦Ä
2024-06-21 08:13:51 +08:00
8975d8a37c 完善了解包分帧的情况,现在测试基本无问题 2024-06-20 20:48:03 +08:00
14f269b570 中间版本,对于解封超过1024的消息有问题,暂存历史 2024-06-20 17:15:54 +08:00
2917f4ae97 [openapi.py] 初步搭建起框架,完成了新老协议的封包/解包/异步采集日志的操作(未充分测试) 2024-06-20 12:49:41 +08:00
d04d90f1a7 Merge branch 'main' of gitea.rustle.cc:gitea/rokae into APIs
keep the same content with main
2024-06-19 15:21:46 +08:00
284dabee76 re-organize file storage structure, and add API related files, just framework 2024-06-19 15:20:43 +08:00
0646ae13de Merge branch 'main' of gitea.rustle.cc:gitea/rokae into APIs 2024-06-19 07:41:52 +08:00
c3dbb2cff0 v0.1.6.3(2024/06/18)
1. [current.py] 适配电机电流中速度使用hw_joint_vel_feedback的数据,取消对device_servo_vel_feedback的支持,后续所有涉及到速度相关的数据均已前者为准,现已完成对单轴和场景的适配

> !!WARNING:目前版本的电机电流程序还支持DriverMaster采集的数据处理,等明确后,将不再支持,也即所有的电机电流数据(工业+协作),都是用诊断曲线来采集
2024-06-18 20:42:48 +08:00
780fed29af fix merging conflicts 2024-06-17 15:11:54 +08:00
4ba8af842c unused 2024-06-17 15:07:28 +08:00
a0fe9b5939 fix merging 2024-06-16 14:49:46 +08:00
2f2f0d430d fix merging 2024-06-16 14:30:12 +08:00
78a79e4aa0 v0.1.6.2(2024/06/16)
1. [current.py] 修改了max/avg相关功能中对于返回值的处理逻辑,并在输出框以行的形式打印出来
2024-06-16 14:25:41 +08:00
346379ec1a re-commit 2024-06-16 14:09:26 +08:00
215e312480 iv0.1.6.1(2024/06/16)
1. [wavelogger.py]: bugfix single_file_proc函数中,修改_start起始点的计算逻辑
2. [wavelogger.py]: bugfix find_point函数中,当判断条件为临界值 2.0 的时候,针对forward和backward两种情况,对row_target做与判断逻辑相同的处理,目的是避免形成死循环
2024-06-16 14:07:36 +08:00
5ba3c76386 modify vers info 2024-06-15 19:32:57 +08:00
224af36bfe change to new version because of new function added 2024-06-15 19:27:31 +08:00
95071f363b fix merging 2024-06-15 19:17:53 +08:00
fc30fcde80 v0.1.5.4(2024/06/15)
[aio.py]: 新增wavelogger处理界面
[wavelogger.py]: 新增精度数据处理模块
2024-06-15 19:14:34 +08:00
a1b45bf941 [init] for apis 2024-06-12 14:56:50 +08:00
25 changed files with 1152 additions and 250 deletions

3
.gitignore vendored
View File

@ -4,3 +4,6 @@ aio/.idea/
aio/code/__pycache__/
aio/package/
aio/venv
aio/__pycache__/
aio/code/automatic_test/__pycache__/
aio/code/data_process/__pycache__/

View File

@ -1,90 +1,101 @@
### 程序功能
### 一、程序功能
自动化测试数据处理工具,减少人工处理时长,提高测试数据处理的效率和准确度:
1. 制动数据单轴数据处理3min以内
2. 电机电流数据全部轴数据处理1min以内
3. ISO激光数据整理1min以内
### 使用方法
1. 制动数据,单轴数据处理 3min 以内
2. 电机电流数据,全部轴数据处理 1min 以内
3. ISO 激光数据整理1min 以内
4. wavelogger 波形处理,几乎不花费时间
点击可执行程序 AIO.exe然后选择功能根据提示填写必要参数点击运行即可
---
### 第三方库
### 二、使用方法
点击可执行程序 AIO.exe然后选择功能根据提示填写必要参数红色标签提示是必填项蓝色标签提示是可选项然后点击运行即可
> 需要注意的是,可选项填写的不合适,有可能导致数据错误,或者无法运行!!
---
### 三、第三方库
```text
参考requirements.txt
```
### 打包方法
```
pyinstaller.exe -F --version-file file_version_info.txt -i .\icon.ico .\aio.py -p .\brake.py -p .\current.py
pyinstaller --noconfirm --onedir --windowed --add-data "C:/Users/Administrator/AppData/Local/Programs/Python/Python312/Lib/site-packages/customtkinter;customtkinter/" --version-file ..\assets\file_version_info.txt -i ..\assets\icon.ico ..\code\aio.py -p ..\code\brake.py -p ..\code\iso.py -p ..\code\current.py
参考 requirements.txt
```
---
### 注意事项
### 四、打包方法
#### 制动数据
```
pyinstaller.exe -F --version-file file_version_info.txt -i .\icon.ico .\aio.py -p .\brake.py -p .\current.py
pyinstaller --noconfirm --onedir --windowed --add-data "C:/Users/Administrator/AppData/Local/Programs/Python/Python312/Lib/site-packages/customtkinter;customtkinter/" --version-file ..\assets\file_version_info.txt -i ..\assets\icon.ico ..\code\aio.py -p ..\code\data_process\brake.py -p ..\code\data_process\iso.py -p ..\code\data_process\current.py -p ..\code\data_process\wavelogger.py
```
---
### 五、注意事项
#### 1) 制动数据
```text
1. 数据文件存储存储规则
数据文件,就是我们拍急停的时候,采集到的 .data 文件,方向拍三次急停,会采集到三个 .data 文件,存储在同一个文件夹内,即每组(三个 .data 文件)文件必须存储在同一个文件夹内,数据文件的命名无要求
数据文件,就是我们拍急停的时候,采集到的 .data 文件,根据规则选择某个方向拍三次急停,会采集到三个 .data 文件,存储在同一个文件夹内,即每组(三个 .data 文件)文件必须存储在同一个文件夹内,数据文件的命名无要求
2. 文件夹命名规则
采集到的 .data 文件没有命名要求,但是对于文件夹的命名是有要求的,必须是如下格式:
dXX_speedXX_reachXX 或者 loadXX_reachXX_speedXX
采集到的 .data 文件没有命名要求,但是对于其存储的文件夹的命名是有要求的,必须是如下格式:
reachXX_loadXX_speedXX
XX代表不同条件下的测试数值比如
d100_speed33_reach66,指的是,负载100%速度33%臂展66%
reach100_load66_speed33,指的是,臂展100%负载66%速度33%
3. 结果文件命名规则
所谓结果文件,就是处理数据的那个 excle 文件,该文件名字的前缀必须是 loadXX_XXXXXXXXX.xlsx比如
load33_自研_制动性能测试.xlsx
load66_自研_制动性能测试.xlsx
load100_自研_制动性能测试.xlsx
所谓结果文件,就是处理数据的那个 excle 文件,该文件名字的前缀必须是 reachXX_XXXXXXXXX.xlsx比如
reach33_自研_制动性能测试.xlsx
reach66_自研_制动性能测试.xlsx
reach100_自研_制动性能测试.xlsx
!!结果文件可以是没有数据的,也可以是之前有数据的,只要保证第 6 点中的那几个数据准确即可
!!结果文件可以是没有数据的,也可以是之前有数据的,只要保证输入参数的正确性即可
4. 数据存储的组织结
..../j1/load100_speed33_reach100
..../j1/load100_speed66_reach100
..../j1/reach100_load66_speed33
..../j1/reach100_load66_speed66
....
..../j1/load100_speed100_reach100
..../j1/load100_speed33_reach100/2024_05_16_09_18_52.data
..../j1/load100_speed33_reach100/2024_05_16_09_19_52.data
..../j1/load100_speed33_reach100/2024_05_16_09_20_52.data
..../j1/load33_自研_制动性能测试.xlsx
..../j1/load66_自研_制动性能测试.xlsx
..../j1/load100_自研_制动性能测试.xlsx
..../j1/reach100_load33_speed33
..../j1/reach100_load100_speed33/2024_05_16_09_18_52.data
..../j1/reach100_load100_speed33/2024_05_16_09_19_52.data
..../j1/reach100_load100_speed33/2024_05_16_09_20_52.data
..../j1/reach33_自研_制动性能测试.xlsx
..../j1/reach66_自研_制动性能测试.xlsx
..../j1/reach100_自研_制动性能测试.xlsx
5. 文件的打开与关闭
a. 在执行程序之前,需要关闭所有相关 excle 文件
b. 在执行程序之中,不允许打开相关 excle 文件
c. 在执行程序之后,需要逐个打开结果文件,并保存一次
6. 参数一致性检查
执行程序前,需要确定 configs.xlsx 中设定的减速比/最大角速度/额定电流的值是正确的
6. 数据准确性检查
执行程序之后,可以在日志输出框中看到全部文件的处理过程,对于有问题的文件,会用特殊颜色进行标识,需要注意观察
7. 数据准确性检查
执行完程序之后需要对结果文件的数据准确性做核对通过我自己的数据观察误差基本在10ms以内也即10个数据点误差较大的情况可自行调整
8. 其他
程序运行主要的耗时集中在打开,保存和关闭结果文件,第一次打开的时候会比较慢,是因为 excel 在做首次公式的计算保存关闭之后再打开会比较快一些另外如果在运行出错并重复运行程序的时候无响应或者出现异常请打开任务管理器关闭一切和excel相关的进程重新运行即可
7. 其他
程序运行主要的耗时集中在打开,保存和关闭结果文件,第一次打开的时候会比较慢,另外还需要注意采集的数据长度和结果文件中预设的数据长度是否一致,若采集的数据长度大于预设的数据长度,则需要补齐数据
```
#### 电机电流
#### 2) 电机电流
1. 单独使用max/avg功能时对于文件命名同第二点,存放数据的文件夹,只允许有 .data 或者 .csv 文件,且每次只能处理rc相同的轴的数据
2. cycle功能处理单文件单轴数据,可以批量处理所有轴,但要确保遵守如下规则:
a. 数据整理文件以 .xlsx 为后缀
b. 其他文件
A. 单轴j1_xxxxx.data/csv
B. 保持j1_hold_xxxx.data/csv
C. 所有文件放在同一个文件夹即可
d. 界面输入rc参数时需要输入所有轴的数据
1. 单独使用 max/avg 功能时,要求文件命名以 "jx_" 开头,例如 j1_2024_06_18_09_09_11.data,只允许有 .data 或者 .csv 文件,可同时处理所有轴的数据
2. cycle 功能支持处理单轴数据以及场景电机电流的数据,可以批量处理所有轴,但要确保遵守如下规则:
- 包含电机电流结果汇总文件excel
- 单轴文件jx_xxxxx.data/csv
- 保持电流jx_hold_xxxx.data/csv
- 场景文件factory_53.8_2024_06_18_09_01_26.data需手动拆分
- 所有文件放在同一个文件夹即可
- 界面输入rc参数时需要输入所有轴的数据,即使只处理个别轴的数据
#### ISO数据
> 程序运行主要的耗时集中在打开,保存和关闭结果文件
> 需要注意采集的数据长度和结果文件中预设的数据长度是否一致,若采集的数据长度大于预设的数据长度,则需要补齐数据
> 和制动数据统一后续将仅支持对hw_joint_vel_feedback的解析和处理不再支持device_servo_vel_feedback
#### 3) iso 数据
所有文件放在同一个文件夹即可,命名规则如下:
a. ISO.pdf
@ -92,6 +103,15 @@ pyinstaller --noconfirm --onedir --windowed --add-data "C:/Users/Administrator/A
c. ISO-V100.pdf
d. iso-results.xlsx
> 目前仅能处理 6 轴数据
#### 4) wavelogger 波形数据
1. 需要提前将 .xdt 波形数据转换成 .csv 文件
2. 组织目录下只允许有 .csv 文件,对文件夹无要求
3. 运行结束后,会生成 result.xlsx 文件,结果按照 .csv 文件名存放
4. 采集数据时,不同轮次数据时间间隔最好大于 2 倍的周期时间,否则会出现采集的轮数不正确的情况,但数据是完整的
#### 其他
customtkinter的tabview组件不支持修改字体大小可以参考 [Changing Font of a Tabview](https://github.com/TomSchimansky/CustomTkinter/issues/2296) 进行手动修改源码实现:
a. 运行 `pip show customtkinter`,获取到库的路径
@ -180,32 +200,110 @@ v0.1.4(2024/06/06)
6. 支持工业/协作两条产品线的电机电流数据处理包括单轴场景max/avg计算
v0.1.5(2024/06/12)
1. [aio.py]: 主界面切换不同功能时保持placehold一致
2. [brake.py]: 由于制动采集模板和内容的更改,适配了新的数据,更新了算法
3. [aio.py]: 新增tabview组件区分数据处理和自动化测试功能
4. [aio.py]: 重新调整界面配色
5. [aio.py]: 修改了write2textbox函数定制化显示每一行的颜色针对每一行可自定义输出内容颜色
6. [brake.py/iso.py/current.py]: 由于第 5 点的更改,同时修改了其他文件相关引用的部分
7. [aio.py]: 更改label/entry/optionmenu等控件的生成方式使用循环实现更加简洁和容易维护
8. [aio.py]: 修改customtkinter库中C:\Users\Administrator\AppData\Local\Programs\Python\Python312\Lib\site-packages\customtkinter\windows\widgets\ctk_tabview.py文件参考https://github.com/TomSchimansky/CustomTkinter/issues/2296实现修改tabview组件的字体大小
9. [aio.py]: 修改menu_main->menu_main_dpmenu_sub->menu_sub_dp为后续其他tab功能按钮做扩展是针对第三点做出的相应调整
10. [layout.xlsx]: 添加了各个功能的流程图
1. [aio.py] 主界面切换不同功能时保持placehold一致
2. [brake.py] 由于制动采集模板和内容的更改,适配了新的数据,更新了算法
3. [aio.py] 新增tabview组件区分数据处理和自动化测试功能
4. [aio.py] 重新调整界面配色
5. [aio.py] 修改了write2textbox函数定制化显示每一行的颜色针对每一行可自定义输出内容颜色
6. [brake.py/iso.py/current.py] 由于第 5 点的更改,同时修改了其他文件相关引用的部分
7. [aio.py] 更改label/entry/optionmenu等控件的生成方式使用循环实现更加简洁和容易维护
8. [aio.py] 修改customtkinter库中C:\Users\Administrator\AppData\Local\Programs\Python\Python312\Lib\site-packages\customtkinter\windows\widgets\ctk_tabview.py文件参考https://github.com/TomSchimansky/CustomTkinter/issues/2296实现修改tabview组件的字体大小
9. [aio.py] 修改menu_main->menu_main_dpmenu_sub->menu_sub_dp为后续其他tab功能按钮做扩展是针对第三点做出的相应调整
10. [layout.xlsx] 添加了各个功能的流程图
v0.1.5.1(2024/06/12)
[current.py]: 修改cycle功能中数据清理范围为70000行并将threshold从2调整为5
[current.py]: 修改位置超限提示,使更清楚了解问题原因
[current.py]: 修改find_point函数中错误提示增加定位信息
[README.md]: 精简打包命令
[requirements.txt]: 新增必要库配置文件
1. [current.py] 修改cycle功能中数据清理范围为70000行并将threshold从2调整为5
2. [current.py] 修改位置超限提示,使更清楚了解问题原因
3. [current.py] 修改find_point函数中错误提示增加定位信息
4. [README.md] 精简打包命令
5. [requirements.txt] 新增必要库配置文件
v0.1.5.2(2024/06/13)
[brake.py/aio.py]: 将sto修改为estop
[brake.py]: 修改了速度计算逻辑新版本的vel列数据遵循如下规则av = vel * 180 / pi根据av再计算speed
[brake.py]: 将threshold修改为常量50
[brake.py]: 提高了输出提示语的明确性,删除了不必要的省略号
[brake.py]: 更正了之前的数据copy错误重新优化了estop处是否达到指定百分比的判定逻辑
1. [brake.py/aio.py]: 将sto修改为estop
2. [brake.py] 修改了速度计算逻辑新版本的vel列数据遵循如下规则av = vel * 180 / pi根据av再计算speed
3. [brake.py] 将threshold修改为常量50
4. [brake.py] 提高了输出提示语的明确性,删除了不必要的省略号
5. [brake.py] 更正了之前的数据copy错误重新优化了estop处是否达到指定百分比的判定逻辑
v0.1.5.3(2024/06/14)
[aio.py]: 修改w_param为84适配14寸电脑屏幕
[brake.py]: 将判定合规逻辑修改为角速度超过指定角速度的95%
[README.md]: 稍作修改,包括打包方式,功能特性等
1. [aio.py] 修改w_param为84适配14寸电脑屏幕
2. [brake.py] 将判定合规逻辑修改为角速度超过指定角速度的95%
3. [README.md] 稍作修改,包括打包方式,功能特性等
v0.1.6.0(2024/06/15)
1. [aio.py] 新增wavelogger处理界面
2. [wavelogger.py] 新增精度数据处理模块
v0.1.6.1(2024/06/16)
1. [wavelogger.py] bugfix single_file_proc函数中修改_start起始点的计算逻辑
2. [wavelogger.py] bugfix find_point函数中当判断条件为临界值 2.0 的时候针对forward和backward两种情况对row_target做与判断逻辑相同的处理目的是避免形成死循环
v0.1.6.2(2024/06/16)
1. [current.py] 修改了max/avg相关功能中对于返回值的处理逻辑并在输出框以行的形式打印出来
v0.1.6.3(2024/06/18)
1. [current.py] 适配电机电流中速度使用hw_joint_vel_feedback的数据取消对device_servo_vel_feedback的支持后续所有涉及到速度相关的数据均已前者为准现已完成对单轴和场景的适配
> WARNING目前版本的电机电流程序还支持DriverMaster采集的数据处理等明确后将不再支持也即所有的电机电流数据工业+协作),都是用诊断曲线来采集
v0.1.7.0(2024/06/19)-未发布
1. [openapi.py] 初步搭建起框架,完成了新老协议的封包/解包/异步采集日志的操作(未充分测试,但基本无问题)
2. [openapi.py] 修改了封包的规则,使之更加明晰,封包操作没有实现分包功能,目前看实际场景用不到
v0.1.7.0(2024/06/21)-未发布
1. [openapi.py] 定义 MAX_FRAME_SIZE 常量1024替换socket接收以及响应数据处理相关部分
2. [openapi.py] 使用 int.to_bytes 和 int.from_bytes 替换 binascii 模块的功能
3. [aio.py] 修改了Data Process中初始化的动作使得初始化时的状态统一成程序刚启动时的样子
v0.1.7.0(2024/06/23)-未发布
1. [aio.py] 增加了tabview的点击行为函数每次点击tab都会初始化
2. [aio.py] 增加了Automatic Test界面元素包括如下并完成了功能框架的搭建
- 标签:文件/角速度/减速比
- 按钮:急停及恢复
- 输入框:文件路径/角速度/减速比
- OptionMenu负载
- 进度条
3. [openapi.py] 增加心跳检测函数,并开启线程执行;取消在该文件中生成实例
4. [aio.py] 完成detect_network并在main函数开启线程
5. 将templates文件夹移动到assets内
v0.1.7.0(2024/06/24)-未发布
1. [openapi.py] 建联部分做容错逻辑,并将读写文件做自适应处理
2. [aio.py] 将读写文件做自适应处理引入openapi模块并生成实例做心跳检测将socket超时时间修改为3s
v0.1.7.0(2024/06/25)-未发布
1. [aio.py] 取消了在本文件中开启openapi线程的做法并修改如下
- 通过包的方式导入其他模块
- 使用current_path来规避文件路径问题
- 声名了 self.hr 变量用来接收openapi的实例化
- 修改了对于segment button的错误调用
- 设定progress bar的长度是10
- 完善了segmented_button_callback函数
- 在detect_network函数中增加heartbeat初始化
- tabview_click函数中新增textbox清屏功能以及实例化openapi并做检测
2. [openapi.py] 取消了初始化中无限循环检测因为阻塞了aio主界面进程socket也无法多次连接浪费了好多时间很生气
- 通过tabview切换来实现重新连接并保留了异常处理部分
- 将所有的 __xxxx 函数都替换成 xxxx 函数,去掉了 __
- 使用current_path来规避文件路径问题
3. [do_brake.py] 初步完成了机器状态收集的功能,还需要完善
- 使用current_path来规避文件路径问题
- 新增validate_resp函数校验数据
- 完善了调用接口
> **关于HMI接口**
> - 封包解包顺序:帧长度二字节/包长度四字节/协议二字节/预留二字节,\x04\x00:\x00\x00\tR:\x02:\x00
> - 帧长度和包长度没有必然关系单帧的时候是帧长度减去包长度等于6包长度指的是所有内容的长度
> - HMI内部每次发送1024个字节进行分包内容长度规则是第一帧1024-6=1018(帧大小减去包头长度),第二帧(包含)及之后的帧,帧长度即是数据长度
v0.1.7.0(2024/06/26)-初步可用
1. [aio.py] 在detect_network函数中需改查询时间间隔是1s在tabview_click中增加textbox配置normal的语句
2. [do_brake.py -> btn_functions.py] 新增执行相应函数并在get_state函数中设置无示教器模式
3. [openapi.py] 新增sock_conn函数并做连接时的异常处理新增类参数w2t
4. [aio.py] 修改customtkinter库中C:\Users\Administrator\AppData\Local\Programs\Python\Python312\Lib\site-packages\customtkinter\windows\widgets\ctk_tabview.py文件参考https://github.com/TomSchimansky/CustomTkinter/issues/2296实现修改tabview组件的字体大小使用原生字体同时将segmented button字体修改为原生为了解决segmented button在禁用和启用时屏幕抖动的问题并将大小修改为16
5. [aio.py] 修改了segmented_button_callback的实现逻辑使代码更简洁
6. [aio.py] 修改了在tabview_click函数中对于实例化openapi的动作使每次切换标签都会重新实例化也就是每次都会重新连接修复显示不正确的问题
7. [openapi.py] 新增了socket关闭的函数并增加msg_id为None的处理逻辑
8. [btn_functions.py] 完善了状态获取的功能,新增告警获取以及功能切换的逻辑
9. [aio.py] 修改了版本
10. [current.py] max/avg功能结束之前会将结果数据追加写入源文件avg算法更改为average+3×std
11. [wavelogger.py] 算法更改为 average+3×std

View File

@ -6,8 +6,8 @@ VSVersionInfo(
ffi=FixedFileInfo(
# filevers and prodvers should be always a tuple with four items: (1, 2, 3, 4)
# Set not needed items to zero 0.
filevers=(0, 1, 5, 3),
prodvers=(0, 1, 5, 3),
filevers=(0, 1, 7, 0),
prodvers=(0, 1, 7, 0),
# Contains a bitmask that specifies the valid bits 'flags'r
mask=0x3f,
# Contains a bitmask that specifies the Boolean attributes of the file.
@ -31,12 +31,12 @@ VSVersionInfo(
'040904b0',
[StringStruct('CompanyName', 'Rokae - https://www.rokae.com/'),
StringStruct('FileDescription', 'All in one automatic toolbox'),
StringStruct('FileVersion', '0.1.5.3 (2024-06-14)'),
StringStruct('FileVersion', '0.1.7.0 (2024-06-26)'),
StringStruct('InternalName', 'AIO.exe'),
StringStruct('LegalCopyright', '© 2024-2024 Manford Fan'),
StringStruct('OriginalFilename', 'AIO.exe'),
StringStruct('ProductName', 'AIO'),
StringStruct('ProductVersion', '0.1.5.3 (2024-06-14)')])
StringStruct('ProductVersion', '0.1.7.0 (2024-06-26)')])
]),
VarFileInfo([VarStruct('Translation', [1033, 1200])])
]

Binary file not shown.

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "controller.heart"
}

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "device.get_params"
}

View File

@ -0,0 +1 @@
1

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "state.get_state"
}

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "state.get_tp_mode"
}

View File

@ -0,0 +1,8 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "state.set_tp_mode",
"data": {
"tp_mode": "with"
}
}

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "state.switch_auto"
}

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "state.switch_manual"
}

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "state.switch_motor_off"
}

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "state.switch_motor_on"
}

View File

@ -1 +1 @@
0.1.5.3 @ 06/14/2024
0.1.7.0 @ 06/26/2024

1
aio/code/__init__.py Normal file
View File

@ -0,0 +1 @@
__all__ = ['automatic_test', 'data_process']

View File

@ -1,26 +1,30 @@
from os.path import exists
import sys
import tkinter
from os.path import exists, dirname
from os import getcwd
from threading import Thread
import tkinter.messagebox
import customtkinter
import brake, current, iso
from time import time, strftime, localtime
from time import time, strftime, localtime, sleep
from urllib.request import urlopen
from socket import setdefaulttimeout
from data_process import *
from automatic_test import *
current_path = dirname(__file__)
customtkinter.set_appearance_mode("System") # Modes: "System" (standard), "Dark", "Light"
customtkinter.set_default_color_theme("blue") # Themes: "blue" (standard), "green", "dark-blue"
customtkinter.set_widget_scaling(1.1) # widget dimensions and text size
customtkinter.set_window_scaling(1.1) # window geometry dimensions
setdefaulttimeout(10)
setdefaulttimeout(3)
# global vars
btns = {
btns_func = {
'start': {'btn': '', 'row': 1, 'text': '开始运行'},
'check': {'btn': '', 'row': 2, 'text': '检查参数'},
'log': {'btn': '', 'row': 3, 'text': '保存日志'},
'end': {'btn': '', 'row': 4, 'text': '结束运行'},
}
widgits = {
widgits_dp = {
'path': {'label': '', 'entry': '', 'row': 1, 'col': 2, 'text': '数据文件夹路径'},
'av': {'label': '', 'entry': '', 'row': 2, 'col': 2, 'text': '角速度'},
'rc': {'label': '', 'entry': '', 'row': 2, 'col': 4, 'text': '额定电流'},
@ -39,6 +43,28 @@ widgits = {
'rc5': {'label': '', 'entry': '', 'row': 4, 'col': 10, 'text': '额定电流'},
'rc6': {'label': '', 'entry': '', 'row': 4, 'col': 12, 'text': '额定电流'},
}
widgits_at = {
'path': {'label': '', 'entry': '', 'row': 2, 'col': 2, 'text': '数据文件夹路径'},
'loadsel': {'label': '', 'optionmenu': '', 'row': 2, 'col': 1, 'text': '负载信息'},
'av1': {'label': '', 'entry': '', 'row': 3, 'col': 2, 'text': '角速度'},
'av2': {'label': '', 'entry': '', 'row': 3, 'col': 4, 'text': '角速度'},
'av3': {'label': '', 'entry': '', 'row': 3, 'col': 6, 'text': '角速度'},
'av4': {'label': '', 'entry': '', 'row': 3, 'col': 8, 'text': '角速度'},
'av5': {'label': '', 'entry': '', 'row': 3, 'col': 10, 'text': '角速度'},
'av6': {'label': '', 'entry': '', 'row': 3, 'col': 12, 'text': '角速度'},
'rc1': {'label': '', 'entry': '', 'row': 4, 'col': 2, 'text': '额定电流'},
'rc2': {'label': '', 'entry': '', 'row': 4, 'col': 4, 'text': '额定电流'},
'rc3': {'label': '', 'entry': '', 'row': 4, 'col': 6, 'text': '额定电流'},
'rc4': {'label': '', 'entry': '', 'row': 4, 'col': 8, 'text': '额定电流'},
'rc5': {'label': '', 'entry': '', 'row': 4, 'col': 10, 'text': '额定电流'},
'rc6': {'label': '', 'entry': '', 'row': 4, 'col': 12, 'text': '额定电流'},
'rr1': {'label': '', 'entry': '', 'row': 5, 'col': 2, 'text': '额定转速'},
'rr2': {'label': '', 'entry': '', 'row': 5, 'col': 4, 'text': '额定转速'},
'rr3': {'label': '', 'entry': '', 'row': 5, 'col': 6, 'text': '额定转速'},
'rr4': {'label': '', 'entry': '', 'row': 5, 'col': 8, 'text': '额定转速'},
'rr5': {'label': '', 'entry': '', 'row': 5, 'col': 10, 'text': '额定转速'},
'rr6': {'label': '', 'entry': '', 'row': 5, 'col': 12, 'text': '额定转速'},
}
class App(customtkinter.CTk):
@ -46,6 +72,7 @@ class App(customtkinter.CTk):
super().__init__()
self.my_font = customtkinter.CTkFont(family="Consolas", size=16, weight="bold")
self.w_param = 84
self.hr = None
# =====================================================================
# configure window
self.title("AIO - All in one automatic toolbox")
@ -53,7 +80,7 @@ class App(customtkinter.CTk):
self.geometry("1200x550+30+30")
self.protocol("WM_DELETE_WINDOW", self.func_end_callback)
self.config(bg='#E9E9E9')
self.grid_rowconfigure(5, weight=1)
self.grid_rowconfigure(6, weight=1)
self.grid_columnconfigure((1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13), weight=1)
self.minsize(1200, 550)
# =====================================================================
@ -64,54 +91,89 @@ class App(customtkinter.CTk):
self.label_logo = customtkinter.CTkLabel(self.frame_func, text="Rokae AIO", height=60, font=customtkinter.CTkFont(family="Segoe Script Bold", size=24, weight="bold"), text_color="#4F4F4F")
self.label_logo.grid(row=0, column=0, padx=15, pady=15)
# create buttons
for func in btns:
btns[func]['btn'] = customtkinter.CTkButton(self.frame_func, corner_radius=10, text=btns[func]['text'], fg_color='#4F4F4F', font=self.my_font)
btns[func]['btn'].grid(row=btns[func]['row'], column=0, sticky='new', padx=10, pady=10, ipadx=5, ipady=5)
btns['start']['btn'].configure(command=lambda: self.thread_it(self.func_start_callback))
btns['check']['btn'].configure(command=lambda: self.thread_it(self.func_check_callback))
btns['log']['btn'].configure(command=lambda: self.thread_it(self.func_log_callback))
btns['end']['btn'].configure(command=lambda: self.thread_it(self.func_end_callback))
for func in btns_func:
btns_func[func]['btn'] = customtkinter.CTkButton(self.frame_func, corner_radius=10, text=btns_func[func]['text'], fg_color='#4F4F4F', font=self.my_font)
btns_func[func]['btn'].grid(row=btns_func[func]['row'], column=0, sticky='new', padx=10, pady=10, ipadx=5, ipady=5)
btns_func['start']['btn'].configure(command=lambda: self.thread_it(self.func_start_callback))
btns_func['check']['btn'].configure(command=lambda: self.thread_it(self.func_check_callback))
btns_func['log']['btn'].configure(command=lambda: self.thread_it(self.func_log_callback))
btns_func['end']['btn'].configure(command=lambda: self.thread_it(self.func_end_callback))
# create version info
self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.1.5.3\nDate: 06/14/2024", font=self.my_font, text_color="#4F4F4F")
self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.1.7.0\nDate: 06/26/2024", font=self.my_font, text_color="#4F4F4F")
self.frame_func.rowconfigure(6, weight=1)
self.label_version.grid(row=6, column=0, padx=20, pady=20, sticky='s')
# =====================================================================
# create tabviews
self.tabview = customtkinter.CTkTabview(self, width=10000, height=100, anchor='w', fg_color='#E9E9E9', border_width=2, border_color='#CDCDCD')
self.tabview = customtkinter.CTkTabview(self, width=10000, height=100, anchor='w', fg_color='#E9E9E9', border_width=2, border_color='#CDCDCD', command=self.tabview_click)
self.tabview.grid(row=0, column=1, padx=10, pady=5, sticky="nsew")
self.tabview.add("Data Process")
self.tabview.add("Automatic Test")
# create main menu
self.menu_main_dp = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'), values=["INIT", "brake", "current", "iso"], font=self.my_font, text_color='yellow', button_color='red', fg_color='green', command=self.func_main_callback)
self.menu_main_dp.grid(row=1, column=1, sticky='we', padx=5, pady=5)
# create main menu for data process
self.menu_main_dp = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'), values=["init", "brake", "current", "iso", "wavelogger"], font=self.my_font, text_color='yellow', button_color='red', fg_color='green', command=self.func_main_callback)
self.menu_main_dp.grid(row=1, column=1, sticky='we', padx=5, pady=10)
self.menu_main_dp.set("Start Here!")
# create sub menu
# create sub menu for data process
self.menu_sub_dp = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'))
# =====================================================================
# create widgits
for widgit in widgits:
# create main menu for automatic test
self.menu_main_at = customtkinter.CTkOptionMenu(self.tabview.tab('Automatic Test'), values=["init", "brake", "current"], font=self.my_font, text_color='yellow', button_color='red', fg_color='green', command=self.func_main_callback)
self.menu_main_at.grid(row=1, column=1, sticky='we', padx=5, pady=5)
self.menu_main_at.set("Start Here!")
# For data process tab START =====================================================================
# create widgits_dp
for widgit in widgits_dp:
if widgit == 'path':
widgits[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Data Process'), text=f'{widgit.upper()}', font=self.my_font)
widgits[widgit]['label'].grid(row=widgits[widgit]['row'], column=widgits[widgit]['col'], sticky='e', pady=5)
widgits[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Data Process'), width=670, placeholder_text=widgits[widgit]['text'], font=self.my_font)
widgits[widgit]['entry'].grid(row=widgits[widgit]['row'], column=widgits[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=5, sticky='we')
widgits[widgit]['entry'].configure(state='disabled')
widgits_dp[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Data Process'), text=f'{widgit.upper()}', font=self.my_font)
widgits_dp[widgit]['label'].grid(row=widgits_dp[widgit]['row'], column=widgits_dp[widgit]['col'], sticky='e', pady=10)
widgits_dp[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Data Process'), width=670, placeholder_text=widgits_dp[widgit]['text'], font=self.my_font)
widgits_dp[widgit]['entry'].grid(row=widgits_dp[widgit]['row'], column=widgits_dp[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=5, sticky='we')
widgits_dp[widgit]['entry'].configure(state='disabled')
elif widgit in ['av', 'rc', 'rpm', 'rr', 'dur', 'rc1', 'rc2', 'rc3', 'rc4', 'rc5', 'rc6']:
widgits[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Data Process'), text=f"{widgit.upper()}", font=self.my_font)
widgits[widgit]['label'].grid(row=widgits[widgit]['row'], column=widgits[widgit]['col'], sticky='e', pady=5)
widgits[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Data Process'), width=self.w_param, placeholder_text=f"{widgits[widgit]['text']}", font=self.my_font)
widgits[widgit]['entry'].grid(row=widgits[widgit]['row'], column=widgits[widgit]['col']+1, padx=(5, 10), pady=5, sticky='w')
widgits[widgit]['entry'].configure(state='disabled')
widgits_dp[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Data Process'), text=f"{widgit.upper()}", font=self.my_font)
widgits_dp[widgit]['label'].grid(row=widgits_dp[widgit]['row'], column=widgits_dp[widgit]['col'], sticky='e', pady=5)
widgits_dp[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Data Process'), width=self.w_param, placeholder_text=f"{widgits_dp[widgit]['text']}", font=self.my_font)
widgits_dp[widgit]['entry'].grid(row=widgits_dp[widgit]['row'], column=widgits_dp[widgit]['col']+1, padx=(5, 10), pady=5, sticky='w')
widgits_dp[widgit]['entry'].configure(state='disabled')
elif widgit in ['axis', 'vel', 'trq', 'trqh', 'estop']:
widgits[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Data Process'), text=f"{widgit.upper()}", font=self.my_font)
widgits[widgit]['label'].grid(row=widgits[widgit]['row'], column=widgits[widgit]['col'], sticky='e', pady=5)
widgits[widgit]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'), button_color='#708090', fg_color='#778899', values=["1", "2", "3", "4", "5", "6", "7"], width=self.w_param, font=self.my_font)
widgits[widgit]['optionmenu'].grid(row=widgits[widgit]['row'], column=widgits[widgit]['col']+1, padx=(5, 10), pady=5, sticky='w')
widgits[widgit]['optionmenu'].configure(state='disabled')
# =====================================================================
widgits_dp[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Data Process'), text=f"{widgit.upper()}", font=self.my_font)
widgits_dp[widgit]['label'].grid(row=widgits_dp[widgit]['row'], column=widgits_dp[widgit]['col'], sticky='e', pady=5)
widgits_dp[widgit]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'), button_color='#708090', fg_color='#778899', values=["1", "2", "3", "4", "5", "6", "7"], width=self.w_param, font=self.my_font)
widgits_dp[widgit]['optionmenu'].grid(row=widgits_dp[widgit]['row'], column=widgits_dp[widgit]['col']+1, padx=(5, 10), pady=5, sticky='w')
widgits_dp[widgit]['optionmenu'].configure(state='disabled')
# For data process tab END =====================================================================
# For automatic test tab START =====================================================================
# create buttons
self.seg_button = customtkinter.CTkSegmentedButton(self.tabview.tab('Automatic Test'), dynamic_resizing=False, font=customtkinter.CTkFont(size=16, weight='bold'), command=lambda value='机器状态': self.thread_it(self.segmented_button_callback))
self.seg_button.grid(row=1, column=2, columnspan=12, padx=(20, 10), pady=(10, 10), sticky="ew")
self.seg_button.configure(dynamic_resizing=False, values=["功能切换", "触发急停", "停止运动", "继续运动", "零点位姿", "机器状态", "告警信息"])
self.seg_button.set("功能切换")
# create progress bar
self.progressbar = customtkinter.CTkProgressBar(self.tabview.tab('Automatic Test'))
self.progressbar.grid(row=5, column=1, padx=5, pady=5, sticky="ew")
self.progressbar.configure(mode="determinnate", width=10)
self.progressbar.start()
# create widgits_at
for widgit in widgits_at:
if widgit == 'path':
widgits_at[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Automatic Test'), text=f'{widgit.upper()}', font=self.my_font)
widgits_at[widgit]['label'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col'], sticky='e', pady=5)
widgits_at[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Automatic Test'), width=670, placeholder_text=widgits_at[widgit]['text'], font=self.my_font)
widgits_at[widgit]['entry'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=5, sticky='we')
widgits_at[widgit]['entry'].configure(state='disabled')
elif widgit in ['av1', 'av2', 'av3', 'av4', 'av5', 'av6', 'rc1', 'rc2', 'rc3', 'rc4', 'rc5', 'rc6', 'rr1', 'rr2', 'rr3', 'rr4', 'rr5', 'rr6']:
widgits_at[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Automatic Test'), text=f"{widgit.upper()}", font=self.my_font)
widgits_at[widgit]['label'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col'], sticky='e', pady=5)
widgits_at[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Automatic Test'), width=self.w_param, placeholder_text=f"{widgits_at[widgit]['text']}", font=self.my_font)
widgits_at[widgit]['entry'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col']+1, padx=(5, 10), pady=5, sticky='w')
widgits_at[widgit]['entry'].configure(state='disabled')
elif widgit in ['loadsel', ]:
widgits_at[widgit]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Automatic Test'), button_color='#708090', fg_color='#778899', values=["tool33", "tool66", "tool100"], width=self.w_param, font=self.my_font)
widgits_at[widgit]['optionmenu'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col'], padx=5, pady=5, sticky='we')
widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
widgits_at[widgit]['optionmenu'].configure(state='disabled')
# For automatic test tab END =====================================================================
# create textbox
self.textbox = customtkinter.CTkTextbox(self, wrap='none', font=customtkinter.CTkFont(family="consolas", size=14), text_color="blue", fg_color='#E9E9E9', border_width=2, border_color='#CDCDCD', border_spacing=5)
self.textbox.grid(row=5, column=1, rowspan=2, columnspan=13, ipadx=10, ipady=10, padx=10, pady=(5, 10), sticky='nsew')
self.textbox.grid(row=6, column=1, columnspan=13, ipadx=10, ipady=10, padx=10, pady=(5, 10), sticky='nsew')
self.textbox.configure(state='disabled')
# =====================================================================
# version check
@ -124,6 +186,7 @@ class App(customtkinter.CTk):
tkinter.messagebox.showwarning(title="版本更新", message=msg)
except:
tkinter.messagebox.showwarning(title="版本更新", message="连接服务器失败,无法确认当前是否是最新版本......")
# functions below ↓ ----------------------------------------------------------------------------------------
def thread_it(self, func, *args):
""" 将函数打包进线程 """
@ -131,78 +194,158 @@ class App(customtkinter.CTk):
self.myThread.daemon = True # 主线程退出就直接让子线程跟随退出,不论是否运行完成。
self.myThread.start()
def segmented_button_callback(self):
_btn_funcs = {'get_state': '机器状态', 'warning_info': '告警信息', '3': '4', '5': '6', '7': '8'}
value = self.seg_button.get()
self.seg_button.configure(state='disabled')
self.textbox.configure(state='normal')
# self.tabview.configure(state='disabled')
self.textbox.delete(index1='1.0', index2='end')
with open(f'{current_path}/../assets/templates/heartbeat', 'r', encoding='utf-8') as f_h:
connection_state = f_h.read().strip()
if connection_state == '0' and value != '功能切换':
self.write2textbox("无法连接机器人检查是否已经使用Robot Assist软件连接机器重试中...", 0, 50, 'red')
else:
for _func in _btn_funcs:
if _btn_funcs[_func] == value:
btn_functions.main(self.hr, _func, self.write2textbox)
break
self.seg_button.configure(state='normal')
self.textbox.configure(state='disable')
# self.tabview.configure(state='normal')
def detect_network(self):
with open(f"{current_path}/../assets/templates/heartbeat", "w", encoding='utf-8') as f_h:
f_h.write('0')
while True:
with open(f'{current_path}/../assets/templates/heartbeat', 'r', encoding='utf-8') as f_h:
pb_color = 'green' if f_h.read().strip() == '1' else 'red'
self.progressbar.configure(progress_color=pb_color)
sleep(1)
def tabview_click(self):
self.initialization()
self.textbox.configure(state='normal')
self.textbox.delete(index1='1.0', index2='end')
self.textbox.configure(state='disabled')
tab_name = self.tabview.get()
if tab_name == 'Data Process':
self.menu_main_dp.set("Start Here!")
elif tab_name == 'Automatic Test':
self.menu_main_at.set("Start Here!")
self.seg_button.configure(state='normal')
with open(f"{current_path}/../assets/templates/heartbeat", "w", encoding='utf-8') as f_h:
f_h.write('0')
self.textbox.configure(state='normal')
try:
self.hr.close_sock()
self.hr = openapi.HmiRequest(self.write2textbox)
except:
self.hr = openapi.HmiRequest(self.write2textbox)
# if connection_state == '0' or self.hr is None:
# self.textbox.configure(state='normal')
# self.hr = openapi.HmiRequest(self.write2textbox)
# self.textbox.configure(state='disabled')
def initialization(self):
for widgit in widgits:
tab_name = self.tabview.get()
if tab_name == 'Data Process':
for widgit in widgits_dp:
if widgit in ['path', 'av', 'rc', 'rpm', 'rr', 'dur', 'rc1', 'rc2', 'rc3', 'rc4', 'rc5', 'rc6']:
widgits[widgit]['label'].configure(text=f'{widgit.upper()}', text_color='black')
widgits[widgit]['entry'].configure(placeholder_text=widgits[widgit]['text'], state='disabled')
widgits_dp[widgit]['label'].configure(text=f'{widgit.upper()}', text_color='black')
widgits_dp[widgit]['entry'].delete(0, tkinter.END)
widgits_dp[widgit]['entry'].configure(placeholder_text=widgits_dp[widgit]['text'], state='normal')
widgits_dp[widgit]['entry'].configure(state='disabled')
elif widgit in ['axis', 'vel', 'trq', 'trqh', 'estop']:
widgits[widgit]['label'].configure(text=f'{widgit.upper()}', text_color="black")
widgits[widgit]['optionmenu'].configure(state='disabled')
widgits_dp[widgit]['label'].configure(text=f'{widgit.upper()}', text_color="black")
widgits_dp[widgit]['optionmenu'].configure(state='normal')
widgits_dp[widgit]['optionmenu'].set('1')
widgits_dp[widgit]['optionmenu'].configure(state='disabled')
self.menu_sub_dp.grid_forget()
self.textbox.delete(index1='1.0', index2='end')
self.textbox.configure(state='disabled')
elif tab_name == 'Automatic Test':
for widgit in widgits_at:
if widgit in ['path', 'av1', 'av2', 'av3', 'av4', 'av5', 'av6', 'rc1', 'rc2', 'rc3', 'rc4', 'rc5', 'rc6', 'rr1', 'rr2', 'rr3', 'rr4', 'rr5', 'rr6']:
widgits_at[widgit]['label'].configure(text=f'{widgit.upper()}', text_color='black')
widgits_at[widgit]['entry'].delete(0, tkinter.END)
widgits_at[widgit]['entry'].configure(placeholder_text=widgits_at[widgit]['text'], state='normal')
widgits_at[widgit]['entry'].configure(state='disabled')
elif widgit in ['loadsel']:
widgits_at[widgit]['optionmenu'].configure(state='normal')
widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
widgits_at[widgit]['optionmenu'].configure(state='disabled')
self.seg_button.set("功能切换")
def func_main_callback(self, func_name):
self.initialization()
tab_name = self.tabview.get()
if tab_name == 'Data Process':
if func_name == 'brake':
for widgit in widgits:
for widgit in widgits_dp:
if widgit in ['path', 'av', 'rr']:
widgits[widgit]['label'].configure(text_color='red')
widgits[widgit]['entry'].configure(state='normal')
widgits_dp[widgit]['label'].configure(text_color='red')
widgits_dp[widgit]['entry'].configure(state='normal')
elif widgit in ['axis', 'vel', 'trq', 'estop']:
widgits[widgit]['label'].configure(text_color="red")
widgits[widgit]['optionmenu'].configure(state='normal')
widgits_dp[widgit]['label'].configure(text_color="red")
widgits_dp[widgit]['optionmenu'].configure(state='normal')
elif func_name == 'current':
self.menu_sub_dp = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'), values=["max", "avg", "cycle"], font=self.my_font, button_color='red', fg_color='green', command=self.func_sub_callback)
self.menu_sub_dp.grid(row=2, column=1, sticky='we', padx=5, pady=5)
self.menu_sub_dp.set("--select--")
self.menu_sub_dp.configure(text_color='yellow')
for widgit in widgits:
for widgit in widgits_dp:
if widgit in ['path', 'rc', 'rc1', 'rc2', 'rc3', 'rc4', 'rc5', 'rc6']:
color = 'blue' if widgit == 'rc' else 'red'
widgits[widgit]['label'].configure(text_color=color)
widgits[widgit]['entry'].configure(state='normal')
widgits_dp[widgit]['label'].configure(text_color=color)
widgits_dp[widgit]['entry'].configure(state='normal')
elif widgit in ['trqh',]:
widgits[widgit]['label'].configure(text_color="red")
widgits[widgit]['optionmenu'].configure(state='normal')
elif func_name == 'iso':
for widgit in widgits:
widgits_dp[widgit]['label'].configure(text_color="red")
widgits_dp[widgit]['optionmenu'].configure(state='normal')
elif func_name == 'iso' or func_name == 'wavelogger':
for widgit in widgits_dp:
if widgit in ['path',]:
widgits[widgit]['label'].configure(text_color='red')
widgits[widgit]['entry'].configure(state='normal')
widgits_dp[widgit]['label'].configure(text_color='red')
widgits_dp[widgit]['entry'].configure(state='normal')
else:
self.initialization()
self.menu_main_dp.set("Start Here!")
elif tab_name == 'Automatic Test':
pass
def func_sub_callback(self, func_name):
if func_name == "max":
for widgit in widgits:
for widgit in widgits_dp:
if widgit in ['rpm', 'dur']:
widgits[widgit]['label'].configure(text_color='black')
widgits[widgit]['entry'].configure(state='disabled')
widgits_dp[widgit]['label'].configure(text_color='black')
widgits_dp[widgit]['entry'].configure(state='disabled')
elif widgit in ['vel', 'trq']:
widgits[widgit]['label'].configure(text_color='black')
widgits[widgit]['optionmenu'].configure(state='disabled')
widgits_dp[widgit]['label'].configure(text_color='black')
widgits_dp[widgit]['optionmenu'].configure(state='disabled')
elif func_name == 'avg':
for widgit in widgits:
for widgit in widgits_dp:
if widgit in ['rpm', 'dur']:
widgits[widgit]['label'].configure(text_color='black')
widgits[widgit]['entry'].configure(state='disabled')
widgits_dp[widgit]['label'].configure(text_color='black')
widgits_dp[widgit]['entry'].configure(state='disabled')
elif widgit in ['vel', 'trq']:
widgits[widgit]['label'].configure(text_color='black')
widgits[widgit]['optionmenu'].configure(state='disabled')
widgits_dp[widgit]['label'].configure(text_color='black')
widgits_dp[widgit]['optionmenu'].configure(state='disabled')
elif func_name == 'cycle':
for widgit in widgits:
for widgit in widgits_dp:
if widgit in ['rpm', 'dur']:
widgits[widgit]['label'].configure(text_color='blue')
widgits[widgit]['entry'].configure(state='normal')
widgits_dp[widgit]['label'].configure(text_color='blue')
widgits_dp[widgit]['entry'].configure(state='normal')
elif widgit in ['vel', 'trq']:
widgits[widgit]['label'].configure(text_color="red")
widgits[widgit]['optionmenu'].configure(state='normal')
widgits_dp[widgit]['label'].configure(text_color="red")
widgits_dp[widgit]['optionmenu'].configure(state='normal')
def write2textbox(self, text, wait=0, exitcode=0, color='blue'):
self.textbox.tag_add(color, 'insert', 'end')
@ -237,15 +380,17 @@ class App(customtkinter.CTk):
return True
def check_param(self):
tab_name = self.tabview.get()
if tab_name == 'Data Process':
func_name = self.menu_main_dp.get()
if func_name == 'brake':
path = widgits['path']['entry'].get().strip()
av = widgits['av']['entry'].get().strip('- ')
rr = widgits['rr']['entry'].get().strip('- ')
axis = widgits['axis']['optionmenu'].get()
vel = widgits['vel']['optionmenu'].get()
trq = widgits['trq']['optionmenu'].get()
estop = widgits['estop']['optionmenu'].get()
path = widgits_dp['path']['entry'].get().strip()
av = widgits_dp['av']['entry'].get().strip('- ')
rr = widgits_dp['rr']['entry'].get().strip('- ')
axis = widgits_dp['axis']['optionmenu'].get()
vel = widgits_dp['vel']['optionmenu'].get()
trq = widgits_dp['trq']['optionmenu'].get()
estop = widgits_dp['estop']['optionmenu'].get()
c1 = exists(path)
c2 = self.is_float('required', av, rr)
@ -257,19 +402,19 @@ class App(customtkinter.CTk):
return 0, 0
# =======================================================
elif func_name == 'current':
path = widgits['path']['entry'].get().strip()
rc = widgits['rc']['entry'].get().strip('- ')
rpm = widgits['rpm']['entry'].get().strip()
dur = widgits['dur']['entry'].get().strip()
rc1 = widgits['rc1']['entry'].get().strip()
rc2 = widgits['rc2']['entry'].get().strip()
rc3 = widgits['rc3']['entry'].get().strip()
rc4 = widgits['rc4']['entry'].get().strip()
rc5 = widgits['rc5']['entry'].get().strip()
rc6 = widgits['rc6']['entry'].get().strip()
vel = widgits['vel']['optionmenu'].get()
trq = widgits['trq']['optionmenu'].get()
trqh = widgits['trqh']['optionmenu'].get()
path = widgits_dp['path']['entry'].get().strip()
rc = widgits_dp['rc']['entry'].get().strip('- ')
rpm = widgits_dp['rpm']['entry'].get().strip()
dur = widgits_dp['dur']['entry'].get().strip()
rc1 = widgits_dp['rc1']['entry'].get().strip()
rc2 = widgits_dp['rc2']['entry'].get().strip()
rc3 = widgits_dp['rc3']['entry'].get().strip()
rc4 = widgits_dp['rc4']['entry'].get().strip()
rc5 = widgits_dp['rc5']['entry'].get().strip()
rc6 = widgits_dp['rc6']['entry'].get().strip()
vel = widgits_dp['vel']['optionmenu'].get()
trq = widgits_dp['trq']['optionmenu'].get()
trqh = widgits_dp['trqh']['optionmenu'].get()
sub = self.menu_sub_dp.get()
c1 = exists(path)
@ -295,27 +440,44 @@ class App(customtkinter.CTk):
return 0, 0
# =======================================================
elif func_name == 'iso':
path = widgits['path']['entry'].get().strip()
path = widgits_dp['path']['entry'].get().strip()
c1 = exists(path)
if c1:
return 3, path
else:
return 0, 0
# =======================================================
elif func_name == 'wavelogger':
path = widgits_dp['path']['entry'].get().strip()
c1 = exists(path)
if c1:
return 4, path
else:
return 0, 0
# =======================================================
else:
return 0, 0
elif tab_name == 'Automatic Test':
func_name = self.menu_main_at.get()
if func_name == 'brake':
pass
elif func_name == 'current':
pass
def func_start_callback(self):
self.textbox.configure(state='normal')
self.textbox.delete(index1='1.0', index2='end')
flag, *args = self.check_param()
func_dict = {1: brake.main, 2: current.main, 3: iso.main}
func_dict = {1: brake.main, 2: current.main, 3: iso.main, 4: wavelogger.main}
if flag == 1:
func_dict[flag](path=args[0], av=args[1], rr=args[2], axis=args[3], vel=args[4], trq=args[5], estop=args[6], w2t=self.write2textbox)
elif flag == 2:
func_dict[flag](path=args[0], sub=args[1], rcs=args[2], vel=args[3], trq=args[4], trqh=args[5], dur=args[6], rpm=args[7], w2t=self.write2textbox)
elif flag == 3:
func_dict[flag](path=args[0], w2t=self.write2textbox)
elif flag == 4:
func_dict[flag](path=args[0], w2t=self.write2textbox)
else:
tkinter.messagebox.showerror(title="参数错误", message="请检查对应参数是否填写正确!", )
@ -356,4 +518,7 @@ class App(customtkinter.CTk):
if __name__ == "__main__":
aio = App()
aio.net_detect = Thread(target=aio.detect_network)
aio.net_detect.daemon = True
aio.net_detect.start()
aio.mainloop()

View File

@ -0,0 +1 @@
__all__ = ['openapi', 'btn_functions']

View File

@ -0,0 +1,88 @@
import json
from sys import argv
def validate_resp(_id, response, w2t):
match _id:
case 'DATA ERR':
w2t(f"数据处理错误,需要确认", 0, 4, 'red')
case 'DATA READ ERR':
w2t(f"无法读取数据,需要确认", 0, 3, 'red')
case 'NOT SUPPORT':
w2t(f"不支持的功能,需要确认", 0, 2, 'red')
if not response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red')
def execution(cmd, hr, w2t, **kwargs):
_id = hr.excution(cmd, **kwargs)
_msg = hr.get_from_id(_id)
if not _msg:
w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red')
else:
_response = json.loads(_msg)
validate_resp(_id, _response, w2t)
return _response
def get_state(hr, w2t):
# 获取机器状态
_response = execution('state.get_state', hr, w2t)
stat_desc = {'engine': '上电状态', 'operate': '操作模式', 'rc_state': '控制器状态', 'robot_action': '机器人动作', 'safety_mode': '安全模式', 'servo_mode': '伺服工作模式', 'task_space': '工作任务空间'}
for component, state in _response['data'].items():
w2t(f"{stat_desc[component]}: {state}")
# 获取设备伺服信息
_response = execution('device.get_params', hr, w2t)
dev_desc = {0: '伺服版本', 1: '伺服参数', 2: '安全板固件', 3: '控制器', 4: '通讯总线', 5: '解释器', 6: '运动控制', 8: '力控版本', 9: '末端固件', 10: '机型文件', 11: '环境包'}
dev_vers = {}
for device in _response['data']['devices']:
dev_vers[device['type']] = device['version']
for i in sorted(dev_desc.keys()):
w2t(f"{dev_desc[i]}: {dev_vers[i]}")
# 设置示教器模式
_response = execution('state.set_tp_mode', hr, w2t, tp_mode='without')
def warning_info(hr, w2t):
for msg in hr.c_msg:
if 'alarm' in msg.lower():
w2t(msg)
for msg in hr.c_msg_xs:
if 'alarm' in msg.lower():
w2t(msg)
def main(hr, func, w2t):
if hr is None:
w2t("无法连接机器人检查是否已经使用Robot Assist软件连接机器重试中...", 0, 49, 'red')
# func: get_state/
match func:
case 'get_state':
get_state(hr, w2t)
case 'warning_info':
warning_info(hr, w2t)
if __name__ == '__main__':
main(*argv[1:])
# 一、设置/检测机器人状态:
# 1. 上电
# 2. 软限位打开
# 3. 示教器断开
# 4. 操作模式/机器人类型
# 5. 控制器状态/工作任务控件/机器人动态
# 二、加载RL程序开始运行
# 1. 怎么触发急停
# 2. 怎么恢复急停
# 3. 怎么采集曲线
# 4.
# 三、运行过程中,收集数据,并处理出结果
# 四

View File

@ -0,0 +1,280 @@
import json
import socket
import threading
import selectors
import time
import os
MAX_FRAME_SIZE = 1024
socket.setdefaulttimeout(2)
current_path = os.path.dirname(__file__)
class HmiRequest(object):
def __init__(self, w2t):
super().__init__()
self.w2t = w2t
self.c = None
self.c_xs = None
def sock_conn():
# while True:
with open(f"{current_path}/../../assets/templates/heartbeat", "r", encoding='utf-8') as f_h:
connection_state = f_h.read().strip()
if connection_state == '0':
try:
c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
c.connect(('192.168.0.160', 5050))
# c.connect(('192.168.84.129', 5050))
c.setblocking(False)
c_xs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
c_xs.connect(('192.168.0.160', 6666))
# c_xs.connect(('192.168.84.129', 6666))
c_xs.setblocking(False)
self.w2t("Connection success", 0, 0, 'green')
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_h:
f_h.write('1')
return c, c_xs
except Exception as Err:
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_h:
f_h.write('0')
self.w2t("Connection failed...", 0, 0, 'red')
return None, None
self.c, self.c_xs = sock_conn()
if self.c is None or self.c_xs is None:
self.w2t("Aborting! Try to switch tabs to re-connect!", 0, 1, 'red')
self.c_msg = []
self.c_msg_xs = []
self.flag = 0
self.response = ''
self.leftover = 0
self.flag_xs = 0
self.response_xs = ''
self.t_heartbeat = threading.Thread(target=self.heartbeat)
self.t_heartbeat.daemon = True
self.t_heartbeat.start()
self.t_unpackage = threading.Thread(target=self.unpackage, args=(self.c, ))
self.t_unpackage.daemon = True
self.t_unpackage.start()
self.t_unpackage_xs = threading.Thread(target=self.unpackage_xs, args=(self.c_xs, ))
self.t_unpackage_xs.daemon = True
self.t_unpackage_xs.start()
def close_sock(self):
self.c.close()
self.c_xs.close()
def header_check(self, index, data):
try:
_frame_size = int.from_bytes(data[index:index+2], byteorder='big')
_pkg_size = int.from_bytes(data[index+2:index+6], byteorder='big')
_protocol = int.from_bytes(data[index+6:index+7], byteorder='big')
_reserved = int.from_bytes(data[index+7:index+8], byteorder='big')
if _reserved == 0 and _protocol == 2:
return index+8, _frame_size, _pkg_size
else:
print("数据有误,需要确认")
return 'DATA ERR'
except Exception as Err:
print(f"Err = {Err}")
print("无法读取数据,需要确认")
return 'DATA READ ERR'
def heartbeat(self):
while True:
_id = self.excution('controller.heart')
_flag = 0 if self.get_from_id(_id) is None else 1
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_h:
f_h.write(str(_flag))
time.sleep(3)
def msg_storage(self, response, flag=0):
messages = self.c_msg if flag == 0 else self.c_msg_xs
if len(messages) < 1000:
messages.insert(0, response)
else:
messages.insert(0, response)
while len(messages) > 1000:
messages.pop()
def get_response(self, data):
_index = 0
while _index < len(data):
if self.flag == 0:
_index, _frame_size, _pkg_size = self.header_check(_index, data)
if _pkg_size <= len(data) - _index:
# 说明剩余部分的数据正好就是完整的包数据
self.response = data[_index:_index+_pkg_size].decode()
self.msg_storage(flag=0, response=self.response)
_index += _pkg_size
self.flag = 0
self.response = ''
self.leftover = 0
elif _pkg_size > len(data) - _index:
# 说有有分包的情况发生了需要flag=1的处理
self.flag = 1
self.response = data[_index:].decode()
self.leftover = _frame_size - 6 - (len(data) - _index) # 其实就是常量 2其中 6 就是六个字节的包头
break
elif self.flag == 1:
# 处理完之后将flag重置为0
_index = self.leftover
self.response += data[:_index].decode()
_index += 2
_frame_size = int.from_bytes(data[_index - 2:_index], byteorder='big')
if _frame_size == 0:
self.msg_storage(flag=0, response=self.response)
self.flag = 0
self.response = ''
self.leftover = 0
break
if _frame_size == MAX_FRAME_SIZE:
self.leftover = MAX_FRAME_SIZE - (len(data) - _index)
self.response += data[_index:].decode()
break
else:
if _index+_frame_size <= MAX_FRAME_SIZE:
self.response += data[_index:_index+_frame_size].decode()
self.msg_storage(flag=0, response=self.response)
self.flag = 0
self.response = ''
self.leftover = 0
break
else:
self.response += data[_index:].decode()
self.leftover = _index + _frame_size - MAX_FRAME_SIZE
break
def get_response_xs(self, data):
if self.flag_xs == 0:
if data[-1].decode() == '\r':
_responses = data.decode().split('\r')
for _response in _responses:
self.msg_storage(flag=1, response=_response)
else:
_responses = data.decode().split('\r')
for _response in _responses[:-1]:
if not _response:
break
self.msg_storage(flag=1, response=_response)
self.response_xs = _responses[-1]
self.flag_xs = 1
else:
if data[-1].decode() == '\r':
_responses = (self.response_xs.encode() + data).decode().split('\r')
for _response in _responses:
self.msg_storage(flag=1, response=_response)
self.response_xs = ''
self.flag_xs = 0
else:
_responses = (self.response_xs.encode() + data).decode().split('\r')
for _response in _responses[:-1]:
if not _response:
break
self.msg_storage(flag=1, response=_response)
self.response_xs = _responses[-1]
self.flag_xs = 1
def get_from_id(self, msg_id, flag=0):
messages = self.c_msg if flag == 0 else self.c_msg_xs
for i in range(3):
for msg in messages:
if msg_id is None:
self.w2t("未能成功获取到 message id...", 0, 10, 'red')
if msg_id in msg:
return msg
time.sleep(1)
else:
return None
def package(self, cmd):
_frame_head = (len(cmd)+6).to_bytes(length=2, byteorder='big')
_pkg_head = len(cmd).to_bytes(length=4, byteorder='big')
_protocol = int(2).to_bytes(length=1, byteorder='big')
_reserved = int(0).to_bytes(length=1, byteorder='big')
return _frame_head + _pkg_head + _protocol + _reserved + cmd.encode()
def package_xs(self, cmd):
return f"{json.dumps(cmd, separators=(',', ':'))}\r".encode()
def unpackage(self, sock):
def to_read(conn):
data = conn.recv(MAX_FRAME_SIZE)
if data:
# print(data)
self.get_response(data)
else:
print('closing', sock)
sel.unregister(conn)
conn.close()
sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ, to_read)
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj)
def unpackage_xs(self, sock):
def to_read(conn):
data = conn.recv(1024) # Should be ready
if data:
# print(data)
self.get_response_xs(data)
else:
print('closing', sock)
sel.unregister(conn)
conn.close()
sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ, to_read)
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj)
def gen_id(self, command):
_now = time.time()
_id = f"{command}-{_now}"
return _id
def excution(self, command, flg=0, **kwargs):
if flg == 0: # for old protocols
req = None
try:
with open(f'{current_path}/../../assets/templates/{command}.json', encoding='utf-8', mode='r') as f_json:
req = json.load(f_json)
except:
print(f"暂不支持 {command} 功能,或确认该功能存在...")
return 'NOT SUPPORT'
match command:
case 'state.set_tp_mode':
req['data']['tp_mode'] = kwargs['tp_mode']
case 1:
pass
req['id'] = self.gen_id(command)
print(f"req = {req}")
cmd = json.dumps(req, separators=(',', ':'))
self.c.send(self.package(cmd))
time.sleep(2)
return req['id']
else: # for xService
pass

View File

@ -0,0 +1 @@
__all__ = ['brake', 'current', 'iso', 'wavelogger']

View File

@ -6,7 +6,7 @@ from pandas import read_csv, concat, set_option
from re import match
from threading import Thread
from time import sleep
from csv import reader
from csv import reader, writer
class GetThreadResult(Thread):
@ -81,7 +81,7 @@ def initialization(path, sub, w2t):
def current_max(data_files, rcs, trqh, w2t):
current = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0}
current = {1: [], 2: [], 3: [], 4: [], 5: [], 6: [], 7: []}
for data_file in data_files:
if data_file.endswith('.data'):
df = read_csv(data_file, sep='\t')
@ -96,15 +96,28 @@ def current_max(data_files, rcs, trqh, w2t):
scale = 1 if data_file.endswith('.csv') else 1000
_ = abs(c_max/scale*rca)
current[axis] = _
current[axis].append(_)
w2t(f"{data_file}: {_:.4f}")
w2t("【MAX】数据处理完毕......")
with open(data_file, 'a+') as f_data:
csv_writer = writer(f_data)
csv_writer.writerow([''] * 4)
csv_writer.writerow([_])
for axis, cur in current.items():
if not cur:
continue
else:
w2t(f"{axis}轴数据:", 1, 0, 'purple')
for value in cur:
w2t(f"{value:.4f} ", 1, 0, 'purple')
w2t('')
w2t("\n【MAX】数据处理完毕......")
return current
def current_avg(data_files, rcs, trqh, w2t):
current = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0}
current = {1: [], 2: [], 3: [], 4: [], 5: [], 6: [], 7: []}
for data_file in data_files:
if data_file.endswith('.data'):
df = read_csv(data_file, sep='\t')
@ -119,11 +132,24 @@ def current_avg(data_files, rcs, trqh, w2t):
c_avg = df[col].mean()
scale = 1 if data_file.endswith('.csv') else 1000
_ = (abs(c_avg)+c_std)/scale*rca
current[axis] = _
_ = (abs(c_avg)+c_std*3)/scale*rca
current[axis].append(_)
w2t(f"{data_file}: {_:.4f}")
w2t("【AVG】数据处理完毕......")
with open(data_file, 'a+') as f_data:
csv_writer = writer(f_data)
csv_writer.writerow([''] * 4)
csv_writer.writerow([_])
for axis, cur in current.items():
if not cur:
continue
else:
w2t(f"{axis}轴数据:", 1, 0, 'purple')
for value in cur:
w2t(f"{value:.4f} ", 1, 0, 'purple')
w2t('')
w2t("\n【AVG】数据处理完毕......")
return current
@ -216,10 +242,12 @@ def p_single(wb, single, vel, trq, rpm, w2t):
axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j'))
shtname = f"J{axis}"
ws = wb[shtname]
addition = 1
set_option("display.precision", 2)
if data_file.endswith('.data'):
df = read_csv(data_file, sep='\t')
rr = float(wb['统计'].cell(row=2, column=axis+1).value)
addition = 180 / 3.1415926 * 60 / 360 * rr
elif data_file.endswith('.csv'):
df = read_csv(data_file, sep=',', encoding='gbk', header=8)
csv_reader = reader(open(data_file))
@ -233,7 +261,7 @@ def p_single(wb, single, vel, trq, rpm, w2t):
ws["H11"] = cycle
col_names = list(df.columns)
df_1 = df[col_names[vel-1]].multiply(rpm)
df_1 = df[col_names[vel-1]].multiply(rpm*addition)
df_2 = df[col_names[trq-1]].multiply(scale)
df = concat([df_1, df_2], axis=1)
@ -293,10 +321,12 @@ def p_scenario(wb, single, vel, trq, rpm, dur, w2t):
axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j'))
shtname = f"J{axis}"
ws = wb[shtname]
addition = 1
set_option("display.precision", 2)
if data_file.endswith('.data'):
df = read_csv(data_file, sep='\t')
rr = float(wb['统计'].cell(row=2, column=axis+1).value)
addition = 180 / 3.1415926 * 60 / 360 * rr
elif data_file.endswith('.csv'):
df = read_csv(data_file, sep=',', encoding='gbk', header=8)
csv_reader = reader(open(data_file))
@ -310,7 +340,7 @@ def p_scenario(wb, single, vel, trq, rpm, dur, w2t):
ws["H11"] = cycle
col_names = list(df.columns)
df_1 = df[col_names[vel-1]].multiply(rpm)
df_1 = df[col_names[vel-1]].multiply(rpm*addition)
df_2 = df[col_names[trq-1]].multiply(scale)
df = concat([df_1, df_2], axis=1)

View File

@ -0,0 +1,186 @@
import os
import random
from pandas import read_csv
from csv import reader
from sys import argv
from os.path import exists
from os import scandir, remove
from openpyxl import Workbook
from random import randint
def traversal_files(path, w2t):
# 功能:以列表的形式分别返回指定路径下的文件和文件夹,不包含子目录
# 参数:路径
# 返回值:路径下的文件夹列表 路径下的文件列表
if not exists(path):
msg = f'数据文件夹{path}不存在,请确认后重试......'
w2t(msg, 0, 1, 'red')
else:
dirs = []
files = []
for item in scandir(path):
if item.is_dir():
dirs.append(item.path)
elif item.is_file():
files.append(item.path)
return dirs, files
def find_point(bof, step, pos, data_file, flag, df, row, w2t):
# bof: backward or forward
# pos: used for debug
# flag: greater than or lower than
if flag == 'gt':
while 0 < row < df.index[-1]-100:
_value = df.iloc[row, 2]
if _value > 2:
if bof == 'backward':
row -= step
elif bof == 'forward':
row += step
continue
else:
if bof == 'backward':
row_target = row - step
elif bof == 'forward':
row_target = row + step
break
else:
if bof == 'backward':
w2t(f"[{pos}] 在 {data_file} 中,无法正确识别数据,需要确认...", 0, 2, 'red')
elif bof == 'forward':
row_target = row + 100
elif flag == 'lt':
while 0 < row < df.index[-1]-100:
_value = df.iloc[row, 2]
if _value < 2:
if bof == 'backward':
row -= step
elif bof == 'forward':
row += step
continue
else:
if bof == 'backward':
row_target = row - step
elif bof == 'forward':
row_target = row + step
break
else:
if bof == 'backward':
w2t(f"[{pos}] 在 {data_file} 中,无法正确识别数据,需要确认...", 0, 3, 'red')
elif bof == 'forward':
row_target = row + 100
return row_target
def get_cycle_info(data_file, df, row, step, w2t):
# end -> middle: low
# middle -> start: high
# 1. 从最后读取数据无论是大于1还是小于1都舍弃找到相反的值的起始点
# 2. 从起始点,继续往前寻找,找到与之数值相反的中间点
# 3. 从中间点,继续往前寻找,找到与之数值相反的结束点,至此,得到了高低数值的时间区间以及一轮的周期时间
if df.iloc[row, 2] < 2:
row = find_point('backward', step, 'a1', data_file, 'lt', df, row, w2t)
_row = find_point('backward', step, 'a2', data_file, 'gt', df, row, w2t)
_row = find_point('backward', step, 'a3', data_file, 'lt', df, _row, w2t)
row_end = find_point('backward', step, 'a4', data_file, 'gt', df, _row, w2t)
row_middle = find_point('backward', step, 'a5', data_file, 'lt', df, row_end, w2t)
row_start = find_point('backward', step, 'a6', data_file, 'gt', df, row_middle, w2t)
return row_end-row_middle, row_middle-row_start, row_end-row_start
def initialization(path, w2t):
_, data_files = traversal_files(path, w2t)
for data_file in data_files:
if not data_file.lower().endswith('.csv'):
w2t(f"{data_file} 文件后缀错误,只允许 .csv 文件,需要确认!", 0, 1, 'red')
return data_files
def preparation(data_file, wb, w2t):
shtname = data_file.split('\\')[-1].split('.')[0]
ws = wb.create_sheet(shtname)
csv_reader = reader(open(data_file))
i = 0
begin = 70
for row in csv_reader:
i += 1
if i == 1:
begin = int(row[1])
break
df = read_csv(data_file, sep=',', encoding='gbk', skip_blank_lines=False, header=begin - 1, on_bad_lines='warn')
low, high, cycle = get_cycle_info(data_file, df, df.index[-1]-110, 5, w2t)
return ws, df, low, high, cycle
def single_file_proc(ws, data_file, df, low, high, cycle, w2t):
_row = _row_lt = _row_gt = count = 1
_step = 5
_data = {}
row_max = df.index[-1]-100
print(data_file)
while _row < row_max:
if count not in _data.keys():
_data[count] = []
_value = df.iloc[_row, 2]
if _value < 2:
_row_lt = find_point('forward', _step, 'c'+str(_row), data_file, 'lt', df, _row, w2t)
_start = int(_row_gt + (_row_lt - _row_gt - 50) / 2)
_end = _start + 50
value = df.iloc[_start:_end, 2].mean() + 3 * df.iloc[_start:_end, 2].std()
_data[count].append(value)
else:
_row_gt = find_point('forward', _step, 'c'+str(_row), data_file, 'gt', df, _row, w2t)
if _row_gt - _row_lt > cycle * 2:
count += 1
_row = max(_row_gt, _row_lt)
for i in range(2, 10):
ws.cell(row=1, column=i).value = f"{i-1}次测试"
ws.cell(row=i, column=1).value = f"{i-1}次精度变化"
print(_data)
for i in sorted(_data.keys()):
_row = 2
_column = i + 1
for value in _data[i]:
ws.cell(row=_row, column=_column).value = float(value)
_row += 1
def execution(data_files, w2t):
wb = Workbook()
for data_file in data_files:
ws, df, low, high, cycle = preparation(data_file, wb, w2t)
print(f"low = {low}")
print(f"high = {high}")
print(f"cycle = {cycle}")
single_file_proc(ws, data_file, df, low, high, cycle, w2t)
wd = data_files[0].split('\\')
del wd[-1]
wd = '\\'.join(wd)
filename = wd + '\\result.xlsx'
wb.save(filename)
wb.close()
w2t('----------------------------------------')
w2t('所有文件均已处理完毕')
def main(path, w2t):
data_files = initialization(path, w2t)
execution(data_files, w2t)
if __name__ == '__main__':
main(path=argv[1], w2t=argv[2])