13 Commits

Author SHA1 Message Date
7143a19fa1 v0.1.7.0(2024/06/26)-初步可用
1. [aio.py] 在detect_network函数中需改查询时间间隔是1s,在tabview_click中增加textbox配置normal的语句
2. [do_brake.py -> btn_functions.py] 新增执行相应函数,并在get_state函数中设置无示教器模式
3. [openapi.py] 新增sock_conn函数,并做连接时的异常处理,新增类参数w2t
4. [aio.py] 修改customtkinter库中C:\Users\Administrator\AppData\Local\Programs\Python\Python312\Lib\site-packages\customtkinter\windows\widgets\ctk_tabview.py文件,参考https://github.com/TomSchimansky/CustomTkinter/issues/2296,实现修改tabview组件的字体大小,使用原生字体,同时将segmented button字体修改为原生,为了解决segmented button在禁用和启用时,屏幕抖动的问题,并将大小修改为16
5. [aio.py] 修改了segmented_button_callback的实现逻辑,使代码更简洁
6. [aio.py] 修改了在tabview_click函数中对于实例化openapi的动作,使每次切换标签都会重新实例化,也就是每次都会重新连接,修复显示不正确的问题
7. [openapi.py] 新增了socket关闭的函数,并增加msg_id为None的处理逻辑
8. [btn_functions.py] 完善了状态获取的功能,新增告警获取以及功能切换的逻辑
2024-06-26 19:54:51 +08:00
a75775c869 v0.1.7.0(2024/06/25)-未发布
1. [aio.py] 取消了在本文件中开启openapi线程的做法,并修改如下:
	- 通过包的方式导入其他模块
    - 使用current_path来规避文件路径问题
    - 声名了 self.hr 变量,用来接收openapi的实例化
    - 修改了对于segment button的错误调用
    - 设定progress bar的长度是10
    - 完善了segmented_button_callback函数
    - 在detect_network函数中增加heartbeat初始化
    - tabview_click函数中新增textbox清屏功能,以及实例化openapi,并做检测
2. [openapi.py] 取消了初始化中无限循环检测,因为阻塞了aio主界面进程!!!socket也无法多次连接!!!浪费了好多时间!!!很生气!!!!
	- 通过tabview切换来实现重新连接,并保留了异常处理部分
    - 将所有的 __xxxx 函数都替换成 xxxx 函数,去掉了 __
    - 使用current_path来规避文件路径问题
3. [do_brake.py] 初步完成了机器状态收集的功能,还需要完善
    - 使用current_path来规避文件路径问题
    - 新增validate_resp函数,校验数据
    - 完善了调用接口
2024-06-25 21:40:27 +08:00
6604f0ba06 20240624
11. [openapi.py] 建联部分做容错处理,并将读写文件做自适应处理
12. [aio.py] 将读写文件做自适应处理,引入openapi模块并生成实例,做心跳检测,将socket超时时间修改为3s
2024-06-24 19:22:56 +08:00
a4009eb17c 20240623
8. [openapi.py] 增加心跳检测函数,并开启线程执行;取消在该文件中生成实例
9. [aio.py] 完成detect_network,并在main函数开启线程
10. 将templates文件夹移动到assets内
2024-06-23 20:18:41 +08:00
295894a843 2024-06-12
4. [openapi.py] 使用 int.to_bytes 和 int.from_bytes 替换 binascii 模块的功能
5. [aio.py] 修改了Data Process中初始化的动作,使得初始化时的状态统一成程序刚启动时的样子
6. [aio.py] 增加了tabview的点击行为函数,每次点击tab都会初始化
7. [aio.py] 增加了Automatic Test界面元素,包括如下,并完成了功能框架的搭建
- 标签:文件/角速度/减速比
- 按钮:急停及恢复
- 输入框:文件路径/角速度/减速比
- OptionMenu:负载
- 进度条
2024-06-21 16:53:13 +08:00
ff4b0cc04c 20240621
2. [openapi.py] 修改了封包的规则,使之更加明晰,封包操作没有实现分包功能,目前看实际场景用不到
3. [openapi.py] 定义 MAX_FRAME_SIZE 常量(1024),替换socket接收以及响应数据处理相关部分
4. [openapi.py] ʹÓ int.to_bytes ºÍint.from_bytes Ì»» binascii ģ¿é¹¦Ä
2024-06-21 08:13:51 +08:00
8975d8a37c 完善了解包分帧的情况,现在测试基本无问题 2024-06-20 20:48:03 +08:00
14f269b570 中间版本,对于解封超过1024的消息有问题,暂存历史 2024-06-20 17:15:54 +08:00
2917f4ae97 [openapi.py] 初步搭建起框架,完成了新老协议的封包/解包/异步采集日志的操作(未充分测试) 2024-06-20 12:49:41 +08:00
d04d90f1a7 Merge branch 'main' of gitea.rustle.cc:gitea/rokae into APIs
keep the same content with main
2024-06-19 15:21:46 +08:00
0646ae13de Merge branch 'main' of gitea.rustle.cc:gitea/rokae into APIs 2024-06-19 07:41:52 +08:00
780fed29af fix merging conflicts 2024-06-17 15:11:54 +08:00
a1b45bf941 [init] for apis 2024-06-12 14:56:50 +08:00
22 changed files with 791 additions and 292 deletions

2
.gitignore vendored
View File

@ -5,3 +5,5 @@ aio/code/__pycache__/
aio/package/ aio/package/
aio/venv aio/venv
aio/__pycache__/ aio/__pycache__/
aio/code/automatic_test/__pycache__/
aio/code/data_process/__pycache__/

View File

@ -246,5 +246,61 @@ v0.1.6.3(2024/06/18)
> WARNING目前版本的电机电流程序还支持DriverMaster采集的数据处理等明确后将不再支持也即所有的电机电流数据工业+协作),都是用诊断曲线来采集 > WARNING目前版本的电机电流程序还支持DriverMaster采集的数据处理等明确后将不再支持也即所有的电机电流数据工业+协作),都是用诊断曲线来采集
v0.1.7.0(2024/06/29) v0.1.7.0(2024/06/19)-未发布
1. [current.py] 适配电机电流中速度使用hw_joint_vel_feedback的数据取消对device_servo_vel_feedback的支持后续所有涉及到速度相关的数据均已前者为准现已完成对单轴和场景的适配 1. [openapi.py] 初步搭建起框架,完成了新老协议的封包/解包/异步采集日志的操作(未充分测试,但基本无问题)
2. [openapi.py] 修改了封包的规则,使之更加明晰,封包操作没有实现分包功能,目前看实际场景用不到
v0.1.7.0(2024/06/21)-未发布
1. [openapi.py] 定义 MAX_FRAME_SIZE 常量1024替换socket接收以及响应数据处理相关部分
2. [openapi.py] 使用 int.to_bytes 和 int.from_bytes 替换 binascii 模块的功能
3. [aio.py] 修改了Data Process中初始化的动作使得初始化时的状态统一成程序刚启动时的样子
v0.1.7.0(2024/06/23)-未发布
1. [aio.py] 增加了tabview的点击行为函数每次点击tab都会初始化
2. [aio.py] 增加了Automatic Test界面元素包括如下并完成了功能框架的搭建
- 标签:文件/角速度/减速比
- 按钮:急停及恢复
- 输入框:文件路径/角速度/减速比
- OptionMenu负载
- 进度条
3. [openapi.py] 增加心跳检测函数,并开启线程执行;取消在该文件中生成实例
4. [aio.py] 完成detect_network并在main函数开启线程
5. 将templates文件夹移动到assets内
v0.1.7.0(2024/06/24)-未发布
1. [openapi.py] 建联部分做容错逻辑,并将读写文件做自适应处理
2. [aio.py] 将读写文件做自适应处理引入openapi模块并生成实例做心跳检测将socket超时时间修改为3s
v0.1.7.0(2024/06/25)-未发布
1. [aio.py] 取消了在本文件中开启openapi线程的做法并修改如下
- 通过包的方式导入其他模块
- 使用current_path来规避文件路径问题
- 声名了 self.hr 变量用来接收openapi的实例化
- 修改了对于segment button的错误调用
- 设定progress bar的长度是10
- 完善了segmented_button_callback函数
- 在detect_network函数中增加heartbeat初始化
- tabview_click函数中新增textbox清屏功能以及实例化openapi并做检测
2. [openapi.py] 取消了初始化中无限循环检测因为阻塞了aio主界面进程socket也无法多次连接浪费了好多时间很生气
- 通过tabview切换来实现重新连接并保留了异常处理部分
- 将所有的 __xxxx 函数都替换成 xxxx 函数,去掉了 __
- 使用current_path来规避文件路径问题
3. [do_brake.py] 初步完成了机器状态收集的功能,还需要完善
- 使用current_path来规避文件路径问题
- 新增validate_resp函数校验数据
- 完善了调用接口
> **关于HMI接口**
> - 封包解包顺序:帧长度二字节/包长度四字节/协议二字节/预留二字节,\x04\x00:\x00\x00\tR:\x02:\x00
> - 帧长度和包长度没有必然关系单帧的时候是帧长度减去包长度等于6包长度指的是所有内容的长度
> - HMI内部每次发送1024个字节进行分包内容长度规则是第一帧1024-6=1018(帧大小减去包头长度),第二帧(包含)及之后的帧,帧长度即是数据长度
v0.1.7.0(2024/06/26)-初步可用
1. [aio.py] 在detect_network函数中需改查询时间间隔是1s在tabview_click中增加textbox配置normal的语句
2. [do_brake.py -> btn_functions.py] 新增执行相应函数并在get_state函数中设置无示教器模式
3. [openapi.py] 新增sock_conn函数并做连接时的异常处理新增类参数w2t
4. [aio.py] 修改customtkinter库中C:\Users\Administrator\AppData\Local\Programs\Python\Python312\Lib\site-packages\customtkinter\windows\widgets\ctk_tabview.py文件参考https://github.com/TomSchimansky/CustomTkinter/issues/2296实现修改tabview组件的字体大小使用原生字体同时将segmented button字体修改为原生为了解决segmented button在禁用和启用时屏幕抖动的问题并将大小修改为16
5. [aio.py] 修改了segmented_button_callback的实现逻辑使代码更简洁
6. [aio.py] 修改了在tabview_click函数中对于实例化openapi的动作使每次切换标签都会重新实例化也就是每次都会重新连接修复显示不正确的问题
7. [openapi.py] 新增了socket关闭的函数并增加msg_id为None的处理逻辑
8. [btn_functions.py] 完善了状态获取的功能,新增告警获取以及功能切换的逻辑

View File

@ -6,8 +6,8 @@ VSVersionInfo(
ffi=FixedFileInfo( ffi=FixedFileInfo(
# filevers and prodvers should be always a tuple with four items: (1, 2, 3, 4) # filevers and prodvers should be always a tuple with four items: (1, 2, 3, 4)
# Set not needed items to zero 0. # Set not needed items to zero 0.
filevers=(0, 1, 6, 3), filevers=(0, 1, 7, 0),
prodvers=(0, 1, 6, 3), prodvers=(0, 1, 7, 0),
# Contains a bitmask that specifies the valid bits 'flags'r # Contains a bitmask that specifies the valid bits 'flags'r
mask=0x3f, mask=0x3f,
# Contains a bitmask that specifies the Boolean attributes of the file. # Contains a bitmask that specifies the Boolean attributes of the file.
@ -31,12 +31,12 @@ VSVersionInfo(
'040904b0', '040904b0',
[StringStruct('CompanyName', 'Rokae - https://www.rokae.com/'), [StringStruct('CompanyName', 'Rokae - https://www.rokae.com/'),
StringStruct('FileDescription', 'All in one automatic toolbox'), StringStruct('FileDescription', 'All in one automatic toolbox'),
StringStruct('FileVersion', '0.1.6.3 (2024-06-18)'), StringStruct('FileVersion', '0.1.7.0 (2024-06-26)'),
StringStruct('InternalName', 'AIO.exe'), StringStruct('InternalName', 'AIO.exe'),
StringStruct('LegalCopyright', '© 2024-2024 Manford Fan'), StringStruct('LegalCopyright', '© 2024-2024 Manford Fan'),
StringStruct('OriginalFilename', 'AIO.exe'), StringStruct('OriginalFilename', 'AIO.exe'),
StringStruct('ProductName', 'AIO'), StringStruct('ProductName', 'AIO'),
StringStruct('ProductVersion', '0.1.6.3 (2024-06-18)')]) StringStruct('ProductVersion', '0.1.7.0 (2024-06-26)')])
]), ]),
VarFileInfo([VarStruct('Translation', [1033, 1200])]) VarFileInfo([VarStruct('Translation', [1033, 1200])])
] ]

Binary file not shown.

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "controller.heart"
}

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "device.get_params"
}

View File

@ -0,0 +1 @@
1

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "state.get_state"
}

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "state.get_tp_mode"
}

View File

@ -0,0 +1,8 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "state.set_tp_mode",
"data": {
"tp_mode": "with"
}
}

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "state.switch_auto"
}

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "state.switch_manual"
}

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "state.switch_motor_off"
}

View File

@ -0,0 +1,5 @@
{
"id": "xxxxxxxxxxx",
"module": "system",
"command": "state.switch_motor_on"
}

View File

@ -1 +1 @@
0.1.6.3 @ 06/18/2024 0.1.7.0 @ 06/26/2024

1
aio/code/__init__.py Normal file
View File

@ -0,0 +1 @@
__all__ = ['automatic_test', 'data_process']

View File

@ -1,29 +1,30 @@
from os.path import exists import sys
import tkinter
from os.path import exists, dirname
from os import getcwd from os import getcwd
from threading import Thread from threading import Thread
import tkinter.messagebox import tkinter.messagebox
import customtkinter import customtkinter
from time import time, strftime, localtime from time import time, strftime, localtime, sleep
from urllib.request import urlopen from urllib.request import urlopen
from socket import setdefaulttimeout from socket import setdefaulttimeout
import data_process.brake as brake from data_process import *
import data_process.current as current from automatic_test import *
import data_process.iso as iso
import data_process.wavelogger as wavelogger
current_path = dirname(__file__)
customtkinter.set_appearance_mode("System") # Modes: "System" (standard), "Dark", "Light" customtkinter.set_appearance_mode("System") # Modes: "System" (standard), "Dark", "Light"
customtkinter.set_default_color_theme("blue") # Themes: "blue" (standard), "green", "dark-blue" customtkinter.set_default_color_theme("blue") # Themes: "blue" (standard), "green", "dark-blue"
customtkinter.set_widget_scaling(1.1) # widget dimensions and text size customtkinter.set_widget_scaling(1.1) # widget dimensions and text size
customtkinter.set_window_scaling(1.1) # window geometry dimensions customtkinter.set_window_scaling(1.1) # window geometry dimensions
setdefaulttimeout(10) setdefaulttimeout(3)
# global vars # global vars
btns = { btns_func = {
'start': {'btn': '', 'row': 1, 'text': '开始运行'}, 'start': {'btn': '', 'row': 1, 'text': '开始运行'},
'check': {'btn': '', 'row': 2, 'text': '检查参数'}, 'check': {'btn': '', 'row': 2, 'text': '检查参数'},
'log': {'btn': '', 'row': 3, 'text': '保存日志'}, 'log': {'btn': '', 'row': 3, 'text': '保存日志'},
'end': {'btn': '', 'row': 4, 'text': '结束运行'}, 'end': {'btn': '', 'row': 4, 'text': '结束运行'},
} }
widgits = { widgits_dp = {
'path': {'label': '', 'entry': '', 'row': 1, 'col': 2, 'text': '数据文件夹路径'}, 'path': {'label': '', 'entry': '', 'row': 1, 'col': 2, 'text': '数据文件夹路径'},
'av': {'label': '', 'entry': '', 'row': 2, 'col': 2, 'text': '角速度'}, 'av': {'label': '', 'entry': '', 'row': 2, 'col': 2, 'text': '角速度'},
'rc': {'label': '', 'entry': '', 'row': 2, 'col': 4, 'text': '额定电流'}, 'rc': {'label': '', 'entry': '', 'row': 2, 'col': 4, 'text': '额定电流'},
@ -42,6 +43,28 @@ widgits = {
'rc5': {'label': '', 'entry': '', 'row': 4, 'col': 10, 'text': '额定电流'}, 'rc5': {'label': '', 'entry': '', 'row': 4, 'col': 10, 'text': '额定电流'},
'rc6': {'label': '', 'entry': '', 'row': 4, 'col': 12, 'text': '额定电流'}, 'rc6': {'label': '', 'entry': '', 'row': 4, 'col': 12, 'text': '额定电流'},
} }
widgits_at = {
'path': {'label': '', 'entry': '', 'row': 2, 'col': 2, 'text': '数据文件夹路径'},
'loadsel': {'label': '', 'optionmenu': '', 'row': 2, 'col': 1, 'text': '负载信息'},
'av1': {'label': '', 'entry': '', 'row': 3, 'col': 2, 'text': '角速度'},
'av2': {'label': '', 'entry': '', 'row': 3, 'col': 4, 'text': '角速度'},
'av3': {'label': '', 'entry': '', 'row': 3, 'col': 6, 'text': '角速度'},
'av4': {'label': '', 'entry': '', 'row': 3, 'col': 8, 'text': '角速度'},
'av5': {'label': '', 'entry': '', 'row': 3, 'col': 10, 'text': '角速度'},
'av6': {'label': '', 'entry': '', 'row': 3, 'col': 12, 'text': '角速度'},
'rc1': {'label': '', 'entry': '', 'row': 4, 'col': 2, 'text': '额定电流'},
'rc2': {'label': '', 'entry': '', 'row': 4, 'col': 4, 'text': '额定电流'},
'rc3': {'label': '', 'entry': '', 'row': 4, 'col': 6, 'text': '额定电流'},
'rc4': {'label': '', 'entry': '', 'row': 4, 'col': 8, 'text': '额定电流'},
'rc5': {'label': '', 'entry': '', 'row': 4, 'col': 10, 'text': '额定电流'},
'rc6': {'label': '', 'entry': '', 'row': 4, 'col': 12, 'text': '额定电流'},
'rr1': {'label': '', 'entry': '', 'row': 5, 'col': 2, 'text': '额定转速'},
'rr2': {'label': '', 'entry': '', 'row': 5, 'col': 4, 'text': '额定转速'},
'rr3': {'label': '', 'entry': '', 'row': 5, 'col': 6, 'text': '额定转速'},
'rr4': {'label': '', 'entry': '', 'row': 5, 'col': 8, 'text': '额定转速'},
'rr5': {'label': '', 'entry': '', 'row': 5, 'col': 10, 'text': '额定转速'},
'rr6': {'label': '', 'entry': '', 'row': 5, 'col': 12, 'text': '额定转速'},
}
class App(customtkinter.CTk): class App(customtkinter.CTk):
@ -49,6 +72,7 @@ class App(customtkinter.CTk):
super().__init__() super().__init__()
self.my_font = customtkinter.CTkFont(family="Consolas", size=16, weight="bold") self.my_font = customtkinter.CTkFont(family="Consolas", size=16, weight="bold")
self.w_param = 84 self.w_param = 84
self.hr = None
# ===================================================================== # =====================================================================
# configure window # configure window
self.title("AIO - All in one automatic toolbox") self.title("AIO - All in one automatic toolbox")
@ -56,7 +80,7 @@ class App(customtkinter.CTk):
self.geometry("1200x550+30+30") self.geometry("1200x550+30+30")
self.protocol("WM_DELETE_WINDOW", self.func_end_callback) self.protocol("WM_DELETE_WINDOW", self.func_end_callback)
self.config(bg='#E9E9E9') self.config(bg='#E9E9E9')
self.grid_rowconfigure(5, weight=1) self.grid_rowconfigure(6, weight=1)
self.grid_columnconfigure((1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13), weight=1) self.grid_columnconfigure((1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13), weight=1)
self.minsize(1200, 550) self.minsize(1200, 550)
# ===================================================================== # =====================================================================
@ -67,54 +91,89 @@ class App(customtkinter.CTk):
self.label_logo = customtkinter.CTkLabel(self.frame_func, text="Rokae AIO", height=60, font=customtkinter.CTkFont(family="Segoe Script Bold", size=24, weight="bold"), text_color="#4F4F4F") self.label_logo = customtkinter.CTkLabel(self.frame_func, text="Rokae AIO", height=60, font=customtkinter.CTkFont(family="Segoe Script Bold", size=24, weight="bold"), text_color="#4F4F4F")
self.label_logo.grid(row=0, column=0, padx=15, pady=15) self.label_logo.grid(row=0, column=0, padx=15, pady=15)
# create buttons # create buttons
for func in btns: for func in btns_func:
btns[func]['btn'] = customtkinter.CTkButton(self.frame_func, corner_radius=10, text=btns[func]['text'], fg_color='#4F4F4F', font=self.my_font) btns_func[func]['btn'] = customtkinter.CTkButton(self.frame_func, corner_radius=10, text=btns_func[func]['text'], fg_color='#4F4F4F', font=self.my_font)
btns[func]['btn'].grid(row=btns[func]['row'], column=0, sticky='new', padx=10, pady=10, ipadx=5, ipady=5) btns_func[func]['btn'].grid(row=btns_func[func]['row'], column=0, sticky='new', padx=10, pady=10, ipadx=5, ipady=5)
btns['start']['btn'].configure(command=lambda: self.thread_it(self.func_start_callback)) btns_func['start']['btn'].configure(command=lambda: self.thread_it(self.func_start_callback))
btns['check']['btn'].configure(command=lambda: self.thread_it(self.func_check_callback)) btns_func['check']['btn'].configure(command=lambda: self.thread_it(self.func_check_callback))
btns['log']['btn'].configure(command=lambda: self.thread_it(self.func_log_callback)) btns_func['log']['btn'].configure(command=lambda: self.thread_it(self.func_log_callback))
btns['end']['btn'].configure(command=lambda: self.thread_it(self.func_end_callback)) btns_func['end']['btn'].configure(command=lambda: self.thread_it(self.func_end_callback))
# create version info # create version info
self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.1.6.3\nDate: 06/18/2024", font=self.my_font, text_color="#4F4F4F") self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.1.6.3\nDate: 06/18/2024", font=self.my_font, text_color="#4F4F4F")
self.frame_func.rowconfigure(6, weight=1) self.frame_func.rowconfigure(6, weight=1)
self.label_version.grid(row=6, column=0, padx=20, pady=20, sticky='s') self.label_version.grid(row=6, column=0, padx=20, pady=20, sticky='s')
# ===================================================================== # =====================================================================
# create tabviews # create tabviews
self.tabview = customtkinter.CTkTabview(self, width=10000, height=100, anchor='w', fg_color='#E9E9E9', border_width=2, border_color='#CDCDCD') self.tabview = customtkinter.CTkTabview(self, width=10000, height=100, anchor='w', fg_color='#E9E9E9', border_width=2, border_color='#CDCDCD', command=self.tabview_click)
self.tabview.grid(row=0, column=1, padx=10, pady=5, sticky="nsew") self.tabview.grid(row=0, column=1, padx=10, pady=5, sticky="nsew")
self.tabview.add("Data Process") self.tabview.add("Data Process")
self.tabview.add("Automatic Test") self.tabview.add("Automatic Test")
# create main menu # create main menu for data process
self.menu_main_dp = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'), values=["init", "brake", "current", "iso", "wavelogger"], font=self.my_font, text_color='yellow', button_color='red', fg_color='green', command=self.func_main_callback) self.menu_main_dp = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'), values=["init", "brake", "current", "iso", "wavelogger"], font=self.my_font, text_color='yellow', button_color='red', fg_color='green', command=self.func_main_callback)
self.menu_main_dp.grid(row=1, column=1, sticky='we', padx=5, pady=5) self.menu_main_dp.grid(row=1, column=1, sticky='we', padx=5, pady=10)
self.menu_main_dp.set("Start Here!") self.menu_main_dp.set("Start Here!")
# create sub menu # create sub menu for data process
self.menu_sub_dp = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process')) self.menu_sub_dp = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'))
# ===================================================================== # create main menu for automatic test
# create widgits self.menu_main_at = customtkinter.CTkOptionMenu(self.tabview.tab('Automatic Test'), values=["init", "brake", "current"], font=self.my_font, text_color='yellow', button_color='red', fg_color='green', command=self.func_main_callback)
for widgit in widgits: self.menu_main_at.grid(row=1, column=1, sticky='we', padx=5, pady=5)
self.menu_main_at.set("Start Here!")
# For data process tab START =====================================================================
# create widgits_dp
for widgit in widgits_dp:
if widgit == 'path': if widgit == 'path':
widgits[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Data Process'), text=f'{widgit.upper()}', font=self.my_font) widgits_dp[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Data Process'), text=f'{widgit.upper()}', font=self.my_font)
widgits[widgit]['label'].grid(row=widgits[widgit]['row'], column=widgits[widgit]['col'], sticky='e', pady=5) widgits_dp[widgit]['label'].grid(row=widgits_dp[widgit]['row'], column=widgits_dp[widgit]['col'], sticky='e', pady=10)
widgits[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Data Process'), width=670, placeholder_text=widgits[widgit]['text'], font=self.my_font) widgits_dp[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Data Process'), width=670, placeholder_text=widgits_dp[widgit]['text'], font=self.my_font)
widgits[widgit]['entry'].grid(row=widgits[widgit]['row'], column=widgits[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=5, sticky='we') widgits_dp[widgit]['entry'].grid(row=widgits_dp[widgit]['row'], column=widgits_dp[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=5, sticky='we')
widgits[widgit]['entry'].configure(state='disabled') widgits_dp[widgit]['entry'].configure(state='disabled')
elif widgit in ['av', 'rc', 'rpm', 'rr', 'dur', 'rc1', 'rc2', 'rc3', 'rc4', 'rc5', 'rc6']: elif widgit in ['av', 'rc', 'rpm', 'rr', 'dur', 'rc1', 'rc2', 'rc3', 'rc4', 'rc5', 'rc6']:
widgits[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Data Process'), text=f"{widgit.upper()}", font=self.my_font) widgits_dp[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Data Process'), text=f"{widgit.upper()}", font=self.my_font)
widgits[widgit]['label'].grid(row=widgits[widgit]['row'], column=widgits[widgit]['col'], sticky='e', pady=5) widgits_dp[widgit]['label'].grid(row=widgits_dp[widgit]['row'], column=widgits_dp[widgit]['col'], sticky='e', pady=5)
widgits[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Data Process'), width=self.w_param, placeholder_text=f"{widgits[widgit]['text']}", font=self.my_font) widgits_dp[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Data Process'), width=self.w_param, placeholder_text=f"{widgits_dp[widgit]['text']}", font=self.my_font)
widgits[widgit]['entry'].grid(row=widgits[widgit]['row'], column=widgits[widgit]['col']+1, padx=(5, 10), pady=5, sticky='w') widgits_dp[widgit]['entry'].grid(row=widgits_dp[widgit]['row'], column=widgits_dp[widgit]['col']+1, padx=(5, 10), pady=5, sticky='w')
widgits[widgit]['entry'].configure(state='disabled') widgits_dp[widgit]['entry'].configure(state='disabled')
elif widgit in ['axis', 'vel', 'trq', 'trqh', 'estop']: elif widgit in ['axis', 'vel', 'trq', 'trqh', 'estop']:
widgits[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Data Process'), text=f"{widgit.upper()}", font=self.my_font) widgits_dp[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Data Process'), text=f"{widgit.upper()}", font=self.my_font)
widgits[widgit]['label'].grid(row=widgits[widgit]['row'], column=widgits[widgit]['col'], sticky='e', pady=5) widgits_dp[widgit]['label'].grid(row=widgits_dp[widgit]['row'], column=widgits_dp[widgit]['col'], sticky='e', pady=5)
widgits[widgit]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'), button_color='#708090', fg_color='#778899', values=["1", "2", "3", "4", "5", "6", "7"], width=self.w_param, font=self.my_font) widgits_dp[widgit]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'), button_color='#708090', fg_color='#778899', values=["1", "2", "3", "4", "5", "6", "7"], width=self.w_param, font=self.my_font)
widgits[widgit]['optionmenu'].grid(row=widgits[widgit]['row'], column=widgits[widgit]['col']+1, padx=(5, 10), pady=5, sticky='w') widgits_dp[widgit]['optionmenu'].grid(row=widgits_dp[widgit]['row'], column=widgits_dp[widgit]['col']+1, padx=(5, 10), pady=5, sticky='w')
widgits[widgit]['optionmenu'].configure(state='disabled') widgits_dp[widgit]['optionmenu'].configure(state='disabled')
# ===================================================================== # For data process tab END =====================================================================
# For automatic test tab START =====================================================================
# create buttons
self.seg_button = customtkinter.CTkSegmentedButton(self.tabview.tab('Automatic Test'), dynamic_resizing=False, font=customtkinter.CTkFont(size=16, weight='bold'), command=lambda value='机器状态': self.thread_it(self.segmented_button_callback))
self.seg_button.grid(row=1, column=2, columnspan=12, padx=(20, 10), pady=(10, 10), sticky="ew")
self.seg_button.configure(dynamic_resizing=False, values=["功能切换", "触发急停", "停止运动", "继续运动", "零点位姿", "机器状态", "告警信息"])
self.seg_button.set("功能切换")
# create progress bar
self.progressbar = customtkinter.CTkProgressBar(self.tabview.tab('Automatic Test'))
self.progressbar.grid(row=5, column=1, padx=5, pady=5, sticky="ew")
self.progressbar.configure(mode="determinnate", width=10)
self.progressbar.start()
# create widgits_at
for widgit in widgits_at:
if widgit == 'path':
widgits_at[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Automatic Test'), text=f'{widgit.upper()}', font=self.my_font)
widgits_at[widgit]['label'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col'], sticky='e', pady=5)
widgits_at[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Automatic Test'), width=670, placeholder_text=widgits_at[widgit]['text'], font=self.my_font)
widgits_at[widgit]['entry'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=5, sticky='we')
widgits_at[widgit]['entry'].configure(state='disabled')
elif widgit in ['av1', 'av2', 'av3', 'av4', 'av5', 'av6', 'rc1', 'rc2', 'rc3', 'rc4', 'rc5', 'rc6', 'rr1', 'rr2', 'rr3', 'rr4', 'rr5', 'rr6']:
widgits_at[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Automatic Test'), text=f"{widgit.upper()}", font=self.my_font)
widgits_at[widgit]['label'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col'], sticky='e', pady=5)
widgits_at[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Automatic Test'), width=self.w_param, placeholder_text=f"{widgits_at[widgit]['text']}", font=self.my_font)
widgits_at[widgit]['entry'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col']+1, padx=(5, 10), pady=5, sticky='w')
widgits_at[widgit]['entry'].configure(state='disabled')
elif widgit in ['loadsel', ]:
widgits_at[widgit]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Automatic Test'), button_color='#708090', fg_color='#778899', values=["tool33", "tool66", "tool100"], width=self.w_param, font=self.my_font)
widgits_at[widgit]['optionmenu'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col'], padx=5, pady=5, sticky='we')
widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
widgits_at[widgit]['optionmenu'].configure(state='disabled')
# For automatic test tab END =====================================================================
# create textbox # create textbox
self.textbox = customtkinter.CTkTextbox(self, wrap='none', font=customtkinter.CTkFont(family="consolas", size=14), text_color="blue", fg_color='#E9E9E9', border_width=2, border_color='#CDCDCD', border_spacing=5) self.textbox = customtkinter.CTkTextbox(self, wrap='none', font=customtkinter.CTkFont(family="consolas", size=14), text_color="blue", fg_color='#E9E9E9', border_width=2, border_color='#CDCDCD', border_spacing=5)
self.textbox.grid(row=5, column=1, rowspan=2, columnspan=13, ipadx=10, ipady=10, padx=10, pady=(5, 10), sticky='nsew') self.textbox.grid(row=6, column=1, columnspan=13, ipadx=10, ipady=10, padx=10, pady=(5, 10), sticky='nsew')
self.textbox.configure(state='disabled') self.textbox.configure(state='disabled')
# ===================================================================== # =====================================================================
# version check # version check
@ -127,6 +186,7 @@ class App(customtkinter.CTk):
tkinter.messagebox.showwarning(title="版本更新", message=msg) tkinter.messagebox.showwarning(title="版本更新", message=msg)
except: except:
tkinter.messagebox.showwarning(title="版本更新", message="连接服务器失败,无法确认当前是否是最新版本......") tkinter.messagebox.showwarning(title="版本更新", message="连接服务器失败,无法确认当前是否是最新版本......")
# functions below ↓ ----------------------------------------------------------------------------------------
def thread_it(self, func, *args): def thread_it(self, func, *args):
""" 将函数打包进线程 """ """ 将函数打包进线程 """
@ -134,78 +194,158 @@ class App(customtkinter.CTk):
self.myThread.daemon = True # 主线程退出就直接让子线程跟随退出,不论是否运行完成。 self.myThread.daemon = True # 主线程退出就直接让子线程跟随退出,不论是否运行完成。
self.myThread.start() self.myThread.start()
def segmented_button_callback(self):
_btn_funcs = {'get_state': '机器状态', 'warning_info': '告警信息', '3': '4', '5': '6', '7': '8'}
value = self.seg_button.get()
self.seg_button.configure(state='disabled')
self.textbox.configure(state='normal')
# self.tabview.configure(state='disabled')
self.textbox.delete(index1='1.0', index2='end')
with open(f'{current_path}/../assets/templates/heartbeat', 'r', encoding='utf-8') as f_h:
connection_state = f_h.read().strip()
if connection_state == '0' and value != '功能切换':
self.write2textbox("无法连接机器人检查是否已经使用Robot Assist软件连接机器重试中...", 0, 50, 'red')
else:
for _func in _btn_funcs:
if _btn_funcs[_func] == value:
btn_functions.main(self.hr, _func, self.write2textbox)
break
self.seg_button.configure(state='normal')
self.textbox.configure(state='disable')
# self.tabview.configure(state='normal')
def detect_network(self):
with open(f"{current_path}/../assets/templates/heartbeat", "w", encoding='utf-8') as f_h:
f_h.write('0')
while True:
with open(f'{current_path}/../assets/templates/heartbeat', 'r', encoding='utf-8') as f_h:
pb_color = 'green' if f_h.read().strip() == '1' else 'red'
self.progressbar.configure(progress_color=pb_color)
sleep(1)
def tabview_click(self):
self.initialization()
self.textbox.configure(state='normal')
self.textbox.delete(index1='1.0', index2='end')
self.textbox.configure(state='disabled')
tab_name = self.tabview.get()
if tab_name == 'Data Process':
self.menu_main_dp.set("Start Here!")
elif tab_name == 'Automatic Test':
self.menu_main_at.set("Start Here!")
self.seg_button.configure(state='normal')
with open(f"{current_path}/../assets/templates/heartbeat", "w", encoding='utf-8') as f_h:
f_h.write('0')
self.textbox.configure(state='normal')
try:
self.hr.close_sock()
self.hr = openapi.HmiRequest(self.write2textbox)
except:
self.hr = openapi.HmiRequest(self.write2textbox)
# if connection_state == '0' or self.hr is None:
# self.textbox.configure(state='normal')
# self.hr = openapi.HmiRequest(self.write2textbox)
# self.textbox.configure(state='disabled')
def initialization(self): def initialization(self):
for widgit in widgits: tab_name = self.tabview.get()
if tab_name == 'Data Process':
for widgit in widgits_dp:
if widgit in ['path', 'av', 'rc', 'rpm', 'rr', 'dur', 'rc1', 'rc2', 'rc3', 'rc4', 'rc5', 'rc6']: if widgit in ['path', 'av', 'rc', 'rpm', 'rr', 'dur', 'rc1', 'rc2', 'rc3', 'rc4', 'rc5', 'rc6']:
widgits[widgit]['label'].configure(text=f'{widgit.upper()}', text_color='black') widgits_dp[widgit]['label'].configure(text=f'{widgit.upper()}', text_color='black')
widgits[widgit]['entry'].configure(placeholder_text=widgits[widgit]['text'], state='disabled') widgits_dp[widgit]['entry'].delete(0, tkinter.END)
widgits_dp[widgit]['entry'].configure(placeholder_text=widgits_dp[widgit]['text'], state='normal')
widgits_dp[widgit]['entry'].configure(state='disabled')
elif widgit in ['axis', 'vel', 'trq', 'trqh', 'estop']: elif widgit in ['axis', 'vel', 'trq', 'trqh', 'estop']:
widgits[widgit]['label'].configure(text=f'{widgit.upper()}', text_color="black") widgits_dp[widgit]['label'].configure(text=f'{widgit.upper()}', text_color="black")
widgits[widgit]['optionmenu'].configure(state='disabled') widgits_dp[widgit]['optionmenu'].configure(state='normal')
widgits_dp[widgit]['optionmenu'].set('1')
widgits_dp[widgit]['optionmenu'].configure(state='disabled')
self.menu_sub_dp.grid_forget() self.menu_sub_dp.grid_forget()
self.textbox.delete(index1='1.0', index2='end') self.textbox.delete(index1='1.0', index2='end')
self.textbox.configure(state='disabled') self.textbox.configure(state='disabled')
elif tab_name == 'Automatic Test':
for widgit in widgits_at:
if widgit in ['path', 'av1', 'av2', 'av3', 'av4', 'av5', 'av6', 'rc1', 'rc2', 'rc3', 'rc4', 'rc5', 'rc6', 'rr1', 'rr2', 'rr3', 'rr4', 'rr5', 'rr6']:
widgits_at[widgit]['label'].configure(text=f'{widgit.upper()}', text_color='black')
widgits_at[widgit]['entry'].delete(0, tkinter.END)
widgits_at[widgit]['entry'].configure(placeholder_text=widgits_at[widgit]['text'], state='normal')
widgits_at[widgit]['entry'].configure(state='disabled')
elif widgit in ['loadsel']:
widgits_at[widgit]['optionmenu'].configure(state='normal')
widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
widgits_at[widgit]['optionmenu'].configure(state='disabled')
self.seg_button.set("功能切换")
def func_main_callback(self, func_name): def func_main_callback(self, func_name):
self.initialization() self.initialization()
tab_name = self.tabview.get()
if tab_name == 'Data Process':
if func_name == 'brake': if func_name == 'brake':
for widgit in widgits: for widgit in widgits_dp:
if widgit in ['path', 'av', 'rr']: if widgit in ['path', 'av', 'rr']:
widgits[widgit]['label'].configure(text_color='red') widgits_dp[widgit]['label'].configure(text_color='red')
widgits[widgit]['entry'].configure(state='normal') widgits_dp[widgit]['entry'].configure(state='normal')
elif widgit in ['axis', 'vel', 'trq', 'estop']: elif widgit in ['axis', 'vel', 'trq', 'estop']:
widgits[widgit]['label'].configure(text_color="red") widgits_dp[widgit]['label'].configure(text_color="red")
widgits[widgit]['optionmenu'].configure(state='normal') widgits_dp[widgit]['optionmenu'].configure(state='normal')
elif func_name == 'current': elif func_name == 'current':
self.menu_sub_dp = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'), values=["max", "avg", "cycle"], font=self.my_font, button_color='red', fg_color='green', command=self.func_sub_callback) self.menu_sub_dp = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'), values=["max", "avg", "cycle"], font=self.my_font, button_color='red', fg_color='green', command=self.func_sub_callback)
self.menu_sub_dp.grid(row=2, column=1, sticky='we', padx=5, pady=5) self.menu_sub_dp.grid(row=2, column=1, sticky='we', padx=5, pady=5)
self.menu_sub_dp.set("--select--") self.menu_sub_dp.set("--select--")
self.menu_sub_dp.configure(text_color='yellow') self.menu_sub_dp.configure(text_color='yellow')
for widgit in widgits: for widgit in widgits_dp:
if widgit in ['path', 'rc', 'rc1', 'rc2', 'rc3', 'rc4', 'rc5', 'rc6']: if widgit in ['path', 'rc', 'rc1', 'rc2', 'rc3', 'rc4', 'rc5', 'rc6']:
color = 'blue' if widgit == 'rc' else 'red' color = 'blue' if widgit == 'rc' else 'red'
widgits[widgit]['label'].configure(text_color=color) widgits_dp[widgit]['label'].configure(text_color=color)
widgits[widgit]['entry'].configure(state='normal') widgits_dp[widgit]['entry'].configure(state='normal')
elif widgit in ['trqh',]: elif widgit in ['trqh',]:
widgits[widgit]['label'].configure(text_color="red") widgits_dp[widgit]['label'].configure(text_color="red")
widgits[widgit]['optionmenu'].configure(state='normal') widgits_dp[widgit]['optionmenu'].configure(state='normal')
elif func_name == 'iso' or func_name == 'wavelogger': elif func_name == 'iso' or func_name == 'wavelogger':
for widgit in widgits: for widgit in widgits_dp:
if widgit in ['path',]: if widgit in ['path',]:
widgits[widgit]['label'].configure(text_color='red') widgits_dp[widgit]['label'].configure(text_color='red')
widgits[widgit]['entry'].configure(state='normal') widgits_dp[widgit]['entry'].configure(state='normal')
else: else:
self.initialization() self.initialization()
self.menu_main_dp.set("Start Here!") self.menu_main_dp.set("Start Here!")
elif tab_name == 'Automatic Test':
pass
def func_sub_callback(self, func_name): def func_sub_callback(self, func_name):
if func_name == "max": if func_name == "max":
for widgit in widgits: for widgit in widgits_dp:
if widgit in ['rpm', 'dur']: if widgit in ['rpm', 'dur']:
widgits[widgit]['label'].configure(text_color='black') widgits_dp[widgit]['label'].configure(text_color='black')
widgits[widgit]['entry'].configure(state='disabled') widgits_dp[widgit]['entry'].configure(state='disabled')
elif widgit in ['vel', 'trq']: elif widgit in ['vel', 'trq']:
widgits[widgit]['label'].configure(text_color='black') widgits_dp[widgit]['label'].configure(text_color='black')
widgits[widgit]['optionmenu'].configure(state='disabled') widgits_dp[widgit]['optionmenu'].configure(state='disabled')
elif func_name == 'avg': elif func_name == 'avg':
for widgit in widgits: for widgit in widgits_dp:
if widgit in ['rpm', 'dur']: if widgit in ['rpm', 'dur']:
widgits[widgit]['label'].configure(text_color='black') widgits_dp[widgit]['label'].configure(text_color='black')
widgits[widgit]['entry'].configure(state='disabled') widgits_dp[widgit]['entry'].configure(state='disabled')
elif widgit in ['vel', 'trq']: elif widgit in ['vel', 'trq']:
widgits[widgit]['label'].configure(text_color='black') widgits_dp[widgit]['label'].configure(text_color='black')
widgits[widgit]['optionmenu'].configure(state='disabled') widgits_dp[widgit]['optionmenu'].configure(state='disabled')
elif func_name == 'cycle': elif func_name == 'cycle':
for widgit in widgits: for widgit in widgits_dp:
if widgit in ['rpm', 'dur']: if widgit in ['rpm', 'dur']:
widgits[widgit]['label'].configure(text_color='blue') widgits_dp[widgit]['label'].configure(text_color='blue')
widgits[widgit]['entry'].configure(state='normal') widgits_dp[widgit]['entry'].configure(state='normal')
elif widgit in ['vel', 'trq']: elif widgit in ['vel', 'trq']:
widgits[widgit]['label'].configure(text_color="red") widgits_dp[widgit]['label'].configure(text_color="red")
widgits[widgit]['optionmenu'].configure(state='normal') widgits_dp[widgit]['optionmenu'].configure(state='normal')
def write2textbox(self, text, wait=0, exitcode=0, color='blue'): def write2textbox(self, text, wait=0, exitcode=0, color='blue'):
self.textbox.tag_add(color, 'insert', 'end') self.textbox.tag_add(color, 'insert', 'end')
@ -240,15 +380,17 @@ class App(customtkinter.CTk):
return True return True
def check_param(self): def check_param(self):
tab_name = self.tabview.get()
if tab_name == 'Data Process':
func_name = self.menu_main_dp.get() func_name = self.menu_main_dp.get()
if func_name == 'brake': if func_name == 'brake':
path = widgits['path']['entry'].get().strip() path = widgits_dp['path']['entry'].get().strip()
av = widgits['av']['entry'].get().strip('- ') av = widgits_dp['av']['entry'].get().strip('- ')
rr = widgits['rr']['entry'].get().strip('- ') rr = widgits_dp['rr']['entry'].get().strip('- ')
axis = widgits['axis']['optionmenu'].get() axis = widgits_dp['axis']['optionmenu'].get()
vel = widgits['vel']['optionmenu'].get() vel = widgits_dp['vel']['optionmenu'].get()
trq = widgits['trq']['optionmenu'].get() trq = widgits_dp['trq']['optionmenu'].get()
estop = widgits['estop']['optionmenu'].get() estop = widgits_dp['estop']['optionmenu'].get()
c1 = exists(path) c1 = exists(path)
c2 = self.is_float('required', av, rr) c2 = self.is_float('required', av, rr)
@ -260,19 +402,19 @@ class App(customtkinter.CTk):
return 0, 0 return 0, 0
# ======================================================= # =======================================================
elif func_name == 'current': elif func_name == 'current':
path = widgits['path']['entry'].get().strip() path = widgits_dp['path']['entry'].get().strip()
rc = widgits['rc']['entry'].get().strip('- ') rc = widgits_dp['rc']['entry'].get().strip('- ')
rpm = widgits['rpm']['entry'].get().strip() rpm = widgits_dp['rpm']['entry'].get().strip()
dur = widgits['dur']['entry'].get().strip() dur = widgits_dp['dur']['entry'].get().strip()
rc1 = widgits['rc1']['entry'].get().strip() rc1 = widgits_dp['rc1']['entry'].get().strip()
rc2 = widgits['rc2']['entry'].get().strip() rc2 = widgits_dp['rc2']['entry'].get().strip()
rc3 = widgits['rc3']['entry'].get().strip() rc3 = widgits_dp['rc3']['entry'].get().strip()
rc4 = widgits['rc4']['entry'].get().strip() rc4 = widgits_dp['rc4']['entry'].get().strip()
rc5 = widgits['rc5']['entry'].get().strip() rc5 = widgits_dp['rc5']['entry'].get().strip()
rc6 = widgits['rc6']['entry'].get().strip() rc6 = widgits_dp['rc6']['entry'].get().strip()
vel = widgits['vel']['optionmenu'].get() vel = widgits_dp['vel']['optionmenu'].get()
trq = widgits['trq']['optionmenu'].get() trq = widgits_dp['trq']['optionmenu'].get()
trqh = widgits['trqh']['optionmenu'].get() trqh = widgits_dp['trqh']['optionmenu'].get()
sub = self.menu_sub_dp.get() sub = self.menu_sub_dp.get()
c1 = exists(path) c1 = exists(path)
@ -298,7 +440,7 @@ class App(customtkinter.CTk):
return 0, 0 return 0, 0
# ======================================================= # =======================================================
elif func_name == 'iso': elif func_name == 'iso':
path = widgits['path']['entry'].get().strip() path = widgits_dp['path']['entry'].get().strip()
c1 = exists(path) c1 = exists(path)
if c1: if c1:
return 3, path return 3, path
@ -306,7 +448,7 @@ class App(customtkinter.CTk):
return 0, 0 return 0, 0
# ======================================================= # =======================================================
elif func_name == 'wavelogger': elif func_name == 'wavelogger':
path = widgits['path']['entry'].get().strip() path = widgits_dp['path']['entry'].get().strip()
c1 = exists(path) c1 = exists(path)
if c1: if c1:
return 4, path return 4, path
@ -315,6 +457,12 @@ class App(customtkinter.CTk):
# ======================================================= # =======================================================
else: else:
return 0, 0 return 0, 0
elif tab_name == 'Automatic Test':
func_name = self.menu_main_at.get()
if func_name == 'brake':
pass
elif func_name == 'current':
pass
def func_start_callback(self): def func_start_callback(self):
self.textbox.configure(state='normal') self.textbox.configure(state='normal')
@ -370,4 +518,7 @@ class App(customtkinter.CTk):
if __name__ == "__main__": if __name__ == "__main__":
aio = App() aio = App()
aio.net_detect = Thread(target=aio.detect_network)
aio.net_detect.daemon = True
aio.net_detect.start()
aio.mainloop() aio.mainloop()

View File

@ -0,0 +1 @@
__all__ = ['openapi', 'btn_functions']

View File

@ -0,0 +1,92 @@
import json
import socket
from os.path import dirname
from sys import argv
current_path = dirname(__file__)
def validate_resp(_id, response, w2t):
match _id:
case 'DATA ERR':
w2t(f"数据处理错误,需要确认", 0, 4, 'red')
case 'DATA READ ERR':
w2t(f"无法读取数据,需要确认", 0, 3, 'red')
case 'NOT SUPPORT':
w2t(f"不支持的功能,需要确认", 0, 2, 'red')
if not response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red')
def execution(cmd, hr, w2t, **kwargs):
_id = hr.excution(cmd, **kwargs)
_msg = hr.get_from_id(_id)
if not _msg:
w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red')
else:
_response = json.loads(_msg)
validate_resp(_id, _response, w2t)
return _response
def get_state(hr, w2t):
# 获取机器状态
_response = execution('state.get_state', hr, w2t)
stat_desc = {'engine': '上电状态', 'operate': '操作模式', 'rc_state': '控制器状态', 'robot_action': '机器人动作', 'safety_mode': '安全模式', 'servo_mode': '伺服工作模式', 'task_space': '工作任务空间'}
for component, state in _response['data'].items():
w2t(f"{stat_desc[component]}: {state}")
# 获取设备伺服信息
_response = execution('device.get_params', hr, w2t)
dev_desc = {0: '伺服版本', 1: '伺服参数', 2: '安全板固件', 3: '控制器', 4: '通讯总线', 5: '解释器', 6: '运动控制', 8: '力控版本', 9: '末端固件', 10: '机型文件', 11: '环境包'}
dev_vers = {}
for device in _response['data']['devices']:
dev_vers[device['type']] = device['version']
for i in sorted(dev_desc.keys()):
w2t(f"{dev_desc[i]}: {dev_vers[i]}")
# 设置示教器模式
_response = execution('state.set_tp_mode', hr, w2t, tp_mode='without')
def warning_info(hr, w2t):
for msg in hr.c_msg:
if 'alarm' in msg.lower():
w2t(msg)
for msg in hr.c_msg_xs:
if 'alarm' in msg.lower():
w2t(msg)
def main(hr, func, w2t):
if hr is None:
w2t("无法连接机器人检查是否已经使用Robot Assist软件连接机器重试中...", 0, 49, 'red')
# func: get_state/
match func:
case 'get_state':
get_state(hr, w2t)
case 'warning_info':
warning_info(hr, w2t)
if __name__ == '__main__':
main(*argv[1:])
# 一、设置/检测机器人状态:
# 1. 上电
# 2. 软限位打开
# 3. 示教器断开
# 4. 操作模式/机器人类型
# 5. 控制器状态/工作任务控件/机器人动态
# 二、加载RL程序开始运行
# 1. 怎么触发急停
# 2. 怎么恢复急停
# 3. 怎么采集曲线
# 4.
# 三、运行过程中,收集数据,并处理出结果
# 四

View File

@ -1,67 +0,0 @@
import json
from socket import *
import threading
import time
import binascii
class HmiRequest(object):
def __init__(self):
self.c = socket(AF_INET, SOCK_STREAM)
self.c.connect(('192.168.84.129', 5050))
self.c_xs = socket(AF_INET, SOCK_STREAM)
self.c_xs.connect(('192.168.84.129', 6666))
self.c.setblocking(False)
self.c_xs.setblocking(False)
self.t = threading.Thread(target=self.__heartbeat_detection)
self.t.daemon = True
self.t.start()
def __handle_command(self, cmd):
len_frame, len_pkg = len(cmd), len(cmd) + 6
pkg_head = str(hex(len_pkg))[2:].rjust(4, '0')
frame_head = str(hex(len_frame))[2:].rjust(4, '0')
str0 = binascii.unhexlify(pkg_head) # 报文
str1 = chr(0) + chr(0) # 保留字段
str2 = binascii.unhexlify(frame_head) # 帧
str3 = chr(2) + chr(0) # 协议类型
return str0 + str1.encode() + str2 + str3.encode() + cmd.encode()
def __heartbeat_detection(self):
data = {
"id": "system.controller.heart_0",
"module": "system",
"command": "controller.heart",
}
_id = 1
while True:
data["id"] = f"#system.controller.heart_{_id}"
cmd = json.dumps(data, separators=(',', ':'))
self.c.send(self.__handle_command(cmd))
time.sleep(10)
_id += 1
def motor_on(self):
"""HMI上电"""
data = {
"command": "state.switch_motor_on",
"id": str(1234),
"module": "system"
}
cmd = json.dumps(data, separators=(',', ':'))
self.c.send(self.__handle_command(cmd))
time.sleep(2)
response = self.c.recv(102400)
print(response)
print(type(response))
print(response.decode())
# response = json.loads(self.c.recv(102400).decode('utf-16-be'))
# print(response)
hr = HmiRequest()
hr.motor_on()

View File

@ -1,67 +1,280 @@
import json import json
from socket import * import socket
import threading import threading
import selectors
import time import time
import binascii import os
MAX_FRAME_SIZE = 1024
socket.setdefaulttimeout(2)
current_path = os.path.dirname(__file__)
class HmiRequest(object): class HmiRequest(object):
def __init__(self, w2t):
super().__init__()
self.w2t = w2t
self.c = None
self.c_xs = None
def __init__(self): def sock_conn():
self.c = socket(AF_INET, SOCK_STREAM) # while True:
self.c.connect(('192.168.84.129', 5050)) with open(f"{current_path}/../../assets/templates/heartbeat", "r", encoding='utf-8') as f_h:
self.c_xs = socket(AF_INET, SOCK_STREAM) connection_state = f_h.read().strip()
self.c_xs.connect(('192.168.84.129', 6666)) if connection_state == '0':
self.c.setblocking(False) try:
self.c_xs.setblocking(False) c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.t = threading.Thread(target=self.__heartbeat_detection) c.connect(('192.168.0.160', 5050))
self.t.daemon = True # c.connect(('192.168.84.129', 5050))
self.t.start() c.setblocking(False)
c_xs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
c_xs.connect(('192.168.0.160', 6666))
# c_xs.connect(('192.168.84.129', 6666))
c_xs.setblocking(False)
def __handle_command(self, cmd): self.w2t("Connection success", 0, 0, 'green')
len_frame, len_pkg = len(cmd), len(cmd) + 6 with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_h:
pkg_head = str(hex(len_pkg))[2:].rjust(4, '0') f_h.write('1')
frame_head = str(hex(len_frame))[2:].rjust(4, '0') return c, c_xs
str0 = binascii.unhexlify(pkg_head) # 报文
str1 = chr(0) + chr(0) # 保留字段
str2 = binascii.unhexlify(frame_head) # 帧
str3 = chr(2) + chr(0) # 协议类型
return str0 + str1.encode() + str2 + str3.encode() + cmd.encode()
def __heartbeat_detection(self): except Exception as Err:
data = { with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_h:
"id": "system.controller.heart_0", f_h.write('0')
"module": "system", self.w2t("Connection failed...", 0, 0, 'red')
"command": "controller.heart", return None, None
}
_id = 1 self.c, self.c_xs = sock_conn()
if self.c is None or self.c_xs is None:
self.w2t("Aborting! Try to switch tabs to re-connect!", 0, 1, 'red')
self.c_msg = []
self.c_msg_xs = []
self.flag = 0
self.response = ''
self.leftover = 0
self.flag_xs = 0
self.response_xs = ''
self.t_heartbeat = threading.Thread(target=self.heartbeat)
self.t_heartbeat.daemon = True
self.t_heartbeat.start()
self.t_unpackage = threading.Thread(target=self.unpackage, args=(self.c, ))
self.t_unpackage.daemon = True
self.t_unpackage.start()
self.t_unpackage_xs = threading.Thread(target=self.unpackage_xs, args=(self.c_xs, ))
self.t_unpackage_xs.daemon = True
self.t_unpackage_xs.start()
def close_sock(self):
self.c.close()
self.c_xs.close()
def header_check(self, index, data):
try:
_frame_size = int.from_bytes(data[index:index+2], byteorder='big')
_pkg_size = int.from_bytes(data[index+2:index+6], byteorder='big')
_protocol = int.from_bytes(data[index+6:index+7], byteorder='big')
_reserved = int.from_bytes(data[index+7:index+8], byteorder='big')
if _reserved == 0 and _protocol == 2:
return index+8, _frame_size, _pkg_size
else:
print("数据有误,需要确认")
return 'DATA ERR'
except Exception as Err:
print(f"Err = {Err}")
print("无法读取数据,需要确认")
return 'DATA READ ERR'
def heartbeat(self):
while True: while True:
data["id"] = f"#system.controller.heart_{_id}" _id = self.excution('controller.heart')
cmd = json.dumps(data, separators=(',', ':')) _flag = 0 if self.get_from_id(_id) is None else 1
self.c.send(self.__handle_command(cmd)) with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_h:
time.sleep(10) f_h.write(str(_flag))
_id += 1 time.sleep(3)
def motor_on(self): def msg_storage(self, response, flag=0):
"""HMI上电""" messages = self.c_msg if flag == 0 else self.c_msg_xs
data = { if len(messages) < 1000:
"command": "state.switch_motor_on", messages.insert(0, response)
"id": str(1234), else:
"module": "system" messages.insert(0, response)
} while len(messages) > 1000:
cmd = json.dumps(data, separators=(',', ':')) messages.pop()
self.c.send(self.__handle_command(cmd))
def get_response(self, data):
_index = 0
while _index < len(data):
if self.flag == 0:
_index, _frame_size, _pkg_size = self.header_check(_index, data)
if _pkg_size <= len(data) - _index:
# 说明剩余部分的数据正好就是完整的包数据
self.response = data[_index:_index+_pkg_size].decode()
self.msg_storage(flag=0, response=self.response)
_index += _pkg_size
self.flag = 0
self.response = ''
self.leftover = 0
elif _pkg_size > len(data) - _index:
# 说有有分包的情况发生了需要flag=1的处理
self.flag = 1
self.response = data[_index:].decode()
self.leftover = _frame_size - 6 - (len(data) - _index) # 其实就是常量 2其中 6 就是六个字节的包头
break
elif self.flag == 1:
# 处理完之后将flag重置为0
_index = self.leftover
self.response += data[:_index].decode()
_index += 2
_frame_size = int.from_bytes(data[_index - 2:_index], byteorder='big')
if _frame_size == 0:
self.msg_storage(flag=0, response=self.response)
self.flag = 0
self.response = ''
self.leftover = 0
break
if _frame_size == MAX_FRAME_SIZE:
self.leftover = MAX_FRAME_SIZE - (len(data) - _index)
self.response += data[_index:].decode()
break
else:
if _index+_frame_size <= MAX_FRAME_SIZE:
self.response += data[_index:_index+_frame_size].decode()
self.msg_storage(flag=0, response=self.response)
self.flag = 0
self.response = ''
self.leftover = 0
break
else:
self.response += data[_index:].decode()
self.leftover = _index + _frame_size - MAX_FRAME_SIZE
break
def get_response_xs(self, data):
if self.flag_xs == 0:
if data[-1].decode() == '\r':
_responses = data.decode().split('\r')
for _response in _responses:
self.msg_storage(flag=1, response=_response)
else:
_responses = data.decode().split('\r')
for _response in _responses[:-1]:
if not _response:
break
self.msg_storage(flag=1, response=_response)
self.response_xs = _responses[-1]
self.flag_xs = 1
else:
if data[-1].decode() == '\r':
_responses = (self.response_xs.encode() + data).decode().split('\r')
for _response in _responses:
self.msg_storage(flag=1, response=_response)
self.response_xs = ''
self.flag_xs = 0
else:
_responses = (self.response_xs.encode() + data).decode().split('\r')
for _response in _responses[:-1]:
if not _response:
break
self.msg_storage(flag=1, response=_response)
self.response_xs = _responses[-1]
self.flag_xs = 1
def get_from_id(self, msg_id, flag=0):
messages = self.c_msg if flag == 0 else self.c_msg_xs
for i in range(3):
for msg in messages:
if msg_id is None:
self.w2t("未能成功获取到 message id...", 0, 10, 'red')
if msg_id in msg:
return msg
time.sleep(1)
else:
return None
def package(self, cmd):
_frame_head = (len(cmd)+6).to_bytes(length=2, byteorder='big')
_pkg_head = len(cmd).to_bytes(length=4, byteorder='big')
_protocol = int(2).to_bytes(length=1, byteorder='big')
_reserved = int(0).to_bytes(length=1, byteorder='big')
return _frame_head + _pkg_head + _protocol + _reserved + cmd.encode()
def package_xs(self, cmd):
return f"{json.dumps(cmd, separators=(',', ':'))}\r".encode()
def unpackage(self, sock):
def to_read(conn):
data = conn.recv(MAX_FRAME_SIZE)
if data:
# print(data)
self.get_response(data)
else:
print('closing', sock)
sel.unregister(conn)
conn.close()
sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ, to_read)
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj)
def unpackage_xs(self, sock):
def to_read(conn):
data = conn.recv(1024) # Should be ready
if data:
# print(data)
self.get_response_xs(data)
else:
print('closing', sock)
sel.unregister(conn)
conn.close()
sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ, to_read)
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj)
def gen_id(self, command):
_now = time.time()
_id = f"{command}-{_now}"
return _id
def excution(self, command, flg=0, **kwargs):
if flg == 0: # for old protocols
req = None
try:
with open(f'{current_path}/../../assets/templates/{command}.json', encoding='utf-8', mode='r') as f_json:
req = json.load(f_json)
except:
print(f"暂不支持 {command} 功能,或确认该功能存在...")
return 'NOT SUPPORT'
match command:
case 'state.set_tp_mode':
req['data']['tp_mode'] = kwargs['tp_mode']
case 1:
pass
req['id'] = self.gen_id(command)
print(f"req = {req}")
cmd = json.dumps(req, separators=(',', ':'))
self.c.send(self.package(cmd))
time.sleep(2) time.sleep(2)
response = self.c.recv(102400) return req['id']
print(response) else: # for xService
print(type(response)) pass
print(response.decode())
# response = json.loads(self.c.recv(102400).decode('utf-16-be'))
# print(response)
hr = HmiRequest()
hr.motor_on()

View File

@ -0,0 +1 @@
__all__ = ['brake', 'current', 'iso', 'wavelogger']