diff --git a/HART通讯测试.docx b/HART通讯测试.docx new file mode 100644 index 0000000..d6a9b1b Binary files /dev/null and b/HART通讯测试.docx differ diff --git a/Static/Hart.qss b/Static/Hart.qss new file mode 100644 index 0000000..0904391 --- /dev/null +++ b/Static/Hart.qss @@ -0,0 +1,23 @@ +/* ==================== HART Tab标签页样式 ==================== */ +QTabBar#hartTabBar::tab { + font-family: "PingFangSC-Medium", "Microsoft YaHei", sans-serif; + font-size: 16px; + width: 160px; + height: 40px; + padding: 8px 16px; + margin: 2px; + border-radius: 6px; + background-color: #F3F4F6; + color: #6B7280; +} + +QTabBar#hartTabBar::tab:hover { + background-color: #E3F2FD; + color: #2277EF; +} + +QTabBar#hartTabBar::tab:selected { + background-color: #2277EF; + color: #FFFFFF; + font-weight: 600; +} \ No newline at end of file diff --git a/UI/HartWidgets/HartCalibrationWidget.py b/UI/HartWidgets/HartCalibrationWidget.py new file mode 100644 index 0000000..427f2a4 --- /dev/null +++ b/UI/HartWidgets/HartCalibrationWidget.py @@ -0,0 +1,327 @@ +from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QGroupBox, + QLabel, QLineEdit, QPushButton, QComboBox, + QTabWidget, QMessageBox, QGridLayout, QDoubleSpinBox, + QSpinBox, QFormLayout, QCheckBox) +from PyQt5.QtCore import Qt + +class HartCalibrationWidget(QWidget): + def __init__(self, parent=None): + super(HartCalibrationWidget, self).__init__(parent) + self.hartComm = None + self.initUI() + + def initUI(self): + # 创建主布局 + mainLayout = QVBoxLayout() + + # --- 1. 标准校准区域 --- + calibGroup = QGroupBox("标准校准操作") + calibLayout = QVBoxLayout(calibGroup) + + infoLabel1 = QLabel("标准校准操作将校准设备的测量值。请确保在执行校准前已正确连接设备。") + infoLabel1.setWordWrap(True) + calibLayout.addWidget(infoLabel1) + + buttonLayout = QGridLayout() + self.calibrate4mAButton = QPushButton("4mA校准 (Command 37)") + self.calibrate20mAButton = QPushButton("20mA校准 (Command 36)") + self.calibrateZeroPointButton = QPushButton("零点校准 (Command 43)") + buttonLayout.addWidget(self.calibrate4mAButton, 0, 0) + buttonLayout.addWidget(self.calibrate20mAButton, 0, 1) + buttonLayout.addWidget(self.calibrateZeroPointButton, 1, 0, 1, 2) + calibLayout.addLayout(buttonLayout) + + warningLabel1 = QLabel("警告:校准操作将修改设备的校准参数,请确保您知道您在做什么!") + warningLabel1.setStyleSheet("color: red;") + warningLabel1.setWordWrap(True) + calibLayout.addWidget(warningLabel1) + + mainLayout.addWidget(calibGroup) + + # --- 2. 环路电流微调区域 --- + trimGroup = QGroupBox("环路电流微调") + trimLayout = QVBoxLayout(trimGroup) + + infoLabel2 = QLabel("环路电流微调用于精确调整4mA和20mA输出。请先进入固定电流模式使设备输出固定电流,再使用高精度电流表测量实际输出电流,然后输入测量值进行微调。") + infoLabel2.setWordWrap(True) + trimLayout.addWidget(infoLabel2) + + trim4mAGroup = QGroupBox("4mA微调 (Command 45)") + trim4mALayout = QFormLayout(trim4mAGroup) + self.trim4mASpinBox = QDoubleSpinBox() + self.trim4mASpinBox.setRange(3.0, 5.0) + self.trim4mASpinBox.setSingleStep(0.001) + self.trim4mASpinBox.setDecimals(3) + self.trim4mASpinBox.setValue(4.0) + self.trim4mAButton = QPushButton("执行4mA微调") + trim4mALayout.addRow(QLabel("测量的电流值(mA):"), self.trim4mASpinBox) + trim4mALayout.addRow("", self.trim4mAButton) + + trim20mAGroup = QGroupBox("20mA微调 (Command 46)") + trim20mALayout = QFormLayout(trim20mAGroup) + self.trim20mASpinBox = QDoubleSpinBox() + self.trim20mASpinBox.setRange(19.0, 21.0) + self.trim20mASpinBox.setSingleStep(0.001) + self.trim20mASpinBox.setDecimals(3) + self.trim20mASpinBox.setValue(20.0) + self.trim20mAButton = QPushButton("执行20mA微调") + trim20mALayout.addRow(QLabel("测量的电流值(mA):"), self.trim20mASpinBox) + trim20mALayout.addRow("", self.trim20mAButton) + + trimLayout.addWidget(trim4mAGroup) + trimLayout.addWidget(trim20mAGroup) + + warningLabel2 = QLabel("警告:微调操作将修改设备的校准参数,请确保使用高精度电流表测量!") + warningLabel2.setStyleSheet("color: red;") + warningLabel2.setWordWrap(True) + trimLayout.addWidget(warningLabel2) + + mainLayout.addWidget(trimGroup) + + # --- 3. 固定电流输出区域 --- + fixedCurrentGroup = QGroupBox("固定电流输出 (Command 40)") + fixedCurrentLayout = QVBoxLayout(fixedCurrentGroup) + + infoLabel3 = QLabel("固定电流输出模式用于测试和校准目的。在此模式下,设备将输出固定电流值,不受测量值变化的影响。") + infoLabel3.setWordWrap(True) + fixedCurrentLayout.addWidget(infoLabel3) + + self.enableFixedCurrentCheckBox = QCheckBox("启用固定电流输出") + fixedCurrentLayout.addWidget(self.enableFixedCurrentCheckBox) + + fixedCurrentFormLayout = QFormLayout() + self.fixedCurrentSpinBox = QDoubleSpinBox() + self.fixedCurrentSpinBox.setRange(3.8, 21.0) + self.fixedCurrentSpinBox.setSingleStep(0.1) + self.fixedCurrentSpinBox.setDecimals(2) + self.fixedCurrentSpinBox.setValue(4.0) + self.fixedCurrentSpinBox.setEnabled(False) + fixedCurrentFormLayout.addRow(QLabel("固定电流值(mA):"), self.fixedCurrentSpinBox) + fixedCurrentLayout.addLayout(fixedCurrentFormLayout) + + self.applyFixedCurrentButton = QPushButton("应用设置") + fixedCurrentLayout.addWidget(self.applyFixedCurrentButton) + + warningLabel3 = QLabel("警告:启用固定电流输出模式将使设备不再响应测量值变化!请在完成测试后务必退出固定电流模式。") + warningLabel3.setStyleSheet("color: red;") + warningLabel3.setWordWrap(True) + fixedCurrentLayout.addWidget(warningLabel3) + + mainLayout.addWidget(fixedCurrentGroup) + + mainLayout.addStretch(1) + self.setLayout(mainLayout) + + # --- 连接所有信号和槽 --- + self.calibrate4mAButton.clicked.connect(self.onCalibrate4mA) + self.calibrate20mAButton.clicked.connect(self.onCalibrate20mA) + self.calibrateZeroPointButton.clicked.connect(self.onCalibrateZeroPoint) + self.trim4mAButton.clicked.connect(self.onTrimLoopCurrent4mA) + self.trim20mAButton.clicked.connect(self.onTrimLoopCurrent20mA) + self.enableFixedCurrentCheckBox.toggled.connect(self.fixedCurrentSpinBox.setEnabled) + self.applyFixedCurrentButton.clicked.connect(self.onSetFixedCurrentOutput) + + def setHartComm(self, hartComm): + """设置HART通信对象""" + self.hartComm = hartComm + + def onCalibrate4mA(self): + """4mA校准 command 37""" + if not self.hartComm or not self.hartComm.isConnected(): + QMessageBox.warning(self, "警告", "设备未连接,请先连接设备") + return + + reply = QMessageBox.question(self, "确认", "确定要执行4mA校准吗?此操作将修改设备校准参数。", + QMessageBox.Yes | QMessageBox.No, QMessageBox.No) + if reply != QMessageBox.Yes: + return + + try: + raw_response = self.hartComm.calibrate4mA() + response_dict = raw_response[0] if isinstance(raw_response, list) and raw_response else raw_response + + if isinstance(response_dict, dict) and response_dict.get("status") == "success": + details = self.formatResponseDetails(response_dict) + QMessageBox.information(self, "成功", f"4mA校准成功!\n\n{details}") + elif isinstance(response_dict, dict): + error_msg = response_dict.get("error", "未知错误") + QMessageBox.critical(self, "错误", f"4mA校准失败: {error_msg}") + else: + QMessageBox.critical(self, "错误", "4mA校准失败: 无效的响应格式") + except Exception as e: + QMessageBox.critical(self, "异常", f"4mA校准时发生异常: {str(e)}") + + def onCalibrate20mA(self): + """20mA校准 command 36""" + if not self.hartComm or not self.hartComm.isConnected(): + QMessageBox.warning(self, "警告", "设备未连接,请先连接设备") + return + + reply = QMessageBox.question(self, "确认", "确定要执行20mA校准吗?此操作将修改设备校准参数。", + QMessageBox.Yes | QMessageBox.No, QMessageBox.No) + if reply != QMessageBox.Yes: + return + + try: + raw_response = self.hartComm.calibrate20mA() + response_dict = raw_response[0] if isinstance(raw_response, list) and raw_response else raw_response + + if isinstance(response_dict, dict) and response_dict.get("status") == "success": + details = self.formatResponseDetails(response_dict) + QMessageBox.information(self, "成功", f"20mA校准成功!\n\n{details}") + elif isinstance(response_dict, dict): + error_msg = response_dict.get("error", "未知错误") + QMessageBox.critical(self, "错误", f"20mA校准失败: {error_msg}") + else: + QMessageBox.critical(self, "错误", "20mA校准失败: 无效的响应格式") + except Exception as e: + QMessageBox.critical(self, "异常", f"20mA校准时发生异常: {str(e)}") + + def onCalibrateZeroPoint(self): + """零点校准 command 43""" + if not self.hartComm or not self.hartComm.isConnected(): + QMessageBox.warning(self, "警告", "设备未连接,请先连接设备") + return + + reply = QMessageBox.question(self, "确认", "确定要执行零点校准吗?此操作将修改设备校准参数。", + QMessageBox.Yes | QMessageBox.No, QMessageBox.No) + if reply != QMessageBox.Yes: + return + + try: + raw_response = self.hartComm.calibrateZeroPoint() + response_dict = raw_response[0] if isinstance(raw_response, list) and raw_response else raw_response + + if isinstance(response_dict, dict) and response_dict.get("status") == "success": + details = self.formatResponseDetails(response_dict) + QMessageBox.information(self, "成功", f"零点校准成功!\n\n{details}") + elif isinstance(response_dict, dict): + error_msg = response_dict.get("error", "未知错误") + QMessageBox.critical(self, "错误", f"零点校准失败: {error_msg}") + else: + QMessageBox.critical(self, "错误", "零点校准失败: 无效的响应格式") + except Exception as e: + QMessageBox.critical(self, "异常", f"零点校准时发生异常: {str(e)}") + + def onTrimLoopCurrent4mA(self): + """微调环路电流4mA command 45""" + if not self.hartComm or not self.hartComm.isConnected(): + QMessageBox.warning(self, "警告", "设备未连接,请先连接设备") + return + + measured_current = self.trim4mASpinBox.value() + reply = QMessageBox.question(self, "确认", f"确定要使用测量值 {measured_current} mA 执行4mA微调吗?此操作将修改设备校准参数。", + QMessageBox.Yes | QMessageBox.No, QMessageBox.No) + if reply != QMessageBox.Yes: + return + + try: + raw_response = self.hartComm.trimLoopCurrent4mA(measured_current) + response_dict = raw_response[0] if isinstance(raw_response, list) and raw_response else raw_response + + if isinstance(response_dict, dict) and response_dict.get("status") == "success": + details = self.formatResponseDetails(response_dict) + QMessageBox.information(self, "成功", f"4mA环路电流微调成功!\n测量值: {measured_current} mA\n\n{details}") + elif isinstance(response_dict, dict): + error_msg = response_dict.get("error", "未知错误") + QMessageBox.critical(self, "错误", f"4mA环路电流微调失败: {error_msg}") + else: + QMessageBox.critical(self, "错误", "4mA环路电流微调失败: 无效的响应格式") + except Exception as e: + QMessageBox.critical(self, "异常", f"4mA环路电流微调时发生异常: {str(e)}") + + def onTrimLoopCurrent20mA(self): + """微调环路电流20mA command 46""" + if not self.hartComm or not self.hartComm.isConnected(): + QMessageBox.warning(self, "警告", "设备未连接,请先连接设备") + return + + measured_current = self.trim20mASpinBox.value() + reply = QMessageBox.question(self, "确认", f"确定要使用测量值 {measured_current} mA 执行20mA微调吗?此操作将修改设备校准参数。", + QMessageBox.Yes | QMessageBox.No, QMessageBox.No) + if reply != QMessageBox.Yes: + return + + try: + raw_response = self.hartComm.trimLoopCurrent20mA(measured_current) + response_dict = raw_response[0] if isinstance(raw_response, list) and raw_response else raw_response + + if isinstance(response_dict, dict) and response_dict.get("status") == "success": + details = self.formatResponseDetails(response_dict) + QMessageBox.information(self, "成功", f"20mA环路电流微调成功!\n测量值: {measured_current} mA\n\n{details}") + elif isinstance(response_dict, dict): + error_msg = response_dict.get("error", "未知错误") + QMessageBox.critical(self, "错误", f"20mA环路电流微调失败: {error_msg}") + else: + QMessageBox.critical(self, "错误", "20mA环路电流微调失败: 无效的响应格式") + except Exception as e: + QMessageBox.critical(self, "异常", f"20mA环路电流微调时发生异常: {str(e)}") + + def onSetFixedCurrentOutput(self): + """设置固定电流输出/退出固定模式 command 40""" + if not self.hartComm or not self.hartComm.isConnected(): + QMessageBox.warning(self, "警告", "设备未连接,请先连接设备") + return + + enable = self.enableFixedCurrentCheckBox.isChecked() + current_value = self.fixedCurrentSpinBox.value() if enable else 0.0 + + if enable: + message = f"确定要启用固定电流输出模式,输出电流值为 {current_value} mA 吗?\n\n警告:启用此模式后,设备将不再响应测量值变化!" + else: + message = "确定要退出固定电流输出模式吗?" + + reply = QMessageBox.question(self, "确认", message, QMessageBox.Yes | QMessageBox.No, QMessageBox.No) + if reply != QMessageBox.Yes: + return + + try: + raw_response = self.hartComm.setFixedCurrentOutput(enable, current_value) + response_dict = raw_response[0] if isinstance(raw_response, list) and raw_response else raw_response + + if isinstance(response_dict, dict) and response_dict.get("status") == "success": + details = self.formatResponseDetails(response_dict) + if enable: + QMessageBox.information(self, "成功", f"已启用固定电流输出模式!\n电流值: {current_value} mA\n\n{details}") + else: + QMessageBox.information(self, "成功", f"已退出固定电流输出模式!\n\n{details}") + elif isinstance(response_dict, dict): + error_msg = response_dict.get("error", "未知错误") + QMessageBox.critical(self, "错误", f"设置固定电流输出失败: {error_msg}") + else: + QMessageBox.critical(self, "错误", "设置固定电流输出失败: 无效的响应格式") + except Exception as e: + QMessageBox.critical(self, "异常", f"设置固定电流输出时发生异常: {str(e)}") + + def formatResponseDetails(self, response): + """格式化响应详细信息""" + details = "" + + # 添加消息对象的详细信息 + msg = response.get("msg", {}) + if msg: + details += "消息详情:\n" + for key, value in msg.items(): + # 处理二进制数据,转换为可读形式 + if isinstance(value, bytes): + try: + # 尝试解码为ASCII字符串,移除不可打印字符 + printable_chars = "".join(chr(c) for c in value if 32 <= c <= 126) + if printable_chars: + value_str = f"{printable_chars} (HEX: {value.hex()})" + else: + value_str = f"HEX: {value.hex()}" + except: + value_str = f"HEX: {value.hex()}" + else: + value_str = str(value) + + details += f" {key}: {value_str}\n" + + # 添加原始数据的详细信息 + raw_data = response.get("raw_data") + if raw_data: + details += "\n原始数据:\n" + details += f" HEX: {raw_data.hex()}" + + return details \ No newline at end of file diff --git a/UI/HartWidgets/HartConnectionWidget.py b/UI/HartWidgets/HartConnectionWidget.py new file mode 100644 index 0000000..034296f --- /dev/null +++ b/UI/HartWidgets/HartConnectionWidget.py @@ -0,0 +1,355 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +HART设备连接界面模块 +实现设备连接、断开等功能 +""" + +import serial +import serial.tools.list_ports +from PyQt5.QtWidgets import (QApplication, QWidget, QVBoxLayout, QHBoxLayout, QLabel, + QPushButton, QComboBox, QGroupBox, QFormLayout, + QMessageBox, QSpinBox, QRadioButton) +from PyQt5.QtCore import Qt, pyqtSignal, QThread, QSize +from PyQt5.QtGui import QFont +import qtawesome + +from protocol.HART import HARTCommunication, PrimaryMasterMode, SecondaryMasterMode, BurstModeON, BurstModeOFF + +class ScanDeviceThread(QThread): + """在后台扫描设备的线程""" + scanCompleted = pyqtSignal(object) # 扫描完成信号,参数为找到的设备信息或None + + def __init__(self, hartComm): + super().__init__() + self.hartComm = hartComm + + def run(self): + result = self.hartComm.scanForDevice() + self.scanCompleted.emit(result) + +class HartConnectionWidget(QWidget): + """ + HART设备连接界面模块 + 实现设备连接、断开等功能 + """ + + # 自定义信号 + connectionStatusChanged = pyqtSignal(bool) # 连接状态改变信号 + hartCommCreated = pyqtSignal(object) # HART通信对象创建信号 + + def __init__(self): + """ + 初始化连接界面 + """ + super().__init__() + + # 初始化变量 + self.hartComm = None + self.isConnected = False + + # 初始化界面 + self.initUI() + + # 检查COM7是否可用 + self.refreshPortList() + + def initUI(self): + """ + 初始化用户界面 + """ + # 创建主布局 + mainLayout = QVBoxLayout() + mainLayout.setContentsMargins(20, 20, 20, 20) + mainLayout.setSpacing(20) + + # 创建连接设置组 + connectionGroup = QGroupBox("串口设置") + connectionGroup.setFont(QFont("Microsoft YaHei", 10, QFont.Bold)) + connectionLayout = QFormLayout() + try: + connectionLayout.setLabelAlignment(Qt.AlignRight) + except AttributeError: + pass # Linter may fail on this + self.portLabel = QLabel("COM7") + connectionLayout.addRow("串口:", self.portLabel) + self.baudrateLabel = QLabel("1200") + connectionLayout.addRow("波特率:", self.baudrateLabel) + self.parityLabel = QLabel("奇校验") + connectionLayout.addRow("校验方式:", self.parityLabel) + self.timeoutLabel = QLabel("3 秒") + connectionLayout.addRow("超时时间:", self.timeoutLabel) + connectionGroup.setLayout(connectionLayout) + mainLayout.addWidget(connectionGroup) + + # 创建地址与模式设置组 + configGroup = QGroupBox("寻址与模式") + configGroup.setFont(QFont("Microsoft YaHei", 10, QFont.Bold)) + configLayout = QFormLayout() + try: + configLayout.setLabelAlignment(Qt.AlignRight) + except AttributeError: + pass # Linter may fail on this + + # 寻址方式 + addressingLayout = QHBoxLayout() + self.rbSpecificAddress = QRadioButton("指定短地址") + self.rbSpecificAddress.setChecked(True) + self.rbScanAddress = QRadioButton("扫描设备(轮询)") + addressingLayout.addWidget(self.rbSpecificAddress) + addressingLayout.addWidget(self.rbScanAddress) + configLayout.addRow("寻址方式:", addressingLayout) + + # 短地址输入 + self.pollingAddressSpinBox = QSpinBox() + self.pollingAddressSpinBox.setRange(0, 63) + configLayout.addRow("轮询地址:", self.pollingAddressSpinBox) + + # 主站模式 + self.masterModeCombo = QComboBox() + self.masterModeCombo.addItems(["主要主站", "次要主站"]) + configLayout.addRow("主站模式:", self.masterModeCombo) + + # 突发模式 + self.burstModeCombo = QComboBox() + self.burstModeCombo.addItems(["关闭", "开启"]) + configLayout.addRow("突发模式:", self.burstModeCombo) + + configGroup.setLayout(configLayout) + mainLayout.addWidget(configGroup) + + # 创建按钮布局 + buttonLayout = QHBoxLayout() + buttonLayout.setSpacing(10) + + # 连接/扫描按钮 + self.connectButton = QPushButton("连接设备") + self.connectButton.setObjectName("startProtocolBtn") + self.connectButton.setIcon(qtawesome.icon('fa5s.play-circle', color='#047857')) + self.connectButton.setIconSize(QSize(22, 22)) + self.connectButton.clicked.connect(self.connectDevice) + + self.scanButton = QPushButton("扫描设备") + self.scanButton.setObjectName("forceBtn") # Yellow style for scanning + self.scanButton.setIcon(qtawesome.icon('fa5s.search', color='#D97706')) + self.scanButton.setIconSize(QSize(22, 22)) + self.scanButton.clicked.connect(self.scanDevice) + self.scanButton.setVisible(False) # 默认隐藏 + buttonLayout.addWidget(self.connectButton) + buttonLayout.addWidget(self.scanButton) + + # 信号连接 + self.rbSpecificAddress.toggled.connect(self.updateAddressingMode) + + # 断开按钮 + self.disconnectButton = QPushButton("断开连接") + self.disconnectButton.setObjectName("delBtn") # Red style + self.disconnectButton.setIcon(qtawesome.icon('fa5s.stop-circle', color='#B91C1C')) + self.disconnectButton.setIconSize(QSize(22, 22)) + self.disconnectButton.clicked.connect(self.disconnectDevice) + self.disconnectButton.setEnabled(False) + buttonLayout.addWidget(self.disconnectButton) + + mainLayout.addLayout(buttonLayout) + + # 添加状态标签 + self.statusLabel = QLabel("未连接") + try: + self.statusLabel.setAlignment(Qt.AlignCenter) + except AttributeError: + pass # Linter may fail on this + self.statusLabel.setFont(QFont("Microsoft YaHei", 10)) + self.statusLabel.setStyleSheet("color: red;") + mainLayout.addWidget(self.statusLabel) + + # 添加弹性空间 + mainLayout.addStretch(1) + + # 设置主布局 + self.setLayout(mainLayout) + + def refreshPortList(self): + """ + 刷新串口列表(仅检查COM7是否可用) + """ + ports = [port.device for port in serial.tools.list_ports.comports()] + + # 检查COM7是否可用 + if "COM7" in ports: + self.connectButton.setEnabled(True) + self.statusLabel.setText("未连接") + else: + self.connectButton.setEnabled(False) + self.statusLabel.setText("未检测到COM7串口设备") + + def updateAddressingMode(self, checked): + is_specific = self.rbSpecificAddress.isChecked() + self.pollingAddressSpinBox.setEnabled(is_specific) + self.connectButton.setVisible(is_specific) + self.scanButton.setVisible(not is_specific) + + def scanDevice(self): + """开始扫描设备""" + if self.isConnected: + QMessageBox.information(self, "提示", "设备已连接。如需重新扫描,请先断开连接。") + return + + if not self._ensure_serial_connection(): + return + + self.scanButton.setEnabled(False) + self.scanButton.setText("扫描中...") + self.statusLabel.setText("正在扫描设备 (0-63)...") + self.statusLabel.setStyleSheet("color: orange;") + + self.scanThread = ScanDeviceThread(self.hartComm) + self.scanThread.scanCompleted.connect(self.onScanCompleted) + self.scanThread.start() + + def onScanCompleted(self, result): + """扫描完成后的处理""" + self.scanButton.setEnabled(True) + self.scanButton.setText("扫描设备") + + if result and 'address' in result: + found_addr = result['address'] + QMessageBox.information(self, "扫描成功", f"在轮询地址 {found_addr} 找到设备!\n已自动配置为该地址。") + self.pollingAddressSpinBox.setValue(found_addr) + self.rbSpecificAddress.setChecked(True) # 切换回指定地址模式 + + # 自动连接 + self.connectDevice() + else: + QMessageBox.warning(self, "扫描失败", "未在任何轮询地址 (0-63) 找到设备。") + self.statusLabel.setText("扫描完成,未找到设备") + self.statusLabel.setStyleSheet("color: orange;") + + def _ensure_serial_connection(self): + """确保串口已连接并创建HART对象""" + if self.hartComm and self.hartComm.isConnected(): + return True + + if self.hartComm: + self.hartComm.disconnect() + + port = "COM7" + baudrate = 1200 + timeout = 3 + parity = serial.PARITY_ODD + + try: + self.hartComm = HARTCommunication(port=port, baudrate=baudrate, parity=parity, timeout=timeout) + if self.hartComm.connect(): + self.hartCommCreated.emit(self.hartComm) + return True + else: + self.hartComm = None + QMessageBox.critical(self, "连接失败", "无法打开COM7串口,请检查设备连接。") + return False + except Exception as e: + self.hartComm = None + QMessageBox.critical(self, "连接错误", f"连接过程中发生错误:{str(e)}") + return False + + def connectDevice(self): + """使用指定配置连接设备并验证""" + if self.isConnected: + QMessageBox.information(self, "提示", "设备已连接。如需更换地址或模式,请先断开连接。") + return + + if not self._ensure_serial_connection() or not self.hartComm: + return + + try: + # 应用界面上的配置 + is_primary = self.masterModeCombo.currentIndex() == 0 + self.hartComm.setMasterMode(isPrimaryMaster=is_primary) + is_burst_on = self.burstModeCombo.currentIndex() == 1 + self.hartComm.setBurstMode(enable=is_burst_on) + polling_address = self.pollingAddressSpinBox.value() + self.hartComm.buildHartAddress(pollingAddress=polling_address, isLongFrame=False) + + self.statusLabel.setText(f"正在验证地址 {polling_address}...") + self.statusLabel.setStyleSheet("color: orange;") + app_instance = QApplication.instance() + if app_instance: + app_instance.processEvents() + + # 验证设备通信 + device_info = self.hartComm.readUniqueId() + + if isinstance(device_info, list) and device_info: + self.isConnected = True + self.statusLabel.setText(f"已连接 (地址: {polling_address})") + self.statusLabel.setStyleSheet("color: green;") + self.disconnectButton.setEnabled(True) + self.connectButton.setEnabled(False) + self.scanButton.setVisible(False) + self._set_config_controls_enabled(False) + self.connectionStatusChanged.emit(True) + QMessageBox.information(self, "连接成功", f"成功连接到地址为 {polling_address} 的设备。") + else: + # self.isConnected remains False + self.statusLabel.setText("连接失败") + self.statusLabel.setStyleSheet("color: red;") + self.connectionStatusChanged.emit(False) + QMessageBox.critical(self, "连接失败", f"在地址 {polling_address} 未找到设备或设备无响应。") + # 保持串口打开,允许用户尝试其他地址, 但需要一个断开按钮 + self.disconnectButton.setEnabled(True) + + except Exception as e: + self.isConnected = False + self.statusLabel.setText("连接错误") + self.statusLabel.setStyleSheet("color: red;") + self.connectionStatusChanged.emit(False) + QMessageBox.critical(self, "配置错误", f"应用配置时发生错误:{str(e)}") + + def disconnectDevice(self, silent=False): + """ + 断开设备连接 + """ + try: + if self.hartComm: + self.hartComm.disconnect() + + self.isConnected = False + self.statusLabel.setText("未连接") + self.statusLabel.setStyleSheet("color: red;") + + self.disconnectButton.setEnabled(False) + self.connectButton.setEnabled(True) + self._set_config_controls_enabled(True) + self.updateAddressingMode(self.rbSpecificAddress.isChecked()) + + self.connectionStatusChanged.emit(False) + + if not silent: + QMessageBox.information(self, "断开连接", "已成功断开设备连接!") + except Exception as e: + if not silent: + QMessageBox.warning(self, "断开连接错误", f"断开连接过程中发生错误:{str(e)}") + # 恢复UI到断开状态 + self.isConnected = False + self.statusLabel.setText("未连接") + self.statusLabel.setStyleSheet("color: red;") + self.disconnectButton.setEnabled(False) + self.connectButton.setEnabled(True) + self._set_config_controls_enabled(True) + self.connectionStatusChanged.emit(False) + + def _set_config_controls_enabled(self, enabled): + """启用或禁用配置相关的控件""" + self.pollingAddressSpinBox.setEnabled(enabled) + self.masterModeCombo.setEnabled(enabled) + self.burstModeCombo.setEnabled(enabled) + self.rbSpecificAddress.setEnabled(enabled) + self.rbScanAddress.setEnabled(enabled) + + def updatePollingAddress(self, new_address): + """ + 从外部更新轮询地址的UI显示 + """ + self.pollingAddressSpinBox.setValue(new_address) + if self.isConnected: + self.statusLabel.setText(f"已连接 (地址: {new_address})") \ No newline at end of file diff --git a/UI/HartWidgets/HartDeviceConfigWidget.py b/UI/HartWidgets/HartDeviceConfigWidget.py new file mode 100644 index 0000000..2c9526c --- /dev/null +++ b/UI/HartWidgets/HartDeviceConfigWidget.py @@ -0,0 +1,385 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +HART设备配置界面模块 +实现HART设备参数配置功能,包括查询和修改仪表基本信息、标签描述及出厂日期、装配号等 +""" + +import sys +import time +from datetime import datetime +from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QGroupBox, + QLabel, QLineEdit, QPushButton, QGridLayout, + QTabWidget, QMessageBox, QDateEdit, QSpinBox) +from PyQt5.QtCore import Qt, pyqtSignal, QDate +from PyQt5.QtGui import QFont +from protocol.HART import tools + +class HartDeviceConfigWidget(QWidget): + """ + HART设备配置界面类 + 实现HART设备参数配置功能,包括查询和修改仪表基本信息、标签描述及出厂日期、装配号等 + """ + + def __init__(self): + """ + 初始化设备配置界面 + """ + super().__init__() + + # 初始化HART通信对象 + self.hartComm = None + self.connectionWidget = None + + # 初始化界面 + self.initUI() + + def initUI(self): + """ + 初始化用户界面 + """ + # 创建主布局 + self.mainLayout = QVBoxLayout(self) + self.mainLayout.setContentsMargins(10, 10, 10, 10) + + # --- 1. 仪表基本信息区域 --- + infoGroup = QGroupBox("仪表基本信息") + infoLayout = QGridLayout(infoGroup) + + self.messageLabel = QLabel("设备消息:") + self.messageEdit = QLineEdit() + self.messageEdit.setMaxLength(32) + infoLayout.addWidget(self.messageLabel, 0, 0) + infoLayout.addWidget(self.messageEdit, 0, 1) + + self.queryBasicInfoBtn = QPushButton("查询信息") + self.updateBasicInfoBtn = QPushButton("更新信息") + infoButtonLayout = QHBoxLayout() + infoButtonLayout.addWidget(self.queryBasicInfoBtn) + infoButtonLayout.addWidget(self.updateBasicInfoBtn) + + self.mainLayout.addWidget(infoGroup) + self.mainLayout.addLayout(infoButtonLayout) + + # --- 2. 仪表标签与描述区域 --- + tagGroup = QGroupBox("仪表标签与描述") + tagLayout = QGridLayout(tagGroup) + + self.tagLabel = QLabel("设备标签:") + self.tagEdit = QLineEdit() + self.tagEdit.setMaxLength(8) + tagLayout.addWidget(self.tagLabel, 0, 0) + tagLayout.addWidget(self.tagEdit, 0, 1) + + self.descLabel = QLabel("设备描述:") + self.descEdit = QLineEdit() + self.descEdit.setMaxLength(16) + tagLayout.addWidget(self.descLabel, 1, 0) + tagLayout.addWidget(self.descEdit, 1, 1) + + self.dateLabel = QLabel("出厂日期:") + self.dateEdit = QDateEdit() + self.dateEdit.setDisplayFormat("dd/MM/yyyy") + self.dateEdit.setCalendarPopup(True) + self.dateEdit.setDate(QDate.currentDate()) + tagLayout.addWidget(self.dateLabel, 2, 0) + tagLayout.addWidget(self.dateEdit, 2, 1) + + self.queryTagDescBtn = QPushButton("查询信息") + self.updateTagDescBtn = QPushButton("更新信息") + tagButtonLayout = QHBoxLayout() + tagButtonLayout.addWidget(self.queryTagDescBtn) + tagButtonLayout.addWidget(self.updateTagDescBtn) + + self.mainLayout.addWidget(tagGroup) + self.mainLayout.addLayout(tagButtonLayout) + + # --- 3. 仪表装配号区域 --- + assemblyGroup = QGroupBox("仪表装配号") + assemblyLayout = QGridLayout(assemblyGroup) + + self.assemblyLabel = QLabel("装配号:") + self.assemblyEdit = QSpinBox() + self.assemblyEdit.setRange(0, 16777215) + self.assemblyEdit.setButtonSymbols(QSpinBox.NoButtons) + assemblyLayout.addWidget(self.assemblyLabel, 0, 0) + assemblyLayout.addWidget(self.assemblyEdit, 0, 1) + + self.queryAssemblyBtn = QPushButton("查询装配号") + self.updateAssemblyBtn = QPushButton("更新装配号") + assemblyButtonLayout = QHBoxLayout() + assemblyButtonLayout.addWidget(self.queryAssemblyBtn) + assemblyButtonLayout.addWidget(self.updateAssemblyBtn) + + self.mainLayout.addWidget(assemblyGroup) + self.mainLayout.addLayout(assemblyButtonLayout) + + # --- 4. 轮询地址修改区域 --- + pollingGroup = QGroupBox("轮询地址修改 (仅短帧)") + pollingLayout = QGridLayout(pollingGroup) + + self.pollingAddressLabel = QLabel("新轮询地址:") + self.pollingAddressSpinBox = QSpinBox() + self.pollingAddressSpinBox.setRange(0, 63) + pollingLayout.addWidget(self.pollingAddressLabel, 0, 0) + pollingLayout.addWidget(self.pollingAddressSpinBox, 0, 1) + + self.updatePollingAddressBtn = QPushButton("更新轮询地址") + pollingButtonLayout = QHBoxLayout() + pollingButtonLayout.addWidget(self.updatePollingAddressBtn) + + self.mainLayout.addWidget(pollingGroup) + self.mainLayout.addLayout(pollingButtonLayout) + + self.mainLayout.addStretch(1) + + # --- 连接所有信号和槽 --- + self.queryBasicInfoBtn.clicked.connect(self.queryBasicInfo) + self.updateBasicInfoBtn.clicked.connect(self.updateBasicInfo) + self.queryTagDescBtn.clicked.connect(self.queryTagDesc) + self.updateTagDescBtn.clicked.connect(self.updateTagDesc) + self.queryAssemblyBtn.clicked.connect(self.queryAssembly) + self.updateAssemblyBtn.clicked.connect(self.updateAssembly) + self.updatePollingAddressBtn.clicked.connect(self.updatePollingAddress) + + def setHartComm(self, hartComm): + """ + 设置HART通信对象 + + Args: + hartComm: HART通信对象 + """ + self.hartComm = hartComm + + def setConnectionWidget(self, widget): + """设置连接窗口的引用""" + self.connectionWidget = widget + + def queryBasicInfo(self): + """ + 查询仪表基本信息 - command 12 + """ + if not self.hartComm or not self.hartComm.isConnected(): + QMessageBox.warning(self, "警告", "设备未连接,请先连接设备") + return + + try: + # 发送查询命令 + result = self.hartComm.readMessage() + # print(result) + # 处理响应数据 + if result: + # 兼容处理:如果result是列表,则取第一个元素 + msg = result[0] if isinstance(result, list) and len(result) > 0 else result + + if isinstance(msg, dict): + message_data = msg.get('message') + if isinstance(message_data, bytes): + try: + # 使用新的 unpack_packed_ascii 函数 + message_str = tools.unpack_packed_ascii(message_data, 32) + self.messageEdit.setText(message_str.strip()) + except Exception as e: + # 如果解码失败,显示原始数据的十六进制表示和错误 + message_hex = message_data.hex() + self.messageEdit.setText(f"HEX: {message_hex}") + QMessageBox.critical(self, "解码错误", f"解码消息失败: {e}") + return + + QMessageBox.warning(self, "警告", "未能获取设备基本信息") + + except Exception as e: + QMessageBox.critical(self, "错误", f"查询设备基本信息失败: {str(e)}") + + def updateBasicInfo(self): + """ + 修改仪表基本信息 - command 17 + """ + if not self.hartComm or not self.hartComm.isConnected(): + QMessageBox.warning(self, "警告", "设备未连接,请先连接设备") + return + + try: + # 获取消息内容 + message_text = self.messageEdit.text() + + # 直接将字符串传递给通信函数,由底层负责打包 + result = self.hartComm.writeMessage(message_text) + + # 处理响应数据 + if result and len(result) > 0: + QMessageBox.information(self, "成功", "成功更新设备基本信息") + else: + QMessageBox.warning(self, "警告", "未能更新设备基本信息") + + except Exception as e: + QMessageBox.critical(self, "错误", f"更新设备基本信息失败: {str(e)}") + + def queryTagDesc(self): + """ + 查询仪表标签、描述及出厂日期 - command 13 + """ + if not self.hartComm or not self.hartComm.isConnected(): + QMessageBox.warning(self, "警告", "设备未连接,请先连接设备") + return + + try: + # 发送查询命令 + result = self.hartComm.readTagDescriptorDate() + + # 处理响应数据 + if result: + # 兼容处理:如果result是列表,则取第一个元素 + msg = result[0] if isinstance(result, list) and len(result) > 0 else result + + if isinstance(msg, dict) and msg.get("status") == "success": + tag_str = msg.get("device_tag_name", "") + desc_str = msg.get("device_descriptor", "") + date_dict = msg.get("date", {}) + + self.tagEdit.setText(tag_str.strip()) + self.descEdit.setText(desc_str.strip()) + + if isinstance(date_dict, dict) and all(k in date_dict for k in ['day', 'month', 'year']): + day = date_dict.get('day', 1) + month = date_dict.get('month', 1) + year = date_dict.get('year', 1900) + self.dateEdit.setDate(QDate(year, month, day)) + + QMessageBox.information(self, "成功", "成功读取设备标签、描述及日期信息") + return + elif isinstance(msg, dict) and 'error' in msg: + QMessageBox.warning(self, "查询失败", f"未能获取设备标签、描述及日期信息: {msg.get('error')}") + return + + QMessageBox.warning(self, "警告", "未能获取设备标签、描述及日期信息") + + except Exception as e: + QMessageBox.critical(self, "错误", f"查询设备标签、描述及日期信息失败: {str(e)}") + + def updateTagDesc(self): + """ + 修改仪表标签、描述及出厂日期 - command 18 + """ + if not self.hartComm or not self.hartComm.isConnected(): + QMessageBox.warning(self, "警告", "设备未连接,请先连接设备") + return + + try: + # 获取标签和描述 + tag_text = self.tagEdit.text() + descriptor_text = self.descEdit.text() + + date = self.dateEdit.date() + # According to the spec, year should be (actual_year - 1900) + year_for_protocol = date.year() - 1900 + if not (0 <= year_for_protocol <= 255): + QMessageBox.warning(self, "年份错误", "年份必须在1900年至2155年之间") + return + date_tuple = (date.day(), date.month(), year_for_protocol) + + # 直接将字符串传递给通信函数,由底层负责打包 + result = self.hartComm.writeTagDescriptorDate(tag_text, descriptor_text, date_tuple) + + # 处理响应数据 + if result and len(result) > 0: + QMessageBox.information(self, "成功", "成功更新设备标签、描述及日期信息") + else: + QMessageBox.warning(self, "警告", "未能更新设备标签、描述及日期信息") + + except Exception as e: + QMessageBox.critical(self, "错误", f"更新设备标签、描述及日期信息失败: {str(e)}") + + def queryAssembly(self): + """ + 查询仪表装配号 - command 16 + """ + if not self.hartComm or not self.hartComm.isConnected(): + QMessageBox.warning(self, "警告", "设备未连接,请先连接设备") + return + + try: + # 发送查询命令 + result = self.hartComm.readFinalAssemblyNumber() + + # 处理响应数据 + if result: + # 兼容处理:如果result是列表,则取第一个元素 + msg = result[0] if isinstance(result, list) and len(result) > 0 else result + + if isinstance(msg, dict) and msg.get("status") == "success": + assembly_no = msg.get("final_assembly_no", 0) + self.assemblyEdit.setValue(assembly_no) + QMessageBox.information(self, "成功", f"成功读取设备装配号: {assembly_no}") + return + elif isinstance(msg, dict) and 'error' in msg: + QMessageBox.warning(self, "查询失败", f"未能获取设备装配号: {msg.get('error')}") + return + + QMessageBox.warning(self, "警告", "未能获取设备装配号") + + except Exception as e: + QMessageBox.critical(self, "错误", f"查询设备装配号失败: {str(e)}") + + def updateAssembly(self): + """ + 修改仪表装配号 - command 19 + """ + if not self.hartComm or not self.hartComm.isConnected(): + QMessageBox.warning(self, "警告", "设备未连接,请先连接设备") + return + + try: + # 获取装配号 + assembly_number = self.assemblyEdit.value() + + # 发送更新命令 + result = self.hartComm.writeFinalAssemblyNumber(assembly_number) + + # 处理响应数据 + if result and len(result) > 0: + QMessageBox.information(self, "成功", "成功更新设备装配号") + else: + QMessageBox.warning(self, "警告", "未能更新设备装配号") + + except Exception as e: + QMessageBox.critical(self, "错误", f"更新设备装配号失败: {str(e)}") + + def updatePollingAddress(self): + """ + 修改设备轮询地址 - command 6 + """ + if not self.hartComm or not self.hartComm.isConnected(): + QMessageBox.warning(self, "警告", "设备未连接,请先连接设备") + return + + try: + new_address = self.pollingAddressSpinBox.value() + current_address_bytes = self.hartComm.getCurrentAddress() + if current_address_bytes and len(current_address_bytes) == 1: + current_address = current_address_bytes[0] & 0x3F + if new_address == current_address: + QMessageBox.information(self, "提示", "新地址与当前地址相同,无需修改。") + return + + reply = QMessageBox.question(self, '确认修改', + f"确定要将设备的轮询地址修改为 {new_address} 吗?\n" + "修改成功后将使用新地址自动重新连接。", + QMessageBox.Yes | QMessageBox.No, QMessageBox.No) + + if reply == QMessageBox.Yes: + result = self.hartComm.write_polling_address(new_address) + if result: + QMessageBox.information(self, "成功", f"成功将轮询地址修改为 {new_address}。\n现在将使用新地址重新连接。") + if self.connectionWidget: + self.connectionWidget.disconnectDevice(silent=True) + self.connectionWidget.pollingAddressSpinBox.setValue(new_address) + self.connectionWidget.connectDevice() + else: + QMessageBox.warning(self, "警告", "未能更新轮询地址。") + + except ValueError as ve: + QMessageBox.critical(self, "地址错误", f"轮询地址设置错误: {str(ve)}") + except Exception as e: + QMessageBox.critical(self, "错误", f"更新轮询地址失败: {str(e)}") \ No newline at end of file diff --git a/UI/HartWidgets/HartDeviceInfoWidget.py b/UI/HartWidgets/HartDeviceInfoWidget.py new file mode 100644 index 0000000..dccbf78 --- /dev/null +++ b/UI/HartWidgets/HartDeviceInfoWidget.py @@ -0,0 +1,439 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +HART设备信息显示界面模块 +实现设备唯一标识等信息的显示 +""" + +from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel, + QPushButton, QGroupBox, QFormLayout, QTextEdit, + QTableWidget, QTableWidgetItem, QHeaderView, QMessageBox, + QProgressBar, QSplitter) +from PyQt5.QtCore import Qt, QThread, pyqtSignal +from PyQt5.QtGui import QFont, QTextCursor + +class ReadDeviceInfoThread(QThread): + """ + 读取设备信息的线程类 + 避免在UI线程中进行耗时操作 + """ + # 自定义信号 + infoReceived = pyqtSignal(list) # 信息接收信号 + errorOccurred = pyqtSignal(str) # 错误信号 + progressUpdated = pyqtSignal(int, str) # 进度更新信号(百分比, 描述) + + def __init__(self, hartComm): + """ + 初始化线程 + + Args: + hartComm: HART通信对象 + """ + super().__init__() + self.hartComm = hartComm + self.isRunning = True + + def stop(self): + """ + 停止线程执行 + """ + self.isRunning = False + + def run(self): + """ + 线程运行函数 + 读取设备信息和变量数据 + """ + try: + if not self.hartComm: + raise Exception("HART通信对象未初始化") + + deviceInfo = [] + + # 更新进度 + self.progressUpdated.emit(10, "正在读取设备唯一标识...") + if not self.isRunning: return + + # 读取设备唯一标识 + try: + result = self.hartComm.readUniqueId() + if isinstance(result, list): + deviceInfo.extend(result) + elif not isinstance(result, dict) or 'error' not in result: + deviceInfo.append(result) + except Exception as e: + self.errorOccurred.emit(f"读取设备唯一标识失败: {e}") + return + + # 变量信息读取已移至 HartVariableInfoWidget + self.progressUpdated.emit(80, "正在读取设备唯一标识...") + + # 完成读取 + self.progressUpdated.emit(100, "读取完成") + + if self.isRunning: + self.infoReceived.emit(deviceInfo) + except Exception as e: + self.errorOccurred.emit(str(e)) + +class HartDeviceInfoWidget(QWidget): + """ + HART设备信息显示界面模块 + 实现设备唯一标识等信息的显示 + """ + + def __init__(self, parent=None): + """ + 初始化设备信息界面 + """ + super().__init__(parent) + + # 初始化变量 + self.hartComm = None + self.deviceInfo = None + self.readThread = None + + # 初始化界面 + self.initUI() + + def initUI(self): + """ + 初始化用户界面 + """ + # 创建主布局 + mainLayout = QVBoxLayout() + mainLayout.setContentsMargins(20, 20, 20, 20) + mainLayout.setSpacing(20) + + # 创建分割器 + splitter = QSplitter(Qt.Vertical) + + # 创建设备信息组 + deviceInfoGroup = QGroupBox("设备基本信息") + deviceInfoGroup.setFont(QFont("Microsoft YaHei", 10, QFont.Bold)) + deviceInfoLayout = QFormLayout() + deviceInfoLayout.setLabelAlignment(Qt.AlignRight) + deviceInfoLayout.setFieldGrowthPolicy(QFormLayout.AllNonFixedFieldsGrow) + + # 设备信息字段 + self.manufacturerIdLabel = QLabel("--") + deviceInfoLayout.addRow("制造商ID:", self.manufacturerIdLabel) + + self.deviceTypeLabel = QLabel("--") + deviceInfoLayout.addRow("设备类型:", self.deviceTypeLabel) + + self.deviceIdLabel = QLabel("--") + deviceInfoLayout.addRow("设备ID:", self.deviceIdLabel) + + self.preambleCountLabel = QLabel("--") + deviceInfoLayout.addRow("前导符数量:", self.preambleCountLabel) + + self.universalRevLabel = QLabel("--") + deviceInfoLayout.addRow("通用命令版本:", self.universalRevLabel) + + self.specificRevLabel = QLabel("--") + deviceInfoLayout.addRow("特定命令版本:", self.specificRevLabel) + + self.softwareRevLabel = QLabel("--") + deviceInfoLayout.addRow("软件版本:", self.softwareRevLabel) + + self.hardwareRevLabel = QLabel("--") + deviceInfoLayout.addRow("硬件版本:", self.hardwareRevLabel) + + self.deviceStatusLabel = QLabel("--") + deviceInfoLayout.addRow("设备状态:", self.deviceStatusLabel) + + self.commandNameLabel = QLabel("--") + deviceInfoLayout.addRow("命令名称:", self.commandNameLabel) + + self.responseCodeLabel = QLabel("--") + deviceInfoLayout.addRow("响应代码:", self.responseCodeLabel) + + self.flagsLabel = QLabel("--") + deviceInfoLayout.addRow("标志:", self.flagsLabel) + + # 设置布局 + deviceInfoGroup.setLayout(deviceInfoLayout) + + # 变量信息组已移至 HartVariableInfoWidget + + # 添加到分割器 + splitter.addWidget(deviceInfoGroup) + + + mainLayout.addWidget(splitter) + + # 创建原始数据组 + rawDataGroup = QGroupBox("原始数据") + rawDataGroup.setFont(QFont("Microsoft YaHei", 10, QFont.Bold)) + rawDataLayout = QVBoxLayout() + + # 原始数据文本框 + self.rawDataText = QTextEdit() + self.rawDataText.setReadOnly(True) + self.rawDataText.setFont(QFont("Consolas", 9)) + self.rawDataText.setLineWrapMode(QTextEdit.NoWrap) + self.rawDataText.setHorizontalScrollBarPolicy(Qt.ScrollBarAsNeeded) + self.rawDataText.setMinimumHeight(120) + rawDataLayout.addWidget(self.rawDataText) + + # 设置布局 + rawDataGroup.setLayout(rawDataLayout) + splitter.addWidget(rawDataGroup) + splitter.setSizes([300, 200]) + splitter.setStretchFactor(0, 1) + splitter.setStretchFactor(1, 1) + + # 创建进度条 + progressLayout = QHBoxLayout() + self.progressBar = QProgressBar() + self.progressBar.setRange(0, 100) + self.progressBar.setValue(0) + self.progressBar.setVisible(False) + self.progressLabel = QLabel("准备就绪") + progressLayout.addWidget(self.progressBar) + progressLayout.addWidget(self.progressLabel) + mainLayout.addLayout(progressLayout) + + # 创建按钮布局 + buttonLayout = QHBoxLayout() + buttonLayout.setSpacing(10) + + # 读取按钮 + self.readButton = QPushButton("读取设备信息") + self.readButton.clicked.connect(self.readDeviceInfo) + self.readButton.setEnabled(False) # 初始禁用 + buttonLayout.addWidget(self.readButton) + + # 取消按钮 + self.cancelButton = QPushButton("取消") + self.cancelButton.setEnabled(False) + self.cancelButton.clicked.connect(self.cancelOperation) + buttonLayout.addWidget(self.cancelButton) + + # 清除按钮 + self.clearButton = QPushButton("清除显示") + self.clearButton.clicked.connect(self.clearDisplay) + buttonLayout.addWidget(self.clearButton) + + mainLayout.addLayout(buttonLayout) + + # 设置主布局 + self.setLayout(mainLayout) + + def setHartComm(self, hartComm): + """ + 设置HART通信对象 + + Args: + hartComm: HART通信对象 + """ + self.hartComm = hartComm + self.readButton.setEnabled(hartComm is not None) + + def readDeviceInfo(self): + """ + 读取设备信息 + """ + if not self.hartComm: + QMessageBox.warning(self, "警告", "未连接到HART设备!") + return + + # 清除显示 + self.clearDisplay() + + # 禁用读取按钮,启用取消按钮 + self.readButton.setEnabled(False) + self.readButton.setText("正在读取...") + self.cancelButton.setEnabled(True) + + # 显示进度条 + self.progressBar.setValue(0) + self.progressBar.setVisible(True) + self.progressLabel.setText("准备读取设备信息...") + + # 创建并启动读取线程 + self.readThread = ReadDeviceInfoThread(self.hartComm) + self.readThread.infoReceived.connect(self.updateDeviceInfo) + self.readThread.errorOccurred.connect(self.handleError) + self.readThread.progressUpdated.connect(self.updateProgress) + self.readThread.finished.connect(self.onReadFinished) + self.readThread.start() + + def updateDeviceInfo(self, deviceInfo): + """ + 更新设备信息显示 + + Args: + deviceInfo: 设备信息列表 + """ + if not deviceInfo: + QMessageBox.warning(self, "警告", "未收到任何设备信息!") + return + + self.rawDataText.clear() + + for item in deviceInfo: + msg = item[0] if isinstance(item, list) and len(item) > 0 else item + + if not isinstance(msg, dict): + self.rawDataText.append(f"接收到非字典格式的消息: {type(msg)}\n") + print(f"接收到非字典格式的消息: {type(msg)}, 内容: {msg}") + continue + + try: + command_name = msg.get('command_name', '未知命令') + + self.setLabelIfValueExists(self.manufacturerIdLabel, msg, 'manufacturer_id') + self.setLabelIfValueExists(self.deviceTypeLabel, msg, 'manufacturer_device_type') + self.setLabelIfValueExists(self.deviceIdLabel, msg, 'device_id') + self.setLabelIfValueExists(self.preambleCountLabel, msg, 'number_response_preamble_characters') + self.setLabelIfValueExists(self.universalRevLabel, msg, 'universal_command_revision_level') + self.setLabelIfValueExists(self.specificRevLabel, msg, 'transmitter_specific_command_revision_level') + self.setLabelIfValueExists(self.softwareRevLabel, msg, 'software_revision_level') + self.setLabelIfValueExists(self.hardwareRevLabel, msg, 'hardware_revision_level') + if command_name != '未知命令': + self.commandNameLabel.setText(command_name) + self.setLabelIfValueExists(self.responseCodeLabel, msg, 'response_code') + + if 'device_status' in msg: + status_text = self.formatDeviceStatus(msg['device_status']) + self.flagsLabel.setText(status_text) + + self.rawDataText.append(f"--- 接收到的报文: {command_name} ---") + + for key, value in msg.items(): + if isinstance(value, bytes): + value_str = ' '.join(f'{b:02x}' for b in value) + self.rawDataText.append(f"{key}: {value_str} (hex)") + else: + self.rawDataText.append(f"{key}: {value}") + + self.rawDataText.append("\n") + self.rawDataText.moveCursor(QTextCursor.End) + except Exception as e: + error_text = f"处理报文数据时出错: {str(e)}\n" + self.rawDataText.append(error_text) + print(error_text) + continue + + def formatDeviceStatus(self, status_byte): + """ + 格式化设备状态字节 + + Args: + status_byte: 状态字节 + + Returns: + str: 格式化的状态描述 + """ + status_text = [] + + # 解析状态位 + if status_byte & 0x80: + status_text.append("设备故障") + if status_byte & 0x40: + status_text.append("配置已更改") + if status_byte & 0x20: + status_text.append("冷启动") + if status_byte & 0x10: + status_text.append("更多状态可用") + if status_byte & 0x08: + status_text.append("回路电流固定") + if status_byte & 0x04: + status_text.append("回路电流饱和") + if status_byte & 0x02: + status_text.append("非主变量超出范围") + if status_byte & 0x01: + status_text.append("主变量超出范围") + + if not status_text: + return "正常" + return ", ".join(status_text) + + + + def setLabelIfValueExists(self, label, data_dict, key, formatter=None): + """ + 仅当 data_dict 中存在键且值有效时才更新对应标签 + """ + value = data_dict.get(key) + if value is not None and value != "": + label.setText(formatter(value) if formatter else str(value)) + + def handleError(self, errorMsg): + """ + 处理错误 + + Args: + errorMsg: 错误信息 + """ + QMessageBox.critical(self, "错误", f"读取设备信息时发生错误:{errorMsg}") + + def updateProgress(self, value, message): + """ + 更新进度条 + + Args: + value: 进度值(0-100) + message: 进度消息 + """ + self.progressBar.setValue(value) + self.progressLabel.setText(message) + + def cancelOperation(self): + """ + 取消当前操作 + """ + if self.readThread and self.readThread.isRunning(): + # 停止线程 + self.readThread.stop() + self.readThread.wait() + + # 更新界面 + self.progressLabel.setText("操作已取消") + QMessageBox.information(self, "操作取消", "读取设备信息操作已取消") + + # 恢复按钮状态 + self.onReadFinished() + + def onReadFinished(self): + """ + 读取完成时的处理 + """ + # 恢复按钮状态 + self.readButton.setEnabled(self.hartComm is not None) + self.readButton.setText("读取设备信息") + self.cancelButton.setEnabled(False) + + # 隐藏进度条 + self.progressBar.setVisible(False) + + def clearDisplay(self): + """ + 清除显示内容 + """ + # 清除所有标签 + self.manufacturerIdLabel.setText("--") + self.deviceTypeLabel.setText("--") + self.deviceIdLabel.setText("--") + self.preambleCountLabel.setText("--") + self.universalRevLabel.setText("--") + self.specificRevLabel.setText("--") + self.softwareRevLabel.setText("--") + self.hardwareRevLabel.setText("--") + self.deviceStatusLabel.setText("--") + self.commandNameLabel.setText("--") + self.responseCodeLabel.setText("--") + self.flagsLabel.setText("--") + + # 变量信息已移至新窗口 + + # 清除原始数据 + self.rawDataText.clear() + + # 重置进度条 + self.progressBar.setValue(0) + self.progressBar.setVisible(False) + self.progressLabel.setText("准备就绪") \ No newline at end of file diff --git a/UI/HartWidgets/HartMainWindow.py b/UI/HartWidgets/HartMainWindow.py new file mode 100644 index 0000000..8f66cd0 --- /dev/null +++ b/UI/HartWidgets/HartMainWindow.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +HART通信主窗口类 +实现基于PyQt的HART通信界面主窗口,包含多个功能选项卡 +""" + +import sys +from PyQt5.QtWidgets import QApplication, QTabWidget, QVBoxLayout, QWidget +from PyQt5.QtCore import Qt +from PyQt5.QtGui import QFont + +# 导入自定义模块 +from UI.HartWidgets.HartConnectionWidget import HartConnectionWidget +from UI.HartWidgets.HartDeviceInfoWidget import HartDeviceInfoWidget +from UI.HartWidgets.HartDeviceConfigWidget import HartDeviceConfigWidget +from UI.HartWidgets.HartSensorConfigWidget import HartSensorConfigWidget +from UI.HartWidgets.HartCalibrationWidget import HartCalibrationWidget +from UI.HartWidgets.HartVariableInfoWidget import HartVariableInfoWidget + +class HartMainWindow(QWidget): + """ + HART通信主窗口类 + 实现基于PyQt的HART通信界面主窗口,包含多个功能选项卡 + """ + + def __init__(self): + """ + 初始化主窗口 + """ + super().__init__() + + # 创建主布局 + self.mainLayout = QVBoxLayout(self) + self.mainLayout.setContentsMargins(10, 10, 10, 10) + + # 创建选项卡窗口部件 + self.tabWidget = QTabWidget() + self.tabWidget.setObjectName("hartTabWidget") + self.tabWidget.tabBar().setObjectName("hartTabBar") + self.tabWidget.setFont(QFont("Microsoft YaHei", 10)) + self.mainLayout.addWidget(self.tabWidget) + + # 初始化HART通信对象 + self.hartComm = None + + # 初始化界面 + self.initUI() + + def initUI(self): + """ + 初始化用户界面 + """ + # 创建连接设备选项卡 + self.connectionWidget = HartConnectionWidget() + self.tabWidget.addTab(self.connectionWidget, "设备连接") + + # 创建设备信息选项卡 + self.deviceInfoWidget = HartDeviceInfoWidget() + self.tabWidget.addTab(self.deviceInfoWidget, "设备信息") + + # 创建变量信息选项卡 + self.variableInfoWidget = HartVariableInfoWidget() + self.tabWidget.addTab(self.variableInfoWidget, "变量信息") + + # 创建传感器配置选项卡 + self.sensorConfigWidget = HartSensorConfigWidget() + self.tabWidget.addTab(self.sensorConfigWidget, "传感器配置") + + # 创建校准功能选项卡 + self.calibrationWidget = HartCalibrationWidget() + self.tabWidget.addTab(self.calibrationWidget, "校准功能") + + # 创建设备配置选项卡 + self.deviceConfigWidget = HartDeviceConfigWidget() + self.tabWidget.addTab(self.deviceConfigWidget, "设备配置") + + # 将 connectionWidget 实例传递给 deviceConfigWidget + self.deviceConfigWidget.setConnectionWidget(self.connectionWidget) + + # 连接信号和槽 + self.connectionWidget.connectionStatusChanged.connect(self.onConnectionStatusChanged) + self.connectionWidget.hartCommCreated.connect(self.onHartCommCreated) + + def onConnectionStatusChanged(self, connected): + """ + 连接状态改变时的处理函数 + + Args: + connected (bool): 连接状态 + """ + # 根据连接状态启用或禁用其他选项卡 + for i in range(1, self.tabWidget.count()): + self.tabWidget.setTabEnabled(i, connected) + + def onHartCommCreated(self, hartComm): + """ + HART通信对象创建时的处理函数 + + Args: + hartComm (HARTCommunication): HART通信对象 + """ + self.hartComm = hartComm + # 将HART通信对象传递给其他选项卡 + self.deviceInfoWidget.setHartComm(hartComm) + self.variableInfoWidget.setHartComm(hartComm) + self.sensorConfigWidget.setHartComm(hartComm) + self.calibrationWidget.setHartComm(hartComm) + self.deviceConfigWidget.setHartComm(hartComm) \ No newline at end of file diff --git a/UI/HartWidgets/HartSensorConfigWidget.py b/UI/HartWidgets/HartSensorConfigWidget.py new file mode 100644 index 0000000..e66069e --- /dev/null +++ b/UI/HartWidgets/HartSensorConfigWidget.py @@ -0,0 +1,316 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QGroupBox, + QLabel, QPushButton, QFormLayout, QMessageBox, + QComboBox, QDoubleSpinBox, QCompleter) +from PyQt5.QtCore import Qt, QThread, pyqtSignal +from protocol.HART import common + +class ReadInfoThread(QThread): + """读取或写入信息的线程类""" + infoReceived = pyqtSignal(dict, str) + errorOccurred = pyqtSignal(str, str) + + def __init__(self, hartComm, command_name, params=None): + super().__init__() + self.hartComm = hartComm + self.command_name = command_name + self.params = params if params is not None else {} + self._is_running = True + + def stop(self): + self._is_running = False + + def run(self): + try: + if not self.hartComm: + raise Exception("HART通信对象未初始化") + + func = getattr(self.hartComm, self.command_name, None) + if not func: + raise Exception(f"未找到名为 {self.command_name} 的通信方法") + + response = func(**self.params) + + if self._is_running: + msg = response[0] if isinstance(response, list) and response else response + if msg and isinstance(msg, dict): + if msg.get("status") != "fail": + self.infoReceived.emit(msg, self.command_name) + else: + error_msg = msg.get("error", "未知错误") + self.errorOccurred.emit(f"命令 {self.command_name} 执行失败: {error_msg}", self.command_name) + else: + self.errorOccurred.emit(f"收到无效响应: {response}", self.command_name) + + except Exception as e: + self.errorOccurred.emit(str(e), self.command_name) + +class HartSensorConfigWidget(QWidget): + def __init__(self, parent=None): + super(HartSensorConfigWidget, self).__init__(parent) + self.hartComm = None + self.readThreads = {} + self.units_map = common.UNITS_CODE + self.reverse_units_map = common.REVERSE_UNITS_CODE + self.transfer_fn_map = common.TRANSFER_FUNCTION_CODE + self.reverse_transfer_fn_map = common.REVERSE_TRANSFER_FUNCTION_CODE + self.initUI() + + def initUI(self): + mainLayout = QVBoxLayout(self) + + displayLayout = QHBoxLayout() + + pvInfoGroup = QGroupBox("传感器信息 (Command 14)") + pvInfoForm = QFormLayout() + self.serialNoLabel = QLabel("--") + self.sensorUpperLimitLabel = QLabel("--") + self.sensorLowerLimitLabel = QLabel("--") + self.sensorUnitLabel = QLabel("--") + self.minSpanLabel = QLabel("--") + pvInfoForm.addRow("传感器序列号:", self.serialNoLabel) + pvInfoForm.addRow("传感器上限:", self.sensorUpperLimitLabel) + pvInfoForm.addRow("传感器下限:", self.sensorLowerLimitLabel) + pvInfoForm.addRow("传感器单位:", self.sensorUnitLabel) + pvInfoForm.addRow("最小量程:", self.minSpanLabel) + self.readPvInfoButton = QPushButton("读取传感器信息") + self.readPvInfoButton.clicked.connect(lambda: self.readInfo('readPrimaryVariableInformation')) + pvInfoForm.addRow(self.readPvInfoButton) + pvInfoGroup.setLayout(pvInfoForm) + + outputInfoGroup = QGroupBox("输出信息 (Command 15)") + outputInfoForm = QFormLayout() + self.alarmCodeLabel = QLabel("--") + self.transferFnLabel = QLabel("--") + self.pvRangeUnitLabel = QLabel("--") + self.pvUpperRangeLabel = QLabel("--") + self.pvLowerRangeLabel = QLabel("--") + self.dampingLabel = QLabel("--") + self.writeProtectLabel = QLabel("--") + self.privateLabelLabel = QLabel("--") + outputInfoForm.addRow("报警动作:", self.alarmCodeLabel) + outputInfoForm.addRow("输出特性:", self.transferFnLabel) + outputInfoForm.addRow("PV量程单位:", self.pvRangeUnitLabel) + outputInfoForm.addRow("PV上限值:", self.pvUpperRangeLabel) + outputInfoForm.addRow("PV下限值:", self.pvLowerRangeLabel) + outputInfoForm.addRow("阻尼值(s):", self.dampingLabel) + outputInfoForm.addRow("写保护:", self.writeProtectLabel) + outputInfoForm.addRow("私有标签:", self.privateLabelLabel) + self.readOutputInfoButton = QPushButton("读取输出信息") + self.readOutputInfoButton.clicked.connect(lambda: self.readInfo('readOutputInformation')) + outputInfoForm.addRow(self.readOutputInfoButton) + outputInfoGroup.setLayout(outputInfoForm) + + displayLayout.addWidget(pvInfoGroup) + displayLayout.addWidget(outputInfoGroup) + mainLayout.addLayout(displayLayout) + + configLayout = QHBoxLayout() + + writeGroup = QGroupBox("写入配置") + writeForm = QFormLayout() + + self.rangeUnitComboBox = QComboBox() + self.rangeUnitComboBox.setEditable(True) + self.rangeUnitComboBox.addItems(sorted(self.units_map.values())) + completer = QCompleter(self.rangeUnitComboBox.model()) + completer.setFilterMode(Qt.MatchContains) + completer.setCaseSensitivity(Qt.CaseInsensitive) + self.rangeUnitComboBox.setCompleter(completer) + + self.upperRangeSpinBox = QDoubleSpinBox() + self.upperRangeSpinBox.setRange(-999999.0, 999999.0) + self.upperRangeSpinBox.setDecimals(4) + + self.lowerRangeSpinBox = QDoubleSpinBox() + self.lowerRangeSpinBox.setRange(-999999.0, 999999.0) + self.lowerRangeSpinBox.setDecimals(4) + + self.dampingSpinBox = QDoubleSpinBox() + self.dampingSpinBox.setRange(0, 100) + self.dampingSpinBox.setDecimals(2) + + self.transferFnComboBox = QComboBox() + self.transferFnComboBox.addItems(self.transfer_fn_map.values()) + + self.setRangeButton = QPushButton("写入量程 (Cmd 35)") + self.setRangeButton.clicked.connect(self.onSetRange) + + self.setDampingButton = QPushButton("写入阻尼 (Cmd 34)") + self.setDampingButton.clicked.connect(self.onSetDamping) + + self.setTransferFnButton = QPushButton("写入输出函数 (Cmd 54)") + self.setTransferFnButton.clicked.connect(self.onSetTransferFunction) + + writeForm.addRow("量程单位:", self.rangeUnitComboBox) + writeForm.addRow("量程上限:", self.upperRangeSpinBox) + writeForm.addRow("量程下限:", self.lowerRangeSpinBox) + writeForm.addRow(self.setRangeButton) + writeForm.addRow("阻尼(s):", self.dampingSpinBox) + writeForm.addRow(self.setDampingButton) + writeForm.addRow("输出函数:", self.transferFnComboBox) + writeForm.addRow(self.setTransferFnButton) + writeGroup.setLayout(writeForm) + + configLayout.addWidget(writeGroup) + mainLayout.addLayout(configLayout) + + self.setHartComm(None) + + def setHartComm(self, hartComm): + self.hartComm = hartComm + is_connected = hartComm is not None + self.readPvInfoButton.setEnabled(is_connected) + self.readOutputInfoButton.setEnabled(is_connected) + self.setRangeButton.setEnabled(is_connected) + self.setDampingButton.setEnabled(is_connected) + self.setTransferFnButton.setEnabled(is_connected) + + def readInfo(self, command_name): + if not self.hartComm: + QMessageBox.warning(self, "警告", "未连接到HART设备!") + return + + if command_name in self.readThreads and self.readThreads[command_name].isRunning(): + return + + thread = ReadInfoThread(self.hartComm, command_name) + thread.infoReceived.connect(self.updateInfo) + thread.errorOccurred.connect(self.handleError) + thread.finished.connect(lambda: self.onReadFinished(command_name)) + + self.readThreads[command_name] = thread + button = self.getButtonForCommand(command_name) + if button: + button.setProperty("original_text", button.text()) + button.setText("读取中...") + button.setEnabled(False) + thread.start() + + def updateInfo(self, msg, command_name): + if command_name == 'readPrimaryVariableInformation': + serial_no = msg.get('serial_no') + self.serialNoLabel.setText(f"{serial_no.hex().upper()}" if serial_no else "--") + self.sensorUpperLimitLabel.setText(f"{msg.get('upper_limit', '--'):.4f}") + self.sensorLowerLimitLabel.setText(f"{msg.get('lower_limit', '--'):.4f}") + self.sensorUnitLabel.setText(common.get_unit_description(msg.get('sensor_limits_code'))) + self.minSpanLabel.setText(f"{msg.get('min_span', '--'):.4f}") + + elif command_name == 'readOutputInformation': + self.alarmCodeLabel.setText(self.getAlarmCodeDescription(msg.get('alarm_code'))) + + transfer_code = msg.get('transfer_fn_code') + transfer_text = self.getTransferFunctionDescription(transfer_code) + self.transferFnLabel.setText(transfer_text) + if transfer_text in self.reverse_transfer_fn_map: + self.transferFnComboBox.setCurrentText(transfer_text) + + unit_code = msg.get('primary_variable_range_code') + unit_text = common.get_unit_description(unit_code) + self.pvRangeUnitLabel.setText(unit_text) + + if unit_text in self.reverse_units_map: + self.rangeUnitComboBox.setCurrentText(unit_text) + + upper_val = msg.get('upper_range_value') + lower_val = msg.get('lower_range_value') + self.pvUpperRangeLabel.setText(f"{upper_val:.4f}" if upper_val is not None else "--") + self.pvLowerRangeLabel.setText(f"{lower_val:.4f}" if lower_val is not None else "--") + + if upper_val is not None: self.upperRangeSpinBox.setValue(upper_val) + if lower_val is not None: self.lowerRangeSpinBox.setValue(lower_val) + + damping_val = msg.get('damping_value') + self.dampingLabel.setText(f"{damping_val:.2f}" if damping_val is not None else "--") + if damping_val is not None: self.dampingSpinBox.setValue(damping_val) + + self.writeProtectLabel.setText(self.getWriteProtectDescription(msg.get('write_protect'))) + private_label = msg.get('private_label') + self.privateLabelLabel.setText(f"{private_label}" if private_label is not None else "--") + + def onSetRange(self): + upper = self.upperRangeSpinBox.value() + lower = self.lowerRangeSpinBox.value() + unit_text = self.rangeUnitComboBox.currentText() + unit_code = self.reverse_units_map.get(unit_text) + + if unit_code is None: + QMessageBox.warning(self, "输入错误", "无效的单位。请从列表中选择或正确输入。") + return + if upper <= lower: + QMessageBox.warning(self, "输入错误", "量程上限必须大于下限。") + return + + self.startWriteThread('writePrimaryVariableRange', {'unitsCode': unit_code, 'upperRange': upper, 'lowerRange': lower}) + + def onSetDamping(self): + damping = self.dampingSpinBox.value() + self.startWriteThread('writePrimaryVariableDamping', {'dampingTime': damping}) + + def onSetTransferFunction(self): + fn_text = self.transferFnComboBox.currentText() + fn_code = self.reverse_transfer_fn_map.get(fn_text) + if fn_code is None: + QMessageBox.warning(self, "输入错误", "无效的输出函数。") + return + self.startWriteThread('writePrimaryVariableOutputFunction', {'transferFunctionCode': fn_code}) + + def startWriteThread(self, command_name, params): + if not self.hartComm: + QMessageBox.warning(self, "警告", "未连接到HART设备!") + return + + if command_name in self.readThreads and self.readThreads[command_name].isRunning(): + return + + thread = ReadInfoThread(self.hartComm, command_name, params) + thread.infoReceived.connect(self.onWriteSuccess) + thread.errorOccurred.connect(self.handleError) + thread.finished.connect(lambda: self.onReadFinished(command_name)) + + self.readThreads[command_name] = thread + button = self.getButtonForCommand(command_name) + if button: + button.setProperty("original_text", button.text()) + button.setText("写入中...") + button.setEnabled(False) + thread.start() + + def onWriteSuccess(self, msg, command_name): + QMessageBox.information(self, "成功", f"命令 {command_name} 执行成功。") + self.readInfo('readOutputInformation') + + def getAlarmCodeDescription(self, code): + return {0: "高报", 1: "低报", 2: "保持最后输出"}.get(code, f"未知代码({code})") + + def getTransferFunctionDescription(self, code): + return self.transfer_fn_map.get(code, f"未知代码({code})") + + def getWriteProtectDescription(self, code): + return {0: "未保护", 1: "已保护"}.get(code, f"未知代码({code})") + + def handleError(self, errorMsg, command_name): + QMessageBox.critical(self, "错误", f"执行 {command_name} 时出错:\\n{errorMsg}") + + def onReadFinished(self, command_name): + button = self.getButtonForCommand(command_name) + if button: + original_text = button.property("original_text") + if original_text: + button.setText(original_text) + button.setEnabled(self.hartComm is not None) + + def getButtonForCommand(self, command_name): + if command_name == 'readPrimaryVariableInformation': + return self.readPvInfoButton + elif command_name == 'readOutputInformation': + return self.readOutputInfoButton + elif command_name == 'writePrimaryVariableRange': + return self.setRangeButton + elif command_name == 'writePrimaryVariableDamping': + return self.setDampingButton + elif command_name == 'writePrimaryVariableOutputFunction': + return self.setTransferFnButton + return None \ No newline at end of file diff --git a/UI/HartWidgets/HartVariableInfoWidget.py b/UI/HartWidgets/HartVariableInfoWidget.py new file mode 100644 index 0000000..5c06926 --- /dev/null +++ b/UI/HartWidgets/HartVariableInfoWidget.py @@ -0,0 +1,261 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +HART变量信息显示界面模块 +实现主变量、第二变量、回路电流等动态数据的读取与显示 +""" + +from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel, + QPushButton, QGroupBox, QFormLayout, QTextEdit, + QMessageBox, QProgressBar, QSplitter) +from PyQt5.QtCore import Qt, QThread, pyqtSignal +from PyQt5.QtGui import QFont, QTextCursor +from protocol.HART import common + +class ReadVariableThread(QThread): + """读取单个变量信息的线程类""" + infoReceived = pyqtSignal(dict, str) # 信号发送(响应字典, 命令类型) + errorOccurred = pyqtSignal(str, str) + + def __init__(self, hartComm, command_name): + super().__init__() + self.hartComm = hartComm + self.command_name = command_name + self._is_running = True + + def stop(self): + self._is_running = False + + def run(self): + try: + if not self.hartComm: + raise Exception("HART通信对象未初始化") + + func = getattr(self.hartComm, self.command_name, None) + if not func: + raise Exception(f"未找到名为 {self.command_name} 的通信方法") + + response = func() + + if self._is_running: + msg = response[0] if isinstance(response, list) and response else response + # print(msg) + if msg and isinstance(msg, dict): + if msg.get("status") != "fail": + self.infoReceived.emit(msg, self.command_name) + else: + error_msg = msg.get("error", "未知错误") + self.errorOccurred.emit(f"命令 {self.command_name} 执行失败: {error_msg}", self.command_name) + else: + self.errorOccurred.emit(f"收到无效响应: {response}", self.command_name) + + except Exception as e: + self.errorOccurred.emit(str(e), self.command_name) + + +class HartVariableInfoWidget(QWidget): + """HART变量信息显示界面""" + + def __init__(self, parent=None): + super().__init__(parent) + self.hartComm = None + self.readThreads = {} + self.initUI() + + def initUI(self): + mainLayout = QVBoxLayout(self) + topLayout = QHBoxLayout() + + # --- Command 1 Group --- + cmd1Group = QGroupBox("主变量 (Command 1)") + cmd1Layout = QFormLayout() + self.pvLabel = QLabel("--") + self.pvUnitLabel = QLabel("--") + cmd1Layout.addRow("主变量值:", self.pvLabel) + cmd1Layout.addRow("单位:", self.pvUnitLabel) + self.readCmd1Button = QPushButton("读取") + self.readCmd1Button.clicked.connect(lambda: self.readVariableInfo('readPrimaryVariable')) + cmd1Layout.addRow(self.readCmd1Button) + cmd1Group.setLayout(cmd1Layout) + + # --- Command 2 Group --- + cmd2Group = QGroupBox("回路电流和百分比 (Command 2)") + cmd2Layout = QFormLayout() + self.loopCurrentLabel_cmd2 = QLabel("--") + self.percentRangeLabel = QLabel("--") + cmd2Layout.addRow("回路电流:", self.loopCurrentLabel_cmd2) + cmd2Layout.addRow("量程百分比:", self.percentRangeLabel) + self.readCmd2Button = QPushButton("读取") + self.readCmd2Button.clicked.connect(lambda: self.readVariableInfo('readLoopCurrentAndPercent')) + cmd2Layout.addRow(self.readCmd2Button) + cmd2Group.setLayout(cmd2Layout) + + # --- Command 3 Group --- + cmd3Group = QGroupBox("动态变量和回路电流 (Command 3)") + cmd3Layout = QFormLayout() + self.loopCurrentLabel_cmd3 = QLabel("--") + self.pv_cmd3_Label = QLabel("--") + self.pv_cmd3_UnitLabel = QLabel("--") + self.sv_cmd3_Label = QLabel("--") + self.sv_cmd3_UnitLabel = QLabel("--") + self.tv_cmd3_Label = QLabel("--") + self.tv_cmd3_UnitLabel = QLabel("--") + self.qv_cmd3_Label = QLabel("--") + self.qv_cmd3_UnitLabel = QLabel("--") + cmd3Layout.addRow("回路电流:", self.loopCurrentLabel_cmd3) + cmd3Layout.addRow("主变量:", self.pv_cmd3_Label) + cmd3Layout.addRow("主变量单位:", self.pv_cmd3_UnitLabel) + cmd3Layout.addRow("第二变量:", self.sv_cmd3_Label) + cmd3Layout.addRow("第二变量单位:", self.sv_cmd3_UnitLabel) + cmd3Layout.addRow("第三变量:", self.tv_cmd3_Label) + cmd3Layout.addRow("第三变量单位:", self.tv_cmd3_UnitLabel) + cmd3Layout.addRow("第四变量:", self.qv_cmd3_Label) + cmd3Layout.addRow("第四变量单位:", self.qv_cmd3_UnitLabel) + self.readCmd3Button = QPushButton("读取") + self.readCmd3Button.clicked.connect(lambda: self.readVariableInfo('readDynamicVariablesAndLoopCurrent')) + cmd3Layout.addRow(self.readCmd3Button) + cmd3Group.setLayout(cmd3Layout) + + topLayout.addWidget(cmd1Group) + topLayout.addWidget(cmd2Group) + topLayout.addWidget(cmd3Group) + mainLayout.addLayout(topLayout) + + # --- Raw Data Group --- + rawDataGroup = QGroupBox("原始数据") + rawDataLayout = QVBoxLayout() + self.rawDataText = QTextEdit() + self.rawDataText.setReadOnly(True) + self.rawDataText.setFont(QFont("Consolas", 9)) + self.rawDataText.setLineWrapMode(QTextEdit.NoWrap) + rawDataLayout.addWidget(self.rawDataText) + rawDataGroup.setLayout(rawDataLayout) + + mainLayout.addWidget(rawDataGroup) + + # --- Global Controls --- + controlLayout = QHBoxLayout() + self.clearButton = QPushButton("清除所有显示") + self.clearButton.clicked.connect(self.clearAllDisplays) + controlLayout.addStretch() + controlLayout.addWidget(self.clearButton) + mainLayout.addLayout(controlLayout) + + self.setHartComm(None) # Disable buttons initially + + def setHartComm(self, hartComm): + self.hartComm = hartComm + is_connected = hartComm is not None + self.readCmd1Button.setEnabled(is_connected) + self.readCmd2Button.setEnabled(is_connected) + self.readCmd3Button.setEnabled(is_connected) + + def readVariableInfo(self, command_name): + if not self.hartComm: + QMessageBox.warning(self, "警告", "未连接到HART设备!") + return + + if command_name in self.readThreads and self.readThreads[command_name].isRunning(): + self.readThreads[command_name].stop() + self.readThreads[command_name].wait() + + thread = ReadVariableThread(self.hartComm, command_name) + thread.infoReceived.connect(self.updateVariableInfo) + thread.errorOccurred.connect(self.handleError) + thread.finished.connect(lambda: self.onReadFinished(command_name)) + + self.readThreads[command_name] = thread + button = self.getButtonForCommand(command_name) + if button: + button.setText("读取中...") + button.setEnabled(False) + thread.start() + + def updateVariableInfo(self, msg, command_name): + self.appendRawData(msg) + + if command_name == 'readPrimaryVariable': + pv = msg.get('primary_variable') + unit_code = msg.get('primary_variable_units') + self.pvLabel.setText(f"{pv:.4f}" if pv is not None else "--") + self.pvUnitLabel.setText(common.get_unit_description(unit_code)) + + elif command_name == 'readLoopCurrentAndPercent': + current = msg.get('analog_signal') + percent = msg.get('primary_variable') + self.loopCurrentLabel_cmd2.setText(f"{current:.4f} mA" if current is not None else "--") + self.percentRangeLabel.setText(f"{percent:.2f} %" if percent is not None else "--") + + elif command_name == 'readDynamicVariablesAndLoopCurrent': + current = msg.get('analog_signal') + self.loopCurrentLabel_cmd3.setText(f"{current:.4f} mA" if current is not None else "--") + + pv = msg.get('primary_variable') + pv_unit = msg.get('primary_variable_units') + self.pv_cmd3_Label.setText(f"{pv:.4f}" if pv is not None else "--") + self.pv_cmd3_UnitLabel.setText(common.get_unit_description(pv_unit)) + + sv = msg.get('secondary_variable') + sv_unit = msg.get('secondary_variable_units') + self.sv_cmd3_Label.setText(f"{sv:.4f}" if sv is not None else "--") + self.sv_cmd3_UnitLabel.setText(common.get_unit_description(sv_unit)) + + tv = msg.get('tertiary_variable') + tv_unit = msg.get('tertiary_variable_units') + self.tv_cmd3_Label.setText(f"{tv:.4f}" if tv is not None else "--") + self.tv_cmd3_UnitLabel.setText(common.get_unit_description(tv_unit)) + + qv = msg.get('quaternary_variable') + qv_unit = msg.get('quaternary_variable_units') + self.qv_cmd3_Label.setText(f"{qv:.4f}" if qv is not None else "--") + self.qv_cmd3_UnitLabel.setText(common.get_unit_description(qv_unit)) + + def appendRawData(self, msg): + # self.rawDataText.append(\"--- 接收到的报文: {msg.get('command_name', 'N/A')} ---\") + for key, value in msg.items(): + if isinstance(value, bytes): + value_str = ' '.join(f'{b:02x}' for b in value) + self.rawDataText.append(f"{key}: {value_str} (hex)") + else: + self.rawDataText.append(f"{key}: {value}") + self.rawDataText.append("\\n") + self.rawDataText.moveCursor(QTextCursor.End) + + def handleError(self, errorMsg, command_name): + QMessageBox.critical(self, "错误", f"执行 {command_name} 时出错:\\n{errorMsg}") + + def onReadFinished(self, command_name): + button = self.getButtonForCommand(command_name) + if button: + button.setText("读取") + button.setEnabled(self.hartComm is not None) + + def getButtonForCommand(self, command_name): + if command_name == 'readPrimaryVariable': + return self.readCmd1Button + elif command_name == 'readLoopCurrentAndPercent': + return self.readCmd2Button + elif command_name == 'readDynamicVariablesAndLoopCurrent': + return self.readCmd3Button + return None + + def clearAllDisplays(self): + # Cmd 1 + self.pvLabel.setText("--") + self.pvUnitLabel.setText("--") + # Cmd 2 + self.loopCurrentLabel_cmd2.setText("--") + self.percentRangeLabel.setText("--") + # Cmd 3 + self.loopCurrentLabel_cmd3.setText("--") + self.pv_cmd3_Label.setText("--") + self.pv_cmd3_UnitLabel.setText("--") + self.sv_cmd3_Label.setText("--") + self.sv_cmd3_UnitLabel.setText("--") + self.tv_cmd3_Label.setText("--") + self.tv_cmd3_UnitLabel.setText("--") + self.qv_cmd3_Label.setText("--") + self.qv_cmd3_UnitLabel.setText("--") + + self.rawDataText.clear() \ No newline at end of file diff --git a/UI/HartWidgets/__init__.py b/UI/HartWidgets/__init__.py new file mode 100644 index 0000000..1044bdf --- /dev/null +++ b/UI/HartWidgets/__init__.py @@ -0,0 +1,22 @@ +""" +HART界面组件模块 +提供HART协议相关的PyQt界面组件 +""" + +from .HartMainWindow import HartMainWindow +from .HartConnectionWidget import HartConnectionWidget +from .HartDeviceInfoWidget import HartDeviceInfoWidget +from .HartDeviceConfigWidget import HartDeviceConfigWidget +from .HartSensorConfigWidget import HartSensorConfigWidget +from .HartCalibrationWidget import HartCalibrationWidget +from .HartVariableInfoWidget import HartVariableInfoWidget + +__all__ = [ + 'HartMainWindow', + 'HartConnectionWidget', + 'HartDeviceInfoWidget', + 'HartDeviceConfigWidget', + 'HartSensorConfigWidget', + 'HartCalibrationWidget', + 'HartVariableInfoWidget' +] \ No newline at end of file diff --git a/UI/HartWidgets/main.py b/UI/HartWidgets/main.py new file mode 100644 index 0000000..5d2aca3 --- /dev/null +++ b/UI/HartWidgets/main.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +HART通信工具主程序 +基于PyQt的现代化HART通信界面应用程序 +""" + +import sys +import os +from PyQt5.QtWidgets import QApplication +from PyQt5.QtCore import QFile, QTextStream +from UI.HartWidgets.HartMainWindow import HartMainWindow + +def loadStyleSheet(sheetName): + """ + 加载样式表 + + Args: + sheetName: 样式表文件名 + + Returns: + 样式表内容 + """ + file = QFile(sheetName) + file.open(QFile.ReadOnly | QFile.Text) + stream = QTextStream(file) + return stream.readAll() + +def main(): + """ + 主函数 + """ + # 创建应用程序 + app = QApplication(sys.argv) + + # 设置应用程序样式 + styleSheetPath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "style.qss") + if os.path.exists(styleSheetPath): + app.setStyleSheet(loadStyleSheet(styleSheetPath)) + + # 创建主窗口 + mainWindow = HartMainWindow() + mainWindow.setWindowTitle("HART通信工具") + mainWindow.resize(800, 600) + mainWindow.show() + + # 运行应用程序 + sys.exit(app.exec_()) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/UI/Main/Main.py b/UI/Main/Main.py index 6f99d4b..e16aaac 100644 --- a/UI/Main/Main.py +++ b/UI/Main/Main.py @@ -17,7 +17,8 @@ from model.ProjectModel.VarManage import GlobalVarManager from UI.Main.MainLeft import MainLeft from UI.Main.MainTop import MainTop from ..ProjectManages.ProjectWidget import ProjectWidgets -from ..VarManages.VarWidget import VarWidgets, HartWidgets, TcRtdWidgets, AnalogWidgets, HartSimulateWidgets +from ..VarManages.VarWidget import VarWidgets, TcRtdWidgets, AnalogWidgets, HartSimulateWidgets +from UI.HartWidgets.HartMainWindow import HartMainWindow from UI.VarManages.VarTable import RpcVarTableView from ..UserManage.UserWidget import UserWidgets from ..TrendManage.TrendWidget import TrendWidgets @@ -160,7 +161,7 @@ class MainWindow(QMainWindow): self.profibusWidget = ProfibusWidgets() self.trendWidget = TrendWidgets() self.SettingWidget = SettingWidget() - self.hartWidget = HartWidgets() + self.hartWidget = HartMainWindow() self.tcrtdWidget = TcRtdWidgets() self.analogWidget = AnalogWidgets() self.hartsimulateWidget = HartSimulateWidgets() @@ -169,6 +170,7 @@ class MainWindow(QMainWindow): self.userWidget.setObjectName('userWidget') self.projectWidget.setObjectName('projectWidget') self.trendWidget.setObjectName('trendWidget') + self.hartWidget.setObjectName('hartWidget') # self.ModBusWidget.setObjectName('varWidget') self.analogWidget.setObjectName('analogWidget') self.hartsimulateWidget.setObjectName('hartsimulateWidget') @@ -498,7 +500,9 @@ class MainWindow(QMainWindow): if __name__ == '__main__': app = QApplication(sys.argv) app.setStyle(QtWidgets.QStyleFactory.create('Fusion')) - app.setStyleSheet(CommonHelper.readQss('Static/main.qss')) + main_style = CommonHelper.readQss('Static/main.qss') + hart_style = CommonHelper.readQss('Static/Hart.qss') + app.setStyleSheet(main_style + hart_style) # print(QtWidgets.QStyleFactory.keys()) ex = MainWindow() ex.show() diff --git a/UI/ProjectManages/ProjectModel.py b/UI/ProjectManages/ProjectModel.py index f1b5957..9a27264 100644 --- a/UI/ProjectManages/ProjectModel.py +++ b/UI/ProjectManages/ProjectModel.py @@ -265,7 +265,7 @@ class ProjectButtonDelegate(QItemDelegate): Globals.getValue('MainWindows').createWidgets() modelLists = ['ModbusTcpMasterTable', 'ModbusTcpSlaveTable', 'ModbusRtuMasterTable', \ - 'ModbusRtuSlaveTable', 'HartTable', 'TcRtdTable', 'AnalogTable', 'HartSimulateTable', 'userTable'] + 'ModbusRtuSlaveTable', 'TcRtdTable', 'AnalogTable', 'HartSimulateTable', 'userTable'] for l in modelLists: # print(l) Globals.getValue(l).model.initTable() diff --git a/UI/VarManages/Thread.py b/UI/VarManages/Thread.py index 2444cee..10445c4 100644 --- a/UI/VarManages/Thread.py +++ b/UI/VarManages/Thread.py @@ -2,7 +2,7 @@ import time from PyQt5.QtCore import QThread, pyqtSignal from protocol.TCP.RTDTC import RTDTCClient from protocol.TCP.Analog import AnalogClient -from protocol.Hart.HartSimulate import HartSimulate +from protocol.HART.HartSimulate import HartSimulate from utils import Globals diff --git a/UI/VarManages/VarWidget.py b/UI/VarManages/VarWidget.py index 481a18c..87829db 100644 --- a/UI/VarManages/VarWidget.py +++ b/UI/VarManages/VarWidget.py @@ -545,45 +545,7 @@ class VarWidgets(QtWidgets.QWidget): return False -class HartWidgets(VarWidgets): - def __init__(self, parent=None): - super(HartWidgets, self).__init__(parent) - - def setupUI(self): - self.setAttribute(Qt.WA_StyledBackground, True) - - self.startProtocolBtn = QPushButton(QIcon('./Static/startProtocol.png'), '开始通讯') - self.startProtocolBtn.setObjectName('startProtocolBtn') - self.startProtocolBtn.setIconSize(QSize(22, 22)) - self.startProtocolBtn.clicked.connect(self.startProtocol) - self.varView = HartTableView() - self.varView.setObjectName('varView') - self.proxy = QtCore.QSortFilterProxyModel(self) - self.proxy.setSourceModel(self.varView.model) - - self.varView.setModel(self.proxy) - self.varView.proxy = self.proxy - - self.timer = QTimer(self) - # 将定时器超时信号与槽函数showTime()连接 - self.timer.timeout.connect(self.proxy.invalidate) - self.timer.start(50) # 启动timer - - Globals.setValue('HartTable', self.varView) - self.gridLayout = QtWidgets.QGridLayout(self) - self.gridLayout.addWidget(self.startProtocolBtn, 0, 0, 1, 1) - self.gridLayout.addWidget(self.varView, 1, 0, 10, 26) - self.gridLayout.setSpacing(20) - self.gridLayout.setContentsMargins(20, 30, 20, 20) - - - # self.comboBox.currentIndexChanged.connect(self.on_comboBox_currentIndexChanged) - - self.horizontalHeader = self.varView.horizontalHeader() - self.horizontalHeader.sectionClicked.connect(self.on_view_horizontalHeader_sectionClicked) - def startProtocol(self): - pass class TcRtdWidgets(VarWidgets): diff --git a/UI/__init__.py b/UI/__init__.py index e69de29..f39462f 100644 --- a/UI/__init__.py +++ b/UI/__init__.py @@ -0,0 +1,3 @@ +""" +UI模块初始化文件 +""" \ No newline at end of file diff --git a/protocol/Hart/HARTCommunication.py b/protocol/Hart/HARTCommunication.py new file mode 100644 index 0000000..546899c --- /dev/null +++ b/protocol/Hart/HARTCommunication.py @@ -0,0 +1,468 @@ +import serial +import time +import threading +from typing import Optional, List, Dict, Any, Union, Tuple + +from protocol.HART import universal, common, tools +from protocol.HART._parsing import parse +from protocol.HART._unpacker import Unpacker + +BurstModeON = 1 # 突发模式开启 +BurstModeOFF = 0 # 突发模式关闭 +PrimaryMasterMode = 1 # 主要模式 +SecondaryMasterMode = 0 # 次要副主站模式 + +class HARTCommunication: + """ + HART协议通信类,使用COM7端口,1200波特率,奇校验 + """ + + def __init__(self, port='COM7', baudrate=1200, parity=serial.PARITY_ODD, timeout=3): + """ + 初始化HART通信类 + + Args: + port (str): 串口端口号,默认为'COM7' + baudrate (int): 波特率,默认为1200 + parity (int): 校验方式,默认为奇校验 + timeout (float): 超时时间,单位秒 + """ + self.port = port + self.baudrate = baudrate + self.parity = parity + self.timeout = timeout + self.serialConnection: Optional[serial.Serial] = None + self.deviceAddress: Optional[bytes] = None + self.comm_lock = threading.RLock() + + self.masterMode = PrimaryMasterMode # 默认为主要主站 + self.burstMode = BurstModeOFF # 默认关闭突发模式 + + def connect(self) -> bool: + """ + 建立串口连接 + + Returns: + bool: 连接是否成功 + """ + try: + self.serialConnection = serial.Serial( + port=self.port, + baudrate=self.baudrate, + parity=self.parity, + stopbits=1, + bytesize=8, + timeout=self.timeout, + write_timeout = 3, + xonxoff = True + ) + return True + except Exception as e: + print(f"连接失败: {e}") + return False + + def disconnect(self): + """ + 断开串口连接 + """ + if self.serialConnection and self.serialConnection.is_open: + self.serialConnection.close() + + def sendCommand(self, command_bytes: bytes) -> Union[List[Dict[str, Any]], Dict[str, str]]: + """ + 发送HART命令 + + Args: + command_bytes (bytes): 要发送的命令字节 + + Returns: + Union[List[Dict[str, Any]], Dict[str, str]]: 解析后的响应数据列表或错误字典 + """ + if not self.serialConnection or not self.serialConnection.is_open: + raise ConnectionError("串口未连接") + + with self.comm_lock: + try: + # 发送命令前清空缓冲区,防止旧数据干扰 + self.serialConnection.reset_input_buffer() + self.serialConnection.reset_output_buffer() + self.serialConnection.write(command_bytes) + self.serialConnection.flush() # 等待数据完全发出 + print(f"发送命令: {command_bytes.hex()}") + + # 等待响应 + time.sleep(1) + + # 使用Unpacker解析响应 + unpacker = Unpacker(self.serialConnection) + msgList: List[Dict[str, Any]] = [] + for msg in unpacker: + msgList.append(dict(msg)) + + if not msgList: + return {"error": "设备无响应或响应超时"} + + return msgList + + except Exception as e: + print(f"发送命令失败: {e}") + return {"error": f"发送命令失败: {e}"} + + def sendCommandWithRetry(self, command_bytes: bytes, max_retries=3) -> Union[List[Dict[str, Any]], Dict[str, str]]: + """ + 发送HART命令(带重试和自动重连机制) + + Args: + command_bytes (bytes): 要发送的命令字节 + max_retries (int): 最大重试次数 + + Returns: + Union[List[Dict[str, Any]], Dict[str, str]]: 解析后的响应数据或错误字典 + """ + for attempt in range(max_retries): + try: + result = self.sendCommand(command_bytes) + if isinstance(result, list) and result: # 成功并且有数据 + return result + + # 处理失败情况 + error_info = result if isinstance(result, dict) else {"error": "未知错误"} + print(f"第{attempt + 1}次尝试失败,响应: {error_info.get('error', 'N/A')},重试中...") + + # 如果是超时或无响应,则尝试重连 + if error_info.get("error") == "设备无响应或响应超时": + print("检测到设备无响应,正在尝试重新连接串口...") + self.disconnect() + time.sleep(0.1) + if not self.connect(): + print("重新连接失败,放弃重试。") + return {"error": "重新连接串口失败"} + print("串口重新连接成功。") + + time.sleep(0.2) + + except Exception as e: + print(f"第{attempt + 1}次尝试发生异常: {e}") + if attempt == max_retries - 1: + return {"error": f"重试{max_retries}次后仍然失败: {e}"} + + # 发生异常时也尝试重连 + print("发生异常,正在尝试重新连接串口...") + self.disconnect() + time.sleep(0.5) + if not self.connect(): + print("重新连接失败,放弃重试。") + return {"error": "重新连接串口失败"} + print("串口重新连接成功。") + time.sleep(1) + + return {"error": f"重试{max_retries}次后仍然失败"} + + def readUniqueId(self): + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + command = universal.read_unique_identifier(self.deviceAddress) + + deviceInfo = self.sendCommandWithRetry(command) + if isinstance(deviceInfo, list): + for msg in deviceInfo: + if isinstance(msg, dict): + manufacturer_id = msg.get('manufacturer_id') + manufacturer_device_type = msg.get('manufacturer_device_type') + device_id = msg.get('device_id') + + if all(v is not None for v in [manufacturer_id, manufacturer_device_type, device_id]): + assert manufacturer_id is not None and manufacturer_device_type is not None and device_id is not None + expandedDeviceType = (manufacturer_id << 8) | manufacturer_device_type + self.buildHartAddress(expandedDeviceType=expandedDeviceType, deviceId=device_id, isLongFrame=True) + if self.deviceAddress: + print(f"Rebuilt long address: {self.deviceAddress.hex()}") + else: + print("Warning: Could not rebuild long address from message, missing required keys.") + else: + print(f"Warning: Expected a dict but got {type(msg)}. Cannot process for long address.") + + return deviceInfo + + def readPrimaryVariable(self): + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + command = universal.read_primary_variable(self.deviceAddress) + return self.sendCommandWithRetry(command) + + def readLoopCurrentAndPercent(self): + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + command = universal.read_loop_current_and_percent(self.deviceAddress) + return self.sendCommandWithRetry(command) + + def readDynamicVariablesAndLoopCurrent(self): + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + command = universal.read_dynamic_variables_and_loop_current(self.deviceAddress) + return self.sendCommandWithRetry(command) + + def readMessage(self): + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + command = universal.read_message(self.deviceAddress) + return self.sendCommandWithRetry(command) + + def readTagDescriptorDate(self): + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + command = universal.read_tag_descriptor_date(self.deviceAddress) + return self.sendCommandWithRetry(command) + + def readPrimaryVariableInformation(self): + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + command = universal.read_primary_variable_information(self.deviceAddress) + return self.sendCommandWithRetry(command) + + def readOutputInformation(self): + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + command = universal.read_output_information(self.deviceAddress) + return self.sendCommandWithRetry(command) + + def readFinalAssemblyNumber(self): + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + command = universal.read_final_assembly_number(self.deviceAddress) + return self.sendCommandWithRetry(command) + + # ========== 校准功能 ==========\\n + def calibrate4mA(self): + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + command = universal.calibrate_4ma(self.deviceAddress) + return self.sendCommandWithRetry(command) + + def calibrate20mA(self): + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + command = universal.calibrate_20ma(self.deviceAddress) + return self.sendCommandWithRetry(command) + + def calibrateZeroPoint(self): + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + command = universal.calibrate_zero_point(self.deviceAddress) + return self.sendCommandWithRetry(command) + + # ========== 量程和输出设置 ==========\\n + def writePrimaryVariableDamping(self, dampingTime: float): + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + command = universal.write_primary_variable_damping(self.deviceAddress, dampingTime) + return self.sendCommandWithRetry(command) + + def writePrimaryVariableRange(self, unitsCode: int, upperRange: float, lowerRange: float): + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + command = universal.write_primary_variable_range(self.deviceAddress, unitsCode, upperRange, lowerRange) + return self.sendCommandWithRetry(command) + + def writePrimaryVariableUnits(self, unitsCode: int): + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + command = universal.write_primary_variable_units(self.deviceAddress, unitsCode) + return self.sendCommandWithRetry(command) + + def writePrimaryVariableOutputFunction(self, transferFunctionCode: int): + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + command = universal.write_primary_variable_output_function(self.deviceAddress, transferFunctionCode) + return self.sendCommandWithRetry(command) + + # ========== 电流微调功能 ==========\\n + def trimLoopCurrent4mA(self, measuredCurrent: float): + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + command = universal.trim_loop_current_4ma(self.deviceAddress, measuredCurrent) + return self.sendCommandWithRetry(command) + + def trimLoopCurrent20mA(self, measuredCurrent: float): + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + command = universal.trim_loop_current_20ma(self.deviceAddress, measuredCurrent) + return self.sendCommandWithRetry(command) + + def setFixedCurrentOutput(self, enable: bool, currentValue: float = 0.0): + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + command = universal.set_fixed_current_output(self.deviceAddress, enable, currentValue) + return self.sendCommandWithRetry(command) + + # ========== 突发模式和主站模式设置 ==========\\n + def setBurstMode(self, enable: bool, burstMessageCommand: int = 1): + self.burstMode = BurstModeON if enable else BurstModeOFF + if self.deviceAddress: + self._rebuildCurrentAddress() + return {"burst_mode": self.burstMode, "burst_message_command": burstMessageCommand} + + def setMasterMode(self, isPrimaryMaster: bool): + self.masterMode = PrimaryMasterMode if isPrimaryMaster else SecondaryMasterMode + if self.deviceAddress: + self._rebuildCurrentAddress() + return {"master_mode": self.masterMode} + + def _rebuildCurrentAddress(self): + if self.deviceAddress is None: + return + if len(self.deviceAddress) == 5: # 长帧地址 + byte0 = self.deviceAddress[0] + expandedDeviceType = ((byte0 & 0x3F) << 8) | self.deviceAddress[1] + deviceId = (self.deviceAddress[2] << 16) | (self.deviceAddress[3] << 8) | self.deviceAddress[4] + self.buildHartAddress(expandedDeviceType=expandedDeviceType, deviceId=deviceId, isLongFrame=True) + else: # 短帧地址 + pollingAddress = self.deviceAddress[0] & 0x3F + self.buildHartAddress(pollingAddress=pollingAddress, isLongFrame=False) + + # ========== 实用工具方法 ==========\\n + def getDeviceStatus(self): + if not self.deviceAddress: + return {"error": "设备地址未设置"} + return self.readUniqueId() + + def isConnected(self) -> bool: + return self.serialConnection is not None and self.serialConnection.is_open + + def getCurrentAddress(self) -> Optional[bytes]: + return self.deviceAddress + + def setDeviceAddress(self, address: bytes): + self.deviceAddress = address + + def writeMessage(self, message: str): + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + command = universal.write_message(self.deviceAddress, message) + return self.sendCommandWithRetry(command, max_retries=1) + + def writeTagDescriptorDate(self, tag: str, descriptor: str, date: Tuple[int, int, int]): + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + command = universal.write_tag_descriptor_date(self.deviceAddress, tag, descriptor, date) + return self.sendCommandWithRetry(command, max_retries=1) + + def writeFinalAssemblyNumber(self, number: int): + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + command = universal.write_final_assembly_number(self.deviceAddress, number) + return self.sendCommandWithRetry(command, max_retries=1) + + def write_polling_address(self, new_polling_address: int): + """ + 修改设备的轮询地址 (Command 6) + + Args: + new_polling_address (int): 新的轮询地址 (0-63) + """ + if self.deviceAddress is None: + raise ValueError("设备地址未设置") + if not (0 <= new_polling_address <= 63): + raise ValueError("轮询地址必须在0-63范围内") + + command = universal.write_polling_address(self.deviceAddress, new_polling_address) + result = self.sendCommandWithRetry(command) + + # The response should contain the new polling address + if isinstance(result, list) and result and result[0].get('polling_address') == new_polling_address: + print(f"轮询地址成功修改为: {new_polling_address}") + # Update the internal address to the new one + self.buildHartAddress(pollingAddress=new_polling_address, isLongFrame=False) + return result + else: + error_msg = f"修改轮询地址失败. 响应: {result}" + print(error_msg) + raise Exception(error_msg) + + def scanForDevice(self) -> Optional[Dict[str, Any]]: + if not self.serialConnection: + raise ConnectionError("串口未连接") + print("开始扫描设备...") + original_timeout = self.serialConnection.timeout + try: + self.serialConnection.timeout = 0.5 + for addr in range(64): + print(f"正在尝试轮询地址: {addr}") + self.buildHartAddress(pollingAddress=addr, isLongFrame=False) + + try: + if self.deviceAddress is None: continue + command = universal.read_unique_identifier(self.deviceAddress) + device_info_list = self.sendCommandWithRetry(command, max_retries=1) + + if isinstance(device_info_list, list) and device_info_list: + print(f"在地址 {addr} 找到设备: {device_info_list[0]}") + return {"address": addr, "device_info": device_info_list[0]} + except Exception as e: + print(f"地址 {addr} 扫描异常: {e}") + continue + finally: + if self.serialConnection: + self.serialConnection.timeout = original_timeout + + print("扫描完成,未找到设备。") + return None + + def buildHartAddress(self, expandedDeviceType: Optional[int] = None, deviceId: Optional[int] = None, pollingAddress: int = 0, isLongFrame: bool = False): + master = self.masterMode + burstMode = self.burstMode + if isLongFrame: + if expandedDeviceType is None or deviceId is None: + raise ValueError("长帧格式需要提供expandedDeviceType和deviceId参数") + if not (0 <= deviceId <= 0xFFFFFF): + raise ValueError("设备ID必须在0-16777215范围内") + byte0 = (master << 7) | (burstMode << 6) | ((expandedDeviceType >> 8) & 0x3F) + byte1 = expandedDeviceType & 0xFF + byte2 = (deviceId >> 16) & 0xFF + byte3 = (deviceId >> 8) & 0xFF + byte4 = deviceId & 0xFF + self.deviceAddress = bytes([byte0, byte1, byte2, byte3, byte4]) + else: + if not (0 <= pollingAddress <= 63): + raise ValueError("轮询地址必须在0-63范围内") + self.deviceAddress = bytes([(master << 7) | (burstMode << 6) | (pollingAddress & 0x3F)]) + return self.deviceAddress + +if __name__ == '__main__': + hart = HARTCommunication() + if hart.connect(): + print("=== HART通信类功能演示 ===") + address = hart.buildHartAddress(pollingAddress=0, isLongFrame=False) + print(f"设备地址: {address.hex()}") + + print("\n--- 读取设备信息 ---") + device_info = hart.readUniqueId() + if isinstance(device_info, list): + for msg in device_info: + print(f"设备信息: {msg}") + + # print("\n--- 读取主变量 ---") + # primary_var = hart.readPrimaryVariable() + # if isinstance(primary_var, list): + # for msg in primary_var: + # print(f"主变量: {msg}") + + testMEs: List[Dict[str, Any]] | Dict[str, str] = hart.writePrimaryVariableDamping(1) + if isinstance(testMEs, list): + for msg in testMEs: + print(f"信息: {msg}") + + testMEs1: List[Dict[str, Any]] | Dict[str, str] = hart.readOutputInformation() + if isinstance(testMEs1, list): + for msg in testMEs1: + print(f"信息: {msg}") + + testMEs1: List[Dict[str, Any]] | Dict[str, str] = hart.readOutputInformation() + if isinstance(testMEs1, list): + for msg in testMEs1: + print(f"信息: {msg}") + + hart.disconnect() + print("\n连接已断开") + else: + print("连接失败") \ No newline at end of file diff --git a/protocol/Hart/HartProtocol.py b/protocol/Hart/HartProtocol.py index f2457c2..5cd80ed 100644 --- a/protocol/Hart/HartProtocol.py +++ b/protocol/Hart/HartProtocol.py @@ -1,8 +1,4 @@ -import sys -sys.path.append('../') -sys.path.append('../../') -sys.path.append('../../../') -from protocol.ModBus.rtumaster_example import RTUMaster +from ..ModBus.rtumaster_example import RTUMaster class HartProtocol(object): def __init__(self): diff --git a/protocol/Hart/VERSION.py b/protocol/Hart/VERSION.py new file mode 100644 index 0000000..f0b17d2 --- /dev/null +++ b/protocol/Hart/VERSION.py @@ -0,0 +1 @@ +__version__ = "2023.6.0" diff --git a/protocol/Hart/__init__.py b/protocol/Hart/__init__.py index e69de29..3c5cfea 100644 --- a/protocol/Hart/__init__.py +++ b/protocol/Hart/__init__.py @@ -0,0 +1,28 @@ +"""A sans-io python implementation of the Highway Addressable Remote Transducer Protocol.""" + +from .__version__ import * + +# 延迟导入以避免循环依赖 +def _get_hart_communication(): + from .HARTCommunication import HARTCommunication, BurstModeON, BurstModeOFF, PrimaryMasterMode, SecondaryMasterMode + return HARTCommunication, BurstModeON, BurstModeOFF, PrimaryMasterMode, SecondaryMasterMode + +# 导出常用模块 +from . import common +from . import universal +from . import tools +from ._unpacker import * + +# 导出主要类和常量 +HARTCommunication, BurstModeON, BurstModeOFF, PrimaryMasterMode, SecondaryMasterMode = _get_hart_communication() + +__all__ = [ + 'HARTCommunication', + 'BurstModeON', + 'BurstModeOFF', + 'PrimaryMasterMode', + 'SecondaryMasterMode', + 'common', + 'universal', + 'tools' +] diff --git a/protocol/Hart/__version__.py b/protocol/Hart/__version__.py new file mode 100644 index 0000000..0fe0286 --- /dev/null +++ b/protocol/Hart/__version__.py @@ -0,0 +1,23 @@ +"""Define version.""" + +import pathlib +import subprocess +from .VERSION import __version__ + + +here = pathlib.Path(__file__).resolve().parent + + +__all__ = ["__version__", "__branch__"] + +try: + __branch__ = ( + subprocess.run(["git", "branch", "--show-current"], capture_output=True, cwd=here) + .stdout.strip() + .decode() + ) +except: + __branch__ = "" + +if __branch__: + __version__ += "+" + __branch__ diff --git a/protocol/Hart/_parsing.py b/protocol/Hart/_parsing.py new file mode 100644 index 0000000..6b17be1 --- /dev/null +++ b/protocol/Hart/_parsing.py @@ -0,0 +1,261 @@ +import struct +from typing import MutableMapping, Union +from . import tools + + +def parse(response: bytes) -> MutableMapping[str, Union[int, bytes, str, float]]: + # print(response, 1111111111111) + out: MutableMapping[str, Union[int, bytes, str, float]] = dict() + out["full_response"] = response + if response[0] & 0x80: # long address + out["address"] = int.from_bytes(response[1:6], "big") + response = response[6:] + else: # short address + out["address"] = response[1] + response = response[2:] + command, bytecount, response_code, device_status = struct.unpack_from(">BBBB", response) + out["device_status"] = device_status + out["response_code"] = response_code + + if response_code == 0: + out["status"] = "success" + else: + out["status"] = "error" + out["error"] = f"Device responded with error code: {response_code}" + + data = response[4 : 4 + bytecount] + out["command"] = command + out["command_name"] = f"hart_command_{command}" + out["bytecount"] = bytecount + out["data"] = data + + # handle error return + if bytecount == 2: + return out + + # universal commands + if command in [0, 11]: + out["command_name"] = "read_unique_identifier" + out["manufacturer_id"] = data[1] + out["manufacturer_device_type"] = data[2] + out["number_response_preamble_characters"] = data[3] + out["universal_command_revision_level"] = data[4] + out["transmitter_specific_command_revision_level"] = data[5] + out["software_revision_level"] = data[6] + out["hardware_revision_level"] = data[7] + out["device_id"] = int.from_bytes(data[9:12], "big") + elif command in [1]: + out["command_name"] = "read_primary_variable" + units, variable = struct.unpack_from(">Bf", data) + out["primary_variable_units"] = units + out["primary_variable"] = variable + elif command in [2]: + out["command_name"] = "read_loop_current_and_percent" + analog_signal, primary_variable = struct.unpack_from(">ff", data) + out["analog_signal"] = analog_signal + out["primary_variable"] = primary_variable + elif command in [3]: + out["command_name"] = "read_dynamic_variables_and_loop_current" + if len(data) >= 4: + analog_signal = struct.unpack_from(">f", data, 0)[0] + out["analog_signal"] = analog_signal + + # 计算剩余数据长度并确定支持的变量数量 + remaining_length = len(data) - 4 + variable_count = remaining_length // 5 # 每个变量占5字节 (1字节单位 + 4字节值) + + # 动态解析变量数据 + offset = 4 # 从第4字节开始是变量数据 + for i in range(variable_count): + if offset + 5 > len(data): # 确保有足够数据 + break + + # 解析单位枚举值 (1字节) + unit_enum = struct.unpack_from(">B", data, offset)[0] + offset += 1 + + # 解析变量值 (4字节) + variable_value = struct.unpack_from(">f", data, offset)[0] + offset += 4 + + # 根据变量索引设置字段名 + if i == 0: + out["primary_variable_units"] = unit_enum + out["primary_variable"] = variable_value + elif i == 1: + out["secondary_variable_units"] = unit_enum + out["secondary_variable"] = variable_value + elif i == 2: + out["tertiary_variable_units"] = unit_enum + out["tertiary_variable"] = variable_value + elif i == 3: + out["quaternary_variable_units"] = unit_enum + out["quaternary_variable"] = variable_value + elif command in [6]: + out["command_name"] = "write_polling_address" + polling_address = struct.unpack_from(">B", data)[0] + out["polling_address"] = polling_address + elif command in [12]: + # print(data) + out["command_name"] = "read_message" + out["message"] = data[0:24] + # print(out, 111111111) + elif command in [13]: + out["command_name"] = "read_tag_descriptor_date" + # Tag is 8 chars packed into 6 bytes + out["device_tag_name"] = tools.unpack_packed_ascii(data[0:6], 8) + # Descriptor is 16 chars packed into 12 bytes + out["device_descriptor"] = tools.unpack_packed_ascii(data[6:18], 16) + # Date is 3 bytes: day, month, year (year is offset from 1900) + day, month, year_offset = struct.unpack_from(">BBB", data, 18) + out["date"] = {"day": day, "month": month, "year": year_offset + 1900} + elif command in [14]: + out["command_name"] = "read_primary_variable_information" + out["serial_no"] = data[0:3] + sensor_limits_code, upper_limit, lower_limit, min_span = struct.unpack_from( + ">xxxBfff", data + ) + out["sensor_limits_code"] = sensor_limits_code + out["upper_limit"] = upper_limit + out["lower_limit"] = lower_limit + out["min_span"] = min_span + elif command in [15]: + out["command_name"] = "read_output_information" + ( + alarm_code, + transfer_fn_code, + primary_variable_range_code, + upper_range_value, + lower_range_value, + damping_value, + write_protect, + private_label, + ) = struct.unpack_from(">BBBfffBB", data) + out["alarm_code"] = alarm_code + out["transfer_fn_code"] = transfer_fn_code + out["primary_variable_range_code"] = primary_variable_range_code + out["upper_range_value"] = upper_range_value + out["lower_range_value"] = lower_range_value + out["damping_value"] = damping_value + out["write_protect"] = write_protect + out["private_label"] = private_label + elif command in [16]: + out["command_name"] = "read_final_assembly_number" + # print(data) + out["final_assembly_no"] = int.from_bytes(data[0:3], "big") + elif command in [17]: + out["command_name"] = "write_message" + out["message"] = data[0:24] + elif command in [18]: + out["command_name"] = "write_tag_descriptor_date" + out["device_tag_name"] = data[0:6] + out["device_descriptor"] = data[6:18] + out["date"] = data[18:21] + elif command in [19]: + out["command_name"] = "write_final_assembly_number" + out["final_assembly_no"] = int.from_bytes(data[0:2], "big") + elif command in [34]: + out["command_name"] = "write_primary_variable_damping" + out["damping_time"] = struct.unpack_from(">f", data)[0] + elif command in [35]: + out["command_name"] = "write_primary_variable_range" + out["upper_range"] = struct.unpack_from(">f", data)[0] + out["lower_range"] = struct.unpack_from(">f", data[4:])[0] + elif command in [36]: + out["command_name"] = "calibrate_20ma" + # 20mA校准命令无返回数据 + elif command in [37]: + out["command_name"] = "calibrate_4ma" + # 4mA校准命令无返回数据 + elif command in [40]: + out["command_name"] = "set_fixed_current_output" + if len(data) > 0: + out["fixed_output_enabled"] = data[0] == 1 + if len(data) > 1: + out["fixed_current_value"] = struct.unpack_from(">f", data[1:])[0] + elif command in [43]: + out["command_name"] = "calibrate_zero_point" + # 零点校准命令无返回数据 + elif command in [44]: + out["command_name"] = "write_primary_variable_units" + out["units_code"] = data[0] + elif command in [45]: + out["command_name"] = "trim_loop_current_4ma" + out["measured_current"] = struct.unpack_from(">f", data)[0] + elif command in [46]: + out["command_name"] = "trim_loop_current_20ma" + out["measured_current"] = struct.unpack_from(">f", data)[0] + elif command in [47]: + out["command_name"] = "write_primary_variable_output_function" + out["transfer_function_code"] = data[0] + + + # COMMON COMMANDS + + # elif command in [37]: + # out["command_name"] = "set_primary_variable_lower_range_value" + # out[""] = + # request data bytes = NONE, response data bytes = NONE + + # elif command in [38]: + # out["command_name"] = "reset_configuration_changed_flag" + # out[""] = + # request data bytes = NONE, response data bytes = NONE + + # elif command in [42]: + # out["command_name"] = "perform_master_reset" + # out[""] = + # request data bytes = NONE, response data bytes = NONE + + # elif command in [48]: + # out["command_name"] = "read_additional_transmitter_status" + # out[""] = + # request data bytes = NONE, response data bytes = NONE + + elif command in [50]: + out["command_name"] = "read_dynamic_variable_assignments" + ( + primary_transmitter_variable, + secondary_transmitter_variable, + tertiary_transmitter_variable, + quaternary_transmitter_variable, + ) = struct.unpack_from(">BBBB", data) + out["primary_transmitter_variable"] = primary_transmitter_variable + out["secondary_transmitter_variable"] = secondary_transmitter_variable + out["tertiary_transmitter_variable"] = tertiary_transmitter_variable # NOT USED + out["quaternary_transmitter_variable"] = quaternary_transmitter_variable # NOT USED + elif command in [59]: + out["command_name"] = "write_number_of_response_preambles" + n_response_preambles = struct.unpack_from(">B", data)[0] + out["n_response_preambles"] = n_response_preambles + elif command in [66]: + out["command_name"] = "toggle_analog_output_mode" + ( + analog_output_selection, + analog_output_units_code, + fixed_analog_output, + ) = struct.unpack_from(">BBf", data) + out["analog_output_selection"] = analog_output_selection + out["analog_output_units_code"] = analog_output_units_code + out["fixed_analog_output"] = fixed_analog_output + elif command in [67]: + out["command_name"] = "trim_analog_output_zero" + analog_output_code, analog_output_units_code, measured_analog_output = struct.unpack_from( + ">BBf", data + ) + out["analog_output_code"] = analog_output_code + out["analog_output_units_code"] = analog_output_units_code + out["measured_analog_output"] = measured_analog_output + elif command in [68]: + out["command_name"] = "trim_analog_output_span" + analog_output_code, analog_output_units_code, measured_analog_output = struct.unpack_from( + ">BBf", data + ) + out["analog_output_code"] = analog_output_code + out["analog_output_units_code"] = analog_output_units_code + out["measured_analog_output"] = measured_analog_output + elif command in [123]: + out["command_name"] = "select_baud_rate" + out["baud_rate"] = int.from_bytes(data, "big") + + return out diff --git a/protocol/Hart/_unpacker.py b/protocol/Hart/_unpacker.py new file mode 100644 index 0000000..545ef23 --- /dev/null +++ b/protocol/Hart/_unpacker.py @@ -0,0 +1,140 @@ +__all__ = ["Unpacker"] + +import asyncio +from collections import namedtuple +import io +import struct +from tabnanny import check +import warnings + +from ._parsing import parse +from . import tools + + +class Unpacker: + """ + Create an Unpacker to decode a byte stream into HART protocol messages. + + The ``file_like`` parameter should be an object which data can be sourced from. + It should support the ``read()`` method. + + The ``on_error`` parameter selects the action to take if invalid data is detected. + If set to ``"continue"`` (the default), bytes will be discarded if the byte sequence + does not appear to be a valid message. + If set to ``"warn"``, the behaviour is identical, but a warning message will be emitted. + To instead immediately abort the stream decoding and raise a ``RuntimeError``, set to + ``"raise"``. + + :param file_like: A file-like object which data can be `read()` from. + :param on_error: Action to take if invalid data is detected. + """ + + def __init__(self, file_like=None, on_error="continue"): + if file_like is None: + self._file = io.BytesIO() + else: + self._file = file_like + self.buf = b"" + self.on_error = on_error + # print(self._file) + + def __iter__(self): + return self + + def _decoding_error(self, message="Error decoding message from buffer."): + """ + Take appropriate action if parsing of data stream fails. + + :param message: Warning or error message string. + """ + if self.on_error == "raise": + raise RuntimeError(message) + if self.on_error == "warn": + warnings.warn(message) + + def _read_one_byte_if_possible(self): + if self._file.in_waiting > 0: + # print(1) + return self._file.read(1) + else: + raise StopIteration + + def __next__(self): + # must work with at least two bytes to start with + while len(self.buf) < 3: + self.buf += self._read_one_byte_if_possible() + # keep reading until we find a minimum preamble + # print(self.buf, 11) + while self.buf[:3] not in [b"\xFF\xFF\x06", b"\xFF\xFF\x86"]: + self.buf += self._read_one_byte_if_possible() + # print(self.buf) + self.buf = self.buf[1:] + self._decoding_error("Head of buffer not recognized as valid preamble") + # now the head of our buffer is the start charachter plus two preamble + # we will read all the way through status + if self.buf[2] & 0x80: + l = 12 + else: + l = 8 + while len(self.buf) < l: + self.buf += self._read_one_byte_if_possible() + # now we can use the bytecount to read through the data and checksum + # + # print(self.buf) + # print(type(self.buf[l - 4]), 222) + if self.buf[l - 4] == 15: # 对command15进行特殊操作 修复报文错误 + # bytecount = 19 + self.buf = self.buf[:l - 3] + b'\x13\x00\x00\x00' + bytecount = self.buf[l - 3] + response_length = l + bytecount - 1 + # print(self.buf, bytecount) + + while len(self.buf) < response_length: + self.buf += self._read_one_byte_if_possible() + # checksum + # print(self.buf) + checksum = int.from_bytes( + tools.calculate_checksum(self.buf[2 : response_length - 1]), "big" + ) + # print(self.buf) + if checksum != self.buf[response_length - 1]: + # print(66666666) + self._decoding_error("Invalid checksum.") + raise StopIteration + # print(self.buf) + # parse + response = self.buf[2:response_length] + # print(response, 'test') + # print(response) + dict_ = parse(response) + # clear buffer + if len(self.buf) == response_length: + self.buf = b"" + else: + self.buf = self.buf[response_length + 3 :] + # return + return dict_ + + def __aiter__(self): + return self + + async def __anext__(self): + while True: + try: + return next(self) + except StopIteration: + await asyncio.sleep(0.001) + + def feed(self, data: bytes): + """ + Add byte data to the input stream. + + The input stream must support random access, if it does not, must be fed externally + (e.g. serial port data). + + :param data: Byte array containing data to add. + """ + pos = self._file.tell() + self._file.seek(0, 2) + self._file.write(data) + self._file.seek(pos) diff --git a/protocol/Hart/common.py b/protocol/Hart/common.py new file mode 100644 index 0000000..0d0e8f2 --- /dev/null +++ b/protocol/Hart/common.py @@ -0,0 +1,267 @@ +from . import tools + +TRANSFER_FUNCTION_CODE = { + 0: "线性 (Linear)", + 1: "平方根 (Square Root)", + 2: "平方根三次方 (Square Root 3rd Power)", + 3: "平方根五次方 (Square Root 5th Power)", + 4: "特殊曲线 (Special Curve) - 不推荐使用", + 5: "平方 (Square)", + 6: "带截止的平方根 (Square root with cut-off)", + 10: "等百分比 1:25 (Equal Percentage 1:25)", + 11: "等百分比 1:33 (Equal Percentage 1:33)", + 12: "等百分比 1:50 (Equal Percentage 1:50)", + 15: "快开 1:25 (Quick Open 1:25)", + 16: "快开 1:33 (Quick Open 1:33)", + 17: "快开 1:50 (Quick Open 1:50)", + 30: "双曲线 (Hyperbolic) - Shape Factor 0.10", + 31: "双曲线 (Hyperbolic) - Shape Factor 0.20", + 32: "双曲线 (Hyperbolic) - Shape Factor 0.30", + 34: "双曲线 (Hyperbolic) - Shape Factor 0.50", + 37: "双曲线 (Hyperbolic) - Shape Factor 0.70", + 40: "双曲线 (Hyperbolic) - Shape Factor 1.00", + 41: "双曲线 (Hyperbolic) - Shape Factor 1.50", + 42: "双曲线 (Hyperbolic) - Shape Factor 2.00", + 43: "双曲线 (Hyperbolic) - Shape Factor 3.00", + 44: "双曲线 (Hyperbolic) - Shape Factor 4.00", + 45: "双曲线 (Hyperbolic) - Shape Factor 5.00", + 100: "平底罐 (Flat bottom tank)", + 101: "锥形或金字塔形底罐 (Conical or pyramidal bottom tank)", + 102: "抛物线形底罐 (Parabolic bottom tank)", + 103: "球形底罐 (Spherical bottom tank)", + 104: "斜底罐 (Angled bottom tank)", + 105: "平端圆柱罐 (Flat end cylinder tank)", + 106: "抛物线端圆柱罐 (Parabolic end cylinder tank)", + 107: "球形罐 (Spherical tank)", + 230: "离散/开关 (Discrete/Switch)", + 250: "未使用 (Not Used)", + 251: "无 (None)", + 252: "未知 (Unknown)", + 253: "特殊 (Special)" +} +REVERSE_TRANSFER_FUNCTION_CODE = {v: k for k, v in TRANSFER_FUNCTION_CODE.items()} + +UNITS_CODE = { + # Pressure + 1: "inH2O @ 68 F", + 2: "inHg @ 0 C", + 3: "ftH2O @ 68 F", + 4: "mmH2O @ 68 F", + 5: "mmHg @ 0 C", + 6: "psi", + 7: "bar", + 8: "mbar", + 9: "g/cm2", + 10: "kg/cm2", + 11: "Pa", + 12: "kPa", + 13: "torr", + 14: "atm", + 145: "inH2O @ 60F", + 237: "MPa", + 238: "inH2O @ 4C", + 239: "mmH2O @ 4C", + # Temperature + 32: "deg C", + 33: "deg F", + 34: "deg R", + 35: "K", + # Volumetric Flow + 15: "cu ft/min", + 16: "gal/min", + 17: "liter/min", + 18: "imp gal/min", + 19: "cu m/hr", + 22: "gal/sec", + 23: "Mgal/day", + 24: "liter/sec", + 25: "Ml/day", + 26: "cu ft/sec", + 27: "cu ft/day", + 28: "cu m/sec", + 29: "cu m/day", + 30: "imp gal/hr", + 31: "imp gal/day", + 121: "std cu m/hr", + 122: "std liter/hr", + 123: "std cu ft/min", + 130: "cu ft/hr", + 131: "cu m/min", + 132: "bbl/sec", + 133: "bbl/min", + 134: "bbl/hr", + 135: "bbl/day", + 136: "gal/hr", + 137: "imp gal/sec", + 235: "gal/day", + # Velocity + 20: "ft/sec", + 21: "m/sec", + 114: "in/sec", + 115: "in/min", + 116: "ft/min", + 120: "m/hr", + # Volume + 40: "gal", + 41: "liter", + 42: "imp gal", + 43: "cu m", + 46: "bbl", + 110: "bushel", + 111: "cu yd", + 112: "cu ft", + 113: "cu in", + 134: "bbl liq", + 166: "std cu m", + 167: "std l", + 168: "std cu ft", + 236: "hectoliter", + # Mass + 60: "g", + 61: "kg", + 62: "metric ton", + 63: "lb", + 64: "short ton", + 65: "long ton", + 125: "oz", + # Mass Flow + 70: "g/sec", + 71: "g/min", + 72: "g/hr", + 73: "kg/sec", + 74: "kg/min", + 75: "kg/hr", + 76: "kg/day", + 77: "metric ton/min", + 78: "metric ton/hr", + 79: "metric ton/day", + 80: "lb/sec", + 81: "lb/min", + 82: "lb/hr", + 83: "lb/day", + 84: "short ton/min", + 85: "short ton/hr", + 86: "short ton/day", + 87: "long ton/hr", + 88: "long ton/day", + # Mass per Volume + 90: "SGU", + 91: "g/cu cm", + 92: "kg/cu m", + 93: "lb/gal", + 94: "lb/cu ft", + 95: "g/ml", + 96: "kg/liter", + 97: "g/liter", + 98: "lb/cu in", + 99: "short ton/cu yd", + 100: "deg Twaddell", + 102: "deg Baume", + 103: "deg API", + 104: "deg API", + 146: "ug/liter", + 147: "ug/cu m", + # Viscosity + 54: "cSt", + 55: "cP", + # Electric Potential + 36: "mV", + 58: "V", + # Electric Current + 39: "mA", + # Electric Resistance + 37: "ohm", + 163: "kohm", + # Energy (includes Work) + 69: "J", + 89: "dtherm", + 126: "kWh", + 128: "MWh", + 162: "kcal", + 164: "MJ", + 165: "Btu", + # Power + 127: "kW", + 129: "hp", + 140: "Mcal/hr", + 141: "MJ/hr", + 142: "Btu/hr", + # Radial Velocity + 117: "deg/sec", + 118: "rev/sec", + 119: "rpm", + # Miscellaneous + 38: "Hz", + 57: "percent", + 59: "pH", + 101: "deg Balling", + 105: "percent solids/wt", + 106: "percent solids/vol", + 107: "deg Plato", + 108: "proof/vol", + 109: "proof/mass", + 139: "ppm", + 148: "percent consistency", + 149: "vol percent", + 150: "percent steam qual", + 152: "cu ft/lb", + 153: "pF", + 154: "ml/liter", + 155: "ul/liter", + 156: "dB", + 160: "deg Brix", + 161: "percent LEL", + 169: "ppb", + # Generic & Reserved + 240: "Manufacturer specific", + 241: "Manufacturer specific", + 242: "Manufacturer specific", + 243: "Manufacturer specific", + 244: "Manufacturer specific", + 245: "Manufacturer specific", + 246: "Manufacturer specific", + 247: "Manufacturer specific", + 248: "Manufacturer specific", + 249: "Manufacturer specific", + 250: "Not Used", + 251: "Unknown", + 252: "Unknown", + 253: "Special" +} +REVERSE_UNITS_CODE = {v: k for k, v in UNITS_CODE.items()} + +def get_unit_description(code): + """根据代码获取单位描述""" + return UNITS_CODE.get(code, f"未知代码({code})") + +def set_primary_variable_lower_range_value(address: bytes, value) -> bytes: + return tools.pack_command(address, command_id=37) + +def reset_configuration_changed_flag(address: bytes) -> bytes: + return tools.pack_command(address, command_id=38) + +def perform_master_reset(address: bytes) -> bytes: + return tools.pack_command(address, command_id=42) + +def read_additional_transmitter_status(address: bytes) -> bytes: + return tools.pack_command(address, command_id=48) + +def read_dynamic_variable_assignments(address: bytes) -> bytes: + return tools.pack_command(address, command_id=50) + +def write_number_of_response_preambles(address: bytes, number: int) -> bytes: + data = number.to_bytes(1, "big") + return tools.pack_command(address, command_id=59, data=data) + +def toggle_analog_output_mode(address: bytes) -> bytes: + return tools.pack_command(address, command_id=66) + +def trim_analog_output_zero(address: bytes) -> bytes: + return tools.pack_command(address, command_id=67) + +def trim_analog_output_span(address: bytes) -> bytes: + return tools.pack_command(address, command_id=68) + +def select_baud_rate(address: bytes, rate: int) -> bytes: + data = rate.to_bytes(1, "big") + return tools.pack_command(address, command_id=123, data=data) \ No newline at end of file diff --git a/protocol/Hart/py.typed b/protocol/Hart/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/protocol/Hart/tools.py b/protocol/Hart/tools.py new file mode 100644 index 0000000..3ad5a05 --- /dev/null +++ b/protocol/Hart/tools.py @@ -0,0 +1,147 @@ +import math +from typing import Union + + +def calculate_checksum(command: Union[int, bytes]) -> bytes: + # print(command) + if type(command) == int: + command = command.to_bytes(64, "big") # type: ignore + lrc = 0 + for byte in command: # type: ignore + lrc ^= byte + out = lrc.to_bytes(1, "big") + # print(out) + return out + + +def calculate_long_address(manufacturer_id: int, manufacturer_device_type: int, device_id: bytes): + out = int.from_bytes(device_id, "big") + out |= manufacturer_device_type << 24 + out |= manufacturer_id << 32 + return out.to_bytes(5, "big") + + +def pack_command(address, command_id, data=None): + # if type(address) == bytes: + # address = int.from_bytes(address, "big") + if type(command_id) == int: + command_id = command_id.to_bytes(1, "big") + command = b"\xFF\xFF\xFF\xFF\xFF\xFF" # preamble + command += b"\x82" if len(address) >= 5 else b"\x02" + command += address + command += command_id + if data is None: + command += b"\x00" # byte count + else: + # print(len(data), 22222222222) + command += len(data).to_bytes(1, "big") # byte count + command += data # data + # print(command[6:]) + command += calculate_checksum(command[6:]) + # print(command) + return command + + +PACKED_ASCII_MAP = { + 0x00: '@', 0x01: 'A', 0x02: 'B', 0x03: 'C', 0x04: 'D', 0x05: 'E', 0x06: 'F', 0x07: 'G', + 0x08: 'H', 0x09: 'I', 0x0A: 'J', 0x0B: 'K', 0x0C: 'L', 0x0D: 'M', 0x0E: 'N', 0x0F: 'O', + 0x10: 'P', 0x11: 'Q', 0x12: 'R', 0x13: 'S', 0x14: 'T', 0x15: 'U', 0x16: 'V', 0x17: 'W', + 0x18: 'X', 0x19: 'Y', 0x1A: 'Z', 0x1B: '[', 0x1C: '\\', 0x1D: ']', 0x1E: '^', 0x1F: '_', # 这里应该是 '_' 而不是 '-' + 0x20: ' ', 0x21: '!', 0x22: '"', 0x23: '=', 0x24: '$', 0x25: '%', 0x26: '&', 0x27: '`', # 注意这里是反引号 '`' + 0x28: '(', 0x29: ')', 0x2A: '*', 0x2B: '+', 0x2C: ',', 0x2D: '-', 0x2E: '.', 0x2F: '/', + 0x30: '0', 0x31: '1', 0x32: '2', 0x33: '3', 0x34: '4', 0x35: '5', 0x36: '6', 0x37: '7', + 0x38: '8', 0x39: '9', 0x3A: ':', 0x3B: ';', 0x3C: '<', 0x3D: '=', 0x3E: '>', 0x3F: '?' +} + + + +def unpack_packed_ascii(data: bytes, expected_len: int) -> str: + """Unpacks a byte array into a string using the HART Packed ASCII 6-bit encoding by processing in chunks.""" + chars = [] + # Process data in 3-byte chunks, as 4 characters (24 bits) fit into 3 bytes (24 bits) + for i in range(0, len(data), 3): + chunk = data[i:i+3] + + # Pad chunk to 3 bytes if it's a partial chunk at the end + if len(chunk) < 3: + chunk += b'\x00' * (3 - len(chunk)) + + byte1, byte2, byte3 = chunk + + # Unpack 4 6-bit characters from the 3 8-bit bytes + c1 = byte1 >> 2 + c2 = ((byte1 & 0x03) << 4) | (byte2 >> 4) + c3 = ((byte2 & 0x0F) << 2) | (byte3 >> 6) + c4 = byte3 & 0x3F + + codes = [c1, c2, c3, c4] + for code in codes: + chars.append(PACKED_ASCII_MAP.get(code, '?')) + + # Join and then trim to the exact expected length. Do not rstrip() as spaces can be valid. + return "".join(chars[:expected_len]) + + + + +# Build a reverse map for packing, handling duplicates deterministically +REVERSE_PACKED_ASCII_MAP = {} +# Iterate through the original map, sorted by code value (key) +for code, char in sorted(PACKED_ASCII_MAP.items(), key=lambda item: item[0]): + # This ensures that for characters with multiple codes, the one with the highest code value is chosen. + REVERSE_PACKED_ASCII_MAP[char] = code + + +def pack_packed_ascii(text: Union[str, bytes], expected_len: int) -> bytes: + """ + Packs a string or bytes into a HART Packed ASCII byte array. + This function is the symmetrical inverse of the unpack_packed_ascii function. + """ + # Defensively decode if bytes are passed in + if isinstance(text, bytes): + try: + # Try decoding with ascii, fall back to latin-1 which never fails + text = text.decode('ascii') + except UnicodeDecodeError: + text = text.decode('latin-1') + + # 1. Pad/truncate the input string to the exact expected length + padded_text = text.ljust(expected_len, ' ') + padded_text = padded_text[:expected_len] + + # 2. Convert all characters to their corresponding 6-bit codes + codes = [REVERSE_PACKED_ASCII_MAP.get(c, 0x3F) for c in padded_text] # Default to '?' (0x3F) + + packed_bytes = bytearray() + + # 3. Process the codes in chunks of 4 + for i in range(0, len(codes), 4): + # Get a chunk of up to 4 codes. + chunk = codes[i:i+4] + + # If it's a partial chunk at the end, pad with space codes to make a full chunk of 4. + while len(chunk) < 4: + chunk.append(REVERSE_PACKED_ASCII_MAP[' ']) + + c1, c2, c3, c4 = chunk + + # 4. Pack the 4 6-bit codes into 3 8-bit bytes, mirroring the unpacking logic + byte1 = (c1 << 2) | (c2 >> 4) + byte2 = ((c2 & 0x0F) << 4) | (c3 >> 2) + byte3 = ((c3 & 0x03) << 6) | c4 + + packed_bytes.extend([byte1, byte2, byte3]) + + # 5. Calculate the exact number of bytes required for the original expected length + num_bytes = math.ceil(expected_len * 6 / 8) + + # 6. Get the core byte array, trimmed to the precise required size + result = bytes(packed_bytes[:num_bytes]) + + # For Command 17 (Write Message), the spec requires a fixed 24-byte data field. + # This corresponds to an expected character length of 32. + if expected_len == 32: + # Pad the result to exactly 24 bytes. + return result.ljust(24, b'\x00') + + return result diff --git a/protocol/Hart/universal.py b/protocol/Hart/universal.py new file mode 100644 index 0000000..0ed1c07 --- /dev/null +++ b/protocol/Hart/universal.py @@ -0,0 +1,136 @@ +import struct +from typing import Tuple +from . import tools + + +def read_unique_identifier(address: bytes) -> bytes: + return tools.pack_command(address, command_id=0) + + +def read_primary_variable(address: bytes) -> bytes: + return tools.pack_command(address, command_id=1) + + +def read_loop_current_and_percent(address: bytes) -> bytes: + return tools.pack_command(address, command_id=2) + + +def read_dynamic_variables_and_loop_current(address: bytes) -> bytes: + return tools.pack_command(address, command_id=3) + + +def write_polling_address(address: bytes, new_polling_address: int) -> bytes: + assert 0 <= new_polling_address <= 63 + return tools.pack_command(address, command_id=6, data=new_polling_address.to_bytes(1, "big")) + + +def read_unique_identifier_associated_with_tag(tag: bytes, *, address: int = 0) -> bytes: + return tools.pack_command(address, command_id=11, data=tag) + + +def read_message(address: bytes) -> bytes: + return tools.pack_command(address, command_id=12) + + +def read_tag_descriptor_date(address: bytes) -> bytes: + return tools.pack_command(address, command_id=13) + + +def read_primary_variable_information(address: bytes) -> bytes: + return tools.pack_command(address, command_id=14) + + +def read_output_information(address: bytes) -> bytes: + return tools.pack_command(address, command_id=15) + + +def read_final_assembly_number(address: bytes) -> bytes: + return tools.pack_command(address, command_id=16) + + +def write_message(address: bytes, message: str) -> bytes: + """Writes a 24-char message to the device (Command 17).""" + # Per spec, message is 24 characters, packed into 18 bytes. + packed_message = tools.pack_packed_ascii(message, expected_len=32) + return tools.pack_command(address, command_id=17, data=packed_message) + + +def write_tag_descriptor_date(address: bytes, tag: str, descriptor: str, date: Tuple[int, int, int]): + """Writes tag, descriptor, and date to the device (Command 18).""" + # Per spec, tag is 6 chars, descriptor is 12 chars. + packed_tag = tools.pack_packed_ascii(tag, 8) + packed_descriptor = tools.pack_packed_ascii(descriptor, 16) + data = packed_tag + packed_descriptor + day, month, year = date + data += day.to_bytes(1, "big") + data += month.to_bytes(1, "big") + data += year.to_bytes(1, "big") + # print(len(data)) + return tools.pack_command(address, command_id=18, data=data) + + +def write_final_assembly_number(address: bytes, number: int): + data = number.to_bytes(3, "big") + return tools.pack_command(address, command_id=19, data=data) + + +def write_primary_variable_damping(address: bytes, damping_time: float) -> bytes: + """修改主变量阻尼时间(单位s) command 34""" + data = struct.pack(">f", damping_time) + return tools.pack_command(address, command_id=34, data=data) + + +def write_primary_variable_range(address: bytes, units_code: int, upper_range: float, lower_range: float) -> bytes: + """修改主变量量程范围 command 35""" + data = units_code.to_bytes(1, "big") + data += struct.pack(">ff", upper_range, lower_range) + return tools.pack_command(address, command_id=35, data=data) + + +def calibrate_20ma(address: bytes) -> bytes: + """20mA校准 command 36""" + return tools.pack_command(address, command_id=36) + + +def calibrate_4ma(address: bytes) -> bytes: + """4mA校准 command 37""" + return tools.pack_command(address, command_id=37) + + +def set_fixed_current_output(address: bytes, enable: bool, current_value: float = 0.0) -> bytes: + """设置固定电流输出/退出固定模式 command 40""" + if enable: + data = struct.pack(">f", current_value) + else: + data = struct.pack(">f", 0) + return tools.pack_command(address, command_id=40, data=data) + + +def calibrate_zero_point(address: bytes) -> bytes: + """零点校准 command 43""" + return tools.pack_command(address, command_id=43) + + +def write_primary_variable_units(address: bytes, units_code: int) -> bytes: + """修改主变量单位 command 44""" + data = units_code.to_bytes(1, "big") + return tools.pack_command(address, command_id=44, data=data) + + +def trim_loop_current_4ma(address: bytes, measured_current: float) -> bytes: + """微调环路电流4mA command 45""" + data = struct.pack(">f", measured_current) + return tools.pack_command(address, command_id=45, data=data) + + +def trim_loop_current_20ma(address: bytes, measured_current: float) -> bytes: + """微调环路电流20mA command 46""" + data = struct.pack(">f", measured_current) + return tools.pack_command(address, command_id=46, data=data) + + +def write_primary_variable_output_function(address: bytes, transfer_function_code: int) -> bytes: + """修改主变量输出函数 command 47""" + data = transfer_function_code.to_bytes(1, "big") + return tools.pack_command(address, command_id=47, data=data) +