You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

521 lines
20 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import sys
import socket
import threading
from PyQt5.QtWidgets import QMainWindow, QVBoxLayout, QPushButton, \
QListWidget, QLabel, QWidget, QMessageBox, QTextEdit
from PyQt5.QtCore import Qt, QThread, pyqtSignal, QObject, QTimer
from PyQt5.QtNetwork import QNetworkInterface,QAbstractSocket
from datetime import datetime
from utils import Globals
class TcpClient(object):
def __init__(self):
# 配置信息
self.udpPort = 54321
self.broadcastAddr = '<broadcast>'
def discoverServers(self):
# print("[*] 正在扫描局域网设备...")
udpSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udpSock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
udpSock.settimeout(2)
udpSock.sendto(b"DISCOVERY_REQUEST", (self.broadcastAddr, self.udpPort))
servers = []
seen = set()
local_ips = self.getLocalIps()
try:
while True:
data, addr = udpSock.recvfrom(1024)
if data.startswith(b"DISCOVERY_RESPONSE"):
_, hostname, tcpPort = data.decode('utf-8').split(':')
serverIp = addr[0]
# 跳过本机IP
if serverIp in local_ips:
continue
key = (serverIp, int(tcpPort))
if key in seen:
continue
seen.add(key)
servers.append({
'ip': (serverIp,),
'port': int(tcpPort),
'hostname': hostname
})
print(data.decode('utf-8'))
except socket.timeout:
pass
finally:
udpSock.close()
return servers
def connectToServer(self, ip, tcpPort):
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# 显式转换类型确保安全
client.connect((str(ip), int(tcpPort)))
# print(f"[+] 已连接到 {ip}:{tcpPort}")
Globals.getValue('protocolManage').setServerMode()
clientName = Globals.getValue('protocolManage').RpcServer.getNextClientName()
client.send(clientName.encode('utf-8'))
response = client.recv(1024)
# print(f"服务端响应: {response.decode('utf-8')}")
resClientName = response.decode('utf-8')
Globals.getValue('protocolManage').addClient(clientName = resClientName)
QMessageBox.information(None, '连接成功', f'已成功连接到从站 {ip}:{tcpPort}')
return True
except Exception as e:
print(f"连接失败: {e}")
return False
finally:
client.close()
def getLocalIps(self):
"""获取本机所有IPv4地址"""
ips = set()
for interface in QNetworkInterface.allInterfaces():
if interface.flags() & QNetworkInterface.IsUp:
for entry in interface.addressEntries():
ip = entry.ip()
if ip.protocol() == QAbstractSocket.IPv4Protocol and not ip.isLoopback():
ips.add(ip.toString())
return ips
class DiscoveryThread(QThread):
"""用于在后台执行设备发现的线程"""
discoveryFinished = pyqtSignal(list) # 信号,携带发现的设备列表
def __init__(self, tcpClient):
super().__init__()
self.tcpClient = tcpClient
def run(self):
servers = self.tcpClient.discoverServers()
self.discoveryFinished.emit(servers)
class DeviceMasterWidget(QMainWindow):
def __init__(self):
super().__init__()
self.tcpClient = TcpClient()
self.servers = []
self.discoveryThread = None
self.statusLabel = None
self.infoLabel = None
self.disconnectList = None
self.initUI()
# 定时器启动放到控件初始化后
self.statusTimer = QTimer(self)
self.statusTimer.timeout.connect(self.updateStatusInfo)
self.statusTimer.start(1000)
def initUI(self):
self.setWindowTitle('局域网设备发现')
# self.setGeometry(1000, 500, 400, 300)
# 主窗口部件
centralWidget = QWidget()
self.setCentralWidget(centralWidget)
layout = QVBoxLayout()
self.statusLabel = QLabel('服务器模式: 无')
layout.addWidget(self.statusLabel)
self.infoLabel = QLabel('连接信息: 无')
layout.addWidget(self.infoLabel)
# 新增:断开客户端区
self.disconnectList = QListWidget()
self.disconnectList.setFixedHeight(60)
layout.addWidget(QLabel('已连接客户端(点击可断开):'))
layout.addWidget(self.disconnectList)
self.disconnectList.itemClicked.connect(self.disconnectClient)
self.ipAddresslabel = QLabel(self)
self.ipAddresslabel.setObjectName("setlabel")
self.ipAddresslabel.setAlignment(Qt.AlignLeading|Qt.AlignLeft|Qt.AlignVCenter)
layout.addWidget(self.ipAddresslabel)
# 标题标签
self.titleLabel = QLabel('点击"扫描设备"按钮发现局域网中的设备')
layout.addWidget(self.titleLabel)
# 设备列表
self.deviceList = QListWidget()
layout.addWidget(self.deviceList)
self.scanButton = QPushButton('扫描设备')
self.scanButton.clicked.connect(self.startDiscovery)
layout.addWidget(self.scanButton)
self.scanButton.setObjectName('setButton')
self.connectButton = QPushButton('连接设备')
self.connectButton.clicked.connect(self.connectToDevice)
self.connectButton.setEnabled(False)
self.connectButton.setObjectName('setButton')
layout.addWidget(self.connectButton)
self.ipAddresslabel.setText('当前设备IP地址: ' + self.getLocalIp())
centralWidget.setLayout(layout)
# self.show()
def startDiscovery(self):
"""开始扫描设备"""
self.scanButton.setEnabled(False)
self.titleLabel.setText('正在扫描局域网设备...')
self.deviceList.clear()
# 创建并启动发现线程
self.discoveryThread = DiscoveryThread(self.tcpClient)
self.discoveryThread.discoveryFinished.connect(self.onDiscoveryFinished)
self.discoveryThread.start()
def onDiscoveryFinished(self, servers):
"""设备扫描完成后的回调"""
self.scanButton.setEnabled(True)
self.servers = servers
if not servers:
self.titleLabel.setText('未发现任何设备')
QMessageBox.information(self, '提示', '未发现任何设备')
return
self.titleLabel.setText(f'发现 {len(servers)} 个设备:')
# 在列表中显示设备
for server in servers:
itemText = f"{server['hostname']} ({server['ip'][0]})"
self.deviceList.addItem(itemText)
self.connectButton.setEnabled(True)
def connectToDevice(self):
"""连接选中的设备"""
selectedIndex = self.deviceList.currentRow()
if selectedIndex == -1:
QMessageBox.warning(self, '警告', '请先选择一个设备')
return
selectedServer = self.servers[selectedIndex]
from utils import Globals
protocolManage = Globals.getValue('protocolManage')
already_connected = False
ip = selectedServer['ip'][0] if isinstance(selectedServer['ip'], (list, tuple)) else selectedServer['ip']
if protocolManage and hasattr(protocolManage, 'RpcServer') and protocolManage.RpcServer:
clients = protocolManage.RpcServer.getClientNames()
ipMap = protocolManage.RpcServer.getClientIpMap() if hasattr(protocolManage.RpcServer, 'getClientIpMap') else {}
for c in clients:
if ipMap.get(c, '') == ip:
already_connected = True
break
if already_connected:
QMessageBox.information(self, '提示', f'客户端 {ip} 已连接,无需重复连接')
return
self.tcpClient.connectToServer(ip, selectedServer['port'])
def getLocalIp(self):
for interface in QNetworkInterface.allInterfaces():
if interface.flags() & QNetworkInterface.IsUp:
for entry in interface.addressEntries():
ip = entry.ip()
# 修改为新的枚举访问方式
if ip.protocol() == QAbstractSocket.IPv4Protocol and not ip.isLoopback():
return ip.toString()
return "192.168.1.1" # 默认回退值
def cleanup(self):
# 释放后台线程
if self.discoveryThread and self.discoveryThread.isRunning():
self.discoveryThread.terminate()
self.discoveryThread.wait()
def closeEvent(self, event):
self.cleanup()
event.accept()
def __del__(self):
self.cleanup()
def updateStatusInfo(self):
protocolManage = Globals.getValue('protocolManage')
mode = ''
info = ''
if protocolManage:
if hasattr(protocolManage, 'RpcServer') and protocolManage.RpcServer:
mode = '服务端模式'
try:
clients = protocolManage.RpcServer.getClientNames()
clientIpMap = protocolManage.RpcServer.getClientIpMap() if hasattr(protocolManage.RpcServer, 'getClientIpMap') else {}
info = '已连接客户端: ' + (', '.join([f"{c} ({clientIpMap.get(c, '')})" for c in clients]) if clients else '')
self.disconnectList.clear()
for c in clients:
ip = clientIpMap.get(c, '')
display = f"{c} ({ip})" if ip else c
self.disconnectList.addItem(display)
except:
info = '已连接客户端: 无'
self.disconnectList.clear()
elif hasattr(protocolManage, 'RpcClient') and protocolManage.RpcClient:
mode = '客户端模式'
try:
clientName = protocolManage.RpcClient.clientName
serverInfo = protocolManage.RpcClient.rabbitHost
info = f'客户端名: {clientName} 服务器: {serverInfo}'
self.disconnectList.clear()
except:
info = '客户端信息获取失败'
self.disconnectList.clear()
self.statusLabel.setText(f'服务器模式: {mode}')
self.infoLabel.setText(f'连接信息: {info}')
def disconnectClient(self, item):
# 支持“客户端名 (IP)”格式
text = item.text()
clientName = text.split(' ')[0]
from utils import Globals
protocolManage = Globals.getValue('protocolManage')
if protocolManage and hasattr(protocolManage, 'disconnectClient'):
protocolManage.disconnectClient(clientName)
QMessageBox.information(self, '断开连接', f'已断开客户端 {clientName}')
# 立即刷新列表
self.updateStatusInfo()
class TcpServer(QObject):
updateSignal = pyqtSignal(str)
def __init__(self):
super().__init__()
# 配置信息
self.tcpPort = 12345 # 服务端TCP监听端口
self.udpPort = 54321 # 设备发现UDP端口
self.broadcastAddr = '<broadcast>' # 广播地址
self.tcpRunning = False
self.udpRunning = False
self.tcpThread = None
self.udpThread = None
self.serverSocket = None
self.udpSocket = None
def log(self, message):
currentTime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logMessage = f"[{currentTime}] {message}"
self.updateSignal.emit(logMessage)
def tcpServer(self):
try:
self.serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.serverSocket.settimeout(1) # 设置1秒超时
self.serverSocket.bind(('0.0.0.0', self.tcpPort))
self.serverSocket.listen(5)
self.log(f"TCP服务端已启动监听端口 {self.tcpPort}")
while self.tcpRunning:
try:
clientSock, addr = self.serverSocket.accept()
self.log(f"接收到来自 {addr} 的连接")
clientThread = threading.Thread(target=self.handleClient, args=(clientSock,addr))
clientThread.start()
except socket.timeout:
continue # 超时后继续检查运行标志
except:
break
except Exception as e:
self.log(f"TCP服务器错误: {e}")
finally:
if self.serverSocket:
self.serverSocket.close()
self.log("TCP服务端已停止")
def handleClient(self, clientSock, addr):
try:
while True:
data = clientSock.recv(1024)
if not data:
break
Globals.getValue('protocolManage').closeServer()
Globals.getValue('protocolManage').setClentMode(data.decode('utf-8'), rabbitmqHost = addr[0])
self.log(f"收到消息: {data.decode('utf-8')}")
clientSock.send(data) # 返回确认
# 新增:弹窗提示本机已被连接
# QMessageBox.information(None, '连接提示', f'本机已被 {addr[0]} 连接')
except Exception as e:
self.log(f"客户端异常断开: {e}")
finally:
clientSock.close()
def udpDiscoveryServer(self):
try:
self.udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udpSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.udpSocket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.udpSocket.bind(('', self.udpPort))
self.log(f"UDP发现服务已启动监听端口 {self.udpPort}")
while self.udpRunning:
try:
data, addr = self.udpSocket.recvfrom(1024)
if data.decode('utf-8') == "DISCOVERY_REQUEST":
self.log(f"收到来自 {addr} 的发现请求")
response = f"DISCOVERY_RESPONSE:{socket.gethostname()}:{self.tcpPort}"
# print(response)
self.udpSocket.sendto(response.encode('utf-8'), addr)
except:
break
except Exception as e:
self.log(f"UDP服务器错误: {e}")
finally:
if self.udpSocket:
self.udpSocket.close()
self.log("UDP发现服务已停止")
def startTcpServer(self):
if not self.tcpRunning:
self.tcpRunning = True
self.tcpThread = threading.Thread(target=self.tcpServer)
self.tcpThread.start()
def stopTcpServer(self):
if self.tcpRunning:
self.tcpRunning = False
# 唤醒accept阻塞安全关闭线程
if self.serverSocket:
try:
self.serverSocket.shutdown(socket.SHUT_RDWR)
except:
pass
try:
self.serverSocket.close()
except:
pass
if self.tcpThread and self.tcpThread.is_alive():
self.tcpThread.join(timeout=1)
def startUdpServer(self):
if not self.udpRunning:
self.udpRunning = True
self.udpThread = threading.Thread(target=self.udpDiscoveryServer)
self.udpThread.start()
def stopUdpServer(self):
if self.udpRunning:
self.udpRunning = False
# 唤醒recvfrom阻塞安全关闭线程
if self.udpSocket:
try:
self.udpSocket.shutdown(socket.SHUT_RDWR)
except:
pass
try:
self.udpSocket.close()
except:
pass
if self.udpThread and self.udpThread.is_alive():
self.udpThread.join(timeout=1)
class DeviceSlaveWidget(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("TCP/UDP 服务器控制")
# self.setGeometry(100, 100, 600, 400)
self.server = TcpServer()
self.server.updateSignal.connect(self.updateLog)
self.statusLabel = None
self.infoLabel = None
self.initUI()
# 定时器启动放到控件初始化后
self.statusTimer = QTimer(self)
self.statusTimer.timeout.connect(self.updateStatusInfo)
self.statusTimer.start(1000)
def initUI(self):
# 主布局
mainLayout = QVBoxLayout()
self.statusLabel = QLabel('服务器模式: 无')
mainLayout.addWidget(self.statusLabel)
self.infoLabel = QLabel('连接信息: 无')
mainLayout.addWidget(self.infoLabel)
# 控制按钮
# self.tcpButton = QPushButton("启动 TCP 服务器")
# self.tcpButton.clicked.connect(self.toggleTcpServer)
# self.tcpButton.setObjectName('setButton')
self.udpButton = QPushButton("开启远程通讯模式")
self.udpButton.clicked.connect(self.toggleUdpServer)
self.udpButton.setObjectName('setButton')
# 日志显示
self.logDisplay = QTextEdit()
self.logDisplay.setReadOnly(True)
# 添加到布局
# mainLayout.addWidget(self.tcpButton)
mainLayout.addWidget(self.udpButton)
mainLayout.addWidget(QLabel("服务器日志:"))
mainLayout.addWidget(self.logDisplay)
# 设置中心部件
container = QWidget()
container.setLayout(mainLayout)
self.setCentralWidget(container)
def toggleTcpServer(self):
if self.server.tcpRunning:
self.server.stopTcpServer()
# print("关闭 TCP 服务器")
else:
self.server.startTcpServer()
# print("启动 TCP 服务器")
def toggleUdpServer(self):
if self.server.udpRunning:
self.server.stopUdpServer()
self.udpButton.setText("开启远程通讯模式")
else:
self.server.startUdpServer()
self.udpButton.setText("关闭远程通讯模式")
self.toggleTcpServer()
def updateLog(self, message):
self.logDisplay.append(message)
def cleanup(self):
if self.server.tcpRunning:
self.server.stopTcpServer()
if self.server.udpRunning:
self.server.stopUdpServer()
def closeEvent(self, event):
self.cleanup()
event.accept()
def __del__(self):
# print(111111111111111111)
self.cleanup()
def updateStatusInfo(self):
protocolManage = Globals.getValue('protocolManage')
mode = ''
info = ''
if protocolManage:
if hasattr(protocolManage, 'RpcClient') and protocolManage.RpcClient:
mode = '客户端模式'
try:
clientName = protocolManage.RpcClient.clientName
serverInfo = protocolManage.RpcClient.rabbitHost
info = f'客户端名: {clientName} 服务器: {serverInfo}'
except:
info = '客户端信息获取失败'
else:
# 没有RpcClient显示无
mode = ''
info = ''
else:
mode = ''
info = ''
self.statusLabel.setText(f'服务器模式: {mode}')
self.infoLabel.setText(f'连接信息: {info}')