Compare commits
	
		
			7 Commits
		
	
	
		
			d76ee3d223
			...
			cdbe1c40c6
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| cdbe1c40c6 | |||
| 5ed38b4b2a | |||
| 27877e2b64 | |||
| edda9defdd | |||
| 485dffdd0b | |||
| d35858e14e | |||
| 718db9ec45 | 
@@ -473,5 +473,20 @@ v0.1.9.2(2024/07/13)
 | 
			
		||||
   - 调整单轴测试时间为35s,适配大负载机型,调整堵转电流持续时间15s,适当减少测试时间
 | 
			
		||||
   - 将act信号置为False的动作放在初始化,增加程序健壮性
 | 
			
		||||
   - 修改所有输出文件的命名,在扩展名之前加入时间戳
 | 
			
		||||
3. [current: current.py]: 在find_point函数种,当无法找到正确点位时,继续执行,而不是直接终止执行
 | 
			
		||||
   - 删除多余的时序矫正语句——item['value'].reverse(),使输出的曲线为平滑的自然顺序
 | 
			
		||||
3. [current: current.py]
 | 
			
		||||
   - 在find_point函数种,当无法找到正确点位时,继续执行,而不是直接终止执行
 | 
			
		||||
   - max功能计算逻辑矫正,应该是取绝对值的最大值
 | 
			
		||||
   - 整体梳理了trq/trqh的传递路径,现已修正完毕
 | 
			
		||||
   - 减速比rr数据源修改为configs.xlsx
 | 
			
		||||
4. 在current工程main函数增加 VelSet 100语句
 | 
			
		||||
 | 
			
		||||
v0.1.9.3(2024/07/15)
 | 
			
		||||
1. [APIs: openapi.py]
 | 
			
		||||
   - 修改modbus连接失败报错输出形式,使之只在automatic test页面显示
 | 
			
		||||
   - 将该文件移动至toplevel,为后面扩展做准备
 | 
			
		||||
   - 修改heartbeat文件路径,使后续打包的时候更方便
 | 
			
		||||
2. [APIs: aio.py]: 
 | 
			
		||||
   - 修改heartbeat文件路径,使后续打包的时候更方便
 | 
			
		||||
   - 修改write2textbox函数的打印逻辑,先判断网络相关
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
										
											Binary file not shown.
										
									
								
							
										
											Binary file not shown.
										
									
								
							@@ -9,8 +9,9 @@ from urllib.request import urlopen
 | 
			
		||||
from socket import setdefaulttimeout
 | 
			
		||||
from data_process import *
 | 
			
		||||
from automatic_test import *
 | 
			
		||||
import openapi
 | 
			
		||||
 | 
			
		||||
current_path = dirname(__file__)
 | 
			
		||||
heartbeat = f'{dirname(__file__)}/../assets/templates/heartbeat'
 | 
			
		||||
customtkinter.set_appearance_mode("System")  # Modes: "System" (standard), "Dark", "Light"
 | 
			
		||||
customtkinter.set_default_color_theme("blue")  # Themes: "blue" (standard), "green", "dark-blue"
 | 
			
		||||
customtkinter.set_widget_scaling(1.1)  # widget dimensions and text size
 | 
			
		||||
@@ -70,7 +71,7 @@ class App(customtkinter.CTk):
 | 
			
		||||
        btns_func['log']['btn'].configure(command=lambda: self.thread_it(self.func_log_callback))
 | 
			
		||||
        btns_func['end']['btn'].configure(command=lambda: self.thread_it(self.func_end_callback))
 | 
			
		||||
        # create version info
 | 
			
		||||
        self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.1.9.1\nDate: 07/12/2024", font=self.my_font, text_color="#4F4F4F")
 | 
			
		||||
        self.label_version = customtkinter.CTkLabel(self.frame_func, justify='left', text="Vers: 0.1.9.2\nDate: 07/13/2024", font=self.my_font, text_color="#4F4F4F")
 | 
			
		||||
        self.frame_func.rowconfigure(6, weight=1)
 | 
			
		||||
        self.label_version.grid(row=6, column=0, padx=20, pady=20, sticky='s')
 | 
			
		||||
        # =====================================================================
 | 
			
		||||
@@ -166,7 +167,7 @@ class App(customtkinter.CTk):
 | 
			
		||||
        self.seg_button.configure(state='disabled')
 | 
			
		||||
        # self.tabview.configure(state='disabled')
 | 
			
		||||
        self.textbox.delete(index1='1.0', index2='end')
 | 
			
		||||
        with open(f'{current_path}/../assets/templates/heartbeat', 'r', encoding='utf-8') as f_h:
 | 
			
		||||
        with open(heartbeat, 'r', encoding='utf-8') as f_h:
 | 
			
		||||
            c_state = f_h.read().strip()
 | 
			
		||||
 | 
			
		||||
        if c_state == '0' and value != '功能切换':
 | 
			
		||||
@@ -181,13 +182,13 @@ class App(customtkinter.CTk):
 | 
			
		||||
        # self.tabview.configure(state='normal')
 | 
			
		||||
 | 
			
		||||
    def detect_network(self):
 | 
			
		||||
        with open(f"{current_path}/../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
 | 
			
		||||
        with open(heartbeat, "w", encoding='utf-8') as f_hb:
 | 
			
		||||
            f_hb.write('0')
 | 
			
		||||
        self.hr = openapi.HmiRequest(self.write2textbox)
 | 
			
		||||
        self.md = openapi.ModbusRequest(self.write2textbox)
 | 
			
		||||
 | 
			
		||||
        while True:
 | 
			
		||||
            with open(f'{current_path}/../assets/templates/heartbeat', 'r', encoding='utf-8') as f_hb:
 | 
			
		||||
            with open(heartbeat, 'r', encoding='utf-8') as f_hb:
 | 
			
		||||
                c_state = f_hb.read().strip()
 | 
			
		||||
                pb_color = 'green' if c_state == '1' else 'red'
 | 
			
		||||
                self.progressbar.configure(progress_color=pb_color)
 | 
			
		||||
@@ -333,7 +334,7 @@ class App(customtkinter.CTk):
 | 
			
		||||
        self.textbox.tag_config(tagName=color, foreground=color)
 | 
			
		||||
        tab_name_cur = self.tabview.get()
 | 
			
		||||
 | 
			
		||||
        if tab_name == tab_name_cur:
 | 
			
		||||
        if tab_name == 'openapi' and tab_name_cur == 'Automatic Test':
 | 
			
		||||
            if wait != 0:
 | 
			
		||||
                self.textbox.insert(index='end', text=text, tags=color)
 | 
			
		||||
                self.textbox.update()
 | 
			
		||||
@@ -347,7 +348,7 @@ class App(customtkinter.CTk):
 | 
			
		||||
                self.textbox.insert(index='end', text=text + '\n', tags=color)
 | 
			
		||||
                self.textbox.update()
 | 
			
		||||
                self.textbox.see('end')
 | 
			
		||||
        elif tab_name == 'openapi' and tab_name_cur == 'Automatic Test':
 | 
			
		||||
        elif tab_name == tab_name_cur:
 | 
			
		||||
            if wait != 0:
 | 
			
		||||
                self.textbox.insert(index='end', text=text, tags=color)
 | 
			
		||||
                self.textbox.update()
 | 
			
		||||
 
 | 
			
		||||
@@ -1 +1 @@
 | 
			
		||||
__all__ = ['openapi', 'btn_functions', 'do_brake', 'do_current']
 | 
			
		||||
__all__ = ['btn_functions', 'do_brake', 'do_current']
 | 
			
		||||
@@ -120,10 +120,8 @@ def data_proc_regular(path, filename, channel, scenario_time):
 | 
			
		||||
                for item in data:
 | 
			
		||||
                    item['value'].reverse()
 | 
			
		||||
                    if item.get('channel', None) == channel and item.get('name', None) == 'hw_joint_vel_feedback':
 | 
			
		||||
                        item['value'].reverse()
 | 
			
		||||
                        _d2d_vel['hw_joint_vel_feedback'].extend(item['value'])
 | 
			
		||||
                    elif item.get('channel', None) == channel and item.get('name', None) == 'device_servo_trq_feedback':
 | 
			
		||||
                        item['value'].reverse()
 | 
			
		||||
                        _d2d_trq['device_servo_trq_feedback'].extend(item['value'])
 | 
			
		||||
 | 
			
		||||
            df1 = pandas.DataFrame.from_dict(_d2d_vel)
 | 
			
		||||
 
 | 
			
		||||
@@ -79,7 +79,7 @@ def initialization(path, sub, w2t):
 | 
			
		||||
    return data_files
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def current_max(data_files, rcs, trqh, w2t):
 | 
			
		||||
def current_max(data_files, rcs, trq, w2t):
 | 
			
		||||
    current = {1: [], 2: [], 3: [], 4: [], 5: [], 6: [], 7: []}
 | 
			
		||||
    for data_file in data_files:
 | 
			
		||||
        if data_file.endswith('.data'):
 | 
			
		||||
@@ -93,8 +93,8 @@ def current_max(data_files, rcs, trqh, w2t):
 | 
			
		||||
        axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j'))
 | 
			
		||||
        rca = rcs[axis-1]
 | 
			
		||||
 | 
			
		||||
        col = df.columns.values[trqh-1]
 | 
			
		||||
        c_max = df[col].max()
 | 
			
		||||
        col = df.columns.values[trq-1]
 | 
			
		||||
        c_max = df[col].abs().max()
 | 
			
		||||
 | 
			
		||||
        scale = 1 if data_file.endswith('.csv') else 1000
 | 
			
		||||
        _ = abs(c_max/scale*rca)
 | 
			
		||||
@@ -118,7 +118,7 @@ def current_max(data_files, rcs, trqh, w2t):
 | 
			
		||||
    return current
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def current_avg(data_files, rcs, trqh, w2t):
 | 
			
		||||
def current_avg(data_files, rcs, trq, w2t):
 | 
			
		||||
    current = {1: [], 2: [], 3: [], 4: [], 5: [], 6: [], 7: []}
 | 
			
		||||
    for data_file in data_files:
 | 
			
		||||
        if data_file.endswith('.data'):
 | 
			
		||||
@@ -132,7 +132,7 @@ def current_avg(data_files, rcs, trqh, w2t):
 | 
			
		||||
        axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j'))
 | 
			
		||||
        rca = rcs[axis-1]
 | 
			
		||||
 | 
			
		||||
        col = df.columns.values[trqh - 1]
 | 
			
		||||
        col = df.columns.values[trq-1]
 | 
			
		||||
        c_std = df[col].std()
 | 
			
		||||
        c_avg = df[col].mean()
 | 
			
		||||
 | 
			
		||||
@@ -158,7 +158,7 @@ def current_avg(data_files, rcs, trqh, w2t):
 | 
			
		||||
    return current
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def current_cycle(dur, data_files, rcs, vel, trq, trqh, rpms, w2t):
 | 
			
		||||
def current_cycle(dur, data_files, rcs, rrs, vel, trq, trqh, rpms, w2t):
 | 
			
		||||
    result = None
 | 
			
		||||
    hold = []
 | 
			
		||||
    single = []
 | 
			
		||||
@@ -194,9 +194,9 @@ def current_cycle(dur, data_files, rcs, vel, trq, trqh, rpms, w2t):
 | 
			
		||||
                pass
 | 
			
		||||
 | 
			
		||||
    if dur == 0:
 | 
			
		||||
        p_single(wb, single, vel, trq, rpms, w2t)
 | 
			
		||||
        p_single(wb, single, vel, trq, rpms, rrs, w2t)
 | 
			
		||||
    else:
 | 
			
		||||
        p_scenario(wb, single, vel, trq, rpms, dur, w2t)
 | 
			
		||||
        p_scenario(wb, single, vel, trq, rpms, rrs, dur, w2t)
 | 
			
		||||
 | 
			
		||||
    w2t(f"正在保存文件 {result},需要 10s 左右", 1, 0, 'orange')
 | 
			
		||||
    stop = 0
 | 
			
		||||
@@ -223,8 +223,7 @@ def find_point(data_file, pos, flag, df, _row_s, _row_e, w2t, exitcode, threshol
 | 
			
		||||
            else:
 | 
			
		||||
                return _row_s, _row_e
 | 
			
		||||
        else:
 | 
			
		||||
            # w2t(f"[{pos}] {data_file}数据有误,需要检查,无法找到第{exitcode}个有效点...", 0, exitcode, 'red')
 | 
			
		||||
            w2t(f"[{pos}] {data_file}数据有误,需要检查,无法找到第{exitcode}个有效点...", 0, 0, 'red')
 | 
			
		||||
            w2t(f"[{pos}] {data_file}数据有误,需要检查,无法找到第{exitcode}个有效点...", 0, exitcode, 'red')
 | 
			
		||||
    elif flag == 'gt':
 | 
			
		||||
        while _row_e > end_point:
 | 
			
		||||
            speed_avg = df.iloc[_row_s:_row_e, 0].abs().mean()
 | 
			
		||||
@@ -235,11 +234,10 @@ def find_point(data_file, pos, flag, df, _row_s, _row_e, w2t, exitcode, threshol
 | 
			
		||||
            else:
 | 
			
		||||
                return _row_s, _row_e
 | 
			
		||||
        else:
 | 
			
		||||
            # w2t(f"[{pos}] {data_file}数据有误,需要检查,无法找到有效起始点或结束点...", 0, exitcode, 'red')
 | 
			
		||||
            w2t(f"[{pos}] {data_file}数据有误,需要检查,无法找到有效起始点或结束点...", 0, 0, 'red')
 | 
			
		||||
            w2t(f"[{pos}] {data_file}数据有误,需要检查,无法找到有效起始点或结束点...", 0, exitcode, 'red')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def p_single(wb, single, vel, trq, rpms, w2t):
 | 
			
		||||
def p_single(wb, single, vel, trq, rpms, rrs, w2t):
 | 
			
		||||
    # 1. 先找到第一个速度为零的点,数据从后往前找,一开始就是零的情况不予考虑
 | 
			
		||||
    # 2. 记录第一个点的位置,继续向前查找第二个速度为零的点,同理,一开始为零的点不予考虑
 | 
			
		||||
    # 3. 记录第二个点的位置,并将其中的数据拷贝至对应位置
 | 
			
		||||
@@ -253,7 +251,7 @@ def p_single(wb, single, vel, trq, rpms, w2t):
 | 
			
		||||
        set_option("display.precision", 2)
 | 
			
		||||
        if data_file.endswith('.data'):
 | 
			
		||||
            df = read_csv(data_file, sep='\t')
 | 
			
		||||
            rr = float(wb['统计'].cell(row=2, column=axis+1).value)
 | 
			
		||||
            rr = rrs[axis+1]
 | 
			
		||||
            addition = 180 / 3.1415926 * 60 / 360 * rr
 | 
			
		||||
        elif data_file.endswith('.csv'):
 | 
			
		||||
            df = read_csv(data_file, sep=',', encoding='gbk', header=8)
 | 
			
		||||
@@ -270,6 +268,7 @@ def p_single(wb, single, vel, trq, rpms, w2t):
 | 
			
		||||
        col_names = list(df.columns)
 | 
			
		||||
        df_1 = df[col_names[vel-1]].multiply(rpm*addition)
 | 
			
		||||
        df_2 = df[col_names[trq-1]].multiply(scale)
 | 
			
		||||
        print(df_1.abs().max())
 | 
			
		||||
        df = concat([df_1, df_2], axis=1)
 | 
			
		||||
 | 
			
		||||
        _step = 5 if data_file.endswith('.csv') else 50
 | 
			
		||||
@@ -320,7 +319,7 @@ def p_single(wb, single, vel, trq, rpms, w2t):
 | 
			
		||||
                    cell.value = None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def p_scenario(wb, single, vel, trq, rpms, dur, w2t):
 | 
			
		||||
def p_scenario(wb, single, vel, trq, rpms, rrs, dur, w2t):
 | 
			
		||||
    for data_file in single:
 | 
			
		||||
        cycle = 0.001
 | 
			
		||||
        axis = int(data_file.split('\\')[-1].split('_')[0].removeprefix('j'))
 | 
			
		||||
@@ -332,7 +331,7 @@ def p_scenario(wb, single, vel, trq, rpms, dur, w2t):
 | 
			
		||||
        set_option("display.precision", 2)
 | 
			
		||||
        if data_file.endswith('.data'):
 | 
			
		||||
            df = read_csv(data_file, sep='\t')
 | 
			
		||||
            rr = float(wb['统计'].cell(row=2, column=axis+1).value)
 | 
			
		||||
            rr = rrs[axis+1]
 | 
			
		||||
            addition = 180 / 3.1415926 * 60 / 360 * rr
 | 
			
		||||
        elif data_file.endswith('.csv'):
 | 
			
		||||
            df = read_csv(data_file, sep=',', encoding='gbk', header=8)
 | 
			
		||||
@@ -376,6 +375,7 @@ def get_configs(configfile, w2t):
 | 
			
		||||
    _wb = load_workbook(configfile, read_only=True)
 | 
			
		||||
    _ws = _wb['Target']
 | 
			
		||||
    rcs = []
 | 
			
		||||
    rrs = []
 | 
			
		||||
    rpms = []
 | 
			
		||||
    for i in range(2, 9):
 | 
			
		||||
        try:
 | 
			
		||||
@@ -388,18 +388,23 @@ def get_configs(configfile, w2t):
 | 
			
		||||
        except:
 | 
			
		||||
            rcs.append(0.0)
 | 
			
		||||
 | 
			
		||||
    return rpms, rcs
 | 
			
		||||
        try:
 | 
			
		||||
            rrs.append(float(_ws.cell(row=2, column=i).value))
 | 
			
		||||
        except:
 | 
			
		||||
            rrs.append(0.0)
 | 
			
		||||
 | 
			
		||||
    return rpms, rcs, rrs
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def main(path, sub, dur, vel, trq, trqh, w2t):
 | 
			
		||||
    data_files = initialization(path, sub, w2t)
 | 
			
		||||
    rpms, rcs = get_configs(path + '\\configs.xlsx', w2t)
 | 
			
		||||
    rpms, rcs, rrs = get_configs(path + '\\configs.xlsx', w2t)
 | 
			
		||||
    if sub == 'max':
 | 
			
		||||
        current_max(data_files, rcs, trqh, w2t)
 | 
			
		||||
        current_max(data_files, rcs, trq, w2t)
 | 
			
		||||
    elif sub == 'avg':
 | 
			
		||||
        current_avg(data_files, rcs, trqh, w2t)
 | 
			
		||||
        current_avg(data_files, rcs, trq, w2t)
 | 
			
		||||
    elif sub == 'cycle':
 | 
			
		||||
        current_cycle(dur, data_files, rcs, vel, trq, trqh, rpms, w2t)
 | 
			
		||||
        current_cycle(dur, data_files, rcs, rrs,  vel, trq, trqh, rpms, w2t)
 | 
			
		||||
    else:
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -11,6 +11,7 @@ from pymodbus.constants import Endian
 | 
			
		||||
MAX_FRAME_SIZE = 1024
 | 
			
		||||
setdefaulttimeout(2)
 | 
			
		||||
current_path = dirname(__file__)
 | 
			
		||||
heartbeat = f'{current_path}/../assets/templates/heartbeat'
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ModbusRequest(object):
 | 
			
		||||
@@ -27,22 +28,19 @@ class ModbusRequest(object):
 | 
			
		||||
        try:
 | 
			
		||||
            self.c.write_register(40002, 1)
 | 
			
		||||
        except Exception as Err:
 | 
			
		||||
            self.w2t(f"{Err}")
 | 
			
		||||
            self.w2t("无法正常下电,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
            self.w2t(f"{Err}\n无法正常下电,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
 | 
			
		||||
    def motor_on(self):
 | 
			
		||||
        try:
 | 
			
		||||
            self.c.write_register(40003, 1)
 | 
			
		||||
        except Exception as Err:
 | 
			
		||||
            self.w2t(f"{Err}")
 | 
			
		||||
            self.w2t("无法正常上电,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
            self.w2t(f"{Err}\n无法正常上电,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
 | 
			
		||||
    def trigger_estop(self):
 | 
			
		||||
        try:
 | 
			
		||||
            self.c.write_register(40012, 0)
 | 
			
		||||
        except Exception as Err:
 | 
			
		||||
            self.w2t(f"{Err}")
 | 
			
		||||
            self.w2t("无法触发软急停,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
            self.w2t(f"{Err}\n无法触发软急停,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
 | 
			
		||||
    def reset_estop(self):
 | 
			
		||||
        try:
 | 
			
		||||
@@ -54,66 +52,50 @@ class ModbusRequest(object):
 | 
			
		||||
            sleep(0.2)
 | 
			
		||||
            self.c.write_register(40001, 0)
 | 
			
		||||
        except Exception as Err:
 | 
			
		||||
            self.w2t(f"{Err}")
 | 
			
		||||
            self.w2t("无法重置软急停,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
            self.w2t(f"{Err}\n无法重置软急停,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
 | 
			
		||||
    def clear_alarm(self):
 | 
			
		||||
        try:
 | 
			
		||||
            self.c.write_register(40000, 1)
 | 
			
		||||
        except Exception as Err:
 | 
			
		||||
            self.w2t(f"{Err}")
 | 
			
		||||
            self.w2t("无法清除告警,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
            self.w2t(f"{Err}\n无法清除告警,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
 | 
			
		||||
    def get_cart_vel(self):
 | 
			
		||||
        try:
 | 
			
		||||
            results = self.c.read_holding_registers(40537, 7)
 | 
			
		||||
            print(f"cart vel: {results.registers}")
 | 
			
		||||
 | 
			
		||||
        except Exception as Err:
 | 
			
		||||
            self.w2t(f"{Err}")
 | 
			
		||||
            self.w2t("无法读取笛卡尔速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
            self.w2t(f"{Err}\n无法读取笛卡尔速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
 | 
			
		||||
    def get_jnt_vel(self):
 | 
			
		||||
        try:
 | 
			
		||||
            results = self.c.read_holding_registers(40579, 7)
 | 
			
		||||
            print(f"joint vel: {results.registers}")
 | 
			
		||||
 | 
			
		||||
        except Exception as Err:
 | 
			
		||||
            self.w2t(f"{Err}")
 | 
			
		||||
            self.w2t("无法读取关节速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
            self.w2t(f"{Err}\n无法读取关节速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
 | 
			
		||||
    def get_tcp_vel(self):
 | 
			
		||||
        try:
 | 
			
		||||
            results = self.c.read_holding_registers(40607, 7)
 | 
			
		||||
            print(f"tcp vel: {results.registers}")
 | 
			
		||||
 | 
			
		||||
        except Exception as Err:
 | 
			
		||||
            self.w2t(f"{Err}")
 | 
			
		||||
            self.w2t("无法读取TCP速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
            self.w2t(f"{Err}\n无法读取TCP速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
 | 
			
		||||
    def get_tcp_mag_vel(self):
 | 
			
		||||
        try:
 | 
			
		||||
            results = self.c.read_holding_registers(40621, 1)
 | 
			
		||||
            print(f"tcp mag: {results.registers}")
 | 
			
		||||
 | 
			
		||||
        except Exception as Err:
 | 
			
		||||
            self.w2t(f"{Err}")
 | 
			
		||||
            self.w2t("无法读取TCP合成速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
            self.w2t(f"{Err}\n无法读取TCP合成速度,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
 | 
			
		||||
    def write_act(self, number):
 | 
			
		||||
        try:
 | 
			
		||||
            self.c.write_register(41000, number)
 | 
			
		||||
        except Exception as Err:
 | 
			
		||||
            self.w2t(f"{Err}")
 | 
			
		||||
            self.w2t("无法发送执行信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
            self.w2t(f"{Err}\n无法发送执行信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
 | 
			
		||||
    def read_ready_to_go(self):
 | 
			
		||||
        try:
 | 
			
		||||
            results = self.c.read_holding_registers(41001, 1)
 | 
			
		||||
            return results.registers[0]
 | 
			
		||||
        except Exception as Err:
 | 
			
		||||
            self.w2t(f"{Err}")
 | 
			
		||||
            self.w2t("无法读取准备信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
            self.w2t(f"{Err}\n无法读取准备信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
 | 
			
		||||
    def read_scenario_time(self):
 | 
			
		||||
        try:
 | 
			
		||||
@@ -122,15 +104,13 @@ class ModbusRequest(object):
 | 
			
		||||
            result = f"{result.decode_32bit_float():.3f}"
 | 
			
		||||
            return result
 | 
			
		||||
        except Exception as Err:
 | 
			
		||||
            self.w2t(f"{Err}")
 | 
			
		||||
            self.w2t("无法读取准备信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
            self.w2t(f"{Err}\n无法读取准备信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
 | 
			
		||||
    def write_stop0(self, number):
 | 
			
		||||
        try:
 | 
			
		||||
            self.c.write_register(41004, number)
 | 
			
		||||
        except Exception as Err:
 | 
			
		||||
            self.w2t(f"{Err}")
 | 
			
		||||
            self.w2t("无法通过IO操作stop0急停,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
            self.w2t(f"{Err}\n无法通过IO操作stop0急停,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
 | 
			
		||||
    def write_speed_max(self, speed):
 | 
			
		||||
        try:
 | 
			
		||||
@@ -139,16 +119,14 @@ class ModbusRequest(object):
 | 
			
		||||
            payload = builder.build()
 | 
			
		||||
            self.c.write_registers(41005, payload, skip_encode=True)
 | 
			
		||||
        except Exception as Err:
 | 
			
		||||
            self.w2t(f"{Err}")
 | 
			
		||||
            self.w2t("无法写入速度值,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
            self.w2t(f"{Err}\n无法写入速度值,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
 | 
			
		||||
    def read_brake_done(self):
 | 
			
		||||
        try:
 | 
			
		||||
            results = self.c.read_holding_registers(41007, 1)
 | 
			
		||||
            return results.registers[0]
 | 
			
		||||
        except Exception as Err:
 | 
			
		||||
            self.w2t(f"{Err}")
 | 
			
		||||
            self.w2t("无法读取制动已执行信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
            self.w2t(f"{Err}\n无法读取制动已执行信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
 | 
			
		||||
    def write_axis(self, axis):
 | 
			
		||||
        try:
 | 
			
		||||
@@ -157,22 +135,19 @@ class ModbusRequest(object):
 | 
			
		||||
            payload = builder.to_registers()
 | 
			
		||||
            self.c.write_registers(41008, payload)
 | 
			
		||||
        except Exception as Err:
 | 
			
		||||
            self.w2t(f"{Err}")
 | 
			
		||||
            self.w2t("无法写入速度值,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
            self.w2t(f"{Err}\n无法写入速度值,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
 | 
			
		||||
    def write_probe(self, probe):
 | 
			
		||||
        try:
 | 
			
		||||
            self.c.write_register(41010, probe)
 | 
			
		||||
        except Exception as Err:
 | 
			
		||||
            self.w2t(f"{Err}")
 | 
			
		||||
            self.w2t("无法写入速度探测信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
            self.w2t(f"{Err}\n无法写入速度探测信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
 | 
			
		||||
    def write_pon(self, pon):  # positive or negative
 | 
			
		||||
        try:
 | 
			
		||||
            self.c.write_register(41011, pon)
 | 
			
		||||
        except Exception as Err:
 | 
			
		||||
            self.w2t(f"{Err}")
 | 
			
		||||
            self.w2t("无法写入正负方向信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
            self.w2t(f"{Err}\n无法写入正负方向信号,连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class HmiRequest(object):
 | 
			
		||||
@@ -210,7 +185,7 @@ class HmiRequest(object):
 | 
			
		||||
 | 
			
		||||
    def sock_conn(self):
 | 
			
		||||
        # while True:
 | 
			
		||||
        with open(f"{current_path}/../../assets/templates/heartbeat", "r", encoding='utf-8') as f_hb:
 | 
			
		||||
        with open(heartbeat, "r", encoding='utf-8') as f_hb:
 | 
			
		||||
            c_state = f_hb.read().strip()
 | 
			
		||||
        if c_state == '0':
 | 
			
		||||
            try:
 | 
			
		||||
@@ -224,7 +199,7 @@ class HmiRequest(object):
 | 
			
		||||
                self.c_xs.setblocking(False)
 | 
			
		||||
 | 
			
		||||
                self.w2t("Connection success", 0, 0, 'green', tab_name=self.tab_name)
 | 
			
		||||
                with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
 | 
			
		||||
                with open(heartbeat, "w", encoding='utf-8') as f_hb:
 | 
			
		||||
                    f_hb.write('1')
 | 
			
		||||
                md = ModbusRequest(self.w2t)
 | 
			
		||||
                md.reset_estop()
 | 
			
		||||
@@ -234,7 +209,7 @@ class HmiRequest(object):
 | 
			
		||||
                md.write_axis(1)
 | 
			
		||||
            except Exception as Err:
 | 
			
		||||
                self.w2t("Connection failed...", 0, 0, 'red', tab_name=self.tab_name)
 | 
			
		||||
                with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
 | 
			
		||||
                with open(heartbeat, "w", encoding='utf-8') as f_hb:
 | 
			
		||||
                    f_hb.write('0')
 | 
			
		||||
 | 
			
		||||
    def header_check(self, index, data):
 | 
			
		||||
@@ -272,12 +247,12 @@ class HmiRequest(object):
 | 
			
		||||
            print(f"hb = {_flag}", end=' ')
 | 
			
		||||
            print(f"len(c_msg) = {len(self.c_msg)}", end=' ')
 | 
			
		||||
            print(f"len(c_msg_xs) = {len(self.c_msg_xs)}", end='\n')
 | 
			
		||||
            with open(f"{current_path}/../../assets/templates/heartbeat", "w", encoding='utf-8') as f_hb:
 | 
			
		||||
            with open(heartbeat, "w", encoding='utf-8') as f_hb:
 | 
			
		||||
                f_hb.write(_flag)
 | 
			
		||||
                if _flag == '0':
 | 
			
		||||
                    self.w2t(f"{_id} 心跳丢失,连接失败,重新连接中...", 0, 7, 'red', tab_name=self.tab_name)
 | 
			
		||||
            sleep(1.5)
 | 
			
		||||
            # with open(f"{current_path}/../../assets/templates/c_msg.log", "w", encoding='utf-8') as f:
 | 
			
		||||
            # with open(f"{current_path}/../assets/templates/c_msg.log", "w", encoding='utf-8') as f:
 | 
			
		||||
            #     for msg in self.c_msg:
 | 
			
		||||
            #         f.write(str(loads(msg)) + '\n')
 | 
			
		||||
 | 
			
		||||
@@ -590,7 +565,7 @@ class HmiRequest(object):
 | 
			
		||||
        if flg == 0:  # for old protocols
 | 
			
		||||
            req = None
 | 
			
		||||
            try:
 | 
			
		||||
                with open(f'{current_path}/../../assets/templates/{command}.json', encoding='utf-8',
 | 
			
		||||
                with open(f'{current_path}/../assets/templates/{command}.json', encoding='utf-8',
 | 
			
		||||
                          mode='r') as f_json:
 | 
			
		||||
                    req = load(f_json)
 | 
			
		||||
            except:
 | 
			
		||||
		Reference in New Issue
	
	Block a user