#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Rule engine module.

Provides rule definitions, condition evaluation and action execution.
"""

import json
import threading
import time
from datetime import datetime
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, asdict
from enum import Enum
import operator
import uuid

from .EventSystem import VariableEvent, EventType, getEventBus, getVariableMonitor


class OperatorType(Enum):
    """Comparison operators that a condition may use."""
    EQUAL = "=="
    NOT_EQUAL = "!="
    GREATER = ">"
    LESS = "<"
    GREATER_EQUAL = ">="
    LESS_EQUAL = "<="


class ActionType(Enum):
    """Kinds of action a rule may perform."""
    SET_VARIABLE = "set_variable"
    LOG_MESSAGE = "log_message"
    SEND_ALARM = "send_alarm"
    EXECUTE_SCRIPT = "execute_script"


# Lookup table from operator enum member to the function implementing it.
# Built once at import time instead of on every condition evaluation.
_COMPARATORS = {
    OperatorType.EQUAL: operator.eq,
    OperatorType.NOT_EQUAL: operator.ne,
    OperatorType.GREATER: operator.gt,
    OperatorType.LESS: operator.lt,
    OperatorType.GREATER_EQUAL: operator.ge,
    OperatorType.LESS_EQUAL: operator.le,
}


@dataclass
class Condition:
    """A single comparison between a monitored variable and a fixed value."""
    variable: str
    operator: OperatorType
    value: Any

    def evaluate(self, variableMonitor) -> bool:
        """Evaluate this condition against the variable's current value.

        Args:
            variableMonitor: Object exposing ``getVariableValue(name)``.

        Returns:
            The result of ``currentValue <operator> value``. ``False`` when
            the variable has no value (``None``), when the operator is not
            recognised, or when the comparison raises.
        """
        currentValue = variableMonitor.getVariableValue(self.variable)
        if currentValue is None:
            return False

        try:
            compare = _COMPARATORS.get(self.operator)
            if compare:
                return compare(currentValue, self.value)
            return False
        except Exception as e:
            # Typically a TypeError from comparing incompatible types; a
            # condition that cannot be evaluated is treated as not satisfied.
            print(f"条件评估错误: {e}")
            return False


@dataclass
class Action:
    """A single action performed when a rule fires."""
    actionType: ActionType
    target: str
    value: Any
    delay: int = 0  # Delay before execution, in milliseconds.

    def execute(self, context: Optional[Dict] = None):
        """Run the action, after waiting ``delay`` milliseconds if set.

        Args:
            context: Accepted for interface compatibility; currently unused.

        Errors raised by the concrete action handler are caught and printed,
        so one failing action never propagates to the caller.
        """
        # A missing delay (None, e.g. JSON ``null``) is treated as "no delay".
        # NOTE: the sleep blocks the calling thread.
        if self.delay and self.delay > 0:
            time.sleep(self.delay / 1000.0)

        try:
            handlers = {
                ActionType.SET_VARIABLE: self._setVariable,
                ActionType.LOG_MESSAGE: self._logMessage,
                ActionType.SEND_ALARM: self._sendAlarm,
                ActionType.EXECUTE_SCRIPT: self._executeScript,
            }
            handler = handlers.get(self.actionType)
            if handler:
                handler()
        except Exception as e:
            print(f"动作执行错误: {e}")

    def _setVariable(self):
        """Write ``value`` to the variable named ``target``.

        Uses the protocol manager registered under ``"protocolManage"`` in
        ``utils.Globals``. A variable absent from the manager's cache is
        created directly in the cache. An existing variable is written through
        ``writeVariableValue``; if that reports failure the cache is updated
        directly instead.

        Returns:
            ``True`` when a protocol manager is available (the value always
            ends up in the cache), ``False`` when there is none.
        """
        # Imported lazily: integration point with the protocol manager.
        from utils import Globals

        protocolManager = Globals.getValue("protocolManage")
        if not protocolManager:
            return False

        # NOTE(review): writeVariableValue is called while cacheLock is held;
        # confirm cacheLock is re-entrant or that writeVariableValue does not
        # acquire it, otherwise this can deadlock.
        with protocolManager.cacheLock:
            if self.target not in protocolManager.variableValueCache:
                # Unknown variable: create it straight in the cache.
                protocolManager.variableValueCache[self.target] = self.value
                print(f"创建并设置变量 {self.target} = {self.value}")
                success = True
            else:
                # Known variable: try a real write through the protocol manager.
                success = protocolManager.writeVariableValue(self.target, self.value)
                if not success:
                    # The write failed: fall back to updating the cache only.
                    protocolManager.variableValueCache[self.target] = self.value
                    print(f"直接更新变量缓存 {self.target} = {self.value}")
                    success = True
                else:
                    print(f"设置变量 {self.target} = {self.value}, 结果: {success}")
        return success

    def _logMessage(self):
        """Print ``value`` as a timestamped rule log line."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{timestamp}] 规则日志: {self.value}")

    def _sendAlarm(self):
        """Print an alarm line for ``target`` carrying ``value``."""
        print(f"报警: {self.target} - {self.value}")

    def _executeScript(self):
        """Print the script named by ``target`` (no script is actually run)."""
        print(f"执行脚本: {self.target}")


@dataclass
class Rule:
    """A named set of conditions and the actions to run when they hold."""
    ruleId: str
    name: str
    description: str
    enabled: bool
    conditions: List[Condition]
    actions: List[Action]
    logicOperator: str = "and"  # How conditions combine: "and" / "or".
    cooldownPeriod: int = 0  # Minimum gap between executions, in milliseconds.
    lastExecutionTime: Optional[datetime] = None

    def evaluate(self, variableMonitor) -> bool:
        """Decide whether the rule should fire now.

        Returns ``False`` when the rule is disabled, has no conditions, or is
        still inside its cooldown period. Otherwise combines all condition
        results with ``any`` for ``"or"`` and ``all`` for anything else.
        """
        if not self.enabled or not self.conditions:
            return False

        # Cooldown check: refuse to fire again too soon after the last run.
        if self.lastExecutionTime and self.cooldownPeriod > 0:
            elapsedMs = (datetime.now() - self.lastExecutionTime).total_seconds() * 1000
            if elapsedMs < self.cooldownPeriod:
                return False

        # Every condition is evaluated (no short-circuit), as before.
        results = [condition.evaluate(variableMonitor) for condition in self.conditions]

        # A missing logic operator (None, e.g. JSON ``null``) means "and".
        if (self.logicOperator or "and").lower() == "or":
            return any(results)
        # "and", and any unrecognised operator, require all conditions.
        return all(results)

    def execute(self):
        """Run every action in order and record the execution time.

        A failing action is reported and does not stop the remaining actions.
        """
        self.lastExecutionTime = datetime.now()

        for action in self.actions:
            try:
                action.execute()
            except Exception as e:
                print(f"规则 {self.name} 动作执行失败: {e}")

    def toDict(self) -> Dict:
        """Serialize the rule to a plain dict suitable for JSON.

        Enum members are stored by value; ``lastExecutionTime`` is included as
        an ISO-8601 string only when it is set.
        """
        data = {
            'ruleId': self.ruleId,
            'name': self.name,
            'description': self.description,
            'enabled': self.enabled,
            'logicOperator': self.logicOperator,
            'cooldownPeriod': self.cooldownPeriod,
            'conditions': [
                {
                    'variable': c.variable,
                    'operator': c.operator.value,
                    'value': c.value,
                }
                for c in self.conditions
            ],
            'actions': [
                {
                    'actionType': a.actionType.value,
                    'target': a.target,
                    'value': a.value,
                    'delay': a.delay,
                }
                for a in self.actions
            ],
        }

        if self.lastExecutionTime:
            data['lastExecutionTime'] = self.lastExecutionTime.isoformat()

        return data

    @classmethod
    def fromDict(cls, data: Dict) -> 'Rule':
        """Build a rule from a dict in the format produced by ``toDict``.

        Raises:
            KeyError: A required key is missing.
            ValueError: An operator / action type value is not recognised, or
                ``lastExecutionTime`` is not a valid ISO-8601 string.
        """
        conditions = [
            Condition(
                variable=condData['variable'],
                operator=OperatorType(condData['operator']),
                value=condData['value'],
            )
            for condData in data.get('conditions', [])
        ]

        actions = [
            Action(
                actionType=ActionType(actionData['actionType']),
                target=actionData['target'],
                value=actionData['value'],
                delay=actionData.get('delay', 0),
            )
            for actionData in data.get('actions', [])
        ]

        lastExecutionTime = None
        if data.get('lastExecutionTime'):
            lastExecutionTime = datetime.fromisoformat(data['lastExecutionTime'])

        return cls(
            ruleId=data['ruleId'],
            name=data['name'],
            description=data['description'],
            enabled=data['enabled'],
            conditions=conditions,
            actions=actions,
            logicOperator=data.get('logicOperator', 'and'),
            cooldownPeriod=data.get('cooldownPeriod', 0),
            lastExecutionTime=lastExecutionTime,
        )


class RuleEngine:
    """Holds the rule set and runs rules in response to variable changes."""

    def __init__(self):
        self.rules: Dict[str, Rule] = {}
        self.eventBus = getEventBus()
        self.variableMonitor = getVariableMonitor()
        self.isRunning = False
        # Re-entrant: an action may trigger another variable-change event that
        # re-enters the engine on the same thread.
        self._lock = threading.RLock()
        # Set by dispose(); a disposed engine ignores all further events.
        self._disposed = False

        # Subscribe to variable-change events.
        self.eventBus.subscribe(EventType.VARIABLE_CHANGE, self._onVariableChange)

    def addRule(self, rule: Rule):
        """Add a rule, replacing any existing rule with the same id."""
        with self._lock:
            self.rules[rule.ruleId] = rule

    def removeRule(self, ruleId: str):
        """Remove the rule with the given id; unknown ids are ignored."""
        with self._lock:
            self.rules.pop(ruleId, None)

    def getRule(self, ruleId: str) -> Optional[Rule]:
        """Return the rule with the given id, or ``None`` if there is none."""
        with self._lock:
            return self.rules.get(ruleId)

    def getAllRules(self) -> List[Rule]:
        """Return a snapshot list of all rules."""
        # Copy under the lock so a concurrent add/remove cannot break the copy.
        with self._lock:
            return list(self.rules.values())

    def enableRule(self, ruleId: str):
        """Enable the rule with the given id; unknown ids are ignored."""
        with self._lock:
            rule = self.rules.get(ruleId)
            if rule:
                rule.enabled = True

    def disableRule(self, ruleId: str):
        """Disable the rule with the given id; unknown ids are ignored."""
        with self._lock:
            rule = self.rules.get(ruleId)
            if rule:
                rule.enabled = False

    def dispose(self):
        """Permanently stop this engine from reacting to variable changes.

        The subscription made in ``__init__`` stays registered on the event
        bus, so the handler itself is turned into a no-op.
        NOTE(review): if the event bus offers an unsubscribe API, it should
        also be called here to release the handler reference.
        """
        with self._lock:
            self._disposed = True

    def _onVariableChange(self, event: VariableEvent):
        """Evaluate every rule on a variable change and run those that fire."""
        with self._lock:
            if self._disposed:
                return
            # Iterate a snapshot: the lock is re-entrant, so a handler running
            # on this thread may add or remove rules while we loop.
            # NOTE(review): the lock stays held while actions run (including
            # any action delay), which serializes rule execution across
            # threads but also blocks other engine calls for that long.
            for rule in list(self.rules.values()):
                if rule.evaluate(self.variableMonitor):
                    try:
                        rule.execute()
                        print(f"规则 '{rule.name}' 已执行")
                    except Exception as e:
                        print(f"规则 '{rule.name}' 执行失败: {e}")

    def saveRulesToFile(self, filePath: str):
        """Write all rules to ``filePath`` as JSON.

        Failures are reported with a printed message, not raised.
        """
        try:
            with self._lock:
                rulesData = [rule.toDict() for rule in self.rules.values()]
            # Serialize before opening the file: if a value is not JSON
            # serializable the existing file is left untouched rather than
            # truncated.
            payload = json.dumps(rulesData, ensure_ascii=False, indent=2)
            with open(filePath, 'w', encoding='utf-8') as f:
                f.write(payload)
        except Exception as e:
            print(f"保存规则失败: {e}")

    def loadRulesFromFile(self, filePath: str):
        """Replace the current rule set with the rules stored in ``filePath``.

        The file is read and fully parsed first; the current rules are only
        replaced when every entry parsed successfully. Failures are reported
        with a printed message, not raised, and leave the rule set unchanged.
        """
        try:
            with open(filePath, 'r', encoding='utf-8') as f:
                rulesData = json.load(f)

            loadedRules: Dict[str, Rule] = {}
            for ruleData in rulesData:
                rule = Rule.fromDict(ruleData)
                loadedRules[rule.ruleId] = rule

            with self._lock:
                # Mutate in place so existing references to the dict stay valid.
                self.rules.clear()
                self.rules.update(loadedRules)
                loadedCount = len(self.rules)

            print(f"成功加载 {loadedCount} 个规则")
        except Exception as e:
            print(f"加载规则失败: {e}")


# Global rule engine instance, created lazily by getRuleEngine().
_globalRuleEngine = None
# Guards creation / reset of the global instance so that only one engine is
# ever created (each engine subscribes itself to the event bus).
_globalRuleEngineLock = threading.Lock()


def getRuleEngine() -> RuleEngine:
    """Return the global rule engine, creating it on first use."""
    global _globalRuleEngine
    with _globalRuleEngineLock:
        if _globalRuleEngine is None:
            _globalRuleEngine = RuleEngine()
        return _globalRuleEngine


def resetRuleEngine():
    """Reset the global rule engine (used by tests and project switching).

    The previous engine, if any, is disposed so its rules stop firing even
    though its event subscription still exists.
    """
    global _globalRuleEngine
    with _globalRuleEngineLock:
        previousEngine = _globalRuleEngine
        _globalRuleEngine = None
    if previousEngine is not None:
        previousEngine.dispose()