You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

355 lines
14 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
HART设备连接界面模块
实现设备连接、断开等功能
"""
import serial
import serial.tools.list_ports
from PyQt5.QtWidgets import (QApplication, QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QComboBox, QGroupBox, QFormLayout,
QMessageBox, QSpinBox, QRadioButton)
from PyQt5.QtCore import Qt, pyqtSignal, QThread, QSize
from PyQt5.QtGui import QFont
import qtawesome
from protocol.HART import HARTCommunication, PrimaryMasterMode, SecondaryMasterMode, BurstModeON, BurstModeOFF
class ScanDeviceThread(QThread):
"""在后台扫描设备的线程"""
scanCompleted = pyqtSignal(object) # 扫描完成信号参数为找到的设备信息或None
def __init__(self, hartComm):
super().__init__()
self.hartComm = hartComm
def run(self):
result = self.hartComm.scanForDevice()
self.scanCompleted.emit(result)
class HartConnectionWidget(QWidget):
"""
HART设备连接界面模块
实现设备连接、断开等功能
"""
# 自定义信号
connectionStatusChanged = pyqtSignal(bool) # 连接状态改变信号
hartCommCreated = pyqtSignal(object) # HART通信对象创建信号
def __init__(self):
"""
初始化连接界面
"""
super().__init__()
# 初始化变量
self.hartComm = None
self.isConnected = False
# 初始化界面
self.initUI()
# 检查COM7是否可用
self.refreshPortList()
def initUI(self):
"""
初始化用户界面
"""
# 创建主布局
mainLayout = QVBoxLayout()
mainLayout.setContentsMargins(20, 20, 20, 20)
mainLayout.setSpacing(20)
# 创建连接设置组
connectionGroup = QGroupBox("串口设置")
connectionGroup.setFont(QFont("Microsoft YaHei", 10, QFont.Bold))
connectionLayout = QFormLayout()
try:
connectionLayout.setLabelAlignment(Qt.AlignRight)
except AttributeError:
pass # Linter may fail on this
self.portLabel = QLabel("COM7")
connectionLayout.addRow("串口:", self.portLabel)
self.baudrateLabel = QLabel("1200")
connectionLayout.addRow("波特率:", self.baudrateLabel)
self.parityLabel = QLabel("奇校验")
connectionLayout.addRow("校验方式:", self.parityLabel)
self.timeoutLabel = QLabel("3 秒")
connectionLayout.addRow("超时时间:", self.timeoutLabel)
connectionGroup.setLayout(connectionLayout)
mainLayout.addWidget(connectionGroup)
# 创建地址与模式设置组
configGroup = QGroupBox("寻址与模式")
configGroup.setFont(QFont("Microsoft YaHei", 10, QFont.Bold))
configLayout = QFormLayout()
try:
configLayout.setLabelAlignment(Qt.AlignRight)
except AttributeError:
pass # Linter may fail on this
# 寻址方式
addressingLayout = QHBoxLayout()
self.rbSpecificAddress = QRadioButton("指定短地址")
self.rbSpecificAddress.setChecked(True)
self.rbScanAddress = QRadioButton("扫描设备(轮询)")
addressingLayout.addWidget(self.rbSpecificAddress)
addressingLayout.addWidget(self.rbScanAddress)
configLayout.addRow("寻址方式:", addressingLayout)
# 短地址输入
self.pollingAddressSpinBox = QSpinBox()
self.pollingAddressSpinBox.setRange(0, 63)
configLayout.addRow("轮询地址:", self.pollingAddressSpinBox)
# 主站模式
self.masterModeCombo = QComboBox()
self.masterModeCombo.addItems(["主要主站", "次要主站"])
configLayout.addRow("主站模式:", self.masterModeCombo)
# 突发模式
self.burstModeCombo = QComboBox()
self.burstModeCombo.addItems(["关闭", "开启"])
configLayout.addRow("突发模式:", self.burstModeCombo)
configGroup.setLayout(configLayout)
mainLayout.addWidget(configGroup)
# 创建按钮布局
buttonLayout = QHBoxLayout()
buttonLayout.setSpacing(10)
# 连接/扫描按钮
self.connectButton = QPushButton("连接设备")
self.connectButton.setObjectName("startProtocolBtn")
self.connectButton.setIcon(qtawesome.icon('fa5s.play-circle', color='#047857'))
self.connectButton.setIconSize(QSize(22, 22))
self.connectButton.clicked.connect(self.connectDevice)
self.scanButton = QPushButton("扫描设备")
self.scanButton.setObjectName("forceBtn") # Yellow style for scanning
self.scanButton.setIcon(qtawesome.icon('fa5s.search', color='#D97706'))
self.scanButton.setIconSize(QSize(22, 22))
self.scanButton.clicked.connect(self.scanDevice)
self.scanButton.setVisible(False) # 默认隐藏
buttonLayout.addWidget(self.connectButton)
buttonLayout.addWidget(self.scanButton)
# 信号连接
self.rbSpecificAddress.toggled.connect(self.updateAddressingMode)
# 断开按钮
self.disconnectButton = QPushButton("断开连接")
self.disconnectButton.setObjectName("delBtn") # Red style
self.disconnectButton.setIcon(qtawesome.icon('fa5s.stop-circle', color='#B91C1C'))
self.disconnectButton.setIconSize(QSize(22, 22))
self.disconnectButton.clicked.connect(self.disconnectDevice)
self.disconnectButton.setEnabled(False)
buttonLayout.addWidget(self.disconnectButton)
mainLayout.addLayout(buttonLayout)
# 添加状态标签
self.statusLabel = QLabel("未连接")
try:
self.statusLabel.setAlignment(Qt.AlignCenter)
except AttributeError:
pass # Linter may fail on this
self.statusLabel.setFont(QFont("Microsoft YaHei", 10))
self.statusLabel.setStyleSheet("color: red;")
mainLayout.addWidget(self.statusLabel)
# 添加弹性空间
mainLayout.addStretch(1)
# 设置主布局
self.setLayout(mainLayout)
def refreshPortList(self):
"""
刷新串口列表仅检查COM7是否可用
"""
ports = [port.device for port in serial.tools.list_ports.comports()]
# 检查COM7是否可用
if "COM7" in ports:
self.connectButton.setEnabled(True)
self.statusLabel.setText("未连接")
else:
self.connectButton.setEnabled(False)
self.statusLabel.setText("未检测到COM7串口设备")
def updateAddressingMode(self, checked):
is_specific = self.rbSpecificAddress.isChecked()
self.pollingAddressSpinBox.setEnabled(is_specific)
self.connectButton.setVisible(is_specific)
self.scanButton.setVisible(not is_specific)
def scanDevice(self):
"""开始扫描设备"""
if self.isConnected:
QMessageBox.information(self, "提示", "设备已连接。如需重新扫描,请先断开连接。")
return
if not self._ensure_serial_connection():
return
self.scanButton.setEnabled(False)
self.scanButton.setText("扫描中...")
self.statusLabel.setText("正在扫描设备 (0-63)...")
self.statusLabel.setStyleSheet("color: orange;")
self.scanThread = ScanDeviceThread(self.hartComm)
self.scanThread.scanCompleted.connect(self.onScanCompleted)
self.scanThread.start()
def onScanCompleted(self, result):
"""扫描完成后的处理"""
self.scanButton.setEnabled(True)
self.scanButton.setText("扫描设备")
if result and 'address' in result:
found_addr = result['address']
QMessageBox.information(self, "扫描成功", f"在轮询地址 {found_addr} 找到设备!\n已自动配置为该地址。")
self.pollingAddressSpinBox.setValue(found_addr)
self.rbSpecificAddress.setChecked(True) # 切换回指定地址模式
# 自动连接
self.connectDevice()
else:
QMessageBox.warning(self, "扫描失败", "未在任何轮询地址 (0-63) 找到设备。")
self.statusLabel.setText("扫描完成,未找到设备")
self.statusLabel.setStyleSheet("color: orange;")
def _ensure_serial_connection(self):
"""确保串口已连接并创建HART对象"""
if self.hartComm and self.hartComm.isConnected():
return True
if self.hartComm:
self.hartComm.disconnect()
port = "COM7"
baudrate = 1200
timeout = 3
parity = serial.PARITY_ODD
try:
self.hartComm = HARTCommunication(port=port, baudrate=baudrate, parity=parity, timeout=timeout)
if self.hartComm.connect():
self.hartCommCreated.emit(self.hartComm)
return True
else:
self.hartComm = None
QMessageBox.critical(self, "连接失败", "无法打开COM7串口请检查设备连接。")
return False
except Exception as e:
self.hartComm = None
QMessageBox.critical(self, "连接错误", f"连接过程中发生错误:{str(e)}")
return False
def connectDevice(self):
"""使用指定配置连接设备并验证"""
if self.isConnected:
QMessageBox.information(self, "提示", "设备已连接。如需更换地址或模式,请先断开连接。")
return
if not self._ensure_serial_connection() or not self.hartComm:
return
try:
# 应用界面上的配置
is_primary = self.masterModeCombo.currentIndex() == 0
self.hartComm.setMasterMode(isPrimaryMaster=is_primary)
is_burst_on = self.burstModeCombo.currentIndex() == 1
self.hartComm.setBurstMode(enable=is_burst_on)
polling_address = self.pollingAddressSpinBox.value()
self.hartComm.buildHartAddress(pollingAddress=polling_address, isLongFrame=False)
self.statusLabel.setText(f"正在验证地址 {polling_address}...")
self.statusLabel.setStyleSheet("color: orange;")
app_instance = QApplication.instance()
if app_instance:
app_instance.processEvents()
# 验证设备通信
device_info = self.hartComm.readUniqueId()
if isinstance(device_info, list) and device_info:
self.isConnected = True
self.statusLabel.setText(f"已连接 (地址: {polling_address})")
self.statusLabel.setStyleSheet("color: green;")
self.disconnectButton.setEnabled(True)
self.connectButton.setEnabled(False)
self.scanButton.setVisible(False)
self._set_config_controls_enabled(False)
self.connectionStatusChanged.emit(True)
QMessageBox.information(self, "连接成功", f"成功连接到地址为 {polling_address} 的设备。")
else:
# self.isConnected remains False
self.statusLabel.setText("连接失败")
self.statusLabel.setStyleSheet("color: red;")
self.connectionStatusChanged.emit(False)
QMessageBox.critical(self, "连接失败", f"在地址 {polling_address} 未找到设备或设备无响应。")
# 保持串口打开,允许用户尝试其他地址, 但需要一个断开按钮
self.disconnectButton.setEnabled(True)
except Exception as e:
self.isConnected = False
self.statusLabel.setText("连接错误")
self.statusLabel.setStyleSheet("color: red;")
self.connectionStatusChanged.emit(False)
QMessageBox.critical(self, "配置错误", f"应用配置时发生错误:{str(e)}")
def disconnectDevice(self, silent=False):
"""
断开设备连接
"""
try:
if self.hartComm:
self.hartComm.disconnect()
self.isConnected = False
self.statusLabel.setText("未连接")
self.statusLabel.setStyleSheet("color: red;")
self.disconnectButton.setEnabled(False)
self.connectButton.setEnabled(True)
self._set_config_controls_enabled(True)
self.updateAddressingMode(self.rbSpecificAddress.isChecked())
self.connectionStatusChanged.emit(False)
if not silent:
QMessageBox.information(self, "断开连接", "已成功断开设备连接!")
except Exception as e:
if not silent:
QMessageBox.warning(self, "断开连接错误", f"断开连接过程中发生错误:{str(e)}")
# 恢复UI到断开状态
self.isConnected = False
self.statusLabel.setText("未连接")
self.statusLabel.setStyleSheet("color: red;")
self.disconnectButton.setEnabled(False)
self.connectButton.setEnabled(True)
self._set_config_controls_enabled(True)
self.connectionStatusChanged.emit(False)
def _set_config_controls_enabled(self, enabled):
"""启用或禁用配置相关的控件"""
self.pollingAddressSpinBox.setEnabled(enabled)
self.masterModeCombo.setEnabled(enabled)
self.burstModeCombo.setEnabled(enabled)
self.rbSpecificAddress.setEnabled(enabled)
self.rbScanAddress.setEnabled(enabled)
def updatePollingAddress(self, new_address):
"""
从外部更新轮询地址的UI显示
"""
self.pollingAddressSpinBox.setValue(new_address)
if self.isConnected:
self.statusLabel.setText(f"已连接 (地址: {new_address})")