You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

712 lines
26 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from utils.DBModels.ProtocolModel import TCPSetting, RTUSetting
from .tcpmaster_example import TcpMaster
from .rtumaster_example import RTUMaster
from .tcpslave_example import TCPSlave
from .rtuslave_example import RTUSlave
from modbus_tk import hooks
import threading
import time
class ModbusManager:
"""Modbus 通讯管理器,负责管理所有 Modbus TCP/RTU 主站和从站"""
def __init__(self):
# Modbus 通讯实例
self.modbusTcpMaster = None
self.modbusRtuMaster = None
self.modbusTcpSlave = None
self.modbusRtuSlave = None
# 线程管理
self.modbusReadThreads = {} # 存储各个Modbus读取线程
self.modbusStopEvents = {} # 存储各个Modbus停止事件
self.modbusLocks = {} # 存储各个Modbus的锁
# 变量缓存回调
self.variableValueCache = None
self.cacheLock = None
self.varInfoCache = None
# 报文记录
self.messageHistory = {
'send': [], # 发送的报文
'receive': [] # 接收的报文
}
self.maxMessageCount = 100 # 最大保存报文数量
self.messageLock = threading.Lock()
# 设置报文捕获 hooks
self._setupHooks()
def setVariableCache(self, variableValueCache, cacheLock, varInfoCache):
"""设置变量缓存引用"""
self.variableValueCache = variableValueCache
self.cacheLock = cacheLock
self.varInfoCache = varInfoCache
# ==================== 启动/停止方法 ====================
def startModbusTcpMaster(self):
"""启动 Modbus TCP 主站"""
try:
if self.modbusTcpMaster is not None:
self.modbusTcpMaster.master.close()
self.modbusTcpMaster = None
# 从数据库获取TCP设置
tcpSettings = self._getTcpSettings('master')
if not tcpSettings:
return False
# 创建TCP主站实例
# print(tcpSettings['host'], tcpSettings['port'])
self.modbusTcpMaster = TcpMaster(
host=tcpSettings['host'],
port=tcpSettings['port']
)
# 启动读取线程
self._startModbusReadThread('TCP_MASTER', tcpSettings['frequency'])
return True
except Exception as e:
print(f"启动 Modbus TCP 主站失败: {str(e)}")
return False
def stopModbusTcpMaster(self):
"""停止 Modbus TCP 主站"""
try:
if self.modbusTcpMaster is None:
return True
# 停止读取线程
self._stopModbusReadThread('TCP_MASTER')
# 清理主站实例
self.modbusTcpMaster = None
return True
except Exception as e:
return False
def startModbusRtuMaster(self):
"""启动 Modbus RTU 主站"""
try:
if self.modbusRtuMaster is not None:
self.modbusRtuMaster.master.close()
self.modbusRtuMaster = None
# 从数据库获取RTU设置
rtuSettings = self._getRtuSettings('master')
if not rtuSettings:
return False
# 创建RTU主站实例
self.modbusRtuMaster = RTUMaster(
port=rtuSettings['port'],
baudrate=rtuSettings['baudrate'],
bytesize=rtuSettings['byteSize'],
parity=rtuSettings['parity'][0], # 取第一个字符
stopbits=rtuSettings['stopbits']
)
# 启动读取线程
self._startModbusReadThread('RTU_MASTER', rtuSettings['frequency'])
return True
except Exception as e:
return False
def stopModbusRtuMaster(self):
"""停止 Modbus RTU 主站"""
try:
if self.modbusRtuMaster is None:
return True
# 停止读取线程
self._stopModbusReadThread('RTU_MASTER')
# 清理主站实例
self.modbusRtuMaster = None
return True
except Exception as e:
return False
def startModbusTcpSlave(self):
"""启动 Modbus TCP 从站"""
try:
if self.modbusTcpSlave is not None:
self.modbusTcpSlave.server.close()
self.modbusTcpSlave = None
# 从数据库获取TCP设置
tcpSettings = self._getTcpSettings('slave')
if not tcpSettings:
return False
# 创建TCP从站实例
self.modbusTcpSlave = TCPSlave(
address=tcpSettings['host'],
port=tcpSettings['port']
)
# 添加默认从站ID
# self.modbusTcpSlave.addSlave(1)
return True
except Exception as e:
return False
def stopModbusTcpSlave(self):
"""停止 Modbus TCP 从站"""
try:
if self.modbusTcpSlave is None:
return True
# 停止从站服务器
if hasattr(self.modbusTcpSlave, 'server'):
self.modbusTcpSlave.server.stop()
self.modbusTcpSlave = None
return True
except Exception as e:
return False
def startModbusRtuSlave(self):
"""启动 Modbus RTU 从站"""
try:
if self.modbusRtuSlave is not None:
self.modbusRtuSlave.server.close()
self.modbusRtuSlave = None
# 从数据库获取RTU设置
rtuSettings = self._getRtuSettings('slave')
if not rtuSettings:
return False
# 创建RTU从站实例
self.modbusRtuSlave = RTUSlave(
port=rtuSettings['port'],
baudrate=rtuSettings['baudrate'],
bytesize=rtuSettings['byteSize'],
parity=rtuSettings['parity'][0], # 取第一个字符
stopbits=rtuSettings['stopbits']
)
# 启动从站服务器
self.modbusRtuSlave.start()
# 添加默认从站ID
# self.modbusRtuSlave.addSlave(1)
return True
except Exception as e:
return False
def stopModbusRtuSlave(self):
"""停止 Modbus RTU 从站"""
try:
if self.modbusRtuSlave is None:
return True
# 停止从站服务器
if hasattr(self.modbusRtuSlave, 'server'):
self.modbusRtuSlave.server.stop()
self.modbusRtuSlave = None
return True
except Exception as e:
return False
def stopAllModbus(self):
"""停止所有 Modbus 通讯"""
results = []
results.append(self.stopModbusTcpMaster())
results.append(self.stopModbusRtuMaster())
results.append(self.stopModbusTcpSlave())
results.append(self.stopModbusRtuSlave())
return all(results)
def getModbusStatus(self):
"""获取所有 Modbus 通讯状态"""
return {
'tcpMaster': self.modbusTcpMaster is not None,
'rtuMaster': self.modbusRtuMaster is not None,
'tcpSlave': self.modbusTcpSlave is not None,
'rtuSlave': self.modbusRtuSlave is not None
}
# ==================== 读写方法 ====================
def writeModbusTcpMasterValue(self, info, value):
"""写入TCP主站变量值"""
try:
if self.modbusTcpMaster is None:
return False
slaveId = int(info['slaveID'])
address = int(info['address'])
varType = int(info['varType'])
order = info['order']
return self._writeModbusValue(self.modbusTcpMaster, slaveId, address, varType, value, order)
except Exception as e:
print(f"写入TCP主站变量失败: {str(e)}")
return False
def writeModbusRtuMasterValue(self, info, value):
"""写入RTU主站变量值"""
try:
if self.modbusRtuMaster is None:
return False
slaveId = int(info['slaveID'])
address = int(info['address'])
varType = int(info['varType'])
order = info['order']
return self._writeModbusValue(self.modbusRtuMaster, slaveId, address, varType, value, order)
except Exception as e:
print(f"写入RTU主站变量失败: {str(e)}")
return False
def writeModbusTcpSlaveValue(self, info, value):
"""写入TCP从站变量值"""
try:
if self.modbusTcpSlave is None:
print("Modbus TCP 从站未启动")
return False
slaveId = int(info['slaveID'])
address = int(info['address'])
varType = int(info['varType'])
order = info['order']
return self._writeModbusSlaveValue(self.modbusTcpSlave, slaveId, address, varType, value, order)
except Exception as e:
print(f"写入TCP从站变量失败: {str(e)}")
return False
def writeModbusRtuSlaveValue(self, info, value):
"""写入RTU从站变量值"""
try:
if self.modbusRtuSlave is None:
print("Modbus RTU 从站未启动")
return False
slaveId = int(info['slaveID'])
address = int(info['address'])
varType = int(info['varType'])
order = info['order']
return self._writeModbusSlaveValue(self.modbusRtuSlave, slaveId, address, varType, value, order)
except Exception as e:
print(f"写入RTU从站变量失败: {str(e)}")
return False
def readModbusTcpSlaveValue(self, info):
"""读取TCP从站变量值"""
try:
if self.modbusTcpSlave is None:
return None
slaveId = int(info['slaveID'])
address = int(info['address'])
varType = int(info['varType'])
order = info['order']
return self._readModbusSlaveValue(self.modbusTcpSlave, slaveId, address, varType, order)
except Exception as e:
print(f"读取TCP从站变量失败: {str(e)}")
return None
def readModbusRtuSlaveValue(self, info):
"""读取RTU从站变量值"""
try:
if self.modbusRtuSlave is None:
return None
slaveId = int(info['slaveID'])
address = int(info['address'])
varType = int(info['varType'])
order = info['order']
return self._readModbusSlaveValue(self.modbusRtuSlave, slaveId, address, varType, order)
except Exception as e:
print(f"读取RTU从站变量失败: {str(e)}")
return None
def readModbusTcpMasterValue(self, info):
"""读取TCP主站变量值"""
try:
if self.modbusTcpMaster is None:
return None
slaveId = int(info['slaveID'])
address = int(info['address'])
varType = int(info['varType'])
order = info['order']
return self._readModbusValue(self.modbusTcpMaster, slaveId, address, varType, order)
except Exception as e:
print(f"读取TCP主站变量失败: {str(e)}")
return None
def readModbusRtuMasterValue(self, info):
"""读取RTU主站变量值"""
try:
if self.modbusRtuMaster is None:
return None
slaveId = int(info['slaveID'])
address = int(info['address'])
varType = int(info['varType'])
order = info['order']
return self._readModbusValue(self.modbusRtuMaster, slaveId, address, varType, order)
except Exception as e:
print(f"读取RTU主站变量失败: {str(e)}")
return None
# ==================== 私有方法 ====================
def _getTcpSettings(self, tcpType):
"""从数据库获取TCP设置"""
try:
setting = TCPSetting.select().where(TCPSetting.tcpType == tcpType).first()
if setting:
return {
'host': setting.host,
'port': setting.port,
'frequency': float(setting.frequency),
'offset': setting.offset
}
else:
# 如果没有设置,创建默认设置
newSetting = TCPSetting()
newSetting.initSetting(tcpType)
return {
'host': newSetting.host,
'port': newSetting.port,
'frequency': float(newSetting.frequency),
'offset': newSetting.offset
}
except Exception as e:
print(f"获取TCP设置失败: {str(e)}")
return None
def _getRtuSettings(self, tcpType):
"""从数据库获取RTU设置"""
try:
setting = RTUSetting.select().where(RTUSetting.tcpType == tcpType).first()
if setting:
return {
'port': setting.port,
'byteSize': setting.byteSize,
'baudrate': setting.baudrate,
'stopbits': setting.stopbits,
'parity': setting.parity,
'frequency': float(setting.frequency),
'offset': setting.offset
}
else:
# 如果没有设置,创建默认设置
newSetting = RTUSetting()
newSetting.initSetting(tcpType)
return {
'port': newSetting.port,
'byteSize': newSetting.byteSize,
'baudrate': newSetting.baudrate,
'stopbits': newSetting.stopbits,
'parity': newSetting.parity,
'frequency': float(newSetting.frequency),
'offset': newSetting.offset
}
except Exception as e:
print(f"获取RTU设置失败: {str(e)}")
return None
def _startModbusReadThread(self, protocolType, frequency):
"""启动Modbus读取线程"""
if protocolType in self.modbusReadThreads:
return
stopEvent = threading.Event()
lock = threading.Lock()
self.modbusStopEvents[protocolType] = stopEvent
self.modbusLocks[protocolType] = lock
readThread = threading.Thread(
target=self._modbusReadWorker,
args=(protocolType, frequency, stopEvent, lock),
daemon=True
)
self.modbusReadThreads[protocolType] = readThread
readThread.start()
def _stopModbusReadThread(self, protocolType):
"""停止Modbus读取线程"""
if protocolType in self.modbusStopEvents:
self.modbusStopEvents[protocolType].set()
if protocolType in self.modbusReadThreads:
self.modbusReadThreads[protocolType].join(timeout=2)
del self.modbusReadThreads[protocolType]
if protocolType in self.modbusStopEvents:
del self.modbusStopEvents[protocolType]
if protocolType in self.modbusLocks:
del self.modbusLocks[protocolType]
def _modbusReadWorker(self, protocolType, frequency, stopEvent, lock):
"""Modbus读取工作线程"""
interval = 1.0 / frequency if frequency > 0 else 1.0
while not stopEvent.is_set():
try:
with lock:
self._readModbusVariables(protocolType)
except Exception as e:
print(f"Modbus读取线程错误 ({protocolType}): {str(e)}")
time.sleep(interval)
def _readModbusVariables(self, protocolType):
"""读取指定协议类型的所有Modbus变量"""
if protocolType == 'TCP_MASTER':
self._readTcpMasterVariables()
elif protocolType == 'RTU_MASTER':
self._readRtuMasterVariables()
def _readTcpMasterVariables(self):
"""读取TCP主站变量"""
if self.modbusTcpMaster is None or self.varInfoCache is None:
return
for varName, varInfo in self.varInfoCache.items():
if varInfo['modelType'] == 'ModbusTcpMasterVar':
try:
info = varInfo['variableData']
slaveId = int(info['slaveID'])
address = int(info['address'])
varType = int(info['varType'])
order = info['order']
value = self._readModbusValue(self.modbusTcpMaster, slaveId, address, varType, order)
if value != 'error' and self.variableValueCache is not None and self.cacheLock is not None:
with self.cacheLock:
self.variableValueCache[varName] = value
# print(varName, value)
except Exception as e:
print(f"读取TCP主站变量失败 {varName}: {str(e)}")
def _readRtuMasterVariables(self):
"""读取RTU主站变量"""
if self.modbusRtuMaster is None or self.varInfoCache is None:
return
for varName, varInfo in self.varInfoCache.items():
if varInfo['modelType'] == 'ModbusRtuMasterVar':
try:
info = varInfo['variableData']
slaveId = int(info['slaveID'])
address = int(info['address'])
varType = int(info['varType'])
order = info['order']
value = self._readModbusValue(self.modbusRtuMaster, slaveId, address, varType, order)
if value != 'error' and self.variableValueCache is not None and self.cacheLock is not None:
with self.cacheLock:
self.variableValueCache[varName] = value
except Exception as e:
print(f"读取RTU主站变量失败 {varName}: {str(e)}")
def _readModbusValue(self, master, slaveId, address, varType, order):
"""通用Modbus值读取方法"""
try:
# varType: 0=线圈, 1=离散输入, 3=输入寄存器, 4=保持寄存器
# 对于寄存器类型需要读取2个寄存器来获取完整的浮点数
registerCount = 2 if varType in [3, 4] and order != 'int' else 1
if varType == 0: # 线圈
return master.readCoils(slaveId, address, 1)
elif varType == 1: # 离散输入
return master.readInputCoils(slaveId, address, 1)
elif varType == 3: # 输入寄存器
return master.readInputRegisters(slaveId, address, registerCount, order)
elif varType == 4: # 保持寄存器
return master.readHoldingRegisters(slaveId, address, registerCount, order)
else:
return 'error'
except Exception as e:
print(f"读取Modbus值失败: {str(e)}")
return 'error'
def _writeModbusValue(self, master, slaveId, address, varType, value, order):
"""通用Modbus主站值写入方法"""
try:
# varType: 0=线圈, 1=离散输入, 3=输入寄存器, 4=保持寄存器
if varType == 0: # 线圈
result = master.writeSingleCoil(slaveId, address, value)
return result != 'error'
elif varType == 4: # 保持寄存器
result = master.writeSingleRegister(slaveId, address, value, order)
return result != 'error'
else:
print(f"不支持写入的变量类型: {varType}")
return False
except Exception as e:
print(f"写入Modbus主站值失败: {str(e)}")
return False
def _writeModbusSlaveValue(self, slave, slaveId, address, varType, value, order):
"""通用Modbus从站值写入方法"""
try:
# 根据变量类型确定存储区名称
blockName = self._getModbusBlockName(varType, address)
if not blockName:
return False
# 转换地址为存储区内部地
slave.setValue(slaveId, blockName, address, value, order)
return True
except Exception as e:
print(f"写入Modbus从站值失败: {str(e)}")
return False
def _readModbusSlaveValue(self, slave, slaveId, address, varType, order):
"""通用Modbus从站值读取方法"""
try:
# 根据变量类型确定存储区名称
blockName = self._getModbusBlockName(varType, address)
if not blockName:
return None
return slave.readValue(slaveId, blockName, address, order)
except Exception as e:
print(f"读取Modbus从站值失败: {str(e)}")
return None
def _getModbusBlockName(self, varType, address):
"""根据变量类型和地址获取Modbus存储区名称"""
# varType: 0=线圈, 1=离散输入, 3=输入寄存器, 4=保持寄存器
if varType == 0: # 线圈 (0-9999)
return '0'
elif varType == 1: # 离散输入 (10000-19999)
return '1'
elif varType == 3: # 输入寄存器 (30000-39999)
return '3'
elif varType == 4: # 保持寄存器 (40000-49999)
return '4'
else:
print(f"未知的变量类型: {varType}")
return None
# ==================== 报文记录方法 ====================
def _setupHooks(self):
"""设置 modbus_tk hooks 来捕获报文"""
try:
def captureRequest(args):
"""捕获请求报文数据"""
try:
if args and len(args) > 0:
# args[-1] 通常包含报文数据
data = args[-1]
if isinstance(data, (list, tuple)):
hexData = ' '.join([f'{b:02X}' for b in data])
elif isinstance(data, bytes):
hexData = ' '.join([f'{b:02X}' for b in data])
else:
hexData = str(data)
self._addMessage('receive', f"接收请求: {hexData}")
except Exception as e:
pass
def captureResponse(args):
"""捕获响应报文数据"""
try:
if args and len(args) > 0:
# args[-1] 通常包含报文数据
data = args[-1]
if isinstance(data, (list, tuple)):
hexData = ' '.join([f'{b:02X}' for b in data])
elif isinstance(data, bytes):
hexData = ' '.join([f'{b:02X}' for b in data])
else:
hexData = str(data)
self._addMessage('send', f"发送响应: {hexData}")
except Exception as e:
pass
# 使用正确的 hook 名称
hooks.install_hook('modbus.Server.before_handle_request', captureRequest)
hooks.install_hook('modbus.Server.after_handle_request', captureResponse)
except Exception as e:
print(f"设置 hooks 失败: {e}")
# 如果 hooks 设置失败,不影响正常通讯功能
pass
def _addMessage(self, messageType, content):
"""添加报文记录"""
try:
with self.messageLock:
timestamp = time.strftime('%H:%M:%S')
message = f"[{timestamp}] {content}"
if messageType == 'send':
self.messageHistory['send'].append(message)
if len(self.messageHistory['send']) > self.maxMessageCount:
self.messageHistory['send'].pop(0)
elif messageType == 'receive':
self.messageHistory['receive'].append(message)
if len(self.messageHistory['receive']) > self.maxMessageCount:
self.messageHistory['receive'].pop(0)
except Exception as e:
pass
def getMessages(self):
"""获取报文信息"""
try:
with self.messageLock:
return {
'send': self.messageHistory['send'].copy(),
'receive': self.messageHistory['receive'].copy()
}
except Exception as e:
return {'send': [], 'receive': []}
def clearMessages(self):
"""清空报文记录"""
try:
with self.messageLock:
self.messageHistory['send'].clear()
self.messageHistory['receive'].clear()
except Exception as e:
pass