0605更新

main
zcwBit 1 year ago
parent de2cc5fd56
commit dc48ede172

@ -3,9 +3,10 @@ import qtawesome
from PyQt5.QtWidgets import QPushButton,QStackedWidget, QLineEdit, QVBoxLayout, QHBoxLayout, QWidget, QLabel, QSplitter, QButtonGroup from PyQt5.QtWidgets import QPushButton,QStackedWidget, QLineEdit, QVBoxLayout, QHBoxLayout, QWidget, QLabel, QSplitter, QButtonGroup
from utils.DBModels.DeviceParModels import * from utils.DBModels.DeviceParModels import *
from UI.BlockParameterView import PressureTBlockView, AIFunctionBlockView, PhysicalBlockView from UI.BlockParameterView import ParmView
from UI.SearchAddressWidget import SearchAddressWidget from UI.SearchAddressWidget import SearchAddressWidget
from utils.DBModels.DeviceParModels import *
class BlockParameterManageWidget(QWidget): class BlockParameterManageWidget(QWidget):
@ -15,9 +16,9 @@ class BlockParameterManageWidget(QWidget):
def initUI(self): def initUI(self):
self.pressureTBlockView = PressureTBlockView() self.pressureTBlockView = ParmView(PressureTranslationBlock)
self.aiFunctionBlockView = AIFunctionBlockView() self.aiFunctionBlockView = ParmView(AIFunctionBlock)
self.physicalBlockView = PhysicalBlockView() self.physicalBlockView = ParmView(PhysicalBlock)
self.mainlayout = QVBoxLayout() self.mainlayout = QVBoxLayout()
self.parameStackWidget = QStackedWidget() self.parameStackWidget = QStackedWidget()

@ -22,12 +22,13 @@ class ParamsVHeader(QHeaderView):
return return
class VarTableView(QTableView): class ParmView(QTableView):
def __init__(self, parent=None): def __init__(self, dbModel = None):
super(VarTableView, self).__init__(parent) super().__init__()
self.parent = parent self.dbModel = dbModel
self.setHeader() self.setHeader()
self.setupUi() self.setupUi()
self.setData()
def setHeader(self): def setHeader(self):
self.setItemDelegateForColumn(6, VarButtonDelegate(self)) self.setItemDelegateForColumn(6, VarButtonDelegate(self))
@ -72,19 +73,9 @@ class VarTableView(QTableView):
columnWidth = int(width * (ratio / totalRatio)) columnWidth = int(width * (ratio / totalRatio))
self.setColumnWidth(i, columnWidth) self.setColumnWidth(i, columnWidth)
def setData(self):
# self.datas = PressureTranslationBlock.getallParame()
self.datas = self.dbModel.getallParame()
class PressureTBlockView(VarTableView):
def __init__(self, datas = None):
super().__init__()
self.datas = datas
self.initUI()
def initUI(self):
if not self.datas:
self.datas = PressureTranslationBlock.getallParame()
for index, data in enumerate(self.datas): for index, data in enumerate(self.datas):
desc = data[2].replace('\r\n', '').replace('\n', '') desc = data[2].replace('\r\n', '').replace('\n', '')
lines = [desc[i:i+50] + "\r\n" for i in range(0, len(desc), 50)] lines = [desc[i:i+50] + "\r\n" for i in range(0, len(desc), 50)]
@ -96,18 +87,9 @@ class PressureTBlockView(VarTableView):
self.setVerticalHeader(ParamsVHeader(self, self.model.datas)) self.setVerticalHeader(ParamsVHeader(self, self.model.datas))
class AIFunctionBlockView(PressureTBlockView):
def __init__(self):
self.datas = AIFunctionBlock.getallParame()
super().__init__(self.datas)
class PhysicalBlockView(PressureTBlockView):
def __init__(self):
self.datas = PhysicalBlock.getallParame()
super().__init__(self.datas)

@ -1,36 +0,0 @@
import json
import collections
import time
from protocol.ModBus.TCPMaster import TcpMaster
from utils.DBModels.DeviceModels import DeviceDB
# from utils.DBModels.ClientModels import DeviceDB
class Area():
startAddress = None
endAddress = None
bytes = None
type = None
addressList = []
order = 'ABCD'
nums = 0
forceValue = [0]
currentValue = [0] * nums
def __init__(self):
pass

@ -0,0 +1,26 @@
from enum import Enum
from utils.DBModels.DeviceParModels import *
from protocol.ModBus.DPV1Master import DPV1Master
class BlockType(Enum):
PB = 0
TB = 1
FB = 2
class BlockManage():
address = 66
def __init__(self):
self.DPV1Master = DPV1Master('192.168.3.10', 502)
def readDir(self):
dirHeadBytes = self.DPV1Master.readParm(address = self.address, slot = 1, index = 0, length = 12)
class Block():
slot = None
startIndex = None
def __init__(self, blockType):
self.parms = []
self.blockType = blockType
def addParms(self, data):
pass

@ -1,7 +1,6 @@
import collections import collections
import json import json
from utils.DBModels.DeviceModels import DeviceDB from utils.DBModels.DeviceModels import DeviceDB
from model.ProjectModel.AreaManage import Area
import numpy as np import numpy as np
from protocol.ModBus.ByteOrder import * from protocol.ModBus.ByteOrder import *
from protocol.ModBus.TCPMaster import * from protocol.ModBus.TCPMaster import *
@ -9,6 +8,20 @@ import struct
import time import time
#从站 "AI" "DI"可强制 #从站 "AI" "DI"可强制
#主站 "AO" "DO"可强制 #主站 "AO" "DO"可强制
class Area():
startAddress = None
endAddress = None
bytes = None
type = None
addressList = []
order = 'ABCD'
nums = 0
forceValue = [0]
currentValue = [0] * nums
def __init__(self):
pass
class Device(): class Device():
inputStartAddress = 0 inputStartAddress = 0
inputEndAddress = 0 inputEndAddress = 0

@ -1,5 +1,4 @@
import re import re
from AreaManage import Area
from DeviceManage import Device from DeviceManage import Device
def importProject(): def importProject():

@ -0,0 +1,12 @@
class Parm():
name = None
value = None
slot = None
index = None
type = None
size = None
desc = None
def __init__(self):
pass

@ -0,0 +1,136 @@
import modbus_tk
import modbus_tk.defines as cst
from modbus_tk import modbus_tcp, hooks
import logging
import struct
import threading
import time
# from protocol.ModBus.ByteOrder import *
class DPV1Master():
resetData = struct.unpack('>hhhh', b'\x00\x00\x00\x00\x00\x00\x00\x00')
def __init__(self, host, port):
try:
self.master = modbus_tcp.TcpMaster(host = host, port = port)
self.master.set_timeout(5.0)
except Exception as e:
pass
# hooks.install_hook("modbus_tcp.TcpMaster.after_recv", afterRecv)
# hooks.install_hook("modbus_tcp.TcpMaster.after_send", afterSend)
def writeMultipleRegister(self, slaveId, address, outputValue):
try:
self.master.execute(slaveId, cst.WRITE_MULTIPLE_REGISTERS, starting_address = address, output_value=outputValue)
except Exception as e:
return 'error'
def readHoldingRegisters(self, slaveId, startAddress, varNums):
try:
value = self.master.execute(slaveId, cst.READ_HOLDING_REGISTERS, startAddress, varNums)
return value
except:
return 'error'
def areAllZeros(self, tup):
return all(x == 0 for x in tup)
def searchMaster(self, callback = None):
def search():
addressList = []
for address in range(1, 126):
hexAddress = address.to_bytes(1, byteorder='little')
searchByteStream = b'\x01' + hexAddress + b'\x01\x01\xF0\x00'
searchDate = struct.unpack('>hhh', searchByteStream)
self.writeMultipleRegister(1, 750, searchDate)
time.sleep(0.3)
dir = self.readHoldingRegisters(1, 750, 10)
if not self.areAllZeros(dir):
addressList.append(address)
self.writeMultipleRegister(1, 750, self.resetData)
time.sleep(0.1)
callback(addressList)
t = threading.Thread(target=search)
t.start()
def readParm(self, address, slot, index, length, callback = None):
hexAddress = address.to_bytes(1, byteorder='little')
slotByte = slot.to_bytes(1, byteorder='little')
indexByte = index.to_bytes(1, byteorder='little')
readByteStream = b'\x01' + hexAddress + slotByte + indexByte + b'\xF0\x00'
searchDate = struct.unpack('>hhh', readByteStream)
self.writeMultipleRegister(1, 750, searchDate)
time.sleep(0.3)
value = self.readHoldingRegisters(1, 750, -(-length // 2)) # -(-length // 2)向上取整
self.writeMultipleRegister(1, 750, self.resetData)
if callable(callback):
callback(value)
# def readInputRegisters(self, slaveId, startAddress, varNums, order = 'ABCD'):
# try:
# if order == 'int':
# valueByte = self.master.execute(slaveId, cst.READ_INPUT_REGISTERS, startAddress, varNums)[0]
# else:
# value = self.master.execute(slaveId, cst.READ_INPUT_REGISTERS, startAddress, 2)
# if order == 'ABCD': # 大端模式
# valueByte = ABCDToFloat(value)
# elif order == 'DCBA': # 小端模式
# valueByte = DCBAToFloat(value)
# elif order == 'BADC':
# valueByte = BADCToFloat(value)
# elif order == 'CDAB':
# valueByte = CDABToFloat(value)
# return valueByte
# except Exception as e:
# return 'error'
# def readCoils(self, slaveId, startAddress, varNums, order = 'ABCD'):
# try:
# value = self.master.execute(slaveId, cst.READ_COILS, startAddress, varNums)
# return value[0]
# except Exception as e:
# return 'error'
# def readInputCoils(self, slaveId, startAddress, varNums):
# print(slaveId, startAddress, varNums)
# try:
# value = self.master.execute(slaveId, cst.READ_DISCRETE_INPUTS, startAddress, varNums)
# return value[0]
# except:
# return 'error'
#
# print(cst.READ_HOLDING_REGISTERS, cst.READ_COILS, cst.WRITE_SINGLE_REGISTER, cst.WRITE_SINGLE_COIL)
# self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, 0, output_value=[1, 2, 3, 4, 5])
# self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=5, output_value=[3.0], data_format='>f')
# self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=6, output_value=[4.0], data_format='>f')
# self.master.execute(2, cst.WRITE_SINGLE_COIL, 2, output_value=1)
# self.master.execute(2, cst.WRITE_MULTIPLE_COILS, 0, output_value=[1, 1, 0, 1, 1, 0, 1, 1])
# logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 2, data_format='f'))
# Read and write floats
# send some queries
# logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 100, 12))
if __name__ == "__main__":
tcpMaster = DPV1Master('127.0.0.1', 502)
# tcpMaster.readHoldingRegisters(1, 6, 2)
# tcpMaster.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=0, output_value=[1,2,3,4,5])
#
# master = modbus_tcp.TcpMaster(host="localhost", port=502)
# values = [6]
# master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=0, output_value=values)
Loading…
Cancel
Save