1. [current: do_current.py] 增加了 hw_sensor_trq_feedback 曲线的采集

2. [current: current.py] 增加了 hw_sensor_trq_feedback 曲线数据的处理,以及修改了之前数据处理的逻辑
3. [current: clibs.py] 新增可手动修改连接 IP 地址的功能,存储在 assets/templates/ipaddr.txt 中,默认是 192.168.0.160
This commit is contained in:
2024-12-05 16:14:59 +08:00
parent 5c5168442f
commit 4d297118e0
10 changed files with 83 additions and 27 deletions

View File

@ -21,6 +21,12 @@ display_pdo_params = [
{"name": "device_servo_trq_feedback", "channel": 3},
{"name": "device_servo_trq_feedback", "channel": 4},
{"name": "device_servo_trq_feedback", "channel": 5},
{"name": "hw_sensor_trq_feedback", "channel": 0},
{"name": "hw_sensor_trq_feedback", "channel": 1},
{"name": "hw_sensor_trq_feedback", "channel": 2},
{"name": "hw_sensor_trq_feedback", "channel": 3},
{"name": "hw_sensor_trq_feedback", "channel": 4},
{"name": "hw_sensor_trq_feedback", "channel": 5},
]
@ -63,6 +69,7 @@ def data_proc_regular(path, filename, channel, scenario_time):
lines = f_obj.readlines()
_d2d_vel = {'hw_joint_vel_feedback': []}
_d2d_trq = {'device_servo_trq_feedback': []}
_d2d_sensor = {'hw_sensor_trq_feedback': []}
for line in lines[-500:]: # 保留最后25s的数据
data = eval(line.strip())['data']
for item in data:
@ -74,10 +81,13 @@ def data_proc_regular(path, filename, channel, scenario_time):
_d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == channel and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == channel and item.get('name', None) == 'hw_sensor_trq_feedback':
_d2d_sensor['hw_sensor_trq_feedback'].extend(item['value'])
df1 = DataFrame.from_dict(_d2d_vel)
df2 = DataFrame.from_dict(_d2d_trq)
df = concat([df1, df2], axis=1)
df3 = DataFrame.from_dict(_d2d_sensor)
df = concat([df1, df2, df3], axis=1)
_filename = f'{path}\\single\\j{channel+1}_single_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
elif channel in list(range(6, 9)):
@ -85,16 +95,22 @@ def data_proc_regular(path, filename, channel, scenario_time):
lines = f_obj.readlines()
_d2d_vel_0 = {'hw_joint_vel_feedback': []}
_d2d_trq_0 = {'device_servo_trq_feedback': []}
_d2d_sensor_0 = {'hw_sensor_trq_feedback': []}
_d2d_vel_1 = {'hw_joint_vel_feedback': []}
_d2d_trq_1 = {'device_servo_trq_feedback': []}
_d2d_sensor_1 = {'hw_sensor_trq_feedback': []}
_d2d_vel_2 = {'hw_joint_vel_feedback': []}
_d2d_trq_2 = {'device_servo_trq_feedback': []}
_d2d_sensor_2 = {'hw_sensor_trq_feedback': []}
_d2d_vel_3 = {'hw_joint_vel_feedback': []}
_d2d_trq_3 = {'device_servo_trq_feedback': []}
_d2d_sensor_3 = {'hw_sensor_trq_feedback': []}
_d2d_vel_4 = {'hw_joint_vel_feedback': []}
_d2d_trq_4 = {'device_servo_trq_feedback': []}
_d2d_sensor_4 = {'hw_sensor_trq_feedback': []}
_d2d_vel_5 = {'hw_joint_vel_feedback': []}
_d2d_trq_5 = {'device_servo_trq_feedback': []}
_d2d_sensor_5 = {'hw_sensor_trq_feedback': []}
for line in lines:
data = eval(line.strip())['data']
for item in data:
@ -106,60 +122,78 @@ def data_proc_regular(path, filename, channel, scenario_time):
_d2d_vel_0['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 0 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_0['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 0 and item.get('name', None) == 'hw_sensor_trq_feedback':
_d2d_sensor_0['hw_sensor_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 1 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_1['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 1 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_1['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 1 and item.get('name', None) == 'hw_sensor_trq_feedback':
_d2d_sensor_1['hw_sensor_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 2 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_2['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 2 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_2['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 3 and item.get('name', None) == 'hw_sensor_trq_feedback':
_d2d_sensor_2['hw_sensor_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 3 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_3['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 3 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_3['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 3 and item.get('name', None) == 'hw_sensor_trq_feedback':
_d2d_sensor_3['hw_sensor_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 4 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_4['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 4 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_4['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 4 and item.get('name', None) == 'hw_sensor_trq_feedback':
_d2d_sensor_4['hw_sensor_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 5 and item.get('name', None) == 'hw_joint_vel_feedback':
_d2d_vel_5['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == 5 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq_5['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == 5 and item.get('name', None) == 'hw_sensor_trq_feedback':
_d2d_sensor_5['hw_sensor_trq_feedback'].extend(item['value'])
df_01 = DataFrame.from_dict(_d2d_vel_0)
df_02 = DataFrame.from_dict(_d2d_trq_0)
df = concat([df_01, df_02], axis=1)
df_03 = DataFrame.from_dict(_d2d_sensor_0)
df = concat([df_01, df_02, df_03], axis=1)
_filename = f'{path}\\s_{channel-5}\\j1_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = DataFrame.from_dict(_d2d_vel_1)
df_02 = DataFrame.from_dict(_d2d_trq_1)
df = concat([df_01, df_02], axis=1)
df_03 = DataFrame.from_dict(_d2d_sensor_1)
df = concat([df_01, df_02, df_03], axis=1)
_filename = f'{path}\\s_{channel-5}\\j2_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = DataFrame.from_dict(_d2d_vel_2)
df_02 = DataFrame.from_dict(_d2d_trq_2)
df = concat([df_01, df_02], axis=1)
df_03 = DataFrame.from_dict(_d2d_sensor_2)
df = concat([df_01, df_02, df_03], axis=1)
_filename = f'{path}\\s_{channel-5}\\j3_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = DataFrame.from_dict(_d2d_vel_3)
df_02 = DataFrame.from_dict(_d2d_trq_3)
df = concat([df_01, df_02], axis=1)
df_03 = DataFrame.from_dict(_d2d_sensor_3)
df = concat([df_01, df_02, df_03], axis=1)
_filename = f'{path}\\s_{channel-5}\\j4_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = DataFrame.from_dict(_d2d_vel_4)
df_02 = DataFrame.from_dict(_d2d_trq_4)
df = concat([df_01, df_02], axis=1)
df_03 = DataFrame.from_dict(_d2d_sensor_4)
df = concat([df_01, df_02, df_03], axis=1)
_filename = f'{path}\\s_{channel-5}\\j5_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
df_01 = DataFrame.from_dict(_d2d_vel_5)
df_02 = DataFrame.from_dict(_d2d_trq_5)
df = concat([df_01, df_02], axis=1)
df_03 = DataFrame.from_dict(_d2d_sensor_5)
df = concat([df_01, df_02, df_03], axis=1)
_filename = f'{path}\\s_{channel-5}\\j6_s_{channel-5}_{scenario_time}_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
elif channel in list(range(9, 15)):
@ -167,6 +201,7 @@ def data_proc_regular(path, filename, channel, scenario_time):
lines = f_obj.readlines()
_d2d_vel = {'hw_joint_vel_feedback': []}
_d2d_trq = {'device_servo_trq_feedback': []}
_d2d_sensor = {'hw_sensor_trq_feedback': []}
for line in lines[-300:]: # 保留最后15s的数据
data = eval(line.strip())['data']
for item in data:
@ -178,10 +213,13 @@ def data_proc_regular(path, filename, channel, scenario_time):
_d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == channel-9 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == channel-9 and item.get('name', None) == 'hw_sensor_trq_feedback':
_d2d_sensor['hw_sensor_trq_feedback'].extend(item['value'])
df1 = DataFrame.from_dict(_d2d_vel)
df2 = DataFrame.from_dict(_d2d_trq)
df = concat([df1, df2], axis=1)
df3 = DataFrame.from_dict(_d2d_sensor)
df = concat([df1, df2, df3], axis=1)
_filename = f'{path}\\single\\j{channel-8}_hold_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)
@ -191,6 +229,7 @@ def data_proc_inertia(path, filename, channel):
lines = f_obj.readlines()
_d2d_vel = {'hw_joint_vel_feedback': []}
_d2d_trq = {'device_servo_trq_feedback': []}
_d2d_sensor = {'hw_sensor_trq_feedback': []}
for line in lines:
data = eval(line.strip())['data']
for item in data:
@ -202,10 +241,13 @@ def data_proc_inertia(path, filename, channel):
_d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
elif item.get('channel', None) == channel+3 and item.get('name', None) == 'device_servo_trq_feedback':
_d2d_trq['device_servo_trq_feedback'].extend(item['value'])
elif item.get('channel', None) == channel+3 and item.get('name', None) == 'hw_sensor_trq_feedback':
_d2d_trq['hw_sensor_trq_feedback'].extend(item['value'])
df1 = DataFrame.from_dict(_d2d_vel)
df2 = DataFrame.from_dict(_d2d_trq)
df = concat([df1, df2], axis=1)
df3 = DataFrame.from_dict(_d2d_sensor)
df = concat([df1, df2, df3], axis=1)
_filename = f'{path}\\inertia\\j{channel+4}_inertia_{time()}.data'
df.to_csv(_filename, sep='\t', index=False)