"""Log gauge readings to ``results.xlsx``.

A Modbus TCP device is used for handshaking (an ``act`` register written by
this script and a ``time_to_go`` register polled by it) and a Modbus RTU
device supplies the measurement itself.
"""

from configparser import ConfigParser
from os import rename
from time import localtime, sleep, strftime, time

from openpyxl import Workbook, load_workbook
from pymodbus.client import ModbusSerialClient, ModbusTcpClient

# Uncomment to enable pymodbus debug logging:
# import pymodbus
# pymodbus.pymodbus_apply_logging_config("DEBUG")

RESULTS_FILE = 'results.xlsx'
RESULTS_SHEET = 'results'

ACT_REGISTER = 41000          # TCP holding register: "act" signal
TIME_TO_GO_REGISTER = 41001   # TCP holding register: "time_to_go" value


def initializations():
    """Archive any previous results file and create a fresh one with headers.

    An existing ``results.xlsx`` is renamed to ``results_<timestamp>.xlsx``;
    a missing file is not an error.
    """
    try:
        now = strftime("%Y%m%d%H%M%S", localtime(time()))
        rename(RESULTS_FILE, f'results_{now}.xlsx')
    except FileNotFoundError:
        pass  # First run: nothing to archive.

    wb = Workbook()
    ws = wb.active
    ws.title = RESULTS_SHEET
    ws['A1'].value = '时间'
    ws['B1'].value = '次数'
    ws['C1'].value = '结果'
    wb.save(RESULTS_FILE)
    wb.close()


def do_connections():
    """Connect the Modbus TCP and RTU clients described in ``conf.ini``.

    Reads ``[md_tcp] addr``/``port`` and ``[md_rtu] port``.

    Returns:
        A ``(client_tcp, client_rtu)`` tuple of connected clients.

    Raises:
        ConnectionError: if either client fails to connect. A TCP client
            that was already connected is closed before raising.
    """
    configs = ConfigParser()
    configs.read('conf.ini')

    # ---------------- Modbus TCP client ----------------
    tcp_host = configs.get('md_tcp', 'addr')
    tcp_port = configs.getint('md_tcp', 'port')
    # Pass the port by keyword: newer pymodbus releases make it keyword-only.
    client_tcp = ModbusTcpClient(tcp_host, port=tcp_port)
    if not client_tcp.connect():
        raise ConnectionError(f"Modbus TCP无法连接到{tcp_host}:{tcp_port}...")
    print(f"Modbus TCP已连接到{tcp_host}:{tcp_port}...")

    # ---------------- Modbus RTU client ----------------
    rtu_port = configs.get('md_rtu', 'port')
    client_rtu = ModbusSerialClient(
        method='rtu',
        port=rtu_port,    # adjust per system; on Windows e.g. 'COM3'
        baudrate=38400,   # baud rate fixed by the device protocol
        timeout=1,
        parity='N',       # no parity
        stopbits=2,       # two stop bits
        bytesize=8,       # eight data bits
    )
    if not client_rtu.connect():
        # Do not leak the already-open TCP connection on failure.
        client_tcp.close()
        raise ConnectionError(f"Modbus RTU无法连接到系统{rtu_port}端口...")
    print(f"Modbus RTU已连接到系统{rtu_port}端口...")

    return client_tcp, client_rtu


def _read_registers(client, address, count, **kwargs):
    """Read holding registers and return the register list.

    Raises:
        RuntimeError: if the device answers with a Modbus error response,
            which carries no ``registers`` attribute.
    """
    response = client.read_holding_registers(address, count, **kwargs)
    if response.isError():
        raise RuntimeError(f"Modbus读取寄存器{address}失败: {response}")
    return response.registers


def get_gauge_data(client_tcp, client_rtu):
    """Record one gauge reading per cycle into ``results.xlsx``, forever.

    Sets the ``act`` register to 1, then repeatedly: polls ``time_to_go``
    until it is non-zero, reads two RTU registers (sign flag, magnitude),
    and appends ``timestamp, sequence number, value`` to the results sheet.
    If reading or saving fails, the error is reported, ``act`` is set to 0
    and the loop continues.

    Args:
        client_tcp: connected Modbus TCP client.
        client_rtu: connected Modbus RTU client.
    """
    count = 2  # Next worksheet row; row 1 holds the headers.
    client_tcp.write_register(ACT_REGISTER, 1)  # set the act signal to True

    while True:
        # Wait until time_to_go becomes non-zero.
        while True:
            time_to_go = _read_registers(client_tcp, TIME_TO_GO_REGISTER, 1)[0]
            sleep(0.5)
            if time_to_go:
                break

        try:
            sign_flag, magnitude = _read_registers(
                client_rtu, 0x0000, 2, slave=1)
            # Register 0 is the sign flag (0 => positive); register 1 is the
            # magnitude scaled by 10000.
            plus_or_minus = 1 if sign_flag == 0 else -1
            result = magnitude * plus_or_minus / 10000
            now = strftime("%Y-%m-%d %H:%M:%S", localtime(time()))

            # The workbook is reloaded and saved every cycle so each reading
            # is on disk as soon as it is taken.
            wb = load_workbook(RESULTS_FILE)
            ws = wb[RESULTS_SHEET]
            ws[f'A{count}'].value = now
            ws[f'B{count}'].value = count - 1
            ws[f'C{count}'].value = float(result)
            wb.save(RESULTS_FILE)
            wb.close()
            count += 1
        except Exception as exc:
            # Catch Exception rather than everything so Ctrl+C / SystemExit
            # can still stop the program, and report why the cycle failed.
            print(f"第{count - 1}次测量失败: {exc!r}")
            client_tcp.write_register(ACT_REGISTER, 0)  # set act to False
        sleep(2)


def main():
    """Prepare the results file, connect both clients and start logging."""
    initializations()
    client_tcp, client_rtu = do_connections()
    try:
        get_gauge_data(client_tcp, client_rtu)
    finally:
        # Release the serial port and the socket however the loop ends.
        client_rtu.close()
        client_tcp.close()


if __name__ == '__main__':
    main()