|
|
|
|
|
import serial
|
|
|
|
|
|
import modbus_tk
|
|
|
import modbus_tk.defines as cst
|
|
|
from modbus_tk import modbus_rtu, hooks
|
|
|
import sys
|
|
|
|
|
|
from .ByteOrder import *
|
|
|
from .SetMessage import *
|
|
|
|
|
|
# master = modbus_rtu.RtuMaster(serial.Serial(port = 'COM4', baudrate = 9600, bytesize = 8, parity = 'N', stopbits = 1, xonxoff=0))
|
|
|
|
|
|
# exec(slave=1, function_code=READ_HOLDING_REGISTERS, starting_address=0, quantity_of_x=0, output_value=0, data_format=“”, expected_length=-1)
|
|
|
# 参数说明:
|
|
|
# @slave=1 : identifier of the slave. from 1 to 247.
|
|
|
# @function_code=READ_HOLDING_REGISTERS:功能码
|
|
|
# @starting_address=100:寄存器的开始地址
|
|
|
# @quantity_of_x=3:寄存器/线圈的数量
|
|
|
# @output_value:一个整数或可迭代的值:1/[1,1,1,0,0,1]/xrange(12)
|
|
|
# @data_format:对接收的数据进行格式化
|
|
|
|
|
|
|
|
|
|
|
|
class RTUMaster():
|
|
|
def __init__(self, port = 'COM4', baudrate = 9600, bytesize = 8, parity = 'N', stopbits = 1, xonxoff = 0):
|
|
|
try:
|
|
|
self.master = modbus_rtu.RtuMaster(serial.Serial(port = port, baudrate = baudrate, bytesize = bytesize, parity = parity, stopbits = stopbits, xonxoff = xonxoff))
|
|
|
self.master.set_timeout(5.0)
|
|
|
self.master.set_verbose(True)
|
|
|
except Exception as e:
|
|
|
pass
|
|
|
|
|
|
hooks.install_hook("modbus_rtu.RtuMaster.after_recv", afterRecv)
|
|
|
hooks.install_hook("modbus_rtu.RtuMaster.before_send", afterSend)
|
|
|
|
|
|
def writeSingleRegister(self, slaveId, address, outputValue, order = 'ABCD'):
|
|
|
try:
|
|
|
if '.' not in str(outputValue):
|
|
|
outputValue = int(outputValue)
|
|
|
# print(outputValue)
|
|
|
self.master.execute(slaveId, cst.WRITE_SINGLE_REGISTER, starting_address = address, output_value = outputValue)
|
|
|
else:
|
|
|
outputValue = float(outputValue)
|
|
|
if order == 'ABCD': # 大端模式
|
|
|
valueByte = floatToABCD(outputValue)
|
|
|
elif order == 'DCBA': # 小端模式
|
|
|
valueByte = floatToDCBA(outputValue)
|
|
|
elif order == 'BADC':
|
|
|
valueByte = floatToBADC(outputValue)
|
|
|
elif order == 'CDAB':
|
|
|
valueByte = floatToCDAB(outputValue)
|
|
|
self.master.execute(slaveId, cst.WRITE_MULTIPLE_REGISTERS, starting_address = address, output_value=valueByte)
|
|
|
except Exception as e:
|
|
|
print('error')
|
|
|
|
|
|
def writeSingleCoil(self, slaveId, address, outputValue):
|
|
|
|
|
|
if outputValue in [0, 1]:
|
|
|
self.master.execute(slaveId, cst.WRITE_SINGLE_COIL, address, output_value = outputValue)
|
|
|
|
|
|
|
|
|
def readHoldingRegisters(self, slaveId, startAddress, varNums, order = 'ABCD'):
|
|
|
try:
|
|
|
if order == 'int':
|
|
|
valueByte = self.master.execute(slaveId, cst.READ_HOLDING_REGISTERS, startAddress, varNums)[0]
|
|
|
else:
|
|
|
value = self.master.execute(slaveId, cst.READ_HOLDING_REGISTERS, startAddress, 2)
|
|
|
if order == 'ABCD': # 大端模式
|
|
|
valueByte = ABCDToFloat(value)
|
|
|
elif order == 'DCBA': # 小端模式
|
|
|
valueByte = DCBAToFloat(value)
|
|
|
elif order == 'BADC':
|
|
|
valueByte = BADCToFloat(value)
|
|
|
elif order == 'CDAB':
|
|
|
valueByte = CDABToFloat(value)
|
|
|
return valueByte
|
|
|
except Exception as e:
|
|
|
return 'error'
|
|
|
|
|
|
def readInputRegisters(self, slaveId, startAddress, varNums, order = 'ABCD'):
|
|
|
try:
|
|
|
if order == 'int':
|
|
|
valueByte = self.master.execute(slaveId, cst.READ_INPUT_REGISTERS, startAddress, varNums)[0]
|
|
|
else:
|
|
|
value = self.master.execute(slaveId, cst.READ_INPUT_REGISTERS, startAddress, 2)
|
|
|
if order == 'ABCD': # 大端模式
|
|
|
valueByte = ABCDToFloat(value)
|
|
|
elif order == 'DCBA': # 小端模式
|
|
|
valueByte = DCBAToFloat(value)
|
|
|
elif order == 'BADC':
|
|
|
valueByte = BADCToFloat(value)
|
|
|
elif order == 'CDAB':
|
|
|
valueByte = CDABToFloat(value)
|
|
|
return valueByte
|
|
|
except Exception as e:
|
|
|
return 'error'
|
|
|
|
|
|
def readCoils(self, slaveId, startAddress, varNums, order = 'ABCD'):
|
|
|
try:
|
|
|
value = self.master.execute(slaveId, cst.READ_COILS, startAddress, varNums)
|
|
|
return value[0]
|
|
|
except Exception as e:
|
|
|
return 'error'
|
|
|
|
|
|
def readInputCoils(self, slaveId, startAddress, varNums):
|
|
|
try:
|
|
|
value = self.master.execute(slaveId, cst.READ_DISCRETE_INPUTS, startAddress, varNums)
|
|
|
return value[0]
|
|
|
except Exception as e:
|
|
|
return 'error'
|
|
|
|
|
|
# -*- coding: utf_8 -*-
|
|
|
|
|
|
|
|
|
# import serial
|
|
|
# import modbus_tk
|
|
|
# import modbus_tk.defines as cst
|
|
|
# from modbus_tk import modbus_rtu
|
|
|
|
|
|
|
|
|
# def mod(PORT="com2"):
|
|
|
# red = []
|
|
|
# alarm = ""
|
|
|
# try:
|
|
|
# # 设定串口为从站
|
|
|
# master = modbus_rtu.RtuMaster(serial.Serial(port=PORT,
|
|
|
# baudrate=9600, bytesize=8, parity='N', stopbits=1))
|
|
|
# master.set_timeout(5.0)
|
|
|
# master.set_verbose(True)
|
|
|
|
|
|
# # 读保持寄存器
|
|
|
# red = master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 9) # 这里可以修改需要读取的功能码
|
|
|
# print(red)
|
|
|
# alarm = "正常"
|
|
|
# return list(red), alarm
|
|
|
|
|
|
# except Exception as exc:
|
|
|
# print(str(exc))
|
|
|
# alarm = (str(exc))
|
|
|
|
|
|
# return red, alarm ##如果异常就返回[],故障信息
|
|
|
|
|
|
|
|
|
# if __name__ == "__main__":
|
|
|
# mod()
|