from utils import Globals import openpyxl from utils.DBModels.ProtocolModel import ModbusTcpMasterVar, ModbusRtuMasterVar, ModbusRtuSlaveVar,\ ModbusTcpSlaveVar, HartVar, TcRtdVar, AnalogVar, HartSimulateVar from PyQt5.Qt import QFileDialog, QMessageBox # ID 从站地址 变量名 变量描述 变量类型 寄存器地址 工程量下限 工程量上限 class ModbusVarManage(object): ModBusVarClass = { 'ModbusTcpMaster': ModbusTcpMasterVar, 'ModbusTcpSlave': ModbusTcpSlaveVar, 'ModbusRtuMaster': ModbusRtuMasterVar, 'ModbusRtuSlave': ModbusRtuSlaveVar, } def __init__(self): super().__init__() @classmethod def getVarClass(self, modbusType): """根据类型获取对应的变量类""" return self.ModBusVarClass.get(modbusType) @classmethod def importVarForm(self, varName, varType, des, address, slaveID, min, max, order, modbusType, varModel): varClass = self.getVarClass(modbusType) if varClass.getByName(varName): # 如果变量已存在,更新其信息 varClass.update(varType=varType, description=des, address=address, slaveID=slaveID, min=min, max=max, order=order, varModel=varModel).where(varClass.varName == varName).execute() else: modbusVarType =self.getVarClass(modbusType)() modbusVarType.createVar(varName = varName, varType = varType, des = des, address = address, slaveID = slaveID, min = min, max = max, order = order, varModel = varModel) # def importModbusVar(self, path): # # 导入modbus变量表 # wb = openpyxl.load_workbook(path) # ws = wb.active # IDIndex = None # slaveIndex = None # nameIndex = None # desIndex = None # typeIndex = None # addrIndex = None # minIndex = None # maxIndex = None # for index, cell in enumerate(list(ws.iter_rows())[0]): # if cell.value in ['id', 'ID']: # IDIndex = index # if cell.value == '从站地址': # slaveIndex = index # if cell.value == '变量名': # nameIndex = index # if cell.value == '变量描述': # desIndex = index # if cell.value == '变量类型': # typeIndex = index # if cell.value == '寄存器地址': # addrIndex = index # if cell.value == '工程量下限': # minIndex = index # if cell.value == '工程量上限': # maxIndex = index # # print(IDIndex, slaveIndex, nameIndex, desIndex, typeIndex, addrIndex, minIndex, maxIndex) # if IDIndex == None or slaveIndex == None or nameIndex == None or desIndex == None or typeIndex == None or addrIndex == None or minIndex == None or maxIndex == None: # # print('表头错误') # return '表头错误' # errorConList = [] # for index, row in enumerate(list(ws.iter_rows())[1:]): # try: # l = [str(x.value) for x in row] # varName = l[nameIndex] # varType = l[typeIndex] # des = '' if l[desIndex] == 'None' else l[desIndex] # address = l[addrIndex] # slaveID = l[slaveIndex] # min = '' if l[minIndex] == 'None' else l[minIndex] # max = '' if l[maxIndex] == 'None' else l[maxIndex] # if {varName, varType, address, slaveID} & {'None'}: # errorConList.append('第{}行导入失败,有关键字段为空'.format(str(index + 1))) # continue # if varType not in ['0', '1', '3', '4']: # errorConList.append('第{}行导入失败,变量类型有误'.format(str(index + 1))) # continue # if ModBusVar.getByName(varName): # ModBusVar.update(varType = varType, description = des, address = address, slaveID = slaveID, min = min, max = max, order = 'int').where(ModBusVar.varName == varName).execute() # else: # ModBusVarModel = ModBusVar() # ModBusVarModel.createVar(varName = varName, varType = varType, des = des, address = address, slaveID = slaveID, min = min, max = max, order = 'int') # except Exception as e: # errorConList.append('第{}行导入失败,{}'.format(str(index + 1), str(e))) # continue # else: # if errorConList: # return '\r\n'.join(errorConList) # else: # return '导入成功' @classmethod def exportModbusVar(self, excelPath): vars = ModBusVar.get_all() if vars is 'error': return wb = openpyxl.Workbook() # Create a new worksheet ws = wb.active ws.title = "Modbus Variables" ws.append(['ID', '从站地址', '变量名', '变量描述', '变量类型', '寄存器地址', '工程量下限', '工程量上限']) for var in vars: ws.append([var.id, var.slaveID, var.varName, var.description, var.varType, var.address, var.min, var.max]) # Save the workbook wb.save(excelPath) @classmethod def createVar(self, varName, varType, des, address, slaveID, min, max, order, modbusType, varModel): # 创建变量 name = str(varName) des = str(des) varType = str(varType) address = str(address) slaveID = str(slaveID) min = str(min) max = str(max) order = order varModel = str(varModel) if self.getVarClass(modbusType).getByName(name): return 1 else: modbusVarType =self.getVarClass(modbusType)() modbusVarType.createVar(varName = varName, varType = varType, des = des, address = address, slaveID = slaveID, min = min, max = max, order = order, varModel = varModel) @classmethod def deleteVar(self, name, modbusType): # 删除变量 name = str(name) # print(name) self.getVarClass(modbusType).deleteVar(name = name) @classmethod def editVar(self, name, Nname, des, varType, slaveID, address, min, max, order, modbusType, varModel): # 修改变量信息 name = str(name) Nname = str(Nname) des = str(des) varType = str(varType) slaveID = str(slaveID) address = str(address) min = str(min) max = str(max) varModel = str(varModel) if Nname == name: self.getVarClass(modbusType).update(varName = Nname, description = des, varType = varType, address = address, slaveID = slaveID, min = min, max = max, order = order, varModel = varModel).where(self.getVarClass(modbusType).varName == name).execute() elif self.getVarClass(modbusType).getByName(Nname): print('已有同名变量') return elif not self.getVarClass(modbusType).getByName(name): print('不存在的变量') return else: self.getVarClass(modbusType).update(varName = Nname, description = des, varType = varType, address = address, slaveID = slaveID, min = min, max = max, order = order, varModel = varModel).where(self.getVarClass(modbusType).varName == name).execute() @classmethod def editOrder(self, name, order, modbusType): name = str(name) order = str(order) self.getVarClass(modbusType).update(order = order).where(self.getVarClass(modbusType).varName == name).execute() @classmethod def editVarModel(self, name, varModel, modbusType): name = str(name) varModel = str(varModel) self.getVarClass(modbusType).update(varModel = varModel).where(self.getVarClass(modbusType).varName == name).execute() @classmethod def getAllVar(self, modbusType): # 查询所有变量 vars = self.getVarClass(modbusType).get_all() if vars is 'error': return l = [] for var in vars: l.append([var.id, var.varName, var.description, var.varType, var.slaveID, var.address, var.min, var.max, var.varModel, var.order]) return l @classmethod def getByName(self, name, modbusType): # 查询指定变量信息 var = self.getVarClass(modbusType).getByName(name) if var: return [var.id, var.varName, var.description, var.varType, var.slaveID, var.address, var.min, var.max, var.varModel] else: return False class HartVarManage(object): def __init__(self): super(HartVarManage, self).__init__() @classmethod def createVar(self, varName, des, varModel): # 创建变量 name = str(varName) des = str(des) if HartVar.getByName(name): print('已有同名变量') else: hartVarType = HartVar() hartVarType.createVar(varName = varName, des = des, varModel = varModel) @classmethod def getAllVar(self): # 查询所有变量 vars = HartVar.get_all() if vars is 'error': return l = [] for var in vars: l.append([var.id, var.varName, var.description, var.varModel]) return l @classmethod def deleteVar(self, name): # 删除变量 name = str(name) # print(name) HartVar.deleteVar(name = name) @classmethod def editVar(self, name, Nname, des, varModel): # 修改变量信息 name = str(name) Nname = str(Nname) des = str(des) varModel = str(varModel) if Nname == name: HartVar.update(varName = Nname, description = des, varModel = varModel).where(HartVar.varName == name).execute() elif HartVar.getByName(Nname): print('已有同名变量') return elif not HartVar.getByName(name): print('不存在的变量') return else: HartVar.update(varName = Nname, description = des, varModel = varModel).where(HartVar.varName == name).execute() @classmethod def editVarModel(self, name, varModel): name = str(name) print('修改变量模型',name) HartVar.update(varModel = str(varModel)).where(HartVar.varName == name).execute() @classmethod def getByName(self, name): # 查询指定变量信息 var = HartVar.getByName(name) if var: varMes = [var.id, var.varName, var.description, var.varModel] [varMes.append('') for x in range(10)] return varMes else: return False def initVar(self): self.createVar(varName='name', des='TC', varModel='本地值') @classmethod def importVarForm(self, varName, des, varModel): if HartVar.getByName(varName): # 如果变量已存在,更新其信息 HartVar.update(description = des, varModel = varModel).where(HartVar.varName == varName).execute() else: hartVarType = HartVar() hartVarType.createVar(varName = varName, des = des, varModel = varModel) class TcRtdManage(object): def __init__(self): super(TcRtdManage, self).__init__() @classmethod def createVar(self, varName, channelNumber, varType, des, min, max, compensationVar, varModel): # 创建变量 name = str(varName) des = str(des) channelNumber = str(channelNumber) varModel = str(varModel) if TcRtdVar.getByName(name): print('已有同名变量') else: tcRtdVarType = TcRtdVar() tcRtdVarType.createVar(varName = varName, channelNumber=channelNumber, des = des, varType = varType, min = min, max = max, compensationVar = compensationVar, varModel = varModel) @classmethod def getAllVar(self): # 查询所有变量 vars = TcRtdVar.get_all() if vars is 'error': return l = [] for var in vars: l.append([var.id, var.varName, var.channelNumber,var.description, var.varType, var.min, var.max, var.compensationVar, var.varModel]) return l @classmethod def deleteVar(self, name): # 删除变量 name = str(name) # print(name) TcRtdVar.deleteVar(name = name) @classmethod def editVar(self, name, Nname, channelNumber, des, varType, min, max, compensationVar): # 修改变量信息 name = str(name) Nname = str(Nname) channelNumber = str(channelNumber) des = str(des) varType = str(varType) min = str(min) max = str(max) compensationVar = str(compensationVar) if Nname == name: TcRtdVar.update(varName=Nname, channelNumber = channelNumber, description=des, varType=varType, min=min, max=max, compensationVar = compensationVar).where(TcRtdVar.varName == name).execute() elif TcRtdVar.getByName(Nname): print('已有同名变量') return elif not TcRtdVar.getByName(name): print('不存在的变量') return else: TcRtdVar.update(varName=Nname, channelNumber = channelNumber, description=des, varType=varType, min=min, max=max, compensationVar = compensationVar).where(TcRtdVar.varName == name).execute() @classmethod def getByName(self, name): # 查询指定变量信息 var = TcRtdVar.getByName(name) if var: return [var.id, var.varName, var.channelNumber, var.description, var.varType, var.min, var.max, var.compensationVar, var.varModel] else: return False def initVar(self): # 创建变量 for i in range(1, 9): name = '热偶输出' + str(i) self.createVar(varName=name, channelNumber=str(i), varType='R', des='TC', min='100', max='200', compensationVar = '0', varModel = '本地值') for i in range(1, 9): name = '热阻输出' + str(i) self.createVar(varName=name, channelNumber=str(i), varType='PT100', des='RTD', min='100', max='200', compensationVar = '0', varModel = '本地值') @classmethod def editvarType(self, name, varType): name = str(name) TcRtdVar.update(varType = str(varType)).where(TcRtdVar.varName == name).execute() @classmethod def editVarModel(self, name, varModel): name = str(name) TcRtdVar.update(varModel = str(varModel)).where(TcRtdVar.varName == name).execute() @classmethod def importVarForm(self, varName, channelNumber, varType, des, min, max, compensationVar, varModel): if TcRtdVar.getByName(varName): # 如果变量已存在,更新其信息 TcRtdVar.update(channelNumber = channelNumber, description=des, varType=varType, min=min, max=max, compensationVar = compensationVar).where(TcRtdVar.varName == varName).execute() else: tcRtdVarType = TcRtdVar() tcRtdVarType.createVar(varName = varName, channelNumber=channelNumber, des = des, varType = varType, min = min, max = max, compensationVar = compensationVar, varModel = varModel) class AnalogManage(object): def __init__(self): super(AnalogManage, self).__init__() @classmethod def createVar(self, varName, channelNumber, varType, des, min, max, varModel): # 创建变量 name = str(varName) channelNumber = str(channelNumber) des = str(des) varModel = str(varModel) # if AnalogVar.getByName(name): # print('已有同名变量') # else: analogVarType = AnalogVar() analogVarType.createVar(varName = varName, channelNumber = channelNumber, des = des, varType = varType, min = min, max = max, varModel = varModel) @classmethod def getAllVar(self): # 查询所有变量 vars = AnalogVar.get_all() if vars is 'error': return l = [] for var in vars: l.append([var.id, var.varName, var.channelNumber, var.description, var.varType, var.min, var.max, var.varModel]) return l @classmethod def deleteVar(self, name): # 删除变量 name = str(name) # print(name) AnalogVar.deleteVar(name = name) @classmethod def editVar(self, name, Nname, channelNumber, des, varType, min, max): # 修改变量信息 name = str(name) Nname = str(Nname) channelNumber = str(channelNumber) des = str(des) varType = str(varType) min = str(min) max = str(max) if Nname == name: AnalogVar.update(varName=Nname, channelNumber =channelNumber, description=des, varType=varType, min=min, max=max).where(AnalogVar.varName == name).execute() elif AnalogVar.getByName(Nname): print('已有同名变量') return elif not AnalogVar.getByName(name): print('不存在的变量') return else: AnalogVar.update(varName=Nname, channelNumber =channelNumber, description=des, varType=varType, min=min, max=max).where(AnalogVar.varName == name).execute() @classmethod def editVarModel(self, name, varModel): name = str(name) AnalogVar.update(varModel = str(varModel)).where(AnalogVar.varName == name).execute() @classmethod def getByName(self, name): # 查询指定变量信息 var = AnalogVar.getByName(name) if var: return [var.id, var.varName, var. channelNumber, var.description, var.varType, var.min, var.max, var.varModel] else: return False def initVar(self): for i in range(1, 13): name = '无源4-20mA输出通道' + str(i) des = '无源4-20mA输出' + str(i) self.createVar(varName=name, channelNumber= str(i), varType='AO', des=des, min='100', max='200', varModel = '本地值') for i in range(13, 17): name = '有源4-20mA输出通道' + str(i) des = '有源4-20mA输出' + str(i) self.createVar(varName=name, channelNumber=str(i), varType='AO', des=des, min='100', max='200', varModel = '本地值') for i in range(1, 17): name = '有源24V数字输出通道' + str(i) des = '有源24V数字输出' + str(i) self.createVar(varName=name, channelNumber=str(i), varType='DO', des=des, min='100', max='200', varModel = '本地值') for i in range(1, 9): name = '无源24V数字输入通道' + str(i) des = '无源24V数字输入' + str(i) self.createVar(varName=name, channelNumber=str(i), varType='DI', des=des, min='100', max='200', varModel = '本地值') for i in range(9, 17): name = '有源48V数字输入通道' + str(i) des = '有源48V数字输入' + str(i) self.createVar(varName=name, channelNumber=str(i), varType='DI', des=des, min='100', max='200', varModel = '本地值') for i in range(1, 9): name = '有源/无源4-20mA输入通道' + str(i) des = '有源/无源4-20mA输入' + str(i) self.createVar(varName=name, channelNumber=str(i), varType='AI', des=des, min='100', max='200', varModel = '本地值') @classmethod def importVarForm(self, varName, channelNumber, varType, des, min, max, varModel): if AnalogVar.getByName(varName): # 如果变量已存在,更新其信息 AnalogVar.update(channelNumber =channelNumber, description=des, varType=varType, min=min, max=max).where(AnalogVar.varName == varName).execute() else: analogVarType = AnalogVar() analogVarType.createVar(varName = varName, channelNumber = channelNumber, des = des, varType = varType, min = min, max = max, varModel = varModel) class HartSimulateVarManage(object): def __init__(self): super(HartSimulateVarManage, self).__init__() @classmethod def createVar(self, varName, des, varModel): # 创建变量 name = str(varName) des = str(des) varModel = str(varModel) if HartSimulateVar.getByName(name): print('已有同名变量') else: hartSimulateVarType = HartSimulateVar() hartSimulateVarType.createVar(varName = varName, des = des,varModel = varModel) @classmethod def getAllVar(self): # 查询所有变量 vars = HartSimulateVar.get_all() if vars is 'error': return l = [] for var in vars: l.append([var.id, var.varName, var.description, var.varModel]) return l @classmethod def deleteVar(self, name): # 删除变量 name = str(name) # print(name) HartSimulateVar.deleteVar(name = name) @classmethod def editVar(self, name, Nname, des, varModel): # 修改变量信息 name = str(name) Nname = str(Nname) des = str(des) if Nname == name: HartSimulateVar.update(varName = Nname, description = des, varModel = varModel).where(HartSimulateVar.varName == name).execute() elif HartSimulateVar.getByName(Nname): print('已有同名变量') return elif not HartSimulateVar.getByName(name): print('不存在的变量') return else: HartSimulateVar.update(varName = Nname, description = des, varModel = varModel).where(HartSimulateVar.varName == name).execute() @classmethod def editVarModel(self, name, varModel): name = str(name) HartSimulateVar.update(varModel = str(varModel)).where(HartSimulateVar.varName == name).execute() @classmethod def getByName(self, name): # 查询指定变量信息 var = HartSimulateVar.getByName(name) if var: varMes = [var.id, var.varName, var.description, var.varModel] [varMes.append('') for x in range(10)] return varMes else: return False def initVar(self): self.createVar(varName='name', des='TC', varModel='本地值') @classmethod def importVarForm(self, varName, des, varModel): if AnalogVar.getByName(varName): # 如果变量已存在,更新其信息 HartSimulateVar.update(description = des, varModel = varModel).where(HartSimulateVar.varName == varName).execute() else: hartSimulateVarType = HartSimulateVar() hartSimulateVarType.createVar(varName = varName, des = des,varModel = varModel) class GlobalVarManager(object): # 所有支持的变量模型类列表 allVarClass = [ ModbusTcpMasterVar, ModbusTcpSlaveVar, ModbusRtuMasterVar, ModbusRtuSlaveVar, HartVar, TcRtdVar, AnalogVar, HartSimulateVar ] @classmethod def getAllVarNames(cls): """获取所有协议模型中存在的变量名(去重集合)""" allVarNames = set() for varClass in cls.allVarClass: try: # 查询该模型类的所有变量名 query = varClass.select(varClass.varName) names = {var.varName for var in query} allVarNames.update(names) except Exception as e: print(f"查询 {varClass.__name__} 变量名失败: {str(e)}") return allVarNames @classmethod def isVarNameExist(cls, varName): """检查新变量名是否在任意协议模型中已存在""" existingNames = cls.getAllVarNames() if str(varName) in existingNames: return True else: return False @classmethod def importVar(cls,sheetName, data): """导入变量""" match sheetName: case 'ModbusTCP主站': for index, row in data.iterrows(): ModbusVarManage.importVarForm( varName=row['变量名'], varType=row['变量类型'], des=row['变量描述'], address=row['寄存器地址'], slaveID=row['从站地址'], min=row['工程量下限'], max=row['工程量上限'], order=row['字节顺序'], modbusType='ModbusTcpMaster', varModel=row['值类型'] ) case 'ModbusTCP从站': for index, row in data.iterrows(): ModbusVarManage.importVarForm( varName=row['变量名'], varType=row['变量类型'], des=row['变量描述'], address=row['寄存器地址'], slaveID=row['从站地址'], min=row['工程量下限'], max=row['工程量上限'], order=row['字节顺序'], modbusType='ModbusTcpSlave', varModel=row['值类型'] ) case 'ModbusRTU主站': for index, row in data.iterrows(): ModbusVarManage.importVarForm( varName=row['变量名'], varType=row['变量类型'], des=row['变量描述'], address=row['寄存器地址'], slaveID=row['从站地址'], min=row['工程量下限'], max=row['工程量上限'], order=row['字节顺序'], modbusType='ModbusTcpSlave', varModel=row['值类型'] ) case 'ModbusRTU从站': for index, row in data.iterrows(): ModbusVarManage.importVarForm( varName=row['变量名'], varType=row['变量类型'], des=row['变量描述'], address=row['寄存器地址'], slaveID=row['从站地址'], min=row['工程量下限'], max=row['工程量上限'], order=row['字节顺序'], modbusType='ModbusTcpSlave', varModel=row['值类型'] ) case 'IO': for index, row in data.iterrows(): AnalogManage.importVarForm( varName=row['变量名'], channelNumber=row['通道序号'], varType=row['变量类型'], des=row['变量描述'], min=row['工程量下限'], max=row['工程量上限'], varModel=row['值类型'] ) case 'TCRTD': for index, row in data.iterrows(): TcRtdManage.importVarForm( varName=row['变量名'], channelNumber=row['通道序号'], varType=row['变量类型'], des=row['变量描述'], min=row['工程量下限'], max=row['工程量上限'], compensationVar=row['补偿值'], varModel=row['值类型'] ) case 'Hart模拟': for index, row in data.iterrows(): HartSimulateVarManage.importVarForm( varName=row['变量名'], des=row['变量描述'], varModel=row['值类型'] ) case 'Hart读取': for index, row in data.iterrows(): HartVarManage.importVarForm( varName=row['变量名'], des=row['变量描述'], varModel=row['值类型'] ) modelLists = ['ModbusTcpMasterTable', 'ModbusTcpSlaveTable', 'ModbusRtuMasterTable', \ 'ModbusRtuSlaveTable', 'HartTable', 'TcRtdTable', 'AnalogTable', 'HartSimulateTable', 'userTable'] for l in modelLists: Globals.getValue(l).model.initTable()