This repository has been archived on 2025-03-27. You can view files and clone it, but cannot push or open issues or pull requests.

260 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import multiprocessing
import time
import paramiko
import pandas
from common import clibs
def initialization(path, sub, data_dirs, data_files, hr, w2t):
def check_files():
msg = "初始路径下不允许有文件夹,且初始路径下只能存在如下两个文件,确认后重新运行!\n"
msg += "1. T_电机电流.xlsx\n2. xxxx.zip\n"
if len(data_dirs) != 0 or len(data_files) != 2:
w2t(msg, "red", "InitFileError")
prj_file, count = None, 0
for data_file in data_files:
filename = data_file.split("/")[-1]
if filename == "T_电机电流.xlsx":
count += 1
elif filename.endswith(".zip"):
count += 1
prj_file = data_file
else:
w2t(msg, "red", "InitFileError")
if count != 2:
w2t(msg, "red", "InitFileError")
w2t("数据目录合规性检查结束,未发现问题......\n")
if sub == "tool100" or sub == "inertia":
os.mkdir(f"{path}/single")
os.mkdir(f"{path}/s_1")
os.mkdir(f"{path}/s_2")
os.mkdir(f"{path}/s_3")
elif sub == "inertia":
os.mkdir(f"{path}/inertia")
else:
w2t("负载选择错误,电机电流测试只能选择 tool100/inertia 规格!\n", "red", "LoadSelectError")
return prj_file
def get_configs():
robot_type = None
msg_id, state = hr.execution("controller.get_params")
records = hr.get_from_id(msg_id, state)
for record in records:
if "请求发送成功" not in record[0]:
robot_type = eval(record[0])["data"]["robot_type"]
server_file = f"/home/luoshi/bin/controller/robot_cfg/{robot_type}/{robot_type}.cfg"
local_file = path + f"/{robot_type}.cfg"
clibs.c_pd.pull_file_from_server(server_file, local_file)
_prj_file = check_files()
get_configs()
return _prj_file
def single_axis_proc(path, records, number):
text = "single" if number < 6 else "hold"
number = number if number < 6 else number - 6
d_vel, d_trq, d_sensor = [], [], []
for record in records:
data = eval(record[0])["data"]
for item in data:
d_item = reversed(item["value"])
if item.get("channel", None) == number and item.get("name", None) == "hw_joint_vel_feedback":
d_vel.extend(d_item)
elif item.get("channel", None) == number and item.get("name", None) == "device_servo_trq_feedback":
d_trq.extend(d_item)
elif item.get("channel", None) == number and item.get("name", None) == "hw_sensor_trq_feedback":
d_sensor.extend(d_item)
df1 = pandas.DataFrame.from_dict({"hw_joint_vel_feedback": d_vel})
df2 = pandas.DataFrame.from_dict({"device_servo_trq_feedback": d_trq})
df3 = pandas.DataFrame.from_dict({"hw_sensor_trq_feedback": d_sensor})
df = pandas.concat([df1, df2, df3], axis=1)
filename = f"{path}/single/j{number + 1}_{text}_{time.time()}.data"
df.to_csv(filename, sep="\t", index=False)
def scenario_proc(path, records, number, scenario_time):
for axis in range(6):
d_vel, d_trq, d_sensor = [], [], []
for record in records:
data = eval(record[0])["data"]
for item in data:
d_item = reversed(item["value"])
if item.get("channel", None) == axis and item.get("name", None) == "hw_joint_vel_feedback":
d_vel.extend(d_item)
elif item.get("channel", None) == axis and item.get("name", None) == "device_servo_trq_feedback":
d_trq.extend(d_item)
elif item.get("channel", None) == axis and item.get("name", None) == "hw_sensor_trq_feedback":
d_sensor.extend(d_item)
df1 = pandas.DataFrame.from_dict({"hw_joint_vel_feedback": d_vel})
df2 = pandas.DataFrame.from_dict({"device_servo_trq_feedback": d_trq})
df3 = pandas.DataFrame.from_dict({"hw_sensor_trq_feedback": d_sensor})
df = pandas.concat([df1, df2, df3], axis=1)
filename = f"{path}/s_{number-11}/j{axis+1}_s_{number-11}_{scenario_time}_{time.time()}.data"
df.to_csv(filename, sep="\t", index=False)
def gen_result_file(path, number, start_time, end_time, scenario_time):
@clibs.db_lock
def get_records(s_time, e_time):
clibs.cursor.execute(f"select content from logs where time between '{s_time}' and '{e_time}' and content like '%diagnosis.result%' order by id asc")
_ = clibs.cursor.fetchall()
return _
if number < 12:
records = get_records(start_time, end_time)
p = multiprocessing.Process(target=single_axis_proc, args=(path, records, number))
p.daemon = True
p.start()
elif number < 15:
records = get_records(start_time, end_time)
p = multiprocessing.Process(target=scenario_proc, args=(path, records, number, scenario_time))
p.daemon = True
p.start()
def change_curve_state(hr, stat_1, stat_2):
display_pdo_params = [{"name": name, "channel": chl} for name in ["hw_joint_vel_feedback", "device_servo_trq_feedback", "hw_sensor_trq_feedback"] for chl in range(6)]
hr.execution("diagnosis.open", open=stat_1, display_open=stat_2, overrun=True, turn_area=True, delay_motion=False)
hr.execution("diagnosis.set_params", display_pdo_params=display_pdo_params, frequency=50, version="1.4.1")
def run_rl(path, prj_file, hr, md, sub, w2t):
prj_name = prj_file.split("/")[-1].split(".")[0]
c_regular = [
"scenario(0, j1_p, j1_n, p_speed, p_tool, i_tool)",
"scenario(0, j2_p, j2_n, p_speed, p_tool, i_tool)",
"scenario(0, j3_p, j3_n, p_speed, p_tool, i_tool)",
"scenario(0, j4_p, j4_n, p_speed, p_tool, i_tool)",
"scenario(0, j5_p, j5_n, p_speed, p_tool, i_tool)",
"scenario(0, j6_p, j6_n, p_speed, p_tool, i_tool)",
"scenario(4, j1_hold, j1_hold, p_speed, p_tool, i_tool)",
"scenario(4, j2_hold, j2_hold, p_speed, p_tool, i_tool)",
"scenario(4, j3_hold, j3_hold, p_speed, p_tool, i_tool)",
"scenario(4, j4_hold, j4_hold, p_speed, p_tool, i_tool)",
"scenario(4, j5_hold, j5_hold, p_speed, p_tool, i_tool)",
"scenario(4, j6_hold, j6_hold, p_speed, p_tool, i_tool)",
"scenario(1, j6_p, j6_n, p_speed, p_tool, i_tool)",
"scenario(2, j6_p, j6_n, p_speed, p_tool, i_tool)",
"scenario(3, j6_p, j6_n, p_speed, p_tool, i_tool)",
]
c_inertia = [
"scenario(5, j4_p_inertia, j4_n_inertia, p_speed, p_tool, i_tool)",
"scenario(5, j5_p_inertia, j5_n_inertia, p_speed, p_tool, i_tool)",
"scenario(5, j6_p_inertia, j6_n_inertia, p_speed, p_tool, i_tool)",
]
disc_regular = ["一轴", "二轴", "三轴", "四轴", "五轴", "六轴", "一轴保持", "二轴保持", "三轴保持", "四轴保持", "五轴保持", "六轴保持", "场景一", "场景二", "场景三"]
disc_inertia = ["四轴惯量", "五轴惯量", "六轴惯量"]
conditions, disc = [], []
if sub == "tool100":
conditions, disc = c_regular, disc_regular
elif sub == "inertia":
conditions, disc = c_inertia, disc_inertia
# 打开诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来
md.r_soft_estop(0)
md.r_soft_estop(1)
md.r_clear_alarm()
for condition in conditions:
number = conditions.index(condition)
w2t(f"正在执行{disc[number]}测试......\n")
# 1. 将act重置为False并修改将要执行的场景
md.write_act(False)
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(clibs.ip_addr, clibs.ssh_port, username=clibs.username, password=clibs.password)
cmd = "cd /home/luoshi/bin/controller/; "
cmd += f'sudo sed -i "/scenario/d" projects/{prj_name}/_build/current/main.mod; '
cmd += f'sudo sed -i "/DONOTDELETE/i {condition}" projects/{prj_name}/_build/current/main.mod'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write(clibs.password + "\n")
stdout.read().decode() # 需要read一下才能正常执行
stderr.read().decode()
# 2. reload工程后pp2main并且自动模式和上电
prj_path = f"{prj_name}/_build/{prj_name}.prj"
hr.execution("overview.reload", prj_path=prj_path, tasks=["current"])
hr.execution("rl_task.pp_to_main", tasks=["current"])
hr.execution("state.switch_auto")
hr.execution("state.switch_motor_on")
# 3. 开始运行程序
hr.execution("rl_task.set_run_params", loop_mode=True, override=1.0)
hr.execution("rl_task.run", tasks=["current"])
t_start = time.time()
while True:
if md.read_ready_to_go() == 1:
md.write_act(True)
break
else:
time.sleep(1)
if (time.time() - t_start) > 20:
w2t("20s 内未收到机器人的运行信号需要确认RL程序和工具通信是否正常执行...", "red", "ReadySignalTimeoutError")
# 4. 执行采集
time.sleep(10) # 消除前 10s 的不稳定数据
change_curve_state(hr, True, True)
start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
single_time, stall_time, scenario_time = 40, 10, 0
if number < 6: # 单轴
time.sleep(single_time)
elif number < 12: # 堵转
time.sleep(stall_time)
else: # 场景
t_start = time.time()
while True:
scenario_time = float(f"{float(md.read_scenario_time()):.2f}")
if float(scenario_time) != 0:
w2t(f"场景{number - 11}的周期时间:{scenario_time}\n")
break
else:
time.sleep(1)
if (time.time()-t_start) > 180:
w2t(f"180s 内未收到场景{number - 11}的周期时间需要确认RL程序和工具通信交互是否正常执行...\n", "red", "GetScenarioTimeError")
time.sleep(20)
# 5.停止程序运行,保留数据并处理输出
end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
hr.execution("rl_task.stop", tasks=["current"])
time.sleep(2) # 确保数据都拿到
change_curve_state(hr, False, False)
gen_result_file(path, number, start_time, end_time, scenario_time)
else:
if sub == "tool100":
w2t("单轴和场景电机电流采集完毕,如需采集惯量负载,须切换负载类型,并更换惯量负载,重新执行。\n", "green")
elif sub == "inertia":
w2t("惯量负载电机电流采集完毕,如需采集单轴/场景/保持电机电流,须切换负载类型,并更换偏置负载,重新执行。\n", "green")
def main():
s_time = time.time()
path = clibs.data_at["_path"]
sub = clibs.data_at["_sub"]
w2t = clibs.w2t
hr = clibs.c_hr
md = clibs.c_md
data_dirs, data_files = clibs.traversal_files(path, w2t)
prj_file = initialization(path, sub, data_dirs, data_files, hr, w2t)
clibs.c_pd.push_prj_to_server(prj_file)
run_rl(path, prj_file, hr, md, sub, w2t)
e_time = time.time()
time_total = e_time - s_time
w2t(f"-" * 90 + "\n", "purple")
w2t(f"处理总时长:{time_total // 3600:02.0f} h {time_total % 3600 // 60:02.0f} m {time_total % 60:02.0f} s", "green")
if __name__ == "__main__":
main()