|
|
|
|
|
import modbus_tk
|
|
|
import modbus_tk.defines as cst
|
|
|
from modbus_tk import modbus_tcp
|
|
|
import logging
|
|
|
import struct
|
|
|
from .SetMessage import *
|
|
|
from .ByteOrder import *
|
|
|
|
|
|
|
|
|
class TcpMaster():
|
|
|
def __init__(self, host=None, port=None):
|
|
|
try:
|
|
|
self.master = modbus_tcp.TcpMaster(host=host, port=port)
|
|
|
self.master.set_timeout(5.0)
|
|
|
except Exception as e:
|
|
|
pass
|
|
|
|
|
|
def writeSingleRegister(self, slaveId, address, outputValue, order='ABCD'):
|
|
|
try:
|
|
|
if '.' not in str(outputValue):
|
|
|
# 整数值,使用单寄存器写入
|
|
|
outputValue = int(outputValue)
|
|
|
self.master.execute(slaveId, cst.WRITE_SINGLE_REGISTER, starting_address=address, output_value=outputValue)
|
|
|
else:
|
|
|
# 浮点数值,需要使用多寄存器写入(因为浮点数占用2个寄存器)
|
|
|
outputValue = float(outputValue)
|
|
|
if order == 'ABCD': # 大端模式
|
|
|
valueByte = floatToABCD(outputValue)
|
|
|
elif order == 'DCBA': # 小端模式
|
|
|
valueByte = floatToDCBA(outputValue)
|
|
|
elif order == 'BADC':
|
|
|
valueByte = floatToBADC(outputValue)
|
|
|
elif order == 'CDAB':
|
|
|
valueByte = floatToCDAB(outputValue)
|
|
|
else:
|
|
|
valueByte = floatToABCD(outputValue)
|
|
|
# 浮点数必须使用 WRITE_MULTIPLE_REGISTERS,因为需要写入2个寄存器
|
|
|
self.master.execute(slaveId, cst.WRITE_MULTIPLE_REGISTERS, starting_address=address, output_value=valueByte)
|
|
|
|
|
|
except Exception as e:
|
|
|
return 'error'
|
|
|
|
|
|
def writeMultipleRegisters(self, slaveId, startAddress, outputValues, order='ABCD'):
|
|
|
"""写多个寄存器,支持不同数据类型和字节序"""
|
|
|
try:
|
|
|
processedValues = []
|
|
|
for value in outputValues:
|
|
|
if '.' not in str(value):
|
|
|
processedValues.append(int(value))
|
|
|
else:
|
|
|
floatValue = float(value)
|
|
|
if order == 'ABCD':
|
|
|
valueByte = floatToABCD(floatValue)
|
|
|
elif order == 'DCBA':
|
|
|
valueByte = floatToDCBA(floatValue)
|
|
|
elif order == 'BADC':
|
|
|
valueByte = floatToBADC(floatValue)
|
|
|
elif order == 'CDAB':
|
|
|
valueByte = floatToCDAB(floatValue)
|
|
|
processedValues.extend(valueByte)
|
|
|
|
|
|
self.master.execute(slaveId, cst.WRITE_MULTIPLE_REGISTERS, starting_address=startAddress, output_value=processedValues)
|
|
|
except Exception as e:
|
|
|
return 'error'
|
|
|
|
|
|
def writeSingleCoil(self, slaveId, address, outputValue):
|
|
|
try:
|
|
|
outputValue = int(outputValue)
|
|
|
if outputValue in [0, 1]:
|
|
|
self.master.execute(slaveId, cst.WRITE_SINGLE_COIL, address, output_value=outputValue)
|
|
|
except Exception as e:
|
|
|
return 'error'
|
|
|
|
|
|
def writeMultipleCoils(self, slaveId, startAddress, outputValues):
|
|
|
"""写多个线圈,支持布尔值列表或0/1整数列表"""
|
|
|
try:
|
|
|
processedValues = []
|
|
|
for value in outputValues:
|
|
|
if isinstance(value, bool):
|
|
|
processedValues.append(1 if value else 0)
|
|
|
else:
|
|
|
intValue = int(value)
|
|
|
if intValue in [0, 1]:
|
|
|
processedValues.append(intValue)
|
|
|
else:
|
|
|
processedValues.append(1 if intValue else 0)
|
|
|
|
|
|
self.master.execute(slaveId, cst.WRITE_MULTIPLE_COILS, starting_address=startAddress, output_value=processedValues)
|
|
|
except Exception as e:
|
|
|
return 'error'
|
|
|
|
|
|
def readHoldingRegisters(self, slaveId, startAddress, varNums, order='ABCD'):
|
|
|
try:
|
|
|
if order == 'int':
|
|
|
valueByte = self.master.execute(slaveId, cst.READ_HOLDING_REGISTERS, startAddress, varNums)[0]
|
|
|
else:
|
|
|
value = self.master.execute(slaveId, cst.READ_HOLDING_REGISTERS, startAddress, 2)
|
|
|
if order == 'ABCD':
|
|
|
valueByte = ABCDToFloat(value)
|
|
|
elif order == 'DCBA':
|
|
|
valueByte = DCBAToFloat(value)
|
|
|
elif order == 'BADC':
|
|
|
valueByte = BADCToFloat(value)
|
|
|
elif order == 'CDAB':
|
|
|
valueByte = CDABToFloat(value)
|
|
|
return valueByte
|
|
|
except:
|
|
|
return 'error'
|
|
|
|
|
|
def readInputRegisters(self, slaveId, startAddress, varNums, order='ABCD'):
|
|
|
try:
|
|
|
if order == 'int':
|
|
|
valueByte = self.master.execute(slaveId, cst.READ_INPUT_REGISTERS, startAddress, varNums)[0]
|
|
|
else:
|
|
|
value = self.master.execute(slaveId, cst.READ_INPUT_REGISTERS, startAddress, 2)
|
|
|
if order == 'ABCD':
|
|
|
valueByte = ABCDToFloat(value)
|
|
|
elif order == 'DCBA':
|
|
|
valueByte = DCBAToFloat(value)
|
|
|
elif order == 'BADC':
|
|
|
valueByte = BADCToFloat(value)
|
|
|
elif order == 'CDAB':
|
|
|
valueByte = CDABToFloat(value)
|
|
|
return valueByte
|
|
|
except Exception as e:
|
|
|
return 'error'
|
|
|
|
|
|
def readCoils(self, slaveId, startAddress, varNums, order='ABCD'):
|
|
|
try:
|
|
|
value = self.master.execute(slaveId, cst.READ_COILS, startAddress, varNums)
|
|
|
if varNums == 1:
|
|
|
return value[0] if value else 0
|
|
|
else:
|
|
|
return value
|
|
|
except Exception as e:
|
|
|
return 'error'
|
|
|
|
|
|
def readInputCoils(self, slaveId, startAddress, varNums):
|
|
|
try:
|
|
|
value = self.master.execute(slaveId, cst.READ_DISCRETE_INPUTS, startAddress, varNums)
|
|
|
if varNums == 1:
|
|
|
return value[0] if value else 0
|
|
|
else:
|
|
|
return value
|
|
|
except Exception as e:
|
|
|
return 'error'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# print(cst.READ_HOLDING_REGISTERS, cst.READ_COILS, cst.WRITE_SINGLE_REGISTER, cst.WRITE_SINGLE_COIL)
|
|
|
# self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, 0, output_value=[1, 2, 3, 4, 5])
|
|
|
# self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=5, output_value=[3.0], data_format='>f')
|
|
|
# self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=6, output_value=[4.0], data_format='>f')
|
|
|
# self.master.execute(2, cst.WRITE_SINGLE_COIL, 2, output_value=1)
|
|
|
# self.master.execute(2, cst.WRITE_MULTIPLE_COILS, 0, output_value=[1, 1, 0, 1, 1, 0, 1, 1])
|
|
|
# logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 2, data_format='f'))
|
|
|
|
|
|
# Read and write floats
|
|
|
|
|
|
# send some queries
|
|
|
# logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 100, 12))
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
TCPM = TcpMaster(host = '127.0.0.1', port = 502)
|
|
|
TCPM.writeSingleRegister(1, 0, str(1))
|
|
|
# TCPM.readHoldingRegisters(1, 0, 8)
|