"""Convenience wrapper around the ``modbus_tk`` Modbus TCP master.

Every public method of :class:`TcpMaster` returns the string ``'error'``
instead of raising when the underlying request fails; the failure is also
reported through this module's logger.
"""
import logging
import struct

import modbus_tk
import modbus_tk.defines as cst
from modbus_tk import modbus_tcp

from .SetMessage import *
from .ByteOrder import *

# Underscore-prefixed so it cannot collide with names pulled in by the
# star imports above.
_logger = logging.getLogger(__name__)

# Sentinel returned by every public method when a request fails.
_ERROR = 'error'


def _report_failure(operation, exc):
    """Log a failed Modbus operation and return the ``'error'`` sentinel."""
    _logger.error('Modbus %s failed: %s', operation, exc)
    return _ERROR


def _is_float_value(value):
    """Return True when *value* must be written as a float (two registers).

    Real ``float`` instances always qualify, including those whose text form
    has no decimal point (``str(1e-07) == '1e-07'``), which would otherwise
    be truncated by ``int()``. Any other value (str, int, bool, ...) is
    treated as a float only if its string form contains a ``'.'``.
    """
    return isinstance(value, float) or '.' in str(value)


def _encode_float(value, order):
    """Encode *value* with the ByteOrder helper selected by *order*.

    Unknown *order* values fall back to 'ABCD'.
    """
    if order == 'DCBA':
        return floatToDCBA(value)
    if order == 'BADC':
        return floatToBADC(value)
    if order == 'CDAB':
        return floatToCDAB(value)
    return floatToABCD(value)


def _decode_float(registers, order):
    """Decode *registers* with the ByteOrder helper selected by *order*.

    Raises:
        ValueError: if *order* is not one of 'ABCD', 'DCBA', 'BADC', 'CDAB'.
    """
    if order == 'ABCD':
        return ABCDToFloat(registers)
    if order == 'DCBA':
        return DCBAToFloat(registers)
    if order == 'BADC':
        return BADCToFloat(registers)
    if order == 'CDAB':
        return CDABToFloat(registers)
    raise ValueError('unsupported byte order: %r' % (order,))


class TcpMaster():
    """Modbus TCP client exposing simple read/write helpers.

    Attributes:
        master: the underlying ``modbus_tcp.TcpMaster``, or ``None`` when it
            could not be created (every request then returns ``'error'``).
    """

    def __init__(self, host=None, port=None):
        """Create the underlying master and give it a 5 second timeout.

        Construction problems are logged rather than raised so that callers
        keep receiving ``'error'`` from later requests instead of a crash.
        """
        self.master = None
        try:
            self.master = modbus_tcp.TcpMaster(host=host, port=port)
            self.master.set_timeout(5.0)
        except Exception:
            _logger.exception(
                'Failed to initialise Modbus TCP master for %s:%s', host, port)

    def writeSingleRegister(self, slaveId, address, outputValue, order='ABCD'):
        """Write one integer register, or one float spanning two registers.

        Args:
            slaveId: Modbus unit identifier.
            address: register address to start writing at.
            outputValue: number or numeric string; see ``_is_float_value``
                for how integers and floats are told apart.
            order: byte order used for floats ('ABCD', 'DCBA', 'BADC' or
                'CDAB'); anything else is treated as 'ABCD'.

        Returns:
            ``None`` on success, ``'error'`` on failure.
        """
        try:
            if _is_float_value(outputValue):
                # A float occupies two registers, so it has to go through
                # WRITE_MULTIPLE_REGISTERS rather than WRITE_SINGLE_REGISTER.
                registers = _encode_float(float(outputValue), order)
                self.master.execute(slaveId, cst.WRITE_MULTIPLE_REGISTERS,
                                    starting_address=address,
                                    output_value=registers)
            else:
                self.master.execute(slaveId, cst.WRITE_SINGLE_REGISTER,
                                    starting_address=address,
                                    output_value=int(outputValue))
        except Exception as e:
            return _report_failure('writeSingleRegister', e)

    def writeMultipleRegisters(self, slaveId, startAddress, outputValues, order='ABCD'):
        """Write several registers, mixing integers and floats.

        Integer items contribute one register each; float items are encoded
        with *order* and contribute whatever ``floatTo<order>`` returns.
        Unknown *order* values fall back to 'ABCD', matching
        ``writeSingleRegister``.

        Returns:
            ``None`` on success, ``'error'`` on failure.
        """
        try:
            registers = []
            for value in outputValues:
                if _is_float_value(value):
                    registers.extend(_encode_float(float(value), order))
                else:
                    registers.append(int(value))
            self.master.execute(slaveId, cst.WRITE_MULTIPLE_REGISTERS,
                                starting_address=startAddress,
                                output_value=registers)
        except Exception as e:
            return _report_failure('writeMultipleRegisters', e)

    def writeSingleCoil(self, slaveId, address, outputValue):
        """Write a single coil; *outputValue* must convert to 0 or 1.

        Returns:
            ``None`` on success, ``'error'`` on failure, including when the
            value is neither 0 nor 1 (no request is sent in that case).
        """
        try:
            coil = int(outputValue)
            if coil not in (0, 1):
                # Report the rejected value instead of silently skipping the
                # write and looking like a success.
                raise ValueError(
                    'coil value must be 0 or 1, got %r' % (outputValue,))
            self.master.execute(slaveId, cst.WRITE_SINGLE_COIL, address,
                                output_value=coil)
        except Exception as e:
            return _report_failure('writeSingleCoil', e)

    def writeMultipleCoils(self, slaveId, startAddress, outputValues):
        """Write several coils from a list of booleans or integers.

        Each item is converted with ``int()``; any non-zero result is
        written as 1.

        Returns:
            ``None`` on success, ``'error'`` on failure.
        """
        try:
            coils = [1 if int(value) else 0 for value in outputValues]
            self.master.execute(slaveId, cst.WRITE_MULTIPLE_COILS,
                                starting_address=startAddress,
                                output_value=coils)
        except Exception as e:
            return _report_failure('writeMultipleCoils', e)

    def _read_registers(self, slaveId, functionCode, startAddress, varNums, order):
        """Shared implementation of the two register-read methods.

        With ``order == 'int'`` it requests *varNums* registers and returns
        only the first one. Otherwise it always requests exactly two
        registers (ignoring *varNums*) and decodes them as one float.
        """
        if order == 'int':
            return self.master.execute(
                slaveId, functionCode, startAddress, varNums)[0]
        registers = self.master.execute(
            slaveId, functionCode, startAddress, 2)
        return _decode_float(registers, order)

    def _read_bits(self, slaveId, functionCode, startAddress, varNums):
        """Shared implementation of the two bit-read methods.

        Returns the single bit (0 when the response is empty) if *varNums*
        is 1, otherwise the whole response sequence.
        """
        bits = self.master.execute(
            slaveId, functionCode, startAddress, varNums)
        if varNums == 1:
            return bits[0] if bits else 0
        return bits

    def readHoldingRegisters(self, slaveId, startAddress, varNums, order='ABCD'):
        """Read holding registers as one integer or one float.

        Args:
            slaveId: Modbus unit identifier.
            startAddress: first register address.
            varNums: register count; only used when ``order == 'int'``.
            order: ``'int'`` for a raw register value, or a float byte
                order ('ABCD', 'DCBA', 'BADC', 'CDAB').

        Returns:
            The decoded value, or ``'error'`` on failure or unknown order.
        """
        try:
            return self._read_registers(
                slaveId, cst.READ_HOLDING_REGISTERS, startAddress, varNums, order)
        except Exception as e:
            return _report_failure('readHoldingRegisters', e)

    def readInputRegisters(self, slaveId, startAddress, varNums, order='ABCD'):
        """Read input registers; same contract as ``readHoldingRegisters``."""
        try:
            return self._read_registers(
                slaveId, cst.READ_INPUT_REGISTERS, startAddress, varNums, order)
        except Exception as e:
            return _report_failure('readInputRegisters', e)

    def readCoils(self, slaveId, startAddress, varNums, order='ABCD'):
        """Read coils.

        *order* is accepted for signature compatibility and is not used.

        Returns:
            A single bit when *varNums* is 1, otherwise the response
            sequence; ``'error'`` on failure.
        """
        try:
            return self._read_bits(
                slaveId, cst.READ_COILS, startAddress, varNums)
        except Exception as e:
            return _report_failure('readCoils', e)

    def readInputCoils(self, slaveId, startAddress, varNums):
        """Read discrete inputs.

        Returns:
            A single bit when *varNums* is 1, otherwise the response
            sequence; ``'error'`` on failure.
        """
        try:
            return self._read_bits(
                slaveId, cst.READ_DISCRETE_INPUTS, startAddress, varNums)
        except Exception as e:
            return _report_failure('readInputCoils', e)


# Reference examples of raw modbus_tk calls (kept from the original file):
# print(cst.READ_HOLDING_REGISTERS, cst.READ_COILS, cst.WRITE_SINGLE_REGISTER, cst.WRITE_SINGLE_COIL)
# self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, 0, output_value=[1, 2, 3, 4, 5])
# self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=5, output_value=[3.0], data_format='>f')
# self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=6, output_value=[4.0], data_format='>f')
# self.master.execute(2, cst.WRITE_SINGLE_COIL, 2, output_value=1)
# self.master.execute(2, cst.WRITE_MULTIPLE_COILS, 0, output_value=[1, 1, 0, 1, 1, 0, 1, 1])
# Read and write floats:
# logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 2, data_format='f'))
# Send some queries:
# logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 100, 12))


if __name__ == "__main__":
    TCPM = TcpMaster(host='127.0.0.1', port=502)
    TCPM.writeSingleRegister(1, 0, str(1))
    # TCPM.readHoldingRegisters(1, 0, 8)