from ast import mod import re from numpy import var import pandas as pd from utils import Globals import openpyxl from utils.DBModels.ProtocolModel import ModbusTcpMasterVar, ModbusRtuMasterVar, ModbusRtuSlaveVar,\ ModbusTcpSlaveVar, HartVar, TcRtdVar, AnalogVar, HartSimulateVar from PyQt5.Qt import QFileDialog, QMessageBox from protocol.ProtocolManage import ProtocolManage # ID 从站地址 变量名 变量描述 变量类型 寄存器地址 工程量下限 工程量上限 # 获取ProtocolManage单例或全局对象(假设为全局唯一,实际项目可根据实际情况调整) def refreshCache(): protocolManagerInstance = Globals.getValue('protocolManage') if protocolManagerInstance: protocolManagerInstance.refreshVarCache() class ModbusVarManage(object): ModBusVarClass = { 'ModbusTcpMaster': ModbusTcpMasterVar, 'ModbusTcpSlave': ModbusTcpSlaveVar, 'ModbusRtuMaster': ModbusRtuMasterVar, 'ModbusRtuSlave': ModbusRtuSlaveVar, } def __init__(self): super().__init__() @classmethod def getVarClass(self, modbusType): """根据类型获取对应的变量类""" return self.ModBusVarClass.get(modbusType) @classmethod def importVarForm(self, varName, varType, des, address, slaveID, min, max, order, modbusType, varModel, index): varClass = self.getVarClass(modbusType) if varType not in [0, 1, 3, 4]: errorInfo = '表{}第{}行导入失败,{}'.format(str(modbusType), str(index + 1), '变量类型有误') return errorInfo if varClass.getByName(varName): # 如果变量已存在,更新其信息 varClass.update(varType=varType, description=des, address=address, slaveID=slaveID, min=min, max=max, order=order, varModel=varModel).where(varClass.varName == varName).execute() else: modbusVarType =self.getVarClass(modbusType)() modbusVarType.createVar(varName = varName, varType = varType, des = des, address = address, slaveID = slaveID, min = min, max = max, order = order, varModel = varModel) # 操作后刷新缓存 refreshCache() # def importModbusVar(self, path): # # 导入modbus变量表 # wb = openpyxl.load_workbook(path) # ws = wb.active # IDIndex = None # slaveIndex = None # nameIndex = None # desIndex = None # typeIndex = None # addrIndex = None # minIndex = None # maxIndex = None # for index, cell in enumerate(list(ws.iter_rows())[0]): # if cell.value in ['id', 'ID']: # IDIndex = index # if cell.value == '从站地址': # slaveIndex = index # if cell.value == '变量名': # nameIndex = index # if cell.value == '变量描述': # desIndex = index # if cell.value == '变量类型': # typeIndex = index # if cell.value == '寄存器地址': # addrIndex = index # if cell.value == '工程量下限': # minIndex = index # if cell.value == '工程量上限': # maxIndex = index # # print(IDIndex, slaveIndex, nameIndex, desIndex, typeIndex, addrIndex, minIndex, maxIndex) # if IDIndex == None or slaveIndex == None or nameIndex == None or desIndex == None or typeIndex == None or addrIndex == None or minIndex == None or maxIndex == None: # # print('表头错误') # return '表头错误' # errorConList = [] # for index, row in enumerate(list(ws.iter_rows())[1:]): # try: # l = [str(x.value) for x in row] # varName = l[nameIndex] # varType = l[typeIndex] # des = '' if l[desIndex] == 'None' else l[desIndex] # address = l[addrIndex] # slaveID = l[slaveIndex] # min = '' if l[minIndex] == 'None' else l[minIndex] # max = '' if l[maxIndex] == 'None' else l[maxIndex] # if {varName, varType, address, slaveID} & {'None'}: # errorConList.append('第{}行导入失败,有关键字段为空'.format(str(index + 1))) # continue # if varType not in ['0', '1', '3', '4']: # errorConList.append('第{}行导入失败,变量类型有误'.format(str(index + 1))) # continue # if ModBusVar.getByName(varName): # ModBusVar.update(varType = varType, description = des, address = address, slaveID = slaveID, min = min, max = max, order = 'int').where(ModBusVar.varName == varName).execute() # else: # ModBusVarModel = ModBusVar() # ModBusVarModel.createVar(varName = varName, varType = varType, des = des, address = address, slaveID = slaveID, min = min, max = max, order = 'int') # except Exception as e: # errorConList.append('第{}行导入失败,{}'.format(str(index + 1), str(e))) # continue # else: # if errorConList: # return '\r\n'.join(errorConList) # else: # return '导入成功' @classmethod def exportModbusVar(self, excelPath): vars = ModBusVar.get_all() if vars is 'error': return wb = openpyxl.Workbook() # Create a new worksheet ws = wb.active ws.title = "Modbus Variables" ws.append(['ID', '从站地址', '变量名', '变量描述', '变量类型', '寄存器地址', '工程量下限', '工程量上限']) for var in vars: ws.append([var.id, var.slaveID, var.varName, var.description, var.varType, var.address, var.min, var.max]) # Save the workbook wb.save(excelPath) @classmethod def createVar(self, varName, varType, des, address, slaveID, min, max, order, modbusType, varModel): # 创建变量 name = str(varName) des = str(des) varType = str(varType) address = str(address) slaveID = str(slaveID) min = str(min) max = str(max) order = order varModel = str(varModel) if self.getVarClass(modbusType).getByName(name): return 1 else: modbusVarType =self.getVarClass(modbusType)() modbusVarType.createVar(varName = varName, varType = varType, des = des, address = address, slaveID = slaveID, min = min, max = max, order = order, varModel = varModel) # 操作后刷新缓存 refreshCache() @classmethod def deleteVar(self, name, modbusType): # 删除变量 name = str(name) # print(name) self.getVarClass(modbusType).deleteVar(name = name) # 操作后刷新缓存 refreshCache() @classmethod def editVar(self, name, Nname, des, varType, slaveID, address, min, max, order, modbusType, varModel): # 修改变量信息 name = str(name) Nname = str(Nname) des = str(des) varType = str(varType) slaveID = str(slaveID) address = str(address) min = str(min) max = str(max) varModel = str(varModel) if Nname == name: self.getVarClass(modbusType).update(varName = Nname, description = des, varType = varType, address = address, slaveID = slaveID, min = min, max = max, order = order, varModel = varModel).where(self.getVarClass(modbusType).varName == name).execute() elif self.getVarClass(modbusType).getByName(Nname): print('已有同名变量') return elif not self.getVarClass(modbusType).getByName(name): print('不存在的变量') return else: self.getVarClass(modbusType).update(varName = Nname, description = des, varType = varType, address = address, slaveID = slaveID, min = min, max = max, order = order, varModel = varModel).where(self.getVarClass(modbusType).varName == name).execute() # 操作后刷新缓存 refreshCache() @classmethod def editOrder(self, name, order, modbusType): name = str(name) order = str(order) self.getVarClass(modbusType).update(order = order).where(self.getVarClass(modbusType).varName == name).execute() # 操作后刷新缓存 refreshCache() @classmethod def editVarModel(self, name, varModel, modbusType): name = str(name) varModel = str(varModel) self.getVarClass(modbusType).update(varModel = varModel).where(self.getVarClass(modbusType).varName == name).execute() # 操作后刷新缓存 refreshCache() @classmethod def getAllVar(self, modbusType): # 查询所有变量 vars = self.getVarClass(modbusType).get_all() if vars is 'error': return l = [] for var in vars: l.append([var.id, var.varName, var.description, var.varType, var.slaveID, var.address, var.min, var.max, var.varModel, var.order]) return l @classmethod def getByName(self, name, modbusType): # 查询指定变量信息 var = self.getVarClass(modbusType).getByName(name) if var: return [var.id, var.varName, var.description, var.varType, var.slaveID, var.address, var.min, var.max, var.varModel] else: return False class HartVarManage(object): def __init__(self): super(HartVarManage, self).__init__() @classmethod def createVar(self, varName, des, varModel): # 创建变量 name = str(varName) des = str(des) if HartVar.getByName(name): print('已有同名变量') else: hartVarType = HartVar() hartVarType.createVar(varName = varName, des = des, varModel = varModel) # 操作后刷新缓存 refreshCache() @classmethod def getAllVar(self): # 查询所有变量 vars = HartVar.get_all() if vars is 'error': return l = [] for var in vars: l.append([var.id, var.varName, var.description, var.varModel]) return l @classmethod def deleteVar(self, name): # 删除变量 name = str(name) # print(name) HartVar.deleteVar(name = name) # 操作后刷新缓存 refreshCache() @classmethod def editVar(self, name, Nname, des, varModel): # 修改变量信息 name = str(name) Nname = str(Nname) des = str(des) varModel = str(varModel) if Nname == name: HartVar.update(varName = Nname, description = des, varModel = varModel).where(HartVar.varName == name).execute() elif HartVar.getByName(Nname): print('已有同名变量') return elif not HartVar.getByName(name): print('不存在的变量') return else: HartVar.update(varName = Nname, description = des, varModel = varModel).where(HartVar.varName == name).execute() # 操作后刷新缓存 refreshCache() @classmethod def editVarModel(self, name, varModel): name = str(name) print('修改变量模型',name) HartVar.update(varModel = str(varModel)).where(HartVar.varName == name).execute() # 操作后刷新缓存 refreshCache() @classmethod def getByName(self, name): # 查询指定变量信息 var = HartVar.getByName(name) if var: varMes = [var.id, var.varName, var.description, var.varModel] [varMes.append('') for x in range(10)] return varMes else: return False def initVar(self): self.createVar(varName='name', des='TC', varModel='本地值') @classmethod def importVarForm(self, varName, des, varModel): if HartVar.getByName(varName): # 如果变量已存在,更新其信息 HartVar.update(description = des, varModel = varModel).where(HartVar.varName == varName).execute() else: hartVarType = HartVar() hartVarType.createVar(varName = varName, des = des, varModel = varModel) # 操作后刷新缓存 refreshCache() class TcRtdManage(object): def __init__(self): super(TcRtdManage, self).__init__() @classmethod def createVar(self, varName, channelNumber, varType, des, min, max, compensationVar, varModel): # 创建变量 name = str(varName) des = str(des) channelNumber = str(channelNumber) varModel = str(varModel) if TcRtdVar.getByName(name): print('已有同名变量') else: tcRtdVarType = TcRtdVar() tcRtdVarType.createVar(varName = varName, channelNumber=channelNumber, des = des, varType = varType, min = min, max = max, compensationVar = compensationVar, varModel = varModel) # 操作后刷新缓存 refreshCache() @classmethod def getAllVar(self): # 查询所有变量 vars = TcRtdVar.get_all() if vars is 'error': return l = [] for var in vars: l.append([var.id, var.varName, var.channelNumber,var.description, var.varType, var.min, var.max, var.compensationVar, var.varModel]) return l @classmethod def deleteVar(self, name): # 删除变量 name = str(name) # print(name) TcRtdVar.deleteVar(name = name) # 操作后刷新缓存 refreshCache() @classmethod def editVar(self, name, Nname, channelNumber, des, varType, min, max, compensationVar): # 修改变量信息 name = str(name) Nname = str(Nname) channelNumber = str(channelNumber) des = str(des) varType = str(varType) min = str(min) max = str(max) compensationVar = str(compensationVar) if Nname == name: TcRtdVar.update(varName=Nname, channelNumber = channelNumber, description=des, varType=varType, min=min, max=max, compensationVar = compensationVar).where(TcRtdVar.varName == name).execute() elif TcRtdVar.getByName(Nname): print('已有同名变量') return elif not TcRtdVar.getByName(name): print('不存在的变量') return else: TcRtdVar.update(varName=Nname, channelNumber = channelNumber, description=des, varType=varType, min=min, max=max, compensationVar = compensationVar).where(TcRtdVar.varName == name).execute() # 操作后刷新缓存 refreshCache() @classmethod def getByName(self, name): # 查询指定变量信息 var = TcRtdVar.getByName(name) if var: return [var.id, var.varName, var.channelNumber, var.description, var.varType, var.min, var.max, var.compensationVar, var.varModel] else: return False def initVar(self): # 创建变量 for i in range(1, 9): name = '热偶输出' + str(i) self.createVar(varName=name, channelNumber=str(i), varType='R', des='TC', min='100', max='200', compensationVar = '0', varModel = '本地值') for i in range(1, 9): name = '热阻输出' + str(i) self.createVar(varName=name, channelNumber=str(i), varType='PT100', des='RTD', min='100', max='200', compensationVar = '0', varModel = '本地值') @classmethod def editvarType(self, name, varType): name = str(name) TcRtdVar.update(varType = str(varType)).where(TcRtdVar.varName == name).execute() # 操作后刷新缓存 refreshCache() @classmethod def editVarModel(self, name, varModel): name = str(name) TcRtdVar.update(varModel = str(varModel)).where(TcRtdVar.varName == name).execute() # 操作后刷新缓存 refreshCache() @classmethod def importVarForm(self, varName, channelNumber, varType, des, min, max, compensationVar, varModel): if TcRtdVar.getByName(varName): # 如果变量已存在,更新其信息 TcRtdVar.update(channelNumber = channelNumber, description=des, varType=varType, min=min, max=max, compensationVar = compensationVar).where(TcRtdVar.varName == varName).execute() else: tcRtdVarType = TcRtdVar() tcRtdVarType.createVar(varName = varName, channelNumber=channelNumber, des = des, varType = varType, min = min, max = max, compensationVar = compensationVar, varModel = varModel) # 操作后刷新缓存 refreshCache() class AnalogManage(object): def __init__(self): super(AnalogManage, self).__init__() @classmethod def createVar(self, varName, channelNumber, varType, des, min, max, varModel): # 创建变量 name = str(varName) channelNumber = str(channelNumber) des = str(des) varModel = str(varModel) # if AnalogVar.getByName(name): # print('已有同名变量') # else: analogVarType = AnalogVar() analogVarType.createVar(varName = varName, channelNumber = channelNumber, des = des, varType = varType, min = min, max = max, varModel = varModel) # 操作后刷新缓存 refreshCache() @classmethod def getAllVar(self): # 查询所有变量 vars = AnalogVar.get_all() if vars is 'error': return l = [] for var in vars: l.append([var.id, var.varName, var.channelNumber, var.description, var.varType, var.min, var.max, var.varModel]) return l @classmethod def deleteVar(self, name): # 删除变量 name = str(name) # print(name) AnalogVar.deleteVar(name = name) # 操作后刷新缓存 refreshCache() @classmethod def editVar(self, name, Nname, channelNumber, des, varType, min, max): # 修改变量信息 name = str(name) Nname = str(Nname) channelNumber = str(channelNumber) des = str(des) varType = str(varType) min = str(min) max = str(max) if Nname == name: AnalogVar.update(varName=Nname, channelNumber =channelNumber, description=des, varType=varType, min=min, max=max).where(AnalogVar.varName == name).execute() elif AnalogVar.getByName(Nname): print('已有同名变量') return elif not AnalogVar.getByName(name): print('不存在的变量') return else: AnalogVar.update(varName=Nname, channelNumber =channelNumber, description=des, varType=varType, min=min, max=max).where(AnalogVar.varName == name).execute() # 操作后刷新缓存 refreshCache() @classmethod def editVarModel(self, name, varModel): name = str(name) AnalogVar.update(varModel = str(varModel)).where(AnalogVar.varName == name).execute() # 操作后刷新缓存 refreshCache() @classmethod def getByName(self, name): # 查询指定变量信息 var = AnalogVar.getByName(name) if var: return [var.id, var.varName, var. channelNumber, var.description, var.varType, var.min, var.max, var.varModel] else: return False def initVar(self): for i in range(1, 13): name = '无源4-20mA输出通道' + str(i) des = '无源4-20mA输出' + str(i) self.createVar(varName=name, channelNumber= str(i), varType='AO', des=des, min='100', max='200', varModel = '本地值') for i in range(13, 17): name = '有源4-20mA输出通道' + str(i) des = '有源4-20mA输出' + str(i) self.createVar(varName=name, channelNumber=str(i), varType='AO', des=des, min='100', max='200', varModel = '本地值') for i in range(1, 17): name = '有源24V数字输出通道' + str(i) des = '有源24V数字输出' + str(i) self.createVar(varName=name, channelNumber=str(i), varType='DO', des=des, min='100', max='200', varModel = '本地值') for i in range(1, 9): name = '无源24V数字输入通道' + str(i) des = '无源24V数字输入' + str(i) self.createVar(varName=name, channelNumber=str(i), varType='DI', des=des, min='100', max='200', varModel = '本地值') for i in range(9, 17): name = '有源48V数字输入通道' + str(i) des = '有源48V数字输入' + str(i) self.createVar(varName=name, channelNumber=str(i), varType='DI', des=des, min='100', max='200', varModel = '本地值') for i in range(1, 9): name = '有源/无源4-20mA输入通道' + str(i) des = '有源/无源4-20mA输入' + str(i) self.createVar(varName=name, channelNumber=str(i), varType='AI', des=des, min='100', max='200', varModel = '本地值') @classmethod def importVarForm(self, varName, channelNumber, varType, des, min, max, varModel): if AnalogVar.getByName(varName): # 如果变量已存在,更新其信息 AnalogVar.update(channelNumber =channelNumber, description=des, varType=varType, min=min, max=max).where(AnalogVar.varName == varName).execute() else: analogVarType = AnalogVar() analogVarType.createVar(varName = varName, channelNumber = channelNumber, des = des, varType = varType, min = min, max = max, varModel = varModel) # 操作后刷新缓存 refreshCache() class HartSimulateVarManage(object): def __init__(self): super(HartSimulateVarManage, self).__init__() @classmethod def createVar(self, varName, des, varModel): # 创建变量 name = str(varName) des = str(des) varModel = str(varModel) if HartSimulateVar.getByName(name): print('已有同名变量') else: hartSimulateVarType = HartSimulateVar() hartSimulateVarType.createVar(varName = varName, des = des,varModel = varModel) # 操作后刷新缓存 refreshCache() @classmethod def getAllVar(self): # 查询所有变量 vars = HartSimulateVar.get_all() if vars is 'error': return l = [] for var in vars: l.append([var.id, var.varName, var.description, var.varModel]) return l @classmethod def deleteVar(self, name): # 删除变量 name = str(name) # print(name) HartSimulateVar.deleteVar(name = name) # 操作后刷新缓存 refreshCache() @classmethod def editVar(self, name, Nname, des, varModel): # 修改变量信息 name = str(name) Nname = str(Nname) des = str(des) if Nname == name: HartSimulateVar.update(varName = Nname, description = des, varModel = varModel).where(HartSimulateVar.varName == name).execute() elif HartSimulateVar.getByName(Nname): print('已有同名变量') return elif not HartSimulateVar.getByName(name): print('不存在的变量') return else: HartSimulateVar.update(varName = Nname, description = des, varModel = varModel).where(HartSimulateVar.varName == name).execute() # 操作后刷新缓存 refreshCache() @classmethod def editVarModel(self, name, varModel): name = str(name) HartSimulateVar.update(varModel = str(varModel)).where(HartSimulateVar.varName == name).execute() # 操作后刷新缓存 refreshCache() @classmethod def getByName(self, name): # 查询指定变量信息 var = HartSimulateVar.getByName(name) if var: varMes = [var.id, var.varName, var.description, var.varModel] [varMes.append('') for x in range(10)] return varMes else: return False def initVar(self): self.createVar(varName='name', des='TC', varModel='本地值') @classmethod def importVarForm(self, varName, des, varModel): if AnalogVar.getByName(varName): # 如果变量已存在,更新其信息 HartSimulateVar.update(description = des, varModel = varModel).where(HartSimulateVar.varName == varName).execute() else: hartSimulateVarType = HartSimulateVar() hartSimulateVarType.createVar(varName = varName, des = des,varModel = varModel) # 操作后刷新缓存 refreshCache() class GlobalVarManager(object): # 所有支持的变量模型类列表 allVarClass = [ ModbusTcpMasterVar, ModbusTcpSlaveVar, ModbusRtuMasterVar, ModbusRtuSlaveVar, HartVar, TcRtdVar, AnalogVar, HartSimulateVar ] @classmethod def getAllVarNames(cls): """获取所有协议模型中存在的变量名(去重集合)""" allVarNames = set() for varClass in cls.allVarClass: try: # 查询该模型类的所有变量名 query = varClass.select(varClass.varName) names = {var.varName for var in query} allVarNames.update(names) except Exception as e: print(f"查询 {varClass.__name__} 变量名失败: {str(e)}") return allVarNames @classmethod def isVarNameExist(cls, varName): """检查新变量名是否在任意协议模型中已存在""" existingNames = cls.getAllVarNames() if str(varName) in existingNames: return True else: return False @classmethod def importVar(cls, sheetName, data): """导入变量""" errorConList = [] match sheetName: case 'ModbusTCP主站' | 'ModbusTCP从站' | 'ModbusRTU主站' | 'ModbusRTU从站': requiredFields = ['变量名', '变量类型', '寄存器地址', '从站地址'] for index, row in data.iterrows(): # 检查关键字段是否为空 if any(pd.isna(row[field]) for field in requiredFields): errorConList.append(f'第{index+1}行关键字段为空') continue try: # 统一处理Modbus导入 modbusType = 'ModbusTcpMaster' if sheetName == 'ModbusTCP主站' else \ 'ModbusTcpSlave' if sheetName == 'ModbusTCP从站' else \ 'ModbusRtuMaster' if sheetName == 'ModbusRTU主站' else 'ModbusRtuSlave' errInfo = ModbusVarManage.importVarForm( varName=row['变量名'], varType=row['变量类型'], des=row.get('变量描述', ''), address=row['寄存器地址'], slaveID=row['从站地址'], min=row.get('工程量下限', ''), max=row.get('工程量上限', ''), order=row.get('字节顺序', 'int'), modbusType=modbusType, varModel=row.get('值类型', ''), index=index ) if errInfo: errorConList.append(errInfo) except Exception as e: errorConList.append(f'第{index+1}行导入失败: {str(e)}') case 'IO' | 'TCRTD': requiredFields = ['变量名', '通道序号', '变量类型'] if sheetName == 'IO' else \ ['变量名', '通道序号', '变量类型', '补偿值'] importFunc = AnalogManage.importVarForm if sheetName == 'IO' else TcRtdManage.importVarForm for index, row in data.iterrows(): if any(pd.isna(row[field]) for field in requiredFields): errorConList.append(f'第{index+1}行关键字段为空') continue try: if sheetName == 'IO': importFunc( varName=row['变量名'], channelNumber=row['通道序号'], varType=row['变量类型'], des=row.get('变量描述', ''), min=row.get('工程量下限', ''), max=row.get('工程量上限', ''), varModel=row.get('值类型', '') ) else: # TCRTD importFunc( varName=row['变量名'], channelNumber=row['通道序号'], varType=row['变量类型'], des=row.get('变量描述', ''), min=row.get('工程量下限', ''), max=row.get('工程量上限', ''), compensationVar=row['补偿值'], varModel=row.get('值类型', '') ) except Exception as e: errorConList.append(f'第{index+1}行导入失败: {str(e)}') case 'Hart模拟' | 'Hart读取': requiredFields = ['变量名'] importFunc = HartSimulateVarManage.importVarForm if sheetName == 'Hart模拟' else HartVarManage.importVarForm for index, row in data.iterrows(): if pd.isna(row['变量名']): errorConList.append(f'第{index+1}行变量名为空') continue try: importFunc( varName=row['变量名'], des=row.get('变量描述', ''), varModel=row.get('值类型', '') ) except Exception as e: errorConList.append(f'第{index+1}行导入失败: {str(e)}') modelLists = ['ModbusTcpMasterTable', 'ModbusTcpSlaveTable', 'ModbusRtuMasterTable', 'ModbusRtuSlaveTable', 'HartTable', 'TcRtdTable', 'AnalogTable', 'HartSimulateTable', 'userTable'] for l in modelLists: Globals.getValue(l).model.initTable() if errorConList: return '\r\n'.join(errorConList)