diff --git a/UI/ProcedureManager/StepExecutor.py b/UI/ProcedureManager/StepExecutor.py index bd20c87..cecdee2 100644 --- a/UI/ProcedureManager/StepExecutor.py +++ b/UI/ProcedureManager/StepExecutor.py @@ -53,6 +53,12 @@ class StepExecutor(QWidget): self.remainingTime = 0 self.totalSteps = 0 + # 新增:倒计时相关状态 + self.countdownStartTime = None # 倒计时开始时间 + self.estimatedTotalTime = 0 # 预估总时间 + self.actualElapsedTime = 0 # 实际已用时间 + self.lastStepStartTime = None # 上一步开始时间 + def initTimers(self): """初始化定时器""" self.timer = QTimer() @@ -292,6 +298,9 @@ class StepExecutor(QWidget): # 重置当前索引,确保从第一步开始执行 self.currentIndex = 0 + # 初始化倒计时状态 + self.initializeCountdown() + stepInterval = int(self.stepIntervalSpin.value() * 1000) self.timer.start(stepInterval) @@ -347,6 +356,9 @@ class StepExecutor(QWidget): def executeCurrentStep(self): """执行当前步骤""" + # 记录步骤开始时间 + self.lastStepStartTime = datetime.now() + stepInfo = self.tableModel.getStepInfo(self.currentIndex) if stepInfo and not stepInfo['isMain']: self.executeStep(self.currentIndex) @@ -379,6 +391,9 @@ class StepExecutor(QWidget): self.tableModel.resetAll() self.stepResults = [] + # 重新初始化倒计时状态 + self.initializeCountdown() + cycleNum = self.getCurrentCycleNumber() self.updateStatusDisplay(f"执行中 - 第{cycleNum}轮", "green") @@ -448,6 +463,12 @@ class StepExecutor(QWidget): print(f"开始执行步骤 {stepInfo['stepId']}: {stepInfo['description']}") result = self.handleStep(row, stepInfo) + # 更新实际已用时间 + if self.lastStepStartTime: + stepExecutionTime = (datetime.now() - self.lastStepStartTime).total_seconds() + self.actualElapsedTime += stepExecutionTime + print(f"步骤执行时间: {stepExecutionTime:.1f}秒, 累计已用时间: {self.actualElapsedTime:.1f}秒") + if result is None: print(f"警告:步骤 {stepInfo['stepId']} 返回了None结果,设置为False") result = '执行失败,未检测到关键字' @@ -472,7 +493,8 @@ class StepExecutor(QWidget): """处理步骤""" description = stepInfo['description'] stepId = stepInfo['stepId'] - + stepType = stepInfo['stepType'] + # print(stepInfo, 111111111111) print(f"处理步骤 {stepId}: {description}") # 从关键词字段库中提取关键词信息 @@ -481,6 +503,7 @@ class StepExecutor(QWidget): if keywordInfo: operationType = keywordInfo['operationType'] keyword = keywordInfo['keyword'] + print(f"检测到关键词: {keyword}, 操作类型: {operationType}") # 根据操作类型执行相应的处理 @@ -493,7 +516,7 @@ class StepExecutor(QWidget): handler = stepHandlers.get(operationType) if handler: - return handler(stepId, description, keyword) + return handler(stepId, description, keyword, stepType) # 如果没有找到匹配的关键词,使用默认的模拟执行 return None @@ -517,7 +540,7 @@ class StepExecutor(QWidget): return None - def performSetOperation(self, stepId, description, keyword): + def performSetOperation(self, stepId, description, keyword, stepType = None): """执行设置操作""" # print(f"执行设置操作 (步骤 {stepId}): {description}") @@ -526,12 +549,22 @@ class StepExecutor(QWidget): matches = re.findall(variablePattern, description) if matches: - print(f"检测到 {len(matches)} 个需要设置的变量:") + # print(f"检测到 {len(matches)} 个需要设置的变量:") result = '' for varName, varValue in matches: varName = varName.strip() - print(f" 变量: {varName} = {varValue}") - success = self.protocolManager.writeVariableValue(varName, float(varValue)) + # print(f" 变量: {varName} = {varValue}") + if stepType and 'Time' in stepType: + timeMatch = re.search(r'Time\s*=\s*(\d+)\s*ms', stepType, re.IGNORECASE) + if timeMatch: + timeoutMS = int(timeMatch.group(1)) + else: + timeoutMS = 2000 + # print(timeoutMS) + success = self.protocolManager.writeVariableValue(varName, float(varValue), trigger = True, timeoutMS = timeoutMS) + time.sleep(timeoutMS/1000 + 2) + else: + success = self.protocolManager.writeVariableValue(varName, float(varValue)) if success: result += f"{varName} = {varValue}\n" else: @@ -541,7 +574,7 @@ class StepExecutor(QWidget): return '"强制失败,未检测到关键字"' - def performCheckOperation(self, stepId, description, keyword): + def performCheckOperation(self, stepId, description, keyword, stepType = None): """执行检查操作""" print(f"执行检查操作 (步骤 {stepId}): {description}") @@ -574,7 +607,7 @@ class StepExecutor(QWidget): result += f"{varName} = {actualValue} {operator} {expectedValue} ✓\n" print(f" ✓ 检查通过: {varName} = {actualValue} {operator} {expectedValue}") else: - result += f"{varName} = {actualValue} {operator} {expectedValue} ✗\n" + result += f"失败{varName} = {actualValue} {operator} {expectedValue} ✗\n" print(f" ✗ 检查失败: {varName} = {actualValue} {operator} {expectedValue}") allPassed = False except (ValueError, TypeError): @@ -630,7 +663,7 @@ class StepExecutor(QWidget): # 未知操作符,默认返回False return False - def performWaitOperation(self, stepId, description, keyword): + def performWaitOperation(self, stepId, description, keyword, stepType): """执行等待操作,等待时间取自描述字段中的数字(秒)""" print(f"执行等待操作 (步骤 {stepId}): {description}") import re, time @@ -645,11 +678,12 @@ class StepExecutor(QWidget): print(" 未检测到等待时间") return '未检测到等待时间' - def performDeltaTOperation(self, stepId, description, keyword): + def performDeltaTOperation(self, stepId, description, keyword, stepType): """执行接收操作""" - print(f"执行接收操作 (步骤 {stepId}): {description}") + result = self.protocolManager.recvDeltaT() # 这里可以添加具体的接收操作逻辑 - return random.random() < 0.88 + result = ' '.join([f't{i+1}={num}' for i, num in enumerate(result)]) + return result def resetExecution(self, fromAuto=False): @@ -906,21 +940,76 @@ class StepExecutor(QWidget): def startCountdown(self): """开始倒计时""" - self.totalSteps = sum(1 for step in self.tableModel.stepData if not step['isMain']) + if not self.countdownStartTime: + return + + # 计算剩余时间 + self.calculateRemainingTime() + + if self.remainingTime > 0: + self.countdownTimer.start(1000) + self.updateCountdown() + else: + self.countdownLabel.setText("") + + def calculateRemainingTime(self): + """计算剩余时间""" + if not self.countdownStartTime: + self.remainingTime = 0 + return + + # 计算当前轮次已完成的子步骤数 + completedSubSteps = 0 + for i in range(self.currentIndex): + if not self.tableModel.stepData[i]['isMain']: + completedSubSteps += 1 - remainingSteps = 0 + # 计算当前轮次剩余的子步骤数 + remainingSubSteps = 0 for i in range(self.currentIndex, len(self.tableModel.stepData)): if not self.tableModel.stepData[i]['isMain']: - remainingSteps += 1 + remainingSubSteps += 1 + # 基础剩余时间:剩余步骤数 * 步骤间隔 stepInterval = self.stepIntervalSpin.value() + baseRemainingTime = remainingSubSteps * stepInterval - if remainingSteps > 0: - self.remainingTime = int(remainingSteps * stepInterval) - self.countdownTimer.start(1000) - self.updateCountdown() - else: - self.countdownLabel.setText("") + # 预估剩余步骤的实际执行时间 + estimatedRemainingExecutionTime = 0 + for i in range(self.currentIndex, len(self.tableModel.stepData)): + step = self.tableModel.stepData[i] + if not step['isMain']: + stepType = step.get('stepType', '') + if 'Time' in stepType: + timeMatch = re.search(r'Time\s*=\s*(\d+)\s*ms', stepType, re.IGNORECASE) + if timeMatch: + timeoutMS = int(timeMatch.group(1)) + estimatedRemainingExecutionTime += (timeoutMS / 1000) + 2 + elif 'wait' in step.get('description', '').lower(): + waitMatch = re.search(r'([0-9]+(?:\.[0-9]+)?)', step.get('description', '')) + if waitMatch: + estimatedRemainingExecutionTime += float(waitMatch.group(1)) + else: + estimatedRemainingExecutionTime += 1 + + # 总预估剩余时间 + totalEstimatedRemaining = baseRemainingTime + estimatedRemainingExecutionTime + + # 考虑实际执行时间与预估时间的差异 + if self.actualElapsedTime > 0 and completedSubSteps > 0: + # 计算实际平均每步时间 + actualAvgTimePerStep = self.actualElapsedTime / completedSubSteps + # 预估平均每步时间 + estimatedAvgTimePerStep = (self.estimatedTotalTime - totalEstimatedRemaining) / completedSubSteps if completedSubSteps > 0 else 0 + + # 如果实际时间比预估时间长,调整剩余时间 + if actualAvgTimePerStep > estimatedAvgTimePerStep: + timeAdjustment = (actualAvgTimePerStep - estimatedAvgTimePerStep) * remainingSubSteps + totalEstimatedRemaining += timeAdjustment + + self.remainingTime = max(0, int(totalEstimatedRemaining)) + print(f"剩余时间计算: 剩余步骤={remainingSubSteps}, 基础时间={baseRemainingTime:.1f}秒, " + f"预估执行时间={estimatedRemainingExecutionTime:.1f}秒, 总剩余={self.remainingTime}秒") def stopCountdown(self): """停止倒计时""" @@ -933,11 +1022,17 @@ class StepExecutor(QWidget): minutes = self.remainingTime // 60 seconds = self.remainingTime % 60 + # 计算当前轮次进度 + totalSubSteps = sum(1 for step in self.tableModel.stepData if not step['isMain']) + completedSubSteps = sum(1 for i in range(self.currentIndex) if not self.tableModel.stepData[i]['isMain']) + progressPercent = (completedSubSteps / totalSubSteps * 100) if totalSubSteps > 0 else 0 + if minutes > 0: - countdownText = f"当前轮次剩余: {minutes:02d}:{seconds:02d}" + countdownText = f"当前轮次剩余: {minutes:02d}:{seconds:02d} (进度: {progressPercent:.1f}%)" else: - countdownText = f"当前轮次剩余: {seconds}秒" + countdownText = f"当前轮次剩余: {seconds}秒 (进度: {progressPercent:.1f}%)" + # 根据剩余时间设置颜色 if self.remainingTime <= 10: self.countdownLabel.setStyleSheet("color: red; font-weight: bold; font-size: 14px;") elif self.remainingTime <= 30: @@ -957,6 +1052,12 @@ class StepExecutor(QWidget): self.stopCountdown() self.remainingTime = 0 self.totalSteps = 0 + + # 重置倒计时相关状态 + self.countdownStartTime = None + self.estimatedTotalTime = 0 + self.actualElapsedTime = 0 + self.lastStepStartTime = None def isExecutionCompleted(self): """检查执行是否完成""" @@ -973,4 +1074,45 @@ class StepExecutor(QWidget): super().showEvent(event) if not hasattr(self, '_hasResizedRows'): self.tableView.resizeRowsToContents() - self._hasResizedRows = True \ No newline at end of file + self._hasResizedRows = True + + def initializeCountdown(self): + """初始化倒计时状态""" + self.countdownStartTime = datetime.now() + self.actualElapsedTime = 0 + self.lastStepStartTime = None + + # 计算预估总时间 + self.calculateEstimatedTotalTime() + + def calculateEstimatedTotalTime(self): + """计算预估总时间""" + stepInterval = self.stepIntervalSpin.value() + totalSubSteps = sum(1 for step in self.tableModel.stepData if not step['isMain']) + + # 基础时间:步骤间隔时间 + baseTime = totalSubSteps * stepInterval + + # 额外时间:预估每个步骤的实际执行时间 + extraTime = 0 + for step in self.tableModel.stepData: + if not step['isMain']: + # 根据步骤类型预估额外时间 + stepType = step.get('stepType', '') + if 'Time' in stepType: + # 设置操作带超时时间 + timeMatch = re.search(r'Time\s*=\s*(\d+)\s*ms', stepType, re.IGNORECASE) + if timeMatch: + timeoutMS = int(timeMatch.group(1)) + extraTime += (timeoutMS / 1000) + 2 # 超时时间 + 2秒缓冲 + elif 'wait' in step.get('description', '').lower(): + # 等待操作 + waitMatch = re.search(r'([0-9]+(?:\.[0-9]+)?)', step.get('description', '')) + if waitMatch: + extraTime += float(waitMatch.group(1)) + else: + # 其他操作预估1秒 + extraTime += 1 + + self.estimatedTotalTime = baseTime + extraTime + print(f"预估总时间: {self.estimatedTotalTime:.1f}秒 (基础: {baseTime:.1f}秒, 额外: {extraTime:.1f}秒)") \ No newline at end of file diff --git a/UI/Setting/SearchDeviceWidget.py b/UI/Setting/SearchDeviceWidget.py index 65b36ac..0f6423c 100644 --- a/UI/Setting/SearchDeviceWidget.py +++ b/UI/Setting/SearchDeviceWidget.py @@ -7,6 +7,8 @@ from PyQt5.QtCore import Qt, QThread, pyqtSignal, QObject from PyQt5.QtNetwork import QNetworkInterface,QAbstractSocket from datetime import datetime +from utils import Globals + class TcpClient(object): def __init__(self): # 配置信息 @@ -34,7 +36,8 @@ class TcpClient(object): 'port': int(tcpPort), # 存储为整数 'hostname': hostname }) - print() + # self.connectToServer(serverIp, tcpPort) + print(data.decode('utf-8')) except socket.timeout: pass finally: @@ -46,10 +49,14 @@ class TcpClient(object): try: # 显式转换类型确保安全 client.connect((str(ip), int(tcpPort))) - # print(f"[+] 已连接到 {ip}:{tcpPort}") - client.send(b"Hello from client!") + print(f"[+] 已连接到 {ip}:{tcpPort}") + Globals.getValue('protocolManage').setServerMode() + clientName = Globals.getValue('protocolManage').RpcServer.getNextClientName() + client.send(clientName.encode('utf-8')) response = client.recv(1024) - # print(f"服务端响应: {response.decode('utf-8')}") + print(f"服务端响应: {response.decode('utf-8')}") + resClientName = response.decode('utf-8') + Globals.getValue('protocolManage').addClient(clientName = resClientName) return True except Exception as e: print(f"连接失败: {e}") @@ -154,7 +161,8 @@ class DeviceMasterWidget(QMainWindow): return selectedServer = self.servers[selectedIndex] - print(selectedServer['ip'][0]) + # print(selectedServer['ip'][0], selectedServer['port']) + self.tcpClient.connectToServer(selectedServer['ip'][0], selectedServer['port']) def getLocalIp(self): for interface in QNetworkInterface.allInterfaces(): @@ -190,39 +198,42 @@ class TcpServer(QObject): logMessage = f"[{currentTime}] {message}" self.updateSignal.emit(logMessage) - # def tcpServer(self): - # try: - # self.serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - # self.serverSocket.settimeout(1) # 设置1秒超时 - # self.serverSocket.bind(('0.0.0.0', self.tcpPort)) - # self.serverSocket.listen(5) - # self.log(f"TCP服务端已启动,监听端口 {self.tcpPort}") + def tcpServer(self): + try: + self.serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.serverSocket.settimeout(1) # 设置1秒超时 + self.serverSocket.bind(('0.0.0.0', self.tcpPort)) + self.serverSocket.listen(5) + self.log(f"TCP服务端已启动,监听端口 {self.tcpPort}") - # while self.tcpRunning: - # try: - # clientSock, addr = self.serverSocket.accept() - # self.log(f"接收到来自 {addr} 的连接") - # clientThread = threading.Thread(target=self.handleClient, args=(clientSock,)) - # clientThread.start() - # except socket.timeout: - # continue # 超时后继续检查运行标志 - # except: - # break - # except Exception as e: - # self.log(f"TCP服务器错误: {e}") - # finally: - # if self.serverSocket: - # self.serverSocket.close() - # self.log("TCP服务端已停止") + while self.tcpRunning: + try: + clientSock, addr = self.serverSocket.accept() + self.log(f"接收到来自 {addr} 的连接") + clientThread = threading.Thread(target=self.handleClient, args=(clientSock,addr)) + clientThread.start() + except socket.timeout: + continue # 超时后继续检查运行标志 + except: + break + except Exception as e: + self.log(f"TCP服务器错误: {e}") + finally: + if self.serverSocket: + self.serverSocket.close() + self.log("TCP服务端已停止") - def handleClient(self, clientSock): + def handleClient(self, clientSock, addr): try: while True: data = clientSock.recv(1024) if not data: break + # print(addr, data) + Globals.getValue('protocolManage').setClentMode(data.decode('utf-8'), rabbitmqHost = addr[0]) + # print(data.decode('utf-8')) self.log(f"收到消息: {data.decode('utf-8')}") - clientSock.send(b"ACK") # 返回确认 + clientSock.send(data) # 返回确认 except Exception as e: self.log(f"客户端异常断开: {e}") finally: @@ -242,6 +253,7 @@ class TcpServer(QObject): if data.decode('utf-8') == "DISCOVERY_REQUEST": self.log(f"收到来自 {addr} 的发现请求") response = f"DISCOVERY_RESPONSE:{socket.gethostname()}:{self.tcpPort}" + # print(response) self.udpSocket.sendto(response.encode('utf-8'), addr) except: break @@ -252,22 +264,22 @@ class TcpServer(QObject): self.udpSocket.close() self.log("UDP发现服务已停止") - # def startTcpServer(self): - # if not self.tcpRunning: - # self.tcpRunning = True - # self.tcpThread = threading.Thread(target=self.tcpServer) - # self.tcpThread.start() + def startTcpServer(self): + if not self.tcpRunning: + self.tcpRunning = True + self.tcpThread = threading.Thread(target=self.tcpServer) + self.tcpThread.start() - # def stopTcpServer(self): - # if self.tcpRunning: - # self.tcpRunning = False - # # 创建一个临时连接来解除accept阻塞 - # try: - # tempSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - # tempSocket.connect(('127.0.0.1', self.tcpPort)) - # tempSocket.close() - # except: - # pass + def stopTcpServer(self): + if self.tcpRunning: + self.tcpRunning = False + # 创建一个临时连接来解除accept阻塞 + try: + tempSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + tempSocket.connect(('127.0.0.1', self.tcpPort)) + tempSocket.close() + except: + pass def startUdpServer(self): if not self.udpRunning: @@ -326,10 +338,10 @@ class DeviceSlaveWidget(QMainWindow): def toggleTcpServer(self): if self.server.tcpRunning: self.server.stopTcpServer() - # self.tcpButton.setText("启动 TCP 服务器") + print("关闭 TCP 服务器") else: self.server.startTcpServer() - # self.tcpButton.setText("停止 TCP 服务器") + print("启动 TCP 服务器") def toggleUdpServer(self): if self.server.udpRunning: @@ -338,6 +350,7 @@ class DeviceSlaveWidget(QMainWindow): else: self.server.startUdpServer() self.udpButton.setText("关闭远程通讯模式") + self.toggleTcpServer() def updateLog(self, message): self.logDisplay.append(message) diff --git a/UI/VarManages/AnalogModel.py b/UI/VarManages/AnalogModel.py index 91261cc..07c0cb8 100644 --- a/UI/VarManages/AnalogModel.py +++ b/UI/VarManages/AnalogModel.py @@ -162,8 +162,9 @@ class AnalogButtonDelegate(TcRtdButtonDelegate): varType = model.datas[sender.index[0]][6] min = model.datas[sender.index[0]][7] max = model.datas[sender.index[0]][8] + # print(model.datas[sender.index[0]][9]) pattern = re.compile(r'[^0-9\.-]+') - if varType in ['AI', 'DI']: + if varType in ['AI', 'DI'] and model.datas[sender.index[0]][9] != '模拟值': reply = QMessageBox.question(self.parent(), '警告', "AI,DI类型变量不允许强制值", diff --git a/model/ProcedureModel/ProcedureProcessor.py b/model/ProcedureModel/ProcedureProcessor.py index f0ba259..b214b1d 100644 --- a/model/ProcedureModel/ProcedureProcessor.py +++ b/model/ProcedureModel/ProcedureProcessor.py @@ -78,7 +78,7 @@ class ExcelParser: } class StepTableModel(QAbstractTableModel): - columns = ['序号', '实验步骤', '执行时间', '是否与预期一致', '实际结果', '备注'] + columns = ['序号', '实验步骤','操作类型', '执行时间', '是否与预期一致', '实际结果', '备注'] def __init__(self, testSteps): super().__init__() @@ -92,6 +92,7 @@ class StepTableModel(QAbstractTableModel): 'stepId': mainStep['步骤ID'], 'description': mainStep['步骤描述'], 'executed': False, + 'stepType': mainStep['操作类型'], 'time': None, 'result': None, 'note': "" # 新增备注字段,主步骤默认为空 @@ -104,6 +105,7 @@ class StepTableModel(QAbstractTableModel): 'isMain': False, 'stepId': f"{mainStep['步骤ID']}{subStep['序号']}", 'description': subStep['操作'], + 'stepType': subStep['操作类型'], 'executed': False, 'time': None, 'result': None, @@ -131,18 +133,20 @@ class StepTableModel(QAbstractTableModel): elif col == 1: return step['description'] elif col == 2: - return step['time'].strftime("%Y-%m-%d %H:%M:%S") if step['time'] else '' + return step['stepType'] elif col == 3: + return step['time'].strftime("%Y-%m-%d %H:%M:%S") if step['time'] else '' + elif col == 4: # print(step['result']) if step['executed']: if '失败' in step['result']: return '✘' else: return '✓' - elif col == 4: + elif col == 5: # print(step['result']) return step['result'] - elif col == 5: + elif col == 6: return step['note'] if step['note'] else '' elif role == Qt.BackgroundRole: diff --git a/protocol/ModBus/modbus_system_monitor.py b/protocol/ModBus/modbus_system_monitor.py deleted file mode 100644 index 27e11ae..0000000 --- a/protocol/ModBus/modbus_system_monitor.py +++ /dev/null @@ -1,89 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf_8 -*- -""" - Modbus TestKit: Implementation of Modbus protocol in python - - (C)2009 - Luc Jean - luc.jean@gmail.com - (C)2009 - Apidev - http://www.apidev.fr - - This is distributed under GNU LGPL license, see license.txt - - This example shows how to create a modbus server in charge of monitoring - cpu consumption the machine - -""" - -from __future__ import print_function - -import time - -from modbus_tk import defines -from modbus_tk.modbus import LOGGER -from modbus_tk.modbus_tcp import TcpServer -from modbus_tk.simulator import Simulator -from modbus_tk.simulator_rpc_client import SimulatorRpcClient -from modbus_tk.utils import WorkerThread - - -class SystemDataCollector(object): - """The class in charge of getting the CPU load""" - def __init__(self, refresh_rate_in_sec): - """Constructor""" - self._simu = SimulatorRpcClient() - self._max_count = refresh_rate_in_sec * 10 - self._count = self._max_count-1 - - def collect(self): - """get the CPU load thanks to WMI""" - try: - self._count += 1 - if self._count >= self._max_count: - self._count = 0 - #WMI get the load percentage of the machine - from win32com.client import GetObject - wmi = GetObject('winmgmts:') - cpu = wmi.InstancesOf('Win32_Processor') - for (_cpu, i) in zip(cpu, range(10)): - value = _cpu.Properties_('LoadPercentage').Value - cpu_usage = int(str(value)) if value else 0 - - #execute a RPC command for changing the value - self._simu.set_values(1, "Cpu", i, (cpu_usage, )) - except Exception as excpt: - LOGGER.debug("SystemDataCollector error: %s", str(excpt)) - time.sleep(0.1) - - -def main(): - """main""" - #create the object for getting CPU data - data_collector = SystemDataCollector(5) - #create the thread in charge of calling the data collector - system_monitor = WorkerThread(data_collector.collect) - - #create the modbus TCP simulator and one slave - #and one block of analog inputs - simu = Simulator(TcpServer()) - slave = simu.server.add_slave(1) - slave.add_block("Cpu", defines.ANALOG_INPUTS, 0, 10) - - try: - LOGGER.info("'quit' for closing the server") - - #start the data collect - system_monitor.start() - - #start the simulator! will block until quit command is received - simu.start() - - except Exception as excpt: - print(excpt) - - finally: - #close the simulator - simu.close() - #stop the data collect - system_monitor.stop() - -if __name__ == "__main__": - main() diff --git a/protocol/ModBus/mysimu.py b/protocol/ModBus/mysimu.py deleted file mode 100644 index 8d6cfa2..0000000 --- a/protocol/ModBus/mysimu.py +++ /dev/null @@ -1,116 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf_8 -*- -""" - Modbus TestKit: example of a custom simulator - - (C)2009 - Luc Jean - luc.jean@gmail.com - (C)2009 - Apidev - http://www.apidev.fr - - This is distributed under GNU LGPL license, see license.txt - -""" -from __future__ import print_function - -import sys -import struct - -from modbus_tk.simulator import Simulator, LOGGER -from modbus_tk.defines import HOLDING_REGISTERS -from modbus_tk.modbus_tcp import TcpServer -from modbus_tk.utils import PY2 - -try: - import serial - from modbus_tk.modbus_rtu import RtuServer -except ImportError: - pass - - -class MySimulator(Simulator): - """A custom simulator""" - - def __init__(self, *args, **kwargs): - """Constructor""" - Simulator.__init__(self, *args, **kwargs) - # add a new command: cv will make possible to change a value - self.add_command("cv", self.change_value) - self.add_command("set_pi", self.set_pi) - - # create a slave and block - slave = self.server.add_slave(1) - slave.add_block("foo", HOLDING_REGISTERS, 0, 100) - - def change_value(self, args): - """change the value of some registers""" - address = int(args[1]) - - # get the list of values and cast it to integers - values = [] - for val in args[2:]: - values.append(int(val)) - - # custom rules: if the value of reg0 is greater than 30 then reg1 is set to 1 - if address == 0 and values[0] > 30: - try: - values[1] = 1 - except IndexError: - values.append(1) - - # operates on slave 1 and block foo - slave = self.server.get_slave(1) - slave.set_values("foo", address, values) - return self._tuple_to_str(values) - - def set_pi(self, args): - """change the value of some registers""" - address = int(args[1]) - - # operates on slave 1 and block foo - slave = self.server.get_slave(1) - - if PY2: - pi_bytes = [ord(a_byte) for a_byte in struct.pack("f", 3.14)] - else: - pi_bytes = [int(a_byte) for a_byte in struct.pack("f", 3.14)] - - pi_register1 = pi_bytes[0] * 256 + pi_bytes[1] - pi_register2 = pi_bytes[2] * 256 + pi_bytes[3] - - slave.set_values("foo", address, [pi_register1, pi_register2]) - - values = slave.get_values("foo", address, 2) - return self._tuple_to_str(values) - - -def main(): - """main""" - - #Connect to the slave - if 'rtu' in sys.argv: - server = RtuServer(serial.Serial(port=sys.argv[-1])) - else: - server = TcpServer(error_on_missing_slave=True) - - simu = MySimulator(server) - - try: - LOGGER.info("'quit' for closing the server") - simu.start() - - except Exception as excpt: - print(excpt) - - finally: - simu.close() - - -if __name__ == "__main__": - help_text = """ - Usage: - python mysimu.py -> Run in TCP mode - python mysimu.py rtu /dev/ptyp5 -> Run in RTU mode and open the port given as last argument - """ - if '-h' in sys.argv: - print(help_text) - else: - main() diff --git a/protocol/ProtocolManage.py b/protocol/ProtocolManage.py index 16208ce..168b445 100644 --- a/protocol/ProtocolManage.py +++ b/protocol/ProtocolManage.py @@ -5,6 +5,8 @@ from utils.DBModels.ProtocolModel import ( from protocol.TCP.TCPVarManage import * from protocol.TCP.TemToMv import temToMv +from protocol.RPC.RpcClient import RpcClient +from protocol.RPC.RpcServer import RpcServer class ProtocolManage(object): """通讯变量查找类,用于根据变量名在数据库模型中检索变量信息""" @@ -17,11 +19,15 @@ class ProtocolManage(object): ] def __init__(self): - self.tcpVarManager = TCPVarManager('192.168.1.50', 5055) + # self.tcpVarManager = TCPVarManager('192.168.1.50', 5055) + self.tcpVarManager = TCPVarManager('127.0.0.1', 8000) self.writeTC = [0] * 8 self.writeRTD = [0] * 8 + self.RpcClient = None + self.RpcServer = None + + - @classmethod def lookupVariable(cls, variableName): """ @@ -45,9 +51,42 @@ class ProtocolManage(object): } return None + + def setClentMode(self, clentName, rabbitmqHost='localhost'): + if self.RpcClient: + self.RpcClient.close() + self.RpcClient = RpcClient(clentName, rabbitmqHost) + # 使用非阻塞方式启动RPC客户端 + self.RpcClient.startNonBlocking() + + def closeClent(self): + if self.RpcClient: + self.RpcClient.close() + self.RpcClient = None + + def setServerMode(self, rabbitmqHost='localhost'): + if self.RpcServer: + self.RpcServer.close() + self.RpcServer = RpcServer(rabbitmqHost) + + + def addClient(self, clientName): + if self.RpcServer: + self.RpcServer.addClient(clientName = clientName) + else: + return + # print(11111111111) + # time.sleep(3) + # print(self.RpcServer.broadcastRead()) + # self.RpcServer.broadcastRead() + + def closeServer(self): + if self.RpcServer: + self.RpcServer.close() + self.RpcServer = None # @classmethod - def writeVariableValue(self, variableName, value): + def writeVariableValue(self, variableName, value, trigger = None, timeoutMS = 2000): """ 根据变量名写入变量值,根据变量类型进行不同处理(不保存到数据库) @@ -62,7 +101,7 @@ class ProtocolManage(object): modelType = varInfo['model_type'] info = varInfo['variable_data'] - print(info) + # print(info) try: # 拆分为四个独立的Modbus协议条件判断 @@ -93,7 +132,7 @@ class ProtocolManage(object): varType = info['varType'] compensationVar = float(info['compensationVar']) varModel = info['varModel'] - model = self.getModelType(varModel) + model = self.getModelType(varModel) if self.getModelType(varModel) else localModel # print(value + compensationVar) if model == localModel: if varType == 'PT100': @@ -101,8 +140,13 @@ class ProtocolManage(object): else: self.writeTC[channel] = value if varType in ['R', 'S', 'B', 'J', 'T', 'E', 'K', 'N', 'C', 'A', 'PT100'] and model != SimModel: - value = temToMv(varType, value + compensationVar) # 直接补偿温度 补偿mv调整到括号外 - self.tcpVarManager.writeValue(varType, channel, value, model=model) + value = temToMv(varType, value + compensationVar) # 直接补偿温度 补偿mv调整到括号外 + if trigger and varType in ['R', 'S', 'B', 'J', 'T', 'E', 'K', 'N', 'C', 'A']: + trigger = TCTrigger + if trigger and varType in ['PT100']: + trigger = RTDTrigger + self.tcpVarManager.writeValue(varType, channel, value, trigger=trigger, model=model, timeoutMS=timeoutMS) + # 模拟量变量处理 @@ -110,18 +154,20 @@ class ProtocolManage(object): channel = int(info['channelNumber']) - 1 varType = info['varType'] varModel = info['varModel'] - model = self.getModelType(varModel) + model = self.getModelType(varModel) if self.getModelType(varModel) else localModel if info['varType'] in ['AI','AO']: value = self.getRealAO(value, info['max'], info['min']) - self.tcpVarManager.writeValue(varType, channel, value, model=model) - - # print(1) - + trigger = FPGATrigger if trigger else trigger + self.tcpVarManager.writeValue(varType, channel, value, trigger=trigger, model=model, timeoutMS=timeoutMS) + + # print(1) # HART模拟变量处理 elif modelType == 'HartSimulateVar': # 仅设置值,不保存到数据库 pass - + + if self.RpcClient: + self.RpcClient.setVarContent(variableName, value, info['min'], info['max'], info['varType']) return True except Exception as e: print(f"写入变量值失败: {str(e)}") @@ -172,12 +218,16 @@ class ProtocolManage(object): value = self.tcpVarManager.simRTDData[channel] elif varType in ['R', 'S', 'B', 'J', 'T', 'E', 'K', 'N', 'C', 'A']: value = self.tcpVarManager.simTCData[channel] + if self.RpcClient: + self.RpcClient.setVarContent(variableName, value, info['min'], info['max'], info['varType']) return value else: if varType == 'PT100': value = self.writeRTD[channel] elif varType in ['R', 'S', 'B', 'J', 'T', 'E', 'K', 'N', 'C', 'A']: value = self.writeTC[channel] + if self.RpcClient: + self.RpcClient.setVarContent(variableName, value, info['min'], info['max'], info['varType']) return value # 模拟量变量处理 @@ -191,6 +241,8 @@ class ProtocolManage(object): if varType in ['AI','AO']: # print(value) value = self.getRealAI(value, info['max'], info['min']) + if self.RpcClient: + self.RpcClient.setVarContent(variableName, value, info['min'], info['max'], info['varType']) return value # print(1) @@ -202,6 +254,16 @@ class ProtocolManage(object): except Exception as e: print(f"读取变量值失败: {str(e)}") return str(e) + + def readRpcVariable(self): + if self.RpcServer: + print(self.RpcServer.broadcastRead()) + + # def sendTrigger(self, variableName, value, timeoutMS): + # self.writeVariableValue(variableName, value, trigger=True, timeoutMS = timeoutMS) + + def recvDeltaT(self): + return self.tcpVarManager.recvDeltaT() def shutdown(self): self.tcpVarManager.shutdown() @@ -243,3 +305,5 @@ class ProtocolManage(object): return NetModel elif varModel == '模拟值': return SimModel + + diff --git a/protocol/RPC/RpcClient.py b/protocol/RPC/RpcClient.py new file mode 100644 index 0000000..244a8f2 --- /dev/null +++ b/protocol/RPC/RpcClient.py @@ -0,0 +1,133 @@ +import pika +import uuid +import json +import threading + +class RpcClient: + def __init__(self, clientName, rabbitHost='localhost', protocolManager=None): + """ + 初始化RPC客户端 + :param clientName: 客户端名称 + :param rabbitHost: RabbitMQ服务器地址 + :param protocolManager: 协议管理器实例,用于处理读写操作 + """ + self.clientName = clientName + self.protocolManager = protocolManager + self.variables = {} + # self.variables = { + # "temp": {"type": "AI", "min": 0, "max": 100, "value": 25.5}, + # "status": {"type": DO", "min": None, "max": None, "value": "OK"}, + # } + self.credentials = pika.PlainCredentials('dcs', '123456') + self.connection = pika.BlockingConnection( + pika.ConnectionParameters(host=rabbitHost, credentials=self.credentials) + ) + self.channel = self.connection.channel() + self.queueName = f"rpc_queue_{self.clientName}" + self.channel.queue_declare(queue=self.queueName) + print(f"[{self.clientName}] 等待服务端指令...") + + def onRequest(self, ch, method, props, body): + request = json.loads(body) + cmd = request.get("cmd") + response = {} + + if cmd == "ping": + # 响应ping命令 + response = {"client": self.clientName, "result": "pong"} + elif cmd == "read": + response = {"client": self.clientName, "variables": self.variables} + elif cmd == "write": + # 使用协议管理器处理写入操作 + response = self.handleWriteRequest(request) + else: + # 未知命令的响应 + response = {"client": self.clientName, "result": "fail", "reason": "unknown command"} + + ch.basic_publish( + exchange='', + routing_key=props.reply_to, + properties=pika.BasicProperties(correlation_id=props.correlation_id), + body=json.dumps(response) + ) + ch.basic_ack(delivery_tag=method.delivery_tag) + + + + def handleWriteRequest(self, request): + """ + 处理写入请求 + :param request: 请求数据 + :return: 响应数据 + """ + varName = request.get("varName") + value = request.get("value") + + # 如果有协议管理器,使用它来处理写入 + if self.protocolManager: + try: + success = self.protocolManager.writeVariableValue(varName, value) + if success: + return { + "client": self.clientName, + "result": "success", + "varName": varName, + "value": value + } + else: + return { + "client": self.clientName, + "result": "fail", + "reason": "write failed" + } + except Exception as e: + return { + "client": self.clientName, + "result": "fail", + "reason": f"write error: {str(e)}" + } + + def setVarContent(self, variableName, value, min, max, type): + if type in ['DI', 'DO']: + min = 0 + max = 1 + self.variables[variableName] = {} + self.variables[variableName]["value"] = value + self.variables[variableName]["min"] = min + self.variables[variableName]["max"] = max + self.variables[variableName]["type"] = type + + + def start(self): + """启动客户端监听(阻塞方法)""" + self.channel.basic_qos(prefetch_count=1) + self.channel.basic_consume(queue=self.queueName, on_message_callback=self.onRequest) + self.channel.start_consuming() + + def startNonBlocking(self): + """非阻塞启动客户端(在后台线程中运行)""" + import threading + self.client_thread = threading.Thread(target=self.start, daemon=True) + self.client_thread.start() + print(f"[{self.clientName}] RPC客户端已在后台线程启动") + + def isRunning(self): + """检查客户端是否正在运行""" + return hasattr(self, 'client_thread') and self.client_thread.is_alive() + + def close(self): + """关闭RPC连接""" + try: + if self.channel and not self.channel.is_closed: + self.channel.close() + if self.connection and not self.connection.is_closed: + self.connection.close() + print(f"[{self.clientName}] 连接已关闭") + except Exception as e: + print(f"[{self.clientName}] 关闭连接时出错: {e}") + +if __name__ == "__main__": + import sys + clientName = sys.argv[1] if len(sys.argv) > 1 else "Client1" + client = RpcClient(clientName) + client.start() \ No newline at end of file diff --git a/protocol/RPC/RpcServer.py b/protocol/RPC/RpcServer.py new file mode 100644 index 0000000..d9730dc --- /dev/null +++ b/protocol/RPC/RpcServer.py @@ -0,0 +1,302 @@ +import pika +import uuid +import json +import threading +import time + +class RpcServer: + def __init__(self, rabbitHost='localhost'): + """ + 初始化RPC服务端 + :param rabbitHost: RabbitMQ服务器地址 + """ + self.clientNames = [] # 动态客户端列表 + self.credentials = pika.PlainCredentials('dcs', '123456') # 修改为你的用户名和密码 + self.rabbitHost = rabbitHost + self.connection = None + self.channel = None + self.callbackQueue = None + self.responses = {} + self.lock = threading.Lock() + self.connected = False + + # 初始化连接 + self.connectToRabbitMQ() + + def connectToRabbitMQ(self): + """连接到RabbitMQ服务器""" + try: + self.connection = pika.BlockingConnection( + pika.ConnectionParameters(host=self.rabbitHost, credentials=self.credentials) + ) + self.channel = self.connection.channel() + result = self.channel.queue_declare(queue='', exclusive=True) + self.callbackQueue = result.method.queue + self.channel.basic_consume( + queue=self.callbackQueue, + on_message_callback=self.onResponse, + auto_ack=True + ) + self.connected = True + print(f"服务端已连接到RabbitMQ服务器: {self.rabbitHost}") + except Exception as e: + print(f"连接RabbitMQ失败: {e}") + self.connected = False + raise + + def onResponse(self, ch, method, props, body): + response = json.loads(body) + with self.lock: + self.responses[props.correlation_id] = response + + def addClient(self, clientName): + """ + 动态添加客户端 + :param clientName: 客户端名称 + :return: 是否添加成功 + """ + if not self.connected: + print("服务端未连接到RabbitMQ") + return False + + if clientName in self.clientNames: + print(f"客户端 {clientName} 已存在") + return False + + # 检查客户端是否在线 + try: + # 发送ping消息测试客户端是否响应 + response = self.call(clientName, {"cmd": "ping"}) + if response and response.get("result") == "pong": + self.clientNames.append(clientName) + print(f"客户端 {clientName} 添加成功") + return True + else: + print(f"客户端 {clientName} 无响应") + return False + except Exception as e: + print(f"添加客户端 {clientName} 失败: {e}") + return False + + def removeClient(self, clientName): + """ + 移除客户端 + :param clientName: 客户端名称 + :return: 是否移除成功 + """ + if clientName in self.clientNames: + self.clientNames.remove(clientName) + print(f"客户端 {clientName} 已移除") + return True + return False + + def getClientList(self): + """ + 获取当前客户端列表 + :return: 客户端名称列表 + """ + return self.clientNames.copy() + + def getNextClientName(self): + """ + 获取下一个可用的客户端名称 + 如果没有连接客户端就返回client1 + 如果列表中已经有客户端连接就返回clientxx中最大的client数字+1 + :return: 下一个可用的客户端名称 + """ + if not self.clientNames: + return "client1" + + # 查找所有以"client"开头的客户端名称 + client_numbers = [] + for client_name in self.clientNames: + if client_name.lower().startswith("client"): + # 提取数字部分 + try: + number_str = client_name[6:] # 去掉"client"前缀 + if number_str.isdigit(): + client_numbers.append(int(number_str)) + except (ValueError, IndexError): + continue + + if not client_numbers: + # 如果没有找到有效的client数字,返回client1 + return "client1" + + # 返回最大数字+1 + next_number = max(client_numbers) + 1 + return f"client{next_number}" + + def getClientNames(self): + """ + 获取当前所有客户端名称的列表 + :return: 客户端名称列表 + """ + return self.clientNames.copy() + + def pingClient(self, clientName): + """ + 测试客户端是否在线 + :param clientName: 客户端名称 + :return: 是否在线 + """ + try: + response = self.call(clientName, {"cmd": "ping"}) + return response and response.get("result") == "pong" + except: + return False + + def call(self, clientName, message): + """ + 调用客户端方法 + :param clientName: 客户端名称 + :param message: 消息内容 + :return: 客户端响应 + """ + if not self.connected or not self.channel: + raise Exception("服务端未连接到RabbitMQ") + + corr_id = str(uuid.uuid4()) + self.responses[corr_id] = None + + try: + self.channel.basic_publish( + exchange='', + routing_key=f"rpc_queue_{clientName}", + properties=pika.BasicProperties( + reply_to=self.callbackQueue, + correlation_id=corr_id, + ), + body=json.dumps(message) + ) + + # 等待响应,设置超时 + timeout = 10 # 10秒超时 + start_time = time.time() + while self.responses[corr_id] is None: + if time.time() - start_time > timeout: + raise Exception(f"调用客户端 {clientName} 超时") + if self.connection: + self.connection.process_data_events() + time.sleep(0.01) + + return self.responses.pop(corr_id) + except Exception as e: + # 清理超时的响应 + if corr_id in self.responses: + self.responses.pop(corr_id) + raise e + + def broadcastRead(self): + """ + 向所有客户端广播读取命令 + :return: 所有客户端的变量数据 + """ + if not self.clientNames: + print("没有可用的客户端") + return [] + + results = [] + for client in self.clientNames: + try: + resp = self.call(client, {"cmd": "read"}) + results.append(resp) + except Exception as e: + print(f"读取客户端 {client} 失败: {e}") + results.append({"client": client, "error": str(e)}) + return results + + def writeVar(self, varName, value): + """ + 写入变量到指定客户端 + :param varName: 变量名 + :param value: 变量值 + :return: 写入结果 + """ + if not self.clientNames: + return {"result": "fail", "reason": "no clients available"} + + # 先查找变量在哪个客户端 + for client in self.clientNames: + try: + resp = self.call(client, {"cmd": "read"}) + if resp and "variables" in resp and varName in resp["variables"]: + writeResp = self.call(client, {"cmd": "write", "varName": varName, "value": value}) + return writeResp + except Exception as e: + print(f"检查客户端 {client} 失败: {e}") + continue + return {"result": "fail", "reason": "var not found"} + + def scanAndAddClients(self, clientNameList): + """ + 扫描并添加客户端列表 + :param clientNameList: 客户端名称列表 + :return: 成功添加的客户端列表 + """ + addedClients = [] + for clientName in clientNameList: + if self.addClient(clientName): + addedClients.append(clientName) + return addedClients + + def checkAllClients(self): + """ + 检查所有客户端是否在线 + :return: 在线客户端列表 + """ + onlineClients = [] + offlineClients = [] + + for client in self.clientNames: + if self.pingClient(client): + onlineClients.append(client) + else: + offlineClients.append(client) + + # 移除离线客户端 + for client in offlineClients: + self.removeClient(client) + + return onlineClients + + def close(self): + """ + 关闭服务端连接 + """ + if self.connection and not self.connection.is_closed: + self.connection.close() + self.connected = False + print("服务端连接已关闭") + +if __name__ == "__main__": + # 创建服务端 + server = RpcServer() + + try: + # 动态添加客户端 + print("1. 添加客户端") + server.addClient("Client1") + server.addClient("Client2") + + print(f"当前客户端列表: {server.getClientList()}") + + # 检查客户端状态 + print("2. 检查客户端状态") + onlineClients = server.checkAllClients() + print(f"在线客户端: {onlineClients}") + + # 读取所有客户端变量 + print("3. 读取所有客户端变量") + allVars = server.broadcastRead() + print(json.dumps(allVars, ensure_ascii=False, indent=2)) + + # 写入变量 + print("4. 写入变量") + result = server.writeVar("temp", 88.8) + print(json.dumps(result, ensure_ascii=False, indent=2)) + + except Exception as e: + print(f"运行出错: {e}") + finally: + server.close() \ No newline at end of file diff --git a/protocol/TCP/TCPVarManage.py b/protocol/TCP/TCPVarManage.py index a1e9b50..8a46c52 100644 --- a/protocol/TCP/TCPVarManage.py +++ b/protocol/TCP/TCPVarManage.py @@ -36,32 +36,6 @@ class TCPVarManager: self.simAIata = [0.0] * 8 self.simDIata = [0] * 16 self.startPeriodicRead() - - def startTimeTest(self, triggerType, time = 2000): - data = [ - # 0-15: 16通道AO电流数据(A) - *self.AODATA, - - # 16: 输出模式(0:无输出/1:正常输出/2:触发输出) - 2, # 触发输出 - - # 17: 触发类型(0:无触发 1:fpga触发/2:TC触发/3:RTD触发) - triggerType, # 触发类型 - - time, # 实验时间(ms) - # 18: 实验时间(ms)和 DO状态(U16转双精度) - 0, # do补位 - - # 19: DO状态已包含在上面的do_value中 - - # 20-27: 8通道TC数据(mV) - *self.TCDATA, - - # 28-35: 8通道RTD数据(Ω) - *self.RTDDATA - ] - self.writeData(data) - # print(1) def readAiDi(self): @@ -149,22 +123,24 @@ class TCPVarManager: logging.error(f"Error writing AO/DO: {e}") return False - def readDeltaT(self): + def recvDeltaT(self): """ 读取DeltaT数据 :return: 16个DeltaT值的元组,或发生错误时返回None """ try: result = self.communicator.readDeltaT() + # print(result) if result is None: logging.warning("Failed to read DeltaT data") + return '未读取到响应时间' return result except Exception as e: logging.error(f"Error reading DeltaT: {e}") - return None + return f'未读取到响应时间{e}' - def writeValue(self, variableType, channel, value, trigger=None, model = localModel): + def writeValue(self, variableType, channel, value, trigger=None, model = localModel, timeoutMS = 2000): if variableType == "AO": if model == SimModel: self.simAOData[channel] = float(value) @@ -221,8 +197,31 @@ class TCPVarManager: # 28-35: 8通道RTD数据(Ω) *self.RTDDATA ] + elif trigger: + data = [ + # 0-15: 16通道AO电流数据(A) + *self.AODATA, + + # 16: 输出模式(0:无输出/1:正常输出/2:触发输出) + 2, # 触发输出 + + # 17: 触发类型(0:无触发 1:fpga触发/2:TC触发/3:RTD触发) + trigger, # 正常写入 + + float(timeoutMS), # 实验时间(ms) + # 18: 实验时间(ms)和 DO状态(U16转双精度) + 0, # do补位 + + # 19: DO状态已包含在上面的do_value中 + + # 20-27: 8通道TC数据(mV) + *self.TCDATA, + + # 28-35: 8通道RTD数据(Ω) + *self.RTDDATA + ] # print(data) - self.writeData(data) + self.writeData(data) def readValue(self, variableType, channel, model = localModel): # channel = channel