耐久场景测试,将采集规则由固定时间间隔,修改为固定运动动作周期

This commit is contained in:
2025-03-11 15:16:47 +08:00
parent 0fb4e43482
commit 62200a9fe4
9 changed files with 69 additions and 27 deletions

View File

@ -4,7 +4,6 @@ import time
import pandas
import math
import csv
import numpy
from common import clibs
@ -122,12 +121,12 @@ def run_rl(path, params, curves, hr, md, w2t):
while True:
scenario_time = float(f"{float(md.read_scenario_time()):.2f}")
if scenario_time != 0:
w2t(f"耐久工程的周期时间:{scenario_time}s | 单轮次执行时间:{scenario_time+interval}\n")
w2t(f"耐久工程的周期时间:{scenario_time}s | 单轮次执行时间:{scenario_time+interval}~{scenario_time*2+interval}\n")
break
else:
time.sleep(1)
if (time.time() - t_start) > 300:
w2t(f"300s 内未收到耐久工程的周期时间需要确认RL程序和工具通信交互是否正常执行...\n", "red", "GetScenarioTimeError")
if (time.time() - t_start) > 900:
w2t(f"900s 内未收到耐久工程的周期时间需要确认RL程序和工具通信交互是否正常执行...\n", "red", "GetScenarioTimeError")
# 6. 准备数据保存文件
for curve in curves:
@ -141,11 +140,19 @@ def run_rl(path, params, curves, hr, md, w2t):
count = 0
while clibs.running:
this_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
next_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()+scenario_time+interval+1))
w2t(f"[{this_time}] 当前次数:{count:09d} | 预计下次数据更新时间:{next_time}\n", "#008B8B")
next_time_1 = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()+scenario_time+interval+1))
next_time_2 = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()+scenario_time+interval+1+scenario_time))
w2t(f"[{this_time}] 当前次数:{count:09d} | 预计下次数据更新时间:{next_time_1}~{next_time_2}\n", "#008B8B")
count += 1
# 固定间隔,更新一次数据,打开曲线,获取周期内电流,关闭曲线
time.sleep(interval)
while True:
capture_start = md.read_capture_start()
if capture_start == 1:
break
else:
time.sleep(0.1)
change_curve_state(hr, curves, True, True)
time.sleep(scenario_time)
end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
@ -194,7 +201,7 @@ def proc_device_servo_trq_feedback(records, params, w2t):
for axis in range(6):
df = pandas.DataFrame.from_dict({"device_servo_trq_feedback": d_trq[axis]})
_ = math.sqrt(df.apply(lambda x: numpy.power((rcs[axis] * float(x.iloc[0]) / 1000), 2)).sum() / len(df))
_ = math.sqrt((df[df.columns[0]] * rcs[axis] / 1000).pow(2).sum() / len(df))
results.append(_)
path = "/".join(params["prj_file"].split("/")[:-1])