import pika import uuid import json import threading import time class RpcServer: def __init__(self, rabbitHost='localhost'): """ 初始化RPC服务端 :param rabbitHost: RabbitMQ服务器地址 """ self.clientNames = [] # 动态客户端列表 self.credentials = pika.PlainCredentials('dcs', '123456') # 修改为你的用户名和密码 self.rabbitHost = rabbitHost self.connection = None self.channel = None self.callbackQueue = None self.responses = {} self.lock = threading.Lock() self.connected = False self.clientIpMap = {} # 客户端名到IP的映射 # 初始化连接 self.connectToRabbitMQ() self.autoCheckThread = threading.Thread(target=self._autoCheckClients, daemon=True) self.autoCheckThread.start() def connectToRabbitMQ(self): """连接到RabbitMQ服务器""" try: self.connection = pika.BlockingConnection( pika.ConnectionParameters(host=self.rabbitHost, credentials=self.credentials) ) self.channel = self.connection.channel() result = self.channel.queue_declare(queue='', exclusive=True) self.callbackQueue = result.method.queue self.channel.basic_consume( queue=self.callbackQueue, on_message_callback=self.onResponse, auto_ack=True ) self.connected = True print(f"服务端已连接到RabbitMQ服务器: {self.rabbitHost}") except Exception as e: print(f"连接RabbitMQ失败: {e}") self.connected = False raise def onResponse(self, ch, method, props, body): response = json.loads(body) with self.lock: self.responses[props.correlation_id] = response def addClient(self, clientName): """ 动态添加客户端 :param clientName: 客户端名称 :return: 是否添加成功 """ if not self.connected: print("服务端未连接到RabbitMQ") return False # 修复:防止重复添加同名客户端 if clientName in self.clientNames: print(f"客户端 {clientName} 已存在") return False # 检查客户端是否在线 try: # 发送ping消息测试客户端是否响应 response = self.call(clientName, {"cmd": "ping"}) if response and response.get("result") == "pong": self.clientNames.append(clientName) # 记录客户端IP if 'ip' in response: self.clientIpMap[clientName] = response['ip'] # 去重,防止多线程下重复 self.clientNames = list(dict.fromkeys(self.clientNames)) # print(f"客户端 {clientName} 添加成功") return True else: print(f"客户端 {clientName} 无响应") return False except Exception as e: print(f"添加客户端 {clientName} 失败: {e}") return False def removeClient(self, clientName): """ 移除客户端 :param clientName: 客户端名称 :return: 是否移除成功 """ if clientName in self.clientNames: self.clientNames.remove(clientName) if clientName in self.clientIpMap: self.clientIpMap.pop(clientName) return True return False def getClientList(self): """ 获取当前客户端列表 :return: 客户端名称列表 """ return self.clientNames.copy() def getNextClientName(self): """ 获取下一个可用的客户端名称 如果没有连接客户端就返回client1 如果列表中已经有客户端连接就返回clientxx中最大的client数字+1 :return: 下一个可用的客户端名称 """ if not self.clientNames: return "client1" # 查找所有以"client"开头的客户端名称 client_numbers = [] for client_name in self.clientNames: if client_name.lower().startswith("client"): # 提取数字部分 try: number_str = client_name[6:] # 去掉"client"前缀 if number_str.isdigit(): client_numbers.append(int(number_str)) except (ValueError, IndexError): continue if not client_numbers: # 如果没有找到有效的client数字,返回client1 return "client1" # 返回最大数字+1 next_number = max(client_numbers) + 1 return f"client{next_number}" def getClientNames(self): """ 获取当前所有客户端名称的列表 :return: 客户端名称列表 """ return self.clientNames.copy() def getClientIpMap(self): """获取客户端名到IP的映射""" return self.clientIpMap.copy() def pingClient(self, clientName): """ 测试客户端是否在线 :param clientName: 客户端名称 :return: 是否在线 """ try: response = self.call(clientName, {"cmd": "ping"}) return response and response.get("result") == "pong" except: return False def call(self, clientName, message): """ 调用客户端方法 :param clientName: 客户端名称 :param message: 消息内容 :return: 客户端响应 """ if not self.connected or not self.channel: raise Exception("服务端未连接到RabbitMQ") corr_id = str(uuid.uuid4()) self.responses[corr_id] = None try: self.channel.basic_publish( exchange='', routing_key=f"rpc_queue_{clientName}", properties=pika.BasicProperties( reply_to=self.callbackQueue, correlation_id=corr_id, ), body=json.dumps(message) ) # 等待响应,设置超时 timeout = 10 # 10秒超时 start_time = time.time() while self.responses[corr_id] is None: if time.time() - start_time > timeout: raise Exception(f"调用客户端 {clientName} 超时") if self.connection: self.connection.process_data_events() time.sleep(0.01) return self.responses.pop(corr_id) except Exception as e: # 清理超时的响应 if corr_id in self.responses: self.responses.pop(corr_id) raise e def broadcastRead(self): """ 向所有客户端广播读取命令 :return: 所有客户端的变量数据 """ if not self.clientNames: # print("没有可用的客户端") return [] results = [] for client in self.clientNames: try: resp = self.call(client, {"cmd": "read"}) results.append(resp) except Exception as e: print(f"读取客户端 {client} 失败: {e}") results.append({"client": client, "error": str(e)}) return results def writeVar(self, varName, value): """ 写入变量到指定客户端 :param varName: 变量名 :param value: 变量值 :return: 写入结果 """ if not self.clientNames: return {"result": "fail", "reason": "no clients available"} exists, clients = self.existsVar(varName) if not exists or not clients: return {"result": "fail", "reason": "var not found"} # 支持多个客户端有同名变量,优先写第一个成功的 for client in clients: try: print(f"写入客户端 {client} 变量 {varName} 值 {value}") writeResp = self.call(client, {"cmd": "write", "varName": varName, "value": value}) if writeResp and writeResp.get("result") == "success": return writeResp except Exception as e: continue return {"result": "fail", "reason": "write failed on all clients"} def scanAndAddClients(self, clientNameList): """ 扫描并添加客户端列表 :param clientNameList: 客户端名称列表 :return: 成功添加的客户端列表 """ addedClients = [] for clientName in clientNameList: if self.addClient(clientName): addedClients.append(clientName) return addedClients def checkAllClients(self): """ 检查所有客户端是否在线 :return: 在线客户端列表 """ onlineClients = [] offlineClients = [] for client in self.clientNames: if self.pingClient(client): onlineClients.append(client) else: offlineClients.append(client) # 移除离线客户端 for client in offlineClients: self.removeClient(client) return onlineClients def close(self): """ 关闭服务端连接 """ if self.connection and not self.connection.is_closed: self.connection.close() self.connected = False print("服务端连接已关闭") def _autoCheckClients(self): """后台线程:每2秒自动检测客户端是否在线,不在线则移除""" while True: try: self.checkAllClients() except Exception as e: print(f"自动检测客户端异常: {e}") time.sleep(1) def existsVar(self, varName): """ 判断变量名是否存在于所有已连接客户端的变量中 :param varName: 变量名 :return: (True, clientName) 或 (False, None) """ foundClients = [] for client in self.clientNames: try: resp = self.call(client, {"cmd": "read"}) if resp and "variables" in resp and varName in resp["variables"]: foundClients.append(client) except Exception as e: continue if foundClients: return True, foundClients else: return False, None def getVarValue(self, clientName, varName): """ 获取指定客户端上指定变量的值 :param clientName: 客户端名称 :param varName: 变量名 :return: 变量值或None """ if clientName not in self.clientNames: return None try: resp = self.call(clientName, {"cmd": "read"}) if resp and "variables" in resp and varName in resp["variables"]: return resp["variables"][varName] except Exception as e: pass return None if __name__ == "__main__": # 创建服务端 server = RpcServer() try: # 动态添加客户端 print("1. 添加客户端") server.addClient("Client1") server.addClient("Client2") print(f"当前客户端列表: {server.getClientList()}") # 检查客户端状态 print("2. 检查客户端状态") onlineClients = server.checkAllClients() print(f"在线客户端: {onlineClients}") # 读取所有客户端变量 print("3. 读取所有客户端变量") allVars = server.broadcastRead() print(json.dumps(allVars, ensure_ascii=False, indent=2)) # 写入变量 print("4. 写入变量") result = server.writeVar("temp", 88.8) print(json.dumps(result, ensure_ascii=False, indent=2)) except Exception as e: print(f"运行出错: {e}") finally: server.close()