v0.2.0.5(2024/07/31)
此版本改动较大,公共部分做了规整,放置到新建文件夹 commons 当中,并所有自定义模块引入 logging 模块,记录重要信息 1. [t_change_ui: clibs.py] - 调整代码组织结构,新增模块,将公共函数以及类合并入此 - 将一些常量放入该模块 - 引入logging/concurrent_log_handler模块,并作初始化操作,供其他模块使用,按50M切割,最多保留10份 - prj_to_xcore函数设置工程名部分重写,修复了多个prj工程可能不能执行的问题 2. [t_change_ui: openapi.py] - 完全重写了 get_from_id 函数,使更精准 - 在 msg_storage 函数中,增加 logger,保留所有响应消息 - 删除 heartbeat 函数中的日志保存功能部分 - 心跳再次修改为 2s... 3. [t_change_ui: aio.py] - 增加了日志初始化部分 - detect_network 函数中修改重新实例化HR间隔为 4s,对应心跳 4. [t_change_ui: do_brake.py] - 使用一直打开曲线的方法规避解决了 OOM 的问题,同时修改数据处理方式,只取最后 12s 5. [t_change_ui: do_current.py] - 保持电流,只取最后 15s 6. [t_change_ui: all the part]: 引入 commons 包,并定制了 logging 输出,后续持续优化
This commit is contained in:
@ -1,61 +1,54 @@
|
||||
from json import loads
|
||||
from sys import argv
|
||||
from logging import getLogger
|
||||
from commons import clibs
|
||||
|
||||
|
||||
def execution(cmd, hr, w2t, **kwargs):
|
||||
_id = hr.execution(cmd, **kwargs)
|
||||
_msg = hr.get_from_id(_id)
|
||||
if not _msg:
|
||||
w2t(f"无法获取{_id}请求的响应信息", 0, 6, 'red', tab_name='Automatic Test')
|
||||
else:
|
||||
_response = loads(_msg)
|
||||
if not _response:
|
||||
w2t(f"无法获取{id}请求的响应信息", 0, 1, 'red', tab_name='Automatic Test')
|
||||
return _response
|
||||
tab_name = clibs.tab_names['at']
|
||||
logger = getLogger(__file__)
|
||||
|
||||
|
||||
def trigger_estop(md, w2t):
|
||||
md.trigger_estop()
|
||||
w2t("触发急停成功,可点击机器状态验证。", 0, 0, 'green', 'Automatic Test')
|
||||
w2t("触发急停成功,可点击机器状态验证。", 0, 0, 'green', tab_name)
|
||||
|
||||
|
||||
def reset_estop(md, w2t):
|
||||
md.reset_estop()
|
||||
w2t("恢复急停成功,可点击机器状态验证。", 0, 0, 'green', 'Automatic Test')
|
||||
w2t("恢复急停成功,可点击机器状态验证。", 0, 0, 'green', tab_name)
|
||||
|
||||
|
||||
def get_state(hr, w2t):
|
||||
# 获取机器状态
|
||||
_response = execution('state.get_state', hr, w2t)
|
||||
_response = clibs.execution('state.get_state', hr, w2t, tab_name)
|
||||
stat_desc = {'engine': '上电状态', 'operate': '操作模式', 'rc_state': '控制器状态', 'robot_action': '机器人动作', 'safety_mode': '安全模式', 'servo_mode': '伺服工作模式', 'task_space': '工作任务空间'}
|
||||
for component, state in _response['data'].items():
|
||||
w2t(f"{stat_desc[component]}: {state}", tab_name='Automatic Test')
|
||||
w2t(f"{stat_desc[component]}: {state}", tab_name=tab_name)
|
||||
|
||||
# 获取设备伺服信息
|
||||
_response = execution('device.get_params', hr, w2t)
|
||||
_response = clibs.execution('device.get_params', hr, w2t, tab_name)
|
||||
dev_desc = {0: '伺服版本', 1: '伺服参数', 2: '安全板固件', 3: '控制器', 4: '通讯总线', 5: '解释器', 6: '运动控制', 8: '力控版本', 9: '末端固件', 10: '机型文件', 11: '环境包'}
|
||||
dev_vers = {}
|
||||
for device in _response['data']['devices']:
|
||||
dev_vers[device['type']] = device['version']
|
||||
for i in sorted(dev_desc.keys()):
|
||||
w2t(f"{dev_desc[i]}: {dev_vers[i]}", tab_name='Automatic Test')
|
||||
w2t(f"{dev_desc[i]}: {dev_vers[i]}", tab_name=tab_name)
|
||||
|
||||
# 设置示教器模式
|
||||
_response = execution('state.set_tp_mode', hr, w2t, tp_mode='without')
|
||||
_response = clibs.execution('state.set_tp_mode', hr, w2t, tab_name, tp_mode='without')
|
||||
|
||||
|
||||
def warning_info(hr, w2t):
|
||||
for msg in hr.c_msg:
|
||||
if 'alarm' in msg.lower():
|
||||
w2t(str(loads(msg)), tab_name='Automatic Test')
|
||||
w2t(str(loads(msg)), tab_name=tab_name)
|
||||
for msg in hr.c_msg_xs:
|
||||
if 'alarm' in msg.lower():
|
||||
w2t(str(loads(msg)), tab_name='Automatic Test')
|
||||
w2t(str(loads(msg)), tab_name=tab_name)
|
||||
|
||||
|
||||
def main(hr, md, func, w2t):
|
||||
if hr is None:
|
||||
w2t("无法连接机器人,检查是否已经使用Robot Assist软件连接机器,重试中...", 0, 49, 'red', tab_name='Automatic Test')
|
||||
w2t("无法连接机器人,检查是否已经使用Robot Assist软件连接机器,重试中...", 0, 49, 'red', tab_name)
|
||||
# func: get_state/
|
||||
match func:
|
||||
case 'trigger_estop':
|
||||
|
Reference in New Issue
Block a user