import json
import threading
import time
from typing import Dict, Any, Optional

from utils.DBModels.ProtocolModel import (
    ModbusTcpMasterVar,
    ModbusTcpSlaveVar,
    ModbusRtuMasterVar,
    ModbusRtuSlaveVar,
    HartVar,
    TcRtdVar,
    AnalogVar,
    HartSimulateVar
)
from model.ProjectModel.GlobalConfigManager import GlobalConfigManager
from model.ProjectModel.DeviceManage import DevicesManange
from protocol.HartRtuSlaveManager import HartRtuSlaveManager
# The star import supplies TCPVarManager, localModel, NetModel, SimModel,
# TCTrigger, RTDTrigger and FPGATrigger used below (none is defined in this file).
from protocol.TCP.TCPVarManage import *
from protocol.TCP.TemToMv import temToMv
from protocol.RPC.RpcClient import RpcClient
from protocol.RPC.RpcServer import RpcServer
from protocol.ModBus.ModbusManager import ModbusManager
from utils import Globals


class ProtocolManage(object):
    """Look up, read and write communication variables by name.

    Variable metadata is loaded from the enabled database model classes into
    ``varInfoCache``; a daemon thread periodically reads every known variable
    and stores the result in ``variableValueCache``. Reads and writes are
    dispatched on the variable's model type (Modbus, TC/RTD, analog, PROFIBUS,
    simulated HART), with an RPC fallback for names that are not known locally.
    """

    # Thermocouple type codes accepted by the TC/RTD branches.
    _TC_TYPES = ('R', 'S', 'B', 'J', 'T', 'E', 'K', 'N', 'C', 'A')

    # Model type -> name of the ModbusManager method that performs the write.
    _MODBUS_WRITERS = {
        'ModbusTcpMasterVar': 'writeModbusTcpMasterValue',
        'ModbusTcpSlaveVar': 'writeModbusTcpSlaveValue',
        'ModbusRtuMasterVar': 'writeModbusRtuMasterValue',
        'ModbusRtuSlaveVar': 'writeModbusRtuSlaveValue',
    }

    # (lower-case token, Chinese token, HART slot), checked in this order.
    _HART_SLOT_RULES = (
        ('primary', '主变量', 'primaryVariable'),
        ('dynamic1', '动态变量1', 'dynamicVariable1'),
        ('dynamic2', '动态变量2', 'dynamicVariable2'),
        ('dynamic3', '动态变量3', 'dynamicVariable3'),
    )

    # Inputs interpreted as bit value 1 when writing discrete PROFIBUS areas.
    _TRUTHY_BITS = (1, True, '1', 'on', 'ON')

    def __init__(self):
        """Create the protocol managers, fill the caches and start the reader thread."""
        # Model classes are selected dynamically from the global configuration.
        self.MODEL_CLASSES = GlobalConfigManager.getEnabledModelClasses()

        # TCPVarManager is only created when the configuration asks for it.
        if GlobalConfigManager.needsTcpVarManager():
            # An alternative endpoint ('192.168.1.50', 5055) was previously
            # configured here and left commented out in the original code.
            self.tcpVarManager = TCPVarManager('127.0.0.1', 8000)
            self.writeTC = [0] * 8
            self.writeRTD = [0] * 8
        else:
            self.tcpVarManager = None
            self.writeTC = None
            self.writeRTD = None

        self.RpcClient = None
        self.RpcServer = None
        self.varInfoCache = {}
        self.historyDBManage = Globals.getValue('historyDBManage')
        self.variableValueCache = {}  # {varName: value}
        # Created before any helper runs so no code path can observe a
        # half-initialised instance without its lock.
        self.cacheLock = threading.Lock()

        self.profibusManager = None
        self.profibusVarMap = {}
        self.profibusDeviceMeta = {}
        self.profibusEnabled = GlobalConfigManager.isModuleEnabled('profibusModule')
        self._profibusConnected = False
        self._profibusLastUpdate = 0
        self.profibusLock = threading.Lock()
        if self.profibusEnabled:
            self._initializeProfibusSupport()

        # Modbus manager.
        self.modbusManager = ModbusManager()

        # Simulated HART RTU slave manager (optional module).
        self.hartRtuSlaveManager = None
        if GlobalConfigManager.isModuleEnabled('hartSimulateModule'):
            try:
                self.hartRtuSlaveManager = HartRtuSlaveManager()
                print("HART模拟RTU从站管理器已初始化")
            except Exception as e:
                print(f"初始化HART模拟RTU从站管理器失败: {e}")
                self.hartRtuSlaveManager = None

        self.refreshVarCache()

        # Share the value cache, its lock and the metadata cache with Modbus.
        self.modbusManager.setVariableCache(self.variableValueCache, self.cacheLock, self.varInfoCache)

        self.readThreadStop = threading.Event()
        self.readThread = threading.Thread(target=self._backgroundReadAllVariables, daemon=True)
        self.readThread.start()

    # ==================== Variable metadata cache ====================

    @staticmethod
    def _buildVarInfo(modelClass, varInstance):
        """Return the cache entry (model type + field dict) for one model row."""
        varData = {
            field.name: getattr(varInstance, field.name)
            for field in varInstance._meta.sorted_fields
        }
        return {
            'modelType': modelClass.__name__,
            'variableData': varData
        }

    def clearVarCache(self):
        """Empty the variable metadata cache."""
        self.varInfoCache.clear()

    def refreshVarCache(self):
        """Reload the metadata of every variable from the enabled model classes.

        A failure while loading one model class is printed and does not stop
        the remaining classes from loading. PROFIBUS variables are re-derived
        afterwards when PROFIBUS support is active.
        """
        self.varInfoCache.clear()
        # Re-fetch the enabled model classes; the configuration may have changed.
        self.MODEL_CLASSES = GlobalConfigManager.getEnabledModelClasses()
        for modelClass in self.MODEL_CLASSES:
            try:
                for varInstance in modelClass.select():
                    varName = getattr(varInstance, 'varName', None)
                    if varName:
                        self.varInfoCache[varName] = self._buildVarInfo(modelClass, varInstance)
            except Exception as e:
                print(f"刷新缓存时出错: {modelClass.__name__}: {e}")
        if self.profibusEnabled and self.profibusManager:
            self._refreshProfibusVarCache()

    def lookupVariable(self, variableName):
        """Return the metadata of ``variableName``, consulting the cache first.

        :param variableName: name of the variable to look up
        :return: dict with ``modelType`` and ``variableData``, or None when no
            enabled model class knows the name
        """
        if variableName in self.varInfoCache:
            return self.varInfoCache[variableName]
        for modelClass in self.MODEL_CLASSES:
            varInstance = modelClass.getByName(variableName)
            if varInstance:
                result = self._buildVarInfo(modelClass, varInstance)
                self.varInfoCache[variableName] = result  # remember for next time
                return result
        return None

    def getAllVariableNames(self):
        """Return a list copy of every variable name in the metadata cache."""
        return list(self.varInfoCache)

    # ==================== RPC client / server mode ====================

    def setClentMode(self, clentName, rabbitmqHost='localhost'):
        """Switch to RPC client mode, closing any previously created client."""
        if self.RpcClient:
            self.RpcClient.close()
        self.RpcClient = RpcClient(clentName, rabbitmqHost, self)
        # Start the RPC client without blocking the caller.
        self.RpcClient.startNonBlocking()

    def closeClient(self):
        """Close and forget the RPC client, if any."""
        if self.RpcClient:
            self.RpcClient.close()
            self.RpcClient = None

    def setServerMode(self, rabbitmqHost='localhost'):
        """Switch to RPC server mode, closing any previously created server."""
        if self.RpcServer:
            self.RpcServer.close()
        self.RpcServer = RpcServer(rabbitmqHost)

    def addClient(self, clientName):
        """Register ``clientName`` with the RPC server; no-op without a server."""
        if self.RpcServer:
            self.RpcServer.addClient(clientName=clientName)

    def closeServer(self):
        """Close and forget the RPC server, if any."""
        if self.RpcServer:
            self.RpcServer.close()
            self.RpcServer = None

    def disconnectClient(self, clientName):
        """Remove ``clientName`` from the RPC server; no-op without a server."""
        if self.RpcServer:
            self.RpcServer.removeClient(clientName)

    def safeRpcCall(self, method, *args, **kwargs):
        """Call ``method(*args, **kwargs)`` and return None instead of raising.

        Returns None immediately when no RPC server exists. Exceptions are
        swallowed on purpose to keep the log quiet.
        """
        if not self.RpcServer:
            return None
        try:
            return method(*args, **kwargs)
        except Exception:
            return None

    # ==================== Writing ====================

    def _resolveHartSlot(self, variableName):
        """Map a variable name to a HART slot name; defaults to the primary variable."""
        loweredName = variableName.lower()
        for token, chineseToken, slot in self._HART_SLOT_RULES:
            if token in loweredName or chineseToken in variableName:
                return slot
        return 'primaryVariable'

    def _writeRemoteVariable(self, variableName, value):
        """Write a variable that is unknown locally through the RPC server."""
        if not self.RpcServer:
            return False
        try:
            existsVar, _clientNames = self.RpcServer.existsVar(variableName)
            if not existsVar:
                return False
            result = self.RpcServer.writeVar(variableName, value)
            return result.get("result") == "success" if result else False
        except Exception as e:
            print(f"RPC写入变量 {variableName} 失败: {e}")
            return False

    def _writeTcRtdValue(self, info, value, trigger, timeoutMS):
        """Write a TC/RTD variable and return the value actually sent."""
        channel = int(info['channelNumber']) - 1
        varType = info['varType']
        compensationVar = float(info['compensationVar'])
        model = self.getModelType(info['varModel']) or localModel
        isRtd = varType == 'PT100'
        isTc = varType in self._TC_TYPES

        # Remember the last locally written value; the read path returns it.
        if model == localModel:
            if isRtd:
                self.writeRTD[channel] = value
            else:
                self.writeTC[channel] = value

        # Outside simulation mode the value is converted with temToMv after
        # adding the compensation value.
        if (isTc or isRtd) and model != SimModel:
            value = temToMv(varType, value + compensationVar)

        # A truthy trigger is replaced by the trigger constant for the type.
        if trigger and isTc:
            trigger = TCTrigger
        if trigger and isRtd:
            trigger = RTDTrigger

        self.tcpVarManager.writeValue(varType, channel, value, trigger=trigger, model=model, timeoutMS=timeoutMS)
        return value

    def _writeAnalogValue(self, info, value, trigger, timeoutMS):
        """Write an analog variable and return the value actually sent."""
        channel = int(info['channelNumber']) - 1
        varType = info['varType']
        model = self.getModelType(info['varModel']) or localModel
        if varType in ['AI', 'AO']:
            value = self.getRealAO(value, info['max'], info['min'])
        trigger = FPGATrigger if trigger else trigger
        self.tcpVarManager.writeValue(varType, channel, value, trigger=trigger, model=model, timeoutMS=timeoutMS)
        return value

    def writeVariableValue(self, variableName, value, trigger=None, timeoutMS=2000):
        """Write ``value`` to the named variable (nothing is saved to the database).

        :param variableName: variable name
        :param value: value to write
        :param trigger: when truthy, replaced by the protocol-specific trigger
            constant for TC/RTD and analog variables
        :param timeoutMS: timeout forwarded to ``TCPVarManager.writeValue``
        :return: True on success, False otherwise (never None)
        """
        varInfo = self.lookupVariable(variableName)
        if not varInfo:
            return self._writeRemoteVariable(variableName, value)

        modelType = varInfo['modelType']
        info = varInfo['variableData']
        try:
            if modelType in self._MODBUS_WRITERS:
                writer = getattr(self.modbusManager, self._MODBUS_WRITERS[modelType])
                if writer(info, value) == 'error':
                    # The original returned None here; the contract is a bool.
                    return False
            elif modelType == 'HartVar':
                # HART variables have no write implementation.
                pass
            elif modelType == 'TcRtdVar':
                if not self.tcpVarManager:
                    print("TCPVarManager未初始化,无法处理TcRtdVar变量")
                    return False
                value = self._writeTcRtdValue(info, value, trigger, timeoutMS)
            elif modelType == 'AnalogVar':
                if not self.tcpVarManager:
                    print("TCPVarManager未初始化,无法处理AnalogVar变量")
                    return False
                value = self._writeAnalogValue(info, value, trigger, timeoutMS)
            elif modelType == 'ProfibusVar':
                success, normalizedValue = self._writeProfibusVariable(info, value)
                if not success:
                    return False
                value = normalizedValue
            elif modelType == 'HartSimulateVar':
                if not self.hartRtuSlaveManager:
                    print("HART RTU从站管理器未初始化,无法处理HartSimulateVar变量")
                    return False
                slot = self._resolveHartSlot(variableName)
                if not self.hartRtuSlaveManager.writeVariable(slot, float(value)):
                    print(f"HART模拟变量 {variableName} 写入失败")
                    return False

            # NOTE(review): for TC/RTD and analog variables ``value`` is the
            # converted device-level value at this point, whereas the read path
            # publishes the engineering value. Behaviour kept as-is; confirm
            # which one RPC clients expect.
            rpcValue = self._prepareExternalValue(value)
            if self.RpcClient:
                self.RpcClient.setVarContent(
                    variableName, rpcValue, info.get('min'), info.get('max'), info.get('varType')
                )
            return True
        except Exception as e:
            print(f"写入变量值失败: {str(e)}")
            return False

    # ==================== Background polling ====================

    def _backgroundReadAllVariables(self, interval=1.0):
        """Poll every known variable until ``readThreadStop`` is set.

        Waiting is done on the stop event rather than with ``time.sleep`` so
        that ``shutdown()`` is not delayed by a sleeping thread.

        NOTE(review): master-mode Modbus variables are not read by
        ``_readVariableValueOriginal`` (those calls were commented out in the
        original), so this loop stores None for them. Presumably ModbusManager
        fills the shared cache itself; confirm the two do not fight.
        """
        while not self.readThreadStop.is_set():
            try:
                allVarNames = self.getAllVariableNames()
                if self.profibusEnabled and self.profibusManager:
                    self._updateProfibusAreas(force=True)
                for varName in allVarNames:
                    if self.readThreadStop.is_set():
                        break
                    try:
                        value = self._readVariableValueOriginal(varName)
                        with self.cacheLock:
                            self.variableValueCache[varName] = value
                    except Exception as e:
                        # One failing variable must not stop the others.
                        print(f"读取变量 {varName} 失败: {e}")
                        continue
                # Pause between rounds to limit the RPC call rate.
                self.readThreadStop.wait(interval)
            except Exception as e:
                print(f"后台读取线程异常: {e}")
                # Back off longer after an unexpected failure.
                self.readThreadStop.wait(5)

    # ==================== PROFIBUS variable management ====================

    def _initializeProfibusSupport(self):
        """Create the PROFIBUS device manager; disable PROFIBUS on failure."""
        try:
            self.profibusManager = DevicesManange()
            self._loadProfibusDevicesFromDB()
            self.profibusManager.recalculateAddress()
        except Exception as e:
            print(f"初始化PROFIBUS管理器失败: {e}")
            self.profibusManager = None
            self.profibusEnabled = False

    @staticmethod
    def _rowColumn(row, index):
        """Return ``row[index]``, or None when the row is too short."""
        return row[index] if len(row) > index else None

    def _loadProfibusDevicesFromDB(self):
        """Register every stored device (and its areas) with the PROFIBUS manager.

        Each row is read positionally: name, protocol type, master/slave mode,
        area JSON, PV upper limit, PV lower limit, PV unit. Missing trailing
        columns are treated as None.
        """
        self.profibusDeviceMeta = {}
        allDevices = DevicesManange.getAllDevice()
        if not allDevices or isinstance(allDevices, str):
            return
        for deviceRow in allDevices:
            if not deviceRow:
                continue
            try:
                deviceName = deviceRow[0]
            except (IndexError, TypeError):
                continue
            proType = self._rowColumn(deviceRow, 1)
            masterSlaveModel = self._rowColumn(deviceRow, 2)
            areaJson = self._rowColumn(deviceRow, 3)
            self.profibusDeviceMeta[deviceName] = {
                'pvUpperLimit': self._rowColumn(deviceRow, 4),
                'pvLowerLimit': self._rowColumn(deviceRow, 5),
                'pvUnit': self._rowColumn(deviceRow, 6)
            }
            try:
                self.profibusManager.addDevice(
                    proType=proType,
                    masterSlaveModel=masterSlaveModel,
                    deviceName=deviceName
                )
            except Exception as e:
                print(f"初始化设备 {deviceName} 失败: {e}")
                continue
            self._loadAreasForDevice(deviceName, areaJson)

    def _loadAreasForDevice(self, deviceName, areaJson):
        """Add the areas described by ``areaJson`` (JSON text or parsed list) to a device."""
        if not areaJson:
            return
        try:
            areas = json.loads(areaJson) if isinstance(areaJson, str) else areaJson
        except Exception as e:
            print(f"解析设备 {deviceName} 的区域配置失败: {e}")
            return
        device = self.profibusManager.getDevice(deviceName)
        if not device or not areas:
            return
        for areaConfig in areas:
            if not isinstance(areaConfig, dict):
                continue
            dataType = areaConfig.get('type')
            valueName = areaConfig.get('valueName')
            if not dataType or not valueName:
                continue
            order = areaConfig.get('order', 'ABCD')
            try:
                bytesCount = int(areaConfig.get('bytes', 0))
            except (TypeError, ValueError):
                bytesCount = 0
            try:
                device.addArea(
                    type=dataType,
                    nums=1,
                    bytes=bytesCount,
                    order=order,
                    valueName=valueName
                )
            except Exception as e:
                print(f"添加设备 {deviceName} 的区域 {valueName} 失败: {e}")
                continue

    def _refreshProfibusVarCache(self):
        """Rebuild the PROFIBUS manager and re-derive its variables.

        The device tree is rebuilt while holding ``profibusLock``; the derived
        entries are then published to ``profibusVarMap`` and ``varInfoCache``.
        """
        self.profibusVarMap.clear()
        if not self.profibusEnabled:
            return
        with self.profibusLock:
            try:
                self.profibusManager = DevicesManange()
                self._loadProfibusDevicesFromDB()
                self.profibusManager.recalculateAddress()
                # A fresh manager has not been connected or polled yet.
                self._profibusConnected = False
                self._profibusLastUpdate = 0
            except Exception as e:
                print(f"刷新PROFIBUS设备失败: {e}")
                return
            deviceGroups = [
                self.profibusManager.dpMasterDevices,
                self.profibusManager.dpSlaveDevices,
                self.profibusManager.paMasterDevices,
                self.profibusManager.paSlaveDevices
            ]
            snapshot = []
            for devicesDict in deviceGroups:
                for deviceName, device in devicesDict.items():
                    deviceMeta = self.profibusDeviceMeta.get(deviceName, {})
                    for areaIndex, area in enumerate(device.areas):
                        valueName = getattr(area, 'valueName', None)
                        if not valueName:
                            continue
                        areaInfo = {
                            'deviceName': deviceName,
                            'areaIndex': areaIndex,
                            'areaType': area.type,
                            'bytes': area.bytes,
                            'order': area.order,
                            'valueName': valueName,
                            'proType': device.type,
                            'masterSlaveModel': device.masterOrSlave,
                            'min': deviceMeta.get('pvLowerLimit'),
                            'max': deviceMeta.get('pvUpperLimit'),
                            'unit': deviceMeta.get('pvUnit'),
                            'varType': area.type
                        }
                        snapshot.append((valueName, areaInfo))
        for valueName, areaInfo in snapshot:
            self.profibusVarMap[valueName] = areaInfo
            self.varInfoCache[valueName] = {
                'modelType': 'ProfibusVar',
                'variableData': areaInfo
            }

    def _ensureProfibusConnected(self):
        """Connect the PROFIBUS manager if needed; return True when connected."""
        if not self.profibusManager:
            return False
        if self._profibusConnected:
            return True
        try:
            self.profibusManager.connect()
            self._profibusConnected = True
        except Exception as e:
            print(f"连接PROFIBUS失败: {e}")
            self._profibusConnected = False
            return False
        return True

    def _updateProfibusAreas(self, force=False):
        """Read all PROFIBUS areas, at most once per 0.5 s unless ``force`` is set."""
        if not self.profibusManager:
            return
        now = time.time()
        if not force and (now - self._profibusLastUpdate) < 0.5:
            return
        if not self._ensureProfibusConnected():
            return
        with self.profibusLock:
            try:
                self.profibusManager.readAreas()
                self._profibusLastUpdate = now
            except Exception as e:
                print(f"读取PROFIBUS区域失败: {e}")

    def _writeProfibusVariable(self, info, rawValue):
        """Write one PROFIBUS area.

        :param info: area description as stored in ``profibusVarMap``
        :param rawValue: for AI/AO areas a number or a dict with ``value`` and
            optional ``quality``/``qualityValue``; otherwise a scalar, a
            list/tuple of bits or a dict with ``index`` and ``value``
        :return: ``(True, normalizedValue)`` on success, ``(False, None)`` otherwise
        """
        if not self.profibusManager:
            return False, None
        if not self._ensureProfibusConnected():
            return False, None
        device = self.profibusManager.getDevice(info['deviceName'])
        if not device:
            return False, None
        areaIndex = info['areaIndex']
        if areaIndex >= len(device.areas):
            return False, None
        area = device.areas[areaIndex]
        isAnalog = area.type in ['AI', 'AO']

        if isAnalog:
            analogValue, qualityList = self._formatAnalogValue(rawValue)
            if analogValue is None:
                return False, None
            valuesToWrite = [analogValue]
            normalizedValue = analogValue
        else:
            valuesToWrite = self._formatDiscreteValues(area, rawValue)
            if valuesToWrite is None:
                return False, None
            qualityList = None
            normalizedValue = valuesToWrite[:]

        with self.profibusLock:
            try:
                if qualityList is not None:
                    self.profibusManager.writeAreas(
                        deviceName=info['deviceName'],
                        areaIndex=areaIndex,
                        values=valuesToWrite,
                        qualityValueList=qualityList
                    )
                else:
                    self.profibusManager.writeAreas(
                        deviceName=info['deviceName'],
                        areaIndex=areaIndex,
                        values=valuesToWrite
                    )
                # Mirror the written value on the area object so the next read
                # sees it.
                if isAnalog:
                    area.currentValue = [normalizedValue]
                    if qualityList:
                        area.qualityValueList = qualityList
                else:
                    area.currentValue = normalizedValue[:]
            except Exception as e:
                print(f"写入PROFIBUS变量 {info['valueName']} 失败: {e}")
                return False, None

        with self.cacheLock:
            self.variableValueCache[info['valueName']] = normalizedValue
        return True, normalizedValue

    def _formatAnalogValue(self, rawValue):
        """Split an analog write request into ``(float value, quality list or None)``.

        Returns ``(None, None)`` when the value is not numeric or the quality
        is invalid. A quality of ``0`` is a valid quality and is kept; the
        original ``a or b`` expression silently dropped it.
        """
        qualityValue = None
        targetValue = rawValue
        if isinstance(rawValue, dict):
            targetValue = rawValue.get('value')
            qualityValue = rawValue.get('quality')
            # Fall back to the alternative key only when 'quality' is absent or
            # empty; 0 must not trigger the fallback.
            if qualityValue is None or qualityValue == '':
                qualityValue = rawValue.get('qualityValue')
        try:
            analogValue = float(targetValue)
        except (TypeError, ValueError):
            return None, None
        qualityList = None
        if qualityValue is not None:
            normalizedQuality = self._normalizeQualityValue(qualityValue)
            if normalizedQuality is None:
                return None, None
            qualityList = [normalizedQuality]
        return analogValue, qualityList

    def _normalizeQualityValue(self, qualityValue):
        """Return the quality as a ``"0xNN"`` string, or None when outside 0..255 or unparsable."""
        try:
            if isinstance(qualityValue, str):
                qualityValue = qualityValue.strip()
                base = 16 if qualityValue.lower().startswith('0x') else 10
                qualityInt = int(qualityValue, base)
            else:
                qualityInt = int(qualityValue)
        except (TypeError, ValueError):
            return None
        if qualityInt < 0 or qualityInt > 255:
            return None
        return f"0x{qualityInt:02X}"

    def _formatDiscreteValues(self, area, rawValue):
        """Build the full bit list to write to a discrete area.

        The list has ``area.bytes * 8`` entries (16 when that is not positive).
        A dict with ``index`` changes one bit of the area's current (or forced)
        values, a list/tuple replaces all bits (zero-padded or truncated), and
        any other value sets bit 0. Returns None for an invalid ``index``.
        """
        bitLength = (area.bytes or 0) * 8
        if bitLength <= 0:
            bitLength = 16

        currentValue = getattr(area, 'currentValue', None)
        forceValue = getattr(area, 'forceValue', None)
        if isinstance(currentValue, list) and currentValue:
            baseValues = list(currentValue)
        elif isinstance(forceValue, list) and forceValue:
            baseValues = list(forceValue)
        else:
            baseValues = [0] * bitLength

        if len(baseValues) < bitLength:
            baseValues += [0] * (bitLength - len(baseValues))
        else:
            baseValues = baseValues[:bitLength]

        if isinstance(rawValue, dict) and 'index' in rawValue:
            try:
                index = int(rawValue['index'])
            except (TypeError, ValueError):
                return None
            if index < 0 or index >= bitLength:
                return None
            baseValues[index] = 1 if rawValue.get('value') in self._TRUTHY_BITS else 0
            return baseValues

        if isinstance(rawValue, (list, tuple)):
            normalized = [1 if item in self._TRUTHY_BITS else 0 for item in rawValue]
            if len(normalized) < bitLength:
                normalized += [0] * (bitLength - len(normalized))
            return normalized[:bitLength]

        baseValues[0] = 1 if rawValue in self._TRUTHY_BITS else 0
        return baseValues

    def _readProfibusVariable(self, info):
        """Return the current value of one PROFIBUS area, or None when unavailable.

        AI/AO areas yield the first current value as a float (or unconverted
        when it is not numeric); other areas yield the list of bits.
        """
        if not self.profibusManager:
            return None
        self._updateProfibusAreas()
        device = self.profibusManager.getDevice(info['deviceName'])
        if not device:
            return None
        areaIndex = info['areaIndex']
        if areaIndex >= len(device.areas):
            return None
        area = device.areas[areaIndex]
        if not (isinstance(area.currentValue, list) and area.currentValue):
            return None
        if area.type in ['AI', 'AO']:
            try:
                return float(area.currentValue[0])
            except (TypeError, ValueError):
                return area.currentValue[0]
        bitLength = (area.bytes or 0) * 8
        if bitLength <= 0:
            bitLength = len(area.currentValue)
        return area.currentValue[:bitLength]

    # ==================== Shared PROFIBUS interface ====================

    def hasProfibusSupport(self) -> bool:
        """Return True when PROFIBUS is enabled and a manager exists."""
        return self.profibusEnabled and self.profibusManager is not None

    def getSharedProfibusManager(self):
        """Return the PROFIBUS device manager (may be None)."""
        return self.profibusManager

    def connectProfibus(self) -> bool:
        """Connect to PROFIBUS; return False when unsupported or on failure."""
        if not self.hasProfibusSupport():
            return False
        return self._ensureProfibusConnected()

    def refreshProfibusVariables(self):
        """Rebuild the PROFIBUS variable map when PROFIBUS is enabled."""
        if self.profibusEnabled:
            self._refreshProfibusVarCache()

    def updateProfibusAreas(self, force: bool = False) -> bool:
        """Trigger an area read; return False only when PROFIBUS is unsupported."""
        if not self.hasProfibusSupport():
            return False
        self._updateProfibusAreas(force=force)
        return True

    def listProfibusVariables(self) -> Dict[str, Dict[str, Any]]:
        """Return a copy of the PROFIBUS variable map (one shallow copy per entry)."""
        return {name: dict(info) for name, info in self.profibusVarMap.items()}

    def getProfibusVariableInfo(self, variableName: str) -> Optional[Dict[str, Any]]:
        """Return the stored info dict for one PROFIBUS variable, or None."""
        return self.profibusVarMap.get(variableName)

    def readProfibusValue(self, variableName: str, useCache: bool = True):
        """Read a PROFIBUS variable and store the result in the value cache.

        With ``useCache=True`` a cached value is returned when present. With
        ``useCache=False`` the cache is bypassed; the original routed this case
        through ``readVariableValue``, which still served the cached value.
        """
        if not self.hasProfibusSupport():
            return None
        if useCache:
            with self.cacheLock:
                if variableName in self.variableValueCache:
                    return self.variableValueCache[variableName]
        value = self._readVariableValueOriginal(variableName)
        with self.cacheLock:
            self.variableValueCache[variableName] = value
        return value

    def writeProfibusValue(self, variableName: str, value) -> bool:
        """Write a variable through ``writeVariableValue``; False when PROFIBUS is unsupported."""
        if not self.hasProfibusSupport():
            return False
        return bool(self.writeVariableValue(variableName, value))

    # ==================== Reading ====================

    def _readRemoteVariableValue(self, variableName):
        """Read a variable that is unknown locally through the RPC server.

        :return: the remote value as a float, or None when there is no RPC
            server, no client owns the variable, or the value is not numeric
        """
        if not self.RpcServer:
            return None
        try:
            # safeRpcCall keeps a failing RPC from raising here.
            existsVar, clientNames = self.safeRpcCall(
                self.RpcServer.existsVar, variableName
            ) or (False, None)
            if not (existsVar and clientNames):
                return None
            varData = self.safeRpcCall(
                self.RpcServer.getVarValue, clientNames[0], variableName
            )
            if not varData or 'value' not in varData:
                return None
            try:
                # The original converted the value and then returned None on
                # every path, discarding it.
                return float(varData['value'])
            except (ValueError, TypeError):
                print(f"无法转换变量 {variableName} 的值: {varData['value']}")
                return None
        except Exception as e:
            print(f"RPC读取变量 {variableName} 失败: {e}")
            return None

    def _readTcRtdValue(self, info):
        """Return the simulated or last locally written TC/RTD value (None for other types)."""
        channel = int(info['channelNumber']) - 1
        varType = info['varType']
        model = self.getModelType(info['varModel'])
        isRtd = varType == 'PT100'
        isTc = varType in self._TC_TYPES
        if model == SimModel:
            if isRtd:
                return self.tcpVarManager.simRTDData[channel]
            if isTc:
                return self.tcpVarManager.simTCData[channel]
            return None
        if isRtd:
            return self.writeRTD[channel] if self.writeRTD else 0
        if isTc:
            return self.writeTC[channel] if self.writeTC else 0
        return None

    def _readAnalogValue(self, info):
        """Read an analog channel; AI/AO values are converted with ``getRealAI``."""
        channel = int(info['channelNumber']) - 1
        varType = info['varType']
        model = self.getModelType(info['varModel'])
        value = self.tcpVarManager.readValue(varType, channel, model=model)
        if varType in ['AI', 'AO']:
            value = self.getRealAI(value, info['max'], info['min'])
        return value

    def _readVariableValueOriginal(self, variableName):
        """Read a variable from its source, bypassing the value cache.

        A successfully read value is also pushed to the RPC client and the
        history database when those exist. Master-mode Modbus variables and
        HART variables are not read here and yield None.

        :return: the value, or None when it is unavailable or reads as 'error'
        """
        varInfo = self.lookupVariable(variableName)
        if not varInfo:
            return self._readRemoteVariableValue(variableName)

        modelType = varInfo['modelType']
        info = varInfo['variableData']
        value = None
        try:
            if modelType == 'ModbusTcpSlaveVar':
                value = self.modbusManager.readModbusTcpSlaveValue(info)
            elif modelType == 'ModbusRtuSlaveVar':
                value = self.modbusManager.readModbusRtuSlaveValue(info)
            elif modelType == 'HartVar':
                pass
            elif modelType == 'TcRtdVar':
                if not self.tcpVarManager:
                    return None
                value = self._readTcRtdValue(info)
            elif modelType == 'AnalogVar':
                if not self.tcpVarManager:
                    return None
                value = self._readAnalogValue(info)
            elif modelType == 'ProfibusVar':
                value = self._readProfibusVariable(info)
            elif modelType == 'HartSimulateVar':
                if not self.hartRtuSlaveManager:
                    return None
                value = self.hartRtuSlaveManager.readVariable(self._resolveHartSlot(variableName))

            if value is None or value == 'error':
                return None

            externalValue = self._prepareExternalValue(value)
            if self.RpcClient:
                self.RpcClient.setVarContent(
                    variableName, externalValue, info.get('min'), info.get('max'), info.get('varType')
                )
            if self.historyDBManage:
                self.historyDBManage.writeVarValue(variableName, externalValue)
            return value
        except Exception as e:
            print(f"读取变量值失败: {str(e)}")
            return None

    def _prepareExternalValue(self, value):
        """Serialise lists and dicts to JSON text; return other values unchanged."""
        if isinstance(value, (list, dict)):
            try:
                return json.dumps(value, ensure_ascii=False)
            except Exception:
                return str(value)
        return value

    def readVariableValue(self, variableName):
        """Return the cached value when present, otherwise read from the source."""
        with self.cacheLock:
            if variableName in self.variableValueCache:
                return self.variableValueCache[variableName]
        return self._readVariableValueOriginal(variableName)

    def recvDeltaT(self):
        """Forward to ``TCPVarManager.recvDeltaT``; None without a TCP manager."""
        if self.tcpVarManager:
            return self.tcpVarManager.recvDeltaT()
        return None

    def shutdown(self):
        """Stop the background reader, then shut down every protocol manager.

        The reader is stopped first so that it does not keep polling managers
        that have already been shut down.
        """
        if hasattr(self, 'readThreadStop') and hasattr(self, 'readThread'):
            self.readThreadStop.set()
            self.readThread.join(timeout=1)
        if self.tcpVarManager:
            self.tcpVarManager.shutdown()
        # Stop all Modbus communication.
        self.modbusManager.stopAllModbus()
        # Stop the HART RTU slave.
        if self.hartRtuSlaveManager:
            self.hartRtuSlaveManager.stopSlave()
        self.closeClient()
        self.closeServer()

    # ==================== Value conversion ====================

    def getRealAO(self, value, highValue, lowValue):
        """Convert an engineering value to the output signal value.

        With a truthy ``highValue`` the range ``[lowValue, highValue]`` is
        mapped linearly onto ``[0.004, 0.020]``; otherwise ``value / 1000`` is
        returned. Equal limits raise ``ZeroDivisionError``.
        """
        if not highValue:
            return value / 1000
        lowValue = float(lowValue)
        highValue = float(highValue)
        span = highValue - lowValue
        return (16 * (value - lowValue) + 4 * span) / (1000 * span)

    def getRealAI(self, mA, highValue, lowValue):
        """Convert a signal value back to an engineering value (inverse of ``getRealAO``).

        Despite the parameter name, the formula maps ``0.004`` to ``lowValue``
        and ``0.020`` to ``highValue``. Any conversion error is printed and
        ``0.0`` is returned.
        """
        try:
            if not highValue:
                return mA * 1000
            lowValue = float(lowValue)
            highValue = float(highValue)
            span = highValue - lowValue
            return (1000 * mA * span - 4 * span) / 16.0 + lowValue
        except Exception as e:
            print(f"工程值转换失败: {str(e)}")
            return 0.0

    def getModelType(self, varModel):
        """Translate the stored model label into the TCP model constant (None if unknown)."""
        if varModel == '本地值':
            return localModel
        if varModel == '远程值':
            return NetModel
        if varModel == '模拟值':
            return SimModel
        return None

    # ==================== Modbus management (delegated to ModbusManager) ====================

    def startModbusTcpMaster(self):
        """Start the Modbus TCP master."""
        return self.modbusManager.startModbusTcpMaster()

    def stopModbusTcpMaster(self):
        """Stop the Modbus TCP master."""
        return self.modbusManager.stopModbusTcpMaster()

    def startModbusRtuMaster(self):
        """Start the Modbus RTU master."""
        return self.modbusManager.startModbusRtuMaster()

    def stopModbusRtuMaster(self):
        """Stop the Modbus RTU master."""
        return self.modbusManager.stopModbusRtuMaster()

    def startModbusTcpSlave(self):
        """Start the Modbus TCP slave."""
        return self.modbusManager.startModbusTcpSlave()

    def stopModbusTcpSlave(self):
        """Stop the Modbus TCP slave."""
        return self.modbusManager.stopModbusTcpSlave()

    def startModbusRtuSlave(self):
        """Start the Modbus RTU slave."""
        return self.modbusManager.startModbusRtuSlave()

    def stopModbusRtuSlave(self):
        """Stop the Modbus RTU slave."""
        return self.modbusManager.stopModbusRtuSlave()

    def stopAllModbus(self):
        """Stop all Modbus communication."""
        return self.modbusManager.stopAllModbus()

    def getModbusStatus(self):
        """Return the status reported by the Modbus manager."""
        return self.modbusManager.getModbusStatus()

    def getModbusMessages(self):
        """Return the Modbus messages recorded by the Modbus manager."""
        return self.modbusManager.getMessages()

    def clearModbusMessages(self):
        """Clear the recorded Modbus messages."""
        self.modbusManager.clearMessages()

    # ==================== Simulated HART RTU slave management ====================

    def startHartRtuSlave(self):
        """Start the HART RTU slave; False when the manager is unavailable."""
        if not self.hartRtuSlaveManager:
            return False
        return self.hartRtuSlaveManager.startSlave()

    def stopHartRtuSlave(self):
        """Stop the HART RTU slave, if a manager exists."""
        if self.hartRtuSlaveManager:
            self.hartRtuSlaveManager.stopSlave()

    def writeHartVariable(self, variableName: str, value: float) -> bool:
        """Write a HART variable; False when the manager is unavailable."""
        if not self.hartRtuSlaveManager:
            return False
        return self.hartRtuSlaveManager.writeVariable(variableName, value)

    def readHartVariable(self, variableName: str):
        """Read a HART variable; None when the manager is unavailable."""
        if not self.hartRtuSlaveManager:
            return None
        return self.hartRtuSlaveManager.readVariable(variableName)