This repository has been archived on 2025-03-27. You can view files and clone it, but cannot push or open issues or pull requests.

435 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import threading
import openpyxl
import pandas
import re
import csv
from common import clibs
def initialization(path, w2t, insert_logdb):
_, data_files = clibs.traversal_files(path, w2t)
count, config_file = 0, None
for data_file in data_files:
filename = data_file.split("/")[-1]
if re.match(".*\\.cfg", filename):
config_file = filename
count += 1
elif filename == "T_电机电流.xlsx":
count += 1
else:
if not re.match("j[1-7].*\\.data", filename):
msg = f"不合规 {data_file}\n"
msg += "所有数据文件必须以 j[1-7]_ 开头,以 .data 结尾比如j1_abcdef.data请检查整改后重新运行\n"
w2t(msg, "red", "FilenameIllegal")
if count != 2:
msg = "需要有一个机型配置文件\"*.cfg\",以及一个数据处理文件\"T_电机电流.xlsx\"表格,请检查整改后重新运行\n"
w2t(msg, "red", "FilenameIllegal")
insert_logdb("INFO", "current", f"current: 获取必要文件:{data_files}")
return data_files, config_file
def current_max(data_files, rcs, trq, w2t, insert_logdb):
insert_logdb("INFO", "current", "MAX: 正在处理最大电流值逻辑...")
current = {1: [], 2: [], 3: [], 4: [], 5: [], 6: []}
for data_file in data_files:
if data_file.endswith(".data"):
df = pandas.read_csv(data_file, sep="\t")
else:
continue
insert_logdb("INFO", "current", f"MAX: 正在处理 {data_file}")
cols = len(df.columns)
axis = int(data_file.split("/")[-1].split("_")[0].removeprefix("j"))
rca = rcs[axis-1]
insert_logdb("INFO", "current", f"MAX: 最大列数为 {cols}{axis} 轴的额定电流为 {rca}")
col = df.columns.values[trq-1] # 获取 "device_servo_trq_feedback"
c_max = df[col].abs().max()
scale = 1000
_ = abs(c_max/scale*rca)
current[axis].append(_)
w2t(f"{data_file}: {_:.4f}\n")
insert_logdb("INFO", "current", f"MAX: 获取到的列名为 {col},最大电流为 {_}")
with open(data_file, "a+") as f_data:
csv_writer = csv.writer(f_data, delimiter="\t")
csv_writer.writerow([""] * (cols-1) + [_])
for axis, cur in current.items():
if not cur:
continue
else:
w2t(f"{axis}轴数据:")
for value in cur:
w2t(f"{value:.4f} ")
w2t("\n")
w2t("\n【MAX】数据处理完毕......")
insert_logdb("INFO", "current", f"MAX: 获取最大电流值结束 current_max = {current}")
return current
def current_avg(data_files, rcs, trqh, w2t, insert_logdb):
insert_logdb("INFO", "current", "AVG: 正在处理平均电流值逻辑...")
current = {1: [], 2: [], 3: [], 4: [], 5: [], 6: []}
for data_file in data_files:
if data_file.endswith(".data"):
df = pandas.read_csv(data_file, sep="\t")
else:
continue
insert_logdb("INFO", "current", f"AVG: 正在处理 {data_file}")
cols = len(df.columns)
axis = int(data_file.split("/")[-1].split("_")[0].removeprefix("j"))
rca = rcs[axis-1]
insert_logdb("INFO", "current", f"AVG: 最大列数为 {cols}{axis} 轴的额定电流为 {rca}")
col = df.columns.values[trqh-1]
c_std = df[col].std()
c_avg = df[col].mean()
scale = 1000
_ = (abs(c_avg)+c_std*3)/scale*rca
current[axis].append(_)
w2t(f"{data_file}: {_:.4f}\n")
insert_logdb("INFO", "current", f"AVG: 获取到的列名为 {col},平均电流为 {_}")
with open(data_file, "a+") as f_data:
csv_writer = csv.writer(f_data, delimiter="\t")
csv_writer.writerow([""] * (cols-1) + [_])
for axis, cur in current.items():
if not cur:
continue
else:
w2t(f"{axis}轴数据:")
for value in cur:
w2t(f"{value:.4f} ")
w2t("\n")
w2t("\n【AVG】数据处理完毕......\n")
insert_logdb("INFO", "current", f"AVG: 获取平均电流值结束 current_avg = {current}")
return current
def current_cycle(data_files, vel, trq, trqh, sensor, rrs, rcs, params, w2t, insert_logdb):
result, hold, single, scenario, dur_time = None, [], [], [], 0
for data_file in data_files:
filename = data_file.split("/")[-1]
if filename == "T_电机电流.xlsx":
result = data_file
elif re.match("j[1-7]_hold_.*\\.data", filename):
hold.append(data_file)
elif re.match("j[1-7]_s_.*\\.data", filename):
scenario.append(data_file)
dur_time = float(filename.split("_")[3])
elif re.match("j[1-7]_.*\\.data", filename):
single.append(data_file)
clibs.stop = True
w2t(f"正在打开文件 {result},需要 10s 左右......\n")
t_excel = clibs.GetThreadResult(openpyxl.load_workbook, args=(result, ))
t_excel.daemon = True
t_excel.start()
t_progress = threading.Thread(target=clibs.tl_prg, args=("Processing......", ))
t_progress.daemon = True
t_progress.start()
wb = t_excel.get_result()
ws = wb["统计"]
for idx in range(len(params)):
row = idx + 2
for col in range(2, 8):
ws.cell(row=row, column=col).value = params[idx][col-2]
if hold:
avg = current_avg(hold, rcs, trqh, w2t, insert_logdb)
for axis, cur_value in avg.items():
sht_name = f"J{axis}"
wb[sht_name]["O4"].value = float(cur_value[0])
if dur_time == 0:
p_single(wb, single, vel, trq, sensor, rrs, w2t, insert_logdb)
else:
p_scenario(wb, scenario, vel, trq, sensor, rrs, dur_time, w2t)
clibs.stop = True
w2t(f"正在保存文件 {result},需要 10s 左右......\n")
t_excel = threading.Thread(target=wb.save, args=(result, ))
t_excel.daemon = True
t_excel.start()
t_excel.join()
clibs.stop = False
t_progress.join()
w2t("----------------------------------------------------------\n")
w2t("全部处理完毕")
def find_point(data_file, df, flag, row_s, row_e, threshold, step, end_point, skip_scale, axis, seq, w2t, insert_logdb):
if flag == "lt":
while row_e > end_point:
speed_avg = df.iloc[row_s:row_e].abs().mean()
if speed_avg < threshold:
row_e -= step
row_s -= step
continue
else:
# one more time如果连续两次 200 个点的平均值都大于 threshold说明已经到了临界点了其实也不一定只不过相对遇到一次就判定临界点更安全一点点
# 从实际数据看,这开逻辑很小概率能触发到
speed_avg = df.iloc[row_s-end_point*skip_scale:row_e-end_point*skip_scale].abs().mean()
if speed_avg < threshold:
insert_logdb("WARNING", "current", f"【lt】{axis} 轴第 {seq} 次查找数据可能有异常row_s = {row_s}, row_e = {row_e}")
return row_s, row_e
else:
w2t(f"{data_file} 数据有误,需要检查,无法找到第 {seq} 个有效点...", "red", "AnchorNotFound")
elif flag == "gt":
while row_e > end_point:
speed_avg = df.iloc[row_s:row_e].abs().mean()
# if axis == 1 and seq == 1:
# insert_logdb("DEBUG", "current", f"【gt】{axis} 轴speed_avg = {speed_avg}row_s = {row_s}, row_e = {row_e}")
if speed_avg > threshold:
row_e -= step
row_s -= step
continue
else:
# one more time如果连续两次 200 个点的平均值都小于 threshold说明已经到了临界点了其实也不一定只不过相对遇到一次就判定临界点更安全一点点
# 从实际数据看,这开逻辑很小概率能触发到
speed_avg = df.iloc[row_s-end_point*skip_scale:row_e-end_point*skip_scale].abs().mean()
if speed_avg > threshold:
insert_logdb("WARNING", "current", f"【gt】{axis} 轴第 {seq} 次查找数据可能有异常row_s = {row_s}, row_e = {row_e}")
return row_s, row_e
else:
w2t(f"{data_file} 数据有误,需要检查,无法找到第 {seq} 个有效点...", "red", "AnchorNotFound")
def get_row_number(threshold, flag, df, row_s, row_e, axis, insert_logdb):
count_1, count_2 = 0, 0
if flag == "start" or flag == "end":
for number in df.iloc[row_s:row_e].abs():
count_2 += 1
if number > threshold:
count_1 += 1
if count_1 == 10:
return row_s + count_2 - 10
else:
count_1 = 0
elif flag == "middle":
for number in df.iloc[row_s:row_e].abs():
count_2 += 1
if number < threshold: # 唯一的区别
count_1 += 1
if count_1 == 10:
return row_s + count_2 - 10
else:
count_1 = 0
places = {"start": "起点", "middle": "中间点", "end": "终点"} # 因为是终点数据,所以可能有异常
insert_logdb("WARNING", "current", f"{axis} 轴获取{places[flag]}数据 {row_e} 可能有异常,需关注!")
return row_e
def p_single(wb, single, vel, trq, sensor, rrs, w2t, insert_logdb):
# 1. 先找到第一个速度为零的点,数据从后往前找,一开始就是零的情况不予考虑
# 2. 记录第一个点的位置,继续向前查找第二个速度为零的点,同理,一开始为零的点不予考虑
# 3. 记录第二个点的位置,并将其中的数据拷贝至对应位置
for data_file in single:
axis = int(data_file.split("/")[-1].split("_")[0].removeprefix("j"))
sht_name = f"J{axis}"
ws = wb[sht_name]
pandas.set_option("display.precision", 2)
df_origin = pandas.read_csv(data_file, sep="\t")
rr = rrs[axis-1]
addition = 180 / 3.1415926 * 60 / 360 * rr
col_names = list(df_origin.columns)
df = df_origin[col_names[vel-1]].multiply(addition)
step = 50 # 步进值
end_point = 200 # 有效数值的数目
threshold = 5 # 200个点的平均阈值线
skip_scale = 2
row_start, row_middle, row_end = 0, 0, 0
row_e = df.index[-1]
row_s = row_e - end_point
speed_avg = df.iloc[row_s:row_e].abs().mean()
if speed_avg < 2:
# 第一次过滤:消除速度为零的数据,找到速度即将大于零的上升临界点
row_s, row_e = find_point(data_file, df, "lt", row_s, row_e, threshold, step, end_point, skip_scale, axis, "pre-1", w2t, insert_logdb)
row_e -= end_point*skip_scale
row_s -= end_point*skip_scale
# 第二次过滤:消除速度大于零的数据,找到速度即将趋近于零的下降临界点
row_s, row_e = find_point(data_file, df, "gt", row_s, row_e, threshold, step, end_point, skip_scale, axis, "pre-2", w2t, insert_logdb)
row_e -= end_point*skip_scale
row_s -= end_point*skip_scale
# 第三次过滤:消除速度为零的数据,找到速度即将大于零的上升临界点
row_s, row_e = find_point(data_file, df, "lt", row_s, row_e, threshold, step, end_point, skip_scale, axis, "pre-3", w2t, insert_logdb)
row_e -= end_point*skip_scale
row_s -= end_point*skip_scale
# 正式第一次采集:消除速度大于零的数据,找到速度即将趋近于零的下降临界点
row_s, row_e = find_point(data_file, df, "gt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 1, w2t, insert_logdb)
row_end = get_row_number(threshold, "end", df, row_s, row_e, axis, insert_logdb)
row_e -= end_point*skip_scale
row_s -= end_point*skip_scale
# 正式第二次采集:消除速度为零的数据,找到速度即将大于零的上升临界点
row_s, row_e = find_point(data_file, df, "lt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 2, w2t, insert_logdb)
row_middle = get_row_number(threshold, "middle", df, row_s, row_e, axis, insert_logdb)
row_e -= end_point*skip_scale
row_s -= end_point*skip_scale
# 正式第三次采集:消除速度大于零的数据,找到速度即将趋近于零的下降临界点
row_s, row_e = find_point(data_file, df, "gt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 3, w2t, insert_logdb)
row_start = get_row_number(threshold, "start", df, row_s, row_e, axis, insert_logdb)
elif speed_avg > 2:
# 第一次过滤:消除速度大于零的数据,找到速度即将趋近于零的下降临界点
row_s, row_e = find_point(data_file, df, "gt", row_s, row_e, threshold, step, end_point, skip_scale, axis, "pre-1", w2t, insert_logdb)
row_e -= end_point*skip_scale
row_s -= end_point*skip_scale
# 第二次过滤:消除速度为零的数据,找到速度即将大于零的上升临界点
row_s, row_e = find_point(data_file, df, "lt", row_s, row_e, threshold, step, end_point, skip_scale, axis, "pre-2", w2t, insert_logdb)
row_e -= end_point*skip_scale
row_s -= end_point*skip_scale
# 第一次正式采集:消除速度大于零的数据,找到速度即将趋近于零的下降临界点
row_s, row_e = find_point(data_file, df, "gt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 1, w2t, insert_logdb)
row_end = get_row_number(threshold, "end", df, row_s, row_e, axis, insert_logdb)
row_e -= end_point*skip_scale
row_s -= end_point*skip_scale
# 第二次正式采集:消除速度为零的数据,找到速度即将大于零的上升临界点
row_s, row_e = find_point(data_file, df, "lt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 2, w2t, insert_logdb)
row_middle = get_row_number(threshold, "middle", df, row_s, row_e, axis, insert_logdb)
row_e -= end_point*skip_scale
row_s -= end_point*skip_scale
# 第三次正式采集:消除速度大于零的数据,找到速度即将趋近于零的下降临界点
row_s, row_e = find_point(data_file, df, "gt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 3, w2t, insert_logdb)
row_start = get_row_number(threshold, "start", df, row_s, row_e, axis, insert_logdb)
insert_logdb("INFO", "current", f"{axis} 轴起点:{row_start}")
insert_logdb("INFO", "current", f"{axis} 轴中间点:{row_middle}")
insert_logdb("INFO", "current", f"{axis} 轴终点:{row_end}")
insert_logdb("INFO", "current", f"{axis} 轴数据非零段点数:{row_middle-row_start+1}")
insert_logdb("INFO", "current", f"{axis} 轴数据为零段点数:{row_end-row_middle+1}")
if abs(row_end+row_start-2*row_middle) > 1000:
insert_logdb("WARNING", "current", f"{axis} 轴数据占空比异常!")
data, first_c, second_c, third_c = [], vel-1, trq-1, sensor-1
for row in range(row_start, row_end+1):
data.append(df_origin.iloc[row, first_c])
data.append(df_origin.iloc[row, second_c])
data.append(df_origin.iloc[row, third_c])
i = 0
for row in ws.iter_rows(min_row=2, min_col=2, max_row=150000, max_col=4):
for cell in row:
try:
if i % 3 == 0:
ws.cell((i//3)+2, 1).value = float(((i//3)+1)/1000)
_ = f"{data[i]:.2f}"
cell.value = float(_)
i += 1
except Exception:
if i % 3 == 0:
ws.cell((i//3)+2, 1).value = None
cell.value = None
i += 1
def p_scenario(wb, scenario, vel, trq, sensor, rrs, dur_time, w2t):
for data_file in scenario:
cycle = 0.001
axis = int(data_file.split("/")[-1].split("_")[0].removeprefix("j"))
sht_name = f"J{axis}"
ws = wb[sht_name]
pandas.set_option("display.precision", 2)
df_origin = pandas.read_csv(data_file, sep="\t")
rr = rrs[axis-1]
addition = 180 / 3.1415926 * 60 / 360 * rr
col_names = list(df_origin.columns)
df = df_origin[col_names[vel-1]].multiply(addition)
row_start = 3000
row_end = row_start + int(dur_time/cycle)
if row_end > df.index[-1]:
w2t(f"位置超限:{data_file} 共有 {df.index[-1]} 条数据,无法取到第 {row_end} 条数据,需要确认场景周期时间...", "red", "DataOverLimit")
data, first_c, second_c, third_c = [], vel-1, trq-1, sensor-1
for row in range(row_start, row_end+1):
data.append(df_origin.iloc[row, first_c])
data.append(df_origin.iloc[row, second_c])
data.append(df_origin.iloc[row, third_c])
i = 0
for row in ws.iter_rows(min_row=2, min_col=2, max_row=250000, max_col=4):
for cell in row:
try:
if i % 3 == 0:
ws.cell((i//3)+2, 1).value = float(((i//3)+1)/1000)
_ = f"{data[i]:.2f}"
cell.value = float(_)
i += 1
except Exception:
cell.value = None
if i % 3 == 0:
ws.cell((i//3)+2, 1).value = None
i += 1
def get_configs(config_file, w2t, insert_logdb):
try:
with open(config_file, mode="r", encoding="utf-8") as f_config:
configs = json.load(f_config)
except Exception as Err:
insert_logdb("ERROR", "current", f"get_config: 无法打开 {config_file},获取配置文件参数错误 {Err}")
w2t(f"无法打开 {config_file}", color="red", desc="OpenFileError")
# 最大角速度,额定电流,减速比,额定转速
version = configs["VERSION"]
rcs = [abs(_) for _ in configs["MOTOR"]["RATED_TORQUE"]] # 电机额定电流rc for rated current
m_max_rcs = [] # 电机最大电流
m_hold_rcs = [] # 电机堵转电流
m_rts = [] # 电机额定转矩rt for rated torque
m_max_rts = [] # 电机峰值转矩
m_r_rpms = [] # 电机额定转速
m_max_rpms = [] # 电机最大转速
m_tcs = [] # 电机转矩常数tc for torque constant
rrs = [abs(_) for _ in configs["TRANSMISSION"]["REDUCTION_RATIO_NUMERATOR"]] # 减速比rr for reduction ratio
r_max_sst = [] # 减速器最大启停转矩sst for start and stop torque
r_max_t = [] # 减速器瞬时最大转矩
sc = [] # 采样周期sc for sample cycle
r_rts = [] # 减速器额定转矩
r_r_rpms = [] # 减速器额定转速
r_life_cycle = [] # 减速器L10寿命
r_avg_t = [] # 减速器平均负载转矩允许最大值
insert_logdb("INFO", "current", f"get_configs: 机型文件版本 {config_file}_{version}")
insert_logdb("INFO", "current", f"get_configs: 减速比 {rrs}")
insert_logdb("INFO", "current", f"get_configs: 额定电流 {rcs}")
return rcs, m_max_rcs, m_hold_rcs, m_rts, m_max_rts, m_r_rpms, m_max_rpms, m_tcs, rrs, r_max_sst, r_max_t, sc, r_rts, r_r_rpms, r_life_cycle, r_avg_t
def main():
sub = clibs.data_dp["_sub"]
path = clibs.data_dp["_path"]
vel = int(clibs.data_dp["_vel"])
trq = int(clibs.data_dp["_trq"])
trqh = int(clibs.data_dp["_trqh"])
sensor = int(clibs.data_dp["_sensor"])
w2t = clibs.w2t
insert_logdb = clibs.insert_logdb
insert_logdb("INFO", "current", "current: 参数初始化成功")
data_files, config_file = initialization(path, w2t, insert_logdb)
params = get_configs(f"{path}/{config_file}", w2t, insert_logdb)
rcs, rrs = params[0], params[8]
if sub == "max":
current_max(data_files, rcs, trq, w2t, insert_logdb)
elif sub == "avg":
current_avg(data_files, rcs, trqh, w2t, insert_logdb)
elif sub == "cycle":
current_cycle(data_files, vel, trq, trqh, sensor, rrs, rcs, params, w2t, insert_logdb)
if __name__ == '__main__':
main()