# PyQt5 control window for a TCP acknowledgement server and a UDP
# discovery responder.

import sys
7 months ago
import socket
import threading
6 months ago
from PyQt5.QtWidgets import (QApplication, QMainWindow, QPushButton,
QVBoxLayout, QWidget, QLabel, QTextEdit)
from PyQt5.QtCore import Qt, pyqtSignal, QObject
from datetime import datetime
7 months ago
6 months ago
class TcpServer(QObject):
    """TCP acknowledgement server plus UDP discovery responder.

    Both services run on background threads that are started and stopped
    through the ``start*``/``stop*`` methods.  Every status message is
    prefixed with a timestamp and published through ``updateSignal`` instead
    of being printed.
    """

    # Carries one formatted log line per emission.
    updateSignal = pyqtSignal(str)

    def __init__(self):
        """Set the default ports and mark both services as stopped."""
        super().__init__()
        # Configuration
        self.tcpPort = 12345  # TCP listening port of the server
        self.udpPort = 54321  # UDP port used for device discovery
        # Broadcast address; not referenced by the methods of this class.
        self.broadcastAddr = '<broadcast>'
        self.tcpRunning = False
        self.udpRunning = False
        self.tcpThread = None
        self.udpThread = None
        self.serverSocket = None
        self.udpSocket = None

    def log(self, message):
        """Emit ``message`` through ``updateSignal`` with a timestamp prefix.

        Args:
            message: Text to publish; it is formatted as
                ``[YYYY-MM-DD HH:MM:SS] message``.
        """
        currentTime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        logMessage = f"[{currentTime}] {message}"
        self.updateSignal.emit(logMessage)

    def tcpServer(self):
        """Thread body: accept TCP clients until ``tcpRunning`` is cleared.

        Each accepted connection is handed to ``handleClient`` on its own
        daemon thread.  The listening socket uses a one second timeout so the
        running flag is re-checked regularly.
        """
        # Keep a local reference so this thread always closes the socket it
        # created, even if ``self.serverSocket`` is reassigned by a restart.
        listenSock = None
        try:
            listenSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.serverSocket = listenSock
            listenSock.settimeout(1)  # 1 second accept timeout
            listenSock.bind(('0.0.0.0', self.tcpPort))
            listenSock.listen(5)
            self.log(f"TCP服务端已启动监听端口 {self.tcpPort}")
            while self.tcpRunning:
                try:
                    clientSock, addr = listenSock.accept()
                except socket.timeout:
                    continue  # timed out: go back and re-check the flag
                except OSError as e:
                    # Report why the accept loop ends instead of exiting
                    # silently.
                    self.log(f"TCP服务器错误: {e}")
                    break
                if not self.tcpRunning:
                    # This is the wake-up connection made by stopTcpServer
                    # (or a client racing with shutdown); do not serve it.
                    clientSock.close()
                    break
                self.log(f"接收到来自 {addr} 的连接")
                # Daemon thread: a client that never disconnects must not
                # keep the process alive after the application exits.
                clientThread = threading.Thread(
                    target=self.handleClient, args=(clientSock,), daemon=True
                )
                clientThread.start()
        except Exception as e:
            self.log(f"TCP服务器错误: {e}")
        finally:
            if listenSock is not None:
                listenSock.close()
            self.log("TCP服务端已停止")

    def handleClient(self, clientSock):
        """Serve one client: log every received chunk and reply ``b"ACK"``.

        Returns when the peer closes the connection or a socket error
        occurs; the client socket is always closed on exit.

        Args:
            clientSock: Connected socket returned by ``accept()``.
        """
        try:
            while True:
                data = clientSock.recv(1024)
                if not data:
                    break
                # Undecodable bytes (or a multi-byte character split across
                # two recv calls) are replaced rather than dropping the
                # connection.
                text = data.decode('utf-8', errors='replace')
                self.log(f"收到消息: {text}")
                clientSock.sendall(b"ACK")  # send the acknowledgement
        except Exception as e:
            self.log(f"客户端异常断开: {e}")
        finally:
            clientSock.close()

    def udpDiscoveryServer(self):
        """Thread body: answer UDP discovery requests until stopped.

        A datagram whose payload is exactly ``DISCOVERY_REQUEST`` is answered
        with ``DISCOVERY_RESPONSE:<hostname>:<tcpPort>``; any other payload
        is ignored.
        """
        # Local reference for the same reason as in tcpServer.
        udpSock = None
        try:
            udpSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.udpSocket = udpSock
            udpSock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            udpSock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            # Same polling interval as the TCP socket, so stopping does not
            # depend solely on the wake-up datagram being delivered.
            udpSock.settimeout(1)
            udpSock.bind(('', self.udpPort))
            self.log(f"UDP发现服务已启动监听端口 {self.udpPort}")
            while self.udpRunning:
                try:
                    data, addr = udpSock.recvfrom(1024)
                except socket.timeout:
                    continue  # timed out: go back and re-check the flag
                except ConnectionResetError:
                    # NOTE(review): some platforms report an unreachable
                    # peer of an earlier reply on the next receive; that
                    # should not end the service.
                    continue
                except OSError as e:
                    self.log(f"UDP服务器错误: {e}")
                    break
                # Compare raw bytes: a malformed or non-UTF-8 datagram is
                # simply not a discovery request and must not stop the loop.
                if data != b"DISCOVERY_REQUEST":
                    continue
                self.log(f"收到来自 {addr} 的发现请求")
                response = f"DISCOVERY_RESPONSE:{socket.gethostname()}:{self.tcpPort}"
                try:
                    udpSock.sendto(response.encode('utf-8'), addr)
                except OSError as e:
                    # One failed reply should not stop the service.
                    self.log(f"UDP服务器错误: {e}")
        except Exception as e:
            self.log(f"UDP服务器错误: {e}")
        finally:
            if udpSock is not None:
                udpSock.close()
            self.log("UDP发现服务已停止")

    def startTcpServer(self):
        """Start the TCP server thread unless it is already flagged running."""
        if not self.tcpRunning:
            self.tcpRunning = True
            self.tcpThread = threading.Thread(target=self.tcpServer)
            self.tcpThread.start()

    def stopTcpServer(self):
        """Clear the TCP running flag and wake the accept loop."""
        if self.tcpRunning:
            self.tcpRunning = False
            # Make a throw-away local connection so accept() returns at once.
            # Best effort only: the accept timeout ends the loop anyway.
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tempSocket:
                    tempSocket.settimeout(1)  # never stall the caller
                    tempSocket.connect(('127.0.0.1', self.tcpPort))
            except OSError:
                pass

    def startUdpServer(self):
        """Start the UDP discovery thread unless it is already flagged running."""
        if not self.udpRunning:
            self.udpRunning = True
            self.udpThread = threading.Thread(target=self.udpDiscoveryServer)
            self.udpThread.start()

    def stopUdpServer(self):
        """Clear the UDP running flag and wake the receive loop."""
        if self.udpRunning:
            self.udpRunning = False
            # Send an empty datagram so recvfrom() returns at once.
            # Best effort only: the receive timeout ends the loop anyway.
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as tempSocket:
                    tempSocket.sendto(b'', ('127.0.0.1', self.udpPort))
            except OSError:
                pass
class ServerWindow(QMainWindow):
    """Main window with one toggle button per service and a log view.

    The window owns a ``TcpServer`` and appends every line emitted on its
    ``updateSignal`` to a read-only text area.
    """

    def __init__(self):
        """Create the server object, hook up its log signal and build the UI."""
        super().__init__()
        self.setWindowTitle("TCP/UDP 服务器控制")
        # No explicit geometry is set (a setGeometry call was disabled here).
        self.server = TcpServer()
        self.server.updateSignal.connect(self.updateLog)
        self.initUI()

    def initUI(self):
        """Create the buttons and the log area and stack them vertically."""
        # Control buttons
        self.tcpButton = QPushButton("启动 TCP 服务器")
        self.tcpButton.clicked.connect(self.toggle_tcp_server)

        self.udp_button = QPushButton("启动 UDP 发现服务")
        self.udp_button.clicked.connect(self.toggleUdpServer)

        # Log display
        self.logDisplay = QTextEdit()
        self.logDisplay.setReadOnly(True)

        # Main layout: widgets are added top to bottom in this order.
        column = QVBoxLayout()
        stackedWidgets = (
            self.tcpButton,
            self.udp_button,
            QLabel("服务器日志:"),
            self.logDisplay,
        )
        for widget in stackedWidgets:
            column.addWidget(widget)

        # Central widget
        centralWidget = QWidget()
        centralWidget.setLayout(column)
        self.setCentralWidget(centralWidget)

    def toggle_tcp_server(self):
        """Start the TCP server if it is stopped, otherwise stop it."""
        backend = self.server
        if not backend.tcpRunning:
            backend.startTcpServer()
            self.tcpButton.setText("停止 TCP 服务器")
            return
        backend.stopTcpServer()
        self.tcpButton.setText("启动 TCP 服务器")

    def toggleUdpServer(self):
        """Start the UDP discovery service if it is stopped, otherwise stop it."""
        backend = self.server
        if not backend.udpRunning:
            backend.startUdpServer()
            self.udp_button.setText("停止 UDP 发现服务")
            return
        backend.stopUdpServer()
        self.udp_button.setText("启动 UDP 发现服务")

    def updateLog(self, message):
        """Append one log line to the log view."""
        self.logDisplay.append(message)

    def closeEvent(self, event):
        """Stop whichever services are still running, then accept the close."""
        backend = self.server
        if backend.tcpRunning:
            backend.stopTcpServer()
        if backend.udpRunning:
            backend.stopUdpServer()
        event.accept()
if __name__ == "__main__":
    # Script entry point: show the control window and hand the Qt event
    # loop's return code back to the operating system.
    qtApp = QApplication(sys.argv)
    mainWindow = ServerWindow()
    mainWindow.show()
    exitCode = qtApp.exec_()
    sys.exit(exitCode)