""" HART模拟MODBUS-RTU从站管理器 使用modbus_tk实现HART一个主变量和三个动态变量的通讯 使用COM1端口,站地址1,寄存器地址0,3,5,7 """ import threading import time import struct from typing import Optional, List, Dict, Any import modbus_tk import modbus_tk.defines as cst from modbus_tk import modbus_rtu from protocol.ModBus.ByteOrder import * class HartRtuSlaveManager: """HART模拟MODBUS-RTU从站管理器""" def __init__(self, comPort='COM1', slaveId=1, baudRate=9600): """ 初始化HART RTU从站管理器 :param comPort: 串口端口,默认COM1 :param slaveId: 从站地址,默认1 :param baudRate: 波特率,默认9600 """ self.comPort = comPort self.slaveId = slaveId self.baudRate = baudRate self.rtuServer = None self.rtuSlave = None self.isConnected = False self.isRunning = False self.lock = threading.Lock() # HART变量寄存器地址映射 self.registerMap = { 'primaryVariable': 0, # 主变量 - 寄存器地址0 'dynamicVariable1': 3, # 动态变量1 - 寄存器地址3 'dynamicVariable2': 5, # 动态变量2 - 寄存器地址5 'dynamicVariable3': 7 # 动态变量3 - 寄存器地址7 } # 变量值缓存 self.variableValues = { 'primaryVariable': 0.0, 'dynamicVariable1': 0.0, 'dynamicVariable2': 0.0, 'dynamicVariable3': 0.0 } def _initializeRegisters(self): """初始化寄存器数据""" if not self.rtuSlave: return try: # 初始化主变量寄存器(地址0) self.rtuSlave.set_values("primary_var", 0, [0, 0]) # 初始化动态变量1寄存器(地址3) self.rtuSlave.set_values("dynamic_var1", 3, [0, 0]) # 初始化动态变量2寄存器(地址5) self.rtuSlave.set_values("dynamic_var2", 5, [0, 0]) # 初始化动态变量3寄存器(地址7) self.rtuSlave.set_values("dynamic_var3", 7, [0, 0]) except Exception as e: print(f"初始化寄存器失败: {e}") def _floatToRegisters(self, value: float) -> tuple: """将浮点数转换为两个16位寄存器值""" return floatToBADC(value) def _registersToFloat(self, high: int, low: int) -> float: """将两个16位寄存器值转换为浮点数""" return BADCToFloat([high, low]) def startSlave(self) -> bool: """启动RTU从站""" try: with self.lock: if self.isConnected: return True # 创建串口对象 import serial serialPort = serial.Serial( port=self.comPort, baudrate=self.baudRate, bytesize=8, parity='N', stopbits=1, timeout=1.0 ) # 创建RTU服务器 self.rtuServer = modbus_rtu.RtuServer(serialPort) # 启动服务器 self.rtuServer.start() # 添加从站 self.rtuSlave = self.rtuServer.add_slave(self.slaveId) # 为每个HART变量添加独立的保持寄存器块 # 主变量:寄存器0-1(起始地址0,长度2) self.rtuSlave.add_block("primary_var", cst.HOLDING_REGISTERS, 0, 2) # 动态变量1:寄存器3-4(起始地址3,长度2) self.rtuSlave.add_block("dynamic_var1", cst.HOLDING_REGISTERS, 3, 2) # 动态变量2:寄存器5-6(起始地址5,长度2) self.rtuSlave.add_block("dynamic_var2", cst.HOLDING_REGISTERS, 5, 2) # 动态变量3:寄存器7-8(起始地址7,长度2) self.rtuSlave.add_block("dynamic_var3", cst.HOLDING_REGISTERS, 7, 2) # 初始化寄存器 self._initializeRegisters() self.isConnected = True self.isRunning = True print(f"HART RTU从站启动成功: {self.comPort}, 站地址: {self.slaveId}") return True except Exception as e: print(f"启动HART RTU从站时出错: {e}") self.isConnected = False return False def stopSlave(self): """停止RTU从站""" try: with self.lock: self.isRunning = False if self.rtuServer: self.rtuServer.stop() self.rtuServer = None self.rtuSlave = None self.isConnected = False print("HART RTU从站已停止") except Exception as e: print(f"停止HART RTU从站时出错: {e}") # 移除数据同步线程相关方法,modbus_tk会自动处理 def writeVariable(self, variableName: str, value: float) -> bool: """ 写入HART变量值 :param variableName: 变量名称 ('primaryVariable', 'dynamicVariable1', etc.) :param value: 变量值 :return: 写入是否成功 """ if not self.isConnected or variableName not in self.registerMap: return False try: with self.lock: # 更新变量值缓存 self.variableValues[variableName] = value # 获取对应的寄存器块名称 blockName = self._getBlockName(variableName) if not blockName: return False # 将浮点数转换为寄存器值 high, low = self._floatToRegisters(value) # 获取寄存器地址 regAddr = self.registerMap[variableName] # 直接写入RTU从站寄存器(使用绝对地址) if self.rtuSlave and self.isConnected: self.rtuSlave.set_values(blockName, regAddr, [high, low]) return True except Exception as e: print(f"写入HART变量 {variableName} 失败: {e}") return False def readVariable(self, variableName: str) -> Optional[float]: """ 读取HART变量值 :param variableName: 变量名称 :return: 变量值,失败返回None """ if not self.isConnected or variableName not in self.registerMap: return None try: with self.lock: # 获取对应的寄存器块名称 blockName = self._getBlockName(variableName) if not blockName: return None # 获取寄存器地址 regAddr = self.registerMap[variableName] # 从RTU从站读取最新数据(使用绝对地址) if self.rtuSlave: values = self.rtuSlave.get_values(blockName, regAddr, 2) if len(values) == 2: high, low = values value = self._registersToFloat(high, low) self.variableValues[variableName] = value return value # 如果从RTU从站读取失败,返回缓存值 return self.variableValues.get(variableName, 0.0) except Exception as e: print(f"读取HART变量 {variableName} 失败: {e}") return None def _getBlockName(self, variableName: str) -> Optional[str]: """根据变量名获取对应的寄存器块名称""" blockMapping = { 'primaryVariable': 'primary_var', 'dynamicVariable1': 'dynamic_var1', 'dynamicVariable2': 'dynamic_var2', 'dynamicVariable3': 'dynamic_var3' } return blockMapping.get(variableName) def isCommunicationOk(self) -> bool: """检查通讯是否正常""" return self.isConnected and self.rtuServer is not None and self.rtuSlave is not None