You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

98 lines
3.9 KiB
Python

2 years ago
import modbus_tk
import modbus_tk.defines as cst
from modbus_tk import modbus_tcp, hooks
import logging
import struct
from protocol.ModBus.ByteOrder import *
class TcpMaster:
    """Small convenience wrapper around ``modbus_tk.modbus_tcp.TcpMaster``.

    The request methods never raise on failure: they return the string
    ``'error'`` instead, so callers must compare the result against that
    sentinel.
    """

    # Shared logger for reporting failures that are turned into 'error'.
    _logger = logging.getLogger(__name__)

    def __init__(self, host, port, timeout=5.0):
        """Create the underlying Modbus TCP master.

        Args:
            host: Host name or IP address of the Modbus TCP slave.
            port: TCP port of the slave.
            timeout: Value passed to ``set_timeout`` on the master
                (defaults to the previously hard-coded 5.0).

        Construction is best-effort: a failure is logged rather than raised
        and any request made afterwards returns ``'error'``.
        """
        # Always define the attribute so a failed setup cannot surface later
        # as an AttributeError on ``self.master``.
        self.master = None
        try:
            self.master = modbus_tcp.TcpMaster(host=host, port=port)
            self.master.set_timeout(timeout)
        except Exception as exc:
            # Keep the original "do not crash the caller" behaviour, but
            # leave a trace of what went wrong instead of discarding it.
            self._logger.error(
                "Could not set up Modbus TCP master for %s:%s: %s",
                host, port, exc,
            )
        # Debug hooks, currently disabled:
        # hooks.install_hook("modbus_tcp.TcpMaster.after_recv", afterRecv)
        # hooks.install_hook("modbus_tcp.TcpMaster.after_send", afterSend)

    def writeMultipleRegister(self, slaveId, address, outputValue):
        """Write ``outputValue`` to holding registers starting at ``address``.

        Args:
            slaveId: Modbus slave (unit) identifier.
            address: Address of the first register to write.
            outputValue: Values forwarded as ``output_value`` to
                ``master.execute`` (the script below passes a list of ints).

        Returns:
            ``None`` on success, or the string ``'error'`` if the master is
            unavailable or the request raised.
        """
        if self.master is None:
            self._logger.error("Write skipped: Modbus TCP master is not available")
            return 'error'
        try:
            self.master.execute(
                slaveId,
                cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=address,
                output_value=outputValue,
            )
        except Exception as exc:
            self._logger.error(
                "Writing registers at %s on slave %s failed: %s",
                address, slaveId, exc,
            )
            return 'error'

    def readHoldingRegisters(self, slaveId, startAddress, varNums):
        """Read ``varNums`` holding registers starting at ``startAddress``.

        Args:
            slaveId: Modbus slave (unit) identifier.
            startAddress: Address of the first register to read.
            varNums: Number of registers to read.

        Returns:
            Whatever ``master.execute`` returns for the request, or the
            string ``'error'`` if the master is unavailable or the request
            raised.
        """
        if self.master is None:
            self._logger.error("Read skipped: Modbus TCP master is not available")
            return 'error'
        try:
            return self.master.execute(
                slaveId, cst.READ_HOLDING_REGISTERS, startAddress, varNums
            )
        except Exception as exc:
            # ``Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt / SystemExit still propagate.
            self._logger.error(
                "Reading %s registers at %s on slave %s failed: %s",
                varNums, startAddress, slaveId, exc,
            )
            return 'error'
# def readInputRegisters(self, slaveId, startAddress, varNums, order = 'ABCD'):
# try:
# if order == 'int':
# valueByte = self.master.execute(slaveId, cst.READ_INPUT_REGISTERS, startAddress, varNums)[0]
# else:
# value = self.master.execute(slaveId, cst.READ_INPUT_REGISTERS, startAddress, 2)
# if order == 'ABCD': # big-endian mode
# valueByte = ABCDToFloat(value)
# elif order == 'DCBA': # little-endian mode
# valueByte = DCBAToFloat(value)
# elif order == 'BADC':
# valueByte = BADCToFloat(value)
# elif order == 'CDAB':
# valueByte = CDABToFloat(value)
# return valueByte
# except Exception as e:
# return 'error'
# def readCoils(self, slaveId, startAddress, varNums, order = 'ABCD'):
# try:
# value = self.master.execute(slaveId, cst.READ_COILS, startAddress, varNums)
# return value[0]
# except Exception as e:
# return 'error'
# def readInputCoils(self, slaveId, startAddress, varNums):
# print(slaveId, startAddress, varNums)
# try:
# value = self.master.execute(slaveId, cst.READ_DISCRETE_INPUTS, startAddress, varNums)
# return value[0]
# except:
# return 'error'
#
# print(cst.READ_HOLDING_REGISTERS, cst.READ_COILS, cst.WRITE_SINGLE_REGISTER, cst.WRITE_SINGLE_COIL)
# self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, 0, output_value=[1, 2, 3, 4, 5])
# self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=5, output_value=[3.0], data_format='>f')
# self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=6, output_value=[4.0], data_format='>f')
# self.master.execute(2, cst.WRITE_SINGLE_COIL, 2, output_value=1)
# self.master.execute(2, cst.WRITE_MULTIPLE_COILS, 0, output_value=[1, 1, 0, 1, 1, 0, 1, 1])
# logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 2, data_format='f'))
# Read and write floats
# send some queries
# logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 100, 12))
if __name__ == "__main__":
    # Manual smoke test: write a block of register values to slave 1 on a
    # Modbus TCP endpoint at 127.0.0.1:502.
    tcpMaster = TcpMaster('127.0.0.1', 502)
    values = [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1]
    # TcpMaster defines no writeSingleRegister (calling it raised
    # AttributeError); a list of values goes through writeMultipleRegister.
    tcpMaster.writeMultipleRegister(1, 0, values)
# tcpMaster.readHoldingRegisters(1, 6, 2)
# tcpMaster.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=0, output_value=[1,2,3,4,5])
#
# master = modbus_tcp.TcpMaster(host="localhost", port=502)
# values = [6]
# master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=0, output_value=values)