v0.1.8.0(2024/07/04)

1. [APIs: do_current.py]: 完成了堵转电流和惯量负载电机电流的采集和处理,至此,电机电流的自动化工作基本完成
This commit is contained in:
gitea 2024-07-04 21:08:13 +08:00
parent 524af160d8
commit fc56d81e9c
5 changed files with 113 additions and 58 deletions

View File

@ -421,3 +421,6 @@ v0.1.7.6(2024/07/04)
- 优化了get_from_id的逻辑
4. [autotest.xml]: 新增了scenario_time只写寄存器
v0.1.8.0(2024/07/04)
1. [APIs: do_current.py]: 完成了堵转电流和惯量负载电机电流的采集和处理,至此,电机电流的自动化工作基本完成

View File

@ -6,8 +6,8 @@ VSVersionInfo(
ffi=FixedFileInfo(
# filevers and prodvers should be always a tuple with four items: (1, 2, 3, 4)
# Set not needed items to zero 0.
filevers=(0, 1, 7, 0),
prodvers=(0, 1, 7, 0),
filevers=(0, 1, 8, 0),
prodvers=(0, 1, 8, 0),
# Contains a bitmask that specifies the valid bits 'flags'r
mask=0x3f,
# Contains a bitmask that specifies the Boolean attributes of the file.
@ -31,12 +31,12 @@ VSVersionInfo(
'040904b0',
[StringStruct('CompanyName', 'Rokae - https://www.rokae.com/'),
StringStruct('FileDescription', 'All in one automatic toolbox'),
StringStruct('FileVersion', '0.1.7.0 (2024-06-26)'),
StringStruct('FileVersion', '0.1.8.0 (2024-07-04)'),
StringStruct('InternalName', 'AIO.exe'),
StringStruct('LegalCopyright', '© 2024-2024 Manford Fan'),
StringStruct('OriginalFilename', 'AIO.exe'),
StringStruct('ProductName', 'AIO'),
StringStruct('ProductVersion', '0.1.7.0 (2024-06-26)')])
StringStruct('ProductVersion', '0.1.8.0 (2024-07-04)')])
]),
VarFileInfo([VarStruct('Translation', [1033, 1200])])
]

View File

@ -1 +1 @@
0.1.7.0 @ 06/26/2024
0.1.8.0 @ 07/04/2024

View File

@ -71,7 +71,7 @@ class App(customtkinter.CTk):
btns_func['log']['btn'].configure(command=lambda: self.thread_it(self.func_log_callback))
btns_func['end']['btn'].configure(command=lambda: self.thread_it(self.func_end_callback))
# create version info
self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.1.7.0\nDate: 06/26/2024", font=self.my_font, text_color="#4F4F4F")
self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.1.8.0\nDate: 07/04/2024", font=self.my_font, text_color="#4F4F4F")
self.frame_func.rowconfigure(6, weight=1)
self.label_version.grid(row=6, column=0, padx=20, pady=20, sticky='s')
# =====================================================================

View File

@ -24,7 +24,7 @@ def traversal_files(path, w2t):
return dirs, files
def check_files(path, data_dirs, data_files, w2t):
def check_files(path, loadsel, data_dirs, data_files, w2t):
if len(data_dirs) != 0 or len(data_files) != 3:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下三个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
w2t(' 1. configs.xlsx\n 2. T_电机电流.xlsx\n 3. xxxx.zip', 0, 1, 'red', tab_name='Automatic Test')
@ -44,12 +44,13 @@ def check_files(path, data_dirs, data_files, w2t):
if config_file and current_file and prj_file:
w2t("数据目录合规性检查结束,未发现问题......", tab_name='Automatic Test')
if loadsel == 'tool100':
os.mkdir(f"{path}\\single")
os.mkdir(f"{path}\\s_1")
os.mkdir(f"{path}\\s_2")
os.mkdir(f"{path}\\s_3")
elif loadsel == 'inertia':
os.mkdir(f"{path}\\inertia")
os.mkdir(f"{path}\\hold")
return config_file, current_file, prj_file
else:
w2t('初始路径下不允许有文件夹,且初始路径下只能存在如下三个文件,确认后重新运行!', 0, 0, 'red', tab_name='Automatic Test')
@ -61,8 +62,6 @@ def prj_to_xcore(prj_file):
ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect('192.168.0.160', 22, username='luoshi', password='luoshi2019')
sftp = ssh.open_sftp()
# stdin, stdout, stderr = ssh.exec_command('rm /tmp/target.zip')
# ssh.exec_command('rm /tmp/target.zip')
sftp.put(prj_file, '/tmp/target.zip')
cmd = 'cd /tmp; rm -rf target/; mkdir target; unzip -d target/ -q target.zip; '
cmd += 'chmod 777 -R target/; rm target.zip'
@ -110,7 +109,7 @@ def execution(cmd, hr, w2t, **kwargs):
return _response
def data_proc(path, filename, channel):
def data_proc_regular(path, filename, channel, scenario_time):
if channel in list(range(6)):
with open(filename, 'r', encoding='utf-8') as f_obj:
lines = f_obj.readlines()
@ -175,69 +174,127 @@ def data_proc(path, filename, channel):
df_01 = pandas.DataFrame.from_dict(_d2d_vel_0)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_0)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j1_s_1.data'
_filename = f'{path}\\s_{channel-5}\\j1_s_{channel-5}_{scenario_time}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = pandas.DataFrame.from_dict(_d2d_vel_1)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_1)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j2_s_1.data'
_filename = f'{path}\\s_{channel-5}\\j2_s_{channel-5}_{scenario_time}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = pandas.DataFrame.from_dict(_d2d_vel_2)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_2)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j3_s_1.data'
_filename = f'{path}\\s_{channel-5}\\j3_s_{channel-5}_{scenario_time}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = pandas.DataFrame.from_dict(_d2d_vel_3)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_3)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j4_s_1.data'
_filename = f'{path}\\s_{channel-5}\\j4_s_{channel-5}_{scenario_time}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = pandas.DataFrame.from_dict(_d2d_vel_4)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_4)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j5_s_1.data'
_filename = f'{path}\\s_{channel-5}\\j5_s_{channel-5}_{scenario_time}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = pandas.DataFrame.from_dict(_d2d_vel_5)
df_02 = pandas.DataFrame.from_dict(_d2d_trq_5)
df = pandas.concat([df_01, df_02], axis=1)
_filename = f'{path}\\s_{channel-5}\\j6_s_1.data'
_filename = f'{path}\\s_{channel-5}\\j6_s_{channel-5}_{scenario_time}.data'
df.to_csv(_filename, sep='\t', index=False)
elif channel in list(range(9, 15)):
with open(filename, 'r', encoding='utf-8') as f_obj:
lines = f_obj.readlines()
_d2d_vel = {'hw_joint_vel_feedback': []}
_d2d_trq = {'device_servo_trq_feedback': []}
for line in lines:
data = eval(line.strip())['data']
for item in data:
if item.get('channel', None) == channel-9 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == channel-9 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq['device_servo_trq_feedback'].extend(item['value'])
df1 = pandas.DataFrame.from_dict(_d2d_vel)
df2 = pandas.DataFrame.from_dict(_d2d_trq)
df = pandas.concat([df1, df2], axis=1)
_filename = f'{path}\\single\\j{channel-8}_hold.data'
df.to_csv(_filename, sep='\t', index=False)
def gen_result_file(path, loadsel, disc, number):
def data_proc_inertia(path, filename, channel):
with open(filename, 'r', encoding='utf-8') as f_obj:
lines = f_obj.readlines()
_d2d_vel = {'hw_joint_vel_feedback': []}
_d2d_trq = {'device_servo_trq_feedback': []}
for line in lines:
data = eval(line.strip())['data']
for item in data:
if item.get('channel', None) == channel+3 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == channel+3 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq['device_servo_trq_feedback'].extend(item['value'])
df1 = pandas.DataFrame.from_dict(_d2d_vel)
df2 = pandas.DataFrame.from_dict(_d2d_trq)
df = pandas.concat([df1, df2], axis=1)
_filename = f'{path}\\inertia\\j{channel+4}_inertia.data'
df.to_csv(_filename, sep='\t', index=False)
def gen_result_file(path, loadsel, disc, number, scenario_time):
filename = path + f'\\data.txt'
with open(filename, 'w', encoding='utf-8') as f_obj:
for line in disc[number][1]:
f_obj.write(str(line)+'\n')
if loadsel == 'tool100':
data_proc(path, filename, number)
data_proc_regular(path, filename, number, scenario_time)
elif loadsel == 'inertia':
data_proc(path, filename, 'all')
data_proc_inertia(path, filename, number)
def regular_load(path, hr, md, loadsel, w2t):
conditions = [
" scenario(0, j1_p, j1_n, p_speed, p_tool)",
" scenario(0, j2_p, j2_n, p_speed, p_tool)",
" scenario(0, j3_p, j3_n, p_speed, p_tool)",
" scenario(0, j4_p, j4_n, p_speed, p_tool)",
" scenario(0, j5_p, j5_n, p_speed, p_tool)",
" scenario(0, j6_p, j6_n, p_speed, p_tool)",
" scenario(1, j5_p, j5_n, p_speed, p_tool)",
" scenario(2, j5_p, j5_n, p_speed, p_tool)",
" scenario(3, j5_p, j5_n, p_speed, p_tool)",
def run_rl(path, hr, md, loadsel, w2t):
c_regular = [
"scenario(0, j1_p, j1_n, p_speed, p_tool, i_tool)",
"scenario(0, j2_p, j2_n, p_speed, p_tool, i_tool)",
"scenario(0, j3_p, j3_n, p_speed, p_tool, i_tool)",
"scenario(0, j4_p, j4_n, p_speed, p_tool, i_tool)",
"scenario(0, j5_p, j5_n, p_speed, p_tool, i_tool)",
"scenario(0, j6_p, j6_n, p_speed, p_tool, i_tool)",
"scenario(1, j6_p, j6_n, p_speed, p_tool, i_tool)",
"scenario(2, j6_p, j6_n, p_speed, p_tool, i_tool)",
"scenario(3, j6_p, j6_n, p_speed, p_tool, i_tool)",
"scenario(4, j1_hold, j1_hold, p_speed, p_tool, i_tool)",
"scenario(4, j2_hold, j2_hold, p_speed, p_tool, i_tool)",
"scenario(4, j3_hold, j3_hold, p_speed, p_tool, i_tool)",
"scenario(4, j4_hold, j4_hold, p_speed, p_tool, i_tool)",
"scenario(4, j5_hold, j5_hold, p_speed, p_tool, i_tool)",
"scenario(4, j6_hold, j6_hold, p_speed, p_tool, i_tool)",
]
for condition in conditions:
disc = {
c_inertia = [
"scenario(5, j4_p_inertia, j4_n_inertia, p_speed, p_tool, i_tool)",
"scenario(5, j5_p_inertia, j5_n_inertia, p_speed, p_tool, i_tool)",
"scenario(5, j6_p_inertia, j6_n_inertia, p_speed, p_tool, i_tool)",
]
disc_regular = {
0: ['一轴', []], 1: ['二轴', []], 2: ['三轴', []], 3: ['四轴', []], 4: ['五轴', []], 5: ['六轴', []],
6: ['场景一', []], 7: ['场景二', []], 8: ['场景三', []]
6: ['场景一', []], 7: ['场景二', []], 8: ['场景三', []], 9: ['一轴保持', []], 10: ['二轴保持', []],
11: ['三轴保持', []], 12: ['四轴保持', []], 13: ['五轴保持', []], 14: ['六轴保持', []]
}
disc_inertia = {0: ['四轴惯量', []], 1: ['五轴惯量', []], 2: ['六轴惯量', []]}
if loadsel == 'tool100':
conditions = c_regular
disc = disc_regular
elif loadsel == 'inertia':
conditions = c_inertia
disc = disc_inertia
for condition in conditions:
number = conditions.index(condition)
w2t(f"正在执行{disc[number][0]}测试......", 0, 0, 'purple', 'Automatic Test')
@ -283,6 +340,7 @@ def regular_load(path, hr, md, loadsel, w2t):
w2t("未收到机器人的运行信号需要确认RL程序编写正确并正常执行...", 0, 111, 'red', 'Automatic Test')
# 5. 打开诊断曲线,并执行采集
sleep(7) # 保证程序已经运行起来,其实主要是为了保持电流的采集而设定
_response = execution('diagnosis.open', hr, w2t, open=True, display_open=True)
display_pdo_params = [
{"name": "hw_joint_vel_feedback", "channel": 0},
@ -300,7 +358,8 @@ def regular_load(path, hr, md, loadsel, w2t):
{"name": "device_safety_estop", "channel": 0},
]
_response = execution('diagnosis.set_params', hr, w2t, display_pdo_params=display_pdo_params)
if number < 6:
scenario_time = 0
if number < 6 or number > 8:
sleep(15)
else:
_t_start = time()
@ -315,14 +374,15 @@ def regular_load(path, hr, md, loadsel, w2t):
else:
sleep(5)
sleep(1) # 一定要延迟一秒再读一次scenario time寄存器因为一开始读取的数值不准确
sleep(float(md.read_scenario_time())*0.2) # 再运行周期的20%即可
scenario_time = md.read_scenario_time()
sleep(float(scenario_time)*0.2) # 再运行周期的20%即可
# 6. 关闭诊断曲线,停止程序运行,下电并且换成手动模式
_response = execution('diagnosis.open', hr, w2t, open=False, display_open=False)
_response = execution('rl_task.stop', hr, w2t, tasks=['current'])
_response = execution('state.switch_motor_off', hr, w2t)
_response = execution('state.switch_manual', hr, w2t)
sleep(2) # 保证所有数据均已返回
sleep(1) # 保证所有数据均已返回
# 7. 保留数据并处理输出
for _msg in hr.c_msg:
if 'diagnosis.result' in _msg:
@ -333,27 +393,19 @@ def regular_load(path, hr, md, loadsel, w2t):
if 'diagnosis.result' in _msg:
_index = hr.c_msg.index(_msg)
break
del hr.c_msg[_index-10:]
del hr.c_msg[_index:]
hr.c_msg_xs.clear()
gen_result_file(path, loadsel, disc, number)
gen_result_file(path, loadsel, disc, number, scenario_time)
else:
w2t("满载情况下,单轴和场景电机电流采集完毕。", 0, 0, 'green', 'Automatic Test')
def inertia_load(path, hr, md, loadsel, w2t):
pass
def run_rl(path, hr, md, loadsel, w2t):
if loadsel == 'tool100':
regular_load(path, hr, md, loadsel, w2t)
w2t("单轴和场景电机电流采集完毕,如需采集惯量负载,须切换负载类型,并更换惯量负载,重新执行。", 0, 0, 'green', 'Automatic Test')
elif loadsel == 'inertia':
inertia_load(path, hr, md, loadsel, w2t)
w2t("惯量负载电机电流采集完毕,如需采集单轴/场景/保持电机电流,须切换负载类型,并更换偏置负载,重新执行。", 0, 0, 'green', 'Automatic Test')
def main(path, hr, md, loadsel, w2t):
data_dirs, data_files = traversal_files(path, w2t)
config_file, current_file, prj_file = check_files(path, data_dirs, data_files, w2t)
config_file, current_file, prj_file = check_files(path, loadsel, data_dirs, data_files, w2t)
prj_to_xcore(prj_file)
run_rl(path, hr, md, loadsel, w2t)