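# NOTE(review): the lines below appear to be repository web-view residue captured
# with the file, not part of the module; kept as comments so the file parses.
# You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
#
# 468 lines
# 20 KiB
# Python
#
# 1 month ago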
import serial
import time
import threading
from typing import Optional, List, Dict, Any, Union, Tuple
from protocol.HART import universal, common, tools
from protocol.HART._parsing import parse
from protocol.HART._unpacker import Unpacker
# Flag values placed into the first HART address byte by
# HARTCommunication.buildHartAddress (master flag -> bit 7, burst flag -> bit 6).
BurstModeON = 1 # burst mode enabled
BurstModeOFF = 0 # burst mode disabled
PrimaryMasterMode = 1 # primary master
SecondaryMasterMode = 0 # secondary master
class HARTCommunication:
    """HART master over a serial port: builds addresses, sends frames, decodes replies.

    Defaults are port COM7, 1200 baud, odd parity. Command frames come from
    ``protocol.HART.universal`` and responses are decoded with ``Unpacker``.
    Most command methods return whatever ``sendCommandWithRetry`` returns:
    a non-empty list of decoded message dicts on success, or an
    ``{"error": <text>}`` dict on failure.
    """

    def __init__(self, port='COM7', baudrate=1200, parity=serial.PARITY_ODD, timeout=3):
        """Store the serial settings; the port is not opened until ``connect()``.

        Args:
            port (str): Serial port name, 'COM7' by default.
            baudrate (int): Baud rate, 1200 by default.
            parity: pyserial parity constant, odd parity by default.
            timeout (float): Read timeout in seconds.
        """
        self.port = port
        self.baudrate = baudrate
        self.parity = parity
        self.timeout = timeout
        self.serialConnection: Optional[serial.Serial] = None
        self.deviceAddress: Optional[bytes] = None
        # Re-entrant lock serialising access to the port inside sendCommand.
        self.comm_lock = threading.RLock()
        self.masterMode = PrimaryMasterMode  # act as primary master by default
        self.burstMode = BurstModeOFF  # burst flag cleared by default

    # ========== Connection handling ==========

    def connect(self) -> bool:
        """Open the serial port with the stored settings.

        Any handle that is still open is closed first so that calling
        ``connect()`` twice does not leak the previous port.

        Returns:
            bool: True if the port was opened, False otherwise (the error is printed).
        """
        try:
            self.disconnect()
            self.serialConnection = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                parity=self.parity,
                stopbits=1,
                bytesize=8,
                timeout=self.timeout,
                write_timeout=3,
                # HART frames are binary. With software flow control enabled the
                # driver treats 0x11/0x13 payload bytes as XON/XOFF, so it must
                # stay disabled.
                xonxoff=False,
            )
            return True
        except Exception as e:
            print(f"连接失败: {e}")
            return False

    def disconnect(self):
        """Close the serial port if it is currently open."""
        if self.serialConnection and self.serialConnection.is_open:
            self.serialConnection.close()

    def isConnected(self) -> bool:
        """Return True when a serial connection exists and is open."""
        return self.serialConnection is not None and self.serialConnection.is_open

    def _reconnect(self, pause: float) -> bool:
        """Close the port, wait ``pause`` seconds and reopen it; report the outcome."""
        self.disconnect()
        time.sleep(pause)
        if not self.connect():
            print("重新连接失败,放弃重试。")
            return False
        print("串口重新连接成功。")
        return True

    # ========== Command transport ==========

    def sendCommand(self, command_bytes: bytes) -> Union[List[Dict[str, Any]], Dict[str, str]]:
        """Send one HART frame and collect the decoded response messages.

        Args:
            command_bytes (bytes): Complete frame to write to the port.

        Returns:
            A list of decoded message dicts, or an ``{"error": ...}`` dict when
            nothing was decoded or an I/O error occurred.

        Raises:
            ConnectionError: If the serial port is not open.
        """
        if not self.serialConnection or not self.serialConnection.is_open:
            raise ConnectionError("串口未连接")
        with self.comm_lock:
            try:
                port = self.serialConnection
                # Discard stale bytes so an old reply is not parsed as this one.
                port.reset_input_buffer()
                port.reset_output_buffer()
                port.write(command_bytes)
                port.flush()  # block until the frame has been transmitted
                print(f"发送命令: {command_bytes.hex()}")
                # Fixed wait for the device to answer before parsing.
                time.sleep(1)
                msgList: List[Dict[str, Any]] = [dict(msg) for msg in Unpacker(port)]
                if not msgList:
                    return {"error": "设备无响应或响应超时"}
                return msgList
            except Exception as e:
                print(f"发送命令失败: {e}")
                return {"error": f"发送命令失败: {e}"}

    def sendCommandWithRetry(self, command_bytes: bytes, max_retries=3) -> Union[List[Dict[str, Any]], Dict[str, str]]:
        """Send a frame, retrying and reopening the port on failure.

        Args:
            command_bytes (bytes): Complete frame to write to the port.
            max_retries (int): Maximum number of attempts.

        Returns:
            The first non-empty list of decoded messages, or an
            ``{"error": ...}`` dict once the attempts are exhausted or the
            port could not be reopened.
        """
        for attempt in range(max_retries):
            try:
                result = self.sendCommand(command_bytes)
                if isinstance(result, list) and result:
                    return result
                error_info = result if isinstance(result, dict) else {"error": "未知错误"}
                print(f"{attempt + 1}次尝试失败,响应: {error_info.get('error', 'N/A')},重试中...")
                # A silent device triggers a port reopen before the next attempt.
                if error_info.get("error") == "设备无响应或响应超时":
                    print("检测到设备无响应,正在尝试重新连接串口...")
                    if not self._reconnect(0.1):
                        return {"error": "重新连接串口失败"}
                time.sleep(0.2)
            except Exception as e:
                print(f"{attempt + 1}次尝试发生异常: {e}")
                if attempt == max_retries - 1:
                    return {"error": f"重试{max_retries}次后仍然失败: {e}"}
                # Exceptions (e.g. port not open) also trigger a reopen.
                print("发生异常,正在尝试重新连接串口...")
                if not self._reconnect(0.5):
                    return {"error": "重新连接串口失败"}
                time.sleep(1)
        return {"error": f"重试{max_retries}次后仍然失败"}

    def _requireAddress(self) -> bytes:
        """Return the current device address, raising ValueError when it is unset."""
        if self.deviceAddress is None:
            raise ValueError("设备地址未设置")
        return self.deviceAddress

    # ========== Read commands ==========

    def readUniqueId(self):
        """Read the unique identifier and switch to the long-frame address it implies.

        For every decoded message that carries ``manufacturer_id``,
        ``manufacturer_device_type`` and ``device_id``, the stored address is
        rebuilt as a 5-byte long-frame address.

        Returns:
            The result of ``sendCommandWithRetry`` unchanged.

        Raises:
            ValueError: If no device address is set.
        """
        command = universal.read_unique_identifier(self._requireAddress())
        deviceInfo = self.sendCommandWithRetry(command)
        if not isinstance(deviceInfo, list):
            return deviceInfo
        for msg in deviceInfo:
            if not isinstance(msg, dict):
                print(f"Warning: Expected a dict but got {type(msg)}. Cannot process for long address.")
                continue
            manufacturer_id = msg.get('manufacturer_id')
            manufacturer_device_type = msg.get('manufacturer_device_type')
            device_id = msg.get('device_id')
            if manufacturer_id is None or manufacturer_device_type is None or device_id is None:
                print("Warning: Could not rebuild long address from message, missing required keys.")
                continue
            expandedDeviceType = (manufacturer_id << 8) | manufacturer_device_type
            self.buildHartAddress(expandedDeviceType=expandedDeviceType, deviceId=device_id, isLongFrame=True)
            if self.deviceAddress:
                print(f"Rebuilt long address: {self.deviceAddress.hex()}")
        return deviceInfo

    def readPrimaryVariable(self):
        """Send ``universal.read_primary_variable`` to the current address."""
        return self.sendCommandWithRetry(universal.read_primary_variable(self._requireAddress()))

    def readLoopCurrentAndPercent(self):
        """Send ``universal.read_loop_current_and_percent`` to the current address."""
        return self.sendCommandWithRetry(universal.read_loop_current_and_percent(self._requireAddress()))

    def readDynamicVariablesAndLoopCurrent(self):
        """Send ``universal.read_dynamic_variables_and_loop_current`` to the current address."""
        return self.sendCommandWithRetry(
            universal.read_dynamic_variables_and_loop_current(self._requireAddress())
        )

    def readMessage(self):
        """Send ``universal.read_message`` to the current address."""
        return self.sendCommandWithRetry(universal.read_message(self._requireAddress()))

    def readTagDescriptorDate(self):
        """Send ``universal.read_tag_descriptor_date`` to the current address."""
        return self.sendCommandWithRetry(universal.read_tag_descriptor_date(self._requireAddress()))

    def readPrimaryVariableInformation(self):
        """Send ``universal.read_primary_variable_information`` to the current address."""
        return self.sendCommandWithRetry(
            universal.read_primary_variable_information(self._requireAddress())
        )

    def readOutputInformation(self):
        """Send ``universal.read_output_information`` to the current address."""
        return self.sendCommandWithRetry(universal.read_output_information(self._requireAddress()))

    def readFinalAssemblyNumber(self):
        """Send ``universal.read_final_assembly_number`` to the current address."""
        return self.sendCommandWithRetry(universal.read_final_assembly_number(self._requireAddress()))

    # ========== Calibration ==========

    def calibrate4mA(self):
        """Send ``universal.calibrate_4ma`` to the current address."""
        return self.sendCommandWithRetry(universal.calibrate_4ma(self._requireAddress()))

    def calibrate20mA(self):
        """Send ``universal.calibrate_20ma`` to the current address."""
        return self.sendCommandWithRetry(universal.calibrate_20ma(self._requireAddress()))

    def calibrateZeroPoint(self):
        """Send ``universal.calibrate_zero_point`` to the current address."""
        return self.sendCommandWithRetry(universal.calibrate_zero_point(self._requireAddress()))

    # ========== Range and output settings ==========

    def writePrimaryVariableDamping(self, dampingTime: float):
        """Send ``universal.write_primary_variable_damping`` with ``dampingTime``."""
        command = universal.write_primary_variable_damping(self._requireAddress(), dampingTime)
        return self.sendCommandWithRetry(command)

    def writePrimaryVariableRange(self, unitsCode: int, upperRange: float, lowerRange: float):
        """Send ``universal.write_primary_variable_range`` with the given units and limits."""
        command = universal.write_primary_variable_range(
            self._requireAddress(), unitsCode, upperRange, lowerRange
        )
        return self.sendCommandWithRetry(command)

    def writePrimaryVariableUnits(self, unitsCode: int):
        """Send ``universal.write_primary_variable_units`` with ``unitsCode``."""
        command = universal.write_primary_variable_units(self._requireAddress(), unitsCode)
        return self.sendCommandWithRetry(command)

    def writePrimaryVariableOutputFunction(self, transferFunctionCode: int):
        """Send ``universal.write_primary_variable_output_function`` with ``transferFunctionCode``."""
        command = universal.write_primary_variable_output_function(
            self._requireAddress(), transferFunctionCode
        )
        return self.sendCommandWithRetry(command)

    # ========== Loop-current trim ==========

    def trimLoopCurrent4mA(self, measuredCurrent: float):
        """Send ``universal.trim_loop_current_4ma`` with ``measuredCurrent``."""
        command = universal.trim_loop_current_4ma(self._requireAddress(), measuredCurrent)
        return self.sendCommandWithRetry(command)

    def trimLoopCurrent20mA(self, measuredCurrent: float):
        """Send ``universal.trim_loop_current_20ma`` with ``measuredCurrent``."""
        command = universal.trim_loop_current_20ma(self._requireAddress(), measuredCurrent)
        return self.sendCommandWithRetry(command)

    def setFixedCurrentOutput(self, enable: bool, currentValue: float = 0.0):
        """Send ``universal.set_fixed_current_output`` with ``enable`` and ``currentValue``."""
        command = universal.set_fixed_current_output(self._requireAddress(), enable, currentValue)
        return self.sendCommandWithRetry(command)

    # ========== Burst mode and master mode ==========

    def setBurstMode(self, enable: bool, burstMessageCommand: int = 1):
        """Set the local burst flag and rebuild the stored address with it.

        Nothing is sent to the device; ``burstMessageCommand`` is only echoed
        back in the returned dict.
        """
        self.burstMode = BurstModeON if enable else BurstModeOFF
        if self.deviceAddress:
            self._rebuildCurrentAddress()
        return {"burst_mode": self.burstMode, "burst_message_command": burstMessageCommand}

    def setMasterMode(self, isPrimaryMaster: bool):
        """Set the local master flag and rebuild the stored address with it."""
        self.masterMode = PrimaryMasterMode if isPrimaryMaster else SecondaryMasterMode
        if self.deviceAddress:
            self._rebuildCurrentAddress()
        return {"master_mode": self.masterMode}

    def _rebuildCurrentAddress(self):
        """Re-encode the stored address so byte 0 carries the current master/burst flags."""
        current = self.deviceAddress
        if current is None:
            return
        if len(current) == 5:  # long-frame address
            expandedDeviceType = ((current[0] & 0x3F) << 8) | current[1]
            deviceId = (current[2] << 16) | (current[3] << 8) | current[4]
            self.buildHartAddress(expandedDeviceType=expandedDeviceType, deviceId=deviceId, isLongFrame=True)
        else:  # short-frame address
            self.buildHartAddress(pollingAddress=current[0] & 0x3F, isLongFrame=False)

    # ========== Utility methods ==========

    def getDeviceStatus(self):
        """Return ``readUniqueId()``, or an error dict when no address is set."""
        if not self.deviceAddress:
            return {"error": "设备地址未设置"}
        return self.readUniqueId()

    def getCurrentAddress(self) -> Optional[bytes]:
        """Return the stored device address bytes (None when unset)."""
        return self.deviceAddress

    def setDeviceAddress(self, address: bytes):
        """Store ``address`` verbatim as the device address (no validation)."""
        self.deviceAddress = address

    # The three writes below make a single attempt (max_retries=1) —
    # presumably to avoid repeating a write; confirm with the author.

    def writeMessage(self, message: str):
        """Send ``universal.write_message`` with ``message``; single attempt."""
        command = universal.write_message(self._requireAddress(), message)
        return self.sendCommandWithRetry(command, max_retries=1)

    def writeTagDescriptorDate(self, tag: str, descriptor: str, date: Tuple[int, int, int]):
        """Send ``universal.write_tag_descriptor_date``; single attempt."""
        command = universal.write_tag_descriptor_date(self._requireAddress(), tag, descriptor, date)
        return self.sendCommandWithRetry(command, max_retries=1)

    def writeFinalAssemblyNumber(self, number: int):
        """Send ``universal.write_final_assembly_number`` with ``number``; single attempt."""
        command = universal.write_final_assembly_number(self._requireAddress(), number)
        return self.sendCommandWithRetry(command, max_retries=1)

    def write_polling_address(self, new_polling_address: int):
        """Change the device's polling address (Command 6).

        On success the stored address is replaced by the short-frame address
        for ``new_polling_address``.

        Args:
            new_polling_address (int): New polling address, 0-63.

        Returns:
            The decoded response list.

        Raises:
            ValueError: If no address is set or the new address is out of range.
            RuntimeError: If the response does not confirm the new address.
        """
        current = self._requireAddress()
        if not (0 <= new_polling_address <= 63):
            raise ValueError("轮询地址必须在0-63范围内")
        result = self.sendCommandWithRetry(universal.write_polling_address(current, new_polling_address))
        # The reply is expected to echo the new polling address.
        confirmed = (
            isinstance(result, list)
            and result
            and result[0].get('polling_address') == new_polling_address
        )
        if not confirmed:
            error_msg = f"修改轮询地址失败. 响应: {result}"
            print(error_msg)
            raise RuntimeError(error_msg)
        print(f"轮询地址成功修改为: {new_polling_address}")
        self.buildHartAddress(pollingAddress=new_polling_address, isLongFrame=False)
        return result

    def scanForDevice(self, scan_timeout: float = 0.5) -> Optional[Dict[str, Any]]:
        """Poll short addresses 0-63 and return the first device that answers.

        When a device is found the stored address stays at its short-frame
        address. When none is found, the address that was stored before the
        scan is restored.

        Args:
            scan_timeout (float): Read timeout in seconds used while scanning.

        Returns:
            ``{"address": int, "device_info": dict}`` for the first responder,
            or None when no address answered.

        Raises:
            ConnectionError: If no serial connection object exists.
        """
        if not self.serialConnection:
            raise ConnectionError("串口未连接")
        print("开始扫描设备...")
        port_timeout = self.serialConnection.timeout
        configured_timeout = self.timeout
        previous_address = self.deviceAddress
        try:
            # sendCommandWithRetry reopens the port via connect() after a silent
            # poll, and connect() uses self.timeout; override both so the short
            # timeout survives those reconnects.
            self.timeout = scan_timeout
            self.serialConnection.timeout = scan_timeout
            for addr in range(64):
                print(f"正在尝试轮询地址: {addr}")
                short_address = self.buildHartAddress(pollingAddress=addr, isLongFrame=False)
                try:
                    command = universal.read_unique_identifier(short_address)
                    device_info_list = self.sendCommandWithRetry(command, max_retries=1)
                    if isinstance(device_info_list, list) and device_info_list:
                        print(f"在地址 {addr} 找到设备: {device_info_list[0]}")
                        return {"address": addr, "device_info": device_info_list[0]}
                except Exception as e:
                    print(f"地址 {addr} 扫描异常: {e}")
                    continue
        finally:
            self.timeout = configured_timeout
            if self.serialConnection:
                self.serialConnection.timeout = port_timeout
        # Nothing answered: do not leave the address pointing at poll address 63.
        self.deviceAddress = previous_address
        print("扫描完成,未找到设备。")
        return None

    def buildHartAddress(self, expandedDeviceType: Optional[int] = None, deviceId: Optional[int] = None, pollingAddress: int = 0, isLongFrame: bool = False):
        """Encode, store and return the device address.

        Byte 0 carries the master flag in bit 7 and the burst flag in bit 6.
        Long frames add the low 6 bits of ``expandedDeviceType >> 8``, then the
        low byte of the type and a 24-bit device id (5 bytes in total). Short
        frames are a single byte with the 6-bit polling address.

        Args:
            expandedDeviceType: 14-bit device type (long frame only).
            deviceId: 24-bit device id (long frame only).
            pollingAddress: Polling address 0-63 (short frame only).
            isLongFrame: Select the 5-byte long-frame encoding.

        Returns:
            bytes: The address that was stored in ``self.deviceAddress``.

        Raises:
            ValueError: On missing long-frame arguments or out-of-range values.
        """
        flags = (self.masterMode << 7) | (self.burstMode << 6)
        if isLongFrame:
            if expandedDeviceType is None or deviceId is None:
                raise ValueError("长帧格式需要提供expandedDeviceType和deviceId参数")
            if not (0 <= deviceId <= 0xFFFFFF):
                raise ValueError("设备ID必须在0-16777215范围内")
            self.deviceAddress = bytes([
                flags | ((expandedDeviceType >> 8) & 0x3F),
                expandedDeviceType & 0xFF,
                (deviceId >> 16) & 0xFF,
                (deviceId >> 8) & 0xFF,
                deviceId & 0xFF,
            ])
        else:
            if not (0 <= pollingAddress <= 63):
                raise ValueError("轮询地址必须在0-63范围内")
            self.deviceAddress = bytes([flags | (pollingAddress & 0x3F)])
        return self.deviceAddress
def main():
    """Demo: connect, read the unique id, write damping, read output info twice."""

    def show(label, result):
        # Only successful results (lists of decoded messages) are printed;
        # error dicts are skipped, as in the original demo.
        if isinstance(result, list):
            for msg in result:
                print(f"{label}: {msg}")

    hart = HARTCommunication()
    if not hart.connect():
        print("连接失败")
        return
    try:
        print("=== HART通信类功能演示 ===")
        address = hart.buildHartAddress(pollingAddress=0, isLongFrame=False)
        print(f"设备地址: {address.hex()}")
        print("\n--- 读取设备信息 ---")
        show("设备信息", hart.readUniqueId())
        show("信息", hart.writePrimaryVariableDamping(1))
        # The original demo issued this read twice in a row; kept as-is.
        for _ in range(2):
            show("信息", hart.readOutputInformation())
    finally:
        # Always release the port, even if a command raised.
        hart.disconnect()
        print("\n连接已断开")


if __name__ == '__main__':
    main()