You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

209 lines
8.4 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import serial
import modbus_tk
import modbus_tk.defines as cst
from modbus_tk import modbus_rtu
import sys
from .ByteOrder import *
from .SetMessage import *
# master = modbus_rtu.RtuMaster(serial.Serial(port = 'COM4', baudrate = 9600, bytesize = 8, parity = 'N', stopbits = 1, xonxoff=0))
# exec(slave=1, function_code=READ_HOLDING_REGISTERS, starting_address=0, quantity_of_x=0, output_value=0, data_format=“”, expected_length=-1)
# 参数说明:
# @slave=1 : identifier of the slave. from 1 to 247.
# @function_code=READ_HOLDING_REGISTERS功能码
# @starting_address=100寄存器的开始地址
# @quantity_of_x=3寄存器/线圈的数量
# @output_value一个整数或可迭代的值1/[1,1,1,0,0,1]/xrange(12)
# @data_format对接收的数据进行格式化
class RTUMaster():
def __init__(self, port = 'COM4', baudrate = 9600, bytesize = 8, parity = 'N', stopbits = 1, xonxoff = 0):
try:
self.master = modbus_rtu.RtuMaster(serial.Serial(port = port, baudrate = baudrate, bytesize = bytesize, parity = parity, stopbits = stopbits, xonxoff = xonxoff))
self.master.set_timeout(5.0)
self.master.set_verbose(True)
except Exception as e:
pass
def writeSingleRegister(self, slaveId, address, outputValue, order='ABCD'):
try:
if '.' not in str(outputValue):
# 整数值,使用单寄存器写入
outputValue = int(outputValue)
self.master.execute(slaveId, cst.WRITE_SINGLE_REGISTER, starting_address=address, output_value=outputValue)
else:
# 浮点数值需要使用多寄存器写入因为浮点数占用2个寄存器
outputValue = float(outputValue)
valueByte = None # 初始化变量
if order == 'ABCD': # 大端模式
valueByte = floatToABCD(outputValue)
elif order == 'DCBA': # 小端模式
valueByte = floatToDCBA(outputValue)
elif order == 'BADC':
valueByte = floatToBADC(outputValue)
elif order == 'CDAB':
valueByte = floatToCDAB(outputValue)
else:
# 默认使用 ABCD 字节序
# print(f"Unknown byte order '{order}', using default ABCD")
valueByte = floatToABCD(outputValue)
if valueByte is not None:
# 浮点数必须使用 WRITE_MULTIPLE_REGISTERS因为需要写入2个寄存器
self.master.execute(slaveId, cst.WRITE_MULTIPLE_REGISTERS, starting_address=address, output_value=valueByte)
else:
# print(f"Failed to convert float value {outputValue} with order {order}")
return 'error'
except Exception as e:
print(f'writeSingleRegister error: {e}')
return 'error'
def writeSingleCoil(self, slaveId, address, outputValue):
try:
outputValue = int(outputValue)
if outputValue in [0, 1]:
self.master.execute(slaveId, cst.WRITE_SINGLE_COIL, address, output_value=outputValue)
except Exception as e:
return 'error'
def writeMultipleRegisters(self, slaveId, startAddress, outputValues, order='ABCD'):
"""写多个寄存器,支持不同数据类型和字节序"""
try:
processedValues = []
for value in outputValues:
if '.' not in str(value):
# 整数值
processedValues.append(int(value))
else:
# 浮点值 - 根据字节序转换为寄存器对
floatValue = float(value)
if order == 'ABCD': # 大端模式
valueByte = floatToABCD(floatValue)
elif order == 'DCBA': # 小端模式
valueByte = floatToDCBA(floatValue)
elif order == 'BADC':
valueByte = floatToBADC(floatValue)
elif order == 'CDAB':
valueByte = floatToCDAB(floatValue)
processedValues.extend(valueByte)
self.master.execute(slaveId, cst.WRITE_MULTIPLE_REGISTERS, starting_address=startAddress, output_value=processedValues)
except Exception as e:
return 'error'
def writeMultipleCoils(self, slaveId, startAddress, outputValues):
"""写多个线圈支持布尔值列表或0/1整数列表"""
try:
processedValues = []
for value in outputValues:
if isinstance(value, bool):
processedValues.append(1 if value else 0)
else:
intValue = int(value)
if intValue in [0, 1]:
processedValues.append(intValue)
else:
# 如果值不是0或1转换为布尔值
processedValues.append(1 if intValue else 0)
self.master.execute(slaveId, cst.WRITE_MULTIPLE_COILS, starting_address=startAddress, output_value=processedValues)
except Exception as e:
return 'error'
def readHoldingRegisters(self, slaveId, startAddress, varNums, order = 'ABCD'):
try:
if order == 'int':
valueByte = self.master.execute(slaveId, cst.READ_HOLDING_REGISTERS, startAddress, varNums)[0]
else:
value = self.master.execute(slaveId, cst.READ_HOLDING_REGISTERS, startAddress, 2)
if order == 'ABCD': # 大端模式
valueByte = ABCDToFloat(value)
elif order == 'DCBA': # 小端模式
valueByte = DCBAToFloat(value)
elif order == 'BADC':
valueByte = BADCToFloat(value)
elif order == 'CDAB':
valueByte = CDABToFloat(value)
return valueByte
except Exception as e:
return 'error'
def readInputRegisters(self, slaveId, startAddress, varNums, order = 'ABCD'):
try:
if order == 'int':
valueByte = self.master.execute(slaveId, cst.READ_INPUT_REGISTERS, startAddress, varNums)[0]
else:
value = self.master.execute(slaveId, cst.READ_INPUT_REGISTERS, startAddress, 2)
if order == 'ABCD': # 大端模式
valueByte = ABCDToFloat(value)
elif order == 'DCBA': # 小端模式
valueByte = DCBAToFloat(value)
elif order == 'BADC':
valueByte = BADCToFloat(value)
elif order == 'CDAB':
valueByte = CDABToFloat(value)
return valueByte
except Exception as e:
return 'error'
def readCoils(self, slaveId, startAddress, varNums, order='ABCD'):
try:
value = self.master.execute(slaveId, cst.READ_COILS, startAddress, varNums)
# 如果只读取一个线圈,返回单个值;否则返回列表
return value[0]
except Exception as e:
return 'error'
def readInputCoils(self, slaveId, startAddress, varNums):
try:
value = self.master.execute(slaveId, cst.READ_DISCRETE_INPUTS, startAddress, varNums)
# 如果只读取一个线圈,返回单个值;否则返回列表
return value[0]
except Exception as e:
return 'error'
# -*- coding: utf_8 -*-
# import serial
# import modbus_tk
# import modbus_tk.defines as cst
# from modbus_tk import modbus_rtu
# def mod(PORT="com2"):
# red = []
# alarm = ""
# try:
# # 设定串口为从站
# master = modbus_rtu.RtuMaster(serial.Serial(port=PORT,
# baudrate=9600, bytesize=8, parity='N', stopbits=1))
# master.set_timeout(5.0)
# master.set_verbose(True)
# # 读保持寄存器
# red = master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 9) # 这里可以修改需要读取的功能码
# print(red)
# alarm = "正常"
# return list(red), alarm#
# except Exception as exc:
# print(str(exc))
# alarm = (str(exc))
# return red, alarm ##如果异常就返回[],故障信息
# if __name__ == "__main__":
# mod()