"""Factory endurance run: loop an RL project and periodically record diagnosis curves.

The module validates the user input, pushes the project to the controller,
starts the RL task, and then, at a fixed interval, opens the diagnosis curves
for one scenario cycle and appends one summary row per curve to a CSV file.

NOTE(review): several code paths rely on ``w2t`` aborting the caller
(presumably by raising) when it is given an error descriptor as third
argument -- confirm against ``clibs.w2t``.
"""
import csv
import json
import math
import sys
import threading
import time

import numpy
import pandas

from common import clibs

# Number of joint channels recorded per curve.
_AXIS_COUNT = 6
# Timestamp layout used for log queries, console output and CSV rows.
_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def _format_time(epoch=None):
    """Return *epoch* (seconds; ``None`` means now) as a local-time string."""
    return time.strftime(_TIME_FORMAT, time.localtime(epoch))


def initialization(path, hr, data_dirs, data_files, interval, curves, w2t):
    """Validate the inputs and read the robot configuration.

    Args:
        path: Working directory; the robot ``.cfg`` file is pulled into it.
        hr: Client object exposing ``execution`` and ``get_from_id``.
        data_dirs: Sub-directories found under ``path`` (must be empty).
        data_files: Files found under ``path`` (must be exactly one ``.zip``).
        interval: Sampling interval as entered by the user (a string).
        curves: Names of the diagnosis curves to record.
        w2t: Callback used to report messages to the user.

    Returns:
        A dict with the keys ``prj_file``, ``interval``, ``avs`` and ``rcs``.
    """

    def check_files():
        """Normalise ``interval`` and validate curves and project file."""
        nonlocal interval
        if interval == "":
            interval = 300
        elif interval.isdigit():
            if int(interval) < 300:
                w2t(f"输入时间间隔 {interval} < 300,使用默认时间间隔 300s ......\n", "orange")
                interval = 300
            else:
                interval = int(interval)
        else:
            w2t(f"{interval} 不是有效的输入,时间间隔必须是一个大于 300 的正整数!\n", "red", "NotIntegerError")

        if not curves:
            w2t("未查询到需要记录数据的曲线,至少选择一个!\n", "red", "CurveNameError")

        if data_dirs or len(data_files) != 1:
            w2t("初始路径下不允许有文件夹,且初始路径下只能存在一个工程文件 —— *.zip,确认后重新运行!\n", "red", "InitFileError")

        if not data_files[0].endswith(".zip"):
            w2t(f"{data_files[0]} 不是一个有效的工程文件,需确认!\n", "red", "ProjectFileError")

        return data_files[0], interval

    def get_configs():
        """Fetch the robot type, pull its cfg file and derive per-joint limits."""
        robot_type, records = None, None
        try:
            msg_id, state = hr.execution("controller.get_params")
            records = hr.get_from_id(msg_id, state)
        except Exception:
            w2t("网络不可达,需要先连接xCore!\n", "red", "NetworkError")

        for record in records:
            if "请求发送成功" not in record[0]:
                # SECURITY(review): eval() on content received from the
                # controller; consider a safe parser if the payload format allows.
                robot_type = eval(record[0])["data"]["robot_type"]

        server_file = f"/home/luoshi/bin/controller/robot_cfg/{robot_type}/{robot_type}.cfg"
        local_file = path + f"/{robot_type}.cfg"
        clibs.c_pd.pull_file_from_server(server_file, local_file)

        try:
            with open(local_file, mode="r", encoding="utf-8") as f_config:
                configs = json.load(f_config)
        except Exception as err:
            clibs.insert_logdb("ERROR", "current", f"get_config: 无法打开 {local_file},获取配置文件参数错误 {err}")
            w2t(f"无法打开 {local_file}\n", color="red", desc="OpenFileError")

        # Max angular velocity, rated current, reduction ratio, rated speed.
        version = configs["VERSION"]
        m_avs = configs["MOTION"]["JOINT_MAX_SPEED"]
        m_rts = configs["MOTOR"]["RATED_TORQUE"]  # motor rated torque
        m_tcs = [1, 1, 1, 1, 1, 1]  # motor torque constants
        # Rated current per joint = rated torque / torque constant.
        m_rcs = [m_rts[i] / tc for i, tc in enumerate(m_tcs)]

        # NOTE(review): the "do_brake" log tag looks inherited from another
        # module -- confirm it is the intended tag for this program.
        clibs.insert_logdb("INFO", "do_brake", f"get_configs: 机型文件版本 {robot_type}_{version}")
        clibs.insert_logdb("INFO", "do_brake", f"get_configs: 各关节角速度 {m_avs}")
        clibs.insert_logdb("INFO", "do_brake", f"get_configs: 各关节额定电流 {m_rcs}")
        return m_avs, m_rcs

    prj_file, interval = check_files()
    avs, rcs = get_configs()
    return {
        "prj_file": prj_file,
        "interval": interval,
        "avs": avs,
        "rcs": rcs,
    }


def change_curve_state(hr, curves, stat_1, stat_2):
    """Open or close the diagnosis curves and (re)send their parameters.

    Args:
        hr: Client object exposing ``execution``.
        curves: Curve names; each one is requested on channels 0-5.
        stat_1: Value sent as ``open`` of ``diagnosis.open``.
        stat_2: Value sent as ``display_open`` of ``diagnosis.open``.
    """
    display_pdo_params = [
        {"name": name, "channel": chl}
        for name in curves
        for chl in range(_AXIS_COUNT)
    ]
    hr.execution("diagnosis.open", open=stat_1, display_open=stat_2, overrun=True, turn_area=True, delay_motion=False)
    hr.execution("diagnosis.set_params", display_pdo_params=display_pdo_params, frequency=50, version="1.4.1")


def run_rl(path, params, curves, hr, md, w2t):
    """Start the RL project and record curve data until ``clibs.running`` is falsy.

    Args:
        path: Directory in which the per-curve CSV files are created.
        params: Dict returned by ``initialization``.
        curves: Names of the diagnosis curves to record.
        hr: Client object exposing ``execution``.
        md: Client object used for the estop/alarm/act/ready/scenario-time signals.
        w2t: Callback used to report messages to the user.

    Raises:
        SystemExit: once the acquisition loop ends because ``clibs.running``
            became falsy.
    """
    prj_file, interval = params["prj_file"], params["interval"]

    # 1. Close the diagnosis curves, trigger and release the soft e-stop so a
    #    possibly running robot stops, then clear alarms and reset the act flag.
    change_curve_state(hr, curves, False, False)
    md.r_soft_estop(0)
    md.r_soft_estop(1)
    md.r_clear_alarm()
    md.write_act(False)
    time.sleep(1)  # let the curves close completely

    # 2. Reload the project, move the program pointer to main, switch to
    #    automatic mode and power the motors on.
    prj_name = ".".join(prj_file.split("/")[-1].split(".")[:-1])
    prj_path = f"{prj_name}/_build/{prj_name}.prj"
    hr.execution("overview.reload", prj_path=prj_path, tasks=["factory"])
    hr.execution("rl_task.pp_to_main", tasks=["factory"])
    hr.execution("state.switch_auto")
    hr.execution("state.switch_motor_on")

    # 3. Start the program and wait for its ready signal.
    hr.execution("rl_task.set_run_params", loop_mode=True, override=1.0)
    hr.execution("rl_task.run", tasks=["factory"])
    t_start = time.time()
    while True:
        if md.read_ready_to_go() == 1:
            md.write_act(True)
            break
        if (time.time() - t_start) > 3:
            w2t("3s 内未收到机器人的运行信号,需要确认RL程序编写正确并正常执行...\n", "red", "ReadySignalTimeoutError")
        else:
            time.sleep(1)

    # 4. Read the cycle time of the scenario.
    time.sleep(10)  # wait for scenario_time to be initialised by the RL program
    t_start = time.time()
    while True:
        scenario_time = float(f"{float(md.read_scenario_time()):.2f}")
        if scenario_time != 0:
            w2t(f"耐久工程的周期时间:{scenario_time}s | 单轮次执行时间:{scenario_time+interval}\n")
            break
        time.sleep(1)
        if (time.time() - t_start) > 300:
            w2t("300s 内未收到耐久工程的周期时间,需要确认RL程序和工具通信交互是否正常执行...\n", "red", "GetScenarioTimeError")

    # 5. Prepare the result files: one CSV per curve, header row first.
    for curve in curves:
        with open(f"{path}/{curve}.csv", mode="a+", newline="") as f_csv:
            titles = ["time"] + [f"{curve}_{i}" for i in range(_AXIS_COUNT)]
            csv.writer(f_csv).writerow(titles)

    # 6. Acquisition loop.
    count = 0
    while clibs.running:
        now = time.time()
        this_time = _format_time(now)
        next_time = _format_time(now + scenario_time + interval + 1)
        w2t(f"[{this_time}] 当前次数:{count:09d} | 预计下次数据更新时间:{next_time}\n", "#008B8B")
        count += 1

        # Once per interval: open the curves, record one cycle, close them.
        time.sleep(interval)
        change_curve_state(hr, curves, True, True)
        time.sleep(scenario_time)
        now = time.time()
        end_time = _format_time(now)
        start_time = _format_time(now - scenario_time)
        change_curve_state(hr, curves, False, False)

        # Fetch the recorded data and write the summary rows.
        gen_results(params, curves, start_time, end_time, w2t)
    else:
        w2t("后台数据清零完成,现在可以重新运行之前停止的程序。\n", "red")
        # sys.exit() instead of the site-provided exit(), which is not
        # guaranteed to exist (e.g. when the site module is not loaded).
        sys.exit()


def gen_results(params, curves, start_time, end_time, w2t):
    """Query the diagnosis records logged in a time window and process them.

    Args:
        params: Dict returned by ``initialization``.
        curves: Names of the diagnosis curves to process.
        start_time: Window start, formatted ``YYYY-mm-dd HH:MM:SS``.
        end_time: Window end, same format.
        w2t: Callback forwarded to the processing functions.
    """
    # Acquire before entering ``try`` so that ``finally`` never releases a
    # lock this call does not hold.
    clibs.lock.acquire(True)
    try:
        clibs.cursor.execute(f"select content from logs where time between '{start_time}' and '{end_time}' and content like '%diagnosis.result%' order by id asc")
        records = clibs.cursor.fetchall()
    finally:
        clibs.lock.release()
    data_proc(records, params, curves, w2t)


def data_proc(records, params, curves, w2t):
    """Start one daemon thread per supported curve to process *records*.

    Curves other than ``device_servo_trq_feedback`` and
    ``hw_joint_vel_feedback`` are ignored.
    """
    handlers = {
        "device_servo_trq_feedback": proc_device_servo_trq_feedback,
        "hw_joint_vel_feedback": proc_hw_joint_vel_feedback,
    }
    for curve in curves:
        handler = handlers.get(curve)
        if handler is None:
            continue
        worker = threading.Thread(target=handler, args=(records, params, w2t))
        worker.daemon = True
        worker.start()


def _collect_axis_samples(records, curve):
    """Return six lists holding the samples of *curve* for channels 0-5.

    Each record's first element is evaluated to a dict whose ``data`` entry is
    a list of items carrying ``name``, ``channel`` and ``value``; the values
    of every matching item are appended in reversed order.
    """
    samples = [[] for _ in range(_AXIS_COUNT)]
    for record in records:
        # SECURITY(review): eval() on log content; see get_configs.
        data = eval(record[0])["data"]
        for item in data:
            if item.get("name", None) != curve:
                continue
            channel = item.get("channel", None)
            for axis in range(_AXIS_COUNT):
                if channel == axis:
                    samples[axis].extend(reversed(item["value"]))
                    break
    return samples


def _append_result_row(params, curve, row):
    """Append *row* to ``<project dir>/<curve>.csv``."""
    path = "/".join(params["prj_file"].split("/")[:-1])
    with open(f"{path}/{curve}.csv", mode="a+", newline="") as f_csv:
        csv.writer(f_csv).writerow(row)


def proc_device_servo_trq_feedback(records, params, w2t):
    """Write one row with the per-axis RMS of ``rcs[axis] * sample / 1000``.

    The previous implementation used ``DataFrame.apply`` with its default
    ``axis=0``, so only the first sample of each axis entered the result; the
    RMS is now computed over every sample. An axis without samples yields NaN.

    Args:
        records: Rows returned by the log query (content in column 0).
        params: Dict returned by ``initialization`` (uses ``rcs``, ``prj_file``).
        w2t: Unused; kept for a uniform handler signature.
    """
    curve = "device_servo_trq_feedback"
    rcs = params["rcs"]
    results = [_format_time()]
    samples = _collect_axis_samples(records, curve)

    for axis in range(_AXIS_COUNT):
        values = numpy.asarray(samples[axis], dtype=float)
        if values.size == 0:
            results.append(float("nan"))
            continue
        scaled = rcs[axis] * values / 1000
        results.append(math.sqrt(numpy.mean(numpy.square(scaled))))

    _append_result_row(params, curve, results)


def proc_hw_joint_vel_feedback(records, params, w2t):
    """Write one row with the per-axis maximum of ``hw_joint_vel_feedback``.

    Args:
        records: Rows returned by the log query (content in column 0).
        params: Dict returned by ``initialization`` (uses ``prj_file``).
        w2t: Unused; kept for a uniform handler signature.
    """
    curve = "hw_joint_vel_feedback"
    results = [_format_time()]
    samples = _collect_axis_samples(records, curve)

    for axis in range(_AXIS_COUNT):
        df = pandas.DataFrame.from_dict({curve: samples[axis]})
        results.append(df.max().iloc[0])

    _append_result_row(params, curve, results)


def detect_db_size():
    """Trim the ``logs`` table once per second, forever.

    When the table holds more than 20000 rows, all but roughly the newest
    4000 are deleted, the remaining ids are renumbered, the autoincrement
    sequence is reset and the database is vacuumed.
    """

    @clibs.db_lock
    def release_memory():
        line_number = 20000
        leftover = 4000  # 200s
        clibs.cursor.execute("select count(id) from logs")
        len_records = clibs.cursor.fetchone()[0]
        if len_records > line_number:
            del_num = len_records - leftover + 1
            clibs.cursor.execute(f"delete from logs where id < {del_num}")
            clibs.cursor.execute(f"update logs set id=(id-{del_num-1}) where id > {del_num-1}")
            clibs.cursor.execute(f"update sqlite_sequence set seq = {leftover+1} WHERE name = 'logs' ")
            clibs.cursor.execute("vacuum")

    while True:
        release_memory()
        time.sleep(1)


def main():
    """Entry point: start the log trimmer, initialise, push the project and run."""
    trimmer = threading.Thread(target=detect_db_size)
    trimmer.daemon = True
    trimmer.start()

    path = clibs.data_dd["path"]
    interval = clibs.data_dd["interval"].strip()
    curves = clibs.data_dd["curves"]
    hr = clibs.c_hr
    md = clibs.c_md
    w2t = clibs.w2t
    clibs.running = 23

    data_dirs, data_files = clibs.traversal_files(path, w2t)
    params = initialization(path, hr, data_dirs, data_files, interval, curves, w2t)
    prj_file = params["prj_file"]
    clibs.c_pd.push_prj_to_server(prj_file)

    try:
        run_rl(path, params, curves, hr, md, w2t)
    except Exception as err:
        w2t(f"工厂耐久程序执行过程中出现异常,{err}\n", "red")
        # Make sure the diagnosis curves are closed after a failure.
        change_curve_state(hr, curves, False, False)


if __name__ == "__main__":
    main()