You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			106 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Python
		
	
			
		
		
	
	
			106 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Python
		
	
| 
 | |
| import modbus_tk
 | |
| import modbus_tk.defines as cst
 | |
| from modbus_tk import modbus_tcp, hooks
 | |
| import logging
 | |
| import struct
 | |
| from protocol.ModBus.ByteOrder import *
 | |
| 
 | |
| 
 | |
class TcpMaster():
    """Thin wrapper around ``modbus_tk.modbus_tcp.TcpMaster``.

    Each request method returns the string ``'error'`` instead of raising
    when the underlying call fails, so callers can compare the result
    against ``'error'``.
    """

    # Shared logger so failures are recorded instead of silently discarded.
    _log = logging.getLogger(__name__)

    def __init__(self, host, port):
        """Create the underlying master for ``host``/``port``.

        Construction errors are logged and not re-raised; in that case
        ``self.master`` stays ``None`` and every later request returns
        ``'error'``.
        """
        # Define the attribute up front so it always exists, even when the
        # constructor below fails.
        self.master = None
        try:
            self.master = modbus_tcp.TcpMaster(host=host, port=port)
            # Timeout value passed to modbus_tk (presumably seconds).
            self.master.set_timeout(5.0)
        except Exception:
            self._log.exception(
                "Failed to set up Modbus TCP master for %s:%s", host, port
            )

    def writeMultipleRegister(self, slaveId, address, outputValue):
        """Send a WRITE_MULTIPLE_REGISTERS request.

        Args:
            slaveId: Identifier of the target slave.
            address: Starting register address.
            outputValue: Values forwarded as ``output_value``.

        Returns:
            ``None`` on success, ``'error'`` if the request failed.
        """
        try:
            # Forward the caller's values; the previous code referenced an
            # undefined name here, so every call ended in the error branch.
            self.master.execute(
                slaveId,
                cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=address,
                output_value=outputValue,
            )
        except Exception:
            self._log.exception(
                "WRITE_MULTIPLE_REGISTERS failed (slave=%s, address=%s)",
                slaveId, address,
            )
            return 'error'

    def readHoldingRegisters(self, slaveId, startAddress, varNums):
        """Send a READ_HOLDING_REGISTERS request.

        Args:
            slaveId: Identifier of the target slave.
            startAddress: First register address to read.
            varNums: Number of registers to read.

        Returns:
            The value returned by ``execute`` unchanged (no byte-order
            conversion is applied here), or ``'error'`` if the request
            failed.
        """
        try:
            return self.master.execute(
                slaveId, cst.READ_HOLDING_REGISTERS, startAddress, varNums
            )
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt / SystemExit still propagate.
            self._log.exception(
                "READ_HOLDING_REGISTERS failed (slave=%s, start=%s, count=%s)",
                slaveId, startAddress, varNums,
            )
            return 'error'
 | |
| 
 | |
|     # def readInputRegisters(self, slaveId, startAddress, varNums, order = 'ABCD'):
 | |
|     #     try:
 | |
|     #         if order == 'int':
 | |
|     #             valueByte = self.master.execute(slaveId, cst.READ_INPUT_REGISTERS, startAddress, varNums)[0]
 | |
|     #         else:
 | |
|     #             value = self.master.execute(slaveId, cst.READ_INPUT_REGISTERS, startAddress, 2)
 | |
|     #             if order == 'ABCD': # 大端模式
 | |
|     #                 valueByte = ABCDToFloat(value)
 | |
|     #             elif order == 'DCBA': # 小端模式
 | |
|     #                 valueByte = DCBAToFloat(value)
 | |
|     #             elif order == 'BADC':
 | |
|     #                 valueByte = BADCToFloat(value)
 | |
|     #             elif order == 'CDAB':
 | |
|     #                 valueByte = CDABToFloat(value)
 | |
|     #         return valueByte
 | |
|     #     except Exception as e:
 | |
|     #         return 'error'
 | |
| 
 | |
|     # def readCoils(self, slaveId, startAddress, varNums, order = 'ABCD'):
 | |
|     #     try:
 | |
|     #         value = self.master.execute(slaveId, cst.READ_COILS, startAddress, varNums)
 | |
|     #         return value[0]
 | |
|     #     except Exception as e:
 | |
|     #         return 'error'
 | |
|     # def readInputCoils(self, slaveId, startAddress, varNums):
 | |
|     #     print(slaveId, startAddress, varNums)
 | |
|     #     try:
 | |
|     #         value = self.master.execute(slaveId, cst.READ_DISCRETE_INPUTS, startAddress, varNums)
 | |
|     #         return value[0]
 | |
|     #     except:
 | |
|     #         return 'error'
 | |
|     #
 | |
| 
 | |
| 
 | |
| 
 | |
| 
 | |
|         # print(cst.READ_HOLDING_REGISTERS, cst.READ_COILS, cst.WRITE_SINGLE_REGISTER, cst.WRITE_SINGLE_COIL)
 | |
|         # self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, 0, output_value=[1, 2, 3, 4, 5])
 | |
|         # self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=5, output_value=[3.0], data_format='>f')
 | |
|         # self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=6, output_value=[4.0], data_format='>f')
 | |
|         # self.master.execute(2, cst.WRITE_SINGLE_COIL, 2, output_value=1)
 | |
|         # self.master.execute(2, cst.WRITE_MULTIPLE_COILS, 0, output_value=[1, 1, 0, 1, 1, 0, 1, 1])
 | |
|         # logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 2, data_format='f'))
 | |
| 
 | |
|         # Read and write floats
 | |
| 
 | |
|         # send some queries
 | |
|         # logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 100, 12))
 | |
| 
 | |
| 
 | |
| 
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
    # Manual smoke test against a Modbus TCP endpoint on 127.0.0.1:502.
    tcpMaster = TcpMaster('127.0.0.1', 502)
    values = [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1]
    # TcpMaster defines no writeSingleRegister method; a list of values is
    # written through writeMultipleRegister, starting at address 0.
    tcpMaster.writeMultipleRegister(1, 0, values)
 | |
|     # tcpMaster.readHoldingRegisters(1, 6, 2)
 | |
|     # tcpMaster.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=0, output_value=[1,2,3,4,5])
 | |
|     #
 | |
|     # master = modbus_tcp.TcpMaster(host="localhost", port=502)
 | |
|     # values = [6]
 | |
|     # master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=0, output_value=values) |