#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 事件驱动系统核心模块 提供变量监控、事件分发和规则执行功能 """ import time import threading import json from datetime import datetime from typing import Dict, List, Callable, Any, Optional from dataclasses import dataclass, asdict from enum import Enum import uuid class EventType(Enum): """事件类型枚举""" VARIABLE_CHANGE = "variable_change" DEVICE_STATUS = "device_status" ALARM = "alarm" TIMER = "timer" class Priority(Enum): """优先级枚举""" LOW = 1 NORMAL = 2 HIGH = 3 CRITICAL = 4 @dataclass class VariableEvent: """变量事件数据结构""" eventId: str timestamp: datetime eventType: EventType variableName: str oldValue: Any newValue: Any quality: str = "good" priority: Priority = Priority.NORMAL tags: List[str] = None def __post_init__(self): if self.tags is None: self.tags = [] def toDict(self) -> Dict: """转换为字典格式""" data = asdict(self) data['timestamp'] = self.timestamp.isoformat() data['eventType'] = self.eventType.value data['priority'] = self.priority.value return data class EventBus: """事件总线 - 负责事件的发布和订阅""" def __init__(self): self._subscribers: Dict[EventType, List[Callable]] = {} self._lock = threading.RLock() def subscribe(self, eventType: EventType, callback: Callable): """订阅事件""" with self._lock: if eventType not in self._subscribers: self._subscribers[eventType] = [] self._subscribers[eventType].append(callback) def unsubscribe(self, eventType: EventType, callback: Callable): """取消订阅""" with self._lock: if eventType in self._subscribers: try: self._subscribers[eventType].remove(callback) except ValueError: pass def publish(self, event: VariableEvent): """发布事件""" with self._lock: subscribers = self._subscribers.get(event.eventType, []) for callback in subscribers: try: callback(event) except Exception as e: print(f"事件处理错误: {e}") class VariableMonitor: """变量监控器 - 监控变量变化并生成事件""" def __init__(self, eventBus: EventBus): self.eventBus = eventBus self.variableCache: Dict[str, Any] = {} self.monitorConfig: Dict[str, Dict] = {} self.isRunning = False self.monitorThread: Optional[threading.Thread] = None self._lock = threading.RLock() self.scanInterval = 1.0 # 扫描间隔(秒) def addVariable(self, varName: str, threshold: float = 0.1, scanInterval: int = 1000): """添加监控变量""" with self._lock: self.monitorConfig[varName] = { 'threshold': threshold, 'scanInterval': scanInterval, 'lastScanTime': 0, 'enabled': True } def removeVariable(self, varName: str): """移除监控变量""" with self._lock: self.monitorConfig.pop(varName, None) self.variableCache.pop(varName, None) def updateVariable(self, varName: str, newValue: Any, quality: str = "good"): """更新变量值(由外部协议管理器调用)""" with self._lock: if varName not in self.monitorConfig: return oldValue = self.variableCache.get(varName) # 检查是否需要触发事件 if self._shouldTriggerEvent(varName, oldValue, newValue): event = VariableEvent( eventId=str(uuid.uuid4()), timestamp=datetime.now(), eventType=EventType.VARIABLE_CHANGE, variableName=varName, oldValue=oldValue, newValue=newValue, quality=quality ) self.variableCache[varName] = newValue self.eventBus.publish(event) def _shouldTriggerEvent(self, varName: str, oldValue: Any, newValue: Any) -> bool: """判断是否应该触发事件""" if oldValue is None: return True config = self.monitorConfig.get(varName, {}) threshold = config.get('threshold', 0.1) try: # 数值类型的变化检测 if isinstance(oldValue, (int, float)) and isinstance(newValue, (int, float)): return abs(newValue - oldValue) >= threshold # 其他类型直接比较 else: return oldValue != newValue except: return oldValue != newValue def getVariableValue(self, varName: str) -> Any: """获取变量当前值""" # 首先尝试从协议管理器获取实时值 from utils import Globals protocolManager = Globals.getValue("protocolManage") if protocolManager: try: realValue = protocolManager.readVariableValue(varName) if realValue is not None: # 更新缓存 with self._lock: self.variableCache[varName] = realValue return realValue except Exception as e: print(f"从协议管理器读取变量 {varName} 失败: {e}") # 如果协议管理器不可用,返回缓存值 return self.variableCache.get(varName) def getAllVariables(self) -> Dict[str, Any]: """获取所有变量值""" return self.variableCache.copy() def startMonitoring(self): """启动变量监控""" if self.isRunning: return self.isRunning = True self.monitorThread = threading.Thread(target=self._monitorLoop, daemon=True) self.monitorThread.start() print("变量监控器已启动") def stopMonitoring(self): """停止变量监控""" self.isRunning = False if self.monitorThread: self.monitorThread.join(timeout=2) print("变量监控器已停止") def _monitorLoop(self): """监控循环""" from utils import Globals while self.isRunning: try: protocolManager = Globals.getValue("protocolManage") if protocolManager: # 检查所有监控的变量 with self._lock: for varName in list(self.monitorConfig.keys()): if not self.monitorConfig[varName].get('enabled', True): continue try: # 从协议管理器读取最新值 newValue = protocolManager.readVariableValue(varName) if newValue is not None: oldValue = self.variableCache.get(varName) # 检查是否需要触发事件 if self._shouldTriggerEvent(varName, oldValue, newValue): # 更新缓存 self.variableCache[varName] = newValue # 发布变量变化事件 event = VariableEvent( eventId=str(uuid.uuid4()), timestamp=datetime.now(), eventType=EventType.VARIABLE_CHANGE, variableName=varName, oldValue=oldValue, newValue=newValue, quality="good" ) self.eventBus.publish(event) print(f"变量变化事件: {varName} {oldValue} -> {newValue}") except Exception as e: print(f"监控变量 {varName} 时出错: {e}") time.sleep(self.scanInterval) except Exception as e: print(f"变量监控循环出错: {e}") time.sleep(self.scanInterval) # 全局事件系统实例 _globalEventBus = None _globalVariableMonitor = None def getEventBus() -> EventBus: """获取全局事件总线""" global _globalEventBus if _globalEventBus is None: _globalEventBus = EventBus() return _globalEventBus def getVariableMonitor() -> VariableMonitor: """获取全局变量监控器""" global _globalVariableMonitor, _globalEventBus if _globalVariableMonitor is None: if _globalEventBus is None: _globalEventBus = EventBus() _globalVariableMonitor = VariableMonitor(_globalEventBus) return _globalVariableMonitor def resetEventSystem(): """重置全局事件系统(用于测试和工程切换)""" global _globalEventBus, _globalVariableMonitor # 停止变量监控器 if _globalVariableMonitor: try: _globalVariableMonitor.stopMonitoring() except: pass # 重置实例 _globalEventBus = None _globalVariableMonitor = None