|
|
|
|
|
|
|
|
|
import serial
|
|
|
|
|
|
|
|
|
|
import modbus_tk
|
|
|
|
|
import modbus_tk.defines as cst
|
|
|
|
|
from modbus_tk import modbus_rtu
|
|
|
|
|
import sys
|
|
|
|
|
|
|
|
|
|
from .ByteOrder import *
|
|
|
|
|
from .SetMessage import *
|
|
|
|
|
|
|
|
|
|
# master = modbus_rtu.RtuMaster(serial.Serial(port = 'COM4', baudrate = 9600, bytesize = 8, parity = 'N', stopbits = 1, xonxoff=0))
|
|
|
|
|
|
|
|
|
|
# execute(slave=1, function_code=READ_HOLDING_REGISTERS, starting_address=0, quantity_of_x=0, output_value=0, data_format="", expected_length=-1)
|
|
|
|
|
# Parameter description:
# @slave=1: identifier of the slave, from 1 to 247.
# @function_code=READ_HOLDING_REGISTERS: the Modbus function code
# @starting_address=100: start address of the registers
# @quantity_of_x=3: number of registers/coils
# @output_value: an integer or an iterable of values: 1/[1,1,1,0,0,1]/xrange(12)
# @data_format: format applied to the received data
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class RTUMaster():
    """Convenience wrapper around a modbus_tk RTU master bound to one serial port.

    Every public read/write method catches all exceptions, prints a short
    diagnostic and returns the string ``'error'`` instead of raising, so
    callers must compare the result against ``'error'`` to detect failures.
    Write methods return ``None`` on success.

    Float values are transferred as two consecutive registers.  The ``order``
    argument ('ABCD' big-endian, 'DCBA' little-endian, 'BADC' or 'CDAB')
    selects which byte-order helper performs the conversion.
    """

    def __init__(self, port='COM4', baudrate=9600, bytesize=8, parity='N', stopbits=1, xonxoff=0):
        """Open the serial port and create the RTU master on top of it.

        The constructor never raises.  If the port cannot be opened the
        failure is printed and ``self.master`` stays ``None``; every later
        read/write call then returns ``'error'``.

        Args:
            port: serial port name handed to ``serial.Serial``.
            baudrate: serial baud rate.
            bytesize: number of data bits.
            parity: parity setting, e.g. 'N'.
            stopbits: number of stop bits.
            xonxoff: software flow control flag.
        """
        # Define the attribute up front so a failed open leaves a usable
        # (if disconnected) object rather than one without `master` at all.
        self.master = None
        try:
            self.master = modbus_rtu.RtuMaster(
                serial.Serial(port=port, baudrate=baudrate, bytesize=bytesize,
                              parity=parity, stopbits=stopbits, xonxoff=xonxoff)
            )
            self.master.set_timeout(5.0)
            self.master.set_verbose(True)
        except Exception as e:
            # Best-effort construction is kept, but the reason is no longer
            # silently discarded.
            print(f'RTUMaster init error: {e}')

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _fail(operation, exc):
        """Print a diagnostic for a failed operation and return 'error'."""
        print(f'{operation} error: {exc}')
        return 'error'

    @staticmethod
    def _toNumber(value):
        """Convert a user supplied value to ``int`` or ``float``.

        A value is treated as a float when its text contains a decimal point
        or when it has a fractional part.  The second condition covers values
        such as ``1e-05`` whose text has no '.', which would otherwise be
        truncated to the integer 0.

        Raises:
            ValueError / TypeError: the value is not numeric.
        """
        if isinstance(value, str):
            try:
                return int(value)
            except ValueError:
                return float(value)
        asFloat = float(value)
        if '.' in str(value) or not asFloat.is_integer():
            return asFloat
        return int(value)

    @staticmethod
    def _encodeFloat(value, order):
        """Return the register words for a float, or None for an unknown order."""
        if order == 'ABCD':  # big-endian
            return floatToABCD(value)
        if order == 'DCBA':  # little-endian
            return floatToDCBA(value)
        if order == 'BADC':
            return floatToBADC(value)
        if order == 'CDAB':
            return floatToCDAB(value)
        return None

    def _readNumber(self, functionCode, slaveId, startAddress, varNums, order):
        """Read one integer register or one two-register float.

        With ``order == 'int'`` ``varNums`` registers are read and the first
        one is returned.  Otherwise exactly two registers are read (``varNums``
        is ignored) and decoded as a float in the given byte order.

        Raises:
            ValueError: ``order`` is not 'int' or a supported byte order.
        """
        if order == 'int':
            return self.master.execute(slaveId, functionCode, startAddress, varNums)[0]
        # Reject unknown orders before touching the bus.
        if order not in ('ABCD', 'DCBA', 'BADC', 'CDAB'):
            raise ValueError(f"unsupported byte order '{order}'")
        words = self.master.execute(slaveId, functionCode, startAddress, 2)
        if order == 'ABCD':  # big-endian
            return ABCDToFloat(words)
        if order == 'DCBA':  # little-endian
            return DCBAToFloat(words)
        if order == 'BADC':
            return BADCToFloat(words)
        return CDABToFloat(words)

    def _readBits(self, functionCode, slaveId, startAddress, varNums):
        """Read bits; return a single value for one bit, a list otherwise."""
        bits = self.master.execute(slaveId, functionCode, startAddress, varNums)
        if len(bits) == 1:
            return bits[0]
        return list(bits)

    # ------------------------------------------------------------------
    # Write operations
    # ------------------------------------------------------------------

    def writeSingleRegister(self, slaveId, address, outputValue, order='ABCD'):
        """Write one numeric value starting at ``address``.

        Integers are written with WRITE_SINGLE_REGISTER.  Floats occupy two
        registers and are therefore written with WRITE_MULTIPLE_REGISTERS
        after conversion in the requested byte order; an unknown ``order``
        falls back to 'ABCD'.

        Returns:
            None on success, 'error' on any failure.
        """
        try:
            number = self._toNumber(outputValue)
            if isinstance(number, int):
                self.master.execute(slaveId, cst.WRITE_SINGLE_REGISTER,
                                    starting_address=address, output_value=number)
                return None

            words = self._encodeFloat(number, order)
            if words is None:
                # Unknown byte order: use the default ABCD layout.
                words = floatToABCD(number)
            if words is None:
                return 'error'
            self.master.execute(slaveId, cst.WRITE_MULTIPLE_REGISTERS,
                                starting_address=address, output_value=words)
        except Exception as e:
            return self._fail('writeSingleRegister', e)

    def writeSingleCoil(self, slaveId, address, outputValue):
        """Write a single coil; ``outputValue`` must convert to 0 or 1.

        Returns:
            None on success, 'error' if the value is not 0/1 or the write fails.
        """
        try:
            bit = int(outputValue)
            if bit not in (0, 1):
                # Nothing is written, so report it instead of looking like success.
                return 'error'
            self.master.execute(slaveId, cst.WRITE_SINGLE_COIL, address, output_value=bit)
        except Exception as e:
            return self._fail('writeSingleCoil', e)

    def writeMultipleRegisters(self, slaveId, startAddress, outputValues, order='ABCD'):
        """Write several values to consecutive registers.

        Each integer contributes one register; each float contributes the
        register pair produced by the byte-order conversion for ``order``.

        Returns:
            None on success, 'error' on any failure, including a float value
            combined with an unsupported ``order``.
        """
        try:
            registers = []
            for raw in outputValues:
                number = self._toNumber(raw)
                if isinstance(number, int):
                    registers.append(number)
                    continue
                words = self._encodeFloat(number, order)
                if words is None:
                    raise ValueError(f"unsupported byte order '{order}'")
                registers.extend(words)

            self.master.execute(slaveId, cst.WRITE_MULTIPLE_REGISTERS,
                                starting_address=startAddress, output_value=registers)
        except Exception as e:
            return self._fail('writeMultipleRegisters', e)

    def writeMultipleCoils(self, slaveId, startAddress, outputValues):
        """Write several coils from a list of booleans or integers.

        Every value is converted with ``int``; any non-zero result is written
        as 1 and zero as 0.

        Returns:
            None on success, 'error' on any failure.
        """
        try:
            bits = [1 if int(value) else 0 for value in outputValues]
            self.master.execute(slaveId, cst.WRITE_MULTIPLE_COILS,
                                starting_address=startAddress, output_value=bits)
        except Exception as e:
            return self._fail('writeMultipleCoils', e)

    # ------------------------------------------------------------------
    # Read operations
    # ------------------------------------------------------------------

    def readHoldingRegisters(self, slaveId, startAddress, varNums, order='ABCD'):
        """Read a value from the holding registers.

        With ``order='int'`` the first of ``varNums`` registers is returned.
        With a byte order, two registers are read and decoded as a float.

        Returns:
            The decoded value, or 'error' on any failure.
        """
        try:
            return self._readNumber(cst.READ_HOLDING_REGISTERS, slaveId,
                                    startAddress, varNums, order)
        except Exception as e:
            return self._fail('readHoldingRegisters', e)

    def readInputRegisters(self, slaveId, startAddress, varNums, order='ABCD'):
        """Read a value from the input registers.

        With ``order='int'`` the first of ``varNums`` registers is returned.
        With a byte order, two registers are read and decoded as a float.

        Returns:
            The decoded value, or 'error' on any failure.
        """
        try:
            return self._readNumber(cst.READ_INPUT_REGISTERS, slaveId,
                                    startAddress, varNums, order)
        except Exception as e:
            return self._fail('readInputRegisters', e)

    def readCoils(self, slaveId, startAddress, varNums, order='ABCD'):
        """Read ``varNums`` coils starting at ``startAddress``.

        ``order`` is accepted for signature compatibility and is not used.

        Returns:
            A single value when one coil is read, a list when several are
            read, or 'error' on any failure.
        """
        try:
            return self._readBits(cst.READ_COILS, slaveId, startAddress, varNums)
        except Exception as e:
            return self._fail('readCoils', e)

    def readInputCoils(self, slaveId, startAddress, varNums):
        """Read ``varNums`` discrete inputs starting at ``startAddress``.

        Returns:
            A single value when one input is read, a list when several are
            read, or 'error' on any failure.
        """
        try:
            return self._readBits(cst.READ_DISCRETE_INPUTS, slaveId, startAddress, varNums)
        except Exception as e:
            return self._fail('readInputCoils', e)
|
|
|
|
|
|
|
|
|
|
# -*- coding: utf_8 -*-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# import serial
|
|
|
|
|
# import modbus_tk
|
|
|
|
|
# import modbus_tk.defines as cst
|
|
|
|
|
# from modbus_tk import modbus_rtu
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# def mod(PORT="com2"):
|
|
|
|
|
# red = []
|
|
|
|
|
# alarm = ""
|
|
|
|
|
# try:
|
|
|
|
|
# # 设定串口为从站
|
|
|
|
|
# master = modbus_rtu.RtuMaster(serial.Serial(port=PORT,
|
|
|
|
|
# baudrate=9600, bytesize=8, parity='N', stopbits=1))
|
|
|
|
|
# master.set_timeout(5.0)
|
|
|
|
|
# master.set_verbose(True)
|
|
|
|
|
|
|
|
|
|
# # 读保持寄存器
|
|
|
|
|
# red = master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 9) # 这里可以修改需要读取的功能码
|
|
|
|
|
# print(red)
|
|
|
|
|
# alarm = "正常"
|
|
|
|
|
# return list(red), alarm#
|
|
|
|
|
|
|
|
|
|
# except Exception as exc:
|
|
|
|
|
# print(str(exc))
|
|
|
|
|
# alarm = (str(exc))
|
|
|
|
|
|
|
|
|
|
# return red, alarm ##如果异常就返回[],故障信息
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# if __name__ == "__main__":
|
|
|
|
|
# mod()
|