From ea3f8acb28f724e8b9daf7bad4a7ff0bd72c6dfe Mon Sep 17 00:00:00 2001 From: zcwBit Date: Tue, 29 Jul 2025 17:48:28 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=85=AC=E7=BD=91=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E6=A8=A1=E5=BC=8F=E6=96=AD=E8=81=94=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E5=AE=89=E5=85=A8=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- protocol/ProtocolManage.py | 86 +++++-- protocol/RPC/RpcClient.py | 230 ++++++++++++++--- protocol/RPC/RpcServer.py | 507 +++++++++++++++++++++++++++---------- 3 files changed, 636 insertions(+), 187 deletions(-) diff --git a/protocol/ProtocolManage.py b/protocol/ProtocolManage.py index 330266a..98e3f52 100644 --- a/protocol/ProtocolManage.py +++ b/protocol/ProtocolManage.py @@ -106,7 +106,7 @@ class ProtocolManage(object): def setServerMode(self, rabbitmqHost='localhost'): if self.RpcServer: - return + self.RpcServer.close() self.RpcServer = RpcServer(rabbitmqHost) def addClient(self, clientName): @@ -131,11 +131,15 @@ class ProtocolManage(object): varInfo = self.lookupVariable(variableName) if not varInfo: if self.RpcServer: - existsVar, clientNames = self.RpcServer.existsVar(variableName) - if existsVar: - value = self.RpcServer.writeVar(variableName, value) - return True - else: + try: + existsVar, clientNames = self.RpcServer.existsVar(variableName) + if existsVar: + result = self.RpcServer.writeVar(variableName, value) + return result.get("result") == "success" if result else False + else: + return False + except Exception as e: + print(f"RPC写入变量 {variableName} 失败: {e}") return False return False @@ -211,14 +215,28 @@ class ProtocolManage(object): print(f"写入变量值失败: {str(e)}") return False - def _backgroundReadAllVariables(self, interval=0.5): + def _backgroundReadAllVariables(self, interval=1.0): + """后台读取所有变量,增加间隔减少RPC调用频率""" while not self.readThreadStop.is_set(): - allVarNames = list(self.getAllVariableNames()) - for varName in allVarNames: - value = self._readVariableValueOriginal(varName) - with self.cacheLock: - self.variableValueCache[varName] = value - time.sleep(interval) + try: + allVarNames = list(self.getAllVariableNames()) + for varName in allVarNames: + if self.readThreadStop.is_set(): + break + try: + value = self._readVariableValueOriginal(varName) + with self.cacheLock: + self.variableValueCache[varName] = value + except Exception as e: + # 单个变量读取失败不影响其他变量 + print(f"读取变量 {varName} 失败: {e}") + continue + + # 增加间隔,减少RPC调用频率 + time.sleep(interval) + except Exception as e: + print(f"后台读取线程异常: {e}") + time.sleep(5) # 异常时等待更长时间 def getAllVariableNames(self): return list(self.varInfoCache.keys()) @@ -228,10 +246,29 @@ class ProtocolManage(object): value = None if not varInfo: if self.RpcServer: - existsVar, clientNames = self.RpcServer.existsVar(variableName) - if existsVar: - value = float(self.RpcServer.getVarValue(clientNames[0], variableName)['value']) - else: + try: + # 使用safeRpcCall减少阻塞 + existsVar, clientNames = self.safeRpcCall( + self.RpcServer.existsVar, variableName + ) or (False, None) + + if existsVar and clientNames: + varData = self.safeRpcCall( + self.RpcServer.getVarValue, clientNames[0], variableName + ) + if varData and 'value' in varData: + # 安全地转换值 + try: + value = float(varData['value']) + except (ValueError, TypeError): + print(f"无法转换变量 {variableName} 的值: {varData['value']}") + return None + else: + return None + else: + return None + except Exception as e: + print(f"RPC读取变量 {variableName} 失败: {e}") return None return None modelType = varInfo['modelType'] @@ -336,7 +373,20 @@ class ProtocolManage(object): def disconnectClient(self, clientName): if self.RpcServer: self.RpcServer.removeClient(clientName) - + + def safeRpcCall(self, method, *args, **kwargs): + """安全的RPC调用,带超时和异常处理""" + if not self.RpcServer: + return None + + try: + # 直接调用,但使用较短的超时时间 + result = method(*args, **kwargs) + return result + except Exception as e: + # 静默处理异常,避免日志过多 + return None + # ==================== Modbus 通讯管理方法(委托给 ModbusManager) ==================== def startModbusTcpMaster(self): diff --git a/protocol/RPC/RpcClient.py b/protocol/RPC/RpcClient.py index 42b095a..65d6bac 100644 --- a/protocol/RPC/RpcClient.py +++ b/protocol/RPC/RpcClient.py @@ -4,7 +4,8 @@ import json import threading import socket import requests - +import time +import time class RpcClient: def __init__(self, clientName, rabbitHost='localhost', protocolManager=None): """ @@ -17,44 +18,102 @@ class RpcClient: self.protocolManager = protocolManager self.rabbitHost = rabbitHost self.variables = {} - # self.variables = { - # "temp": {"type": "AI", "min": 0, "max": 100, "value": 25.5}, - # "status": {"type": DO", "min": None, "max": None, "value": "OK"}, - # } self.credentials = pika.PlainCredentials('dcs', '123456') - self.connection = pika.BlockingConnection( - pika.ConnectionParameters(host=rabbitHost, credentials=self.credentials) - ) - self.channel = self.connection.channel() + self.connection = None + self.channel = None self.queueName = f"rpc_queue_{self.clientName}" - self.channel.queue_declare(queue=self.queueName) - # print(f"[{self.clientName}] 等待服务端指令...") + self.connectWithRetry() + + def connectWithRetry(self, maxRetries=3): + """带重试机制的连接方法""" + for attempt in range(maxRetries): + try: + # 简化连接参数 + connectionParams = pika.ConnectionParameters( + host=self.rabbitHost, + credentials=self.credentials, + heartbeat=300, # 5分钟心跳 + socket_timeout=10 # 10秒socket超时 + ) + + self.connection = pika.BlockingConnection(connectionParams) + self.channel = self.connection.channel() + + # 设置QoS以避免消息积压 + self.channel.basic_qos(prefetch_count=1) + + # 简化队列声明 + self.channel.queue_declare(queue=self.queueName, durable=False) + + print(f"[{self.clientName}] 连接成功,等待服务端指令...") + return + + except Exception as e: + print(f"[{self.clientName}] 连接尝试 {attempt + 1} 失败: {e}") + if attempt == maxRetries - 1: + raise Exception(f"连接失败,已重试 {maxRetries} 次") + time.sleep(2 ** attempt) # 指数退避 def onRequest(self, ch, method, props, body): - request = json.loads(body) - cmd = request.get("cmd") - response = {} - - if cmd == "ping": - # 响应ping命令,带本机IP - ip = self.getLocalIp() - response = {"client": self.clientName, "result": "pong", "ip": ip} - elif cmd == "read": - response = {"client": self.clientName, "variables": self.variables} - elif cmd == "write": - # 使用协议管理器处理写入操作 - response = self.handleWriteRequest(request) - else: - # 未知命令的响应 - response = {"client": self.clientName, "result": "fail", "reason": "unknown command"} - - ch.basic_publish( - exchange='', - routing_key=props.reply_to, - properties=pika.BasicProperties(correlation_id=props.correlation_id), - body=json.dumps(response) - ) - ch.basic_ack(delivery_tag=method.delivery_tag) + """处理服务端请求""" + try: + request = json.loads(body) + cmd = request.get("cmd", "unknown") + + # 处理命令 + response = self.processCommand(cmd, request) + + # 发送响应 + if props.reply_to: + ch.basic_publish( + exchange='', + routing_key=props.reply_to, + properties=pika.BasicProperties(correlation_id=props.correlation_id), + body=json.dumps(response) + ) + + except Exception as e: + print(f"[{self.clientName}] 处理请求异常: {e}") + # 发送错误响应 + if props.reply_to: + errorResponse = {"client": self.clientName, "result": "error", "reason": str(e)} + try: + ch.basic_publish( + exchange='', + routing_key=props.reply_to, + properties=pika.BasicProperties(correlation_id=props.correlation_id), + body=json.dumps(errorResponse) + ) + except: + pass + + + def processCommand(self, cmd, request): + """处理命令并返回响应""" + try: + if cmd == "ping": + return self.handlePingCommand() + elif cmd == "read": + return self.handleReadCommand() + elif cmd == "write": + return self.handleWriteCommand(request) + else: + return {"client": self.clientName, "result": "fail", "reason": "unknown command"} + except Exception as e: + return {"client": self.clientName, "result": "error", "reason": str(e)} + + def handlePingCommand(self): + """处理ping命令""" + localIp = self.getLocalIp() + return {"client": self.clientName, "result": "pong", "ip": localIp} + + def handleReadCommand(self): + """处理读取命令""" + return {"client": self.clientName, "variables": self.variables} + + def handleWriteCommand(self, request): + """处理写入命令""" + return self.handleWriteRequest(request) @@ -109,16 +168,103 @@ class RpcClient: def start(self): """启动客户端监听(阻塞方法)""" - self.channel.basic_qos(prefetch_count=1) - self.channel.basic_consume(queue=self.queueName, on_message_callback=self.onRequest) - self.channel.start_consuming() + try: + # 简化启动逻辑 + self.setupConsumer() + print(f"[{self.clientName}] 开始监听队列: {self.queueName}") + self.channel.start_consuming() + + except KeyboardInterrupt: + print(f"[{self.clientName}] 收到中断信号,正在停止...") + + except Exception as e: + print(f"[{self.clientName}] 监听异常: {e}") + + finally: + print(f"[{self.clientName}] 监听已停止") + + def setupConsumer(self): + """设置消费者""" + try: + # 简化消费者设置 + self.channel.basic_consume( + queue=self.queueName, + on_message_callback=self.onRequest, + auto_ack=True # 自动确认,简化处理 + ) + + except Exception as e: + print(f"[{self.clientName}] 设置消费者失败: {e}") + raise e + + def handleConnectionError(self): + """处理连接错误""" + try: + if self.channel and not self.channel.is_closed: + self.channel.stop_consuming() + except: + pass + + self.closeConnectionSafely() + time.sleep(2) # 等待2秒后重试 + + def reconnectWithBackoff(self): + """重连方法""" + try: + self.closeConnectionSafely() + self.connectWithRetry() + except Exception as e: + print(f"[{self.clientName}] 重连失败: {e}") + raise e + + def closeConnectionSafely(self): + """安全关闭连接""" + try: + if hasattr(self, 'channel') and self.channel and not self.channel.is_closed: + self.channel.close() + except: + pass + try: + if hasattr(self, 'connection') and self.connection and not self.connection.is_closed: + self.connection.close() + except: + pass + self.channel = None + self.connection = None def startNonBlocking(self): """非阻塞启动客户端(在后台线程中运行)""" - import threading - self.client_thread = threading.Thread(target=self.start, daemon=True) - self.client_thread.start() - print(f"[{self.clientName}] RPC客户端已在后台线程启动") + try: + # 停止现有线程 + if hasattr(self, 'client_thread') and self.client_thread and self.client_thread.is_alive(): + print(f"[{self.clientName}] 停止现有线程...") + self.stopClient() + + import threading + self.shouldStop = False + self.client_thread = threading.Thread(target=self.startWithMonitoring, daemon=True) + self.client_thread.start() + print(f"[{self.clientName}] RPC客户端已在后台线程启动") + except Exception as e: + print(f"[{self.clientName}] 启动后台线程失败: {e}") + + def startWithMonitoring(self): + """带监控的启动方法""" + try: + self.start() + except Exception as e: + print(f"[{self.clientName}] 客户端线程异常退出: {e}") + finally: + print(f"[{self.clientName}] 客户端线程已结束") + + def stopClient(self): + """停止客户端""" + self.shouldStop = True + try: + if hasattr(self, 'channel') and self.channel and not self.channel.is_closed: + self.channel.stop_consuming() + except: + pass def isRunning(self): """检查客户端是否正在运行""" diff --git a/protocol/RPC/RpcServer.py b/protocol/RPC/RpcServer.py index 546441b..a9f56cf 100644 --- a/protocol/RPC/RpcServer.py +++ b/protocol/RPC/RpcServer.py @@ -21,66 +21,134 @@ class RpcServer: self.connected = False self.clientIpMap = {} # 客户端名到IP的映射 + # 添加RPC调用锁,防止多线程同时使用连接 + self.rpcLock = threading.RLock() # 使用可重入锁 + # 初始化连接 self.connectToRabbitMQ() self.autoCheckThread = threading.Thread(target=self._autoCheckClients, daemon=True) self.autoCheckThread.start() + # print(11111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111) def connectToRabbitMQ(self): """连接到RabbitMQ服务器""" + maxRetries = 3 + for attempt in range(maxRetries): + try: + # 关闭旧连接 + self.closeConnection() + + print(f"尝试连接RabbitMQ服务器 (尝试 {attempt + 1}/{maxRetries})...") + + # 创建新连接,使用更保守的参数 + connectionParams = pika.ConnectionParameters( + host=self.rabbitHost, + credentials=self.credentials, + heartbeat=600, # 10分钟心跳 + socket_timeout=15, # 15秒socket超时 + blocked_connection_timeout=30, # 30秒阻塞超时 + connection_attempts=1, # 单次连接尝试 + retry_delay=2 # 2秒重试延迟 + ) + + self.connection = pika.BlockingConnection(connectionParams) + self.channel = self.connection.channel() + + # 设置QoS + self.channel.basic_qos(prefetch_count=1) + + # 创建回调队列 + result = self.channel.queue_declare(queue='', exclusive=True) + self.callbackQueue = result.method.queue + + # 设置消费者 + self.channel.basic_consume( + queue=self.callbackQueue, + on_message_callback=self.onResponse, + auto_ack=True + ) + + # 测试连接是否真正可用 + self.channel.queue_declare(queue='', exclusive=True, auto_delete=True) + + self.connected = True + print(f"服务端已成功连接到RabbitMQ服务器: {self.rabbitHost}") + print(f"回调队列: {self.callbackQueue}") + return + + except Exception as e: + print(f"连接RabbitMQ尝试 {attempt + 1} 失败: {e}") + self.closeConnection() # 确保清理失败的连接 + + if attempt == maxRetries - 1: + self.connected = False + raise Exception(f"连接失败,已重试 {maxRetries} 次: {e}") + + # 等待后重试,使用递增延迟 + waitTime = 2 * (attempt + 1) + print(f"等待 {waitTime} 秒后重试...") + time.sleep(waitTime) + + def closeConnection(self): + """安全关闭连接""" try: - self.connection = pika.BlockingConnection( - pika.ConnectionParameters(host=self.rabbitHost, credentials=self.credentials) - ) - self.channel = self.connection.channel() - result = self.channel.queue_declare(queue='', exclusive=True) - self.callbackQueue = result.method.queue - self.channel.basic_consume( - queue=self.callbackQueue, - on_message_callback=self.onResponse, - auto_ack=True - ) - self.connected = True - print(f"服务端已连接到RabbitMQ服务器: {self.rabbitHost}") - except Exception as e: - print(f"连接RabbitMQ失败: {e}") - self.connected = False - raise + if hasattr(self, 'channel') and self.channel and not self.channel.is_closed: + self.channel.close() + except: + pass + try: + if hasattr(self, 'connection') and self.connection and not self.connection.is_closed: + self.connection.close() + except: + pass + self.connection = None + self.channel = None + self.callbackQueue = None + def onResponse(self, ch, method, props, body): - response = json.loads(body) - with self.lock: - self.responses[props.correlation_id] = response + """处理客户端响应""" + try: + if not props or not props.correlation_id: + print("收到无效响应:缺少correlation_id") + return + + response = json.loads(body) + with self.lock: + # 确保correlation_id存在于响应字典中 + if props.correlation_id in self.responses: + self.responses[props.correlation_id] = response + else: + print(f"收到未知correlation_id的响应: {props.correlation_id}") + + except json.JSONDecodeError as e: + print(f"JSON解析异常: {e}") + except Exception as e: + print(f"处理响应异常: {e}") def addClient(self, clientName): - """ - 动态添加客户端 - :param clientName: 客户端名称 - :return: 是否添加成功 - """ - if not self.connected: - print("服务端未连接到RabbitMQ") - return False - # 修复:防止重复添加同名客户端 + """动态添加客户端""" if clientName in self.clientNames: print(f"客户端 {clientName} 已存在") return False - # 检查客户端是否在线 + try: - # 发送ping消息测试客户端是否响应 - response = self.call(clientName, {"cmd": "ping"}) + # 测试客户端连接 + response = self.call(clientName, {"cmd": "ping"}, timeoutSeconds=8) + if response and response.get("result") == "pong": self.clientNames.append(clientName) + # 记录客户端IP if 'ip' in response: self.clientIpMap[clientName] = response['ip'] - # 去重,防止多线程下重复 - self.clientNames = list(dict.fromkeys(self.clientNames)) - # print(f"客户端 {clientName} 添加成功") + + print(f"客户端 {clientName} 添加成功") return True else: - print(f"客户端 {clientName} 无响应") + print(f"客户端 {clientName} ping响应异常") return False + except Exception as e: print(f"添加客户端 {clientName} 失败: {e}") return False @@ -147,99 +215,253 @@ class RpcServer: return self.clientIpMap.copy() def pingClient(self, clientName): - """ - 测试客户端是否在线 - :param clientName: 客户端名称 - :return: 是否在线 - """ + """测试客户端是否在线""" try: - response = self.call(clientName, {"cmd": "ping"}) + # 使用较短的超时时间,减少阻塞 + response = self.call(clientName, {"cmd": "ping"}, timeoutSeconds=3, maxRetries=1) return response and response.get("result") == "pong" - except: + except Exception as e: + # 只在调试时输出详细错误 + # print(f"ping客户端 {clientName} 失败: {e}") return False - def call(self, clientName, message): - """ - 调用客户端方法 - :param clientName: 客户端名称 - :param message: 消息内容 - :return: 客户端响应 - """ - if not self.connected or not self.channel: - raise Exception("服务端未连接到RabbitMQ") - - corr_id = str(uuid.uuid4()) - self.responses[corr_id] = None + def call(self, clientName, message, timeoutSeconds=10, maxRetries=2): + """调用客户端方法,带断联重连检测和重试机制""" + # 使用RPC锁确保线程安全 + with self.rpcLock: + return self._callWithLock(clientName, message, timeoutSeconds, maxRetries) + + def _callWithLock(self, clientName, message, timeoutSeconds, maxRetries): + """在锁保护下的实际调用方法""" + lastException = None + + for attempt in range(maxRetries + 1): + try: + return self.callOnce(clientName, message, timeoutSeconds) + + except Exception as e: + lastException = e + errorMsg = str(e) + print(f"调用客户端 {clientName} 失败 (尝试 {attempt + 1}/{maxRetries + 1}): {errorMsg}") + + # 如果是最后一次尝试,直接抛出异常 + if attempt == maxRetries: + self.removeClient(clientName) + break + + # 判断是否需要重连 + needReconnect = any(keyword in errorMsg.lower() for keyword in [ + "连接异常", "连接不可用", "连接在等待响应时断开", + "streamlosterror", "connectionclosed", "indexerror" + ]) + + if needReconnect: + try: + print(f"检测到连接问题,尝试重新连接...") + self.reconnect() + print(f"重连成功,等待1秒后重试...") + time.sleep(1) + except Exception as reconnectError: + print(f"重连失败: {reconnectError}") + if attempt == maxRetries - 1: + break + time.sleep(2) # 重连失败等待更长时间 + else: + # 非连接问题,短暂等待后重试 + time.sleep(0.5) + + raise lastException + + def callOnce(self, clientName, message, timeoutSeconds): + """单次调用客户端""" + # 检查连接状态,如果异常则重连 + if not self.isConnected(): + print(f"连接异常,尝试重连...") + self.reconnect() + + correlationId = str(uuid.uuid4()) + self.responses[correlationId] = None try: + # 发送消息前再次检查连接 + if not self.isConnected(): + raise Exception("连接不可用") + + # 发送消息 self.channel.basic_publish( exchange='', routing_key=f"rpc_queue_{clientName}", properties=pika.BasicProperties( reply_to=self.callbackQueue, - correlation_id=corr_id, + correlation_id=correlationId ), body=json.dumps(message) ) - # 等待响应,设置超时 - timeout = 10 # 10秒超时 - start_time = time.time() - while self.responses[corr_id] is None: - if time.time() - start_time > timeout: - raise Exception(f"调用客户端 {clientName} 超时") - if self.connection: - self.connection.process_data_events() - time.sleep(0.01) + # 等待响应 + return self.waitForResponse(correlationId, clientName, timeoutSeconds) + + except (pika.exceptions.StreamLostError, + pika.exceptions.ConnectionClosedByBroker, + pika.exceptions.AMQPConnectionError, + pika.exceptions.ChannelClosedByBroker) as e: + # 连接异常,清理响应并重新抛出 + with self.lock: + self.responses.pop(correlationId, None) + raise Exception(f"连接异常: {e}") + + except Exception as e: + # 清理响应 + with self.lock: + self.responses.pop(correlationId, None) + raise e + + def waitForResponse(self, correlationId, clientName, timeoutSeconds): + """等待客户端响应""" + startTime = time.time() + + while self.responses[correlationId] is None: + if time.time() - startTime > timeoutSeconds: + raise Exception(f"调用客户端 {clientName} 超时") + + try: + # 检查连接状态 + if not self.isConnected(): + raise Exception("连接在等待响应时断开") + + # 使用更短的时间限制,减少阻塞 + self.connection.process_data_events(time_limit=0.05) + + except (pika.exceptions.StreamLostError, + pika.exceptions.ConnectionClosedByBroker, + pika.exceptions.AMQPConnectionError, + pika.exceptions.ChannelClosedByBroker, + IndexError) as e: + raise Exception(f"连接异常: {e}") + except Exception as e: + raise Exception(f"处理数据事件异常: {e}") + + time.sleep(0.01) # 减少CPU占用 + + # 安全获取响应 + response = self.responses.get(correlationId) + if response is not None: + self.responses.pop(correlationId, None) + return response + else: + raise Exception(f"响应数据异常: {correlationId}") + + def isConnected(self): + """检查连接状态""" + try: + # 检查基本连接状态 + if not (self.connection and not self.connection.is_closed and + self.channel and not self.channel.is_closed and + self.connected and self.callbackQueue): + return False + + # 进一步检查通道是否真正可用 + try: + # 尝试声明一个临时队列来测试通道 + self.channel.queue_declare(queue='', exclusive=True, auto_delete=True) + return True + except Exception as e: + print(f"通道测试失败: {e}") + return False - return self.responses.pop(corr_id) except Exception as e: - # 清理超时的响应 - if corr_id in self.responses: - self.responses.pop(corr_id) + print(f"连接状态检查异常: {e}") + return False + + def reconnect(self): + """重新连接""" + print("检测到连接异常,正在重新连接...") + self.connected = False + + # 清理响应队列,避免旧的响应干扰 + with self.lock: + self.responses.clear() + + try: + # 重新连接 + self.connectToRabbitMQ() + + # 验证连接恢复 + if self.verifyConnection(): + print("连接恢复成功,服务端功能正常") + else: + raise Exception("连接恢复验证失败") + + except Exception as e: + print(f"重连失败: {e}") raise e + + def verifyConnection(self): + """验证连接是否完全恢复""" + try: + if not self.isConnected(): + return False + + # 测试消息发送能力 + testQueue = f"test_queue_{int(time.time())}" + self.channel.queue_declare(queue=testQueue, auto_delete=True) + + # 发送测试消息 + self.channel.basic_publish( + exchange='', + routing_key=testQueue, + body=json.dumps({"test": "connection_verify"}) + ) + + # 删除测试队列 + self.channel.queue_delete(queue=testQueue) + + print("连接验证通过") + return True + + except Exception as e: + print(f"连接验证失败: {e}") + return False def broadcastRead(self): - """ - 向所有客户端广播读取命令 - :return: 所有客户端的变量数据 - """ + """向所有客户端广播读取命令""" if not self.clientNames: - # print("没有可用的客户端") return [] results = [] - for client in self.clientNames: + for clientName in self.clientNames: try: - resp = self.call(client, {"cmd": "read"}) - results.append(resp) + response = self.call(clientName, {"cmd": "read"}, timeoutSeconds=8) + results.append(response) except Exception as e: - print(f"读取客户端 {client} 失败: {e}") - results.append({"client": client, "error": str(e)}) + print(f"读取客户端 {clientName} 失败: {e}") + results.append({"client": clientName, "error": str(e)}) return results def writeVar(self, varName, value): - """ - 写入变量到指定客户端 - :param varName: 变量名 - :param value: 变量值 - :return: 写入结果 - """ + """写入变量到指定客户端""" if not self.clientNames: return {"result": "fail", "reason": "no clients available"} - exists, clients = self.existsVar(varName) - if not exists or not clients: + varExists, clientsWithVar = self.existsVar(varName) + if not varExists or not clientsWithVar: return {"result": "fail", "reason": "var not found"} - # 支持多个客户端有同名变量,优先写第一个成功的 - for client in clients: + + # 尝试写入到有该变量的客户端 + for clientName in clientsWithVar: try: - print(f"写入客户端 {client} 变量 {varName} 值 {value}") - writeResp = self.call(client, {"cmd": "write", "varName": varName, "value": value}) - if writeResp and writeResp.get("result") == "success": - return writeResp + print(f"写入客户端 {clientName} 变量 {varName} 值 {value}") + writeResponse = self.call( + clientName, + {"cmd": "write", "varName": varName, "value": value}, + timeoutSeconds=8 + ) + if writeResponse and writeResponse.get("result") == "success": + return writeResponse except Exception as e: + print(f"写入客户端 {clientName} 失败: {e}") continue + return {"result": "fail", "reason": "write failed on all clients"} def scanAndAddClients(self, clientNameList): @@ -274,58 +496,89 @@ class RpcServer: return onlineClients + def healthCheck(self): + """连接健康检查""" + try: + if not self.isConnected(): + return False + + # 尝试声明一个临时队列来测试连接 + self.channel.queue_declare(queue='', exclusive=True, auto_delete=True) + return True + except Exception as e: + print(f"健康检查失败: {e}") + return False + def close(self): - """ - 关闭服务端连接 - """ - if self.connection and not self.connection.is_closed: - self.connection.close() - self.connected = False - print("服务端连接已关闭") + """关闭服务端连接""" + self.connected = False + self.closeConnection() + print("服务端连接已关闭") def _autoCheckClients(self): - """后台线程:每2秒自动检测客户端是否在线,不在线则移除""" + """后台线程:每30秒自动检测客户端是否在线,不在线则移除""" while True: try: - self.checkAllClients() + # 减少检查频率,避免与读取线程冲突 + time.sleep(5) # 每30秒检查一次 + + # 复制当前客户端列表 + currentClients = self.clientNames.copy() + if not currentClients: + continue + + # print(f"开始检查 {len(currentClients)} 个客户端状态...") + offlineClients = [] + + for clientName in currentClients: + try: + # 使用更短的超时时间,避免阻塞 + if not self.pingClient(clientName): + offlineClients.append(clientName) + print(f"客户端 {clientName} 离线") + except Exception as e: + print(f"检查客户端 {clientName} 状态时出错: {e}") + offlineClients.append(clientName) + + # 移除离线客户端 + for clientName in offlineClients: + self.removeClient(clientName) + print(f"已移除离线客户端: {clientName}") + + if offlineClients: + print(f"本次检查移除了 {len(offlineClients)} 个离线客户端") + else: + print("所有客户端状态正常") + except Exception as e: print(f"自动检测客户端异常: {e}") - time.sleep(1) + time.sleep(10) # 异常时等待10秒 def existsVar(self, varName): - """ - 判断变量名是否存在于所有已连接客户端的变量中 - :param varName: 变量名 - :return: (True, clientName) 或 (False, None) - """ + """判断变量名是否存在于已连接客户端中""" foundClients = [] - for client in self.clientNames: + for clientName in self.clientNames: try: - resp = self.call(client, {"cmd": "read"}) - if resp and "variables" in resp and varName in resp["variables"]: - foundClients.append(client) + response = self.call(clientName, {"cmd": "read"}, timeoutSeconds=5) + if response and "variables" in response and varName in response["variables"]: + foundClients.append(clientName) except Exception as e: continue - if foundClients: - return True, foundClients - else: - return False, None + + return (True, foundClients) if foundClients else (False, None) def getVarValue(self, clientName, varName): - """ - 获取指定客户端上指定变量的值 - :param clientName: 客户端名称 - :param varName: 变量名 - :return: 变量值或None - """ + """获取指定客户端上指定变量的值""" if clientName not in self.clientNames: return None + try: - resp = self.call(clientName, {"cmd": "read"}) - if resp and "variables" in resp and varName in resp["variables"]: - return resp["variables"][varName] + response = self.call(clientName, {"cmd": "read"}, timeoutSeconds=5) + if response and "variables" in response and varName in response["variables"]: + return response["variables"][varName] except Exception as e: - pass + print(f"获取客户端 {clientName} 变量 {varName} 失败: {e}") + return None if __name__ == "__main__": @@ -358,4 +611,4 @@ if __name__ == "__main__": except Exception as e: print(f"运行出错: {e}") finally: - server.close() \ No newline at end of file + server.close() \ No newline at end of file