"""Variable managers for the protocol tables (Modbus, HART, TC/RTD, analog/digital IO).

Each ``*Manage`` class wraps one ORM model (or, for Modbus, a family of four
models) with create / edit / delete / query / import helpers and refreshes the
protocol manager's variable cache after every write.  ``GlobalVarManager``
provides cross-table helpers and the Excel-sheet import entry point.
"""
import re
from numpy import var  # NOTE(review): appears unused in this module - confirm before removing
import pandas as pd
from utils import Globals
import openpyxl
from utils.DBModels.ProtocolModel import ModbusTcpMasterVar, ModbusRtuMasterVar, ModbusRtuSlaveVar,\
    ModbusTcpSlaveVar, HartVar, TcRtdVar, AnalogVar, HartSimulateVar
from PyQt5.Qt import QFileDialog, QMessageBox
from protocol.ProtocolManage import ProtocolManage

# Column layout of an exported Modbus sheet:
# ID, slave address, variable name, description, variable type,
# register address, engineering lower limit, engineering upper limit

# Matches any run of characters that is not a digit, '.' or '-'.
_NON_NUMERIC = re.compile(r'[^0-9\.-]+')

# Accepted values of the "value type" (varModel) column.
_VAR_MODELS = ('本地值', '远程值', '模拟值')


def _isBlank(value):
    """Return True when *value* is an empty cell: None, NaN or a blank string.

    Unlike a plain truthiness test this keeps the number ``0`` (e.g. register
    address 0 or a lower limit of 0) as a valid, non-empty value.
    """
    if value is None:
        return True
    if isinstance(value, str):
        return value.strip() == ''
    try:
        return bool(pd.isna(value))
    except (TypeError, ValueError):
        # Non-scalar or otherwise un-testable value: treat it as present.
        return False


def _isQueryError(result):
    """Return True when ``get_all()`` reported failure with the string 'error'.

    The isinstance guard matters: ORM query objects may overload ``==`` to
    build an expression, which would always be truthy.
    """
    return isinstance(result, str) and result == 'error'


def _hasNonNumeric(*values):
    """Return True if any value, once stringified, contains a non-numeric character."""
    return any(_NON_NUMERIC.search(str(value)) for value in values)


def _rowNumber(index):
    """Return the 1-based row label for an error message ('?' if no index was given)."""
    if index is None:
        return '?'
    return str(index + 1)


def refreshCache():
    """Refresh the variable cache of the globally registered protocol manager.

    The manager is looked up under the key ``'protocolManage'`` in ``Globals``;
    nothing happens when it has not been registered.
    """
    protocolManagerInstance = Globals.getValue('protocolManage')
    if protocolManagerInstance:
        protocolManagerInstance.refreshVarCache()


class ModbusVarManage(object):
    """CRUD and import/export helpers for the four Modbus variable tables."""

    # Maps the modbusType string used by callers to its ORM model class.
    ModBusVarClass = {
        'ModbusTcpMaster': ModbusTcpMasterVar,
        'ModbusTcpSlave': ModbusTcpSlaveVar,
        'ModbusRtuMaster': ModbusRtuMasterVar,
        'ModbusRtuSlave': ModbusRtuSlaveVar,
    }

    def __init__(self):
        super().__init__()

    @classmethod
    def getVarClass(cls, modbusType):
        """Return the model class for *modbusType*, or None if it is unknown."""
        return cls.ModBusVarClass.get(modbusType)

    @classmethod
    def importVarForm(cls, varName, varType, des, address, slaveID, min, max, order, modbusType, varModel, index):
        """Validate one imported sheet row and create or update the variable.

        Returns an error message string when validation fails, otherwise None.
        An existing variable with the same name is updated in place.
        """
        varClass = cls.getVarClass(modbusType)
        rowNo = _rowNumber(index)

        if varType not in [0, 1, 3, 4]:
            return '表{}第{}行导入失败,{}'.format(str(modbusType), rowNo, '变量类型有误')
        # Blank check instead of truthiness so that address / slave id 0 is accepted.
        if _isBlank(varName) or _isBlank(address) or _isBlank(slaveID):
            return '表{}第{}行导入失败,有关键字段为空'.format(str(modbusType), rowNo)
        if _hasNonNumeric(address, slaveID, min, max):
            return '表{}第{}行导入失败,{}'.format(str(modbusType), rowNo,
                                              '寄存器地址、从站地址、工程量下限、工程量上限只能为数字')
        if varModel not in _VAR_MODELS:
            return '表{}第{}行导入失败,{}'.format(str(modbusType), rowNo, '变量模型有误')
        if order not in ['int', 'ABCD', 'BADC', 'CDAB', 'DCBA']:
            return '表{}第{}行导入失败,{}'.format(str(modbusType), rowNo, '字节顺序有误')

        if varClass.getByName(varName):
            # Variable already exists: overwrite its settings.
            varClass.update(varType=varType, description=des, address=address, slaveID=slaveID,
                            min=min, max=max, order=order,
                            varModel=varModel).where(varClass.varName == varName).execute()
        else:
            varClass().createVar(varName=varName, varType=varType, des=des, address=address,
                                 slaveID=slaveID, min=min, max=max, order=order, varModel=varModel)
        refreshCache()

    # The legacy single-table ``importModbusVar`` (commented out, and written
    # against a model that no longer exists) was removed; sheet import is
    # handled by ``GlobalVarManager.importVar``.

    @classmethod
    def exportModbusVar(cls, excelPath, modbusType='ModbusTcpMaster'):
        """Write all variables of one Modbus table to an .xlsx file at *excelPath*.

        The original referenced an undefined ``ModBusVar`` model and could not
        run; the table is now selected with *modbusType*.
        NOTE(review): the default table is an assumption - confirm with callers.
        Returns None; nothing is written when the query reports 'error'.
        """
        records = cls.getVarClass(modbusType).get_all()
        if _isQueryError(records):
            return
        wb = openpyxl.Workbook()
        ws = wb.active
        ws.title = "Modbus Variables"
        ws.append(['ID', '从站地址', '变量名', '变量描述', '变量类型', '寄存器地址', '工程量下限', '工程量上限'])
        for record in records:
            ws.append([record.id, record.slaveID, record.varName, record.description,
                       record.varType, record.address, record.min, record.max])
        wb.save(excelPath)

    @classmethod
    def createVar(cls, varName, varType, des, address, slaveID, min, max, order, modbusType, varModel):
        """Create a variable; return 1 if the name already exists, else None."""
        name = str(varName)
        des = str(des)
        varType = str(varType)
        address = str(address)
        slaveID = str(slaveID)
        min = str(min)
        max = str(max)
        varModel = str(varModel)

        varClass = cls.getVarClass(modbusType)
        if varClass.getByName(name):
            return 1
        # Store the stringified name, consistent with the other stringified fields.
        varClass().createVar(varName=name, varType=varType, des=des, address=address,
                             slaveID=slaveID, min=min, max=max, order=order, varModel=varModel)
        refreshCache()

    @classmethod
    def deleteVar(cls, name, modbusType):
        """Delete the variable called *name* from the given Modbus table."""
        cls.getVarClass(modbusType).deleteVar(name=str(name))
        refreshCache()

    @classmethod
    def editVar(cls, name, Nname, des, varType, slaveID, address, min, max, order, modbusType, varModel):
        """Update variable *name*, optionally renaming it to *Nname*.

        A rename is refused (message printed, nothing changed) when *Nname* is
        already taken or *name* does not exist.
        """
        name = str(name)
        Nname = str(Nname)
        des = str(des)
        varType = str(varType)
        slaveID = str(slaveID)
        address = str(address)
        min = str(min)
        max = str(max)
        varModel = str(varModel)

        varClass = cls.getVarClass(modbusType)
        if Nname != name:
            if varClass.getByName(Nname):
                print('已有同名变量')
                return
            if not varClass.getByName(name):
                print('不存在的变量')
                return
        varClass.update(varName=Nname, description=des, varType=varType, address=address,
                        slaveID=slaveID, min=min, max=max, order=order,
                        varModel=varModel).where(varClass.varName == name).execute()
        refreshCache()

    @classmethod
    def editOrder(cls, name, order, modbusType):
        """Set the byte order of variable *name*."""
        varClass = cls.getVarClass(modbusType)
        varClass.update(order=str(order)).where(varClass.varName == str(name)).execute()
        refreshCache()

    @classmethod
    def editVarModel(cls, name, varModel, modbusType):
        """Set the value type (varModel) of variable *name*."""
        varClass = cls.getVarClass(modbusType)
        varClass.update(varModel=str(varModel)).where(varClass.varName == str(name)).execute()
        refreshCache()

    @classmethod
    def getAllVar(cls, modbusType):
        """Return every variable of the table as a list of rows, or None on query error."""
        records = cls.getVarClass(modbusType).get_all()
        if _isQueryError(records):
            return
        return [[record.id, record.varName, record.description, record.varType, record.slaveID,
                 record.address, record.min, record.max, record.varModel, record.order]
                for record in records]

    @classmethod
    def getByName(cls, name, modbusType):
        """Return the fields of variable *name* as a list, or False if not found."""
        record = cls.getVarClass(modbusType).getByName(name)
        if not record:
            return False
        return [record.id, record.varName, record.description, record.varType, record.slaveID,
                record.address, record.min, record.max, record.varModel]


class HartVarManage(object):
    """CRUD and import helpers for the HART (read) variable table."""

    def __init__(self):
        super(HartVarManage, self).__init__()

    @classmethod
    def createVar(cls, varName, des, varModel):
        """Create a variable; prints a message and does nothing if the name exists."""
        name = str(varName)
        des = str(des)
        if HartVar.getByName(name):
            print('已有同名变量')
        else:
            HartVar().createVar(varName=name, des=des, varModel=varModel)
        refreshCache()

    @classmethod
    def getAllVar(cls):
        """Return every variable as a list of rows, or None on query error."""
        records = HartVar.get_all()
        if _isQueryError(records):
            return
        return [[record.id, record.varName, record.description, record.varModel]
                for record in records]

    @classmethod
    def deleteVar(cls, name):
        """Delete the variable called *name*."""
        HartVar.deleteVar(name=str(name))
        refreshCache()

    @classmethod
    def editVar(cls, name, Nname, des, varModel):
        """Update variable *name*, optionally renaming it to *Nname*."""
        name = str(name)
        Nname = str(Nname)
        des = str(des)
        varModel = str(varModel)
        if Nname != name:
            if HartVar.getByName(Nname):
                print('已有同名变量')
                return
            if not HartVar.getByName(name):
                print('不存在的变量')
                return
        HartVar.update(varName=Nname, description=des,
                       varModel=varModel).where(HartVar.varName == name).execute()
        refreshCache()

    @classmethod
    def editVarModel(cls, name, varModel):
        """Set the value type (varModel) of variable *name*."""
        HartVar.update(varModel=str(varModel)).where(HartVar.varName == str(name)).execute()
        refreshCache()

    @classmethod
    def getByName(cls, name):
        """Return the variable's fields padded with ten empty strings, or False."""
        record = HartVar.getByName(name)
        if not record:
            return False
        varMes = [record.id, record.varName, record.description, record.varModel]
        varMes.extend([''] * 10)
        return varMes

    def initVar(self):
        """Create the single default variable."""
        self.createVar(varName='name', des='TC', varModel='本地值')

    @classmethod
    def importVarForm(cls, varName, des, varModel):
        """Create the variable, or update description/varModel if it already exists."""
        if HartVar.getByName(varName):
            HartVar.update(description=des,
                           varModel=varModel).where(HartVar.varName == varName).execute()
        else:
            HartVar().createVar(varName=varName, des=des, varModel=varModel)
        refreshCache()


class TcRtdManage(object):
    """CRUD and import helpers for the thermocouple / RTD variable table."""

    # Thermocouple type codes; RTD rows use the separate type 'PT100'.
    _TC_TYPES = ('R', 'S', 'B', 'J', 'T', 'E', 'K', 'N', 'C', 'A')

    def __init__(self):
        super(TcRtdManage, self).__init__()

    @classmethod
    def createVar(cls, varName, channelNumber, varType, des, min, max, compensationVar, varModel, unit):
        """Create a variable; prints a message and does nothing if the name exists."""
        name = str(varName)
        des = str(des)
        channelNumber = str(channelNumber)
        varModel = str(varModel)
        if TcRtdVar.getByName(name):
            print('已有同名变量')
        else:
            TcRtdVar().createVar(varName=name, channelNumber=channelNumber, des=des, varType=varType,
                                 min=min, max=max, compensationVar=compensationVar,
                                 varModel=varModel, unit=unit)
        refreshCache()

    @classmethod
    def getAllVar(cls):
        """Return every variable as a list of rows, or None on query error."""
        records = TcRtdVar.get_all()
        if _isQueryError(records):
            return
        return [[record.id, record.initialValue, record.varName, record.channelNumber,
                 record.description, record.varType, record.min, record.max, record.unit,
                 record.compensationVar, record.varModel]
                for record in records]

    @classmethod
    def deleteVar(cls, name):
        """Delete the variable called *name*."""
        TcRtdVar.deleteVar(name=str(name))
        refreshCache()

    @classmethod
    def editVar(cls, name, Nname, channelNumber, des, varType, min, max, compensationVar, unit):
        """Update variable *name*, optionally renaming it to *Nname*."""
        name = str(name)
        Nname = str(Nname)
        channelNumber = str(channelNumber)
        des = str(des)
        varType = str(varType)
        min = str(min)
        max = str(max)
        compensationVar = str(compensationVar)
        unit = str(unit)
        if Nname != name:
            if TcRtdVar.getByName(Nname):
                print('已有同名变量')
                return
            if not TcRtdVar.getByName(name):
                print('不存在的变量')
                return
        TcRtdVar.update(varName=Nname, channelNumber=channelNumber, description=des, varType=varType,
                        min=min, max=max, compensationVar=compensationVar,
                        unit=unit).where(TcRtdVar.varName == name).execute()
        refreshCache()

    @classmethod
    def getByName(cls, name):
        """Return the fields of variable *name* as a list, or False if not found."""
        record = TcRtdVar.getByName(name)
        if not record:
            return False
        return [record.id, record.varName, record.channelNumber, record.description, record.varType,
                record.min, record.max, record.unit, record.compensationVar, record.varModel]

    def initVar(self):
        """Create the default variables: eight thermocouple and eight RTD outputs."""
        for i in range(1, 9):
            self.createVar(varName='热偶输出' + str(i), channelNumber=str(i), varType='R', des='TC',
                           min='100', max='200', compensationVar='0', varModel='本地值', unit='℃')
        for i in range(1, 9):
            self.createVar(varName='热阻输出' + str(i), channelNumber=str(i), varType='PT100', des='RTD',
                           min='100', max='200', compensationVar='0', varModel='本地值', unit='℃')

    @classmethod
    def editvarType(cls, name, varType):
        """Set the sensor type of variable *name*."""
        TcRtdVar.update(varType=str(varType)).where(TcRtdVar.varName == str(name)).execute()
        refreshCache()

    @classmethod
    def editVarModel(cls, name, varModel):
        """Set the value type (varModel) of variable *name*."""
        TcRtdVar.update(varModel=str(varModel)).where(TcRtdVar.varName == str(name)).execute()
        refreshCache()

    @classmethod
    def importVarForm(cls, varName, channelNumber, varType, des, min, max, compensationVar, varModel, unit,
                      initialValue='', index=None):
        """Validate one imported TCRTD row and create or update the variable.

        Returns an error message string when validation fails, otherwise None.
        When the channel already exists, the row of the matching kind
        (thermocouple vs. PT100) on that channel is updated.
        """
        rowNo = _rowNumber(index)
        isThermocouple = varType in cls._TC_TYPES

        # Blank check instead of truthiness so that a limit or channel of 0 is accepted.
        if any(_isBlank(field) for field in (varName, varType, varModel, min, max, channelNumber)):
            return '表TCRTD第{}行导入失败,有关键字段为空'.format(rowNo)
        if _hasNonNumeric(channelNumber, min, max):
            return '表TCRTD第{}行导入失败,{}'.format(rowNo, '通道号、工程量下限、工程量上限只能为数字')
        # The original used `or` here, which is true for every possible type,
        # and neither returned the error nor formatted its reason.
        if not isThermocouple and varType != 'PT100':
            return '表TCRTD第{}行导入失败,{}'.format(rowNo, '变量类型错误')
        if varModel not in _VAR_MODELS:
            return '表TCRTD第{}行导入失败,{}'.format(rowNo, '变量模型有误')

        if TcRtdVar.getByChannelNumber(channelNumber):
            # Channel already exists: update only the row of the same kind.
            if isThermocouple:
                kindFilter = TcRtdVar.varType != 'PT100'
            else:
                kindFilter = TcRtdVar.varType == 'PT100'
            TcRtdVar.update(varName=varName, channelNumber=channelNumber, description=des, varType=varType,
                            min=min, max=max, compensationVar=compensationVar, unit=unit,
                            initialValue=initialValue
                            ).where((TcRtdVar.channelNumber == channelNumber) & kindFilter).execute()
        else:
            # Keyword was misspelled `nitialValue` in the original.
            TcRtdVar().createVar(varName=varName, channelNumber=channelNumber, des=des, varType=varType,
                                 min=min, max=max, compensationVar=compensationVar, varModel=varModel,
                                 unit=unit, initialValue=initialValue)
        refreshCache()


class AnalogManage(object):
    """CRUD and import helpers for the analog / digital IO variable table."""

    # Default channels created by initVar:
    # (description prefix, channel numbers, varType, min, max, unit).
    # The variable name is the prefix followed by '通道' and the channel number.
    _DEFAULT_CHANNELS = (
        ('无源4-20mA输出', range(1, 13), 'AO', '100', '200', 'mA'),
        ('有源4-20mA输出', range(13, 17), 'AO', '100', '200', 'mA'),
        ('有源24V数字输出', range(1, 17), 'DO', '0', '1', 'V'),
        ('无源24V数字输入', range(1, 9), 'DI', '0', '1', 'V'),
        ('有源48V数字输入', range(9, 17), 'DI', '0', '1', 'V'),
        ('有源/无源4-20mA输入', range(1, 9), 'AI', '100', '200', 'mA'),
    )

    def __init__(self):
        super(AnalogManage, self).__init__()

    @classmethod
    def createVar(cls, varName, channelNumber, varType, des, min, max, varModel, unit, initialValue=''):
        """Create a variable.

        NOTE: unlike the other managers no duplicate-name check is made here;
        that check was already disabled in the original code.
        """
        AnalogVar().createVar(varName=str(varName), channelNumber=str(channelNumber), des=str(des),
                              varType=varType, min=min, max=max, varModel=str(varModel),
                              unit=str(unit), initialValue=str(initialValue))
        refreshCache()

    @classmethod
    def getAllVar(cls):
        """Return every variable as a list of rows, or None on query error."""
        records = AnalogVar.get_all()
        if _isQueryError(records):
            return
        return [[record.id, record.initialValue, record.varName, record.channelNumber,
                 record.description, record.varType, record.min, record.max, record.unit,
                 record.varModel]
                for record in records]

    @classmethod
    def deleteVar(cls, name):
        """Delete the variable called *name*."""
        AnalogVar.deleteVar(name=str(name))
        refreshCache()

    @classmethod
    def editVar(cls, name, Nname, channelNumber, des, varType, min, max, unit, initialValue=''):
        """Update variable *name*, optionally renaming it to *Nname*.

        NOTE(review): *initialValue* is accepted but not written, as in the
        original - confirm whether it should be part of the update.
        """
        name = str(name)
        Nname = str(Nname)
        channelNumber = str(channelNumber)
        des = str(des)
        varType = str(varType)
        min = str(min)
        max = str(max)
        unit = str(unit)
        if Nname != name:
            if AnalogVar.getByName(Nname):
                print('已有同名变量')
                return
            if not AnalogVar.getByName(name):
                print('不存在的变量')
                return
        AnalogVar.update(varName=Nname, channelNumber=channelNumber, description=des, varType=varType,
                         min=min, max=max, unit=unit).where(AnalogVar.varName == name).execute()
        refreshCache()

    @classmethod
    def editVarModel(cls, name, varModel):
        """Set the value type (varModel) of variable *name*."""
        AnalogVar.update(varModel=str(varModel)).where(AnalogVar.varName == str(name)).execute()
        refreshCache()

    @classmethod
    def getByName(cls, name):
        """Return the fields of variable *name* as a list, or False if not found."""
        record = AnalogVar.getByName(name)
        if not record:
            return False
        return [record.id, record.varName, record.channelNumber, record.description, record.varType,
                record.min, record.max, record.varModel, record.unit]

    def initVar(self):
        """Create the default IO variables listed in ``_DEFAULT_CHANNELS``."""
        for prefix, channels, varType, low, high, unit in self._DEFAULT_CHANNELS:
            for i in channels:
                self.createVar(varName=prefix + '通道' + str(i), channelNumber=str(i), varType=varType,
                               des=prefix + str(i), min=low, max=high, varModel='本地值', unit=unit)

    @classmethod
    def importVarForm(cls, varName, channelNumber, varType, des, min, max, varModel, unit,
                      initialValue='', index=None):
        """Validate one imported IO row and create or update the variable.

        Returns an error message string when validation fails, otherwise None.
        When the channel already exists, the row with the same channel and
        variable type is updated.
        """
        rowNo = _rowNumber(index)

        if any(_isBlank(field) for field in (varName, channelNumber, varType, varModel)):
            return '表IO第{}行导入失败,有关键字段为空'.format(rowNo)
        if _hasNonNumeric(channelNumber, min, max):
            return '表IO第{}行导入失败,{}'.format(rowNo, '通道号、工程量下限、工程量上限只能为数字')
        if varType not in ['AI', 'AO', 'DI', 'DO']:
            return '表IO第{}行导入失败,{}'.format(rowNo, '变量类型错误')
        if varModel not in _VAR_MODELS:
            return '表IO第{}行导入失败,{}'.format(rowNo, '变量模型有误')

        if AnalogVar.getByChannelNumber(channelNumber):
            AnalogVar.update(varName=varName, description=des, varType=varType, min=min, max=max,
                             unit=unit, initialValue=initialValue
                             ).where((AnalogVar.channelNumber == channelNumber)
                                     & (AnalogVar.varType == varType)).execute()
        else:
            AnalogVar().createVar(varName=varName, channelNumber=channelNumber, des=des, varType=varType,
                                  min=min, max=max, varModel=varModel, unit=unit,
                                  initialValue=initialValue)
        refreshCache()


class HartSimulateVarManage(object):
    """CRUD and import helpers for the HART simulation variable table."""

    def __init__(self):
        super(HartSimulateVarManage, self).__init__()

    @classmethod
    def createVar(cls, varName, des, varModel):
        """Create a variable; prints a message and does nothing if the name exists."""
        name = str(varName)
        des = str(des)
        varModel = str(varModel)
        if HartSimulateVar.getByName(name):
            print('已有同名变量')
        else:
            HartSimulateVar().createVar(varName=name, des=des, varModel=varModel)
        refreshCache()

    @classmethod
    def getAllVar(cls):
        """Return every variable as a list of rows, or None on query error."""
        records = HartSimulateVar.get_all()
        if _isQueryError(records):
            return
        return [[record.id, record.varName, record.description, record.varModel]
                for record in records]

    @classmethod
    def deleteVar(cls, name):
        """Delete the variable called *name*."""
        HartSimulateVar.deleteVar(name=str(name))
        refreshCache()

    @classmethod
    def editVar(cls, name, Nname, des, varModel):
        """Update variable *name*, optionally renaming it to *Nname*."""
        name = str(name)
        Nname = str(Nname)
        des = str(des)
        varModel = str(varModel)  # stringified like in HartVarManage.editVar
        if Nname != name:
            if HartSimulateVar.getByName(Nname):
                print('已有同名变量')
                return
            if not HartSimulateVar.getByName(name):
                print('不存在的变量')
                return
        HartSimulateVar.update(varName=Nname, description=des,
                               varModel=varModel).where(HartSimulateVar.varName == name).execute()
        refreshCache()

    @classmethod
    def editVarModel(cls, name, varModel):
        """Set the value type (varModel) of variable *name*."""
        HartSimulateVar.update(varModel=str(varModel)).where(HartSimulateVar.varName == str(name)).execute()
        refreshCache()

    @classmethod
    def getByName(cls, name):
        """Return the variable's fields padded with ten empty strings, or False."""
        record = HartSimulateVar.getByName(name)
        if not record:
            return False
        varMes = [record.id, record.varName, record.description, record.varModel]
        varMes.extend([''] * 10)
        return varMes

    def initVar(self):
        """Create the single default variable."""
        self.createVar(varName='name', des='TC', varModel='本地值')

    @classmethod
    def importVarForm(cls, varName, des, varModel):
        """Create the variable, or update description/varModel if it already exists."""
        # The original looked the name up in AnalogVar (copy-paste slip), so an
        # existing simulation variable was never detected.
        if HartSimulateVar.getByName(varName):
            HartSimulateVar.update(description=des, varModel=varModel
                                   ).where(HartSimulateVar.varName == varName).execute()
        else:
            HartSimulateVar().createVar(varName=varName, des=des, varModel=varModel)
        refreshCache()


class GlobalVarManager(object):
    """Cross-table helpers: global name lookup and Excel sheet import."""

    # Every variable model class that participates in the global name check.
    allVarClass = [
        ModbusTcpMasterVar, ModbusTcpSlaveVar,
        ModbusRtuMasterVar, ModbusRtuSlaveVar,
        HartVar, TcRtdVar, AnalogVar, HartSimulateVar
    ]

    # Sheet name -> modbusType key understood by ModbusVarManage.
    _MODBUS_SHEETS = {
        'ModbusTCP主站': 'ModbusTcpMaster',
        'ModbusTCP从站': 'ModbusTcpSlave',
        'ModbusRTU主站': 'ModbusRtuMaster',
        'ModbusRTU从站': 'ModbusRtuSlave',
    }

    # Names under which the table views to refresh are registered in Globals.
    _TABLE_KEYS = ('ModbusTcpMasterTable', 'ModbusTcpSlaveTable', 'ModbusRtuMasterTable',
                   'ModbusRtuSlaveTable', 'HartTable', 'TcRtdTable', 'AnalogTable',
                   'HartSimulateTable', 'userTable')

    @classmethod
    def getAllVarNames(cls):
        """Return the set of variable names found across all model classes.

        A model whose query fails is reported with ``print`` and skipped.
        """
        allVarNames = set()
        for varClass in cls.allVarClass:
            try:
                query = varClass.select(varClass.varName)
                allVarNames.update(record.varName for record in query)
            except Exception as e:
                print(f"查询 {varClass.__name__} 变量名失败: {str(e)}")
        return allVarNames

    @classmethod
    def isVarNameExist(cls, varName):
        """Return True if *varName* already exists in any model."""
        return str(varName) in cls.getAllVarNames()

    @classmethod
    def importVar(cls, sheetName, data):
        """Import the rows of one sheet (*data*, a DataFrame) into its table.

        After importing, every registered table view is re-initialised.
        Returns the collected error messages joined by CRLF, or None when
        there were none.  Unknown sheet names import nothing.
        """
        # Empty cells become '' so optional columns reach the models as blanks.
        data = data.fillna('')

        if sheetName in cls._MODBUS_SHEETS:
            errorConList = cls._importModbusSheet(cls._MODBUS_SHEETS[sheetName], data)
        elif sheetName in ('IO', 'TCRTD'):
            errorConList = cls._importChannelSheet(sheetName, data)
        elif sheetName in ('Hart模拟', 'Hart读取'):
            errorConList = cls._importHartSheet(sheetName, data)
        else:
            errorConList = []

        for tableKey in cls._TABLE_KEYS:
            Globals.getValue(tableKey).model.initTable()

        if errorConList:
            return '\r\n'.join(errorConList)

    @classmethod
    def _importModbusSheet(cls, modbusType, data):
        """Import a Modbus sheet; return the list of per-row error messages."""
        requiredFields = ['变量名', '变量类型', '寄存器地址', '从站地址']
        errors = []
        for index, row in data.iterrows():
            # NaN was already replaced by '', so test for blanks rather than pd.isna.
            if any(_isBlank(row[field]) for field in requiredFields):
                errors.append(f'第{index+1}行关键字段为空')
                continue
            try:
                errInfo = ModbusVarManage.importVarForm(
                    varName=row['变量名'],
                    varType=row['变量类型'],
                    des=row.get('变量描述', ''),
                    address=row['寄存器地址'],
                    slaveID=row['从站地址'],
                    min=row.get('工程量下限', ''),
                    max=row.get('工程量上限', ''),
                    order=row.get('字节顺序', ''),
                    modbusType=modbusType,
                    varModel=row.get('值类型', ''),
                    index=index
                )
                if errInfo:
                    errors.append(errInfo)
            except Exception as e:
                errors.append(f'第{index+1}行导入失败: {str(e)}')
        return errors

    @classmethod
    def _importChannelSheet(cls, sheetName, data):
        """Import an 'IO' or 'TCRTD' sheet; return the list of per-row error messages."""
        isIO = sheetName == 'IO'
        requiredFields = ['变量名', '通道序号', '变量类型']
        if not isIO:
            requiredFields.append('补偿值')
        errors = []
        for index, row in data.iterrows():
            if any(_isBlank(row[field]) for field in requiredFields):
                errors.append(f'第{index+1}行关键字段为空')
                continue
            try:
                if isIO:
                    errInfo = AnalogManage.importVarForm(
                        varName=row['变量名'],
                        channelNumber=row['通道序号'],
                        varType=row.get('变量类型', ''),
                        des=row.get('变量描述', ''),
                        min=row.get('工程量下限', ''),
                        max=row.get('工程量上限', ''),
                        unit=row.get('单位', ''),
                        varModel=row.get('值类型', ''),
                        initialValue=row.get('初始值', ''),
                        index=index
                    )
                else:
                    errInfo = TcRtdManage.importVarForm(
                        varName=row['变量名'],
                        channelNumber=row['通道序号'],
                        varType=row['变量类型'],
                        des=row.get('变量描述', ''),
                        min=row.get('工程量下限', ''),
                        max=row.get('工程量上限', ''),
                        unit=row.get('单位', ''),
                        compensationVar=row['补偿值'],
                        varModel=row.get('值类型', ''),
                        initialValue=row.get('初始值', ''),
                        index=index
                    )
                if errInfo:
                    errors.append(errInfo)
            except Exception as e:
                errors.append(f'第{index+1}行导入失败: {str(e)}')
        return errors

    @classmethod
    def _importHartSheet(cls, sheetName, data):
        """Import a 'Hart模拟' or 'Hart读取' sheet; return the list of per-row error messages."""
        if sheetName == 'Hart模拟':
            importFunc = HartSimulateVarManage.importVarForm
        else:
            importFunc = HartVarManage.importVarForm
        errors = []
        for index, row in data.iterrows():
            if _isBlank(row['变量名']):
                errors.append(f'第{index+1}行变量名为空')
                continue
            try:
                importFunc(
                    varName=row['变量名'],
                    des=row.get('变量描述', ''),
                    varModel=row.get('值类型', '')
                )
            except Exception as e:
                errors.append(f'第{index+1}行导入失败: {str(e)}')
        return errors