You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

209 lines
8.4 KiB
Python

7 months ago
import serial
import modbus_tk
import modbus_tk.defines as cst
from modbus_tk import modbus_rtu
7 months ago
import sys
from .ByteOrder import *
from .SetMessage import *
# master = modbus_rtu.RtuMaster(serial.Serial(port = 'COM4', baudrate = 9600, bytesize = 8, parity = 'N', stopbits = 1, xonxoff=0))
# execute(slave=1, function_code=READ_HOLDING_REGISTERS, starting_address=0, quantity_of_x=0, output_value=0, data_format="", expected_length=-1)
# Parameters:
# @slave=1 : identifier of the slave, from 1 to 247.
# @function_code=READ_HOLDING_REGISTERS : the Modbus function code
# @starting_address=100 : address of the first register
# @quantity_of_x=3 : number of registers/coils
# @output_value : an integer or an iterable of values, e.g. 1 / [1,1,1,0,0,1] / xrange(12)
# @data_format : format applied to the received data
class RTUMaster():
    """Small convenience wrapper around a modbus_tk RTU master on one serial port.

    Every read/write method catches all exceptions, prints a short
    diagnostic and returns the string 'error' instead of raising. Write
    methods return None on success; read methods return the decoded value.
    """

    def __init__(self, port='COM4', baudrate=9600, bytesize=8, parity='N', stopbits=1, xonxoff=0):
        """Open the serial port and configure the Modbus master.

        A failure to open the port does not raise: it is reported on stdout
        and ``self.master`` is left as None, so later requests return 'error'.
        """
        # Define the attribute up front so it exists even if the port cannot be opened.
        self.master = None
        try:
            self.master = modbus_rtu.RtuMaster(
                serial.Serial(
                    port=port,
                    baudrate=baudrate,
                    bytesize=bytesize,
                    parity=parity,
                    stopbits=stopbits,
                    xonxoff=xonxoff,
                )
            )
            self.master.set_timeout(5.0)
            self.master.set_verbose(True)
        except Exception as e:
            print(f'RTUMaster init error: {e}')

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _fail(where, exc):
        """Print a diagnostic for a failed request and return the 'error' marker."""
        print(f'{where} error: {exc}')
        return 'error'

    @staticmethod
    def _isFloatValue(value):
        """Return True when *value* should be written as a float (two registers).

        A value counts as a float if it is a ``float`` instance or its text
        form contains a decimal point. The isinstance check covers floats whose
        repr uses exponent notation (e.g. 1e-05), which has no '.'.
        """
        return isinstance(value, float) or '.' in str(value)

    @staticmethod
    def _toCoilBit(value):
        """Normalise a coil value to 0 or 1 (any non-zero integer becomes 1)."""
        return 1 if int(value) else 0

    @staticmethod
    def _encodeFloat(value, order):
        """Convert a float to register values using the given byte order.

        Unknown byte orders fall back to 'ABCD'.
        """
        if order == 'DCBA':
            return floatToDCBA(value)
        if order == 'BADC':
            return floatToBADC(value)
        if order == 'CDAB':
            return floatToCDAB(value)
        # 'ABCD' and anything unrecognised
        return floatToABCD(value)

    @staticmethod
    def _decodeFloat(registers, order):
        """Convert the registers that were read to a float using the given byte order.

        Raises:
            ValueError: if *order* is not one of ABCD / DCBA / BADC / CDAB.
        """
        if order == 'ABCD':
            return ABCDToFloat(registers)
        if order == 'DCBA':
            return DCBAToFloat(registers)
        if order == 'BADC':
            return BADCToFloat(registers)
        if order == 'CDAB':
            return CDABToFloat(registers)
        raise ValueError(f'unsupported byte order: {order!r}')

    def _readRegisterValue(self, functionCode, slaveId, startAddress, varNums, order):
        """Read registers with *functionCode* and decode a single value.

        With order == 'int' the first of *varNums* registers is returned as is.
        Otherwise exactly two registers are read (varNums is ignored) and
        decoded as one float in the requested byte order.
        """
        if order == 'int':
            return self.master.execute(slaveId, functionCode, startAddress, varNums)[0]
        registers = self.master.execute(slaveId, functionCode, startAddress, 2)
        return self._decodeFloat(registers, order)

    # ------------------------------------------------------------------- writes

    def writeSingleRegister(self, slaveId, address, outputValue, order='ABCD'):
        """Write one value starting at *address*.

        Integer values use WRITE_SINGLE_REGISTER. Float values are encoded
        with the byte order *order* (unknown orders fall back to 'ABCD') and
        written with WRITE_MULTIPLE_REGISTERS, as the original code notes a
        float occupies two registers.

        Returns:
            None on success, 'error' on any failure.
        """
        try:
            if not self._isFloatValue(outputValue):
                self.master.execute(
                    slaveId,
                    cst.WRITE_SINGLE_REGISTER,
                    starting_address=address,
                    output_value=int(outputValue),
                )
                return None
            registers = self._encodeFloat(float(outputValue), order)
            if registers is None:
                # The converter produced nothing to write.
                return 'error'
            self.master.execute(
                slaveId,
                cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=address,
                output_value=registers,
            )
        except Exception as e:
            return self._fail('writeSingleRegister', e)

    def writeSingleCoil(self, slaveId, address, outputValue):
        """Write a single coil; *outputValue* must convert to 0 or 1.

        Returns:
            None on success, 'error' if the value is not 0/1 or the request fails.
        """
        try:
            bit = int(outputValue)
            if bit not in (0, 1):
                # Report the rejected value instead of silently skipping the write.
                return self._fail('writeSingleCoil', f'invalid coil value {outputValue!r}')
            self.master.execute(slaveId, cst.WRITE_SINGLE_COIL, address, output_value=bit)
        except Exception as e:
            return self._fail('writeSingleCoil', e)

    def writeMultipleRegisters(self, slaveId, startAddress, outputValues, order='ABCD'):
        """Write several registers, mixing integer and float values.

        Integer values are appended as single entries; float values are
        converted with the byte order *order* (unknown orders fall back to
        'ABCD', matching writeSingleRegister) and their converted values are
        appended in place.

        Returns:
            None on success, 'error' on any failure.
        """
        try:
            registers = []
            for item in outputValues:
                if self._isFloatValue(item):
                    registers.extend(self._encodeFloat(float(item), order))
                else:
                    registers.append(int(item))
            self.master.execute(
                slaveId,
                cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=startAddress,
                output_value=registers,
            )
        except Exception as e:
            return self._fail('writeMultipleRegisters', e)

    def writeMultipleCoils(self, slaveId, startAddress, outputValues):
        """Write several coils from a list of booleans or integers.

        Each value is normalised to 0/1; any non-zero integer counts as 1.

        Returns:
            None on success, 'error' on any failure.
        """
        try:
            bits = [self._toCoilBit(item) for item in outputValues]
            self.master.execute(
                slaveId,
                cst.WRITE_MULTIPLE_COILS,
                starting_address=startAddress,
                output_value=bits,
            )
        except Exception as e:
            return self._fail('writeMultipleCoils', e)

    # -------------------------------------------------------------------- reads

    def readHoldingRegisters(self, slaveId, startAddress, varNums, order='ABCD'):
        """Read one value from the holding registers.

        With order == 'int' the first of *varNums* registers is returned.
        Otherwise two registers are read and decoded as a float in byte
        order *order* (ABCD / DCBA / BADC / CDAB).

        Returns:
            The decoded value, or 'error' on failure or unknown byte order.
        """
        try:
            return self._readRegisterValue(
                cst.READ_HOLDING_REGISTERS, slaveId, startAddress, varNums, order
            )
        except Exception as e:
            return self._fail('readHoldingRegisters', e)

    def readInputRegisters(self, slaveId, startAddress, varNums, order='ABCD'):
        """Read one value from the input registers.

        Same decoding rules as readHoldingRegisters.

        Returns:
            The decoded value, or 'error' on failure or unknown byte order.
        """
        try:
            return self._readRegisterValue(
                cst.READ_INPUT_REGISTERS, slaveId, startAddress, varNums, order
            )
        except Exception as e:
            return self._fail('readInputRegisters', e)

    def readCoils(self, slaveId, startAddress, varNums, order='ABCD'):
        """Read *varNums* coils and return the state of the first one.

        *order* is accepted for signature parity with the register readers
        and is not used.

        Returns:
            The first coil value, or 'error' on failure.
        """
        try:
            states = self.master.execute(slaveId, cst.READ_COILS, startAddress, varNums)
            # Only the first coil is reported, even when several were requested.
            return states[0]
        except Exception as e:
            return self._fail('readCoils', e)

    def readInputCoils(self, slaveId, startAddress, varNums):
        """Read *varNums* discrete inputs and return the state of the first one.

        Returns:
            The first input value, or 'error' on failure.
        """
        try:
            states = self.master.execute(slaveId, cst.READ_DISCRETE_INPUTS, startAddress, varNums)
            # Only the first input is reported, even when several were requested.
            return states[0]
        except Exception as e:
            return self._fail('readInputCoils', e)
# -*- coding: utf_8 -*-
# import serial
# import modbus_tk
# import modbus_tk.defines as cst
# from modbus_tk import modbus_rtu
# def mod(PORT="com2"):
# red = []
# alarm = ""
# try:
# # 设定串口为从站
# master = modbus_rtu.RtuMaster(serial.Serial(port=PORT,
# baudrate=9600, bytesize=8, parity='N', stopbits=1))
# master.set_timeout(5.0)
# master.set_verbose(True)
# # 读保持寄存器
# red = master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 9) # 这里可以修改需要读取的功能码
# print(red)
# alarm = "正常"
# return list(red), alarm#
7 months ago
# except Exception as exc:
# print(str(exc))
# alarm = (str(exc))
# return red, alarm ##如果异常就返回[],故障信息
# if __name__ == "__main__":
# mod()