|
|
|
|
import pika
|
|
|
|
|
import uuid
|
|
|
|
|
import json
|
|
|
|
|
import threading
|
|
|
|
|
import socket
|
|
|
|
|
import requests
|
|
|
|
|
import time
|
|
|
|
|
import time
|
|
|
|
|
class RpcClient:
|
|
|
|
|
def __init__(self, clientName, rabbitHost='localhost', protocolManager=None):
|
|
|
|
|
"""
|
|
|
|
|
初始化RPC客户端
|
|
|
|
|
:param clientName: 客户端名称
|
|
|
|
|
:param rabbitHost: RabbitMQ服务器地址
|
|
|
|
|
:param protocolManager: 协议管理器实例,用于处理读写操作
|
|
|
|
|
"""
|
|
|
|
|
self.clientName = clientName
|
|
|
|
|
self.protocolManager = protocolManager
|
|
|
|
|
self.rabbitHost = rabbitHost
|
|
|
|
|
self.variables = {}
|
|
|
|
|
self.credentials = pika.PlainCredentials('dcs', '123456')
|
|
|
|
|
self.connection = None
|
|
|
|
|
self.channel = None
|
|
|
|
|
self.queueName = f"rpc_queue_{self.clientName}"
|
|
|
|
|
self.connectWithRetry()
|
|
|
|
|
|
|
|
|
|
def connectWithRetry(self, maxRetries=3):
|
|
|
|
|
"""带重试机制的连接方法"""
|
|
|
|
|
for attempt in range(maxRetries):
|
|
|
|
|
try:
|
|
|
|
|
# 简化连接参数
|
|
|
|
|
connectionParams = pika.ConnectionParameters(
|
|
|
|
|
host=self.rabbitHost,
|
|
|
|
|
credentials=self.credentials,
|
|
|
|
|
heartbeat=300, # 5分钟心跳
|
|
|
|
|
socket_timeout=10 # 10秒socket超时
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
self.connection = pika.BlockingConnection(connectionParams)
|
|
|
|
|
self.channel = self.connection.channel()
|
|
|
|
|
|
|
|
|
|
# 设置QoS以避免消息积压
|
|
|
|
|
self.channel.basic_qos(prefetch_count=1)
|
|
|
|
|
|
|
|
|
|
# 简化队列声明
|
|
|
|
|
self.channel.queue_declare(queue=self.queueName, durable=False)
|
|
|
|
|
|
|
|
|
|
print(f"[{self.clientName}] 连接成功,等待服务端指令...")
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"[{self.clientName}] 连接尝试 {attempt + 1} 失败: {e}")
|
|
|
|
|
if attempt == maxRetries - 1:
|
|
|
|
|
raise Exception(f"连接失败,已重试 {maxRetries} 次")
|
|
|
|
|
time.sleep(2 ** attempt) # 指数退避
|
|
|
|
|
|
|
|
|
|
def onRequest(self, ch, method, props, body):
|
|
|
|
|
"""处理服务端请求"""
|
|
|
|
|
try:
|
|
|
|
|
request = json.loads(body)
|
|
|
|
|
cmd = request.get("cmd", "unknown")
|
|
|
|
|
|
|
|
|
|
# 处理命令
|
|
|
|
|
response = self.processCommand(cmd, request)
|
|
|
|
|
|
|
|
|
|
# 发送响应
|
|
|
|
|
if props.reply_to:
|
|
|
|
|
ch.basic_publish(
|
|
|
|
|
exchange='',
|
|
|
|
|
routing_key=props.reply_to,
|
|
|
|
|
properties=pika.BasicProperties(correlation_id=props.correlation_id),
|
|
|
|
|
body=json.dumps(response)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"[{self.clientName}] 处理请求异常: {e}")
|
|
|
|
|
# 发送错误响应
|
|
|
|
|
if props.reply_to:
|
|
|
|
|
errorResponse = {"client": self.clientName, "result": "error", "reason": str(e)}
|
|
|
|
|
try:
|
|
|
|
|
ch.basic_publish(
|
|
|
|
|
exchange='',
|
|
|
|
|
routing_key=props.reply_to,
|
|
|
|
|
properties=pika.BasicProperties(correlation_id=props.correlation_id),
|
|
|
|
|
body=json.dumps(errorResponse)
|
|
|
|
|
)
|
|
|
|
|
except:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def processCommand(self, cmd, request):
|
|
|
|
|
"""处理命令并返回响应"""
|
|
|
|
|
try:
|
|
|
|
|
if cmd == "ping":
|
|
|
|
|
return self.handlePingCommand()
|
|
|
|
|
elif cmd == "read":
|
|
|
|
|
return self.handleReadCommand()
|
|
|
|
|
elif cmd == "write":
|
|
|
|
|
return self.handleWriteCommand(request)
|
|
|
|
|
else:
|
|
|
|
|
return {"client": self.clientName, "result": "fail", "reason": "unknown command"}
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return {"client": self.clientName, "result": "error", "reason": str(e)}
|
|
|
|
|
|
|
|
|
|
def handlePingCommand(self):
|
|
|
|
|
"""处理ping命令"""
|
|
|
|
|
localIp = self.getLocalIp()
|
|
|
|
|
return {"client": self.clientName, "result": "pong", "ip": localIp}
|
|
|
|
|
|
|
|
|
|
def handleReadCommand(self):
|
|
|
|
|
"""处理读取命令"""
|
|
|
|
|
return {"client": self.clientName, "variables": self.variables}
|
|
|
|
|
|
|
|
|
|
def handleWriteCommand(self, request):
|
|
|
|
|
"""处理写入命令"""
|
|
|
|
|
return self.handleWriteRequest(request)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def handleWriteRequest(self, request):
|
|
|
|
|
"""
|
|
|
|
|
处理写入请求
|
|
|
|
|
:param request: 请求数据
|
|
|
|
|
:return: 响应数据
|
|
|
|
|
"""
|
|
|
|
|
varName = '.'.join(request.get("varName").split('.')[:-1])
|
|
|
|
|
value = request.get("value")
|
|
|
|
|
|
|
|
|
|
# 如果有协议管理器,使用它来处理写入
|
|
|
|
|
if self.protocolManager:
|
|
|
|
|
try:
|
|
|
|
|
success = self.protocolManager.writeVariableValue(varName, value)
|
|
|
|
|
if success:
|
|
|
|
|
return {
|
|
|
|
|
"client": self.clientName,
|
|
|
|
|
"result": "success",
|
|
|
|
|
"varName": varName,
|
|
|
|
|
"value": value
|
|
|
|
|
}
|
|
|
|
|
else:
|
|
|
|
|
return {
|
|
|
|
|
"client": self.clientName,
|
|
|
|
|
"result": "fail",
|
|
|
|
|
"reason": "write failed"
|
|
|
|
|
}
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return {
|
|
|
|
|
"client": self.clientName,
|
|
|
|
|
"result": "fail",
|
|
|
|
|
"reason": f"write error: {str(e)}"
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def setVarContent(self, variableName, value, min, max, type):
|
|
|
|
|
if type in ['DI', 'DO']:
|
|
|
|
|
min = 0
|
|
|
|
|
max = 1
|
|
|
|
|
variableName = variableName + '.' + self.clientName
|
|
|
|
|
|
|
|
|
|
# 确保变量条目存在,如果不存在则创建
|
|
|
|
|
if variableName not in self.variables:
|
|
|
|
|
self.variables[variableName] = {}
|
|
|
|
|
|
|
|
|
|
self.variables[variableName]["value"] = str(value)
|
|
|
|
|
self.variables[variableName]["min"] = min
|
|
|
|
|
self.variables[variableName]["max"] = max
|
|
|
|
|
self.variables[variableName]["type"] = type
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def start(self):
|
|
|
|
|
"""启动客户端监听(阻塞方法)"""
|
|
|
|
|
try:
|
|
|
|
|
# 简化启动逻辑
|
|
|
|
|
self.setupConsumer()
|
|
|
|
|
print(f"[{self.clientName}] 开始监听队列: {self.queueName}")
|
|
|
|
|
self.channel.start_consuming()
|
|
|
|
|
|
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
|
print(f"[{self.clientName}] 收到中断信号,正在停止...")
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"[{self.clientName}] 监听异常: {e}")
|
|
|
|
|
|
|
|
|
|
finally:
|
|
|
|
|
print(f"[{self.clientName}] 监听已停止")
|
|
|
|
|
|
|
|
|
|
def setupConsumer(self):
|
|
|
|
|
"""设置消费者"""
|
|
|
|
|
try:
|
|
|
|
|
# 简化消费者设置
|
|
|
|
|
self.channel.basic_consume(
|
|
|
|
|
queue=self.queueName,
|
|
|
|
|
on_message_callback=self.onRequest,
|
|
|
|
|
auto_ack=True # 自动确认,简化处理
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"[{self.clientName}] 设置消费者失败: {e}")
|
|
|
|
|
raise e
|
|
|
|
|
|
|
|
|
|
def handleConnectionError(self):
|
|
|
|
|
"""处理连接错误"""
|
|
|
|
|
try:
|
|
|
|
|
if self.channel and not self.channel.is_closed:
|
|
|
|
|
self.channel.stop_consuming()
|
|
|
|
|
except:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
self.closeConnectionSafely()
|
|
|
|
|
time.sleep(2) # 等待2秒后重试
|
|
|
|
|
|
|
|
|
|
def reconnectWithBackoff(self):
|
|
|
|
|
"""重连方法"""
|
|
|
|
|
try:
|
|
|
|
|
self.closeConnectionSafely()
|
|
|
|
|
self.connectWithRetry()
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"[{self.clientName}] 重连失败: {e}")
|
|
|
|
|
raise e
|
|
|
|
|
|
|
|
|
|
def closeConnectionSafely(self):
|
|
|
|
|
"""安全关闭连接"""
|
|
|
|
|
try:
|
|
|
|
|
if hasattr(self, 'channel') and self.channel and not self.channel.is_closed:
|
|
|
|
|
self.channel.close()
|
|
|
|
|
except:
|
|
|
|
|
pass
|
|
|
|
|
try:
|
|
|
|
|
if hasattr(self, 'connection') and self.connection and not self.connection.is_closed:
|
|
|
|
|
self.connection.close()
|
|
|
|
|
except:
|
|
|
|
|
pass
|
|
|
|
|
self.channel = None
|
|
|
|
|
self.connection = None
|
|
|
|
|
|
|
|
|
|
def startNonBlocking(self):
|
|
|
|
|
"""非阻塞启动客户端(在后台线程中运行)"""
|
|
|
|
|
try:
|
|
|
|
|
# 停止现有线程
|
|
|
|
|
if hasattr(self, 'client_thread') and self.client_thread and self.client_thread.is_alive():
|
|
|
|
|
print(f"[{self.clientName}] 停止现有线程...")
|
|
|
|
|
self.stopClient()
|
|
|
|
|
|
|
|
|
|
import threading
|
|
|
|
|
self.shouldStop = False
|
|
|
|
|
self.client_thread = threading.Thread(target=self.startWithMonitoring, daemon=True)
|
|
|
|
|
self.client_thread.start()
|
|
|
|
|
print(f"[{self.clientName}] RPC客户端已在后台线程启动")
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"[{self.clientName}] 启动后台线程失败: {e}")
|
|
|
|
|
|
|
|
|
|
def startWithMonitoring(self):
|
|
|
|
|
"""带监控的启动方法"""
|
|
|
|
|
try:
|
|
|
|
|
self.start()
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"[{self.clientName}] 客户端线程异常退出: {e}")
|
|
|
|
|
finally:
|
|
|
|
|
print(f"[{self.clientName}] 客户端线程已结束")
|
|
|
|
|
|
|
|
|
|
def stopClient(self):
|
|
|
|
|
"""停止客户端"""
|
|
|
|
|
self.shouldStop = True
|
|
|
|
|
try:
|
|
|
|
|
if hasattr(self, 'channel') and self.channel and not self.channel.is_closed:
|
|
|
|
|
self.channel.stop_consuming()
|
|
|
|
|
except:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
def isRunning(self):
|
|
|
|
|
"""检查客户端是否正在运行"""
|
|
|
|
|
return hasattr(self, 'client_thread') and self.client_thread.is_alive()
|
|
|
|
|
|
|
|
|
|
def close(self):
|
|
|
|
|
"""关闭RPC连接"""
|
|
|
|
|
try:
|
|
|
|
|
if self.channel and not self.channel.is_closed:
|
|
|
|
|
self.channel.queue_delete(queue=self.queueName)
|
|
|
|
|
self.channel.close()
|
|
|
|
|
if self.connection and not self.connection.is_closed:
|
|
|
|
|
self.connection.close()
|
|
|
|
|
print(f"[{self.clientName}] 连接已关闭")
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"[{self.clientName}] 关闭连接时出错: {e}")
|
|
|
|
|
|
|
|
|
|
def getLocalIp(self):
|
|
|
|
|
# 获取本机第一个非回环IPv4地址
|
|
|
|
|
try:
|
|
|
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
|
|
|
s.connect(('8.8.8.8', 80))
|
|
|
|
|
ip = s.getsockname()[0]
|
|
|
|
|
s.close()
|
|
|
|
|
return ip
|
|
|
|
|
except:
|
|
|
|
|
return '127.0.0.1'\
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def getNextClientNameFromRabbitMQ(cls, rabbitHost, username='dcs', password='123456'):
|
|
|
|
|
# try:
|
|
|
|
|
# 获取所有队列
|
|
|
|
|
url = f"http://{rabbitHost}:15672/api/queues"
|
|
|
|
|
response = requests.get(url, auth=(username, password), timeout=5)
|
|
|
|
|
queues = response.json()
|
|
|
|
|
print(queues)
|
|
|
|
|
# 筛选RPC队列
|
|
|
|
|
if not queues:
|
|
|
|
|
return "client1"
|
|
|
|
|
rpcQueues = [q['name'] for q in queues if q['name'].startswith('rpc_queue_client')]
|
|
|
|
|
|
|
|
|
|
# 提取客户端编号
|
|
|
|
|
clientNumbers = []
|
|
|
|
|
for queueName in rpcQueues:
|
|
|
|
|
# rpc_queue_client1 -> client1 -> 1
|
|
|
|
|
if queueName.startswith('rpc_queue_client'):
|
|
|
|
|
try:
|
|
|
|
|
numberStr = queueName.replace('rpc_queue_client', '')
|
|
|
|
|
if numberStr.isdigit():
|
|
|
|
|
clientNumbers.append(int(numberStr))
|
|
|
|
|
except:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# 返回下一个可用编号
|
|
|
|
|
if not clientNumbers:
|
|
|
|
|
return "client1"
|
|
|
|
|
else:
|
|
|
|
|
nextNumber = max(clientNumbers) + 1
|
|
|
|
|
return f"client{nextNumber}"
|
|
|
|
|
|
|
|
|
|
# except Exception as e:
|
|
|
|
|
# print(f"从RabbitMQ获取客户端名称失败: {e}")
|
|
|
|
|
# # 降级方案:使用时间戳
|
|
|
|
|
# import time
|
|
|
|
|
# timestamp = int(time.time() % 10000)
|
|
|
|
|
# return f"client{timestamp}"
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
import sys
|
|
|
|
|
clientName = sys.argv[1] if len(sys.argv) > 1 else "Client1"
|
|
|
|
|
client = RpcClient(clientName)
|
|
|
|
|
client.start()
|