import json
import threading
import time
import uuid

import pika

# pika errors that mean the broker connection or channel is gone.
_PIKA_CONNECTION_ERRORS = (
    pika.exceptions.StreamLostError,
    pika.exceptions.ConnectionClosedByBroker,
    pika.exceptions.AMQPConnectionError,
    pika.exceptions.ChannelClosedByBroker,
)


class RpcServer:
    """RPC caller that talks to named clients through RabbitMQ.

    Each client is expected to consume from ``rpc_queue_<clientName>`` and to
    reply with a JSON document on the queue given in ``reply_to``.  Requests
    are serialized with ``rpcLock`` because a pika ``BlockingConnection`` must
    only be used by one thread at a time.  A daemon thread periodically pings
    the registered clients and removes the ones that do not answer.
    """

    # Seconds between two rounds of the background client check.
    _AUTO_CHECK_INTERVAL_SECONDS = 5

    # Lower-case fragments of an error message that indicate a connection
    # problem (as opposed to e.g. a client timeout) and trigger a reconnect.
    _RECONNECT_KEYWORDS = (
        "连接异常",
        "连接不可用",
        "连接在等待响应时断开",
        "streamlosterror",
        "connectionclosed",
        "indexerror",
    )

    def __init__(self, rabbitHost='localhost', username='dcs', password='123456'):
        """Connect to RabbitMQ and start the background client check.

        :param rabbitHost: RabbitMQ server address
        :param username: RabbitMQ user name (override the default for real
            deployments)
        :param password: RabbitMQ password (override the default for real
            deployments)
        :raises Exception: if the broker cannot be reached after all retries
        """
        self.clientNames = []  # dynamic list of registered clients
        self.credentials = pika.PlainCredentials(username, password)
        self.rabbitHost = rabbitHost
        self.connection = None
        self.channel = None
        self.callbackQueue = None
        self.responses = {}  # correlation id -> response (None while pending)
        self.lock = threading.Lock()  # guards self.responses
        self.connected = False
        self.clientIpMap = {}  # client name -> IP reported by the client

        # Re-entrant lock so that only one thread uses the connection at a time.
        self.rpcLock = threading.RLock()
        # Set by close() so the background thread stops instead of reconnecting.
        self._stopEvent = threading.Event()

        self.connectToRabbitMQ()

        self.autoCheckThread = threading.Thread(target=self._autoCheckClients, daemon=True)
        self.autoCheckThread.start()

    def connectToRabbitMQ(self):
        """Open a fresh connection, channel and exclusive callback queue.

        Tries up to three times with an increasing delay (2 s, 4 s) between
        attempts.

        :raises Exception: if every attempt fails
        """
        maxRetries = 3

        for attempt in range(maxRetries):
            try:
                # Drop any previous connection first.
                self.closeConnection()

                print(f"尝试连接RabbitMQ服务器 (尝试 {attempt + 1}/{maxRetries})...")

                connectionParams = pika.ConnectionParameters(
                    host=self.rabbitHost,
                    credentials=self.credentials,
                    heartbeat=600,  # 10 minute heartbeat
                    socket_timeout=15,  # 15 second socket timeout
                    blocked_connection_timeout=30,  # 30 second blocked timeout
                    connection_attempts=1,  # retries are handled by this loop
                    retry_delay=2,
                )

                self.connection = pika.BlockingConnection(connectionParams)
                self.channel = self.connection.channel()
                self.channel.basic_qos(prefetch_count=1)

                # Server-named exclusive queue that receives client replies.
                result = self.channel.queue_declare(queue='', exclusive=True)
                self.callbackQueue = result.method.queue

                self.channel.basic_consume(
                    queue=self.callbackQueue,
                    on_message_callback=self.onResponse,
                    auto_ack=True,
                )

                # Round-trip to the broker to prove the channel really works.
                # A passive declare does not create anything, so no throwaway
                # queue is left behind.
                self.channel.queue_declare(queue=self.callbackQueue, passive=True)

                self.connected = True
                print(f"服务端已成功连接到RabbitMQ服务器: {self.rabbitHost}")
                print(f"回调队列: {self.callbackQueue}")
                return

            except Exception as e:
                print(f"连接RabbitMQ尝试 {attempt + 1} 失败: {e}")
                self.closeConnection()  # make sure the failed connection is cleaned up

                if attempt == maxRetries - 1:
                    self.connected = False
                    raise Exception(f"连接失败,已重试 {maxRetries} 次: {e}") from e

                # Wait with an increasing delay before the next attempt.
                waitTime = 2 * (attempt + 1)
                print(f"等待 {waitTime} 秒后重试...")
                time.sleep(waitTime)

    def closeConnection(self):
        """Close channel and connection (best effort) and reset the handles."""
        for resource in (getattr(self, 'channel', None), getattr(self, 'connection', None)):
            try:
                if resource and not resource.is_closed:
                    resource.close()
            except Exception:
                # Deliberately ignored: the resource is being discarded anyway
                # and may already be half-dead.
                pass

        self.connection = None
        self.channel = None
        self.callbackQueue = None

    def onResponse(self, ch, method, props, body):
        """Consumer callback: store a client reply under its correlation id.

        Replies without a correlation id, with an unknown correlation id or
        with a body that is not valid JSON are logged and dropped.
        """
        try:
            if not props or not props.correlation_id:
                print("收到无效响应:缺少correlation_id")
                return

            response = json.loads(body)

            with self.lock:
                # Only accept replies for requests that are still pending.
                if props.correlation_id in self.responses:
                    self.responses[props.correlation_id] = response
                else:
                    print(f"收到未知correlation_id的响应: {props.correlation_id}")

        except json.JSONDecodeError as e:
            print(f"JSON解析异常: {e}")
        except Exception as e:
            print(f"处理响应异常: {e}")

    def addClient(self, clientName):
        """Register a client after it answered a ping with ``"pong"``.

        :param clientName: client name
        :return: True if the client was added, False if it already exists or
            did not answer correctly
        """
        if clientName in self.clientNames:
            print(f"客户端 {clientName} 已存在")
            return False

        try:
            response = self.call(clientName, {"cmd": "ping"}, timeoutSeconds=8)
            if response and response.get("result") == "pong":
                self.clientNames.append(clientName)
                # Remember the IP if the client reported one.
                if 'ip' in response:
                    self.clientIpMap[clientName] = response['ip']
                print(f"客户端 {clientName} 添加成功")
                return True

            print(f"客户端 {clientName} ping响应异常")
            return False
        except Exception as e:
            print(f"添加客户端 {clientName} 失败: {e}")
            return False

    def removeClient(self, clientName):
        """Remove a client and its IP mapping.

        :param clientName: client name
        :return: True if the client was registered and has been removed
        """
        if clientName not in self.clientNames:
            return False

        self.clientNames.remove(clientName)
        self.clientIpMap.pop(clientName, None)
        return True

    def getClientList(self):
        """Return a copy of the list of registered client names."""
        return self.clientNames.copy()

    def getNextClientName(self):
        """Return the next free ``client<N>`` name.

        Returns ``"client1"`` when no registered name has the form
        ``client<number>`` (prefix compared case-insensitively); otherwise
        ``client<max + 1>``.

        :return: next available client name
        """
        prefix = "client"
        suffixes = (
            name[len(prefix):]
            for name in self.clientNames
            if name.lower().startswith(prefix)
        )
        # isdecimal() accepts exactly the strings int() can parse as digits.
        numbers = [int(suffix) for suffix in suffixes if suffix.isdecimal()]

        if not numbers:
            return "client1"
        return f"client{max(numbers) + 1}"

    def getClientNames(self):
        """Return a copy of the list of registered client names."""
        return self.clientNames.copy()

    def getClientIpMap(self):
        """Return a copy of the client-name -> IP mapping."""
        return self.clientIpMap.copy()

    def pingClient(self, clientName):
        """Return True if the client answers a ping with ``"pong"``.

        Uses a short timeout and a single retry to keep blocking low.  Note
        that a failed call removes the client from the registry (see
        ``_callWithLock``).
        """
        try:
            response = self.call(clientName, {"cmd": "ping"}, timeoutSeconds=3, maxRetries=1)
            return bool(response and response.get("result") == "pong")
        except Exception:
            return False

    def call(self, clientName, message, timeoutSeconds=10, maxRetries=2):
        """Send ``message`` to a client and return its decoded JSON reply.

        Thread-safe: the whole call runs under ``rpcLock``.

        :param clientName: target client; the request goes to
            ``rpc_queue_<clientName>``
        :param message: JSON-serializable request
        :param timeoutSeconds: time to wait for the reply per attempt
        :param maxRetries: number of retries after the first attempt
        :return: the client's reply
        :raises Exception: the last error once all attempts failed
        """
        with self.rpcLock:
            return self._callWithLock(clientName, message, timeoutSeconds, maxRetries)

    def _callWithLock(self, clientName, message, timeoutSeconds, maxRetries):
        """Retry loop behind ``call``; must be invoked with ``rpcLock`` held.

        After the final failed attempt the client is removed from the
        registry.  Errors that look like connection problems trigger a
        reconnect before the next attempt.
        """
        # A negative retry count would skip the loop and leave nothing to raise.
        maxRetries = max(0, maxRetries)
        lastException = None

        for attempt in range(maxRetries + 1):
            try:
                return self.callOnce(clientName, message, timeoutSeconds)
            except Exception as e:
                lastException = e
                errorMsg = str(e)
                print(f"调用客户端 {clientName} 失败 (尝试 {attempt + 1}/{maxRetries + 1}): {errorMsg}")

                # Last attempt: give up on this client.
                if attempt == maxRetries:
                    self.removeClient(clientName)
                    break

                loweredMsg = errorMsg.lower()
                needReconnect = any(keyword in loweredMsg for keyword in self._RECONNECT_KEYWORDS)

                if needReconnect:
                    try:
                        print("检测到连接问题,尝试重新连接...")
                        self.reconnect()
                        print("重连成功,等待1秒后重试...")
                        time.sleep(1)
                    except Exception as reconnectError:
                        print(f"重连失败: {reconnectError}")
                        if attempt == maxRetries - 1:
                            break
                        time.sleep(2)  # wait longer after a failed reconnect
                else:
                    # Not a connection problem: short pause, then retry.
                    time.sleep(0.5)

        raise lastException

    def callOnce(self, clientName, message, timeoutSeconds):
        """Perform a single request/response round trip.

        Reconnects first if the connection is not usable.

        :raises Exception: on timeout or connection problems; pika connection
            errors are re-raised as ``Exception("连接异常: ...")``
        """
        if not self.isConnected():
            print("连接异常,尝试重连...")
            self.reconnect()

        correlationId = str(uuid.uuid4())
        with self.lock:
            self.responses[correlationId] = None  # mark as pending

        try:
            # isConnected()/reconnect() just verified the broker round trip,
            # so a cheap local check is enough here.
            if not self._isLocallyOpen():
                raise Exception("连接不可用")

            self.channel.basic_publish(
                exchange='',
                routing_key=f"rpc_queue_{clientName}",
                properties=pika.BasicProperties(
                    reply_to=self.callbackQueue,
                    correlation_id=correlationId,
                ),
                body=json.dumps(message),
            )

            return self.waitForResponse(correlationId, clientName, timeoutSeconds)

        except _PIKA_CONNECTION_ERRORS as e:
            raise Exception(f"连接异常: {e}") from e
        finally:
            # Never leave a pending entry behind, whatever happened.
            with self.lock:
                self.responses.pop(correlationId, None)

    def waitForResponse(self, correlationId, clientName, timeoutSeconds):
        """Pump connection events until the reply for ``correlationId`` arrives.

        :return: the decoded reply
        :raises Exception: on timeout, when the connection drops while
            waiting, or when the pending entry has disappeared
        """
        startTime = time.monotonic()  # immune to wall-clock adjustments

        while True:
            with self.lock:
                if correlationId not in self.responses:
                    # E.g. reconnect() cleared the pending table.
                    raise Exception(f"响应数据异常: {correlationId}")
                response = self.responses[correlationId]
                if response is not None:
                    del self.responses[correlationId]
                    return response

            if time.monotonic() - startTime > timeoutSeconds:
                raise Exception(f"调用客户端 {clientName} 超时")

            # Cheap local check only; a broker round trip per iteration would
            # be wasteful and process_data_events() reports a lost stream.
            if not self._isLocallyOpen():
                raise Exception("连接在等待响应时断开")

            try:
                # Short time limit keeps the loop responsive.
                self.connection.process_data_events(time_limit=0.05)
            except _PIKA_CONNECTION_ERRORS + (IndexError,) as e:
                raise Exception(f"连接异常: {e}") from e
            except Exception as e:
                raise Exception(f"处理数据事件异常: {e}") from e

            time.sleep(0.01)  # reduce CPU usage

    def _isLocallyOpen(self):
        """Return True if connection, channel and callback queue look usable.

        Only inspects local state; does not talk to the broker.
        """
        return bool(
            self.connection and not self.connection.is_closed
            and self.channel and not self.channel.is_closed
            and self.connected and self.callbackQueue
        )

    def isConnected(self):
        """Return True if the connection is open and the channel answers.

        The channel is probed with a passive declare of the callback queue,
        which checks for its existence without creating anything on the
        broker.
        """
        try:
            if not self._isLocallyOpen():
                return False

            try:
                self.channel.queue_declare(queue=self.callbackQueue, passive=True)
                return True
            except Exception as e:
                print(f"通道测试失败: {e}")
                return False

        except Exception as e:
            print(f"连接状态检查异常: {e}")
            return False

    def reconnect(self):
        """Drop pending responses, reconnect and verify the new connection.

        :raises Exception: if connecting or verification fails
        """
        print("检测到连接异常,正在重新连接...")
        self.connected = False

        # Old replies can never arrive on the new callback queue.
        with self.lock:
            self.responses.clear()

        try:
            self.connectToRabbitMQ()

            if self.verifyConnection():
                print("连接恢复成功,服务端功能正常")
            else:
                raise Exception("连接恢复验证失败")

        except Exception as e:
            print(f"重连失败: {e}")
            raise

    def verifyConnection(self):
        """Return True if a test queue can be declared, published to and deleted."""
        try:
            if not self.isConnected():
                return False

            testQueue = f"test_queue_{int(time.time())}"
            self.channel.queue_declare(queue=testQueue, auto_delete=True)

            self.channel.basic_publish(
                exchange='',
                routing_key=testQueue,
                body=json.dumps({"test": "connection_verify"}),
            )

            self.channel.queue_delete(queue=testQueue)

            print("连接验证通过")
            return True

        except Exception as e:
            print(f"连接验证失败: {e}")
            return False

    def broadcastRead(self):
        """Send ``{"cmd": "read"}`` to every client and collect the replies.

        :return: list with one entry per client: the reply, or
            ``{"client": name, "error": message}`` if the call failed
        """
        results = []
        # Iterate a snapshot: a failed call removes the client from the list.
        for clientName in self.clientNames.copy():
            try:
                response = self.call(clientName, {"cmd": "read"}, timeoutSeconds=8)
                results.append(response)
            except Exception as e:
                print(f"读取客户端 {clientName} 失败: {e}")
                results.append({"client": clientName, "error": str(e)})

        return results

    def writeVar(self, varName, value):
        """Write ``value`` to ``varName`` on the first client that accepts it.

        :return: the successful client's reply, or a dict with
            ``"result": "fail"`` and a ``"reason"``
        """
        if not self.clientNames:
            return {"result": "fail", "reason": "no clients available"}

        varExists, clientsWithVar = self.existsVar(varName)
        if not varExists or not clientsWithVar:
            return {"result": "fail", "reason": "var not found"}

        # Try the clients that expose the variable, in order.
        for clientName in clientsWithVar:
            try:
                print(f"写入客户端 {clientName} 变量 {varName} 值 {value}")
                writeResponse = self.call(
                    clientName,
                    {"cmd": "write", "varName": varName, "value": value},
                    timeoutSeconds=8,
                )
                if writeResponse and writeResponse.get("result") == "success":
                    return writeResponse
            except Exception as e:
                print(f"写入客户端 {clientName} 失败: {e}")
                continue

        return {"result": "fail", "reason": "write failed on all clients"}

    def scanAndAddClients(self, clientNameList):
        """Try to add every client in ``clientNameList``.

        :param clientNameList: client names to probe
        :return: names that were added successfully
        """
        return [clientName for clientName in clientNameList if self.addClient(clientName)]

    def checkAllClients(self):
        """Ping every registered client and drop the ones that are offline.

        :return: list of online client names
        """
        onlineClients = []
        offlineClients = []

        # Iterate a snapshot: a failed ping already removes the client.
        for client in self.clientNames.copy():
            if self.pingClient(client):
                onlineClients.append(client)
            else:
                offlineClients.append(client)

        for client in offlineClients:
            self.removeClient(client)

        return onlineClients

    def healthCheck(self):
        """Return True if the broker connection is usable."""
        try:
            return self.isConnected()
        except Exception as e:
            print(f"健康检查失败: {e}")
            return False

    def close(self):
        """Stop the background check and close the broker connection."""
        # Tell the background thread to stop before it can reconnect.
        self._stopEvent.set()
        # Wait for any in-flight RPC; the connection is not thread-safe.
        with self.rpcLock:
            self.connected = False
            self.closeConnection()
        print("服务端连接已关闭")

    def _autoCheckClients(self):
        """Background thread: periodically ping clients, remove offline ones.

        Runs every ``_AUTO_CHECK_INTERVAL_SECONDS`` seconds until ``close()``
        is called.
        """
        while not self._stopEvent.wait(self._AUTO_CHECK_INTERVAL_SECONDS):
            try:
                currentClients = self.clientNames.copy()
                if not currentClients:
                    continue

                offlineClients = []
                for clientName in currentClients:
                    try:
                        # Check the stop flag under the RPC lock so a ping can
                        # never reopen a connection that close() just closed.
                        with self.rpcLock:
                            if self._stopEvent.is_set():
                                return
                            online = self.pingClient(clientName)
                        if not online:
                            offlineClients.append(clientName)
                            print(f"客户端 {clientName} 离线")
                    except Exception as e:
                        print(f"检查客户端 {clientName} 状态时出错: {e}")
                        offlineClients.append(clientName)

                for clientName in offlineClients:
                    self.removeClient(clientName)
                    print(f"已移除离线客户端: {clientName}")

                if offlineClients:
                    print(f"本次检查移除了 {len(offlineClients)} 个离线客户端")
                else:
                    print("所有客户端状态正常")

            except Exception as e:
                print(f"自动检测客户端异常: {e}")
                # Back off after an error, but still react to close().
                if self._stopEvent.wait(10):
                    return

    def existsVar(self, varName):
        """Find the clients whose ``read`` reply contains ``varName``.

        :return: ``(True, [client names])`` if at least one client has the
            variable, otherwise ``(False, None)``
        """
        foundClients = []
        # Iterate a snapshot: a failed call removes the client from the list.
        for clientName in self.clientNames.copy():
            try:
                response = self.call(clientName, {"cmd": "read"}, timeoutSeconds=5)
                if response and "variables" in response and varName in response["variables"]:
                    foundClients.append(clientName)
            except Exception:
                continue
        return (True, foundClients) if foundClients else (False, None)

    def getVarValue(self, clientName, varName):
        """Return the value of ``varName`` on ``clientName``.

        :return: the value, or None if the client is not registered, the
            variable is missing or the call failed
        """
        if clientName not in self.clientNames:
            return None
        try:
            response = self.call(clientName, {"cmd": "read"}, timeoutSeconds=5)
            if response and "variables" in response and varName in response["variables"]:
                return response["variables"][varName]
        except Exception as e:
            print(f"获取客户端 {clientName} 变量 {varName} 失败: {e}")
        return None


if __name__ == "__main__":
    server = RpcServer()

    try:
        # Register clients dynamically.
        print("1. 添加客户端")
        server.addClient("Client1")
        server.addClient("Client2")
        print(f"当前客户端列表: {server.getClientList()}")

        # Check which clients are online.
        print("2. 检查客户端状态")
        onlineClients = server.checkAllClients()
        print(f"在线客户端: {onlineClients}")

        # Read the variables of all clients.
        print("3. 读取所有客户端变量")
        allVars = server.broadcastRead()
        print(json.dumps(allVars, ensure_ascii=False, indent=2))

        # Write a variable.
        print("4. 写入变量")
        result = server.writeVar("temp", 88.8)
        print(json.dumps(result, ensure_ascii=False, indent=2))

    except Exception as e:
        print(f"运行出错: {e}")
    finally:
        server.close()