You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

615 lines
23 KiB
Python

import pika
import uuid
import json
import threading
import time
class RpcServer:
    """
    RPC server that sends JSON commands to named clients over RabbitMQ.

    A request for client ``name`` is published to the queue ``rpc_queue_<name>``
    with ``reply_to`` set to this server's exclusive reply queue and a fresh
    ``correlation_id``; the reply carrying the same ``correlation_id`` is returned
    to the caller. A daemon thread periodically pings the registered clients and
    drops the ones that do not answer, until close() is called.

    pika's BlockingConnection must not be driven from several threads at once,
    so call(), healthCheck(), close() and the watchdog serialise connection use
    through ``self.rpcLock``.
    """

    def __init__(self, rabbitHost='localhost', username='dcs', password='123456',
                 autoCheckInterval=5):
        """
        Initialise the RPC server, connect to RabbitMQ and start the client watchdog.

        :param rabbitHost: RabbitMQ server address
        :param username: RabbitMQ user name
        :param password: RabbitMQ password
        :param autoCheckInterval: seconds between background liveness checks of clients
        :raises Exception: if the connection cannot be established after retries
        """
        self.clientNames = []  # dynamically registered client names
        self.credentials = pika.PlainCredentials(username, password)
        self.rabbitHost = rabbitHost
        self.connection = None
        self.channel = None
        self.callbackQueue = None
        self.responses = {}  # correlation_id -> reply dict (None while still pending)
        self.lock = threading.Lock()  # guards self.responses
        self.connected = False
        self.clientIpMap = {}  # client name -> IP reported in its ping reply
        # Re-entrant so the watchdog can hold it across pingClient() -> call().
        self.rpcLock = threading.RLock()
        self.autoCheckInterval = autoCheckInterval
        # Set by close() to stop the background watchdog thread.
        self._stopEvent = threading.Event()
        self.connectToRabbitMQ()
        self.autoCheckThread = threading.Thread(target=self._autoCheckClients, daemon=True)
        self.autoCheckThread.start()

    def connectToRabbitMQ(self):
        """
        (Re)connect to RabbitMQ and set up the exclusive reply queue and its consumer.

        Makes up to 3 attempts, sleeping 2 s and then 4 s between them.

        :raises Exception: if every attempt fails (the last error is chained as the cause)
        """
        maxRetries = 3
        for attempt in range(maxRetries):
            try:
                self.closeConnection()  # drop any previous connection first
                print(f"尝试连接RabbitMQ服务器 (尝试 {attempt + 1}/{maxRetries})...")
                connectionParams = pika.ConnectionParameters(
                    host=self.rabbitHost,
                    credentials=self.credentials,
                    heartbeat=600,  # 10-minute heartbeat
                    socket_timeout=15,
                    blocked_connection_timeout=30,
                    connection_attempts=1,  # retries are handled by this loop
                    retry_delay=2,
                )
                self.connection = pika.BlockingConnection(connectionParams)
                self.channel = self.connection.channel()
                self.channel.basic_qos(prefetch_count=1)
                # Server-named exclusive queue: removed by the broker when this connection closes.
                result = self.channel.queue_declare(queue='', exclusive=True)
                self.callbackQueue = result.method.queue
                self.channel.basic_consume(
                    queue=self.callbackQueue,
                    on_message_callback=self.onResponse,
                    auto_ack=True,
                )
                # Round-trip check that the channel really works. passive=True only looks
                # up the existing reply queue instead of creating yet another queue.
                self.channel.queue_declare(queue=self.callbackQueue, passive=True)
                self.connected = True
                print(f"服务端已成功连接到RabbitMQ服务器: {self.rabbitHost}")
                print(f"回调队列: {self.callbackQueue}")
                return
            except Exception as e:
                print(f"连接RabbitMQ尝试 {attempt + 1} 失败: {e}")
                self.closeConnection()  # clean up the half-open connection
                if attempt == maxRetries - 1:
                    self.connected = False
                    raise Exception(f"连接失败,已重试 {maxRetries} 次: {e}") from e
                waitTime = 2 * (attempt + 1)  # linearly increasing back-off
                print(f"等待 {waitTime} 秒后重试...")
                time.sleep(waitTime)

    def closeConnection(self):
        """Best-effort close of the channel and connection; always resets the handles to None."""
        channel = getattr(self, 'channel', None)
        try:
            if channel is not None and not channel.is_closed:
                channel.close()
        except Exception:
            pass  # the channel is already broken; nothing more to do
        connection = getattr(self, 'connection', None)
        try:
            if connection is not None and not connection.is_closed:
                connection.close()
        except Exception:
            pass  # the connection is already broken; nothing more to do
        self.connection = None
        self.channel = None
        self.callbackQueue = None

    def onResponse(self, ch, method, props, body):
        """
        Consumer callback for the reply queue (dispatched from process_data_events).

        Stores the decoded JSON reply under its correlation_id, but only if a caller
        is still waiting for that id; unknown or late replies are logged and dropped.
        """
        try:
            if not props or not props.correlation_id:
                print("收到无效响应缺少correlation_id")
                return
            response = json.loads(body)
            with self.lock:
                if props.correlation_id in self.responses:
                    self.responses[props.correlation_id] = response
                else:
                    print(f"收到未知correlation_id的响应: {props.correlation_id}")
        except json.JSONDecodeError as e:
            print(f"JSON解析异常: {e}")
        except Exception as e:
            print(f"处理响应异常: {e}")

    def addClient(self, clientName):
        """
        Register a client after it answers a ping.

        :param clientName: client name; requests go to ``rpc_queue_<clientName>``
        :return: True if the client replied ``{"result": "pong"}`` and was added;
                 False if it was already registered or the ping failed
        """
        if clientName in self.clientNames:
            print(f"客户端 {clientName} 已存在")
            return False
        try:
            response = self.call(clientName, {"cmd": "ping"}, timeoutSeconds=8)
            if response and response.get("result") == "pong":
                self.clientNames.append(clientName)
                # The ping reply may carry the client's IP address.
                if 'ip' in response:
                    self.clientIpMap[clientName] = response['ip']
                print(f"客户端 {clientName} 添加成功")
                return True
            print(f"客户端 {clientName} ping响应异常")
            return False
        except Exception as e:
            print(f"添加客户端 {clientName} 失败: {e}")
            return False

    def removeClient(self, clientName):
        """
        Remove a client and its recorded IP.

        :param clientName: client name
        :return: True if the client was registered and has been removed, else False
        """
        # EAFP: the watchdog thread may remove the same client concurrently, so a
        # membership test followed by remove() could raise ValueError.
        try:
            self.clientNames.remove(clientName)
        except ValueError:
            return False
        self.clientIpMap.pop(clientName, None)
        return True

    def getClientList(self):
        """
        Return a snapshot of the registered client names.

        :return: a new list of client names
        """
        return self.clientNames.copy()

    def getNextClientName(self):
        """
        Suggest the next free ``client<N>`` name.

        Returns ``client1`` when no registered name (case-insensitively) has the form
        ``client<digits>``; otherwise returns ``client<max N + 1>``.

        :return: the suggested client name
        """
        prefix = "client"
        clientNumbers = []
        for clientName in self.getClientList():
            if not clientName.lower().startswith(prefix):
                continue
            numberStr = clientName[len(prefix):]
            if not numberStr.isdigit():
                continue
            try:
                clientNumbers.append(int(numberStr))
            except ValueError:
                continue  # e.g. superscript digits pass isdigit() but not int()
        if not clientNumbers:
            return "client1"
        return f"client{max(clientNumbers) + 1}"

    def getClientNames(self):
        """
        Return a snapshot of the registered client names (same as getClientList).

        :return: a new list of client names
        """
        return self.clientNames.copy()

    def getClientIpMap(self):
        """Return a copy of the client-name -> IP mapping."""
        return self.clientIpMap.copy()

    def pingClient(self, clientName):
        """
        Return True if the client answers a ping with ``{"result": "pong"}``.

        Uses a short timeout and a single retry so the check blocks as little as possible.
        Any failure is reported as False.
        """
        try:
            response = self.call(clientName, {"cmd": "ping"}, timeoutSeconds=3, maxRetries=1)
            return bool(response) and response.get("result") == "pong"
        except Exception:
            return False

    def call(self, clientName, message, timeoutSeconds=10, maxRetries=2):
        """
        Send ``message`` to a client and return its decoded JSON reply.

        Retries up to ``maxRetries`` extra times, reconnecting on connection errors.
        If every attempt fails the client is removed from the registry and the last
        error is raised.
        """
        with self.rpcLock:  # pika BlockingConnection is not safe to share across threads
            return self._callWithLock(clientName, message, timeoutSeconds, maxRetries)

    def _callWithLock(self, clientName, message, timeoutSeconds, maxRetries):
        """
        Retry loop behind call(); the caller must hold ``self.rpcLock``.

        Errors whose text contains a known connection-failure keyword trigger a
        reconnect before the next attempt. When the final attempt fails, the client is
        removed and the last error is re-raised. A failed reconnect on the penultimate
        attempt aborts early without removing the client.
        """
        lastException = None
        for attempt in range(maxRetries + 1):
            try:
                return self.callOnce(clientName, message, timeoutSeconds)
            except Exception as e:
                lastException = e
                errorMsg = str(e)
                print(f"调用客户端 {clientName} 失败 (尝试 {attempt + 1}/{maxRetries + 1}): {errorMsg}")
                if attempt == maxRetries:
                    self.removeClient(clientName)
                    break
                needReconnect = any(keyword in errorMsg.lower() for keyword in (
                    "连接异常", "连接不可用", "连接在等待响应时断开",
                    "streamlosterror", "connectionclosed", "indexerror",
                ))
                if needReconnect:
                    try:
                        print("检测到连接问题,尝试重新连接...")
                        self.reconnect()
                        print("重连成功等待1秒后重试...")
                        time.sleep(1)
                    except Exception as reconnectError:
                        print(f"重连失败: {reconnectError}")
                        if attempt == maxRetries - 1:
                            break
                        time.sleep(2)  # wait longer after a failed reconnect
                else:
                    time.sleep(0.5)  # not a connection problem: short pause, then retry
        raise lastException

    def callOnce(self, clientName, message, timeoutSeconds):
        """
        Publish one request to ``rpc_queue_<clientName>`` and wait for its reply.

        Reconnects first if the connection fails isConnected(). pika connection/channel
        errors are re-raised as ``Exception("连接异常: ...")`` so _callWithLock can
        recognise them; the pending response slot is always cleaned up on failure.
        """
        if not self.isConnected():
            print("连接异常,尝试重连...")
            self.reconnect()
        correlationId = str(uuid.uuid4())
        with self.lock:
            self.responses[correlationId] = None  # placeholder: a reply is awaited
        try:
            # Cheap local re-check; isConnected() above already did a broker round trip.
            if not self._isChannelOpen():
                raise Exception("连接不可用")
            self.channel.basic_publish(
                exchange='',
                routing_key=f"rpc_queue_{clientName}",
                properties=pika.BasicProperties(
                    reply_to=self.callbackQueue,
                    correlation_id=correlationId,
                ),
                body=json.dumps(message),
            )
            return self.waitForResponse(correlationId, clientName, timeoutSeconds)
        except (pika.exceptions.StreamLostError,
                pika.exceptions.ConnectionClosedByBroker,
                pika.exceptions.AMQPConnectionError,
                pika.exceptions.ChannelClosedByBroker) as e:
            with self.lock:
                self.responses.pop(correlationId, None)
            raise Exception(f"连接异常: {e}") from e
        except Exception:
            with self.lock:
                self.responses.pop(correlationId, None)
            raise

    def waitForResponse(self, correlationId, clientName, timeoutSeconds):
        """
        Pump pika I/O until the reply for ``correlationId`` arrives or the timeout expires.

        :return: the decoded reply (its slot is removed from ``self.responses``)
        :raises Exception: on timeout, on connection loss, or if the slot was discarded
        """
        startTime = time.time()
        while self.responses.get(correlationId) is None:
            if correlationId not in self.responses:
                # The slot was discarded (e.g. by reconnect()); no reply can be stored any more.
                raise Exception(f"响应数据异常: {correlationId}")
            if time.time() - startTime > timeoutSeconds:
                raise Exception(f"调用客户端 {clientName} 超时")
            try:
                # Local state check only: a broker round trip per poll (as isConnected()
                # does) would cost one extra RPC every ~60 ms.
                if not self._isChannelOpen():
                    raise Exception("连接在等待响应时断开")
                self.connection.process_data_events(time_limit=0.05)
            except (pika.exceptions.StreamLostError,
                    pika.exceptions.ConnectionClosedByBroker,
                    pika.exceptions.AMQPConnectionError,
                    pika.exceptions.ChannelClosedByBroker,
                    IndexError) as e:
                raise Exception(f"连接异常: {e}") from e
            except Exception as e:
                raise Exception(f"处理数据事件异常: {e}") from e
            time.sleep(0.01)  # keep the polling loop from spinning the CPU
        with self.lock:
            response = self.responses.pop(correlationId, None)
        if response is None:
            raise Exception(f"响应数据异常: {correlationId}")
        return response

    def _isChannelOpen(self):
        """Return True if the connection, channel and reply queue look usable (no broker I/O)."""
        return bool(
            self.connection and not self.connection.is_closed
            and self.channel and not self.channel.is_closed
            and self.connected and self.callbackQueue
        )

    def isConnected(self):
        """
        Return True if the connection is open and the channel answers a broker round trip.

        The round trip is a passive declare of the existing reply queue. Declaring a new
        server-named exclusive queue here would leave one more queue on the broker per
        check, for as long as the connection lives.
        """
        try:
            if not self._isChannelOpen():
                return False
        except Exception as e:
            print(f"连接状态检查异常: {e}")
            return False
        try:
            self.channel.queue_declare(queue=self.callbackQueue, passive=True)
            return True
        except Exception as e:
            print(f"通道测试失败: {e}")
            return False

    def reconnect(self):
        """
        Drop the current connection, reconnect and verify the new channel.

        Pending response slots are cleared because the old exclusive reply queue goes
        away with the old connection, so replies addressed to it cannot be consumed.

        :raises Exception: if reconnecting or verification fails
        """
        print("检测到连接异常,正在重新连接...")
        self.connected = False
        with self.lock:
            self.responses.clear()
        try:
            self.connectToRabbitMQ()
            if not self.verifyConnection():
                raise Exception("连接恢复验证失败")
            print("连接恢复成功,服务端功能正常")
        except Exception as e:
            print(f"重连失败: {e}")
            raise

    def verifyConnection(self):
        """
        Check that the channel can declare, publish to and delete a throw-away queue.

        :return: True on success; False (after logging) on any failure
        """
        try:
            if not self.isConnected():
                return False
            # A uuid avoids name clashes between servers verifying within the same second.
            testQueue = f"test_queue_{uuid.uuid4().hex}"
            self.channel.queue_declare(queue=testQueue, auto_delete=True)
            self.channel.basic_publish(
                exchange='',
                routing_key=testQueue,
                body=json.dumps({"test": "connection_verify"}),
            )
            self.channel.queue_delete(queue=testQueue)
            print("连接验证通过")
            return True
        except Exception as e:
            print(f"连接验证失败: {e}")
            return False

    def broadcastRead(self):
        """
        Send ``{"cmd": "read"}`` to every registered client.

        :return: one entry per client, in registry order: the client's reply, or
                 ``{"client": name, "error": message}`` if the call failed
        """
        results = []
        # Iterate a snapshot: call() removes a client whose final attempt fails, and
        # mutating the list being iterated would silently skip the next client.
        for clientName in self.getClientList():
            try:
                results.append(self.call(clientName, {"cmd": "read"}, timeoutSeconds=8))
            except Exception as e:
                print(f"读取客户端 {clientName} 失败: {e}")
                results.append({"client": clientName, "error": str(e)})
        return results

    def writeVar(self, varName, value):
        """
        Write ``value`` to ``varName`` on the first client that reports having that variable.

        :return: the client's reply on success, otherwise a ``{"result": "fail", "reason": ...}`` dict
        """
        if not self.clientNames:
            return {"result": "fail", "reason": "no clients available"}
        varExists, clientsWithVar = self.existsVar(varName)
        if not varExists or not clientsWithVar:
            return {"result": "fail", "reason": "var not found"}
        for clientName in clientsWithVar:
            try:
                print(f"写入客户端 {clientName} 变量 {varName}{value}")
                writeResponse = self.call(
                    clientName,
                    {"cmd": "write", "varName": varName, "value": value},
                    timeoutSeconds=8,
                )
                if writeResponse and writeResponse.get("result") == "success":
                    return writeResponse
            except Exception as e:
                print(f"写入客户端 {clientName} 失败: {e}")
        return {"result": "fail", "reason": "write failed on all clients"}

    def scanAndAddClients(self, clientNameList):
        """
        Try to register every name in ``clientNameList``.

        :param clientNameList: client names to try
        :return: the names that were successfully added, in input order
        """
        addedClients = []
        for clientName in clientNameList:
            if self.addClient(clientName):
                addedClients.append(clientName)
        return addedClients

    def checkAllClients(self):
        """
        Ping every registered client and remove the ones that do not answer.

        :return: the list of clients that answered
        """
        onlineClients = []
        offlineClients = []
        # Iterate a snapshot: a failed ping goes through call(), which removes the client
        # from self.clientNames; iterating the live list would then skip the next client.
        for client in self.getClientList():
            if self.pingClient(client):
                onlineClients.append(client)
            else:
                offlineClients.append(client)
        for client in offlineClients:
            self.removeClient(client)
        return onlineClients

    def healthCheck(self):
        """Return True if the connection passes isConnected()'s broker round-trip check."""
        try:
            with self.rpcLock:
                return self.isConnected()
        except Exception as e:
            print(f"健康检查失败: {e}")
            return False

    def close(self):
        """Stop the watchdog thread and close the RabbitMQ connection."""
        # Under rpcLock so no other thread is mid-call on the connection being closed,
        # and so the watchdog cannot start a ping (which would reconnect) afterwards.
        with self.rpcLock:
            self._stopEvent.set()
            self.connected = False
            self.closeConnection()
        print("服务端连接已关闭")

    def _autoCheckClients(self):
        """
        Background watchdog loop.

        Every ``autoCheckInterval`` seconds, ping each registered client and remove the
        ones that do not answer. Exits once close() sets the stop event.
        """
        # Event.wait() is an interruptible sleep: it returns True as soon as close() runs.
        while not self._stopEvent.wait(self.autoCheckInterval):
            try:
                currentClients = self.getClientList()
                if not currentClients:
                    continue
                offlineClients = []
                for clientName in currentClients:
                    # Checking the stop flag while holding rpcLock closes the race with
                    # close(): otherwise a ping could start after the connection was torn
                    # down and trigger a reconnect.
                    with self.rpcLock:
                        if self._stopEvent.is_set():
                            return
                        try:
                            if not self.pingClient(clientName):
                                offlineClients.append(clientName)
                                print(f"客户端 {clientName} 离线")
                        except Exception as e:
                            print(f"检查客户端 {clientName} 状态时出错: {e}")
                            offlineClients.append(clientName)
                for clientName in offlineClients:
                    self.removeClient(clientName)
                    print(f"已移除离线客户端: {clientName}")
                if offlineClients:
                    print(f"本次检查移除了 {len(offlineClients)} 个离线客户端")
            except Exception as e:
                print(f"自动检测客户端异常: {e}")
                self._stopEvent.wait(10)  # back off after an unexpected error

    def existsVar(self, varName):
        """
        Find the registered clients whose ``read`` reply lists ``varName``.

        :return: ``(True, [names])`` if at least one client has it, otherwise ``(False, None)``
        """
        foundClients = []
        # Snapshot: a failed call() removes the client from self.clientNames.
        for clientName in self.getClientList():
            try:
                response = self.call(clientName, {"cmd": "read"}, timeoutSeconds=5)
                if response and "variables" in response and varName in response["variables"]:
                    foundClients.append(clientName)
            except Exception:
                continue  # call() already logged the failure; skip unreachable clients
        return (True, foundClients) if foundClients else (False, None)

    def getVarValue(self, clientName, varName):
        """
        Read ``varName`` from one registered client.

        :return: the variable's value, or None if the client is unknown, unreachable,
                 or does not report the variable
        """
        if clientName not in self.clientNames:
            return None
        try:
            response = self.call(clientName, {"cmd": "read"}, timeoutSeconds=5)
            if response and "variables" in response and varName in response["variables"]:
                return response["variables"][varName]
        except Exception as e:
            print(f"获取客户端 {clientName} 变量 {varName} 失败: {e}")
        return None
if __name__ == "__main__":
    # Demo run: register two clients, prune offline ones, dump every client's
    # variables, then write one variable. The connection is always closed at the end.
    server = RpcServer()
    try:
        print("1. 添加客户端")
        for demoClient in ("Client1", "Client2"):
            server.addClient(demoClient)
        print(f"当前客户端列表: {server.getClientList()}")

        print("2. 检查客户端状态")
        print(f"在线客户端: {server.checkAllClients()}")

        print("3. 读取所有客户端变量")
        print(json.dumps(server.broadcastRead(), ensure_ascii=False, indent=2))

        print("4. 写入变量")
        print(json.dumps(server.writeVar("temp", 88.8), ensure_ascii=False, indent=2))
    except Exception as e:
        print(f"运行出错: {e}")
    finally:
        server.close()