0417更新

main
ZHANGXUXU\95193 9 months ago
parent 6734107967
commit 812da46ba0

@ -188,11 +188,11 @@ QPushButton#mesButton:pressed
QPushButton#setButton{
width: 54.75px;
height: 50px;
height: 40px;
background-color: #2277EF;
border-radius: 25px;
border-radius: 20px;
color: #FFFFFF;

@ -1,25 +1,20 @@
import sys
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtCore import QSize, Qt
from PyQt5.QtCore import Qt
from PyQt5.Qt import *
from PyQt5.QtGui import QPixmap, QIcon
from PyQt5.QtWidgets import QHBoxLayout, QAbstractItemView, QVBoxLayout, QSplitter, \
QApplication, QLabel, QGridLayout, QLineEdit, QComboBox, QTextEdit, QCheckBox,QVBoxLayout,QListView, QMainWindow, QStackedWidget, QMessageBox
from PyQt5.QtWidgets import QSizePolicy, QWidget
from PyQt5.QtGui import QPixmap
from PyQt5.QtWidgets import QHBoxLayout, QVBoxLayout, QApplication, QLabel, QLineEdit, QVBoxLayout, QMessageBox, QPushButton
from PyQt5.QtWidgets import QWidget
from ctypes import POINTER, cast
from ctypes.wintypes import MSG
from model.ClientModel.UserAuthorityManage import UserAuthorityManage
from utils import Globals
from PyQt5.QtWinExtras import QtWin
from win32 import win32api, win32gui
from win32.lib import win32con
from windoweffect import WindowEffect, MINMAXINFO, NCCALCSIZE_PARAMS
from model.UserModel.UserManage import UserManage
from Static import static
class LoginWidget(QWidget):
BORDER_WIDTH = 5 #设圆角

@ -1,151 +0,0 @@
from PyQt5.QtCore import (QCoreApplication, Qt)
from PyQt5.QtWidgets import (QGridLayout, QLabel, QLineEdit,
QPushButton, QSizePolicy, QSpacerItem, QWidget, QComboBox, QMessageBox, QSplitter, QListView)
from utils.DBModels.ProtocolModel import TCPSetting
class CustomBox(QComboBox):
def __init__(self):
super(CustomBox, self).__init__()
self.setCursor(Qt.PointingHandCursor)
self.setView(QListView())
def showPopup(self):
super(CustomBox, self).showPopup()
pop = self.children()[1]
pop.move(pop.x(), pop.y() + 51)
class DeviceSearchSettingWidget(QWidget):
BORDER_WIDTH = 5
def __init__(self):
super().__init__()
self.gridLayout_2 = QGridLayout(self)
self.gridLayout_2.setObjectName("gridLayout_2")
self.verticalSpacer = QSpacerItem(20, 40, QSizePolicy.Minimum, QSizePolicy.Expanding)
self.gridLayout_2.addItem(self.verticalSpacer, 0, 1, 3, 12)
self.gridLayout = QGridLayout()
self.gridLayout.setObjectName("gridLayout")
self.label = QLabel(self)
self.label.setObjectName("setlabel")
self.label.setAlignment(Qt.AlignLeading|Qt.AlignLeft|Qt.AlignVCenter)
self.gridLayout.addWidget(self.label, 0, 0, 1, 1)
self.horizontalSpacer_3 = QSpacerItem(40, 20, QSizePolicy.Expanding, QSizePolicy.Minimum)
self.gridLayout.addItem(self.horizontalSpacer_3, 2, 0, 1, 4)
self.horizontalSpacer_4 = QSpacerItem(40, 20, QSizePolicy.Expanding, QSizePolicy.Minimum)
self.gridLayout.addItem(self.horizontalSpacer_4, 5, 0, 1, 4)
self.ipEdit = QLineEdit(self)
self.ipEdit.setObjectName("setEdit")
self.gridLayout.addWidget(self.ipEdit, 1, 0, 1, 3)
self.freEdit = QLineEdit(self)
self.freEdit.setObjectName("setEdit")
self.gridLayout.addWidget(self.freEdit, 7, 0, 1, 3)
self.portEdit = QLineEdit(self)
self.portEdit.setObjectName("setEdit")
self.gridLayout.addWidget(self.portEdit, 4, 0, 1, 3)
self.label_2 = QLabel(self)
self.label_2.setObjectName("setlabel")
self.label_2.setAlignment(Qt.AlignLeading|Qt.AlignLeft|Qt.AlignVCenter)
self.gridLayout.addWidget(self.label_2, 3, 0, 1, 1)
self.label_3 = QLabel(self)
self.label_3.setObjectName("setlabel")
self.label_3.setAlignment(Qt.AlignLeading|Qt.AlignLeft|Qt.AlignVCenter)
self.gridLayout.addWidget(self.label_3, 6, 0, 1, 1)
self.horizontalSpacer_5 = QSpacerItem(40, 20, QSizePolicy.Expanding, QSizePolicy.Minimum)
self.gridLayout.addItem(self.horizontalSpacer_5, 8, 0, 1, 4)
self.offsetBox = CustomBox()
self.offsetBox.addItem("1")
self.offsetBox.addItem("0")
self.offsetBox.setObjectName("setBox")
#self.gridLayout.addWidget(QLabel('起始值'), 6, 0, 1, 1)
self.label_4 = QLabel('起始值')
self.label_4.setObjectName('setlabel')
self.gridLayout.addWidget(self.label_4,9, 0, 1, 2)
self.gridLayout.addWidget(self.offsetBox, 10, 0, 1, 3)
self.gridLayout.addItem(QSpacerItem(40, 20), 11, 0, 1, 4)
self.gridLayout_2.addLayout(self.gridLayout, 3, 5, 2, 2)
self.horizontalSpacer = QSpacerItem(40, 20, QSizePolicy.Expanding, QSizePolicy.Minimum)
self.gridLayout_2.addItem(self.horizontalSpacer, 7, 0, 3, 5)
self.verticalSpacer_2 = QSpacerItem(20, 40, QSizePolicy.Minimum, QSizePolicy.Expanding)
self.gridLayout_2.addItem(self.verticalSpacer_2, 7, 0, 3, 5)
self.saveButton = QPushButton(self)
self.saveButton.setObjectName("setButton")
#self.gridLayout.addWidget(QSplitter(), 11, 0, 1, 5)
self.gridLayout.addWidget(self.saveButton, 12, 0, 1, 3)
self.horizontalSpacer_2 = QSpacerItem(40, 20, QSizePolicy.Expanding, QSizePolicy.Minimum)
self.gridLayout_2.addItem(self.horizontalSpacer_2, 9, 2, 3, 5)
self.label.setText('IP地址')
self.label_2.setText(QCoreApplication.translate("self", u"\u7aef\u53e3\u53f7", None))
self.label_3.setText(QCoreApplication.translate("self", u"\u901a\u8baf\u9891\u7387", None))
self.saveButton.setText(QCoreApplication.translate("self", u"\u4fdd\u5b58", None))
self.saveButton.clicked.connect(self.saveSetting)
def setupUI(self):
pass
# host = CharField()
# port = IntegerField()
# frequency = CharField()
# if self.tcpType == 'master':
# query = TCPSetting.get_by_id(1)
# else:
# query = TCPSetting.get_by_id(2)
# port = str(query.port)
# host = str(query.host)
# frequency = str(query.frequency)
# offset = str(query.offset)
# self.ipEdit.setText(host)
# self.portEdit.setText(port)
# self.freEdit.setText(frequency)
# self.offsetBox.setCurrentText(offset)
def saveSetting(self):
host = self.ipEdit.text()
port = self.portEdit.text()
frequency = self.freEdit.text()
offset = self.offsetBox.currentText()
if self.tcpType == 'master':
TCPSetting.update(host = host, port = port, frequency = frequency, offset = offset).where(TCPSetting.tcpType == 'master').execute()
else:
TCPSetting.update(host = host, port = port, frequency = frequency, offset = offset).where(TCPSetting.tcpType == 'slave').execute()
QMessageBox.information(self, 'Sucess', '保存成功')

@ -136,8 +136,8 @@ class RTUSettingWidget(QWidget):
self.gridLayout_2.addItem(self.verticalSpacer, 0, 1, 3, 12)
self.saveButton = QPushButton(self)
self.saveButton.setObjectName("setButton")
self.saveButton = QPushButton()
self.saveButton.setObjectName('setButton')
self.gridLayout.addWidget(self.saveButton, 15, 1, 1, 1)

@ -0,0 +1,74 @@
from PyQt5.QtCore import Qt
from PyQt5.QtWidgets import QWidget, QComboBox, QListView, QHBoxLayout, QVBoxLayout
from UI.Setting.SearchDeviceWidget import DeviceMasterWidget, DeviceSlaveWidget
class CustomBox(QComboBox):
def __init__(self):
super(CustomBox, self).__init__()
self.setCursor(Qt.PointingHandCursor)
self.setView(QListView())
def showPopup(self):
super(CustomBox, self).showPopup()
pop = self.children()[1]
pop.move(pop.x(), pop.y() + 51)
class RemoteConnectSettingWidget(QWidget):
BORDER_WIDTH = 5
def __init__(self):
super().__init__()
# 主布局改为垂直布局包含ComboBox和内容区
self.mainLayout = QVBoxLayout(self)
self.mainLayout.setObjectName("mainLayout")
# 添加模式选择 ComboBox水平居中
self.modeComboBox = QComboBox()
self.modeComboBox.addItem("主站模式")
self.modeComboBox.addItem("从站模式")
self.modeComboBox.setObjectName("setBox")
self.modeComboBox.currentTextChanged.connect(self.switchMode)
# 将ComboBox放在顶部并水平居中
hboxCombo = QHBoxLayout()
hboxCombo.addStretch() # 左侧弹簧
hboxCombo.addWidget(self.modeComboBox)
hboxCombo.addStretch() # 右侧弹簧
self.mainLayout.addLayout(hboxCombo)
self.contentLayout = QHBoxLayout()
# 存储两种模式的Widget
self.deviceMasterWidget = DeviceMasterWidget() # 主站模式
self.deviceSlavrWidget = DeviceSlaveWidget() # 从站模式
# 初始显示主站模式
self.currentWidget = self.deviceMasterWidget
self.contentLayout.addStretch() # 左侧弹簧
self.contentLayout.addWidget(self.currentWidget)
self.contentLayout.addStretch() # 右侧弹簧
self.mainLayout.addLayout(self.contentLayout)
self.mainLayout.addStretch() # 底部弹簧(将内容向上推)
def switchMode(self, mode):
"""切换主站/从站模式"""
# 移除当前Widget
self.contentLayout.removeWidget(self.currentWidget)
self.currentWidget.hide()
# 根据选择显示对应的Widget
if mode == "主站模式":
self.currentWidget = self.deviceMasterWidget
else:
self.currentWidget = self.deviceSlavrWidget
# 重新添加Widget保持居中
self.contentLayout.insertWidget(1, self.currentWidget) # 插入到两个弹簧中间
self.currentWidget.show()

@ -0,0 +1,359 @@
import sys
import socket
import threading
from PyQt5.QtWidgets import QMainWindow, QVBoxLayout, QPushButton, \
QListWidget, QLabel, QWidget, QMessageBox, QTextEdit
from PyQt5.QtCore import Qt, QThread, pyqtSignal, QObject
from PyQt5.QtNetwork import QNetworkInterface,QAbstractSocket
from datetime import datetime
class TcpClient(object):
def __init__(self):
# 配置信息
self.udpPort = 54321
self.broadcastAddr = '<broadcast>'
def discoverServers(self):
# print("[*] 正在扫描局域网设备...")
udpSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udpSock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
udpSock.settimeout(2)
udpSock.sendto(b"DISCOVERY_REQUEST", (self.broadcastAddr, self.udpPort))
servers = []
try:
while True:
data, addr = udpSock.recvfrom(1024)
if data.startswith(b"DISCOVERY_RESPONSE"):
# 修正点正确解析服务端IP和TCP端口
_, hostname, tcpPort = data.decode('utf-8').split(':')
serverIp = addr # 提取IP字符串
servers.append({
'ip': serverIp, # 存储为字符串
'port': int(tcpPort), # 存储为整数
'hostname': hostname
})
print()
except socket.timeout:
pass
finally:
udpSock.close()
return servers
def connectToServer(self, ip, tcpPort):
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# 显式转换类型确保安全
client.connect((str(ip), int(tcpPort)))
# print(f"[+] 已连接到 {ip}:{tcpPort}")
client.send(b"Hello from client!")
response = client.recv(1024)
# print(f"服务端响应: {response.decode('utf-8')}")
return True
except Exception as e:
print(f"连接失败: {e}")
return False
finally:
client.close()
class DiscoveryThread(QThread):
"""用于在后台执行设备发现的线程"""
discoveryFinished = pyqtSignal(list) # 信号,携带发现的设备列表
def __init__(self, tcpClient):
super().__init__()
self.tcpClient = tcpClient
def run(self):
servers = self.tcpClient.discoverServers()
self.discoveryFinished.emit(servers)
class DeviceMasterWidget(QMainWindow):
def __init__(self):
super().__init__()
self.tcpClient = TcpClient()
self.servers = []
self.initUI()
def initUI(self):
self.setWindowTitle('局域网设备发现')
# self.setGeometry(1000, 500, 400, 300)
# 主窗口部件
centralWidget = QWidget()
self.setCentralWidget(centralWidget)
layout = QVBoxLayout()
self.ipAddresslabel = QLabel(self)
self.ipAddresslabel.setObjectName("setlabel")
self.ipAddresslabel.setAlignment(Qt.AlignLeading|Qt.AlignLeft|Qt.AlignVCenter)
layout.addWidget(self.ipAddresslabel)
# 标题标签
self.titleLabel = QLabel('点击"扫描设备"按钮发现局域网中的设备')
layout.addWidget(self.titleLabel)
# 设备列表
self.deviceList = QListWidget()
layout.addWidget(self.deviceList)
self.scanButton = QPushButton('扫描设备')
self.scanButton.clicked.connect(self.startDiscovery)
layout.addWidget(self.scanButton)
self.scanButton.setObjectName('setButton')
self.connectButton = QPushButton('连接设备')
self.connectButton.clicked.connect(self.connectToDevice)
self.connectButton.setEnabled(False)
self.connectButton.setObjectName('setButton')
layout.addWidget(self.connectButton)
self.ipAddresslabel.setText('当前设备IP地址: ' + self.getLocalIp())
centralWidget.setLayout(layout)
# self.show()
def startDiscovery(self):
"""开始扫描设备"""
self.scanButton.setEnabled(False)
self.titleLabel.setText('正在扫描局域网设备...')
self.deviceList.clear()
# 创建并启动发现线程
self.discoveryThread = DiscoveryThread(self.tcpClient)
self.discoveryThread.discoveryFinished.connect(self.onDiscoveryFinished)
self.discoveryThread.start()
def onDiscoveryFinished(self, servers):
"""设备扫描完成后的回调"""
self.scanButton.setEnabled(True)
self.servers = servers
if not servers:
self.titleLabel.setText('未发现任何设备')
QMessageBox.information(self, '提示', '未发现任何设备')
return
self.titleLabel.setText(f'发现 {len(servers)} 个设备:')
# 在列表中显示设备
for server in servers:
item_text = f"{server['hostname']} ({server['ip'][0]}:{server['port']})"
self.deviceList.addItem(item_text)
self.connectButton.setEnabled(True)
def connectToDevice(self):
"""连接选中的设备"""
selectedIndex = self.deviceList.currentRow()
if selectedIndex == -1:
QMessageBox.warning(self, '警告', '请先选择一个设备')
return
selectedServer = self.servers[selectedIndex]
try:
success = self.tcpClient.connectToServer(selectedServer['ip'][0], selectedServer['port'])
if success:
QMessageBox.information(self, '连接成功',
f"已连接到 {selectedServer['hostname']}")
else:
QMessageBox.critical(self, '连接失败', "连接失败")
except Exception as e:
QMessageBox.critical(self, '连接失败', f"连接失败: {str(e)}")
def getLocalIp(self):
for interface in QNetworkInterface.allInterfaces():
if interface.flags() & QNetworkInterface.IsUp:
for entry in interface.addressEntries():
ip = entry.ip()
# 修改为新的枚举访问方式
if ip.protocol() == QAbstractSocket.IPv4Protocol and not ip.isLoopback():
return ip.toString()
return "192.168.1.1" # 默认回退值
class TcpServer(QObject):
updateSignal = pyqtSignal(str)
def __init__(self):
super().__init__()
# 配置信息
self.tcpPort = 12345 # 服务端TCP监听端口
self.udpPort = 54321 # 设备发现UDP端口
self.broadcastAddr = '<broadcast>' # 广播地址
self.tcpRunning = False
self.udpRunning = False
self.tcpThread = None
self.udpThread = None
self.serverSocket = None
self.udpSocket = None
def log(self, message):
currentTime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logMessage = f"[{currentTime}] {message}"
self.updateSignal.emit(logMessage)
def tcpServer(self):
try:
self.serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.serverSocket.settimeout(1) # 设置1秒超时
self.serverSocket.bind(('0.0.0.0', self.tcpPort))
self.serverSocket.listen(5)
self.log(f"TCP服务端已启动监听端口 {self.tcpPort}")
while self.tcpRunning:
try:
clientSock, addr = self.serverSocket.accept()
self.log(f"接收到来自 {addr} 的连接")
clientThread = threading.Thread(target=self.handleClient, args=(clientSock,))
clientThread.start()
except socket.timeout:
continue # 超时后继续检查运行标志
except:
break
except Exception as e:
self.log(f"TCP服务器错误: {e}")
finally:
if self.serverSocket:
self.serverSocket.close()
self.log("TCP服务端已停止")
def handleClient(self, clientSock):
try:
while True:
data = clientSock.recv(1024)
if not data:
break
self.log(f"收到消息: {data.decode('utf-8')}")
clientSock.send(b"ACK") # 返回确认
except Exception as e:
self.log(f"客户端异常断开: {e}")
finally:
clientSock.close()
def udpDiscoveryServer(self):
try:
self.udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udpSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.udpSocket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.udpSocket.bind(('', self.udpPort))
self.log(f"UDP发现服务已启动监听端口 {self.udpPort}")
while self.udpRunning:
try:
data, addr = self.udpSocket.recvfrom(1024)
if data.decode('utf-8') == "DISCOVERY_REQUEST":
self.log(f"收到来自 {addr} 的发现请求")
response = f"DISCOVERY_RESPONSE:{socket.gethostname()}:{self.tcpPort}"
self.udpSocket.sendto(response.encode('utf-8'), addr)
except:
break
except Exception as e:
self.log(f"UDP服务器错误: {e}")
finally:
if self.udpSocket:
self.udpSocket.close()
self.log("UDP发现服务已停止")
def startTcpServer(self):
if not self.tcpRunning:
self.tcpRunning = True
self.tcpThread = threading.Thread(target=self.tcpServer)
self.tcpThread.start()
def stopTcpServer(self):
if self.tcpRunning:
self.tcpRunning = False
# 创建一个临时连接来解除accept阻塞
try:
tempSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tempSocket.connect(('127.0.0.1', self.tcpPort))
tempSocket.close()
except:
pass
def startUdpServer(self):
if not self.udpRunning:
self.udpRunning = True
self.udpThread = threading.Thread(target=self.udpDiscoveryServer)
self.udpThread.start()
def stopUdpServer(self):
if self.udpRunning:
self.udpRunning = False
# 发送一个空数据包来解除recvfrom阻塞
try:
tempSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
tempSocket.sendto(b'', ('127.0.0.1', self.udpPort))
tempSocket.close()
except:
pass
class DeviceSlaveWidget(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("TCP/UDP 服务器控制")
# self.setGeometry(100, 100, 600, 400)
self.server = TcpServer()
self.server.updateSignal.connect(self.updateLog)
self.initUI()
def initUI(self):
# 主布局
mainLayout = QVBoxLayout()
# 控制按钮
self.tcpButton = QPushButton("启动 TCP 服务器")
self.tcpButton.clicked.connect(self.toggleTcpServer)
self.tcpButton.setObjectName('setButton')
self.udpButton = QPushButton("启动 UDP 发现服务")
self.udpButton.clicked.connect(self.toggleUdpServer)
self.udpButton.setObjectName('setButton')
# 日志显示
self.logDisplay = QTextEdit()
self.logDisplay.setReadOnly(True)
# 添加到布局
mainLayout.addWidget(self.tcpButton)
mainLayout.addWidget(self.udpButton)
mainLayout.addWidget(QLabel("服务器日志:"))
mainLayout.addWidget(self.logDisplay)
# 设置中心部件
container = QWidget()
container.setLayout(mainLayout)
self.setCentralWidget(container)
def toggleTcpServer(self):
if self.server.tcpRunning:
self.server.stopTcpServer()
self.tcpButton.setText("启动 TCP 服务器")
else:
self.server.startTcpServer()
self.tcpButton.setText("停止 TCP 服务器")
def toggleUdpServer(self):
if self.server.udpRunning:
self.server.stopUdpServer()
self.udpButton.setText("启动 UDP 发现服务")
else:
self.server.startUdpServer()
self.udpButton.setText("停止 UDP 发现服务")
def updateLog(self, message):
self.logDisplay.append(message)
def closeEvent(self, event):
# 关闭窗口时停止所有服务
if self.server.tcpRunning:
self.server.stopTcpServer()
if self.server.udpRunning:
self.server.stopUdpServer()
event.accept()

@ -4,7 +4,7 @@ from PyQt5.QtWidgets import (QGridLayout, QLabel, QLineEdit, QGroupBox, QVBoxLay
from UI.Setting.TCPSetting import TCPSettingWidget
from UI.Setting.RTUSetting import RTUSettingWidget
from UI.Setting.DeviceSearchSetting import DeviceSearchSettingWidget
from UI.Setting.RemoteConnectSetting import RemoteConnectSettingWidget
class SettingWidget(QWidget):
def __init__(self):
@ -18,7 +18,7 @@ class SettingWidget(QWidget):
self.tcpSlaveSettingWidget = TCPSettingWidget('slave')
self.rtuMasterSettingWidget = RTUSettingWidget('master')
self.rtuSlaveSettingWidget = RTUSettingWidget('slave')
self.deviceSearchSettingWidget = DeviceSearchSettingWidget()
self.remoteConnectSettingWidget = RemoteConnectSettingWidget()
self.tcpMasterSettingWidget.setupUI()
self.tcpSlaveSettingWidget.setupUI()
@ -32,6 +32,8 @@ class SettingWidget(QWidget):
self.tcpMasterLable.setObjectName('tcpLable')
self.tcpSlaveLable = QLabel('MODBUSTCP从站站设置')
self.tcpSlaveLable.setObjectName('tcpLable')
self.remoteConnectLable = QLabel('远程连接设置')
self.remoteConnectLable.setObjectName('tcpLable')
self.tcpLayout.addWidget(self.tcpMasterLable)
self.tcpLayout.addWidget(self.tcpMasterSettingWidget)
@ -53,11 +55,21 @@ class SettingWidget(QWidget):
self.rtuLayout.addWidget(self.rtuSlaveSettingWidget)
self.rtuGroup.setLayout(self.rtuLayout)
self.remoteConnectGroup = QGroupBox()
self.remoteConnectLayout = QVBoxLayout()
self.remoteConnectLayout.addWidget(self.remoteConnectLable)
self.remoteConnectLayout.addWidget(self.remoteConnectSettingWidget)
self.remoteConnectGroup.setLayout(self.remoteConnectLayout)
# 添加到主布局
self.modbusSettingLayout.addWidget(self.tcpGroup, 0, 0)
self.modbusSettingLayout.addWidget(self.rtuGroup, 0, 1)
self.modbusSettingLayout.addWidget(self.deviceSearchSettingWidget, 0, 2)
self.modbusSettingLayout.addWidget(self.tcpGroup, 0, 0, 1, 1)
self.modbusSettingLayout.addWidget(self.rtuGroup, 0, 1, 1, 1)
self.modbusSettingLayout.addWidget(self.remoteConnectGroup, 0, 2, 1, 1)
# 调整布局样式
self.modbusSettingLayout.setColumnStretch(0, 1)
self.modbusSettingLayout.setColumnStretch(1, 1)
self.modbusSettingLayout.setColumnStretch(2, 1)
self.modbusSettingLayout.setContentsMargins(10, 10, 10, 10)
self.modbusSettingLayout.setSpacing(15)

@ -77,11 +77,6 @@ class VarWidgets(QtWidgets.QWidget):
self.timer.start(50) # 启动timer
Globals.setValue(str(self.modbusType) + 'Table', self.varView)
# self.varView.model.initTable()
# [self.varView.model.append_data([1,1,1,1,1,1,1,1,1,1]) for x in range(100)]
# self.model = VarTableModel(self.dic['header'], self.dic['data'])
# self.varView.setModel(self.model)
self.gridLayout = QtWidgets.QGridLayout(self)
self.gridLayout.addWidget(self.createBtn,0, 0, 1, 1)

@ -1,62 +1,195 @@
import sys
import socket
import threading
import time
from PyQt5.QtWidgets import (QApplication, QMainWindow, QPushButton,
QVBoxLayout, QWidget, QLabel, QTextEdit)
from PyQt5.QtCore import Qt, pyqtSignal, QObject
from datetime import datetime
class TcpServer(object):
class TcpServer(QObject):
updateSignal = pyqtSignal(str)
def __init__(self):
# 配置信息
super().__init__()
# 配置信息
self.tcpPort = 12345 # 服务端TCP监听端口
self.udpPort = 54321 # 设备发现UDP端口
self.broadcastAddr = '<broadcast>' # 广播地址
# TCP服务端主逻辑
self.tcpRunning = False
self.udpRunning = False
self.tcpThread = None
self.udpThread = None
self.serverSocket = None
self.udpSocket = None
def log(self, message):
currentTime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logMessage = f"[{currentTime}] {message}"
self.updateSignal.emit(logMessage)
def tcpServer(self):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('0.0.0.0', self.tcpPort))
server.listen(5)
print(f"[*] TCP服务端已启动监听端口 {self.tcpPort}")
try:
self.serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.serverSocket.settimeout(1) # 设置1秒超时
self.serverSocket.bind(('0.0.0.0', self.tcpPort))
self.serverSocket.listen(5)
self.log(f"TCP服务端已启动监听端口 {self.tcpPort}")
while True:
clientSock, addr = server.accept()
print(f"[+] 接收到来自 {addr} 的连接")
# 创建线程处理客户端请求
client_thread = threading.Thread(target=self.handleClient, args=(clientSock,))
client_thread.start()
while self.tcpRunning:
try:
clientSock, addr = self.serverSocket.accept()
self.log(f"接收到来自 {addr} 的连接")
clientThread = threading.Thread(target=self.handleClient, args=(clientSock,))
clientThread.start()
except socket.timeout:
continue # 超时后继续检查运行标志
except:
break
except Exception as e:
self.log(f"TCP服务器错误: {e}")
finally:
if self.serverSocket:
self.serverSocket.close()
self.log("TCP服务端已停止")
# 处理客户端请求
def handleClient(self, clientSock):
try:
while True:
data = clientSock.recv(1024)
if not data:
break
print(f"收到消息: {data.decode('utf-8')}")
self.log(f"收到消息: {data.decode('utf-8')}")
clientSock.send(b"ACK") # 返回确认
except Exception as e:
print(f"客户端异常断开: {e}")
self.log(f"客户端异常断开: {e}")
finally:
clientSock.close()
# UDP设备发现服务
def udpDiscoveryServer(self):
udpSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udpSock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
udpSock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
udpSock.bind(('', self.udpPort))
print(f"[*] UDP发现服务已启动监听端口 {self.udpPort}")
try:
self.udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udpSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.udpSocket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.udpSocket.bind(('', self.udpPort))
self.log(f"UDP发现服务已启动监听端口 {self.udpPort}")
while True:
data, addr = udpSock.recvfrom(1024)
while self.udpRunning:
try:
data, addr = self.udpSocket.recvfrom(1024)
if data.decode('utf-8') == "DISCOVERY_REQUEST":
print(f"[+] 收到来自 {addr} 的发现请求")
self.log(f"收到来自 {addr} 的发现请求")
response = f"DISCOVERY_RESPONSE:{socket.gethostname()}:{self.tcpPort}"
udpSock.sendto(response.encode('utf-8'), addr)
if __name__ == '__main__':
server = TcpServer()
# 启动TCP服务端和UDP发现服务
threading.Thread(target=server.tcpServer, daemon=True).start()
threading.Thread(target=server.udpDiscoveryServer, daemon=True).start()
# 防止主线程退出
while True:
time.sleep(1)
self.udpSocket.sendto(response.encode('utf-8'), addr)
except:
break
except Exception as e:
self.log(f"UDP服务器错误: {e}")
finally:
if self.udpSocket:
self.udpSocket.close()
self.log("UDP发现服务已停止")
def startTcpServer(self):
if not self.tcpRunning:
self.tcpRunning = True
self.tcpThread = threading.Thread(target=self.tcpServer)
self.tcpThread.start()
def stopTcpServer(self):
if self.tcpRunning:
self.tcpRunning = False
# 创建一个临时连接来解除accept阻塞
try:
tempSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tempSocket.connect(('127.0.0.1', self.tcpPort))
tempSocket.close()
except:
pass
def startUdpServer(self):
if not self.udpRunning:
self.udpRunning = True
self.udpThread = threading.Thread(target=self.udpDiscoveryServer)
self.udpThread.start()
def stopUdpServer(self):
if self.udpRunning:
self.udpRunning = False
# 发送一个空数据包来解除recvfrom阻塞
try:
tempSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
tempSocket.sendto(b'', ('127.0.0.1', self.udpPort))
tempSocket.close()
except:
pass
class ServerWindow(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("TCP/UDP 服务器控制")
# self.setGeometry(100, 100, 600, 400)
self.server = TcpServer()
self.server.updateSignal.connect(self.updateLog)
self.initUI()
def initUI(self):
# 主布局
mainLayout = QVBoxLayout()
# 控制按钮
self.tcpButton = QPushButton("启动 TCP 服务器")
self.tcpButton.clicked.connect(self.toggle_tcp_server)
self.udp_button = QPushButton("启动 UDP 发现服务")
self.udp_button.clicked.connect(self.toggleUdpServer)
# 日志显示
self.logDisplay = QTextEdit()
self.logDisplay.setReadOnly(True)
# 添加到布局
mainLayout.addWidget(self.tcpButton)
mainLayout.addWidget(self.udp_button)
mainLayout.addWidget(QLabel("服务器日志:"))
mainLayout.addWidget(self.logDisplay)
# 设置中心部件
container = QWidget()
container.setLayout(mainLayout)
self.setCentralWidget(container)
def toggle_tcp_server(self):
if self.server.tcpRunning:
self.server.stopTcpServer()
self.tcpButton.setText("启动 TCP 服务器")
else:
self.server.startTcpServer()
self.tcpButton.setText("停止 TCP 服务器")
def toggleUdpServer(self):
if self.server.udpRunning:
self.server.stopUdpServer()
self.udp_button.setText("启动 UDP 发现服务")
else:
self.server.startUdpServer()
self.udp_button.setText("停止 UDP 发现服务")
def updateLog(self, message):
self.logDisplay.append(message)
def closeEvent(self, event):
# 关闭窗口时停止所有服务
if self.server.tcpRunning:
self.server.stopTcpServer()
if self.server.udpRunning:
self.server.stopUdpServer()
event.accept()
if __name__ == "__main__":
app = QApplication(sys.argv)
window = ServerWindow()
window.show()
sys.exit(app.exec_())

@ -1,65 +0,0 @@
diff a/utils/DBModels/ProtocolModel.py b/utils/DBModels/ProtocolModel.py (rejected hunks)
@@ -92,3 +92,62 @@
self.order = order
# print(self.createTime)
self.save()
+
+
+class HartVar(BaseModel):
+ varName = CharField()
+ createTime = CharField()
+ description = CharField()
+ # 查询变量是否存在
+ @classmethod
+ def getByName(cls, varName):
+ try:
+ return cls.get(cls.varName == str(varName))
+ except:
+ return False
+
+ # 删除变量
+ @classmethod
+ def deleteVar(cls, name):
+ query = cls.delete().where(cls.varName == name)
+ query.execute()
+
+ # 创建变量
+ def createVar(self, varName, des):
+ self.varName = varName
+ self.description = des
+ self.createTime = datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S')
+ # print(self.createTime)
+ self.save()
+
+class TcRtdVar(BaseModel):
+ varName = CharField()
+ createTime = CharField()
+ description = CharField()
+ varType = IntegerField()
+ min = CharField()
+ max = CharField()
+ # 查询变量是否存在
+ @classmethod
+ def getByName(cls, varName):
+ try:
+ return cls.get(cls.varName == str(varName))
+ except:
+ return False
+
+ # 删除变量
+ @classmethod
+ def deleteVar(cls, name):
+ query = cls.delete().where(cls.varName == name)
+ query.execute()
+
+ # 创建变量
+ def createVar(self, varName, des, varType, min, max):
+ self.varName = varName
+ self.description = des
+ self.varType = varType
+ self.createTime = datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S')
+ self.min = min
+ self.max = max
+ # print(self.createTime)
+ self.save()
\ No newline at end of file
Loading…
Cancel
Save