from sys import argv
from json import loads
from time import sleep, time, strftime, localtime
from pandas import DataFrame
from openpyxl import load_workbook
from math import sqrt
from numpy import power
from csv import writer
from commons import clibs
logger = clibs.log_prod
tab_name = clibs.tab_names['da']
count = 0
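
# PDO signals requested from the diagnosis service: servo torque feedback for all six axes
# (the joint-velocity channels are kept below, commented out, for reference).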
display_pdo_params = [
    # {"name": "hw_joint_vel_feedback", "channel": 0},
    # {"name": "hw_joint_vel_feedback", "channel": 1},
    # {"name": "hw_joint_vel_feedback", "channel": 2},
    # {"name": "hw_joint_vel_feedback", "channel": 3},
    # {"name": "hw_joint_vel_feedback", "channel": 4},
    # {"name": "hw_joint_vel_feedback", "channel": 5},
    {"name": "device_servo_trq_feedback", "channel": 0},
    {"name": "device_servo_trq_feedback", "channel": 1},
    {"name": "device_servo_trq_feedback", "channel": 2},
    {"name": "device_servo_trq_feedback", "channel": 3},
    {"name": "device_servo_trq_feedback", "channel": 4},
    {"name": "device_servo_trq_feedback", "channel": 5},
]
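# Column layout of every row appended to results.csv: timestamp, the six per-axis RMS values,
# then the six per-axis peak values of the scaled torque feedback.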
title = [
    'time', 'trq-1', 'trq-2', 'trq-3', 'trq-4', 'trq-5', 'trq-6',
    'trq-max-1', 'trq-max-2', 'trq-max-3', 'trq-max-4', 'trq-max-5', 'trq-max-6'
]
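

# Verify the working directory: no sub-directories, and exactly configs.xlsx plus target.zip.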
def check_files(data_dirs, data_files, w2t):
    if len(data_dirs) != 0 or len(data_files) != 2:
        w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下文件,确认后重新运行!\n1. target.zip\n2. configs.xlsx', 0, 10, 'red', tab_name)

    _files = [data_files[0].split('\\')[-1], data_files[1].split('\\')[-1]]
    _files.sort()
    if _files != ['configs.xlsx', 'target.zip']:
        w2t('初始路径下只能存在如下文件,确认后重新运行!\n1. target.zip\n2. configs.xlsx', 0, 10, 'red', tab_name)

    data_files.sort()
    return data_files
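

# Run one durability test: stop whatever is running, reload and start the RL project,
# measure the scenario cycle time, then collect and log torque data at a fixed interval.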
def run_rl(path, config_file, data_all, hr, md, w2t):
    # 1. Close the diagnosis curves, trigger a soft e-stop and release it, so that any program
    #    that may still be running stops; switch to manual mode and power off.
    clibs.execution('diagnosis.open', hr, w2t, tab_name, open=True, display_open=True)
    clibs.execution('diagnosis.set_params', hr, w2t, tab_name, display_pdo_params=display_pdo_params)
    md.trigger_estop()
    md.reset_estop()
    md.write_act(False)
    sleep(1)  # let the curves close completely

    # 2. Reload the project, run pp-to-main, then switch to automatic mode and power on.
    prj_path = 'target/_build/target.prj'
    clibs.execution('overview.reload', hr, w2t, tab_name, prj_path=prj_path, tasks=['current'])
    clibs.execution('rl_task.pp_to_main', hr, w2t, tab_name, tasks=['current'])
    clibs.execution('state.switch_auto', hr, w2t, tab_name)
    clibs.execution('state.switch_motor_on', hr, w2t, tab_name)

    # 3. Start the program.
    clibs.execution('rl_task.set_run_params', hr, w2t, tab_name, loop_mode=True, override=1.0)
    clibs.execution('rl_task.run', hr, w2t, tab_name, tasks=['current'])
    _t_start = time()
    while True:
        if md.read_ready_to_go() == 1:
            md.write_act(True)
            break
        else:
            if (time() - _t_start) // 20 > 1:
                w2t("20s内未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", 0, 111, 'red', tab_name)
            else:
                sleep(1)

    # 4. Get the initial data: the cycle time and the first per-axis average current values;
    #    open the diagnosis curves and start collecting.
    sleep(20)  # let the scenario time register initialise to 0
    _t_start = time()
    while True:
        scenario_time = md.read_scenario_time()
        if float(scenario_time) > 1:
            w2t(f"场景的周期时间:{scenario_time}s", 0, 0, 'green', tab_name)
            break
        else:
            if (time() - _t_start) // 60 > 3:
                w2t(f"未收到场景的周期时间需要确认RL程序编写正确并正常执行...", 0, 111, 'red', tab_name)
            else:
                sleep(5)

    sleep(1)  # always wait one second before reading the scenario time register again: the first reading is inaccurate
    scenario_time = float(md.read_scenario_time())
    sleep(scenario_time*0.2)

    # 6. Prepare the initial data: close the diagnosis curves, keep the data and process the output.
    with open(f'{path}\\results.csv', mode='a+', newline='') as f_csv:
        csv_writer = writer(f_csv)
        csv_writer.writerow(title)

    _wb = load_workbook(config_file, read_only=True)
    _ws = _wb['Target']
    wait_time = float(_ws.cell(row=2, column=10).value)
    rcs = []
    for i in range(6):
        rcs.append(float(_ws.cell(row=6, column=i + 2).value))

    get_durable_data(path, data_all, scenario_time, wait_time, rcs, hr, md, w2t)

    # 7. Keep running.
    while True:
        # At a fixed interval, refresh the data: open the curves, grab one cycle of current, close the curves.
        sleep(wait_time+scenario_time+7)
        # Keep the data and process the output.
        get_durable_data(path, data_all, scenario_time, wait_time, rcs, hr, md, w2t)
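

# Collect one batch of diagnosis data: rebuild the per-axis torque series, compute per-axis RMS and
# peak values, append them to results.csv, and refresh the shared Excel files guarded by hr.durable_lock.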
def get_durable_data(path, data, scenario_time, wait_time, rcs, hr, md, w2t):
    _data_list = []
    _c_msg = hr.c_msg.copy()
    for _msg in _c_msg:
        if 'diagnosis.result' in _msg:
            _data_list.insert(0, loads(_msg))
        else:
            hr.c_msg_xs.clear()

    if len(hr.c_msg) > 270:
        del hr.c_msg[270:]

    # with open(f'{path}\\log.txt', 'w', encoding='utf-8') as f_obj:
    #     for _ in _data_list:
    #         f_obj.write(f"{_}\n")
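
    # Group the samples by axis: _d2d_trq[i] collects the torque-feedback values for axis i.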
    _d2d_trq = {0: [], 1: [], 2: [], 3: [], 4: [], 5: []}
    for line in _data_list:
        for item in line['data']:
            for i in range(6):
                item['value'].reverse()
                if item.get('channel', None) == i and item.get('name', None) == 'device_servo_trq_feedback':
                    _d2d_trq[i].extend(item['value'])
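
        # Once more than one full scenario cycle of samples is available for axis 0, process the batch
        # and stop scanning; the threshold assumes roughly 1000 samples per second of streamed data.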
        if len(_d2d_trq[0]) / 1000 > scenario_time + 1:
            this_time = strftime("%Y-%m-%d %H:%M:%S", localtime(time()))
            next_time = strftime("%Y-%m-%d %H:%M:%S", localtime(time()+wait_time+10+scenario_time)).split()[-1]
            _df = DataFrame(_d2d_trq)
            _flg = 0
            _res = []
            for i in range(6):
                def overmax_data(df, index, number, flag):
                    if number > 100:
                        md.trigger_estop()
                        hr.durable_quit = 1
                        df.to_excel(f'{path}\\{this_time}.xlsx')
                        w2t(f"[{this_time}] {flag}-axis-{index} 数据过大错误,需要检查确定。", 0, 10, 'red', tab_name)

                try:
                    _ = sqrt(_df[i].apply(lambda x: power((rcs[i]*x/1000), 2)).sum()/len(_df[i]))
                except:
                    md.trigger_estop()
                    _df.to_excel(path+"\\err_data.xlsx")
                    w2t(f"{i}calculate error", 0, 11, 'red', tab_name)

                if not _flg:
                    del data[0]['time'][0]
                    data[0]['time'].append(this_time.split()[-1])
                    del data[1]['time'][0]
                    data[1]['time'].append(this_time.split()[-1])
                    _res.append(this_time)
                    _flg = 1

                del data[0][f"axis{i + 1}"][0]
                overmax_data(_df, i, _, 'trq')
                data[0][f"axis{i + 1}"].append(_)
                _res.append(_)

                _ = rcs[i] * _df[i].abs().max() / 1000
                overmax_data(_df, i, _, 'trq-max')
                del data[1][f"axis{i + 1}"][0]
                data[1][f"axis{i + 1}"].append(_)
                _res.append(_)

            _df_1 = DataFrame(data[0])
            _df_2 = DataFrame(data[1])

            with open(f'{path}\\results.csv', mode='a+', newline='') as f_csv:
                def change_order(res):
                    _time = res[0:1]
                    _trq = []
                    _trq_max = []
                    for _item in res[1::2]:
                        _trq.append(_item)
                    for _item in res[2::2]:
                        _trq_max.append(_item)
                    return _time + _trq + _trq_max

                csv_writer = writer(f_csv)
                csv_writer.writerow(change_order(_res))

            while True:
                if not hr.durable_lock:
                    hr.durable_lock = 1
                    _df_1.to_excel(clibs.durable_data_current_xlsx, index=False)
                    _df_2.to_excel(clibs.durable_data_current_max_xlsx, index=False)
                    hr.durable_lock = 0
                    break
                else:
                    sleep(1)

            global count
            count += 1
            w2t(f"[{this_time}] 当前次数:{count:09d} | 预计下次数据更新时间:{next_time}", 0, 0, '#008B8B', tab_name)
            break
    else:
        md.trigger_estop()
        with open(f'{path}\\device_servo_trq_feedback_0.txt', 'w', encoding='utf-8') as f_obj:
            for _ in _d2d_trq[0]:
                f_obj.write(f"{_}\n")
        w2t("采集的数据时间长度不够,需要确认。", 0, 10, 'red', tab_name)
def main(path, hr, md, w2t):
    data_all = [clibs.durable_data_current, clibs.durable_data_current_max]
    data_dirs, data_files = clibs.traversal_files(path, w2t)
    config_file, prj_file = check_files(data_dirs, data_files, w2t)
    clibs.prj_to_xcore(prj_file)
    run_rl(path, config_file, data_all, hr, md, w2t)


if __name__ == '__main__':
    main(*argv[1:])