21 Commits

Author SHA1 Message Date
b01f8dc19c v0.2.0.3(2024/07/27)
1. [APIs: do_brake.py]: 精简程序,解决 OOM 问题
2. [APIs: do_current.py]: 精简程序,解决 OOM 问题
3. [APIs: factory_test.py]: 精简程序,解决 OOM 问题
4. [APIsL openapi.py]
   - 心跳修改为 1 s,因为 OOM 问题的解决依赖于长久的打开曲线开关,此时对于 hr.c_msg 的定时清理是个挑战,将心跳缩短,有利于清理日志后,避免丢失心跳
   - 新增 diagnosis.save 命令,但是执行时,有问题,待解决
2024-07-27 21:31:09 +08:00
d2794b2de7 remove sys module 2024-07-26 13:38:39 +08:00
0d814d384d fix version and will exit if not the latest one 2024-07-26 13:33:19 +08:00
82ae2681bb v0.2.0.2(2024/07/26)
1. [main: current.py]
   - 修正堵转电流无法正确写入结果文件的问题
2. [main: do_brake.py]
   - 初始速度采集等待时间设置为可通过configs.xlsx配置文件调整的
   - 初次速度采集停止逻辑修改为tasks.stop指令(未验证)
   - 急停信号触发前,pending时间设置为固定值10s
   - 实现正负方向速度采集逻辑
   - 工程名变更逻辑实现修改为通配符,方便后续根据机型保存文件
   - 增加超差后写诊断的逻辑,并可以通过configs.xlsx配置文件调整
   - 程序输出中增加时间戳,方便调试定位日志时间
3. [main: do_current.py]
   - 工程名变更逻辑实现修改为通配符,方便后续根据机型保存文件
4. 为工程文件添加更详细的注释
5. 补充了do_current/do_brake的流程图
6. [main: openapi.py]
   - ½«modbus motor_on/offµÄµÏ·½·¨¸Äª¸ߵç³崥·¢
7. configs.xlsx配置表新增write_diagnosis/get_init_speed两个参数
2024-07-26 13:24:44 +08:00
8b49d3b6e4 将count恢复为全局标量 2024-07-19 17:19:19 +08:00
2f782c9693 another try 2024-07-19 16:32:31 +08:00
73230a7133 remove durable folder 2024-07-19 11:21:27 +08:00
97cb4fe1d4 minor modification 2024-07-19 11:13:51 +08:00
bdccf3da47 新增保留历史数据功能,修改x轴坐标刻度显示 2024-07-19 11:09:34 +08:00
370fa051ae change axis x to time 2024-07-18 17:45:40 +08:00
e1866758b7 change x-axis content 2024-07-18 16:50:50 +08:00
1cefe4a16b minor modifications 2024-07-18 14:59:20 +08:00
dd0873637f minor fix 2024-07-18 11:04:31 +08:00
5ab03d23f2 aio.py 将canvas设定到tabview下,并将高度减小到600,将曲线选择的OP设定为不可伸缩 2024-07-17 17:31:32 +08:00
9088b90e34 fix OverflowError: (34, "Result too large") problem 2024-07-17 16:29:10 +08:00
9d252cc36f add projects for different conditions 2024-07-17 14:40:25 +08:00
3010cb8931 v0.2.0.0(2024/07/17)
1. [profile: aio.py]
   - 增加velocity相关逻辑
   - 修改负载信息为曲线信息
2. [profile: factory_test.py]
   - 增加velocity相关逻辑
3. [profile: current.py]
   - 修正减速比获取的规则
4. [profile: openapi.py]
   - HmiRequest模块:日志取消记录move.monitor相关
   - HmiRequest模块:增加了durable_lock变量,控制文件读写互斥
2024-07-17 14:17:00 +08:00
da5ddcea0a v0.1.9.4(2024/07/15)
1. [profile: aio.py]:完善durable text相关逻辑
2. [profile: do_brake/do_current/btn_functions.py]:删除validate_resp函数,修改execution函数
3. [profile: factory_test.py]
   - 新增耐久/老化测试程序
   - 实现六轴折线图显示
4. [profile: openapi.py]:多次合并遗留问题处理
5. templates文件夹组织架构调整
2024-07-17 10:09:06 +08:00
cf9d51b475 fix merge 2024-07-15 13:42:10 +08:00
f4a70a0034 Merge branch 'main' of gitea.rustle.cc:gitea/rokae into profile
fetch the newest codes of main
2024-07-15 13:34:09 +08:00
71b2d9d42e pending dev 2024-07-11 19:09:08 +08:00
41 changed files with 5567 additions and 735 deletions

4
.gitignore vendored
View File

@ -7,4 +7,6 @@ aio/venv
aio/__pycache__/
aio/code/automatic_test/__pycache__/
aio/code/data_process/__pycache__/
aio/assets/templates/c_msg.log
aio/assets/templates/c_msg.log
aio/code/durable_action/__pycache__/
aio/assets/templates/durable/

View File

@ -8,6 +8,7 @@
4. wavelogger 波形处理,几乎不花费时间
5. 制动自动化测试
6. 电机电流自动化测试
7. 耐久工程曲线指标采集(仅适用于六轴)
---
@ -132,16 +133,18 @@ pyinstaller --noconfirm --onedir --windowed --add-data "C:/Users/Administrator/A
> **需要注意的点**
1. 【重要】使用之前需要手动修改!!负载信息!!点位信息!!,确保所有点位不会发生撞击,确保所有程序正常运行之后,导出工程,再进行自动化测试
2. 工程文件不能手动重命名需要重命名存档可以导入HMI然后另存为
3. 务必正确填写configs.xlsx中的Target页面A1单元格可以选择正负方向急停但不完全保证100%大概有95%左右的准确度
4. 由于xCore系统问题运行过程中可能会出现机器人宕机问题如果遇到可以手动重启控制柜重新运行
5. 运行过程中,如果是因为机器问题无法达到额定百分比速度,会在日志输出框提示,注意观察
6. 运行自动化程序之前,确保机器处于正常状态,无故障,未触发急停
7. 需要额外硬件接线详细参考configs.xlsx中急停接线图sheet页
8. 注意观察二轴100%臂展时,是否可以获取到正确的数据
9. 将autotest.xml导入到寄存器并新建一个modbus命名为autotest
10. 针对五轴机型六轴数据可以填写1-5轴任意一轴的点位信息
1. 修改该文件点位信息
2. 修改“编程”→“工具列表”中的工具信息制动只允许使用tool33/tool66/tool100/inertia这四个工具不可重命名
3. 除了调试行,其他行请勿修改,包括增加空行等,尤其是有特别注释的地方
4. stop0_related半静态任务需要注意DO0_0的命名不同控制柜可能不一样
5. 需要导入 autotest.xml 寄存器文件以及新建modbus总线设备命名为autotest其他默认即可
6. 其他无需修改如有其他需求可联系fanmingfu@rokae.com沟通
7. 需要额外硬件接线详细参考configs.xlsx中急停接线图sheet页其中DO0_0视硬件情况可能为其他名称
8. 运行自动化程序之前,确保机器处于正常状态,无故障,未触发急停
9. 运行过程中,如果是因为机器问题无法达到额定百分比速度,会在日志输出框提示,注意观察
10. 由于xCore系统问题运行过程中可能会出现机器人宕机问题如果遇到可以手动重启控制柜重新运行
11. 务必正确填写configs.xlsx中的Target页面A1单元格可以选择正负方向急停
12. 工程文件可以手动重命名,按照机型存档,或者导出用于自动化测试
#### 6) 电机电流自动化测试
@ -152,7 +155,16 @@ pyinstaller --noconfirm --onedir --windowed --add-data "C:/Users/Administrator/A
> **需要注意的点**
基本同第五点
基本同第五点,针对五轴以及以下的机型,缺失轴数据可以填写其他有效轴任意一轴的点位信息
#### 7) 耐久工程曲线指标采集
可实现固定周期指定曲线指标的采集,用于长时间观察指标的变化情况,输入文件:
- configs.xlsx执行之前需要手动修改好configs.xlsx中的参数以及间隔时间
- target.zip需要确认工程点位和动作无问题后保存导出
> 重新运行时,必须突出软件,重新运行
#### 其他
customtkinter的tabview组件不支持修改字体大小可以参考 [Changing Font of a Tabview](https://github.com/TomSchimansky/CustomTkinter/issues/2296) 进行手动修改源码实现:
@ -486,7 +498,64 @@ v0.1.9.3(2024/07/15)
- 修改modbus连接失败报错输出形式使之只在automatic test页面显示
- 将该文件移动至toplevel为后面扩展做准备
- 修改heartbeat文件路径使后续打包的时候更方便
2. [APIs: aio.py]:
2. [APIs: aio.py]
- 修改heartbeat文件路径使后续打包的时候更方便
- 修改write2textbox函数的打印逻辑先判断网络相关
v0.1.9.4(2024/07/15)
1. [profile: aio.py]完善durable text相关逻辑
2. [profile: do_brake/do_current/btn_functions.py]删除validate_resp函数修改execution函数
3. [profile: factory_test.py]
- 新增耐久/老化测试程序
- 实现六轴折线图显示
4. [profile: openapi.py]:多次合并遗留问题处理
5. templates文件夹组织架构调整
v0.2.0.0(2024/07/17)
1. [profile: aio.py]
- 增加velocity相关逻辑
- 修改负载信息为曲线信息
2. [profile: factory_test.py]
- 增加velocity相关逻辑
3. [profile: current.py]
- 修正减速比获取的规则
4. [profile: openapi.py]
- HmiRequest模块日志取消记录move.monitor相关
- HmiRequest模块增加了durable_lock变量控制文件读写互斥
v0.2.0.1(2024/07/19)
1. [main: aio.py]
- 修改了x轴显示使之为时间刻度
- 修改pre_warning函数增加了durable test的初始化
2. [main: factory_test.py]
- 增加了数据计算错误的判断逻辑
- 增加了历史数据保存的逻辑
- 增加了文件读写互斥的逻辑
- 修改功能为输出有效电流和最大电流,并将数据结构简化
v0.2.0.2(2024/07/26)
1. [main: current.py]
- 修正堵转电流无法正确写入结果文件的问题
2. [main: do_brake.py]
- 初始速度采集等待时间设置为可通过configs.xlsx配置文件调整的
- 初次速度采集停止逻辑修改为tasks.stop指令未验证
- 急停信号触发前pending时间设置为固定值10s
- 实现正负方向速度采集逻辑
- 工程名变更逻辑实现修改为通配符,方便后续根据机型保存文件
- 增加超差后写诊断的逻辑并可以通过configs.xlsx配置文件调整
- 程序输出中增加时间戳,方便调试定位日志时间
3. [main: do_current.py]
- 工程名变更逻辑实现修改为通配符,方便后续根据机型保存文件
4. 为工程文件添加更详细的注释
5. 补充了do_current/do_brake的流程图
6. [main: openapi.py]
- 将modbus motor_on/off的实现方法改为高电平脉冲触发
7. configs.xlsx配置表新增write_diagnosis/get_init_speed两个参数
v0.2.0.3(2024/07/27)
1. [APIs: do_brake.py]: 精简程序,解决 OOM 问题
2. [APIs: do_current.py]: 精简程序,解决 OOM 问题
3. [APIs: factory_test.py]: 精简程序,解决 OOM 问题
4. [APIsL openapi.py]
- 心跳修改为 1 s因为 OOM 问题的解决依赖于长久的打开曲线开关,此时对于 hr.c_msg 的定时清理是个挑战,将心跳缩短,有利于清理日志后,避免丢失心跳
- 新增 diagnosis.save 命令,但是执行时,有问题,待解决

Binary file not shown.

View File

@ -6,8 +6,8 @@ VSVersionInfo(
ffi=FixedFileInfo(
# filevers and prodvers should be always a tuple with four items: (1, 2, 3, 4)
# Set not needed items to zero 0.
filevers=(0, 1, 9, 2),
prodvers=(0, 1, 9, 2),
filevers=(0, 2, 0, 3),
prodvers=(0, 2, 0, 3),
# Contains a bitmask that specifies the valid bits 'flags'r
mask=0x3f,
# Contains a bitmask that specifies the Boolean attributes of the file.
@ -31,12 +31,12 @@ VSVersionInfo(
'040904b0',
[StringStruct('CompanyName', 'Rokae - https://www.rokae.com/'),
StringStruct('FileDescription', 'All in one automatic toolbox'),
StringStruct('FileVersion', '0.1.9.2 (2024-07-13)'),
StringStruct('FileVersion', '0.2.0.3 (2024-07-27)'),
StringStruct('InternalName', 'AIO.exe'),
StringStruct('LegalCopyright', '© 2024-2024 Manford Fan'),
StringStruct('OriginalFilename', 'AIO.exe'),
StringStruct('ProductName', 'AIO'),
StringStruct('ProductVersion', '0.1.9.2 (2024-07-13)')])
StringStruct('ProductVersion', '0.2.0.3 (2024-07-27)')])
]),
VarFileInfo([VarStruct('Translation', [1033, 1200])])
]

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -5,8 +5,8 @@
"data": {
"open": false,
"display_open": false,
"overrun": true,
"turn_area": true,
"overrun": false,
"turn_area": false,
"delay_motion": false
}
}

View File

@ -1 +1 @@
0.1.9.2 @ 07/13/2024
0.2.0.2 @ 07/26/2024

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -7,17 +7,43 @@ import customtkinter
from time import time, strftime, localtime, sleep
from urllib.request import urlopen
from socket import setdefaulttimeout
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from data_process import *
from automatic_test import *
from durable_action import *
import openapi
import matplotlib.pyplot as plt
from matplotlib import use
from pandas import DataFrame, read_excel
use('Agg')
heartbeat = f'{dirname(__file__)}/../assets/templates/heartbeat'
durable_data_current_xlsx = f'{dirname(__file__)}/../assets/templates/durable/durable_data_current.xlsx'
durable_data_current_max_xlsx = f'{dirname(__file__)}/../assets/templates/durable/durable_data_current_max.xlsx'
customtkinter.set_appearance_mode("System") # Modes: "System" (standard), "Dark", "Light"
customtkinter.set_default_color_theme("blue") # Themes: "blue" (standard), "green", "dark-blue"
customtkinter.set_widget_scaling(1.1) # widget dimensions and text size
customtkinter.set_window_scaling(1.1) # window geometry dimensions
setdefaulttimeout(3)
# global vars
durable_data_current = {
'time': list(range(1, 19)),
'axis1': [0 for _ in range(18)],
'axis2': [0 for _ in range(18)],
'axis3': [0 for _ in range(18)],
'axis4': [0 for _ in range(18)],
'axis5': [0 for _ in range(18)],
'axis6': [0 for _ in range(18)],
}
durable_data_current_max = {
'time': list(range(1, 19)),
'axis1': [0 for _ in range(18)],
'axis2': [0 for _ in range(18)],
'axis3': [0 for _ in range(18)],
'axis4': [0 for _ in range(18)],
'axis5': [0 for _ in range(18)],
'axis6': [0 for _ in range(18)],
}
btns_func = {
'start': {'btn': '', 'row': 1, 'text': '开始运行'},
'check': {'btn': '', 'row': 2, 'text': '检查参数'},
@ -36,6 +62,10 @@ widgits_at = {
'path': {'label': '', 'entry': '', 'row': 2, 'col': 2, 'text': '数据文件夹路径'},
'loadsel': {'label': '', 'optionmenu': '', 'row': 2, 'col': 1, 'text': '负载信息'},
}
widgits_da = {
'path': {'label': '', 'entry': '', 'row': 1, 'col': 2, 'text': '数据文件夹路径'},
'curvesel': {'label': '', 'optionmenu': '', 'row': 1, 'col': 1, 'text': '指标选择'},
}
class App(customtkinter.CTk):
@ -45,6 +75,10 @@ class App(customtkinter.CTk):
self.w_param = 84
self.hr = None
self.md = None
self.canvas = None
self.flg = 0
self.df_copy = None
self.old_curve = None
# =====================================================================
# configure window
self.title("AIO - All in one automatic toolbox")
@ -71,7 +105,7 @@ class App(customtkinter.CTk):
btns_func['log']['btn'].configure(command=lambda: self.thread_it(self.func_log_callback))
btns_func['end']['btn'].configure(command=lambda: self.thread_it(self.func_end_callback))
# create version info
self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.1.9.2\nDate: 07/13/2024", font=self.my_font, text_color="#4F4F4F")
self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.2.0.3\nDate: 07/27/2024", font=self.my_font, text_color="#4F4F4F")
self.frame_func.rowconfigure(6, weight=1)
self.label_version.grid(row=6, column=0, padx=20, pady=20, sticky='s')
# =====================================================================
@ -80,6 +114,7 @@ class App(customtkinter.CTk):
self.tabview.grid(row=0, column=1, padx=10, pady=5, sticky="nsew")
self.tabview.add("Data Process")
self.tabview.add("Automatic Test")
self.tabview.add("Durable Action")
# create main menu for data process
self.menu_main_dp = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'), values=["init", "brake", "current", "iso", "wavelogger"], font=self.my_font, text_color='yellow', button_color='red', fg_color='green', command=self.func_main_callback)
self.menu_main_dp.grid(row=1, column=1, sticky='we', padx=5, pady=10)
@ -115,7 +150,7 @@ class App(customtkinter.CTk):
# For automatic test tab START =====================================================================
# create buttons
self.seg_button = customtkinter.CTkSegmentedButton(self.tabview.tab('Automatic Test'), dynamic_resizing=False, font=customtkinter.CTkFont(size=16, weight='bold'), command=lambda value='机器状态': self.thread_it(self.segmented_button_callback))
self.seg_button.grid(row=1, column=2, columnspan=12, padx=(20, 10), pady=(10, 10), sticky="ew")
self.seg_button.grid(row=1, column=2, columnspan=12, padx=(65, 10), pady=(10, 10), sticky="ew")
self.seg_button.configure(dynamic_resizing=False, values=["功能切换", "触发急停", "恢复急停", "待定功能", "功能待定", "机器状态", "告警信息"])
self.seg_button.set("功能切换")
# create progress bar
@ -127,7 +162,7 @@ class App(customtkinter.CTk):
for widgit in widgits_at:
if widgit == 'path':
widgits_at[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Automatic Test'), text=f'{widgit.upper()}', font=self.my_font)
widgits_at[widgit]['label'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col'], sticky='e', pady=5)
widgits_at[widgit]['label'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col'], sticky='e', padx=(20, 5), pady=5)
widgits_at[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Automatic Test'), width=670, placeholder_text=widgits_at[widgit]['text'], font=self.my_font)
widgits_at[widgit]['entry'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=5, sticky='we')
widgits_at[widgit]['entry'].configure(state='disabled')
@ -137,6 +172,23 @@ class App(customtkinter.CTk):
widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
widgits_at[widgit]['optionmenu'].configure(state='disabled')
# For automatic test tab END =====================================================================
# For durable_action tab START =====================================================================
# create progress bar
self.progressbar_da = customtkinter.CTkProgressBar(self.tabview.tab('Durable Action'))
self.progressbar_da.grid(row=2, column=1, padx=5, pady=5, sticky="ew")
self.progressbar_da.configure(mode="determinnate", width=self.w_param)
self.progressbar_da.start()
for widgit in widgits_da:
if widgit == 'path':
widgits_da[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Durable Action'), text=f'{widgit.upper()}', font=self.my_font)
widgits_da[widgit]['label'].grid(row=widgits_da[widgit]['row'], column=widgits_da[widgit]['col'], sticky='e', padx=(20, 5), pady=10)
widgits_da[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Durable Action'), width=670, placeholder_text=widgits_da[widgit]['text'], font=self.my_font)
widgits_da[widgit]['entry'].grid(row=widgits_da[widgit]['row'], column=widgits_da[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=10, sticky='we')
elif widgit in ['curvesel']:
widgits_da[widgit]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Durable Action'), dynamic_resizing=False, button_color='#708090', fg_color='#778899', values=['device_servo_trq_feedback', '[max] device_servo_trq_feedback'], font=self.my_font)
widgits_da[widgit]['optionmenu'].grid(row=widgits_da[widgit]['row'], column=widgits_da[widgit]['col'], padx=5, pady=10, sticky='we')
widgits_da[widgit]['optionmenu'].set(widgits_da[widgit]['text'])
# For durable_action tab END =====================================================================
# create textbox
self.textbox = customtkinter.CTkTextbox(self, wrap='none', font=customtkinter.CTkFont(family="consolas", size=14), text_color="blue", fg_color='#E9E9E9', border_width=2, border_color='#CDCDCD', border_spacing=5)
self.textbox.grid(row=6, column=1, columnspan=13, ipadx=10, ipady=10, padx=10, pady=(5, 10), sticky='nsew')
@ -150,10 +202,64 @@ class App(customtkinter.CTk):
if cur_vers.strip() != new_vers.strip():
msg = f"""当前版本:{cur_vers}\n更新版本:{new_vers}\n\n请及时前往钉盘更新~~~"""
tkinter.messagebox.showwarning(title="版本更新", message=msg)
self.destroy()
except:
tkinter.messagebox.showwarning(title="版本更新", message="连接服务器失败,无法确认当前是否是最新版本......")
# functions below ↓ ----------------------------------------------------------------------------------------
def create_canvas(self, figure):
self.canvas = FigureCanvasTkAgg(figure, self.tabview.tab('Durable Action'))
self.canvas.draw()
self.canvas.get_tk_widget().configure(height=600)
self.canvas.get_tk_widget().grid(row=3, column=1, rowspan=3, columnspan=13, padx=20, pady=10, sticky="nsew")
def create_plot(self):
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['figure.dpi'] = 100
plt.rcParams['font.size'] = 14
plt.rcParams['lines.marker'] = 'o'
curvesel = widgits_da['curvesel']['optionmenu'].get()
while True:
if not self.hr.durable_lock:
self.hr.durable_lock = 1
if curvesel == 'device_servo_trq_feedback':
df = read_excel(durable_data_current_xlsx)
_title = 'device_servo_trq_feedback'
elif curvesel == '[max] device_servo_trq_feedback':
_title = '[max] device_servo_trq_feedback'
df = read_excel(durable_data_current_max_xlsx)
else:
_title = 'device_servo_trq_feedback'
df = read_excel(durable_data_current_xlsx)
self.hr.durable_lock = 0
break
else:
sleep(1)
if not df.equals(self.df_copy) or self.flg == 0 or curvesel != self.old_curve:
self.flg = 1
self.df_copy = df.copy()
self.old_curve = widgits_da['curvesel']['optionmenu'].get()
figure = plt.figure(frameon=True, facecolor='#E9E9E9')
plt.subplots_adjust(left=0.04, right=0.98, bottom=0.1, top=0.95)
_ = df['time'].to_list()
_xticks = [str(_i) for _i in _]
ax = figure.add_subplot(1, 1, 1)
ax.set_xticks(range(len(_xticks)))
ax.set_xticklabels(_xticks)
df.plot(grid=True, x='time', y='axis1', ax=ax)
df.plot(grid=True, x='time', y='axis2', ax=ax)
df.plot(grid=True, x='time', y='axis3', ax=ax)
df.plot(grid=True, x='time', y='axis4', ax=ax)
df.plot(grid=True, x='time', y='axis5', ax=ax)
df.plot(grid=True, x='time', y='axis6', ax=ax, title=_title, legend='upper left', rot=30)
self.create_canvas(figure)
def thread_it(self, func, *args):
""" 将函数打包进线程 """
self.myThread = Thread(target=func, args=args)
@ -182,16 +288,25 @@ class App(customtkinter.CTk):
# self.tabview.configure(state='normal')
def detect_network(self):
df = DataFrame(durable_data_current)
df.to_excel(durable_data_current_xlsx, index=False)
df = DataFrame(durable_data_current_max)
df.to_excel(durable_data_current_max_xlsx, index=False)
with open(heartbeat, "w", encoding='utf-8') as f_hb:
f_hb.write('0')
self.hr = openapi.HmiRequest(self.write2textbox)
self.md = openapi.ModbusRequest(self.write2textbox)
while True:
if self.tabview.get() == 'Durable Action':
self.create_plot()
with open(heartbeat, 'r', encoding='utf-8') as f_hb:
c_state = f_hb.read().strip()
pb_color = 'green' if c_state == '1' else 'red'
self.progressbar.configure(progress_color=pb_color)
self.progressbar_da.configure(progress_color=pb_color)
if c_state == '0':
self.hr.t_bool = False
sleep(3)
@ -201,13 +316,16 @@ class App(customtkinter.CTk):
def tabview_click(self):
self.initialization()
tab_name = self.tabview.get()
if tab_name == 'Data Process':
self.flg = 0
self.menu_main_dp.set("Start Here!")
elif tab_name == 'Automatic Test':
self.flg = 0
self.menu_main_at.set("Start Here!")
self.seg_button.configure(state='normal')
elif tab_name == 'Durable Action':
pass
def initialization(self):
tab_name = self.tabview.get()
@ -228,7 +346,7 @@ class App(customtkinter.CTk):
self.menu_sub_dp.grid_forget()
elif tab_name == 'Automatic Test':
for widgit in widgits_at:
if widgit in ['path', 'av1', 'av2', 'av3', 'av4', 'av5', 'av6', 'rc1', 'rc2', 'rc3', 'rc4', 'rc5', 'rc6', 'rr1', 'rr2', 'rr3', 'rr4', 'rr5', 'rr6']:
if widgit in ['path', ]:
widgits_at[widgit]['label'].configure(text=f'{widgit.upper()}', text_color='black')
widgits_at[widgit]['entry'].delete(0, tkinter.END)
widgits_at[widgit]['entry'].configure(placeholder_text=widgits_at[widgit]['text'], state='normal')
@ -238,6 +356,15 @@ class App(customtkinter.CTk):
widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
widgits_at[widgit]['optionmenu'].configure(state='disabled')
self.seg_button.set("功能切换")
elif tab_name == 'Durable Action':
for widgit in widgits_da:
if widgit in ['path', ]:
widgits_da[widgit]['label'].configure(text=f'{widgit.upper()}', text_color='black')
widgits_da[widgit]['entry'].delete(0, tkinter.END)
widgits_da[widgit]['entry'].configure(placeholder_text=widgits_at[widgit]['text'], state='normal')
elif widgit in ['curvesel']:
widgits_da[widgit]['optionmenu'].configure(state='normal')
widgits_da[widgit]['optionmenu'].set(widgits_da[widgit]['text'])
def func_main_callback(self, func_name):
self.initialization()
@ -334,7 +461,7 @@ class App(customtkinter.CTk):
self.textbox.tag_config(tagName=color, foreground=color)
tab_name_cur = self.tabview.get()
if tab_name == 'openapi' and tab_name_cur == 'Automatic Test':
if tab_name == tab_name_cur:
if wait != 0:
self.textbox.insert(index='end', text=text, tags=color)
self.textbox.update()
@ -348,7 +475,7 @@ class App(customtkinter.CTk):
self.textbox.insert(index='end', text=text + '\n', tags=color)
self.textbox.update()
self.textbox.see('end')
elif tab_name == tab_name_cur:
elif tab_name == 'openapi' and tab_name_cur == 'Automatic Test' or tab_name_cur == 'Durable Action':
if wait != 0:
self.textbox.insert(index='end', text=text, tags=color)
self.textbox.update()
@ -454,12 +581,24 @@ class App(customtkinter.CTk):
return 0, 0
else:
return 0, 0
elif tab_name == 'Durable Action':
path = widgits_da['path']['entry'].get().strip()
curvesel = widgits_da['curvesel']['optionmenu'].get()
c1 = exists(path)
c2 = curvesel in ['device_servo_trq_feedback', '[max] device_servo_trq_feedback']
if c1 and c2:
return 7, path, curvesel
else:
return 0, 0
def func_start_callback(self):
self.textbox.delete(index1='1.0', index2='end')
flag, *args = self.check_param()
func_dict = {1: brake.main, 2: current.main, 3: iso.main, 4: wavelogger.main, 5: do_brake.main, 6: do_current.main}
func_dict = {
1: brake.main, 2: current.main, 3: iso.main, 4: wavelogger.main, 5: do_brake.main, 6: do_current.main,
7: factory_test.main
}
if flag == 1:
func_dict[flag](path=args[0], vel=args[1], trq=args[2], estop=args[3], w2t=self.write2textbox)
elif flag == 2:
@ -474,10 +613,19 @@ class App(customtkinter.CTk):
elif flag == 6:
self.pre_warning()
func_dict[flag](path=args[0], hr=self.hr, md=self.md, loadsel=args[1], w2t=self.write2textbox)
elif flag == 7:
self.pre_warning()
func_dict[flag](path=args[0], hr=self.hr, md=self.md, w2t=self.write2textbox)
else:
tkinter.messagebox.showerror(title="参数错误", message="请检查对应参数是否填写正确!", )
def pre_warning(self):
if self.tabview.get() == 'Durable Action':
df = DataFrame(durable_data_current)
df.to_excel(durable_data_current_xlsx, index=False)
df = DataFrame(durable_data_current_max)
df.to_excel(durable_data_current_max_xlsx, index=False)
if tkinter.messagebox.askyesno(title="开始运行", message="确认机器已按照测试规范更新固件,并提按照测试机型前修改好工程?"):
pass
else:

View File

@ -2,18 +2,6 @@ from json import loads
from sys import argv
def validate_resp(_id, response, w2t):
match _id:
case 'DATA ERR':
w2t(f"数据处理错误,需要确认", 0, 4, 'red', tab_name='Automatic Test')
case 'DATA READ ERR':
w2t(f"无法读取数据,需要确认", 0, 3, 'red', tab_name='Automatic Test')
case 'NOT SUPPORT':
w2t(f"不支持的功能,需要确认", 0, 2, 'red', tab_name='Automatic Test')
if not response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
def execution(cmd, hr, w2t, **kwargs):
_id = hr.execution(cmd, **kwargs)
_msg = hr.get_from_id(_id)
@ -21,7 +9,8 @@ def execution(cmd, hr, w2t, **kwargs):
w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name='Automatic Test')
else:
_response = loads(_msg)
validate_resp(_id, _response, w2t)
if not _response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
return _response

View File

@ -1,5 +1,4 @@
from random import randint
from time import sleep, time
from time import sleep, time, strftime, localtime
from sys import argv
from os import scandir, mkdir
from os.path import exists
@ -9,12 +8,13 @@ from openpyxl import load_workbook
import pandas
RADIAN = 57.3 # 180 / 3.1415926
tab_name = 'Automatic Test'
def traversal_files(path, w2t):
if not exists(path):
msg = f'数据文件夹{path}不存在,请确认后重试......'
w2t(msg, 0, 1, 'red', tab_name='Automatic Test')
w2t(msg, 0, 1, 'red', tab_name)
else:
dirs = []
files = []
@ -29,8 +29,8 @@ def traversal_files(path, w2t):
def check_files(path, loadsel, data_dirs, data_files, w2t):
if len(data_dirs) != 0 or len(data_files) != 5:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test')
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red', tab_name)
w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name)
config_file = reach33 = reach66 = reach100 = prj_file = None
for data_file in data_files:
@ -46,8 +46,8 @@ def check_files(path, loadsel, data_dirs, data_files, w2t):
elif filename.endswith('.zip'):
prj_file = data_file
else:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 2, 'red', tab_name='Automatic Test')
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red', tab_name)
w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 2, 'red', tab_name)
if config_file and reach33 and reach66 and reach100 and prj_file:
result_dirs = []
@ -65,11 +65,11 @@ def check_files(path, loadsel, data_dirs, data_files, w2t):
if _reach == 'reach100':
mkdir(f"{path}\\j3\\{dir_name}")
w2t("数据目录合规性检查结束,未发现问题......", tab_name='Automatic Test')
w2t("数据目录合规性检查结束,未发现问题......", 0, 0, 'blue', tab_name)
return config_file, reach33, reach66, reach100, prj_file, result_dirs
else:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test')
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!', 0, 0, 'red', tab_name)
w2t(' 1. configs.xlsx\n 2. reach33/reach66/reach100_xxxx.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name)
def prj_to_xcore(prj_file):
@ -90,9 +90,8 @@ def prj_to_xcore(prj_file):
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
_prj_name = prj_file.split('\\')[-1].removesuffix('.zip')
cmd = 'cd /home/luoshi/bin/controller/; '
cmd += f'sudo mv projects/target/_build/{_prj_name}.prj projects/target/_build/target.prj'
cmd += 'sudo mv projects/target/_build/*.prj projects/target/_build/target.prj'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
@ -101,26 +100,15 @@ def prj_to_xcore(prj_file):
ssh.close()
def validate_resp(_id, response, w2t):
match _id:
case 'DATA ERR':
w2t(f"数据处理错误,需要确认", 0, 4, 'red', tab_name='Automatic Test')
case 'DATA READ ERR':
w2t(f"无法读取数据,需要确认", 0, 3, 'red', tab_name='Automatic Test')
case 'NOT SUPPORT':
w2t(f"不支持的功能,需要确认", 0, 2, 'red', tab_name='Automatic Test')
if not response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
def execution(cmd, hr, w2t, **kwargs):
_id = hr.execution(cmd, **kwargs)
_msg = hr.get_from_id(_id)
if not _msg:
w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name='Automatic Test')
w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name)
else:
_response = loads(_msg)
validate_resp(_id, _response, w2t)
if not _response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name)
return _response
@ -130,15 +118,19 @@ def gen_result_file(path, curve_data, axis, _reach, _load, _speed, count):
_d2d_stop = {'device_safety_estop': []}
for data in curve_data:
dict_results = data['data']
# dict_results.reverse()
for item in dict_results:
item['value'].reverse()
try:
item['value'].reverse()
except KeyError:
continue
if item.get('channel', None) == axis-1 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == axis-1 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 0 and item.get('name', None) == 'device_safety_estop':
_d2d_stop['device_safety_estop'].extend(item['value'])
if len(_d2d_trq['device_servo_trq_feedback']) / 1000 > 10:
break
df1 = pandas.DataFrame.from_dict(_d2d_vel)
df2 = pandas.DataFrame.from_dict(_d2d_trq)
@ -148,9 +140,8 @@ def gen_result_file(path, curve_data, axis, _reach, _load, _speed, count):
df.to_csv(_filename, sep='\t', index=False)
def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
def run_rl(path, loadsel, hr, md, config_file, result_dirs, w2t):
_count = 0
speed_max = 0
display_pdo_params = [
{"name": "hw_joint_vel_feedback", "channel": 0},
{"name": "hw_joint_vel_feedback", "channel": 1},
@ -168,107 +159,125 @@ def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
]
wb = load_workbook(config_file, read_only=True)
ws = wb['Target']
write_diagnosis = float(ws.cell(row=3, column=10).value)
get_init_speed = float(ws.cell(row=4, column=10).value)
if ws.cell(row=1, column=1).value == 'positive':
md.write_pon(True)
md.write_pon(1)
elif ws.cell(row=1, column=1).value == 'negative':
md.write_pon(False)
md.write_pon(0)
else:
w2t("configs.xlsx中Target页面A1单元格填写不正确检查后重新运行...", 0, 111, 'red', 'Automatic Test')
w2t("configs.xlsx中Target页面A1单元格填写不正确检查后重新运行...", 0, 111, 'red', tab_name)
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
for condition in result_dirs:
_reach = condition.split('_')[0].removeprefix('reach')
_load = condition.split('_')[1].removeprefix('load')
_speed = condition.split('_')[2].removeprefix('speed')
# if _speed != '100' or _reach != '100':
# continue
for axis in range(1, 4):
md.write_axis(axis)
speed_max = 0
if axis == 3 and _reach != '100':
continue
w2t(f"-"*90, 0, 0, 'purple', tab_name)
for count in range(1, 4):
_count += 1
w2t(f"[{_count}/63-{count}] 正在执行{axis}{condition}的制动测试......", 0, 0, 'purple', 'Automatic Test')
this_time = strftime("%Y-%m-%d %H:%M:%S", localtime(time()))
prj_path = 'target/_build/target.prj'
w2t(f"[{this_time} | {_count}/63] 正在执行 {axis}{condition} 的第 {count} 次制动测试...", 0, 0, 'purple', tab_name)
# 1. 关闭诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来,切手动模式并下电
md.trigger_estop()
md.reset_estop()
md.write_act(False)
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
sleep(1) # 让曲线彻底关闭
_response = execution('state.switch_manual', hr, w2t)
_response = execution('state.switch_motor_off', hr, w2t)
md.clear_alarm()
md.write_act(0)
sleep(write_diagnosis) # 软急停超差后等待写诊断时间可通过configs.xlsx配置
# 2. 修改未要执行的场景
ssh = SSHClient()
ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019')
if ws.cell(row=1, column=1).value == 'positive':
_rl_cmd = f"brake_E(j{axis}_{_reach}_p, j{axis}_{_reach}_n, p_speed, p_tool)"
elif ws.cell(row=1, column=1).value == 'negative':
_rl_cmd = f"brake_E(j{axis}_{_reach}_n, j{axis}_{_reach}_p, p_speed, p_tool)"
else:
w2t("configs.xlsx中Target页面A1单元格填写不正确检查后重新运行...", 0, 111, 'red', 'Automatic Test')
_rl_speed = f"VelSet {_speed}"
cmd = 'cd /home/luoshi/bin/controller/; '
cmd += 'sudo sed -i "/brake_E/d" projects/target/_build/brake/main.mod; '
cmd += f'sudo sed -i "/DONOTDELETE/i {_rl_cmd}" projects/target/_build/brake/main.mod; '
cmd += 'sudo sed -i "/VelSet/d" projects/target/_build/brake/main.mod; '
cmd += f'sudo sed -i "/MoveAbsJ/i {_rl_speed}" projects/target/_build/brake/main.mod'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
# 3. reload工程后pp2main并且自动模式和上电最后运行程序
prj_path = 'target/_build/target.prj'
_response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['brake', 'stop0_related'])
_response = execution('rl_task.pp_to_main', hr, w2t, tasks=['brake', 'stop0_related'])
_response = execution('state.switch_auto', hr, w2t)
_response = execution('state.switch_motor_on', hr, w2t)
_response = execution('rl_task.run', hr, w2t, tasks=['brake', 'stop0_related'])
_t_start = time()
while True:
if md.read_ready_to_go() == 1:
md.write_act(True)
break
while count == 1:
# 2. 修改未要执行的场景
ssh = SSHClient()
ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019')
if ws.cell(row=1, column=1).value == 'positive':
_rl_cmd = f"brake_E(j{axis}_{_reach}_p, j{axis}_{_reach}_n, p_speed, p_tool)"
elif ws.cell(row=1, column=1).value == 'negative':
_rl_cmd = f"brake_E(j{axis}_{_reach}_n, j{axis}_{_reach}_p, p_speed, p_tool)"
else:
if (time() - _t_start) // 20 > 1:
w2t("20s内未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test')
else:
sleep(1)
# 4. 第一次打开诊断曲线并执行采集8s之后触发软急停关闭曲线采集找出最大速度传递给RL程序最后清除相关记录
if count == 1:
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
sleep(10) # 前10秒获取实际最大速度
w2t("configs.xlsx中Target页面A1单元格填写不正确检查后重新运行...", 0, 111, 'red', tab_name)
_rl_speed = f"VelSet {_speed}"
_rl_tool = f"tool p_tool = tool{loadsel.removeprefix('tool')}"
cmd = 'cd /home/luoshi/bin/controller/; '
cmd += 'sudo sed -i "/brake_E/d" projects/target/_build/brake/main.mod; '
cmd += f'sudo sed -i "/DONOTDELETE/i {_rl_cmd}" projects/target/_build/brake/main.mod; '
cmd += 'sudo sed -i "/VelSet/d" projects/target/_build/brake/main.mod; '
cmd += f'sudo sed -i "/MoveAbsJ/i {_rl_speed}" projects/target/_build/brake/main.mod; '
cmd += 'sudo sed -i "/tool p_tool/d" projects/target/_build/brake/main.mod; '
cmd += f'sudo sed -i "/VelSet/i {_rl_tool}" projects/target/_build/brake/main.mod; '
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
md.trigger_estop()
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
# 3. reload工程后pp2main并且自动模式和上电最后运行程序
_response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['brake', 'stop0_related'])
_response = execution('rl_task.pp_to_main', hr, w2t, tasks=['brake', 'stop0_related'])
_response = execution('state.switch_auto', hr, w2t)
_response = execution('state.switch_motor_on', hr, w2t)
_response = execution('rl_task.run', hr, w2t, tasks=['brake', 'stop0_related'])
_t_start = time()
while True:
if md.read_ready_to_go() == 1:
md.write_act(True)
break
else:
if (time() - _t_start) // 20 > 1:
w2t("20s内未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", 0, 111, 'red', tab_name)
else:
sleep(1)
# 4. 打开诊断曲线并执行采集之后触发软急停关闭曲线采集找出最大速度传递给RL程序最后清除相关记录
sleep(get_init_speed) # 获取实际最大速度可通过configs.xlsx配置
_response = execution('rl_task.stop', hr, w2t, tasks=['brake'])
# sleep(1)
# 找出最大速度
for _msg in hr.c_msg:
_c_msg = hr.c_msg.copy()
for _msg in _c_msg:
if 'diagnosis.result' in _msg:
dict_results = loads(_msg)['data']
for item in dict_results:
if item.get('channel', None) == axis-1 and item.get('name', None) == 'hw_joint_vel_feedback':
_ = abs(RADIAN*sum(item['value'])/len(item['value']))
speed_max = max(_, speed_max)
_ = RADIAN * sum(item['value']) / len(item['value'])
if ws.cell(row=1, column=1).value == 'positive':
speed_max = max(_, speed_max)
elif ws.cell(row=1, column=1).value == 'negative':
speed_max = min(_, speed_max)
print(f"speed max = {speed_max}")
speed_max = abs(speed_max)
speed_target = float(ws.cell(row=3, column=axis+1).value) * float(_speed) / 100
if speed_max < speed_target*0.95 or speed_max > speed_target*1.05:
w2t(f"Axis: {axis}-{count} | Speed: {speed_max} | Shouldbe: {speed_target}", 0, 0, 'indigo', 'Automatic Test')
w2t(f"Axis: {axis}-{count} | Speed: {speed_max} | Shouldbe: {speed_target}", 0, 0, 'indigo', tab_name)
md.write_speed_max(speed_max)
sleep(1)
for _msg in hr.c_msg:
if 'diagnosis.result' in _msg:
_index = hr.c_msg.index(_msg)
del hr.c_msg[_index:]
hr.c_msg_xs.clear()
break
hr.c_msg_xs.clear()
if len(hr.c_msg) > 240:
del hr.c_msg[240:]
if speed_max < 10:
md.clear_alarm()
w2t("未获取到正确的速度,即将重新获取...", 0, 0, 'red', tab_name)
continue
else:
break
# 5. 清除软急停重新运行程序并打开曲线发送继续运动信号当速度达到最大值时通过DO触发急停
md.reset_estop()
md.reset_estop() # 其实没必要
md.clear_alarm()
_response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['brake', 'stop0_related'])
_response = execution('rl_task.pp_to_main', hr, w2t, tasks=['brake', 'stop0_related'])
_response = execution('state.switch_auto', hr, w2t)
@ -276,50 +285,42 @@ def run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t):
_response = execution('rl_task.run', hr, w2t, tasks=['brake', 'stop0_related'])
for i in range(3):
if md.read_ready_to_go() == 1:
md.write_act(True)
md.write_act(1)
break
else:
sleep(1)
else:
w2t("未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test')
w2t("未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", 0, 111, 'red', tab_name)
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
sleep(randint(3, 6))
md.write_probe(True)
sleep(10) # 排除从其他位姿到零点位姿,再到轴极限位姿的时间
md.write_probe(1)
_t_start = time()
while True:
if md.read_brake_done() == 1:
sleep(1) # 保证所有数据均已返回
md.write_probe(False)
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
sleep(1) # 保证所有数据均已返回
sleep(1) # 保证速度归零
md.write_probe(0)
break
else:
if (time() - _t_start) > 30:
w2t(f"30s内未触发急停该条数据无效需要确认RL/Python程序编写正确并正常执行或者判别是否是机器本体问题...", 0, 0, 'red', 'Automatic Test')
md.write_probe(False)
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
sleep(1) # 保证所有数据均已返回
w2t(f"30s内未触发急停该条数据无效需要确认RL/Python程序编写正确并正常执行或者判别是否是机器本体问题,比如正负方向速度是否一致...", 0, 0, 'red', tab_name)
md.write_probe(0)
break
else:
sleep(1)
# 6. 保留数据并处理输出
curve_data = []
for _msg in hr.c_msg:
_c_msg = hr.c_msg.copy()
for _msg in _c_msg:
if 'diagnosis.result' in _msg:
curve_data.insert(0, loads(_msg))
else:
for _msg in hr.c_msg:
if 'diagnosis.result' in _msg:
_index = hr.c_msg.index(_msg)
del hr.c_msg[_index:]
hr.c_msg_xs.clear()
break
hr.c_msg_xs.clear()
if len(hr.c_msg) > 240:
del hr.c_msg[240:]
gen_result_file(path, curve_data, axis, _reach, _load, _speed, count)
else:
w2t(f"\n{loadsel.removeprefix('tool')}%负载的制动性能测试执行完毕,如需采集其他负载,须切换负载类型,并更换其他负载,重新执行。", 0, 0, 'green', 'Automatic Test')
w2t(f"\n{loadsel.removeprefix('tool')}%负载的制动性能测试执行完毕,如需采集其他负载,须切换负载类型,并更换其他负载,重新执行。", 0, 0, 'green', tab_name)
def main(path, hr, md, loadsel, w2t):
@ -327,10 +328,10 @@ def main(path, hr, md, loadsel, w2t):
data_dirs, data_files = traversal_files(path, w2t)
config_file, reach33, reach66, reach100, prj_file, result_dirs = check_files(path, loadsel, data_dirs, data_files, w2t)
prj_to_xcore(prj_file)
run_rl(path, loadsel, hr, md, config_file, prj_file, result_dirs, w2t)
run_rl(path, loadsel, hr, md, config_file, result_dirs, w2t)
_e_time = time()
time_total = _e_time - _s_time
w2t(f"处理总时长:{time_total // 3600:02.0f} h {time_total % 3600 // 60:02.0f} m {time_total % 60:02.0f} s", 0, 0, 'green', 'Automatic Test')
w2t(f"处理总时长:{time_total // 3600:02.0f} h {time_total % 3600 // 60:02.0f} m {time_total % 60:02.0f} s", 0, 0, 'green', tab_name)
if __name__ == '__main__':

View File

@ -7,6 +7,20 @@ from paramiko import SSHClient, AutoAddPolicy
from json import loads
import pandas
display_pdo_params = [
{"name": "hw_joint_vel_feedback", "channel": 0},
{"name": "hw_joint_vel_feedback", "channel": 1},
{"name": "hw_joint_vel_feedback", "channel": 2},
{"name": "hw_joint_vel_feedback", "channel": 3},
{"name": "hw_joint_vel_feedback", "channel": 4},
{"name": "hw_joint_vel_feedback", "channel": 5},
{"name": "device_servo_trq_feedback", "channel": 0},
{"name": "device_servo_trq_feedback", "channel": 1},
{"name": "device_servo_trq_feedback", "channel": 2},
{"name": "device_servo_trq_feedback", "channel": 3},
{"name": "device_servo_trq_feedback", "channel": 4},
{"name": "device_servo_trq_feedback", "channel": 5},
]
def traversal_files(path, w2t):
if not exists(path):
@ -75,9 +89,8 @@ def prj_to_xcore(prj_file):
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
_prj_name = prj_file.split('\\')[-1].removesuffix('.zip')
cmd = 'cd /home/luoshi/bin/controller/; '
cmd += f'sudo mv projects/target/_build/{_prj_name}.prj projects/target/_build/target.prj'
cmd += 'sudo mv projects/target/_build/*.prj projects/target/_build/target.prj'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
@ -86,18 +99,6 @@ def prj_to_xcore(prj_file):
ssh.close()
def validate_resp(_id, response, w2t):
match _id:
case 'DATA ERR':
w2t(f"数据处理错误,需要确认", 0, 4, 'red', tab_name='Automatic Test')
case 'DATA READ ERR':
w2t(f"无法读取数据,需要确认", 0, 3, 'red', tab_name='Automatic Test')
case 'NOT SUPPORT':
w2t(f"不支持的功能,需要确认", 0, 2, 'red', tab_name='Automatic Test')
if not response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
def execution(cmd, hr, w2t, **kwargs):
_id = hr.execution(cmd, **kwargs)
_msg = hr.get_from_id(_id)
@ -105,7 +106,8 @@ def execution(cmd, hr, w2t, **kwargs):
w2t(f"无法获取{_id}请求的响应信息", 0, 7, 'red', tab_name='Automatic Test')
else:
_response = loads(_msg)
validate_resp(_id, _response, w2t)
if not _response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
return _response
@ -118,7 +120,10 @@ def data_proc_regular(path, filename, channel, scenario_time):
for line in lines:
data = eval(line.strip())['data']
for item in data:
item['value'].reverse()
try:
item['value'].reverse()
except KeyError:
continue
if item.get('channel', None) == channel and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == channel and item.get('name', None) == 'device_servo_trq_feedback':
@ -147,7 +152,10 @@ def data_proc_regular(path, filename, channel, scenario_time):
for line in lines:
data = eval(line.strip())['data']
for item in data:
item['value'].reverse()
try:
item['value'].reverse()
except KeyError:
continue
if item.get('channel', None) == 0 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_0['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 0 and item.get('name', None) == 'device_servo_trq_feedback':
@ -216,7 +224,10 @@ def data_proc_regular(path, filename, channel, scenario_time):
for line in lines:
data = eval(line.strip())['data']
for item in data:
item['value'].reverse()
try:
item['value'].reverse()
except KeyError:
continue
if item.get('channel', None) == channel-9 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == channel-9 and item.get('name', None) == 'device_servo_trq_feedback':
@ -237,7 +248,10 @@ def data_proc_inertia(path, filename, channel):
for line in lines:
data = eval(line.strip())['data']
for item in data:
item['value'].reverse()
try:
item['value'].reverse()
except KeyError:
continue
if item.get('channel', None) == channel+3 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == channel+3 and item.get('name', None) == 'device_servo_trq_feedback':
@ -298,20 +312,19 @@ def run_rl(path, hr, md, loadsel, w2t):
conditions = c_inertia
disc = disc_inertia
# preparation 触发软急停,并解除,目的是让可能正在运行着的机器停下来
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
# _response = execution('diagnosis.save', hr, w2t, save=True) # 这条命令有问题
md.trigger_estop()
md.reset_estop()
for condition in conditions:
number = conditions.index(condition)
w2t(f"正在执行{disc[number][0]}测试......", 0, 0, 'purple', 'Automatic Test')
# 1. 关闭诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来,切手动模式并下电
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
md.trigger_estop()
md.reset_estop()
# 1. 将act重置为False并修改未要执行的场景
md.write_act(False)
sleep(1) # 让曲线彻底关闭
_response = execution('state.switch_manual', hr, w2t)
_response = execution('state.switch_motor_off', hr, w2t)
# 2. 修改未要执行的场景
ssh = SSHClient()
ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019')
@ -324,15 +337,14 @@ def run_rl(path, hr, md, loadsel, w2t):
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
# 3. reload工程后pp2main并且自动模式和上电
# 2. reload工程后pp2main并且自动模式和上电
prj_path = 'target/_build/target.prj'
_response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['current'])
_response = execution('overview.get_cur_prj', hr, w2t)
_response = execution('rl_task.pp_to_main', hr, w2t, tasks=['current'])
_response = execution('state.switch_auto', hr, w2t)
_response = execution('state.switch_motor_on', hr, w2t)
# 4. 开始运行程序,单轴运行15s
# 3. 开始运行程序,单轴运行35s
_response = execution('rl_task.run', hr, w2t, tasks=['current'])
_t_start = time()
while True:
@ -345,25 +357,8 @@ def run_rl(path, hr, md, loadsel, w2t):
else:
sleep(1)
# 5. 打开诊断曲线,并执行采集
# 4. 打开诊断曲线,并执行采集
sleep(10) # 保证程序已经运行起来,其实主要是为了保持电流的采集而设定
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
display_pdo_params = [
{"name": "hw_joint_vel_feedback", "channel": 0},
{"name": "hw_joint_vel_feedback", "channel": 1},
{"name": "hw_joint_vel_feedback", "channel": 2},
{"name": "hw_joint_vel_feedback", "channel": 3},
{"name": "hw_joint_vel_feedback", "channel": 4},
{"name": "hw_joint_vel_feedback", "channel": 5},
{"name": "device_servo_trq_feedback", "channel": 0},
{"name": "device_servo_trq_feedback", "channel": 1},
{"name": "device_servo_trq_feedback", "channel": 2},
{"name": "device_servo_trq_feedback", "channel": 3},
{"name": "device_servo_trq_feedback", "channel": 4},
{"name": "device_servo_trq_feedback", "channel": 5},
{"name": "device_safety_estop", "channel": 0},
]
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
scenario_time = 0
if number < 6:
sleep(35)
@ -385,24 +380,16 @@ def run_rl(path, hr, md, loadsel, w2t):
scenario_time = md.read_scenario_time()
sleep(float(scenario_time)*0.2) # 再运行周期的20%即可
# 6. 关闭诊断曲线,停止程序运行,下电并且换成手动模式
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
# 5.停止程序运行,保留数据并处理输出
_response = execution('rl_task.stop', hr, w2t, tasks=['current'])
_response = execution('state.switch_motor_off', hr, w2t)
_response = execution('state.switch_manual', hr, w2t)
sleep(1) # 保证所有数据均已返回
# 7. 保留数据并处理输出
for _msg in hr.c_msg:
_c_msg = hr.c_msg.copy()
for _msg in _c_msg:
if 'diagnosis.result' in _msg:
disc[number][1].insert(0, loads(_msg))
else:
_index = 210
for _msg in hr.c_msg:
if 'diagnosis.result' in _msg:
_index = hr.c_msg.index(_msg)
break
del hr.c_msg[_index:]
hr.c_msg_xs.clear()
if len(hr.c_msg) > 240:
del hr.c_msg[240:]
gen_result_file(path, loadsel, disc, number, scenario_time)
else:
if loadsel == 'tool100':

View File

@ -189,7 +189,7 @@ def current_cycle(dur, data_files, rcs, rrs, vel, trq, trqh, rpms, w2t):
for axis, cur_value in avg.items():
try:
shtname = f"J{axis}"
wb[shtname]["J4"].value = float(cur_value)
wb[shtname]["J4"].value = float(cur_value[0])
except:
pass
@ -251,7 +251,7 @@ def p_single(wb, single, vel, trq, rpms, rrs, w2t):
set_option("display.precision", 2)
if data_file.endswith('.data'):
df = read_csv(data_file, sep='\t')
rr = rrs[axis+1]
rr = rrs[axis-1]
addition = 180 / 3.1415926 * 60 / 360 * rr
elif data_file.endswith('.csv'):
df = read_csv(data_file, sep=',', encoding='gbk', header=8)
@ -331,7 +331,7 @@ def p_scenario(wb, single, vel, trq, rpms, rrs, dur, w2t):
set_option("display.precision", 2)
if data_file.endswith('.data'):
df = read_csv(data_file, sep='\t')
rr = rrs[axis+1]
rr = rrs[axis-1]
addition = 180 / 3.1415926 * 60 / 360 * rr
elif data_file.endswith('.csv'):
df = read_csv(data_file, sep=',', encoding='gbk', header=8)

View File

@ -0,0 +1 @@
__all__ = ['factory_test']

View File

@ -0,0 +1,301 @@
from sys import argv
from os.path import exists, dirname
from os import scandir
from paramiko import SSHClient, AutoAddPolicy
from json import loads
from time import sleep, time, strftime, localtime
from pandas import DataFrame
from openpyxl import load_workbook
from math import sqrt
from numpy import power
from csv import writer
tab_name = 'Durable Action'
count = 0
durable_data_current_xlsx = f'{dirname(__file__)}/../../assets/templates/durable/durable_data_current.xlsx'
durable_data_current_max_xlsx = f'{dirname(__file__)}/../../assets/templates/durable/durable_data_current_max.xlsx'
display_pdo_params = [
# {"name": "hw_joint_vel_feedback", "channel": 0},
# {"name": "hw_joint_vel_feedback", "channel": 1},
# {"name": "hw_joint_vel_feedback", "channel": 2},
# {"name": "hw_joint_vel_feedback", "channel": 3},
# {"name": "hw_joint_vel_feedback", "channel": 4},
# {"name": "hw_joint_vel_feedback", "channel": 5},
{"name": "device_servo_trq_feedback", "channel": 0},
{"name": "device_servo_trq_feedback", "channel": 1},
{"name": "device_servo_trq_feedback", "channel": 2},
{"name": "device_servo_trq_feedback", "channel": 3},
{"name": "device_servo_trq_feedback", "channel": 4},
{"name": "device_servo_trq_feedback", "channel": 5},
]
title = [
'time', 'trq-1', 'trq-2', 'trq-3', 'trq-4', 'trq-5', 'trq-6', 'trq-max-1', 'trq-max-2', 'trq-max-3', 'trq-max-4',
'trq-max-5', 'trq-max-6'
]
def traversal_files(path, w2t):
if not exists(path):
msg = f'数据文件夹{path}不存在,请确认后重试......'
w2t(msg, 0, 1, 'red', tab_name=tab_name)
else:
dirs = []
files = []
for item in scandir(path):
if item.is_dir():
dirs.append(item.path)
elif item.is_file():
files.append(item.path)
return dirs, files
def check_files(data_dirs, data_files, w2t):
if len(data_dirs) != 0 or len(data_files) != 2:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下文件,确认后重新运行!\n1. target.zip\n2. configs.xlsx', 0, 10, 'red', tab_name)
_files = [data_files[0].split('\\')[-1], data_files[1].split('\\')[-1]]
_files.sort()
if _files != ['configs.xlsx', 'target.zip']:
w2t('初始路径下只能存在如下文件,确认后重新运行!\n1. target.zip\n2. configs.xlsx', 0, 10, 'red', tab_name)
data_files.sort()
return data_files
def prj_to_xcore(prj_file):
ssh = SSHClient()
ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019')
sftp = ssh.open_sftp()
sftp.put(prj_file, '/tmp/target.zip')
cmd = 'cd /tmp; rm -rf target/; mkdir target; unzip -d target/ -q target.zip; '
cmd += 'chmod 777 -R target/; rm target.zip'
ssh.exec_command(cmd)
cmd = 'sudo rm -rf /home/luoshi/bin/controller/projects/target; '
cmd += 'sudo mv /tmp/target/ /home/luoshi/bin/controller/projects/'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
cmd = 'cd /home/luoshi/bin/controller/; '
cmd += 'sudo mv projects/target/_build/*.prj projects/target/_build/target.prj'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
ssh.close()
def execution(cmd, hr, w2t, **kwargs):
_id = hr.execution(cmd, **kwargs)
_msg = hr.get_from_id(_id)
if not _msg:
w2t(f"无法获取{_id}请求的响应信息", 0, 7, 'red', tab_name=tab_name)
else:
_response = loads(_msg)
if not _response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name=tab_name)
return _response
def run_rl(path, config_file, data_all, hr, md, w2t):
# 1. 关闭诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来,切手动模式并下电
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
md.trigger_estop()
md.reset_estop()
md.write_act(False)
sleep(1) # 让曲线彻底关闭
# 2. reload工程后pp2main并且自动模式和上电
prj_path = 'target/_build/target.prj'
_response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['current'])
_response = execution('rl_task.pp_to_main', hr, w2t, tasks=['current'])
_response = execution('state.switch_auto', hr, w2t)
_response = execution('state.switch_motor_on', hr, w2t)
# 3. 开始运行程序
_response = execution('rl_task.run', hr, w2t, tasks=['current'])
_t_start = time()
while True:
if md.read_ready_to_go() == 1:
md.write_act(True)
break
else:
if (time() - _t_start) // 20 > 1:
w2t("20s内未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", 0, 111, 'red', tab_name)
else:
sleep(1)
# 4. 获取初始数据,周期时间,首次的各轴平均电流值,打开诊断曲线,并执行采集
sleep(20) # 初始化 scenario time 为 0
_t_start = time()
while True:
scenario_time = md.read_scenario_time()
if float(scenario_time) > 1:
w2t(f"场景的周期时间:{scenario_time}s", 0, 0, 'green', tab_name)
break
else:
if (time() - _t_start) // 60 > 3:
w2t(f"未收到场景的周期时间需要确认RL程序编写正确并正常执行...", 0, 111, 'red', tab_name)
else:
sleep(5)
sleep(1) # 一定要延迟一秒再读一次scenario time寄存器因为一开始读取的数值不准确
scenario_time = float(md.read_scenario_time())
sleep(scenario_time*0.2)
# 6. 准备初始数据,关闭诊断曲线,保留数据并处理输出
with open(f'{path}\\results.csv', mode='a+', newline='') as f_csv:
csv_writer = writer(f_csv)
csv_writer.writerow(title)
_wb = load_workbook(config_file, read_only=True)
_ws = _wb['Target']
wait_time = float(_ws.cell(row=2, column=10).value)
rcs = []
for i in range(6):
rcs.append(float(_ws.cell(row=6, column=i + 2).value))
get_durable_data(path, data_all, scenario_time, wait_time, rcs, hr, md, w2t)
# 7. 继续运行
while True:
# 固定间隔,更新一次数据,打开曲线,获取周期内电流,关闭曲线
sleep(wait_time+scenario_time+7)
# 保留数据并处理输出
get_durable_data(path, data_all, scenario_time, wait_time, rcs, hr, md, w2t)
def get_durable_data(path, data, scenario_time, wait_time, rcs, hr, md, w2t):
_data_list = []
_c_msg = hr.c_msg.copy()
for _msg in _c_msg:
if 'diagnosis.result' in _msg:
_data_list.insert(0, loads(_msg))
else:
hr.c_msg_xs.clear()
if len(hr.c_msg) > 240:
del hr.c_msg[240:]
# with open(f'{path}\\log.txt', 'w', encoding='utf-8') as f_obj:
# for _ in _data_list:
# f_obj.write(f"{_}\n")
_d2d_trq = {0: [], 1: [], 2: [], 3: [], 4: [], 5: []}
for line in _data_list:
for item in line['data']:
for i in range(6):
item['value'].reverse()
if item.get('channel', None) == i and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq[i].extend(item['value'])
if len(_d2d_trq[0]) / 1000 > scenario_time + 1:
this_time = strftime("%Y-%m-%d %H:%M:%S", localtime(time()))
next_time = strftime("%Y-%m-%d %H:%M:%S", localtime(time()+wait_time+10+scenario_time)).split()[-1]
_df = DataFrame(_d2d_trq)
_flg = 0
_res = []
for i in range(6):
def overmax_data(df, index, number, flag):
if number > 100:
md.trigger_estop()
hr.durable_quit = 1
df.to_excel(f'{path}\\{this_time}.xlsx')
w2t(f"[{this_time}] {flag}-axis-{index} 数据过大错误,需要检查确定。", 0, 10, 'red', tab_name)
try:
_ = sqrt(_df[i].apply(lambda x: power((rcs[i]*x/1000), 2)).sum()/len(_df[i]))
except:
md.trigger_estop()
_df.to_excel(path+"\\err_data.xlsx")
w2t(f"{i}calculate error", 0, 11, 'red', tab_name)
if not _flg:
del data[0]['time'][0]
data[0]['time'].append(this_time.split()[-1])
del data[1]['time'][0]
data[1]['time'].append(this_time.split()[-1])
_res.append(this_time)
_flg = 1
del data[0][f"axis{i + 1}"][0]
overmax_data(_df, i, _, 'trq')
data[0][f"axis{i + 1}"].append(_)
_res.append(_)
_ = rcs[i] * _df[i].abs().max() / 1000
overmax_data(_df, i, _, 'trq-max')
del data[1][f"axis{i + 1}"][0]
data[1][f"axis{i + 1}"].append(_)
_res.append(_)
_df_1 = DataFrame(data[0])
_df_2 = DataFrame(data[1])
with open(f'{path}\\results.csv', mode='a+', newline='') as f_csv:
def change_order(res):
_time = res[0:1]
_trq = []
_trq_max = []
for _item in res[1::2]:
_trq.append(_item)
for _item in res[2::2]:
_trq_max.append(_item)
return _time + _trq + _trq_max
csv_writer = writer(f_csv)
csv_writer.writerow(change_order(_res))
while True:
if not hr.durable_lock:
hr.durable_lock = 1
_df_1.to_excel(durable_data_current_xlsx, index=False)
_df_2.to_excel(durable_data_current_max_xlsx, index=False)
hr.durable_lock = 0
break
else:
sleep(1)
global count
count += 1
w2t(f"[{this_time}] 当前次数:{count:09d} | 预计下次数据更新时间:{next_time}", 0, 0, '#008B8B', tab_name)
break
else:
md.trigger_estop()
with open(f'{path}\\device_servo_trq_feedback_0.txt', 'w', encoding='utf-8') as f_obj:
for _ in _d2d_trq[0]:
f_obj.write(f"{_}\n")
w2t("采集的数据时间长度不够,需要确认。", 0, 10, 'red', tab_name)
def main(path, hr, md, w2t):
durable_data_current = {
'time': list(range(1, 19)),
'axis1': [0 for _ in range(18)],
'axis2': [0 for _ in range(18)],
'axis3': [0 for _ in range(18)],
'axis4': [0 for _ in range(18)],
'axis5': [0 for _ in range(18)],
'axis6': [0 for _ in range(18)],
}
durable_data_current_max = {
'time': list(range(1, 19)),
'axis1': [0 for _ in range(18)],
'axis2': [0 for _ in range(18)],
'axis3': [0 for _ in range(18)],
'axis4': [0 for _ in range(18)],
'axis5': [0 for _ in range(18)],
'axis6': [0 for _ in range(18)],
}
data_all = [durable_data_current, durable_data_current_max]
data_dirs, data_files = traversal_files(path, w2t)
config_file, prj_file = check_files(data_dirs, data_files, w2t)
prj_to_xcore(prj_file)
run_rl(path, config_file, data_all, hr, md, w2t)
if __name__ == '__main__':
main(*argv[1:])

View File

@ -1,4 +1,4 @@
from json import load, dumps
from json import load, dumps, loads
from socket import socket, setdefaulttimeout, AF_INET, SOCK_STREAM
from threading import Thread
import selectors
@ -21,18 +21,27 @@ class ModbusRequest(object):
self.tab_name = 'openapi'
self.host = '192.168.0.160'
self.port = 502
self.interval = 0.3
self.c = ModbusTcpClient(self.host, self.port)
self.c.connect()
def motor_off(self):
try:
self.c.write_register(40002, 0)
sleep(self.interval)
self.c.write_register(40002, 1)
sleep(self.interval)
self.c.write_register(40002, 0)
except Exception as Err:
self.w2t(f"{Err}\n无法正常下电连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
def motor_on(self):
try:
self.c.write_register(40003, 0)
sleep(self.interval)
self.c.write_register(40003, 1)
sleep(self.interval)
self.c.write_register(40003, 0)
except Exception as Err:
self.w2t(f"{Err}\n无法正常上电连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
@ -45,11 +54,11 @@ class ModbusRequest(object):
def reset_estop(self):
try:
self.c.write_register(40012, 1)
sleep(0.2)
sleep(self.interval)
self.c.write_register(40001, 0)
sleep(0.2)
sleep(self.interval)
self.c.write_register(40001, 1)
sleep(0.2)
sleep(self.interval)
self.c.write_register(40001, 0)
except Exception as Err:
self.w2t(f"{Err}\n无法重置软急停连接Modbus失败需要确认网络是否通畅或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
@ -171,6 +180,7 @@ class HmiRequest(object):
self.half_length = 0
self.index = 0
self.reset_index = 0
self.durable_lock = 0
self.sock_conn()
self.t_heartbeat = Thread(target=self.heartbeat)
@ -251,14 +261,19 @@ class HmiRequest(object):
f_hb.write(_flag)
if _flag == '0':
self.w2t(f"{_id} 心跳丢失,连接失败,重新连接中...", 0, 7, 'red', tab_name=self.tab_name)
sleep(1.5)
# with open(f"{current_path}/../assets/templates/c_msg.log", "w", encoding='utf-8') as f:
# for msg in self.c_msg:
# f.write(str(loads(msg)) + '\n')
t = time()
with open(f"{current_path}/../assets/templates/c_msg.log", "w", encoding='utf-8') as f:
for msg in self.c_msg:
f.write(str(t) + '-' + str(loads(msg)) + '\n')
sleep(1)
def msg_storage(self, response, flag=0):
# response是解码后的字符串
messages = self.c_msg if flag == 0 else self.c_msg_xs
if len(messages) < 20000:
if 'move.monitor' in response:
pass
elif len(messages) < 20000:
messages.insert(0, response)
else:
messages.insert(0, response)
@ -565,7 +580,7 @@ class HmiRequest(object):
if flg == 0: # for old protocols
req = None
try:
with open(f'{current_path}/../assets/templates/{command}.json', encoding='utf-8',
with open(f'{current_path}/../assets/templates/json/{command}.json', encoding='utf-8',
mode='r') as f_json:
req = load(f_json)
except:
@ -591,6 +606,8 @@ class HmiRequest(object):
req['data']['type'] = kwargs['type']
req['data']['bias'] = kwargs['bias']
req['data']['value'] = kwargs['value']
case 'diagnosis.save':
req['data']['save'] = kwargs['save']
case _:
pass