You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

289 lines
9.8 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Core module of the event-driven system.
Provides variable monitoring, event dispatch, and rule execution.
"""
import json
import math
import threading
import time
import uuid
from dataclasses import asdict, dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
class EventType(Enum):
    """Enumeration of the event categories; each member's value is its wire name."""

    VARIABLE_CHANGE = "variable_change"
    DEVICE_STATUS = "device_status"
    ALARM = "alarm"
    TIMER = "timer"
class Priority(Enum):
    """Enumeration of event priorities; a larger value means a higher priority."""

    LOW = 1
    NORMAL = 2
    HIGH = 3
    CRITICAL = 4
@dataclass
class VariableEvent:
    """Data record describing one event raised for a variable.

    Carries the event identity and time, the event category, the variable's
    name, its previous and new values, a quality marker, a priority and an
    optional list of tags.
    """

    eventId: str
    timestamp: datetime
    eventType: EventType
    variableName: str
    oldValue: Any
    newValue: Any
    quality: str = "good"
    priority: Priority = Priority.NORMAL
    tags: List[str] = None

    def __post_init__(self):
        # None stands in for "no tags" so that every instance gets its own
        # list instead of sharing one mutable default.
        self.tags = [] if self.tags is None else self.tags

    def toDict(self) -> Dict:
        """Return the event as a dict.

        The timestamp is rendered as an ISO-8601 string and the two enum
        fields are replaced by their underlying values.
        """
        payload = asdict(self)
        payload.update(
            timestamp=self.timestamp.isoformat(),
            eventType=self.eventType.value,
            priority=self.priority.value,
        )
        return payload
class EventBus:
    """Event bus: routes published events to the callbacks subscribed to their type."""

    def __init__(self):
        # Callbacks per event type, kept in subscription order.
        self._subscribers: Dict["EventType", List[Callable]] = {}
        self._lock = threading.RLock()

    def subscribe(self, eventType: "EventType", callback: Callable):
        """Register ``callback`` to be called with every event of ``eventType``.

        The same callback may be registered more than once; it is then called
        once per registration.
        """
        with self._lock:
            self._subscribers.setdefault(eventType, []).append(callback)

    def unsubscribe(self, eventType: "EventType", callback: Callable):
        """Remove one registration of ``callback`` for ``eventType``.

        Unknown event types and callbacks that are not registered are ignored.
        """
        with self._lock:
            callbacks = self._subscribers.get(eventType)
            if callbacks is None:
                return
            try:
                callbacks.remove(callback)
            except ValueError:
                # Not registered: unsubscribing is deliberately a no-op.
                pass

    def publish(self, event: "VariableEvent"):
        """Deliver ``event`` to every callback subscribed to ``event.eventType``.

        The subscriber list is copied under the lock and the callbacks are
        invoked on that copy after the lock is released. A callback may
        therefore subscribe or unsubscribe (itself or others) while the event
        is being dispatched without disturbing the iteration; such changes
        take effect from the next publish. An exception raised by one callback
        is printed and does not prevent the remaining callbacks from running.
        """
        with self._lock:
            callbacks = list(self._subscribers.get(event.eventType, ()))
        for callback in callbacks:
            try:
                callback(event)
            except Exception as e:
                print(f"事件处理错误: {e}")
class VariableMonitor:
    """Variable monitor: watches configured variables and publishes change events.

    New values arrive either by push (``updateVariable``) or from a background
    polling thread (``startMonitoring``) that reads every enabled variable from
    the object stored in ``utils.Globals`` under the key ``"protocolManage"``.
    A ``VARIABLE_CHANGE`` event is published on the event bus whenever
    ``_shouldTriggerEvent`` reports a change against the cached value.

    State is mutated under ``self._lock``; protocol reads and event publishing
    happen outside it so that slow I/O or subscriber callbacks cannot block
    other threads that use the monitor.
    """

    def __init__(self, eventBus: "EventBus"):
        self.eventBus = eventBus
        # Last recorded value per variable; the baseline for change detection.
        self.variableCache: Dict[str, Any] = {}
        # Per-variable settings as written by addVariable().
        self.monitorConfig: Dict[str, Dict] = {}
        self.isRunning = False
        self.monitorThread: Optional[threading.Thread] = None
        self._lock = threading.RLock()
        self.scanInterval = 1.0  # polling period of the background loop, in seconds
        # Signals the polling loop to wake up and exit; replaced on every start.
        self._stopEvent = threading.Event()

    def addVariable(self, varName: str, threshold: float = 0.1, scanInterval: int = 1000):
        """Start watching ``varName`` (replaces any existing configuration).

        Args:
            varName: Name of the variable to watch.
            threshold: Minimum absolute difference between the cached and the
                new numeric value that counts as a change.
            scanInterval: Stored in the configuration together with a
                ``lastScanTime`` of 0. Neither is read by this class; the
                polling loop uses ``self.scanInterval`` for all variables.
                NOTE(review): presumably milliseconds - confirm with callers.
        """
        with self._lock:
            self.monitorConfig[varName] = {
                'threshold': threshold,
                'scanInterval': scanInterval,
                'lastScanTime': 0,
                'enabled': True,
            }

    def removeVariable(self, varName: str):
        """Stop watching ``varName`` and drop its cached value; unknown names are ignored."""
        with self._lock:
            self.monitorConfig.pop(varName, None)
            self.variableCache.pop(varName, None)

    def updateVariable(self, varName: str, newValue: Any, quality: str = "good"):
        """Push a new value for ``varName`` (called by the external protocol manager).

        Values for variables that are not configured are ignored. When the
        value counts as a change, the cache is updated and a ``VARIABLE_CHANGE``
        event carrying ``quality`` is published.
        """
        event = self._recordChange(varName, newValue, quality)
        if event is not None:
            self.eventBus.publish(event)

    def _recordChange(self, varName: str, newValue: Any, quality: str) -> Optional["VariableEvent"]:
        """Update the cache if ``newValue`` is a change and return the event to publish.

        Returns None when the variable is not configured or the value does not
        count as a change. The event is built here but published by the caller,
        after the lock has been released.
        """
        with self._lock:
            if varName not in self.monitorConfig:
                return None
            oldValue = self.variableCache.get(varName)
            if not self._shouldTriggerEvent(varName, oldValue, newValue):
                return None
            self.variableCache[varName] = newValue
        return VariableEvent(
            eventId=str(uuid.uuid4()),
            timestamp=datetime.now(),
            eventType=EventType.VARIABLE_CHANGE,
            variableName=varName,
            oldValue=oldValue,
            newValue=newValue,
            quality=quality,
        )

    def _shouldTriggerEvent(self, varName: str, oldValue: Any, newValue: Any) -> bool:
        """Decide whether going from ``oldValue`` to ``newValue`` is a reportable change.

        * No cached value (``oldValue is None``): always a change.
        * Both numeric: a change when the absolute difference reaches the
          variable's threshold (0.1 if none is configured). A transition into
          or out of NaN is a change; NaN to NaN is not.
        * Anything else: a change when the values compare unequal.
        """
        if oldValue is None:
            return True
        threshold = self.monitorConfig.get(varName, {}).get('threshold', 0.1)
        try:
            if isinstance(oldValue, (int, float)) and isinstance(newValue, (int, float)):
                # Every ordering comparison with NaN is False, so without this
                # check a cached NaN would suppress all later events.
                oldIsNan = isinstance(oldValue, float) and math.isnan(oldValue)
                newIsNan = isinstance(newValue, float) and math.isnan(newValue)
                if oldIsNan or newIsNan:
                    return oldIsNan != newIsNan
                return abs(newValue - oldValue) >= threshold
            return oldValue != newValue
        except Exception:
            # Arithmetic or comparison failed: fall back to plain inequality.
            return oldValue != newValue

    def getVariableValue(self, varName: str) -> Any:
        """Return the current value of ``varName``.

        Tries a live read through the protocol manager first and stores a
        non-None result in the cache. If no protocol manager is registered,
        the read fails, or it yields None, the cached value (or None) is
        returned instead.
        """
        from utils import Globals
        protocolManager = Globals.getValue("protocolManage")
        if protocolManager:
            try:
                realValue = protocolManager.readVariableValue(varName)
                if realValue is not None:
                    # NOTE(review): this overwrites the change-detection
                    # baseline without publishing an event, so a change first
                    # seen here is not reported by a later scan.
                    with self._lock:
                        self.variableCache[varName] = realValue
                    return realValue
            except Exception as e:
                print(f"从协议管理器读取变量 {varName} 失败: {e}")
        with self._lock:
            return self.variableCache.get(varName)

    def getAllVariables(self) -> Dict[str, Any]:
        """Return a shallow copy of the cached values."""
        with self._lock:
            return dict(self.variableCache)

    def startMonitoring(self):
        """Start the background polling thread; does nothing if already running."""
        with self._lock:
            if self.isRunning:
                return
            self.isRunning = True
            # A fresh event per run: a loop left over from an earlier run keeps
            # its own, already-set event and therefore still terminates.
            self._stopEvent = threading.Event()
            self.monitorThread = threading.Thread(target=self._monitorLoop, daemon=True)
            self.monitorThread.start()
        print("变量监控器已启动")

    def stopMonitoring(self):
        """Ask the polling thread to stop and wait up to two seconds for it."""
        self.isRunning = False
        self._stopEvent.set()
        thread = self.monitorThread
        # Joining the current thread raises RuntimeError; that would happen if
        # a subscriber callback running on the polling thread called this.
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=2)
        print("变量监控器已停止")

    def _monitorLoop(self):
        """Body of the polling thread: scan, then wait ``scanInterval`` seconds."""
        from utils import Globals
        stopEvent = self._stopEvent
        while self.isRunning and not stopEvent.is_set():
            try:
                protocolManager = Globals.getValue("protocolManage")
                if protocolManager:
                    self._scanOnce(protocolManager)
            except Exception as e:
                print(f"变量监控循环出错: {e}")
            # Returns early when stopMonitoring() sets the event.
            stopEvent.wait(self.scanInterval)

    def _scanOnce(self, protocolManager: Any) -> None:
        """Read every enabled variable once and publish an event for each change."""
        with self._lock:
            enabledNames = [
                name for name, config in self.monitorConfig.items()
                if config.get('enabled', True)
            ]
        for varName in enabledNames:
            try:
                newValue = protocolManager.readVariableValue(varName)
                if newValue is None:
                    continue
                event = self._recordChange(varName, newValue, "good")
                if event is not None:
                    self.eventBus.publish(event)
                    print(f"变量变化事件: {varName} {event.oldValue} -> {newValue}")
            except Exception as e:
                print(f"监控变量 {varName} 时出错: {e}")
# Global event-system instances. Both start as None; getEventBus() and
# getVariableMonitor() create them on first use and resetEventSystem()
# sets them back to None.
_globalEventBus: Optional["EventBus"] = None
_globalVariableMonitor: Optional["VariableMonitor"] = None
def getEventBus() -> EventBus:
    """Return the global event bus, creating it on the first call."""
    global _globalEventBus
    if _globalEventBus is not None:
        return _globalEventBus
    _globalEventBus = EventBus()
    return _globalEventBus
def getVariableMonitor() -> VariableMonitor:
    """Return the global variable monitor, creating it on the first call.

    The monitor is bound to the global event bus, which is created here as
    well if it does not exist yet.
    """
    global _globalVariableMonitor, _globalEventBus
    if _globalVariableMonitor is not None:
        return _globalVariableMonitor
    if _globalEventBus is None:
        _globalEventBus = EventBus()
    _globalVariableMonitor = VariableMonitor(_globalEventBus)
    return _globalVariableMonitor
def resetEventSystem():
    """Reset the global event system (used for tests and when switching projects).

    Stops the global variable monitor if one exists, then clears both global
    instances so that the next accessor call creates fresh ones. An ordinary
    exception raised while stopping is printed and otherwise ignored.
    Interpreter-level exceptions such as KeyboardInterrupt or SystemExit
    propagate, but the globals are cleared in every case.
    """
    global _globalEventBus, _globalVariableMonitor
    monitor = _globalVariableMonitor
    try:
        if monitor is not None:
            try:
                monitor.stopMonitoring()
            except Exception as e:
                # Best effort: a failing stop must not prevent the reset.
                print(f"停止变量监控器时出错: {e}")
    finally:
        _globalEventBus = None
        _globalVariableMonitor = None