from utils.DBModels.ProtocolModel import ( ModbusTcpMasterVar, ModbusTcpSlaveVar, ModbusRtuMasterVar, ModbusRtuSlaveVar, HartVar, TcRtdVar, AnalogVar, HartSimulateVar ) from protocol.TCP.TCPVarManage import * from protocol.TCP.TemToMv import temToMv from protocol.RPC.RpcClient import RpcClient from protocol.RPC.RpcServer import RpcServer from protocol.ModBus.ModbusManager import ModbusManager from utils import Globals import threading import time class ProtocolManage(object): """通讯变量查找类,用于根据变量名在数据库模型中检索变量信息""" # 支持的模型类列表 MODEL_CLASSES = [ ModbusTcpMasterVar, ModbusTcpSlaveVar, ModbusRtuMasterVar, ModbusRtuSlaveVar, HartVar, TcRtdVar, AnalogVar, HartSimulateVar ] def __init__(self): # self.tcpVarManager = TCPVarManager('192.168.1.50', 5055) self.tcpVarManager = TCPVarManager('127.0.0.1', 8000) self.writeTC = [0] * 8 self.writeRTD = [0] * 8 self.RpcClient = None self.RpcServer = None self.varInfoCache = {} # 保持驼峰命名 self.historyDBManage = Globals.getValue('historyDBManage') self.variableValueCache = {} # {varName: value} # Modbus 管理器 self.modbusManager = ModbusManager() # self.modbusManager.setVariableCache(self.variableValueCache, None, self.varInfoCache) self.refreshVarCache() self.cacheLock = threading.Lock() # 设置 Modbus 管理器的缓存锁 self.modbusManager.setVariableCache(self.variableValueCache, self.cacheLock, self.varInfoCache) self.readThreadStop = threading.Event() self.readThread = threading.Thread(target=self._backgroundReadAllVariables, daemon=True) self.readThread.start() def clearVarCache(self): """清空变量信息缓存""" self.varInfoCache.clear() def refreshVarCache(self): """重新加载所有变量信息到缓存(可选实现)""" self.varInfoCache.clear() for modelClass in self.MODEL_CLASSES: try: for varInstance in modelClass.select(): varName = getattr(varInstance, 'varName', None) if varName: varData = {} for field in varInstance._meta.sorted_fields: fieldName = field.name varData[fieldName] = getattr(varInstance, fieldName) self.varInfoCache[varName] = { 'modelType': modelClass.__name__, 'variableData': varData } except Exception as e: print(f"刷新缓存时出错: {modelClass.__name__}: {e}") def lookupVariable(self, variableName): """ 根据变量名检索变量信息(优先查缓存) :param variableName: 要查询的变量名 :return: 包含变量信息和模型类型的字典,如果未找到返回None """ if variableName in self.varInfoCache: return self.varInfoCache[variableName] for modelClass in self.MODEL_CLASSES: varInstance = modelClass.getByName(variableName) if varInstance: varData = {} for field in varInstance._meta.sorted_fields: fieldName = field.name varData[fieldName] = getattr(varInstance, fieldName) result = { 'modelType': modelClass.__name__, 'variableData': varData } self.varInfoCache[variableName] = result # 写入缓存 return result return None def setClentMode(self, clentName, rabbitmqHost='localhost'): if self.RpcClient: self.RpcClient.close() self.RpcClient = RpcClient(clentName, rabbitmqHost, self) # 使用非阻塞方式启动RPC客户端 self.RpcClient.startNonBlocking() def closeClent(self): if self.RpcClient: self.RpcClient.close() self.RpcClient = None def setServerMode(self, rabbitmqHost='localhost'): if self.RpcServer: return self.RpcServer = RpcServer(rabbitmqHost) def addClient(self, clientName): if self.RpcServer: self.RpcServer.addClient(clientName=clientName) else: return def closeServer(self): if self.RpcServer: self.RpcServer.close() self.RpcServer = None def writeVariableValue(self, variableName, value, trigger=None, timeoutMS=2000): """ 根据变量名写入变量值,根据变量类型进行不同处理(不保存到数据库) :param variableName: 变量名 :param value: 要写入的值 :return: 写入成功返回True,否则返回False """ varInfo = self.lookupVariable(variableName) if not varInfo: if self.RpcServer: existsVar, clientNames = self.RpcServer.existsVar(variableName) if existsVar: value = self.RpcServer.writeVar(variableName, value) return True else: return False return False modelType = varInfo['modelType'] info = varInfo['variableData'] try: # 拆分为四个独立的Modbus协议条件判断 if modelType == 'ModbusTcpMasterVar': res = self.modbusManager.writeModbusTcpMasterValue(info, value) # print(res) if res == 'error': return elif modelType == 'ModbusTcpSlaveVar': res = self.modbusManager.writeModbusTcpSlaveValue(info, value) if res == 'error': return elif modelType == 'ModbusRtuMasterVar': res = self.modbusManager.writeModbusRtuMasterValue(info, value) if res == 'error': return elif modelType == 'ModbusRtuSlaveVar': res = self.modbusManager.writeModbusRtuSlaveValue(info, value) if res == 'error': return # HART协议变量处理 elif modelType == 'HartVar': pass # 温度/RTD变量处理 elif modelType == 'TcRtdVar': channel = int(info['channelNumber']) - 1 varType = info['varType'] compensationVar = float(info['compensationVar']) varModel = info['varModel'] model = self.getModelType(varModel) if self.getModelType(varModel) else localModel if model == localModel: if varType == 'PT100': self.writeRTD[channel] = value else: self.writeTC[channel] = value if varType in ['R', 'S', 'B', 'J', 'T', 'E', 'K', 'N', 'C', 'A', 'PT100'] and model != SimModel: value = temToMv(varType, value + compensationVar) if trigger and varType in ['R', 'S', 'B', 'J', 'T', 'E', 'K', 'N', 'C', 'A']: trigger = TCTrigger if trigger and varType in ['PT100']: trigger = RTDTrigger self.tcpVarManager.writeValue(varType, channel, value, trigger=trigger, model=model, timeoutMS=timeoutMS) # 模拟量变量处理 elif modelType == 'AnalogVar': channel = int(info['channelNumber']) - 1 varType = info['varType'] varModel = info['varModel'] model = self.getModelType(varModel) if self.getModelType(varModel) else localModel if info['varType'] in ['AI', 'AO']: value = self.getRealAO(value, info['max'], info['min']) trigger = FPGATrigger if trigger else trigger self.tcpVarManager.writeValue(varType, channel, value, trigger=trigger, model=model, timeoutMS=timeoutMS) # HART模拟变量处理 elif modelType == 'HartSimulateVar': pass if self.RpcClient: self.RpcClient.setVarContent(variableName, value, info['min'], info['max'], info['varType']) return True except Exception as e: print(f"写入变量值失败: {str(e)}") return False def _backgroundReadAllVariables(self, interval=0.5): while not self.readThreadStop.is_set(): allVarNames = list(self.getAllVariableNames()) for varName in allVarNames: value = self._readVariableValueOriginal(varName) with self.cacheLock: self.variableValueCache[varName] = value time.sleep(interval) def getAllVariableNames(self): return list(self.varInfoCache.keys()) def _readVariableValueOriginal(self, variableName): varInfo = self.lookupVariable(variableName) value = None if not varInfo: if self.RpcServer: existsVar, clientNames = self.RpcServer.existsVar(variableName) if existsVar: value = float(self.RpcServer.getVarValue(clientNames[0], variableName)['value']) else: return None return None modelType = varInfo['modelType'] info = varInfo['variableData'] try: # 拆分为独立的协议条件判断 # if modelType == 'ModbusTcpMasterVar': # value = self.modbusManager.readModbusTcpMasterValue(info) if modelType == 'ModbusTcpSlaveVar': value = self.modbusManager.readModbusTcpSlaveValue(info) # elif modelType == 'ModbusRtuMasterVar': # value = self.modbusManager.readModbusRtuMasterValue(info) elif modelType == 'ModbusRtuSlaveVar': value = self.modbusManager.readModbusRtuSlaveValue(info) elif modelType == 'HartVar': pass elif modelType == 'TcRtdVar': channel = int(info['channelNumber']) - 1 varType = info['varType'] varModel = info['varModel'] model = self.getModelType(varModel) if model == SimModel: if varType == 'PT100': value = self.tcpVarManager.simRTDData[channel] elif varType in ['R', 'S', 'B', 'J', 'T', 'E', 'K', 'N', 'C', 'A']: value = self.tcpVarManager.simTCData[channel] else: if varType == 'PT100': value = self.writeRTD[channel] elif varType in ['R', 'S', 'B', 'J', 'T', 'E', 'K', 'N', 'C', 'A']: value = self.writeTC[channel] elif modelType == 'AnalogVar': channel = int(info['channelNumber']) - 1 varType = info['varType'] varModel = info['varModel'] model = self.getModelType(varModel) value = self.tcpVarManager.readValue(varType, channel, model=model) if varType in ['AI', 'AO']: value = self.getRealAI(value, info['max'], info['min']) elif modelType == 'HartSimulateVar': pass if value is not None and value != 'error': if self.RpcClient: self.RpcClient.setVarContent(variableName, value, info['min'], info['max'], info['varType']) self.historyDBManage.writeVarValue(variableName, value) return value else: return None except Exception as e: print(f"读取变量值失败: {str(e)}") return None def readVariableValue(self, variableName): with self.cacheLock: if variableName in self.variableValueCache: return self.variableValueCache[variableName] return self._readVariableValueOriginal(variableName) def recvDeltaT(self): return self.tcpVarManager.recvDeltaT() def shutdown(self): self.tcpVarManager.shutdown() # 关闭所有Modbus通讯 self.modbusManager.stopAllModbus() # 关闭后台读取线程 if hasattr(self, 'readThreadStop') and hasattr(self, 'readThread'): self.readThreadStop.set() self.readThread.join(timeout=1) def getRealAO(self, value, highValue, lowValue): if highValue: lowValue = float(lowValue) highValue = float(highValue) return (16 * (value - lowValue) + 4 * (highValue - lowValue)) / (1000 * (highValue - lowValue)) else: return value / 1000 def getRealAI(self, mA, highValue, lowValue): """将毫安值转换为实际工程值(getRealAO的反向计算)""" try: if highValue: lowValue = float(lowValue) highValue = float(highValue) return (1000 * mA * (highValue - lowValue) - 4 * (highValue - lowValue)) / 16.0 + lowValue else: return mA * 1000 except Exception as e: print(f"工程值转换失败: {str(e)}") return 0.0 def getModelType(self, varModel): if varModel == '本地值': return localModel elif varModel == '远程值': return NetModel elif varModel == '模拟值': return SimModel def disconnectClient(self, clientName): if self.RpcServer: self.RpcServer.removeClient(clientName) # ==================== Modbus 通讯管理方法(委托给 ModbusManager) ==================== def startModbusTcpMaster(self): """启动 Modbus TCP 主站""" return self.modbusManager.startModbusTcpMaster() def stopModbusTcpMaster(self): """停止 Modbus TCP 主站""" return self.modbusManager.stopModbusTcpMaster() def startModbusRtuMaster(self): """启动 Modbus RTU 主站""" return self.modbusManager.startModbusRtuMaster() def stopModbusRtuMaster(self): """停止 Modbus RTU 主站""" return self.modbusManager.stopModbusRtuMaster() def startModbusTcpSlave(self): """启动 Modbus TCP 从站""" return self.modbusManager.startModbusTcpSlave() def stopModbusTcpSlave(self): """停止 Modbus TCP 从站""" return self.modbusManager.stopModbusTcpSlave() def startModbusRtuSlave(self): """启动 Modbus RTU 从站""" return self.modbusManager.startModbusRtuSlave() def stopModbusRtuSlave(self): """停止 Modbus RTU 从站""" return self.modbusManager.stopModbusRtuSlave() def stopAllModbus(self): """停止所有 Modbus 通讯""" return self.modbusManager.stopAllModbus() def getModbusStatus(self): """获取所有 Modbus 通讯状态""" return self.modbusManager.getModbusStatus() def getModbusMessages(self): """获取 Modbus 报文信息""" return self.modbusManager.getMessages() def clearModbusMessages(self): """清空 Modbus 报文记录""" self.modbusManager.clearMessages()