17 Commits

Author SHA1 Message Date
3010cb8931 v0.2.0.0(2024/07/17)
1. [profile: aio.py]
   - 增加velocity相关逻辑
   - 修改负载信息为曲线信息
2. [profile: factory_test.py]
   - 增加velocity相关逻辑
3. [profile: current.py]
   - 修正减速比获取的规则
4. [profile: openapi.py]
   - HmiRequest模块:日志取消记录move.monitor相关
   - HmiRequest模块:增加了durable_lock变量,控制文件读写互斥
2024-07-17 14:17:00 +08:00
da5ddcea0a v0.1.9.4(2024/07/15)
1. [profile: aio.py]:完善durable text相关逻辑
2. [profile: do_brake/do_current/btn_functions.py]:删除validate_resp函数,修改execution函数
3. [profile: factory_test.py]
   - 新增耐久/老化测试程序
   - 实现六轴折线图显示
4. [profile: openapi.py]:多次合并遗留问题处理
5. templates文件夹组织架构调整
2024-07-17 10:09:06 +08:00
cf9d51b475 fix merge 2024-07-15 13:42:10 +08:00
f4a70a0034 Merge branch 'main' of gitea.rustle.cc:gitea/rokae into profile
fetch the newest codes of main
2024-07-15 13:34:09 +08:00
cdbe1c40c6 v0.1.9.3(2024/07/15)
1. [APIs: openapi.py]
   - иÄodbusl½Ó§°ܱ¨´í³ö½£¬ʹֻ֮Ôautomatic testҳÃÏʾ
   - 将该文件移动至toplevel,为后面扩展做准备
   - 修改heartbeat文件路径,使后续打包的时候更方便
2. [APIs: aio.py]:
   - 修改heartbeat文件路径,使后续打包的时候更方便
   - 修改write2textbox函数的打印逻辑,先判断网络相关
2024-07-15 13:32:55 +08:00
5ed38b4b2a delete template sheet 2024-07-13 16:51:18 +08:00
27877e2b64 fix merge 2024-07-13 16:46:21 +08:00
edda9defdd - 减速比rr数据源修改为configs.xlsx
4. 在current工程main函数增加 VelSet 100语句
2024-07-13 16:45:01 +08:00
485dffdd0b - 减速比rr数据源修改为configs.xlsx
4. 在current工程main函数增加 VelSet 100语句
2024-07-13 16:43:56 +08:00
d35858e14e v0.1.9.2(2024/07/13)
1. [APIs: do_current.py]
   - 删除多余的时序矫正语句——item['value'].reverse(),使输出的曲线为平滑的自然顺序
2. [current: current.py]
   - max功能计算逻辑矫正,应该是取绝对值的最大值
   - 整体梳理了trq/trqh的传递路径,现已修正完毕
2024-07-13 15:40:13 +08:00
718db9ec45 change version 2024-07-13 13:43:51 +08:00
d76ee3d223 1. [APIs: do_brake.py]
- 修改ready_to_go信号的接收逻辑,适配大负载机型
2. [APIs: do_current.py]
   - 修改ready_to_go信号的接收逻辑,适配大负载机型
   - 调整单轴测试时间为35s,适配大负载机型,调整堵转电流持续时间15s,适当减少测试时间
   - 将act信号置为False的动作放在初始化,增加程序健壮性
   - 修改所有输出文件的命名,在扩展名之前加入时间戳
2024-07-13 13:40:04 +08:00
a66a55bcd3 v0.1.9.2(2024/07/13)
1. [APIs: do_brake.py]
   - 修改ready_to_go信号的接收逻辑,适配大负载机型
2. [APIs: do_current.py]
   - 修改ready_to_go信号的接收逻辑,适配大负载机型
   - 调整单轴测试时间为35s,适配大负载机型,调整堵转电流持续时间15s,适当减少测试时间
   - 将act信号置为False的动作放在初始化,增加程序健壮性
2024-07-13 11:36:35 +08:00
fe27dbf91f fix merge 2024-07-12 10:50:32 +08:00
dee07b77bb v0.1.9.1(2024/07/12)
1. [APIs: do_brake.py]
   - 修改正负方向拍急停的逻辑,基本原理为:运行之前发送正负方向信号pon给RL,RL根据信号以及速度正负号运作
   - 由于上述修改,正负方向急停准确率可达100%
2. [APIs: aio.py]
   - 修改write2textbox的输出逻辑,实现更加灵活的自定义输出,同时修改相关部分
3. [APIs: openapi.py]
   - modbus类新增指示政府方向急停的信号pon,将modbus类入参中的tab_name删除,并修改tab_name的值为'openapi'
   - socket类种修改tab_name的值为'openapi'
2024-07-12 10:48:50 +08:00
70e6c269fe v0.1.9.1(2024/07/12)
1. [APIs: do_brake.py]
   - 修改正负方向拍急停的逻辑,基本原理为:运行之前发送正负方向信号pon给RL,RL根据信号以及速度正负号运作
   - 由于上述修改,正负方向急停准确率可达100%
2. [APIs: aio.py]
   - 修改write2textbox的输出逻辑,实现更加灵活的自定义输出,同时修改相关部分
3. [APIs: openapi.py]
   - modbus类新增指示政府方向急停的信号pon,将modbus类入参中的tab_name删除,并修改tab_name的值为'openapi'
   - socket类种修改tab_name的值为'openapi'
2024-07-12 10:46:59 +08:00
71b2d9d42e pending dev 2024-07-11 19:09:08 +08:00
40 changed files with 669 additions and 178 deletions

3
.gitignore vendored
View File

@ -7,4 +7,5 @@ aio/venv
aio/__pycache__/
aio/code/automatic_test/__pycache__/
aio/code/data_process/__pycache__/
aio/assets/templates/c_msg.log
aio/assets/templates/c_msg.log
aio/code/durable_action/__pycache__/

View File

@ -132,13 +132,16 @@ pyinstaller --noconfirm --onedir --windowed --add-data "C:/Users/Administrator/A
> **需要注意的点**
1. 使用之前需要手动修改点位信息,确保所有点位不会发生撞击之后,再进行自动化测试
1. 【重要】使用之前需要手动修改!!负载信息!!点位信息!!,确保所有点位不会发生撞击,确保所有程序正常运行之后,导出工程,再进行自动化测试
2. 工程文件不能手动重命名需要重命名存档可以导入HMI然后另存为
3. 务必正确填写configs.xlsx中的Target页面A1单元格可以选择正负方向急停但不完全保证100%大概有95%左右的准确度
4. 由于xCore系统问题运行过程中可能会出现机器人宕机问题如果遇到可以手动重启控制柜重新运行
5. 运行过程中,如果是因为机器问题无法达到额定百分比速度,会在日志输出框提示,注意观察
6. 运行自动化程序之前,确保机器处于正常状态,无故障,未触发急停
7. 需要额外硬件接线详细参考configs.xlsx中急停接线图sheet页
8. 注意观察二轴100%臂展时,是否可以获取到正确的数据
9. 将autotest.xml导入到寄存器并新建一个modbus命名为autotest
10. 针对五轴机型六轴数据可以填写1-5轴任意一轴的点位信息
#### 6) 电机电流自动化测试
@ -450,4 +453,61 @@ v0.1.8.2(2024/07/08)
v0.1.9.0(2024/07/10)
1. 完成了制动性能的自动化采集
2. 完善了modbus浮点数读写相关的功能
3. 修改了target.zip工程该工程目前适配电机电流和制动性能
3. 修改了target.zip工程该工程目前适配电机电流和制动性能
v0.1.9.1(2024/07/12)
1. [APIs: do_brake.py]
- 修改正负方向拍急停的逻辑基本原理为运行之前发送正负方向信号pon给RLRL根据信号以及速度正负号运作
- 由于上述修改正负方向急停准确率可达100%
2. [APIs: aio.py]
- 修改write2textbox的输出逻辑实现更加灵活的自定义输出同时修改相关部分
3. [APIs: openapi.py]
- modbus类新增指示政府方向急停的信号pon将modbus类入参中的tab_name删除并修改tab_name的值为'openapi'
- socket类种修改tab_name的值为'openapi'
v0.1.9.2(2024/07/13)
1. [APIs: do_brake.py]
- 修改ready_to_go信号的接收逻辑适配大负载机型
2. [APIs: do_current.py]
- 修改ready_to_go信号的接收逻辑适配大负载机型
- 调整单轴测试时间为35s适配大负载机型调整堵转电流持续时间15s适当减少测试时间
- 将act信号置为False的动作放在初始化增加程序健壮性
- 修改所有输出文件的命名,在扩展名之前加入时间戳
- 删除多余的时序矫正语句——item['value'].reverse(),使输出的曲线为平滑的自然顺序
3. [current: current.py]
- 在find_point函数种当无法找到正确点位时继续执行而不是直接终止执行
- max功能计算逻辑矫正应该是取绝对值的最大值
- 整体梳理了trq/trqh的传递路径现已修正完毕
- 减速比rr数据源修改为configs.xlsx
4. 在current工程main函数增加 VelSet 100语句
v0.1.9.3(2024/07/15)
1. [APIs: openapi.py]
- 修改modbus连接失败报错输出形式使之只在automatic test页面显示
- 将该文件移动至toplevel为后面扩展做准备
- 修改heartbeat文件路径使后续打包的时候更方便
2. [APIs: aio.py]
- 修改heartbeat文件路径使后续打包的时候更方便
- 修改write2textbox函数的打印逻辑先判断网络相关
v0.1.9.4(2024/07/15)
1. [profile: aio.py]完善durable text相关逻辑
2. [profile: do_brake/do_current/btn_functions.py]删除validate_resp函数修改execution函数
3. [profile: factory_test.py]
- 新增耐久/老化测试程序
- 实现六轴折线图显示
4. [profile: openapi.py]:多次合并遗留问题处理
5. templates文件夹组织架构调整
v0.2.0.0(2024/07/17)
1. [profile: aio.py]
- 增加velocity相关逻辑
- 修改负载信息为曲线信息
2. [profile: factory_test.py]
- 增加velocity相关逻辑
3. [profile: current.py]
- 修正减速比获取的规则
4. [profile: openapi.py]
- HmiRequest模块日志取消记录move.monitor相关
- HmiRequest模块增加了durable_lock变量控制文件读写互斥

View File

@ -943,6 +943,7 @@
<c name="type" type="10" value="float"/>
<c name="value"/>
<c name="value_single" type="10" value="0"/>
<c name="bias" type="2" value="0"/>
</l>
<l>
<c name="addr" type="2" value="41004"/>
@ -950,7 +951,7 @@
<c name="addr_2nd" type="2" value="0"/>
<c name="bit_bias" type="2" value="0"/>
<c name="byte_bias" type="4" value="0"/>
<c name="description" type="10" value="触发或者接触急停"/>
<c name="description" type="10" value="【弃用】触发或者接触急停"/>
<c name="dev_name" type="10" value="autotest"/>
<c name="dev_type" type="10" value="MODBUS"/>
<c name="end_addr" type="2" value="41004"/>
@ -961,7 +962,8 @@
<c name="rw" type="10" value="rd"/>
<c name="type" type="10" value="bool"/>
<c name="value"/>
<c name="value_single" type="10" value="true"/>
<c name="value_single" type="10" value=""/>
<c name="bias" type="2" value="0"/>
</l>
<l>
<c name="addr" type="2" value="41005"/>
@ -969,7 +971,7 @@
<c name="addr_2nd" type="2" value="0"/>
<c name="bit_bias" type="2" value="0"/>
<c name="byte_bias" type="4" value="0"/>
<c name="description" type="10" value=""/>
<c name="description" type="10" value="pc to robot指定工况下的速度值"/>
<c name="dev_name" type="10" value="autotest"/>
<c name="dev_type" type="10" value="MODBUS"/>
<c name="end_addr" type="2" value="41006"/>
@ -980,7 +982,8 @@
<c name="rw" type="10" value="rd"/>
<c name="type" type="10" value="float"/>
<c name="value"/>
<c name="value_single" type="10" value="0"/>
<c name="value_single" type="10" value=""/>
<c name="bias" type="2" value="0"/>
</l>
<l>
<c name="addr" type="2" value="41007"/>
@ -988,7 +991,7 @@
<c name="addr_2nd" type="2" value="0"/>
<c name="bit_bias" type="2" value="0"/>
<c name="byte_bias" type="4" value="0"/>
<c name="description" type="10" value=""/>
<c name="description" type="10" value="robot to pc已触发急停标志"/>
<c name="dev_name" type="10" value="autotest"/>
<c name="dev_type" type="10" value="MODBUS"/>
<c name="end_addr" type="2" value="41007"/>
@ -999,7 +1002,8 @@
<c name="rw" type="10" value="rdwr"/>
<c name="type" type="10" value="bool"/>
<c name="value"/>
<c name="value_single" type="10" value="true"/>
<c name="value_single" type="10" value=""/>
<c name="bias" type="2" value="0"/>
</l>
<l>
<c name="addr" type="2" value="41008"/>
@ -1007,7 +1011,7 @@
<c name="addr_2nd" type="2" value="0"/>
<c name="bit_bias" type="2" value="0"/>
<c name="byte_bias" type="4" value="0"/>
<c name="description" type="10" value=""/>
<c name="description" type="10" value="pc to robot轴的位置"/>
<c name="dev_name" type="10" value="autotest"/>
<c name="dev_type" type="10" value="MODBUS"/>
<c name="end_addr" type="2" value="41009"/>
@ -1019,6 +1023,7 @@
<c name="type" type="10" value="int32"/>
<c name="value"/>
<c name="value_single" type="10" value=""/>
<c name="bias" type="2" value="0"/>
</l>
<l>
<c name="addr" type="2" value="41010"/>
@ -1026,7 +1031,7 @@
<c name="addr_2nd" type="2" value="0"/>
<c name="bit_bias" type="2" value="0"/>
<c name="byte_bias" type="4" value="0"/>
<c name="description" type="10" value=""/>
<c name="description" type="10" value="pc to robot急停开始标志"/>
<c name="dev_name" type="10" value="autotest"/>
<c name="dev_type" type="10" value="MODBUS"/>
<c name="end_addr" type="2" value="41010"/>
@ -1038,5 +1043,25 @@
<c name="type" type="10" value="bool"/>
<c name="value"/>
<c name="value_single" type="10" value=""/>
<c name="bias" type="2" value="0"/>
</l>
<l>
<c name="addr" type="2" value="41011"/>
<c name="addr_1st" type="2" value="0"/>
<c name="addr_2nd" type="2" value="0"/>
<c name="bit_bias" type="2" value="0"/>
<c name="byte_bias" type="4" value="0"/>
<c name="description" type="10" value="pc to robot指示正负方向拍急停"/>
<c name="dev_name" type="10" value="autotest"/>
<c name="dev_type" type="10" value="MODBUS"/>
<c name="end_addr" type="2" value="41011"/>
<c name="function" type="10" value=""/>
<c name="len" type="2" value="1"/>
<c name="name" type="10" value="pon"/>
<c name="retain" type="1" value="false"/>
<c name="rw" type="10" value="rd"/>
<c name="type" type="10" value="bool"/>
<c name="value"/>
<c name="value_single" type="10" value=""/>
</l>
</m>

Binary file not shown.

View File

@ -6,8 +6,8 @@ VSVersionInfo(
ffi=FixedFileInfo(
# filevers and prodvers should be always a tuple with four items: (1, 2, 3, 4)
# Set not needed items to zero 0.
filevers=(0, 1, 9, 0),
prodvers=(0, 1, 9, 0),
filevers=(0, 2, 0, 0),
prodvers=(0, 2, 0, 0),
# Contains a bitmask that specifies the valid bits 'flags'r
mask=0x3f,
# Contains a bitmask that specifies the Boolean attributes of the file.
@ -31,12 +31,12 @@ VSVersionInfo(
'040904b0',
[StringStruct('CompanyName', 'Rokae - https://www.rokae.com/'),
StringStruct('FileDescription', 'All in one automatic toolbox'),
StringStruct('FileVersion', '0.1.9.0 (2024-07-10)'),
StringStruct('FileVersion', '0.2.0.0 (2024-07-17)'),
StringStruct('InternalName', 'AIO.exe'),
StringStruct('LegalCopyright', '© 2024-2024 Manford Fan'),
StringStruct('OriginalFilename', 'AIO.exe'),
StringStruct('ProductName', 'AIO'),
StringStruct('ProductVersion', '0.1.9.0 (2024-07-10)')])
StringStruct('ProductVersion', '0.2.0.0 (2024-07-17)')])
]),
VarFileInfo([VarStruct('Translation', [1033, 1200])])
]

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -1 +1 @@
0.1.9.0 @ 07/10/2024
0.2.0.0 @ 07/17/2024

View File

@ -7,16 +7,43 @@ import customtkinter
from time import time, strftime, localtime, sleep
from urllib.request import urlopen
from socket import setdefaulttimeout
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from data_process import *
from automatic_test import *
from durable_action import *
import openapi
import matplotlib.pyplot as plt
import matplotlib
import pandas as pd
current_path = dirname(__file__)
matplotlib.use('Agg')
heartbeat = f'{dirname(__file__)}/../assets/templates/heartbeat'
durable_data_current_xlsx = f'{dirname(__file__)}/../assets/templates/durable/durable_data_current.xlsx'
durable_data_velocity_xlsx = f'{dirname(__file__)}/../assets/templates/durable/durable_data_velocity.xlsx'
customtkinter.set_appearance_mode("System") # Modes: "System" (standard), "Dark", "Light"
customtkinter.set_default_color_theme("blue") # Themes: "blue" (standard), "green", "dark-blue"
customtkinter.set_widget_scaling(1.1) # widget dimensions and text size
customtkinter.set_window_scaling(1.1) # window geometry dimensions
setdefaulttimeout(3)
# global vars
durable_data_current = {
'time': list(range(1, 26)),
'axis1': [0 for _ in range(25)],
'axis2': [0 for _ in range(25)],
'axis3': [0 for _ in range(25)],
'axis4': [0 for _ in range(25)],
'axis5': [0 for _ in range(25)],
'axis6': [0 for _ in range(25)],
}
durable_data_velocity = {
'time': list(range(1, 26)),
'axis1': [0 for _ in range(25)],
'axis2': [0 for _ in range(25)],
'axis3': [0 for _ in range(25)],
'axis4': [0 for _ in range(25)],
'axis5': [0 for _ in range(25)],
'axis6': [0 for _ in range(25)],
}
btns_func = {
'start': {'btn': '', 'row': 1, 'text': '开始运行'},
'check': {'btn': '', 'row': 2, 'text': '检查参数'},
@ -35,6 +62,10 @@ widgits_at = {
'path': {'label': '', 'entry': '', 'row': 2, 'col': 2, 'text': '数据文件夹路径'},
'loadsel': {'label': '', 'optionmenu': '', 'row': 2, 'col': 1, 'text': '负载信息'},
}
widgits_da = {
'path': {'label': '', 'entry': '', 'row': 1, 'col': 2, 'text': '数据文件夹路径'},
'curvesel': {'label': '', 'optionmenu': '', 'row': 1, 'col': 1, 'text': '曲线选择'},
}
class App(customtkinter.CTk):
@ -43,8 +74,11 @@ class App(customtkinter.CTk):
self.my_font = customtkinter.CTkFont(family="Consolas", size=16, weight="bold")
self.w_param = 84
self.hr = None
self.md_at = None
self.md_dp = None
self.md = None
self.canvas = None
self.flg = 0
self.df_copy = None
self.old_curve = None
# =====================================================================
# configure window
self.title("AIO - All in one automatic toolbox")
@ -71,7 +105,7 @@ class App(customtkinter.CTk):
btns_func['log']['btn'].configure(command=lambda: self.thread_it(self.func_log_callback))
btns_func['end']['btn'].configure(command=lambda: self.thread_it(self.func_end_callback))
# create version info
self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.1.9.0\nDate: 07/10/2024", font=self.my_font, text_color="#4F4F4F")
self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.2.0.0\nDate: 07/17/2024", font=self.my_font, text_color="#4F4F4F")
self.frame_func.rowconfigure(6, weight=1)
self.label_version.grid(row=6, column=0, padx=20, pady=20, sticky='s')
# =====================================================================
@ -80,6 +114,7 @@ class App(customtkinter.CTk):
self.tabview.grid(row=0, column=1, padx=10, pady=5, sticky="nsew")
self.tabview.add("Data Process")
self.tabview.add("Automatic Test")
self.tabview.add("Durable Action")
# create main menu for data process
self.menu_main_dp = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'), values=["init", "brake", "current", "iso", "wavelogger"], font=self.my_font, text_color='yellow', button_color='red', fg_color='green', command=self.func_main_callback)
self.menu_main_dp.grid(row=1, column=1, sticky='we', padx=5, pady=10)
@ -115,7 +150,7 @@ class App(customtkinter.CTk):
# For automatic test tab START =====================================================================
# create buttons
self.seg_button = customtkinter.CTkSegmentedButton(self.tabview.tab('Automatic Test'), dynamic_resizing=False, font=customtkinter.CTkFont(size=16, weight='bold'), command=lambda value='机器状态': self.thread_it(self.segmented_button_callback))
self.seg_button.grid(row=1, column=2, columnspan=12, padx=(20, 10), pady=(10, 10), sticky="ew")
self.seg_button.grid(row=1, column=2, columnspan=12, padx=(65, 10), pady=(10, 10), sticky="ew")
self.seg_button.configure(dynamic_resizing=False, values=["功能切换", "触发急停", "恢复急停", "待定功能", "功能待定", "机器状态", "告警信息"])
self.seg_button.set("功能切换")
# create progress bar
@ -127,7 +162,7 @@ class App(customtkinter.CTk):
for widgit in widgits_at:
if widgit == 'path':
widgits_at[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Automatic Test'), text=f'{widgit.upper()}', font=self.my_font)
widgits_at[widgit]['label'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col'], sticky='e', pady=5)
widgits_at[widgit]['label'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col'], sticky='e', padx=(20, 5), pady=5)
widgits_at[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Automatic Test'), width=670, placeholder_text=widgits_at[widgit]['text'], font=self.my_font)
widgits_at[widgit]['entry'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=5, sticky='we')
widgits_at[widgit]['entry'].configure(state='disabled')
@ -137,6 +172,23 @@ class App(customtkinter.CTk):
widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
widgits_at[widgit]['optionmenu'].configure(state='disabled')
# For automatic test tab END =====================================================================
# For durable_action tab START =====================================================================
# create progress bar
self.progressbar_da = customtkinter.CTkProgressBar(self.tabview.tab('Durable Action'))
self.progressbar_da.grid(row=2, column=1, padx=5, pady=5, sticky="ew")
self.progressbar_da.configure(mode="determinnate", width=self.w_param)
self.progressbar_da.start()
for widgit in widgits_da:
if widgit == 'path':
widgits_da[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Durable Action'), text=f'{widgit.upper()}', font=self.my_font)
widgits_da[widgit]['label'].grid(row=widgits_da[widgit]['row'], column=widgits_da[widgit]['col'], sticky='e', padx=(20, 5), pady=10)
widgits_da[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Durable Action'), width=670, placeholder_text=widgits_da[widgit]['text'], font=self.my_font)
widgits_da[widgit]['entry'].grid(row=widgits_da[widgit]['row'], column=widgits_da[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=10, sticky='we')
elif widgit in ['curvesel']:
widgits_da[widgit]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Durable Action'), button_color='#708090', fg_color='#778899', values=["device_servo_trq_feedback", "hw_joint_vel_feedback"], font=self.my_font)
widgits_da[widgit]['optionmenu'].grid(row=widgits_da[widgit]['row'], column=widgits_da[widgit]['col'], padx=5, pady=10, sticky='we')
widgits_da[widgit]['optionmenu'].set(widgits_da[widgit]['text'])
# For durable_action tab END =====================================================================
# create textbox
self.textbox = customtkinter.CTkTextbox(self, wrap='none', font=customtkinter.CTkFont(family="consolas", size=14), text_color="blue", fg_color='#E9E9E9', border_width=2, border_color='#CDCDCD', border_spacing=5)
self.textbox.grid(row=6, column=1, columnspan=13, ipadx=10, ipady=10, padx=10, pady=(5, 10), sticky='nsew')
@ -154,6 +206,53 @@ class App(customtkinter.CTk):
tkinter.messagebox.showwarning(title="版本更新", message="连接服务器失败,无法确认当前是否是最新版本......")
# functions below ↓ ----------------------------------------------------------------------------------------
def create_canvas(self, figure):
self.canvas = FigureCanvasTkAgg(figure, self)
self.canvas.draw()
self.canvas.get_tk_widget().configure(height=640)
self.canvas.get_tk_widget().grid(row=3, column=1, rowspan=3, columnspan=13, padx=20, pady=10, sticky="nsew")
def create_plot(self):
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['figure.dpi'] = 100
plt.rcParams['font.size'] = 14
plt.rcParams['lines.marker'] = 'o'
curvesel = widgits_da['curvesel']['optionmenu'].get()
while True:
if not self.hr.durable_lock:
self.hr.durable_lock = 1
if curvesel == 'device_servo_trq_feedback':
df = pd.read_excel(durable_data_current_xlsx)
_title = 'device_servo_trq_feedback'
elif curvesel == 'hw_joint_vel_feedback':
_title = 'hw_joint_vel_feedback'
df = pd.read_excel(durable_data_velocity_xlsx)
else:
_title = 'device_servo_trq_feedback'
df = pd.read_excel(durable_data_current_xlsx)
self.hr.durable_lock = 0
break
else:
sleep(1)
if not df.equals(self.df_copy) or self.flg == 0 or curvesel != self.old_curve:
self.flg = 1
self.df_copy = df.copy()
self.old_curve = widgits_da['curvesel']['optionmenu'].get()
figure = plt.figure(frameon=True, facecolor='#E9E9E9')
plt.subplots_adjust(left=0.04, right=0.98, bottom=0.05, top=0.95)
ax = figure.add_subplot(1, 1, 1)
df.plot(grid=True, x='time', y='axis1', ax=ax)
df.plot(grid=True, x='time', y='axis2', ax=ax)
df.plot(grid=True, x='time', y='axis3', ax=ax)
df.plot(grid=True, x='time', y='axis4', ax=ax)
df.plot(grid=True, x='time', y='axis5', ax=ax)
df.plot(grid=True, x='time', y='axis6', ax=ax, title=_title, legend='upper left')
self.create_canvas(figure)
def thread_it(self, func, *args):
""" 将函数打包进线程 """
self.myThread = Thread(target=func, args=args)
@ -167,7 +266,7 @@ class App(customtkinter.CTk):
self.seg_button.configure(state='disabled')
# self.tabview.configure(state='disabled')
self.textbox.delete(index1='1.0', index2='end')
with open(f'{current_path}/../assets/templates/heartbeat', 'r', encoding='utf-8') as f_h:
with open(heartbeat, 'r', encoding='utf-8') as f_h:
c_state = f_h.read().strip()
if c_state == '0' and value != '功能切换':
@ -175,26 +274,33 @@ class App(customtkinter.CTk):
else:
for _func in _btn_funcs:
if _btn_funcs[_func] == value:
btn_functions.main(self.hr, self.md_at, _func, self.write2textbox)
btn_functions.main(self.hr, self.md, _func, self.write2textbox)
break
self.seg_button.configure(state='normal')
# self.tabview.configure(state='normal')
def detect_network(self):
with open(f"{current_path}/../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
df = pd.DataFrame(durable_data_current)
df.to_excel(durable_data_current_xlsx, index=False)
df = pd.DataFrame(durable_data_velocity)
df.to_excel(durable_data_velocity_xlsx, index=False)
with open(heartbeat, "w", encoding='utf-8') as f_hb:
f_hb.write('0')
self.hr = openapi.HmiRequest(self.write2textbox)
self.md_at = openapi.ModbusRequest(self, 'Automatic Test')
self.md_dp = openapi.ModbusRequest(self, 'Data Process')
self.md = openapi.ModbusRequest(self.write2textbox)
while True:
with open(f'{current_path}/../assets/templates/heartbeat', 'r', encoding='utf-8') as f_hb:
if self.tabview.get() == 'Durable Action':
self.create_plot()
with open(heartbeat, 'r', encoding='utf-8') as f_hb:
c_state = f_hb.read().strip()
pb_color = 'green' if c_state == '1' else 'red'
self.progressbar.configure(progress_color=pb_color)
self.progressbar_da.configure(progress_color=pb_color)
if c_state == '0':
# self.textbox.delete(index1='1.0', index2='end')
self.hr.t_bool = False
sleep(3)
del self.hr
@ -203,17 +309,24 @@ class App(customtkinter.CTk):
def tabview_click(self):
self.initialization()
# self.textbox.delete(index1='1.0', index2='end')
tab_name = self.tabview.get()
if tab_name == 'Data Process':
self.flg = 0
self.menu_main_dp.set("Start Here!")
elif tab_name == 'Automatic Test':
self.flg = 0
self.menu_main_at.set("Start Here!")
self.seg_button.configure(state='normal')
elif tab_name == 'Durable Action':
pass
def initialization(self):
tab_name = self.tabview.get()
try:
self.canvas.get_tk_widget().grid_forget()
except:
pass
self.textbox.delete(index1='1.0', index2='end')
if tab_name == 'Data Process':
for widgit in widgits_dp:
@ -231,7 +344,7 @@ class App(customtkinter.CTk):
self.menu_sub_dp.grid_forget()
elif tab_name == 'Automatic Test':
for widgit in widgits_at:
if widgit in ['path', 'av1', 'av2', 'av3', 'av4', 'av5', 'av6', 'rc1', 'rc2', 'rc3', 'rc4', 'rc5', 'rc6', 'rr1', 'rr2', 'rr3', 'rr4', 'rr5', 'rr6']:
if widgit in ['path', ]:
widgits_at[widgit]['label'].configure(text=f'{widgit.upper()}', text_color='black')
widgits_at[widgit]['entry'].delete(0, tkinter.END)
widgits_at[widgit]['entry'].configure(placeholder_text=widgits_at[widgit]['text'], state='normal')
@ -241,6 +354,15 @@ class App(customtkinter.CTk):
widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
widgits_at[widgit]['optionmenu'].configure(state='disabled')
self.seg_button.set("功能切换")
elif tab_name == 'Durable Action':
for widgit in widgits_da:
if widgit in ['path', ]:
widgits_da[widgit]['label'].configure(text=f'{widgit.upper()}', text_color='black')
widgits_da[widgit]['entry'].delete(0, tkinter.END)
widgits_da[widgit]['entry'].configure(placeholder_text=widgits_at[widgit]['text'], state='normal')
elif widgit in ['curvesel']:
widgits_da[widgit]['optionmenu'].configure(state='normal')
widgits_da[widgit]['optionmenu'].set(widgits_da[widgit]['text'])
def func_main_callback(self, func_name):
self.initialization()
@ -351,7 +473,21 @@ class App(customtkinter.CTk):
self.textbox.insert(index='end', text=text + '\n', tags=color)
self.textbox.update()
self.textbox.see('end')
elif tab_name == 'openapi' and tab_name_cur == 'Automatic Test' or tab_name_cur == 'Durable Action':
if wait != 0:
self.textbox.insert(index='end', text=text, tags=color)
self.textbox.update()
self.textbox.see('end')
elif exitcode != 0:
self.textbox.insert(index='end', text=text + '\n', tags=color)
self.textbox.update()
self.textbox.see('end')
raise Exception(f"Error code: {exitcode}")
else:
self.textbox.insert(index='end', text=text + '\n', tags=color)
self.textbox.update()
self.textbox.see('end')
def is_float(self, flag, *args):
for item in args:
try:
@ -443,12 +579,24 @@ class App(customtkinter.CTk):
return 0, 0
else:
return 0, 0
elif tab_name == 'Durable Action':
path = widgits_da['path']['entry'].get().strip()
curvesel = widgits_da['curvesel']['optionmenu'].get()
c1 = exists(path)
c2 = curvesel in ['device_servo_trq_feedback', 'hw_joint_vel_feedback']
if c1 and c2:
return 7, path, curvesel
else:
return 0, 0
def func_start_callback(self):
self.textbox.delete(index1='1.0', index2='end')
flag, *args = self.check_param()
func_dict = {1: brake.main, 2: current.main, 3: iso.main, 4: wavelogger.main, 5: do_brake.main, 6: do_current.main}
func_dict = {
1: brake.main, 2: current.main, 3: iso.main, 4: wavelogger.main, 5: do_brake.main, 6: do_current.main,
7: factory_test.main
}
if flag == 1:
func_dict[flag](path=args[0], vel=args[1], trq=args[2], estop=args[3], w2t=self.write2textbox)
elif flag == 2:
@ -459,10 +607,13 @@ class App(customtkinter.CTk):
func_dict[flag](path=args[0], w2t=self.write2textbox)
elif flag == 5:
self.pre_warning()
func_dict[flag](path=args[0], hr=self.hr, md=self.md_at, loadsel=args[1], w2t=self.write2textbox)
func_dict[flag](path=args[0], hr=self.hr, md=self.md, loadsel=args[1], w2t=self.write2textbox)
elif flag == 6:
self.pre_warning()
func_dict[flag](path=args[0], hr=self.hr, md=self.md_at, loadsel=args[1], w2t=self.write2textbox)
func_dict[flag](path=args[0], hr=self.hr, md=self.md, loadsel=args[1], w2t=self.write2textbox)
elif flag == 7:
self.pre_warning()
func_dict[flag](path=args[0], hr=self.hr, md=self.md, w2t=self.write2textbox)
else:
tkinter.messagebox.showerror(title="参数错误", message="请检查对应参数是否填写正确!", )

View File

@ -1 +1 @@
__all__ = ['openapi', 'btn_functions', 'do_brake', 'do_current']
__all__ = ['btn_functions', 'do_brake', 'do_current']

View File

@ -2,18 +2,6 @@ from json import loads
from sys import argv
def validate_resp(_id, response, w2t):
match _id:
case 'DATA ERR':
w2t(f"数据处理错误,需要确认", 0, 4, 'red', tab_name='Automatic Test')
case 'DATA READ ERR':
w2t(f"无法读取数据,需要确认", 0, 3, 'red', tab_name='Automatic Test')
case 'NOT SUPPORT':
w2t(f"不支持的功能,需要确认", 0, 2, 'red', tab_name='Automatic Test')
if not response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
def execution(cmd, hr, w2t, **kwargs):
_id = hr.execution(cmd, **kwargs)
_msg = hr.get_from_id(_id)
@ -21,7 +9,8 @@ def execution(cmd, hr, w2t, **kwargs):
w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name='Automatic Test')
else:
_response = loads(_msg)
validate_resp(_id, _response, w2t)
if not _response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
return _response

View File

@ -101,18 +101,6 @@ def prj_to_xcore(prj_file):
ssh.close()
def validate_resp(_id, response, w2t):
match _id:
case 'DATA ERR':
w2t(f"数据处理错误,需要确认", 0, 4, 'red', tab_name='Automatic Test')
case 'DATA READ ERR':
w2t(f"无法读取数据,需要确认", 0, 3, 'red', tab_name='Automatic Test')
case 'NOT SUPPORT':
w2t(f"不支持的功能,需要确认", 0, 2, 'red', tab_name='Automatic Test')
if not response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
def execution(cmd, hr, w2t, **kwargs):
_id = hr.execution(cmd, **kwargs)
_msg = hr.get_from_id(_id)
@ -120,7 +108,8 @@ def execution(cmd, hr, w2t, **kwargs):
w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name='Automatic Test')
else:
_response = loads(_msg)
validate_resp(_id, _response, w2t)
if not _response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
return _response
@ -168,6 +157,13 @@ def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
]
wb = load_workbook(config_file, read_only=True)
ws = wb['Target']
if ws.cell(row=1, column=1).value == 'positive':
md.write_pon(True)
elif ws.cell(row=1, column=1).value == 'negative':
md.write_pon(False)
else:
w2t("configs.xlsx中Target页面A1单元格填写不正确检查后重新运行...", 0, 111, 'red', 'Automatic Test')
for condition in result_dirs:
_reach = condition.split('_')[0].removeprefix('reach')
_load = condition.split('_')[1].removeprefix('load')
@ -196,17 +192,17 @@ def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019')
if ws.cell(row=1, column=1).value == 'positive':
_rl_cmd = f"brake_E(j{axis}_{_reach}_n, j{axis}_{_reach}_p, p_speed, p_tool)"
elif ws.cell(row=1, column=1).value == 'negative':
_rl_cmd = f"brake_E(j{axis}_{_reach}_p, j{axis}_{_reach}_n, p_speed, p_tool)"
elif ws.cell(row=1, column=1).value == 'negative':
_rl_cmd = f"brake_E(j{axis}_{_reach}_n, j{axis}_{_reach}_p, p_speed, p_tool)"
else:
w2t("configs.xlsx中Target页面A1单元格填写不正确检查后重新运行...", 0, 111, 'red', 'Automatic Test')
_rl_speed = f"VelSet {_speed}"
cmd = 'cd /home/luoshi/bin/controller/; '
cmd += 'sudo sed -i "/brake_E/d" projects/target/_build/brake/main.mod; '
cmd += f'sudo sed -i "/DONOTDELETE/i {_rl_cmd}" projects/target/_build/brake/main.mod; '
cmd += f'sudo sed -i "/VelSet/d" projects/target/_build/brake/main.mod; '
cmd += f'sudo sed -i "/MoveAbsJ/i {_rl_speed}" projects/target/_build/brake/main.mod; '
cmd += 'sudo sed -i "/VelSet/d" projects/target/_build/brake/main.mod; '
cmd += f'sudo sed -i "/MoveAbsJ/i {_rl_speed}" projects/target/_build/brake/main.mod'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
@ -220,20 +216,21 @@ def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
_response = execution('state.switch_auto', hr, w2t)
_response = execution('state.switch_motor_on', hr, w2t)
_response = execution('rl_task.run', hr, w2t, tasks=['brake', 'stop0_related'])
for i in range(3):
_t_start = time()
while True:
if md.read_ready_to_go() == 1:
md.write_act(True)
break
else:
sleep(1)
else:
w2t("未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test')
if (time() - _t_start) // 20 > 1:
w2t("20s内未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test')
else:
sleep(1)
# 4. 第一次打开诊断曲线并执行采集8s之后触发软急停关闭曲线采集找出最大速度传递给RL程序最后清除相关记录
if count == 1:
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
sleep(8) # 前秒获取实际最大速度
sleep(10) # 前10秒获取实际最大速度
md.trigger_estop()
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
@ -277,7 +274,7 @@ def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
# sleep(randint(3, 6))
sleep(randint(3, 6))
md.write_probe(True)
_t_start = time()
while True:
@ -288,8 +285,12 @@ def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
sleep(1) # 保证所有数据均已返回
break
else:
if (time() - _t_start) // 60 > 1:
w2t(f"规定时间内未找到合适的点触发急停需要确认RL/Python程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test')
if (time() - _t_start) > 30:
w2t(f"30s内未触发急停该条数据无效需要确认RL/Python程序编写正确并正常执行,或者判别是否是机器本体问题...", 0, 0, 'red', 'Automatic Test')
md.write_probe(False)
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
sleep(1) # 保证所有数据均已返回
break
else:
sleep(1)

View File

@ -86,18 +86,6 @@ def prj_to_xcore(prj_file):
ssh.close()
def validate_resp(_id, response, w2t):
match _id:
case 'DATA ERR':
w2t(f"数据处理错误,需要确认", 0, 4, 'red', tab_name='Automatic Test')
case 'DATA READ ERR':
w2t(f"无法读取数据,需要确认", 0, 3, 'red', tab_name='Automatic Test')
case 'NOT SUPPORT':
w2t(f"不支持的功能,需要确认", 0, 2, 'red', tab_name='Automatic Test')
if not response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
def execution(cmd, hr, w2t, **kwargs):
_id = hr.execution(cmd, **kwargs)
_msg = hr.get_from_id(_id)
@ -105,7 +93,8 @@ def execution(cmd, hr, w2t, **kwargs):
w2t(f"无法获取{_id}请求的响应信息", 0, 7, 'red', tab_name='Automatic Test')
else:
_response = loads(_msg)
validate_resp(_id, _response, w2t)
if not _response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
return _response
@ -120,16 +109,14 @@ def data_proc_regular(path, filename, channel, scenario_time):
for item in data:
item['value'].reverse()
if item.get('channel', None) == channel and item.get('name', None) == 'hw_joint_vel_feedback':
item['value'].reverse()
_d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == channel and item.get('name', None) == 'device_servo_trq_feedback':
item['value'].reverse()
_d2d_trq['device_servo_trq_feedback'].extend(item['value'])
df1 = pandas.DataFrame.from_dict(_d2d_vel)
df2 = pandas.DataFrame.from_dict(_d2d_trq)
df = pandas.concat([df1, df2], axis=1)
_filename = f'{path}\\single\\j{channel+1}_single.data'
_filename = f'{path}\\single\\j{channel+1}_single_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
elif channel in list(range(6, 9)):
with open(filename, 'r', encoding='utf-8') as f_obj:
@ -178,37 +165,37 @@ def data_proc_regular(path, filename, channel, scenario_time):
df_01 = pandas.DataFrame.from_dict(_d2d_vel_0)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_0)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j1_s_{channel-5}_{scenario_time}.data'
_filename = f'{path}\\s_{channel-5}\\j1_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = pandas.DataFrame.from_dict(_d2d_vel_1)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_1)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j2_s_{channel-5}_{scenario_time}.data'
_filename = f'{path}\\s_{channel-5}\\j2_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = pandas.DataFrame.from_dict(_d2d_vel_2)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_2)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j3_s_{channel-5}_{scenario_time}.data'
_filename = f'{path}\\s_{channel-5}\\j3_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = pandas.DataFrame.from_dict(_d2d_vel_3)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_3)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j4_s_{channel-5}_{scenario_time}.data'
_filename = f'{path}\\s_{channel-5}\\j4_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = pandas.DataFrame.from_dict(_d2d_vel_4)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_4)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j5_s_{channel-5}_{scenario_time}.data'
_filename = f'{path}\\s_{channel-5}\\j5_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = pandas.DataFrame.from_dict(_d2d_vel_5)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_5)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j6_s_{channel-5}_{scenario_time}.data'
_filename = f'{path}\\s_{channel-5}\\j6_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
elif channel in list(range(9, 15)):
with open(filename, 'r', encoding='utf-8') as f_obj:
@ -227,7 +214,7 @@ def data_proc_regular(path, filename, channel, scenario_time):
df1 = pandas.DataFrame.from_dict(_d2d_vel)
df2 = pandas.DataFrame.from_dict(_d2d_trq)
df = pandas.concat([df1, df2], axis=1)
_filename = f'{path}\\single\\j{channel-8}_hold.data'
_filename = f'{path}\\single\\j{channel-8}_hold_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
@ -248,7 +235,7 @@ def data_proc_inertia(path, filename, channel):
df1 = pandas.DataFrame.from_dict(_d2d_vel)
df2 = pandas.DataFrame.from_dict(_d2d_trq)
df = pandas.concat([df1, df2], axis=1)
_filename = f'{path}\\inertia\\j{channel+4}_inertia.data'
_filename = f'{path}\\inertia\\j{channel+4}_inertia_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
@ -308,6 +295,8 @@ def run_rl(path, hr, md, loadsel, w2t):
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
md.trigger_estop()
md.reset_estop()
md.write_act(False)
sleep(1) # 让曲线彻底关闭
_response = execution('state.switch_manual', hr, w2t)
_response = execution('state.switch_motor_off', hr, w2t)
@ -334,19 +323,19 @@ def run_rl(path, hr, md, loadsel, w2t):
# 4. 开始运行程序单轴运行15s
_response = execution('rl_task.run', hr, w2t, tasks=['current'])
for i in range(3):
_t_start = time()
while True:
if md.read_ready_to_go() == 1:
md.write_act(True)
sleep(1)
md.write_act(False)
break
else:
sleep(1)
else:
w2t("未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test')
if (time() - _t_start) // 20 > 1:
w2t("20s内未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test')
else:
sleep(1)
# 5. 打开诊断曲线,并执行采集
sleep(7) # 保证程序已经运行起来,其实主要是为了保持电流的采集而设定
sleep(10) # 保证程序已经运行起来,其实主要是为了保持电流的采集而设定
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
display_pdo_params = [
{"name": "hw_joint_vel_feedback", "channel": 0},
@ -365,7 +354,9 @@ def run_rl(path, hr, md, loadsel, w2t):
]
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
scenario_time = 0
if number < 6 or number > 8:
if number < 6:
sleep(35)
elif number > 8:
sleep(15)
else:
_t_start = time()

View File

@ -79,7 +79,7 @@ def initialization(path, sub, w2t):
return data_files
def current_max(data_files, rcs, trqh, w2t):
def current_max(data_files, rcs, trq, w2t):
current = {1: [], 2: [], 3: [], 4: [], 5: [], 6: [], 7: []}
for data_file in data_files:
if data_file.endswith('.data'):
@ -93,8 +93,8 @@ def current_max(data_files, rcs, trqh, w2t):
axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j'))
rca = rcs[axis-1]
col = df.columns.values[trqh-1]
c_max = df[col].max()
col = df.columns.values[trq-1]
c_max = df[col].abs().max()
scale = 1 if data_file.endswith('.csv') else 1000
_ = abs(c_max/scale*rca)
@ -118,7 +118,7 @@ def current_max(data_files, rcs, trqh, w2t):
return current
def current_avg(data_files, rcs, trqh, w2t):
def current_avg(data_files, rcs, trq, w2t):
current = {1: [], 2: [], 3: [], 4: [], 5: [], 6: [], 7: []}
for data_file in data_files:
if data_file.endswith('.data'):
@ -132,7 +132,7 @@ def current_avg(data_files, rcs, trqh, w2t):
axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j'))
rca = rcs[axis-1]
col = df.columns.values[trqh - 1]
col = df.columns.values[trq-1]
c_std = df[col].std()
c_avg = df[col].mean()
@ -158,7 +158,7 @@ def current_avg(data_files, rcs, trqh, w2t):
return current
def current_cycle(dur, data_files, rcs, vel, trq, trqh, rpms, w2t):
def current_cycle(dur, data_files, rcs, rrs, vel, trq, trqh, rpms, w2t):
result = None
hold = []
single = []
@ -194,9 +194,9 @@ def current_cycle(dur, data_files, rcs, vel, trq, trqh, rpms, w2t):
pass
if dur == 0:
p_single(wb, single, vel, trq, rpms, w2t)
p_single(wb, single, vel, trq, rpms, rrs, w2t)
else:
p_scenario(wb, single, vel, trq, rpms, dur, w2t)
p_scenario(wb, single, vel, trq, rpms, rrs, dur, w2t)
w2t(f"正在保存文件 {result},需要 10s 左右", 1, 0, 'orange')
stop = 0
@ -237,7 +237,7 @@ def find_point(data_file, pos, flag, df, _row_s, _row_e, w2t, exitcode, threshol
w2t(f"[{pos}] {data_file}数据有误,需要检查,无法找到有效起始点或结束点...", 0, exitcode, 'red')
def p_single(wb, single, vel, trq, rpms, w2t):
def p_single(wb, single, vel, trq, rpms, rrs, w2t):
# 1. 先找到第一个速度为零的点,数据从后往前找,一开始就是零的情况不予考虑
# 2. 记录第一个点的位置,继续向前查找第二个速度为零的点,同理,一开始为零的点不予考虑
# 3. 记录第二个点的位置,并将其中的数据拷贝至对应位置
@ -251,7 +251,7 @@ def p_single(wb, single, vel, trq, rpms, w2t):
set_option("display.precision", 2)
if data_file.endswith('.data'):
df = read_csv(data_file, sep='\t')
rr = float(wb['统计'].cell(row=2, column=axis+1).value)
rr = rrs[axis-1]
addition = 180 / 3.1415926 * 60 / 360 * rr
elif data_file.endswith('.csv'):
df = read_csv(data_file, sep=',', encoding='gbk', header=8)
@ -268,6 +268,7 @@ def p_single(wb, single, vel, trq, rpms, w2t):
col_names = list(df.columns)
df_1 = df[col_names[vel-1]].multiply(rpm*addition)
df_2 = df[col_names[trq-1]].multiply(scale)
print(df_1.abs().max())
df = concat([df_1, df_2], axis=1)
_step = 5 if data_file.endswith('.csv') else 50
@ -318,7 +319,7 @@ def p_single(wb, single, vel, trq, rpms, w2t):
cell.value = None
def p_scenario(wb, single, vel, trq, rpms, dur, w2t):
def p_scenario(wb, single, vel, trq, rpms, rrs, dur, w2t):
for data_file in single:
cycle = 0.001
axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j'))
@ -330,7 +331,7 @@ def p_scenario(wb, single, vel, trq, rpms, dur, w2t):
set_option("display.precision", 2)
if data_file.endswith('.data'):
df = read_csv(data_file, sep='\t')
rr = float(wb['统计'].cell(row=2, column=axis+1).value)
rr = rrs[axis-1]
addition = 180 / 3.1415926 * 60 / 360 * rr
elif data_file.endswith('.csv'):
df = read_csv(data_file, sep=',', encoding='gbk', header=8)
@ -374,6 +375,7 @@ def get_configs(configfile, w2t):
_wb = load_workbook(configfile, read_only=True)
_ws = _wb['Target']
rcs = []
rrs = []
rpms = []
for i in range(2, 9):
try:
@ -386,18 +388,23 @@ def get_configs(configfile, w2t):
except:
rcs.append(0.0)
return rpms, rcs
try:
rrs.append(float(_ws.cell(row=2, column=i).value))
except:
rrs.append(0.0)
return rpms, rcs, rrs
def main(path, sub, dur, vel, trq, trqh, w2t):
data_files = initialization(path, sub, w2t)
rpms, rcs = get_configs(path + '\\configs.xlsx', w2t)
rpms, rcs, rrs = get_configs(path + '\\configs.xlsx', w2t)
if sub == 'max':
current_max(data_files, rcs, trqh, w2t)
current_max(data_files, rcs, trq, w2t)
elif sub == 'avg':
current_avg(data_files, rcs, trqh, w2t)
current_avg(data_files, rcs, trq, w2t)
elif sub == 'cycle':
current_cycle(dur, data_files, rcs, vel, trq, trqh, rpms, w2t)
current_cycle(dur, data_files, rcs, rrs, vel, trq, trqh, rpms, w2t)
else:
pass

View File

@ -0,0 +1 @@
__all__ = ['factory_test']

View File

@ -0,0 +1,279 @@
from sys import argv
from os.path import exists, dirname
from os import scandir
from paramiko import SSHClient, AutoAddPolicy
from json import loads
from time import sleep, time
import pandas as pd
from openpyxl import load_workbook
from math import sqrt
tab_name = 'Durable Action'
durable_data_current_xlsx = f'{dirname(__file__)}/../../assets/templates/durable/durable_data_current.xlsx'
durable_data_velocity_xlsx = f'{dirname(__file__)}/../../assets/templates/durable/durable_data_velocity.xlsx'
display_pdo_params = [
{"name": "hw_joint_vel_feedback", "channel": 0},
{"name": "hw_joint_vel_feedback", "channel": 1},
{"name": "hw_joint_vel_feedback", "channel": 2},
{"name": "hw_joint_vel_feedback", "channel": 3},
{"name": "hw_joint_vel_feedback", "channel": 4},
{"name": "hw_joint_vel_feedback", "channel": 5},
{"name": "device_servo_trq_feedback", "channel": 0},
{"name": "device_servo_trq_feedback", "channel": 1},
{"name": "device_servo_trq_feedback", "channel": 2},
{"name": "device_servo_trq_feedback", "channel": 3},
{"name": "device_servo_trq_feedback", "channel": 4},
{"name": "device_servo_trq_feedback", "channel": 5},
]
durable_data_current = {
'time': list(range(1, 26)),
'axis1': [0 for _ in range(25)],
'axis2': [0 for _ in range(25)],
'axis3': [0 for _ in range(25)],
'axis4': [0 for _ in range(25)],
'axis5': [0 for _ in range(25)],
'axis6': [0 for _ in range(25)],
}
durable_data_velocity = {
'time': list(range(1, 26)),
'axis1': [0 for _ in range(25)],
'axis2': [0 for _ in range(25)],
'axis3': [0 for _ in range(25)],
'axis4': [0 for _ in range(25)],
'axis5': [0 for _ in range(25)],
'axis6': [0 for _ in range(25)],
}
data_all = [durable_data_current, durable_data_velocity]
def traversal_files(path, w2t):
if not exists(path):
msg = f'数据文件夹{path}不存在,请确认后重试......'
w2t(msg, 0, 1, 'red', tab_name=tab_name)
else:
dirs = []
files = []
for item in scandir(path):
if item.is_dir():
dirs.append(item.path)
elif item.is_file():
files.append(item.path)
return dirs, files
def check_files(data_dirs, data_files, w2t):
if len(data_dirs) != 0 or len(data_files) != 2:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下文件,确认后重新运行!\n1. target.zip\n2.configs.xlsx', 0,
10, 'red', tab_name)
_files = [data_files[0].split('\\')[-1], data_files[1].split('\\')[-1]]
_files.sort()
if _files != ['configs.xlsx', 'target.zip']:
w2t('初始路径下只能存在如下文件,确认后重新运行!\n1. target.zip\n2.configs.xlsx', 0, 10, 'red', tab_name)
data_files.sort()
return data_files
def prj_to_xcore(prj_file):
ssh = SSHClient()
ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019')
sftp = ssh.open_sftp()
sftp.put(prj_file, '/tmp/target.zip')
cmd = 'cd /tmp; rm -rf target/; mkdir target; unzip -d target/ -q target.zip; '
cmd += 'chmod 777 -R target/; rm target.zip'
ssh.exec_command(cmd)
cmd = 'sudo rm -rf /home/luoshi/bin/controller/projects/target; '
cmd += 'sudo mv /tmp/target/ /home/luoshi/bin/controller/projects/'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
_prj_name = prj_file.split('\\')[-1].removesuffix('.zip')
cmd = 'cd /home/luoshi/bin/controller/; '
cmd += f'sudo mv projects/target/_build/{_prj_name}.prj projects/target/_build/target.prj'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
ssh.close()
def execution(cmd, hr, w2t, **kwargs):
_id = hr.execution(cmd, **kwargs)
_msg = hr.get_from_id(_id)
if not _msg:
w2t(f"无法获取{_id}请求的响应信息", 0, 7, 'red', tab_name=tab_name)
else:
_response = loads(_msg)
if not _response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name=tab_name)
return _response
def run_rl(path, config_file, hr, md, w2t):
# 1. 关闭诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来,切手动模式并下电
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=[])
md.trigger_estop()
md.reset_estop()
md.write_act(False)
sleep(1) # 让曲线彻底关闭
_response = execution('state.switch_manual', hr, w2t)
_response = execution('state.switch_motor_off', hr, w2t)
# 2. reload工程后pp2main并且自动模式和上电
prj_path = 'target/_build/target.prj'
_response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['current'])
_response = execution('overview.get_cur_prj', hr, w2t)
_response = execution('rl_task.pp_to_main', hr, w2t, tasks=['current'])
_response = execution('state.switch_auto', hr, w2t)
_response = execution('state.switch_motor_on', hr, w2t)
# 3. 开始运行程序
_response = execution('rl_task.run', hr, w2t, tasks=['current'])
_t_start = time()
while True:
if md.read_ready_to_go() == 1:
md.write_act(True)
break
else:
if (time() - _t_start) // 20 > 1:
w2t("20s内未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", 0, 111, 'red', tab_name)
else:
sleep(1)
# 4. 获取初始数据,周期时间,首次的各轴平均电流值,打开诊断曲线,并执行采集
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
_t_start = time()
while True:
scenario_time = md.read_scenario_time()
if float(scenario_time) > 1:
w2t(f"场景的周期时间:{scenario_time}", 0, 0, 'green', tab_name)
break
else:
if (time() - _t_start) // 60 > 3:
w2t(f"未收到场景的周期时间需要确认RL程序编写正确并正常执行...", 0, 111, 'red', tab_name)
else:
sleep(5)
sleep(1) # 一定要延迟一秒再读一次scenario time寄存器因为一开始读取的数值不准确
scenario_time = float(md.read_scenario_time())
sleep(scenario_time * 0.2) # 再运行周期的20%即可
# 6. 关闭诊断曲线,停止程序运行,下电并且换成手动模式
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=[])
_response = execution('rl_task.stop', hr, w2t, tasks=['current'])
sleep(1) # 保证所有数据均已返回
# 7. 保留数据并处理输出
get_durable_data(path, config_file, data_all, scenario_time, hr, w2t)
# 8. 继续运行
_response = execution('rl_task.run', hr, w2t, tasks=['current'])
while True:
# 每3分钟更新一次数据打开曲线获取周期内电流关闭曲线
sleep(180)
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
sleep(scenario_time + 10)
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=[])
# 7. 保留数据并处理输出
get_durable_data(path, config_file, data_all, scenario_time, hr, w2t)
def get_durable_data(path, config_file, data, scenario_time, hr, w2t):
_data_list = []
for _msg in hr.c_msg:
if 'diagnosis.result' in _msg:
_data_list.insert(0, loads(_msg))
else:
_index = 210
for _msg in hr.c_msg:
if 'diagnosis.result' in _msg:
_index = hr.c_msg.index(_msg)
break
del hr.c_msg[_index:]
hr.c_msg_xs.clear()
# with open('log.txt', 'w', encoding='utf-8') as f_obj:
# for _ in _data_list:
# f_obj.write(f"{_}\n")
_wb = load_workbook(config_file, read_only=True)
_ws = _wb['Target']
rcs = []
for i in range(6):
rcs.append(float(_ws.cell(row=6, column=i + 2).value))
_d2d_trq = {
'device_servo_trq_feedback_0': [], 'device_servo_trq_feedback_1': [], 'device_servo_trq_feedback_2': [],
'device_servo_trq_feedback_3': [], 'device_servo_trq_feedback_4': [], 'device_servo_trq_feedback_5': [],
}
_d2d_vel = {
'hw_joint_vel_feedback_0': [], 'hw_joint_vel_feedback_1': [], 'hw_joint_vel_feedback_2': [],
'hw_joint_vel_feedback_3': [], 'hw_joint_vel_feedback_4': [], 'hw_joint_vel_feedback_5': [],
}
for line in _data_list:
for item in line['data']:
for i in range(6):
item['value'].reverse()
if item.get('channel', None) == i and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq[f'device_servo_trq_feedback_{i}'].extend(item['value'])
elif item.get('channel', None) == i and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel[f'hw_joint_vel_feedback_{i}'].extend(item['value'])
if len(_d2d_trq['device_servo_trq_feedback_0']) / 1000 > scenario_time + 1:
_df = pd.DataFrame(_d2d_trq)
for i in range(6):
_ = sqrt(_df[f'device_servo_trq_feedback_{i}'].apply(lambda x: (rcs[i]*x/1000)**2).sum()/len(_df[f'device_servo_trq_feedback_{i}']))
del data[0][f"axis{i + 1}"][0]
data[0][f"axis{i + 1}"].append(_)
_df = pd.DataFrame(data[0])
while True:
if not hr.durable_lock:
hr.durable_lock = 1
_df.to_excel(durable_data_current_xlsx, index=False)
hr.durable_lock = 0
break
else:
sleep(1)
_df = pd.DataFrame(_d2d_vel)
for i in range(6):
_ = sqrt(_df[f'hw_joint_vel_feedback_{i}'].apply(lambda x: (rcs[i]*x/1000)**2).sum()/len(_df[f'hw_joint_vel_feedback_{i}']))
del data[1][f"axis{i + 1}"][0]
data[1][f"axis{i + 1}"].append(_)
_df = pd.DataFrame(data[1])
while True:
if not hr.durable_lock:
hr.durable_lock = 1
_df.to_excel(durable_data_velocity_xlsx, index=False)
hr.durable_lock = 0
break
else:
sleep(1)
break
else:
with open(f'{path}\\device_servo_trq_feedback_0.txt', 'w', encoding='utf-8') as f_obj:
for _ in _d2d_trq['device_servo_trq_feedback_0']:
f_obj.write(f"{_}\n")
w2t("采集的数据时间长度不够,需要确认。", 0, 2, 'red', tab_name)
def main(path, hr, md, w2t):
data_dirs, data_files = traversal_files(path, w2t)
config_file, prj_file = check_files(data_dirs, data_files, w2t)
prj_to_xcore(prj_file)
run_rl(path, config_file, hr, md, w2t)
if __name__ == '__main__':
main(*argv[1:])

View File

@ -11,13 +11,14 @@ from pymodbus.constants import Endian
MAX_FRAME_SIZE = 1024
setdefaulttimeout(2)
current_path = dirname(__file__)
heartbeat = f'{current_path}/../assets/templates/heartbeat'
class ModbusRequest(object):
def __init__(self, w2t, tab_name):
def __init__(self, w2t):
super().__init__()
self.w2t = w2t
self.tab_name = tab_name
self.tab_name = 'openapi'
self.host = '192.168.0.160'
self.port = 502
self.c = ModbusTcpClient(self.host, self.port)
@ -27,22 +28,19 @@ class ModbusRequest(object):
try:
self.c.write_register(40002, 1)
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法正常下电连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
self.w2t(f"{Err}\n无法正常下电连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def motor_on(self):
try:
self.c.write_register(40003, 1)
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法正常上电连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
self.w2t(f"{Err}\n无法正常上电连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def trigger_estop(self):
try:
self.c.write_register(40012, 0)
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法触发软急停连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
self.w2t(f"{Err}\n无法触发软急停连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def reset_estop(self):
try:
@ -54,66 +52,50 @@ class ModbusRequest(object):
sleep(0.2)
self.c.write_register(40001, 0)
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法重置软急停连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
self.w2t(f"{Err}\n无法重置软急停连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def clear_alarm(self):
try:
self.c.write_register(40000, 1)
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法清除告警连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
self.w2t(f"{Err}\n无法清除告警连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def get_cart_vel(self):
try:
results = self.c.read_holding_registers(40537, 7)
print(f"cart vel: {results.registers}")
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法读取笛卡尔速度连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
self.w2t(f"{Err}\n无法读取笛卡尔速度连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def get_jnt_vel(self):
try:
results = self.c.read_holding_registers(40579, 7)
print(f"joint vel: {results.registers}")
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法读取关节速度连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
self.w2t(f"{Err}\n无法读取关节速度连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def get_tcp_vel(self):
try:
results = self.c.read_holding_registers(40607, 7)
print(f"tcp vel: {results.registers}")
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法读取TCP速度连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
self.w2t(f"{Err}\n无法读取TCP速度连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def get_tcp_mag_vel(self):
try:
results = self.c.read_holding_registers(40621, 1)
print(f"tcp mag: {results.registers}")
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法读取TCP合成速度连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
self.w2t(f"{Err}\n无法读取TCP合成速度连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def write_act(self, number):
try:
self.c.write_register(41000, number)
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法发送执行信号连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
self.w2t(f"{Err}\n无法发送执行信号连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def read_ready_to_go(self):
try:
results = self.c.read_holding_registers(41001, 1)
return results.registers[0]
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法读取准备信号连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
self.w2t(f"{Err}\n无法读取准备信号连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def read_scenario_time(self):
try:
@ -122,15 +104,13 @@ class ModbusRequest(object):
result = f"{result.decode_32bit_float():.3f}"
return result
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法读取准备信号连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
self.w2t(f"{Err}\n无法读取准备信号连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def write_stop0(self, number):
try:
self.c.write_register(41004, number)
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法通过IO操作stop0急停连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
self.w2t(f"{Err}\n无法通过IO操作stop0急停连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def write_speed_max(self, speed):
try:
@ -139,16 +119,14 @@ class ModbusRequest(object):
payload = builder.build()
self.c.write_registers(41005, payload, skip_encode=True)
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法写入速度值连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
self.w2t(f"{Err}\n无法写入速度值连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def read_brake_done(self):
try:
results = self.c.read_holding_registers(41007, 1)
return results.registers[0]
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法读取制动已执行信号连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
self.w2t(f"{Err}\n无法读取制动已执行信号连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def write_axis(self, axis):
try:
@ -157,15 +135,19 @@ class ModbusRequest(object):
payload = builder.to_registers()
self.c.write_registers(41008, payload)
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法写入速度值连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
self.w2t(f"{Err}\n无法写入速度值连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def write_probe(self, probe):
try:
self.c.write_register(41010, probe)
except Exception as Err:
self.w2t(f"{Err}")
self.w2t("无法写入速度探测信号连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
self.w2t(f"{Err}\n无法写入速度探测信号连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def write_pon(self, pon): # positive or negative
try:
self.c.write_register(41011, pon)
except Exception as Err:
self.w2t(f"{Err}\n无法写入正负方向信号连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
class HmiRequest(object):
@ -182,13 +164,14 @@ class HmiRequest(object):
self.flag_xs = 0
self.response_xs = ''
self.t_bool = True
self.tab_name = 'Automatic Test'
self.tab_name = 'openapi'
self.pkg_size = 0
self.broke = 0
self.half = 0
self.half_length = 0
self.index = 0
self.reset_index = 0
self.durable_lock = 0
self.sock_conn()
self.t_heartbeat = Thread(target=self.heartbeat)
@ -203,7 +186,7 @@ class HmiRequest(object):
def sock_conn(self):
# while True:
with open(f"{current_path}/../../assets/templates/heartbeat", "r", encoding='utf-8') as f_hb:
with open(heartbeat, "r", encoding='utf-8') as f_hb:
c_state = f_hb.read().strip()
if c_state == '0':
try:
@ -217,9 +200,9 @@ class HmiRequest(object):
self.c_xs.setblocking(False)
self.w2t("Connection success", 0, 0, 'green', tab_name=self.tab_name)
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
with open(heartbeat, "w", encoding='utf-8') as f_hb:
f_hb.write('1')
md = ModbusRequest(self.w2t, self.tab_name)
md = ModbusRequest(self.w2t)
md.reset_estop()
md.clear_alarm()
md.write_act(False)
@ -227,7 +210,7 @@ class HmiRequest(object):
md.write_axis(1)
except Exception as Err:
self.w2t("Connection failed...", 0, 0, 'red', tab_name=self.tab_name)
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
with open(heartbeat, "w", encoding='utf-8') as f_hb:
f_hb.write('0')
def header_check(self, index, data):
@ -265,18 +248,21 @@ class HmiRequest(object):
print(f"hb = {_flag}", end=' ')
print(f"len(c_msg) = {len(self.c_msg)}", end=' ')
print(f"len(c_msg_xs) = {len(self.c_msg_xs)}", end='\n')
with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
with open(heartbeat, "w", encoding='utf-8') as f_hb:
f_hb.write(_flag)
if _flag == '0':
self.w2t(f"{_id} 心跳丢失,连接失败,重新连接中...", 0, 7, 'red', tab_name=self.tab_name)
sleep(1.5)
# with open(f"{current_path}/../../assets/templates/c_msg.log", "w", encoding='utf-8') as f:
sleep(2)
# with open(f"{current_path}/../assets/templates/c_msg.log", "w", encoding='utf-8') as f:
# for msg in self.c_msg:
# f.write(str(loads(msg)) + '\n')
def msg_storage(self, response, flag=0):
# response是解码后的字符串
messages = self.c_msg if flag == 0 else self.c_msg_xs
if len(messages) < 20000:
if 'move.monitor' in response:
pass
elif len(messages) < 20000:
messages.insert(0, response)
else:
messages.insert(0, response)
@ -583,7 +569,7 @@ class HmiRequest(object):
if flg == 0: # for old protocols
req = None
try:
with open(f'{current_path}/../../assets/templates/{command}.json', encoding='utf-8',
with open(f'{current_path}/../assets/templates/json/{command}.json', encoding='utf-8',
mode='r') as f_json:
req = load(f_json)
except: