You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
DCS/protocol/ModBus/rtumaster_example.py

147 lines
5.6 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import serial
import modbus_tk
import modbus_tk.defines as cst
from modbus_tk import modbus_rtu, hooks
import sys
from .ByteOrder import *
from .SetMessage import *
# master = modbus_rtu.RtuMaster(serial.Serial(port = 'COM4', baudrate = 9600, bytesize = 8, parity = 'N', stopbits = 1, xonxoff=0))
# execute(slave=1, function_code=READ_HOLDING_REGISTERS, starting_address=0, quantity_of_x=0, output_value=0, data_format="", expected_length=-1)
# Parameters:
# @slave=1 : identifier of the slave, from 1 to 247.
# @function_code=READ_HOLDING_REGISTERS : function code
# @starting_address=100 : start address of the registers
# @quantity_of_x=3 : number of registers/coils
# @output_value : an integer or an iterable of values, e.g. 1/[1,1,1,0,0,1]/xrange(12)
# @data_format : format applied to the received data
class RTUMaster():
    """Convenience wrapper around a ``modbus_tk`` RTU master on a serial port.

    Read methods return the string ``'error'`` instead of raising when the
    request fails; ``writeSingleRegister`` prints ``'error'`` and returns
    ``None``.  ``writeSingleCoil`` lets exceptions propagate.

    32-bit float values occupy two registers and are converted with the
    ``floatToXXXX`` / ``XXXXToFloat`` helpers that this module pulls in via
    its star imports (presumably from ``ByteOrder`` - confirm).
    """

    # Byte orders accepted for float values; checked before any bus traffic.
    _FLOAT_ORDERS = ('ABCD', 'DCBA', 'BADC', 'CDAB')

    def __init__(self, port='COM4', baudrate=9600, bytesize=8, parity='N', stopbits=1, xonxoff=0):
        """Open the serial port and create the RTU master.

        Opening is best-effort: on failure the error is reported on stderr
        and ``self.master`` is left as ``None`` so the instance can still be
        constructed (subsequent reads then return ``'error'``).
        """
        # Always define the attribute so a failed open is detectable.
        self.master = None
        try:
            self.master = modbus_rtu.RtuMaster(serial.Serial(
                port=port, baudrate=baudrate, bytesize=bytesize,
                parity=parity, stopbits=stopbits, xonxoff=xonxoff))
            self.master.set_timeout(5.0)
            self.master.set_verbose(True)
        except Exception as exc:
            # Previously swallowed silently; keep best-effort but say why.
            sys.stderr.write('RTUMaster: failed to open %s: %s\n' % (port, exc))
        # NOTE(review): hooks are registered on every instantiation; if
        # modbus_tk keeps one global hook list this duplicates callbacks
        # when several RTUMaster objects are created - confirm.
        hooks.install_hook("modbus_rtu.RtuMaster.after_recv", afterRecv)
        hooks.install_hook("modbus_rtu.RtuMaster.before_send", afterSend)

    @staticmethod
    def _encodeFloat(value, order):
        """Convert a float to the register payload for the given byte order."""
        if order == 'ABCD':  # big-endian
            return floatToABCD(value)
        if order == 'DCBA':  # little-endian
            return floatToDCBA(value)
        if order == 'BADC':
            return floatToBADC(value)
        if order == 'CDAB':
            return floatToCDAB(value)
        raise ValueError('unsupported byte order: %r' % (order,))

    @staticmethod
    def _decodeFloat(registers, order):
        """Convert the registers read from the bus to a float."""
        if order == 'ABCD':  # big-endian
            return ABCDToFloat(registers)
        if order == 'DCBA':  # little-endian
            return DCBAToFloat(registers)
        if order == 'BADC':
            return BADCToFloat(registers)
        if order == 'CDAB':
            return CDABToFloat(registers)
        raise ValueError('unsupported byte order: %r' % (order,))

    def _readRegisters(self, slaveId, functionCode, startAddress, varNums, order):
        """Shared body of the two register-read methods; raises on failure."""
        if order == 'int':
            # Only the first register of the reply is returned.
            return self.master.execute(slaveId, functionCode, startAddress, varNums)[0]
        if order not in self._FLOAT_ORDERS:
            # Reject before touching the bus instead of after a wasted read.
            raise ValueError('unsupported byte order: %r' % (order,))
        # A float always spans two registers, so varNums is ignored here.
        registers = self.master.execute(slaveId, functionCode, startAddress, 2)
        return self._decodeFloat(registers, order)

    def writeSingleRegister(self, slaveId, address, outputValue, order='ABCD'):
        """Write an integer (one register) or a float (two registers).

        A value is treated as a float when it is a ``float`` instance or its
        string form contains a ``'.'``; otherwise it is converted with
        ``int()`` and written with WRITE_SINGLE_REGISTER.  Floats are encoded
        according to ``order`` and written with WRITE_MULTIPLE_REGISTERS.

        Returns ``None``.  Any failure (including an unsupported ``order``)
        is reported by printing ``'error'``.
        """
        try:
            # isinstance check keeps floats whose repr has no '.' (e.g. 1e-07)
            # from being truncated to an integer.
            isFloat = isinstance(outputValue, float) or '.' in str(outputValue)
            if not isFloat:
                self.master.execute(slaveId, cst.WRITE_SINGLE_REGISTER,
                                    starting_address=address,
                                    output_value=int(outputValue))
                return
            payload = self._encodeFloat(float(outputValue), order)
            self.master.execute(slaveId, cst.WRITE_MULTIPLE_REGISTERS,
                                starting_address=address, output_value=payload)
        except Exception:
            print('error')

    def writeSingleCoil(self, slaveId, address, outputValue):
        """Write one coil; values other than 0/1 are ignored.

        Unlike the other methods, exceptions raised by the request propagate
        to the caller.
        """
        if outputValue in [0, 1]:
            self.master.execute(slaveId, cst.WRITE_SINGLE_COIL, address, output_value=outputValue)

    def readHoldingRegisters(self, slaveId, startAddress, varNums, order='ABCD'):
        """Read holding registers.

        With ``order == 'int'`` reads ``varNums`` registers and returns the
        first one.  With a float byte order reads two registers and returns
        the decoded float.  Returns ``'error'`` on any failure.
        """
        try:
            return self._readRegisters(slaveId, cst.READ_HOLDING_REGISTERS,
                                       startAddress, varNums, order)
        except Exception:
            return 'error'

    def readInputRegisters(self, slaveId, startAddress, varNums, order='ABCD'):
        """Read input registers; same conventions as ``readHoldingRegisters``."""
        try:
            return self._readRegisters(slaveId, cst.READ_INPUT_REGISTERS,
                                       startAddress, varNums, order)
        except Exception:
            return 'error'

    def readCoils(self, slaveId, startAddress, varNums, order='ABCD'):
        """Read ``varNums`` coils and return the first one, or ``'error'``.

        ``order`` is unused; it is kept so existing callers keep working.
        """
        try:
            return self.master.execute(slaveId, cst.READ_COILS, startAddress, varNums)[0]
        except Exception:
            return 'error'

    def readInputCoils(self, slaveId, startAddress, varNums):
        """Read ``varNums`` discrete inputs and return the first, or ``'error'``."""
        try:
            return self.master.execute(slaveId, cst.READ_DISCRETE_INPUTS, startAddress, varNums)[0]
        except Exception:
            return 'error'
# -*- coding: utf_8 -*-
# import serial
# import modbus_tk
# import modbus_tk.defines as cst
# from modbus_tk import modbus_rtu
# def mod(PORT="com2"):
# red = []
# alarm = ""
# try:
# # Open the serial port as the Modbus RTU master
# master = modbus_rtu.RtuMaster(serial.Serial(port=PORT,
# baudrate=9600, bytesize=8, parity='N', stopbits=1))
# master.set_timeout(5.0)
# master.set_verbose(True)
# # 读保持寄存器
# red = master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 9) # 这里可以修改需要读取的功能码
# print(red)
# alarm = "正常"
# return list(red), alarm
# except Exception as exc:
# print(str(exc))
# alarm = (str(exc))
# return red, alarm ## on exception, return [] and the error message
# if __name__ == "__main__":
# mod()