v0.1.9.4(2024/07/15)

1. [profile: aio.py]:完善durable text相关逻辑
2. [profile: do_brake/do_current/btn_functions.py]:删除validate_resp函数,修改execution函数
3. [profile: factory_test.py]
   - 新增耐久/老化测试程序
   - 实现六轴折线图显示
4. [profile: openapi.py]:多次合并遗留问题处理
5. templates文件夹组织架构调整
This commit is contained in:
gitea 2024-07-17 10:09:06 +08:00
parent cf9d51b475
commit da5ddcea0a
36 changed files with 402 additions and 87 deletions

3
.gitignore vendored
View File

@ -7,4 +7,5 @@ aio/venv
aio/__pycache__/
aio/code/automatic_test/__pycache__/
aio/code/data_process/__pycache__/
aio/assets/templates/c_msg.log
aio/assets/templates/c_msg.log
aio/code/durable_action/__pycache__/

View File

@ -486,7 +486,16 @@ v0.1.9.3(2024/07/15)
- 修改modbus连接失败报错输出形式使之只在automatic test页面显示
- 将该文件移动至toplevel为后面扩展做准备
- 修改heartbeat文件路径使后续打包的时候更方便
2. [APIs: aio.py]:
2. [APIs: aio.py]
- 修改heartbeat文件路径使后续打包的时候更方便
- 修改write2textbox函数的打印逻辑先判断网络相关
v0.1.9.4(2024/07/15)
1. [profile: aio.py]完善durable text相关逻辑
2. [profile: do_brake/do_current/btn_functions.py]删除validate_resp函数修改execution函数
3. [profile: factory_test.py]
- 新增耐久/老化测试程序
- 实现六轴折线图显示
4. [profile: openapi.py]:多次合并遗留问题处理
5. templates文件夹组织架构调整

Binary file not shown.

Binary file not shown.

View File

@ -1 +1 @@
0
1

View File

@ -7,17 +7,43 @@ import customtkinter
from time import time, strftime, localtime, sleep
from urllib.request import urlopen
from socket import setdefaulttimeout
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from data_process import *
from automatic_test import *
from durable_action import *
import openapi
import matplotlib.pyplot as plt
import matplotlib
import pandas as pd
matplotlib.use('Agg')
heartbeat = f'{dirname(__file__)}/../assets/templates/heartbeat'
durable_data_current_xlsx = f'{dirname(__file__)}/../assets/templates/durable/durable_data_current.xlsx'
durable_data_velocity_xlsx = f'{dirname(__file__)}/../assets/templates/durable/durable_data_velocity.xlsx'
customtkinter.set_appearance_mode("System") # Modes: "System" (standard), "Dark", "Light"
customtkinter.set_default_color_theme("blue") # Themes: "blue" (standard), "green", "dark-blue"
customtkinter.set_widget_scaling(1.1) # widget dimensions and text size
customtkinter.set_window_scaling(1.1) # window geometry dimensions
setdefaulttimeout(3)
# global vars
durable_data_current = {
'time': list(range(1, 26)),
'axis1': [0 for _ in range(25)],
'axis2': [0 for _ in range(25)],
'axis3': [0 for _ in range(25)],
'axis4': [0 for _ in range(25)],
'axis5': [0 for _ in range(25)],
'axis6': [0 for _ in range(25)],
}
durable_data_velocity = {
'time': list(range(1, 26)),
'axis1': [0 for _ in range(25)],
'axis2': [0 for _ in range(25)],
'axis3': [0 for _ in range(25)],
'axis4': [0 for _ in range(25)],
'axis5': [0 for _ in range(25)],
'axis6': [0 for _ in range(25)],
}
btns_func = {
'start': {'btn': '', 'row': 1, 'text': '开始运行'},
'check': {'btn': '', 'row': 2, 'text': '检查参数'},
@ -36,11 +62,9 @@ widgits_at = {
'path': {'label': '', 'entry': '', 'row': 2, 'col': 2, 'text': '数据文件夹路径'},
'loadsel': {'label': '', 'optionmenu': '', 'row': 2, 'col': 1, 'text': '负载信息'},
}
widgits_pr = {
widgits_da = {
'path': {'label': '', 'entry': '', 'row': 1, 'col': 2, 'text': '数据文件夹路径'},
'loadsel': {'label': '', 'optionmenu': '', 'row': 2, 'col': 1, 'text': '负载选择'},
'actionsel': {'label': '', 'optionmenu': '', 'row': 3, 'col': 1, 'text': '动作选择'},
'inertiasel': {'label': '', 'optionmenu': '', 'row': 4, 'col': 1, 'text': '惯量选择'},
'loadsel': {'label': '', 'optionmenu': '', 'row': 1, 'col': 1, 'text': '负载选择'},
}
@ -51,6 +75,9 @@ class App(customtkinter.CTk):
self.w_param = 84
self.hr = None
self.md = None
self.canvas = None
self.flg = 0
self.df_copy = None
# =====================================================================
# configure window
self.title("AIO - All in one automatic toolbox")
@ -86,7 +113,7 @@ class App(customtkinter.CTk):
self.tabview.grid(row=0, column=1, padx=10, pady=5, sticky="nsew")
self.tabview.add("Data Process")
self.tabview.add("Automatic Test")
self.tabview.add("Profile Test")
self.tabview.add("Durable Action")
# create main menu for data process
self.menu_main_dp = customtkinter.CTkOptionMenu(self.tabview.tab('Data Process'), values=["init", "brake", "current", "iso", "wavelogger"], font=self.my_font, text_color='yellow', button_color='red', fg_color='green', command=self.func_main_callback)
self.menu_main_dp.grid(row=1, column=1, sticky='we', padx=5, pady=10)
@ -122,7 +149,7 @@ class App(customtkinter.CTk):
# For automatic test tab START =====================================================================
# create buttons
self.seg_button = customtkinter.CTkSegmentedButton(self.tabview.tab('Automatic Test'), dynamic_resizing=False, font=customtkinter.CTkFont(size=16, weight='bold'), command=lambda value='机器状态': self.thread_it(self.segmented_button_callback))
self.seg_button.grid(row=1, column=2, columnspan=12, padx=(20, 10), pady=(10, 10), sticky="ew")
self.seg_button.grid(row=1, column=2, columnspan=12, padx=(65, 10), pady=(10, 10), sticky="ew")
self.seg_button.configure(dynamic_resizing=False, values=["功能切换", "触发急停", "恢复急停", "待定功能", "功能待定", "机器状态", "告警信息"])
self.seg_button.set("功能切换")
# create progress bar
@ -134,7 +161,7 @@ class App(customtkinter.CTk):
for widgit in widgits_at:
if widgit == 'path':
widgits_at[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Automatic Test'), text=f'{widgit.upper()}', font=self.my_font)
widgits_at[widgit]['label'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col'], sticky='e', pady=5)
widgits_at[widgit]['label'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col'], sticky='e', padx=(20, 5), pady=5)
widgits_at[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Automatic Test'), width=670, placeholder_text=widgits_at[widgit]['text'], font=self.my_font)
widgits_at[widgit]['entry'].grid(row=widgits_at[widgit]['row'], column=widgits_at[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=5, sticky='we')
widgits_at[widgit]['entry'].configure(state='disabled')
@ -144,33 +171,23 @@ class App(customtkinter.CTk):
widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
widgits_at[widgit]['optionmenu'].configure(state='disabled')
# For automatic test tab END =====================================================================
# For profile tab START =====================================================================
for widgit in widgits_pr:
if widgit == 'path':
widgits_pr[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Profile Test'), text=f'{widgit.upper()}', font=self.my_font)
widgits_pr[widgit]['label'].grid(row=widgits_pr[widgit]['row'], column=widgits_pr[widgit]['col'], sticky='e', pady=5)
widgits_pr[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Profile Test'), width=670, placeholder_text=widgits_pr[widgit]['text'], font=self.my_font)
widgits_pr[widgit]['entry'].grid(row=widgits_pr[widgit]['row'], column=widgits_pr[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=5, sticky='we')
# widgits_pr[widgit]['entry'].configure(state='disabled')
elif widgit in ['loadsel', 'actionsel', 'inertiasel']:
match widgit:
case 'loadsel':
values = ["tool100", "inertia"]
case 'actionsel':
values = ["signle", "box", "cart"]
case 'inertiasel':
values = ["high", "middle", "low"]
widgits_pr[widgit]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Profile Test'), button_color='#708090', fg_color='#778899', values=values, width=self.w_param, font=self.my_font)
widgits_pr[widgit]['optionmenu'].grid(row=widgits_pr[widgit]['row'], column=widgits_pr[widgit]['col'], padx=5, pady=5, sticky='we')
widgits_pr[widgit]['optionmenu'].set(widgits_pr[widgit]['text'])
# widgits_pr[widgit]['optionmenu'].configure(state='disabled')
# For durable_action tab START =====================================================================
# create progress bar
self.progressbar_pr = customtkinter.CTkProgressBar(self.tabview.tab('Profile Test'))
self.progressbar_pr.grid(row=5, column=1, padx=5, pady=5, sticky="ew")
self.progressbar_pr.configure(mode="determinnate", width=10)
self.progressbar_pr.start()
# For profile tab END =====================================================================
self.progressbar_da = customtkinter.CTkProgressBar(self.tabview.tab('Durable Action'))
self.progressbar_da.grid(row=2, column=1, padx=5, pady=5, sticky="ew")
self.progressbar_da.configure(mode="determinnate", width=self.w_param)
self.progressbar_da.start()
for widgit in widgits_da:
if widgit == 'path':
widgits_da[widgit]['label'] = customtkinter.CTkLabel(self.tabview.tab('Durable Action'), text=f'{widgit.upper()}', font=self.my_font)
widgits_da[widgit]['label'].grid(row=widgits_da[widgit]['row'], column=widgits_da[widgit]['col'], sticky='e', padx=(20, 5), pady=10)
widgits_da[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Durable Action'), width=670, placeholder_text=widgits_da[widgit]['text'], font=self.my_font)
widgits_da[widgit]['entry'].grid(row=widgits_da[widgit]['row'], column=widgits_da[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=10, sticky='we')
elif widgit in ['loadsel']:
widgits_da[widgit]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Durable Action'), button_color='#708090', fg_color='#778899', values=["tool100", "inertia"], font=self.my_font)
widgits_da[widgit]['optionmenu'].grid(row=widgits_da[widgit]['row'], column=widgits_da[widgit]['col'], padx=5, pady=10, sticky='we')
widgits_da[widgit]['optionmenu'].set(widgits_da[widgit]['text'])
# For durable_action tab END =====================================================================
# create textbox
self.textbox = customtkinter.CTkTextbox(self, wrap='none', font=customtkinter.CTkFont(family="consolas", size=14), text_color="blue", fg_color='#E9E9E9', border_width=2, border_color='#CDCDCD', border_spacing=5)
self.textbox.grid(row=6, column=1, columnspan=13, ipadx=10, ipady=10, padx=10, pady=(5, 10), sticky='nsew')
@ -188,6 +205,35 @@ class App(customtkinter.CTk):
tkinter.messagebox.showwarning(title="版本更新", message="连接服务器失败,无法确认当前是否是最新版本......")
# functions below ↓ ----------------------------------------------------------------------------------------
def create_canvas(self, figure):
self.canvas = FigureCanvasTkAgg(figure, self)
self.canvas.draw()
self.canvas.get_tk_widget().configure(height=640)
self.canvas.get_tk_widget().grid(row=3, column=1, rowspan=3, columnspan=13, padx=20, pady=10, sticky="nsew")
def create_plot(self):
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['figure.dpi'] = 100
plt.rcParams['font.size'] = 14
plt.rcParams['lines.marker'] = 'o'
df = pd.read_excel(durable_data_current_xlsx)
if not df.equals(self.df_copy) or self.flg == 0:
self.flg = 1
self.df_copy = df.copy()
figure = plt.figure(frameon=True, facecolor='#E9E9E9')
plt.subplots_adjust(left=0.04, right=0.98, bottom=0.05, top=0.98)
ax = figure.add_subplot(1, 1, 1)
df.plot(grid=True, x='time', y='axis1', ax=ax)
df.plot(grid=True, x='time', y='axis2', ax=ax)
df.plot(grid=True, x='time', y='axis3', ax=ax)
df.plot(grid=True, x='time', y='axis4', ax=ax)
df.plot(grid=True, x='time', y='axis5', ax=ax)
df.plot(grid=True, x='time', y='axis6', ax=ax)
self.create_canvas(figure)
def thread_it(self, func, *args):
""" 将函数打包进线程 """
self.myThread = Thread(target=func, args=args)
@ -216,17 +262,23 @@ class App(customtkinter.CTk):
# self.tabview.configure(state='normal')
def detect_network(self):
df = pd.DataFrame(durable_data_current)
df.to_excel(durable_data_current_xlsx, index=False)
with open(heartbeat, "w", encoding='utf-8') as f_hb:
f_hb.write('0')
self.hr = openapi.HmiRequest(self.write2textbox)
self.md = openapi.ModbusRequest(self.write2textbox)
while True:
if self.tabview.get() == 'Durable Action':
self.create_plot()
with open(heartbeat, 'r', encoding='utf-8') as f_hb:
c_state = f_hb.read().strip()
pb_color = 'green' if c_state == '1' else 'red'
self.progressbar.configure(progress_color=pb_color)
self.progressbar_pr.configure(progress_color=pb_color)
self.progressbar_da.configure(progress_color=pb_color)
if c_state == '0':
self.hr.t_bool = False
sleep(3)
@ -239,10 +291,22 @@ class App(customtkinter.CTk):
tab_name = self.tabview.get()
if tab_name == 'Data Process':
self.flg = 0
self.menu_main_dp.set("Start Here!")
try:
self.canvas.get_tk_widget().grid_forget()
except:
pass
elif tab_name == 'Automatic Test':
self.flg = 0
self.menu_main_at.set("Start Here!")
self.seg_button.configure(state='normal')
try:
self.canvas.get_tk_widget().grid_forget()
except:
pass
elif tab_name == 'Durable Action':
pass
def initialization(self):
tab_name = self.tabview.get()
@ -263,7 +327,7 @@ class App(customtkinter.CTk):
self.menu_sub_dp.grid_forget()
elif tab_name == 'Automatic Test':
for widgit in widgits_at:
if widgit in ['path', 'av1', 'av2', 'av3', 'av4', 'av5', 'av6', 'rc1', 'rc2', 'rc3', 'rc4', 'rc5', 'rc6', 'rr1', 'rr2', 'rr3', 'rr4', 'rr5', 'rr6']:
if widgit in ['path', ]:
widgits_at[widgit]['label'].configure(text=f'{widgit.upper()}', text_color='black')
widgits_at[widgit]['entry'].delete(0, tkinter.END)
widgits_at[widgit]['entry'].configure(placeholder_text=widgits_at[widgit]['text'], state='normal')
@ -273,6 +337,15 @@ class App(customtkinter.CTk):
widgits_at[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
widgits_at[widgit]['optionmenu'].configure(state='disabled')
self.seg_button.set("功能切换")
elif tab_name == 'Durable Action':
for widgit in widgits_da:
if widgit in ['path', ]:
widgits_da[widgit]['label'].configure(text=f'{widgit.upper()}', text_color='black')
widgits_da[widgit]['entry'].delete(0, tkinter.END)
widgits_da[widgit]['entry'].configure(placeholder_text=widgits_at[widgit]['text'], state='normal')
elif widgit in ['loadsel']:
widgits_da[widgit]['optionmenu'].configure(state='normal')
widgits_da[widgit]['optionmenu'].set(widgits_at[widgit]['text'])
def func_main_callback(self, func_name):
self.initialization()
@ -369,7 +442,7 @@ class App(customtkinter.CTk):
self.textbox.tag_config(tagName=color, foreground=color)
tab_name_cur = self.tabview.get()
if tab_name == 'openapi' and tab_name_cur == 'Automatic Test':
if tab_name == tab_name_cur:
if wait != 0:
self.textbox.insert(index='end', text=text, tags=color)
self.textbox.update()
@ -383,7 +456,7 @@ class App(customtkinter.CTk):
self.textbox.insert(index='end', text=text + '\n', tags=color)
self.textbox.update()
self.textbox.see('end')
elif tab_name == tab_name_cur:
elif tab_name == 'openapi' and tab_name_cur == 'Automatic Test' or tab_name_cur == 'Durable Action':
if wait != 0:
self.textbox.insert(index='end', text=text, tags=color)
self.textbox.update()
@ -489,12 +562,24 @@ class App(customtkinter.CTk):
return 0, 0
else:
return 0, 0
elif tab_name == 'Durable Action':
path = widgits_da['path']['entry'].get().strip()
loadsel = widgits_da['loadsel']['optionmenu'].get()
c1 = exists(path)
c2 = loadsel in ['tool100', 'inertia']
if c1 and c2:
return 7, path, loadsel
else:
return 0, 0
def func_start_callback(self):
self.textbox.delete(index1='1.0', index2='end')
flag, *args = self.check_param()
func_dict = {1: brake.main, 2: current.main, 3: iso.main, 4: wavelogger.main, 5: do_brake.main, 6: do_current.main}
func_dict = {
1: brake.main, 2: current.main, 3: iso.main, 4: wavelogger.main, 5: do_brake.main, 6: do_current.main,
7: factory_test.main
}
if flag == 1:
func_dict[flag](path=args[0], vel=args[1], trq=args[2], estop=args[3], w2t=self.write2textbox)
elif flag == 2:
@ -509,6 +594,9 @@ class App(customtkinter.CTk):
elif flag == 6:
self.pre_warning()
func_dict[flag](path=args[0], hr=self.hr, md=self.md, loadsel=args[1], w2t=self.write2textbox)
elif flag == 7:
self.pre_warning()
func_dict[flag](path=args[0], hr=self.hr, md=self.md, w2t=self.write2textbox)
else:
tkinter.messagebox.showerror(title="参数错误", message="请检查对应参数是否填写正确!", )

View File

@ -2,18 +2,6 @@ from json import loads
from sys import argv
def validate_resp(_id, response, w2t):
match _id:
case 'DATA ERR':
w2t(f"数据处理错误,需要确认", 0, 4, 'red', tab_name='Automatic Test')
case 'DATA READ ERR':
w2t(f"无法读取数据,需要确认", 0, 3, 'red', tab_name='Automatic Test')
case 'NOT SUPPORT':
w2t(f"不支持的功能,需要确认", 0, 2, 'red', tab_name='Automatic Test')
if not response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
def execution(cmd, hr, w2t, **kwargs):
_id = hr.execution(cmd, **kwargs)
_msg = hr.get_from_id(_id)
@ -21,7 +9,8 @@ def execution(cmd, hr, w2t, **kwargs):
w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name='Automatic Test')
else:
_response = loads(_msg)
validate_resp(_id, _response, w2t)
if not _response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
return _response

View File

@ -101,18 +101,6 @@ def prj_to_xcore(prj_file):
ssh.close()
def validate_resp(_id, response, w2t):
match _id:
case 'DATA ERR':
w2t(f"数据处理错误,需要确认", 0, 4, 'red', tab_name='Automatic Test')
case 'DATA READ ERR':
w2t(f"无法读取数据,需要确认", 0, 3, 'red', tab_name='Automatic Test')
case 'NOT SUPPORT':
w2t(f"不支持的功能,需要确认", 0, 2, 'red', tab_name='Automatic Test')
if not response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
def execution(cmd, hr, w2t, **kwargs):
_id = hr.execution(cmd, **kwargs)
_msg = hr.get_from_id(_id)
@ -120,7 +108,8 @@ def execution(cmd, hr, w2t, **kwargs):
w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name='Automatic Test')
else:
_response = loads(_msg)
validate_resp(_id, _response, w2t)
if not _response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
return _response

View File

@ -86,18 +86,6 @@ def prj_to_xcore(prj_file):
ssh.close()
def validate_resp(_id, response, w2t):
match _id:
case 'DATA ERR':
w2t(f"数据处理错误,需要确认", 0, 4, 'red', tab_name='Automatic Test')
case 'DATA READ ERR':
w2t(f"无法读取数据,需要确认", 0, 3, 'red', tab_name='Automatic Test')
case 'NOT SUPPORT':
w2t(f"不支持的功能,需要确认", 0, 2, 'red', tab_name='Automatic Test')
if not response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
def execution(cmd, hr, w2t, **kwargs):
_id = hr.execution(cmd, **kwargs)
_msg = hr.get_from_id(_id)
@ -105,7 +93,8 @@ def execution(cmd, hr, w2t, **kwargs):
w2t(f"无法获取{_id}请求的响应信息", 0, 7, 'red', tab_name='Automatic Test')
else:
_response = loads(_msg)
validate_resp(_id, _response, w2t)
if not _response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
return _response

View File

@ -0,0 +1 @@
__all__ = ['factory_test']

View File

@ -0,0 +1,252 @@
from sys import argv
from os.path import exists, dirname
from os import scandir
from paramiko import SSHClient, AutoAddPolicy
from json import loads
from time import sleep, time
import pandas as pd
from openpyxl import load_workbook
from math import sqrt
tab_name = 'Durable Action'
durable_data_current_xlsx = f'{dirname(__file__)}/../../assets/templates/durable/durable_data_current.xlsx'
durable_data_velocity_xlsx = f'{dirname(__file__)}/../../assets/templates/durable/durable_data_velocity.xlsx'
display_pdo_params = [
{"name": "hw_joint_vel_feedback", "channel": 0},
{"name": "hw_joint_vel_feedback", "channel": 1},
{"name": "hw_joint_vel_feedback", "channel": 2},
{"name": "hw_joint_vel_feedback", "channel": 3},
{"name": "hw_joint_vel_feedback", "channel": 4},
{"name": "hw_joint_vel_feedback", "channel": 5},
{"name": "device_servo_trq_feedback", "channel": 0},
{"name": "device_servo_trq_feedback", "channel": 1},
{"name": "device_servo_trq_feedback", "channel": 2},
{"name": "device_servo_trq_feedback", "channel": 3},
{"name": "device_servo_trq_feedback", "channel": 4},
{"name": "device_servo_trq_feedback", "channel": 5},
]
durable_data_current = {
'time': list(range(1, 26)),
'axis1': [0 for _ in range(25)],
'axis2': [0 for _ in range(25)],
'axis3': [0 for _ in range(25)],
'axis4': [0 for _ in range(25)],
'axis5': [0 for _ in range(25)],
'axis6': [0 for _ in range(25)],
}
durable_data_velocity = {
'time': list(range(1, 26)),
'axis1': [0 for _ in range(25)],
'axis2': [0 for _ in range(25)],
'axis3': [0 for _ in range(25)],
'axis4': [0 for _ in range(25)],
'axis5': [0 for _ in range(25)],
'axis6': [0 for _ in range(25)],
}
def traversal_files(path, w2t):
if not exists(path):
msg = f'数据文件夹{path}不存在,请确认后重试......'
w2t(msg, 0, 1, 'red', tab_name=tab_name)
else:
dirs = []
files = []
for item in scandir(path):
if item.is_dir():
dirs.append(item.path)
elif item.is_file():
files.append(item.path)
return dirs, files
def check_files(data_dirs, data_files, w2t):
if len(data_dirs) != 0 or len(data_files) != 2:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下文件,确认后重新运行!\n1. target.zip\n2.configs.xlsx', 0,
10, 'red', tab_name)
_files = [data_files[0].split('\\')[-1], data_files[1].split('\\')[-1]]
_files.sort()
if _files != ['configs.xlsx', 'target.zip']:
w2t('初始路径下只能存在如下文件,确认后重新运行!\n1. target.zip\n2.configs.xlsx', 0, 10, 'red', tab_name)
data_files.sort()
return data_files
def prj_to_xcore(prj_file):
ssh = SSHClient()
ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019')
sftp = ssh.open_sftp()
sftp.put(prj_file, '/tmp/target.zip')
cmd = 'cd /tmp; rm -rf target/; mkdir target; unzip -d target/ -q target.zip; '
cmd += 'chmod 777 -R target/; rm target.zip'
ssh.exec_command(cmd)
cmd = 'sudo rm -rf /home/luoshi/bin/controller/projects/target; '
cmd += 'sudo mv /tmp/target/ /home/luoshi/bin/controller/projects/'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
_prj_name = prj_file.split('\\')[-1].removesuffix('.zip')
cmd = 'cd /home/luoshi/bin/controller/; '
cmd += f'sudo mv projects/target/_build/{_prj_name}.prj projects/target/_build/target.prj'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write('luoshi2019' + '\n')
stdin.flush()
print(stdout.read().decode()) # 必须得输出一下stdout才能正确执行sudo
print(stderr.read().decode()) # 顺便也执行以下stderr
ssh.close()
def execution(cmd, hr, w2t, **kwargs):
_id = hr.execution(cmd, **kwargs)
_msg = hr.get_from_id(_id)
if not _msg:
w2t(f"无法获取{_id}请求的响应信息", 0, 7, 'red', tab_name=tab_name)
else:
_response = loads(_msg)
if not _response:
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name=tab_name)
return _response
def run_rl(path, config_file, hr, md, w2t):
# 1. 关闭诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来,切手动模式并下电
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
md.trigger_estop()
md.reset_estop()
md.write_act(False)
sleep(1) # 让曲线彻底关闭
_response = execution('state.switch_manual', hr, w2t)
_response = execution('state.switch_motor_off', hr, w2t)
# 2. reload工程后pp2main并且自动模式和上电
prj_path = 'target/_build/target.prj'
_response = execution('overview.reload', hr, w2t, prj_path=prj_path, tasks=['current'])
_response = execution('overview.get_cur_prj', hr, w2t)
_response = execution('rl_task.pp_to_main', hr, w2t, tasks=['current'])
_response = execution('state.switch_auto', hr, w2t)
_response = execution('state.switch_motor_on', hr, w2t)
# 3. 开始运行程序
_response = execution('rl_task.run', hr, w2t, tasks=['current'])
_t_start = time()
while True:
if md.read_ready_to_go() == 1:
md.write_act(True)
break
else:
if (time() - _t_start) // 20 > 1:
w2t("20s内未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", 0, 111, 'red', tab_name)
else:
sleep(1)
# 4. 获取初始数据,周期时间,首次的各轴平均电流值,打开诊断曲线,并执行采集
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
_t_start = time()
while True:
scenario_time = md.read_scenario_time()
if float(scenario_time) > 1:
w2t(f"场景的周期时间:{scenario_time}", 0, 0, 'green', tab_name)
break
else:
if (time() - _t_start) // 60 > 3:
w2t(f"未收到场景的周期时间需要确认RL程序编写正确并正常执行...", 0, 111, 'red', tab_name)
else:
sleep(5)
sleep(1) # 一定要延迟一秒再读一次scenario time寄存器因为一开始读取的数值不准确
scenario_time = float(md.read_scenario_time())
sleep(scenario_time * 0.2) # 再运行周期的20%即可
# 6. 关闭诊断曲线,停止程序运行,下电并且换成手动模式
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=[])
_response = execution('rl_task.stop', hr, w2t, tasks=['current'])
sleep(1) # 保证所有数据均已返回
# 7. 保留数据并处理输出
data = get_durable_data(path, config_file, durable_data_current, scenario_time, hr, w2t)
df = pd.DataFrame(data)
df.to_excel(durable_data_current_xlsx, index=False)
# 8. 继续运行
_response = execution('rl_task.run', hr, w2t, tasks=['current'])
while True:
# 每3分钟更新一次数据打开曲线获取周期内电流关闭曲线
sleep(180)
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
sleep(scenario_time + 10)
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=[])
# 7. 保留数据并处理输出
data = get_durable_data(path, config_file, durable_data_current, scenario_time, hr, w2t)
df = pd.DataFrame(data)
df.to_excel(durable_data_current_xlsx, index=False)
def get_durable_data(path, config_file, data, scenario_time, hr, w2t):
_data_list = []
for _msg in hr.c_msg:
if 'diagnosis.result' in _msg:
_data_list.insert(0, loads(_msg))
else:
_index = 210
for _msg in hr.c_msg:
if 'diagnosis.result' in _msg:
_index = hr.c_msg.index(_msg)
break
del hr.c_msg[_index:]
hr.c_msg_xs.clear()
# with open('log.txt', 'w', encoding='utf-8') as f_obj:
# for _ in _data_list:
# f_obj.write(f"{_}\n")
_wb = load_workbook(config_file, read_only=True)
_ws = _wb['Target']
rcs = []
for i in range(6):
rcs.append(float(_ws.cell(row=6, column=i + 2).value))
_d2d_trq = {
'device_servo_trq_feedback_0': [], 'device_servo_trq_feedback_1': [], 'device_servo_trq_feedback_2': [],
'device_servo_trq_feedback_3': [], 'device_servo_trq_feedback_4': [], 'device_servo_trq_feedback_5': [],
}
for line in _data_list:
for item in line['data']:
for i in range(6):
item['value'].reverse()
if item.get('channel', None) == i and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq[f'device_servo_trq_feedback_{i}'].extend(item['value'])
if len(_d2d_trq['device_servo_trq_feedback_0']) / 1000 > scenario_time + 1:
_df = pd.DataFrame(_d2d_trq)
for i in range(6):
_ = sqrt(_df[f'device_servo_trq_feedback_{i}'].apply(lambda x: (rcs[i]*x/1000)**2).sum()/len(_df[f'device_servo_trq_feedback_{i}']))
del data[f"axis{i + 1}"][0]
data[f"axis{i + 1}"].append(_)
break
else:
with open(f'{path}\\device_servo_trq_feedback_0.txt', 'w', encoding='utf-8') as f_obj:
for _ in _d2d_trq['device_servo_trq_feedback_0']:
f_obj.write(f"{_}\n")
w2t("采集的数据时间长度不够,需要确认。", 0, 2, 'red', tab_name)
return data
def main(path, hr, md, w2t):
data_dirs, data_files = traversal_files(path, w2t)
config_file, prj_file = check_files(data_dirs, data_files, w2t)
prj_to_xcore(prj_file)
run_rl(path, config_file, hr, md, w2t)
if __name__ == '__main__':
main(*argv[1:])

View File

@ -250,9 +250,8 @@ class HmiRequest(object):
with open(heartbeat, "w", encoding='utf-8') as f_hb:
f_hb.write(_flag)
if _flag == '0':
# self.w2t(f"{_id} 心跳丢失,连接失败,重新连接中...", 0, 7, 'red', tab_name=self.tab_name)
self.w2t(f"", 0, 7, 'red', tab_name=self.tab_name)
sleep(1.5)
self.w2t(f"{_id} 心跳丢失,连接失败,重新连接中...", 0, 7, 'red', tab_name=self.tab_name)
sleep(2)
# with open(f"{current_path}/../assets/templates/c_msg.log", "w", encoding='utf-8') as f:
# for msg in self.c_msg:
# f.write(str(loads(msg)) + '\n')
@ -566,7 +565,7 @@ class HmiRequest(object):
if flg == 0: # for old protocols
req = None
try:
with open(f'{current_path}/../assets/templates/{command}.json', encoding='utf-8',
with open(f'{current_path}/../assets/templates/json/{command}.json', encoding='utf-8',
mode='r') as f_json:
req = load(f_json)
except:
@ -602,8 +601,7 @@ class HmiRequest(object):
self.c.send(self.package(cmd))
sleep(0.5)
except Exception as Err:
# self.w2t(f"{cmd}\n请求发送失败...{Err}", 0, 0, 'red', tab_name=self.tab_name)
self.w2t(f"", 0, 0, 'red', tab_name=self.tab_name)
self.w2t(f"{cmd}\n请求发送失败...{Err}", 0, 0, 'red', tab_name=self.tab_name)
return req['id']

View File

@ -1 +0,0 @@
__all__ = ['do_profile']