# You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
#
# 615 lines
# 23 KiB
# Python
#
# This file contains ambiguous Unicode characters!
#
# This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import pika
import uuid
import json
import threading
import time
class RpcServer:
    """RPC caller that sends JSON commands to named clients through RabbitMQ.

    Each client is addressed through the queue ``rpc_queue_<clientName>``.
    Replies arrive on one exclusive, server-named callback queue and are
    matched to their request by ``correlation_id``.

    A daemon thread pings every registered client periodically and drops the
    ones that do not answer. All RPC traffic is serialized by ``rpcLock``.
    """

    # Seconds between two rounds of the background client liveness check.
    AUTO_CHECK_INTERVAL_SECONDS = 5

    def __init__(self, rabbitHost='localhost', username='dcs', password='123456'):
        """
        Connect to RabbitMQ and start the background client check.

        :param rabbitHost: RabbitMQ server address
        :param username: RabbitMQ user name
        :param password: RabbitMQ password
        :raises Exception: if the broker cannot be reached after all retries
        """
        self.clientNames = []  # dynamic list of registered client names
        self.credentials = pika.PlainCredentials(username, password)
        self.rabbitHost = rabbitHost
        self.connection = None
        self.channel = None
        self.callbackQueue = None
        self.responses = {}  # correlation_id -> decoded reply (None while pending)
        self.lock = threading.Lock()  # guards self.responses
        self.connected = False
        self.clientIpMap = {}  # client name -> IP reported in its ping reply
        # Re-entrant so that helpers holding the lock may call call() again.
        self.rpcLock = threading.RLock()
        # Set by close() so that the background thread terminates.
        self._stopEvent = threading.Event()
        self.connectToRabbitMQ()
        self.autoCheckThread = threading.Thread(target=self._autoCheckClients, daemon=True)
        self.autoCheckThread.start()

    def connectToRabbitMQ(self):
        """
        Open a fresh connection, channel and callback queue.

        Any existing connection is closed first. Up to three attempts are
        made, sleeping 2s, then 4s, between them.

        :raises Exception: when the last attempt fails
        """
        maxRetries = 3
        for attempt in range(maxRetries):
            try:
                self.closeConnection()
                print(f"尝试连接RabbitMQ服务器 (尝试 {attempt + 1}/{maxRetries})...")
                connectionParams = pika.ConnectionParameters(
                    host=self.rabbitHost,
                    credentials=self.credentials,
                    heartbeat=600,
                    socket_timeout=15,
                    blocked_connection_timeout=30,
                    connection_attempts=1,  # retries are handled by this loop
                    retry_delay=2,
                )
                self.connection = pika.BlockingConnection(connectionParams)
                self.channel = self.connection.channel()
                self.channel.basic_qos(prefetch_count=1)
                # Server-named exclusive queue that receives all replies.
                result = self.channel.queue_declare(queue='', exclusive=True)
                self.callbackQueue = result.method.queue
                self.channel.basic_consume(
                    queue=self.callbackQueue,
                    on_message_callback=self.onResponse,
                    auto_ack=True,
                )
                # Round-trip to the broker to prove the channel works. The
                # declare is passive so no additional queue is created.
                self.channel.queue_declare(queue=self.callbackQueue, passive=True)
                self.connected = True
                print(f"服务端已成功连接到RabbitMQ服务器: {self.rabbitHost}")
                print(f"回调队列: {self.callbackQueue}")
                return
            except Exception as e:
                print(f"连接RabbitMQ尝试 {attempt + 1} 失败: {e}")
                self.closeConnection()  # drop the half-open connection
                if attempt == maxRetries - 1:
                    self.connected = False
                    raise Exception(f"连接失败,已重试 {maxRetries} 次: {e}") from e
                waitTime = 2 * (attempt + 1)
                print(f"等待 {waitTime} 秒后重试...")
                time.sleep(waitTime)

    def closeConnection(self):
        """Close channel and connection (best effort) and reset the handles."""
        # Channel first, then the connection that owns it.
        for attrName in ('channel', 'connection'):
            resource = getattr(self, attrName, None)
            try:
                if resource is not None and not resource.is_closed:
                    resource.close()
            except Exception:
                # Deliberate best effort: the peer may already be gone.
                pass
        self.connection = None
        self.channel = None
        self.callbackQueue = None

    def onResponse(self, ch, method, props, body):
        """
        Consumer callback: store a client reply under its correlation id.

        Replies without a correlation id, with an id nobody is waiting for,
        or with an undecodable body are logged and dropped.
        """
        try:
            if not props or not props.correlation_id:
                print("收到无效响应缺少correlation_id")
                return
            response = json.loads(body)
            with self.lock:
                # Only accept replies for requests that are still pending.
                if props.correlation_id in self.responses:
                    self.responses[props.correlation_id] = response
                else:
                    print(f"收到未知correlation_id的响应: {props.correlation_id}")
        except json.JSONDecodeError as e:
            print(f"JSON解析异常: {e}")
        except Exception as e:
            print(f"处理响应异常: {e}")

    def addClient(self, clientName):
        """
        Register a client after it answered a ping.

        :param clientName: client name
        :return: True if the client was added, False if it already exists,
                 did not answer "pong", or the call failed
        """
        if clientName in self.clientNames:
            print(f"客户端 {clientName} 已存在")
            return False
        try:
            response = self.call(clientName, {"cmd": "ping"}, timeoutSeconds=8)
            if response and response.get("result") == "pong":
                self.clientNames.append(clientName)
                # Remember the IP when the client reports one.
                if 'ip' in response:
                    self.clientIpMap[clientName] = response['ip']
                print(f"客户端 {clientName} 添加成功")
                return True
            print(f"客户端 {clientName} ping响应异常")
            return False
        except Exception as e:
            print(f"添加客户端 {clientName} 失败: {e}")
            return False

    def removeClient(self, clientName):
        """
        Remove a client and its IP mapping.

        :param clientName: client name
        :return: True if the client was registered and has been removed
        """
        if clientName not in self.clientNames:
            return False
        self.clientNames.remove(clientName)
        self.clientIpMap.pop(clientName, None)
        return True

    def getClientList(self):
        """
        Return a copy of the registered client names.

        :return: list of client names
        """
        return self.clientNames.copy()

    def getNextClientName(self):
        """
        Return the next free name of the form ``client<N>``.

        N is one more than the largest number found among registered names
        that consist of "client" (any letter case) followed by digits only.
        Without such a name the result is "client1".

        :return: next available client name
        """
        prefix = "client"
        clientNumbers = []
        for registeredName in self.clientNames:
            if not registeredName.lower().startswith(prefix):
                continue
            numberPart = registeredName[len(prefix):]
            if not numberPart.isdigit():
                continue
            try:
                clientNumbers.append(int(numberPart))
            except ValueError:
                # isdigit() accepts characters such as superscripts that int() rejects.
                continue
        if not clientNumbers:
            return "client1"
        return f"client{max(clientNumbers) + 1}"

    def getClientNames(self):
        """
        Return a copy of the registered client names (same as getClientList).

        :return: list of client names
        """
        return self.getClientList()

    def getClientIpMap(self):
        """Return a copy of the client-name-to-IP mapping."""
        return self.clientIpMap.copy()

    def pingClient(self, clientName):
        """
        Check whether a client answers a ping.

        Uses a short timeout and a single retry. Note that call() removes
        the client from the registry when its last attempt fails.

        :param clientName: client name
        :return: True if the client answered "pong", otherwise False
        """
        try:
            response = self.call(clientName, {"cmd": "ping"}, timeoutSeconds=3, maxRetries=1)
            return bool(response) and response.get("result") == "pong"
        except Exception:
            return False

    def call(self, clientName, message, timeoutSeconds=10, maxRetries=2):
        """
        Send a command to a client and return its decoded reply.

        The call is serialized with all other RPC calls through ``rpcLock``.

        :param clientName: target client name
        :param message: JSON-serializable command
        :param timeoutSeconds: how long to wait for the reply of one attempt
        :param maxRetries: number of additional attempts after the first one
        :return: decoded reply of the client
        :raises Exception: the error of the last failed attempt
        """
        with self.rpcLock:
            return self._callWithLock(clientName, message, timeoutSeconds, maxRetries)

    def _callWithLock(self, clientName, message, timeoutSeconds, maxRetries):
        """
        Retry loop behind call(); must run while ``rpcLock`` is held.

        Connection-type failures trigger a reconnect before the next attempt.
        When the final attempt fails the client is removed from the registry.
        """
        # A negative value would skip the loop and leave nothing to raise.
        maxRetries = max(maxRetries, 0)
        lastException = None
        for attempt in range(maxRetries + 1):
            try:
                return self.callOnce(clientName, message, timeoutSeconds)
            except Exception as e:
                lastException = e
                errorMsg = str(e)
                print(f"调用客户端 {clientName} 失败 (尝试 {attempt + 1}/{maxRetries + 1}): {errorMsg}")
                if attempt == maxRetries:
                    # Out of attempts: treat the client as gone.
                    self.removeClient(clientName)
                    break
                # The error text decides whether this is a connection problem.
                needReconnect = any(keyword in errorMsg.lower() for keyword in [
                    "连接异常", "连接不可用", "连接在等待响应时断开",
                    "streamlosterror", "connectionclosed", "indexerror"
                ])
                if needReconnect:
                    try:
                        print("检测到连接问题,尝试重新连接...")
                        self.reconnect()
                        print("重连成功等待1秒后重试...")
                        time.sleep(1)
                    except Exception as reconnectError:
                        print(f"重连失败: {reconnectError}")
                        if attempt == maxRetries - 1:
                            break
                        time.sleep(2)  # wait longer after a failed reconnect
                else:
                    # Not a connection problem: retry after a short pause.
                    time.sleep(0.5)
        raise lastException

    def callOnce(self, clientName, message, timeoutSeconds):
        """
        Perform one request/reply round trip with a client.

        :raises Exception: on connection problems (message starts with
                           "连接异常" or is "连接不可用"), on timeout, or on
                           any other failure while publishing or waiting
        """
        if not self.isConnected():
            print("连接异常,尝试重连...")
            self.reconnect()
        correlationId = str(uuid.uuid4())
        # Register the request so that onResponse accepts its reply.
        with self.lock:
            self.responses[correlationId] = None
        try:
            if not self.isConnected():
                raise Exception("连接不可用")
            self.channel.basic_publish(
                exchange='',
                routing_key=f"rpc_queue_{clientName}",
                properties=pika.BasicProperties(
                    reply_to=self.callbackQueue,
                    correlation_id=correlationId
                ),
                body=json.dumps(message)
            )
            return self.waitForResponse(correlationId, clientName, timeoutSeconds)
        except (pika.exceptions.StreamLostError,
                pika.exceptions.ConnectionClosedByBroker,
                pika.exceptions.AMQPConnectionError,
                pika.exceptions.ChannelClosedByBroker) as e:
            # Same message prefix as before so the retry logic recognizes it.
            raise Exception(f"连接异常: {e}") from e
        finally:
            # Never leave a pending entry behind, whatever the outcome.
            with self.lock:
                self.responses.pop(correlationId, None)

    def waitForResponse(self, correlationId, clientName, timeoutSeconds):
        """
        Pump connection events until the reply for ``correlationId`` arrives.

        :return: decoded reply
        :raises Exception: on timeout or when the connection fails meanwhile
        """
        startTime = time.monotonic()
        while True:
            with self.lock:
                response = self.responses.get(correlationId)
            if response is not None:
                break
            if time.monotonic() - startTime > timeoutSeconds:
                raise Exception(f"调用客户端 {clientName} 超时")
            try:
                # Local state only: a broker round trip on every poll is
                # unnecessary, process_data_events raises on a dead link.
                if not self._hasOpenChannel():
                    raise Exception("连接在等待响应时断开")
                self.connection.process_data_events(time_limit=0.05)
            except (pika.exceptions.StreamLostError,
                    pika.exceptions.ConnectionClosedByBroker,
                    pika.exceptions.AMQPConnectionError,
                    pika.exceptions.ChannelClosedByBroker,
                    IndexError) as e:
                raise Exception(f"连接异常: {e}") from e
            except Exception as e:
                raise Exception(f"处理数据事件异常: {e}") from e
            time.sleep(0.01)  # keep CPU usage low
        with self.lock:
            self.responses.pop(correlationId, None)
        return response

    def _hasOpenChannel(self):
        """Return True if, judging by local state only, the channel is usable."""
        return bool(
            self.connection is not None and not self.connection.is_closed
            and self.channel is not None and not self.channel.is_closed
            and self.connected and self.callbackQueue
        )

    def isConnected(self):
        """
        Check that the connection is open and the channel really works.

        The channel is probed with a passive declare of the existing callback
        queue, which creates nothing on the broker.

        :return: True if connected, otherwise False; never raises
        """
        try:
            if not self._hasOpenChannel():
                return False
            try:
                self.channel.queue_declare(queue=self.callbackQueue, passive=True)
                return True
            except Exception as e:
                print(f"通道测试失败: {e}")
                return False
        except Exception as e:
            print(f"连接状态检查异常: {e}")
            return False

    def reconnect(self):
        """
        Drop pending replies, reconnect and verify the new connection.

        :raises Exception: if connecting or the verification fails
        """
        print("检测到连接异常,正在重新连接...")
        self.connected = False
        # Replies for the old callback queue can no longer arrive.
        with self.lock:
            self.responses.clear()
        try:
            self.connectToRabbitMQ()
            if not self.verifyConnection():
                raise Exception("连接恢复验证失败")
            print("连接恢复成功,服务端功能正常")
        except Exception as e:
            print(f"重连失败: {e}")
            raise

    def verifyConnection(self):
        """
        Verify the connection by declaring, publishing to and deleting a
        temporary test queue.

        :return: True if every step succeeded, otherwise False
        """
        try:
            if not self.isConnected():
                return False
            testQueue = f"test_queue_{int(time.time())}"
            self.channel.queue_declare(queue=testQueue, auto_delete=True)
            self.channel.basic_publish(
                exchange='',
                routing_key=testQueue,
                body=json.dumps({"test": "connection_verify"})
            )
            self.channel.queue_delete(queue=testQueue)
            print("连接验证通过")
            return True
        except Exception as e:
            print(f"连接验证失败: {e}")
            return False

    def broadcastRead(self):
        """
        Send the "read" command to every registered client.

        :return: one entry per client: its reply, or
                 ``{"client": name, "error": text}`` when the call failed
        """
        results = []
        # Iterate a snapshot: a failed call() removes the client from
        # self.clientNames, which would make a live iteration skip entries.
        for clientName in self.getClientList():
            try:
                response = self.call(clientName, {"cmd": "read"}, timeoutSeconds=8)
                results.append(response)
            except Exception as e:
                print(f"读取客户端 {clientName} 失败: {e}")
                results.append({"client": clientName, "error": str(e)})
        return results

    def writeVar(self, varName, value):
        """
        Write a variable on the first client that has it and accepts the write.

        :param varName: variable name
        :param value: value to write
        :return: the successful client reply, or a dict with
                 ``"result": "fail"`` and a ``"reason"``
        """
        if not self.clientNames:
            return {"result": "fail", "reason": "no clients available"}
        varExists, clientsWithVar = self.existsVar(varName)
        if not varExists or not clientsWithVar:
            return {"result": "fail", "reason": "var not found"}
        for clientName in clientsWithVar:
            try:
                print(f"写入客户端 {clientName} 变量 {varName}{value}")
                writeResponse = self.call(
                    clientName,
                    {"cmd": "write", "varName": varName, "value": value},
                    timeoutSeconds=8
                )
                if writeResponse and writeResponse.get("result") == "success":
                    return writeResponse
            except Exception as e:
                print(f"写入客户端 {clientName} 失败: {e}")
                continue
        return {"result": "fail", "reason": "write failed on all clients"}

    def scanAndAddClients(self, clientNameList):
        """
        Try to add every client of a list.

        :param clientNameList: client names to probe
        :return: names that were added successfully
        """
        addedClients = []
        for clientName in clientNameList:
            if self.addClient(clientName):
                addedClients.append(clientName)
        return addedClients

    def checkAllClients(self):
        """
        Ping all registered clients and remove the ones that do not answer.

        :return: names of the clients that are online
        """
        onlineClients = []
        # Snapshot, because offline clients are removed while we go.
        for client in self.getClientList():
            if self.pingClient(client):
                onlineClients.append(client)
            else:
                self.removeClient(client)
        return onlineClients

    def healthCheck(self):
        """
        Connection health check.

        :return: result of isConnected(); False if the check itself fails
        """
        try:
            return self.isConnected()
        except Exception as e:
            print(f"健康检查失败: {e}")
            return False

    def close(self):
        """Stop the background check and close the connection."""
        # Signal first, then wait for any RPC in flight, so the background
        # thread can neither use nor re-open the connection afterwards.
        self._stopEvent.set()
        with self.rpcLock:
            self.connected = False
            self.closeConnection()
        print("服务端连接已关闭")

    def _autoCheckClients(self):
        """
        Background loop: every AUTO_CHECK_INTERVAL_SECONDS ping all clients
        and remove those that are offline. Ends once close() was called.
        """
        while not self._stopEvent.wait(self.AUTO_CHECK_INTERVAL_SECONDS):
            try:
                offlineClients = []
                for clientName in self.getClientList():
                    # Hold the RPC lock across the stop check and the ping so
                    # that close() cannot slip in between the two.
                    with self.rpcLock:
                        if self._stopEvent.is_set():
                            return
                        try:
                            if not self.pingClient(clientName):
                                offlineClients.append(clientName)
                                print(f"客户端 {clientName} 离线")
                        except Exception as e:
                            print(f"检查客户端 {clientName} 状态时出错: {e}")
                            offlineClients.append(clientName)
                for clientName in offlineClients:
                    self.removeClient(clientName)
                    print(f"已移除离线客户端: {clientName}")
                if offlineClients:
                    print(f"本次检查移除了 {len(offlineClients)} 个离线客户端")
            except Exception as e:
                print(f"自动检测客户端异常: {e}")
                # Back off for 10 seconds, but still react to close().
                if self._stopEvent.wait(10):
                    return

    def existsVar(self, varName):
        """
        Find the registered clients whose "read" reply contains a variable.

        :param varName: variable name
        :return: ``(True, [client names])`` if found, else ``(False, None)``
        """
        foundClients = []
        # Snapshot: a failed call() may remove the client being visited.
        for clientName in self.getClientList():
            try:
                response = self.call(clientName, {"cmd": "read"}, timeoutSeconds=5)
                if response and "variables" in response and varName in response["variables"]:
                    foundClients.append(clientName)
            except Exception:
                continue
        return (True, foundClients) if foundClients else (False, None)

    def getVarValue(self, clientName, varName):
        """
        Read one variable from one registered client.

        :param clientName: client name
        :param varName: variable name
        :return: the value, or None if the client is not registered, does not
                 report the variable, or the call failed
        """
        if clientName not in self.clientNames:
            return None
        try:
            response = self.call(clientName, {"cmd": "read"}, timeoutSeconds=5)
            if response and "variables" in response and varName in response["variables"]:
                return response["variables"][varName]
        except Exception as e:
            print(f"获取客户端 {clientName} 变量 {varName} 失败: {e}")
        return None
def main():
    """Demo run: add two clients, check them, read all variables, write one.

    Any error during the steps is printed; the server connection is closed
    in every case. A failure to construct the server is not caught.
    """
    server = RpcServer()
    try:
        # Register clients dynamically.
        print("1. 添加客户端")
        server.addClient("Client1")
        server.addClient("Client2")
        print(f"当前客户端列表: {server.getClientList()}")
        # Check which clients are online.
        print("2. 检查客户端状态")
        onlineClients = server.checkAllClients()
        print(f"在线客户端: {onlineClients}")
        # Read the variables of all clients.
        print("3. 读取所有客户端变量")
        allVars = server.broadcastRead()
        print(json.dumps(allVars, ensure_ascii=False, indent=2))
        # Write one variable.
        print("4. 写入变量")
        result = server.writeVar("temp", 88.8)
        print(json.dumps(result, ensure_ascii=False, indent=2))
    except Exception as e:
        print(f"运行出错: {e}")
    finally:
        server.close()


if __name__ == "__main__":
    main()