You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

567 lines
24 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import sys
import os
from PyQt5.QtCore import Qt, QTimer, QAbstractTableModel, QModelIndex, QPoint, QSize, pyqtSignal, QFile,QTextStream
from PyQt5.QtGui import QBrush, QColor, QFont, QStandardItemModel, QStandardItem
from PyQt5.QtWidgets import (QApplication, QMainWindow, QTableView,
QPushButton, QVBoxLayout, QWidget, QHBoxLayout,
QLabel, QCheckBox, QSpinBox, QMenu, QFileDialog,
QTabWidget, QTabBar, QListWidget, QListWidgetItem, QDialog,
QLineEdit, QFormLayout, QDialogButtonBox, QMessageBox,
QHeaderView, QToolBar, QAction, QStatusBar, QComboBox, QSplitter, QAbstractItemView, QDoubleSpinBox)
from docx import Document
from docx.shared import Pt, RGBColor
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
import qtawesome as qta
from datetime import datetime
# 导入其他模块
from model.ProcedureModel.ProcedureProcessor import ExcelParser, StepTableModel
from utils.DBModels.ProcedureModel import DatabaseManager
from utils import Globals
# 修改导入路径
class StepExecutor(QWidget):
# 添加标签锁定信号
tabLockRequired = pyqtSignal(bool)
# 添加执行完成信号
executionFinished = pyqtSignal(object) # 发送执行器实例
def __init__(self, procedureData, procedureId, dbManager):
super().__init__()
self.procedureData = procedureData
self.procedureId = procedureId # 新增规程ID
self.dbManager = dbManager # 新增数据库管理器
self.isRunning = False
self.isActive = False
testSteps = procedureData["测试步骤"]
self.initUi(testSteps)
self.currentIndex = 0
self.timer = QTimer()
self.timer.timeout.connect(self.autoExecuteStep)
self.remainingCycles = 1
self.infiniteCycles = False
self.isFirstRun = True # 新增标志位,用于区分首次执行
self.stepResults = [] # 新增:存储所有步骤执行结果的列表
# 新增:倒计时相关变量
self.countdownTimer = QTimer()
self.countdownTimer.timeout.connect(self.updateCountdown)
self.remainingTime = 0 # 当前轮次剩余时间(秒)
self.totalSteps = 0 # 当前轮次总步骤数
self.protocolManage = Globals.getValue("protocolManage")
def initUi(self, testSteps):
layout = QVBoxLayout()
info_layout = QFormLayout()
info_layout.setLabelAlignment(Qt.AlignRight)
info_layout.addRow("规程名称:", QLabel(self.procedureData["规程信息"]["规程名称"]))
info_layout.addRow("规程编号:", QLabel(self.procedureData["规程信息"]["规程编号"]))
info_layout.addRow("规程类型:", QLabel(self.procedureData["规程信息"]["规程类型"]))
info_layout.addRow("测试用例:", QLabel(self.procedureData["测试用例信息"]["测试用例"]))
info_layout.addRow("用例编号:", QLabel(self.procedureData["测试用例信息"]["用例编号"]))
info_layout.addRow("工况描述:", QLabel(self.procedureData["测试用例信息"]["工况描述"]))
layout.addLayout(info_layout)
layout.addSpacing(20)
self.tableModel = StepTableModel(testSteps)
self.tableView = QTableView()
self.tableView.setModel(self.tableModel)
self.tableView.setContextMenuPolicy(Qt.CustomContextMenu)
self.tableView.customContextMenuRequested.connect(self.showContextMenu)
# QHeaderView保护
hh = self.tableView.horizontalHeader()
if hh:
hh.setSectionResizeMode(0, QHeaderView.ResizeToContents)
hh.setSectionResizeMode(1, QHeaderView.Stretch)
hh.setSectionResizeMode(2, QHeaderView.ResizeToContents)
hh.setSectionResizeMode(3, QHeaderView.ResizeToContents)
hh.setSectionResizeMode(5, QHeaderView.Stretch)
vh = self.tableView.verticalHeader()
if vh:
self.tableView.setWordWrap(True)
vh.setSectionResizeMode(QHeaderView.ResizeToContents)
layout.addWidget(QLabel("测试步骤:"))
layout.addWidget(self.tableView)
# 创建控制按钮布局
control_layout = QHBoxLayout()
self.autoButton = QPushButton(" 开始自动执行")
self.autoButton.clicked.connect(self.startAutoExecute)
self.autoButton.setIcon(qta.icon('fa5s.play', color='green'))
self.stopButton = QPushButton(" 停止自动执行")
self.stopButton.clicked.connect(self.stopAutoExecute)
self.stopButton.setEnabled(False)
self.stopButton.setIcon(qta.icon('fa5s.stop', color='red'))
self.nextButton = QPushButton(" 执行下一步")
self.nextButton.clicked.connect(self.executeNextStep)
self.nextButton.setIcon(qta.icon('fa5s.step-forward', color='blue'))
self.resetButton = QPushButton(" 完全重置")
self.resetButton.clicked.connect(self.resetExecution)
self.resetButton.setIcon(qta.icon('fa5s.redo', color='orange'))
self.exportButton = QPushButton(" 生成报告")
self.exportButton.clicked.connect(self.onExportReportClicked)
self.exportButton.setIcon(qta.icon('fa5s.file-alt', color='purple'))
control_layout.addWidget(self.autoButton)
control_layout.addWidget(self.stopButton)
control_layout.addWidget(self.nextButton)
control_layout.addWidget(self.resetButton)
control_layout.addWidget(self.exportButton)
# 添加循环设置
cycle_layout = QHBoxLayout()
cycle_layout.addWidget(QLabel("执行轮次:"))
self.cycleSpin = QSpinBox()
self.cycleSpin.setRange(1, 999)
self.cycleSpin.setValue(1)
cycle_layout.addWidget(self.cycleSpin)
self.infiniteCheckbox = QCheckBox("无限循环")
cycle_layout.addWidget(self.infiniteCheckbox)
# 新增:状态显示标签
self.statusLabel = QLabel("就绪")
self.statusLabel.setStyleSheet("color: blue; font-weight: bold;")
cycle_layout.addWidget(self.statusLabel)
# 新增:倒计时显示标签
self.countdownLabel = QLabel("")
self.countdownLabel.setStyleSheet("color: red; font-weight: bold; font-size: 14px;")
cycle_layout.addWidget(self.countdownLabel)
# 新增:步骤间隔时间设置
cycle_layout.addWidget(QLabel("步骤间隔(秒):"))
self.stepIntervalSpin = QDoubleSpinBox()
self.stepIntervalSpin.setRange(0, 60.0)
self.stepIntervalSpin.setValue(1)
self.stepIntervalSpin.setDecimals(2) # 支持一位小数
self.stepIntervalSpin.setSingleStep(0.1) # 每次调整0.1秒
self.stepIntervalSpin.setToolTip("设置步骤执行计时器的间隔时间(秒)")
cycle_layout.addWidget(self.stepIntervalSpin)
cycle_layout.addStretch()
# 将所有布局添加到主布局
layout.addLayout(control_layout)
layout.addLayout(cycle_layout)
# 设置主布局
self.setLayout(layout)
# 初始化工具栏
self.toolbar = QToolBar("执行工具栏")
self.toolbar.setIconSize(QSize(24, 24))
# 工具栏操作
self.toolbar.addAction(qta.icon('fa5s.play', color='green'), "开始执行", self.startAutoExecute)
self.toolbar.addAction(qta.icon('fa5s.stop', color='red'), "停止执行", self.stopAutoExecute)
self.toolbar.addAction(qta.icon('fa5s.step-forward', color='blue'), "下一步", self.executeNextStep)
self.toolbar.addSeparator()
self.toolbar.addAction(qta.icon('fa5s.redo', color='orange'), "重置", self.resetExecution)
self.toolbar.addSeparator()
self.toolbar.addAction(qta.icon('fa5s.file-alt', color='purple'), "生成报告", self.onExportReportClicked)
def showContextMenu(self, pos):
index = self.tableView.indexAt(pos)
if index.isValid():
menu = QMenu()
jumpAction = menu.addAction("从该步骤开始执行")
if jumpAction:
jumpAction.triggered.connect(lambda: self.jumpExecute(index.row()))
detailAction = menu.addAction("查看步骤详情")
if detailAction:
detailAction.triggered.connect(lambda: self.showStepDetail(index.row()))
if hasattr(self.tableView, 'viewport') and hasattr(self.tableView.viewport(), 'mapToGlobal'):
menu.exec_(self.tableView.viewport().mapToGlobal(pos))
def jumpExecute(self, rowIndex):
if 0 <= rowIndex < self.tableModel.rowCount():
stepInfo = self.tableModel.getStepInfo(rowIndex)
if stepInfo and not stepInfo['isMain']:
self.executeStep(rowIndex)
self.currentIndex = rowIndex + 1
def showStepDetail(self, row):
step_info = self.tableModel.getStepInfo(row)
if not step_info:
return
detail_dialog = QDialog(self)
detail_dialog.setWindowTitle("步骤详情")
detail_dialog.setMinimumWidth(500)
layout = QVBoxLayout()
form_layout = QFormLayout()
form_layout.addRow("步骤ID", QLabel(step_info['stepId']))
form_layout.addRow("步骤类型", QLabel("主步骤" if step_info['isMain'] else "子步骤"))
form_layout.addRow("步骤描述", QLabel(step_info['description']))
if step_info['time']:
form_layout.addRow("执行时间", QLabel(step_info['time'].strftime("%Y-%m-%d %H:%M:%S")))
if step_info['result'] is not None:
status = "成功" if step_info['result'] else "失败"
form_layout.addRow("执行结果", QLabel(status))
layout.addLayout(form_layout)
button_box = QDialogButtonBox(QDialogButtonBox.Ok)
button_box.accepted.connect(detail_dialog.accept)
layout.addWidget(button_box)
detail_dialog.setLayout(layout)
detail_dialog.exec_()
def updateStatusDisplay(self, message, color="blue"):
"""更新状态显示"""
self.statusLabel.setText(message)
self.statusLabel.setStyleSheet(f"color: {color}; font-weight: bold;")
def startAutoExecute(self):
self.isRunning = True
self.isActive = True
# 发送标签锁定信号
self.tabLockRequired.emit(True)
self.autoButton.setEnabled(False)
self.stopButton.setEnabled(True)
self.nextButton.setEnabled(False)
self.resetButton.setEnabled(False)
self.exportButton.setEnabled(False)
# 如果是首次执行,则初始化步骤结果集合
if self.isFirstRun:
self.stepResults = [] # 重置步骤结果集合
self.tableModel.resetExecutionState()
self.isFirstRun = False # 标记已执行过
# 创建第一个执行记录
self.createNewExecutionRecord()
self.remainingCycles = self.cycleSpin.value()
self.infiniteCycles = self.infiniteCheckbox.isChecked()
# 使用用户设置的步骤间隔时间启动计时器
stepInterval = int(self.stepIntervalSpin.value() * 1000) # 转换为毫秒
self.timer.start(stepInterval)
# 更新状态显示
self.updateStatusDisplay(f"开始执行 - 第1轮", "green")
# 开始倒计时(在第一个步骤执行前)
self.startCountdown()
def stopAutoExecute(self):
self.isRunning = False # 清除自动执行状态
self.timer.stop()
self.autoButton.setEnabled(True)
self.stopButton.setEnabled(False)
self.nextButton.setEnabled(True)
self.resetButton.setEnabled(True)
self.exportButton.setEnabled(True)
# 注意: 这里不重置isActive因为执行器仍处于激活状态
# 执行结束时保存当前轮次的步骤结果到数据库
if hasattr(self, 'current_execution_id') and self.stepResults:
import json
try:
json.dumps(self.stepResults)
except Exception as e:
print("stepResults不能序列化", self.stepResults)
raise
self.dbManager.updateStepResults(self.current_execution_id, self.stepResults)
print(f"执行停止,当前轮次结果已保存")
# 更新状态显示
self.updateStatusDisplay("执行已停止", "orange")
# 停止倒计时
self.stopCountdown()
# 发送执行完成信号
self.executionFinished.emit(self)
# 执行完毕后自动解锁标签页
self.tabLockRequired.emit(False)
# self.resetExecution() # 自动执行完毕后自动重置
def autoExecuteStep(self):
if self.currentIndex < self.tableModel.rowCount():
step_info = self.tableModel.getStepInfo(self.currentIndex)
if step_info and not step_info['isMain']:
self.executeStep(self.currentIndex)
self.currentIndex += 1
# 步骤执行完成后,更新倒计时
self.startCountdown()
else:
# 当前轮次执行完成,立即存储执行结果
if hasattr(self, 'current_execution_id') and self.stepResults:
import json
try:
json.dumps(self.stepResults)
except Exception as e:
print("stepResults不能序列化", self.stepResults)
raise
self.dbManager.updateStepResults(self.current_execution_id, self.stepResults)
print(f"{self.getCurrentCycleNumber()} 轮执行完成,结果已存储")
if self.infiniteCycles or self.remainingCycles > 0:
if not self.infiniteCycles:
self.remainingCycles -= 1
self.timer.stop()
self.currentIndex = 0
should_continue = self.infiniteCycles or self.remainingCycles > 0
if should_continue:
# 开始新一轮执行前,创建新的执行记录
self.createNewExecutionRecord()
self.tableModel.resetAll() # 确保每轮自动清除颜色
self.stepResults = [] # 清空步骤结果,准备新一轮
# 更新状态显示
cycle_num = self.getCurrentCycleNumber()
self.updateStatusDisplay(f"执行中 - 第{cycle_num}", "green")
# 重新开始倒计时
self.startCountdown()
# 使用用户设置的步骤间隔时间重新启动计时器
stepInterval = int(self.stepIntervalSpin.value() * 1000) # 转换为毫秒
self.timer.start(stepInterval)
else:
self.stopAutoExecute()
# 发送执行完成信号
self.executionFinished.emit(self)
else:
self.stopAutoExecute()
# 发送执行完成信号
self.executionFinished.emit(self)
def createNewExecutionRecord(self):
"""创建新的执行记录"""
execution_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.current_execution_id = self.dbManager.insertProcedureExecution(
self.procedureId,
execution_time
)
print(f"创建新的执行记录ID: {self.current_execution_id}")
def getCurrentCycleNumber(self):
"""获取当前执行的轮次数"""
if self.infiniteCycles:
return "无限"
else:
total_cycles = self.cycleSpin.value()
completed_cycles = total_cycles - self.remainingCycles
return f"{completed_cycles + 1}/{total_cycles}"
def executeNextStep(self):
self.isActive = True
# 发送标签锁定信号
self.tabLockRequired.emit(True)
if self.currentIndex < self.tableModel.rowCount():
step_info = self.tableModel.getStepInfo(self.currentIndex)
if step_info and not step_info['isMain']:
self.executeStep(self.currentIndex)
self.currentIndex += 1
# 手动执行不需要修改isRunning状态
def executeStep(self, row):
stepInfo = self.tableModel.getStepInfo(row)
if not stepInfo:
return False
print(f"开始执行步骤 {stepInfo['stepId']}: {stepInfo['description']}")
result = self.handleStep(row, stepInfo)
if result is None:
print(f"警告:步骤 {stepInfo['stepId']} 返回了None结果设置为False")
result = False
execution_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 只保存基础类型,避免递归
step_result = {
'step_id': str(stepInfo.get('stepId', '')),
'step_description': str(stepInfo.get('description', '')),
'execution_time': execution_time,
'result': bool(result)
}
self.stepResults.append(step_result)
if hasattr(self, 'current_execution_id'):
self.dbManager.updateStepResults(self.current_execution_id, self.stepResults)
success = self.tableModel.updateStepResult(row, result, datetime.now())
return result
def handleStep(self, rowIndex, stepInfo): # 修改参数名
description = stepInfo['description']
stepId = stepInfo['stepId']
print(f"处理步骤 {stepId}: {description}")
if "设置" in description:
return self.performWrite(stepId, description) # 修改方法名
elif "检查" in description:
return self.performDataImport(stepId, description) # 修改方法名
elif "接收" in description:
return self.performValidation(stepId, description) # 修改方法名
elif "导出" in description:
return self.performExport(stepId, description) # 修改方法名
elif "备份" in description:
return self.performBackup(stepId, description) # 修改方法名
elif "发送" in description:
return self.performNotification(stepId, description) # 修改方法名
def simulateExecution(self, stepId, description): # 修改方法名
import random
return random.random() < 0.9
def performWrite(self, stepId, description): # 修改方法名
print(f"执行登录操作 (步骤 {stepId}): {description}")
return True
def performDataImport(self, stepId, description): # 修改方法名
import random
return random.random() < 0.95
def performValidation(self, stepId, description): # 修改方法名
import random
return random.random() < 0.85
def performExport(self, stepId, description): # 修改方法名
import random
return random.random() < 0.98
def performBackup(self, stepId, description): # 修改方法名
return True
def performNotification(self, stepId, description): # 修改方法名
import random
return random.random() < 0.99
def performWait(self, stepId, description): # 修改方法名
import time
waitTime = 1 # 修改变量名
if "s" in description:
try:
waitTime = int(description.split("等待")[1].split("s")[0].strip())
except:
pass
time.sleep(waitTime)
return True
def performSetting(self, stepId, description): # 修改方法名
return True
def resetExecution(self, fromAuto=False):
self.tableModel.resetAll()
self.currentIndex = 0
if not fromAuto:
self.stopAutoExecute()
self.cycleSpin.setValue(1)
self.infiniteCheckbox.setChecked(False)
self.isRunning = False
self.isActive = False
self.isFirstRun = True # 重置标志位
self.stepResults = [] # 重置步骤结果集合禁止赋值为tableModel.stepData
if hasattr(self, 'current_execution_id'):
del self.current_execution_id
self.tabLockRequired.emit(False)
self.updateStatusDisplay("已重置", "blue")
self.resetCountdown()
def onExportReportClicked(self):
self.generateReport()
def startCountdown(self):
"""开始倒计时"""
# 计算当前轮次的总步骤数(只计算子步骤)
self.totalSteps = sum(1 for step in self.tableModel.stepData if not step['isMain'])
# 计算剩余步骤数(从当前索引开始到结束)
remainingSteps = 0
for i in range(self.currentIndex, len(self.tableModel.stepData)):
if not self.tableModel.stepData[i]['isMain']:
remainingSteps += 1
# 获取步骤间隔时间
stepInterval = self.stepIntervalSpin.value()
# 计算当前轮次的剩余时间(剩余步骤数 * 步骤间隔时间)
if remainingSteps > 0:
self.remainingTime = int(remainingSteps * stepInterval) # 转换为整数秒
self.countdownTimer.start(1000) # 每秒更新一次
self.updateCountdown()
else:
# 如果没有剩余步骤,清空显示
self.countdownLabel.setText("")
def stopCountdown(self):
"""停止倒计时"""
self.countdownTimer.stop()
self.countdownLabel.setText("")
def updateCountdown(self):
"""更新倒计时显示"""
if self.remainingTime > 0:
minutes = self.remainingTime // 60
seconds = self.remainingTime % 60
if minutes > 0:
countdown_text = f"当前轮次剩余: {minutes:02d}:{seconds:02d}"
else:
countdown_text = f"当前轮次剩余: {seconds}"
# 根据剩余时间调整颜色
if self.remainingTime <= 10:
self.countdownLabel.setStyleSheet("color: red; font-weight: bold; font-size: 14px;")
elif self.remainingTime <= 30:
self.countdownLabel.setStyleSheet("color: orange; font-weight: bold; font-size: 14px;")
else:
self.countdownLabel.setStyleSheet("color: blue; font-weight: bold; font-size: 14px;")
self.countdownLabel.setText(countdown_text)
self.remainingTime -= 1
else:
self.countdownLabel.setText("当前轮次完成")
self.countdownLabel.setStyleSheet("color: green; font-weight: bold; font-size: 14px;")
self.countdownTimer.stop()
def updateCycleCountdown(self):
"""更新轮次剩余时间倒计时 - 已废弃,保留兼容性"""
self.updateCountdown()
def resetCountdown(self):
"""重置倒计时"""
self.stopCountdown()
self.remainingTime = 0
self.totalSteps = 0
def isExecutionCompleted(self):
"""检查规程是否执行完成"""
# 检查是否所有步骤都执行完成
return self.currentIndex >= self.tableModel.rowCount()
def getExecutionProgress(self):
"""获取执行进度"""
total_steps = self.tableModel.rowCount()
completed_steps = self.currentIndex
return completed_steps, total_steps
def showEvent(self, event):
super().showEvent(event)
# 只在第一次显示时调整行高
if not hasattr(self, '_hasResizedRows'):
self.tableView.resizeRowsToContents()
self._hasResizedRows = True