init commit

This commit is contained in:
2025-03-20 18:13:49 +08:00
commit 09d322378a
103 changed files with 12549 additions and 0 deletions

View File

@ -0,0 +1,85 @@
import os.path
import matplotlib.pyplot as plt
import pandas
from matplotlib.widgets import Slider
from PySide6.QtCore import Signal, QThread
from common import clibs
# class DrawCurves(QThread):
# output = Signal(str, str)
#
# def __init__(self, /):
# super().__init__()
#
# @staticmethod
# def logger(level, module, content, color="black", error="", flag="both", signal=output):
# clibs.logger(level, module, content, color, flag, signal)
# if level.upper() == "ERROR":
# raise Exception(error)
#
# def initialization(self):
# path, curves = None, None
# try:
# path = clibs.data_dd["path"]
# curves = clibs.data_dd["curves"]
# except Exception:
# clibs.w2t("程序未开始运行,暂无数据可以展示......\n", "red")
# return None, None
#
# for curve in curves:
# if not os.path.exists(f"{path}/{curve}.csv"):
# clibs.w2t(f"{curve}曲线数据暂未生成,稍后再试......\n", "orange")
# return None, None
#
# return path, curves
#
#
# def data_plot(path, curve):
# titles = {"hw_joint_vel_feedback": "各关节最大速度曲线", "device_servo_trq_feedback": "各关节平均有效转矩曲线"}
# ylabels = {"hw_joint_vel_feedback": "速度(rad/s)", "device_servo_trq_feedback": "转矩(Nm)"}
#
# fig, axes = plt.subplots(figsize=(10, 4.5), dpi=100)
# cols = [f"{curve}_{i}" for i in range(6)]
# cols.insert(0, "time")
# df = pandas.read_csv(f"{path}/{curve}.csv")
# plt.plot(df[cols[1]], label="一轴")
# plt.plot(df[cols[2]], label="二轴")
# plt.plot(df[cols[3]], label="三轴")
# plt.plot(df[cols[4]], label="四轴")
# plt.plot(df[cols[5]], label="五轴")
# plt.plot(df[cols[6]], label="六轴")
# axes.set_title(titles[curve])
# axes.set_ylabel(ylabels[curve])
# axes.legend(loc="upper right")
#
# slider_position = plt.axes((0.1, 0.01, 0.8, 0.05), facecolor="blue") # (left, bottom, width, height)
# scrollbar = Slider(slider_position, 'Time', 1, int(len(df)), valstep=1)
#
# def update(val):
# pos = scrollbar.val
# axes.set_xlim([pos, pos + 10])
# fig.canvas.draw_idle()
#
# scrollbar.on_changed(update)
# fig.tight_layout(rect=(0, 0.02, 0.96, 1)) # tuple (left, bottom, right, top)
#
#
# def main():
# path, curves = initialization()
# if not path or not curves:
# return
#
# for curve in curves:
# data_plot(path, curve)
#
# plt.show()
#
#
# plt.rcParams['font.sans-serif'] = ['SimHei']
# plt.rcParams['axes.unicode_minus'] = False
# plt.rcParams['figure.dpi'] = 100
# plt.rcParams['font.size'] = 14
# plt.rcParams['lines.marker'] = 'o'
# plt.rcParams["figure.autolayout"] = True
#

View File

@ -0,0 +1,239 @@
import json
import threading
import time
import pandas
import numpy
import math
import csv
from PySide6.QtCore import Signal, QThread
from common import clibs
class DoBrakeTest(QThread):
output = Signal(str, str)
def __init__(self, dir_path, interval, proc, /):
super().__init__()
self.dir_path = dir_path
self.interval = interval
self.proc = proc
self.idx = 6
def logger(self, level, module, content, color="black", error="", flag="both"):
clibs.logger(level, module, content, color, flag, signal=self.output)
if level.upper() == "ERROR":
raise Exception(f"{error} | {content}")
def initialization(self, data_dirs, data_files):
def check_files():
if len(curves) == 0:
self.logger("ERROR", "factory-check_files", "未查询到需要记录数据的曲线,至少选择一个!", "red", "CurveNameError")
if len(data_dirs) != 0 or len(data_files) != 1:
self.logger("ERROR", "factory-check_files", "初始路径下不允许有文件夹,且初始路径下只能存在一个工程文件 —— *.zip确认后重新运行", "red", "InitFileError")
if not data_files[0].endswith(".zip"):
self.logger("ERROR", "factory-check_files", f"{data_files[0]} 不是一个有效的工程文件,需确认!", "red", "ProjectFileError")
return data_files[0], interval
def get_configs():
robot_type, records = None, None
msg_id, state = clibs.c_hr.execution("controller.get_params")
records = clibs.c_hr.get_from_id(msg_id, state)
for record in records:
if "请求发送成功" not in record[0]:
robot_type = eval(record[0])["data"]["robot_type"]
server_file = f"/home/luoshi/bin/controller/robot_cfg/{robot_type}/{robot_type}.cfg"
local_file = self.dir_path + f"/{robot_type}.cfg"
clibs.c_pd.pull_file_from_server(server_file, local_file)
try:
with open(local_file, mode="r", encoding="utf-8") as f_config:
configs = json.load(f_config)
except Exception as Err:
self.logger("ERROR", "factory-get_configs", f"无法打开 {local_file}<br>{Err}", "red", "OpenFileError")
# 最大角速度,额定电流,减速比,额定转速
version = configs["VERSION"]
m_avs = configs["MOTION"]["JOINT_MAX_SPEED"]
m_rts = configs["MOTOR"]["RATED_TORQUE"] # 电机额定转矩rt for rated torque
m_tcs = [1, 1, 1, 1, 1, 1] # 电机转矩常数tc for torque constant
m_rcs = []
for i in range(len(m_tcs)):
m_rcs.append(m_rts[i] / m_tcs[i]) # 电机额定电流rc for rated current
clibs.insert_logdb("INFO", "do_brake", f"get_configs: 机型文件版本 {robot_type}_{version}")
clibs.insert_logdb("INFO", "do_brake", f"get_configs: 各关节角速度 {m_avs}")
clibs.insert_logdb("INFO", "do_brake", f"get_configs: 各关节额定电流 {m_rcs}")
return m_avs, m_rcs
prj_file, interval = check_files()
avs, rcs = get_configs()
params = {
"prj_file": prj_file,
"interval": interval,
"avs": avs,
"rcs": rcs,
}
self.logger("INFO", "factory-initialization", "数据目录合规性检查结束,未发现问题......", "green")
return params
@staticmethod
def change_curve_state(curves, stat):
display_pdo_params = [{"name": name, "channel": chl} for name in curves for chl in range(6)]
clibs.c_hr.execution("diagnosis.open", open=stat, display_open=stat, overrun=True, turn_area=True, delay_motion=False)
clibs.c_hr.execution("diagnosis.set_params", display_pdo_params=display_pdo_params, frequency=50, version="1.4.1")
def run_rl(self, params, curves):
prj_file, interval = params["prj_file"], params["interval"]
# 1. 关闭诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来,切手动模式并下电
self.change_curve_state(curves, False)
clibs.c_md.r_soft_estop(0)
clibs.c_md.r_soft_estop(1)
clibs.c_md.r_clear_alarm()
clibs.c_md.write_act(False)
time.sleep(1) # 让曲线彻底关闭
# 2. reload工程后pp2main并且自动模式和上电
prj_name = ".".join(prj_file.split("/")[-1].split(".")[:-1])
prj_path = f"{prj_name}/_build/{prj_name}.prj"
clibs.c_hr.execution("overview.reload", prj_path=prj_path, tasks=["factory"])
clibs.c_hr.execution("rl_task.pp_to_main", tasks=["factory"])
clibs.c_hr.execution("state.switch_auto")
clibs.c_hr.execution("state.switch_motor_on")
# 3. 开始运行程序
clibs.c_hr.execution("rl_task.set_run_params", loop_mode=True, override=1.0)
clibs.c_hr.execution("rl_task.run", tasks=["factory"])
t_start = time.time()
while True:
if clibs.c_md.read_ready_to_go() == 1:
clibs.c_md.write_act(True)
break
else:
if (time.time() - t_start) > 15:
self.logger("ERROR", "factory-run_rl", "15s 内未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", "red", "ReadySignalTimeoutError")
else:
time.sleep(1)
# 4. 获取初始数据,周期时间,首次的各轴平均电流值,打开诊断曲线,并执行采集
time.sleep(10) # 等待 RL 程序中 scenario_time 初始化
t_start = time.time()
while True:
scenario_time = float(f"{float(clibs.c_md.read_scenario_time()):.2f}")
if scenario_time != 0:
self.logger("INFO", "factory-run_rl", f"耐久工程的周期时间:{scenario_time}s | 单轮次执行时间:{scenario_time+interval}~{scenario_time*2+interval}")
break
else:
time.sleep(1)
if (time.time() - t_start) > 900:
self.logger("ERROR", "factory-run_rl", f"900s 内未收到耐久工程的周期时间需要确认RL程序和工具通信交互是否正常执行...", "red", "GetScenarioTimeError")
# 6. 准备数据保存文件
for curve in curves:
with open(f"{self.dir_path}/{curve}.csv", mode="a+", newline="") as f_csv:
titles = [f"{curve}_{i}" for i in range(6)]
titles.insert(0, "time")
csv_writer = csv.writer(f_csv)
csv_writer.writerow(titles)
# 7. 开始采集
count = 0
while clibs.running:
this_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
next_time_1 = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()+scenario_time+interval+1))
next_time_2 = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()+scenario_time+interval+1+scenario_time))
self.logger("INFO", "factory-run_rl", f"[{this_time}] 当前次数:{count:09d} | 预计下次数据更新时间:{next_time_1}~{next_time_2}", "#008B8B")
count += 1
# 固定间隔,更新一次数据,打开曲线,获取周期内电流,关闭曲线
time.sleep(interval)
while True:
capture_start = clibs.c_md.read_capture_start()
if capture_start == 1:
break
else:
time.sleep(0.1)
self.change_curve_state(curves, True)
time.sleep(scenario_time)
end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()-scenario_time))
self.change_curve_state(curves, False)
# 保留数据并处理输出
self.gen_results(params, curves, start_time, end_time)
else:
self.change_curve_state(curves, False)
self.logger("INFO", "factory", "后台数据清零完成,现在可以重新运行其他程序。", "green")
def gen_results(self, params, curves, start_time, end_time):
clibs.cursor.execute(f"select content from logs where time between '{start_time}' and '{end_time}' and content like '%diagnosis.result%' order by id asc")
records = clibs.cursor.fetchall()
self.data_proc(records, params, curves)
def data_proc(self, records, params, curves):
for curve in curves:
if curve == "device_servo_trq_feedback":
# proc_device_servo_trq_feedback(records, params, w2t)
t = threading.Thread(target=self.proc_device_servo_trq_feedback, args=(records, params))
t.daemon = True
t.start()
elif curve == "hw_joint_vel_feedback":
# proc_hw_joint_vel_feedback(records, params, w2t)
t = threading.Thread(target=self.proc_hw_joint_vel_feedback, args=(records, params))
t.daemon = True
t.start()
def proc_device_servo_trq_feedback(self, records, params):
d_trq, rcs, results = [[], [], [], [], [], []], params["rcs"], [time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))]
for record in records:
data = eval(record[0])["data"]
for item in data:
d_item = reversed(item["value"])
for axis in range(6):
if item.get("channel", None) == axis and item.get("name", None) == "device_servo_trq_feedback":
d_trq[axis].extend(d_item)
for axis in range(6):
df = pandas.DataFrame.from_dict({"device_servo_trq_feedback": d_trq[axis]})
_ = math.sqrt(numpy.square(df[df.columns[0]].values * 1.27 / 1000).sum() / len(df))
results.append(_)
path = "/".join(params["prj_file"].split("/")[:-1])
with open(f"{path}/device_servo_trq_feedback.csv", mode="a+", newline="") as f_csv:
csv_writer = csv.writer(f_csv)
csv_writer.writerow(results)
def proc_hw_joint_vel_feedback(self, records, params):
d_trq, rcs, results = [[], [], [], [], [], []], params["rcs"], [time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))]
for record in records:
data = eval(record[0])["data"]
for item in data:
d_item = reversed(item["value"])
for axis in range(6):
if item.get("channel", None) == axis and item.get("name", None) == "hw_joint_vel_feedback":
d_trq[axis].extend(d_item)
for axis in range(6):
df = pandas.DataFrame.from_dict({"hw_joint_vel_feedback": d_trq[axis]})
_ = df.max().iloc[0]
results.append(_)
path = "/".join(params["prj_file"].split("/")[:-1])
with open(f"{path}/hw_joint_vel_feedback.csv", mode="a+", newline="") as f_csv:
csv_writer = csv.writer(f_csv)
csv_writer.writerow(results)
def processing(self):
time_start = time.time()
clibs.running[self.idx] = 1
data_dirs, data_files = clibs.traversal_files(self.dir_path, self.output)
params = self.initialization(data_dirs, data_files)
prj_file = params["prj_file"]
clibs.c_pd.push_prj_to_server(prj_file)
self.run_rl(params)
self.logger("INFO", "brake-processing", "-"*60 + "<br>全部处理完毕<br>", "purple")
time_total = time.time() - time_start
msg = f"处理时间:{time_total // 3600:02.0f} h {time_total % 3600 // 60:02.0f} m {time_total % 60:02.0f} s"
self.logger("INFO", "brake-processing", msg)