You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

171 lines
7.0 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import modbus_tk
import modbus_tk.defines as cst
from modbus_tk import modbus_tcp
import logging
import struct
from .SetMessage import *
from .ByteOrder import *
class TcpMaster():
    """Convenience wrapper around ``modbus_tcp.TcpMaster`` for registers and coils.

    No public method raises on failure: the problem is logged and the string
    ``'error'`` is returned instead.  Write methods return ``None`` on success.

    Byte orders accepted by the register methods are ``'ABCD'``, ``'DCBA'``,
    ``'BADC'`` and ``'CDAB'``; the read methods additionally accept ``'int'``.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, host=None, port=None):
        """Create the underlying master and give it a 5 second timeout.

        Construction never raises.  If creating the master fails, the failure
        is logged and ``self.master`` stays ``None``, so every later call
        returns ``'error'``.
        """
        # Define the attribute up front so a failed setup cannot leave the
        # instance without a ``master`` attribute at all.
        self.master = None
        try:
            self.master = modbus_tcp.TcpMaster(host=host, port=port)
            self.master.set_timeout(5.0)
        except Exception:
            self._log.exception(
                "Failed to set up Modbus TCP master for %s:%s", host, port)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _fail(self, operation, exc):
        """Log a failed *operation* and return the ``'error'`` sentinel."""
        self._log.error("%s failed: %s", operation, exc)
        return 'error'

    @staticmethod
    def _looksIntegral(value):
        """Return True when ``str(value)`` has no decimal point.

        This is the rule the write methods use to choose between an integer
        write and a float write.  NOTE: values such as ``'1e5'`` or
        ``float('inf')`` contain no ``'.'`` and are therefore treated as
        integers (and then fail ``int()`` conversion).
        """
        return '.' not in str(value)

    @staticmethod
    def _floatEncoder(order):
        """Return the float-to-registers helper for *order*, or None if unknown.

        The helpers are looked up at call time; they are expected to be
        provided by the module's star imports (presumably ``ByteOrder``).
        """
        return {
            'ABCD': floatToABCD,  # big-endian
            'DCBA': floatToDCBA,  # little-endian
            'BADC': floatToBADC,
            'CDAB': floatToCDAB,
        }.get(order)

    @staticmethod
    def _floatDecoder(order):
        """Return the registers-to-float helper for *order*, or None if unknown."""
        return {
            'ABCD': ABCDToFloat,
            'DCBA': DCBAToFloat,
            'BADC': BADCToFloat,
            'CDAB': CDABToFloat,
        }.get(order)

    def _readRegisters(self, functionCode, slaveId, startAddress, varNums, order):
        """Shared body of the two register readers; lets exceptions propagate."""
        if order == 'int':
            # Reads varNums registers but hands back only the first one.
            return self.master.execute(slaveId, functionCode, startAddress, varNums)[0]
        decoder = self._floatDecoder(order)
        if decoder is None:
            raise ValueError("unsupported byte order: %r" % (order,))
        # A float occupies two registers, so varNums is not used here.
        registers = self.master.execute(slaveId, functionCode, startAddress, 2)
        return decoder(registers)

    def _readBits(self, functionCode, slaveId, startAddress, varNums):
        """Shared body of the coil / discrete-input readers."""
        bits = self.master.execute(slaveId, functionCode, startAddress, varNums)
        if varNums == 1:
            return bits[0] if bits else 0
        return bits

    # ------------------------------------------------------------------
    # Writes
    # ------------------------------------------------------------------
    def writeSingleRegister(self, slaveId, address, outputValue, order='ABCD'):
        """Write one value starting at *address*.

        A value whose text form has no ``'.'`` is written as an integer with
        WRITE_SINGLE_REGISTER.  Anything else is converted to float, encoded
        with the helper selected by *order* and written with
        WRITE_MULTIPLE_REGISTERS, because a float needs two registers.
        An unrecognised *order* falls back to ``'ABCD'``.

        Returns ``None`` on success, ``'error'`` on any failure.
        """
        try:
            if self._looksIntegral(outputValue):
                self.master.execute(
                    slaveId, cst.WRITE_SINGLE_REGISTER,
                    starting_address=address, output_value=int(outputValue))
            else:
                number = float(outputValue)
                encoder = self._floatEncoder(order) or floatToABCD
                self.master.execute(
                    slaveId, cst.WRITE_MULTIPLE_REGISTERS,
                    starting_address=address, output_value=encoder(number))
        except Exception as exc:
            return self._fail('writeSingleRegister', exc)

    def writeMultipleRegisters(self, slaveId, startAddress, outputValues, order='ABCD'):
        """Write several values in one WRITE_MULTIPLE_REGISTERS request.

        Each element of *outputValues* is handled like in
        ``writeSingleRegister``: integer-looking values contribute one
        register, the others are encoded as floats using *order*.  Unlike
        ``writeSingleRegister`` there is no fallback: a float combined with an
        unrecognised *order* makes the call return ``'error'`` and nothing is
        written.

        Returns ``None`` on success, ``'error'`` on any failure.
        """
        try:
            registers = []
            for value in outputValues:
                if self._looksIntegral(value):
                    registers.append(int(value))
                    continue
                number = float(value)
                encoder = self._floatEncoder(order)
                if encoder is None:
                    raise ValueError("unsupported byte order: %r" % (order,))
                registers.extend(encoder(number))
            self.master.execute(
                slaveId, cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=startAddress, output_value=registers)
        except Exception as exc:
            return self._fail('writeMultipleRegisters', exc)

    def writeSingleCoil(self, slaveId, address, outputValue):
        """Write a single coil; *outputValue* must convert to 0 or 1.

        Returns ``None`` on success.  Returns ``'error'`` when the value is
        not 0/1 (nothing is sent in that case) or when the request fails.
        """
        try:
            state = int(outputValue)
            if state not in (0, 1):
                # Previously this was a silent no-op that looked like success.
                raise ValueError("coil value must be 0 or 1, got %r" % (outputValue,))
            self.master.execute(slaveId, cst.WRITE_SINGLE_COIL, address, output_value=state)
        except Exception as exc:
            return self._fail('writeSingleCoil', exc)

    def writeMultipleCoils(self, slaveId, startAddress, outputValues):
        """Write several coils from a list of booleans or integers.

        Every element is converted with ``int()``; zero becomes 0 and any
        other number becomes 1.

        Returns ``None`` on success, ``'error'`` on any failure.
        """
        try:
            states = [1 if int(value) else 0 for value in outputValues]
            self.master.execute(
                slaveId, cst.WRITE_MULTIPLE_COILS,
                starting_address=startAddress, output_value=states)
        except Exception as exc:
            return self._fail('writeMultipleCoils', exc)

    # ------------------------------------------------------------------
    # Reads
    # ------------------------------------------------------------------
    def readHoldingRegisters(self, slaveId, startAddress, varNums, order='ABCD'):
        """Read holding registers starting at *startAddress*.

        With ``order='int'`` *varNums* registers are read and the first one is
        returned.  With a byte order, exactly two registers are read
        (*varNums* is ignored) and passed to the matching decode helper, whose
        result is returned.

        Returns ``'error'`` on any failure, including an unrecognised *order*.
        """
        try:
            return self._readRegisters(
                cst.READ_HOLDING_REGISTERS, slaveId, startAddress, varNums, order)
        except Exception as exc:
            return self._fail('readHoldingRegisters', exc)

    def readInputRegisters(self, slaveId, startAddress, varNums, order='ABCD'):
        """Read input registers; same rules as ``readHoldingRegisters``.

        Returns ``'error'`` on any failure, including an unrecognised *order*.
        """
        try:
            return self._readRegisters(
                cst.READ_INPUT_REGISTERS, slaveId, startAddress, varNums, order)
        except Exception as exc:
            return self._fail('readInputRegisters', exc)

    def readCoils(self, slaveId, startAddress, varNums, order='ABCD'):
        """Read *varNums* coils starting at *startAddress*.

        Returns the single value (or 0 if the response is empty) when
        *varNums* is 1, otherwise the whole result of the request.  *order* is
        accepted for signature compatibility and is not used.

        Returns ``'error'`` on any failure.
        """
        try:
            return self._readBits(cst.READ_COILS, slaveId, startAddress, varNums)
        except Exception as exc:
            return self._fail('readCoils', exc)

    def readInputCoils(self, slaveId, startAddress, varNums):
        """Read *varNums* discrete inputs starting at *startAddress*.

        Returns the single value (or 0 if the response is empty) when
        *varNums* is 1, otherwise the whole result of the request.

        Returns ``'error'`` on any failure.
        """
        try:
            return self._readBits(cst.READ_DISCRETE_INPUTS, slaveId, startAddress, varNums)
        except Exception as exc:
            return self._fail('readInputCoils', exc)
# print(cst.READ_HOLDING_REGISTERS, cst.READ_COILS, cst.WRITE_SINGLE_REGISTER, cst.WRITE_SINGLE_COIL)
# self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, 0, output_value=[1, 2, 3, 4, 5])
# self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=5, output_value=[3.0], data_format='>f')
# self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=6, output_value=[4.0], data_format='>f')
# self.master.execute(2, cst.WRITE_SINGLE_COIL, 2, output_value=1)
# self.master.execute(2, cst.WRITE_MULTIPLE_COILS, 0, output_value=[1, 1, 0, 1, 1, 0, 1, 1])
# logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 2, data_format='f'))
# Read and write floats
# send some queries
# logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 100, 12))
if __name__ == "__main__":
    # Manual smoke test: write the integer 1 to register 0 of slave 1 on a
    # Modbus TCP server listening on the local machine.
    client = TcpMaster(host='127.0.0.1', port=502)
    client.writeSingleRegister(1, 0, '1')
    # client.readHoldingRegisters(1, 0, 8)