import sys
import socket
import threading
from datetime import datetime

from PyQt5.QtWidgets import (
    QMainWindow, QVBoxLayout, QPushButton,
    QListWidget, QLabel, QWidget, QMessageBox, QTextEdit,
)
from PyQt5.QtCore import Qt, QThread, pyqtSignal, QObject
from PyQt5.QtNetwork import QNetworkInterface, QAbstractSocket


class TcpClient(object):
    """Client side of the LAN discovery protocol.

    Servers are located with a UDP broadcast (``DISCOVERY_REQUEST``) and then
    contacted over TCP on the port they advertise in their reply.
    """

    def __init__(self):
        # Port the discovery servers listen on for UDP requests.
        self.udpPort = 54321
        # '<broadcast>' is the socket module's spelling of INADDR_BROADCAST.
        # An empty string means INADDR_ANY, which is not a broadcast address.
        self.broadcastAddr = '<broadcast>'

    @staticmethod
    def _parseDiscoveryResponse(data):
        """Parse a ``DISCOVERY_RESPONSE:<hostname>:<tcpPort>`` datagram.

        Returns:
            ``(hostname, tcpPort)`` on success, or ``None`` when the datagram
            is not a well-formed discovery response.
        """
        try:
            text = data.decode('utf-8')
        except UnicodeDecodeError:
            return None

        prefix, sep, rest = text.partition(':')
        if not sep or prefix != "DISCOVERY_RESPONSE":
            return None

        # Split the port off from the right so a hostname that itself
        # contains ':' does not break parsing.
        hostname, sep, portText = rest.rpartition(':')
        if not sep:
            return None

        try:
            tcpPort = int(portText)
        except ValueError:
            return None
        if not 0 < tcpPort < 65536:
            return None
        return hostname, tcpPort

    def discoverServers(self):
        """Broadcast a discovery request and collect the replies.

        Replies are gathered until no datagram arrives for 2 seconds.
        Malformed replies are ignored.

        Returns:
            A list of dicts with keys ``'ip'`` (the ``(host, port)`` sender
            address tuple returned by ``recvfrom``; callers index ``[0]`` for
            the host string), ``'port'`` (advertised TCP port as ``int``) and
            ``'hostname'`` (``str``). Empty when nothing answered or the
            request could not be sent.
        """
        servers = []
        udpSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            udpSock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            udpSock.settimeout(2)
            udpSock.sendto(b"DISCOVERY_REQUEST",
                           (self.broadcastAddr, self.udpPort))

            while True:
                data, addr = udpSock.recvfrom(1024)
                parsed = self._parseDiscoveryResponse(data)
                if parsed is None:
                    continue
                hostname, tcpPort = parsed
                servers.append({
                    'ip': addr,
                    'port': tcpPort,
                    'hostname': hostname
                })
        except socket.timeout:
            # Normal termination: no further replies within the timeout.
            pass
        except OSError as e:
            # E.g. no usable network interface. Report it instead of letting
            # the exception escape into the worker thread.
            print(f"设备发现失败: {e}")
        finally:
            udpSock.close()

        return servers

    def connectToServer(self, ip, tcpPort, timeout=5.0):
        """Open a TCP connection, send a greeting and wait for one reply.

        Args:
            ip: Host address of the server.
            tcpPort: TCP port of the server.
            timeout: Seconds to wait for connect/send/receive before giving
                up, so an unresponsive peer cannot block the caller forever.

        Returns:
            ``True`` if the exchange completed, ``False`` on any error.
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
                client.settimeout(timeout)
                client.connect((str(ip), int(tcpPort)))
                client.sendall(b"Hello from client!")
                client.recv(1024)
            return True
        except Exception as e:
            print(f"连接失败: {e}")
            return False


class DiscoveryThread(QThread):
    """Runs device discovery in the background, off the GUI thread."""

    # Emitted with the list of discovered servers when the scan finishes.
    discoveryFinished = pyqtSignal(list)

    def __init__(self, tcpClient):
        super().__init__()
        self.tcpClient = tcpClient

    def run(self):
        servers = self.tcpClient.discoverServers()
        self.discoveryFinished.emit(servers)


class DeviceMasterWidget(QMainWindow):
    """Window that scans the LAN for devices and connects to a selected one."""

    def __init__(self):
        super().__init__()
        self.tcpClient = TcpClient()
        self.servers = []
        self.initUI()

    def initUI(self):
        """Build the widgets and wire up the button handlers."""
        self.setWindowTitle('局域网设备发现')

        # Central widget holding the vertical layout.
        centralWidget = QWidget()
        self.setCentralWidget(centralWidget)
        layout = QVBoxLayout()

        self.ipAddresslabel = QLabel(self)
        self.ipAddresslabel.setObjectName("setlabel")
        self.ipAddresslabel.setAlignment(
            Qt.AlignLeading | Qt.AlignLeft | Qt.AlignVCenter)
        layout.addWidget(self.ipAddresslabel)

        # Status / title label.
        self.titleLabel = QLabel('点击"扫描设备"按钮发现局域网中的设备')
        layout.addWidget(self.titleLabel)

        # List of discovered devices.
        self.deviceList = QListWidget()
        layout.addWidget(self.deviceList)

        self.scanButton = QPushButton('扫描设备')
        self.scanButton.clicked.connect(self.startDiscovery)
        layout.addWidget(self.scanButton)
        self.scanButton.setObjectName('setButton')

        self.connectButton = QPushButton('连接设备')
        self.connectButton.clicked.connect(self.connectToDevice)
        self.connectButton.setEnabled(False)
        self.connectButton.setObjectName('setButton')
        layout.addWidget(self.connectButton)

        self.ipAddresslabel.setText('当前设备IP地址: ' + self.getLocalIp())
        centralWidget.setLayout(layout)

    def startDiscovery(self):
        """Start a background scan for devices."""
        self.scanButton.setEnabled(False)
        # The list is cleared below, so there is nothing to connect to until
        # the scan reports results.
        self.connectButton.setEnabled(False)
        self.titleLabel.setText('正在扫描局域网设备...')
        self.deviceList.clear()

        # Keep a reference on self so the thread object outlives this call.
        self.discoveryThread = DiscoveryThread(self.tcpClient)
        self.discoveryThread.discoveryFinished.connect(
            self.onDiscoveryFinished)
        self.discoveryThread.start()

    def onDiscoveryFinished(self, servers):
        """Show the scan results delivered by the discovery thread."""
        self.scanButton.setEnabled(True)
        self.servers = servers

        if not servers:
            self.titleLabel.setText('未发现任何设备')
            QMessageBox.information(self, '提示', '未发现任何设备')
            return

        self.titleLabel.setText(f'发现 {len(servers)} 个设备:')

        # server['ip'] is the (host, port) sender tuple; [0] is the host.
        for server in servers:
            item_text = f"{server['hostname']} ({server['ip'][0]}:{server['port']})"
            self.deviceList.addItem(item_text)

        self.connectButton.setEnabled(True)

    def connectToDevice(self):
        """Connect to the device currently selected in the list."""
        selectedIndex = self.deviceList.currentRow()
        if selectedIndex == -1:
            QMessageBox.warning(self, '警告', '请先选择一个设备')
            return

        selectedServer = self.servers[selectedIndex]
        try:
            success = self.tcpClient.connectToServer(
                selectedServer['ip'][0], selectedServer['port'])
            if success:
                QMessageBox.information(
                    self, '连接成功', f"已连接到 {selectedServer['hostname']}")
            else:
                QMessageBox.critical(self, '连接失败', "连接失败")
        except Exception as e:
            QMessageBox.critical(self, '连接失败', f"连接失败: {str(e)}")

    def getLocalIp(self):
        """Return the first non-loopback IPv4 address of an interface that is up.

        Falls back to the literal ``"192.168.1.1"`` when none is found.
        """
        for interface in QNetworkInterface.allInterfaces():
            if interface.flags() & QNetworkInterface.IsUp:
                for entry in interface.addressEntries():
                    ip = entry.ip()
                    if (ip.protocol() == QAbstractSocket.IPv4Protocol
                            and not ip.isLoopback()):
                        return ip.toString()
        return "192.168.1.1"  # default fallback value


class TcpServer(QObject):
    """TCP echo-ACK server plus UDP discovery responder.

    Each service runs in its own ``threading.Thread``. Log lines are
    published through ``updateSignal``.
    """

    updateSignal = pyqtSignal(str)

    def __init__(self):
        super().__init__()
        self.tcpPort = 12345  # TCP listening port
        self.udpPort = 54321  # UDP discovery port
        self.broadcastAddr = ''  # not read by any code in this file
        self.tcpRunning = False
        self.udpRunning = False
        self.tcpThread = None
        self.udpThread = None
        self.serverSocket = None
        self.udpSocket = None

    def log(self, message):
        """Emit ``message`` prefixed with the current local timestamp."""
        currentTime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        logMessage = f"[{currentTime}] {message}"
        self.updateSignal.emit(logMessage)

    @staticmethod
    def _joinWorker(thread, timeout=2.0):
        """Wait briefly for a worker thread to finish; never join ourselves."""
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout)

    def tcpServer(self):
        """Thread body: accept TCP clients until ``tcpRunning`` is cleared."""
        try:
            self.serverSocket = socket.socket(socket.AF_INET,
                                              socket.SOCK_STREAM)
            # 1-second timeout so the loop re-checks the running flag.
            self.serverSocket.settimeout(1)
            self.serverSocket.bind(('0.0.0.0', self.tcpPort))
            self.serverSocket.listen(5)
            self.log(f"TCP服务端已启动,监听端口 {self.tcpPort}")

            while self.tcpRunning:
                try:
                    clientSock, addr = self.serverSocket.accept()
                except socket.timeout:
                    continue
                except OSError as e:
                    self.log(f"TCP服务器错误: {e}")
                    break

                self.log(f"接收到来自 {addr} 的连接")
                # Daemon thread: a client that never disconnects must not
                # keep the process alive after the application exits.
                clientThread = threading.Thread(
                    target=self.handleClient, args=(clientSock,),
                    daemon=True)
                clientThread.start()
        except Exception as e:
            self.log(f"TCP服务器错误: {e}")
        finally:
            if self.serverSocket:
                self.serverSocket.close()
            # Keep the flag truthful even when the server died on its own
            # (e.g. bind failure), so a later start request is honoured.
            self.tcpRunning = False
            self.log("TCP服务端已停止")

    def handleClient(self, clientSock):
        """Thread body: log each received message and answer with ``ACK``."""
        try:
            while True:
                data = clientSock.recv(1024)
                if not data:
                    break
                # errors='replace': undecodable bytes must not drop the
                # connection, they are only being logged.
                text = data.decode('utf-8', errors='replace')
                self.log(f"收到消息: {text}")
                clientSock.sendall(b"ACK")
        except Exception as e:
            self.log(f"客户端异常断开: {e}")
        finally:
            clientSock.close()

    def udpDiscoveryServer(self):
        """Thread body: answer ``DISCOVERY_REQUEST`` datagrams.

        Replies ``DISCOVERY_RESPONSE:<hostname>:<tcpPort>`` to the sender.
        Any other datagram is ignored.
        """
        try:
            self.udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.udpSocket.setsockopt(socket.SOL_SOCKET,
                                      socket.SO_REUSEADDR, 1)
            self.udpSocket.setsockopt(socket.SOL_SOCKET,
                                      socket.SO_BROADCAST, 1)
            # 1-second timeout so the loop re-checks the running flag even
            # if the wake-up datagram from stopUdpServer is lost.
            self.udpSocket.settimeout(1)
            self.udpSocket.bind(('', self.udpPort))
            self.log(f"UDP发现服务已启动,监听端口 {self.udpPort}")

            while self.udpRunning:
                try:
                    data, addr = self.udpSocket.recvfrom(1024)
                except socket.timeout:
                    continue
                except OSError as e:
                    self.log(f"UDP服务器错误: {e}")
                    break

                # Compare raw bytes: decoding an arbitrary datagram could
                # raise and would previously terminate the service.
                if data != b"DISCOVERY_REQUEST":
                    continue

                self.log(f"收到来自 {addr} 的发现请求")
                response = f"DISCOVERY_RESPONSE:{socket.gethostname()}:{self.tcpPort}"
                try:
                    self.udpSocket.sendto(response.encode('utf-8'), addr)
                except OSError as e:
                    # One unreachable requester must not stop the service.
                    self.log(f"UDP服务器错误: {e}")
        except Exception as e:
            self.log(f"UDP服务器错误: {e}")
        finally:
            if self.udpSocket:
                self.udpSocket.close()
            self.udpRunning = False
            self.log("UDP发现服务已停止")

    def startTcpServer(self):
        """Start the TCP server thread if it is not already running."""
        if not self.tcpRunning:
            self.tcpRunning = True
            self.tcpThread = threading.Thread(target=self.tcpServer)
            self.tcpThread.start()

    def stopTcpServer(self):
        """Ask the TCP server thread to stop and wait briefly for it."""
        if self.tcpRunning:
            self.tcpRunning = False
            # A throw-away connection unblocks accept() immediately.
            # Best effort: the accept timeout covers the failure case.
            try:
                with socket.socket(socket.AF_INET,
                                   socket.SOCK_STREAM) as tempSocket:
                    tempSocket.connect(('127.0.0.1', self.tcpPort))
            except OSError:
                pass
            # Wait for the worker so a quick restart does not overlap with
            # the old thread on the shared serverSocket attribute.
            self._joinWorker(self.tcpThread)

    def startUdpServer(self):
        """Start the UDP discovery thread if it is not already running."""
        if not self.udpRunning:
            self.udpRunning = True
            self.udpThread = threading.Thread(target=self.udpDiscoveryServer)
            self.udpThread.start()

    def stopUdpServer(self):
        """Ask the UDP discovery thread to stop and wait briefly for it."""
        if self.udpRunning:
            self.udpRunning = False
            # An empty datagram unblocks recvfrom() immediately.
            # Best effort: the receive timeout covers the failure case.
            try:
                with socket.socket(socket.AF_INET,
                                   socket.SOCK_DGRAM) as tempSocket:
                    tempSocket.sendto(b'', ('127.0.0.1', self.udpPort))
            except OSError:
                pass
            self._joinWorker(self.udpThread)


class DeviceSlaveWidget(QMainWindow):
    """Window with start/stop buttons for both services and a log view."""

    def __init__(self):
        super().__init__()
        self.setWindowTitle("TCP/UDP 服务器控制")

        self.server = TcpServer()
        self.server.updateSignal.connect(self.updateLog)

        self.initUI()

    def initUI(self):
        """Build the control buttons and the read-only log display."""
        mainLayout = QVBoxLayout()

        # Control buttons.
        self.tcpButton = QPushButton("启动 TCP 服务器")
        self.tcpButton.clicked.connect(self.toggleTcpServer)
        self.tcpButton.setObjectName('setButton')

        self.udpButton = QPushButton("启动 UDP 发现服务")
        self.udpButton.clicked.connect(self.toggleUdpServer)
        self.udpButton.setObjectName('setButton')

        # Log display.
        self.logDisplay = QTextEdit()
        self.logDisplay.setReadOnly(True)

        mainLayout.addWidget(self.tcpButton)
        mainLayout.addWidget(self.udpButton)
        mainLayout.addWidget(QLabel("服务器日志:"))
        mainLayout.addWidget(self.logDisplay)

        # Central widget.
        container = QWidget()
        container.setLayout(mainLayout)
        self.setCentralWidget(container)

    def toggleTcpServer(self):
        """Start or stop the TCP server and update the button caption."""
        if self.server.tcpRunning:
            self.server.stopTcpServer()
            self.tcpButton.setText("启动 TCP 服务器")
        else:
            self.server.startTcpServer()
            self.tcpButton.setText("停止 TCP 服务器")

    def toggleUdpServer(self):
        """Start or stop the UDP discovery service and update the caption."""
        if self.server.udpRunning:
            self.server.stopUdpServer()
            self.udpButton.setText("启动 UDP 发现服务")
        else:
            self.server.startUdpServer()
            self.udpButton.setText("停止 UDP 发现服务")

    def updateLog(self, message):
        """Append one log line to the log display."""
        self.logDisplay.append(message)

    def closeEvent(self, event):
        """Stop both services before the window closes."""
        if self.server.tcpRunning:
            self.server.stopTcpServer()
        if self.server.udpRunning:
            self.server.stopUdpServer()
        event.accept()