Compare commits

...

2 Commits

Author SHA1 Message Date
66df4edbed 电机电流单独测试逻辑优化 2025-05-16 13:55:33 +08:00
b758733b59 fixing bugs 2025-04-22 20:02:23 +08:00
9 changed files with 342 additions and 26 deletions

13
aio.py
View File

@ -725,10 +725,14 @@ class MainWindow(main_window.Ui_MainWindow):
_, result, ret, error, idx, (msg_id, flag) = results _, result, ret, error, idx, (msg_id, flag) = results
if _ is False: if _ is False:
self.btn_hmi_send.setDisabled(False) self.btn_hmi_send.setDisabled(False)
clibs.logger("INFO", "aio", f"hmi: [send] 请求发送失败 {msg_id}", "red") clibs.logger("INFO", "aio", f"hmi: [send] 请求发送失败 {msg_id}, error = {error}", "red")
return return
records = clibs.c_hr.get_from_id(msg_id) try:
records = clibs.c_hr.get_from_id(msg_id)
finally:
self.btn_hmi_send.setDisabled(False)
for record in records: for record in records:
if "请求发送成功" not in record[0]: if "请求发送成功" not in record[0]:
self.pte_hmi_recv.clear() self.pte_hmi_recv.clear()
@ -742,10 +746,10 @@ class MainWindow(main_window.Ui_MainWindow):
nonlocal hmi_dict, cmd_json, flag nonlocal hmi_dict, cmd_json, flag
if flag == 0: if flag == 0:
clibs.c_hr.c.send(clibs.c_hr.package(cmd_json)) clibs.c_hr.c.send(clibs.c_hr.package(cmd_json))
clibs.c_hr.logger("DEBUG", "aio", f"hmi: [send] 老协议请求发送成功 {cmd_json}") clibs.logger("DEBUG", "aio", f"hmi: [send] 老协议请求发送成功 {cmd_json}")
elif flag == 1: elif flag == 1:
clibs.c_hr.c_xs.send(clibs.c_hr.package_xs(hmi_dict)) clibs.c_hr.c_xs.send(clibs.c_hr.package_xs(hmi_dict))
clibs.c_hr.logger("DEBUG", "aio", f"hmi: xService请求发送成功 {cmd_json}") clibs.logger("DEBUG", "aio", f"hmi: xService请求发送成功 {cmd_json}")
if clibs.status["hmi"] == 0: if clibs.status["hmi"] == 0:
QMessageBox.critical(self, "错误", "使用该功能之前,需要先打开 HMI 连接!") QMessageBox.critical(self, "错误", "使用该功能之前,需要先打开 HMI 连接!")
@ -803,7 +807,6 @@ class MainWindow(main_window.Ui_MainWindow):
def prog_done_ec_send(self, results): def prog_done_ec_send(self, results):
flag, result, ret, error, idx, cmd = results flag, result, ret, error, idx, cmd = results
print(f"res = {results}")
if ret[1] == "error": if ret[1] == "error":
clibs.logger("ERROR", "openapi", f"{ret[0]}", "red") clibs.logger("ERROR", "openapi", f"{ret[0]}", "red")
else: else:

Binary file not shown.

View File

@ -6,8 +6,8 @@ VSVersionInfo(
ffi=FixedFileInfo( ffi=FixedFileInfo(
# filevers and prodvers should be always a tuple with four items: (1, 2, 3, 4) # filevers and prodvers should be always a tuple with four items: (1, 2, 3, 4)
# Set not needed items to zero 0. # Set not needed items to zero 0.
filevers=(0, 4, 0, 3), filevers=(0, 4, 0, 6),
prodvers=(0, 4, 0, 3), prodvers=(0, 4, 0, 6),
# Contains a bitmask that specifies the valid bits 'flags'r # Contains a bitmask that specifies the valid bits 'flags'r
mask=0x3f, mask=0x3f,
# Contains a bitmask that specifies the Boolean attributes of the file. # Contains a bitmask that specifies the Boolean attributes of the file.
@ -31,12 +31,12 @@ VSVersionInfo(
'040904b0', '040904b0',
[StringStruct('CompanyName', 'Rokae - https://www.rokae.com/'), [StringStruct('CompanyName', 'Rokae - https://www.rokae.com/'),
StringStruct('FileDescription', 'All in one automatic toolbox'), StringStruct('FileDescription', 'All in one automatic toolbox'),
StringStruct('FileVersion', '0.4.0.3 (2025-04-10)'), StringStruct('FileVersion', '0.4.0.6 (2025-05-16)'),
StringStruct('InternalName', 'AIO.exe'), StringStruct('InternalName', 'AIO.exe'),
StringStruct('LegalCopyright', '© 2024-2025 Manford Fan'), StringStruct('LegalCopyright', '© 2024-2025 Manford Fan'),
StringStruct('OriginalFilename', 'AIO.exe'), StringStruct('OriginalFilename', 'AIO.exe'),
StringStruct('ProductName', 'AIO'), StringStruct('ProductName', 'AIO'),
StringStruct('ProductVersion', '0.4.0.3 (2025-04-10)')]) StringStruct('ProductVersion', '0.4.0.6 (2025-05-16)')])
]), ]),
VarFileInfo([VarStruct('Translation', [1033, 1200])]) VarFileInfo([VarStruct('Translation', [1033, 1200])])
] ]

View File

@ -1 +1 @@
0.4.0.3@04/10/2025 0.4.0.6@05/16/2025

View File

@ -1 +1 @@
0.4.0.3@04/10/2025 0.4.0.6@05/16/2025

View File

@ -1,3 +1,4 @@
import random
import threading import threading
import time import time
import os import os
@ -124,6 +125,7 @@ class DoBrakeTest(QThread):
else: else:
clibs.count = 0 clibs.count = 0
clibs.logger("WARNING", "do_brake", f"尝试三次后仍无法获取正确数据,本次数据无效,继续执行...", "red") clibs.logger("WARNING", "do_brake", f"尝试三次后仍无法获取正确数据,本次数据无效,继续执行...", "red")
return None
df1 = pandas.DataFrame.from_dict({"hw_joint_vel_feedback": d_vel}) df1 = pandas.DataFrame.from_dict({"hw_joint_vel_feedback": d_vel})
df2 = pandas.DataFrame.from_dict({"device_servo_trq_feedback": d_trq}) df2 = pandas.DataFrame.from_dict({"device_servo_trq_feedback": d_trq})
@ -314,9 +316,11 @@ class DoBrakeTest(QThread):
def exec_brake(): def exec_brake():
flag, start, data, record = True, time.time(), None, None flag, start, data, record = True, time.time(), None, None
while flag: while flag:
time.sleep(0.05) time.sleep(0.01*random.randint(1, 10))
# time.sleep(0.05)
if time.time() - start > 20: if time.time() - start > 20:
clibs.logger("ERROR", "do_brake", "20s 内未触发急停,需排查......", "red") clibs.logger("INFO", "do_brake", "20s 内未触发急停,需排查,当前先继续执行......", "red")
break
try: try:
clibs.lock.acquire(True) clibs.lock.acquire(True)
@ -337,10 +341,18 @@ class DoBrakeTest(QThread):
time.sleep(clibs.INTERVAL*2) # wait speed goes down to 0 time.sleep(clibs.INTERVAL*2) # wait speed goes down to 0
flag = False flag = False
break break
return time.time() return time.time(), flag
time.sleep(11) # 排除从其他位姿到零点位姿,再到轴极限位姿的时间 time.sleep(11) # 排除从其他位姿到零点位姿,再到轴极限位姿的时间
end_time = exec_brake() end_time, flag = exec_brake()
if flag is True: # 没有触发急停
if clibs.count < 3:
clibs.count += 1
continue
else:
clibs.count = 0
break
# 6. 保留数据并处理输出 # 6. 保留数据并处理输出
ret = self.gen_result_file(axis, end_time, reach, load, speed, speed_max, rounds) ret = self.gen_result_file(axis, end_time, reach, load, speed, speed_max, rounds)
if ret != "retry": if ret != "retry":

View File

@ -3,6 +3,7 @@ import threading
import time import time
import paramiko import paramiko
import pandas import pandas
import openpyxl
from PySide6.QtCore import Signal, QThread from PySide6.QtCore import Signal, QThread
from codes.common import clibs from codes.common import clibs
@ -16,9 +17,9 @@ class DoCurrentTest(QThread):
def initialization(self, data_dirs, data_files): def initialization(self, data_dirs, data_files):
def check_files(): def check_files():
msg = "初始路径下不允许有文件夹,初始路径下只能存在如下个文件,且文件为关闭状态,确认后重新运行!<br>" msg = "初始路径下不允许有文件夹,初始路径下只能存在如下个文件,且文件为关闭状态,确认后重新运行!<br>"
msg += "1. T_电机电流.xlsx<br>2. xxxx.zip" msg += "1. T_电机电流.xlsx<br>2. configs.xlsx<br>3. xxxx.zip"
if len(data_dirs) != 0 or len(data_files) != 2: if len(data_dirs) != 0 or len(data_files) != 3:
clibs.logger("ERROR", "do_current", msg, "red") clibs.logger("ERROR", "do_current", msg, "red")
prj_file, count = None, 0 prj_file, count = None, 0
@ -26,13 +27,16 @@ class DoCurrentTest(QThread):
filename = data_file.split("/")[-1] filename = data_file.split("/")[-1]
if filename == "T_电机电流.xlsx": if filename == "T_电机电流.xlsx":
count += 1 count += 1
elif filename == "configs.xlsx":
count += 1
config_file = data_file
elif filename.endswith(".zip"): elif filename.endswith(".zip"):
count += 1 count += 1
prj_file = data_file prj_file = data_file
else: else:
clibs.logger("ERROR", "do_current", msg, "red") clibs.logger("ERROR", "do_current", msg, "red")
if count != 2: if count != 3:
clibs.logger("ERROR", "do_current", msg, "red") clibs.logger("ERROR", "do_current", msg, "red")
if self.tool == "tool100": if self.tool == "tool100":
@ -45,7 +49,7 @@ class DoCurrentTest(QThread):
else: else:
clibs.logger("ERROR", "do_current", "负载选择错误,电机电流测试只能选择 tool100/inertia 规格!", "red") clibs.logger("ERROR", "do_current", "负载选择错误,电机电流测试只能选择 tool100/inertia 规格!", "red")
return prj_file return prj_file, config_file
def get_configs(): def get_configs():
robot_type = None robot_type = None
@ -157,7 +161,14 @@ class DoCurrentTest(QThread):
clibs.c_hr.execution("diagnosis.open", open=stat, display_open=stat) clibs.c_hr.execution("diagnosis.open", open=stat, display_open=stat)
clibs.c_hr.execution("diagnosis.set_params", display_pdo_params=display_pdo_params) clibs.c_hr.execution("diagnosis.set_params", display_pdo_params=display_pdo_params)
def run_rl(self, prj_file): def run_rl(self, prj_file, config_file):
wb = openpyxl.load_workbook(config_file, read_only=True)
ws = wb["Target"]
try:
single_current = int(str(ws.cell(row=6, column=2).value).strip())
except:
single_current = -1
wb.close()
prj_name = ".".join(prj_file.split("/")[-1].split(".")[:-1]) prj_name = ".".join(prj_file.split("/")[-1].split(".")[:-1])
c_regular = [ c_regular = [
"scenario(0, j1_p, j1_n, p_speed, p_tool, i_tool)", "scenario(0, j1_p, j1_n, p_speed, p_tool, i_tool)",
@ -199,9 +210,9 @@ class DoCurrentTest(QThread):
clibs.logger("ERROR", "do_current", "后台数据清零完成,现在可以重新运行其他程序。", "green") clibs.logger("ERROR", "do_current", "后台数据清零完成,现在可以重新运行其他程序。", "green")
number = conditions.index(condition) number = conditions.index(condition)
# for testing if single_current in range(0, 15) and single_current != number:
# if number < 12: continue
# continue
clibs.logger("INFO", "do_current", f"正在执行{disc[number]}测试......") clibs.logger("INFO", "do_current", f"正在执行{disc[number]}测试......")
# 1. 将act重置为False并修改将要执行的场景 # 1. 将act重置为False并修改将要执行的场景
@ -278,9 +289,9 @@ class DoCurrentTest(QThread):
clibs.logger("ERROR", "do_current", "processing: 需要在网络设置中连接HMI以及Modbus通信", "red") clibs.logger("ERROR", "do_current", "processing: 需要在网络设置中连接HMI以及Modbus通信", "red")
data_dirs, data_files = clibs.traversal_files(self.dir_path) data_dirs, data_files = clibs.traversal_files(self.dir_path)
prj_file = self.initialization(data_dirs, data_files) prj_file, config_file = self.initialization(data_dirs, data_files)
clibs.c_pd.push_prj_to_server(prj_file) clibs.c_pd.push_prj_to_server(prj_file)
self.run_rl(prj_file) self.run_rl(prj_file, config_file)
clibs.logger("INFO", "do_current", "-" * 60 + "<br>全部处理完毕<br>", "purple") clibs.logger("INFO", "do_current", "-" * 60 + "<br>全部处理完毕<br>", "purple")
time_total = time.time() - time_start time_total = time.time() - time_start

View File

@ -0,0 +1,289 @@
import os
import threading
import time
import paramiko
import pandas
from PySide6.QtCore import Signal, QThread
from codes.common import clibs
class DoCurrentTest(QThread):
def __init__(self, dir_path, tool, /):
super().__init__()
self.dir_path = dir_path
self.tool = tool
self.idx = 5
def initialization(self, data_dirs, data_files):
def check_files():
msg = "初始路径下不允许有文件夹,初始路径下只能存在如下两个文件,且文件为关闭状态,确认后重新运行!<br>"
msg += "1. T_电机电流.xlsx<br>2. xxxx.zip"
if len(data_dirs) != 0 or len(data_files) != 2:
clibs.logger("ERROR", "do_current", msg, "red")
prj_file, count = None, 0
for data_file in data_files:
filename = data_file.split("/")[-1]
if filename == "T_电机电流.xlsx":
count += 1
elif filename.endswith(".zip"):
count += 1
prj_file = data_file
else:
clibs.logger("ERROR", "do_current", msg, "red")
if count != 2:
clibs.logger("ERROR", "do_current", msg, "red")
if self.tool == "tool100":
os.mkdir(f"{self.dir_path}/single")
os.mkdir(f"{self.dir_path}/s_1")
os.mkdir(f"{self.dir_path}/s_2")
os.mkdir(f"{self.dir_path}/s_3")
elif self.tool == "inertia":
os.mkdir(f"{self.dir_path}/inertia")
else:
clibs.logger("ERROR", "do_current", "负载选择错误,电机电流测试只能选择 tool100/inertia 规格!", "red")
return prj_file
def get_configs():
robot_type = None
msg_id = clibs.c_hr.execution("controller.get_params")
records = clibs.c_hr.get_from_id(msg_id)
for record in records:
if "请求发送成功" not in record[0]:
robot_type = eval(record[0])["data"]["robot_type"]
server_file = f"/home/luoshi/bin/controller/robot_cfg/{robot_type}/{robot_type}.cfg"
local_file = self.dir_path + f"/{robot_type}.cfg"
clibs.c_pd.pull_file_from_server(server_file, local_file)
clibs.logger("INFO", "do_current", "正在做初始化校验和配置,这可能需要一点时间......", "green")
_prj_file = check_files()
get_configs()
clibs.logger("INFO", "do_current", "数据目录合规性检查结束,未发现问题......", "green")
return _prj_file
def single_axis_proc(self, records, number):
text = "single" if number < 6 else "hold"
number = number if number < 6 else number - 6
d_vel, d_trq, d_sensor, d_trans, d_predict_trq, d_real_trq = [], [], [], [], [], []
for record in records:
data = eval(record[0])["data"]
for item in data:
d_item = reversed(item["value"])
if item.get("channel", None) == number and item.get("name", None) == "hw_joint_vel_feedback":
d_vel.extend(d_item)
elif item.get("channel", None) == number and item.get("name", None) == "device_servo_trq_feedback":
d_trq.extend(d_item)
elif item.get("channel", None) == number and item.get("name", None) == "hw_sensor_trq_feedback":
d_sensor.extend(d_item)
elif item.get("channel", None) == number and item.get("name", None) == "hw_estimate_trans_trq_res":
d_trans.extend(d_item)
elif item.get("channel", None) == number and item.get("name", None) == "hw_predict_trq_res":
d_predict_trq.extend(d_item)
elif item.get("channel", None) == number and item.get("name", None) == "hw_real_trq_res":
d_real_trq.extend(d_item)
df1 = pandas.DataFrame.from_dict({"hw_joint_vel_feedback": d_vel})
df2 = pandas.DataFrame.from_dict({"device_servo_trq_feedback": d_trq})
df3 = pandas.DataFrame.from_dict({"hw_sensor_trq_feedback": d_sensor})
df4 = pandas.DataFrame.from_dict({"hw_estimate_trans_trq_res": d_trans})
df5 = pandas.DataFrame.from_dict({"hw_predict_trq_res": d_predict_trq})
df6 = pandas.DataFrame.from_dict({"hw_real_trq_res": d_real_trq})
df = pandas.concat([df1, df2, df3, df4, df5, df6], axis=1)
filename = f"{self.dir_path}/single/j{number + 1}_{text}_{time.time()}.data"
df.to_csv(filename, sep="\t", index=False)
def scenario_proc(self, records, number, scenario_time):
# d_vel, d_trq, d_sensor, d_trans, d_predict_trq, d_real_trq = [[], [], [], [], [], []], [[], [], [], [], [], []], [[], [], [], [], [], []], [[], [], [], [], [], []], [[], [], [], [], [], []], [[], [], [], [], [], []]
d_vel, d_trq, d_sensor, d_trans, d_predict_trq, d_real_trq = [[[], [], [], [], [], []] for _ in range(6)]
for record in records:
data = eval(record[0])["data"]
for item in data:
d_item = reversed(item["value"])
for axis in range(6):
if item.get("channel", None) == axis and item.get("name", None) == "hw_joint_vel_feedback":
d_vel[axis].extend(d_item)
elif item.get("channel", None) == axis and item.get("name", None) == "device_servo_trq_feedback":
d_trq[axis].extend(d_item)
elif item.get("channel", None) == axis and item.get("name", None) == "hw_sensor_trq_feedback":
d_sensor[axis].extend(d_item)
elif item.get("channel", None) == axis and item.get("name", None) == "hw_estimate_trans_trq_res":
d_trans[axis].extend(d_item)
elif item.get("channel", None) == axis and item.get("name", None) == "hw_predict_trq_res":
d_predict_trq[axis].extend(d_item)
elif item.get("channel", None) == axis and item.get("name", None) == "hw_real_trq_res":
d_real_trq[axis].extend(d_item)
for axis in range(6):
df1 = pandas.DataFrame.from_dict({"hw_joint_vel_feedback": d_vel[axis]})
df2 = pandas.DataFrame.from_dict({"device_servo_trq_feedback": d_trq[axis]})
df3 = pandas.DataFrame.from_dict({"hw_sensor_trq_feedback": d_sensor[axis]})
df4 = pandas.DataFrame.from_dict({"hw_estimate_trans_trq_res": d_trans[axis]})
df5 = pandas.DataFrame.from_dict({"hw_predict_trq_res": d_predict_trq[axis]})
df6 = pandas.DataFrame.from_dict({"hw_real_trq_res": d_real_trq[axis]})
df = pandas.concat([df1, df2, df3, df4, df5, df6], axis=1)
filename = f"{self.dir_path}/s_{number-11}/j{axis+1}_s_{number-11}_{scenario_time}_{time.time()}.data"
df.to_csv(filename, sep="\t", index=False)
def gen_result_file(self, number, start_time, end_time, scenario_time):
def get_records():
s_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time))
e_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time+clibs.INTERVAL))
try:
clibs.lock.acquire(True)
clibs.cursor.execute(f"SELECT content FROM logs WHERE timestamp BETWEEN '{s_time}' AND '{e_time}' AND content LIKE '%diagnosis.result%' ORDER BY id ASC")
return clibs.cursor.fetchall()
finally:
clibs.lock.release()
if number < 12:
records = get_records()
t = threading.Thread(target=self.single_axis_proc, args=(records, number))
t.daemon = True
t.start()
elif number < 15:
records = get_records()
t = threading.Thread(target=self.scenario_proc, args=(records, number, scenario_time))
t.daemon = True
t.start()
@staticmethod
def change_curve_state(stat):
curves = ["hw_joint_vel_feedback", "device_servo_trq_feedback", "hw_sensor_trq_feedback", "hw_estimate_trans_trq_res", "hw_predict_trq_res", "hw_real_trq_res"]
curves = ["hw_joint_vel_feedback", "device_servo_trq_feedback", "hw_sensor_trq_feedback", "hw_estimate_trans_trq_res"]
display_pdo_params = [] if not stat else [{"name": curve, "channel": chl} for curve in curves for chl in range(6)]
clibs.c_hr.execution("diagnosis.open", open=stat, display_open=stat)
clibs.c_hr.execution("diagnosis.set_params", display_pdo_params=display_pdo_params)
def run_rl(self, prj_file):
prj_name = ".".join(prj_file.split("/")[-1].split(".")[:-1])
c_regular = [
"scenario(0, j1_p, j1_n, p_speed, p_tool, i_tool)",
"scenario(0, j2_p, j2_n, p_speed, p_tool, i_tool)",
"scenario(0, j3_p, j3_n, p_speed, p_tool, i_tool)",
"scenario(0, j4_p, j4_n, p_speed, p_tool, i_tool)",
"scenario(0, j5_p, j5_n, p_speed, p_tool, i_tool)",
"scenario(0, j6_p, j6_n, p_speed, p_tool, i_tool)",
"scenario(4, j1_hold, j1_hold, p_speed, p_tool, i_tool)",
"scenario(4, j2_hold, j2_hold, p_speed, p_tool, i_tool)",
"scenario(4, j3_hold, j3_hold, p_speed, p_tool, i_tool)",
"scenario(4, j4_hold, j4_hold, p_speed, p_tool, i_tool)",
"scenario(4, j5_hold, j5_hold, p_speed, p_tool, i_tool)",
"scenario(4, j6_hold, j6_hold, p_speed, p_tool, i_tool)",
"scenario(1, j6_p, j6_n, p_speed, p_tool, i_tool)",
"scenario(2, j6_p, j6_n, p_speed, p_tool, i_tool)",
"scenario(3, j6_p, j6_n, p_speed, p_tool, i_tool)",
]
c_inertia = [
"scenario(5, j4_p_inertia, j4_n_inertia, p_speed, p_tool, i_tool)",
"scenario(5, j5_p_inertia, j5_n_inertia, p_speed, p_tool, i_tool)",
"scenario(5, j6_p_inertia, j6_n_inertia, p_speed, p_tool, i_tool)",
]
disc_regular = ["一轴", "二轴", "三轴", "四轴", "五轴", "六轴", "一轴保持", "二轴保持", "三轴保持", "四轴保持", "五轴保持", "六轴保持", "场景一", "场景二", "场景三"]
disc_inertia = ["四轴惯量", "五轴惯量", "六轴惯量"]
conditions, disc = [], []
if self.tool == "tool100":
conditions, disc = c_regular, disc_regular
elif self.tool == "inertia":
conditions, disc = c_inertia, disc_inertia
# 打开诊断曲线,触发软急停,并解除,目的是让可能正在运行着的机器停下来
clibs.c_md.r_soft_estop(0)
clibs.c_md.r_soft_estop(1)
clibs.c_md.r_clear_alarm()
for condition in conditions:
if clibs.stop_flag:
clibs.logger("ERROR", "do_current", "后台数据清零完成,现在可以重新运行其他程序。", "green")
number = conditions.index(condition)
# for testing
# if number < 12:
# continue
clibs.logger("INFO", "do_current", f"正在执行{disc[number]}测试......")
# 1. 将act重置为False并修改将要执行的场景
clibs.c_md.write_act(False)
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(clibs.ip_addr, clibs.ssh_port, username=clibs.username, password=clibs.password)
cmd = "cd /home/luoshi/bin/controller/; "
cmd += f'sudo sed -i "/scenario/d" projects/{prj_name}/_build/current/main.mod; '
cmd += f'sudo sed -i "/DONOTDELETE/i {condition}" projects/{prj_name}/_build/current/main.mod'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
stdin.write(clibs.password + "\n")
stdout.read().decode() # 需要read一下才能正常执行
stderr.read().decode()
# 2. reload工程后pp2main并且自动模式和上电
prj_path = f"{prj_name}/_build/{prj_name}.prj"
clibs.c_hr.execution("overview.reload", prj_path=prj_path, tasks=["current"])
clibs.c_hr.execution("rl_task.pp_to_main", tasks=["current"])
clibs.c_hr.execution("state.switch_auto")
clibs.c_hr.execution("state.switch_motor_on")
# 3. 开始运行程序
clibs.c_hr.execution("rl_task.set_run_params", loop_mode=True, override=1.0)
clibs.c_hr.execution("rl_task.run", tasks=["current"])
t_start = time.time()
while True:
if clibs.c_md.read_ready_to_go() == 1:
clibs.c_md.write_act(True)
break
else:
time.sleep(1)
if (time.time() - t_start) > 15:
clibs.logger("ERROR", "do_current", "15s 内未收到机器人的运行信号需要确认RL程序和工具通信是否正常执行...", "red")
# 4. 执行采集
time.sleep(10) # 消除前 10s 的不稳定数据
self.change_curve_state(True)
start_time = time.time()
single_time, stall_time, scenario_time = 40, 10, 0
if number < 6: # 单轴
time.sleep(single_time)
elif number < 12: # 堵转
time.sleep(stall_time)
else: # 场景
t_start = time.time()
while True:
scenario_time = float(f"{float(clibs.c_md.read_scenario_time()):.2f}")
if float(scenario_time) != 0:
clibs.logger("INFO", "do_current", f"场景{number - 11}的周期时间:{scenario_time}")
break
else:
time.sleep(1)
if (time.time()-t_start) > 180:
clibs.logger("ERROR", "do_current", f"180s 内未收到场景{number - 11}的周期时间需要确认RL程序和工具通信交互是否正常执行...", "red")
time.sleep(20)
# 5.停止程序运行,保留数据并处理输出
end_time = time.time()
clibs.c_hr.execution("rl_task.stop", tasks=["current"])
self.change_curve_state(False)
time.sleep(2) # 确保数据都入库
self.gen_result_file(number, start_time, end_time, scenario_time)
else:
if self.tool == "tool100":
clibs.logger("INFO", "do_current", "单轴和场景电机电流采集完毕,如需采集惯量负载,须切换负载类型,并更换惯量负载,重新执行", "green")
elif self.tool == "inertia":
clibs.logger("INFO", "do_current", "惯量负载电机电流采集完毕,如需采集单轴/场景/保持电机电流,须切换负载类型,并更换偏置负载,重新执行", "green")
def processing(self):
time_start = time.time()
clibs.running[self.idx] = 1
if clibs.status["hmi"] != 1 or clibs.status["md"] != 1:
clibs.logger("ERROR", "do_current", "processing: 需要在网络设置中连接HMI以及Modbus通信", "red")
data_dirs, data_files = clibs.traversal_files(self.dir_path)
prj_file = self.initialization(data_dirs, data_files)
clibs.c_pd.push_prj_to_server(prj_file)
self.run_rl(prj_file)
clibs.logger("INFO", "do_current", "-" * 60 + "<br>全部处理完毕<br>", "purple")
time_total = time.time() - time_start
msg = f"处理时间:{time_total // 3600:02.0f} h {time_total % 3600 // 60:02.0f} m {time_total % 60:02.0f} s"
clibs.logger("INFO", "do_current", msg)

View File

@ -28,6 +28,7 @@ class ModbusRequest(QThread):
self.c = ModbusTcpClient(host=self.ip, port=self.port) self.c = ModbusTcpClient(host=self.ip, port=self.port)
if self.c.connect(): if self.c.connect():
clibs.logger("INFO", "openapi", f"Modbus connection({clibs.ip_addr}:{clibs.modbus_port}) success!", "green") clibs.logger("INFO", "openapi", f"Modbus connection({clibs.ip_addr}:{clibs.modbus_port}) success!", "green")
self.r_soft_estop(1)
else: else:
clibs.logger("ERROR", "openapi", f"Modbus connection({clibs.ip_addr}:{clibs.modbus_port}) failed!", "red") clibs.logger("ERROR", "openapi", f"Modbus connection({clibs.ip_addr}:{clibs.modbus_port}) failed!", "red")