1104更新

main
DESKTOP-3D7M4SA\Hicent 3 months ago
parent 57b755f1e6
commit 9fd733fa8a

Binary file not shown.

@ -0,0 +1,23 @@
/* ==================== HART Tab标签页样式 ==================== */
QTabBar#hartTabBar::tab {
font-family: "PingFangSC-Medium", "Microsoft YaHei", sans-serif;
font-size: 16px;
width: 160px;
height: 40px;
padding: 8px 16px;
margin: 2px;
border-radius: 6px;
background-color: #F3F4F6;
color: #6B7280;
}
QTabBar#hartTabBar::tab:hover {
background-color: #E3F2FD;
color: #2277EF;
}
QTabBar#hartTabBar::tab:selected {
background-color: #2277EF;
color: #FFFFFF;
font-weight: 600;
}

@ -0,0 +1,327 @@
from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QGroupBox,
QLabel, QLineEdit, QPushButton, QComboBox,
QTabWidget, QMessageBox, QGridLayout, QDoubleSpinBox,
QSpinBox, QFormLayout, QCheckBox)
from PyQt5.QtCore import Qt
class HartCalibrationWidget(QWidget):
def __init__(self, parent=None):
super(HartCalibrationWidget, self).__init__(parent)
self.hartComm = None
self.initUI()
def initUI(self):
# 创建主布局
mainLayout = QVBoxLayout()
# --- 1. 标准校准区域 ---
calibGroup = QGroupBox("标准校准操作")
calibLayout = QVBoxLayout(calibGroup)
infoLabel1 = QLabel("标准校准操作将校准设备的测量值。请确保在执行校准前已正确连接设备。")
infoLabel1.setWordWrap(True)
calibLayout.addWidget(infoLabel1)
buttonLayout = QGridLayout()
self.calibrate4mAButton = QPushButton("4mA校准 (Command 37)")
self.calibrate20mAButton = QPushButton("20mA校准 (Command 36)")
self.calibrateZeroPointButton = QPushButton("零点校准 (Command 43)")
buttonLayout.addWidget(self.calibrate4mAButton, 0, 0)
buttonLayout.addWidget(self.calibrate20mAButton, 0, 1)
buttonLayout.addWidget(self.calibrateZeroPointButton, 1, 0, 1, 2)
calibLayout.addLayout(buttonLayout)
warningLabel1 = QLabel("警告:校准操作将修改设备的校准参数,请确保您知道您在做什么!")
warningLabel1.setStyleSheet("color: red;")
warningLabel1.setWordWrap(True)
calibLayout.addWidget(warningLabel1)
mainLayout.addWidget(calibGroup)
# --- 2. 环路电流微调区域 ---
trimGroup = QGroupBox("环路电流微调")
trimLayout = QVBoxLayout(trimGroup)
infoLabel2 = QLabel("环路电流微调用于精确调整4mA和20mA输出。请先进入固定电流模式使设备输出固定电流再使用高精度电流表测量实际输出电流然后输入测量值进行微调。")
infoLabel2.setWordWrap(True)
trimLayout.addWidget(infoLabel2)
trim4mAGroup = QGroupBox("4mA微调 (Command 45)")
trim4mALayout = QFormLayout(trim4mAGroup)
self.trim4mASpinBox = QDoubleSpinBox()
self.trim4mASpinBox.setRange(3.0, 5.0)
self.trim4mASpinBox.setSingleStep(0.001)
self.trim4mASpinBox.setDecimals(3)
self.trim4mASpinBox.setValue(4.0)
self.trim4mAButton = QPushButton("执行4mA微调")
trim4mALayout.addRow(QLabel("测量的电流值(mA):"), self.trim4mASpinBox)
trim4mALayout.addRow("", self.trim4mAButton)
trim20mAGroup = QGroupBox("20mA微调 (Command 46)")
trim20mALayout = QFormLayout(trim20mAGroup)
self.trim20mASpinBox = QDoubleSpinBox()
self.trim20mASpinBox.setRange(19.0, 21.0)
self.trim20mASpinBox.setSingleStep(0.001)
self.trim20mASpinBox.setDecimals(3)
self.trim20mASpinBox.setValue(20.0)
self.trim20mAButton = QPushButton("执行20mA微调")
trim20mALayout.addRow(QLabel("测量的电流值(mA):"), self.trim20mASpinBox)
trim20mALayout.addRow("", self.trim20mAButton)
trimLayout.addWidget(trim4mAGroup)
trimLayout.addWidget(trim20mAGroup)
warningLabel2 = QLabel("警告:微调操作将修改设备的校准参数,请确保使用高精度电流表测量!")
warningLabel2.setStyleSheet("color: red;")
warningLabel2.setWordWrap(True)
trimLayout.addWidget(warningLabel2)
mainLayout.addWidget(trimGroup)
# --- 3. 固定电流输出区域 ---
fixedCurrentGroup = QGroupBox("固定电流输出 (Command 40)")
fixedCurrentLayout = QVBoxLayout(fixedCurrentGroup)
infoLabel3 = QLabel("固定电流输出模式用于测试和校准目的。在此模式下,设备将输出固定电流值,不受测量值变化的影响。")
infoLabel3.setWordWrap(True)
fixedCurrentLayout.addWidget(infoLabel3)
self.enableFixedCurrentCheckBox = QCheckBox("启用固定电流输出")
fixedCurrentLayout.addWidget(self.enableFixedCurrentCheckBox)
fixedCurrentFormLayout = QFormLayout()
self.fixedCurrentSpinBox = QDoubleSpinBox()
self.fixedCurrentSpinBox.setRange(3.8, 21.0)
self.fixedCurrentSpinBox.setSingleStep(0.1)
self.fixedCurrentSpinBox.setDecimals(2)
self.fixedCurrentSpinBox.setValue(4.0)
self.fixedCurrentSpinBox.setEnabled(False)
fixedCurrentFormLayout.addRow(QLabel("固定电流值(mA):"), self.fixedCurrentSpinBox)
fixedCurrentLayout.addLayout(fixedCurrentFormLayout)
self.applyFixedCurrentButton = QPushButton("应用设置")
fixedCurrentLayout.addWidget(self.applyFixedCurrentButton)
warningLabel3 = QLabel("警告:启用固定电流输出模式将使设备不再响应测量值变化!请在完成测试后务必退出固定电流模式。")
warningLabel3.setStyleSheet("color: red;")
warningLabel3.setWordWrap(True)
fixedCurrentLayout.addWidget(warningLabel3)
mainLayout.addWidget(fixedCurrentGroup)
mainLayout.addStretch(1)
self.setLayout(mainLayout)
# --- 连接所有信号和槽 ---
self.calibrate4mAButton.clicked.connect(self.onCalibrate4mA)
self.calibrate20mAButton.clicked.connect(self.onCalibrate20mA)
self.calibrateZeroPointButton.clicked.connect(self.onCalibrateZeroPoint)
self.trim4mAButton.clicked.connect(self.onTrimLoopCurrent4mA)
self.trim20mAButton.clicked.connect(self.onTrimLoopCurrent20mA)
self.enableFixedCurrentCheckBox.toggled.connect(self.fixedCurrentSpinBox.setEnabled)
self.applyFixedCurrentButton.clicked.connect(self.onSetFixedCurrentOutput)
def setHartComm(self, hartComm):
"""设置HART通信对象"""
self.hartComm = hartComm
def onCalibrate4mA(self):
"""4mA校准 command 37"""
if not self.hartComm or not self.hartComm.isConnected():
QMessageBox.warning(self, "警告", "设备未连接,请先连接设备")
return
reply = QMessageBox.question(self, "确认", "确定要执行4mA校准吗此操作将修改设备校准参数。",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply != QMessageBox.Yes:
return
try:
raw_response = self.hartComm.calibrate4mA()
response_dict = raw_response[0] if isinstance(raw_response, list) and raw_response else raw_response
if isinstance(response_dict, dict) and response_dict.get("status") == "success":
details = self.formatResponseDetails(response_dict)
QMessageBox.information(self, "成功", f"4mA校准成功!\n\n{details}")
elif isinstance(response_dict, dict):
error_msg = response_dict.get("error", "未知错误")
QMessageBox.critical(self, "错误", f"4mA校准失败: {error_msg}")
else:
QMessageBox.critical(self, "错误", "4mA校准失败: 无效的响应格式")
except Exception as e:
QMessageBox.critical(self, "异常", f"4mA校准时发生异常: {str(e)}")
def onCalibrate20mA(self):
"""20mA校准 command 36"""
if not self.hartComm or not self.hartComm.isConnected():
QMessageBox.warning(self, "警告", "设备未连接,请先连接设备")
return
reply = QMessageBox.question(self, "确认", "确定要执行20mA校准吗此操作将修改设备校准参数。",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply != QMessageBox.Yes:
return
try:
raw_response = self.hartComm.calibrate20mA()
response_dict = raw_response[0] if isinstance(raw_response, list) and raw_response else raw_response
if isinstance(response_dict, dict) and response_dict.get("status") == "success":
details = self.formatResponseDetails(response_dict)
QMessageBox.information(self, "成功", f"20mA校准成功!\n\n{details}")
elif isinstance(response_dict, dict):
error_msg = response_dict.get("error", "未知错误")
QMessageBox.critical(self, "错误", f"20mA校准失败: {error_msg}")
else:
QMessageBox.critical(self, "错误", "20mA校准失败: 无效的响应格式")
except Exception as e:
QMessageBox.critical(self, "异常", f"20mA校准时发生异常: {str(e)}")
def onCalibrateZeroPoint(self):
"""零点校准 command 43"""
if not self.hartComm or not self.hartComm.isConnected():
QMessageBox.warning(self, "警告", "设备未连接,请先连接设备")
return
reply = QMessageBox.question(self, "确认", "确定要执行零点校准吗?此操作将修改设备校准参数。",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply != QMessageBox.Yes:
return
try:
raw_response = self.hartComm.calibrateZeroPoint()
response_dict = raw_response[0] if isinstance(raw_response, list) and raw_response else raw_response
if isinstance(response_dict, dict) and response_dict.get("status") == "success":
details = self.formatResponseDetails(response_dict)
QMessageBox.information(self, "成功", f"零点校准成功!\n\n{details}")
elif isinstance(response_dict, dict):
error_msg = response_dict.get("error", "未知错误")
QMessageBox.critical(self, "错误", f"零点校准失败: {error_msg}")
else:
QMessageBox.critical(self, "错误", "零点校准失败: 无效的响应格式")
except Exception as e:
QMessageBox.critical(self, "异常", f"零点校准时发生异常: {str(e)}")
def onTrimLoopCurrent4mA(self):
"""微调环路电流4mA command 45"""
if not self.hartComm or not self.hartComm.isConnected():
QMessageBox.warning(self, "警告", "设备未连接,请先连接设备")
return
measured_current = self.trim4mASpinBox.value()
reply = QMessageBox.question(self, "确认", f"确定要使用测量值 {measured_current} mA 执行4mA微调吗此操作将修改设备校准参数。",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply != QMessageBox.Yes:
return
try:
raw_response = self.hartComm.trimLoopCurrent4mA(measured_current)
response_dict = raw_response[0] if isinstance(raw_response, list) and raw_response else raw_response
if isinstance(response_dict, dict) and response_dict.get("status") == "success":
details = self.formatResponseDetails(response_dict)
QMessageBox.information(self, "成功", f"4mA环路电流微调成功!\n测量值: {measured_current} mA\n\n{details}")
elif isinstance(response_dict, dict):
error_msg = response_dict.get("error", "未知错误")
QMessageBox.critical(self, "错误", f"4mA环路电流微调失败: {error_msg}")
else:
QMessageBox.critical(self, "错误", "4mA环路电流微调失败: 无效的响应格式")
except Exception as e:
QMessageBox.critical(self, "异常", f"4mA环路电流微调时发生异常: {str(e)}")
def onTrimLoopCurrent20mA(self):
"""微调环路电流20mA command 46"""
if not self.hartComm or not self.hartComm.isConnected():
QMessageBox.warning(self, "警告", "设备未连接,请先连接设备")
return
measured_current = self.trim20mASpinBox.value()
reply = QMessageBox.question(self, "确认", f"确定要使用测量值 {measured_current} mA 执行20mA微调吗此操作将修改设备校准参数。",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply != QMessageBox.Yes:
return
try:
raw_response = self.hartComm.trimLoopCurrent20mA(measured_current)
response_dict = raw_response[0] if isinstance(raw_response, list) and raw_response else raw_response
if isinstance(response_dict, dict) and response_dict.get("status") == "success":
details = self.formatResponseDetails(response_dict)
QMessageBox.information(self, "成功", f"20mA环路电流微调成功!\n测量值: {measured_current} mA\n\n{details}")
elif isinstance(response_dict, dict):
error_msg = response_dict.get("error", "未知错误")
QMessageBox.critical(self, "错误", f"20mA环路电流微调失败: {error_msg}")
else:
QMessageBox.critical(self, "错误", "20mA环路电流微调失败: 无效的响应格式")
except Exception as e:
QMessageBox.critical(self, "异常", f"20mA环路电流微调时发生异常: {str(e)}")
def onSetFixedCurrentOutput(self):
"""设置固定电流输出/退出固定模式 command 40"""
if not self.hartComm or not self.hartComm.isConnected():
QMessageBox.warning(self, "警告", "设备未连接,请先连接设备")
return
enable = self.enableFixedCurrentCheckBox.isChecked()
current_value = self.fixedCurrentSpinBox.value() if enable else 0.0
if enable:
message = f"确定要启用固定电流输出模式,输出电流值为 {current_value} mA 吗?\n\n警告:启用此模式后,设备将不再响应测量值变化!"
else:
message = "确定要退出固定电流输出模式吗?"
reply = QMessageBox.question(self, "确认", message, QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply != QMessageBox.Yes:
return
try:
raw_response = self.hartComm.setFixedCurrentOutput(enable, current_value)
response_dict = raw_response[0] if isinstance(raw_response, list) and raw_response else raw_response
if isinstance(response_dict, dict) and response_dict.get("status") == "success":
details = self.formatResponseDetails(response_dict)
if enable:
QMessageBox.information(self, "成功", f"已启用固定电流输出模式!\n电流值: {current_value} mA\n\n{details}")
else:
QMessageBox.information(self, "成功", f"已退出固定电流输出模式!\n\n{details}")
elif isinstance(response_dict, dict):
error_msg = response_dict.get("error", "未知错误")
QMessageBox.critical(self, "错误", f"设置固定电流输出失败: {error_msg}")
else:
QMessageBox.critical(self, "错误", "设置固定电流输出失败: 无效的响应格式")
except Exception as e:
QMessageBox.critical(self, "异常", f"设置固定电流输出时发生异常: {str(e)}")
def formatResponseDetails(self, response):
"""格式化响应详细信息"""
details = ""
# 添加消息对象的详细信息
msg = response.get("msg", {})
if msg:
details += "消息详情:\n"
for key, value in msg.items():
# 处理二进制数据,转换为可读形式
if isinstance(value, bytes):
try:
# 尝试解码为ASCII字符串移除不可打印字符
printable_chars = "".join(chr(c) for c in value if 32 <= c <= 126)
if printable_chars:
value_str = f"{printable_chars} (HEX: {value.hex()})"
else:
value_str = f"HEX: {value.hex()}"
except:
value_str = f"HEX: {value.hex()}"
else:
value_str = str(value)
details += f" {key}: {value_str}\n"
# 添加原始数据的详细信息
raw_data = response.get("raw_data")
if raw_data:
details += "\n原始数据:\n"
details += f" HEX: {raw_data.hex()}"
return details

@ -0,0 +1,355 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
HART设备连接界面模块
实现设备连接断开等功能
"""
import serial
import serial.tools.list_ports
from PyQt5.QtWidgets import (QApplication, QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QComboBox, QGroupBox, QFormLayout,
QMessageBox, QSpinBox, QRadioButton)
from PyQt5.QtCore import Qt, pyqtSignal, QThread, QSize
from PyQt5.QtGui import QFont
import qtawesome
from protocol.HART import HARTCommunication, PrimaryMasterMode, SecondaryMasterMode, BurstModeON, BurstModeOFF
class ScanDeviceThread(QThread):
"""在后台扫描设备的线程"""
scanCompleted = pyqtSignal(object) # 扫描完成信号参数为找到的设备信息或None
def __init__(self, hartComm):
super().__init__()
self.hartComm = hartComm
def run(self):
result = self.hartComm.scanForDevice()
self.scanCompleted.emit(result)
class HartConnectionWidget(QWidget):
"""
HART设备连接界面模块
实现设备连接断开等功能
"""
# 自定义信号
connectionStatusChanged = pyqtSignal(bool) # 连接状态改变信号
hartCommCreated = pyqtSignal(object) # HART通信对象创建信号
def __init__(self):
"""
初始化连接界面
"""
super().__init__()
# 初始化变量
self.hartComm = None
self.isConnected = False
# 初始化界面
self.initUI()
# 检查COM7是否可用
self.refreshPortList()
def initUI(self):
"""
初始化用户界面
"""
# 创建主布局
mainLayout = QVBoxLayout()
mainLayout.setContentsMargins(20, 20, 20, 20)
mainLayout.setSpacing(20)
# 创建连接设置组
connectionGroup = QGroupBox("串口设置")
connectionGroup.setFont(QFont("Microsoft YaHei", 10, QFont.Bold))
connectionLayout = QFormLayout()
try:
connectionLayout.setLabelAlignment(Qt.AlignRight)
except AttributeError:
pass # Linter may fail on this
self.portLabel = QLabel("COM7")
connectionLayout.addRow("串口:", self.portLabel)
self.baudrateLabel = QLabel("1200")
connectionLayout.addRow("波特率:", self.baudrateLabel)
self.parityLabel = QLabel("奇校验")
connectionLayout.addRow("校验方式:", self.parityLabel)
self.timeoutLabel = QLabel("3 秒")
connectionLayout.addRow("超时时间:", self.timeoutLabel)
connectionGroup.setLayout(connectionLayout)
mainLayout.addWidget(connectionGroup)
# 创建地址与模式设置组
configGroup = QGroupBox("寻址与模式")
configGroup.setFont(QFont("Microsoft YaHei", 10, QFont.Bold))
configLayout = QFormLayout()
try:
configLayout.setLabelAlignment(Qt.AlignRight)
except AttributeError:
pass # Linter may fail on this
# 寻址方式
addressingLayout = QHBoxLayout()
self.rbSpecificAddress = QRadioButton("指定短地址")
self.rbSpecificAddress.setChecked(True)
self.rbScanAddress = QRadioButton("扫描设备(轮询)")
addressingLayout.addWidget(self.rbSpecificAddress)
addressingLayout.addWidget(self.rbScanAddress)
configLayout.addRow("寻址方式:", addressingLayout)
# 短地址输入
self.pollingAddressSpinBox = QSpinBox()
self.pollingAddressSpinBox.setRange(0, 63)
configLayout.addRow("轮询地址:", self.pollingAddressSpinBox)
# 主站模式
self.masterModeCombo = QComboBox()
self.masterModeCombo.addItems(["主要主站", "次要主站"])
configLayout.addRow("主站模式:", self.masterModeCombo)
# 突发模式
self.burstModeCombo = QComboBox()
self.burstModeCombo.addItems(["关闭", "开启"])
configLayout.addRow("突发模式:", self.burstModeCombo)
configGroup.setLayout(configLayout)
mainLayout.addWidget(configGroup)
# 创建按钮布局
buttonLayout = QHBoxLayout()
buttonLayout.setSpacing(10)
# 连接/扫描按钮
self.connectButton = QPushButton("连接设备")
self.connectButton.setObjectName("startProtocolBtn")
self.connectButton.setIcon(qtawesome.icon('fa5s.play-circle', color='#047857'))
self.connectButton.setIconSize(QSize(22, 22))
self.connectButton.clicked.connect(self.connectDevice)
self.scanButton = QPushButton("扫描设备")
self.scanButton.setObjectName("forceBtn") # Yellow style for scanning
self.scanButton.setIcon(qtawesome.icon('fa5s.search', color='#D97706'))
self.scanButton.setIconSize(QSize(22, 22))
self.scanButton.clicked.connect(self.scanDevice)
self.scanButton.setVisible(False) # 默认隐藏
buttonLayout.addWidget(self.connectButton)
buttonLayout.addWidget(self.scanButton)
# 信号连接
self.rbSpecificAddress.toggled.connect(self.updateAddressingMode)
# 断开按钮
self.disconnectButton = QPushButton("断开连接")
self.disconnectButton.setObjectName("delBtn") # Red style
self.disconnectButton.setIcon(qtawesome.icon('fa5s.stop-circle', color='#B91C1C'))
self.disconnectButton.setIconSize(QSize(22, 22))
self.disconnectButton.clicked.connect(self.disconnectDevice)
self.disconnectButton.setEnabled(False)
buttonLayout.addWidget(self.disconnectButton)
mainLayout.addLayout(buttonLayout)
# 添加状态标签
self.statusLabel = QLabel("未连接")
try:
self.statusLabel.setAlignment(Qt.AlignCenter)
except AttributeError:
pass # Linter may fail on this
self.statusLabel.setFont(QFont("Microsoft YaHei", 10))
self.statusLabel.setStyleSheet("color: red;")
mainLayout.addWidget(self.statusLabel)
# 添加弹性空间
mainLayout.addStretch(1)
# 设置主布局
self.setLayout(mainLayout)
def refreshPortList(self):
"""
刷新串口列表仅检查COM7是否可用
"""
ports = [port.device for port in serial.tools.list_ports.comports()]
# 检查COM7是否可用
if "COM7" in ports:
self.connectButton.setEnabled(True)
self.statusLabel.setText("未连接")
else:
self.connectButton.setEnabled(False)
self.statusLabel.setText("未检测到COM7串口设备")
def updateAddressingMode(self, checked):
is_specific = self.rbSpecificAddress.isChecked()
self.pollingAddressSpinBox.setEnabled(is_specific)
self.connectButton.setVisible(is_specific)
self.scanButton.setVisible(not is_specific)
def scanDevice(self):
"""开始扫描设备"""
if self.isConnected:
QMessageBox.information(self, "提示", "设备已连接。如需重新扫描,请先断开连接。")
return
if not self._ensure_serial_connection():
return
self.scanButton.setEnabled(False)
self.scanButton.setText("扫描中...")
self.statusLabel.setText("正在扫描设备 (0-63)...")
self.statusLabel.setStyleSheet("color: orange;")
self.scanThread = ScanDeviceThread(self.hartComm)
self.scanThread.scanCompleted.connect(self.onScanCompleted)
self.scanThread.start()
def onScanCompleted(self, result):
"""扫描完成后的处理"""
self.scanButton.setEnabled(True)
self.scanButton.setText("扫描设备")
if result and 'address' in result:
found_addr = result['address']
QMessageBox.information(self, "扫描成功", f"在轮询地址 {found_addr} 找到设备!\n已自动配置为该地址。")
self.pollingAddressSpinBox.setValue(found_addr)
self.rbSpecificAddress.setChecked(True) # 切换回指定地址模式
# 自动连接
self.connectDevice()
else:
QMessageBox.warning(self, "扫描失败", "未在任何轮询地址 (0-63) 找到设备。")
self.statusLabel.setText("扫描完成,未找到设备")
self.statusLabel.setStyleSheet("color: orange;")
def _ensure_serial_connection(self):
"""确保串口已连接并创建HART对象"""
if self.hartComm and self.hartComm.isConnected():
return True
if self.hartComm:
self.hartComm.disconnect()
port = "COM7"
baudrate = 1200
timeout = 3
parity = serial.PARITY_ODD
try:
self.hartComm = HARTCommunication(port=port, baudrate=baudrate, parity=parity, timeout=timeout)
if self.hartComm.connect():
self.hartCommCreated.emit(self.hartComm)
return True
else:
self.hartComm = None
QMessageBox.critical(self, "连接失败", "无法打开COM7串口请检查设备连接。")
return False
except Exception as e:
self.hartComm = None
QMessageBox.critical(self, "连接错误", f"连接过程中发生错误:{str(e)}")
return False
def connectDevice(self):
"""使用指定配置连接设备并验证"""
if self.isConnected:
QMessageBox.information(self, "提示", "设备已连接。如需更换地址或模式,请先断开连接。")
return
if not self._ensure_serial_connection() or not self.hartComm:
return
try:
# 应用界面上的配置
is_primary = self.masterModeCombo.currentIndex() == 0
self.hartComm.setMasterMode(isPrimaryMaster=is_primary)
is_burst_on = self.burstModeCombo.currentIndex() == 1
self.hartComm.setBurstMode(enable=is_burst_on)
polling_address = self.pollingAddressSpinBox.value()
self.hartComm.buildHartAddress(pollingAddress=polling_address, isLongFrame=False)
self.statusLabel.setText(f"正在验证地址 {polling_address}...")
self.statusLabel.setStyleSheet("color: orange;")
app_instance = QApplication.instance()
if app_instance:
app_instance.processEvents()
# 验证设备通信
device_info = self.hartComm.readUniqueId()
if isinstance(device_info, list) and device_info:
self.isConnected = True
self.statusLabel.setText(f"已连接 (地址: {polling_address})")
self.statusLabel.setStyleSheet("color: green;")
self.disconnectButton.setEnabled(True)
self.connectButton.setEnabled(False)
self.scanButton.setVisible(False)
self._set_config_controls_enabled(False)
self.connectionStatusChanged.emit(True)
QMessageBox.information(self, "连接成功", f"成功连接到地址为 {polling_address} 的设备。")
else:
# self.isConnected remains False
self.statusLabel.setText("连接失败")
self.statusLabel.setStyleSheet("color: red;")
self.connectionStatusChanged.emit(False)
QMessageBox.critical(self, "连接失败", f"在地址 {polling_address} 未找到设备或设备无响应。")
# 保持串口打开,允许用户尝试其他地址, 但需要一个断开按钮
self.disconnectButton.setEnabled(True)
except Exception as e:
self.isConnected = False
self.statusLabel.setText("连接错误")
self.statusLabel.setStyleSheet("color: red;")
self.connectionStatusChanged.emit(False)
QMessageBox.critical(self, "配置错误", f"应用配置时发生错误:{str(e)}")
def disconnectDevice(self, silent=False):
"""
断开设备连接
"""
try:
if self.hartComm:
self.hartComm.disconnect()
self.isConnected = False
self.statusLabel.setText("未连接")
self.statusLabel.setStyleSheet("color: red;")
self.disconnectButton.setEnabled(False)
self.connectButton.setEnabled(True)
self._set_config_controls_enabled(True)
self.updateAddressingMode(self.rbSpecificAddress.isChecked())
self.connectionStatusChanged.emit(False)
if not silent:
QMessageBox.information(self, "断开连接", "已成功断开设备连接!")
except Exception as e:
if not silent:
QMessageBox.warning(self, "断开连接错误", f"断开连接过程中发生错误:{str(e)}")
# 恢复UI到断开状态
self.isConnected = False
self.statusLabel.setText("未连接")
self.statusLabel.setStyleSheet("color: red;")
self.disconnectButton.setEnabled(False)
self.connectButton.setEnabled(True)
self._set_config_controls_enabled(True)
self.connectionStatusChanged.emit(False)
def _set_config_controls_enabled(self, enabled):
"""启用或禁用配置相关的控件"""
self.pollingAddressSpinBox.setEnabled(enabled)
self.masterModeCombo.setEnabled(enabled)
self.burstModeCombo.setEnabled(enabled)
self.rbSpecificAddress.setEnabled(enabled)
self.rbScanAddress.setEnabled(enabled)
def updatePollingAddress(self, new_address):
"""
从外部更新轮询地址的UI显示
"""
self.pollingAddressSpinBox.setValue(new_address)
if self.isConnected:
self.statusLabel.setText(f"已连接 (地址: {new_address})")

@ -0,0 +1,385 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
HART设备配置界面模块
实现HART设备参数配置功能包括查询和修改仪表基本信息标签描述及出厂日期装配号等
"""
import sys
import time
from datetime import datetime
from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QGroupBox,
QLabel, QLineEdit, QPushButton, QGridLayout,
QTabWidget, QMessageBox, QDateEdit, QSpinBox)
from PyQt5.QtCore import Qt, pyqtSignal, QDate
from PyQt5.QtGui import QFont
from protocol.HART import tools
class HartDeviceConfigWidget(QWidget):
"""
HART设备配置界面类
实现HART设备参数配置功能包括查询和修改仪表基本信息标签描述及出厂日期装配号等
"""
def __init__(self):
"""
初始化设备配置界面
"""
super().__init__()
# 初始化HART通信对象
self.hartComm = None
self.connectionWidget = None
# 初始化界面
self.initUI()
def initUI(self):
"""
初始化用户界面
"""
# 创建主布局
self.mainLayout = QVBoxLayout(self)
self.mainLayout.setContentsMargins(10, 10, 10, 10)
# --- 1. 仪表基本信息区域 ---
infoGroup = QGroupBox("仪表基本信息")
infoLayout = QGridLayout(infoGroup)
self.messageLabel = QLabel("设备消息:")
self.messageEdit = QLineEdit()
self.messageEdit.setMaxLength(32)
infoLayout.addWidget(self.messageLabel, 0, 0)
infoLayout.addWidget(self.messageEdit, 0, 1)
self.queryBasicInfoBtn = QPushButton("查询信息")
self.updateBasicInfoBtn = QPushButton("更新信息")
infoButtonLayout = QHBoxLayout()
infoButtonLayout.addWidget(self.queryBasicInfoBtn)
infoButtonLayout.addWidget(self.updateBasicInfoBtn)
self.mainLayout.addWidget(infoGroup)
self.mainLayout.addLayout(infoButtonLayout)
# --- 2. 仪表标签与描述区域 ---
tagGroup = QGroupBox("仪表标签与描述")
tagLayout = QGridLayout(tagGroup)
self.tagLabel = QLabel("设备标签:")
self.tagEdit = QLineEdit()
self.tagEdit.setMaxLength(8)
tagLayout.addWidget(self.tagLabel, 0, 0)
tagLayout.addWidget(self.tagEdit, 0, 1)
self.descLabel = QLabel("设备描述:")
self.descEdit = QLineEdit()
self.descEdit.setMaxLength(16)
tagLayout.addWidget(self.descLabel, 1, 0)
tagLayout.addWidget(self.descEdit, 1, 1)
self.dateLabel = QLabel("出厂日期:")
self.dateEdit = QDateEdit()
self.dateEdit.setDisplayFormat("dd/MM/yyyy")
self.dateEdit.setCalendarPopup(True)
self.dateEdit.setDate(QDate.currentDate())
tagLayout.addWidget(self.dateLabel, 2, 0)
tagLayout.addWidget(self.dateEdit, 2, 1)
self.queryTagDescBtn = QPushButton("查询信息")
self.updateTagDescBtn = QPushButton("更新信息")
tagButtonLayout = QHBoxLayout()
tagButtonLayout.addWidget(self.queryTagDescBtn)
tagButtonLayout.addWidget(self.updateTagDescBtn)
self.mainLayout.addWidget(tagGroup)
self.mainLayout.addLayout(tagButtonLayout)
# --- 3. 仪表装配号区域 ---
assemblyGroup = QGroupBox("仪表装配号")
assemblyLayout = QGridLayout(assemblyGroup)
self.assemblyLabel = QLabel("装配号:")
self.assemblyEdit = QSpinBox()
self.assemblyEdit.setRange(0, 16777215)
self.assemblyEdit.setButtonSymbols(QSpinBox.NoButtons)
assemblyLayout.addWidget(self.assemblyLabel, 0, 0)
assemblyLayout.addWidget(self.assemblyEdit, 0, 1)
self.queryAssemblyBtn = QPushButton("查询装配号")
self.updateAssemblyBtn = QPushButton("更新装配号")
assemblyButtonLayout = QHBoxLayout()
assemblyButtonLayout.addWidget(self.queryAssemblyBtn)
assemblyButtonLayout.addWidget(self.updateAssemblyBtn)
self.mainLayout.addWidget(assemblyGroup)
self.mainLayout.addLayout(assemblyButtonLayout)
# --- 4. 轮询地址修改区域 ---
pollingGroup = QGroupBox("轮询地址修改 (仅短帧)")
pollingLayout = QGridLayout(pollingGroup)
self.pollingAddressLabel = QLabel("新轮询地址:")
self.pollingAddressSpinBox = QSpinBox()
self.pollingAddressSpinBox.setRange(0, 63)
pollingLayout.addWidget(self.pollingAddressLabel, 0, 0)
pollingLayout.addWidget(self.pollingAddressSpinBox, 0, 1)
self.updatePollingAddressBtn = QPushButton("更新轮询地址")
pollingButtonLayout = QHBoxLayout()
pollingButtonLayout.addWidget(self.updatePollingAddressBtn)
self.mainLayout.addWidget(pollingGroup)
self.mainLayout.addLayout(pollingButtonLayout)
self.mainLayout.addStretch(1)
# --- 连接所有信号和槽 ---
self.queryBasicInfoBtn.clicked.connect(self.queryBasicInfo)
self.updateBasicInfoBtn.clicked.connect(self.updateBasicInfo)
self.queryTagDescBtn.clicked.connect(self.queryTagDesc)
self.updateTagDescBtn.clicked.connect(self.updateTagDesc)
self.queryAssemblyBtn.clicked.connect(self.queryAssembly)
self.updateAssemblyBtn.clicked.connect(self.updateAssembly)
self.updatePollingAddressBtn.clicked.connect(self.updatePollingAddress)
def setHartComm(self, hartComm):
"""
设置HART通信对象
Args:
hartComm: HART通信对象
"""
self.hartComm = hartComm
def setConnectionWidget(self, widget):
"""设置连接窗口的引用"""
self.connectionWidget = widget
def queryBasicInfo(self):
"""
查询仪表基本信息 - command 12
"""
if not self.hartComm or not self.hartComm.isConnected():
QMessageBox.warning(self, "警告", "设备未连接,请先连接设备")
return
try:
# 发送查询命令
result = self.hartComm.readMessage()
# print(result)
# 处理响应数据
if result:
# 兼容处理如果result是列表则取第一个元素
msg = result[0] if isinstance(result, list) and len(result) > 0 else result
if isinstance(msg, dict):
message_data = msg.get('message')
if isinstance(message_data, bytes):
try:
# 使用新的 unpack_packed_ascii 函数
message_str = tools.unpack_packed_ascii(message_data, 32)
self.messageEdit.setText(message_str.strip())
except Exception as e:
# 如果解码失败,显示原始数据的十六进制表示和错误
message_hex = message_data.hex()
self.messageEdit.setText(f"HEX: {message_hex}")
QMessageBox.critical(self, "解码错误", f"解码消息失败: {e}")
return
QMessageBox.warning(self, "警告", "未能获取设备基本信息")
except Exception as e:
QMessageBox.critical(self, "错误", f"查询设备基本信息失败: {str(e)}")
def updateBasicInfo(self):
"""
修改仪表基本信息 - command 17
"""
if not self.hartComm or not self.hartComm.isConnected():
QMessageBox.warning(self, "警告", "设备未连接,请先连接设备")
return
try:
# 获取消息内容
message_text = self.messageEdit.text()
# 直接将字符串传递给通信函数,由底层负责打包
result = self.hartComm.writeMessage(message_text)
# 处理响应数据
if result and len(result) > 0:
QMessageBox.information(self, "成功", "成功更新设备基本信息")
else:
QMessageBox.warning(self, "警告", "未能更新设备基本信息")
except Exception as e:
QMessageBox.critical(self, "错误", f"更新设备基本信息失败: {str(e)}")
def queryTagDesc(self):
"""
查询仪表标签描述及出厂日期 - command 13
"""
if not self.hartComm or not self.hartComm.isConnected():
QMessageBox.warning(self, "警告", "设备未连接,请先连接设备")
return
try:
# 发送查询命令
result = self.hartComm.readTagDescriptorDate()
# 处理响应数据
if result:
# 兼容处理如果result是列表则取第一个元素
msg = result[0] if isinstance(result, list) and len(result) > 0 else result
if isinstance(msg, dict) and msg.get("status") == "success":
tag_str = msg.get("device_tag_name", "")
desc_str = msg.get("device_descriptor", "")
date_dict = msg.get("date", {})
self.tagEdit.setText(tag_str.strip())
self.descEdit.setText(desc_str.strip())
if isinstance(date_dict, dict) and all(k in date_dict for k in ['day', 'month', 'year']):
day = date_dict.get('day', 1)
month = date_dict.get('month', 1)
year = date_dict.get('year', 1900)
self.dateEdit.setDate(QDate(year, month, day))
QMessageBox.information(self, "成功", "成功读取设备标签、描述及日期信息")
return
elif isinstance(msg, dict) and 'error' in msg:
QMessageBox.warning(self, "查询失败", f"未能获取设备标签、描述及日期信息: {msg.get('error')}")
return
QMessageBox.warning(self, "警告", "未能获取设备标签、描述及日期信息")
except Exception as e:
QMessageBox.critical(self, "错误", f"查询设备标签、描述及日期信息失败: {str(e)}")
def updateTagDesc(self):
"""
修改仪表标签描述及出厂日期 - command 18
"""
if not self.hartComm or not self.hartComm.isConnected():
QMessageBox.warning(self, "警告", "设备未连接,请先连接设备")
return
try:
# 获取标签和描述
tag_text = self.tagEdit.text()
descriptor_text = self.descEdit.text()
date = self.dateEdit.date()
# According to the spec, year should be (actual_year - 1900)
year_for_protocol = date.year() - 1900
if not (0 <= year_for_protocol <= 255):
QMessageBox.warning(self, "年份错误", "年份必须在1900年至2155年之间")
return
date_tuple = (date.day(), date.month(), year_for_protocol)
# 直接将字符串传递给通信函数,由底层负责打包
result = self.hartComm.writeTagDescriptorDate(tag_text, descriptor_text, date_tuple)
# 处理响应数据
if result and len(result) > 0:
QMessageBox.information(self, "成功", "成功更新设备标签、描述及日期信息")
else:
QMessageBox.warning(self, "警告", "未能更新设备标签、描述及日期信息")
except Exception as e:
QMessageBox.critical(self, "错误", f"更新设备标签、描述及日期信息失败: {str(e)}")
def queryAssembly(self):
"""
查询仪表装配号 - command 16
"""
if not self.hartComm or not self.hartComm.isConnected():
QMessageBox.warning(self, "警告", "设备未连接,请先连接设备")
return
try:
# 发送查询命令
result = self.hartComm.readFinalAssemblyNumber()
# 处理响应数据
if result:
# 兼容处理如果result是列表则取第一个元素
msg = result[0] if isinstance(result, list) and len(result) > 0 else result
if isinstance(msg, dict) and msg.get("status") == "success":
assembly_no = msg.get("final_assembly_no", 0)
self.assemblyEdit.setValue(assembly_no)
QMessageBox.information(self, "成功", f"成功读取设备装配号: {assembly_no}")
return
elif isinstance(msg, dict) and 'error' in msg:
QMessageBox.warning(self, "查询失败", f"未能获取设备装配号: {msg.get('error')}")
return
QMessageBox.warning(self, "警告", "未能获取设备装配号")
except Exception as e:
QMessageBox.critical(self, "错误", f"查询设备装配号失败: {str(e)}")
def updateAssembly(self):
"""
修改仪表装配号 - command 19
"""
if not self.hartComm or not self.hartComm.isConnected():
QMessageBox.warning(self, "警告", "设备未连接,请先连接设备")
return
try:
# 获取装配号
assembly_number = self.assemblyEdit.value()
# 发送更新命令
result = self.hartComm.writeFinalAssemblyNumber(assembly_number)
# 处理响应数据
if result and len(result) > 0:
QMessageBox.information(self, "成功", "成功更新设备装配号")
else:
QMessageBox.warning(self, "警告", "未能更新设备装配号")
except Exception as e:
QMessageBox.critical(self, "错误", f"更新设备装配号失败: {str(e)}")
def updatePollingAddress(self):
"""
修改设备轮询地址 - command 6
"""
if not self.hartComm or not self.hartComm.isConnected():
QMessageBox.warning(self, "警告", "设备未连接,请先连接设备")
return
try:
new_address = self.pollingAddressSpinBox.value()
current_address_bytes = self.hartComm.getCurrentAddress()
if current_address_bytes and len(current_address_bytes) == 1:
current_address = current_address_bytes[0] & 0x3F
if new_address == current_address:
QMessageBox.information(self, "提示", "新地址与当前地址相同,无需修改。")
return
reply = QMessageBox.question(self, '确认修改',
f"确定要将设备的轮询地址修改为 {new_address} 吗?\n"
"修改成功后将使用新地址自动重新连接。",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply == QMessageBox.Yes:
result = self.hartComm.write_polling_address(new_address)
if result:
QMessageBox.information(self, "成功", f"成功将轮询地址修改为 {new_address}\n现在将使用新地址重新连接。")
if self.connectionWidget:
self.connectionWidget.disconnectDevice(silent=True)
self.connectionWidget.pollingAddressSpinBox.setValue(new_address)
self.connectionWidget.connectDevice()
else:
QMessageBox.warning(self, "警告", "未能更新轮询地址。")
except ValueError as ve:
QMessageBox.critical(self, "地址错误", f"轮询地址设置错误: {str(ve)}")
except Exception as e:
QMessageBox.critical(self, "错误", f"更新轮询地址失败: {str(e)}")

@ -0,0 +1,439 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
HART设备信息显示界面模块
实现设备唯一标识等信息的显示
"""
from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QGroupBox, QFormLayout, QTextEdit,
QTableWidget, QTableWidgetItem, QHeaderView, QMessageBox,
QProgressBar, QSplitter)
from PyQt5.QtCore import Qt, QThread, pyqtSignal
from PyQt5.QtGui import QFont, QTextCursor
class ReadDeviceInfoThread(QThread):
"""
读取设备信息的线程类
避免在UI线程中进行耗时操作
"""
# 自定义信号
infoReceived = pyqtSignal(list) # 信息接收信号
errorOccurred = pyqtSignal(str) # 错误信号
progressUpdated = pyqtSignal(int, str) # 进度更新信号(百分比, 描述)
def __init__(self, hartComm):
"""
初始化线程
Args:
hartComm: HART通信对象
"""
super().__init__()
self.hartComm = hartComm
self.isRunning = True
def stop(self):
"""
停止线程执行
"""
self.isRunning = False
def run(self):
"""
线程运行函数
读取设备信息和变量数据
"""
try:
if not self.hartComm:
raise Exception("HART通信对象未初始化")
deviceInfo = []
# 更新进度
self.progressUpdated.emit(10, "正在读取设备唯一标识...")
if not self.isRunning: return
# 读取设备唯一标识
try:
result = self.hartComm.readUniqueId()
if isinstance(result, list):
deviceInfo.extend(result)
elif not isinstance(result, dict) or 'error' not in result:
deviceInfo.append(result)
except Exception as e:
self.errorOccurred.emit(f"读取设备唯一标识失败: {e}")
return
# 变量信息读取已移至 HartVariableInfoWidget
self.progressUpdated.emit(80, "正在读取设备唯一标识...")
# 完成读取
self.progressUpdated.emit(100, "读取完成")
if self.isRunning:
self.infoReceived.emit(deviceInfo)
except Exception as e:
self.errorOccurred.emit(str(e))
class HartDeviceInfoWidget(QWidget):
"""
HART设备信息显示界面模块
实现设备唯一标识等信息的显示
"""
def __init__(self, parent=None):
"""
初始化设备信息界面
"""
super().__init__(parent)
# 初始化变量
self.hartComm = None
self.deviceInfo = None
self.readThread = None
# 初始化界面
self.initUI()
def initUI(self):
"""
初始化用户界面
"""
# 创建主布局
mainLayout = QVBoxLayout()
mainLayout.setContentsMargins(20, 20, 20, 20)
mainLayout.setSpacing(20)
# 创建分割器
splitter = QSplitter(Qt.Vertical)
# 创建设备信息组
deviceInfoGroup = QGroupBox("设备基本信息")
deviceInfoGroup.setFont(QFont("Microsoft YaHei", 10, QFont.Bold))
deviceInfoLayout = QFormLayout()
deviceInfoLayout.setLabelAlignment(Qt.AlignRight)
deviceInfoLayout.setFieldGrowthPolicy(QFormLayout.AllNonFixedFieldsGrow)
# 设备信息字段
self.manufacturerIdLabel = QLabel("--")
deviceInfoLayout.addRow("制造商ID:", self.manufacturerIdLabel)
self.deviceTypeLabel = QLabel("--")
deviceInfoLayout.addRow("设备类型:", self.deviceTypeLabel)
self.deviceIdLabel = QLabel("--")
deviceInfoLayout.addRow("设备ID:", self.deviceIdLabel)
self.preambleCountLabel = QLabel("--")
deviceInfoLayout.addRow("前导符数量:", self.preambleCountLabel)
self.universalRevLabel = QLabel("--")
deviceInfoLayout.addRow("通用命令版本:", self.universalRevLabel)
self.specificRevLabel = QLabel("--")
deviceInfoLayout.addRow("特定命令版本:", self.specificRevLabel)
self.softwareRevLabel = QLabel("--")
deviceInfoLayout.addRow("软件版本:", self.softwareRevLabel)
self.hardwareRevLabel = QLabel("--")
deviceInfoLayout.addRow("硬件版本:", self.hardwareRevLabel)
self.deviceStatusLabel = QLabel("--")
deviceInfoLayout.addRow("设备状态:", self.deviceStatusLabel)
self.commandNameLabel = QLabel("--")
deviceInfoLayout.addRow("命令名称:", self.commandNameLabel)
self.responseCodeLabel = QLabel("--")
deviceInfoLayout.addRow("响应代码:", self.responseCodeLabel)
self.flagsLabel = QLabel("--")
deviceInfoLayout.addRow("标志:", self.flagsLabel)
# 设置布局
deviceInfoGroup.setLayout(deviceInfoLayout)
# 变量信息组已移至 HartVariableInfoWidget
# 添加到分割器
splitter.addWidget(deviceInfoGroup)
mainLayout.addWidget(splitter)
# 创建原始数据组
rawDataGroup = QGroupBox("原始数据")
rawDataGroup.setFont(QFont("Microsoft YaHei", 10, QFont.Bold))
rawDataLayout = QVBoxLayout()
# 原始数据文本框
self.rawDataText = QTextEdit()
self.rawDataText.setReadOnly(True)
self.rawDataText.setFont(QFont("Consolas", 9))
self.rawDataText.setLineWrapMode(QTextEdit.NoWrap)
self.rawDataText.setHorizontalScrollBarPolicy(Qt.ScrollBarAsNeeded)
self.rawDataText.setMinimumHeight(120)
rawDataLayout.addWidget(self.rawDataText)
# 设置布局
rawDataGroup.setLayout(rawDataLayout)
splitter.addWidget(rawDataGroup)
splitter.setSizes([300, 200])
splitter.setStretchFactor(0, 1)
splitter.setStretchFactor(1, 1)
# 创建进度条
progressLayout = QHBoxLayout()
self.progressBar = QProgressBar()
self.progressBar.setRange(0, 100)
self.progressBar.setValue(0)
self.progressBar.setVisible(False)
self.progressLabel = QLabel("准备就绪")
progressLayout.addWidget(self.progressBar)
progressLayout.addWidget(self.progressLabel)
mainLayout.addLayout(progressLayout)
# 创建按钮布局
buttonLayout = QHBoxLayout()
buttonLayout.setSpacing(10)
# 读取按钮
self.readButton = QPushButton("读取设备信息")
self.readButton.clicked.connect(self.readDeviceInfo)
self.readButton.setEnabled(False) # 初始禁用
buttonLayout.addWidget(self.readButton)
# 取消按钮
self.cancelButton = QPushButton("取消")
self.cancelButton.setEnabled(False)
self.cancelButton.clicked.connect(self.cancelOperation)
buttonLayout.addWidget(self.cancelButton)
# 清除按钮
self.clearButton = QPushButton("清除显示")
self.clearButton.clicked.connect(self.clearDisplay)
buttonLayout.addWidget(self.clearButton)
mainLayout.addLayout(buttonLayout)
# 设置主布局
self.setLayout(mainLayout)
def setHartComm(self, hartComm):
"""
设置HART通信对象
Args:
hartComm: HART通信对象
"""
self.hartComm = hartComm
self.readButton.setEnabled(hartComm is not None)
def readDeviceInfo(self):
"""
读取设备信息
"""
if not self.hartComm:
QMessageBox.warning(self, "警告", "未连接到HART设备")
return
# 清除显示
self.clearDisplay()
# 禁用读取按钮,启用取消按钮
self.readButton.setEnabled(False)
self.readButton.setText("正在读取...")
self.cancelButton.setEnabled(True)
# 显示进度条
self.progressBar.setValue(0)
self.progressBar.setVisible(True)
self.progressLabel.setText("准备读取设备信息...")
# 创建并启动读取线程
self.readThread = ReadDeviceInfoThread(self.hartComm)
self.readThread.infoReceived.connect(self.updateDeviceInfo)
self.readThread.errorOccurred.connect(self.handleError)
self.readThread.progressUpdated.connect(self.updateProgress)
self.readThread.finished.connect(self.onReadFinished)
self.readThread.start()
def updateDeviceInfo(self, deviceInfo):
"""
更新设备信息显示
Args:
deviceInfo: 设备信息列表
"""
if not deviceInfo:
QMessageBox.warning(self, "警告", "未收到任何设备信息!")
return
self.rawDataText.clear()
for item in deviceInfo:
msg = item[0] if isinstance(item, list) and len(item) > 0 else item
if not isinstance(msg, dict):
self.rawDataText.append(f"接收到非字典格式的消息: {type(msg)}\n")
print(f"接收到非字典格式的消息: {type(msg)}, 内容: {msg}")
continue
try:
command_name = msg.get('command_name', '未知命令')
self.setLabelIfValueExists(self.manufacturerIdLabel, msg, 'manufacturer_id')
self.setLabelIfValueExists(self.deviceTypeLabel, msg, 'manufacturer_device_type')
self.setLabelIfValueExists(self.deviceIdLabel, msg, 'device_id')
self.setLabelIfValueExists(self.preambleCountLabel, msg, 'number_response_preamble_characters')
self.setLabelIfValueExists(self.universalRevLabel, msg, 'universal_command_revision_level')
self.setLabelIfValueExists(self.specificRevLabel, msg, 'transmitter_specific_command_revision_level')
self.setLabelIfValueExists(self.softwareRevLabel, msg, 'software_revision_level')
self.setLabelIfValueExists(self.hardwareRevLabel, msg, 'hardware_revision_level')
if command_name != '未知命令':
self.commandNameLabel.setText(command_name)
self.setLabelIfValueExists(self.responseCodeLabel, msg, 'response_code')
if 'device_status' in msg:
status_text = self.formatDeviceStatus(msg['device_status'])
self.flagsLabel.setText(status_text)
self.rawDataText.append(f"--- 接收到的报文: {command_name} ---")
for key, value in msg.items():
if isinstance(value, bytes):
value_str = ' '.join(f'{b:02x}' for b in value)
self.rawDataText.append(f"{key}: {value_str} (hex)")
else:
self.rawDataText.append(f"{key}: {value}")
self.rawDataText.append("\n")
self.rawDataText.moveCursor(QTextCursor.End)
except Exception as e:
error_text = f"处理报文数据时出错: {str(e)}\n"
self.rawDataText.append(error_text)
print(error_text)
continue
def formatDeviceStatus(self, status_byte):
"""
格式化设备状态字节
Args:
status_byte: 状态字节
Returns:
str: 格式化的状态描述
"""
status_text = []
# 解析状态位
if status_byte & 0x80:
status_text.append("设备故障")
if status_byte & 0x40:
status_text.append("配置已更改")
if status_byte & 0x20:
status_text.append("冷启动")
if status_byte & 0x10:
status_text.append("更多状态可用")
if status_byte & 0x08:
status_text.append("回路电流固定")
if status_byte & 0x04:
status_text.append("回路电流饱和")
if status_byte & 0x02:
status_text.append("非主变量超出范围")
if status_byte & 0x01:
status_text.append("主变量超出范围")
if not status_text:
return "正常"
return ", ".join(status_text)
def setLabelIfValueExists(self, label, data_dict, key, formatter=None):
"""
仅当 data_dict 中存在键且值有效时才更新对应标签
"""
value = data_dict.get(key)
if value is not None and value != "":
label.setText(formatter(value) if formatter else str(value))
def handleError(self, errorMsg):
"""
处理错误
Args:
errorMsg: 错误信息
"""
QMessageBox.critical(self, "错误", f"读取设备信息时发生错误:{errorMsg}")
def updateProgress(self, value, message):
"""
更新进度条
Args:
value: 进度值(0-100)
message: 进度消息
"""
self.progressBar.setValue(value)
self.progressLabel.setText(message)
def cancelOperation(self):
"""
取消当前操作
"""
if self.readThread and self.readThread.isRunning():
# 停止线程
self.readThread.stop()
self.readThread.wait()
# 更新界面
self.progressLabel.setText("操作已取消")
QMessageBox.information(self, "操作取消", "读取设备信息操作已取消")
# 恢复按钮状态
self.onReadFinished()
def onReadFinished(self):
"""
读取完成时的处理
"""
# 恢复按钮状态
self.readButton.setEnabled(self.hartComm is not None)
self.readButton.setText("读取设备信息")
self.cancelButton.setEnabled(False)
# 隐藏进度条
self.progressBar.setVisible(False)
def clearDisplay(self):
"""
清除显示内容
"""
# 清除所有标签
self.manufacturerIdLabel.setText("--")
self.deviceTypeLabel.setText("--")
self.deviceIdLabel.setText("--")
self.preambleCountLabel.setText("--")
self.universalRevLabel.setText("--")
self.specificRevLabel.setText("--")
self.softwareRevLabel.setText("--")
self.hardwareRevLabel.setText("--")
self.deviceStatusLabel.setText("--")
self.commandNameLabel.setText("--")
self.responseCodeLabel.setText("--")
self.flagsLabel.setText("--")
# 变量信息已移至新窗口
# 清除原始数据
self.rawDataText.clear()
# 重置进度条
self.progressBar.setValue(0)
self.progressBar.setVisible(False)
self.progressLabel.setText("准备就绪")

@ -0,0 +1,110 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
HART通信主窗口类
实现基于PyQt的HART通信界面主窗口包含多个功能选项卡
"""
import sys
from PyQt5.QtWidgets import QApplication, QTabWidget, QVBoxLayout, QWidget
from PyQt5.QtCore import Qt
from PyQt5.QtGui import QFont
# 导入自定义模块
from UI.HartWidgets.HartConnectionWidget import HartConnectionWidget
from UI.HartWidgets.HartDeviceInfoWidget import HartDeviceInfoWidget
from UI.HartWidgets.HartDeviceConfigWidget import HartDeviceConfigWidget
from UI.HartWidgets.HartSensorConfigWidget import HartSensorConfigWidget
from UI.HartWidgets.HartCalibrationWidget import HartCalibrationWidget
from UI.HartWidgets.HartVariableInfoWidget import HartVariableInfoWidget
class HartMainWindow(QWidget):
"""
HART通信主窗口类
实现基于PyQt的HART通信界面主窗口包含多个功能选项卡
"""
def __init__(self):
"""
初始化主窗口
"""
super().__init__()
# 创建主布局
self.mainLayout = QVBoxLayout(self)
self.mainLayout.setContentsMargins(10, 10, 10, 10)
# 创建选项卡窗口部件
self.tabWidget = QTabWidget()
self.tabWidget.setObjectName("hartTabWidget")
self.tabWidget.tabBar().setObjectName("hartTabBar")
self.tabWidget.setFont(QFont("Microsoft YaHei", 10))
self.mainLayout.addWidget(self.tabWidget)
# 初始化HART通信对象
self.hartComm = None
# 初始化界面
self.initUI()
def initUI(self):
"""
初始化用户界面
"""
# 创建连接设备选项卡
self.connectionWidget = HartConnectionWidget()
self.tabWidget.addTab(self.connectionWidget, "设备连接")
# 创建设备信息选项卡
self.deviceInfoWidget = HartDeviceInfoWidget()
self.tabWidget.addTab(self.deviceInfoWidget, "设备信息")
# 创建变量信息选项卡
self.variableInfoWidget = HartVariableInfoWidget()
self.tabWidget.addTab(self.variableInfoWidget, "变量信息")
# 创建传感器配置选项卡
self.sensorConfigWidget = HartSensorConfigWidget()
self.tabWidget.addTab(self.sensorConfigWidget, "传感器配置")
# 创建校准功能选项卡
self.calibrationWidget = HartCalibrationWidget()
self.tabWidget.addTab(self.calibrationWidget, "校准功能")
# 创建设备配置选项卡
self.deviceConfigWidget = HartDeviceConfigWidget()
self.tabWidget.addTab(self.deviceConfigWidget, "设备配置")
# 将 connectionWidget 实例传递给 deviceConfigWidget
self.deviceConfigWidget.setConnectionWidget(self.connectionWidget)
# 连接信号和槽
self.connectionWidget.connectionStatusChanged.connect(self.onConnectionStatusChanged)
self.connectionWidget.hartCommCreated.connect(self.onHartCommCreated)
def onConnectionStatusChanged(self, connected):
"""
连接状态改变时的处理函数
Args:
connected (bool): 连接状态
"""
# 根据连接状态启用或禁用其他选项卡
for i in range(1, self.tabWidget.count()):
self.tabWidget.setTabEnabled(i, connected)
def onHartCommCreated(self, hartComm):
"""
HART通信对象创建时的处理函数
Args:
hartComm (HARTCommunication): HART通信对象
"""
self.hartComm = hartComm
# 将HART通信对象传递给其他选项卡
self.deviceInfoWidget.setHartComm(hartComm)
self.variableInfoWidget.setHartComm(hartComm)
self.sensorConfigWidget.setHartComm(hartComm)
self.calibrationWidget.setHartComm(hartComm)
self.deviceConfigWidget.setHartComm(hartComm)

@ -0,0 +1,316 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QGroupBox,
QLabel, QPushButton, QFormLayout, QMessageBox,
QComboBox, QDoubleSpinBox, QCompleter)
from PyQt5.QtCore import Qt, QThread, pyqtSignal
from protocol.HART import common
class ReadInfoThread(QThread):
"""读取或写入信息的线程类"""
infoReceived = pyqtSignal(dict, str)
errorOccurred = pyqtSignal(str, str)
def __init__(self, hartComm, command_name, params=None):
super().__init__()
self.hartComm = hartComm
self.command_name = command_name
self.params = params if params is not None else {}
self._is_running = True
def stop(self):
self._is_running = False
def run(self):
try:
if not self.hartComm:
raise Exception("HART通信对象未初始化")
func = getattr(self.hartComm, self.command_name, None)
if not func:
raise Exception(f"未找到名为 {self.command_name} 的通信方法")
response = func(**self.params)
if self._is_running:
msg = response[0] if isinstance(response, list) and response else response
if msg and isinstance(msg, dict):
if msg.get("status") != "fail":
self.infoReceived.emit(msg, self.command_name)
else:
error_msg = msg.get("error", "未知错误")
self.errorOccurred.emit(f"命令 {self.command_name} 执行失败: {error_msg}", self.command_name)
else:
self.errorOccurred.emit(f"收到无效响应: {response}", self.command_name)
except Exception as e:
self.errorOccurred.emit(str(e), self.command_name)
class HartSensorConfigWidget(QWidget):
def __init__(self, parent=None):
super(HartSensorConfigWidget, self).__init__(parent)
self.hartComm = None
self.readThreads = {}
self.units_map = common.UNITS_CODE
self.reverse_units_map = common.REVERSE_UNITS_CODE
self.transfer_fn_map = common.TRANSFER_FUNCTION_CODE
self.reverse_transfer_fn_map = common.REVERSE_TRANSFER_FUNCTION_CODE
self.initUI()
def initUI(self):
mainLayout = QVBoxLayout(self)
displayLayout = QHBoxLayout()
pvInfoGroup = QGroupBox("传感器信息 (Command 14)")
pvInfoForm = QFormLayout()
self.serialNoLabel = QLabel("--")
self.sensorUpperLimitLabel = QLabel("--")
self.sensorLowerLimitLabel = QLabel("--")
self.sensorUnitLabel = QLabel("--")
self.minSpanLabel = QLabel("--")
pvInfoForm.addRow("传感器序列号:", self.serialNoLabel)
pvInfoForm.addRow("传感器上限:", self.sensorUpperLimitLabel)
pvInfoForm.addRow("传感器下限:", self.sensorLowerLimitLabel)
pvInfoForm.addRow("传感器单位:", self.sensorUnitLabel)
pvInfoForm.addRow("最小量程:", self.minSpanLabel)
self.readPvInfoButton = QPushButton("读取传感器信息")
self.readPvInfoButton.clicked.connect(lambda: self.readInfo('readPrimaryVariableInformation'))
pvInfoForm.addRow(self.readPvInfoButton)
pvInfoGroup.setLayout(pvInfoForm)
outputInfoGroup = QGroupBox("输出信息 (Command 15)")
outputInfoForm = QFormLayout()
self.alarmCodeLabel = QLabel("--")
self.transferFnLabel = QLabel("--")
self.pvRangeUnitLabel = QLabel("--")
self.pvUpperRangeLabel = QLabel("--")
self.pvLowerRangeLabel = QLabel("--")
self.dampingLabel = QLabel("--")
self.writeProtectLabel = QLabel("--")
self.privateLabelLabel = QLabel("--")
outputInfoForm.addRow("报警动作:", self.alarmCodeLabel)
outputInfoForm.addRow("输出特性:", self.transferFnLabel)
outputInfoForm.addRow("PV量程单位:", self.pvRangeUnitLabel)
outputInfoForm.addRow("PV上限值:", self.pvUpperRangeLabel)
outputInfoForm.addRow("PV下限值:", self.pvLowerRangeLabel)
outputInfoForm.addRow("阻尼值(s):", self.dampingLabel)
outputInfoForm.addRow("写保护:", self.writeProtectLabel)
outputInfoForm.addRow("私有标签:", self.privateLabelLabel)
self.readOutputInfoButton = QPushButton("读取输出信息")
self.readOutputInfoButton.clicked.connect(lambda: self.readInfo('readOutputInformation'))
outputInfoForm.addRow(self.readOutputInfoButton)
outputInfoGroup.setLayout(outputInfoForm)
displayLayout.addWidget(pvInfoGroup)
displayLayout.addWidget(outputInfoGroup)
mainLayout.addLayout(displayLayout)
configLayout = QHBoxLayout()
writeGroup = QGroupBox("写入配置")
writeForm = QFormLayout()
self.rangeUnitComboBox = QComboBox()
self.rangeUnitComboBox.setEditable(True)
self.rangeUnitComboBox.addItems(sorted(self.units_map.values()))
completer = QCompleter(self.rangeUnitComboBox.model())
completer.setFilterMode(Qt.MatchContains)
completer.setCaseSensitivity(Qt.CaseInsensitive)
self.rangeUnitComboBox.setCompleter(completer)
self.upperRangeSpinBox = QDoubleSpinBox()
self.upperRangeSpinBox.setRange(-999999.0, 999999.0)
self.upperRangeSpinBox.setDecimals(4)
self.lowerRangeSpinBox = QDoubleSpinBox()
self.lowerRangeSpinBox.setRange(-999999.0, 999999.0)
self.lowerRangeSpinBox.setDecimals(4)
self.dampingSpinBox = QDoubleSpinBox()
self.dampingSpinBox.setRange(0, 100)
self.dampingSpinBox.setDecimals(2)
self.transferFnComboBox = QComboBox()
self.transferFnComboBox.addItems(self.transfer_fn_map.values())
self.setRangeButton = QPushButton("写入量程 (Cmd 35)")
self.setRangeButton.clicked.connect(self.onSetRange)
self.setDampingButton = QPushButton("写入阻尼 (Cmd 34)")
self.setDampingButton.clicked.connect(self.onSetDamping)
self.setTransferFnButton = QPushButton("写入输出函数 (Cmd 54)")
self.setTransferFnButton.clicked.connect(self.onSetTransferFunction)
writeForm.addRow("量程单位:", self.rangeUnitComboBox)
writeForm.addRow("量程上限:", self.upperRangeSpinBox)
writeForm.addRow("量程下限:", self.lowerRangeSpinBox)
writeForm.addRow(self.setRangeButton)
writeForm.addRow("阻尼(s):", self.dampingSpinBox)
writeForm.addRow(self.setDampingButton)
writeForm.addRow("输出函数:", self.transferFnComboBox)
writeForm.addRow(self.setTransferFnButton)
writeGroup.setLayout(writeForm)
configLayout.addWidget(writeGroup)
mainLayout.addLayout(configLayout)
self.setHartComm(None)
def setHartComm(self, hartComm):
self.hartComm = hartComm
is_connected = hartComm is not None
self.readPvInfoButton.setEnabled(is_connected)
self.readOutputInfoButton.setEnabled(is_connected)
self.setRangeButton.setEnabled(is_connected)
self.setDampingButton.setEnabled(is_connected)
self.setTransferFnButton.setEnabled(is_connected)
def readInfo(self, command_name):
if not self.hartComm:
QMessageBox.warning(self, "警告", "未连接到HART设备")
return
if command_name in self.readThreads and self.readThreads[command_name].isRunning():
return
thread = ReadInfoThread(self.hartComm, command_name)
thread.infoReceived.connect(self.updateInfo)
thread.errorOccurred.connect(self.handleError)
thread.finished.connect(lambda: self.onReadFinished(command_name))
self.readThreads[command_name] = thread
button = self.getButtonForCommand(command_name)
if button:
button.setProperty("original_text", button.text())
button.setText("读取中...")
button.setEnabled(False)
thread.start()
def updateInfo(self, msg, command_name):
if command_name == 'readPrimaryVariableInformation':
serial_no = msg.get('serial_no')
self.serialNoLabel.setText(f"{serial_no.hex().upper()}" if serial_no else "--")
self.sensorUpperLimitLabel.setText(f"{msg.get('upper_limit', '--'):.4f}")
self.sensorLowerLimitLabel.setText(f"{msg.get('lower_limit', '--'):.4f}")
self.sensorUnitLabel.setText(common.get_unit_description(msg.get('sensor_limits_code')))
self.minSpanLabel.setText(f"{msg.get('min_span', '--'):.4f}")
elif command_name == 'readOutputInformation':
self.alarmCodeLabel.setText(self.getAlarmCodeDescription(msg.get('alarm_code')))
transfer_code = msg.get('transfer_fn_code')
transfer_text = self.getTransferFunctionDescription(transfer_code)
self.transferFnLabel.setText(transfer_text)
if transfer_text in self.reverse_transfer_fn_map:
self.transferFnComboBox.setCurrentText(transfer_text)
unit_code = msg.get('primary_variable_range_code')
unit_text = common.get_unit_description(unit_code)
self.pvRangeUnitLabel.setText(unit_text)
if unit_text in self.reverse_units_map:
self.rangeUnitComboBox.setCurrentText(unit_text)
upper_val = msg.get('upper_range_value')
lower_val = msg.get('lower_range_value')
self.pvUpperRangeLabel.setText(f"{upper_val:.4f}" if upper_val is not None else "--")
self.pvLowerRangeLabel.setText(f"{lower_val:.4f}" if lower_val is not None else "--")
if upper_val is not None: self.upperRangeSpinBox.setValue(upper_val)
if lower_val is not None: self.lowerRangeSpinBox.setValue(lower_val)
damping_val = msg.get('damping_value')
self.dampingLabel.setText(f"{damping_val:.2f}" if damping_val is not None else "--")
if damping_val is not None: self.dampingSpinBox.setValue(damping_val)
self.writeProtectLabel.setText(self.getWriteProtectDescription(msg.get('write_protect')))
private_label = msg.get('private_label')
self.privateLabelLabel.setText(f"{private_label}" if private_label is not None else "--")
def onSetRange(self):
upper = self.upperRangeSpinBox.value()
lower = self.lowerRangeSpinBox.value()
unit_text = self.rangeUnitComboBox.currentText()
unit_code = self.reverse_units_map.get(unit_text)
if unit_code is None:
QMessageBox.warning(self, "输入错误", "无效的单位。请从列表中选择或正确输入。")
return
if upper <= lower:
QMessageBox.warning(self, "输入错误", "量程上限必须大于下限。")
return
self.startWriteThread('writePrimaryVariableRange', {'unitsCode': unit_code, 'upperRange': upper, 'lowerRange': lower})
def onSetDamping(self):
damping = self.dampingSpinBox.value()
self.startWriteThread('writePrimaryVariableDamping', {'dampingTime': damping})
def onSetTransferFunction(self):
fn_text = self.transferFnComboBox.currentText()
fn_code = self.reverse_transfer_fn_map.get(fn_text)
if fn_code is None:
QMessageBox.warning(self, "输入错误", "无效的输出函数。")
return
self.startWriteThread('writePrimaryVariableOutputFunction', {'transferFunctionCode': fn_code})
def startWriteThread(self, command_name, params):
if not self.hartComm:
QMessageBox.warning(self, "警告", "未连接到HART设备")
return
if command_name in self.readThreads and self.readThreads[command_name].isRunning():
return
thread = ReadInfoThread(self.hartComm, command_name, params)
thread.infoReceived.connect(self.onWriteSuccess)
thread.errorOccurred.connect(self.handleError)
thread.finished.connect(lambda: self.onReadFinished(command_name))
self.readThreads[command_name] = thread
button = self.getButtonForCommand(command_name)
if button:
button.setProperty("original_text", button.text())
button.setText("写入中...")
button.setEnabled(False)
thread.start()
def onWriteSuccess(self, msg, command_name):
QMessageBox.information(self, "成功", f"命令 {command_name} 执行成功。")
self.readInfo('readOutputInformation')
def getAlarmCodeDescription(self, code):
return {0: "高报", 1: "低报", 2: "保持最后输出"}.get(code, f"未知代码({code})")
def getTransferFunctionDescription(self, code):
return self.transfer_fn_map.get(code, f"未知代码({code})")
def getWriteProtectDescription(self, code):
return {0: "未保护", 1: "已保护"}.get(code, f"未知代码({code})")
def handleError(self, errorMsg, command_name):
QMessageBox.critical(self, "错误", f"执行 {command_name} 时出错:\\n{errorMsg}")
def onReadFinished(self, command_name):
button = self.getButtonForCommand(command_name)
if button:
original_text = button.property("original_text")
if original_text:
button.setText(original_text)
button.setEnabled(self.hartComm is not None)
def getButtonForCommand(self, command_name):
if command_name == 'readPrimaryVariableInformation':
return self.readPvInfoButton
elif command_name == 'readOutputInformation':
return self.readOutputInfoButton
elif command_name == 'writePrimaryVariableRange':
return self.setRangeButton
elif command_name == 'writePrimaryVariableDamping':
return self.setDampingButton
elif command_name == 'writePrimaryVariableOutputFunction':
return self.setTransferFnButton
return None

@ -0,0 +1,261 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
HART变量信息显示界面模块
实现主变量第二变量回路电流等动态数据的读取与显示
"""
from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QGroupBox, QFormLayout, QTextEdit,
QMessageBox, QProgressBar, QSplitter)
from PyQt5.QtCore import Qt, QThread, pyqtSignal
from PyQt5.QtGui import QFont, QTextCursor
from protocol.HART import common
class ReadVariableThread(QThread):
"""读取单个变量信息的线程类"""
infoReceived = pyqtSignal(dict, str) # 信号发送(响应字典, 命令类型)
errorOccurred = pyqtSignal(str, str)
def __init__(self, hartComm, command_name):
super().__init__()
self.hartComm = hartComm
self.command_name = command_name
self._is_running = True
def stop(self):
self._is_running = False
def run(self):
try:
if not self.hartComm:
raise Exception("HART通信对象未初始化")
func = getattr(self.hartComm, self.command_name, None)
if not func:
raise Exception(f"未找到名为 {self.command_name} 的通信方法")
response = func()
if self._is_running:
msg = response[0] if isinstance(response, list) and response else response
# print(msg)
if msg and isinstance(msg, dict):
if msg.get("status") != "fail":
self.infoReceived.emit(msg, self.command_name)
else:
error_msg = msg.get("error", "未知错误")
self.errorOccurred.emit(f"命令 {self.command_name} 执行失败: {error_msg}", self.command_name)
else:
self.errorOccurred.emit(f"收到无效响应: {response}", self.command_name)
except Exception as e:
self.errorOccurred.emit(str(e), self.command_name)
class HartVariableInfoWidget(QWidget):
"""HART变量信息显示界面"""
def __init__(self, parent=None):
super().__init__(parent)
self.hartComm = None
self.readThreads = {}
self.initUI()
def initUI(self):
mainLayout = QVBoxLayout(self)
topLayout = QHBoxLayout()
# --- Command 1 Group ---
cmd1Group = QGroupBox("主变量 (Command 1)")
cmd1Layout = QFormLayout()
self.pvLabel = QLabel("--")
self.pvUnitLabel = QLabel("--")
cmd1Layout.addRow("主变量值:", self.pvLabel)
cmd1Layout.addRow("单位:", self.pvUnitLabel)
self.readCmd1Button = QPushButton("读取")
self.readCmd1Button.clicked.connect(lambda: self.readVariableInfo('readPrimaryVariable'))
cmd1Layout.addRow(self.readCmd1Button)
cmd1Group.setLayout(cmd1Layout)
# --- Command 2 Group ---
cmd2Group = QGroupBox("回路电流和百分比 (Command 2)")
cmd2Layout = QFormLayout()
self.loopCurrentLabel_cmd2 = QLabel("--")
self.percentRangeLabel = QLabel("--")
cmd2Layout.addRow("回路电流:", self.loopCurrentLabel_cmd2)
cmd2Layout.addRow("量程百分比:", self.percentRangeLabel)
self.readCmd2Button = QPushButton("读取")
self.readCmd2Button.clicked.connect(lambda: self.readVariableInfo('readLoopCurrentAndPercent'))
cmd2Layout.addRow(self.readCmd2Button)
cmd2Group.setLayout(cmd2Layout)
# --- Command 3 Group ---
cmd3Group = QGroupBox("动态变量和回路电流 (Command 3)")
cmd3Layout = QFormLayout()
self.loopCurrentLabel_cmd3 = QLabel("--")
self.pv_cmd3_Label = QLabel("--")
self.pv_cmd3_UnitLabel = QLabel("--")
self.sv_cmd3_Label = QLabel("--")
self.sv_cmd3_UnitLabel = QLabel("--")
self.tv_cmd3_Label = QLabel("--")
self.tv_cmd3_UnitLabel = QLabel("--")
self.qv_cmd3_Label = QLabel("--")
self.qv_cmd3_UnitLabel = QLabel("--")
cmd3Layout.addRow("回路电流:", self.loopCurrentLabel_cmd3)
cmd3Layout.addRow("主变量:", self.pv_cmd3_Label)
cmd3Layout.addRow("主变量单位:", self.pv_cmd3_UnitLabel)
cmd3Layout.addRow("第二变量:", self.sv_cmd3_Label)
cmd3Layout.addRow("第二变量单位:", self.sv_cmd3_UnitLabel)
cmd3Layout.addRow("第三变量:", self.tv_cmd3_Label)
cmd3Layout.addRow("第三变量单位:", self.tv_cmd3_UnitLabel)
cmd3Layout.addRow("第四变量:", self.qv_cmd3_Label)
cmd3Layout.addRow("第四变量单位:", self.qv_cmd3_UnitLabel)
self.readCmd3Button = QPushButton("读取")
self.readCmd3Button.clicked.connect(lambda: self.readVariableInfo('readDynamicVariablesAndLoopCurrent'))
cmd3Layout.addRow(self.readCmd3Button)
cmd3Group.setLayout(cmd3Layout)
topLayout.addWidget(cmd1Group)
topLayout.addWidget(cmd2Group)
topLayout.addWidget(cmd3Group)
mainLayout.addLayout(topLayout)
# --- Raw Data Group ---
rawDataGroup = QGroupBox("原始数据")
rawDataLayout = QVBoxLayout()
self.rawDataText = QTextEdit()
self.rawDataText.setReadOnly(True)
self.rawDataText.setFont(QFont("Consolas", 9))
self.rawDataText.setLineWrapMode(QTextEdit.NoWrap)
rawDataLayout.addWidget(self.rawDataText)
rawDataGroup.setLayout(rawDataLayout)
mainLayout.addWidget(rawDataGroup)
# --- Global Controls ---
controlLayout = QHBoxLayout()
self.clearButton = QPushButton("清除所有显示")
self.clearButton.clicked.connect(self.clearAllDisplays)
controlLayout.addStretch()
controlLayout.addWidget(self.clearButton)
mainLayout.addLayout(controlLayout)
self.setHartComm(None) # Disable buttons initially
def setHartComm(self, hartComm):
self.hartComm = hartComm
is_connected = hartComm is not None
self.readCmd1Button.setEnabled(is_connected)
self.readCmd2Button.setEnabled(is_connected)
self.readCmd3Button.setEnabled(is_connected)
def readVariableInfo(self, command_name):
if not self.hartComm:
QMessageBox.warning(self, "警告", "未连接到HART设备")
return
if command_name in self.readThreads and self.readThreads[command_name].isRunning():
self.readThreads[command_name].stop()
self.readThreads[command_name].wait()
thread = ReadVariableThread(self.hartComm, command_name)
thread.infoReceived.connect(self.updateVariableInfo)
thread.errorOccurred.connect(self.handleError)
thread.finished.connect(lambda: self.onReadFinished(command_name))
self.readThreads[command_name] = thread
button = self.getButtonForCommand(command_name)
if button:
button.setText("读取中...")
button.setEnabled(False)
thread.start()
def updateVariableInfo(self, msg, command_name):
self.appendRawData(msg)
if command_name == 'readPrimaryVariable':
pv = msg.get('primary_variable')
unit_code = msg.get('primary_variable_units')
self.pvLabel.setText(f"{pv:.4f}" if pv is not None else "--")
self.pvUnitLabel.setText(common.get_unit_description(unit_code))
elif command_name == 'readLoopCurrentAndPercent':
current = msg.get('analog_signal')
percent = msg.get('primary_variable')
self.loopCurrentLabel_cmd2.setText(f"{current:.4f} mA" if current is not None else "--")
self.percentRangeLabel.setText(f"{percent:.2f} %" if percent is not None else "--")
elif command_name == 'readDynamicVariablesAndLoopCurrent':
current = msg.get('analog_signal')
self.loopCurrentLabel_cmd3.setText(f"{current:.4f} mA" if current is not None else "--")
pv = msg.get('primary_variable')
pv_unit = msg.get('primary_variable_units')
self.pv_cmd3_Label.setText(f"{pv:.4f}" if pv is not None else "--")
self.pv_cmd3_UnitLabel.setText(common.get_unit_description(pv_unit))
sv = msg.get('secondary_variable')
sv_unit = msg.get('secondary_variable_units')
self.sv_cmd3_Label.setText(f"{sv:.4f}" if sv is not None else "--")
self.sv_cmd3_UnitLabel.setText(common.get_unit_description(sv_unit))
tv = msg.get('tertiary_variable')
tv_unit = msg.get('tertiary_variable_units')
self.tv_cmd3_Label.setText(f"{tv:.4f}" if tv is not None else "--")
self.tv_cmd3_UnitLabel.setText(common.get_unit_description(tv_unit))
qv = msg.get('quaternary_variable')
qv_unit = msg.get('quaternary_variable_units')
self.qv_cmd3_Label.setText(f"{qv:.4f}" if qv is not None else "--")
self.qv_cmd3_UnitLabel.setText(common.get_unit_description(qv_unit))
def appendRawData(self, msg):
# self.rawDataText.append(\"--- 接收到的报文: {msg.get('command_name', 'N/A')} ---\")
for key, value in msg.items():
if isinstance(value, bytes):
value_str = ' '.join(f'{b:02x}' for b in value)
self.rawDataText.append(f"{key}: {value_str} (hex)")
else:
self.rawDataText.append(f"{key}: {value}")
self.rawDataText.append("\\n")
self.rawDataText.moveCursor(QTextCursor.End)
def handleError(self, errorMsg, command_name):
QMessageBox.critical(self, "错误", f"执行 {command_name} 时出错:\\n{errorMsg}")
def onReadFinished(self, command_name):
button = self.getButtonForCommand(command_name)
if button:
button.setText("读取")
button.setEnabled(self.hartComm is not None)
def getButtonForCommand(self, command_name):
if command_name == 'readPrimaryVariable':
return self.readCmd1Button
elif command_name == 'readLoopCurrentAndPercent':
return self.readCmd2Button
elif command_name == 'readDynamicVariablesAndLoopCurrent':
return self.readCmd3Button
return None
def clearAllDisplays(self):
# Cmd 1
self.pvLabel.setText("--")
self.pvUnitLabel.setText("--")
# Cmd 2
self.loopCurrentLabel_cmd2.setText("--")
self.percentRangeLabel.setText("--")
# Cmd 3
self.loopCurrentLabel_cmd3.setText("--")
self.pv_cmd3_Label.setText("--")
self.pv_cmd3_UnitLabel.setText("--")
self.sv_cmd3_Label.setText("--")
self.sv_cmd3_UnitLabel.setText("--")
self.tv_cmd3_Label.setText("--")
self.tv_cmd3_UnitLabel.setText("--")
self.qv_cmd3_Label.setText("--")
self.qv_cmd3_UnitLabel.setText("--")
self.rawDataText.clear()

@ -0,0 +1,22 @@
"""
HART界面组件模块
提供HART协议相关的PyQt界面组件
"""
from .HartMainWindow import HartMainWindow
from .HartConnectionWidget import HartConnectionWidget
from .HartDeviceInfoWidget import HartDeviceInfoWidget
from .HartDeviceConfigWidget import HartDeviceConfigWidget
from .HartSensorConfigWidget import HartSensorConfigWidget
from .HartCalibrationWidget import HartCalibrationWidget
from .HartVariableInfoWidget import HartVariableInfoWidget
__all__ = [
'HartMainWindow',
'HartConnectionWidget',
'HartDeviceInfoWidget',
'HartDeviceConfigWidget',
'HartSensorConfigWidget',
'HartCalibrationWidget',
'HartVariableInfoWidget'
]

@ -0,0 +1,52 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
HART通信工具主程序
基于PyQt的现代化HART通信界面应用程序
"""
import sys
import os
from PyQt5.QtWidgets import QApplication
from PyQt5.QtCore import QFile, QTextStream
from UI.HartWidgets.HartMainWindow import HartMainWindow
def loadStyleSheet(sheetName):
"""
加载样式表
Args:
sheetName: 样式表文件名
Returns:
样式表内容
"""
file = QFile(sheetName)
file.open(QFile.ReadOnly | QFile.Text)
stream = QTextStream(file)
return stream.readAll()
def main():
"""
主函数
"""
# 创建应用程序
app = QApplication(sys.argv)
# 设置应用程序样式
styleSheetPath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "style.qss")
if os.path.exists(styleSheetPath):
app.setStyleSheet(loadStyleSheet(styleSheetPath))
# 创建主窗口
mainWindow = HartMainWindow()
mainWindow.setWindowTitle("HART通信工具")
mainWindow.resize(800, 600)
mainWindow.show()
# 运行应用程序
sys.exit(app.exec_())
if __name__ == "__main__":
main()

@ -17,7 +17,8 @@ from model.ProjectModel.VarManage import GlobalVarManager
from UI.Main.MainLeft import MainLeft
from UI.Main.MainTop import MainTop
from ..ProjectManages.ProjectWidget import ProjectWidgets
from ..VarManages.VarWidget import VarWidgets, HartWidgets, TcRtdWidgets, AnalogWidgets, HartSimulateWidgets
from ..VarManages.VarWidget import VarWidgets, TcRtdWidgets, AnalogWidgets, HartSimulateWidgets
from UI.HartWidgets.HartMainWindow import HartMainWindow
from UI.VarManages.VarTable import RpcVarTableView
from ..UserManage.UserWidget import UserWidgets
from ..TrendManage.TrendWidget import TrendWidgets
@ -160,7 +161,7 @@ class MainWindow(QMainWindow):
self.profibusWidget = ProfibusWidgets()
self.trendWidget = TrendWidgets()
self.SettingWidget = SettingWidget()
self.hartWidget = HartWidgets()
self.hartWidget = HartMainWindow()
self.tcrtdWidget = TcRtdWidgets()
self.analogWidget = AnalogWidgets()
self.hartsimulateWidget = HartSimulateWidgets()
@ -169,6 +170,7 @@ class MainWindow(QMainWindow):
self.userWidget.setObjectName('userWidget')
self.projectWidget.setObjectName('projectWidget')
self.trendWidget.setObjectName('trendWidget')
self.hartWidget.setObjectName('hartWidget')
# self.ModBusWidget.setObjectName('varWidget')
self.analogWidget.setObjectName('analogWidget')
self.hartsimulateWidget.setObjectName('hartsimulateWidget')
@ -498,7 +500,9 @@ class MainWindow(QMainWindow):
if __name__ == '__main__':
app = QApplication(sys.argv)
app.setStyle(QtWidgets.QStyleFactory.create('Fusion'))
app.setStyleSheet(CommonHelper.readQss('Static/main.qss'))
main_style = CommonHelper.readQss('Static/main.qss')
hart_style = CommonHelper.readQss('Static/Hart.qss')
app.setStyleSheet(main_style + hart_style)
# print(QtWidgets.QStyleFactory.keys())
ex = MainWindow()
ex.show()

@ -265,7 +265,7 @@ class ProjectButtonDelegate(QItemDelegate):
Globals.getValue('MainWindows').createWidgets()
modelLists = ['ModbusTcpMasterTable', 'ModbusTcpSlaveTable', 'ModbusRtuMasterTable', \
'ModbusRtuSlaveTable', 'HartTable', 'TcRtdTable', 'AnalogTable', 'HartSimulateTable', 'userTable']
'ModbusRtuSlaveTable', 'TcRtdTable', 'AnalogTable', 'HartSimulateTable', 'userTable']
for l in modelLists:
# print(l)
Globals.getValue(l).model.initTable()

@ -2,7 +2,7 @@ import time
from PyQt5.QtCore import QThread, pyqtSignal
from protocol.TCP.RTDTC import RTDTCClient
from protocol.TCP.Analog import AnalogClient
from protocol.Hart.HartSimulate import HartSimulate
from protocol.HART.HartSimulate import HartSimulate
from utils import Globals

@ -545,45 +545,7 @@ class VarWidgets(QtWidgets.QWidget):
return False
class HartWidgets(VarWidgets):
def __init__(self, parent=None):
super(HartWidgets, self).__init__(parent)
def setupUI(self):
self.setAttribute(Qt.WA_StyledBackground, True)
self.startProtocolBtn = QPushButton(QIcon('./Static/startProtocol.png'), '开始通讯')
self.startProtocolBtn.setObjectName('startProtocolBtn')
self.startProtocolBtn.setIconSize(QSize(22, 22))
self.startProtocolBtn.clicked.connect(self.startProtocol)
self.varView = HartTableView()
self.varView.setObjectName('varView')
self.proxy = QtCore.QSortFilterProxyModel(self)
self.proxy.setSourceModel(self.varView.model)
self.varView.setModel(self.proxy)
self.varView.proxy = self.proxy
self.timer = QTimer(self)
# 将定时器超时信号与槽函数showTime()连接
self.timer.timeout.connect(self.proxy.invalidate)
self.timer.start(50) # 启动timer
Globals.setValue('HartTable', self.varView)
self.gridLayout = QtWidgets.QGridLayout(self)
self.gridLayout.addWidget(self.startProtocolBtn, 0, 0, 1, 1)
self.gridLayout.addWidget(self.varView, 1, 0, 10, 26)
self.gridLayout.setSpacing(20)
self.gridLayout.setContentsMargins(20, 30, 20, 20)
# self.comboBox.currentIndexChanged.connect(self.on_comboBox_currentIndexChanged)
self.horizontalHeader = self.varView.horizontalHeader()
self.horizontalHeader.sectionClicked.connect(self.on_view_horizontalHeader_sectionClicked)
def startProtocol(self):
pass
class TcRtdWidgets(VarWidgets):

@ -0,0 +1,3 @@
"""
UI模块初始化文件
"""

@ -0,0 +1,468 @@
import serial
import time
import threading
from typing import Optional, List, Dict, Any, Union, Tuple
from protocol.HART import universal, common, tools
from protocol.HART._parsing import parse
from protocol.HART._unpacker import Unpacker
BurstModeON = 1 # 突发模式开启
BurstModeOFF = 0 # 突发模式关闭
PrimaryMasterMode = 1 # 主要模式
SecondaryMasterMode = 0 # 次要副主站模式
class HARTCommunication:
"""
HART协议通信类使用COM7端口1200波特率奇校验
"""
def __init__(self, port='COM7', baudrate=1200, parity=serial.PARITY_ODD, timeout=3):
"""
初始化HART通信类
Args:
port (str): 串口端口号默认为'COM7'
baudrate (int): 波特率默认为1200
parity (int): 校验方式默认为奇校验
timeout (float): 超时时间单位秒
"""
self.port = port
self.baudrate = baudrate
self.parity = parity
self.timeout = timeout
self.serialConnection: Optional[serial.Serial] = None
self.deviceAddress: Optional[bytes] = None
self.comm_lock = threading.RLock()
self.masterMode = PrimaryMasterMode # 默认为主要主站
self.burstMode = BurstModeOFF # 默认关闭突发模式
def connect(self) -> bool:
"""
建立串口连接
Returns:
bool: 连接是否成功
"""
try:
self.serialConnection = serial.Serial(
port=self.port,
baudrate=self.baudrate,
parity=self.parity,
stopbits=1,
bytesize=8,
timeout=self.timeout,
write_timeout = 3,
xonxoff = True
)
return True
except Exception as e:
print(f"连接失败: {e}")
return False
def disconnect(self):
"""
断开串口连接
"""
if self.serialConnection and self.serialConnection.is_open:
self.serialConnection.close()
def sendCommand(self, command_bytes: bytes) -> Union[List[Dict[str, Any]], Dict[str, str]]:
"""
发送HART命令
Args:
command_bytes (bytes): 要发送的命令字节
Returns:
Union[List[Dict[str, Any]], Dict[str, str]]: 解析后的响应数据列表或错误字典
"""
if not self.serialConnection or not self.serialConnection.is_open:
raise ConnectionError("串口未连接")
with self.comm_lock:
try:
# 发送命令前清空缓冲区,防止旧数据干扰
self.serialConnection.reset_input_buffer()
self.serialConnection.reset_output_buffer()
self.serialConnection.write(command_bytes)
self.serialConnection.flush() # 等待数据完全发出
print(f"发送命令: {command_bytes.hex()}")
# 等待响应
time.sleep(1)
# 使用Unpacker解析响应
unpacker = Unpacker(self.serialConnection)
msgList: List[Dict[str, Any]] = []
for msg in unpacker:
msgList.append(dict(msg))
if not msgList:
return {"error": "设备无响应或响应超时"}
return msgList
except Exception as e:
print(f"发送命令失败: {e}")
return {"error": f"发送命令失败: {e}"}
def sendCommandWithRetry(self, command_bytes: bytes, max_retries=3) -> Union[List[Dict[str, Any]], Dict[str, str]]:
"""
发送HART命令带重试和自动重连机制
Args:
command_bytes (bytes): 要发送的命令字节
max_retries (int): 最大重试次数
Returns:
Union[List[Dict[str, Any]], Dict[str, str]]: 解析后的响应数据或错误字典
"""
for attempt in range(max_retries):
try:
result = self.sendCommand(command_bytes)
if isinstance(result, list) and result: # 成功并且有数据
return result
# 处理失败情况
error_info = result if isinstance(result, dict) else {"error": "未知错误"}
print(f"{attempt + 1}次尝试失败,响应: {error_info.get('error', 'N/A')},重试中...")
# 如果是超时或无响应,则尝试重连
if error_info.get("error") == "设备无响应或响应超时":
print("检测到设备无响应,正在尝试重新连接串口...")
self.disconnect()
time.sleep(0.1)
if not self.connect():
print("重新连接失败,放弃重试。")
return {"error": "重新连接串口失败"}
print("串口重新连接成功。")
time.sleep(0.2)
except Exception as e:
print(f"{attempt + 1}次尝试发生异常: {e}")
if attempt == max_retries - 1:
return {"error": f"重试{max_retries}次后仍然失败: {e}"}
# 发生异常时也尝试重连
print("发生异常,正在尝试重新连接串口...")
self.disconnect()
time.sleep(0.5)
if not self.connect():
print("重新连接失败,放弃重试。")
return {"error": "重新连接串口失败"}
print("串口重新连接成功。")
time.sleep(1)
return {"error": f"重试{max_retries}次后仍然失败"}
def readUniqueId(self):
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
command = universal.read_unique_identifier(self.deviceAddress)
deviceInfo = self.sendCommandWithRetry(command)
if isinstance(deviceInfo, list):
for msg in deviceInfo:
if isinstance(msg, dict):
manufacturer_id = msg.get('manufacturer_id')
manufacturer_device_type = msg.get('manufacturer_device_type')
device_id = msg.get('device_id')
if all(v is not None for v in [manufacturer_id, manufacturer_device_type, device_id]):
assert manufacturer_id is not None and manufacturer_device_type is not None and device_id is not None
expandedDeviceType = (manufacturer_id << 8) | manufacturer_device_type
self.buildHartAddress(expandedDeviceType=expandedDeviceType, deviceId=device_id, isLongFrame=True)
if self.deviceAddress:
print(f"Rebuilt long address: {self.deviceAddress.hex()}")
else:
print("Warning: Could not rebuild long address from message, missing required keys.")
else:
print(f"Warning: Expected a dict but got {type(msg)}. Cannot process for long address.")
return deviceInfo
def readPrimaryVariable(self):
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
command = universal.read_primary_variable(self.deviceAddress)
return self.sendCommandWithRetry(command)
def readLoopCurrentAndPercent(self):
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
command = universal.read_loop_current_and_percent(self.deviceAddress)
return self.sendCommandWithRetry(command)
def readDynamicVariablesAndLoopCurrent(self):
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
command = universal.read_dynamic_variables_and_loop_current(self.deviceAddress)
return self.sendCommandWithRetry(command)
def readMessage(self):
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
command = universal.read_message(self.deviceAddress)
return self.sendCommandWithRetry(command)
def readTagDescriptorDate(self):
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
command = universal.read_tag_descriptor_date(self.deviceAddress)
return self.sendCommandWithRetry(command)
def readPrimaryVariableInformation(self):
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
command = universal.read_primary_variable_information(self.deviceAddress)
return self.sendCommandWithRetry(command)
def readOutputInformation(self):
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
command = universal.read_output_information(self.deviceAddress)
return self.sendCommandWithRetry(command)
def readFinalAssemblyNumber(self):
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
command = universal.read_final_assembly_number(self.deviceAddress)
return self.sendCommandWithRetry(command)
# ========== 校准功能 ==========\\n
def calibrate4mA(self):
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
command = universal.calibrate_4ma(self.deviceAddress)
return self.sendCommandWithRetry(command)
def calibrate20mA(self):
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
command = universal.calibrate_20ma(self.deviceAddress)
return self.sendCommandWithRetry(command)
def calibrateZeroPoint(self):
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
command = universal.calibrate_zero_point(self.deviceAddress)
return self.sendCommandWithRetry(command)
# ========== 量程和输出设置 ==========\\n
def writePrimaryVariableDamping(self, dampingTime: float):
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
command = universal.write_primary_variable_damping(self.deviceAddress, dampingTime)
return self.sendCommandWithRetry(command)
def writePrimaryVariableRange(self, unitsCode: int, upperRange: float, lowerRange: float):
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
command = universal.write_primary_variable_range(self.deviceAddress, unitsCode, upperRange, lowerRange)
return self.sendCommandWithRetry(command)
def writePrimaryVariableUnits(self, unitsCode: int):
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
command = universal.write_primary_variable_units(self.deviceAddress, unitsCode)
return self.sendCommandWithRetry(command)
def writePrimaryVariableOutputFunction(self, transferFunctionCode: int):
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
command = universal.write_primary_variable_output_function(self.deviceAddress, transferFunctionCode)
return self.sendCommandWithRetry(command)
# ========== 电流微调功能 ==========\\n
def trimLoopCurrent4mA(self, measuredCurrent: float):
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
command = universal.trim_loop_current_4ma(self.deviceAddress, measuredCurrent)
return self.sendCommandWithRetry(command)
def trimLoopCurrent20mA(self, measuredCurrent: float):
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
command = universal.trim_loop_current_20ma(self.deviceAddress, measuredCurrent)
return self.sendCommandWithRetry(command)
def setFixedCurrentOutput(self, enable: bool, currentValue: float = 0.0):
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
command = universal.set_fixed_current_output(self.deviceAddress, enable, currentValue)
return self.sendCommandWithRetry(command)
# ========== 突发模式和主站模式设置 ==========\\n
def setBurstMode(self, enable: bool, burstMessageCommand: int = 1):
self.burstMode = BurstModeON if enable else BurstModeOFF
if self.deviceAddress:
self._rebuildCurrentAddress()
return {"burst_mode": self.burstMode, "burst_message_command": burstMessageCommand}
def setMasterMode(self, isPrimaryMaster: bool):
self.masterMode = PrimaryMasterMode if isPrimaryMaster else SecondaryMasterMode
if self.deviceAddress:
self._rebuildCurrentAddress()
return {"master_mode": self.masterMode}
def _rebuildCurrentAddress(self):
if self.deviceAddress is None:
return
if len(self.deviceAddress) == 5: # 长帧地址
byte0 = self.deviceAddress[0]
expandedDeviceType = ((byte0 & 0x3F) << 8) | self.deviceAddress[1]
deviceId = (self.deviceAddress[2] << 16) | (self.deviceAddress[3] << 8) | self.deviceAddress[4]
self.buildHartAddress(expandedDeviceType=expandedDeviceType, deviceId=deviceId, isLongFrame=True)
else: # 短帧地址
pollingAddress = self.deviceAddress[0] & 0x3F
self.buildHartAddress(pollingAddress=pollingAddress, isLongFrame=False)
# ========== 实用工具方法 ==========\\n
def getDeviceStatus(self):
if not self.deviceAddress:
return {"error": "设备地址未设置"}
return self.readUniqueId()
def isConnected(self) -> bool:
return self.serialConnection is not None and self.serialConnection.is_open
def getCurrentAddress(self) -> Optional[bytes]:
return self.deviceAddress
def setDeviceAddress(self, address: bytes):
self.deviceAddress = address
def writeMessage(self, message: str):
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
command = universal.write_message(self.deviceAddress, message)
return self.sendCommandWithRetry(command, max_retries=1)
def writeTagDescriptorDate(self, tag: str, descriptor: str, date: Tuple[int, int, int]):
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
command = universal.write_tag_descriptor_date(self.deviceAddress, tag, descriptor, date)
return self.sendCommandWithRetry(command, max_retries=1)
def writeFinalAssemblyNumber(self, number: int):
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
command = universal.write_final_assembly_number(self.deviceAddress, number)
return self.sendCommandWithRetry(command, max_retries=1)
def write_polling_address(self, new_polling_address: int):
"""
修改设备的轮询地址 (Command 6)
Args:
new_polling_address (int): 新的轮询地址 (0-63)
"""
if self.deviceAddress is None:
raise ValueError("设备地址未设置")
if not (0 <= new_polling_address <= 63):
raise ValueError("轮询地址必须在0-63范围内")
command = universal.write_polling_address(self.deviceAddress, new_polling_address)
result = self.sendCommandWithRetry(command)
# The response should contain the new polling address
if isinstance(result, list) and result and result[0].get('polling_address') == new_polling_address:
print(f"轮询地址成功修改为: {new_polling_address}")
# Update the internal address to the new one
self.buildHartAddress(pollingAddress=new_polling_address, isLongFrame=False)
return result
else:
error_msg = f"修改轮询地址失败. 响应: {result}"
print(error_msg)
raise Exception(error_msg)
def scanForDevice(self) -> Optional[Dict[str, Any]]:
if not self.serialConnection:
raise ConnectionError("串口未连接")
print("开始扫描设备...")
original_timeout = self.serialConnection.timeout
try:
self.serialConnection.timeout = 0.5
for addr in range(64):
print(f"正在尝试轮询地址: {addr}")
self.buildHartAddress(pollingAddress=addr, isLongFrame=False)
try:
if self.deviceAddress is None: continue
command = universal.read_unique_identifier(self.deviceAddress)
device_info_list = self.sendCommandWithRetry(command, max_retries=1)
if isinstance(device_info_list, list) and device_info_list:
print(f"在地址 {addr} 找到设备: {device_info_list[0]}")
return {"address": addr, "device_info": device_info_list[0]}
except Exception as e:
print(f"地址 {addr} 扫描异常: {e}")
continue
finally:
if self.serialConnection:
self.serialConnection.timeout = original_timeout
print("扫描完成,未找到设备。")
return None
def buildHartAddress(self, expandedDeviceType: Optional[int] = None, deviceId: Optional[int] = None, pollingAddress: int = 0, isLongFrame: bool = False):
master = self.masterMode
burstMode = self.burstMode
if isLongFrame:
if expandedDeviceType is None or deviceId is None:
raise ValueError("长帧格式需要提供expandedDeviceType和deviceId参数")
if not (0 <= deviceId <= 0xFFFFFF):
raise ValueError("设备ID必须在0-16777215范围内")
byte0 = (master << 7) | (burstMode << 6) | ((expandedDeviceType >> 8) & 0x3F)
byte1 = expandedDeviceType & 0xFF
byte2 = (deviceId >> 16) & 0xFF
byte3 = (deviceId >> 8) & 0xFF
byte4 = deviceId & 0xFF
self.deviceAddress = bytes([byte0, byte1, byte2, byte3, byte4])
else:
if not (0 <= pollingAddress <= 63):
raise ValueError("轮询地址必须在0-63范围内")
self.deviceAddress = bytes([(master << 7) | (burstMode << 6) | (pollingAddress & 0x3F)])
return self.deviceAddress
if __name__ == '__main__':
hart = HARTCommunication()
if hart.connect():
print("=== HART通信类功能演示 ===")
address = hart.buildHartAddress(pollingAddress=0, isLongFrame=False)
print(f"设备地址: {address.hex()}")
print("\n--- 读取设备信息 ---")
device_info = hart.readUniqueId()
if isinstance(device_info, list):
for msg in device_info:
print(f"设备信息: {msg}")
# print("\n--- 读取主变量 ---")
# primary_var = hart.readPrimaryVariable()
# if isinstance(primary_var, list):
# for msg in primary_var:
# print(f"主变量: {msg}")
testMEs: List[Dict[str, Any]] | Dict[str, str] = hart.writePrimaryVariableDamping(1)
if isinstance(testMEs, list):
for msg in testMEs:
print(f"信息: {msg}")
testMEs1: List[Dict[str, Any]] | Dict[str, str] = hart.readOutputInformation()
if isinstance(testMEs1, list):
for msg in testMEs1:
print(f"信息: {msg}")
testMEs1: List[Dict[str, Any]] | Dict[str, str] = hart.readOutputInformation()
if isinstance(testMEs1, list):
for msg in testMEs1:
print(f"信息: {msg}")
hart.disconnect()
print("\n连接已断开")
else:
print("连接失败")

@ -1,8 +1,4 @@
import sys
sys.path.append('../')
sys.path.append('../../')
sys.path.append('../../../')
from protocol.ModBus.rtumaster_example import RTUMaster
from ..ModBus.rtumaster_example import RTUMaster
class HartProtocol(object):
def __init__(self):

@ -0,0 +1 @@
__version__ = "2023.6.0"

@ -0,0 +1,28 @@
"""A sans-io python implementation of the Highway Addressable Remote Transducer Protocol."""
from .__version__ import *
# 延迟导入以避免循环依赖
def _get_hart_communication():
from .HARTCommunication import HARTCommunication, BurstModeON, BurstModeOFF, PrimaryMasterMode, SecondaryMasterMode
return HARTCommunication, BurstModeON, BurstModeOFF, PrimaryMasterMode, SecondaryMasterMode
# 导出常用模块
from . import common
from . import universal
from . import tools
from ._unpacker import *
# 导出主要类和常量
HARTCommunication, BurstModeON, BurstModeOFF, PrimaryMasterMode, SecondaryMasterMode = _get_hart_communication()
__all__ = [
'HARTCommunication',
'BurstModeON',
'BurstModeOFF',
'PrimaryMasterMode',
'SecondaryMasterMode',
'common',
'universal',
'tools'
]

@ -0,0 +1,23 @@
"""Define version."""
import pathlib
import subprocess
from .VERSION import __version__
here = pathlib.Path(__file__).resolve().parent
__all__ = ["__version__", "__branch__"]
try:
__branch__ = (
subprocess.run(["git", "branch", "--show-current"], capture_output=True, cwd=here)
.stdout.strip()
.decode()
)
except:
__branch__ = ""
if __branch__:
__version__ += "+" + __branch__

@ -0,0 +1,261 @@
import struct
from typing import MutableMapping, Union
from . import tools
def parse(response: bytes) -> MutableMapping[str, Union[int, bytes, str, float]]:
# print(response, 1111111111111)
out: MutableMapping[str, Union[int, bytes, str, float]] = dict()
out["full_response"] = response
if response[0] & 0x80: # long address
out["address"] = int.from_bytes(response[1:6], "big")
response = response[6:]
else: # short address
out["address"] = response[1]
response = response[2:]
command, bytecount, response_code, device_status = struct.unpack_from(">BBBB", response)
out["device_status"] = device_status
out["response_code"] = response_code
if response_code == 0:
out["status"] = "success"
else:
out["status"] = "error"
out["error"] = f"Device responded with error code: {response_code}"
data = response[4 : 4 + bytecount]
out["command"] = command
out["command_name"] = f"hart_command_{command}"
out["bytecount"] = bytecount
out["data"] = data
# handle error return
if bytecount == 2:
return out
# universal commands
if command in [0, 11]:
out["command_name"] = "read_unique_identifier"
out["manufacturer_id"] = data[1]
out["manufacturer_device_type"] = data[2]
out["number_response_preamble_characters"] = data[3]
out["universal_command_revision_level"] = data[4]
out["transmitter_specific_command_revision_level"] = data[5]
out["software_revision_level"] = data[6]
out["hardware_revision_level"] = data[7]
out["device_id"] = int.from_bytes(data[9:12], "big")
elif command in [1]:
out["command_name"] = "read_primary_variable"
units, variable = struct.unpack_from(">Bf", data)
out["primary_variable_units"] = units
out["primary_variable"] = variable
elif command in [2]:
out["command_name"] = "read_loop_current_and_percent"
analog_signal, primary_variable = struct.unpack_from(">ff", data)
out["analog_signal"] = analog_signal
out["primary_variable"] = primary_variable
elif command in [3]:
out["command_name"] = "read_dynamic_variables_and_loop_current"
if len(data) >= 4:
analog_signal = struct.unpack_from(">f", data, 0)[0]
out["analog_signal"] = analog_signal
# 计算剩余数据长度并确定支持的变量数量
remaining_length = len(data) - 4
variable_count = remaining_length // 5 # 每个变量占5字节 (1字节单位 + 4字节值)
# 动态解析变量数据
offset = 4 # 从第4字节开始是变量数据
for i in range(variable_count):
if offset + 5 > len(data): # 确保有足够数据
break
# 解析单位枚举值 (1字节)
unit_enum = struct.unpack_from(">B", data, offset)[0]
offset += 1
# 解析变量值 (4字节)
variable_value = struct.unpack_from(">f", data, offset)[0]
offset += 4
# 根据变量索引设置字段名
if i == 0:
out["primary_variable_units"] = unit_enum
out["primary_variable"] = variable_value
elif i == 1:
out["secondary_variable_units"] = unit_enum
out["secondary_variable"] = variable_value
elif i == 2:
out["tertiary_variable_units"] = unit_enum
out["tertiary_variable"] = variable_value
elif i == 3:
out["quaternary_variable_units"] = unit_enum
out["quaternary_variable"] = variable_value
elif command in [6]:
out["command_name"] = "write_polling_address"
polling_address = struct.unpack_from(">B", data)[0]
out["polling_address"] = polling_address
elif command in [12]:
# print(data)
out["command_name"] = "read_message"
out["message"] = data[0:24]
# print(out, 111111111)
elif command in [13]:
out["command_name"] = "read_tag_descriptor_date"
# Tag is 8 chars packed into 6 bytes
out["device_tag_name"] = tools.unpack_packed_ascii(data[0:6], 8)
# Descriptor is 16 chars packed into 12 bytes
out["device_descriptor"] = tools.unpack_packed_ascii(data[6:18], 16)
# Date is 3 bytes: day, month, year (year is offset from 1900)
day, month, year_offset = struct.unpack_from(">BBB", data, 18)
out["date"] = {"day": day, "month": month, "year": year_offset + 1900}
elif command in [14]:
out["command_name"] = "read_primary_variable_information"
out["serial_no"] = data[0:3]
sensor_limits_code, upper_limit, lower_limit, min_span = struct.unpack_from(
">xxxBfff", data
)
out["sensor_limits_code"] = sensor_limits_code
out["upper_limit"] = upper_limit
out["lower_limit"] = lower_limit
out["min_span"] = min_span
elif command in [15]:
out["command_name"] = "read_output_information"
(
alarm_code,
transfer_fn_code,
primary_variable_range_code,
upper_range_value,
lower_range_value,
damping_value,
write_protect,
private_label,
) = struct.unpack_from(">BBBfffBB", data)
out["alarm_code"] = alarm_code
out["transfer_fn_code"] = transfer_fn_code
out["primary_variable_range_code"] = primary_variable_range_code
out["upper_range_value"] = upper_range_value
out["lower_range_value"] = lower_range_value
out["damping_value"] = damping_value
out["write_protect"] = write_protect
out["private_label"] = private_label
elif command in [16]:
out["command_name"] = "read_final_assembly_number"
# print(data)
out["final_assembly_no"] = int.from_bytes(data[0:3], "big")
elif command in [17]:
out["command_name"] = "write_message"
out["message"] = data[0:24]
elif command in [18]:
out["command_name"] = "write_tag_descriptor_date"
out["device_tag_name"] = data[0:6]
out["device_descriptor"] = data[6:18]
out["date"] = data[18:21]
elif command in [19]:
out["command_name"] = "write_final_assembly_number"
out["final_assembly_no"] = int.from_bytes(data[0:2], "big")
elif command in [34]:
out["command_name"] = "write_primary_variable_damping"
out["damping_time"] = struct.unpack_from(">f", data)[0]
elif command in [35]:
out["command_name"] = "write_primary_variable_range"
out["upper_range"] = struct.unpack_from(">f", data)[0]
out["lower_range"] = struct.unpack_from(">f", data[4:])[0]
elif command in [36]:
out["command_name"] = "calibrate_20ma"
# 20mA校准命令无返回数据
elif command in [37]:
out["command_name"] = "calibrate_4ma"
# 4mA校准命令无返回数据
elif command in [40]:
out["command_name"] = "set_fixed_current_output"
if len(data) > 0:
out["fixed_output_enabled"] = data[0] == 1
if len(data) > 1:
out["fixed_current_value"] = struct.unpack_from(">f", data[1:])[0]
elif command in [43]:
out["command_name"] = "calibrate_zero_point"
# 零点校准命令无返回数据
elif command in [44]:
out["command_name"] = "write_primary_variable_units"
out["units_code"] = data[0]
elif command in [45]:
out["command_name"] = "trim_loop_current_4ma"
out["measured_current"] = struct.unpack_from(">f", data)[0]
elif command in [46]:
out["command_name"] = "trim_loop_current_20ma"
out["measured_current"] = struct.unpack_from(">f", data)[0]
elif command in [47]:
out["command_name"] = "write_primary_variable_output_function"
out["transfer_function_code"] = data[0]
# COMMON COMMANDS
# elif command in [37]:
# out["command_name"] = "set_primary_variable_lower_range_value"
# out[""] =
# request data bytes = NONE, response data bytes = NONE
# elif command in [38]:
# out["command_name"] = "reset_configuration_changed_flag"
# out[""] =
# request data bytes = NONE, response data bytes = NONE
# elif command in [42]:
# out["command_name"] = "perform_master_reset"
# out[""] =
# request data bytes = NONE, response data bytes = NONE
# elif command in [48]:
# out["command_name"] = "read_additional_transmitter_status"
# out[""] =
# request data bytes = NONE, response data bytes = NONE
elif command in [50]:
out["command_name"] = "read_dynamic_variable_assignments"
(
primary_transmitter_variable,
secondary_transmitter_variable,
tertiary_transmitter_variable,
quaternary_transmitter_variable,
) = struct.unpack_from(">BBBB", data)
out["primary_transmitter_variable"] = primary_transmitter_variable
out["secondary_transmitter_variable"] = secondary_transmitter_variable
out["tertiary_transmitter_variable"] = tertiary_transmitter_variable # NOT USED
out["quaternary_transmitter_variable"] = quaternary_transmitter_variable # NOT USED
elif command in [59]:
out["command_name"] = "write_number_of_response_preambles"
n_response_preambles = struct.unpack_from(">B", data)[0]
out["n_response_preambles"] = n_response_preambles
elif command in [66]:
out["command_name"] = "toggle_analog_output_mode"
(
analog_output_selection,
analog_output_units_code,
fixed_analog_output,
) = struct.unpack_from(">BBf", data)
out["analog_output_selection"] = analog_output_selection
out["analog_output_units_code"] = analog_output_units_code
out["fixed_analog_output"] = fixed_analog_output
elif command in [67]:
out["command_name"] = "trim_analog_output_zero"
analog_output_code, analog_output_units_code, measured_analog_output = struct.unpack_from(
">BBf", data
)
out["analog_output_code"] = analog_output_code
out["analog_output_units_code"] = analog_output_units_code
out["measured_analog_output"] = measured_analog_output
elif command in [68]:
out["command_name"] = "trim_analog_output_span"
analog_output_code, analog_output_units_code, measured_analog_output = struct.unpack_from(
">BBf", data
)
out["analog_output_code"] = analog_output_code
out["analog_output_units_code"] = analog_output_units_code
out["measured_analog_output"] = measured_analog_output
elif command in [123]:
out["command_name"] = "select_baud_rate"
out["baud_rate"] = int.from_bytes(data, "big")
return out

@ -0,0 +1,140 @@
__all__ = ["Unpacker"]
import asyncio
from collections import namedtuple
import io
import struct
from tabnanny import check
import warnings
from ._parsing import parse
from . import tools
class Unpacker:
"""
Create an Unpacker to decode a byte stream into HART protocol messages.
The ``file_like`` parameter should be an object which data can be sourced from.
It should support the ``read()`` method.
The ``on_error`` parameter selects the action to take if invalid data is detected.
If set to ``"continue"`` (the default), bytes will be discarded if the byte sequence
does not appear to be a valid message.
If set to ``"warn"``, the behaviour is identical, but a warning message will be emitted.
To instead immediately abort the stream decoding and raise a ``RuntimeError``, set to
``"raise"``.
:param file_like: A file-like object which data can be `read()` from.
:param on_error: Action to take if invalid data is detected.
"""
def __init__(self, file_like=None, on_error="continue"):
if file_like is None:
self._file = io.BytesIO()
else:
self._file = file_like
self.buf = b""
self.on_error = on_error
# print(self._file)
def __iter__(self):
return self
def _decoding_error(self, message="Error decoding message from buffer."):
"""
Take appropriate action if parsing of data stream fails.
:param message: Warning or error message string.
"""
if self.on_error == "raise":
raise RuntimeError(message)
if self.on_error == "warn":
warnings.warn(message)
def _read_one_byte_if_possible(self):
if self._file.in_waiting > 0:
# print(1)
return self._file.read(1)
else:
raise StopIteration
def __next__(self):
# must work with at least two bytes to start with
while len(self.buf) < 3:
self.buf += self._read_one_byte_if_possible()
# keep reading until we find a minimum preamble
# print(self.buf, 11)
while self.buf[:3] not in [b"\xFF\xFF\x06", b"\xFF\xFF\x86"]:
self.buf += self._read_one_byte_if_possible()
# print(self.buf)
self.buf = self.buf[1:]
self._decoding_error("Head of buffer not recognized as valid preamble")
# now the head of our buffer is the start charachter plus two preamble
# we will read all the way through status
if self.buf[2] & 0x80:
l = 12
else:
l = 8
while len(self.buf) < l:
self.buf += self._read_one_byte_if_possible()
# now we can use the bytecount to read through the data and checksum
#
# print(self.buf)
# print(type(self.buf[l - 4]), 222)
if self.buf[l - 4] == 15: # 对command15进行特殊操作 修复报文错误
# bytecount = 19
self.buf = self.buf[:l - 3] + b'\x13\x00\x00\x00'
bytecount = self.buf[l - 3]
response_length = l + bytecount - 1
# print(self.buf, bytecount)
while len(self.buf) < response_length:
self.buf += self._read_one_byte_if_possible()
# checksum
# print(self.buf)
checksum = int.from_bytes(
tools.calculate_checksum(self.buf[2 : response_length - 1]), "big"
)
# print(self.buf)
if checksum != self.buf[response_length - 1]:
# print(66666666)
self._decoding_error("Invalid checksum.")
raise StopIteration
# print(self.buf)
# parse
response = self.buf[2:response_length]
# print(response, 'test')
# print(response)
dict_ = parse(response)
# clear buffer
if len(self.buf) == response_length:
self.buf = b""
else:
self.buf = self.buf[response_length + 3 :]
# return
return dict_
def __aiter__(self):
return self
async def __anext__(self):
while True:
try:
return next(self)
except StopIteration:
await asyncio.sleep(0.001)
def feed(self, data: bytes):
"""
Add byte data to the input stream.
The input stream must support random access, if it does not, must be fed externally
(e.g. serial port data).
:param data: Byte array containing data to add.
"""
pos = self._file.tell()
self._file.seek(0, 2)
self._file.write(data)
self._file.seek(pos)

@ -0,0 +1,267 @@
from . import tools
TRANSFER_FUNCTION_CODE = {
0: "线性 (Linear)",
1: "平方根 (Square Root)",
2: "平方根三次方 (Square Root 3rd Power)",
3: "平方根五次方 (Square Root 5th Power)",
4: "特殊曲线 (Special Curve) - 不推荐使用",
5: "平方 (Square)",
6: "带截止的平方根 (Square root with cut-off)",
10: "等百分比 1:25 (Equal Percentage 1:25)",
11: "等百分比 1:33 (Equal Percentage 1:33)",
12: "等百分比 1:50 (Equal Percentage 1:50)",
15: "快开 1:25 (Quick Open 1:25)",
16: "快开 1:33 (Quick Open 1:33)",
17: "快开 1:50 (Quick Open 1:50)",
30: "双曲线 (Hyperbolic) - Shape Factor 0.10",
31: "双曲线 (Hyperbolic) - Shape Factor 0.20",
32: "双曲线 (Hyperbolic) - Shape Factor 0.30",
34: "双曲线 (Hyperbolic) - Shape Factor 0.50",
37: "双曲线 (Hyperbolic) - Shape Factor 0.70",
40: "双曲线 (Hyperbolic) - Shape Factor 1.00",
41: "双曲线 (Hyperbolic) - Shape Factor 1.50",
42: "双曲线 (Hyperbolic) - Shape Factor 2.00",
43: "双曲线 (Hyperbolic) - Shape Factor 3.00",
44: "双曲线 (Hyperbolic) - Shape Factor 4.00",
45: "双曲线 (Hyperbolic) - Shape Factor 5.00",
100: "平底罐 (Flat bottom tank)",
101: "锥形或金字塔形底罐 (Conical or pyramidal bottom tank)",
102: "抛物线形底罐 (Parabolic bottom tank)",
103: "球形底罐 (Spherical bottom tank)",
104: "斜底罐 (Angled bottom tank)",
105: "平端圆柱罐 (Flat end cylinder tank)",
106: "抛物线端圆柱罐 (Parabolic end cylinder tank)",
107: "球形罐 (Spherical tank)",
230: "离散/开关 (Discrete/Switch)",
250: "未使用 (Not Used)",
251: "无 (None)",
252: "未知 (Unknown)",
253: "特殊 (Special)"
}
REVERSE_TRANSFER_FUNCTION_CODE = {v: k for k, v in TRANSFER_FUNCTION_CODE.items()}
UNITS_CODE = {
# Pressure
1: "inH2O @ 68 F",
2: "inHg @ 0 C",
3: "ftH2O @ 68 F",
4: "mmH2O @ 68 F",
5: "mmHg @ 0 C",
6: "psi",
7: "bar",
8: "mbar",
9: "g/cm2",
10: "kg/cm2",
11: "Pa",
12: "kPa",
13: "torr",
14: "atm",
145: "inH2O @ 60F",
237: "MPa",
238: "inH2O @ 4C",
239: "mmH2O @ 4C",
# Temperature
32: "deg C",
33: "deg F",
34: "deg R",
35: "K",
# Volumetric Flow
15: "cu ft/min",
16: "gal/min",
17: "liter/min",
18: "imp gal/min",
19: "cu m/hr",
22: "gal/sec",
23: "Mgal/day",
24: "liter/sec",
25: "Ml/day",
26: "cu ft/sec",
27: "cu ft/day",
28: "cu m/sec",
29: "cu m/day",
30: "imp gal/hr",
31: "imp gal/day",
121: "std cu m/hr",
122: "std liter/hr",
123: "std cu ft/min",
130: "cu ft/hr",
131: "cu m/min",
132: "bbl/sec",
133: "bbl/min",
134: "bbl/hr",
135: "bbl/day",
136: "gal/hr",
137: "imp gal/sec",
235: "gal/day",
# Velocity
20: "ft/sec",
21: "m/sec",
114: "in/sec",
115: "in/min",
116: "ft/min",
120: "m/hr",
# Volume
40: "gal",
41: "liter",
42: "imp gal",
43: "cu m",
46: "bbl",
110: "bushel",
111: "cu yd",
112: "cu ft",
113: "cu in",
134: "bbl liq",
166: "std cu m",
167: "std l",
168: "std cu ft",
236: "hectoliter",
# Mass
60: "g",
61: "kg",
62: "metric ton",
63: "lb",
64: "short ton",
65: "long ton",
125: "oz",
# Mass Flow
70: "g/sec",
71: "g/min",
72: "g/hr",
73: "kg/sec",
74: "kg/min",
75: "kg/hr",
76: "kg/day",
77: "metric ton/min",
78: "metric ton/hr",
79: "metric ton/day",
80: "lb/sec",
81: "lb/min",
82: "lb/hr",
83: "lb/day",
84: "short ton/min",
85: "short ton/hr",
86: "short ton/day",
87: "long ton/hr",
88: "long ton/day",
# Mass per Volume
90: "SGU",
91: "g/cu cm",
92: "kg/cu m",
93: "lb/gal",
94: "lb/cu ft",
95: "g/ml",
96: "kg/liter",
97: "g/liter",
98: "lb/cu in",
99: "short ton/cu yd",
100: "deg Twaddell",
102: "deg Baume",
103: "deg API",
104: "deg API",
146: "ug/liter",
147: "ug/cu m",
# Viscosity
54: "cSt",
55: "cP",
# Electric Potential
36: "mV",
58: "V",
# Electric Current
39: "mA",
# Electric Resistance
37: "ohm",
163: "kohm",
# Energy (includes Work)
69: "J",
89: "dtherm",
126: "kWh",
128: "MWh",
162: "kcal",
164: "MJ",
165: "Btu",
# Power
127: "kW",
129: "hp",
140: "Mcal/hr",
141: "MJ/hr",
142: "Btu/hr",
# Radial Velocity
117: "deg/sec",
118: "rev/sec",
119: "rpm",
# Miscellaneous
38: "Hz",
57: "percent",
59: "pH",
101: "deg Balling",
105: "percent solids/wt",
106: "percent solids/vol",
107: "deg Plato",
108: "proof/vol",
109: "proof/mass",
139: "ppm",
148: "percent consistency",
149: "vol percent",
150: "percent steam qual",
152: "cu ft/lb",
153: "pF",
154: "ml/liter",
155: "ul/liter",
156: "dB",
160: "deg Brix",
161: "percent LEL",
169: "ppb",
# Generic & Reserved
240: "Manufacturer specific",
241: "Manufacturer specific",
242: "Manufacturer specific",
243: "Manufacturer specific",
244: "Manufacturer specific",
245: "Manufacturer specific",
246: "Manufacturer specific",
247: "Manufacturer specific",
248: "Manufacturer specific",
249: "Manufacturer specific",
250: "Not Used",
251: "Unknown",
252: "Unknown",
253: "Special"
}
REVERSE_UNITS_CODE = {v: k for k, v in UNITS_CODE.items()}
def get_unit_description(code):
"""根据代码获取单位描述"""
return UNITS_CODE.get(code, f"未知代码({code})")
def set_primary_variable_lower_range_value(address: bytes, value) -> bytes:
return tools.pack_command(address, command_id=37)
def reset_configuration_changed_flag(address: bytes) -> bytes:
return tools.pack_command(address, command_id=38)
def perform_master_reset(address: bytes) -> bytes:
return tools.pack_command(address, command_id=42)
def read_additional_transmitter_status(address: bytes) -> bytes:
return tools.pack_command(address, command_id=48)
def read_dynamic_variable_assignments(address: bytes) -> bytes:
return tools.pack_command(address, command_id=50)
def write_number_of_response_preambles(address: bytes, number: int) -> bytes:
data = number.to_bytes(1, "big")
return tools.pack_command(address, command_id=59, data=data)
def toggle_analog_output_mode(address: bytes) -> bytes:
return tools.pack_command(address, command_id=66)
def trim_analog_output_zero(address: bytes) -> bytes:
return tools.pack_command(address, command_id=67)
def trim_analog_output_span(address: bytes) -> bytes:
return tools.pack_command(address, command_id=68)
def select_baud_rate(address: bytes, rate: int) -> bytes:
data = rate.to_bytes(1, "big")
return tools.pack_command(address, command_id=123, data=data)

@ -0,0 +1,147 @@
import math
from typing import Union
def calculate_checksum(command: Union[int, bytes]) -> bytes:
# print(command)
if type(command) == int:
command = command.to_bytes(64, "big") # type: ignore
lrc = 0
for byte in command: # type: ignore
lrc ^= byte
out = lrc.to_bytes(1, "big")
# print(out)
return out
def calculate_long_address(manufacturer_id: int, manufacturer_device_type: int, device_id: bytes):
out = int.from_bytes(device_id, "big")
out |= manufacturer_device_type << 24
out |= manufacturer_id << 32
return out.to_bytes(5, "big")
def pack_command(address, command_id, data=None):
# if type(address) == bytes:
# address = int.from_bytes(address, "big")
if type(command_id) == int:
command_id = command_id.to_bytes(1, "big")
command = b"\xFF\xFF\xFF\xFF\xFF\xFF" # preamble
command += b"\x82" if len(address) >= 5 else b"\x02"
command += address
command += command_id
if data is None:
command += b"\x00" # byte count
else:
# print(len(data), 22222222222)
command += len(data).to_bytes(1, "big") # byte count
command += data # data
# print(command[6:])
command += calculate_checksum(command[6:])
# print(command)
return command
PACKED_ASCII_MAP = {
0x00: '@', 0x01: 'A', 0x02: 'B', 0x03: 'C', 0x04: 'D', 0x05: 'E', 0x06: 'F', 0x07: 'G',
0x08: 'H', 0x09: 'I', 0x0A: 'J', 0x0B: 'K', 0x0C: 'L', 0x0D: 'M', 0x0E: 'N', 0x0F: 'O',
0x10: 'P', 0x11: 'Q', 0x12: 'R', 0x13: 'S', 0x14: 'T', 0x15: 'U', 0x16: 'V', 0x17: 'W',
0x18: 'X', 0x19: 'Y', 0x1A: 'Z', 0x1B: '[', 0x1C: '\\', 0x1D: ']', 0x1E: '^', 0x1F: '_', # 这里应该是 '_' 而不是 '-'
0x20: ' ', 0x21: '!', 0x22: '"', 0x23: '=', 0x24: '$', 0x25: '%', 0x26: '&', 0x27: '`', # 注意这里是反引号 '`'
0x28: '(', 0x29: ')', 0x2A: '*', 0x2B: '+', 0x2C: ',', 0x2D: '-', 0x2E: '.', 0x2F: '/',
0x30: '0', 0x31: '1', 0x32: '2', 0x33: '3', 0x34: '4', 0x35: '5', 0x36: '6', 0x37: '7',
0x38: '8', 0x39: '9', 0x3A: ':', 0x3B: ';', 0x3C: '<', 0x3D: '=', 0x3E: '>', 0x3F: '?'
}
def unpack_packed_ascii(data: bytes, expected_len: int) -> str:
"""Unpacks a byte array into a string using the HART Packed ASCII 6-bit encoding by processing in chunks."""
chars = []
# Process data in 3-byte chunks, as 4 characters (24 bits) fit into 3 bytes (24 bits)
for i in range(0, len(data), 3):
chunk = data[i:i+3]
# Pad chunk to 3 bytes if it's a partial chunk at the end
if len(chunk) < 3:
chunk += b'\x00' * (3 - len(chunk))
byte1, byte2, byte3 = chunk
# Unpack 4 6-bit characters from the 3 8-bit bytes
c1 = byte1 >> 2
c2 = ((byte1 & 0x03) << 4) | (byte2 >> 4)
c3 = ((byte2 & 0x0F) << 2) | (byte3 >> 6)
c4 = byte3 & 0x3F
codes = [c1, c2, c3, c4]
for code in codes:
chars.append(PACKED_ASCII_MAP.get(code, '?'))
# Join and then trim to the exact expected length. Do not rstrip() as spaces can be valid.
return "".join(chars[:expected_len])
# Build a reverse map for packing, handling duplicates deterministically
REVERSE_PACKED_ASCII_MAP = {}
# Iterate through the original map, sorted by code value (key)
for code, char in sorted(PACKED_ASCII_MAP.items(), key=lambda item: item[0]):
# This ensures that for characters with multiple codes, the one with the highest code value is chosen.
REVERSE_PACKED_ASCII_MAP[char] = code
def pack_packed_ascii(text: Union[str, bytes], expected_len: int) -> bytes:
"""
Packs a string or bytes into a HART Packed ASCII byte array.
This function is the symmetrical inverse of the unpack_packed_ascii function.
"""
# Defensively decode if bytes are passed in
if isinstance(text, bytes):
try:
# Try decoding with ascii, fall back to latin-1 which never fails
text = text.decode('ascii')
except UnicodeDecodeError:
text = text.decode('latin-1')
# 1. Pad/truncate the input string to the exact expected length
padded_text = text.ljust(expected_len, ' ')
padded_text = padded_text[:expected_len]
# 2. Convert all characters to their corresponding 6-bit codes
codes = [REVERSE_PACKED_ASCII_MAP.get(c, 0x3F) for c in padded_text] # Default to '?' (0x3F)
packed_bytes = bytearray()
# 3. Process the codes in chunks of 4
for i in range(0, len(codes), 4):
# Get a chunk of up to 4 codes.
chunk = codes[i:i+4]
# If it's a partial chunk at the end, pad with space codes to make a full chunk of 4.
while len(chunk) < 4:
chunk.append(REVERSE_PACKED_ASCII_MAP[' '])
c1, c2, c3, c4 = chunk
# 4. Pack the 4 6-bit codes into 3 8-bit bytes, mirroring the unpacking logic
byte1 = (c1 << 2) | (c2 >> 4)
byte2 = ((c2 & 0x0F) << 4) | (c3 >> 2)
byte3 = ((c3 & 0x03) << 6) | c4
packed_bytes.extend([byte1, byte2, byte3])
# 5. Calculate the exact number of bytes required for the original expected length
num_bytes = math.ceil(expected_len * 6 / 8)
# 6. Get the core byte array, trimmed to the precise required size
result = bytes(packed_bytes[:num_bytes])
# For Command 17 (Write Message), the spec requires a fixed 24-byte data field.
# This corresponds to an expected character length of 32.
if expected_len == 32:
# Pad the result to exactly 24 bytes.
return result.ljust(24, b'\x00')
return result

@ -0,0 +1,136 @@
import struct
from typing import Tuple
from . import tools
def read_unique_identifier(address: bytes) -> bytes:
return tools.pack_command(address, command_id=0)
def read_primary_variable(address: bytes) -> bytes:
return tools.pack_command(address, command_id=1)
def read_loop_current_and_percent(address: bytes) -> bytes:
return tools.pack_command(address, command_id=2)
def read_dynamic_variables_and_loop_current(address: bytes) -> bytes:
return tools.pack_command(address, command_id=3)
def write_polling_address(address: bytes, new_polling_address: int) -> bytes:
assert 0 <= new_polling_address <= 63
return tools.pack_command(address, command_id=6, data=new_polling_address.to_bytes(1, "big"))
def read_unique_identifier_associated_with_tag(tag: bytes, *, address: int = 0) -> bytes:
return tools.pack_command(address, command_id=11, data=tag)
def read_message(address: bytes) -> bytes:
return tools.pack_command(address, command_id=12)
def read_tag_descriptor_date(address: bytes) -> bytes:
return tools.pack_command(address, command_id=13)
def read_primary_variable_information(address: bytes) -> bytes:
return tools.pack_command(address, command_id=14)
def read_output_information(address: bytes) -> bytes:
return tools.pack_command(address, command_id=15)
def read_final_assembly_number(address: bytes) -> bytes:
return tools.pack_command(address, command_id=16)
def write_message(address: bytes, message: str) -> bytes:
"""Writes a 24-char message to the device (Command 17)."""
# Per spec, message is 24 characters, packed into 18 bytes.
packed_message = tools.pack_packed_ascii(message, expected_len=32)
return tools.pack_command(address, command_id=17, data=packed_message)
def write_tag_descriptor_date(address: bytes, tag: str, descriptor: str, date: Tuple[int, int, int]):
"""Writes tag, descriptor, and date to the device (Command 18)."""
# Per spec, tag is 6 chars, descriptor is 12 chars.
packed_tag = tools.pack_packed_ascii(tag, 8)
packed_descriptor = tools.pack_packed_ascii(descriptor, 16)
data = packed_tag + packed_descriptor
day, month, year = date
data += day.to_bytes(1, "big")
data += month.to_bytes(1, "big")
data += year.to_bytes(1, "big")
# print(len(data))
return tools.pack_command(address, command_id=18, data=data)
def write_final_assembly_number(address: bytes, number: int):
data = number.to_bytes(3, "big")
return tools.pack_command(address, command_id=19, data=data)
def write_primary_variable_damping(address: bytes, damping_time: float) -> bytes:
"""修改主变量阻尼时间单位s command 34"""
data = struct.pack(">f", damping_time)
return tools.pack_command(address, command_id=34, data=data)
def write_primary_variable_range(address: bytes, units_code: int, upper_range: float, lower_range: float) -> bytes:
"""修改主变量量程范围 command 35"""
data = units_code.to_bytes(1, "big")
data += struct.pack(">ff", upper_range, lower_range)
return tools.pack_command(address, command_id=35, data=data)
def calibrate_20ma(address: bytes) -> bytes:
"""20mA校准 command 36"""
return tools.pack_command(address, command_id=36)
def calibrate_4ma(address: bytes) -> bytes:
"""4mA校准 command 37"""
return tools.pack_command(address, command_id=37)
def set_fixed_current_output(address: bytes, enable: bool, current_value: float = 0.0) -> bytes:
"""设置固定电流输出/退出固定模式 command 40"""
if enable:
data = struct.pack(">f", current_value)
else:
data = struct.pack(">f", 0)
return tools.pack_command(address, command_id=40, data=data)
def calibrate_zero_point(address: bytes) -> bytes:
"""零点校准 command 43"""
return tools.pack_command(address, command_id=43)
def write_primary_variable_units(address: bytes, units_code: int) -> bytes:
"""修改主变量单位 command 44"""
data = units_code.to_bytes(1, "big")
return tools.pack_command(address, command_id=44, data=data)
def trim_loop_current_4ma(address: bytes, measured_current: float) -> bytes:
"""微调环路电流4mA command 45"""
data = struct.pack(">f", measured_current)
return tools.pack_command(address, command_id=45, data=data)
def trim_loop_current_20ma(address: bytes, measured_current: float) -> bytes:
"""微调环路电流20mA command 46"""
data = struct.pack(">f", measured_current)
return tools.pack_command(address, command_id=46, data=data)
def write_primary_variable_output_function(address: bytes, transfer_function_code: int) -> bytes:
"""修改主变量输出函数 command 47"""
data = transfer_function_code.to_bytes(1, "big")
return tools.pack_command(address, command_id=47, data=data)
Loading…
Cancel
Save