minor modifications

This commit is contained in:
gitea 2024-07-18 14:59:20 +08:00
parent dd0873637f
commit 1cefe4a16b
5 changed files with 37 additions and 48 deletions

View File

@ -13,13 +13,13 @@ from automatic_test import *
from durable_action import *
import openapi
import matplotlib.pyplot as plt
import matplotlib
import pandas as pd
from matplotlib import use
from pandas import DataFrame, read_excel
matplotlib.use('Agg')
use('Agg')
heartbeat = f'{dirname(__file__)}/../assets/templates/heartbeat'
durable_data_current_xlsx = f'{dirname(__file__)}/../assets/templates/durable/durable_data_current.xlsx'
durable_data_velocity_xlsx = f'{dirname(__file__)}/../assets/templates/durable/durable_data_velocity.xlsx'
durable_data_current_max_xlsx = f'{dirname(__file__)}/../assets/templates/durable/durable_data_current_max.xlsx'
customtkinter.set_appearance_mode("System") # Modes: "System" (standard), "Dark", "Light"
customtkinter.set_default_color_theme("blue") # Themes: "blue" (standard), "green", "dark-blue"
customtkinter.set_widget_scaling(1.1) # widget dimensions and text size
@ -35,7 +35,7 @@ durable_data_current = {
'axis5': [0 for _ in range(25)],
'axis6': [0 for _ in range(25)],
}
durable_data_velocity = {
durable_data_current_max = {
'time': list(range(1, 26)),
'axis1': [0 for _ in range(25)],
'axis2': [0 for _ in range(25)],
@ -64,7 +64,7 @@ widgits_at = {
}
widgits_da = {
'path': {'label': '', 'entry': '', 'row': 1, 'col': 2, 'text': '数据文件夹路径'},
'curvesel': {'label': '', 'optionmenu': '', 'row': 1, 'col': 1, 'text': '曲线选择'},
'curvesel': {'label': '', 'optionmenu': '', 'row': 1, 'col': 1, 'text': '指标选择'},
}
@ -185,7 +185,7 @@ class App(customtkinter.CTk):
widgits_da[widgit]['entry'] = customtkinter.CTkEntry(self.tabview.tab('Durable Action'), width=670, placeholder_text=widgits_da[widgit]['text'], font=self.my_font)
widgits_da[widgit]['entry'].grid(row=widgits_da[widgit]['row'], column=widgits_da[widgit]['col']+1, columnspan=11, padx=(5, 10), pady=10, sticky='we')
elif widgit in ['curvesel']:
widgits_da[widgit]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Durable Action'), dynamic_resizing=False, button_color='#708090', fg_color='#778899', values=["device_servo_trq_feedback", "hw_joint_vel_feedback"], font=self.my_font)
widgits_da[widgit]['optionmenu'] = customtkinter.CTkOptionMenu(self.tabview.tab('Durable Action'), dynamic_resizing=False, button_color='#708090', fg_color='#778899', values=['device_servo_trq_feedback', '[max] device_servo_trq_feedback'], font=self.my_font)
widgits_da[widgit]['optionmenu'].grid(row=widgits_da[widgit]['row'], column=widgits_da[widgit]['col'], padx=5, pady=10, sticky='we')
widgits_da[widgit]['optionmenu'].set(widgits_da[widgit]['text'])
# For durable_action tab END =====================================================================
@ -224,14 +224,14 @@ class App(customtkinter.CTk):
if not self.hr.durable_lock:
self.hr.durable_lock = 1
if curvesel == 'device_servo_trq_feedback':
df = pd.read_excel(durable_data_current_xlsx)
df = read_excel(durable_data_current_xlsx)
_title = 'device_servo_trq_feedback'
elif curvesel == 'hw_joint_vel_feedback':
_title = 'hw_joint_vel_feedback'
df = pd.read_excel(durable_data_velocity_xlsx)
elif curvesel == '[max] device_servo_trq_feedback':
_title = '[max] device_servo_trq_feedback'
df = read_excel(durable_data_current_max_xlsx)
else:
_title = 'device_servo_trq_feedback'
df = pd.read_excel(durable_data_current_xlsx)
df = read_excel(durable_data_current_xlsx)
self.hr.durable_lock = 0
break
else:
@ -281,10 +281,10 @@ class App(customtkinter.CTk):
# self.tabview.configure(state='normal')
def detect_network(self):
df = pd.DataFrame(durable_data_current)
df = DataFrame(durable_data_current)
df.to_excel(durable_data_current_xlsx, index=False)
df = pd.DataFrame(durable_data_velocity)
df.to_excel(durable_data_velocity_xlsx, index=False)
df = DataFrame(durable_data_current_max)
df.to_excel(durable_data_current_max_xlsx, index=False)
with open(heartbeat, "w", encoding='utf-8') as f_hb:
f_hb.write('0')
@ -578,7 +578,7 @@ class App(customtkinter.CTk):
path = widgits_da['path']['entry'].get().strip()
curvesel = widgits_da['curvesel']['optionmenu'].get()
c1 = exists(path)
c2 = curvesel in ['device_servo_trq_feedback', 'hw_joint_vel_feedback']
c2 = curvesel in ['device_servo_trq_feedback', '[max] device_servo_trq_feedback']
if c1 and c2:
return 7, path, curvesel
else:

View File

@ -4,21 +4,21 @@ from os import scandir
from paramiko import SSHClient, AutoAddPolicy
from json import loads
from time import sleep, time, strftime, localtime
import pandas as pd
from pandas import DataFrame
from openpyxl import load_workbook
from math import sqrt
tab_name = 'Durable Action'
count = 0
durable_data_current_xlsx = f'{dirname(__file__)}/../../assets/templates/durable/durable_data_current.xlsx'
durable_data_velocity_xlsx = f'{dirname(__file__)}/../../assets/templates/durable/durable_data_velocity.xlsx'
durable_data_current_max_xlsx = f'{dirname(__file__)}/../../assets/templates/durable/durable_data_current_max.xlsx'
display_pdo_params = [
{"name": "hw_joint_vel_feedback", "channel": 0},
{"name": "hw_joint_vel_feedback", "channel": 1},
{"name": "hw_joint_vel_feedback", "channel": 2},
{"name": "hw_joint_vel_feedback", "channel": 3},
{"name": "hw_joint_vel_feedback", "channel": 4},
{"name": "hw_joint_vel_feedback", "channel": 5},
# {"name": "hw_joint_vel_feedback", "channel": 0},
# {"name": "hw_joint_vel_feedback", "channel": 1},
# {"name": "hw_joint_vel_feedback", "channel": 2},
# {"name": "hw_joint_vel_feedback", "channel": 3},
# {"name": "hw_joint_vel_feedback", "channel": 4},
# {"name": "hw_joint_vel_feedback", "channel": 5},
{"name": "device_servo_trq_feedback", "channel": 0},
{"name": "device_servo_trq_feedback", "channel": 1},
{"name": "device_servo_trq_feedback", "channel": 2},
@ -35,7 +35,7 @@ durable_data_current = {
'axis5': [0 for _ in range(25)],
'axis6': [0 for _ in range(25)],
}
durable_data_velocity = {
durable_data_current_max = {
'time': list(range(1, 26)),
'axis1': [0 for _ in range(25)],
'axis2': [0 for _ in range(25)],
@ -44,7 +44,7 @@ durable_data_velocity = {
'axis5': [0 for _ in range(25)],
'axis6': [0 for _ in range(25)],
}
data_all = [durable_data_current, durable_data_velocity]
data_all = [durable_data_current, durable_data_current_max]
def traversal_files(path, w2t):
@ -214,44 +214,33 @@ def get_durable_data(path, data, scenario_time, wait_time, rcs, hr, w2t):
# for _ in _data_list:
# f_obj.write(f"{_}\n")
_d2d_trq = {
'device_servo_trq_feedback_0': [], 'device_servo_trq_feedback_1': [], 'device_servo_trq_feedback_2': [],
'device_servo_trq_feedback_3': [], 'device_servo_trq_feedback_4': [], 'device_servo_trq_feedback_5': [],
}
_d2d_vel = {
'hw_joint_vel_feedback_0': [], 'hw_joint_vel_feedback_1': [], 'hw_joint_vel_feedback_2': [],
'hw_joint_vel_feedback_3': [], 'hw_joint_vel_feedback_4': [], 'hw_joint_vel_feedback_5': [],
}
_d2d_trq = {0: [], 1: [], 2: [], 3: [], 4: [], 5: []}
_d2d_trq_max = {0: [], 1: [], 2: [], 3: [], 4: [], 5: []}
for line in _data_list:
for item in line['data']:
for i in range(6):
item['value'].reverse()
if item.get('channel', None) == i and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq[f'device_servo_trq_feedback_{i}'].extend(item['value'])
elif item.get('channel', None) == i and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel[f'hw_joint_vel_feedback_{i}'].extend(item['value'])
_d2d_trq[i].extend(item['value'])
if len(_d2d_trq['device_servo_trq_feedback_0']) / 1000 > scenario_time + 1:
# ========= for trq =========
_df_1 = pd.DataFrame(_d2d_trq)
if len(_d2d_trq[0]) / 1000 > scenario_time + 1:
_df = DataFrame(_d2d_trq)
for i in range(6):
_ = sqrt(100*_df_1[f'device_servo_trq_feedback_{i}'].apply(lambda x: (rcs[i]*x/10000)**2).sum()/len(_df[f'device_servo_trq_feedback_{i}']))
_ = sqrt(100*_df[i].apply(lambda x: (rcs[i]*x/10000)**2).sum()/len(_df[i]))
del data[0][f"axis{i + 1}"][0]
data[0][f"axis{i + 1}"].append(_)
_df_1 = pd.DataFrame(data[0])
# ========= for vel =========
_df_2 = pd.DataFrame(_d2d_vel)
for i in range(6):
_ = sqrt(100*_df_2[f'hw_joint_vel_feedback_{i}'].apply(lambda x: (rcs[i]*x/10000)**2).sum()/len(_df[f'hw_joint_vel_feedback_{i}']))
_ = rcs[i] * _df[i].abs().max() / 1000
del data[1][f"axis{i + 1}"][0]
data[1][f"axis{i + 1}"].append(_)
_df_2 = pd.DataFrame(data[1])
_df_1 = DataFrame(data[0])
_df_2 = DataFrame(data[1])
while True:
if not hr.durable_lock:
hr.durable_lock = 1
_df_1.to_excel(durable_data_current_xlsx, index=False)
_df_2.to_excel(durable_data_velocity_xlsx, index=False)
_df_2.to_excel(durable_data_current_max_xlsx, index=False)
hr.durable_lock = 0
break
else: