another try

This commit is contained in:
gitea 2024-07-19 16:32:31 +08:00
parent 73230a7133
commit 2f782c9693
3 changed files with 58 additions and 45 deletions

View File

@ -161,6 +161,8 @@ pyinstaller --noconfirm --onedir --windowed --add-data "C:/Users/Administrator/A
- configs.xlsx执行之前需要手动修改好configs.xlsx中的参数以及间隔时间 - configs.xlsx执行之前需要手动修改好configs.xlsx中的参数以及间隔时间
- target.zip需要确认工程点位和动作无问题后保存导出 - target.zip需要确认工程点位和动作无问题后保存导出
> 重新运行时,必须突出软件,重新运行
#### 其他 #### 其他
customtkinter的tabview组件不支持修改字体大小可以参考 [Changing Font of a Tabview](https://github.com/TomSchimansky/CustomTkinter/issues/2296) 进行手动修改源码实现: customtkinter的tabview组件不支持修改字体大小可以参考 [Changing Font of a Tabview](https://github.com/TomSchimansky/CustomTkinter/issues/2296) 进行手动修改源码实现:
a. 运行 `pip show customtkinter`,获取到库的路径 a. 运行 `pip show customtkinter`,获取到库的路径
@ -518,3 +520,12 @@ v0.2.0.0(2024/07/17)
- HmiRequest模块日志取消记录move.monitor相关 - HmiRequest模块日志取消记录move.monitor相关
- HmiRequest模块增加了durable_lock变量控制文件读写互斥 - HmiRequest模块增加了durable_lock变量控制文件读写互斥
v0.2.0.1(2024/07/19)
1. [main: aio.py]
- 修改了x轴显示使之为时间刻度
- 修改pre_warning函数增加了durable test的初始化
2. [main: factory_test.py]
- 增加了数据计算错误的判断逻辑
- 增加了历史数据保存的逻辑
- 增加了文件读写互斥的逻辑
- 修改功能为输出有效电流和最大电流,并将数据结构简化

View File

@ -619,6 +619,12 @@ class App(customtkinter.CTk):
tkinter.messagebox.showerror(title="参数错误", message="请检查对应参数是否填写正确!", ) tkinter.messagebox.showerror(title="参数错误", message="请检查对应参数是否填写正确!", )
def pre_warning(self): def pre_warning(self):
if self.tabview.get() == 'Durable Action':
df = DataFrame(durable_data_current)
df.to_excel(durable_data_current_xlsx, index=False)
df = DataFrame(durable_data_current_max)
df.to_excel(durable_data_current_max_xlsx, index=False)
if tkinter.messagebox.askyesno(title="开始运行", message="确认机器已按照测试规范更新固件,并提按照测试机型前修改好工程?"): if tkinter.messagebox.askyesno(title="开始运行", message="确认机器已按照测试规范更新固件,并提按照测试机型前修改好工程?"):
pass pass
else: else:

View File

@ -11,7 +11,6 @@ from numpy import power
from csv import writer from csv import writer
tab_name = 'Durable Action' tab_name = 'Durable Action'
count = 0
durable_data_current_xlsx = f'{dirname(__file__)}/../../assets/templates/durable/durable_data_current.xlsx' durable_data_current_xlsx = f'{dirname(__file__)}/../../assets/templates/durable/durable_data_current.xlsx'
durable_data_current_max_xlsx = f'{dirname(__file__)}/../../assets/templates/durable/durable_data_current_max.xlsx' durable_data_current_max_xlsx = f'{dirname(__file__)}/../../assets/templates/durable/durable_data_current_max.xlsx'
display_pdo_params = [ display_pdo_params = [
@ -28,25 +27,6 @@ display_pdo_params = [
{"name": "device_servo_trq_feedback", "channel": 4}, {"name": "device_servo_trq_feedback", "channel": 4},
{"name": "device_servo_trq_feedback", "channel": 5}, {"name": "device_servo_trq_feedback", "channel": 5},
] ]
durable_data_current = {
'time': list(range(1, 19)),
'axis1': [0 for _ in range(18)],
'axis2': [0 for _ in range(18)],
'axis3': [0 for _ in range(18)],
'axis4': [0 for _ in range(18)],
'axis5': [0 for _ in range(18)],
'axis6': [0 for _ in range(18)],
}
durable_data_current_max = {
'time': list(range(1, 19)),
'axis1': [0 for _ in range(18)],
'axis2': [0 for _ in range(18)],
'axis3': [0 for _ in range(18)],
'axis4': [0 for _ in range(18)],
'axis5': [0 for _ in range(18)],
'axis6': [0 for _ in range(18)],
}
data_all = [durable_data_current, durable_data_current_max]
title = [ title = [
'time', 'trq-1', 'trq-2', 'trq-3', 'trq-4', 'trq-5', 'trq-6', 'trq-max-1', 'trq-max-2', 'trq-max-3', 'trq-max-4', 'time', 'trq-1', 'trq-2', 'trq-3', 'trq-4', 'trq-5', 'trq-6', 'trq-max-1', 'trq-max-2', 'trq-max-3', 'trq-max-4',
'trq-max-5', 'trq-max-6' 'trq-max-5', 'trq-max-6'
@ -71,8 +51,7 @@ def traversal_files(path, w2t):
def check_files(data_dirs, data_files, w2t): def check_files(data_dirs, data_files, w2t):
if len(data_dirs) != 0 or len(data_files) != 2: if len(data_dirs) != 0 or len(data_files) != 2:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下文件,确认后重新运行!\n1. target.zip\n2. configs.xlsx', 0, w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下文件,确认后重新运行!\n1. target.zip\n2. configs.xlsx', 0, 10, 'red', tab_name)
10, 'red', tab_name)
_files = [data_files[0].split('\\')[-1], data_files[1].split('\\')[-1]] _files = [data_files[0].split('\\')[-1], data_files[1].split('\\')[-1]]
_files.sort() _files.sort()
@ -124,16 +103,13 @@ def execution(cmd, hr, w2t, **kwargs):
return _response return _response
def run_rl(path, config_file, hr, md, w2t): def run_rl(path, config_file, data_all, count, hr, md, w2t):
# 1. 关闭诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来,切手动模式并下电 # 1. 关闭诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来,切手动模式并下电
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False) _response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=[])
md.trigger_estop() md.trigger_estop()
md.reset_estop() md.reset_estop()
md.write_act(False) md.write_act(False)
sleep(1) # 让曲线彻底关闭 sleep(1) # 让曲线彻底关闭
_response = execution('state.switch_manual', hr, w2t)
_response = execution('state.switch_motor_off', hr, w2t)
# 2. reload工程后pp2main并且自动模式和上电 # 2. reload工程后pp2main并且自动模式和上电
prj_path = 'target/_build/target.prj' prj_path = 'target/_build/target.prj'
@ -163,7 +139,7 @@ def run_rl(path, config_file, hr, md, w2t):
while True: while True:
scenario_time = md.read_scenario_time() scenario_time = md.read_scenario_time()
if float(scenario_time) > 1: if float(scenario_time) > 1:
w2t(f"场景的周期时间:{scenario_time}", 0, 0, 'green', tab_name) w2t(f"场景的周期时间:{scenario_time}s", 0, 0, 'green', tab_name)
break break
else: else:
if (time() - _t_start) // 60 > 3: if (time() - _t_start) // 60 > 3:
@ -172,13 +148,9 @@ def run_rl(path, config_file, hr, md, w2t):
sleep(5) sleep(5)
sleep(1) # 一定要延迟一秒再读一次scenario time寄存器因为一开始读取的数值不准确 sleep(1) # 一定要延迟一秒再读一次scenario time寄存器因为一开始读取的数值不准确
scenario_time = float(md.read_scenario_time()) scenario_time = float(md.read_scenario_time())
sleep(scenario_time * 0.2) # 再运行周期的20%即可 sleep(scenario_time*0.2)
# 6. 关闭诊断曲线,停止程序运行,下电并且换成手动模式 # 6. 准备初始数据,关闭诊断曲线,保留数据并处理输出
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=[])
sleep(1) # 保证所有数据均已返回
# 7. 保留数据并处理输出
with open(f'{path}\\results.csv', mode='a+', newline='') as f_csv: with open(f'{path}\\results.csv', mode='a+', newline='') as f_csv:
csv_writer = writer(f_csv) csv_writer = writer(f_csv)
csv_writer.writerow(title) csv_writer.writerow(title)
@ -189,23 +161,24 @@ def run_rl(path, config_file, hr, md, w2t):
for i in range(6): for i in range(6):
rcs.append(float(_ws.cell(row=6, column=i + 2).value)) rcs.append(float(_ws.cell(row=6, column=i + 2).value))
get_durable_data(path, data_all, scenario_time, wait_time, rcs, hr, w2t) _response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
sleep(1) # 保证所有数据均已返回
get_durable_data(path, data_all, scenario_time, wait_time, count, rcs, hr, md, w2t)
# 8. 继续运行 # 7. 继续运行
while True: while True:
# 每3分钟,更新一次数据,打开曲线,获取周期内电流,关闭曲线 # 固定间隔,更新一次数据,打开曲线,获取周期内电流,关闭曲线
sleep(wait_time) sleep(wait_time)
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True) _response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params) _response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
sleep(scenario_time+5) sleep(scenario_time+5)
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False) _response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=[])
sleep(2) sleep(2)
# 7. 保留数据并处理输出 # 保留数据并处理输出
get_durable_data(path, data_all, scenario_time, wait_time, rcs, hr, w2t) get_durable_data(path, data_all, scenario_time, wait_time, count, rcs, hr, md, w2t)
def get_durable_data(path, data, scenario_time, wait_time, rcs, hr, w2t): def get_durable_data(path, data, scenario_time, wait_time, count, rcs, hr, md, w2t):
_data_list = [] _data_list = []
for _msg in hr.c_msg: for _msg in hr.c_msg:
if 'diagnosis.result' in _msg: if 'diagnosis.result' in _msg:
@ -219,7 +192,7 @@ def get_durable_data(path, data, scenario_time, wait_time, rcs, hr, w2t):
del hr.c_msg[_index:] del hr.c_msg[_index:]
hr.c_msg_xs.clear() hr.c_msg_xs.clear()
# with open('log.txt', 'w', encoding='utf-8') as f_obj: # with open(f'{path}\\log.txt', 'w', encoding='utf-8') as f_obj:
# for _ in _data_list: # for _ in _data_list:
# f_obj.write(f"{_}\n") # f_obj.write(f"{_}\n")
@ -241,12 +214,15 @@ def get_durable_data(path, data, scenario_time, wait_time, rcs, hr, w2t):
for i in range(6): for i in range(6):
def overmax_data(df, index, number, flag): def overmax_data(df, index, number, flag):
if number > 100: if number > 100:
md.trigger_estop()
hr.durable_quit = 1
df.to_excel(f'{path}\\{this_time}.xlsx') df.to_excel(f'{path}\\{this_time}.xlsx')
w2t(f"[{this_time}] {flag}-axis-{index} 数据过大错误,需要检查确定。", 0, 0, 'red', tab_name) w2t(f"[{this_time}] {flag}-axis-{index} 数据过大错误,需要检查确定。", 0, 10, 'red', tab_name)
try: try:
_ = sqrt(_df[i].apply(lambda x: power((rcs[i]*x/1000), 2)).sum()/len(_df[i])) _ = sqrt(_df[i].apply(lambda x: power((rcs[i]*x/1000), 2)).sum()/len(_df[i]))
except: except:
md.trigger_estop()
_df.to_excel(path+"\\err_data.xlsx") _df.to_excel(path+"\\err_data.xlsx")
w2t(f"{i}calculate error", 0, 11, 'red', tab_name) w2t(f"{i}calculate error", 0, 11, 'red', tab_name)
@ -294,22 +270,42 @@ def get_durable_data(path, data, scenario_time, wait_time, rcs, hr, w2t):
break break
else: else:
sleep(1) sleep(1)
global count
count += 1 count += 1
w2t(f"[{this_time}] 当前次数:{count:09d} | 预计下次数据更新时间:{next_time}", 0, 0, '#008B8B', tab_name) w2t(f"[{this_time}] 当前次数:{count:09d} | 预计下次数据更新时间:{next_time}", 0, 0, '#008B8B', tab_name)
break break
else: else:
md.trigger_estop()
with open(f'{path}\\device_servo_trq_feedback_0.txt', 'w', encoding='utf-8') as f_obj: with open(f'{path}\\device_servo_trq_feedback_0.txt', 'w', encoding='utf-8') as f_obj:
for _ in _d2d_trq[0]: for _ in _d2d_trq[0]:
f_obj.write(f"{_}\n") f_obj.write(f"{_}\n")
w2t("采集的数据时间长度不够,需要确认。", 0, 2, 'red', tab_name) w2t("采集的数据时间长度不够,需要确认。", 0, 10, 'red', tab_name)
def main(path, hr, md, w2t): def main(path, hr, md, w2t):
count = 0
durable_data_current = {
'time': list(range(1, 19)),
'axis1': [0 for _ in range(18)],
'axis2': [0 for _ in range(18)],
'axis3': [0 for _ in range(18)],
'axis4': [0 for _ in range(18)],
'axis5': [0 for _ in range(18)],
'axis6': [0 for _ in range(18)],
}
durable_data_current_max = {
'time': list(range(1, 19)),
'axis1': [0 for _ in range(18)],
'axis2': [0 for _ in range(18)],
'axis3': [0 for _ in range(18)],
'axis4': [0 for _ in range(18)],
'axis5': [0 for _ in range(18)],
'axis6': [0 for _ in range(18)],
}
data_all = [durable_data_current, durable_data_current_max]
data_dirs, data_files = traversal_files(path, w2t) data_dirs, data_files = traversal_files(path, w2t)
config_file, prj_file = check_files(data_dirs, data_files, w2t) config_file, prj_file = check_files(data_dirs, data_files, w2t)
prj_to_xcore(prj_file) prj_to_xcore(prj_file)
run_rl(path, config_file, hr, md, w2t) run_rl(path, config_file, data_all, count, hr, md, w2t)
if __name__ == '__main__': if __name__ == '__main__':