You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

352 lines
13 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import sys
import socket
import threading
from PyQt5.QtWidgets import QMainWindow, QVBoxLayout, QPushButton, \
QListWidget, QLabel, QWidget, QMessageBox, QTextEdit
from PyQt5.QtCore import Qt, QThread, pyqtSignal, QObject
from PyQt5.QtNetwork import QNetworkInterface,QAbstractSocket
from datetime import datetime
class TcpClient(object):
def __init__(self):
# 配置信息
self.udpPort = 54321
self.broadcastAddr = '<broadcast>'
def discoverServers(self):
# print("[*] 正在扫描局域网设备...")
udpSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udpSock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
udpSock.settimeout(2)
udpSock.sendto(b"DISCOVERY_REQUEST", (self.broadcastAddr, self.udpPort))
servers = []
try:
while True:
data, addr = udpSock.recvfrom(1024)
if data.startswith(b"DISCOVERY_RESPONSE"):
# 修正点正确解析服务端IP和TCP端口
_, hostname, tcpPort = data.decode('utf-8').split(':')
serverIp = addr # 提取IP字符串
servers.append({
'ip': serverIp, # 存储为字符串
'port': int(tcpPort), # 存储为整数
'hostname': hostname
})
print()
except socket.timeout:
pass
finally:
udpSock.close()
return servers
def connectToServer(self, ip, tcpPort):
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# 显式转换类型确保安全
client.connect((str(ip), int(tcpPort)))
# print(f"[+] 已连接到 {ip}:{tcpPort}")
client.send(b"Hello from client!")
response = client.recv(1024)
# print(f"服务端响应: {response.decode('utf-8')}")
return True
except Exception as e:
print(f"连接失败: {e}")
return False
finally:
client.close()
class DiscoveryThread(QThread):
"""用于在后台执行设备发现的线程"""
discoveryFinished = pyqtSignal(list) # 信号,携带发现的设备列表
def __init__(self, tcpClient):
super().__init__()
self.tcpClient = tcpClient
def run(self):
servers = self.tcpClient.discoverServers()
self.discoveryFinished.emit(servers)
class DeviceMasterWidget(QMainWindow):
def __init__(self):
super().__init__()
self.tcpClient = TcpClient()
self.servers = []
self.initUI()
def initUI(self):
self.setWindowTitle('局域网设备发现')
# self.setGeometry(1000, 500, 400, 300)
# 主窗口部件
centralWidget = QWidget()
self.setCentralWidget(centralWidget)
layout = QVBoxLayout()
self.ipAddresslabel = QLabel(self)
self.ipAddresslabel.setObjectName("setlabel")
self.ipAddresslabel.setAlignment(Qt.AlignLeading|Qt.AlignLeft|Qt.AlignVCenter)
layout.addWidget(self.ipAddresslabel)
# 标题标签
self.titleLabel = QLabel('点击"扫描设备"按钮发现局域网中的设备')
layout.addWidget(self.titleLabel)
# 设备列表
self.deviceList = QListWidget()
layout.addWidget(self.deviceList)
self.scanButton = QPushButton('扫描设备')
self.scanButton.clicked.connect(self.startDiscovery)
layout.addWidget(self.scanButton)
self.scanButton.setObjectName('setButton')
self.connectButton = QPushButton('连接设备')
self.connectButton.clicked.connect(self.connectToDevice)
self.connectButton.setEnabled(False)
self.connectButton.setObjectName('setButton')
layout.addWidget(self.connectButton)
self.ipAddresslabel.setText('当前设备IP地址: ' + self.getLocalIp())
centralWidget.setLayout(layout)
# self.show()
def startDiscovery(self):
"""开始扫描设备"""
self.scanButton.setEnabled(False)
self.titleLabel.setText('正在扫描局域网设备...')
self.deviceList.clear()
# 创建并启动发现线程
self.discoveryThread = DiscoveryThread(self.tcpClient)
self.discoveryThread.discoveryFinished.connect(self.onDiscoveryFinished)
self.discoveryThread.start()
def onDiscoveryFinished(self, servers):
"""设备扫描完成后的回调"""
self.scanButton.setEnabled(True)
self.servers = servers
if not servers:
self.titleLabel.setText('未发现任何设备')
QMessageBox.information(self, '提示', '未发现任何设备')
return
self.titleLabel.setText(f'发现 {len(servers)} 个设备:')
# 在列表中显示设备
for server in servers:
itemText = f"{server['hostname']} ({server['ip'][0]})"
self.deviceList.addItem(itemText)
self.connectButton.setEnabled(True)
def connectToDevice(self):
"""连接选中的设备"""
selectedIndex = self.deviceList.currentRow()
if selectedIndex == -1:
QMessageBox.warning(self, '警告', '请先选择一个设备')
return
selectedServer = self.servers[selectedIndex]
print(selectedServer['ip'][0])
def getLocalIp(self):
for interface in QNetworkInterface.allInterfaces():
if interface.flags() & QNetworkInterface.IsUp:
for entry in interface.addressEntries():
ip = entry.ip()
# 修改为新的枚举访问方式
if ip.protocol() == QAbstractSocket.IPv4Protocol and not ip.isLoopback():
return ip.toString()
return "192.168.1.1" # 默认回退值
class TcpServer(QObject):
updateSignal = pyqtSignal(str)
def __init__(self):
super().__init__()
# 配置信息
self.tcpPort = 12345 # 服务端TCP监听端口
self.udpPort = 54321 # 设备发现UDP端口
self.broadcastAddr = '<broadcast>' # 广播地址
self.tcpRunning = False
self.udpRunning = False
self.tcpThread = None
self.udpThread = None
self.serverSocket = None
self.udpSocket = None
def log(self, message):
currentTime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logMessage = f"[{currentTime}] {message}"
self.updateSignal.emit(logMessage)
# def tcpServer(self):
# try:
# self.serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# self.serverSocket.settimeout(1) # 设置1秒超时
# self.serverSocket.bind(('0.0.0.0', self.tcpPort))
# self.serverSocket.listen(5)
# self.log(f"TCP服务端已启动监听端口 {self.tcpPort}")
# while self.tcpRunning:
# try:
# clientSock, addr = self.serverSocket.accept()
# self.log(f"接收到来自 {addr} 的连接")
# clientThread = threading.Thread(target=self.handleClient, args=(clientSock,))
# clientThread.start()
# except socket.timeout:
# continue # 超时后继续检查运行标志
# except:
# break
# except Exception as e:
# self.log(f"TCP服务器错误: {e}")
# finally:
# if self.serverSocket:
# self.serverSocket.close()
# self.log("TCP服务端已停止")
def handleClient(self, clientSock):
try:
while True:
data = clientSock.recv(1024)
if not data:
break
self.log(f"收到消息: {data.decode('utf-8')}")
clientSock.send(b"ACK") # 返回确认
except Exception as e:
self.log(f"客户端异常断开: {e}")
finally:
clientSock.close()
def udpDiscoveryServer(self):
try:
self.udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udpSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.udpSocket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.udpSocket.bind(('', self.udpPort))
self.log(f"UDP发现服务已启动监听端口 {self.udpPort}")
while self.udpRunning:
try:
data, addr = self.udpSocket.recvfrom(1024)
if data.decode('utf-8') == "DISCOVERY_REQUEST":
self.log(f"收到来自 {addr} 的发现请求")
response = f"DISCOVERY_RESPONSE:{socket.gethostname()}:{self.tcpPort}"
self.udpSocket.sendto(response.encode('utf-8'), addr)
except:
break
except Exception as e:
self.log(f"UDP服务器错误: {e}")
finally:
if self.udpSocket:
self.udpSocket.close()
self.log("UDP发现服务已停止")
# def startTcpServer(self):
# if not self.tcpRunning:
# self.tcpRunning = True
# self.tcpThread = threading.Thread(target=self.tcpServer)
# self.tcpThread.start()
# def stopTcpServer(self):
# if self.tcpRunning:
# self.tcpRunning = False
# # 创建一个临时连接来解除accept阻塞
# try:
# tempSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# tempSocket.connect(('127.0.0.1', self.tcpPort))
# tempSocket.close()
# except:
# pass
def startUdpServer(self):
if not self.udpRunning:
self.udpRunning = True
self.udpThread = threading.Thread(target=self.udpDiscoveryServer)
self.udpThread.start()
def stopUdpServer(self):
if self.udpRunning:
self.udpRunning = False
# 发送一个空数据包来解除recvfrom阻塞
try:
tempSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
tempSocket.sendto(b'', ('127.0.0.1', self.udpPort))
tempSocket.close()
except:
pass
class DeviceSlaveWidget(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("TCP/UDP 服务器控制")
# self.setGeometry(100, 100, 600, 400)
self.server = TcpServer()
self.server.updateSignal.connect(self.updateLog)
self.initUI()
def initUI(self):
# 主布局
mainLayout = QVBoxLayout()
# 控制按钮
# self.tcpButton = QPushButton("启动 TCP 服务器")
# self.tcpButton.clicked.connect(self.toggleTcpServer)
# self.tcpButton.setObjectName('setButton')
self.udpButton = QPushButton("开启远程通讯模式")
self.udpButton.clicked.connect(self.toggleUdpServer)
self.udpButton.setObjectName('setButton')
# 日志显示
self.logDisplay = QTextEdit()
self.logDisplay.setReadOnly(True)
# 添加到布局
# mainLayout.addWidget(self.tcpButton)
mainLayout.addWidget(self.udpButton)
mainLayout.addWidget(QLabel("服务器日志:"))
mainLayout.addWidget(self.logDisplay)
# 设置中心部件
container = QWidget()
container.setLayout(mainLayout)
self.setCentralWidget(container)
def toggleTcpServer(self):
if self.server.tcpRunning:
self.server.stopTcpServer()
# self.tcpButton.setText("启动 TCP 服务器")
else:
self.server.startTcpServer()
# self.tcpButton.setText("停止 TCP 服务器")
def toggleUdpServer(self):
if self.server.udpRunning:
self.server.stopUdpServer()
self.udpButton.setText("开启远程通讯模式")
else:
self.server.startUdpServer()
self.udpButton.setText("关闭远程通讯模式")
def updateLog(self, message):
self.logDisplay.append(message)
def closeEvent(self, event):
# 关闭窗口时停止所有服务
if self.server.tcpRunning:
self.server.stopTcpServer()
if self.server.udpRunning:
self.server.stopUdpServer()
event.accept()