You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

490 lines
21 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import sys
import os
from PyQt5.QtCore import Qt, QTimer, QAbstractTableModel, QModelIndex, QPoint, QSize, pyqtSignal, QFile,QTextStream
from PyQt5.QtGui import QBrush, QColor, QFont, QStandardItemModel, QStandardItem
from PyQt5.QtWidgets import (QApplication, QMainWindow, QTableView,
QPushButton, QVBoxLayout, QWidget, QHBoxLayout,
QLabel, QCheckBox, QSpinBox, QMenu, QFileDialog,
QTabWidget, QTabBar, QListWidget, QListWidgetItem, QDialog,
QLineEdit, QFormLayout, QDialogButtonBox, QMessageBox,
QHeaderView, QToolBar, QAction, QStatusBar, QComboBox, QSplitter, QAbstractItemView)
from docx import Document
from docx.shared import Pt, RGBColor
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
import qtawesome as qta
from datetime import datetime
# 导入其他模块
from model.ProcedureModel.ProcedureProcessor import ExcelParser, StepTableModel
from utils.DBModels.ProcedureModel import DatabaseManager
# 修改导入路径
class StepExecutor(QWidget):
# Signal emitted with True to request tab locking and with False to release it.
tabLockRequired = pyqtSignal(bool)
def __init__(self, procedureData, procedureId, dbManager):
    """Create the executor widget for a single procedure.

    Args:
        procedureData: Mapping describing the procedure. Its "测试步骤"
            entry is passed to initUi; initUi also reads the "规程信息"
            and "测试用例信息" entries.
        procedureId: Identifier forwarded to the database manager when an
            execution record is created.
        dbManager: Object used to store execution records and step results.
    """
    super().__init__()

    # Collaborators supplied by the caller.
    self.procedureData = procedureData
    self.procedureId = procedureId
    self.dbManager = dbManager

    # Run-state flags exist before the UI is built.
    self.isRunning = self.isActive = False
    self.initUi(procedureData["测试步骤"])

    # Cursor into the step table plus the timer that drives auto execution.
    self.currentIndex = 0
    self.timer = QTimer()
    self.timer.timeout.connect(self.autoExecuteStep)

    # Cycle bookkeeping; overwritten from the UI when auto execution starts.
    self.remainingCycles = 1
    self.infiniteCycles = False

    # True until the first auto run has reset the table state.
    self.isFirstRun = True
    # Accumulated per-step result dictionaries.
    self.stepResults = []
def initUi(self, testSteps):
    """Build the info form, step table, control buttons, cycle row and toolbar.

    The toolbar is created and stored on ``self.toolbar`` but is not added
    to this widget's layout here.

    Args:
        testSteps: Step data passed unchanged to StepTableModel.
    """
    root_layout = QVBoxLayout()

    # Read-only header: (caption, section key, field key) per row.
    header_form = QFormLayout()
    header_form.setLabelAlignment(Qt.AlignRight)
    header_fields = (
        ("规程名称:", "规程信息", "规程名称"),
        ("规程编号:", "规程信息", "规程编号"),
        ("规程类型:", "规程信息", "规程类型"),
        ("测试用例:", "测试用例信息", "测试用例"),
        ("用例编号:", "测试用例信息", "用例编号"),
        ("工况描述:", "测试用例信息", "工况描述"),
    )
    for caption, section, key in header_fields:
        header_form.addRow(caption, QLabel(self.procedureData[section][key]))
    root_layout.addLayout(header_form)
    root_layout.addSpacing(20)

    # Step table with a custom context menu.
    self.tableModel = StepTableModel(testSteps)
    self.tableView = QTableView()
    self.tableView.setModel(self.tableModel)
    self.tableView.setContextMenuPolicy(Qt.CustomContextMenu)
    self.tableView.customContextMenuRequested.connect(self.showContextMenu)
    header = self.tableView.horizontalHeader()
    # Column 4 is intentionally left at its default resize mode, as before.
    column_modes = (
        (0, QHeaderView.ResizeToContents),
        (1, QHeaderView.Stretch),
        (2, QHeaderView.ResizeToContents),
        (3, QHeaderView.ResizeToContents),
        (5, QHeaderView.Stretch),
    )
    for column, mode in column_modes:
        header.setSectionResizeMode(column, mode)
    root_layout.addWidget(QLabel("测试步骤:"))
    root_layout.addWidget(self.tableView)

    def make_button(text, slot, icon_name, color):
        # One control button: caption, click handler and coloured icon.
        button = QPushButton(text)
        button.clicked.connect(slot)
        button.setIcon(qta.icon(icon_name, color=color))
        return button

    # Control buttons.
    self.autoButton = make_button(" 开始自动执行", self.startAutoExecute, 'fa5s.play', 'green')
    self.stopButton = make_button(" 停止自动执行", self.stopAutoExecute, 'fa5s.stop', 'red')
    self.stopButton.setEnabled(False)  # nothing to stop until a run starts
    self.nextButton = make_button(" 执行下一步", self.executeNextStep, 'fa5s.step-forward', 'blue')
    self.resetButton = make_button(" 完全重置", self.resetExecution, 'fa5s.redo', 'orange')
    self.exportButton = make_button(" 生成报告", self.generateReport, 'fa5s.file-alt', 'purple')

    button_row = QHBoxLayout()
    for button in (self.autoButton, self.stopButton, self.nextButton,
                   self.resetButton, self.exportButton):
        button_row.addWidget(button)

    # Cycle settings: number of rounds or endless looping.
    cycle_row = QHBoxLayout()
    cycle_row.addWidget(QLabel("执行轮次:"))
    self.cycleSpin = QSpinBox()
    self.cycleSpin.setRange(1, 999)
    self.cycleSpin.setValue(1)
    cycle_row.addWidget(self.cycleSpin)
    self.infiniteCheckbox = QCheckBox("无限循环")
    cycle_row.addWidget(self.infiniteCheckbox)
    cycle_row.addStretch()

    root_layout.addLayout(button_row)
    root_layout.addLayout(cycle_row)
    self.setLayout(root_layout)

    # Toolbar mirroring the buttons; None marks a separator.
    self.toolbar = QToolBar("执行工具栏")
    self.toolbar.setIconSize(QSize(24, 24))
    toolbar_entries = (
        ('fa5s.play', 'green', "开始执行", self.startAutoExecute),
        ('fa5s.stop', 'red', "停止执行", self.stopAutoExecute),
        ('fa5s.step-forward', 'blue', "下一步", self.executeNextStep),
        None,
        ('fa5s.redo', 'orange', "重置", self.resetExecution),
        None,
        ('fa5s.file-alt', 'purple', "生成报告", self.generateReport),
    )
    for entry in toolbar_entries:
        if entry is None:
            self.toolbar.addSeparator()
            continue
        icon_name, color, caption, slot = entry
        self.toolbar.addAction(qta.icon(icon_name, color=color), caption, slot)
def showContextMenu(self, pos):
    """Show the step context menu for the table row under ``pos``.

    Args:
        pos: Position in table-viewport coordinates, as delivered by the
            view's customContextMenuRequested signal.
    """
    clicked = self.tableView.indexAt(pos)
    if not clicked.isValid():
        return  # click landed outside any row

    row = clicked.row()
    menu = QMenu()
    run_from_here = menu.addAction("从该步骤开始执行")
    run_from_here.triggered.connect(lambda: self.jumpExecute(row))
    show_detail = menu.addAction("查看步骤详情")
    show_detail.triggered.connect(lambda: self.showStepDetail(row))

    global_pos = self.tableView.viewport().mapToGlobal(pos)
    menu.exec_(global_pos)
def jumpExecute(self, rowIndex):
    """Execute the step at ``rowIndex`` and move the cursor just past it.

    Rows outside the table are ignored. Main steps are not executed.

    Args:
        rowIndex: Zero-based row in the step table.
    """
    if not (0 <= rowIndex < self.tableModel.rowCount()):
        return

    target = self.tableModel.getStepInfo(rowIndex)
    if target and not target['isMain']:
        self.executeStep(rowIndex)
    # The next execution continues with the row after the chosen one.
    self.currentIndex = rowIndex + 1
def showStepDetail(self, row):
    """Open a modal dialog with the details of the step at ``row``.

    The dialog always shows the step id, type and description. The
    execution time and result rows are added only when the step has them.

    Args:
        row: Zero-based row in the step table. Nothing is shown when the
            model returns no info for it.
    """
    step_info = self.tableModel.getStepInfo(row)
    if not step_info:
        return

    detail_dialog = QDialog(self)
    detail_dialog.setWindowTitle("步骤详情")
    detail_dialog.setMinimumWidth(500)

    layout = QVBoxLayout()
    form_layout = QFormLayout()
    form_layout.addRow("步骤ID", QLabel(step_info['stepId']))
    form_layout.addRow("步骤类型", QLabel("主步骤" if step_info['isMain'] else "子步骤"))
    form_layout.addRow("步骤描述", QLabel(step_info['description']))

    # Label and value must go to addRow together. The original split them
    # with ';', which passed a lone string and discarded the value label.
    if step_info['time']:
        form_layout.addRow(
            "执行时间",
            QLabel(step_info['time'].strftime("%Y-%m-%d %H:%M:%S")),
        )
    if step_info['result'] is not None:
        status = "成功" if step_info['result'] else "失败"
        form_layout.addRow("执行结果", QLabel(status))
    layout.addLayout(form_layout)

    button_box = QDialogButtonBox(QDialogButtonBox.Ok)
    button_box.accepted.connect(detail_dialog.accept)
    layout.addWidget(button_box)

    detail_dialog.setLayout(layout)
    detail_dialog.exec_()
def startAutoExecute(self):
    """Enter timer-driven auto execution.

    Marks the executor running and active, requests the tab lock, leaves
    only the stop button enabled, resets table state on the first run,
    reads the cycle settings from the UI and starts the timer at 1000 ms.
    """
    self.isRunning = self.isActive = True
    # Request the tab lock.
    self.tabLockRequired.emit(True)

    # Only "stop" stays usable while the timer drives execution.
    self.stopButton.setEnabled(True)
    for button in (self.autoButton, self.nextButton,
                   self.resetButton, self.exportButton):
        button.setEnabled(False)

    # The first run starts from a clean result list and table state.
    if self.isFirstRun:
        self.stepResults = []
        self.tableModel.resetExecutionState()
        self.isFirstRun = False

    self.remainingCycles = self.cycleSpin.value()
    self.infiniteCycles = self.infiniteCheckbox.isChecked()
    self.timer.start(1000)
def stopAutoExecute(self):
    """Leave auto-execution mode and flush the collected results.

    Stops the timer, re-enables every control except the stop button and,
    when an execution record exists, writes the accumulated step results
    through the database manager. ``isActive`` is left untouched.
    """
    self.isRunning = False
    self.timer.stop()

    self.stopButton.setEnabled(False)
    for button in (self.autoButton, self.nextButton,
                   self.resetButton, self.exportButton):
        button.setEnabled(True)

    # current_execution_id only exists once a step has been executed.
    try:
        execution_id = self.current_execution_id
    except AttributeError:
        return
    self.dbManager.updateStepResults(execution_id, self.stepResults)
def autoExecuteStep(self):
    """Timer tick: run the next step, or handle the end of a pass.

    While rows remain, the row at ``currentIndex`` is executed unless it
    is a main step, and the cursor advances. At the end of a pass the
    cycle counter is consumed; the table is then either reset for another
    pass or auto execution stops.
    """
    model = self.tableModel

    # Still inside the current pass.
    if self.currentIndex < model.rowCount():
        pending = model.getStepInfo(self.currentIndex)
        if pending and not pending['isMain']:
            self.executeStep(self.currentIndex)
        self.currentIndex += 1
        return

    # End of pass with no cycles left.
    if not (self.infiniteCycles or self.remainingCycles > 0):
        self.stopAutoExecute()
        return

    # Consume one cycle (unless looping forever) and rewind the cursor.
    if not self.infiniteCycles:
        self.remainingCycles -= 1
    self.timer.stop()
    self.currentIndex = 0

    if not (self.infiniteCycles or self.remainingCycles > 0):
        self.stopAutoExecute()
        return

    # Another pass follows: clear the table, persist results, restart timer.
    model.resetAll()
    if hasattr(self, 'current_execution_id'):
        self.dbManager.updateStepResults(self.current_execution_id, self.stepResults)
    self.timer.start()
def executeNextStep(self):
    """Manually execute the step under the cursor and advance by one row.

    Marks the executor active and requests the tab lock on every call.
    Main steps are skipped without executing. ``isRunning`` is left
    untouched.
    """
    self.isActive = True
    # Request the tab lock.
    self.tabLockRequired.emit(True)

    row = self.currentIndex
    if row >= self.tableModel.rowCount():
        return  # every row has already been visited

    info = self.tableModel.getStepInfo(row)
    if info and not info['isMain']:
        self.executeStep(row)
    self.currentIndex += 1
def executeStep(self, row):
    """Run one step, record its outcome and update the table.

    The step is dispatched through handleStep. Its outcome is appended to
    ``self.stepResults``, written through the database manager, and shown
    in the table model. An execution record is created on the first
    executed step and reused until ``current_execution_id`` is removed.

    Args:
        row: Zero-based row of the step in the table model.

    Returns:
        The step result, with None normalised to False. Also False when
        the model has no info for ``row``.
    """
    stepInfo = self.tableModel.getStepInfo(row)
    if not stepInfo:
        return False

    print(f"开始执行步骤 {stepInfo['stepId']}: {stepInfo['description']}")
    result = self.handleStep(row, stepInfo)

    # A handler that matched nothing returns None; store a boolean instead.
    if result is None:
        print(f"警告:步骤 {stepInfo['stepId']} 返回了None结果设置为False")
        result = False

    # One timestamp for both the stored record and the table row, so the
    # two can never disagree.
    executed_at = datetime.now()
    execution_time = executed_at.strftime("%Y-%m-%d %H:%M:%S")

    # Create the execution record lazily, on the first executed step.
    if not hasattr(self, 'current_execution_id'):
        self.current_execution_id = self.dbManager.insertProcedureExecution(
            self.procedureId,
            execution_time
        )

    self.stepResults.append({
        'step_id': stepInfo['stepId'],
        'step_description': stepInfo['description'],
        'execution_time': execution_time,
        'result': result
    })
    # Persist the whole accumulated result list after every step.
    self.dbManager.updateStepResults(self.current_execution_id, self.stepResults)

    self.tableModel.updateStepResult(row, result, executed_at)
    return result
def handleStep(self, rowIndex, stepInfo):
    """Pick a handler for a step by keywords found in its description.

    Keywords are tested in a fixed order and the first match wins.

    Args:
        rowIndex: Row of the step; not used by the dispatch itself.
        stepInfo: Mapping with at least 'description' and 'stepId'.

    Returns:
        Whatever the selected handler returns, or None when no keyword
        occurs in the description.
    """
    description = stepInfo['description']
    stepId = stepInfo['stepId']
    print(f"处理步骤 {stepId}: {description}")

    # Ordered (keyword, handler) routes; earlier entries take priority.
    # NOTE(review): "设置" is routed to performLogin although a
    # performSetting handler exists -- confirm this is intended.
    routes = (
        ("设置", self.performLogin),
        ("数据导入", self.performDataImport),
        ("验证", self.performValidation),
        ("导出", self.performExport),
        ("备份", self.performBackup),
        ("发送", self.performNotification),
    )
    for keyword, handler in routes:
        if keyword in description:
            return handler(stepId, description)
    return None
def simulateExecution(self, stepId, description):
    """Return a random outcome that is True when the draw is below 0.9.

    Neither argument influences the result.
    """
    import random

    success_threshold = 0.9
    draw = random.random()
    return draw < success_threshold
def performLogin(self, stepId, description):
    """Print a login notice for the step and report success.

    Returns:
        Always True.
    """
    notice = f"执行登录操作 (步骤 {stepId}): {description}"
    print(notice)
    return True
def performDataImport(self, stepId, description):
    """Return a random outcome that is True when the draw is below 0.95.

    Neither argument influences the result.
    """
    import random

    success_threshold = 0.95
    draw = random.random()
    return draw < success_threshold
def performValidation(self, stepId, description):
    """Return a random outcome that is True when the draw is below 0.85.

    Neither argument influences the result.
    """
    import random

    success_threshold = 0.85
    draw = random.random()
    return draw < success_threshold
def performExport(self, stepId, description):
    """Return a random outcome that is True when the draw is below 0.98.

    Neither argument influences the result.
    """
    import random

    success_threshold = 0.98
    draw = random.random()
    return draw < success_threshold
def performBackup(self, stepId, description):
    """Report success for a backup step without doing any work.

    Returns:
        Always True.
    """
    succeeded = True
    return succeeded
def performNotification(self, stepId, description):
    """Return a random outcome that is True when the draw is below 0.99.

    Neither argument influences the result.
    """
    import random

    success_threshold = 0.99
    draw = random.random()
    return draw < success_threshold
def performWait(self, stepId, description):
    """Sleep for the number of seconds named in the description.

    The duration is the integer between "等待" and the next "s" in the
    description. It falls back to 1 second when that pattern is missing or
    is not an integer. The call blocks the calling thread for the whole
    duration.

    Args:
        stepId: Step identifier; unused.
        description: Step text, e.g. one containing "等待5s".

    Returns:
        Always True.
    """
    import time

    waitTime = 1  # default when no duration can be parsed
    if "s" in description:
        try:
            waitTime = int(description.split("等待")[1].split("s")[0].strip())
        except (IndexError, ValueError):
            # No "等待" marker, or the text before "s" is not an integer:
            # keep the default instead of hiding unrelated errors.
            pass
    # time.sleep rejects negative values, so treat them as "no wait".
    time.sleep(max(waitTime, 0))
    return True
def performSetting(self, stepId, description):
    """Report success for a settings step without doing any work.

    Returns:
        Always True.
    """
    succeeded = True
    return succeeded
def resetExecution(self):
    """Return the executor to its initial, idle state.

    Clears the table, rewinds the cursor, stops auto execution, restores
    the default cycle settings, forgets the collected results and the
    current execution id, and emits the tab-unlock signal.
    """
    self.tableModel.resetAll()
    self.currentIndex = 0
    # Stop before clearing so stopAutoExecute still sees the results and id.
    self.stopAutoExecute()

    # Default cycle settings.
    self.cycleSpin.setValue(1)
    self.infiniteCheckbox.setChecked(False)

    self.isRunning = self.isActive = False
    self.isFirstRun = True
    self.stepResults = []

    # Drop the execution id so the next executed step creates a new record.
    try:
        del self.current_execution_id
    except AttributeError:
        pass

    # Release the tab lock.
    self.tabLockRequired.emit(False)
# generateReport takes no file_path argument; the output path is chosen in a save dialog.
def generateReport(self):
    """Write a Word report of the procedure, its statistics and all steps.

    The user picks the output path in a save dialog. The document holds a
    title, the procedure and test-case information, execution statistics
    and one table row per entry in the table model's step data.

    Statistics count only steps whose result is True as successes and only
    steps whose result is False as failures. Every other row is reported
    as not executed.

    Returns:
        The path the report was saved to, or None when the dialog is
        cancelled or the file cannot be written.
    """
    proc_info = self.procedureData['规程信息']
    case_data = self.procedureData['测试用例信息']
    # Full procedure name: name followed by the number in parentheses.
    proc_full_name = f"{proc_info['规程名称']}({proc_info['规程编号']})"

    default_name = f"{proc_full_name}_报告_{datetime.now().strftime('%Y%m%d_%H%M%S')}.docx"
    file_path, _ = QFileDialog.getSaveFileName(
        self,
        "保存报告",
        default_name,
        "Word文档 (*.docx)"
    )
    # An empty path means the user cancelled the dialog.
    if not file_path:
        return None

    doc = Document()

    title = doc.add_paragraph(f"{proc_full_name} - 测试报告")
    title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
    title.runs[0].font.size = Pt(18)
    title.runs[0].bold = True

    info = doc.add_paragraph()
    info.add_run("规程信息:\n").bold = True
    info.add_run(f"规程编号: {proc_info['规程编号']}\n")
    info.add_run(f"规程类型: {proc_info['规程类型']}\n")

    case_info = doc.add_paragraph()
    case_info.add_run("测试用例信息:\n").bold = True
    case_info.add_run(f"测试用例: {case_data['测试用例']}\n")
    case_info.add_run(f"用例编号: {case_data['用例编号']}\n")
    case_info.add_run(f"工况描述: {case_data['工况描述']}\n")

    # Failures are counted explicitly. Deriving them as total - success
    # would report rows that never ran as failed.
    steps = self.tableModel.stepData
    total = self.tableModel.rowCount()
    success = sum(1 for s in steps if s['result'] is True)
    failure = sum(1 for s in steps if s['result'] is False)
    pending = total - success - failure
    stats = doc.add_paragraph()
    stats.add_run("执行统计:\n").bold = True
    stats.add_run(
        f"总步骤数: {total}\n成功数: {success}\n失败数: {failure}\n未执行数: {pending}\n"
    )

    headers = ['步骤ID', '步骤类型', '步骤描述', '执行时间', '执行结果', '状态']
    steps_table = doc.add_table(rows=1, cols=len(headers))
    steps_table.style = 'Table Grid'  # bordered table style
    hdr_cells = steps_table.rows[0].cells
    for i, header in enumerate(headers):
        hdr_cells[i].text = header
        # Bold, centred header text.
        for paragraph in hdr_cells[i].paragraphs:
            paragraph.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
            paragraph.runs[0].font.bold = True

    success_color = RGBColor(0, 128, 0)  # green
    failure_color = RGBColor(255, 0, 0)  # red
    for step in steps:
        row_cells = steps_table.add_row().cells
        row_cells[0].text = step['stepId']
        row_cells[1].text = "主步骤" if step['isMain'] else "子步骤"
        row_cells[2].text = step['description']
        row_cells[3].text = (
            step['time'].strftime("%Y-%m-%d %H:%M:%S") if step['time'] else 'N/A'
        )

        if step['result'] is True:
            result_text, result_color = '成功', success_color
        elif step['result'] is False:
            result_text, result_color = '失败', failure_color
        else:
            result_text, result_color = '未执行', None
        # Set the text first, then colour the run it creates. Assigning
        # cell.text afterwards would replace the coloured run.
        result_cell = row_cells[4]
        result_cell.text = result_text
        if result_color is not None:
            result_cell.paragraphs[0].runs[0].font.color.rgb = result_color

        row_cells[5].text = '已执行' if step['executed'] else '未执行'

    # Split the page width evenly across the columns.
    column_width = doc.sections[0].page_width // len(headers)
    for col in steps_table.columns:
        col.width = column_width

    # Report a failed save (e.g. the target file is open elsewhere)
    # instead of letting the exception escape the slot.
    try:
        doc.save(file_path)
    except OSError as exc:
        QMessageBox.critical(self, "报告生成", f"报告保存失败: {exc}")
        return None

    QMessageBox.information(self, "报告生成", f"报告已生成: {file_path}")
    return file_path