|
|
from pythonnet import load
|
|
|
load('coreclr')
|
|
|
import clr # 与其他模块不同此处不直接导入pythonnet 而是导入clr(python与c#交互核心)
|
|
|
dll_path = 'protocol\\FF\\NationalInstruments.Fieldbus' # 为了方便理解这里给到dll所在的绝对路径且包含dll文件本身
|
|
|
clr.AddReference(dll_path) #加载c#库
|
|
|
clr.AddReference('System.Collections')
|
|
|
clr.AddReference('System')
|
|
|
from NationalInstruments.FieldBus import Session, FBDate
|
|
|
from NationalInstruments.FieldBus.ParameterHandler import Parameter, ParameterType
|
|
|
from NationalInstruments.FieldBus.ElementHandler import *
|
|
|
from System.Collections.Generic import *
|
|
|
from System import *
|
|
|
# import numpy as np
|
|
|
|
|
|
|
|
|
# session = Session()
|
|
|
# session.Open()
|
|
|
# link = session.Links[0]
|
|
|
# link.Open()
|
|
|
# print([x.Tag for x in link.Devices])
|
|
|
# dev = link.Devices[1]
|
|
|
# VFD = dev.GetVfdByOrdinalNumber(0)
|
|
|
# VFD.Open()
|
|
|
# blocks = VFD.Blocks
|
|
|
# print([x.Tag for x in blocks])
|
|
|
# block = blocks[1]
|
|
|
# block.Open()
|
|
|
# Params = block.Parameters
|
|
|
# print([x.Type.ToString() + ' ' + x.Name for x in Params])
|
|
|
# print(byte(10))
|
|
|
class ParamItem(object):
|
|
|
def __init__(self, param):
|
|
|
self.param = param
|
|
|
self.name = param.Name
|
|
|
self.type = param.Type.ToString()
|
|
|
self.elemDict = {}
|
|
|
match(param.Type):
|
|
|
case ParameterType.ODT_SIMPLEVAR:
|
|
|
self.elements = [param.Element]
|
|
|
case ParameterType.ODT_ARRAY:
|
|
|
self.elements = param.Elements
|
|
|
case ParameterType.ODT_RECORD:
|
|
|
self.elements = param.Elements
|
|
|
|
|
|
def getElements(self):
|
|
|
for elem in self.elements:
|
|
|
self.elemDict[elem.Name] = [elem.Type.ToString(), elem.ValueString(), elem]
|
|
|
return self.elemDict
|
|
|
|
|
|
|
|
|
class FFProtocol(object):
|
|
|
"""docstring for FF"""
|
|
|
paramDict = {}
|
|
|
def __init__(self):
|
|
|
pass
|
|
|
|
|
|
def open(self):
|
|
|
self.session = Session()
|
|
|
self.session.Open()
|
|
|
self.link = self.session.Links[0]
|
|
|
self.link.Open()
|
|
|
|
|
|
|
|
|
def getDevices(self):
|
|
|
self.devices = [x for x in self.link.Devices][1:]
|
|
|
for device in self.devices:
|
|
|
device.Open()
|
|
|
VFD = device.GetVfdByOrdinalNumber(0)
|
|
|
VFD.Open()
|
|
|
return [x.Tag for x in self.devices]
|
|
|
|
|
|
def getBlocks(self, devTag):
|
|
|
device = self.link.GetDeviceByTag(devTag)
|
|
|
VFD = device.GetVfdByOrdinalNumber(0)
|
|
|
blocks = [x for x in VFD.Blocks]
|
|
|
for block in blocks:
|
|
|
block.Open()
|
|
|
return [x.Tag for x in blocks]
|
|
|
|
|
|
def getParams(self, devTag, blockTag):
|
|
|
device = self.link.GetDeviceByTag(devTag)
|
|
|
VFD = device.GetVfdByOrdinalNumber(0)
|
|
|
block = VFD.GetBlockByTag(blockTag)
|
|
|
Params = block.Parameters
|
|
|
paramList = []
|
|
|
for param in Params:
|
|
|
paramList.append(ParamItem(param = param))
|
|
|
if devTag not in self.paramDict:
|
|
|
self.paramDict[devTag] = {blockTag : paramList}
|
|
|
else:
|
|
|
self.paramDict[devTag][blockTag] = paramList
|
|
|
# print(self.paramDict)
|
|
|
return paramList
|
|
|
|
|
|
def refreshParams(self, reParams = None):
|
|
|
if reParams:
|
|
|
for param in reParams:
|
|
|
param.Read()
|
|
|
# print(reParams)
|
|
|
# for device, blockDict in self.paramDict.items():
|
|
|
# for block, paramList in blockDict.items():
|
|
|
# else:
|
|
|
# param.param.Read()
|
|
|
# for elem in param.elements:
|
|
|
# return elem.get_Value()
|
|
|
|
|
|
def writeValue(self, param, elem, value):
|
|
|
param.Read()
|
|
|
match(elem.Type):
|
|
|
case ElementType.FF_BOOLEAN:
|
|
|
if value in [True, False]:
|
|
|
elem.Value = value
|
|
|
elif value in ['True', 'true']:
|
|
|
elem.Value = True
|
|
|
elif value in ['False', 'false']:
|
|
|
elem.Value = False
|
|
|
case ElementType.FF_INTEGER8 | ElementType.FF_INTEGER16:
|
|
|
value = int(value)
|
|
|
elem.Value = Int16(value)
|
|
|
case ElementType.FF_INTEGER32 | ElementType.FF_UNSIGNED16:
|
|
|
value = int(value)
|
|
|
elem.Value = Int32(value)
|
|
|
case ElementType.FF_UNSIGNED8:
|
|
|
value = int(value)
|
|
|
elem.Value = Byte(value)
|
|
|
case ElementType.FF_UNSIGNED32:
|
|
|
value = int(value)
|
|
|
elem.Value = Int64(value)
|
|
|
case ElementType.FF_FLOAT:
|
|
|
value = float(value)
|
|
|
elem.Value = Single(value)
|
|
|
case ElementType.FF_VISIBLE_STRING:
|
|
|
value = str(value)
|
|
|
elem.Value = value
|
|
|
case ElementType.FF_OCTET_STRING:
|
|
|
value = str(value)
|
|
|
if len(value) % 2 != 0:
|
|
|
return
|
|
|
startIndex = 0
|
|
|
# byteArray = [x for x in range(int(len(value) / 2))]
|
|
|
byteArray = Array.CreateInstance(Byte, int(len(value) / 2))
|
|
|
for i in range(int(len(value) / 2)):
|
|
|
varStr = value[startIndex:startIndex + 2]
|
|
|
startIndex += 2
|
|
|
byteArray[i] = Byte(int(varStr))
|
|
|
elem.Value = byteArray
|
|
|
case ElementType.FF_DATE:
|
|
|
value = str(value)
|
|
|
valueList = [int(x) for x in value.split(':')]
|
|
|
elem.Value.Year = Byte(valueList[0])
|
|
|
elem.Value.Month = Byte(valueList[1])
|
|
|
elem.Value.DayOfMonth =Byte(valueList[2])
|
|
|
elem.Value.Hour = Byte(valueList[3])
|
|
|
elem.Value.Minute = Byte(valueList[4])
|
|
|
elem.Value.Millisecond = int(valueList[5])
|
|
|
case ElementType.FF_TIMEOFDAY | ElementType.FF_TIME_DIFF:
|
|
|
value = str(value)
|
|
|
valueList = [int(x) for x in value.split(':')]
|
|
|
elem.Value.Day = int(valueList[0])
|
|
|
elem.Value.Millisecond = Int64(valueList[1])
|
|
|
case ElementType.FF_BIT_STRING:
|
|
|
value = str(value)
|
|
|
for s in value:
|
|
|
if s not in ['0', '1']:
|
|
|
return
|
|
|
if len(value) % 8 != 0:
|
|
|
return
|
|
|
j = -1
|
|
|
byteArray = Array.CreateInstance(Byte, int(len(value) / 8))
|
|
|
for i in range(len(value)):
|
|
|
if i % 8 == 0:
|
|
|
j += 1
|
|
|
if value[i] == '1':
|
|
|
# print(type(byteArray[j]))
|
|
|
byteArray[j] += 1<<(7-(i % 8))
|
|
|
byteArray[j] = Byte(byteArray[j])
|
|
|
elem.Value = byteArray
|
|
|
case ElementType.FF_TIME_VALUE:
|
|
|
value = str(value)
|
|
|
valueList = [int(x) for x in value.split(':')]
|
|
|
elem.Value.Lower = Int64(valueList[0])
|
|
|
elem.Value.Upper = Int64(valueList[1])
|
|
|
|
|
|
param.Write()
|
|
|
|
|
|
|
|
|
|
|
|
def close(self):
|
|
|
for device in self.devices:
|
|
|
device.Close()
|
|
|
self.link.Close()
|
|
|
self.session.Close()
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
a = FFProtocol()
|
|
|
a.open()
|
|
|
devs = a.getDevices()
|
|
|
blocks = a.getBlocks(devs[0])
|
|
|
|
|
|
par = a.getParams(devs[0], blocks[0])[7]
|
|
|
|
|
|
ele1 = par.getElements()['VALUE_1'][2]
|
|
|
ele2 = par.getElements()['VALUE_2'][2]
|
|
|
ele3 = par.getElements()['VALUE_3'][2]
|
|
|
ele4 = par.getElements()['VALUE_4'][2]
|
|
|
ele5 = par.getElements()['VALUE_5'][2]
|
|
|
ele6 = par.getElements()['VALUE_6'][2]
|
|
|
ele7 = par.getElements()['VALUE_7'][2]
|
|
|
ele8 = par.getElements()['VALUE_8'][2]
|
|
|
ele9 = par.getElements()['VALUE_9'][2]
|
|
|
ele10 = par.getElements()['VALUE_10'][2]
|
|
|
ele11 = par.getElements()['VALUE_11'][2]
|
|
|
ele12 = par.getElements()['VALUE_12'][2]
|
|
|
ele13 = par.getElements()['VALUE_13'][2]
|
|
|
ele14 = par.getElements()['VALUE_14'][2]
|
|
|
ele15 = par.getElements()['VALUE_15'][2]
|
|
|
|
|
|
a.writeValue(par.param, ele1, True)
|
|
|
a.writeValue(par.param, ele2, 1)
|
|
|
a.writeValue(par.param, ele3, 2)
|
|
|
a.writeValue(par.param, ele4, 3)
|
|
|
a.writeValue(par.param, ele5, 4)
|
|
|
a.writeValue(par.param, ele6, 5)
|
|
|
a.writeValue(par.param, ele7, 6)
|
|
|
a.writeValue(par.param, ele8, 1.1)
|
|
|
a.writeValue(par.param, ele9, 'test')
|
|
|
a.writeValue(par.param, ele10, '9999494949494949494949494949494949494949494949494949494949494949')
|
|
|
a.writeValue(par.param, ele11, '23:02:18:14:49:50')
|
|
|
a.writeValue(par.param, ele12, '18:59')
|
|
|
a.writeValue(par.param, ele13, '25:60')
|
|
|
a.writeValue(par.param, ele14, '1111000000001111')
|
|
|
a.writeValue(par.param, ele15, '55:55')
|
|
|
|
|
|
|
|
|
a.close() |