100 lines
3.1 KiB
Python
100 lines
3.1 KiB
Python
from configparser import ConfigParser
|
||
from pymodbus.client import ModbusSerialClient, ModbusTcpClient
|
||
from time import sleep, strftime, localtime, time
|
||
from openpyxl import load_workbook, Workbook
|
||
from os import rename
|
||
# import pymodbus
|
||
# pymodbus.pymodbus_apply_logging_config("DEBUG")
|
||
|
||
|
||
def initializations():
    """Archive any previous results file and create a fresh results workbook.

    An existing ``results.xlsx`` is renamed to
    ``results_<YYYYmmddHHMMSS>.xlsx`` using the current local time; if no such
    file exists the rename is skipped. A new workbook whose active sheet is
    titled ``results`` is then saved as ``results.xlsx`` with the header cells
    A1..C1 filled in.
    """
    stamp = strftime("%Y%m%d%H%M%S", localtime(time()))
    try:
        rename('results.xlsx', f'results_{stamp}.xlsx')
    except FileNotFoundError:
        # No previous results file to archive.
        pass

    book = Workbook()
    sheet = book.active
    sheet.title = 'results'

    # Header row: time, count, result.
    headers = (('A1', '时间'), ('B1', '次数'), ('C1', '结果'))
    for cell_ref, caption in headers:
        sheet[cell_ref].value = caption

    book.save('results.xlsx')
    book.close()
|
||
|
||
|
||
def do_connections():
    """Connect the Modbus TCP and Modbus RTU clients configured in conf.ini.

    Reads ``addr`` and ``port`` from the ``[md_tcp]`` section and ``port``
    from the ``[md_rtu]`` section of ``conf.ini``, then connects a TCP client
    and a serial (RTU) client. A status line is printed for each successful
    connection.

    Returns:
        tuple: ``(client_tcp, client_rtu)``, both already connected.

    Raises:
        ConnectionError: If either client fails to connect. If the RTU side
            fails, the already-open TCP client is closed before re-raising.
        configparser.Error: If a required section or option is missing.
    """
    configs = ConfigParser()
    configs.read('conf.ini')

    # Read every setting before opening anything, so a configuration mistake
    # cannot leave a half-initialised connection behind.
    tcp_host = configs.get('md_tcp', 'addr')
    tcp_port = configs.getint('md_tcp', 'port')
    rtu_port = configs.get('md_rtu', 'port')

    # ================ Modbus TCP client ================
    # The port is passed by keyword so the call is valid whether or not the
    # installed pymodbus release accepts it positionally.
    client_tcp = ModbusTcpClient(tcp_host, port=tcp_port)
    if not client_tcp.connect():
        raise ConnectionError(f"Modbus TCP无法连接到{tcp_host}:{tcp_port}...")
    print(f"Modbus TCP已连接到{tcp_host}:{tcp_port}...")

    # ================ Modbus RTU client ================
    try:
        # NOTE(review): `method` is kept from the original call; confirm the
        # installed pymodbus version still accepts it.
        client_rtu = ModbusSerialClient(
            method='rtu',
            port=rtu_port,   # adjust to the actual port, e.g. 'COM3' on Windows
            baudrate=38400,  # baud rate of 38400 as set by the protocol
            timeout=1,
            parity='N',      # no parity
            stopbits=2,      # 2 stop bits
            bytesize=8,      # 8 data bits
        )
        if not client_rtu.connect():
            raise ConnectionError(f"Modbus RTU无法连接到系统{rtu_port}端口...")
    except Exception:
        # Do not leak the TCP connection when the serial side cannot be set up.
        client_tcp.close()
        raise
    print(f"Modbus RTU已连接到系统{rtu_port}端口...")

    return client_tcp, client_rtu
|
||
|
||
|
||
def get_gauge_data(client_tcp, client_rtu):
    """Continuously record gauge readings into results.xlsx.

    Sets the "act" signal (register 41000) to 1 over Modbus TCP, then loops
    forever:

    1. Poll register 41001 ("time_to_go") every 0.5 s until it is non-zero.
    2. Read two holding registers starting at 0x0000 from RTU slave 1. The
       first register selects the sign (0 means positive, anything else
       negative); the second is the magnitude, scaled by 1/10000.
    3. Write timestamp, sequence number and value to the next free row of the
       ``results`` sheet in ``results.xlsx`` and save the file.
    4. Wait 2 s before the next cycle.

    If reading or saving fails, the error is printed, the "act" signal is set
    back to 0, and the loop continues; the row counter only advances after a
    successful save.

    Args:
        client_tcp: Connected Modbus TCP client used for the act/time_to_go
            registers.
        client_rtu: Connected Modbus RTU client used to read the gauge.

    This function does not return.
    """
    row = 2  # first row written here; A1..C1 are created in initializations()
    client_tcp.write_register(41000, 1)  # set the "act" signal to True

    while True:
        # Wait until "time_to_go" becomes non-zero.
        while True:
            res_tcp = client_tcp.read_holding_registers(41001, 1)
            sleep(0.5)
            if res_tcp.registers[0]:
                break

        try:
            res_rtu = client_rtu.read_holding_registers(0x0000, 2, slave=1)
            sign = 1 if res_rtu.registers[0] == 0 else -1
            result = res_rtu.registers[1] * sign / 10000
            now = strftime("%Y-%m-%d %H:%M:%S", localtime(time()))

            # The workbook is reloaded and saved on every reading so each
            # measurement is on disk before the next cycle starts.
            wb = load_workbook('results.xlsx')
            ws = wb['results']
            ws[f'A{row}'].value = now
            ws[f'B{row}'].value = row - 1
            ws[f'C{row}'].value = float(result)
            wb.save('results.xlsx')
            wb.close()
            row += 1
        except Exception as exc:
            # Catch Exception rather than everything, so Ctrl+C and SystemExit
            # still stop the program, and report the cause instead of hiding it.
            print(f"读取或保存测量结果失败: {exc!r}")
            client_tcp.write_register(41000, 0)  # set the "act" signal to False

        sleep(2)
|
||
|
||
|
||
def main():
    """Run the logger: reset the results file, connect, then record readings."""
    initializations()
    clients = do_connections()  # (client_tcp, client_rtu)
    get_gauge_data(*clients)
|
||
|
||
|
||
# Run the logger only when this file is executed as a script.
if __name__ == "__main__":
    main()
|