"""Thin wrapper around ``modbus_tk``'s Modbus-TCP master.

Every method reports failure by returning the string ``'error'`` instead of
raising, so callers must compare the result against that sentinel.
"""

import logging
import struct

import modbus_tk
import modbus_tk.defines as cst
from modbus_tk import modbus_tcp, hooks

# NOTE(review): nothing in this module currently uses names from this wildcard
# import; it is kept because other modules may rely on names re-exported here.
from protocol.ModBus.ByteOrder import *

logger = logging.getLogger(__name__)


class TcpMaster:
    """Modbus-TCP client that turns exceptions into an ``'error'`` result."""

    def __init__(self, host, port):
        """Create the underlying ``modbus_tk`` master with a 5 second timeout.

        Construction is best-effort: a failure is logged and leaves
        ``self.master`` as ``None`` rather than raising, in which case every
        later read/write call returns ``'error'``.

        Args:
            host: Host name or IP address of the Modbus-TCP server.
            port: TCP port of the Modbus-TCP server.
        """
        # Assign first so the attribute always exists, even if creation fails.
        self.master = None
        try:
            self.master = modbus_tcp.TcpMaster(host=host, port=port)
            self.master.set_timeout(5.0)
        except Exception:
            logger.exception(
                "Failed to set up Modbus TCP master for %s:%s", host, port
            )

    def writeMultipleRegister(self, slaveId, address, outputValue):
        """Write a sequence of values with WRITE_MULTIPLE_REGISTERS.

        Args:
            slaveId: Modbus slave (unit) identifier.
            address: Address of the first register to write.
            outputValue: Sequence of register values to write.

        Returns:
            ``None`` on success, the string ``'error'`` on any failure.
        """
        try:
            self.master.execute(
                slaveId,
                cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=address,
                # Previously an undefined name was passed here, so every call
                # failed and returned 'error' without writing anything.
                output_value=outputValue,
            )
        except Exception:
            logger.exception(
                "WRITE_MULTIPLE_REGISTERS failed (slave=%s, address=%s)",
                slaveId,
                address,
            )
            return 'error'
        return None

    def readHoldingRegisters(self, slaveId, startAddress, varNums):
        """Read registers with READ_HOLDING_REGISTERS.

        Args:
            slaveId: Modbus slave (unit) identifier.
            startAddress: Address of the first register to read.
            varNums: Number of registers to read.

        Returns:
            Whatever ``execute`` returns for the request (the raw register
            values, with no byte-order conversion applied), or the string
            ``'error'`` on any failure.
        """
        try:
            return self.master.execute(
                slaveId, cst.READ_HOLDING_REGISTERS, startAddress, varNums
            )
        except Exception:
            # Narrower than a bare ``except`` so Ctrl-C still interrupts.
            logger.exception(
                "READ_HOLDING_REGISTERS failed (slave=%s, start=%s, count=%s)",
                slaveId,
                startAddress,
                varNums,
            )
            return 'error'


if __name__ == "__main__":
    # Manual smoke test against a Modbus-TCP server on the local machine.
    tcpMaster = TcpMaster('127.0.0.1', 502)
    a = [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1]
    # The class has no writeSingleRegister(); a list of values needs the
    # multiple-register write.
    tcpMaster.writeMultipleRegister(1, 0, a)