from pythonnet import load load('coreclr') import clr # 与其他模块不同此处不直接导入pythonnet 而是导入clr(python与c#交互核心) dll_path = 'protocol\\FF\\NationalInstruments.Fieldbus' # 为了方便理解这里给到dll所在的绝对路径且包含dll文件本身 clr.AddReference(dll_path) #加载c#库 clr.AddReference('System.Collections') clr.AddReference('System') from NationalInstruments.FieldBus import Session, FBDate from NationalInstruments.FieldBus.ParameterHandler import Parameter, ParameterType from NationalInstruments.FieldBus.ElementHandler import * from System.Collections.Generic import * from System import * # import numpy as np # session = Session() # session.Open() # link = session.Links[0] # link.Open() # print([x.Tag for x in link.Devices]) # dev = link.Devices[1] # VFD = dev.GetVfdByOrdinalNumber(0) # VFD.Open() # blocks = VFD.Blocks # print([x.Tag for x in blocks]) # block = blocks[1] # block.Open() # Params = block.Parameters # print([x.Type.ToString() + ' ' + x.Name for x in Params]) # print(byte(10)) class ParamItem(object): def __init__(self, param): self.param = param self.name = param.Name self.type = param.Type.ToString() self.elemDict = {} match(param.Type): case ParameterType.ODT_SIMPLEVAR: self.elements = [param.Element] case ParameterType.ODT_ARRAY: self.elements = param.Elements case ParameterType.ODT_RECORD: self.elements = param.Elements def getElements(self): for elem in self.elements: self.elemDict[elem.Name] = [elem.Type.ToString(), elem.ValueString(), elem] return self.elemDict class FFProtocol(object): """docstring for FF""" paramDict = {} def __init__(self): pass def open(self): self.session = Session() self.session.Open() self.link = self.session.Links[0] self.link.Open() def getDevices(self): self.devices = [x for x in self.link.Devices][1:] for device in self.devices: device.Open() VFD = device.GetVfdByOrdinalNumber(0) VFD.Open() return [x.Tag for x in self.devices] def getBlocks(self, devTag): device = self.link.GetDeviceByTag(devTag) VFD = device.GetVfdByOrdinalNumber(0) blocks = [x for x in VFD.Blocks] for block in blocks: block.Open() return [x.Tag for x in blocks] def getParams(self, devTag, blockTag): device = self.link.GetDeviceByTag(devTag) VFD = device.GetVfdByOrdinalNumber(0) block = VFD.GetBlockByTag(blockTag) Params = block.Parameters paramList = [] for param in Params: paramList.append(ParamItem(param = param)) if devTag not in self.paramDict: self.paramDict[devTag] = {blockTag : paramList} else: self.paramDict[devTag][blockTag] = paramList # print(self.paramDict) return paramList def refreshParams(self, reParams = None): if reParams: for param in reParams: param.Read() # print(reParams) # for device, blockDict in self.paramDict.items(): # for block, paramList in blockDict.items(): # else: # param.param.Read() # for elem in param.elements: # return elem.get_Value() def writeValue(self, param, elem, value): param.Read() match(elem.Type): case ElementType.FF_BOOLEAN: if value in [True, False]: elem.Value = value elif value in ['True', 'true']: elem.Value = True elif value in ['False', 'false']: elem.Value = False case ElementType.FF_INTEGER8 | ElementType.FF_INTEGER16: value = int(value) elem.Value = Int16(value) case ElementType.FF_INTEGER32 | ElementType.FF_UNSIGNED16: value = int(value) elem.Value = Int32(value) case ElementType.FF_UNSIGNED8: value = int(value) elem.Value = Byte(value) case ElementType.FF_UNSIGNED32: value = int(value) elem.Value = Int64(value) case ElementType.FF_FLOAT: value = float(value) elem.Value = Single(value) case ElementType.FF_VISIBLE_STRING: value = str(value) elem.Value = value case ElementType.FF_OCTET_STRING: value = str(value) if len(value) % 2 != 0: return startIndex = 0 # byteArray = [x for x in range(int(len(value) / 2))] byteArray = Array.CreateInstance(Byte, int(len(value) / 2)) for i in range(int(len(value) / 2)): varStr = value[startIndex:startIndex + 2] startIndex += 2 byteArray[i] = Byte(int(varStr)) elem.Value = byteArray case ElementType.FF_DATE: value = str(value) valueList = [int(x) for x in value.split(':')] elem.Value.Year = Byte(valueList[0]) elem.Value.Month = Byte(valueList[1]) elem.Value.DayOfMonth =Byte(valueList[2]) elem.Value.Hour = Byte(valueList[3]) elem.Value.Minute = Byte(valueList[4]) elem.Value.Millisecond = int(valueList[5]) case ElementType.FF_TIMEOFDAY | ElementType.FF_TIME_DIFF: value = str(value) valueList = [int(x) for x in value.split(':')] elem.Value.Day = int(valueList[0]) elem.Value.Millisecond = Int64(valueList[1]) case ElementType.FF_BIT_STRING: value = str(value) for s in value: if s not in ['0', '1']: return if len(value) % 8 != 0: return j = -1 byteArray = Array.CreateInstance(Byte, int(len(value) / 8)) for i in range(len(value)): if i % 8 == 0: j += 1 if value[i] == '1': # print(type(byteArray[j])) byteArray[j] += 1<<(7-(i % 8)) byteArray[j] = Byte(byteArray[j]) elem.Value = byteArray case ElementType.FF_TIME_VALUE: value = str(value) valueList = [int(x) for x in value.split(':')] elem.Value.Lower = Int64(valueList[0]) elem.Value.Upper = Int64(valueList[1]) param.Write() def close(self): for device in self.devices: device.Close() self.link.Close() self.session.Close() if __name__ == '__main__': a = FFProtocol() a.open() devs = a.getDevices() blocks = a.getBlocks(devs[0]) par = a.getParams(devs[0], blocks[0])[7] ele1 = par.getElements()['VALUE_1'][2] ele2 = par.getElements()['VALUE_2'][2] ele3 = par.getElements()['VALUE_3'][2] ele4 = par.getElements()['VALUE_4'][2] ele5 = par.getElements()['VALUE_5'][2] ele6 = par.getElements()['VALUE_6'][2] ele7 = par.getElements()['VALUE_7'][2] ele8 = par.getElements()['VALUE_8'][2] ele9 = par.getElements()['VALUE_9'][2] ele10 = par.getElements()['VALUE_10'][2] ele11 = par.getElements()['VALUE_11'][2] ele12 = par.getElements()['VALUE_12'][2] ele13 = par.getElements()['VALUE_13'][2] ele14 = par.getElements()['VALUE_14'][2] ele15 = par.getElements()['VALUE_15'][2] a.writeValue(par.param, ele1, True) a.writeValue(par.param, ele2, 1) a.writeValue(par.param, ele3, 2) a.writeValue(par.param, ele4, 3) a.writeValue(par.param, ele5, 4) a.writeValue(par.param, ele6, 5) a.writeValue(par.param, ele7, 6) a.writeValue(par.param, ele8, 1.1) a.writeValue(par.param, ele9, 'test') a.writeValue(par.param, ele10, '9999494949494949494949494949494949494949494949494949494949494949') a.writeValue(par.param, ele11, '23:02:18:14:49:50') a.writeValue(par.param, ele12, '18:59') a.writeValue(par.param, ele13, '25:60') a.writeValue(par.param, ele14, '1111000000001111') a.writeValue(par.param, ele15, '55:55') a.close()