This repository has been archived on 2025-03-27. You can view files and clone it, but cannot push or open issues or pull requests.
2025-01-06 12:26:49 +08:00

424 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import threading
import openpyxl
import pandas
import re
import csv
from common import clibs
def initialization(path, w2t, insert_logdb):
    """Validate the layout of the data directory and return its files.

    The directory must contain exactly one "configs.xlsx", the result
    workbook "T_电机电流.xlsx", and data files named j<axis>_*.data with
    axis in 1-7.  Any other filename is reported through *w2t* (which is
    assumed to abort on "red" errors — TODO confirm).

    :param path: directory scanned via clibs.traversal_files
    :param w2t: UI writer callback
    :param insert_logdb: log-database callback
    :return: list of all discovered file paths
    """
    _, data_files = clibs.traversal_files(path, w2t)
    count = 0
    for data_file in data_files:
        filename = data_file.split("/")[-1]
        if filename == "configs.xlsx":
            count += 1
        elif filename == "T_电机电流.xlsx":
            ...
        else:
            # BUGFIX: the old pattern "j[1-7].*\.data" was unanchored and
            # lacked the underscore, so names like "j1x.data.bak" slipped
            # through even though the error message (and the downstream
            # "_"-split parsing) requires a "j<axis>_" prefix and a
            # ".data" suffix.  fullmatch enforces the stated contract.
            if not re.fullmatch(r"j[1-7]_.*\.data", filename):
                msg = f"不合规 {data_file}\n"
                msg += "所有数据文件必须以 j[1-7]_ 开头,以 .data 结尾比如j1_abcdef.data\n配置文件需要命名为\"configs.xlsx\",结果文件需要命名为\"T_电机电流.xlsx\"\n"
                msg += "需要有配置文件\"configs.xlsx\"表格,以及数据处理文件\"T_电机电流.xlsx\"表格,请检查整改后重新运行\n"
                w2t(msg, "red", "FilenameIllegal")
    if count != 1:
        msg = "需要有配置文件\"configs.xlsx\"表格,以及数据处理文件\"T_电机电流.xlsx\"表格,请检查整改后重新运行\n"
        w2t(msg, "red", "FilenameIllegal")
    insert_logdb("INFO", "current", f"current: 获取必要文件:{data_files}")
    return data_files
def current_max(data_files, rcs, trq, w2t, insert_logdb):
    """Compute the peak motor current of each axis from raw .data captures.

    For every file ending in ".data", the torque-feedback column
    (1-based index *trq*) is read; its absolute maximum is scaled from
    per-mille of rated current to amperes (max / 1000 * rated current of
    the axis encoded in the "j<axis>_" filename prefix).  The value is
    appended as a trailing row to the data file and collected in the
    returned dict.

    :param data_files: candidate file paths; non-".data" entries are skipped
    :param rcs: rated currents, indexed by axis-1
    :param trq: 1-based column index of the torque feedback signal
    :param w2t: UI writer callback
    :param insert_logdb: log-database callback
    :return: dict {axis: [peak current in A, ...]}
    """
    insert_logdb("INFO", "current", "MAX: 正在处理最大电流值逻辑...")
    current = {1: [], 2: [], 3: [], 4: [], 5: [], 6: []}
    for data_file in data_files:
        if data_file.endswith(".data"):
            df = pandas.read_csv(data_file, sep="\t")
        else:
            continue
        insert_logdb("INFO", "current", f"MAX: 正在处理 {data_file}")
        cols = len(df.columns)
        axis = int(data_file.split("/")[-1].split("_")[0].removeprefix("j"))
        rca = rcs[axis-1]
        insert_logdb("INFO", "current", f"MAX: 最大列数为 {cols}{axis} 轴的额定电流为 {rca}")
        col = df.columns.values[trq-1]  # e.g. "device_servo_trq_feedback"
        c_max = df[col].abs().max()
        scale = 1000  # raw torque is stored in per-mille of rated current
        _ = abs(c_max/scale*rca)
        # BUGFIX: setdefault instead of current[axis] — filename validation
        # allows j7 files, which used to raise KeyError on the fixed 1-6 dict.
        current.setdefault(axis, []).append(_)
        w2t(f"{data_file}: {_:.4f}\n")
        insert_logdb("INFO", "current", f"MAX: 获取到的列名为 {col},最大电流为 {_}")
        # Persist the computed peak as an extra row at the end of the file.
        with open(data_file, "a+") as f_data:
            csv_writer = csv.writer(f_data, delimiter="\t")
            csv_writer.writerow([""] * (cols-1) + [_])
    for axis, cur in current.items():
        if not cur:
            continue
        w2t(f"{axis}轴数据:")
        for value in cur:
            w2t(f"{value:.4f} ")
        w2t("\n")
    w2t("\n【MAX】数据处理完毕......")
    insert_logdb("INFO", "current", f"MAX: 获取最大电流值结束 current_max = {current}")
    return current
def current_avg(data_files, rcs, trq, w2t, insert_logdb):
    """Compute the mean-plus-3-sigma motor current of each axis.

    For every file ending in ".data", the torque column (1-based index
    *trq*) is read and ``(|mean| + 3*std) / 1000 * rated current`` is
    computed for the axis encoded in the "j<axis>_" filename prefix.
    The value is appended as a trailing row to the data file and
    collected in the returned dict.

    :param data_files: candidate file paths; non-".data" entries are skipped
    :param rcs: rated currents, indexed by axis-1
    :param trq: 1-based column index of the torque feedback signal
    :param w2t: UI writer callback
    :param insert_logdb: log-database callback
    :return: dict {axis: [average current in A, ...]}
    """
    insert_logdb("INFO", "current", "AVG: 正在处理平均电流值逻辑...")
    current = {1: [], 2: [], 3: [], 4: [], 5: [], 6: []}
    for data_file in data_files:
        if data_file.endswith(".data"):
            df = pandas.read_csv(data_file, sep="\t")
        else:
            continue
        insert_logdb("INFO", "current", f"AVG: 正在处理 {data_file}")
        cols = len(df.columns)
        axis = int(data_file.split("/")[-1].split("_")[0].removeprefix("j"))
        rca = rcs[axis-1]
        insert_logdb("INFO", "current", f"AVG: 最大列数为 {cols}{axis} 轴的额定电流为 {rca}")
        col = df.columns.values[trq-1]
        c_std = df[col].std()
        c_avg = df[col].mean()
        scale = 1000  # raw torque is stored in per-mille of rated current
        _ = (abs(c_avg)+c_std*3)/scale*rca
        # BUGFIX: setdefault instead of current[axis] — filename validation
        # allows j7 files, which used to raise KeyError on the fixed 1-6 dict.
        current.setdefault(axis, []).append(_)
        w2t(f"{data_file}: {_:.4f}\n")
        insert_logdb("INFO", "current", f"AVG: 获取到的列名为 {col},平均电流为 {_}")
        # Persist the computed average as an extra row at the end of the file.
        with open(data_file, "a+") as f_data:
            csv_writer = csv.writer(f_data, delimiter="\t")
            csv_writer.writerow([""] * (cols-1) + [_])
    for axis, cur in current.items():
        if not cur:
            continue
        w2t(f"{axis}轴数据:")
        for value in cur:
            w2t(f"{value:.4f} ")
        w2t("\n")
    w2t("\n【AVG】数据处理完毕......\n")
    insert_logdb("INFO", "current", f"AVG: 获取平均电流值结束 current_avg = {current}")
    return current
def current_cycle(data_files, vel, trq, trqh, rrs, rcs, rpms, w2t, insert_logdb):
    """Fill the result workbook "T_电机电流.xlsx" for a duty-cycle run.

    Sorts *data_files* into: the result workbook, hold-torque captures
    (j<axis>_hold_*.data), scenario captures (j<axis>_s_*.data, whose
    4th "_"-separated field is taken as the cycle duration), and plain
    single-axis captures.  Hold files feed average currents into cell O4
    of each J<axis> sheet; the remaining files are written out by
    p_single (when no scenario duration was found) or p_scenario.

    :param data_files: file paths as returned by initialization()
    :param vel: 1-based column index of the velocity signal
    :param trq: 1-based torque column index (unused in this function)
    :param trqh: 1-based torque column index used for hold files
    :param rrs: reduction ratios per axis
    :param rcs: rated currents per axis
    :param rpms: rated speeds per axis (unused in this function)
    :param w2t: UI writer callback
    :param insert_logdb: log-database callback
    """
    result, hold, single, scenario, dur_time = None, [], [], [], 0
    for data_file in data_files:
        filename = data_file.split("/")[-1]
        if filename == "T_电机电流.xlsx":
            result = data_file
        elif re.match("j[1-7]_hold_.*\\.data", filename):
            hold.append(data_file)
        elif re.match("j[1-7]_s_.*\\.data", filename):
            scenario.append(data_file)
            dur_time = float(filename.split("_")[3])
        elif re.match("j[1-7]_.*\\.data", filename):
            single.append(data_file)
    # NOTE(review): if the result workbook is absent, `result` stays None and
    # load_workbook below will fail — presumably initialization() guarantees
    # its presence; confirm.
    clibs.stop = True  # presumably drives clibs.tl_prg's progress loop — TODO confirm semantics
    w2t(f"正在打开文件 {result},需要 10s 左右......\n")
    # Load the workbook in a worker thread while a progress indicator runs.
    t_excel = clibs.GetThreadResult(openpyxl.load_workbook, args=(result, ))
    t_excel.daemon = True
    t_excel.start()
    t_progress = threading.Thread(target=clibs.tl_prg, args=("Processing......", ))
    t_progress.daemon = True
    t_progress.start()
    wb = t_excel.get_result()  # blocks until the workbook has loaded
    if hold:
        # Write the per-axis hold average into cell O4 of sheet J<axis>.
        # NOTE(review): cur_value[0] raises IndexError for any axis without a
        # hold capture (empty list) — assumes hold files cover all axes; verify.
        avg = current_avg(hold, rcs, trqh, w2t, insert_logdb)
        for axis, cur_value in avg.items():
            sht_name = f"J{axis}"
            wb[sht_name]["O4"].value = float(cur_value[0])
    if dur_time == 0:
        p_single(wb, single, vel, rrs, w2t, insert_logdb)
    else:
        p_scenario(wb, scenario, vel, rrs, dur_time, w2t, insert_logdb)
    clibs.stop = True
    w2t(f"正在保存文件 {result},需要 10s 左右......\n")
    # Save in a background thread as well, then wait for it to finish.
    t_excel = threading.Thread(target=wb.save, args=(result, ))
    t_excel.daemon = True
    t_excel.start()
    t_excel.join()
    clibs.stop = False
    t_progress.join()
    w2t("----------------------------------------------------------\n")
    w2t("全部处理完毕")
def find_point(data_file, df, flag, row_s, row_e, threshold, step, end_point, skip_scale, axis, seq, w2t, insert_logdb):
    """Scan backwards through *df* for the boundary where the windowed
    mean of |speed| crosses *threshold*.

    ``flag == "lt"`` keeps sliding while the window mean is below the
    threshold (searching for a rising edge); ``"gt"`` keeps sliding while
    it is above (falling edge).  A candidate boundary is confirmed with a
    second window shifted ``end_point * skip_scale`` rows further back,
    so a single clean window caused by noise is not trusted.

    :return: the ``(row_s, row_e)`` window at the confirmed boundary, or
        None for an unknown *flag*; when the scan runs out of data, the
        error is reported through *w2t*.
    """
    if flag not in ("lt", "gt"):
        return None
    keep_sliding = (lambda avg: avg < threshold) if flag == "lt" else (lambda avg: avg > threshold)
    back = end_point * skip_scale
    while row_e > end_point:
        window_mean = df.iloc[row_s:row_e].abs().mean()
        if keep_sliding(window_mean):
            row_s -= step
            row_e -= step
            continue
        # Confirm the candidate with the window one skip-span further back;
        # only a double hit is treated as the true boundary.
        confirm_mean = df.iloc[row_s - back:row_e - back].abs().mean()
        if not keep_sliding(confirm_mean):
            return row_s, row_e
        insert_logdb("WARNING", "current", f"【{flag}】{axis} 轴第 {seq} 次查找数据有异常row_s = {row_s}, row_e = {row_e}")
        row_s -= back
        row_e -= back
    w2t(f"{data_file} 数据有误,需要检查,无法找到第 {seq} 个有效点...", "red", "AnchorNotFound")
def get_row_number(threshold, flag, df, row_s, row_e, axis, insert_logdb):
    """Locate the first run of 10 consecutive samples in df[row_s:row_e]
    whose absolute value is above (*flag* "start"/"end") or below
    (*flag* "middle") *threshold*.

    :return: the index where that run begins; when no run is found, a
        warning is logged and *row_e* is returned as a fallback.
    """
    above = flag in ("start", "end")
    if above or flag == "middle":
        run = 0
        for offset, value in enumerate(df.iloc[row_s:row_e].abs(), start=1):
            hit = value > threshold if above else value < threshold
            if hit:
                run += 1
                if run == 10:
                    return row_s + offset - 10
            else:
                run = 0  # the run must be consecutive — reset on any miss
    places = {"start": "起点", "middle": "中间点", "end": "终点"}
    insert_logdb("WARNING", "current", f"{axis} 轴获取{places[flag]}数据 {row_e} 可能有异常,需关注!")
    return row_e
def p_single(wb, single, vel, rrs, w2t, insert_logdb):
    """Extract the last complete motion cycle from each single-axis
    capture and copy it into the matching J<axis> worksheet.

    Strategy (all searches run backwards from the end of the capture):
    1. Find the first zero-speed point; a capture that is already zero at
       the very end is skipped past, not treated as a boundary.
    2. Record that point, then keep searching backwards for the next
       zero-speed boundary (again ignoring the segment that starts zero).
    3. Record the second point and copy the rows in between to the sheet.
    """
    for data_file in single:
        axis = int(data_file.split("/")[-1].split("_")[0].removeprefix("j"))
        sht_name = f"J{axis}"
        ws = wb[sht_name]
        pandas.set_option("display.precision", 2)
        df_origin = pandas.read_csv(data_file, sep="\t")
        rr = rrs[axis-1]
        # rad/s -> motor rpm: 180/pi to deg/s, *60/360 to rev/min,
        # *rr through the reduction ratio.
        addition = 180 / 3.1415926 * 60 / 360 * rr
        col_names = list(df_origin.columns)
        df = df_origin[col_names[vel-1]].multiply(addition)  # speed in rpm
        step = 50  # backward scan step (rows)
        end_point = 200  # window size: samples averaged per probe
        threshold = 2  # rpm level separating "moving" from "stopped"
        skip_scale = 2  # confirmation window is end_point*skip_scale rows back
        row_start, row_middle, row_end = 0, 0, 0
        row_e = df.index[-1]
        row_s = row_e - end_point
        speed_avg = df.iloc[row_s:row_e].abs().mean()
        # NOTE(review): speed_avg exactly equal to 2 matches neither branch,
        # leaving all boundaries at 0 — confirm this cannot occur in practice.
        if speed_avg < 2:
            # Capture ends in a stopped phase.
            # Filter pass 1: skip the zero-speed tail, find the rising edge.
            row_s, row_e = find_point(data_file, df, "lt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 0, w2t, insert_logdb)
            row_e -= end_point*skip_scale
            row_s -= end_point*skip_scale
            # Filter pass 2: skip the moving segment, find the falling edge.
            row_s, row_e = find_point(data_file, df, "gt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 0, w2t, insert_logdb)
            row_e -= end_point*skip_scale
            row_s -= end_point*skip_scale
            # Filter pass 3: skip the zero-speed segment, find the rising edge.
            row_s, row_e = find_point(data_file, df, "lt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 0, w2t, insert_logdb)
            row_e -= end_point*skip_scale
            row_s -= end_point*skip_scale
            # Capture pass 1: falling edge towards zero — the cycle end.
            row_s, row_e = find_point(data_file, df, "gt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 1, w2t, insert_logdb)
            row_end = get_row_number(threshold, "end", df, row_s, row_e, axis, insert_logdb)
            row_e -= end_point*skip_scale
            row_s -= end_point*skip_scale
            # Capture pass 2: rising edge away from zero — the cycle middle.
            row_s, row_e = find_point(data_file, df, "lt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 2, w2t, insert_logdb)
            row_middle = get_row_number(threshold, "middle", df, row_s, row_e, axis, insert_logdb)
            row_e -= end_point*skip_scale
            row_s -= end_point*skip_scale
            # Capture pass 3: falling edge towards zero — the cycle start.
            row_s, row_e = find_point(data_file, df, "gt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 3, w2t, insert_logdb)
            row_start = get_row_number(threshold, "start", df, row_s, row_e, axis, insert_logdb)
        elif speed_avg > 2:
            # Capture ends in a moving phase — one extra leading filter pass.
            # Filter pass 1: skip the moving tail, find the falling edge.
            row_s, row_e = find_point(data_file, df, "gt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 0, w2t, insert_logdb)
            row_e -= end_point*skip_scale
            row_s -= end_point*skip_scale
            # Filter pass 2: skip the zero-speed segment, find the rising edge.
            row_s, row_e = find_point(data_file, df, "lt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 0, w2t, insert_logdb)
            row_e -= end_point*skip_scale
            row_s -= end_point*skip_scale
            # Capture pass 1: falling edge towards zero — the cycle end.
            row_s, row_e = find_point(data_file, df, "gt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 1, w2t, insert_logdb)
            row_end = get_row_number(threshold, "end", df, row_s, row_e, axis, insert_logdb)
            row_e -= end_point*skip_scale
            row_s -= end_point*skip_scale
            # Capture pass 2: rising edge away from zero — the cycle middle.
            row_s, row_e = find_point(data_file, df, "lt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 2, w2t, insert_logdb)
            row_middle = get_row_number(threshold, "middle", df, row_s, row_e, axis, insert_logdb)
            row_e -= end_point*skip_scale
            row_s -= end_point*skip_scale
            # Capture pass 3: falling edge towards zero — the cycle start.
            row_s, row_e = find_point(data_file, df, "gt", row_s, row_e, threshold, step, end_point, skip_scale, axis, 3, w2t, insert_logdb)
            row_start = get_row_number(threshold, "start", df, row_s, row_e, axis, insert_logdb)
        insert_logdb("INFO", "current", f"{axis} 轴起点:{row_start}")
        insert_logdb("INFO", "current", f"{axis} 轴中间点:{row_middle}")
        insert_logdb("INFO", "current", f"{axis} 轴终点:{row_end}")
        insert_logdb("INFO", "current", f"{axis} 轴数据非零段点数:{row_middle-row_start+1}")
        insert_logdb("INFO", "current", f"{axis} 轴数据为零段点数:{row_end-row_middle+1}")
        # Flag a heavily asymmetric duty cycle (moving vs stopped halves).
        if abs(row_end+row_start-2*row_middle) > 1000:
            insert_logdb("WARNING", "current", f"{axis} 轴数据占空比异常!")
        # Flatten the first three columns of each selected row into one list.
        data = []
        for row in range(row_start, row_end+1):
            data.append(df_origin.iloc[row, 0])
            data.append(df_origin.iloc[row, 1])
            data.append(df_origin.iloc[row, 2])
        i = 0
        # Write triples into columns B..D; column A receives the time axis in
        # seconds.  When `data` is exhausted, the IndexError taken in the
        # except branch clears the remaining cells.
        for row in ws.iter_rows(min_row=2, min_col=2, max_row=150000, max_col=4):
            for cell in row:
                try:
                    if i % 3 == 0:
                        ws.cell((i//3)+2, 1).value = float(((i//3)+1)/1000)
                    _ = f"{data[i]:.2f}"
                    cell.value = float(_)
                    i += 1
                except Exception as Err:
                    if i % 3 == 0:
                        ws.cell((i//3)+2, 1).value = None
                    cell.value = None
                    i += 1
def p_scenario(wb, scenario, vel, rrs, dur_time, w2t, insert_logdb):
    """Copy *dur_time* seconds of each scenario capture into the matching
    J<axis> worksheet, starting at a fixed 3000-row offset.

    :param wb: result workbook with one J<axis> sheet per axis
    :param scenario: paths of j<axis>_s_*.data files
    :param vel: 1-based column index of the velocity signal
    :param rrs: reduction ratios per axis
    :param dur_time: scenario cycle duration in seconds
    :param w2t: UI writer callback; reports when the file is too short
    :param insert_logdb: log-database callback (unused here)
    """
    for data_file in scenario:
        cycle = 0.001  # sample period in seconds — presumably a 1 kHz capture, TODO confirm
        axis = int(data_file.split("/")[-1].split("_")[0].removeprefix("j"))
        sht_name = f"J{axis}"
        ws = wb[sht_name]
        pandas.set_option("display.precision", 2)
        df_origin = pandas.read_csv(data_file, sep="\t")
        rr = rrs[axis-1]
        # rad/s -> motor rpm: 180/pi to deg/s, *60/360 to rev/min,
        # *rr through the reduction ratio.
        addition = 180 / 3.1415926 * 60 / 360 * rr
        col_names = list(df_origin.columns)
        df = df_origin[col_names[vel-1]].multiply(addition)
        row_start = 3000  # skip the leading rows — presumably settling data, TODO confirm
        row_end = row_start + int(dur_time/cycle)
        if row_end > df.index[-1]:
            w2t(f"位置超限:{data_file} 共有 {df.index[-1]} 条数据,无法取到第 {row_end} 条数据,需要确认场景周期时间...", "red", "DataOverLimit")
        # Flatten the first three columns of each selected row into one list.
        data = []
        for row in range(row_start, row_end+1):
            data.append(df_origin.iloc[row, 0])
            data.append(df_origin.iloc[row, 1])
            data.append(df_origin.iloc[row, 2])
        i = 0
        # Write triples into columns B..D; column A receives the time axis in
        # seconds.  When `data` is exhausted, the IndexError taken in the
        # except branch clears the remaining cells.
        for row in ws.iter_rows(min_row=2, min_col=2, max_row=250000, max_col=4):
            for cell in row:
                try:
                    if i % 3 == 0:
                        ws.cell((i//3)+2, 1).value = float(((i//3)+1)/1000)
                    _ = f"{data[i]:.2f}"
                    cell.value = float(_)
                    i += 1
                except Exception as Err:
                    cell.value = None
                    if i % 3 == 0:
                        ws.cell((i//3)+2, 1).value = None
                    i += 1
def get_configs(configfile, w2t, insert_logdb):
    """Load per-axis machine parameters from the "Target" sheet.

    Rows 2, 3, 4 and 6 of columns B..G hold, respectively, the reduction
    ratios, maximum angular velocities, rated speeds and rated currents
    of axes 1-6; each value is returned as an absolute float.

    :param configfile: path to configs.xlsx
    :param w2t: UI writer callback used to report open failures
    :param insert_logdb: log-database callback
    :return: tuple (rrs, avs, rcs, rpms), each a 6-element list
    """
    try:
        wb = openpyxl.load_workbook(configfile, read_only=True)
        ws = wb["Target"]
    except Exception as Err:
        # NOTE(review): w2t is expected to abort here; if it merely returns,
        # the reads below would hit an undefined `ws` — confirm.
        insert_logdb("ERROR", "current", f"无法打开 {configfile},获取配置文件参数错误 {Err}")
        w2t(f"无法打开 {configfile}", color="red", desc="OpenFileError")
    rrs, avs, rcs, rpms = [], [], [], []
    for column in range(2, 8):
        rrs.append(abs(float(ws.cell(row=2, column=column).value)))
        avs.append(abs(float(ws.cell(row=3, column=column).value)))
        rpms.append(abs(float(ws.cell(row=4, column=column).value)))
        rcs.append(abs(float(ws.cell(row=6, column=column).value)))
    insert_logdb("INFO", "current", f"current: 获取减速比:{rrs}")
    insert_logdb("INFO", "current", f"current: 获取角速度:{avs}")
    insert_logdb("INFO", "current", f"current: 获取额定电流:{rcs}")
    insert_logdb("INFO", "current", f"current: 获取额定转速:{rpms}")
    return rrs, avs, rcs, rpms
def main():
    """Entry point: read run parameters from clibs.data_dp and dispatch
    to the max / avg / cycle processing routine selected by "_sub"."""
    sub = clibs.data_dp["_sub"]
    path = clibs.data_dp["_path"]
    vel = int(clibs.data_dp["_vel"])
    trq = int(clibs.data_dp["_trq"])
    trqh = int(clibs.data_dp["_trqh"])
    w2t, insert_logdb = clibs.w2t, clibs.insert_logdb
    insert_logdb("INFO", "current", "current: 参数初始化成功")
    data_files = initialization(path, w2t, insert_logdb)
    # NOTE(review): the backslash separator assumes a Windows-style path,
    # while filenames elsewhere are split on "/" — confirm on other OSes.
    rrs, avs, rcs, rpms = get_configs(path + "\\configs.xlsx", w2t, insert_logdb)
    if sub == "max":
        current_max(data_files, rcs, trq, w2t, insert_logdb)
    elif sub == "avg":
        current_avg(data_files, rcs, trq, w2t, insert_logdb)
    elif sub == "cycle":
        current_cycle(data_files, vel, trq, trqh, rrs, rcs, rpms, w2t, insert_logdb)


if __name__ == '__main__':
    main()