You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

302 lines
9.9 KiB
Python

import pika
import uuid
import json
import threading
import time
class RpcServer:
def __init__(self, rabbitHost='localhost'):
"""
初始化RPC服务端
:param rabbitHost: RabbitMQ服务器地址
"""
self.clientNames = [] # 动态客户端列表
self.credentials = pika.PlainCredentials('dcs', '123456') # 修改为你的用户名和密码
self.rabbitHost = rabbitHost
self.connection = None
self.channel = None
self.callbackQueue = None
self.responses = {}
self.lock = threading.Lock()
self.connected = False
# 初始化连接
self.connectToRabbitMQ()
def connectToRabbitMQ(self):
"""连接到RabbitMQ服务器"""
try:
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.rabbitHost, credentials=self.credentials)
)
self.channel = self.connection.channel()
result = self.channel.queue_declare(queue='', exclusive=True)
self.callbackQueue = result.method.queue
self.channel.basic_consume(
queue=self.callbackQueue,
on_message_callback=self.onResponse,
auto_ack=True
)
self.connected = True
print(f"服务端已连接到RabbitMQ服务器: {self.rabbitHost}")
except Exception as e:
print(f"连接RabbitMQ失败: {e}")
self.connected = False
raise
def onResponse(self, ch, method, props, body):
response = json.loads(body)
with self.lock:
self.responses[props.correlation_id] = response
def addClient(self, clientName):
"""
动态添加客户端
:param clientName: 客户端名称
:return: 是否添加成功
"""
if not self.connected:
print("服务端未连接到RabbitMQ")
return False
if clientName in self.clientNames:
print(f"客户端 {clientName} 已存在")
return False
# 检查客户端是否在线
try:
# 发送ping消息测试客户端是否响应
response = self.call(clientName, {"cmd": "ping"})
if response and response.get("result") == "pong":
self.clientNames.append(clientName)
print(f"客户端 {clientName} 添加成功")
return True
else:
print(f"客户端 {clientName} 无响应")
return False
except Exception as e:
print(f"添加客户端 {clientName} 失败: {e}")
return False
def removeClient(self, clientName):
"""
移除客户端
:param clientName: 客户端名称
:return: 是否移除成功
"""
if clientName in self.clientNames:
self.clientNames.remove(clientName)
print(f"客户端 {clientName} 已移除")
return True
return False
def getClientList(self):
"""
获取当前客户端列表
:return: 客户端名称列表
"""
return self.clientNames.copy()
def getNextClientName(self):
"""
获取下一个可用的客户端名称
如果没有连接客户端就返回client1
如果列表中已经有客户端连接就返回clientxx中最大的client数字+1
:return: 下一个可用的客户端名称
"""
if not self.clientNames:
return "client1"
# 查找所有以"client"开头的客户端名称
client_numbers = []
for client_name in self.clientNames:
if client_name.lower().startswith("client"):
# 提取数字部分
try:
number_str = client_name[6:] # 去掉"client"前缀
if number_str.isdigit():
client_numbers.append(int(number_str))
except (ValueError, IndexError):
continue
if not client_numbers:
# 如果没有找到有效的client数字返回client1
return "client1"
# 返回最大数字+1
next_number = max(client_numbers) + 1
return f"client{next_number}"
def getClientNames(self):
"""
获取当前所有客户端名称的列表
:return: 客户端名称列表
"""
return self.clientNames.copy()
def pingClient(self, clientName):
"""
测试客户端是否在线
:param clientName: 客户端名称
:return: 是否在线
"""
try:
response = self.call(clientName, {"cmd": "ping"})
return response and response.get("result") == "pong"
except:
return False
def call(self, clientName, message):
"""
调用客户端方法
:param clientName: 客户端名称
:param message: 消息内容
:return: 客户端响应
"""
if not self.connected or not self.channel:
raise Exception("服务端未连接到RabbitMQ")
corr_id = str(uuid.uuid4())
self.responses[corr_id] = None
try:
self.channel.basic_publish(
exchange='',
routing_key=f"rpc_queue_{clientName}",
properties=pika.BasicProperties(
reply_to=self.callbackQueue,
correlation_id=corr_id,
),
body=json.dumps(message)
)
# 等待响应,设置超时
timeout = 10 # 10秒超时
start_time = time.time()
while self.responses[corr_id] is None:
if time.time() - start_time > timeout:
raise Exception(f"调用客户端 {clientName} 超时")
if self.connection:
self.connection.process_data_events()
time.sleep(0.01)
return self.responses.pop(corr_id)
except Exception as e:
# 清理超时的响应
if corr_id in self.responses:
self.responses.pop(corr_id)
raise e
def broadcastRead(self):
"""
向所有客户端广播读取命令
:return: 所有客户端的变量数据
"""
if not self.clientNames:
print("没有可用的客户端")
return []
results = []
for client in self.clientNames:
try:
resp = self.call(client, {"cmd": "read"})
results.append(resp)
except Exception as e:
print(f"读取客户端 {client} 失败: {e}")
results.append({"client": client, "error": str(e)})
return results
def writeVar(self, varName, value):
"""
写入变量到指定客户端
:param varName: 变量名
:param value: 变量值
:return: 写入结果
"""
if not self.clientNames:
return {"result": "fail", "reason": "no clients available"}
# 先查找变量在哪个客户端
for client in self.clientNames:
try:
resp = self.call(client, {"cmd": "read"})
if resp and "variables" in resp and varName in resp["variables"]:
writeResp = self.call(client, {"cmd": "write", "varName": varName, "value": value})
return writeResp
except Exception as e:
print(f"检查客户端 {client} 失败: {e}")
continue
return {"result": "fail", "reason": "var not found"}
def scanAndAddClients(self, clientNameList):
"""
扫描并添加客户端列表
:param clientNameList: 客户端名称列表
:return: 成功添加的客户端列表
"""
addedClients = []
for clientName in clientNameList:
if self.addClient(clientName):
addedClients.append(clientName)
return addedClients
def checkAllClients(self):
"""
检查所有客户端是否在线
:return: 在线客户端列表
"""
onlineClients = []
offlineClients = []
for client in self.clientNames:
if self.pingClient(client):
onlineClients.append(client)
else:
offlineClients.append(client)
# 移除离线客户端
for client in offlineClients:
self.removeClient(client)
return onlineClients
def close(self):
"""
关闭服务端连接
"""
if self.connection and not self.connection.is_closed:
self.connection.close()
self.connected = False
print("服务端连接已关闭")
if __name__ == "__main__":
# 创建服务端
server = RpcServer()
try:
# 动态添加客户端
print("1. 添加客户端")
server.addClient("Client1")
server.addClient("Client2")
print(f"当前客户端列表: {server.getClientList()}")
# 检查客户端状态
print("2. 检查客户端状态")
onlineClients = server.checkAllClients()
print(f"在线客户端: {onlineClients}")
# 读取所有客户端变量
print("3. 读取所有客户端变量")
allVars = server.broadcastRead()
print(json.dumps(allVars, ensure_ascii=False, indent=2))
# 写入变量
print("4. 写入变量")
result = server.writeVar("temp", 88.8)
print(json.dumps(result, ensure_ascii=False, indent=2))
except Exception as e:
print(f"运行出错: {e}")
finally:
server.close()