This repository has been archived on 2025-03-27. You can view files and clone it, but cannot push or open issues or pull requests.
2025-01-17 13:20:49 +08:00

332 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import os
import paramiko
import openpyxl
import pandas
import json
from common import clibs, openapi
def initialization(path, sub, data_dirs, data_files, hr, w2t):
def check_files():
msg = "初始路径下不允许有文件夹,且初始路径下只能存在如下五个文件,确认后重新运行!\n"
msg += "1. configs.xlsx\n2. reach33/reach66/reach100_xxxx.xlsx\n3. xxxx.zip\n"
if len(data_dirs) != 0 or len(data_files) != 5:
w2t(msg, "red", "InitFileError")
config_file, reach33_file, reach66_file, reach100_file, prj_file, result_dirs = None, None, None, None, None, []
for data_file in data_files:
filename = data_file.split("/")[-1]
if filename == "configs.xlsx":
config_file = data_file
elif filename.startswith("reach33_") and filename.endswith(".xlsx"):
reach33_file = data_file
elif filename.startswith("reach66_") and filename.endswith(".xlsx"):
reach66_file = data_file
elif filename.startswith("reach100_") and filename.endswith(".xlsx"):
reach100_file = data_file
elif filename.endswith(".zip"):
prj_file = data_file
else:
w2t(msg, "red", "InitFileError")
if config_file and reach33_file and reach66_file and reach100_file and prj_file:
os.mkdir(f"{path}/j1")
os.mkdir(f"{path}/j2")
os.mkdir(f"{path}/j3")
load = f"load{sub.removeprefix("tool")}"
for reach in ["reach33", "reach66", "reach100"]:
for speed in ["speed33", "speed66", "speed100"]:
dir_name = "_".join([reach, load, speed])
result_dirs.append(dir_name)
os.mkdir(f"{path}/j1/{dir_name}")
os.mkdir(f"{path}/j2/{dir_name}")
if reach == "reach100":
os.mkdir(f"{path}/j3/{dir_name}")
w2t("数据目录合规性检查结束,未发现问题......\n", "blue")
return config_file, prj_file, result_dirs
else:
w2t(msg, "red", "InitFileError")
def get_configs():
robot_type = None
msg_id, state = hr.execution("controller.get_params")
records = hr.get_from_id(msg_id, state)
for record in records:
if "请求发送成功" not in record[0]:
robot_type = eval(record[0])["data"]["robot_type"]
server_file = f"/home/luoshi/bin/controller/robot_cfg/{robot_type}/{robot_type}.cfg"
local_file = path + f"/{robot_type}.cfg"
clibs.c_pd.pull_file_from_server(server_file, local_file)
try:
with open(local_file, mode="r", encoding="utf-8") as f_config:
configs = json.load(f_config)
except Exception as Err:
clibs.insert_logdb("ERROR", "current", f"get_config: 无法打开 {local_file},获取配置文件参数错误 {Err}")
w2t(f"无法打开 {local_file}", color="red", desc="OpenFileError")
# 最大角速度,额定电流,减速比,额定转速
version = configs["VERSION"]
avs = configs["MOTION"]["JOINT_MAX_SPEED"]
clibs.insert_logdb("INFO", "do_brake", f"get_configs: 机型文件版本 {robot_type}_{version}")
clibs.insert_logdb("INFO", "do_brake", f"get_configs: 各关节角速度 {avs}")
return avs
try:
clibs.c_hr.set_socket_params(True, str(clibs.external_port), "\r", 1)
clibs.c_ec = openapi.ExternalCommunication(clibs.ip_addr, clibs.external_port)
except Exception:
w2t(f"{clibs.ip_addr}:{clibs.socket_port} 或者 {clibs.ip_addr}:{clibs.external_port} 不可达,需检查网络连接!\n", color="red", desc="NetworkError")
_config_file, _prj_file, _result_dirs = check_files()
_avs = get_configs()
return _config_file, _prj_file, _result_dirs, _avs
@clibs.db_lock
def gen_result_file(path, axis, t_end, reach, load, speed, rounds):
d_vel, d_trq, d_stop = [], [], []
start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(t_end-12))
end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(t_end))
clibs.cursor.execute(f"select content from logs where time between '{start_time}' and '{end_time}' and content like '%diagnosis.result%' order by id asc")
records = clibs.cursor.fetchall()
for record in records: # 保留最后12s的数据
data = eval(record[0])["data"]
for item in data:
d_item = reversed(item["value"])
if item.get("channel", None) == axis-1 and item.get("name", None) == "hw_joint_vel_feedback":
d_vel.extend(d_item)
elif item.get("channel", None) == axis-1 and item.get("name", None) == "device_servo_trq_feedback":
d_trq.extend(d_item)
elif item.get("channel", None) == 0 and item.get("name", None) == "device_safety_estop":
d_stop.extend(d_item)
df1 = pandas.DataFrame.from_dict({"hw_joint_vel_feedback": d_vel})
df2 = pandas.DataFrame.from_dict({"device_servo_trq_feedback": d_trq})
df3 = pandas.DataFrame.from_dict({"device_safety_estop": d_stop})
df = pandas.concat([df1, df2, df3], axis=1)
filename = f"{path}/j{axis}/reach{reach}_load{load}_speed{speed}/reach{reach}_load{load}_speed{speed}_{rounds}.data"
df.to_csv(filename, sep="\t", index=False)
def run_rl(path, sub, hr, md, config_file, prj_file, result_dirs, avs, w2t):
count, total, speed_target = 0, 63, 0
prj_name = prj_file.split("/")[-1].split(".")[0]
display_pdo_params = [{"name": name, "channel": chl} for name in ["hw_joint_vel_feedback", "device_servo_trq_feedback"] for chl in range(6)]
display_pdo_params.append({"name": "device_safety_estop", "channel": 0})
wb = openpyxl.load_workbook(config_file, read_only=True)
ws = wb["Target"]
write_diagnosis = float(ws.cell(row=2, column=2).value)
get_init_speed = float(ws.cell(row=3, column=2).value)
single_brake = str(ws.cell(row=4, column=2).value)
pon = ws.cell(row=5, column=2).value
io_name = ws.cell(row=6, column=2).value.upper()
wb.close()
w2t(f"基本参数配置write_diagnosis = {write_diagnosis}, get_init_speed = {get_init_speed}, single_brake = {single_brake}, pon = {pon}\n")
if pon == "positive":
md.write_pon(1)
elif pon == "negative":
md.write_pon(0)
else:
w2t("configs.xlsx 中 Target 页面 B5 单元格填写不正确,检查后重新运行...", "red", "DirectionError")
hr.execution("diagnosis.open", open=True, display_open=True, overrun=True, turn_area=True, delay_motion=False)
hr.execution("diagnosis.set_params", display_pdo_params=display_pdo_params, frequency=50, version="1.4.1")
for condition in result_dirs:
reach = condition.split("_")[0].removeprefix("reach")
load = condition.split("_")[1].removeprefix("load")
speed = condition.split("_")[2].removeprefix("speed")
# for single condition test
single_axis = -1
if single_brake != "0":
total = 3
single_axis = int(single_brake.split("-")[0])
if reach != single_brake.split("-")[1] or load != single_brake.split("-")[2] or speed != single_brake.split("-")[3]:
continue
for axis in range(1, 4):
# for single condition test
if (single_axis != -1 and single_axis != axis) or (axis == 3 and reach != "100"):
continue
md.write_axis(axis)
w2t(f"-"*90+"\n", "purple")
for rounds in range(1, 4):
count += 1
_ = 3 if count % 3 == 0 else count % 3
this_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
prj_path = f"{prj_name}/_build/{prj_name}.prj"
w2t(f"[{this_time} | {count}/{total}] 正在执行 {axis}{condition} 的第 {_} 次制动测试...\n")
# 1. 触发软急停,并解除,目的是让可能正在运行着的机器停下来,切手动模式并下电
md.r_soft_estop(0)
md.r_soft_estop(1)
clibs.c_ec.setdo_value(io_name, "true")
md.r_reset_estop()
md.r_clear_alarm()
md.write_act(0)
time.sleep(write_diagnosis) # 急停超差后等待写诊断时间可通过configs.xlsx配置2.3 版本之后设置为 0 即可
while count % 3 == 1:
# 2. 修改要执行的场景
rl_cmd = ""
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname=clibs.ip_addr, port=clibs.ssh_port, username=clibs.username, password=clibs.password)
if pon == "positive":
rl_cmd = f"brake_E(j{axis}_{reach}_p, j{axis}_{reach}_n, p_speed, p_tool)"
elif pon == "negative":
rl_cmd = f"brake_E(j{axis}_{reach}_n, j{axis}_{reach}_p, p_speed, p_tool)"
rl_speed = f"VelSet {speed}"
rl_tool = f"tool p_tool = tool{sub.removeprefix("tool")}"
cmd = "cd /home/luoshi/bin/controller/; "
cmd += f'sudo sed -i "/brake_E/d" projects/{prj_name}/_build/brake/main.mod; '
cmd += f'sudo sed -i "/DONOTDELETE/i {rl_cmd}" projects/{prj_name}/_build/brake/main.mod; '
cmd += f'sudo sed -i "/VelSet/d" projects/{prj_name}/_build/brake/main.mod; '
cmd += f'sudo sed -i "/MoveAbsJ/i {rl_speed}" projects/{prj_name}/_build/brake/main.mod; '
cmd += f'sudo sed -i "/tool p_tool/d" projects/{prj_name}/_build/brake/main.mod; '
cmd += f'sudo sed -i "/VelSet/i {rl_tool}" projects/{prj_name}/_build/brake/main.mod; '
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write(clibs.password + "\n")
stdout.read().decode() # 需要read一下才能正常执行
stderr.read().decode()
# 3. reload工程后pp2main并且自动模式和上电最后运行程序
hr.execution("overview.reload", prj_path=prj_path, tasks=["brake"])
hr.execution("rl_task.pp_to_main", tasks=["brake"])
hr.execution("state.switch_auto")
hr.execution("state.switch_motor_on")
hr.execution("rl_task.set_run_params", loop_mode=True, override=1.0)
hr.execution("rl_task.run", tasks=["brake"])
t_start = time.time()
while True:
if md.read_ready_to_go() == 1:
md.write_act(True)
break
else:
time.sleep(1)
if (time.time() - t_start) > 20:
w2t("20s 内未收到机器人的运行信号,需要确认 RL 程序编写正确并正常执行...", "red", "ReadySignalTimeoutError")
# 4. 找出最大速度传递给RL程序最后清除相关记录
time.sleep(10) # 消除前 10s 的不稳定数据
start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
time.sleep(get_init_speed) # 指定时间后获取实际【正|负】方向的最大速度可通过configs.xlsx配置
end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
hr.execution("rl_task.stop", tasks=["brake"])
time.sleep(5) # 确保数据都拿到
# 找出最大速度
@clibs.db_lock
def get_speed_max():
_speed_max = 0
clibs.cursor.execute(f"select content from logs where time between '{start_time}' and '{end_time}' and content like '%diagnosis.result%' order by id asc")
records = clibs.cursor.fetchall()
for record in records:
data = eval(record[0])["data"]
for item in data:
if item.get("channel", None) == axis-1 and item.get("name", None) == "hw_joint_vel_feedback":
_ = clibs.RADIAN * sum(item["value"]) / len(item["value"])
if pon == "positive":
_speed_max = max(_, _speed_max)
elif pon == "negative":
_speed_max = min(_, _speed_max)
return _speed_max
speed_max = abs(get_speed_max())
speed_target = avs[axis-1] * float(speed) / 100
clibs.insert_logdb("INFO", "do_brake", f"axis = {axis}, direction = {pon}, max speed = {speed_max}")
if speed_max < speed_target*0.95 or speed_max > speed_target*1.05:
w2t(f"Axis: {axis}-{count} | Speed: {speed_max} | Shouldbe: {speed_target}", "indigo")
clibs.insert_logdb("WARNING", "do_brake", f"Axis: {axis}-{count} | Speed: {speed_max} | Shouldbe: {speed_target}")
md.write_speed_max(speed_max)
if speed_max < 10:
md.r_clear_alarm()
w2t("未获取到正确的速度,即将重新获取...\n", "red")
continue
else:
break
# 5. 重新运行程序发送继续运动信号当速度达到最大值时通过DO触发急停
hr.execution("rl_task.pp_to_main", tasks=["brake"])
hr.execution("state.switch_auto")
hr.execution("state.switch_motor_on")
hr.execution("rl_task.run", tasks=["brake"])
for i in range(3):
if md.read_ready_to_go() == 1:
md.write_act(1)
break
else:
time.sleep(1)
else:
w2t("3s 内未收到机器人的运行信号,需要确认 RL 程序配置正确并正常执行...", "red", "ReadySignalTimeoutError")
time.sleep(10) # 排除从其他位姿到零点位姿,再到轴极限位姿的时间
def exec_brake():
flag, start, data, record = True, time.time(), None, None
while flag:
time.sleep(0.2)
if time.time() - start > 20:
w2t("20s 内未触发急停,需排查......", "red", "BrakeTimeoutError")
try:
clibs.lock.acquire(True)
clibs.cursor.execute(f"select content from logs where content like '%diagnosis.result%' order by id desc limit 1")
record = clibs.cursor.fetchone()
data = eval(record[0])["data"]
finally:
clibs.lock.release()
for item in data:
if item.get("channel", None) != axis-1 or item.get("name", None) != "hw_joint_vel_feedback":
continue
speed_moment = clibs.RADIAN * sum(item["value"]) / len(item["value"])
if (pon == "positive" and speed_moment > 0) or (pon == "negative" and speed_moment < 0):
if abs(speed_moment) > speed_target * 0.95:
clibs.c_ec.setdo_value(io_name, "false")
time.sleep(5)
flag = False
break
return time.time()
t_end = exec_brake()
# 6. 保留数据并处理输出
gen_result_file(path, axis, t_end, reach, load, speed, rounds)
else:
w2t(f"\n{sub.removeprefix("tool")}%负载的制动性能测试执行完毕,如需采集其他负载,须切换负载类型,并更换其他负载,重新执行。\n", "green")
hr.execution("diagnosis.open", open=False, display_open=False, overrun=True, turn_area=True, delay_motion=False)
hr.execution("diagnosis.set_params", display_pdo_params=[], frequency=50, version="1.4.1")
def main():
s_time = time.time()
path = clibs.data_at["_path"]
sub = clibs.data_at["_sub"]
w2t = clibs.w2t
hr = clibs.c_hr
md = clibs.c_md
data_dirs, data_files = clibs.traversal_files(path, w2t)
config_file, prj_file, result_dirs, avs = initialization(path, sub, data_dirs, data_files, hr, w2t)
clibs.c_pd.push_prj_to_server(prj_file)
run_rl(path, sub, hr, md, config_file, prj_file, result_dirs, avs, w2t)
e_time = time.time()
time_total = e_time - s_time
w2t(f"-" * 90 + "\n", "purple")
w2t(f"处理总时长:{time_total // 3600:02.0f} h {time_total % 3600 // 60:02.0f} m {time_total % 60:02.0f} s\n", "green")
if __name__ == "__main__":
main()