完善rokae自动化测试脚本工具,增加readme文件等

This commit is contained in:
2025-08-30 15:21:01 +08:00
parent 71046300fe
commit 15009924b3
9 changed files with 573 additions and 28 deletions

View File

@@ -11,13 +11,18 @@ from typing import Callable, Dict
from loguru import logger
from pymodbus.client.tcp import ModbusTcpClient
from paramiko import SSHClient, AutoAddPolicy
import plotly.graph_objects as go
from codes import clibs
class HmiRequest:
def __init__(self):
self.c = self.c_xs = self.t_rx_pkg = self.t_rx_pkg_xs = self.t_s_heart = None
self.c: socket.socket
self.c_xs: socket.socket
self.t_rx_pkg: threading.Thread
self.t_rx_pkg_xs: threading.Thread
self.t_s_heart: threading.Thread
self.data = self.data_xs = b""
self.is_connected = True
self.sock_conn()
@@ -170,8 +175,8 @@ class HmiRequest:
cmd, ts = msg_id.split("@")
t = ts.replace("T", " ")
t_ts = datetime.timestamp(datetime.strptime(t, "%Y-%m-%d %H:%M:%S.%f"))
t_send = datetime.strftime(datetime.fromtimestamp(t_ts-time_delay), '%Y-%m-%d %H:%M:%S.%f')
t_receive = datetime.strftime(datetime.fromtimestamp(t_ts+time_delay), '%Y-%m-%d %H:%M:%S.%f')
t_send = datetime.strftime(datetime.fromtimestamp(t_ts-time_delay), "%Y-%m-%d %H:%M:%S.%f")
t_receive = datetime.strftime(datetime.fromtimestamp(t_ts+time_delay), "%Y-%m-%d %H:%M:%S.%f")
for idx in range(time_delay):
time.sleep(clibs.interval)
try:
@@ -187,7 +192,7 @@ class HmiRequest:
logger.error(f"hr: {time_delay}s内无法找到请求 {msg_id} 的响应!")
# raise Exception()
def exec_cmd(self, cmd, **kwargs):
def exec_cmd(self, cmd: str, **kwargs):
try:
with open(f"{clibs.base_path}/assets/protocols/{cmd}.json", mode="r", encoding="utf-8") as f_cmd:
req = json.load(f_cmd)
@@ -199,15 +204,14 @@ class HmiRequest:
logger.error(f"hr: 暂不支持 {cmd} 功能或确认该功能存在err = {err}")
raise Exception()
match cmd:
case "state.set_tp_mode" | "overview.set_autoload" | "overview.reload" | "rl_task.pp_to_main" | "rl_task.run" | "rl_task.stop" | "rl_task.set_run_params" | "diagnosis.set_params" | "diagnosis.open" | "drag.set_params" | "controller.set_params" | "collision.set_params" | "move.set_quickstop_distance" | "move.set_params" | "move.set_monitor_cfg" | "modbus.get_values" | "modbus.set_params" | "system_io.update_configuration" | "diagnosis.get_params" | "jog.set_params" | "jog.start" | "move.stop" | "move.quick_turn" | "move.set_quickturn_pos" | "soft_limit.set_params" | "fieldbus_device.set_params" | "socket.set_params" | "diagnosis.save" | "register.set_value":
req["data"].update(kwargs)
case "safety.safety_area.signal_enable" | "safety.safety_area.overall_enable" | "safety.safety_area.safety_area_enable" | "safety.safety_area.set_param":
req["c"].update(kwargs)
case "log_code.data.code_list":
req["s"]["log_code.data"]["code_list"] = kwargs["code_list"]
case _:
pass
if cmd in ["state.set_tp_mode", "overview.set_autoload", "overview.reload", "rl_task.pp_to_main", "rl_task.run", "rl_task.stop", "rl_task.set_run_params", "diagnosis.set_params", "diagnosis.open", "drag.set_params", "controller.set_params", "collision.set_params", "move.set_quickstop_distance", "move.set_params", "move.set_monitor_cfg", "modbus.get_values", "modbus.set_params", "system_io.update_configuration", "diagnosis.get_params", "jog.set_params", "jog.start", "move.stop", "move.quick_turn", "move.set_quickturn_pos", "soft_limit.set_params", "fieldbus_device.set_params", "socket.set_params", "diagnosis.save", "register.set_value"]:
req["data"].update(kwargs)
elif cmd in ["safety.safety_area.signal_enable", "safety.safety_area.overall_enable", "safety.safety_area.safety_area_enable", "safety.safety_area.set_param"]:
req["c"].update(kwargs)
elif cmd in ["log_code.data.code_list"]:
req["s"]["log_code.data"]["code_list"] = kwargs["code_list"]
else:
pass
cmd = json.dumps(req, separators=(",", ":"))
if p_type == 0:
@@ -217,12 +221,13 @@ class HmiRequest:
else:
raise Exception("Unexpected CMD Input")
time.sleep(clibs.interval/4)
return req["id"]
class ModbusRequest:
def __init__(self):
self.c = None
self.c: ModbusTcpClient
self.sock_conn()
def sock_conn(self):
@@ -250,10 +255,10 @@ class ModbusRequest:
def r_reset_estop_clear_alarm(self):
self.__reg_high_pulse(40002, "复位外部急停状态,并清除告警信息")
def r_motor_off(self):
def r_motor_on(self):
self.__reg_high_pulse(40004, "上电")
def r_motor_on(self):
def r_motor_off(self):
self.__reg_high_pulse(40005, "下电")
def r_motor_on_off(self, action: bool):
@@ -333,7 +338,7 @@ class ModbusRequest:
def __get_state(self, addr: int, count: int = 1, **kwargs):
res = self.c.read_holding_registers(addr, count=count).registers[0]
logger.info(f"md: {addr}-{kwargs['func']},结果为{res} {kwargs['desc']}")
logger.info(f'md: {addr}-{kwargs["func"]},结果为{res} {kwargs["desc"]}')
return res
@property
@@ -501,7 +506,7 @@ class ModbusRequest:
class EcRequest:
def __init__(self):
self.c = None
self.c: socket.socket
self.exec_desc = " :--: true表示执行成功false表示执行失败"
self.sock_conn()
@@ -783,7 +788,7 @@ class PreDos(object):
logger.error(f"pd: SSH无法连接需检查网络连通性或者登录信息是否正确 -- {e}")
raise Exception()
def push_prj_to_server(self, prj_file):
def push_prj_to_server(self, prj_file: str):
# prj_file工程的本地完整路径
self.__ssh2server()
prj_name, postfix = os.path.basename(prj_file).split(".")
@@ -797,7 +802,7 @@ class PreDos(object):
_ = stderr.read().decode()
self.__ssh.close()
def pull_prj_from_server(self, prj_name, local_prj_path):
def pull_prj_from_server(self, prj_name: str, local_prj_path: str):
# prj_name要拉取的服务端工程名
# local_prj_path存放工程文件的本地路径
self.__ssh2server()
@@ -817,7 +822,7 @@ class PreDos(object):
self.__ssh.close()
def push_file_to_server(self, local_file, server_file):
def push_file_to_server(self, local_file: str, server_file: str):
# local_file本地文件完整路径
# server_file服务端文件完整路径
self.__ssh2server()
@@ -830,7 +835,7 @@ class PreDos(object):
_ = stderr.read().decode()
self.__ssh.close()
def pull_file_from_server(self, server_file, local_file):
def pull_file_from_server(self, server_file: str, local_file: str):
# local_file本地文件完整路径
# server_file服务端文件完整路径
self.__ssh2server()
@@ -851,9 +856,9 @@ class PreDos(object):
class RobotInit(object):
def __init__(self):
self.pd = PreDos()
self.hr = HmiRequest()
def __init__(self, hr: HmiRequest, pd: PreDos):
self.hr: HmiRequest = hr
self.pd: PreDos = pd
def robot_init(self):
# 推送配置文件
@@ -903,7 +908,7 @@ class RobotInit(object):
self.hr.exec_cmd("io_device.load_cfg")
self.hr.exec_cmd("modbus.load_cfg")
def fw_updater(self, fw_file):
def fw_updater(self, fw_file: str):
"""
:param fw_file: xCore升级文件/机型文件的绝对路径
:return: None
@@ -984,7 +989,7 @@ class UpgradeRequest:
http://confluence.i.rokae.com/pages/viewpage.action?pageId=21531754
"""
def __init__(self):
self.c = None
self.c: socket.socket
self.sock_conn()
def sock_conn(self):
@@ -1039,3 +1044,88 @@ class UpgradeRequest:
def origin_recovery(self): # OK 恢复备份,恢复后手动设置,并重启生效
self.sr_pkg({"cmd": "recover"})
class PlotTrajectory:
# hw_cart_pos_feedback_motorside笛卡尔空间下电机侧位置反馈单位m,rad
# hw_cart_pos_feedback_linkside笛卡尔空间下连杆侧位置反馈单位m,rad
# hw_cart_pos_feedback_encoder_tcp_in_world笛卡尔位置反馈关节端编码器数据单位m,rad
# hw_cart_pos_feedback_flan_in_world笛卡尔位置反馈法兰相对于世界坐标系
# hw_cart_pos_feedback_tcp_in_world笛卡尔位置反馈tcp相对于世界坐标系
def __init__(self, hr: HmiRequest, md: ModbusRequest, prj_name: str, tasks: list, curve_name: str = "hw_cart_pos_feedback_tcp_in_world"):
self.hr: HmiRequest = hr
self.md: ModbusRequest = md
self.curve_name: str = curve_name
self.prj_name: str = prj_name
self.tasks: list = tasks
def __change_curve_state(self, stat):
if not stat:
display_pdo_params = [{"name": "count", "channel": 0}]
else:
display_pdo_params = [{"name": self.curve_name, "channel": chl} for chl in range(3)]
self.hr.exec_cmd("diagnosis.set_params", display_pdo_params=display_pdo_params)
self.hr.exec_cmd("diagnosis.open", open=stat, display_open=stat)
def __plot_trajectory(self, start_time, end_time):
x, y, z = [], [], []
try:
clibs.lock.acquire(True)
clibs.cursor.execute(f"SELECT content FROM logs WHERE timestamp BETWEEN '{start_time}' AND '{end_time}' AND content LIKE '%diagnosis.result%' ORDER BY id ASC")
records = clibs.cursor.fetchall()
finally:
clibs.lock.release()
for record in records:
data = eval(record[0])["data"]
for item in data:
item_rm = [x * 1000 for x in reversed(item["value"])]
# item_rm = item["value"]
if item.get("channel", None) == 0 and item.get("name", None) == self.curve_name:
x.extend(item_rm)
elif item.get("channel", None) == 1 and item.get("name", None) == self.curve_name:
y.extend(item_rm)
elif item.get("channel", None) == 2 and item.get("name", None) == self.curve_name:
z.extend(item_rm)
# pyplot
trace_path = go.Scatter3d(x=x, y=y, z=z, mode="lines", line=dict(color="blue", width=4), name="运动轨迹")
trace_start = go.Scatter3d(x=[x[0]], y=[y[0]], z=[z[0]], mode="markers+text", marker=dict(color="green", size=8), text=["起点"], textposition="top center", name="起点")
trace_end = go.Scatter3d(x=[x[-1]], y=[y[-1]], z=[z[-1]], mode="markers+text", marker=dict(color="red", size=8), text=["终点"], textposition="bottom center", name="终点")
fig = go.Figure(data=[trace_path, trace_start, trace_end])
fig.write_html("trajectory.html", auto_open=False)
fig.update_layout(
scene=dict(
xaxis_title="X",
yaxis_title="Y",
zaxis_title="Z"
),
margin=dict(l=0, r=0, b=0, t=30),
legend=dict(x=0, y=1),
dragmode="orbit"
)
fig.show(config={"scrollZoom": True})
def draw_traj(self):
self.md.r_soft_estop(True)
self.md.r_reset_estop_clear_alarm()
self.md.r_switch_auto()
self.md.r_motor_on()
self.__change_curve_state(True)
self.hr.exec_cmd("overview.reload", prj_path=f"{self.prj_name}/_build/{self.prj_name}.prj", tasks=self.tasks)
self.hr.exec_cmd("rl_task.set_run_params", loop_mode=False, override=1.0)
self.hr.exec_cmd("rl_task.pp_to_main", tasks=self.tasks)
self.hr.exec_cmd("rl_task.run", tasks=self.tasks)
t_start = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
time.sleep(clibs.interval)
while self.md.w_sta_robot_is_moving:
time.sleep(clibs.interval * 10)
time.sleep(clibs.interval * 5)
t_end = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
self.__change_curve_state(False)
self.__plot_trajectory(t_start, t_end)