0625更新

main
zcwBit 9 months ago
parent c92d4b0d5a
commit 27df3dcf75

@ -0,0 +1,244 @@
import sys
import os
from PyQt5.QtCore import Qt, QTimer, QAbstractTableModel, QModelIndex, QPoint, QSize, pyqtSignal, QFile, QTextStream
from PyQt5.QtGui import QBrush, QColor, QFont, QStandardItemModel, QStandardItem
from PyQt5.QtWidgets import (QApplication, QMainWindow, QTableView,
QPushButton, QVBoxLayout, QWidget, QHBoxLayout,
QLabel, QCheckBox, QSpinBox, QMenu, QFileDialog,
QTabWidget, QTabBar, QListWidget, QListWidgetItem, QDialog,
QLineEdit, QFormLayout, QDialogButtonBox, QMessageBox,
QHeaderView, QToolBar, QAction, QStatusBar, QComboBox, QSplitter, QAbstractItemView)
from docx import Document
from docx.shared import Pt, RGBColor
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
import qtawesome as qta
from datetime import datetime
# 导入其他模块
from model.ProcedureModel.ProcedureProcessor import ExcelParser, StepTableModel
from utils.DBModels.ProcedureModel import DatabaseManager
from UI.ProcedureManager.StepExecutor import StepExecutor # 修改导入路径
class HistoryViewerWidget(QDialog):
def __init__(self, dbManager, parent=None):
super().__init__(parent)
self.dbManager = dbManager
self.setWindowTitle("历史记录查看器")
self.setGeometry(200, 200, 1000, 800)
self.initUi()
self.loadHistory()
def initUi(self):
layout = QVBoxLayout()
# 搜索栏
searchLayout = QHBoxLayout()
self.searchEdit = QLineEdit()
self.searchEdit.setPlaceholderText("搜索规程...")
self.searchEdit.textChanged.connect(self.loadHistory)
searchLayout.addWidget(QLabel("搜索:"))
searchLayout.addWidget(self.searchEdit)
# 将搜索栏布局添加到主布局
layout.addLayout(searchLayout) # 修复:在此处添加搜索栏
# 历史记录表格
self.table = QTableView()
self.table.setEditTriggers(QTableView.NoEditTriggers)
self.model = QStandardItemModel()
self.model.setHorizontalHeaderLabels(["ID", "规程全名", "类型", "执行时间", "报告路径"])
self.table.setModel(self.model)
self.table.doubleClicked.connect(self.openReportOrDetails)
self.table.setSelectionBehavior(QTableView.SelectRows)
# 新增:设置表格的右键菜单策略
self.table.setContextMenuPolicy(Qt.CustomContextMenu)
self.table.customContextMenuRequested.connect(self.showHistoryContextMenu)
# 新增:设置表格列宽自适应内容
self.table.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeToContents)
# 步骤详情表格
self.stepTable = QTableView()
self.stepTable.setEditTriggers(QTableView.NoEditTriggers) # 新增:禁用编辑
self.stepModel = QStandardItemModel()
self.stepModel.setHorizontalHeaderLabels(["步骤ID", "步骤描述", "执行时间", "执行结果"])
self.stepTable.setModel(self.stepModel)
# 新增:设置表格列宽自适应内容
self.stepTable.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeToContents)
# 分割窗口
splitter = QSplitter(Qt.Vertical)
splitter.addWidget(self.table)
splitter.addWidget(self.stepTable)
splitter.setSizes([400, 400]) # 初始分配高度
layout.addWidget(splitter)
# 新增:操作按钮布局
button_layout = QHBoxLayout()
self.deleteButton = QPushButton("删除历史记录")
self.deleteButton.setIcon(qta.icon('fa5s.trash', color='red'))
self.deleteButton.clicked.connect(self.deleteSelectedHistory)
button_layout.addWidget(self.deleteButton)
# 新增:导出报告按钮
self.exportButton = QPushButton("导出报告")
self.exportButton.setIcon(qta.icon('fa5s.file-export', color='green'))
self.exportButton.clicked.connect(self.exportReport)
button_layout.addWidget(self.exportButton)
button_layout.addStretch()
layout.addLayout(button_layout)
# 按钮
buttonBox = QDialogButtonBox(QDialogButtonBox.Ok)
buttonBox.accepted.connect(self.accept)
layout.addWidget(buttonBox)
self.setLayout(layout)
def loadHistory(self):
self.model.removeRows(0, self.model.rowCount())
filterText = self.searchEdit.text().strip()
history = self.dbManager.getExecutionHistory(filterText)
for record in history:
# 修改使用正确的列索引0-4
row = [
QStandardItem(str(record[0])), # ID
QStandardItem(record[1]), # 规程全名(名称+编号)
QStandardItem(record[2]), # 类型
QStandardItem(record[3]), # 执行时间
QStandardItem(record[4]) # 报告路径
]
self.model.appendRow(row)
self.table.resizeColumnsToContents()
def openReportOrDetails(self, index):
# 修改:将 get_step_results 改为 getStepResults
execution_id = self.model.item(index.row(), 0).text()
step_results = self.dbManager.getStepResults(execution_id) # 修正方法名
self.showStepDetails(step_results)
# 保存操作历史
self.saveOperationHistory(execution_id, "查看步骤详情", "")
def saveOperationHistory(self, executionId, operationType, operationDetail):
"""保存操作历史记录"""
self.dbManager.saveOperationHistory(
executionId,
operationType,
operationDetail,
datetime.now().strftime("%Y-%m-%d %H:%M:%S")
)
def showStepDetails(self, stepResults):
"""显示步骤详情"""
self.stepModel.removeRows(0, self.stepModel.rowCount())
if not stepResults:
return
for step in stepResults:
row = [
QStandardItem(step['step_id']),
QStandardItem(step['step_description']),
QStandardItem(step['execution_time']),
QStandardItem('成功' if step['result'] else '失败')
]
self.stepModel.appendRow(row)
# 新增删除历史记录的方法
def deleteSelectedHistory(self):
"""删除选中的历史记录"""
selected_indexes = self.table.selectionModel().selectedRows()
if not selected_indexes:
QMessageBox.warning(self, "未选择", "请先选择要删除的历史记录")
return
# 获取选中的执行ID
execution_ids = []
for index in selected_indexes:
execution_id = self.model.item(index.row(), 0).text()
execution_ids.append(execution_id)
# 确认删除
reply = QMessageBox.question(
self,
"确认删除",
f"确定要删除选中的 {len(execution_ids)} 条历史记录吗?\n此操作不可恢复!",
QMessageBox.Yes | QMessageBox.No
)
if reply == QMessageBox.Yes:
# 修改:将 delete_execution_history 改为 deleteExecutionHistory
success = self.dbManager.deleteExecutionHistory(execution_ids) # 修正方法名
if success:
QMessageBox.information(self, "删除成功", "已成功删除选中的历史记录")
self.loadHistory() # 重新加载历史记录
else:
QMessageBox.warning(self, "删除失败", "删除历史记录时发生错误")
def exportReport(self):
"""导出选中历史记录的报告"""
selected_indexes = self.table.selectionModel().selectedRows()
if not selected_indexes:
QMessageBox.warning(self, "未选择", "请先选择要导出的历史记录")
return
if len(selected_indexes) > 1:
QMessageBox.warning(self, "选择过多", "一次只能导出一个历史记录的报告")
return
index = selected_indexes[0]
execution_id = self.model.item(index.row(), 0).text()
# 修改报告路径现在是第5列索引4
reportPath = self.model.item(index.row(), 4).text()
# 获取执行详情数据
# 修改:将 get_execution_details 改为 getExecutionDetails
execution_data = self.dbManager.getExecutionDetails(execution_id) # 修正方法名
if not execution_data:
QMessageBox.warning(self, "数据错误", "无法获取执行详情数据")
return
# 生成报告
try:
# 创建临时StepExecutor实例用于生成报告
executor = StepExecutor(execution_data['procedure_content'],
execution_data['procedure_id'],
self.dbManager)
executor.stepResults = execution_data['step_results']
# 设置步骤数据
for step in executor.tableModel.stepData:
step_result = next((s for s in execution_data['step_results']
if s['step_id'] == step['stepId']), None)
if step_result:
step['executed'] = True
step['result'] = step_result['result']
step['time'] = datetime.strptime(step_result['execution_time'], "%Y-%m-%d %H:%M:%S")
# 生成报告文件
default_name = f"{execution_data['procedure_name']}_历史报告_{execution_id}.docx"
# file_path, _ = QFileDialog.getSaveFileName(
# self, "保存报告", default_name, "Word文档 (*.docx)"
# )
# if file_path:
res = executor.generateReport()
# QMessageBox.information(self, "导出成功", f"报告已保存到:\n{res}")
except Exception as e:
QMessageBox.critical(self, "导出错误", f"生成报告时出错:\n{str(e)}")
# 新增:历史记录表格的右键菜单
def showHistoryContextMenu(self, pos):
"""显示历史记录的右键菜单"""
index = self.table.indexAt(pos)
if index.isValid():
menu = QMenu()
deleteAction = menu.addAction("删除历史记录")
deleteAction.triggered.connect(self.deleteSelectedHistory)
menu.exec_(self.table.viewport().mapToGlobal(pos))

@ -18,228 +18,9 @@ from datetime import datetime
# 导入其他模块 # 导入其他模块
from model.ProcedureModel.ProcedureProcessor import ExcelParser, StepTableModel from model.ProcedureModel.ProcedureProcessor import ExcelParser, StepTableModel
from utils.DBModels.ProcedureModel import DatabaseManager from utils.DBModels.ProcedureModel import DatabaseManager
from UI.ProcedureManager.HistoryViewer import HistoryViewerWidget
from UI.ProcedureManager.StepExecutor import StepExecutor # 修改导入路径
class HistoryViewer(QDialog):
def __init__(self, dbManager, parent=None):
super().__init__(parent)
self.dbManager = dbManager
self.setWindowTitle("历史记录查看器")
self.setGeometry(200, 200, 1000, 800)
self.initUi()
self.loadHistory()
def initUi(self):
layout = QVBoxLayout()
# 搜索栏
searchLayout = QHBoxLayout()
self.searchEdit = QLineEdit()
self.searchEdit.setPlaceholderText("搜索规程...")
self.searchEdit.textChanged.connect(self.loadHistory)
searchLayout.addWidget(QLabel("搜索:"))
searchLayout.addWidget(self.searchEdit)
# 将搜索栏布局添加到主布局
layout.addLayout(searchLayout) # 修复:在此处添加搜索栏
# 历史记录表格
self.table = QTableView()
self.table.setEditTriggers(QTableView.NoEditTriggers)
self.model = QStandardItemModel()
self.model.setHorizontalHeaderLabels(["ID", "规程全名", "类型", "执行时间", "报告路径"])
self.table.setModel(self.model)
self.table.doubleClicked.connect(self.openReportOrDetails)
self.table.setSelectionBehavior(QTableView.SelectRows)
# 新增:设置表格的右键菜单策略
self.table.setContextMenuPolicy(Qt.CustomContextMenu)
self.table.customContextMenuRequested.connect(self.showHistoryContextMenu)
# 新增:设置表格列宽自适应内容
self.table.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeToContents)
# 步骤详情表格
self.stepTable = QTableView()
self.stepTable.setEditTriggers(QTableView.NoEditTriggers) # 新增:禁用编辑
self.stepModel = QStandardItemModel()
self.stepModel.setHorizontalHeaderLabels(["步骤ID", "步骤描述", "执行时间", "执行结果"])
self.stepTable.setModel(self.stepModel)
# 新增:设置表格列宽自适应内容
self.stepTable.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeToContents)
# 分割窗口
splitter = QSplitter(Qt.Vertical)
splitter.addWidget(self.table)
splitter.addWidget(self.stepTable)
splitter.setSizes([400, 400]) # 初始分配高度
layout.addWidget(splitter)
# 新增:操作按钮布局
button_layout = QHBoxLayout()
self.deleteButton = QPushButton("删除历史记录")
self.deleteButton.setIcon(qta.icon('fa5s.trash', color='red'))
self.deleteButton.clicked.connect(self.deleteSelectedHistory)
button_layout.addWidget(self.deleteButton)
# 新增:导出报告按钮
self.exportButton = QPushButton("导出报告")
self.exportButton.setIcon(qta.icon('fa5s.file-export', color='green'))
self.exportButton.clicked.connect(self.exportReport)
button_layout.addWidget(self.exportButton)
button_layout.addStretch()
layout.addLayout(button_layout)
# 按钮
buttonBox = QDialogButtonBox(QDialogButtonBox.Ok)
buttonBox.accepted.connect(self.accept)
layout.addWidget(buttonBox)
self.setLayout(layout)
def loadHistory(self):
self.model.removeRows(0, self.model.rowCount())
filterText = self.searchEdit.text().strip()
history = self.dbManager.getExecutionHistory(filterText)
for record in history:
# 修改使用正确的列索引0-4
row = [
QStandardItem(str(record[0])), # ID
QStandardItem(record[1]), # 规程全名(名称+编号)
QStandardItem(record[2]), # 类型
QStandardItem(record[3]), # 执行时间
QStandardItem(record[4]) # 报告路径
]
self.model.appendRow(row)
self.table.resizeColumnsToContents()
def openReportOrDetails(self, index):
# 修改:将 get_step_results 改为 getStepResults
execution_id = self.model.item(index.row(), 0).text()
step_results = self.dbManager.getStepResults(execution_id) # 修正方法名
self.showStepDetails(step_results)
# 保存操作历史
self.saveOperationHistory(execution_id, "查看步骤详情", "")
def saveOperationHistory(self, executionId, operationType, operationDetail):
"""保存操作历史记录"""
self.dbManager.saveOperationHistory(
executionId,
operationType,
operationDetail,
datetime.now().strftime("%Y-%m-%d %H:%M:%S")
)
def showStepDetails(self, stepResults):
"""显示步骤详情"""
self.stepModel.removeRows(0, self.stepModel.rowCount())
if not stepResults:
return
for step in stepResults:
row = [
QStandardItem(step['step_id']),
QStandardItem(step['step_description']),
QStandardItem(step['execution_time']),
QStandardItem('成功' if step['result'] else '失败')
]
self.stepModel.appendRow(row)
# 新增删除历史记录的方法
def deleteSelectedHistory(self):
"""删除选中的历史记录"""
selected_indexes = self.table.selectionModel().selectedRows()
if not selected_indexes:
QMessageBox.warning(self, "未选择", "请先选择要删除的历史记录")
return
# 获取选中的执行ID
execution_ids = []
for index in selected_indexes:
execution_id = self.model.item(index.row(), 0).text()
execution_ids.append(execution_id)
# 确认删除
reply = QMessageBox.question(
self,
"确认删除",
f"确定要删除选中的 {len(execution_ids)} 条历史记录吗?\n此操作不可恢复!",
QMessageBox.Yes | QMessageBox.No
)
if reply == QMessageBox.Yes:
# 修改:将 delete_execution_history 改为 deleteExecutionHistory
success = self.dbManager.deleteExecutionHistory(execution_ids) # 修正方法名
if success:
QMessageBox.information(self, "删除成功", "已成功删除选中的历史记录")
self.loadHistory() # 重新加载历史记录
else:
QMessageBox.warning(self, "删除失败", "删除历史记录时发生错误")
def exportReport(self):
"""导出选中历史记录的报告"""
selected_indexes = self.table.selectionModel().selectedRows()
if not selected_indexes:
QMessageBox.warning(self, "未选择", "请先选择要导出的历史记录")
return
if len(selected_indexes) > 1:
QMessageBox.warning(self, "选择过多", "一次只能导出一个历史记录的报告")
return
index = selected_indexes[0]
execution_id = self.model.item(index.row(), 0).text()
# 修改报告路径现在是第5列索引4
reportPath = self.model.item(index.row(), 4).text()
# 获取执行详情数据
# 修改:将 get_execution_details 改为 getExecutionDetails
execution_data = self.dbManager.getExecutionDetails(execution_id) # 修正方法名
if not execution_data:
QMessageBox.warning(self, "数据错误", "无法获取执行详情数据")
return
# 生成报告
try:
# 创建临时StepExecutor实例用于生成报告
executor = StepExecutor(execution_data['procedure_content'],
execution_data['procedure_id'],
self.dbManager)
executor.stepResults = execution_data['step_results']
# 设置步骤数据
for step in executor.tableModel.stepData:
step_result = next((s for s in execution_data['step_results']
if s['step_id'] == step['stepId']), None)
if step_result:
step['executed'] = True
step['result'] = step_result['result']
step['time'] = datetime.strptime(step_result['execution_time'], "%Y-%m-%d %H:%M:%S")
# 生成报告文件
default_name = f"{execution_data['procedure_name']}_历史报告_{execution_id}.docx"
file_path, _ = QFileDialog.getSaveFileName(
self, "保存报告", default_name, "Word文档 (*.docx)"
)
if file_path:
executor.generateReport(file_path)
QMessageBox.information(self, "导出成功", f"报告已保存到:\n{file_path}")
except Exception as e:
QMessageBox.critical(self, "导出错误", f"生成报告时出错:\n{str(e)}")
# 新增:历史记录表格的右键菜单
def showHistoryContextMenu(self, pos):
"""显示历史记录的右键菜单"""
index = self.table.indexAt(pos)
if index.isValid():
menu = QMenu()
deleteAction = menu.addAction("删除历史记录")
deleteAction.triggered.connect(self.deleteSelectedHistory)
menu.exec_(self.table.viewport().mapToGlobal(pos))
class ExecutionDetailDialog(QDialog): class ExecutionDetailDialog(QDialog):
"""步骤详情对话框""" """步骤详情对话框"""
@ -289,469 +70,7 @@ class AddCategoryDialog(QDialog):
def getCategoryName(self): def getCategoryName(self):
return self.nameEdit.text().strip() return self.nameEdit.text().strip()
class StepExecutor(QWidget):
# 添加标签锁定信号
tabLockRequired = pyqtSignal(bool)
def __init__(self, procedureData, procedureId, dbManager):
super().__init__()
self.procedureData = procedureData
self.procedureId = procedureId # 新增规程ID
self.dbManager = dbManager # 新增数据库管理器
self.isRunning = False
self.isActive = False
testSteps = procedureData["测试步骤"]
self.initUi(testSteps)
self.currentIndex = 0
self.timer = QTimer()
self.timer.timeout.connect(self.autoExecuteStep)
self.remainingCycles = 1
self.infiniteCycles = False
self.isFirstRun = True # 新增标志位,用于区分首次执行
self.stepResults = [] # 新增:存储所有步骤执行结果的列表
def initUi(self, testSteps):
layout = QVBoxLayout()
info_layout = QFormLayout()
info_layout.setLabelAlignment(Qt.AlignRight)
info_layout.addRow("规程名称:", QLabel(self.procedureData["规程信息"]["规程名称"]))
info_layout.addRow("规程编号:", QLabel(self.procedureData["规程信息"]["规程编号"]))
info_layout.addRow("规程类型:", QLabel(self.procedureData["规程信息"]["规程类型"]))
info_layout.addRow("测试用例:", QLabel(self.procedureData["测试用例信息"]["测试用例"]))
info_layout.addRow("用例编号:", QLabel(self.procedureData["测试用例信息"]["用例编号"]))
info_layout.addRow("工况描述:", QLabel(self.procedureData["测试用例信息"]["工况描述"]))
layout.addLayout(info_layout)
layout.addSpacing(20)
self.tableModel = StepTableModel(testSteps)
self.tableView = QTableView()
self.tableView.setModel(self.tableModel)
self.tableView.setContextMenuPolicy(Qt.CustomContextMenu)
self.tableView.customContextMenuRequested.connect(self.showContextMenu)
self.tableView.horizontalHeader().setSectionResizeMode(0, QHeaderView.ResizeToContents)
self.tableView.horizontalHeader().setSectionResizeMode(1, QHeaderView.Stretch)
self.tableView.horizontalHeader().setSectionResizeMode(2, QHeaderView.ResizeToContents)
self.tableView.horizontalHeader().setSectionResizeMode(3, QHeaderView.ResizeToContents)
self.tableView.horizontalHeader().setSectionResizeMode(5, QHeaderView.Stretch) # 新增:备注列自适应宽度
layout.addWidget(QLabel("测试步骤:"))
layout.addWidget(self.tableView)
# 创建控制按钮布局
control_layout = QHBoxLayout()
self.autoButton = QPushButton(" 开始自动执行")
self.autoButton.clicked.connect(self.startAutoExecute)
self.autoButton.setIcon(qta.icon('fa5s.play', color='green'))
self.stopButton = QPushButton(" 停止自动执行")
self.stopButton.clicked.connect(self.stopAutoExecute)
self.stopButton.setEnabled(False)
self.stopButton.setIcon(qta.icon('fa5s.stop', color='red'))
self.nextButton = QPushButton(" 执行下一步")
self.nextButton.clicked.connect(self.executeNextStep)
self.nextButton.setIcon(qta.icon('fa5s.step-forward', color='blue'))
self.resetButton = QPushButton(" 完全重置")
self.resetButton.clicked.connect(self.resetExecution)
self.resetButton.setIcon(qta.icon('fa5s.redo', color='orange'))
self.exportButton = QPushButton(" 生成报告")
self.exportButton.clicked.connect(self.generateReport)
self.exportButton.setIcon(qta.icon('fa5s.file-alt', color='purple'))
control_layout.addWidget(self.autoButton)
control_layout.addWidget(self.stopButton)
control_layout.addWidget(self.nextButton)
control_layout.addWidget(self.resetButton)
control_layout.addWidget(self.exportButton)
# 添加循环设置
cycle_layout = QHBoxLayout()
cycle_layout.addWidget(QLabel("执行轮次:"))
self.cycleSpin = QSpinBox()
self.cycleSpin.setRange(1, 999)
self.cycleSpin.setValue(1)
cycle_layout.addWidget(self.cycleSpin)
self.infiniteCheckbox = QCheckBox("无限循环")
cycle_layout.addWidget(self.infiniteCheckbox)
cycle_layout.addStretch()
# 将所有布局添加到主布局
layout.addLayout(control_layout)
layout.addLayout(cycle_layout)
# 设置主布局
self.setLayout(layout)
# 初始化工具栏
self.toolbar = QToolBar("执行工具栏")
self.toolbar.setIconSize(QSize(24, 24))
# 工具栏操作
self.toolbar.addAction(qta.icon('fa5s.play', color='green'), "开始执行", self.startAutoExecute)
self.toolbar.addAction(qta.icon('fa5s.stop', color='red'), "停止执行", self.stopAutoExecute)
self.toolbar.addAction(qta.icon('fa5s.step-forward', color='blue'), "下一步", self.executeNextStep)
self.toolbar.addSeparator()
self.toolbar.addAction(qta.icon('fa5s.redo', color='orange'), "重置", self.resetExecution)
self.toolbar.addSeparator()
self.toolbar.addAction(qta.icon('fa5s.file-alt', color='purple'), "生成报告", self.generateReport)
def showContextMenu(self, pos):
index = self.tableView.indexAt(pos)
if index.isValid():
menu = QMenu()
jumpAction = menu.addAction("从该步骤开始执行")
jumpAction.triggered.connect(lambda: self.jumpExecute(index.row()))
detailAction = menu.addAction("查看步骤详情")
detailAction.triggered.connect(lambda: self.showStepDetail(index.row()))
menu.exec_(self.tableView.viewport().mapToGlobal(pos))
def jumpExecute(self, rowIndex):
if 0 <= rowIndex < self.tableModel.rowCount():
stepInfo = self.tableModel.getStepInfo(rowIndex)
if stepInfo and not stepInfo['isMain']:
self.executeStep(rowIndex)
self.currentIndex = rowIndex + 1
def showStepDetail(self, row):
step_info = self.tableModel.getStepInfo(row)
if not step_info:
return
detail_dialog = QDialog(self)
detail_dialog.setWindowTitle("步骤详情")
detail_dialog.setMinimumWidth(500)
layout = QVBoxLayout()
form_layout = QFormLayout()
form_layout.addRow("步骤ID", QLabel(step_info['stepId']))
form_layout.addRow("步骤类型", QLabel("主步骤" if step_info['isMain'] else "子步骤"))
form_layout.addRow("步骤描述", QLabel(step_info['description']))
if step_info['time']:
form_layout.addRow("执行时间"); QLabel(step_info['time'].strftime("%Y-%m-%d %H:%M:%S"))
if step_info['result'] is not None:
status = "成功" if step_info['result'] else "失败"
form_layout.addRow("执行结果"); QLabel(status)
layout.addLayout(form_layout)
button_box = QDialogButtonBox(QDialogButtonBox.Ok)
button_box.accepted.connect(detail_dialog.accept)
layout.addWidget(button_box)
detail_dialog.setLayout(layout)
detail_dialog.exec_()
def startAutoExecute(self):
self.isRunning = True
self.isActive = True
# 发送标签锁定信号
self.tabLockRequired.emit(True)
self.autoButton.setEnabled(False)
self.stopButton.setEnabled(True)
self.nextButton.setEnabled(False)
self.resetButton.setEnabled(False)
self.exportButton.setEnabled(False)
# 如果是首次执行,则初始化步骤结果集合
if self.isFirstRun:
self.stepResults = [] # 重置步骤结果集合
self.tableModel.resetExecutionState()
self.isFirstRun = False # 标记已执行过
self.remainingCycles = self.cycleSpin.value()
self.infiniteCycles = self.infiniteCheckbox.isChecked()
self.timer.start(1000)
def stopAutoExecute(self):
self.isRunning = False # 清除自动执行状态
self.timer.stop()
self.autoButton.setEnabled(True)
self.stopButton.setEnabled(False)
self.nextButton.setEnabled(True)
self.resetButton.setEnabled(True)
self.exportButton.setEnabled(True)
# 注意: 这里不重置isActive因为执行器仍处于激活状态
# 执行结束时更新完整步骤结果到数据库
if hasattr(self, 'current_execution_id'):
self.dbManager.updateStepResults(self.current_execution_id, self.stepResults)
def autoExecuteStep(self):
if self.currentIndex < self.tableModel.rowCount():
step_info = self.tableModel.getStepInfo(self.currentIndex)
if step_info and not step_info['isMain']:
self.executeStep(self.currentIndex)
self.currentIndex += 1
else:
if self.infiniteCycles or self.remainingCycles > 0:
if not self.infiniteCycles:
self.remainingCycles -= 1
self.timer.stop()
self.currentIndex = 0
should_continue = self.infiniteCycles or self.remainingCycles > 0
if should_continue:
self.timer.start()
else:
self.stopAutoExecute()
else:
self.stopAutoExecute()
def executeNextStep(self):
self.isActive = True
# 发送标签锁定信号
self.tabLockRequired.emit(True)
if self.currentIndex < self.tableModel.rowCount():
step_info = self.tableModel.getStepInfo(self.currentIndex)
if step_info and not step_info['isMain']:
self.executeStep(self.currentIndex)
self.currentIndex += 1
# 手动执行不需要修改isRunning状态
def executeStep(self, row):
stepInfo = self.tableModel.getStepInfo(row)
if not stepInfo:
return False
print(f"开始执行步骤 {stepInfo['stepId']}: {stepInfo['description']}")
result = self.handleStep(row, stepInfo)
# 确保result总是布尔值避免None导致数据库错误
if result is None:
print(f"警告:步骤 {stepInfo['stepId']} 返回了None结果设置为False")
result = False
# 存储执行结果到数据库
execution_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 新增获取当前执行的ID每次执行一个唯一的execution_id
if not hasattr(self, 'current_execution_id'):
self.current_execution_id = self.dbManager.insertProcedureExecution(
self.procedureId,
execution_time
)
# 存储执行结果到内存集合
step_result = {
'step_id': stepInfo['stepId'],
'step_description': stepInfo['description'],
'execution_time': execution_time,
'result': result
}
self.stepResults.append(step_result)
# 更新数据库中的步骤结果集合
self.dbManager.updateStepResults(self.current_execution_id, self.stepResults)
success = self.tableModel.updateStepResult(row, result, datetime.now())
return result
def handleStep(self, rowIndex, stepInfo): # 修改参数名
description = stepInfo['description']
stepId = stepInfo['stepId']
print(f"处理步骤 {stepId}: {description}")
if "设置" in description:
return self.performLogin(stepId, description) # 修改方法名
elif "数据导入" in description:
return self.performDataImport(stepId, description) # 修改方法名
elif "验证" in description:
return self.performValidation(stepId, description) # 修改方法名
elif "导出" in description:
return self.performExport(stepId, description) # 修改方法名
elif "备份" in description:
return self.performBackup(stepId, description) # 修改方法名
elif "发送" in description:
return self.performNotification(stepId, description) # 修改方法名
def simulateExecution(self, stepId, description): # 修改方法名
import random
return random.random() < 0.9
def performLogin(self, stepId, description): # 修改方法名
print(f"执行登录操作 (步骤 {stepId}): {description}")
return True
def performDataImport(self, stepId, description): # 修改方法名
import random
return random.random() < 0.95
def performValidation(self, stepId, description): # 修改方法名
import random
return random.random() < 0.85
def performExport(self, stepId, description): # 修改方法名
import random
return random.random() < 0.98
def performBackup(self, stepId, description): # 修改方法名
return True
def performNotification(self, stepId, description): # 修改方法名
import random
return random.random() < 0.99
def performWait(self, stepId, description): # 修改方法名
import time
waitTime = 1 # 修改变量名
if "s" in description:
try:
waitTime = int(description.split("等待")[1].split("s")[0].strip())
except:
pass
time.sleep(waitTime)
return True
def performSetting(self, stepId, description): # 修改方法名
return True
def resetExecution(self):
self.tableModel.resetAll()
self.currentIndex = 0
self.stopAutoExecute()
self.cycleSpin.setValue(1)
self.infiniteCheckbox.setChecked(False)
self.isRunning = False
self.isActive = False
self.isFirstRun = True # 重置标志位
self.stepResults = [] # 重置步骤结果集合
# 新增重置当前执行ID
if hasattr(self, 'current_execution_id'):
del self.current_execution_id
# 发送标签解锁信号
self.tabLockRequired.emit(False)
# 修改方法签名添加file_path参数
def generateReport(self):
# 生成规程全名(名称+编号)
proc_full_name = f"{self.procedureData['规程信息']['规程名称']}({self.procedureData['规程信息']['规程编号']})"
# 弹出文件选择对话框
default_name = f"{proc_full_name}_报告_{datetime.now().strftime('%Y%m%d_%H%M%S')}.docx"
file_path, _ = QFileDialog.getSaveFileName(
self,
"保存报告",
default_name,
"Word文档 (*.docx)"
)
# 如果用户取消选择,则直接返回
if not file_path:
return
doc = Document()
# 生成规程全名(名称+编号)
proc_full_name = f"{self.procedureData['规程信息']['规程名称']}({self.procedureData['规程信息']['规程编号']})"
title = doc.add_paragraph(f"{proc_full_name} - 测试报告")
title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
title.runs[0].font.size = Pt(18)
title.runs[0].bold = True
info = doc.add_paragraph()
info.add_run("规程信息:\n").bold = True
info.add_run(f"规程编号: {self.procedureData['规程信息']['规程编号']}\n")
info.add_run(f"规程类型: {self.procedureData['规程信息']['规程类型']}\n")
case_info = doc.add_paragraph()
case_info.add_run("测试用例信息:\n").bold = True
case_info.add_run(f"测试用例: {self.procedureData['测试用例信息']['测试用例']}\n")
case_info.add_run(f"用例编号: {self.procedureData['测试用例信息']['用例编号']}\n")
case_info.add_run(f"工况描述: {self.procedureData['测试用例信息']['工况描述']}\n")
stats = doc.add_paragraph()
stats.add_run("执行统计:\n").bold = True
total = self.tableModel.rowCount()
success = sum(1 for s in self.tableModel.stepData if s['result'])
failure = total - success
stats.add_run(f"总步骤数: {total}\n成功数: {success}\n失败数: {failure}\n")
# 优化步骤表格展示
steps_table = doc.add_table(rows=1, cols=6)
steps_table.style = 'Table Grid' # 添加表格边框样式
hdr_cells = steps_table.rows[0].cells
# 设置表头
headers = ['步骤ID', '步骤类型', '步骤描述', '执行时间', '执行结果', '状态']
for i, header in enumerate(headers):
hdr_cells[i].text = header
# 设置表头样式(加粗居中)
for paragraph in hdr_cells[i].paragraphs:
paragraph.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
run = paragraph.runs[0]
run.font.bold = True
# 填充表格数据
for step in self.tableModel.stepData:
row_cells = steps_table.add_row().cells
# 步骤ID
row_cells[0].text = step['stepId']
# 步骤类型
step_type = "主步骤" if step['isMain'] else "子步骤"
row_cells[1].text = step_type
# 步骤描述
row_cells[2].text = step['description']
# 执行时间
time_text = step['time'].strftime("%Y-%m-%d %H:%M:%S") if step['time'] else 'N/A'
row_cells[3].text = time_text
# 执行结果(带颜色标记)
result_cell = row_cells[4]
if step['result'] is True:
result_text = '成功'
# 确保单元格有run对象
if not result_cell.paragraphs[0].runs:
result_cell.paragraphs[0].add_run(result_text)
result_cell.paragraphs[0].runs[0].font.color.rgb = RGBColor(0, 128, 0) # 绿色
elif step['result'] is False:
result_text = '失败'
# 确保单元格有run对象
if not result_cell.paragraphs[0].runs:
result_cell.paragraphs[0].add_run(result_text)
result_cell.paragraphs[0].runs[0].font.color.rgb = RGBColor(255, 0, 0) # 红色
else:
result_text = '未执行'
result_cell.text = result_text
# 状态
status = '已执行' if step['executed'] else '未执行'
row_cells[5].text = status
# 设置表格自动适应宽度
for col in steps_table.columns:
col.width = doc.sections[0].page_width // len(headers)
# 设置默认文件名(包含规程全名)
if not file_path:
filename = f"{proc_full_name}_报告_{datetime.now().strftime('%Y%m%d_%H%M%S')}.docx"
else:
filename = file_path
doc.save(filename)
# 确保导出成功时显示提示信息
QMessageBox.information(self, "报告生成", f"报告已生成: {filename}")
class ProcedureManager(QMainWindow): class ProcedureManager(QMainWindow):
def __init__(self): def __init__(self):
@ -867,6 +186,7 @@ class ProcedureManager(QMainWindow):
return return
procId = currentItem.data(Qt.UserRole) # 修改变量名 procId = currentItem.data(Qt.UserRole) # 修改变量名
# print(procId, 22222222222222)
procedureData = self.db.getProcedureContent(procId) # 修改变量名 procedureData = self.db.getProcedureContent(procId) # 修改变量名
if not procedureData: if not procedureData:
@ -1135,7 +455,7 @@ class ProcedureManager(QMainWindow):
# 新增:打开历史记录查看器 # 新增:打开历史记录查看器
def openHistoryViewer(self): def openHistoryViewer(self):
historyViewer = HistoryViewer(self.db, self) historyViewer = HistoryViewerWidget(self.db, self)
historyViewer.exec_() historyViewer.exec_()
def loadStylesheet(self): def loadStylesheet(self):

@ -0,0 +1,490 @@
import sys
import os
from PyQt5.QtCore import Qt, QTimer, QAbstractTableModel, QModelIndex, QPoint, QSize, pyqtSignal, QFile,QTextStream
from PyQt5.QtGui import QBrush, QColor, QFont, QStandardItemModel, QStandardItem
from PyQt5.QtWidgets import (QApplication, QMainWindow, QTableView,
QPushButton, QVBoxLayout, QWidget, QHBoxLayout,
QLabel, QCheckBox, QSpinBox, QMenu, QFileDialog,
QTabWidget, QTabBar, QListWidget, QListWidgetItem, QDialog,
QLineEdit, QFormLayout, QDialogButtonBox, QMessageBox,
QHeaderView, QToolBar, QAction, QStatusBar, QComboBox, QSplitter, QAbstractItemView)
from docx import Document
from docx.shared import Pt, RGBColor
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
import qtawesome as qta
from datetime import datetime
# 导入其他模块
from model.ProcedureModel.ProcedureProcessor import ExcelParser, StepTableModel
from utils.DBModels.ProcedureModel import DatabaseManager
# 修改导入路径
class StepExecutor(QWidget):
# 添加标签锁定信号
tabLockRequired = pyqtSignal(bool)
def __init__(self, procedureData, procedureId, dbManager):
super().__init__()
self.procedureData = procedureData
self.procedureId = procedureId # 新增规程ID
self.dbManager = dbManager # 新增数据库管理器
self.isRunning = False
self.isActive = False
testSteps = procedureData["测试步骤"]
self.initUi(testSteps)
self.currentIndex = 0
self.timer = QTimer()
self.timer.timeout.connect(self.autoExecuteStep)
self.remainingCycles = 1
self.infiniteCycles = False
self.isFirstRun = True # 新增标志位,用于区分首次执行
self.stepResults = [] # 新增:存储所有步骤执行结果的列表
def initUi(self, testSteps):
layout = QVBoxLayout()
info_layout = QFormLayout()
info_layout.setLabelAlignment(Qt.AlignRight)
info_layout.addRow("规程名称:", QLabel(self.procedureData["规程信息"]["规程名称"]))
info_layout.addRow("规程编号:", QLabel(self.procedureData["规程信息"]["规程编号"]))
info_layout.addRow("规程类型:", QLabel(self.procedureData["规程信息"]["规程类型"]))
info_layout.addRow("测试用例:", QLabel(self.procedureData["测试用例信息"]["测试用例"]))
info_layout.addRow("用例编号:", QLabel(self.procedureData["测试用例信息"]["用例编号"]))
info_layout.addRow("工况描述:", QLabel(self.procedureData["测试用例信息"]["工况描述"]))
layout.addLayout(info_layout)
layout.addSpacing(20)
self.tableModel = StepTableModel(testSteps)
self.tableView = QTableView()
self.tableView.setModel(self.tableModel)
self.tableView.setContextMenuPolicy(Qt.CustomContextMenu)
self.tableView.customContextMenuRequested.connect(self.showContextMenu)
self.tableView.horizontalHeader().setSectionResizeMode(0, QHeaderView.ResizeToContents)
self.tableView.horizontalHeader().setSectionResizeMode(1, QHeaderView.Stretch)
self.tableView.horizontalHeader().setSectionResizeMode(2, QHeaderView.ResizeToContents)
self.tableView.horizontalHeader().setSectionResizeMode(3, QHeaderView.ResizeToContents)
self.tableView.horizontalHeader().setSectionResizeMode(5, QHeaderView.Stretch) # 新增:备注列自适应宽度
layout.addWidget(QLabel("测试步骤:"))
layout.addWidget(self.tableView)
# 创建控制按钮布局
control_layout = QHBoxLayout()
self.autoButton = QPushButton(" 开始自动执行")
self.autoButton.clicked.connect(self.startAutoExecute)
self.autoButton.setIcon(qta.icon('fa5s.play', color='green'))
self.stopButton = QPushButton(" 停止自动执行")
self.stopButton.clicked.connect(self.stopAutoExecute)
self.stopButton.setEnabled(False)
self.stopButton.setIcon(qta.icon('fa5s.stop', color='red'))
self.nextButton = QPushButton(" 执行下一步")
self.nextButton.clicked.connect(self.executeNextStep)
self.nextButton.setIcon(qta.icon('fa5s.step-forward', color='blue'))
self.resetButton = QPushButton(" 完全重置")
self.resetButton.clicked.connect(self.resetExecution)
self.resetButton.setIcon(qta.icon('fa5s.redo', color='orange'))
self.exportButton = QPushButton(" 生成报告")
self.exportButton.clicked.connect(self.generateReport)
self.exportButton.setIcon(qta.icon('fa5s.file-alt', color='purple'))
control_layout.addWidget(self.autoButton)
control_layout.addWidget(self.stopButton)
control_layout.addWidget(self.nextButton)
control_layout.addWidget(self.resetButton)
control_layout.addWidget(self.exportButton)
# 添加循环设置
cycle_layout = QHBoxLayout()
cycle_layout.addWidget(QLabel("执行轮次:"))
self.cycleSpin = QSpinBox()
self.cycleSpin.setRange(1, 999)
self.cycleSpin.setValue(1)
cycle_layout.addWidget(self.cycleSpin)
self.infiniteCheckbox = QCheckBox("无限循环")
cycle_layout.addWidget(self.infiniteCheckbox)
cycle_layout.addStretch()
# 将所有布局添加到主布局
layout.addLayout(control_layout)
layout.addLayout(cycle_layout)
# 设置主布局
self.setLayout(layout)
# 初始化工具栏
self.toolbar = QToolBar("执行工具栏")
self.toolbar.setIconSize(QSize(24, 24))
# 工具栏操作
self.toolbar.addAction(qta.icon('fa5s.play', color='green'), "开始执行", self.startAutoExecute)
self.toolbar.addAction(qta.icon('fa5s.stop', color='red'), "停止执行", self.stopAutoExecute)
self.toolbar.addAction(qta.icon('fa5s.step-forward', color='blue'), "下一步", self.executeNextStep)
self.toolbar.addSeparator()
self.toolbar.addAction(qta.icon('fa5s.redo', color='orange'), "重置", self.resetExecution)
self.toolbar.addSeparator()
self.toolbar.addAction(qta.icon('fa5s.file-alt', color='purple'), "生成报告", self.generateReport)
def showContextMenu(self, pos):
index = self.tableView.indexAt(pos)
if index.isValid():
menu = QMenu()
jumpAction = menu.addAction("从该步骤开始执行")
jumpAction.triggered.connect(lambda: self.jumpExecute(index.row()))
detailAction = menu.addAction("查看步骤详情")
detailAction.triggered.connect(lambda: self.showStepDetail(index.row()))
menu.exec_(self.tableView.viewport().mapToGlobal(pos))
def jumpExecute(self, rowIndex):
if 0 <= rowIndex < self.tableModel.rowCount():
stepInfo = self.tableModel.getStepInfo(rowIndex)
if stepInfo and not stepInfo['isMain']:
self.executeStep(rowIndex)
self.currentIndex = rowIndex + 1
def showStepDetail(self, row):
step_info = self.tableModel.getStepInfo(row)
if not step_info:
return
detail_dialog = QDialog(self)
detail_dialog.setWindowTitle("步骤详情")
detail_dialog.setMinimumWidth(500)
layout = QVBoxLayout()
form_layout = QFormLayout()
form_layout.addRow("步骤ID", QLabel(step_info['stepId']))
form_layout.addRow("步骤类型", QLabel("主步骤" if step_info['isMain'] else "子步骤"))
form_layout.addRow("步骤描述", QLabel(step_info['description']))
if step_info['time']:
form_layout.addRow("执行时间"); QLabel(step_info['time'].strftime("%Y-%m-%d %H:%M:%S"))
if step_info['result'] is not None:
status = "成功" if step_info['result'] else "失败"
form_layout.addRow("执行结果"); QLabel(status)
layout.addLayout(form_layout)
button_box = QDialogButtonBox(QDialogButtonBox.Ok)
button_box.accepted.connect(detail_dialog.accept)
layout.addWidget(button_box)
detail_dialog.setLayout(layout)
detail_dialog.exec_()
def startAutoExecute(self):
self.isRunning = True
self.isActive = True
# 发送标签锁定信号
self.tabLockRequired.emit(True)
self.autoButton.setEnabled(False)
self.stopButton.setEnabled(True)
self.nextButton.setEnabled(False)
self.resetButton.setEnabled(False)
self.exportButton.setEnabled(False)
# 如果是首次执行,则初始化步骤结果集合
if self.isFirstRun:
self.stepResults = [] # 重置步骤结果集合
self.tableModel.resetExecutionState()
self.isFirstRun = False # 标记已执行过
self.remainingCycles = self.cycleSpin.value()
self.infiniteCycles = self.infiniteCheckbox.isChecked()
self.timer.start(1000)
def stopAutoExecute(self):
self.isRunning = False # 清除自动执行状态
self.timer.stop()
self.autoButton.setEnabled(True)
self.stopButton.setEnabled(False)
self.nextButton.setEnabled(True)
self.resetButton.setEnabled(True)
self.exportButton.setEnabled(True)
# 注意: 这里不重置isActive因为执行器仍处于激活状态
# 执行结束时更新完整步骤结果到数据库
if hasattr(self, 'current_execution_id'):
self.dbManager.updateStepResults(self.current_execution_id, self.stepResults)
def autoExecuteStep(self):
if self.currentIndex < self.tableModel.rowCount():
step_info = self.tableModel.getStepInfo(self.currentIndex)
if step_info and not step_info['isMain']:
self.executeStep(self.currentIndex)
self.currentIndex += 1
else:
if self.infiniteCycles or self.remainingCycles > 0:
if not self.infiniteCycles:
self.remainingCycles -= 1
self.timer.stop()
self.currentIndex = 0
should_continue = self.infiniteCycles or self.remainingCycles > 0
if should_continue:
self.tableModel.resetAll()
if hasattr(self, 'current_execution_id'):
self.dbManager.updateStepResults(self.current_execution_id, self.stepResults)
self.timer.start()
else:
self.stopAutoExecute()
else:
self.stopAutoExecute()
def executeNextStep(self):
self.isActive = True
# 发送标签锁定信号
self.tabLockRequired.emit(True)
if self.currentIndex < self.tableModel.rowCount():
step_info = self.tableModel.getStepInfo(self.currentIndex)
if step_info and not step_info['isMain']:
self.executeStep(self.currentIndex)
self.currentIndex += 1
# 手动执行不需要修改isRunning状态
def executeStep(self, row):
stepInfo = self.tableModel.getStepInfo(row)
if not stepInfo:
return False
print(f"开始执行步骤 {stepInfo['stepId']}: {stepInfo['description']}")
result = self.handleStep(row, stepInfo)
# 确保result总是布尔值避免None导致数据库错误
if result is None:
print(f"警告:步骤 {stepInfo['stepId']} 返回了None结果设置为False")
result = False
# 存储执行结果到数据库
execution_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 新增获取当前执行的ID每次执行一个唯一的execution_id
if not hasattr(self, 'current_execution_id'):
self.current_execution_id = self.dbManager.insertProcedureExecution(
self.procedureId,
execution_time
)
# 存储执行结果到内存集合
step_result = {
'step_id': stepInfo['stepId'],
'step_description': stepInfo['description'],
'execution_time': execution_time,
'result': result
}
self.stepResults.append(step_result)
# 更新数据库中的步骤结果集合
self.dbManager.updateStepResults(self.current_execution_id, self.stepResults)
success = self.tableModel.updateStepResult(row, result, datetime.now())
return result
def handleStep(self, rowIndex, stepInfo): # 修改参数名
description = stepInfo['description']
stepId = stepInfo['stepId']
print(f"处理步骤 {stepId}: {description}")
if "设置" in description:
return self.performLogin(stepId, description) # 修改方法名
elif "数据导入" in description:
return self.performDataImport(stepId, description) # 修改方法名
elif "验证" in description:
return self.performValidation(stepId, description) # 修改方法名
elif "导出" in description:
return self.performExport(stepId, description) # 修改方法名
elif "备份" in description:
return self.performBackup(stepId, description) # 修改方法名
elif "发送" in description:
return self.performNotification(stepId, description) # 修改方法名
def simulateExecution(self, stepId, description): # 修改方法名
import random
return random.random() < 0.9
def performLogin(self, stepId, description): # 修改方法名
print(f"执行登录操作 (步骤 {stepId}): {description}")
return True
def performDataImport(self, stepId, description): # 修改方法名
import random
return random.random() < 0.95
def performValidation(self, stepId, description): # 修改方法名
import random
return random.random() < 0.85
def performExport(self, stepId, description): # 修改方法名
import random
return random.random() < 0.98
def performBackup(self, stepId, description): # 修改方法名
return True
def performNotification(self, stepId, description): # 修改方法名
import random
return random.random() < 0.99
def performWait(self, stepId, description): # 修改方法名
import time
waitTime = 1 # 修改变量名
if "s" in description:
try:
waitTime = int(description.split("等待")[1].split("s")[0].strip())
except:
pass
time.sleep(waitTime)
return True
def performSetting(self, stepId, description): # 修改方法名
return True
def resetExecution(self):
self.tableModel.resetAll()
self.currentIndex = 0
self.stopAutoExecute()
self.cycleSpin.setValue(1)
self.infiniteCheckbox.setChecked(False)
self.isRunning = False
self.isActive = False
self.isFirstRun = True # 重置标志位
self.stepResults = [] # 重置步骤结果集合
# 新增重置当前执行ID
if hasattr(self, 'current_execution_id'):
del self.current_execution_id
# 发送标签解锁信号
self.tabLockRequired.emit(False)
# 修改方法签名添加file_path参数
def generateReport(self):
# 生成规程全名(名称+编号)
proc_full_name = f"{self.procedureData['规程信息']['规程名称']}({self.procedureData['规程信息']['规程编号']})"
# 弹出文件选择对话框
default_name = f"{proc_full_name}_报告_{datetime.now().strftime('%Y%m%d_%H%M%S')}.docx"
file_path, _ = QFileDialog.getSaveFileName(
self,
"保存报告",
default_name,
"Word文档 (*.docx)"
)
# 如果用户取消选择,则直接返回
if not file_path:
return
doc = Document()
# 生成规程全名(名称+编号)
proc_full_name = f"{self.procedureData['规程信息']['规程名称']}({self.procedureData['规程信息']['规程编号']})"
title = doc.add_paragraph(f"{proc_full_name} - 测试报告")
title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
title.runs[0].font.size = Pt(18)
title.runs[0].bold = True
info = doc.add_paragraph()
info.add_run("规程信息:\n").bold = True
info.add_run(f"规程编号: {self.procedureData['规程信息']['规程编号']}\n")
info.add_run(f"规程类型: {self.procedureData['规程信息']['规程类型']}\n")
case_info = doc.add_paragraph()
case_info.add_run("测试用例信息:\n").bold = True
case_info.add_run(f"测试用例: {self.procedureData['测试用例信息']['测试用例']}\n")
case_info.add_run(f"用例编号: {self.procedureData['测试用例信息']['用例编号']}\n")
case_info.add_run(f"工况描述: {self.procedureData['测试用例信息']['工况描述']}\n")
stats = doc.add_paragraph()
stats.add_run("执行统计:\n").bold = True
total = self.tableModel.rowCount()
success = sum(1 for s in self.tableModel.stepData if s['result'])
failure = total - success
stats.add_run(f"总步骤数: {total}\n成功数: {success}\n失败数: {failure}\n")
# 优化步骤表格展示
steps_table = doc.add_table(rows=1, cols=6)
steps_table.style = 'Table Grid' # 添加表格边框样式
hdr_cells = steps_table.rows[0].cells
# 设置表头
headers = ['步骤ID', '步骤类型', '步骤描述', '执行时间', '执行结果', '状态']
for i, header in enumerate(headers):
hdr_cells[i].text = header
# 设置表头样式(加粗居中)
for paragraph in hdr_cells[i].paragraphs:
paragraph.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
run = paragraph.runs[0]
run.font.bold = True
# 填充表格数据
for step in self.tableModel.stepData:
row_cells = steps_table.add_row().cells
# 步骤ID
row_cells[0].text = step['stepId']
# 步骤类型
step_type = "主步骤" if step['isMain'] else "子步骤"
row_cells[1].text = step_type
# 步骤描述
row_cells[2].text = step['description']
# 执行时间
time_text = step['time'].strftime("%Y-%m-%d %H:%M:%S") if step['time'] else 'N/A'
row_cells[3].text = time_text
# 执行结果(带颜色标记)
result_cell = row_cells[4]
if step['result'] is True:
result_text = '成功'
# 确保单元格有run对象
if not result_cell.paragraphs[0].runs:
result_cell.paragraphs[0].add_run(result_text)
result_cell.paragraphs[0].runs[0].font.color.rgb = RGBColor(0, 128, 0) # 绿色
elif step['result'] is False:
result_text = '失败'
# 确保单元格有run对象
if not result_cell.paragraphs[0].runs:
result_cell.paragraphs[0].add_run(result_text)
result_cell.paragraphs[0].runs[0].font.color.rgb = RGBColor(255, 0, 0) # 红色
else:
result_text = '未执行'
result_cell.text = result_text
# 状态
status = '已执行' if step['executed'] else '未执行'
row_cells[5].text = status
# 设置表格自动适应宽度
for col in steps_table.columns:
col.width = doc.sections[0].page_width // len(headers)
# 设置默认文件名(包含规程全名)
if not file_path:
filename = f"{proc_full_name}_报告_{datetime.now().strftime('%Y%m%d_%H%M%S')}.docx"
else:
filename = file_path
doc.save(filename)
# 确保导出成功时显示提示信息
QMessageBox.information(self, "报告生成", f"报告已生成: {filename}")
return filename

@ -1,7 +1,7 @@
import re import re
import qtawesome import qtawesome
from PyQt5 import QtGui from PyQt5 import QtGui
from PyQt5.QtCore import Qt, QVariant from PyQt5.QtCore import Qt, QVariant, QTimer
from PyQt5.QtWidgets import QMessageBox from PyQt5.QtWidgets import QMessageBox
from websocket import send from websocket import send
from protocol.TCP.Analog import getRealAO from protocol.TCP.Analog import getRealAO
@ -19,7 +19,21 @@ class AnalogModel(VarTableModel):
table : 缺省参数 table : 缺省参数
''' '''
VarTableModel.__init__(self, header, data, table = table) VarTableModel.__init__(self, header, data, table = table)
# 修改为驼峰命名
self.valueCache = {}
self.cacheTimer = QTimer()
self.cacheTimer.timeout.connect(self.refreshValueCache)
self.cacheTimer.start(300) # 每秒刷新一次缓存
# 方法名改为驼峰命名
def refreshValueCache(self):
"""刷新所有变量的值缓存"""
for row in self.datas:
varName = row[3]
try:
self.valueCache[varName] = Globals.getValue('protocolManage').readVariableValue(varName)
except:
self.valueCache[varName] = None
def initTable(self): def initTable(self):
self.datas = [] self.datas = []
@ -39,6 +53,8 @@ class AnalogModel(VarTableModel):
self.checkList = ['Unchecked'] * len(self.datas) self.checkList = ['Unchecked'] * len(self.datas)
# self.layoutChanged.emit() # self.layoutChanged.emit()
self.table.proxy.invalidate() self.table.proxy.invalidate()
# 初始化后立即刷新缓存
self.refreshValueCache()
def data(self, QModelIndex, role=None): def data(self, QModelIndex, role=None):
# print(Qt.__dict__.items()) # print(Qt.__dict__.items())
@ -70,9 +86,10 @@ class AnalogModel(VarTableModel):
# # 获取变量值并插入表格 # # 获取变量值并插入表格
if QModelIndex.column() == 2: if QModelIndex.column() == 2:
return self.table.valueList[QModelIndex.row()] varName = self.datas[QModelIndex.row()][3]
# 使用修改后的缓存变量名
return self.valueCache.get(varName, None)
return QVariant(self.datas[QModelIndex.row()][QModelIndex.column()]) return QVariant(self.datas[QModelIndex.row()][QModelIndex.column()])
def flags(self, index): def flags(self, index):
@ -142,8 +159,8 @@ class AnalogButtonDelegate(TcRtdButtonDelegate):
sender = self.sender() sender = self.sender()
model = self.parent().model model = self.parent().model
value = model.datas[sender.index[0]][1] value = model.datas[sender.index[0]][1]
min = model.datas[sender.index[0]][6] min = model.datas[sender.index[0]][7]
max = model.datas[sender.index[0]][7] max = model.datas[sender.index[0]][8]
pattern = re.compile(r'[^0-9\.-]+') pattern = re.compile(r'[^0-9\.-]+')
if not value or re.findall(pattern, str(value)): if not value or re.findall(pattern, str(value)):
reply = QMessageBox.question(self.parent(), reply = QMessageBox.question(self.parent(),
@ -181,12 +198,17 @@ class AnalogButtonDelegate(TcRtdButtonDelegate):
"超出量程范围2", "超出量程范围2",
QMessageBox.Yes) QMessageBox.Yes)
return return
if sender.index[0] < 8: # if sender.index[0] < 8:
model.table.realList[sender.index[0]] = getRealAO(float(value), max, min) # model.table.realList[sender.index[0]] = getRealAO(float(value), max, min)
model.table.valueList[sender.index[0]] = float(value) # model.table.valueList[sender.index[0]] = float(value)
# else:
# model.table.realList[sender.index[0]] = float(value)
# model.table.valueList[sender.index[0]] = float(value)
res = Globals.getValue('protocolManage').writeVariableValue(model.datas[sender.index[0]][3], float(value))
if res:
forceVars = Globals.getValue('forceVars')
forceVars.add(model.datas[sender.index[0]][3])
Globals.setValue('forceVars', forceVars)
else: else:
model.table.realList[sender.index[0]] = float(value) QMessageBox.information(self.parent(), '提示', '写入失败', QMessageBox.Yes)
model.table.valueList[sender.index[0]] = float(value)
forceVars = Globals.getValue('forceVars')
forceVars.add(model.datas[sender.index[0]][3])
Globals.setValue('forceVars', forceVars)

@ -1,6 +1,6 @@
import qtawesome import qtawesome
from PyQt5.QtCore import Qt, QVariant, QSize from PyQt5.QtCore import Qt, QVariant, QSize, QTimer
from PyQt5.QtWidgets import QHBoxLayout, QWidget, QMessageBox, QComboBox from PyQt5.QtWidgets import QHBoxLayout, QWidget, QMessageBox, QComboBox
from protocol.TCP.TemToMv import temToMv from protocol.TCP.TemToMv import temToMv
@ -17,6 +17,22 @@ class TcRtdModel(VarTableModel):
table : 缺省参数 table : 缺省参数
''' '''
VarTableModel.__init__(self, header, data, table = table) VarTableModel.__init__(self, header, data, table = table)
# 添加定时刷新缓存机制
self.valueCache = {}
self.cacheTimer = QTimer()
self.cacheTimer.timeout.connect(self.refreshValueCache)
self.cacheTimer.start(300) # 每秒刷新一次缓存
# 添加缓存刷新方法
def refreshValueCache(self):
"""刷新所有变量的值缓存"""
for row in self.datas:
varName = row[3]
try:
self.valueCache[varName] = Globals.getValue('protocolManage').readVariableValue(varName)
# print(Globals.getValue('protocolManage').readVariableValue(varName))
except:
self.valueCache[varName] = None
def initTable(self): def initTable(self):
self.datas = [] self.datas = []
@ -25,7 +41,6 @@ class TcRtdModel(VarTableModel):
# if proType in ['5']: # if proType in ['5']:
varDatas = TcRtdManage.getAllVar() varDatas = TcRtdManage.getAllVar()
if not varDatas: if not varDatas:
# self.layoutChanged.emit() # self.layoutChanged.emit()
self.table.proxy.invalidate() self.table.proxy.invalidate()
@ -38,6 +53,8 @@ class TcRtdModel(VarTableModel):
self.checkList = ['Unchecked'] * len(self.datas) self.checkList = ['Unchecked'] * len(self.datas)
# self.layoutChanged.emit() # self.layoutChanged.emit()
self.table.proxy.invalidate() self.table.proxy.invalidate()
# 初始化后立即刷新缓存
self.refreshValueCache()
def data(self, QModelIndex, role=None): def data(self, QModelIndex, role=None):
# print(Qt.__dict__.items()) # print(Qt.__dict__.items())
@ -67,11 +84,11 @@ class TcRtdModel(VarTableModel):
if role != Qt.DisplayRole: if role != Qt.DisplayRole:
return QVariant() return QVariant()
# # 获取变量值并插入表格 # 修改使用缓存值代替table.valueList
if QModelIndex.column() == 2: if QModelIndex.column() == 2:
return self.table.valueList[QModelIndex.row()]# # 获取变量值并插入表格 varName = self.datas[QModelIndex.row()][3]
# if QModelIndex.column() == 2: # print(self.valueCache.get(varName, None))
return self.valueCache.get(varName, None)
return QVariant(self.datas[QModelIndex.row()][QModelIndex.column()]) return QVariant(self.datas[QModelIndex.row()][QModelIndex.column()])
@ -145,10 +162,10 @@ class TcRtdButtonDelegate(VarButtonDelegate):
sender = self.sender() sender = self.sender()
model = self.parent().model model = self.parent().model
value = model.datas[sender.index[0]][1] value = model.datas[sender.index[0]][1]
varType = model.datas[sender.index[0]][5] varType = model.datas[sender.index[0]][6]
min = model.datas[sender.index[0]][6] min = model.datas[sender.index[0]][7]
max = model.datas[sender.index[0]][7] max = model.datas[sender.index[0]][8]
compensation = model.datas[sender.index[0]][8] compensation = model.datas[sender.index[0]][9]
pattern = re.compile(r'[^0-9\.-]+') pattern = re.compile(r'[^0-9\.-]+')
if not value or re.findall(pattern, str(value) + str(compensation)): if not value or re.findall(pattern, str(value) + str(compensation)):
reply = QMessageBox.question(self.parent(), reply = QMessageBox.question(self.parent(),
@ -184,12 +201,19 @@ class TcRtdButtonDelegate(VarButtonDelegate):
"输入值有误", "输入值有误",
QMessageBox.Yes) QMessageBox.Yes)
return return
mv += float(compensation)
model.table.mvList[sender.index[0]] = mv # 直接写入变量值
model.table.valueList[sender.index[0]] = float(value) res = Globals.getValue('protocolManage').writeVariableValue(
forceVars = Globals.getValue('forceVars') model.datas[sender.index[0]][3],
forceVars.add(model.datas[sender.index[0]][3]) float(value)
Globals.setValue('forceVars', forceVars) )
if res:
forceVars = Globals.getValue('forceVars')
forceVars.add(model.datas[sender.index[0]][3])
Globals.setValue('forceVars', forceVars)
else:
QMessageBox.information(self.parent(), '提示', '写入失败', QMessageBox.Yes)
def edit_action(self): def edit_action(self):
sender = self.sender() sender = self.sender()

@ -139,8 +139,8 @@ class TcRtdTableView(VarTableView):
self.setItemDelegateForColumn(6, TcRtdTypeDelegate(self)) self.setItemDelegateForColumn(6, TcRtdTypeDelegate(self))
self.valueList = [0] * 16 self.valueList = [0] * 16
self.mvList = [0] * 16 self.mvList = [0] * 16
self.workThread = RTDTCThread(self) # self.workThread = RTDTCThread(self)
Globals.setValue('RTDTCThread', self.workThread) # Globals.setValue('RTDTCThread', self.workThread)
# self.workThread.getValueList.connect(self.getValueList) # self.workThread.getValueList.connect(self.getValueList)
def setupColumnWidths(self): def setupColumnWidths(self):
self.header.setSectionResizeMode(QHeaderView.Stretch) self.header.setSectionResizeMode(QHeaderView.Stretch)
@ -152,10 +152,10 @@ class TcRtdTableView(VarTableView):
class AnalogTableView(VarTableView): class AnalogTableView(VarTableView):
def __init__(self, parent=None): def __init__(self, parent=None):
super(AnalogTableView, self).__init__(parent) super(AnalogTableView, self).__init__(parent)
self.valueList = [0] * 8 + [0] * 8 + [0] * 16 + [0] * 8 + [0] * 8 + [0] * 8 # self.valueList = [0] * 8 + [0] * 8 + [0] * 16 + [0] * 8 + [0] * 8 + [0] * 8
self.realList = [0] * 8 + [0] * 8 + [0] * 16 + [0] * 8 + [0] * 8 + [0] * 8 # self.realList = [0] * 8 + [0] * 8 + [0] * 16 + [0] * 8 + [0] * 8 + [0] * 8
self.workThread = AnalogThread(self) # self.workThread = AnalogThread(self)
Globals.setValue('AnalogThread', self.workThread) # Globals.setValue('AnalogThread', self.workThread)
def setupColumnWidths(self): def setupColumnWidths(self):
self.header.setSectionResizeMode(QHeaderView.Stretch) self.header.setSectionResizeMode(QHeaderView.Stretch)

@ -27,7 +27,7 @@ from utils.DBModels.ProtocolModel import ModbusTcpMasterVar, ModbusRtuMasterVar,
from utils.DBModels.UserModels import User from utils.DBModels.UserModels import User
from utils.DBModels.DeviceModels import DeviceDB from utils.DBModels.DeviceModels import DeviceDB
from protocol.ProtocolManage import ProtocolManage
@ -118,8 +118,12 @@ class ProjectManage(object):
def switchProject(self, name): def switchProject(self, name):
# 切换工程 # 切换工程
currentDB = Globals.getValue('currentProDB') currentDB = Globals.getValue('currentProDB')
currentProtocolManage = Globals.getValue('protocolManage')
if currentDB: if currentDB:
currentDB.close() currentDB.close()
if Globals.getValue('protocolManage'):
currentProtocolManage.shutdown()
Client.initDB() Client.initDB()
dbPath = os.path.join('project', name, 'db', 'project.db') dbPath = os.path.join('project', name, 'db', 'project.db')
@ -136,6 +140,16 @@ class ProjectManage(object):
Globals.setValue('currentProDB', projectDB) Globals.setValue('currentProDB', projectDB)
Globals.setValue('currentProType', 1)#切换工程 Globals.setValue('currentProType', 1)#切换工程
protocolManage = ProtocolManage()
# protocolManage.writeVariableValue('无源4-20mA输出通道1', 150)
# protocolManage.writeVariableValue('有源24V数字输出通道12', 1)
# protocolManage.writeVariableValue('热偶输出2', 40)
# protocolManage.writeVariableValue('热阻输出3', 50)
# print(protocolManage.readVariableValue('有源/无源4-20mA输入通道2'))
# print(protocolManage.readVariableValue('无源24V数字输入通道2'))
# print(a)
Globals.setValue('protocolManage', protocolManage)
# if Globals.getValue('FFThread').isRunning(): # if Globals.getValue('FFThread').isRunning():
# Globals.getValue('FFThread').reStart() # Globals.getValue('FFThread').reStart()
# return # return

@ -0,0 +1,217 @@
from utils.DBModels.ProtocolModel import (
ModbusTcpMasterVar, ModbusTcpSlaveVar, ModbusRtuMasterVar, ModbusRtuSlaveVar,
HartVar, TcRtdVar, AnalogVar, HartSimulateVar
)
from protocol.TCP.TCPVarManage import TCPVarManager
from protocol.TCP.TemToMv import temToMv
class ProtocolManage(object):
"""通讯变量查找类,用于根据变量名在数据库模型中检索变量信息"""
# 支持的模型类列表
MODEL_CLASSES = [
ModbusTcpMasterVar, ModbusTcpSlaveVar,
ModbusRtuMasterVar, ModbusRtuSlaveVar,
HartVar, TcRtdVar, AnalogVar, HartSimulateVar
]
def __init__(self):
self.tcpVarManager = TCPVarManager('127.0.0.1', 8000)
self.writeTC = [0] * 8
self.writeRTD = [0] * 8
@classmethod
def lookupVariable(cls, variableName):
"""
根据变量名检索变量信息
:param variableName: 要查询的变量名
:return: 包含变量信息和模型类型的字典如果未找到返回None
"""
for modelClass in cls.MODEL_CLASSES:
varInstance = modelClass.getByName(variableName)
if varInstance:
# 获取变量字段信息
varData = {}
for field in varInstance._meta.sorted_fields:
fieldName = field.name
varData[fieldName] = getattr(varInstance, fieldName)
return {
'model_type': modelClass.__name__,
'variable_data': varData
}
return None
# @classmethod
def writeVariableValue(self, variableName, value):
"""
根据变量名写入变量值根据变量类型进行不同处理不保存到数据库
:param variableName: 变量名
:param value: 要写入的值
:return: 写入成功返回True否则返回False
"""
varInfo = self.lookupVariable(variableName)
if not varInfo:
return False
modelType = varInfo['model_type']
info = varInfo['variable_data']
print(info)
try:
# 拆分为四个独立的Modbus协议条件判断
if modelType == 'ModbusTcpMasterVar':
# 仅设置值,不保存到数据库
pass
if modelType == 'ModbusTcpSlaveVar':
# 仅设置值,不保存到数据库
pass
if modelType == 'ModbusRtuMasterVar':
# 仅设置值,不保存到数据库
pass
if modelType == 'ModbusRtuSlaveVar':
# 仅设置值,不保存到数据库
pass
# HART协议变量处理
elif modelType == 'HartVar':
# 仅设置值,不保存到数据库
pass
# 温度/RTD变量处理
elif modelType == 'TcRtdVar':
channel = int(info['channelNumber']) - 1
varType = info['varType']
compensationVar = float(info['compensationVar'])
# print(value + compensationVar)
if varType in ['R', 'S', 'B', 'J', 'T', 'E', 'K', 'N', 'C', 'A', 'PT100']:
mvΩvalue = temToMv(varType, value + compensationVar) # 直接补偿温度 补偿mv调整到括号外
self.tcpVarManager.writeValue(varType, channel, mvΩvalue)
if varType == 'PT100':
self.wrtieRTD[channel] = value
else:
self.writeTC[channel] = value
# 模拟量变量处理
elif modelType == 'AnalogVar':
channel = int(info['channelNumber']) - 1
varType = info['varType']
if info['varType'] == 'AO':
value = self.getRealAO(value, info['max'], info['min'])
self.tcpVarManager.writeValue(varType, channel, value)
# print(1)
# HART模拟变量处理
elif modelType == 'HartSimulateVar':
# 仅设置值,不保存到数据库
pass
return True
except Exception as e:
print(f"写入变量值失败: {str(e)}")
return False
# 添加读取函数
def readVariableValue(self, variableName):
"""
根据变量名读取变量值根据变量类型进行不同处理留空
:param variableName: 变量名
:return: 读取的值或None失败时
"""
varInfo = self.lookupVariable(variableName)
if not varInfo:
return None
modelType = varInfo['model_type']
info = varInfo['variable_data']
try:
# 拆分为独立的协议条件判断
if modelType == 'ModbusTcpMasterVar':
# 读取操作(留空)
pass
if modelType == 'ModbusTcpSlaveVar':
pass
if modelType == 'ModbusRtuMasterVar':
pass
if modelType == 'ModbusRtuSlaveVar':
pass
# HART协议变量处理
elif modelType == 'HartVar':
pass
# 温度/RTD变量处理
elif modelType == 'TcRtdVar':
channel = int(info['channelNumber']) - 1
varType = info['varType']
if varType == 'PT100':
value = self.writeRTD[channel]
elif varType in ['R', 'S', 'B', 'J', 'T', 'E', 'K', 'N', 'C', 'A']:
value = self.writeTC[channel]
return value
# 模拟量变量处理
elif modelType == 'AnalogVar':
channel = int(info['channelNumber']) - 1
varType = info['varType']
# print(varType, channel)
value = self.tcpVarManager.readValue(varType, channel)
if varType in ['AI','AO']:
value = self.getRealAI(value, info['max'], info['min'])
return value
# print(1)
# HART模拟变量处理
elif modelType == 'HartSimulateVar':
pass
return None # 暂时返回None
except Exception as e:
print(f"读取变量值失败: {str(e)}")
return str(e)
def shutdown(self):
self.tcpVarManager.shutdown()
def getRealAO(self, value,highValue, lowValue):
if highValue:
lowValue = float(lowValue)
highValue = float(highValue)
return (16 * (value - lowValue) + 4 * (highValue-lowValue))/(1000 * (highValue - lowValue))
else:
return value/1000
def getRealAI(self, mA, highValue, lowValue):
"""
将毫安值转换为实际工程值getRealAO的反向计算
:param mA: 毫安值
:param highValue: 工程值上限
:param lowValue: 工程值下限
:return: 实际工程值
"""
try:
if highValue:
lowValue = float(lowValue)
highValue = float(highValue)
# 反向计算: value = (1000 * mA * (highValue - lowValue) - 4*(highValue - lowValue)) / 16 + lowValue
return (1000 * mA * (highValue - lowValue) - 4 * (highValue - lowValue)) / 16.0 + lowValue
else:
return mA * 1000
except Exception as e:
print(f"工程值转换失败: {str(e)}")
return 0.0 # 默认返回0避免中断流程

@ -230,10 +230,11 @@ class TCPCommunicator:
#print(ai_chunk) #print(ai_chunk)
if i == 8: if i == 8:
diValues = struct.unpack('<d', ai_chunk)[0] diValues = struct.unpack('<d', ai_chunk)[0]
# print(struct.unpack('<d', ai_chunk)[0])
diBools = [(int(diValues) >> j) & 1 for j in range(16)] diBools = [(int(diValues) >> j) & 1 for j in range(16)]
l.append(diBools) l.append(diBools)
else: else:
aiValues = struct.unpack('<d', ai_chunk) aiValues = struct.unpack('<d', ai_chunk)[0]
l.append(aiValues) l.append(aiValues)
# 解析16个DI状态 (8字节) # 解析16个DI状态 (8字节)
#di_chunk = body[end_idx_ai:end_idx_di] #di_chunk = body[end_idx_ai:end_idx_di]
@ -243,9 +244,9 @@ class TCPCommunicator:
# all_ai_values.append(ai_values) # all_ai_values.append(ai_values)
# all_di_states.append(di_bools) # all_di_states.append(di_bools)
# print(l)
result = (l[0:8], l[8]) result = (l[0:8], l[8])
#print(result) # print(result, 111)
# print(result)
if self.aiCallback: if self.aiCallback:
self.aiCallback(result[0], result[1], packetNum) self.aiCallback(result[0], result[1], packetNum)

@ -1,7 +1,7 @@
import logging import logging
import time import time
import threading import threading
from IOTCPClinet import TCPCommunicator from protocol.TCP.IOTCPClinet import TCPCommunicator
FPGATrigger = 1 FPGATrigger = 1
TCTrigger = 2 TCTrigger = 2
@ -17,10 +17,10 @@ class TCPVarManager:
""" """
self.communicator = TCPCommunicator(host, port) self.communicator = TCPCommunicator(host, port)
logging.info(f"TCPVarManager initialized for {host}:{port}") logging.info(f"TCPVarManager initialized for {host}:{port}")
self.AODATA = [0.0] * 16 self.AODATA = [0.004] * 16
self.DODATA = [0] * 16 self.DODATA = [0] * 16
self.TCDATA = [0.0] * 8 self.TCDATA = [0.0] * 8 # mv
self.RTDDATA = [0.0] * 8 self.RTDDATA = [0.0] * 8 # Ω
self.AIDATA = [0.0] * 8 self.AIDATA = [0.0] * 8
self.DIDATA = [0] * 16 self.DIDATA = [0] * 16
self.DELTATDATA = [0] * 16 self.DELTATDATA = [0] * 16
@ -35,11 +35,11 @@ class TCPVarManager:
2, # 触发输出 2, # 触发输出
# 17: 触发类型0:无触发 1fpga触发/2TC触发/3RTD触发 # 17: 触发类型0:无触发 1fpga触发/2TC触发/3RTD触发
triggerType, # TC触发 triggerType, # 触发类型
time, # 实验时间ms time, # 实验时间ms
# 18: 实验时间ms和 DO状态U16转双精度 # 18: 实验时间ms和 DO状态U16转双精度
0, # 将16位整数转换为双精度浮点数 0, # do补位
# 19: DO状态已包含在上面的do_value中 # 19: DO状态已包含在上面的do_value中
@ -49,7 +49,7 @@ class TCPVarManager:
# 28-35: 8通道RTD数据Ω # 28-35: 8通道RTD数据Ω
*self.RTDDATA *self.RTDDATA
] ]
self.writeAoDo(data, self.DODATA) self.writeData(data)
# print(1) # print(1)
@ -105,31 +105,31 @@ class TCPVarManager:
self.stop_event.set() self.stop_event.set()
logging.info("Stopped periodic read") logging.info("Stopped periodic read")
def writeAoDo(self, aoData, do_states): def writeData(self, data):
""" """
写入AO和DO数据 写入AO和DO数据
:param aoData: 36个AO值的列表 :param data: 36个AO值的列表
:param do_states: 16个DO状态的列表 (0或1) :param self.DODATA: 16个DO状态的列表 (0或1)
:return: 写入成功返回True否则返回False :return: 写入成功返回True否则返回False
""" """
if len(aoData) != 36: if len(data) != 36:
logging.error("AO data must contain exactly 36 values") logging.error("AO data must contain exactly 36 values")
return False return False
if len(do_states) != 16: if len(self.DODATA) != 16:
logging.error("DO states must contain exactly 16 values") logging.error("DO states must contain exactly 16 values")
return False return False
try: try:
# 将DO状态转换为单个浮点数协议要求 # 将DO状态转换为单个浮点数协议要求
do_value = 0 do_value = 0
for i, state in enumerate(do_states): for i, state in enumerate(self.DODATA):
do_value |= (state << i) do_value |= (state << i)
# 替换aoData中第20个元素为DO状态值 # 替换data中第20个元素为DO状态值
aoData[19] = float(do_value) data[19] = float(do_value)
success = self.communicator.writeAo(aoData) success = self.communicator.writeAo(data)
if not success: if not success:
logging.warning("AO/DO write operation failed") logging.warning("AO/DO write operation failed")
return success return success
@ -150,6 +150,59 @@ class TCPVarManager:
except Exception as e: except Exception as e:
logging.error(f"Error reading DeltaT: {e}") logging.error(f"Error reading DeltaT: {e}")
return None return None
def writeValue(self, variableType, channel, value, trigger=None):
if variableType == "AO":
self.AODATA[channel] = float(value)
elif variableType == "DO":
self.DODATA[channel] = int(value)
elif variableType in ['R', 'S', 'B', 'J', 'T', 'E', 'K', 'N', 'C', 'A']:
self.TCDATA[channel] = float(value)
elif variableType == "PT100":
self.RTDDATA[channel] = float(value)
if not trigger:
data = [
# 0-15: 16通道AO电流数据A
*self.AODATA,
# 16: 输出模式0无输出/1正常输出/2触发输出
2, # 正常输出
# 17: 触发类型0:无触发 1fpga触发/2TC触发/3RTD触发
0, # 正常写入
0, # 实验时间ms
# 18: 实验时间ms和 DO状态U16转双精度
0, # do补位
# 19: DO状态已包含在上面的do_value中
# 20-27: 8通道TC数据mV
*self.TCDATA,
# 28-35: 8通道RTD数据Ω
*self.RTDDATA
]
# print(data)
self.writeData(data)
def readValue(self, variableType, channel):
# channel = channel
if variableType == "AI":
# print(self.AIDATA)
return self.AIDATA[channel]
elif variableType == "DI":
return self.DIDATA[channel]
elif variableType == "AO":
return self.AODATA[channel]
elif variableType == "DO":
return self.DODATA[channel]
def shutdown(self): def shutdown(self):
"""关闭连接并清理资源""" """关闭连接并清理资源"""

@ -41,8 +41,8 @@ class Box1Simulator:
def reset_simulated_values(self): def reset_simulated_values(self):
"""重置模拟值到初始状态""" """重置模拟值到初始状态"""
# 模拟AI值 (0-10V范围) # 模拟AI值 (4-20mA范围单位A)
self.ai_values = [random.uniform(0, 10) for _ in range(8)] self.ai_values = [random.uniform(0.004, 0.020) for _ in range(8)]
# 模拟DI状态 (随机0或1) # 模拟DI状态 (随机0或1)
self.di_states = [random.randint(0, 1) for _ in range(16)] self.di_states = [random.randint(0, 1) for _ in range(16)]
@ -164,10 +164,12 @@ class Box1Simulator:
def create_ai_response(self, packet_num): def create_ai_response(self, packet_num):
"""创建AI响应数据包""" """创建AI响应数据包"""
# 模拟值轻微变化 # 模拟值在4-20mA范围内轻微波动单位A
for i in range(8): for i in range(8):
self.ai_values[i] += random.uniform(-0.1, 0.1) self.ai_values[i] += random.uniform(-0.001, 0.001)
self.ai_values[i] = max(0.0, min(10.0, self.ai_values[i])) # 限制在4-20mA范围内 (0.004-0.020A)
self.ai_values[i] = max(0.004, min(0.020, self.ai_values[i]))
# print(self.ai_values)
# 随机翻转一些DI状态 # 随机翻转一些DI状态
for i in range(4): for i in range(4):
@ -177,22 +179,31 @@ class Box1Simulator:
# 修改生成500组AI/DI数据 # 修改生成500组AI/DI数据
full_body = b'' full_body = b''
for _ in range(500):
# 修改使用小端字节序打包8个AI值 for i in range(8):
ai_bytes = struct.pack('<' + 'd'*8, *self.ai_values) for _ in range(500):
# 修复:移除解包操作符(*),直接传入浮点数
ai_bytes = struct.pack('<d', self.ai_values[i])
# 打包DI状态为一个64位整数 # 打包DI状态为一个64位整数
# 添加一组数据到响应体
full_body += ai_bytes
for _ in range(500):
di_state_int = 0 di_state_int = 0
for i, state in enumerate(self.di_states): for i, state in enumerate(self.di_states):
di_state_int |= (state << i) di_state_int |= (state << i)
di_bytes = struct.pack('<Q', di_state_int) di_bytes = struct.pack('<d', di_state_int)
# print(di_bytes, di_state_int)
# 添加一组数据到响应体 full_body += di_bytes
full_body += ai_bytes + di_bytes
# print(len(full_body))
# 使用logging记录发送的AI数据 # 使用logging记录发送的AI数据
logging.info(f"发送的AI为: AI={self.ai_values}, DI={self.di_states} (共500组)") # logging.info(f"发送的AI为: AI={self.ai_values}, DI={self.di_states} (共500组)")
return self.create_packet(self.RESP_READ_AI, full_body, packet_num) return self.create_packet(self.RESP_READ_AI, full_body, packet_num)

@ -28,6 +28,8 @@ def _init():#初始化
_globalDict['MainWindow'] = None _globalDict['MainWindow'] = None
_globalDict['blockManage'] = None _globalDict['blockManage'] = None
_globalDict['protocolManage'] = None
def setValue(key,value): def setValue(key,value):
""" 定义一个全局变量 """ """ 定义一个全局变量 """
_globalDict[key] = value _globalDict[key] = value

Loading…
Cancel
Save