"""Durability test action.

Starts the RL project on the controller, then periodically collects the
``device_servo_trq_feedback`` diagnosis samples of six channels, derives a
per-axis RMS value and a per-axis absolute maximum, and appends them to
``results.csv`` and to the rolling history workbooks exposed by ``clibs``.
"""
from sys import argv
from json import loads
from time import sleep, time, strftime, localtime
from pandas import DataFrame
from openpyxl import load_workbook
from math import sqrt
from numpy import power
from csv import writer
from commons import clibs

logger = clibs.log_prod
tab_name = clibs.tab_names['da']
# Number of successful data updates performed by get_durable_data().
count = 0
# Diagnosis signals requested from the controller (the velocity feedback
# entries are kept for reference but are currently disabled).
display_pdo_params = [
    # {"name": "hw_joint_vel_feedback", "channel": 0},
    # {"name": "hw_joint_vel_feedback", "channel": 1},
    # {"name": "hw_joint_vel_feedback", "channel": 2},
    # {"name": "hw_joint_vel_feedback", "channel": 3},
    # {"name": "hw_joint_vel_feedback", "channel": 4},
    # {"name": "hw_joint_vel_feedback", "channel": 5},
    {"name": "device_servo_trq_feedback", "channel": 0},
    {"name": "device_servo_trq_feedback", "channel": 1},
    {"name": "device_servo_trq_feedback", "channel": 2},
    {"name": "device_servo_trq_feedback", "channel": 3},
    {"name": "device_servo_trq_feedback", "channel": 4},
    {"name": "device_servo_trq_feedback", "channel": 5},
]
# Header row of results.csv: timestamp, six RMS columns, six maximum columns.
title = [
    'time', 'trq-1', 'trq-2', 'trq-3', 'trq-4', 'trq-5', 'trq-6', 'trq-max-1', 'trq-max-2', 'trq-max-3', 'trq-max-4',
    'trq-max-5', 'trq-max-6'
]


def check_files(data_dirs, data_files, w2t):
    """Validate the content of the working directory.

    The directory must contain no sub-directories and exactly the two files
    ``configs.xlsx`` and ``target.zip``. Violations are reported through
    ``w2t`` with code 10.

    :param data_dirs: sub-directories found in the working directory
    :param data_files: file paths found in the working directory
    :param w2t: message callback ``w2t(text, <int>, code, color, tab_name)``
    :return: ``data_files`` itself, sorted in place
    """
    if len(data_dirs) != 0 or len(data_files) != 2:
        w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下文件,确认后重新运行!\n1. target.zip\n2. configs.xlsx', 0, 10, 'red', tab_name)

    # Derive the names from every entry instead of indexing [0] and [1], so a
    # wrong file count is reported instead of raising IndexError.
    # NOTE(review): presumably w2t aborts on a non-zero code - confirm.
    file_names = sorted(file_path.split('\\')[-1] for file_path in data_files)
    if file_names != ['configs.xlsx', 'target.zip']:
        w2t('初始路径下只能存在如下文件,确认后重新运行!\n1. target.zip\n2. configs.xlsx', 0, 10, 'red', tab_name)

    data_files.sort()
    return data_files


def _read_target_config(config_file):
    """Return ``(wait_time, rcs)`` read from the ``Target`` sheet of the config workbook.

    ``wait_time`` comes from row 2 / column 10 and ``rcs`` (six floats) from
    row 6 / columns 2-7.
    """
    workbook = load_workbook(config_file, read_only=True)
    try:
        sheet = workbook['Target']
        wait_time = float(sheet.cell(row=2, column=10).value)
        rcs = [float(sheet.cell(row=6, column=column).value) for column in range(2, 8)]
    finally:
        # A read-only workbook keeps its file handle open until closed explicitly.
        workbook.close()
    return wait_time, rcs


def run_rl(path, config_file, data_all, hr, md, w2t):
    """Start the RL program and collect durability data forever.

    :param path: working directory (Windows-style path) receiving the result files
    :param config_file: path of ``configs.xlsx``
    :param data_all: two rolling history dicts (RMS values, maximum values)
    :param hr: controller client used by ``clibs.execution`` and as message source
    :param md: register client (e-stop, act flag, ready flag, scenario time)
    :param w2t: message callback
    :return: never returns normally; the final loop only ends through an exception
    """
    # 1. Configure the diagnosis channel, then trigger and release a soft
    #    e-stop so that a possibly running machine is stopped.
    # NOTE(review): the original note said the diagnosis curve is *closed*
    # here, but the request is sent with open=True - confirm the intent.
    clibs.execution('diagnosis.open', hr, w2t, tab_name, open=True, display_open=True)
    clibs.execution('diagnosis.set_params', hr, w2t, tab_name, display_pdo_params=display_pdo_params)
    md.trigger_estop()
    md.reset_estop()
    md.write_act(False)
    sleep(1)  # settle time for the diagnosis curve

    # 2. Reload the project, move the program pointer to main, switch to
    #    automatic mode and power the motors on.
    prj_path = 'target/_build/target.prj'
    clibs.execution('overview.reload', hr, w2t, tab_name, prj_path=prj_path, tasks=['current'])
    clibs.execution('rl_task.pp_to_main', hr, w2t, tab_name, tasks=['current'])
    clibs.execution('state.switch_auto', hr, w2t, tab_name)
    clibs.execution('state.switch_motor_on', hr, w2t, tab_name)

    # 3. Start the program and wait for the robot's ready signal.
    clibs.execution('rl_task.run', hr, w2t, tab_name, tasks=['current'])
    _t_start = time()
    while True:
        if md.read_ready_to_go() == 1:
            md.write_act(True)
            break
        # NOTE(review): the message says 20 s, but `// 20 > 1` only becomes
        # true after 40 s - confirm which timeout is intended.
        if (time() - _t_start) // 20 > 1:
            w2t("20s内未收到机器人的运行信号,需要确认RL程序编写正确并正常执行...", 0, 111, 'red', tab_name)
        else:
            sleep(1)

    # 4. Obtain the scenario cycle time.
    sleep(20)  # original note: lets the scenario time be initialised to 0
    _t_start = time()
    while True:
        scenario_time = md.read_scenario_time()
        if float(scenario_time) > 1:
            w2t(f"场景的周期时间:{scenario_time}s", 0, 0, 'green', tab_name)
            break
        if (time() - _t_start) // 60 > 3:
            w2t("未收到场景的周期时间,需要确认RL程序编写正确并正常执行...", 0, 111, 'red', tab_name)
        else:
            sleep(5)
    # The register must be read again after a one second delay because the
    # first value read is not accurate.
    sleep(1)
    scenario_time = float(md.read_scenario_time())
    sleep(scenario_time*0.2)

    # 6. Write the CSV header, load the configuration and take the first sample.
    with open(f'{path}\\results.csv', mode='a+', newline='') as f_csv:
        writer(f_csv).writerow(title)
    wait_time, rcs = _read_target_config(config_file)

    get_durable_data(path, data_all, scenario_time, wait_time, rcs, hr, md, w2t)

    # 7. Keep running: refresh the data at a fixed interval.
    while True:
        sleep(wait_time+scenario_time+7)
        get_durable_data(path, data_all, scenario_time, wait_time, rcs, hr, md, w2t)


def get_durable_data(path, data, scenario_time, wait_time, rcs, hr, md, w2t):
    """Evaluate the buffered diagnosis messages and record one result row.

    Messages in ``hr.c_msg`` containing ``diagnosis.result`` are decoded and
    their torque samples are accumulated per channel until more than
    ``(scenario_time + 1) * 1000`` samples of channel 0 are available. Then,
    per axis, the RMS and the absolute maximum of ``rcs[axis] * sample / 1000``
    are computed, pushed into the rolling histories ``data[0]`` / ``data[1]``,
    appended to ``results.csv`` and written to the history workbooks.

    If the messages do not hold enough samples, an e-stop is triggered, the
    channel 0 samples are dumped to a text file and an error is reported.

    :param path: working directory (Windows-style path)
    :param data: ``[rms_history, max_history]``, dicts with the keys ``time``
        and ``axis1`` .. ``axis6`` holding fixed-length lists
    :param scenario_time: cycle time of the scenario in seconds
    :param wait_time: configured pause between two updates in seconds
    :param rcs: six per-axis scale factors
        (presumably rated currents - TODO confirm)
    :param hr: controller client providing ``c_msg``, ``c_msg_xs``,
        ``durable_lock`` and ``durable_quit``
    :param md: register client used to trigger the e-stop
    :param w2t: message callback
    """
    global count

    _data_list = []
    for _msg in hr.c_msg.copy():
        if 'diagnosis.result' in _msg:
            # Inserting at the front reverses the order of hr.c_msg.
            _data_list.insert(0, loads(_msg))
    hr.c_msg_xs.clear()
    if len(hr.c_msg) > 270:
        del hr.c_msg[270:]

    # with open(f'{path}\\log.txt', 'w', encoding='utf-8') as f_obj:
    #     for _ in _data_list:
    #         f_obj.write(f"{_}\n")

    _d2d_trq = {0: [], 1: [], 2: [], 3: [], 4: [], 5: []}

    for line in _data_list:
        for item in line['data']:
            samples = item['value']
            # Reverse exactly once per item. Doing it inside the channel loop
            # flipped the list channel + 1 times, so only even channels ended
            # up reversed and the rows of the DataFrame were misaligned.
            samples.reverse()
            if item.get('name', None) != 'device_servo_trq_feedback':
                continue
            channel = item.get('channel', None)
            for axis in range(6):
                if channel == axis:
                    _d2d_trq[axis].extend(samples)

        # Keep accumulating messages until the samples cover the whole cycle
        # (the / 1000 suggests 1000 samples per second - TODO confirm).
        if len(_d2d_trq[0]) / 1000 <= scenario_time + 1:
            continue

        this_time = strftime("%Y-%m-%d %H:%M:%S", localtime(time()))
        next_time = strftime("%Y-%m-%d %H:%M:%S", localtime(time()+wait_time+10+scenario_time)).split()[-1]
        clock = this_time.split()[-1]
        # ':' is not allowed in Windows file names, so the dump file uses a
        # sanitised copy of the timestamp.
        file_stamp = this_time.replace(':', '-')
        _df = DataFrame(_d2d_trq)

        def overmax_data(index, number, flag):
            """E-stop, dump the samples and report when a value exceeds 100."""
            if number > 100:
                md.trigger_estop()
                hr.durable_quit = 1
                _df.to_excel(f'{path}\\{file_stamp}.xlsx')
                w2t(f"[{this_time}] {flag}-axis-{index} 数据过大错误,需要检查确定。", 0, 10, 'red', tab_name)

        rms_values = []
        max_values = []
        for i in range(6):
            axis_key = f"axis{i + 1}"
            try:
                rms = sqrt(_df[i].apply(lambda x: power((rcs[i]*x/1000), 2)).sum()/len(_df[i]))
            except Exception:
                md.trigger_estop()
                _df.to_excel(path+"\\err_data.xlsx")
                w2t(f"{i}calculate error", 0, 11, 'red', tab_name)
                # Never continue with an undefined or stale value.
                raise

            if i == 0:
                # Roll the time column of both histories once per update.
                for history in (data[0], data[1]):
                    del history['time'][0]
                    history['time'].append(clock)

            del data[0][axis_key][0]
            overmax_data(i, rms, 'trq')
            data[0][axis_key].append(rms)
            rms_values.append(rms)

            peak = rcs[i] * _df[i].abs().max() / 1000
            overmax_data(i, peak, 'trq-max')
            del data[1][axis_key][0]
            data[1][axis_key].append(peak)
            max_values.append(peak)

        _df_1 = DataFrame(data[0])
        _df_2 = DataFrame(data[1])

        # Row layout follows `title`: time, six RMS values, six maxima.
        with open(f'{path}\\results.csv', mode='a+', newline='') as f_csv:
            writer(f_csv).writerow([this_time] + rms_values + max_values)

        # hr.durable_lock is a plain flag polled here before the history
        # workbooks are written; always release it, even if a write fails.
        while hr.durable_lock:
            sleep(1)
        hr.durable_lock = 1
        try:
            _df_1.to_excel(clibs.durable_data_current_xlsx, index=False)
            _df_2.to_excel(clibs.durable_data_current_max_xlsx, index=False)
        finally:
            hr.durable_lock = 0

        count += 1
        w2t(f"[{this_time}] 当前次数:{count:09d} | 预计下次数据更新时间:{next_time}", 0, 0, '#008B8B', tab_name)
        break
    else:
        # Not enough samples in the buffered messages.
        md.trigger_estop()
        with open(f'{path}\\device_servo_trq_feedback_0.txt', 'w', encoding='utf-8') as f_obj:
            for _ in _d2d_trq[0]:
                f_obj.write(f"{_}\n")
        w2t("采集的数据时间长度不够,需要确认。", 0, 10, 'red', tab_name)


def main(path, hr, md, w2t):
    """Validate the working directory, deploy the project and start the test.

    :param path: working directory containing ``configs.xlsx`` and ``target.zip``
    :param hr: controller client
    :param md: register client
    :param w2t: message callback
    """
    data_all = [clibs.durable_data_current, clibs.durable_data_current_max]
    data_dirs, data_files = clibs.traversal_files(path, w2t)
    config_file, prj_file = check_files(data_dirs, data_files, w2t)
    clibs.prj_to_xcore(prj_file)
    run_rl(path, config_file, data_all, hr, md, w2t)


if __name__ == '__main__':
    main(*argv[1:])