"""Modbus and HMI socket clients used by the 'openapi' tab.

``ModbusRequest`` wraps a pymodbus TCP client with one method per register
operation.  ``HmiRequest`` keeps two TCP connections to the controller,
reassembles the streamed responses in background threads and sends JSON
requests built from template files.
"""
from json import load, dumps, loads
from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread
import selectors
from time import time, sleep

from pymodbus.client.tcp import ModbusTcpClient
from pymodbus.payload import BinaryPayloadDecoder, BinaryPayloadBuilder
from pymodbus.constants import Endian

from commons import clibs

logger = clibs.log_prod


class ModbusRequest(object):
    """Thin wrapper around a Modbus TCP connection to ``clibs.ip_addr``:502.

    Every public method performs its register access inside a ``try`` block;
    on any exception the error is reported through ``w2t`` instead of being
    raised to the caller.
    """

    def __init__(self, w2t):
        """Connect to the Modbus server.

        :param w2t: reporting callback, invoked here as
            ``w2t(text, 0, 100, 'red', tab_name)`` when an operation fails.
        """
        super().__init__()
        self.w2t = w2t
        self.tab_name = 'openapi'
        self.host = clibs.ip_addr
        self.port = 502
        self.interval = 0.3  # pause (seconds) between the writes of a pulse
        self.c = ModbusTcpClient(host=self.host, port=self.port)
        self.c.connect()

    def _report(self, err, action):
        """Report a failed operation; ``action`` is the leading failure phrase."""
        self.w2t(f"{err}\n{action},连接Modbus失败,需要确认网络是否通畅,或是未正确导入寄存器文件...", 0, 100, 'red', self.tab_name)

    def _pulse(self, address):
        """Write 0, 1, 0 to ``address`` with ``self.interval`` between writes."""
        self.c.write_register(address, 0)
        sleep(self.interval)
        self.c.write_register(address, 1)
        sleep(self.interval)
        self.c.write_register(address, 0)

    def motor_off(self):
        """Pulse register 40002 (motor power-off request)."""
        try:
            self._pulse(40002)
        except Exception as Err:
            self._report(Err, "无法正常下电")

    def motor_on(self):
        """Pulse register 40003 (motor power-on request)."""
        try:
            self._pulse(40003)
        except Exception as Err:
            self._report(Err, "无法正常上电")

    def trigger_estop(self):
        """Write 0 to register 40012 (soft emergency stop)."""
        try:
            self.c.write_register(40012, 0)
        except Exception as Err:
            self._report(Err, "无法触发软急停")

    def reset_estop(self):
        """Write 1 to register 40012, then pulse register 40001."""
        try:
            self.c.write_register(40012, 1)
            sleep(self.interval)
            self._pulse(40001)
        except Exception as Err:
            self._report(Err, "无法重置软急停")

    def clear_alarm(self):
        """Write 1 to register 40000 (clear alarm)."""
        try:
            self.c.write_register(40000, 1)
        except Exception as Err:
            self._report(Err, "无法清除告警")

    # NOTE(review): the four get_*_vel methods below read registers but, as in
    # the original code, discard the result and return None - confirm whether
    # callers are expected to receive the values.
    def get_cart_vel(self):
        """Read 7 holding registers starting at 40537; the result is discarded."""
        try:
            self.c.read_holding_registers(40537, 7)
        except Exception as Err:
            self._report(Err, "无法读取笛卡尔速度")

    def get_jnt_vel(self):
        """Read 7 holding registers starting at 40579; the result is discarded."""
        try:
            self.c.read_holding_registers(40579, 7)
        except Exception as Err:
            self._report(Err, "无法读取关节速度")

    def get_tcp_vel(self):
        """Read 7 holding registers starting at 40607; the result is discarded."""
        try:
            self.c.read_holding_registers(40607, 7)
        except Exception as Err:
            self._report(Err, "无法读取TCP速度")

    def get_tcp_mag_vel(self):
        """Read 1 holding register at 40621; the result is discarded."""
        try:
            self.c.read_holding_registers(40621, 1)
        except Exception as Err:
            self._report(Err, "无法读取TCP合成速度")

    def write_act(self, number):
        """Write ``number`` to register 41000 (execute signal)."""
        try:
            self.c.write_register(41000, number)
        except Exception as Err:
            self._report(Err, "无法发送执行信号")

    def read_ready_to_go(self):
        """Return the value of register 41001, or None if the read failed."""
        try:
            results = self.c.read_holding_registers(41001, 1)
            return results.registers[0]
        except Exception as Err:
            self._report(Err, "无法读取准备信号")

    def read_scenario_time(self):
        """Return registers 41002-41003 decoded as a 32-bit float, formatted
        with three decimals, or None if the read failed."""
        try:
            results = self.c.read_holding_registers(41002, 2)
            result = BinaryPayloadDecoder.fromRegisters(results.registers, byteorder=Endian.BIG, wordorder=Endian.LITTLE)
            result = f"{result.decode_32bit_float():.3f}"
            return result
        except Exception as Err:
            # The original message was copy-pasted from read_ready_to_go.
            self._report(Err, "无法读取场景时间")

    def write_stop0(self, number):
        """Write ``number`` to register 41004 (stop0 emergency stop via IO)."""
        try:
            self.c.write_register(41004, number)
        except Exception as Err:
            self._report(Err, "无法通过IO操作stop0急停")

    def write_speed_max(self, speed):
        """Write ``speed`` as a 32-bit float to registers starting at 41005."""
        try:
            builder = BinaryPayloadBuilder(byteorder=Endian.BIG, wordorder=Endian.LITTLE)
            builder.add_32bit_float(float(speed))
            payload = builder.build()
            self.c.write_registers(41005, payload, skip_encode=True)
        except Exception as Err:
            self._report(Err, "无法写入速度值")

    def read_brake_done(self):
        """Return the value of register 41007, or None if the read failed."""
        try:
            results = self.c.read_holding_registers(41007, 1)
            return results.registers[0]
        except Exception as Err:
            self._report(Err, "无法读取制动已执行信号")

    def write_axis(self, axis):
        """Write ``axis`` as a 32-bit int to registers starting at 41008."""
        try:
            builder = BinaryPayloadBuilder(byteorder=Endian.BIG, wordorder=Endian.LITTLE)
            builder.add_32bit_int(int(axis))
            payload = builder.to_registers()
            self.c.write_registers(41008, payload)
        except Exception as Err:
            # The original message was copy-pasted from write_speed_max.
            self._report(Err, "无法写入轴号")

    def write_probe(self, probe):
        """Write ``probe`` to register 41010 (speed probe signal)."""
        try:
            self.c.write_register(41010, probe)
        except Exception as Err:
            self._report(Err, "无法写入速度探测信号")

    def write_pon(self, pon):  # positive or negative
        """Write ``pon`` to register 41011 (positive/negative direction)."""
        try:
            self.c.write_register(41011, pon)
        except Exception as Err:
            self._report(Err, "无法写入正负方向信号")


class HmiRequest(object):
    """Client for the controller's two TCP services (ports 5050 and 6666).

    Port 5050 carries length-prefixed frames (see ``package``); port 6666
    carries ``\\r``-terminated messages (see ``package_xs``).  Three daemon
    threads are started by the constructor: a heartbeat and one receiver per
    socket.
    """

    # kwargs copied into req['data'] for each supported command; commands not
    # listed here are sent with their template unchanged.
    _REQUEST_FIELDS = {
        'state.set_tp_mode': ('tp_mode',),
        'overview.set_autoload': ('autoload_prj_path',),
        'overview.reload': ('prj_path', 'tasks'),
        'rl_task.pp_to_main': ('tasks',),
        'rl_task.run': ('tasks',),
        'rl_task.stop': ('tasks',),
        'rl_task.set_run_params': ('loop_mode', 'override'),
        'diagnosis.set_params': ('display_pdo_params',),
        'diagnosis.open': ('open', 'display_open'),
        'register.set_value': ('name', 'type', 'bias', 'value'),
        'diagnosis.save': ('save',),
    }

    def __init__(self, w2t):
        """Connect (if the heartbeat file says we are disconnected) and start
        the heartbeat and receiver threads.

        :param w2t: reporting callback forwarded to ``ModbusRequest`` and used
            for protocol error messages.
        """
        super().__init__()
        self.w2t = w2t
        self.c = None            # socket to port 5050
        self.c_xs = None         # socket to port 6666
        self.c_msg = []          # stored responses from port 5050, newest first
        self.c_msg_xs = []       # stored responses from port 6666, newest first
        self.flag = 0            # 1 while a multi-chunk response is being assembled
        self.response = ''       # partially assembled response text
        self.leftover = 0        # bytes still missing from the current frame
        self.flag_xs = 0         # 1 while an unterminated xs message is buffered
        self.response_xs = ''    # buffered tail of an unterminated xs message
        self.t_bool = True       # loop condition of the background threads
        self.tab_name = 'openapi'
        self.pkg_size = 0        # bytes of the current package not yet consumed
        self.broke = 0           # which kind of split was hit at a chunk end
        self.half = 0            # bytes saved from a split header/length field
        self.half_length = 0     # number of header bytes saved in self.half
        self.index = 0           # read position inside the current chunk
        self.reset_index = 0     # 1 when the next chunk must start at index 0
        self.durable_lock = 0

        self.sock_conn()
        self.t_heartbeat = Thread(target=self.heartbeat)
        self.t_heartbeat.daemon = True
        self.t_heartbeat.start()
        self.t_unpackage = Thread(target=self.unpackage, args=(self.c,))
        self.t_unpackage.daemon = True
        self.t_unpackage.start()
        self.t_unpackage_xs = Thread(target=self.unpackage_xs, args=(self.c_xs,))
        self.t_unpackage_xs.daemon = True
        self.t_unpackage_xs.start()

    def sock_conn(self):
        """Open both sockets when the heartbeat file contains '0'.

        On success the heartbeat file is set to '1' and a few Modbus registers
        are initialised; on any failure it is set to '0'.
        """
        with open(clibs.heartbeat, "r", encoding='utf-8') as f_hb:
            c_state = f_hb.read().strip()

        if c_state == '0':
            try:
                self.c = socket(AF_INET, SOCK_STREAM)
                self.c.connect((clibs.ip_addr, 5050))
                self.c.setblocking(False)
                self.c_xs = socket(AF_INET, SOCK_STREAM)
                self.c_xs.connect((clibs.ip_addr, 6666))
                self.c_xs.setblocking(False)
                logger.info("Connection success...")
                with open(clibs.heartbeat, "w", encoding='utf-8') as f_hb:
                    f_hb.write('1')

                md = ModbusRequest(self.w2t)
                try:
                    md.reset_estop()
                    md.clear_alarm()
                    md.write_act(False)
                    md.write_probe(False)
                    md.write_axis(1)
                finally:
                    # The client is only needed for this initialisation.
                    md.c.close()
            except Exception as Err:
                logger.info("Connection failed...")
                with open(clibs.heartbeat, "w", encoding='utf-8') as f_hb:
                    f_hb.write('0')

    def header_check(self, index, data):
        """Parse the 8-byte header located at ``data[index:]``.

        :return: ``(payload_index, frame_size, pkg_size)`` for a valid header.
            When fewer than 9 bytes remain, the remaining bytes are saved in
            ``self.half``, ``self.broke`` is set to 100 and
            ``(index + clibs.MAX_FRAME_SIZE, 0, 0)`` is returned so the caller
            stops processing this chunk.
        """
        if index + 8 < len(data):
            _frame_size = int.from_bytes(data[index:index+2], byteorder='big')
            _pkg_size = int.from_bytes(data[index+2:index+6], byteorder='big')
            _protocol = int.from_bytes(data[index + 6:index + 7], byteorder='big')
            _reserved = int.from_bytes(data[index + 7:index + 8], byteorder='big')
            if _reserved == 0 and _protocol == 2:
                return index + 8, _frame_size, _pkg_size
            else:
                print(data)
                # NOTE(review): if w2t returns instead of raising, this branch
                # falls through and the method returns None - confirm.
                self.w2t("Header Check: 解包数据有误,需要确认!", 0, 1, 'red', tab_name=self.tab_name)
        else:
            # The header is split across two chunks: remember the first part.
            self.half_length = len(data) - index
            self.half = data[index:]
            self.broke = 100
            index += clibs.MAX_FRAME_SIZE
            return index, 0, 0

    def heartbeat(self):
        """Send 'controller.heart' every cycle and record in the heartbeat
        file whether its id was found by ``get_from_id`` ('1') or not ('0')."""
        while self.t_bool:
            _id = self.execution('controller.heart')
            _flag = '0' if self.get_from_id(_id) is None else '1'
            with open(clibs.heartbeat, "w", encoding='utf-8') as f_hb:
                f_hb.write(_flag)
            if _flag == '0':
                logger.info(f"{_id} 心跳丢失,连接失败,重新连接中...")
            sleep(2)

    def msg_storage(self, response, flag=0):
        """Log a decoded response and store it, newest first.

        :param response: the response as a decoded JSON string.
        :param flag: 0 stores into ``c_msg``, anything else into ``c_msg_xs``.

        Responses containing 'move.monitor' are logged but not stored.  At
        most 20000 responses are kept per list.
        """
        messages = self.c_msg if flag == 0 else self.c_msg_xs
        try:
            decoded = loads(response)
        except ValueError as err:
            # A single malformed message must not kill the receiver thread.
            logger.warning(f"Discarding undecodable response: {err}")
            return
        logger.warning(f"{decoded}")
        if 'move.monitor' in response:
            return
        messages.insert(0, response)
        del messages[20000:]

    def get_response(self, data):
        """Consume one received chunk of the framed (port 5050) stream.

        A package starts with the header produced by ``package``: 2-byte frame
        size (payload bytes in the first frame + 6), 4-byte package size,
        1-byte protocol (2) and 1-byte reserved (0).  When the package is
        larger than one frame, each following frame is preceded by a 2-byte
        length, tracked here in ``self.leftover``.  Parser state is kept on
        ``self`` between calls because packages may span several chunks.

        NOTE(review): partial payloads are decoded chunk by chunk, so a
        multi-byte UTF-8 character split across chunks would raise
        UnicodeDecodeError - confirm payloads are ASCII.
        """
        if self.broke == 100:
            # The previous chunk ended inside a header: complete it with the
            # first bytes of this chunk.
            _half_1 = self.half
            _half_2 = data[:8-self.half_length]
            _full = _half_1 + _half_2
            _frame_size = int.from_bytes(_full[:2], byteorder='big')
            _pkg_size = int.from_bytes(_full[2:6], byteorder='big')
            _protocol = int.from_bytes(_full[6:7], byteorder='big')
            _reserved = int.from_bytes(_full[7:8], byteorder='big')
            if _reserved != 0 or _protocol != 2:
                print(data)
                self.w2t("in get_response: 解包数据有误,需要确认!", 0, 1, 'red', tab_name=self.tab_name)
            self.pkg_size = _pkg_size
            self.index = 8 - self.half_length
        else:
            if self.reset_index == 1:
                self.index = 0

        while self.index < len(data):
            # flag == 0: a new response starts here, so its header is parsed.
            if self.flag == 0:
                if self.broke == 100:
                    self.broke = 0
                else:
                    self.index, _frame_size, self.pkg_size = self.header_check(self.index, data)
                # header_check returns index + MAX_FRAME_SIZE for a split
                # header.  The original test used '>', which missed a split at
                # index 0 (result == MAX_FRAME_SIZE); a valid header always
                # yields an index below MAX_FRAME_SIZE, so '>=' is safe.
                if self.index >= clibs.MAX_FRAME_SIZE:
                    break
                # self.pkg_size is always the amount of package data not yet
                # consumed (see the project's unpacking notes, 解包原理数据.txt).
                if self.pkg_size <= len(data) - self.index:
                    # The whole package is inside this chunk.
                    self.response = data[self.index:self.index + self.pkg_size].decode()
                    self.msg_storage(flag=0, response=self.response)
                    self.index += self.pkg_size
                    self.flag = 0
                    self.response = ''
                    self.leftover = 0
                    self.pkg_size = 0
                    self.reset_index = 0 if self.index < len(data) else 1
                elif self.pkg_size > len(data) - self.index:
                    # First chunk of a package that continues in later chunks;
                    # the rest is handled by the flag == 1 branch.
                    self.flag = 1
                    if self.index+_frame_size-6 <= len(data):
                        self.response = data[self.index:self.index+_frame_size-6].decode()
                        self.index += (_frame_size-6)
                        self.pkg_size -= (_frame_size-6)
                        self.reset_index = 0
                        if self.index + 2 < len(data):
                            self.leftover = int.from_bytes(data[self.index:self.index + 2], byteorder='big')
                            self.index += 2
                            self.reset_index = 0
                        else:
                            # The 2-byte length of the next frame sits at, or
                            # straddles, the end of this chunk.
                            if self.index + 2 == len(data):
                                self.broke = 1
                                self.half = data[-2:]
                            elif self.index + 1 == len(data):
                                self.broke = 2
                                self.half = data[-1:]
                            elif self.index == len(data):
                                self.broke = 3
                            self.index += clibs.MAX_FRAME_SIZE
                            self.reset_index = 1
                            break  # nothing more can be read from this chunk
                    elif self.index+_frame_size-6 > len(data):
                        # The first frame itself is cut by the chunk end.
                        self.response = data[self.index:].decode()
                        self.pkg_size -= (len(data) - self.index)
                        self.leftover = (_frame_size-6-(len(data)-self.index))
                        self.index += clibs.MAX_FRAME_SIZE
                        self.reset_index = 1
            elif self.flag == 1:
                # Continue a package whose start was received earlier; flag is
                # reset to 0 once the package is complete.
                if self.broke == 1:
                    self.index = 0
                    self.leftover = int.from_bytes(self.half, byteorder='big')
                    self.broke = 0
                elif self.broke == 2:
                    self.leftover = int.from_bytes(self.half+data[:1], byteorder='big')
                    self.index = 1
                    self.broke = 0
                if self.broke == 3:
                    self.leftover = int.from_bytes(data[:2], byteorder='big')
                    self.index = 2
                    self.broke = 0

                while self.pkg_size > 0:
                    if self.index + self.leftover <= len(data):
                        # The rest of the current frame is inside this chunk.
                        if self.leftover < 0 or self.leftover > 1024:
                            self.w2t("", 0, 111, 'red')
                        self.response += data[self.index:self.index + self.leftover].decode()
                        self.pkg_size -= self.leftover
                        if self.pkg_size == 0:
                            self.msg_storage(flag=0, response=self.response)
                            self.index += self.leftover
                            self.flag = 0
                            self.response = ''
                            self.leftover = 0
                            self.pkg_size = 0
                            self.reset_index = 0 if self.index < len(data) else 1
                            break
                        self.index += self.leftover
                        if self.index + 2 < len(data):
                            self.leftover = int.from_bytes(data[self.index:self.index + 2], byteorder='big')
                            self.index += 2
                            self.reset_index = 0
                        else:
                            if self.index + 2 == len(data):
                                self.broke = 1
                                self.half = data[-2:]
                            elif self.index + 1 == len(data):
                                self.broke = 2
                                self.half = data[-1:]
                            elif self.index == len(data):
                                self.broke = 3
                            self.index += clibs.MAX_FRAME_SIZE
                            self.reset_index = 1
                            break  # nothing more can be read from this chunk
                        if self.leftover < 0 or self.leftover > 1024:
                            self.w2t("", 0, 111, 'red')
                    else:
                        # The current frame continues in the next chunk.
                        if self.leftover < 0 or self.leftover > 1024:
                            self.w2t("", 0, 111, 'red')
                        self.response += data[self.index:].decode()
                        self.leftover -= (len(data) - self.index)
                        self.pkg_size -= (len(data) - self.index)
                        self.index += clibs.MAX_FRAME_SIZE
                        self.reset_index = 1
                        if self.leftover < 0 or self.leftover > 1024:
                            self.w2t("", 0, 111, 'red')
                        break  # chunk exhausted; index also ends the outer loop

    def get_response_xs(self, data):
        """Consume one received chunk of the ``\\r``-delimited (port 6666) stream.

        Complete messages are passed to ``msg_storage(flag=1)``; an
        unterminated tail is kept in ``self.response_xs`` (``self.flag_xs`` is
        1 while such a tail exists) and prepended to the next chunk.

        :param data: raw bytes returned by ``recv``.
        """
        # 'surrogateescape' lets the str buffer round-trip the raw bytes of a
        # multi-byte character that was cut by the chunk boundary.
        buffered = self.response_xs.encode('utf-8', 'surrogateescape') + data
        *complete, remainder = buffered.split(b'\r')
        for fragment in complete:
            if fragment:  # skip empty fragments instead of feeding them to loads()
                self.msg_storage(flag=1, response=fragment.decode())
        self.response_xs = remainder.decode('utf-8', 'surrogateescape')
        self.flag_xs = 1 if remainder else 0

    def get_from_id(self, msg_id, flag=0):
        """Return the first log line containing ``msg_id``, or None.

        ``clibs.log_data_hmi`` is scanned up to three times, one second apart;
        afterwards the rotated file ``clibs.log_data_hmi + '.1'`` is scanned
        once.

        :param msg_id: request id as returned by ``execution``; a falsy id
            yields None immediately.
        :param flag: unused.
        """
        if not msg_id:
            return None

        for _ in range(3):
            with open(clibs.log_data_hmi, mode='r', encoding='utf-8') as f_log:
                for line in f_log:
                    if msg_id in line.strip():
                        return line
            sleep(1)
        else:
            # Try the previously rotated log file, once only.
            sleep(1)
            try:
                with open(clibs.log_data_hmi+'.1', mode='r', encoding='utf-8') as f_log:
                    for line in f_log:
                        if msg_id in line.strip():
                            return line
            except FileNotFoundError:
                pass

        return None

    def package(self, cmd):
        """Frame ``cmd`` for the port-5050 protocol.

        Layout: 2-byte frame size (payload length + 6), 4-byte package size
        (payload length), 1-byte protocol (2), 1-byte reserved (0), payload.
        All integers are big-endian.

        :param cmd: request text.
        :return: the framed request as bytes.
        """
        # Lengths must count encoded bytes, not characters.
        payload = cmd.encode()
        _frame_head = (len(payload) + 6).to_bytes(length=2, byteorder='big')
        _pkg_head = len(payload).to_bytes(length=4, byteorder='big')
        _protocol = int(2).to_bytes(length=1, byteorder='big')
        _reserved = int(0).to_bytes(length=1, byteorder='big')
        return _frame_head + _pkg_head + _protocol + _reserved + payload

    def package_xs(self, cmd):
        """Serialise ``cmd`` as compact JSON terminated by ``\\r``, as bytes."""
        return f"{dumps(cmd, separators=(',', ':'))}\r".encode()

    def _receive_loop(self, sock, bufsize, handler):
        """Feed every chunk received on ``sock`` to ``handler`` while
        ``self.t_bool`` is true; any exception ends the loop and is logged."""
        def to_read(conn, mask):
            data = conn.recv(bufsize)
            if data:
                handler(data)
            else:
                # An empty read means the peer closed the connection.
                print('closing', conn)
                sel.unregister(conn)
                conn.close()

        try:
            with selectors.DefaultSelector() as sel:
                sel.register(sock, selectors.EVENT_READ, to_read)
                while self.t_bool:
                    for key, mask in sel.select():
                        callback = key.data
                        callback(key.fileobj, mask)
        except Exception as Err:
            logger.warning(Err)

    def unpackage(self, sock):
        """Receiver thread body for the framed stream (``get_response``)."""
        self._receive_loop(sock, clibs.MAX_FRAME_SIZE, self.get_response)

    def unpackage_xs(self, sock):
        """Receiver thread body for the ``\\r``-delimited stream
        (``get_response_xs``)."""
        self._receive_loop(sock, 1024, self.get_response_xs)

    def gen_id(self, command):
        """Return a request id of the form ``'<command>-<time.time()>'``."""
        _now = time()
        _id = f"{command}-{_now}"
        return _id

    def execution(self, command, flg=0, **kwargs):
        """Build a request from its JSON template and send it.

        :param command: template name; the request is loaded from
            ``{clibs.PREFIX}templates/json/{command}.json``.
        :param flg: 0 selects the framed protocol; any other value is not
            implemented and returns None.
        :param kwargs: values copied into ``req['data']`` for the commands
            listed in ``_REQUEST_FIELDS``.
        :return: the generated request id, or None when the template could not
            be loaded or ``flg`` is non-zero.
        :raises KeyError: when a field required by ``command`` is missing from
            ``kwargs``.
        """
        if flg == 0:  # for old protocols
            try:
                with open(f'{clibs.PREFIX}templates/json/{command}.json', encoding='utf-8', mode='r') as f_json:
                    req = load(f_json)
            except (OSError, ValueError):
                self.w2t(f"暂不支持 {command} 功能,或确认该功能存在...", 0, 1, 'red', tab_name=self.tab_name)
                # Without a template there is nothing to send.
                return None

            for field in self._REQUEST_FIELDS.get(command, ()):
                req['data'][field] = kwargs[field]

            req['id'] = self.gen_id(command)
            cmd = dumps(req, separators=(',', ':'))
            try:
                self.c.send(self.package(cmd))
                sleep(0.5)
            except Exception as Err:
                logger.info(f"{cmd}: 请求发送失败...{Err}")

            return req['id']
        else:  # for xService
            pass