You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

339 lines
12 KiB
Python

import pika
import uuid
import json
import threading
import socket
import requests
import time
import time
class RpcClient:
    """RabbitMQ RPC client that answers ``ping`` / ``read`` / ``write`` commands.

    Requests are consumed from the queue ``rpc_queue_<clientName>``; each JSON
    reply is published to the ``reply_to`` queue named by the request, tagged
    with the request's ``correlation_id``.
    """

    def __init__(self, clientName, rabbitHost='localhost', protocolManager=None,
                 username='dcs', password='123456'):
        """
        Initialize the RPC client and connect to RabbitMQ.

        :param clientName: client name; also used as the request-queue suffix
        :param rabbitHost: RabbitMQ server address
        :param protocolManager: optional object whose
            ``writeVariableValue(name, value)`` performs write commands
        :param username: RabbitMQ user name (default keeps the previously
            hard-coded value)
        :param password: RabbitMQ password (default keeps the previously
            hard-coded value)
        :raises Exception: when the connection cannot be established
        """
        self.clientName = clientName
        self.protocolManager = protocolManager
        self.rabbitHost = rabbitHost
        self.variables = {}
        # NOTE(review): the default credentials live in source code; prefer
        # passing them in from configuration or the environment.
        self.credentials = pika.PlainCredentials(username, password)
        self.connection = None
        self.channel = None
        # Background-thread state used by startNonBlocking / stopClient.
        self.client_thread = None
        self.shouldStop = False
        self.queueName = f"rpc_queue_{self.clientName}"
        self.connectWithRetry()

    def connectWithRetry(self, maxRetries=3):
        """
        Open the connection, channel and request queue, retrying on failure.

        Sleeps ``2 ** attempt`` seconds between attempts.

        :param maxRetries: maximum number of connection attempts
        :raises Exception: after the last failed attempt (chained to its cause)
        """
        for attempt in range(maxRetries):
            try:
                connectionParams = pika.ConnectionParameters(
                    host=self.rabbitHost,
                    credentials=self.credentials,
                    heartbeat=300,      # heartbeat interval in seconds
                    socket_timeout=10,  # socket timeout in seconds
                )
                self.connection = pika.BlockingConnection(connectionParams)
                self.channel = self.connection.channel()
                # Hand this consumer at most one message at a time.
                self.channel.basic_qos(prefetch_count=1)
                self.channel.queue_declare(queue=self.queueName, durable=False)
                print(f"[{self.clientName}] 连接成功,等待服务端指令...")
                return
            except Exception as e:
                print(f"[{self.clientName}] 连接尝试 {attempt + 1} 失败: {e}")
                # A failure after BlockingConnection() succeeded would otherwise
                # leave a half-open connection behind on every retry.
                self.closeConnectionSafely()
                if attempt == maxRetries - 1:
                    raise Exception(f"连接失败,已重试 {maxRetries}") from e
                time.sleep(2 ** attempt)  # exponential backoff

    def _publishReply(self, ch, props, payload):
        """Publish ``payload`` as JSON to ``props.reply_to``; no-op without a reply queue."""
        if not props.reply_to:
            return
        ch.basic_publish(
            exchange='',
            routing_key=props.reply_to,
            properties=pika.BasicProperties(correlation_id=props.correlation_id),
            body=json.dumps(payload),
        )

    def onRequest(self, ch, method, props, body):
        """
        Consumer callback: decode a request, run it, and publish the reply.

        Any exception is reported back to the caller as an ``error`` reply
        instead of propagating into the consuming loop.
        """
        try:
            request = json.loads(body)
            cmd = request.get("cmd", "unknown")
            response = self.processCommand(cmd, request)
            self._publishReply(ch, props, response)
        except Exception as e:
            print(f"[{self.clientName}] 处理请求异常: {e}")
            errorResponse = {"client": self.clientName, "result": "error", "reason": str(e)}
            try:
                self._publishReply(ch, props, errorResponse)
            except Exception as publishError:
                # Best effort only: the channel itself may be what failed.
                print(f"[{self.clientName}] 发送错误响应失败: {publishError}")

    def processCommand(self, cmd, request):
        """
        Dispatch a command and return its response dict.

        :param cmd: command name (``ping``, ``read`` or ``write``)
        :param request: decoded request payload
        :return: response dict; handler exceptions become an ``error`` result
        """
        try:
            if cmd == "ping":
                return self.handlePingCommand()
            if cmd == "read":
                return self.handleReadCommand()
            if cmd == "write":
                return self.handleWriteCommand(request)
            return {"client": self.clientName, "result": "fail", "reason": "unknown command"}
        except Exception as e:
            return {"client": self.clientName, "result": "error", "reason": str(e)}

    def handlePingCommand(self):
        """Answer ``ping`` with ``pong`` and this host's local IP address."""
        localIp = self.getLocalIp()
        return {"client": self.clientName, "result": "pong", "ip": localIp}

    def handleReadCommand(self):
        """Answer ``read`` with every variable registered via setVarContent."""
        return {"client": self.clientName, "variables": self.variables}

    def handleWriteCommand(self, request):
        """Answer ``write`` by delegating to handleWriteRequest."""
        return self.handleWriteRequest(request)

    def handleWriteRequest(self, request):
        """
        Perform a write through the protocol manager.

        The last dot-separated segment of ``varName`` is dropped before the
        write (setVarContent appends ``.<clientName>`` to registered names).

        :param request: request payload with ``varName`` and ``value``
        :return: response dict with result ``success`` or ``fail``
        """
        rawName = request.get("varName")
        if not isinstance(rawName, str):
            return {
                "client": self.clientName,
                "result": "fail",
                "reason": "missing varName"
            }
        # Everything before the final '.'; empty when there is no '.'.
        varName = rawName.rpartition('.')[0]
        value = request.get("value")

        if not self.protocolManager:
            # Previously fell through and returned None (a JSON `null` reply).
            return {
                "client": self.clientName,
                "result": "fail",
                "reason": "no protocol manager"
            }

        try:
            success = self.protocolManager.writeVariableValue(varName, value)
        except Exception as e:
            return {
                "client": self.clientName,
                "result": "fail",
                "reason": f"write error: {str(e)}"
            }
        if success:
            return {
                "client": self.clientName,
                "result": "success",
                "varName": varName,
                "value": value
            }
        return {
            "client": self.clientName,
            "result": "fail",
            "reason": "write failed"
        }

    def setVarContent(self, variableName, value, min, max, type):
        """
        Register or update a variable exposed by the ``read`` command.

        The variable is stored under ``<variableName>.<clientName>`` and the
        value is stored as a string. For types ``DI`` and ``DO`` the range is
        forced to 0..1.

        The parameter names shadow builtins; they are kept unchanged because
        callers may pass them by keyword.
        """
        if type in ['DI', 'DO']:
            min = 0
            max = 1
        variableName = variableName + '.' + self.clientName
        entry = self.variables.setdefault(variableName, {})
        entry["value"] = str(value)
        entry["min"] = min
        entry["max"] = max
        entry["type"] = type

    def start(self):
        """Register the consumer and block while consuming requests."""
        try:
            self.setupConsumer()
            print(f"[{self.clientName}] 开始监听队列: {self.queueName}")
            self.channel.start_consuming()
        except KeyboardInterrupt:
            print(f"[{self.clientName}] 收到中断信号,正在停止...")
        except Exception as e:
            print(f"[{self.clientName}] 监听异常: {e}")
        finally:
            print(f"[{self.clientName}] 监听已停止")

    def setupConsumer(self):
        """Register onRequest as the auto-acknowledging consumer of the request queue."""
        try:
            self.channel.basic_consume(
                queue=self.queueName,
                on_message_callback=self.onRequest,
                auto_ack=True  # messages are acknowledged on delivery
            )
        except Exception as e:
            print(f"[{self.clientName}] 设置消费者失败: {e}")
            raise

    def handleConnectionError(self):
        """Stop consuming, close the connection, then pause 2 seconds before a retry."""
        try:
            if self.channel and not self.channel.is_closed:
                self.channel.stop_consuming()
        except Exception:
            # Deliberate best effort: the channel is being torn down anyway.
            pass
        self.closeConnectionSafely()
        time.sleep(2)

    def reconnectWithBackoff(self):
        """Close any existing connection and connect again via connectWithRetry."""
        try:
            self.closeConnectionSafely()
            self.connectWithRetry()
        except Exception as e:
            print(f"[{self.clientName}] 重连失败: {e}")
            raise

    def closeConnectionSafely(self):
        """Close channel and connection, ignoring errors, and reset both to None."""
        channel = getattr(self, 'channel', None)
        connection = getattr(self, 'connection', None)
        try:
            if channel and not channel.is_closed:
                channel.close()
        except Exception:
            # Deliberate best effort: closing must never raise.
            pass
        try:
            if connection and not connection.is_closed:
                connection.close()
        except Exception:
            pass
        self.channel = None
        self.connection = None

    def startNonBlocking(self):
        """Run the consumer in a daemon thread, replacing a running one if present."""
        try:
            previous = getattr(self, 'client_thread', None)
            if previous is not None and previous.is_alive():
                print(f"[{self.clientName}] 停止现有线程...")
                self.stopClient()
                # Wait for the old consumer so two threads never drive the
                # same channel at once; bounded so a stuck thread cannot hang
                # the caller forever.
                previous.join(timeout=5)
            self.shouldStop = False
            self.client_thread = threading.Thread(target=self.startWithMonitoring, daemon=True)
            self.client_thread.start()
            print(f"[{self.clientName}] RPC客户端已在后台线程启动")
        except Exception as e:
            print(f"[{self.clientName}] 启动后台线程失败: {e}")

    def startWithMonitoring(self):
        """Thread target: run start() and log how the thread ended."""
        try:
            self.start()
        except Exception as e:
            print(f"[{self.clientName}] 客户端线程异常退出: {e}")
        finally:
            print(f"[{self.clientName}] 客户端线程已结束")

    def stopClient(self):
        """
        Ask the consumer to stop.

        When a background consumer thread is running and this is called from
        another thread, the stop is scheduled on the connection's own thread
        with ``add_callback_threadsafe`` (pika's BlockingConnection must not be
        driven from several threads). Otherwise ``stop_consuming`` is called
        directly.
        """
        self.shouldStop = True
        channel = getattr(self, 'channel', None)
        connection = getattr(self, 'connection', None)
        if not channel or channel.is_closed:
            return
        try:
            worker = getattr(self, 'client_thread', None)
            crossThread = (
                worker is not None
                and worker.is_alive()
                and worker is not threading.current_thread()
            )
            if crossThread and connection and not connection.is_closed:
                connection.add_callback_threadsafe(channel.stop_consuming)
            else:
                channel.stop_consuming()
        except Exception as e:
            print(f"[{self.clientName}] 停止消费时出错: {e}")

    def isRunning(self):
        """Return True while the background consumer thread is alive."""
        worker = getattr(self, 'client_thread', None)
        return worker is not None and worker.is_alive()

    def close(self):
        """Delete the request queue and close the channel and connection."""
        try:
            if self.channel and not self.channel.is_closed:
                self.channel.queue_delete(queue=self.queueName)
                self.channel.close()
            if self.connection and not self.connection.is_closed:
                self.connection.close()
            print(f"[{self.clientName}] 连接已关闭")
        except Exception as e:
            print(f"[{self.clientName}] 关闭连接时出错: {e}")

    def getLocalIp(self):
        """
        Return the local IPv4 address of the socket after a UDP connect to 8.8.8.8.

        Falls back to ``127.0.0.1`` when the socket cannot be created or
        connected.
        """
        try:
            # The context manager closes the socket even when connect() fails.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
                probe.connect(('8.8.8.8', 80))
                return probe.getsockname()[0]
        except OSError:
            return '127.0.0.1'

    @staticmethod
    def _nextClientName(queueNames):
        """Return ``client<N+1>`` for the highest ``rpc_queue_client<N>`` name, else ``client1``."""
        prefix = 'rpc_queue_client'
        # isdecimal() accepts exactly the digit strings that int() can parse.
        numbers = [
            int(name[len(prefix):])
            for name in queueNames
            if name.startswith(prefix) and name[len(prefix):].isdecimal()
        ]
        if not numbers:
            return "client1"
        return f"client{max(numbers) + 1}"

    @classmethod
    def getNextClientNameFromRabbitMQ(cls, rabbitHost, username='dcs', password='123456'):
        """
        Pick the next free ``client<N>`` name from the RabbitMQ management API.

        Lists the queues at ``http://<rabbitHost>:15672/api/queues`` and returns
        one past the highest existing ``rpc_queue_client<N>`` number.

        :param rabbitHost: host of the RabbitMQ management API
        :param username: management API user name
        :param password: management API password
        :return: ``client1`` when no numbered RPC queue exists, else ``client<N+1>``
        :raises requests.RequestException: on network or HTTP errors; there is
            no fallback name, so errors propagate to the caller
        """
        url = f"http://{rabbitHost}:15672/api/queues"
        response = requests.get(url, auth=(username, password), timeout=5)
        # Fail on 401/404/... instead of mis-parsing an error body as a queue list.
        response.raise_for_status()
        queues = response.json()
        if not queues:
            return "client1"
        return cls._nextClientName(q['name'] for q in queues)
if __name__ == "__main__":
    import sys

    # The optional first command-line argument overrides the default client name.
    cliArgs = sys.argv[1:]
    clientName = cliArgs[0] if cliArgs else "Client1"

    # Connect to RabbitMQ (default host) and serve requests until interrupted.
    client = RpcClient(clientName)
    client.start()