import sys import os import re from PyQt5.QtCore import Qt, QTimer, QSize, pyqtSignal, QFile, QTextStream from PyQt5.QtGui import QBrush, QColor, QStandardItemModel, QStandardItem from PyQt5.QtWidgets import (QTableView, QPushButton, QVBoxLayout, QWidget, QHBoxLayout, QLabel, QCheckBox, QSpinBox, QMenu, QFileDialog, QDialog, QLineEdit, QFormLayout, QDialogButtonBox, QMessageBox, QHeaderView, QToolBar, QAction, QDoubleSpinBox) from docx import Document from docx.shared import Pt, RGBColor from docx.enum.text import WD_PARAGRAPH_ALIGNMENT from docx.oxml import parse_xml from docx.oxml.ns import nsdecls import qtawesome as qta from datetime import datetime import random import json import time from openpyxl import Workbook from openpyxl.styles import Font, Alignment, PatternFill from openpyxl.utils import get_column_letter from model.ProcedureModel.ProcedureProcessor import StepTableModel from utils.DBModels.ProcedureModel import DatabaseManager from utils import Globals class StepExecutor(QWidget): tabLockRequired = pyqtSignal(bool) executionFinished = pyqtSignal(object) def __init__(self, procedureData, procedureId, dbManager): super().__init__() self.procedureData = procedureData self.procedureId = procedureId self.dbManager = dbManager self.isRunning = False self.isActive = False # 修复:添加数据健壮性检查 testSteps = procedureData.get("测试步骤", []) if not testSteps: testSteps = [] # 确保不为None self.protocolManager = Globals.getValue("protocolManage") self.initUi(testSteps) self.initExecutionState() self.initTimers() def initExecutionState(self): """初始化执行状态""" self.currentIndex = 0 self.remainingCycles = 1 self.infiniteCycles = False self.isFirstRun = True self.stepResults = [] self.remainingTime = 0 self.totalSteps = 0 # 新增:倒计时相关状态 self.countdownStartTime = None # 倒计时开始时间 self.estimatedTotalTime = 0 # 预估总时间 self.actualElapsedTime = 0 # 实际已用时间 self.lastStepStartTime = None # 上一步开始时间 def initTimers(self): """初始化定时器""" self.timer = QTimer() self.timer.timeout.connect(self.autoExecuteStep) self.countdownTimer = QTimer() self.countdownTimer.timeout.connect(self.updateCountdown) def initUi(self, testSteps): """初始化用户界面""" layout = QVBoxLayout() self.createInfoSection(layout) self.createTableSection(layout, testSteps) self.createControlSection(layout) self.createSettingsSection(layout) self.setLayout(layout) self.createToolbar() def createInfoSection(self, layout): """创建信息显示区域""" # 创建主信息容器 infoContainer = QWidget() infoContainer.setObjectName("procedureInfoContainer") infoLayout = QVBoxLayout() infoContainer.setLayout(infoLayout) # 兼容新旧数据结构 procedure_info = self.procedureData.get("规程信息", {}) test_case_info = self.procedureData.get("测试用例信息", {}) # 获取规程信息,支持新旧字段名 procedure_name = procedure_info.get("规程名称", "") or procedure_info.get("procedureName", "") procedure_number = procedure_info.get("规程编号", "") or procedure_info.get("procedureNumber", "") procedure_type = procedure_info.get("规程类型", "") or procedure_info.get("procedureType", "") # 获取测试用例信息,支持新旧字段名 test_case = test_case_info.get("测试用例", "") or test_case_info.get("testCase", "") case_number = test_case_info.get("用例编号", "") or test_case_info.get("caseNumber", "") condition_description = test_case_info.get("工况描述", "") or test_case_info.get("conditionDescription", "") # 第一行:规程基本信息(三个并排) procedureRow = QHBoxLayout() procedureRow.setSpacing(15) # 规程名称 nameGroup = self.createInfoGroup("规程名称", procedure_name, "procedureNameGroup", "procedureNameLabel") procedureRow.addWidget(nameGroup) # 规程编号 numberGroup = self.createInfoGroup("规程编号", procedure_number, "procedureNumberGroup", "procedureNumberLabel") procedureRow.addWidget(numberGroup) # 规程类型 typeGroup = self.createInfoGroup("规程类型", procedure_type, "procedureTypeGroup", "procedureTypeLabel") procedureRow.addWidget(typeGroup) procedureRow.addStretch() infoLayout.addLayout(procedureRow) # 第二行:测试用例信息(两个并排) testCaseRow = QHBoxLayout() testCaseRow.setSpacing(15) # 测试用例 testCaseGroup = self.createInfoGroup("测试用例", test_case, "testCaseGroup", "testCaseLabel") testCaseRow.addWidget(testCaseGroup) # 用例编号 caseNumberGroup = self.createInfoGroup("用例编号", case_number, "caseNumberGroup", "caseNumberLabel") testCaseRow.addWidget(caseNumberGroup) testCaseRow.addStretch() infoLayout.addLayout(testCaseRow) # 第三行:工况描述(独占一行,因为可能较长) if condition_description: descriptionRow = QHBoxLayout() descriptionGroup = self.createInfoGroup("工况描述", condition_description, "conditionDescriptionGroup", "conditionDescriptionLabel", isDescription=True) descriptionRow.addWidget(descriptionGroup) descriptionRow.addStretch() infoLayout.addLayout(descriptionRow) layout.addWidget(infoContainer) layout.addSpacing(20) def createInfoGroup(self, label, value, groupObjectName, labelObjectName, isDescription=False): """创建信息分组组件""" groupWidget = QWidget() groupWidget.setObjectName(groupObjectName) groupLayout = QVBoxLayout() groupLayout.setSpacing(8) groupWidget.setLayout(groupLayout) # 标签 labelWidget = QLabel(label) labelWidget.setObjectName(labelObjectName) groupLayout.addWidget(labelWidget) # 值 valueWidget = QLabel(value if value else "未设置") if isDescription: valueWidget.setObjectName("descriptionValue") valueWidget.setWordWrap(True) valueWidget.setMaximumHeight(80) else: valueWidget.setObjectName("infoGroupValue") groupLayout.addWidget(valueWidget) return groupWidget def createTableSection(self, layout, testSteps): """创建表格区域""" try: self.tableModel = StepTableModel(testSteps) self.tableView = QTableView() self.tableView.setModel(self.tableModel) self.tableView.setContextMenuPolicy(Qt.CustomContextMenu) self.tableView.customContextMenuRequested.connect(self.showContextMenu) # 设置表格编辑行为 self.tableView.setEditTriggers(QTableView.DoubleClicked) self.tableView.setSelectionBehavior(QTableView.SelectRows) self.setupTableHeaders() layout.addWidget(QLabel("测试步骤:")) layout.addWidget(self.tableView) except Exception as e: print(f"创建表格区域时出错: {e}") import traceback traceback.print_exc() # 如果出错,至少创建一个空的表格 self.tableView = QTableView() layout.addWidget(QLabel("测试步骤:")) layout.addWidget(self.tableView) def setupTableHeaders(self): """设置表格头部""" try: header = self.tableView.horizontalHeader() if header: header.setStretchLastSection(True) header.setSectionResizeMode(0, QHeaderView.ResizeToContents) header.setSectionResizeMode(1, QHeaderView.Stretch) header.setSectionResizeMode(2, QHeaderView.ResizeToContents) header.setSectionResizeMode(3, QHeaderView.ResizeToContents) header.setSectionResizeMode(4, QHeaderView.ResizeToContents) header.setSectionResizeMode(5, QHeaderView.ResizeToContents) header.setSectionResizeMode(6, QHeaderView.ResizeToContents) except Exception as e: print(f"设置表格头部时出错: {e}") def createControlSection(self, layout): """创建控制按钮区域""" controlLayout = QHBoxLayout() # 开始自动执行按钮 - 绿色主题 self.autoButton = QPushButton("开始自动执行") self.autoButton.clicked.connect(self.startAutoExecute) self.autoButton.setIcon(qta.icon('fa5s.play', color='white')) self.autoButton.setToolTip("开始自动执行整个规程流程\n将按照设定的轮次和间隔时间自动执行所有步骤") # 停止自动执行按钮 - 红色主题 self.stopButton = QPushButton("停止自动执行") self.stopButton.clicked.connect(self.stopAutoExecute) self.stopButton.setEnabled(False) self.stopButton.setIcon(qta.icon('fa5s.stop', color='white')) self.stopButton.setToolTip("停止当前正在执行的自动流程\n可以随时中断执行过程") # 执行下一步按钮 - 蓝色主题 self.nextButton = QPushButton("执行下一步") self.nextButton.clicked.connect(self.executeNextStep) self.nextButton.setIcon(qta.icon('fa5s.step-forward', color='white')) self.nextButton.setToolTip("手动执行下一个步骤\n用于单步调试和手动控制执行过程") # 完全重置按钮 - 橙色主题 self.resetButton = QPushButton("完全重置") self.resetButton.clicked.connect(self.resetExecution) self.resetButton.setIcon(qta.icon('fa5s.redo', color='white')) self.resetButton.setToolTip("重置所有执行状态\n清除所有步骤的执行结果和进度") # 生成报告按钮 - 紫色主题 self.exportButton = QPushButton("生成报告") self.exportButton.clicked.connect(self.onExportReportClicked) self.exportButton.setIcon(qta.icon('fa5s.file-alt', color='white')) self.exportButton.setToolTip("生成执行报告\n导出详细的执行结果和统计数据") # 创建按钮分组布局 # 第一组:执行控制按钮 executionGroup = QHBoxLayout() executionGroup.addWidget(self.autoButton) executionGroup.addWidget(self.stopButton) executionGroup.addWidget(self.nextButton) # executionGroup.addStretch() # 添加弹性空间 # 第二组:管理按钮 # managementGroup = QHBoxLayout() # managementGroup.addStretch() # 添加弹性空间 executionGroup.addWidget(self.resetButton) executionGroup.addWidget(self.exportButton) controlLayout.setSpacing(15) # 将两组按钮添加到主布局 controlLayout.addLayout(executionGroup) # controlLayout.addLayout(managementGroup) layout.addLayout(controlLayout) def createSettingsSection(self, layout): """创建设置区域""" cycleLayout = QHBoxLayout() # 执行轮次设置 cycleLayout.addWidget(QLabel("执行轮次:")) self.cycleSpin = QSpinBox() self.cycleSpin.setRange(1, 999) self.cycleSpin.setValue(1) cycleLayout.addWidget(self.cycleSpin) self.infiniteCheckbox = QCheckBox("无限循环") cycleLayout.addWidget(self.infiniteCheckbox) # 状态显示 self.statusLabel = QLabel("就绪") self.statusLabel.setObjectName("statusLabel") cycleLayout.addWidget(self.statusLabel) # 倒计时显示 self.countdownLabel = QLabel("") self.countdownLabel.setObjectName("countdownLabel") cycleLayout.addWidget(self.countdownLabel) # 步骤间隔设置 cycleLayout.addWidget(QLabel("步骤间隔(秒):")) self.stepIntervalSpin = QDoubleSpinBox() self.stepIntervalSpin.setRange(0, 60.0) self.stepIntervalSpin.setValue(1) self.stepIntervalSpin.setDecimals(2) self.stepIntervalSpin.setSingleStep(0.1) self.stepIntervalSpin.setToolTip("设置步骤执行计时器的间隔时间(秒)") cycleLayout.addWidget(self.stepIntervalSpin) cycleLayout.addStretch() layout.addLayout(cycleLayout) def createToolbar(self): """创建工具栏""" self.toolbar = QToolBar("执行工具栏") self.toolbar.setIconSize(QSize(24, 24)) toolbarActions = [ (qta.icon('fa5s.play', color='green'), "开始执行", self.startAutoExecute), (qta.icon('fa5s.stop', color='red'), "停止执行", self.stopAutoExecute), (qta.icon('fa5s.step-forward', color='blue'), "下一步", self.executeNextStep), (qta.icon('fa5s.redo', color='orange'), "重置", self.resetExecution), (qta.icon('fa5s.file-alt', color='purple'), "生成报告", self.onExportReportClicked) ] for icon, text, slot in toolbarActions: action = QAction(icon, text, self) action.triggered.connect(slot) self.toolbar.addAction(action) if text in ["下一步", "重置"]: self.toolbar.addSeparator() def showContextMenu(self, pos): """显示右键菜单""" index = self.tableView.indexAt(pos) if not index.isValid(): return menu = QMenu() jumpAction = menu.addAction("从该步骤开始执行") jumpAction.triggered.connect(lambda: self.jumpExecute(index.row())) detailAction = menu.addAction("查看步骤详情") detailAction.triggered.connect(lambda: self.showStepDetail(index.row())) if hasattr(self.tableView, 'viewport') and hasattr(self.tableView.viewport(), 'mapToGlobal'): menu.exec_(self.tableView.viewport().mapToGlobal(pos)) def jumpExecute(self, rowIndex): """跳转执行""" if 0 <= rowIndex < self.tableModel.rowCount(): stepInfo = self.tableModel.getStepInfo(rowIndex) if stepInfo and not stepInfo['isMain']: self.executeStep(rowIndex) self.currentIndex = rowIndex + 1 def showStepDetail(self, row): """显示步骤详情""" stepInfo = self.tableModel.getStepInfo(row) if not stepInfo: return detailDialog = QDialog(self) detailDialog.setWindowTitle("步骤详情") detailDialog.setMinimumWidth(500) layout = QVBoxLayout() formLayout = QFormLayout() formLayout.addRow("步骤ID", QLabel(stepInfo['stepId'])) formLayout.addRow("步骤类型", QLabel("主步骤" if stepInfo['isMain'] else "子步骤")) formLayout.addRow("步骤描述", QLabel(stepInfo['description'])) if stepInfo['time']: formLayout.addRow("执行时间", QLabel(stepInfo['time'].strftime("%Y-%m-%d %H:%M:%S"))) if stepInfo['result'] is not None: status = "成功" if stepInfo['result'] else "失败" formLayout.addRow("执行结果", QLabel(status)) layout.addLayout(formLayout) buttonBox = QDialogButtonBox(QDialogButtonBox.Ok) buttonBox.accepted.connect(detailDialog.accept) layout.addWidget(buttonBox) detailDialog.setLayout(layout) detailDialog.exec_() def updateStatusDisplay(self, message, color="blue"): """更新状态显示""" self.statusLabel.setText(message) self.statusLabel.setStyleSheet(f"color: {color}; font-weight: bold;") def startAutoExecute(self): """开始自动执行""" self.isRunning = True self.isActive = True self.tabLockRequired.emit(True) self.updateButtonStates(False, True, False, False, False) # 每次开始新的执行时,都需要初始化状态 if self.isFirstRun: self.initializeFirstRun() # 确保每次开始执行时都重新获取设置值 self.remainingCycles = self.cycleSpin.value() self.infiniteCycles = self.infiniteCheckbox.isChecked() # 重置当前索引,确保从第一步开始执行 self.currentIndex = 0 # 初始化倒计时状态 self.initializeCountdown() stepInterval = int(self.stepIntervalSpin.value() * 1000) self.timer.start(stepInterval) cycleNum = self.getCurrentCycleNumber() self.updateStatusDisplay(f"开始执行 - 第{cycleNum}轮", "green") self.startCountdown() def updateButtonStates(self, auto, stop, next, reset, export): """更新按钮状态""" self.autoButton.setEnabled(auto) self.stopButton.setEnabled(stop) self.nextButton.setEnabled(next) self.resetButton.setEnabled(reset) self.exportButton.setEnabled(export) def initializeFirstRun(self): """初始化首次执行""" self.stepResults = [] self.tableModel.resetExecutionState() self.isFirstRun = False self.createNewExecutionRecord() def stopAutoExecute(self): """停止自动执行""" self.isRunning = False self.timer.stop() self.updateButtonStates(True, False, True, True, True) self.saveCurrentResults() self.updateStatusDisplay("执行已停止", "orange") self.stopCountdown() self.executionFinished.emit(self) self.tabLockRequired.emit(False) def saveCurrentResults(self): """保存当前结果""" if hasattr(self, 'currentExecutionId') and self.stepResults: try: json.dumps(self.stepResults) self.dbManager.updateStepResults(self.currentExecutionId, self.stepResults) print("执行停止,当前轮次结果已保存") except Exception as e: print("stepResults不能序列化", self.stepResults) raise def autoExecuteStep(self): """自动执行步骤""" if self.currentIndex < self.tableModel.rowCount(): self.executeCurrentStep() else: self.handleCycleCompletion() def executeCurrentStep(self): """执行当前步骤""" # 记录步骤开始时间 self.lastStepStartTime = datetime.now() stepInfo = self.tableModel.getStepInfo(self.currentIndex) if stepInfo and not stepInfo['isMain']: self.executeStep(self.currentIndex) self.currentIndex += 1 self.startCountdown() def handleCycleCompletion(self): """处理轮次完成""" self.saveCurrentResults() if self.remainingCycles > 1: self.prepareNextCycle() else: self.finishExecution() def prepareNextCycle(self): """准备下一轮执行""" if not self.infiniteCycles: self.remainingCycles -= 1 self.timer.stop() self.currentIndex = 0 # if self.infiniteCycles != 0: self.createNewExecutionRecord() # 只在非最后一轮时清除表格状态,最后一轮完成后保留状态信息 # if self.remainingCycles > 0 or self.infiniteCycles: self.tableModel.resetAll() self.stepResults = [] # 重新初始化倒计时状态 self.initializeCountdown() cycleNum = self.getCurrentCycleNumber() self.updateStatusDisplay(f"执行中 - 第{cycleNum}轮", "green") self.startCountdown() stepInterval = int(self.stepIntervalSpin.value() * 1000) self.timer.start(stepInterval) def finishExecution(self): """完成执行""" self.stopAutoExecute() # 计算最终执行统计 totalSteps = len([step for step in self.tableModel.stepData]) executedSteps = len([step for step in self.tableModel.stepData if step.get('executed', False)]) successSteps = len([step for step in self.tableModel.stepData if step.get('executed', False) and step.get('result', False)]) failedSteps = executedSteps - successSteps successRate = (successSteps/executedSteps*100) if executedSteps > 0 else 0 # 显示最终执行状态 statusMessage = f"执行完成 - 总步骤: {totalSteps}, 成功: {successSteps}, 失败: {failedSteps}, 成功率: {successRate:.1f}%" self.updateStatusDisplay(statusMessage, "green") # 重置执行状态,为下次执行做准备 self.isFirstRun = True self.currentIndex = 0 self.executionFinished.emit(self) def createNewExecutionRecord(self): """创建新的执行记录""" executionTime = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.currentExecutionId = self.dbManager.insertProcedureExecution( self.procedureId, executionTime ) print(f"创建新的执行记录,ID: {self.currentExecutionId}") def getCurrentCycleNumber(self): """获取当前轮次数""" if self.infiniteCycles: return "无限" else: totalCycles = self.cycleSpin.value() # 当前正在执行的轮次 = 总轮次 - 剩余轮次 + 1 currentCycle = self.remainingCycles return f"{currentCycle}/{totalCycles}" def executeNextStep(self): """执行下一步""" self.isActive = True self.tabLockRequired.emit(True) if self.currentIndex < self.tableModel.rowCount(): stepInfo = self.tableModel.getStepInfo(self.currentIndex) if stepInfo and not stepInfo['isMain']: self.executeStep(self.currentIndex) self.currentIndex += 1 def executeStep(self, row): """执行步骤""" stepInfo = self.tableModel.getStepInfo(row) if not stepInfo: return False print(f"开始执行步骤 {stepInfo['stepId']}: {stepInfo['description']}") result = self.handleStep(row, stepInfo) # 更新实际已用时间 if self.lastStepStartTime: stepExecutionTime = (datetime.now() - self.lastStepStartTime).total_seconds() self.actualElapsedTime += stepExecutionTime print(f"步骤执行时间: {stepExecutionTime:.1f}秒, 累计已用时间: {self.actualElapsedTime:.1f}秒") if result is None: print(f"警告:步骤 {stepInfo['stepId']} 返回了None结果,设置为False") result = '执行失败,未检测到关键字' executionTime = datetime.now().strftime("%Y-%m-%d %H:%M:%S") stepResult = { 'step_id': str(stepInfo.get('stepId', '')), 'step_description': str(stepInfo.get('description', '')), 'execution_time': executionTime, 'result': result } self.stepResults.append(stepResult) if hasattr(self, 'currentExecutionId'): self.dbManager.updateStepResults(self.currentExecutionId, self.stepResults) success = self.tableModel.updateStepResult(row, result, datetime.now()) return result def handleStep(self, rowIndex, stepInfo): """处理步骤""" description = stepInfo['description'] stepId = stepInfo['stepId'] stepType = stepInfo['stepType'] # print(stepInfo, 111111111111) print(f"处理步骤 {stepId}: {description}") # 从关键词字段库中提取关键词信息 keywordInfo = self.extractKeywordFromText(description) if keywordInfo: operationType = keywordInfo['operationType'] keyword = keywordInfo['keyword'] print(f"检测到关键词: {keyword}, 操作类型: {operationType}") # 根据操作类型执行相应的处理 stepHandlers = { "set": self.performSetOperation, "check": self.performCheckOperation, "wait": self.performWaitOperation, "deltaT": self.performDeltaTOperation } handler = stepHandlers.get(operationType) if handler: return handler(stepId, description, keyword, stepType) # 如果没有找到匹配的关键词,使用默认的模拟执行 return None def extractKeywordFromText(self, text): """从文本中提取关键词信息""" if not hasattr(self, 'dbManager') or not self.dbManager: return None try: keywords = self.dbManager.getKeywordByText(text) if keywords: # 返回第一个匹配的关键词(按长度降序排列,最长的优先) keyword, operationType = keywords[0] return { 'keyword': keyword, 'operationType': operationType } except Exception as e: print(f"提取关键词时出错: {str(e)}") return None def performSetOperation(self, stepId, description, keyword, stepType = None): """执行设置操作""" # print(f"执行设置操作 (步骤 {stepId}): {description}") # 简单匹配等号前变量名和等号后数值 variablePattern = r'([^\s=]+)=([0-9]+(?:\.[0-9]+)?)' matches = re.findall(variablePattern, description) if matches: # print(f"检测到 {len(matches)} 个需要设置的变量:") result = '' for varName, varValue in matches: varName = varName.strip() # print(f" 变量: {varName} = {varValue}") if stepType and 'Time' in stepType: timeMatch = re.search(r'Time\s*=\s*(\d+)\s*ms', stepType, re.IGNORECASE) if timeMatch: timeoutMS = int(timeMatch.group(1)) else: timeoutMS = 2000 # print(timeoutMS) success = self.protocolManager.writeVariableValue(varName, float(varValue), trigger = True, timeoutMS = timeoutMS) time.sleep(timeoutMS/1000 + 2) else: success = self.protocolManager.writeVariableValue(varName, float(varValue)) if success: result += f"{varName} = {varValue}\n" else: result += f"{varName}强制失败\n" return result else: return '"强制失败,未检测到关键字"' def performCheckOperation(self, stepId, description, keyword, stepType = None): """执行检查操作""" print(f"执行检查操作 (步骤 {stepId}): {description}") # 支持多种比较符号的正则表达式 # 匹配格式:变量名 比较符号 数值 # 支持的比较符号:=, ==, >, <, >=, <=, !=, <> variablePattern = r'([^\s=>|<|>=|<=|!=|<>)\s*([0-9]+(?:\.[0-9]+)?)' matches = re.findall(variablePattern, description) if matches: # print(f"检测到 {len(matches)} 个需要检查的变量:") result = '' allPassed = True for varName, operator, expectedValue in matches: varName = varName.strip() expectedValue = float(expectedValue) print(f" 检查变量: {varName} {operator} {expectedValue}") # 读取变量实际值 actualValue = self.protocolManager.readVariableValue(varName) if actualValue is not None: try: actualValue = float(actualValue) # 根据操作符进行比较 comparisonResult = self.compareValues(actualValue, operator, expectedValue) if comparisonResult: result += f"{varName} = {actualValue} {operator} {expectedValue} ✓\n" print(f" ✓ 检查通过: {varName} = {actualValue} {operator} {expectedValue}") else: result += f"失败{varName} = {actualValue} {operator} {expectedValue} ✗\n" print(f" ✗ 检查失败: {varName} = {actualValue} {operator} {expectedValue}") allPassed = False except (ValueError, TypeError): result += f"{varName} = {actualValue} ✗ (类型错误)\n" print(f" ✗ 类型错误: {varName} = {actualValue}") allPassed = False else: result += f"{varName} 读取失败 ✗\n" print(f" ✗ 读取失败: {varName}") allPassed = False if allPassed: return result else: return result else: return "强制失败,未检测到关键字" def compareValues(self, actualValue, operator, expectedValue): """ 比较实际值和期望值 Args: actualValue: 实际值 operator: 比较操作符 (=, ==, >, <, >=, <=, !=, <>) expectedValue: 期望值 Returns: bool: 比较结果 """ # 对于等于操作,允许一定的误差容限 tolerance = 0.001 # 0.1%的误差容限 if operator in ['=', '==']: # 等于比较,允许误差 return abs(actualValue - expectedValue) <= (expectedValue * tolerance) elif operator == '>': # 大于比较 return actualValue > expectedValue elif operator == '<': # 小于比较 return actualValue < expectedValue elif operator == '>=': # 大于等于比较 return actualValue >= expectedValue elif operator == '<=': # 小于等于比较 return actualValue <= expectedValue elif operator in ['!=', '<>']: # 不等于比较,允许误差 return abs(actualValue - expectedValue) > (expectedValue * tolerance) else: # 未知操作符,默认返回False return False def performWaitOperation(self, stepId, description, keyword, stepType): """执行等待操作,等待时间取自描述字段中的数字(秒)""" print(f"执行等待操作 (步骤 {stepId}): {description}") import re, time # 只提取描述中的数字(支持小数),不再匹配"等待"字样 match = re.search(r'([0-9]+(?:\.[0-9]+)?)', description) if match: seconds = float(match.group(1)) print(f" 等待 {seconds} 秒...") time.sleep(seconds) return '执行成功' else: print(" 未检测到等待时间") return '未检测到等待时间' def performDeltaTOperation(self, stepId, description, keyword, stepType): """执行接收操作""" result = self.protocolManager.recvDeltaT() # 这里可以添加具体的接收操作逻辑 result = ' '.join([f't{i+1}={num}' for i, num in enumerate(result)]) return result def resetExecution(self, fromAuto=False): """重置执行""" # print(111111111111) self.tableModel.resetAll() self.currentIndex = 0 if not fromAuto: self.stopAutoExecute() self.cycleSpin.setValue(1) self.infiniteCheckbox.setChecked(False) self.isRunning = False self.isActive = False self.isFirstRun = True self.stepResults = [] if hasattr(self, 'currentExecutionId'): del self.currentExecutionId self.tabLockRequired.emit(False) self.updateStatusDisplay("已重置", "blue") self.resetCountdown() def onExportReportClicked(self): """导出报告点击事件""" self.generateReport() def generateReport(self): """生成执行报告""" try: defaultName = f"{self.procedureData['规程信息']['规程名称']}_{self.procedureData['规程信息']['规程编号']}_执行报告.docx" filePath, _ = QFileDialog.getSaveFileName( self, "保存报告", defaultName, "Word文档 (*.docx)" ) if not filePath: return None doc = Document() self.setupDocumentStyles(doc) title = doc.add_heading(f"{self.procedureData['规程信息']['规程名称']} 执行报告", 0) title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER timePara = doc.add_paragraph() timePara.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER timeRun = timePara.add_run(f"报告生成时间: {datetime.now().strftime('%Y年%m月%d日 %H:%M:%S')}") timeRun.font.size = Pt(10) timeRun.font.color.rgb = RGBColor(128, 128, 128) doc.add_paragraph() doc.add_heading("1. 规程基本信息", level=1) self.addProcedureInfoTable(doc) doc.add_paragraph() doc.add_heading("2. 执行结果统计", level=1) self.addExecutionStatsTable(doc) doc.add_paragraph() doc.add_heading("3. 详细步骤执行结果", level=1) self.addDetailedStepsTable(doc) doc.add_paragraph() doc.add_heading("4. 执行总结", level=1) self.addExecutionSummary(doc) doc.save(filePath) QMessageBox.information(self, "导出成功", f"报告已保存到:\n{filePath}") return filePath except Exception as e: QMessageBox.critical(self, "导出错误", f"生成报告时出错:\n{str(e)}") return None def setupDocumentStyles(self, doc): """设置文档样式""" style = doc.styles['Normal'] style.font.name = '微软雅黑' style.font.size = Pt(10.5) for i in range(1, 4): headingStyle = doc.styles[f'Heading {i}'] headingStyle.font.name = '微软雅黑' headingStyle.font.bold = True if i == 1: headingStyle.font.size = Pt(16) headingStyle.font.color.rgb = RGBColor(0, 0, 139) elif i == 2: headingStyle.font.size = Pt(14) headingStyle.font.color.rgb = RGBColor(47, 84, 150) else: headingStyle.font.size = Pt(12) headingStyle.font.color.rgb = RGBColor(68, 114, 196) def addProcedureInfoTable(self, doc): """添加规程信息表格""" infoTable = doc.add_table(rows=1, cols=2) infoTable.style = 'Table Grid' infoTable.autofit = True hdrCells = infoTable.rows[0].cells hdrCells[0].text = "项目" hdrCells[1].text = "内容" for cell in hdrCells: cell.paragraphs[0].runs[0].font.bold = True cell.paragraphs[0].runs[0].font.color.rgb = RGBColor(255, 255, 255) cell._tc.get_or_add_tcPr().append(parse_xml(f'')) infoItems = [ ("规程名称", self.procedureData['规程信息']['规程名称']), ("规程编号", self.procedureData['规程信息']['规程编号']), ("规程类型", self.procedureData['规程信息']['规程类型']), ("测试用例", self.procedureData['测试用例信息']['测试用例']), ("用例编号", self.procedureData['测试用例信息']['用例编号']), ("工况描述", self.procedureData['测试用例信息']['工况描述']) ] for itemName, itemValue in infoItems: rowCells = infoTable.add_row().cells rowCells[0].text = itemName rowCells[1].text = str(itemValue) rowCells[0].paragraphs[0].runs[0].font.bold = True rowCells[0].paragraphs[0].runs[0].font.color.rgb = RGBColor(68, 114, 196) def addExecutionStatsTable(self, doc): """添加执行统计表格""" totalSteps = len([step for step in self.tableModel.stepData]) executedSteps = len([step for step in self.tableModel.stepData if step.get('executed', False)]) successSteps = len([step for step in self.tableModel.stepData if step.get('executed', False) and step.get('result', False)]) failedSteps = executedSteps - successSteps successRate = (successSteps/executedSteps*100) if executedSteps > 0 else 0 statsTable = doc.add_table(rows=1, cols=2) statsTable.style = 'Table Grid' statsTable.autofit = True statsHdrCells = statsTable.rows[0].cells statsHdrCells[0].text = "统计项目" statsHdrCells[1].text = "数量" for cell in statsHdrCells: cell.paragraphs[0].runs[0].font.bold = True cell.paragraphs[0].runs[0].font.color.rgb = RGBColor(255, 255, 255) cell._tc.get_or_add_tcPr().append(parse_xml(f'')) statsItems = [ ("总步骤数", str(totalSteps)), ("已执行步骤数", str(executedSteps)), ("成功步骤数", str(successSteps)), ("失败步骤数", str(failedSteps)), ("执行成功率", f"{successRate:.1f}%") ] for itemName, itemValue in statsItems: rowCells = statsTable.add_row().cells rowCells[0].text = itemName rowCells[1].text = itemValue rowCells[0].paragraphs[0].runs[0].font.bold = True rowCells[0].paragraphs[0].runs[0].font.color.rgb = RGBColor(68, 114, 196) if "成功率" in itemName: if successRate >= 90: rowCells[1].paragraphs[0].runs[0].font.color.rgb = RGBColor(0, 176, 80) elif successRate >= 70: rowCells[1].paragraphs[0].runs[0].font.color.rgb = RGBColor(255, 192, 0) else: rowCells[1].paragraphs[0].runs[0].font.color.rgb = RGBColor(255, 0, 0) def addDetailedStepsTable(self, doc): """添加详细步骤表格""" stepsTable = doc.add_table(rows=1, cols=5) stepsTable.style = 'Table Grid' stepsTable.autofit = True stepsHdrCells = stepsTable.rows[0].cells stepsHdrCells[0].text = "步骤ID" stepsHdrCells[1].text = "步骤描述" stepsHdrCells[2].text = "执行时间" stepsHdrCells[3].text = "执行结果" stepsHdrCells[4].text = "备注" for cell in stepsHdrCells: cell.paragraphs[0].runs[0].font.bold = True cell.paragraphs[0].runs[0].font.color.rgb = RGBColor(255, 255, 255) cell._tc.get_or_add_tcPr().append(parse_xml(f'')) for step in self.tableModel.stepData: rowCells = stepsTable.add_row().cells rowCells[0].text = step.get('stepId', '') rowCells[1].text = step.get('description', '') if step.get('executed', False): rowCells[2].text = step.get('time', '').strftime("%Y-%m-%d %H:%M:%S") if step.get('time') else '' result = step.get('result', False) rowCells[3].text = "✓ 成功" if '失败' not in result else "✗ 失败" rowCells[4].text = result if result: rowCells[3].paragraphs[0].runs[0].font.color.rgb = RGBColor(0, 176, 80) rowCells[3]._tc.get_or_add_tcPr().append(parse_xml(f'')) rowCells[4].paragraphs[0].runs[0].font.color.rgb = RGBColor(0, 176, 80) else: rowCells[3].paragraphs[0].runs[0].font.color.rgb = RGBColor(255, 0, 0) rowCells[3]._tc.get_or_add_tcPr().append(parse_xml(f'')) rowCells[4].paragraphs[0].runs[0].font.color.rgb = RGBColor(255, 0, 0) else: rowCells[2].text = "" rowCells[3].text = "⏸ 未执行" rowCells[4].text = "步骤未执行" rowCells[3].paragraphs[0].runs[0].font.color.rgb = RGBColor(128, 128, 128) rowCells[4].paragraphs[0].runs[0].font.color.rgb = RGBColor(128, 128, 128) def addExecutionSummary(self, doc): """添加执行总结""" totalSteps = len([step for step in self.tableModel.stepData]) executedSteps = len([step for step in self.tableModel.stepData if step.get('executed', False)]) successSteps = len([step for step in self.tableModel.stepData if step.get('executed', False) and step.get('result', False)]) failedSteps = executedSteps - successSteps successRate = (successSteps/executedSteps*100) if executedSteps > 0 else 0 summaryPara = doc.add_paragraph() summaryPara.add_run("本次规程执行总结:\n").font.bold = True summaryText = f""" • 规程名称:{self.procedureData['规程信息']['规程名称']} • 执行时间:{datetime.now().strftime('%Y年%m月%d日 %H:%M:%S')} • 总步骤数:{totalSteps} 个 • 已执行步骤:{executedSteps} 个 • 成功步骤:{successSteps} 个 • 失败步骤:{failedSteps} 个 • 执行成功率:{successRate:.1f}% """ if successRate >= 90: summaryText += "✓ 执行结果:优秀 - 规程执行成功,所有关键步骤均已完成。" elif successRate >= 70: summaryText += "⚠ 执行结果:良好 - 规程基本执行成功,部分步骤存在问题需要关注。" else: summaryText += "✗ 执行结果:失败 - 规程执行存在较多问题,需要重新执行或检查。" summaryPara.add_run(summaryText) for run in summaryPara.runs: run.font.size = Pt(11) def startCountdown(self): """开始倒计时""" if not self.countdownStartTime: return # 计算剩余时间 self.calculateRemainingTime() if self.remainingTime > 0: self.countdownTimer.start(1000) self.updateCountdown() else: self.countdownLabel.setText("") def calculateRemainingTime(self): """计算剩余时间""" if not self.countdownStartTime: self.remainingTime = 0 return # 计算当前轮次已完成的子步骤数 completedSubSteps = 0 for i in range(self.currentIndex): if not self.tableModel.stepData[i]['isMain']: completedSubSteps += 1 # 计算当前轮次剩余的子步骤数 remainingSubSteps = 0 for i in range(self.currentIndex, len(self.tableModel.stepData)): if not self.tableModel.stepData[i]['isMain']: remainingSubSteps += 1 # 基础剩余时间:剩余步骤数 * 步骤间隔 stepInterval = self.stepIntervalSpin.value() baseRemainingTime = remainingSubSteps * stepInterval # 预估剩余步骤的实际执行时间 estimatedRemainingExecutionTime = 0 for i in range(self.currentIndex, len(self.tableModel.stepData)): step = self.tableModel.stepData[i] if not step['isMain']: stepType = step.get('stepType', '') if 'Time' in stepType: timeMatch = re.search(r'Time\s*=\s*(\d+)\s*ms', stepType, re.IGNORECASE) if timeMatch: timeoutMS = int(timeMatch.group(1)) estimatedRemainingExecutionTime += (timeoutMS / 1000) + 2 elif 'wait' in step.get('description', '').lower(): waitMatch = re.search(r'([0-9]+(?:\.[0-9]+)?)', step.get('description', '')) if waitMatch: estimatedRemainingExecutionTime += float(waitMatch.group(1)) else: estimatedRemainingExecutionTime += 1 # 总预估剩余时间 totalEstimatedRemaining = baseRemainingTime + estimatedRemainingExecutionTime # 考虑实际执行时间与预估时间的差异 if self.actualElapsedTime > 0 and completedSubSteps > 0: # 计算实际平均每步时间 actualAvgTimePerStep = self.actualElapsedTime / completedSubSteps # 预估平均每步时间 estimatedAvgTimePerStep = (self.estimatedTotalTime - totalEstimatedRemaining) / completedSubSteps if completedSubSteps > 0 else 0 # 如果实际时间比预估时间长,调整剩余时间 if actualAvgTimePerStep > estimatedAvgTimePerStep: timeAdjustment = (actualAvgTimePerStep - estimatedAvgTimePerStep) * remainingSubSteps totalEstimatedRemaining += timeAdjustment self.remainingTime = max(0, int(totalEstimatedRemaining)) print(f"剩余时间计算: 剩余步骤={remainingSubSteps}, 基础时间={baseRemainingTime:.1f}秒, " f"预估执行时间={estimatedRemainingExecutionTime:.1f}秒, 总剩余={self.remainingTime}秒") def stopCountdown(self): """停止倒计时""" self.countdownTimer.stop() self.countdownLabel.setText("") def updateCountdown(self): """更新倒计时""" if self.remainingTime > 0: minutes = self.remainingTime // 60 seconds = self.remainingTime % 60 # 计算当前轮次进度 totalSubSteps = sum(1 for step in self.tableModel.stepData if not step['isMain']) completedSubSteps = sum(1 for i in range(self.currentIndex) if not self.tableModel.stepData[i]['isMain']) progressPercent = (completedSubSteps / totalSubSteps * 100) if totalSubSteps > 0 else 0 if minutes > 0: countdownText = f"当前轮次剩余: {minutes:02d}:{seconds:02d} (进度: {progressPercent:.1f}%)" else: countdownText = f"当前轮次剩余: {seconds}秒 (进度: {progressPercent:.1f}%)" # 根据剩余时间设置颜色状态 if self.remainingTime <= 10: self.countdownLabel.setProperty("timeRemaining", "low") elif self.remainingTime <= 30: self.countdownLabel.setProperty("timeRemaining", "medium") else: self.countdownLabel.setProperty("timeRemaining", "high") self.countdownLabel.setText(countdownText) self.remainingTime -= 1 else: self.countdownLabel.setProperty("timeRemaining", "completed") self.countdownLabel.setText("当前轮次完成") self.countdownTimer.stop() def resetCountdown(self): """重置倒计时""" self.stopCountdown() self.remainingTime = 0 self.totalSteps = 0 # 重置倒计时相关状态 self.countdownStartTime = None self.estimatedTotalTime = 0 self.actualElapsedTime = 0 self.lastStepStartTime = None def isExecutionCompleted(self): """检查执行是否完成""" return self.currentIndex >= self.tableModel.rowCount() def getExecutionProgress(self): """获取执行进度""" totalSteps = self.tableModel.rowCount() completedSteps = self.currentIndex return completedSteps, totalSteps def showEvent(self, event): """显示事件""" super().showEvent(event) if not hasattr(self, '_hasResizedRows'): self.tableView.resizeRowsToContents() self._hasResizedRows = True def initializeCountdown(self): """初始化倒计时状态""" self.countdownStartTime = datetime.now() self.actualElapsedTime = 0 self.lastStepStartTime = None # 计算预估总时间 self.calculateEstimatedTotalTime() def calculateEstimatedTotalTime(self): """计算预估总时间""" stepInterval = self.stepIntervalSpin.value() totalSubSteps = sum(1 for step in self.tableModel.stepData if not step['isMain']) # 基础时间:步骤间隔时间 baseTime = totalSubSteps * stepInterval # 额外时间:预估每个步骤的实际执行时间 extraTime = 0 for step in self.tableModel.stepData: if not step['isMain']: # 根据步骤类型预估额外时间 stepType = step.get('stepType', '') if 'Time' in stepType: # 设置操作带超时时间 timeMatch = re.search(r'Time\s*=\s*(\d+)\s*ms', stepType, re.IGNORECASE) if timeMatch: timeoutMS = int(timeMatch.group(1)) extraTime += (timeoutMS / 1000) + 2 # 超时时间 + 2秒缓冲 elif 'wait' in step.get('description', '').lower(): # 等待操作 waitMatch = re.search(r'([0-9]+(?:\.[0-9]+)?)', step.get('description', '')) if waitMatch: extraTime += float(waitMatch.group(1)) else: # 其他操作预估1秒 extraTime += 1 self.estimatedTotalTime = baseTime + extraTime print(f"预估总时间: {self.estimatedTotalTime:.1f}秒 (基础: {baseTime:.1f}秒, 额外: {extraTime:.1f}秒)")