import collections
import json
from utils.DBModels.DeviceModels import DeviceDB
from model.ProjectModel.AreaManage import Area
import numpy as np
from protocol.ModBus.ByteOrder import *
from protocol.ModBus.TCPMaster import *
import struct

# Slave: "AI" and "DI" areas can be forced.
# Master: "AO" and "DO" areas can be forced.


class Device():
    """One device together with its input and output data areas.

    Input areas ("AI"/"DI") and output areas ("AO"/"DO") are kept in two
    separate lists; each list has its own running end address.
    """

    inputStartAddress = 0
    inputEndAddress = 0
    outputStartAddress = 0
    outputEndAddress = 0
    protocolType = None
    masterOrSlave = None
    deviceName = None
    deviceWidget = None
    # NOTE(review): this list lives on the class, so every Device instance
    # appends to the same object (see DevicesManange.addDevice). Confirm
    # whether a per-instance list was intended before changing it.
    areaTabWidget = []

    def __init__(self):
        self.inputAreas = []
        self.outputAreas = []
        # Ordered mapping: overall area index -> position of that area inside
        # its own list (inputAreas or outputAreas).
        self.indexDict = collections.OrderedDict()

    def addArea(self, type, nums, bytes, order = 'ABCD'):
        """Create an area and append it to the matching input/output list.

        Args:
            type: "AI"/"DI" for an input area, "AO"/"DO" for an output area.
                Any other value builds the Area object but stores nothing.
            nums: number of values held by the area (must be an int).
            bytes: size of one value in bytes (anything ``int()`` accepts).
            order: byte-order tag stored on the area.
        """
        area = Area()
        bytes = int(bytes)
        area.type = type
        area.order = order
        area.bytes = bytes
        area.length = self.getLength(nums, bytes)
        area.nums = nums
        area.currentValue = [0] * nums

        if type in ["AI", "DI"]:
            isInput = True
        elif type in ["DO", "AO"]:
            isInput = False
        else:
            return

        siblings = self.inputAreas if isInput else self.outputAreas
        previousEnd = self.inputEndAddress if isInput else self.outputEndAddress

        # The first area of a kind starts at 0, later ones right after the
        # previous end address.
        area.startAddress = 0 if not previousEnd else previousEnd + 1
        area.endAddress = area.startAddress + area.length
        area.addressList = np.arange(area.startAddress, area.endAddress + 1, area.bytes).tolist()

        if isInput:
            self.inputEndAddress = area.endAddress
        else:
            self.outputEndAddress = area.endAddress

        self.indexDict[len(self.indexDict)] = len(siblings)
        siblings.append(area)

    def delArea(self, index, type):
        """Remove the area registered under overall index ``index``.

        NOTE(review): ``indexDict`` is not updated here, so the positions it
        holds for later areas become stale after a deletion.
        """
        if type in ["DI", "AI"]:
            self.inputAreas.pop(self.indexDict[index])
        elif type in ["AO", "DO"]:
            self.outputAreas.pop(self.indexDict[index])

    def recalculateAddress(self):
        """Recompute start/end addresses and address lists of all areas.

        The first input area starts at ``inputStartAddress`` and the first
        output area at ``outputStartAddress``; every later area starts one
        past the previous area's end. The device end addresses are refreshed
        afterwards and become 0 when a list is empty.
        """
        for isOutput, areas in enumerate([self.inputAreas, self.outputAreas]):
            lastEnd = 0
            previous = None
            for area in areas:
                if previous is not None:
                    area.startAddress = previous.endAddress + 1
                elif isOutput:
                    area.startAddress = self.outputStartAddress
                else:
                    area.startAddress = self.inputStartAddress
                area.endAddress = area.startAddress + area.length
                area.addressList = np.arange(area.startAddress, area.endAddress + 1, area.bytes).tolist()
                lastEnd = area.endAddress
                previous = area

            if isOutput:
                self.outputEndAddress = lastEnd
            else:
                self.inputEndAddress = lastEnd

    def editArea(self, index, type, order, bytes):
        """Change byte order and value size of an existing area.

        NOTE(review): ``length`` and the addresses are not recomputed here.
        """
        if type in ["DI", "AI"]:
            areas = self.inputAreas
        elif type in ["AO", "DO"]:
            areas = self.outputAreas
        else:
            return
        area = areas[self.indexDict[index]]
        area.order = order
        area.bytes = bytes

    def getAreaValues(self, index):
        """Return the current values of one area.

        For a slave ("从站") the output area at ``index`` is returned, for any
        other role the input area at ``index``.
        """
        if self.masterOrSlave == "从站":
            return self.outputAreas[index].currentValue
        return self.inputAreas[index].currentValue

    def getLength(self, nums, bytes):
        """Return the byte length of ``nums`` values of ``bytes`` bytes each."""
        return int(nums) * int(bytes)

    @classmethod
    def delAreas(cls, deviceName, id):
        """Delete the stored area at list position ``id`` and renumber ids.

        Does nothing when no areas are stored or ``id`` is past the end of the
        list. When the last area is removed, ``areaJson`` is stored as None.
        """
        stored = DeviceDB.getByName(deviceName=deviceName).areaJson
        if stored is None:
            return
        jsonCon = json.loads(stored)
        # pop() is position based, so the position must be < len(jsonCon).
        if not jsonCon or id >= len(jsonCon):
            return
        jsonCon.pop(id)

        if jsonCon == []:
            areaJson = None
        else:
            # Stored ids are 1-based and kept contiguous.
            for position, entry in enumerate(jsonCon):
                entry["id"] = position + 1
            areaJson = json.dumps(jsonCon)
        DeviceDB.update(areaJson=areaJson).where(DeviceDB.deviceName == deviceName).execute()

    @classmethod
    def getAreaJson(cls, deviceNames):
        """Return the decoded area list of a device, or None if none is stored."""
        stored = DeviceDB.getByName(deviceName=deviceNames).areaJson
        if stored is None:
            return
        return json.loads(stored)


class DevicesManange():
    """Registry of devices grouped by protocol (DP/PA) and role.

    Each of the four groups has an ordered name -> Device mapping and its own
    ``TcpMaster`` connection object.
    """

    def __init__(self):
        self.dpMasterDevices = collections.OrderedDict()
        self.dpSlaveDevices = collections.OrderedDict()
        self.paMasterDevices = collections.OrderedDict()
        self.paSlaveDevices = collections.OrderedDict()
        # Hard-coded Modbus TCP endpoints, one per group.
        self.dpSlaveModbus = TcpMaster(host = '192.168.2.10', port = 502)
        self.paSlaveModbus = TcpMaster(host = '192.168.4.10', port = 502)
        self.dpMasterModbus = TcpMaster(host = '192.168.0.40', port = 502)
        self.paMasterModbus = TcpMaster(host = '192.168.3.10', port = 502)

    def _resolveChannel(self, proType, masterOrSlave):
        """Return ``(devicesDict, modbusMaster)`` for a protocol/role pair.

        Raises:
            ValueError: if the pair is not one of DP/PA x "主站"/"从站".
        """
        channels = {
            ("DP", "主站"): (self.dpMasterDevices, self.dpMasterModbus),
            ("DP", "从站"): (self.dpSlaveDevices, self.dpSlaveModbus),
            ("PA", "主站"): (self.paMasterDevices, self.paMasterModbus),
            ("PA", "从站"): (self.paSlaveDevices, self.paSlaveModbus),
        }
        try:
            return channels[(proType, masterOrSlave)]
        except KeyError:
            raise ValueError(
                f"unsupported protocol/role combination: {proType!r}, {masterOrSlave!r}"
            ) from None

    def _allDeviceDicts(self):
        """Return the four device mappings in the order used for lookups."""
        return [self.paMasterDevices, self.paSlaveDevices, self.dpMasterDevices, self.dpSlaveDevices]

    def addDevice(self, proType, masterSlaveModel, deviceName, deviceWidget, areaTabWidget):
        """Create a Device and register it in the matching group.

        The new device's start addresses are 0 for the first device of a
        group, otherwise one past the end addresses of the device added last.

        Raises:
            ValueError: if ``proType``/``masterSlaveModel`` name no known group.
        """
        curProDict, _ = self._resolveChannel(proType, masterSlaveModel)

        device = Device()
        device.type = proType
        device.masterOrSlave = masterSlaveModel
        device.deviceName = deviceName
        device.deviceWidget = deviceWidget
        device.areaTabWidget.append(areaTabWidget)

        if len(curProDict) == 0:
            device.inputStartAddress = 0
            device.outputStartAddress = 0
        else:
            lastDevice = next(reversed(curProDict.values()))
            device.inputStartAddress = lastDevice.inputEndAddress + 1
            device.outputStartAddress = lastDevice.outputEndAddress + 1
        curProDict[deviceName] = device

    def initDevices(self):
        """Placeholder; does nothing."""
        pass

    def delDevice(self, deviceName):
        """Remove a device from whichever group holds it and relayout addresses."""
        for devicesDict in self._allDeviceDicts():
            if deviceName in devicesDict:
                del devicesDict[deviceName]
                self.recalculateAddress()

    def recalculateAddress(self):
        """Reassign start addresses of every device, then of its areas.

        Within a group the first device starts at 0. A later device starts one
        past the previous device's end address, or at 0 when that end address
        is 0.
        """
        for devicesDict in self._allDeviceDicts():
            previous = None
            for device in devicesDict.values():
                if previous is None:
                    device.inputStartAddress = 0
                    device.outputStartAddress = 0
                else:
                    inputAddress = previous.inputEndAddress
                    outputAddress = previous.outputEndAddress
                    device.inputStartAddress = 0 if inputAddress == 0 else inputAddress + 1
                    device.outputStartAddress = 0 if outputAddress == 0 else outputAddress + 1
                # Must run before the next device reads this one's end address.
                device.recalculateAddress()
                previous = device

    def getDevice(self, deviceName):
        """Return the Device registered under ``deviceName``, or None."""
        for devicesDict in self._allDeviceDicts():
            if deviceName in devicesDict:
                return devicesDict[deviceName]

    def getAllDeviceObj(self):
        """Return all registered Device objects as one flat list."""
        return [
            device
            for devicesDict in self._allDeviceDicts()
            for device in devicesDict.values()
        ]

    def writeAreas(self, deviceName, values):
        """Store forced values on a device and write its whole group out.

        ``values`` are paired with the device's output areas (master) or input
        areas (slave). Afterwards the force values of every device in the same
        group are serialized and sent with a single ``writeMultipleRegister``
        call starting at address 0.

        Raises:
            ValueError: if the device is unknown or its protocol/role pair
                names no known group.
        """
        device = self.getDevice(deviceName)
        if device is None:
            raise ValueError(f"unknown device: {deviceName!r}")
        curProDict, modbusM = self._resolveChannel(device.type, device.masterOrSlave)

        isMaster = device.masterOrSlave == "主站"
        areas = device.outputAreas if isMaster else device.inputAreas
        # Digital values of PA devices and of DP masters get their two halves
        # swapped (elements from index 8 on first, then the first 8).
        swapHalves = device.type == 'PA' or (device.type == 'DP' and isMaster)

        for area, value in zip(areas, values):
            if swapHalves and area.type in ['DO', 'DI']:
                area.forceValue = value[8:] + value[:8]
            else:
                area.forceValue = value

        payload = b""
        for member in curProDict.values():
            forceAreas = member.outputAreas if member.masterOrSlave == "主站" else member.inputAreas
            for area in forceAreas:
                if area.type in ["AI", "AO"]:
                    payload += floatToBytes(area.forceValue, area.bytes, order = area.order)
                elif area.type in ["DI", "DO"]:
                    payload += coilsToBytes([int(x) for x in area.forceValue], area.bytes)
            # Pad after each device so the payload is a whole number of
            # 16-bit registers.
            if len(payload) % 2 != 0:
                payload += struct.pack('B', 0)

        registers = struct.unpack('!' + 'H' * (len(payload) // 2), payload)
        modbusM.writeMultipleRegister(slaveId = 1, address = 0, outputValue = registers)

    def readAreas(self):
        """Read every non-empty group and refresh ``currentValue`` of its areas.

        Master groups refresh their input areas, slave groups their output
        areas. Analog areas are decoded as 4-byte floats rounded to 4 decimals;
        digital areas are decoded with ``bytesToCoils``.
        """
        channels = (
            (self.paMasterDevices, 'input', self.paMasterModbus),
            (self.paSlaveDevices, 'output', self.paSlaveModbus),
            (self.dpMasterDevices, 'input', self.dpMasterModbus),
            (self.dpSlaveDevices, 'output', self.dpSlaveModbus),
        )
        for curProDict, areaType, modbusM in channels:
            if len(curProDict) == 0:
                continue

            if areaType == 'input':
                bytesNums = max(x.inputEndAddress for x in curProDict.values())
            else:
                bytesNums = max(x.outputEndAddress for x in curProDict.values())
            if bytesNums == 0:
                continue

            # Two bytes per register, rounded up.
            intNums = (bytesNums + 1) // 2
            intValues = modbusM.readHoldingRegisters(slaveId = 1, startAddress = 0, varNums = intNums)
            bytesValues = struct.pack(f"!{'H' * len(intValues)}", *intValues)

            for device in curProDict.values():
                targetAreas = device.inputAreas if areaType == 'input' else device.outputAreas
                for area in targetAreas:
                    if area.startAddress == 0:
                        raw = bytesValues[area.startAddress:area.endAddress]
                    else:
                        raw = bytesValues[area.startAddress - 1:area.endAddress]

                    if area.type in ['AI', 'AO']:
                        # Only complete 4-byte chunks are decoded.
                        for offset in range(0, len(raw) - 3, 4):
                            # The value slot is the chunk number, not the
                            # byte offset.
                            slot = offset // 4
                            if slot >= len(area.currentValue):
                                break
                            chunk = raw[offset:offset + 4]
                            area.currentValue[slot] = round(
                                struct.unpack('!f', reorderBytes(chunk, area.order))[0], 4
                            )
                    elif area.type in ['DI', 'DO']:
                        # Bytes are reversed for everything except DP masters.
                        if not (device.masterOrSlave == '主站' and device.type == 'DP'):
                            raw = raw[::-1]
                        area.currentValue = bytesToCoils(raw)

    @classmethod
    def addAreas(cls, type, order, bytes, deviceName):
        """Append an area entry to the device's stored area JSON.

        The new entry gets id 1 when nothing is stored yet, otherwise the last
        stored id plus one.
        """
        stored = DeviceDB.getByName(deviceName=deviceName).areaJson
        jsonCon = [] if stored is None else json.loads(stored)
        nextId = jsonCon[-1]["id"] + 1 if jsonCon else 1
        jsonCon.append({
            "id": nextId,
            "type": type,
            "order": order,
            "bytes": bytes,
        })
        areaJson = json.dumps(jsonCon)
        DeviceDB.update(areaJson=areaJson).where(DeviceDB.deviceName == deviceName).execute()

    @classmethod
    def updataAreas(cls, type, order, bytes, deviceName, index):
        """Overwrite type/order/bytes of the stored area whose id is ``index``.

        Returns:
            False when the device or its area JSON does not exist, otherwise
            None.
        """
        record = DeviceDB.getByName(deviceName=deviceName)
        if record is None or record.areaJson is None:
            return False
        jsonCon = json.loads(record.areaJson)
        for area in jsonCon:
            if index == area["id"]:
                area["type"] = type
                area["order"] = order
                area["bytes"] = bytes
        areaJson = json.dumps(jsonCon)
        DeviceDB.update(areaJson=areaJson).where(DeviceDB.deviceName == deviceName).execute()

    @classmethod
    def getAreaID(cls, deviceNames):
        """Return the ids of all stored areas of a device, or None if none."""
        stored = DeviceDB.getByName(deviceName=deviceNames).areaJson
        if stored is None:
            return
        return [entry["id"] for entry in json.loads(stored)]

    @classmethod
    def getChannelLength(cls, deviceName):
        """Return the sum of the ``nums`` fields of a device's stored areas.

        Entries without a ``nums`` key count as 0 (entries written by
        ``addAreas`` do not carry one). Returns 0 when nothing is stored.
        """
        stored = DeviceDB.getByName(deviceName=deviceName).areaJson
        if stored is None:
            return 0
        return sum(int(entry.get('nums', 0)) for entry in json.loads(stored))

    def editDevies(self):
        """Placeholder; does nothing."""
        pass

    @classmethod
    def getMasterSlaveModel(cls, deviceName):
        """Return the stored master/slave role, or None when it is empty."""
        masterSlaveModel = DeviceDB.getByName(deviceName=deviceName).masterSlaveModel
        if masterSlaveModel:
            return masterSlaveModel

    @classmethod
    def getAllDevice(cls):
        """Return ``[name, proType, masterSlaveModel, areaJson]`` per device.

        Returns None when ``DeviceDB.get_all()`` yields the string 'error'.
        """
        devices = DeviceDB.get_all()
        # Compare by value; the isinstance check keeps objects that overload
        # ``==`` from producing a truthy non-bool result.
        if isinstance(devices, str) and devices == 'error':
            return
        return [[x.deviceName, x.proType, x.masterSlaveModel, x.areaJson] for x in devices]