You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

234 lines
8.5 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
HART模拟MODBUS-RTU从站管理器
使用modbus_tk实现HART一个主变量和三个动态变量的通讯
使用COM1端口站地址1寄存器地址0,3,5,7
"""
import threading
import time
import struct
from typing import Optional, List, Dict, Any
import modbus_tk
import modbus_tk.defines as cst
from modbus_tk import modbus_rtu
from protocol.ModBus.ByteOrder import *
class HartRtuSlaveManager:
"""HART模拟MODBUS-RTU从站管理器"""
def __init__(self, comPort='COM1', slaveId=1, baudRate=9600):
"""
初始化HART RTU从站管理器
:param comPort: 串口端口默认COM1
:param slaveId: 从站地址默认1
:param baudRate: 波特率默认9600
"""
self.comPort = comPort
self.slaveId = slaveId
self.baudRate = baudRate
self.rtuServer = None
self.rtuSlave = None
self.isConnected = False
self.isRunning = False
self.lock = threading.Lock()
# HART变量寄存器地址映射
self.registerMap = {
'primaryVariable': 0, # 主变量 - 寄存器地址0
'dynamicVariable1': 3, # 动态变量1 - 寄存器地址3
'dynamicVariable2': 5, # 动态变量2 - 寄存器地址5
'dynamicVariable3': 7 # 动态变量3 - 寄存器地址7
}
# 变量值缓存
self.variableValues = {
'primaryVariable': 0.0,
'dynamicVariable1': 0.0,
'dynamicVariable2': 0.0,
'dynamicVariable3': 0.0
}
def _initializeRegisters(self):
"""初始化寄存器数据"""
if not self.rtuSlave:
return
try:
# 初始化主变量寄存器地址0
self.rtuSlave.set_values("primary_var", 0, [0, 0])
# 初始化动态变量1寄存器地址3
self.rtuSlave.set_values("dynamic_var1", 3, [0, 0])
# 初始化动态变量2寄存器地址5
self.rtuSlave.set_values("dynamic_var2", 5, [0, 0])
# 初始化动态变量3寄存器地址7
self.rtuSlave.set_values("dynamic_var3", 7, [0, 0])
except Exception as e:
print(f"初始化寄存器失败: {e}")
def _floatToRegisters(self, value: float) -> tuple:
"""将浮点数转换为两个16位寄存器值"""
return floatToBADC(value)
def _registersToFloat(self, high: int, low: int) -> float:
"""将两个16位寄存器值转换为浮点数"""
return BADCToFloat([high, low])
def startSlave(self) -> bool:
"""启动RTU从站"""
try:
with self.lock:
if self.isConnected:
return True
# 创建串口对象
import serial
serialPort = serial.Serial(
port=self.comPort,
baudrate=self.baudRate,
bytesize=8,
parity='N',
stopbits=1,
timeout=1.0
)
# 创建RTU服务器
self.rtuServer = modbus_rtu.RtuServer(serialPort)
# 启动服务器
self.rtuServer.start()
# 添加从站
self.rtuSlave = self.rtuServer.add_slave(self.slaveId)
# 为每个HART变量添加独立的保持寄存器块
# 主变量寄存器0-1起始地址0长度2
self.rtuSlave.add_block("primary_var", cst.HOLDING_REGISTERS, 0, 2)
# 动态变量1寄存器3-4起始地址3长度2
self.rtuSlave.add_block("dynamic_var1", cst.HOLDING_REGISTERS, 3, 2)
# 动态变量2寄存器5-6起始地址5长度2
self.rtuSlave.add_block("dynamic_var2", cst.HOLDING_REGISTERS, 5, 2)
# 动态变量3寄存器7-8起始地址7长度2
self.rtuSlave.add_block("dynamic_var3", cst.HOLDING_REGISTERS, 7, 2)
# 初始化寄存器
self._initializeRegisters()
self.isConnected = True
self.isRunning = True
print(f"HART RTU从站启动成功: {self.comPort}, 站地址: {self.slaveId}")
return True
except Exception as e:
print(f"启动HART RTU从站时出错: {e}")
self.isConnected = False
return False
def stopSlave(self):
"""停止RTU从站"""
try:
with self.lock:
self.isRunning = False
if self.rtuServer:
self.rtuServer.stop()
self.rtuServer = None
self.rtuSlave = None
self.isConnected = False
print("HART RTU从站已停止")
except Exception as e:
print(f"停止HART RTU从站时出错: {e}")
# 移除数据同步线程相关方法modbus_tk会自动处理
def writeVariable(self, variableName: str, value: float) -> bool:
"""
写入HART变量值
:param variableName: 变量名称 ('primaryVariable', 'dynamicVariable1', etc.)
:param value: 变量值
:return: 写入是否成功
"""
if not self.isConnected or variableName not in self.registerMap:
return False
try:
with self.lock:
# 更新变量值缓存
self.variableValues[variableName] = value
# 获取对应的寄存器块名称
blockName = self._getBlockName(variableName)
if not blockName:
return False
# 将浮点数转换为寄存器值
high, low = self._floatToRegisters(value)
# 获取寄存器地址
regAddr = self.registerMap[variableName]
# 直接写入RTU从站寄存器使用绝对地址
if self.rtuSlave and self.isConnected:
self.rtuSlave.set_values(blockName, regAddr, [high, low])
return True
except Exception as e:
print(f"写入HART变量 {variableName} 失败: {e}")
return False
def readVariable(self, variableName: str) -> Optional[float]:
"""
读取HART变量值
:param variableName: 变量名称
:return: 变量值失败返回None
"""
if not self.isConnected or variableName not in self.registerMap:
return None
try:
with self.lock:
# 获取对应的寄存器块名称
blockName = self._getBlockName(variableName)
if not blockName:
return None
# 获取寄存器地址
regAddr = self.registerMap[variableName]
# 从RTU从站读取最新数据使用绝对地址
if self.rtuSlave:
values = self.rtuSlave.get_values(blockName, regAddr, 2)
if len(values) == 2:
high, low = values
value = self._registersToFloat(high, low)
self.variableValues[variableName] = value
return value
# 如果从RTU从站读取失败返回缓存值
return self.variableValues.get(variableName, 0.0)
except Exception as e:
print(f"读取HART变量 {variableName} 失败: {e}")
return None
def _getBlockName(self, variableName: str) -> Optional[str]:
"""根据变量名获取对应的寄存器块名称"""
blockMapping = {
'primaryVariable': 'primary_var',
'dynamicVariable1': 'dynamic_var1',
'dynamicVariable2': 'dynamic_var2',
'dynamicVariable3': 'dynamic_var3'
}
return blockMapping.get(variableName)
def isCommunicationOk(self) -> bool:
"""检查通讯是否正常"""
return self.isConnected and self.rtuServer is not None and self.rtuSlave is not None