5 Commits

Author SHA1 Message Date
3010cb8931 v0.2.0.0(2024/07/17)
1. [profile: aio.py]
   - 增加velocity相关逻辑
   - 修改负载信息为曲线信息
2. [profile: factory_test.py]
   - 增加velocity相关逻辑
3. [profile: current.py]
   - 修正减速比获取的规则
4. [profile: openapi.py]
   - HmiRequest模块:日志取消记录move.monitor相关
   - HmiRequest模块:增加了durable_lock变量,控制文件读写互斥
2024-07-17 14:17:00 +08:00
da5ddcea0a v0.1.9.4(2024/07/15)
1. [profile: aio.py]:完善durable text相关逻辑
2. [profile: do_brake/do_current/btn_functions.py]:删除validate_resp函数,修改execution函数
3. [profile: factory_test.py]
   - 新增耐久/老化测试程序
   - 实现六轴折线图显示
4. [profile: openapi.py]:多次合并遗留问题处理
5. templates文件夹组织架构调整
2024-07-17 10:09:06 +08:00
cf9d51b475 fix merge 2024-07-15 13:42:10 +08:00
f4a70a0034 Merge branch 'main' of gitea.rustle.cc:gitea/rokae into profile
fetch the newest codes of main
2024-07-15 13:34:09 +08:00
71b2d9d42e pending dev 2024-07-11 19:09:08 +08:00
36 changed files with 472 additions and 59 deletions

1
.gitignore vendored
View File

@ -8,3 +8,4 @@ aio/__pycache__/
aio/code/automatic_test/__pycache__/ aio/code/automatic_test/__pycache__/
aio/code/data_process/__pycache__/ aio/code/data_process/__pycache__/
aio/assets/templates/c_msg.log aio/assets/templates/c_msg.log
aio/code/durable_action/__pycache__/

View File

@ -486,7 +486,28 @@ v0.1.9.3(2024/07/15)
- 修改modbus连接失败报错输出形式使之只在automatic test页面显示 - 修改modbus连接失败报错输出形式使之只在automatic test页面显示
- 将该文件移动至toplevel为后面扩展做准备 - 将该文件移动至toplevel为后面扩展做准备
- 修改heartbeat文件路径使后续打包的时候更方便 - 修改heartbeat文件路径使后续打包的时候更方便
2. [APIs: aio.py]: 2. [APIs: aio.py]
- 修改heartbeat文件路径使后续打包的时候更方便 - 修改heartbeat文件路径使后续打包的时候更方便
- 修改write2textbox函数的打印逻辑先判断网络相关 - 修改write2textbox函数的打印逻辑先判断网络相关
v0.1.9.4(2024/07/15)
1. [profile: aio.py]完善durable text相关逻辑
2. [profile: do_brake/do_current/btn_functions.py]删除validate_resp函数修改execution函数
3. [profile: factory_test.py]
- 新增耐久/老化测试程序
- 实现六轴折线图显示
4. [profile: openapi.py]:多次合并遗留问题处理
5. templates文件夹组织架构调整
v0.2.0.0(2024/07/17)
1. [profile: aio.py]
- 增加velocity相关逻辑
- 修改负载信息为曲线信息
2. [profile: factory_test.py]
- 增加velocity相关逻辑
3. [profile: current.py]
- 修正减速比获取的规则
4. [profile: openapi.py]
- HmiRequest模块日志取消记录move.monitor相关
- HmiRequest模块增加了durable_lock变量控制文件读写互斥

View File

@ -6,8 +6,8 @@ VSVersionInfo(
ffi=FixedFileInfo( ffi=FixedFileInfo(
# filevers and prodvers should be always a tuple with four items: (1, 2, 3, 4) # filevers and prodvers should be always a tuple with four items: (1, 2, 3, 4)
# Set not needed items to zero 0. # Set not needed items to zero 0.
filevers=(0, 1, 9, 2), filevers=(0, 2, 0, 0),
prodvers=(0, 1, 9, 2), prodvers=(0, 2, 0, 0),
# Contains a bitmask that specifies the valid bits 'flags'r # Contains a bitmask that specifies the valid bits 'flags'r
mask=0x3f, mask=0x3f,
# Contains a bitmask that specifies the Boolean attributes of the file. # Contains a bitmask that specifies the Boolean attributes of the file.
@ -31,12 +31,12 @@ VSVersionInfo(
'040904b0', '040904b0',
[StringStruct('CompanyName', 'Rokae - https://www.rokae.com/'), [StringStruct('CompanyName', 'Rokae - https://www.rokae.com/'),
StringStruct('FileDescription', 'All in one automatic toolbox'), StringStruct('FileDescription', 'All in one automatic toolbox'),
StringStruct('FileVersion', '0.1.9.2 (2024-07-13)'), StringStruct('FileVersion', '0.2.0.0 (2024-07-17)'),
StringStruct('InternalName', 'AIO.exe'), StringStruct('InternalName', 'AIO.exe'),
StringStruct('LegalCopyright', '© 2024-2024 Manford Fan'), StringStruct('LegalCopyright', '© 2024-2024 Manford Fan'),
StringStruct('OriginalFilename', 'AIO.exe'), StringStruct('OriginalFilename', 'AIO.exe'),
StringStruct('ProductName', 'AIO'), StringStruct('ProductName', 'AIO'),
StringStruct('ProductVersion', '0.1.9.2 (2024-07-13)')]) StringStruct('ProductVersion', '0.2.0.0 (2024-07-17)')])
]), ]),
VarFileInfo([VarStruct('Translation', [1033, 1200])]) VarFileInfo([VarStruct('Translation', [1033, 1200])])
] ]

Binary file not shown.

Binary file not shown.

View File

@ -1 +1 @@
0.1.9.2 @ 07/13/2024 0.2.0.0 @ 07/17/2024

View File

@ -7,17 +7,43 @@ import customtkinter
from time import time, strftime, localtime, sleep from time import time, strftime, localtime, sleep
from urllib.request import urlopen from urllib.request import urlopen
from socket import setdefaulttimeout from socket import setdefaulttimeout
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from data_process import * from data_process import *
from automatic_test import * from automatic_test import *
from durable_action import *
import openapi import openapi
import matplotlib.pyplot as plt
import matplotlib
import pandas as pd
matplotlib.use('Agg')
heartbeat = f'{dirname(__file__)}/../assets/templates/heartbeat' heartbeat = f'{dirname(__file__)}/../assets/templates/heartbeat'
durable_data_current_xlsx = f'{dirname(__file__)}/../assets/templates/durable/durable_data_current.xlsx'
durable_data_velocity_xlsx = f'{dirname(__file__)}/../assets/templates/durable/durable_data_velocity.xlsx'
customtkinter.set_appearance_mode("System") # Modes: "System" (standard), "Dark", "Light" customtkinter.set_appearance_mode("System") # Modes: "System" (standard), "Dark", "Light"
customtkinter.set_default_color_theme("blue") # Themes: "blue" (standard), "green", "dark-blue" customtkinter.set_default_color_theme("blue") # Themes: "blue" (standard), "green", "dark-blue"
customtkinter.set_widget_scaling(1.1) # widget dimensions and text size customtkinter.set_widget_scaling(1.1) # widget dimensions and text size
customtkinter.set_window_scaling(1.1) # window geometry dimensions customtkinter.set_window_scaling(1.1) # window geometry dimensions
setdefaulttimeout(3) setdefaulttimeout(3)
# global vars # global vars
durable_data_current = {
'time': list(range(1, 26)),
'axis1': [0 for _ in range(25)],
'axis2': [0 for _ in range(25)],
'axis3': [0 for _ in range(25)],
'axis4': [0 for _ in range(25)],
'axis5': [0 for _ in range(25)],
'axis6': [0 for _ in range(25)],
}
durable_data_velocity = {
'time': list(range(1, 26)),
'axis1': [0 for _ in range(25)],
'axis2': [0 for _ in range(25)],
'axis3': [0 for _ in range(25)],
'axis4': [0 for _ in range(25)],
'axis5': [0 for _ in range(25)],
'axis6': [0 for _ in range(25)],
}
btns_func = { btns_func = {
'start': {'btn': '', 'row': 1, 'text': '开始运行'}, 'start': {'btn': '', 'row': 1, 'text': '开始运行'},
'check': {'btn': '', 'row': 2, 'text': '检查参数'}, 'check': {'btn': '', 'row': 2, 'text': '检查参数'},
@ -36,6 +62,10 @@ widgits_at = {
'path': {'label': '', 'entry': '', 'row': 2, 'col': 2, 'text': '数据文件夹路径'}, 'path': {'label': '', 'entry': '', 'row': 2, 'col': 2, 'text': '数据文件夹路径'},
'loadsel': {'label': '', 'optionmenu': '', 'row': 2, 'col': 1, 'text': '负载信息'}, 'loadsel': {'label': '', 'optionmenu': '', 'row': 2, 'col': 1, 'text': '负载信息'},
} }
widgits_da = {
'path': {'label': '', 'entry': '', 'row': 1, 'col': 2, 'text': '数据文件夹路径'},
'curvesel': {'label': '', 'optionmenu': '', 'row': 1, 'col': 1, 'text': '曲线选择'},
}
class App(customtkinter.CTk): class App(customtkinter.CTk):
@ -45,6 +75,10 @@ class App(customtkinter.CTk):
self.w_param = 84 self.w_param = 84
self.hr = None self.hr = None
self.md = None self.md = None
self.canvas = None
self.flg = 0
self.df_copy = None
self.old_curve = None
# ===================================================================== # =====================================================================
# configure window # configure window
self.title("AIO - All in one automatic toolbox") self.title("AIO - All in one automatic toolbox")
@ -71,7 +105,7 @@ class App(customtkinter.CTk):
btns_func['log']['btn'].configure(command=lambda: self.thread_it(self.func_log_callback)) btns_func['log']['btn'].configure(command=lambda: self.thread_it(self.func_log_callback))
btns_func['end']['btn'].configure(command=lambda: self.thread_it(self.func_end_callback)) btns_func['end']['btn'].configure(command=lambda: self.thread_it(self.func_end_callback))
# create version info # create version info
self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.1.9.2\nDate: 07/13/2024", font=self.my_font, text_color="#4F4F4F") self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.2.0.0\nDate: 07/17/2024", font=self.my_font, text_color="#4F4F4F")
self.frame_func.rowconfigure(6, weight=1) self.frame_func.rowconfigure(6, weight=1)
self.label_version.grid(row=6, column=0, padx=20, pady=20, sticky='s') self.label_version.grid(row=6, column=0, padx=20, pady=20, sticky='s')
# ===================================================================== # =====================================================================
@ -80,6 +114,7 @@ class App(customtkinter.CTk):
self.tabview.grid(row=0, column=1, padx=10, pady=5, sticky="nsew") self.tabview.grid(row=0, column=1, padx=10, pady=5, sticky="nsew")
self.tabview.add("Data Process") self.tabview.add("Data Process")
self.tabview.add("Automatic Test") self.tabview.add("Automatic Test")
self.tabview.add("Durable Action")
# create main menu for data process # create main menu for data process
self.menu_main_dp = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'), values=["init", "brake", "current", "iso", "wavelogger"], font=self.my_font, text_color='yellow', button_color='red', fg_color='green', command=self.func_main_callback) self.menu_main_dp = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'), values=["init", "brake", "current", "iso", "wavelogger"], font=self.my_font, text_color='yellow', button_color='red', fg_color='green', command=self.func_main_callback)
self.menu_main_dp.grid(row=1, column=1, sticky='we', padx=5, pady=10) self.menu_main_dp.grid(row=1, column=1, sticky='we', padx=5, pady=10)
@ -115,7 +150,7 @@ class App(customtkinter.CTk):
# For automatic test tab START ===================================================================== # For automatic test tab START =====================================================================
# create buttons # create buttons
self.seg_button = customtkinter.CTkSegmentedButton(self.tabview.tab('Automatic Test'), dynamic_resizing=False, font=customtkinter.CTkFont(size=16, weight='bold'), command=lambda value='机器状态': self.thread_it(self.segmented_button_callback)) self.seg_button = customtkinter.CTkSegmentedButton(self.tabview.tab('Automatic Test'), dynamic_resizing=False, font=customtkinter.CTkFont(size=16, weight='bold'), command=lambda value='机器状态': self.thread_it(self.segmented_button_callback))
self.seg_button.grid(row=1, column=2, columnspan=12, padx=(20, 10), pady=(10, 10), sticky="ew") self.seg_button.grid(row=1, column=2, columnspan=12, padx=(65, 10), pady=(10, 10), sticky="ew")
self.seg_button.configure(dynamic_resizing=False, values=["功能切换", "触发急停", "恢复急停", "待定功能", "功能待定", "机器状态", "告警信息"]) self.seg_button.configure(dynamic_resizing=False, values=["功能切换", "触发急停", "恢复急停", "待定功能", "功能待定", "机器状态", "告警信息"])
self.seg_button.set("功能切换") self.seg_button.set("功能切换")
# create progress bar # create progress bar
@ -127,7 +162,7 @@ class App(customtkinter.CTk):
for widgit in widgits_at: for widgit in widgits_at:
if widgit == 'path': if widgit == 'path':
widgits_at[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Automatic Test'), text=f'{widgit.upper()}', font=self.my_font) widgits_at[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Automatic Test'), text=f'{widgit.upper()}', font=self.my_font)
widgits_at[widgit]['label'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col'], sticky='e', pady=5) widgits_at[widgit]['label'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col'], sticky='e', padx=(20, 5), pady=5)
widgits_at[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Automatic Test'), width=670, placeholder_text=widgits_at[widgit]['text'], font=self.my_font) widgits_at[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Automatic Test'), width=670, placeholder_text=widgits_at[widgit]['text'], font=self.my_font)
widgits_at[widgit]['entry'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=5, sticky='we') widgits_at[widgit]['entry'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=5, sticky='we')
widgits_at[widgit]['entry'].configure(state='disabled') widgits_at[widgit]['entry'].configure(state='disabled')
@ -137,6 +172,23 @@ class App(customtkinter.CTk):
widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text']) widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
widgits_at[widgit]['optionmenu'].configure(state='disabled') widgits_at[widgit]['optionmenu'].configure(state='disabled')
# For automatic test tab END ===================================================================== # For automatic test tab END =====================================================================
# For durable_action tab START =====================================================================
# create progress bar
self.progressbar_da = customtkinter.CTkProgressBar(self.tabview.tab('Durable Action'))
self.progressbar_da.grid(row=2, column=1, padx=5, pady=5, sticky="ew")
self.progressbar_da.configure(mode="determinnate", width=self.w_param)
self.progressbar_da.start()
for widgit in widgits_da:
if widgit == 'path':
widgits_da[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Durable Action'), text=f'{widgit.upper()}', font=self.my_font)
widgits_da[widgit]['label'].grid(row=widgits_da[widgit]['row'], column=widgits_da[widgit]['col'], sticky='e', padx=(20, 5), pady=10)
widgits_da[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Durable Action'), width=670, placeholder_text=widgits_da[widgit]['text'], font=self.my_font)
widgits_da[widgit]['entry'].grid(row=widgits_da[widgit]['row'], column=widgits_da[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=10, sticky='we')
elif widgit in ['curvesel']:
widgits_da[widgit]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Durable Action'), button_color='#708090', fg_color='#778899', values=["device_servo_trq_feedback", "hw_joint_vel_feedback"], font=self.my_font)
widgits_da[widgit]['optionmenu'].grid(row=widgits_da[widgit]['row'], column=widgits_da[widgit]['col'], padx=5, pady=10, sticky='we')
widgits_da[widgit]['optionmenu'].set(widgits_da[widgit]['text'])
# For durable_action tab END =====================================================================
# create textbox # create textbox
self.textbox = customtkinter.CTkTextbox(self, wrap='none', font=customtkinter.CTkFont(family="consolas", size=14), text_color="blue", fg_color='#E9E9E9', border_width=2, border_color='#CDCDCD', border_spacing=5) self.textbox = customtkinter.CTkTextbox(self, wrap='none', font=customtkinter.CTkFont(family="consolas", size=14), text_color="blue", fg_color='#E9E9E9', border_width=2, border_color='#CDCDCD', border_spacing=5)
self.textbox.grid(row=6, column=1, columnspan=13, ipadx=10, ipady=10, padx=10, pady=(5, 10), sticky='nsew') self.textbox.grid(row=6, column=1, columnspan=13, ipadx=10, ipady=10, padx=10, pady=(5, 10), sticky='nsew')
@ -154,6 +206,53 @@ class App(customtkinter.CTk):
tkinter.messagebox.showwarning(title="版本更新", message="连接服务器失败,无法确认当前是否是最新版本......") tkinter.messagebox.showwarning(title="版本更新", message="连接服务器失败,无法确认当前是否是最新版本......")
# functions below ↓ ---------------------------------------------------------------------------------------- # functions below ↓ ----------------------------------------------------------------------------------------
def create_canvas(self, figure):
self.canvas = FigureCanvasTkAgg(figure, self)
self.canvas.draw()
self.canvas.get_tk_widget().configure(height=640)
self.canvas.get_tk_widget().grid(row=3, column=1, rowspan=3, columnspan=13, padx=20, pady=10, sticky="nsew")
def create_plot(self):
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['figure.dpi'] = 100
plt.rcParams['font.size'] = 14
plt.rcParams['lines.marker'] = 'o'
curvesel = widgits_da['curvesel']['optionmenu'].get()
while True:
if not self.hr.durable_lock:
self.hr.durable_lock = 1
if curvesel == 'device_servo_trq_feedback':
df = pd.read_excel(durable_data_current_xlsx)
_title = 'device_servo_trq_feedback'
elif curvesel == 'hw_joint_vel_feedback':
_title = 'hw_joint_vel_feedback'
df = pd.read_excel(durable_data_velocity_xlsx)
else:
_title = 'device_servo_trq_feedback'
df = pd.read_excel(durable_data_current_xlsx)
self.hr.durable_lock = 0
break
else:
sleep(1)
if not df.equals(self.df_copy) or self.flg == 0 or curvesel != self.old_curve:
self.flg = 1
self.df_copy = df.copy()
self.old_curve = widgits_da['curvesel']['optionmenu'].get()
figure = plt.figure(frameon=True, facecolor='#E9E9E9')
plt.subplots_adjust(left=0.04, right=0.98, bottom=0.05, top=0.95)
ax = figure.add_subplot(1, 1, 1)
df.plot(grid=True, x='time', y='axis1', ax=ax)
df.plot(grid=True, x='time', y='axis2', ax=ax)
df.plot(grid=True, x='time', y='axis3', ax=ax)
df.plot(grid=True, x='time', y='axis4', ax=ax)
df.plot(grid=True, x='time', y='axis5', ax=ax)
df.plot(grid=True, x='time', y='axis6', ax=ax, title=_title, legend='upper left')
self.create_canvas(figure)
def thread_it(self, func, *args): def thread_it(self, func, *args):
""" 将函数打包进线程 """ """ 将函数打包进线程 """
self.myThread = Thread(target=func, args=args) self.myThread = Thread(target=func, args=args)
@ -182,16 +281,25 @@ class App(customtkinter.CTk):
# self.tabview.configure(state='normal') # self.tabview.configure(state='normal')
def detect_network(self): def detect_network(self):
df = pd.DataFrame(durable_data_current)
df.to_excel(durable_data_current_xlsx, index=False)
df = pd.DataFrame(durable_data_velocity)
df.to_excel(durable_data_velocity_xlsx, index=False)
with open(heartbeat, "w", encoding='utf-8') as f_hb: with open(heartbeat, "w", encoding='utf-8') as f_hb:
f_hb.write('0') f_hb.write('0')
self.hr = openapi.HmiRequest(self.write2textbox) self.hr = openapi.HmiRequest(self.write2textbox)
self.md = openapi.ModbusRequest(self.write2textbox) self.md = openapi.ModbusRequest(self.write2textbox)
while True: while True:
if self.tabview.get() == 'Durable Action':
self.create_plot()
with open(heartbeat, 'r', encoding='utf-8') as f_hb: with open(heartbeat, 'r', encoding='utf-8') as f_hb:
c_state = f_hb.read().strip() c_state = f_hb.read().strip()
pb_color = 'green' if c_state == '1' else 'red' pb_color = 'green' if c_state == '1' else 'red'
self.progressbar.configure(progress_color=pb_color) self.progressbar.configure(progress_color=pb_color)
self.progressbar_da.configure(progress_color=pb_color)
if c_state == '0': if c_state == '0':
self.hr.t_bool = False self.hr.t_bool = False
sleep(3) sleep(3)
@ -204,13 +312,21 @@ class App(customtkinter.CTk):
tab_name = self.tabview.get() tab_name = self.tabview.get()
if tab_name == 'Data Process': if tab_name == 'Data Process':
self.flg = 0
self.menu_main_dp.set("Start Here!") self.menu_main_dp.set("Start Here!")
elif tab_name == 'Automatic Test': elif tab_name == 'Automatic Test':
self.flg = 0
self.menu_main_at.set("Start Here!") self.menu_main_at.set("Start Here!")
self.seg_button.configure(state='normal') self.seg_button.configure(state='normal')
elif tab_name == 'Durable Action':
pass
def initialization(self): def initialization(self):
tab_name = self.tabview.get() tab_name = self.tabview.get()
try:
self.canvas.get_tk_widget().grid_forget()
except:
pass
self.textbox.delete(index1='1.0', index2='end') self.textbox.delete(index1='1.0', index2='end')
if tab_name == 'Data Process': if tab_name == 'Data Process':
for widgit in widgits_dp: for widgit in widgits_dp:
@ -228,7 +344,7 @@ class App(customtkinter.CTk):
self.menu_sub_dp.grid_forget() self.menu_sub_dp.grid_forget()
elif tab_name == 'Automatic Test': elif tab_name == 'Automatic Test':
for widgit in widgits_at: for widgit in widgits_at:
if widgit in ['path', 'av1', 'av2', 'av3', 'av4', 'av5', 'av6', 'rc1', 'rc2', 'rc3', 'rc4', 'rc5', 'rc6', 'rr1', 'rr2', 'rr3', 'rr4', 'rr5', 'rr6']: if widgit in ['path', ]:
widgits_at[widgit]['label'].configure(text=f'{widgit.upper()}', text_color='black') widgits_at[widgit]['label'].configure(text=f'{widgit.upper()}', text_color='black')
widgits_at[widgit]['entry'].delete(0, tkinter.END) widgits_at[widgit]['entry'].delete(0, tkinter.END)
widgits_at[widgit]['entry'].configure(placeholder_text=widgits_at[widgit]['text'], state='normal') widgits_at[widgit]['entry'].configure(placeholder_text=widgits_at[widgit]['text'], state='normal')
@ -238,6 +354,15 @@ class App(customtkinter.CTk):
widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text']) widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
widgits_at[widgit]['optionmenu'].configure(state='disabled') widgits_at[widgit]['optionmenu'].configure(state='disabled')
self.seg_button.set("功能切换") self.seg_button.set("功能切换")
elif tab_name == 'Durable Action':
for widgit in widgits_da:
if widgit in ['path', ]:
widgits_da[widgit]['label'].configure(text=f'{widgit.upper()}', text_color='black')
widgits_da[widgit]['entry'].delete(0, tkinter.END)
widgits_da[widgit]['entry'].configure(placeholder_text=widgits_at[widgit]['text'], state='normal')
elif widgit in ['curvesel']:
widgits_da[widgit]['optionmenu'].configure(state='normal')
widgits_da[widgit]['optionmenu'].set(widgits_da[widgit]['text'])
def func_main_callback(self, func_name): def func_main_callback(self, func_name):
self.initialization() self.initialization()
@ -334,7 +459,7 @@ class App(customtkinter.CTk):
self.textbox.tag_config(tagName=color, foreground=color) self.textbox.tag_config(tagName=color, foreground=color)
tab_name_cur = self.tabview.get() tab_name_cur = self.tabview.get()
if tab_name == 'openapi' and tab_name_cur == 'Automatic Test': if tab_name == tab_name_cur:
if wait != 0: if wait != 0:
self.textbox.insert(index='end', text=text, tags=color) self.textbox.insert(index='end', text=text, tags=color)
self.textbox.update() self.textbox.update()
@ -348,7 +473,7 @@ class App(customtkinter.CTk):
self.textbox.insert(index='end', text=text + '\n', tags=color) self.textbox.insert(index='end', text=text + '\n', tags=color)
self.textbox.update() self.textbox.update()
self.textbox.see('end') self.textbox.see('end')
elif tab_name == tab_name_cur: elif tab_name == 'openapi' and tab_name_cur == 'Automatic Test' or tab_name_cur == 'Durable Action':
if wait != 0: if wait != 0:
self.textbox.insert(index='end', text=text, tags=color) self.textbox.insert(index='end', text=text, tags=color)
self.textbox.update() self.textbox.update()
@ -454,12 +579,24 @@ class App(customtkinter.CTk):
return 0, 0 return 0, 0
else: else:
return 0, 0 return 0, 0
elif tab_name == 'Durable Action':
path = widgits_da['path']['entry'].get().strip()
curvesel = widgits_da['curvesel']['optionmenu'].get()
c1 = exists(path)
c2 = curvesel in ['device_servo_trq_feedback', 'hw_joint_vel_feedback']
if c1 and c2:
return 7, path, curvesel
else:
return 0, 0
def func_start_callback(self): def func_start_callback(self):
self.textbox.delete(index1='1.0', index2='end') self.textbox.delete(index1='1.0', index2='end')
flag, *args = self.check_param() flag, *args = self.check_param()
func_dict = {1: brake.main, 2: current.main, 3: iso.main, 4: wavelogger.main, 5: do_brake.main, 6: do_current.main} func_dict = {
1: brake.main, 2: current.main, 3: iso.main, 4: wavelogger.main, 5: do_brake.main, 6: do_current.main,
7: factory_test.main
}
if flag == 1: if flag == 1:
func_dict[flag](path=args[0], vel=args[1], trq=args[2], estop=args[3], w2t=self.write2textbox) func_dict[flag](path=args[0], vel=args[1], trq=args[2], estop=args[3], w2t=self.write2textbox)
elif flag == 2: elif flag == 2:
@ -474,6 +611,9 @@ class App(customtkinter.CTk):
elif flag == 6: elif flag == 6:
self.pre_warning() self.pre_warning()
func_dict[flag](path=args[0], hr=self.hr, md=self.md, loadsel=args[1], w2t=self.write2textbox) func_dict[flag](path=args[0], hr=self.hr, md=self.md, loadsel=args[1], w2t=self.write2textbox)
elif flag == 7:
self.pre_warning()
func_dict[flag](path=args[0], hr=self.hr, md=self.md, w2t=self.write2textbox)
else: else:
tkinter.messagebox.showerror(title="参数错误", message="请检查对应参数是否填写正确!", ) tkinter.messagebox.showerror(title="参数错误", message="请检查对应参数是否填写正确!", )

View File

@ -2,18 +2,6 @@ from json import loads
from sys import argv from sys import argv
def validate_resp(_id, response, w2t):
match _id:
case 'DATA ERR':
w2t(f"数据处理错误,需要确认", 0, 4, 'red', tab_name='Automatic Test')
case 'DATA READ ERR':
w2t(f"无法读取数据,需要确认", 0, 3, 'red', tab_name='Automatic Test')
case 'NOT SUPPORT':
w2t(f"不支持的功能,需要确认", 0, 2, 'red', tab_name='Automatic Test')
if not response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
def execution(cmd, hr, w2t, **kwargs): def execution(cmd, hr, w2t, **kwargs):
_id = hr.execution(cmd, **kwargs) _id = hr.execution(cmd, **kwargs)
_msg = hr.get_from_id(_id) _msg = hr.get_from_id(_id)
@ -21,7 +9,8 @@ def execution(cmd, hr, w2t, **kwargs):
w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name='Automatic Test') w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name='Automatic Test')
else: else:
_response = loads(_msg) _response = loads(_msg)
validate_resp(_id, _response, w2t) if not _response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
return _response return _response

View File

@ -101,18 +101,6 @@ def prj_to_xcore(prj_file):
ssh.close() ssh.close()
def validate_resp(_id, response, w2t):
match _id:
case 'DATA ERR':
w2t(f"数据处理错误,需要确认", 0, 4, 'red', tab_name='Automatic Test')
case 'DATA READ ERR':
w2t(f"无法读取数据,需要确认", 0, 3, 'red', tab_name='Automatic Test')
case 'NOT SUPPORT':
w2t(f"不支持的功能,需要确认", 0, 2, 'red', tab_name='Automatic Test')
if not response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
def execution(cmd, hr, w2t, **kwargs): def execution(cmd, hr, w2t, **kwargs):
_id = hr.execution(cmd, **kwargs) _id = hr.execution(cmd, **kwargs)
_msg = hr.get_from_id(_id) _msg = hr.get_from_id(_id)
@ -120,7 +108,8 @@ def execution(cmd, hr, w2t, **kwargs):
w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name='Automatic Test') w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name='Automatic Test')
else: else:
_response = loads(_msg) _response = loads(_msg)
validate_resp(_id, _response, w2t) if not _response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
return _response return _response

View File

@ -86,18 +86,6 @@ def prj_to_xcore(prj_file):
ssh.close() ssh.close()
def validate_resp(_id, response, w2t):
match _id:
case 'DATA ERR':
w2t(f"数据处理错误,需要确认", 0, 4, 'red', tab_name='Automatic Test')
case 'DATA READ ERR':
w2t(f"无法读取数据,需要确认", 0, 3, 'red', tab_name='Automatic Test')
case 'NOT SUPPORT':
w2t(f"不支持的功能,需要确认", 0, 2, 'red', tab_name='Automatic Test')
if not response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
def execution(cmd, hr, w2t, **kwargs): def execution(cmd, hr, w2t, **kwargs):
_id = hr.execution(cmd, **kwargs) _id = hr.execution(cmd, **kwargs)
_msg = hr.get_from_id(_id) _msg = hr.get_from_id(_id)
@ -105,7 +93,8 @@ def execution(cmd, hr, w2t, **kwargs):
w2t(f"无法获取{_id}请求的响应信息", 0, 7, 'red', tab_name='Automatic Test') w2t(f"无法获取{_id}请求的响应信息", 0, 7, 'red', tab_name='Automatic Test')
else: else:
_response = loads(_msg) _response = loads(_msg)
validate_resp(_id, _response, w2t) if not _response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
return _response return _response

View File

@ -251,7 +251,7 @@ def p_single(wb, single, vel, trq, rpms, rrs, w2t):
set_option("display.precision", 2) set_option("display.precision", 2)
if data_file.endswith('.data'): if data_file.endswith('.data'):
df = read_csv(data_file, sep='\t') df = read_csv(data_file, sep='\t')
rr = rrs[axis+1] rr = rrs[axis-1]
addition = 180 / 3.1415926 * 60 / 360 * rr addition = 180 / 3.1415926 * 60 / 360 * rr
elif data_file.endswith('.csv'): elif data_file.endswith('.csv'):
df = read_csv(data_file, sep=',', encoding='gbk', header=8) df = read_csv(data_file, sep=',', encoding='gbk', header=8)
@ -331,7 +331,7 @@ def p_scenario(wb, single, vel, trq, rpms, rrs, dur, w2t):
set_option("display.precision", 2) set_option("display.precision", 2)
if data_file.endswith('.data'): if data_file.endswith('.data'):
df = read_csv(data_file, sep='\t') df = read_csv(data_file, sep='\t')
rr = rrs[axis+1] rr = rrs[axis-1]
addition = 180 / 3.1415926 * 60 / 360 * rr addition = 180 / 3.1415926 * 60 / 360 * rr
elif data_file.endswith('.csv'): elif data_file.endswith('.csv'):
df = read_csv(data_file, sep=',', encoding='gbk', header=8) df = read_csv(data_file, sep=',', encoding='gbk', header=8)

View File

@ -0,0 +1 @@
__all__ = ['factory_test']

View File

@ -0,0 +1,279 @@
from sys import argv
from os.path import exists, dirname
from os import scandir
from paramiko import SSHClient, AutoAddPolicy
from json import loads
from time import sleep, time
import pandas as pd
from openpyxl import load_workbook
from math import sqrt
tab_name = 'Durable Action'
durable_data_current_xlsx = f'{dirname(__file__)}/../../assets/templates/durable/durable_data_current.xlsx'
durable_data_velocity_xlsx = f'{dirname(__file__)}/../../assets/templates/durable/durable_data_velocity.xlsx'
display_pdo_params = [
{"name": "hw_joint_vel_feedback", "channel": 0},
{"name": "hw_joint_vel_feedback", "channel": 1},
{"name": "hw_joint_vel_feedback", "channel": 2},
{"name": "hw_joint_vel_feedback", "channel": 3},
{"name": "hw_joint_vel_feedback", "channel": 4},
{"name": "hw_joint_vel_feedback", "channel": 5},
{"name": "device_servo_trq_feedback", "channel": 0},
{"name": "device_servo_trq_feedback", "channel": 1},
{"name": "device_servo_trq_feedback", "channel": 2},
{"name": "device_servo_trq_feedback", "channel": 3},
{"name": "device_servo_trq_feedback", "channel": 4},
{"name": "device_servo_trq_feedback", "channel": 5},
]
durable_data_current = {
'time': list(range(1, 26)),
'axis1': [0 for _ in range(25)],
'axis2': [0 for _ in range(25)],
'axis3': [0 for _ in range(25)],
'axis4': [0 for _ in range(25)],
'axis5': [0 for _ in range(25)],
'axis6': [0 for _ in range(25)],
}
durable_data_velocity = {
'time': list(range(1, 26)),
'axis1': [0 for _ in range(25)],
'axis2': [0 for _ in range(25)],
'axis3': [0 for _ in range(25)],
'axis4': [0 for _ in range(25)],
'axis5': [0 for _ in range(25)],
'axis6': [0 for _ in range(25)],
}
data_all = [durable_data_current, durable_data_velocity]
def traversal_files(path, w2t):
if not exists(path):
msg = f'数据文件夹{path}不存在,请确认后重试......'
w2t(msg, 0, 1, 'red', tab_name=tab_name)
else:
dirs = []
files = []
for item in scandir(path):
if item.is_dir():
dirs.append(item.path)
elif item.is_file():
files.append(item.path)
return dirs, files
def check_files(data_dirs, data_files, w2t):
if len(data_dirs) != 0 or len(data_files) != 2:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下文件,确认后重新运行!\n1. target.zip\n2.configs.xlsx', 0,
10, 'red', tab_name)
_files = [data_files[0].split('\\')[-1], data_files[1].split('\\')[-1]]
_files.sort()
if _files != ['configs.xlsx', 'target.zip']:
w2t('初始路径下只能存在如下文件,确认后重新运行!\n1. target.zip\n2.configs.xlsx', 0, 10, 'red', tab_name)
data_files.sort()
return data_files
def prj_to_xcore(prj_file):
ssh = SSHClient()
ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019')
sftp = ssh.open_sftp()
sftp.put(prj_file, '/tmp/target.zip')
cmd = 'cd /tmp; rm -rf target/; mkdir target; unzip -d target/ -q target.zip; '
cmd += 'chmod 777 -R target/; rm target.zip'
ssh.exec_command(cmd)
cmd = 'sudo rm -rf /home/luoshi/bin/controller/projects/target; '
cmd += 'sudo mv /tmp/target/ /home/luoshi/bin/controller/projects/'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
_prj_name = prj_file.split('\\')[-1].removesuffix('.zip')
cmd = 'cd /home/luoshi/bin/controller/; '
cmd += f'sudo mv projects/target/_build/{_prj_name}.prj projects/target/_build/target.prj'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
ssh.close()
def execution(cmd, hr, w2t, **kwargs):
_id = hr.execution(cmd, **kwargs)
_msg = hr.get_from_id(_id)
if not _msg:
w2t(f"无法获取{_id}请求的响应信息", 0, 7, 'red', tab_name=tab_name)
else:
_response = loads(_msg)
if not _response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name=tab_name)
return _response
def run_rl(path, config_file, hr, md, w2t):
# 1. 关闭诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来,切手动模式并下电
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=[])
md.trigger_estop()
md.reset_estop()
md.write_act(False)
sleep(1) # 让曲线彻底关闭
_response = execution('state.switch_manual', hr, w2t)
_response = execution('state.switch_motor_off', hr, w2t)
# 2. reload工程后pp2main并且自动模式和上电
prj_path = 'target/_build/target.prj'
_response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['current'])
_response = execution('overview.get_cur_prj', hr, w2t)
_response = execution('rl_task.pp_to_main', hr, w2t, tasks=['current'])
_response = execution('state.switch_auto', hr, w2t)
_response = execution('state.switch_motor_on', hr, w2t)
# 3. 开始运行程序
_response = execution('rl_task.run', hr, w2t, tasks=['current'])
_t_start = time()
while True:
if md.read_ready_to_go() == 1:
md.write_act(True)
break
else:
if (time() - _t_start) // 20 > 1:
w2t("20s内未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", 0, 111, 'red', tab_name)
else:
sleep(1)
# 4. 获取初始数据,周期时间,首次的各轴平均电流值,打开诊断曲线,并执行采集
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
_t_start = time()
while True:
scenario_time = md.read_scenario_time()
if float(scenario_time) > 1:
w2t(f"场景的周期时间:{scenario_time}", 0, 0, 'green', tab_name)
break
else:
if (time() - _t_start) // 60 > 3:
w2t(f"未收到场景的周期时间需要确认RL程序编写正确并正常执行...", 0, 111, 'red', tab_name)
else:
sleep(5)
sleep(1) # 一定要延迟一秒再读一次scenario time寄存器因为一开始读取的数值不准确
scenario_time = float(md.read_scenario_time())
sleep(scenario_time * 0.2) # 再运行周期的20%即可
# 6. 关闭诊断曲线,停止程序运行,下电并且换成手动模式
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=[])
_response = execution('rl_task.stop', hr, w2t, tasks=['current'])
sleep(1) # 保证所有数据均已返回
# 7. 保留数据并处理输出
get_durable_data(path, config_file, data_all, scenario_time, hr, w2t)
# 8. 继续运行
_response = execution('rl_task.run', hr, w2t, tasks=['current'])
while True:
# 每3分钟更新一次数据打开曲线获取周期内电流关闭曲线
sleep(180)
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
sleep(scenario_time + 10)
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=[])
# 7. 保留数据并处理输出
get_durable_data(path, config_file, data_all, scenario_time, hr, w2t)
def get_durable_data(path, config_file, data, scenario_time, hr, w2t):
_data_list = []
for _msg in hr.c_msg:
if 'diagnosis.result' in _msg:
_data_list.insert(0, loads(_msg))
else:
_index = 210
for _msg in hr.c_msg:
if 'diagnosis.result' in _msg:
_index = hr.c_msg.index(_msg)
break
del hr.c_msg[_index:]
hr.c_msg_xs.clear()
# with open('log.txt', 'w', encoding='utf-8') as f_obj:
# for _ in _data_list:
# f_obj.write(f"{_}\n")
_wb = load_workbook(config_file, read_only=True)
_ws = _wb['Target']
rcs = []
for i in range(6):
rcs.append(float(_ws.cell(row=6, column=i + 2).value))
_d2d_trq = {
'device_servo_trq_feedback_0': [], 'device_servo_trq_feedback_1': [], 'device_servo_trq_feedback_2': [],
'device_servo_trq_feedback_3': [], 'device_servo_trq_feedback_4': [], 'device_servo_trq_feedback_5': [],
}
_d2d_vel = {
'hw_joint_vel_feedback_0': [], 'hw_joint_vel_feedback_1': [], 'hw_joint_vel_feedback_2': [],
'hw_joint_vel_feedback_3': [], 'hw_joint_vel_feedback_4': [], 'hw_joint_vel_feedback_5': [],
}
for line in _data_list:
for item in line['data']:
for i in range(6):
item['value'].reverse()
if item.get('channel', None) == i and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq[f'device_servo_trq_feedback_{i}'].extend(item['value'])
elif item.get('channel', None) == i and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel[f'hw_joint_vel_feedback_{i}'].extend(item['value'])
if len(_d2d_trq['device_servo_trq_feedback_0']) / 1000 > scenario_time + 1:
_df = pd.DataFrame(_d2d_trq)
for i in range(6):
_ = sqrt(_df[f'device_servo_trq_feedback_{i}'].apply(lambda x: (rcs[i]*x/1000)**2).sum()/len(_df[f'device_servo_trq_feedback_{i}']))
del data[0][f"axis{i + 1}"][0]
data[0][f"axis{i + 1}"].append(_)
_df = pd.DataFrame(data[0])
while True:
if not hr.durable_lock:
hr.durable_lock = 1
_df.to_excel(durable_data_current_xlsx, index=False)
hr.durable_lock = 0
break
else:
sleep(1)
_df = pd.DataFrame(_d2d_vel)
for i in range(6):
_ = sqrt(_df[f'hw_joint_vel_feedback_{i}'].apply(lambda x: (rcs[i]*x/1000)**2).sum()/len(_df[f'hw_joint_vel_feedback_{i}']))
del data[1][f"axis{i + 1}"][0]
data[1][f"axis{i + 1}"].append(_)
_df = pd.DataFrame(data[1])
while True:
if not hr.durable_lock:
hr.durable_lock = 1
_df.to_excel(durable_data_velocity_xlsx, index=False)
hr.durable_lock = 0
break
else:
sleep(1)
break
else:
with open(f'{path}\\device_servo_trq_feedback_0.txt', 'w', encoding='utf-8') as f_obj:
for _ in _d2d_trq['device_servo_trq_feedback_0']:
f_obj.write(f"{_}\n")
w2t("采集的数据时间长度不够,需要确认。", 0, 2, 'red', tab_name)
def main(path, hr, md, w2t):
data_dirs, data_files = traversal_files(path, w2t)
config_file, prj_file = check_files(data_dirs, data_files, w2t)
prj_to_xcore(prj_file)
run_rl(path, config_file, hr, md, w2t)
if __name__ == '__main__':
main(*argv[1:])

View File

@ -1,4 +1,4 @@
from json import load, dumps from json import load, dumps, loads
from socket import socket, setdefaulttimeout, AF_INET, SOCK_STREAM from socket import socket, setdefaulttimeout, AF_INET, SOCK_STREAM
from threading import Thread from threading import Thread
import selectors import selectors
@ -171,6 +171,7 @@ class HmiRequest(object):
self.half_length = 0 self.half_length = 0
self.index = 0 self.index = 0
self.reset_index = 0 self.reset_index = 0
self.durable_lock = 0
self.sock_conn() self.sock_conn()
self.t_heartbeat = Thread(target=self.heartbeat) self.t_heartbeat = Thread(target=self.heartbeat)
@ -251,14 +252,17 @@ class HmiRequest(object):
f_hb.write(_flag) f_hb.write(_flag)
if _flag == '0': if _flag == '0':
self.w2t(f"{_id} 心跳丢失,连接失败,重新连接中...", 0, 7, 'red', tab_name=self.tab_name) self.w2t(f"{_id} 心跳丢失,连接失败,重新连接中...", 0, 7, 'red', tab_name=self.tab_name)
sleep(1.5) sleep(2)
# with open(f"{current_path}/../assets/templates/c_msg.log", "w", encoding='utf-8') as f: # with open(f"{current_path}/../assets/templates/c_msg.log", "w", encoding='utf-8') as f:
# for msg in self.c_msg: # for msg in self.c_msg:
# f.write(str(loads(msg)) + '\n') # f.write(str(loads(msg)) + '\n')
def msg_storage(self, response, flag=0): def msg_storage(self, response, flag=0):
# response是解码后的字符串
messages = self.c_msg if flag == 0 else self.c_msg_xs messages = self.c_msg if flag == 0 else self.c_msg_xs
if len(messages) < 20000: if 'move.monitor' in response:
pass
elif len(messages) < 20000:
messages.insert(0, response) messages.insert(0, response)
else: else:
messages.insert(0, response) messages.insert(0, response)
@ -565,7 +569,7 @@ class HmiRequest(object):
if flg == 0: # for old protocols if flg == 0: # for old protocols
req = None req = None
try: try:
with open(f'{current_path}/../assets/templates/{command}.json', encoding='utf-8', with open(f'{current_path}/../assets/templates/json/{command}.json', encoding='utf-8',
mode='r') as f_json: mode='r') as f_json:
req = load(f_json) req = load(f_json)
except: except: