# You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

# 368 lines
# 14 KiB
# Python

# This file contains ambiguous Unicode characters!

# This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from utils.DBModels.ProtocolModel import (
ModbusTcpMasterVar, ModbusTcpSlaveVar, ModbusRtuMasterVar, ModbusRtuSlaveVar,
HartVar, TcRtdVar, AnalogVar, HartSimulateVar
)
from protocol.TCP.TCPVarManage import *
from protocol.TCP.TemToMv import temToMv
from protocol.RPC.RpcClient import RpcClient
from protocol.RPC.RpcServer import RpcServer
from utils import Globals
import threading
import time
class ProtocolManage(object):
    """Resolve communication variables by name and read/write their values.

    Variable definitions are looked up in the database model classes listed
    in ``MODEL_CLASSES`` and memoised in ``varInfoCache``.  A daemon thread
    started by ``__init__`` periodically reads every cached variable and
    stores the result in ``variableValueCache`` so that ``readVariableValue``
    can usually answer from memory.
    """

    # Model classes searched, in this order, when resolving a variable name.
    MODEL_CLASSES = [
        ModbusTcpMasterVar, ModbusTcpSlaveVar,
        ModbusRtuMasterVar, ModbusRtuSlaveVar,
        HartVar, TcRtdVar, AnalogVar, HartSimulateVar
    ]
    # ``varType`` codes routed through the TC buffers / TC trigger.
    TC_VAR_TYPES = ('R', 'S', 'B', 'J', 'T', 'E', 'K', 'N', 'C', 'A')
    # ``varType`` codes routed through the RTD buffers / RTD trigger.
    RTD_VAR_TYPES = ('PT100',)
    # Analog ``varType`` codes whose values are range-scaled on read/write.
    SCALED_ANALOG_TYPES = ('AI', 'AO')

    def __init__(self):
        """Create the TCP manager, load the variable cache and start polling."""
        # Alternative endpoint kept from the original source:
        # TCPVarManager('192.168.1.50', 5055)
        self.tcpVarManager = TCPVarManager('127.0.0.1', 8000)
        # Last values written per channel for local-model TC / RTD variables.
        self.writeTC = [0] * 8
        self.writeRTD = [0] * 8
        self.RpcClient = None
        self.RpcServer = None
        self.varInfoCache = {}  # {varName: {'modelType': ..., 'variableData': ...}}
        self.historyDBManage = Globals.getValue('historyDBManage')
        self.variableValueCache = {}  # {varName: value}
        self.cacheLock = threading.Lock()
        self.readThreadStop = threading.Event()
        self.refreshVarCache()
        self.readThread = threading.Thread(target=self._backgroundReadAllVariables, daemon=True)
        self.readThread.start()

    @staticmethod
    def _buildVarInfo(modelClass, varInstance):
        """Return the cache entry (model name + field dict) for one model row."""
        varData = {
            field.name: getattr(varInstance, field.name)
            for field in varInstance._meta.sorted_fields
        }
        return {
            'modelType': modelClass.__name__,
            'variableData': varData
        }

    def clearVarCache(self):
        """Drop every entry from the variable-information cache."""
        self.varInfoCache.clear()

    def refreshVarCache(self):
        """Reload the variable-information cache from all model classes.

        A failure while loading one model class is printed and does not
        prevent the remaining classes from being loaded.
        """
        self.varInfoCache.clear()
        for modelClass in self.MODEL_CLASSES:
            try:
                for varInstance in modelClass.select():
                    varName = getattr(varInstance, 'varName', None)
                    if varName:
                        self.varInfoCache[varName] = self._buildVarInfo(modelClass, varInstance)
            except Exception as e:
                print(f"刷新缓存时出错: {modelClass.__name__}: {e}")

    def lookupVariable(self, variableName):
        """Return the info dict for ``variableName``, preferring the cache.

        :param variableName: name of the variable to resolve
        :return: ``{'modelType': str, 'variableData': dict}`` or ``None`` when
                 no model class knows the name
        """
        # Single lookup: the cache may be cleared from another thread between
        # a membership test and a subscript, which would raise KeyError.
        cached = self.varInfoCache.get(variableName)
        if cached is not None:
            return cached
        for modelClass in self.MODEL_CLASSES:
            varInstance = modelClass.getByName(variableName)
            if varInstance:
                result = self._buildVarInfo(modelClass, varInstance)
                self.varInfoCache[variableName] = result  # remember for next time
                return result
        return None

    def setClentMode(self, clentName, rabbitmqHost='localhost'):
        """(Re)create the RPC client and start it without blocking."""
        if self.RpcClient:
            self.RpcClient.close()
        self.RpcClient = RpcClient(clentName, rabbitmqHost, self)
        self.RpcClient.startNonBlocking()

    def closeClent(self):
        """Close and forget the RPC client, if one exists."""
        if self.RpcClient:
            self.RpcClient.close()
            self.RpcClient = None

    def setServerMode(self, rabbitmqHost='localhost'):
        """Create the RPC server unless one already exists."""
        if self.RpcServer:
            return
        self.RpcServer = RpcServer(rabbitmqHost)

    def addClient(self, clientName):
        """Register ``clientName`` with the RPC server; no-op without a server."""
        if self.RpcServer:
            self.RpcServer.addClient(clientName=clientName)

    def closeServer(self):
        """Close and forget the RPC server, if one exists."""
        if self.RpcServer:
            self.RpcServer.close()
            self.RpcServer = None

    def _writeTcRtdValue(self, info, value, trigger, timeoutMS):
        """Write a TcRtdVar value through the TCP manager; return the value sent."""
        channel = int(info['channelNumber']) - 1
        varType = info['varType']
        compensationVar = float(info['compensationVar'])
        # Unknown model names fall back to the local model.
        model = self.getModelType(info['varModel']) or localModel
        if model == localModel:
            # Keep the raw value so local reads can echo it back.
            if varType == 'PT100':
                self.writeRTD[channel] = value
            else:
                self.writeTC[channel] = value
        isTc = varType in self.TC_VAR_TYPES
        isRtd = varType in self.RTD_VAR_TYPES
        if (isTc or isRtd) and model != SimModel:
            # Compensation is added to the temperature before conversion.
            value = temToMv(varType, value + compensationVar)
        if trigger:
            if isTc:
                trigger = TCTrigger
            elif isRtd:
                trigger = RTDTrigger
        self.tcpVarManager.writeValue(varType, channel, value, trigger=trigger, model=model, timeoutMS=timeoutMS)
        return value

    def _writeAnalogValue(self, info, value, trigger, timeoutMS):
        """Write an AnalogVar value through the TCP manager; return the value sent."""
        channel = int(info['channelNumber']) - 1
        varType = info['varType']
        model = self.getModelType(info['varModel']) or localModel
        if varType in self.SCALED_ANALOG_TYPES:
            value = self.getRealAO(value, info['max'], info['min'])
        # NOTE(review): the trigger remap is applied to every analog varType,
        # not only AI/AO — confirm against the upstream file.
        trigger = FPGATrigger if trigger else trigger
        self.tcpVarManager.writeValue(varType, channel, value, trigger=trigger, model=model, timeoutMS=timeoutMS)
        return value

    def writeVariableValue(self, variableName, value, trigger = None, timeoutMS = 2000):
        """Write ``value`` to the named variable (nothing is saved to the database).

        Only ``TcRtdVar`` and ``AnalogVar`` variables are sent to the TCP
        manager; the other model types are accepted without a hardware write.
        Names unknown locally are forwarded to the RPC server when it reports
        that the variable exists.

        :param variableName: name of the variable
        :param value: value to write
        :param trigger: truthy to request a trigger; mapped to the
                        type-specific trigger constant before sending
        :param timeoutMS: timeout forwarded to ``tcpVarManager.writeValue``
        :return: True on success, False when the variable is unknown or the
                 write raised
        """
        varInfo = self.lookupVariable(variableName)
        if not varInfo:
            if self.RpcServer:
                existsVar, _clientNames = self.RpcServer.existsVar(variableName)
                if existsVar:
                    self.RpcServer.writeVar(variableName, value)
                    return True
            return False
        modelType = varInfo['modelType']
        info = varInfo['variableData']
        try:
            if modelType == 'TcRtdVar':
                value = self._writeTcRtdValue(info, value, trigger, timeoutMS)
            elif modelType == 'AnalogVar':
                value = self._writeAnalogValue(info, value, trigger, timeoutMS)
            # Modbus*, HartVar and HartSimulateVar: no hardware write here.
            if self.RpcClient:
                self.RpcClient.setVarContent(variableName, value, info['min'], info['max'], info['varType'])
            return True
        except Exception as e:
            print(f"写入变量值失败: {str(e)}")
            return False

    def _backgroundReadAllVariables(self, interval=0.5):
        """Poll every cached variable until ``readThreadStop`` is set.

        :param interval: pause in seconds between polling cycles
        """
        while not self.readThreadStop.is_set():
            for varName in self.getAllVariableNames():
                if self.readThreadStop.is_set():
                    break
                try:
                    value = self._readVariableValueOriginal(varName)
                except Exception as e:
                    # One failing variable must not end the polling thread.
                    print(f"后台读取变量失败: {varName}: {e}")
                    continue
                with self.cacheLock:
                    self.variableValueCache[varName] = value
            # Event.wait returns early once shutdown() sets the stop flag.
            self.readThreadStop.wait(interval)

    def getAllVariableNames(self):
        """Return a list of every variable name currently in the info cache."""
        return list(self.varInfoCache.keys())

    def _readTcRtdValue(self, info):
        """Return the current value of a TcRtdVar, or None for other varTypes."""
        channel = int(info['channelNumber']) - 1
        varType = info['varType']
        model = self.getModelType(info['varModel'])
        isRtd = varType == 'PT100'
        isTc = varType in self.TC_VAR_TYPES
        if model == SimModel:
            if isRtd:
                return self.tcpVarManager.simRTDData[channel]
            if isTc:
                return self.tcpVarManager.simTCData[channel]
            return None
        # Non-simulated models echo the last value written.
        if isRtd:
            return self.writeRTD[channel]
        if isTc:
            return self.writeTC[channel]
        return None

    def _readAnalogValue(self, info):
        """Read an AnalogVar from the TCP manager, range-scaling AI/AO values."""
        channel = int(info['channelNumber']) - 1
        varType = info['varType']
        model = self.getModelType(info['varModel'])
        value = self.tcpVarManager.readValue(varType, channel, model=model)
        if varType in self.SCALED_ANALOG_TYPES:
            value = self.getRealAI(value, info['max'], info['min'])
        return value

    def _readVariableValueOriginal(self, variableName):
        """Read the named variable directly, bypassing the value cache.

        A successfully read local value is pushed to the RPC client (when
        present) and recorded through ``historyDBManage``.

        :param variableName: name of the variable
        :return: the value, or None when the variable is unknown, has no
                 readable value, or the read raised
        """
        varInfo = self.lookupVariable(variableName)
        if not varInfo:
            if self.RpcServer:
                existsVar, clientNames = self.RpcServer.existsVar(variableName)
                if existsVar:
                    # Previously computed and then discarded, so remote
                    # variables always read as None.
                    return float(self.RpcServer.getVarValue(clientNames[0], variableName)['value'])
            return None
        modelType = varInfo['modelType']
        info = varInfo['variableData']
        try:
            value = None
            if modelType == 'TcRtdVar':
                value = self._readTcRtdValue(info)
            elif modelType == 'AnalogVar':
                value = self._readAnalogValue(info)
            # Modbus*, HartVar and HartSimulateVar: nothing to read here.
            if value is None:
                return None
            if self.RpcClient:
                self.RpcClient.setVarContent(variableName, value, info['min'], info['max'], info['varType'])
            self.historyDBManage.writeVarValue(variableName, value)
            return value
        except Exception as e:
            print(f"读取变量值失败: {str(e)}")
            return None

    def readVariableValue(self, variableName):
        """Return the cached value for ``variableName``; read directly on a miss."""
        with self.cacheLock:
            if variableName in self.variableValueCache:
                return self.variableValueCache[variableName]
        return self._readVariableValueOriginal(variableName)

    def recvDeltaT(self):
        """Return ``tcpVarManager.recvDeltaT()``."""
        return self.tcpVarManager.recvDeltaT()

    def shutdown(self):
        """Stop the polling thread, then shut down the TCP manager."""
        # Stop the reader first so it does not poll a manager that is
        # already shut down.
        if hasattr(self, 'readThreadStop') and hasattr(self, 'readThread'):
            self.readThreadStop.set()
            self.readThread.join(timeout=1)
        self.tcpVarManager.shutdown()

    def getRealAO(self, value, highValue, lowValue):
        """Scale an engineering value into the output signal value.

        With a truthy ``highValue`` the result is
        ``(4 + 16 * (value - low) / (high - low)) / 1000``; otherwise
        ``value / 1000``.

        :param value: engineering value
        :param highValue: upper range limit (converted with ``float``)
        :param lowValue: lower range limit (converted with ``float``)
        :raises ZeroDivisionError: when ``highValue`` equals ``lowValue``
        """
        if not highValue:
            return value / 1000
        lowValue = float(lowValue)
        highValue = float(highValue)
        span = highValue - lowValue
        return (16 * (value - lowValue) + 4 * span) / (1000 * span)

    def getRealAI(self, mA, highValue, lowValue):
        """Convert a signal reading back to an engineering value.

        This is the inverse of ``getRealAO``.

        :param mA: signal reading, in the unit ``getRealAO`` produces
        :param highValue: upper engineering limit
        :param lowValue: lower engineering limit
        :return: engineering value, or 0.0 when the conversion raises
        """
        try:
            if not highValue:
                return mA * 1000
            lowValue = float(lowValue)
            highValue = float(highValue)
            span = highValue - lowValue
            return (1000 * mA * span - 4 * span) / 16.0 + lowValue
        except Exception as e:
            print(f"工程值转换失败: {str(e)}")
            return 0.0  # deliberate default so the caller's flow is not interrupted

    def getModelType(self, varModel):
        """Map a stored model label to its model constant (None if unknown)."""
        if varModel == '本地值':
            return localModel
        if varModel == '远程值':
            return NetModel
        if varModel == '模拟值':
            return SimModel
        return None

    def disconnectClient(self, clientName):
        """Remove ``clientName`` from the RPC server; no-op without a server."""
        if self.RpcServer:
            self.RpcServer.removeClient(clientName)