You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
482 lines
16 KiB
Python
482 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
控制系统管理器
|
|
整合事件系统和规则引擎,提供统一的控制接口
|
|
"""
|
|
|
|
import os
|
|
import threading
|
|
import time
|
|
from typing import Dict, List, Any, Optional
|
|
from datetime import datetime
|
|
|
|
from .EventSystem import getEventBus, getVariableMonitor, VariableEvent, EventType
|
|
from .RuleEngine import getRuleEngine, Rule, Condition, Action, OperatorType, ActionType
|
|
from utils import Globals
|
|
|
|
|
|
class ControlManager:
    """Unified control facade over the event bus, variable monitor and rule engine.

    The manager resolves a per-project configuration directory, hooks the
    protocol manager's write path so successful writes are reported to the
    variable monitor, and offers convenience constructors for rules.
    """

    # Attribute stored on the write wrapper that points at the unwrapped
    # method, so that a later integration can replace the wrapper instead of
    # stacking a second one on top of it.
    _ORIGINAL_WRITE_ATTR = "_controlOriginalWrite"

    def __init__(self):
        """Fetch the shared subsystems and make sure the config directory exists."""
        self.eventBus = getEventBus()
        self.variableMonitor = getVariableMonitor()
        self.ruleEngine = getRuleEngine()
        self.isInitialized = False
        self._lock = threading.RLock()

        # The configuration directory depends on the currently selected project.
        self.configPath = self._getConfigPath()
        os.makedirs(self.configPath, exist_ok=True)

    def _getConfigPath(self):
        """Return the configuration directory for the current project.

        Returns:
            ``project/<name>/control`` when ``Globals`` holds a truthy
            ``'currentPro'`` value, otherwise the legacy default
            ``control/config``.
        """
        currentProject = Globals.getValue('currentPro')
        if currentProject:
            # The stored project value is coerced to str before joining.
            return os.path.join('project', str(currentProject), 'control')
        # Fallback kept for compatibility when no project is selected.
        return "control/config"

    def _resolveRulesFile(self) -> str:
        """Refresh ``configPath``, ensure it exists and return the rules.json path."""
        self.configPath = self._getConfigPath()
        os.makedirs(self.configPath, exist_ok=True)
        return os.path.join(self.configPath, "rules.json")

    def _generateRuleId(self, prefix: str) -> str:
        """Return a rule id of the form ``<prefix>_<unix seconds>`` that is unused.

        The timestamp has one-second resolution, so a numeric suffix is
        appended while the rule engine already knows the candidate id.
        """
        baseId = f"{prefix}_{int(time.time())}"
        ruleId = baseId
        suffix = 1
        while self.ruleEngine.getRule(ruleId):
            ruleId = f"{baseId}_{suffix}"
            suffix += 1
        return ruleId

    def initialize(self):
        """Initialise the control system once.

        Integrates with the protocol manager, loads the rule configuration
        and starts variable monitoring. Any exception is reported with
        ``print`` and leaves ``isInitialized`` False, so the call can be
        retried.
        """
        if self.isInitialized:
            return

        try:
            self._integrateWithProtocolManager()
            self._loadConfiguration()
            self.variableMonitor.startMonitoring()

            self.isInitialized = True
            print("控制系统初始化成功")
        except Exception as e:
            print(f"控制系统初始化失败: {e}")

    def _integrateWithProtocolManager(self):
        """Wrap the protocol manager's write method to notify the variable monitor.

        Does nothing when ``Globals`` has no ``'protocolManage'`` entry.
        Calling this more than once replaces the previous wrapper rather
        than nesting wrappers, so each successful write notifies the monitor
        exactly once.
        """
        protocolManager = Globals.getValue("protocolManage")
        if not protocolManager:
            return

        currentWrite = protocolManager.writeVariableValue
        # If an earlier integration already installed a wrapper, start again
        # from the method it wrapped.
        originalWriteMethod = getattr(currentWrite, self._ORIGINAL_WRITE_ATTR, currentWrite)

        def enhancedWrite(varName, value, *args, **kwargs):
            result = originalWriteMethod(varName, value, *args, **kwargs)
            if result:
                # Only writes reported as successful are forwarded.
                self.variableMonitor.updateVariable(varName, value)
            return result

        setattr(enhancedWrite, self._ORIGINAL_WRITE_ATTR, originalWriteMethod)
        protocolManager.writeVariableValue = enhancedWrite

        # Seed the monitor with the values the protocol manager already holds.
        self._syncExistingVariables(protocolManager)

        print("已集成协议管理器")

    def _syncExistingVariables(self, protocolManager):
        """Copy current variable values into the monitor cache.

        Values are written straight into ``variableMonitor.variableCache``
        under the monitor's lock instead of going through
        ``updateVariable``; the original comment states this is to avoid
        triggering events. ``None`` values are skipped. Failures are printed
        and never raised.
        """
        try:
            allVarNames = list(protocolManager.getAllVariableNames())
            print(f"发现 {len(allVarNames)} 个变量,开始同步...")

            syncCount = 0
            for varName in allVarNames:
                try:
                    value = protocolManager.readVariableValue(varName)
                    if value is not None:
                        with self.variableMonitor._lock:
                            self.variableMonitor.variableCache[varName] = value
                        syncCount += 1
                except Exception as e:
                    # One unreadable variable must not abort the whole sync.
                    print(f"同步变量 {varName} 失败: {e}")

            print(f"成功同步 {syncCount} 个变量值")

        except Exception as e:
            print(f"同步变量失败: {e}")

    def _loadConfiguration(self):
        """Load rules from the current project's rules.json if the file exists."""
        rulesFile = self._resolveRulesFile()
        if os.path.exists(rulesFile):
            self.ruleEngine.loadRulesFromFile(rulesFile)
            print(f"从工程配置加载规则: {rulesFile}")
        else:
            print(f"工程规则文件不存在,将创建新文件: {rulesFile}")

    def saveConfiguration(self):
        """Save the rule engine's rules to the current project's rules.json."""
        rulesFile = self._resolveRulesFile()
        self.ruleEngine.saveRulesToFile(rulesFile)
        print(f"规则已保存到工程配置: {rulesFile}")

    def switchProject(self, projectName: Optional[str] = None):
        """Save the old project's rules, clear state and load the new project's rules.

        Args:
            projectName: Not used by this method; the target project is
                whatever ``Globals`` currently reports as ``'currentPro'``.

        Errors are printed and never raised.
        """
        try:
            # Persist the rules of the project being left. An empty rule set
            # is deliberately not written to the old location.
            if self.isInitialized and len(self.ruleEngine.rules) > 0:
                oldConfigPath = self.configPath
                os.makedirs(oldConfigPath, exist_ok=True)
                oldRulesFile = os.path.join(oldConfigPath, "rules.json")
                self.ruleEngine.saveRulesToFile(oldRulesFile)
                print(f"已保存旧工程规则: {oldConfigPath}")

            # Drop all rules and monitor state belonging to the old project.
            with self._lock:
                self.ruleEngine.rules.clear()
                self.variableMonitor.monitorConfig.clear()
                self.variableMonitor.variableCache.clear()

            # _loadConfiguration re-resolves configPath for the new project.
            self._loadConfiguration()

            print(f"已切换到工程规则配置: {self.configPath}")

        except Exception as e:
            print(f"切换工程规则配置失败: {e}")

    def addMonitorVariable(self, varName: str, threshold: float = 0.1):
        """Register ``varName`` with the variable monitor using ``threshold``."""
        self.variableMonitor.addVariable(varName, threshold)
        print(f"已添加监控变量: {varName}")

    def removeMonitorVariable(self, varName: str):
        """Remove ``varName`` from the variable monitor."""
        self.variableMonitor.removeVariable(varName)
        print(f"已移除监控变量: {varName}")

    def _buildConditions(self, conditionData: List[Dict]) -> list:
        """Turn condition dicts into ``Condition`` objects and monitor their variables.

        Each dict must provide ``'variable'``, ``'operator'`` and ``'value'``.
        """
        conditions = []
        for condData in conditionData:
            conditions.append(Condition(
                variable=condData['variable'],
                operator=OperatorType(condData['operator']),
                value=condData['value']
            ))
            # Every variable referenced by a condition is added to the monitor.
            self.addMonitorVariable(condData['variable'])
        return conditions

    def _buildActions(self, actionData: List[Dict]) -> list:
        """Turn action dicts into ``Action`` objects.

        Each dict must provide ``'actionType'``, ``'target'`` and ``'value'``;
        ``'delay'`` is optional and defaults to 0.
        """
        return [
            Action(
                actionType=ActionType(item['actionType']),
                target=item['target'],
                value=item['value'],
                delay=item.get('delay', 0)
            )
            for item in actionData
        ]

    def createSimpleRule(self, name: str, varName: str, operator: str,
                         threshold: Any, targetVar: str, targetValue: Any) -> str:
        """Create a one-condition rule that sets a variable.

        When ``varName <operator> threshold`` holds, the rule's single
        SET_VARIABLE action targets ``targetVar`` with ``targetValue``.
        ``varName`` is added to the variable monitor.

        Returns:
            The id of the new rule (``rule_<unix seconds>``, with a numeric
            suffix if that id is already taken).
        """
        ruleId = self._generateRuleId("rule")

        condition = Condition(
            variable=varName,
            operator=OperatorType(operator),
            value=threshold
        )
        action = Action(
            actionType=ActionType.SET_VARIABLE,
            target=targetVar,
            value=targetValue
        )
        rule = Rule(
            ruleId=ruleId,
            name=name,
            description=f"当 {varName} {operator} {threshold} 时,设置 {targetVar} = {targetValue}",
            enabled=True,
            conditions=[condition],
            actions=[action]
        )

        self.ruleEngine.addRule(rule)
        self.addMonitorVariable(varName)

        return ruleId

    def createAlarmRule(self, name: str, varName: str, operator: str,
                        threshold: Any, alarmMessage: str) -> str:
        """Create a one-condition rule whose action is SEND_ALARM.

        The action targets ``varName`` and carries ``alarmMessage`` as its
        value. The rule is created with ``cooldownPeriod=5000`` (annotated
        as 5 seconds in the original code). ``varName`` is added to the
        variable monitor.

        Returns:
            The id of the new rule (``alarm_<unix seconds>``, with a numeric
            suffix if that id is already taken).
        """
        ruleId = self._generateRuleId("alarm")

        condition = Condition(
            variable=varName,
            operator=OperatorType(operator),
            value=threshold
        )
        action = Action(
            actionType=ActionType.SEND_ALARM,
            target=varName,
            value=alarmMessage
        )
        rule = Rule(
            ruleId=ruleId,
            name=name,
            description=f"当 {varName} {operator} {threshold} 时,发送报警",
            enabled=True,
            conditions=[condition],
            actions=[action],
            cooldownPeriod=5000  # 5 second cooldown
        )

        self.ruleEngine.addRule(rule)
        self.addMonitorVariable(varName)

        return ruleId

    def createMultiConditionRule(self, name: str, description: str,
                                 conditions: List[Dict], actions: List[Dict],
                                 logicOperator: str = "and", cooldownPeriod: int = 0) -> str:
        """Create a rule with several conditions and actions.

        Args:
            name: Rule name.
            description: Rule description.
            conditions: Condition dicts, e.g.
                [{"variable": "var1", "operator": ">", "value": 10}, ...]
            actions: Action dicts, e.g.
                [{"actionType": "set_variable", "target": "var2", "value": 1}, ...]
                An optional "delay" key defaults to 0.
            logicOperator: How conditions are combined ("and" or "or").
            cooldownPeriod: Cooldown in milliseconds.

        Returns:
            The id of the new rule (``multi_<unix seconds>``, with a numeric
            suffix if that id is already taken).
        """
        ruleId = self._generateRuleId("multi")

        conditionObjs = self._buildConditions(conditions)
        actionObjs = self._buildActions(actions)

        rule = Rule(
            ruleId=ruleId,
            name=name,
            description=description,
            enabled=True,
            conditions=conditionObjs,
            actions=actionObjs,
            logicOperator=logicOperator,
            cooldownPeriod=cooldownPeriod
        )

        self.ruleEngine.addRule(rule)

        return ruleId

    def createComplexRule(self, ruleData: Dict) -> str:
        """Create a rule from a complete rule dictionary.

        Args:
            ruleData: Must contain ``'name'``. Optional keys: ``'ruleId'``
                (used as given when present), ``'description'`` (''),
                ``'enabled'`` (True), ``'conditions'`` ([]), ``'actions'``
                ([]), ``'logicOperator'`` ('and') and ``'cooldownPeriod'`` (0).

        Returns:
            The rule id: the supplied one, or ``complex_<unix seconds>``
            (with a numeric suffix if that id is already taken).
        """
        if 'ruleId' in ruleData:
            ruleId = ruleData['ruleId']
        else:
            ruleId = self._generateRuleId("complex")

        conditions = self._buildConditions(ruleData.get('conditions', []))
        actions = self._buildActions(ruleData.get('actions', []))

        rule = Rule(
            ruleId=ruleId,
            name=ruleData['name'],
            description=ruleData.get('description', ''),
            enabled=ruleData.get('enabled', True),
            conditions=conditions,
            actions=actions,
            logicOperator=ruleData.get('logicOperator', 'and'),
            cooldownPeriod=ruleData.get('cooldownPeriod', 0)
        )

        self.ruleEngine.addRule(rule)

        return ruleId

    def enableRule(self, ruleId: str):
        """Enable the rule with the given id."""
        self.ruleEngine.enableRule(ruleId)

    def disableRule(self, ruleId: str):
        """Disable the rule with the given id."""
        self.ruleEngine.disableRule(ruleId)

    def deleteRule(self, ruleId: str):
        """Remove the rule with the given id from the rule engine."""
        self.ruleEngine.removeRule(ruleId)

    def updateRule(self, ruleId: str, ruleData: Dict) -> bool:
        """Replace an existing rule's fields with a single condition and action.

        Args:
            ruleId: Id of the rule to update.
            ruleData: Required keys: ``'name'``, ``'description'``,
                ``'variable'``, ``'operator'``, ``'threshold'``,
                ``'actionType'``, ``'target'``, ``'value'``. Optional keys:
                ``'delay'`` (0), ``'enabled'`` (True), ``'cooldownPeriod'``
                (0), ``'logicOperator'`` ('and').

        Returns:
            True on success; False when the rule does not exist or when any
            exception (including a missing key) occurs, which is printed.
        """
        try:
            existingRule = self.ruleEngine.getRule(ruleId)
            if not existingRule:
                return False

            # Build the replacement objects first so a bad payload cannot
            # leave the rule half-updated.
            condition = Condition(
                variable=ruleData['variable'],
                operator=OperatorType(ruleData['operator']),
                value=ruleData['threshold']
            )
            action = Action(
                actionType=ActionType(ruleData['actionType']),
                target=ruleData['target'],
                value=ruleData['value'],
                delay=ruleData.get('delay', 0)
            )

            # NOTE(review): unlike the create* methods, the new variable is not
            # added to the variable monitor here - confirm whether it should be.
            existingRule.name = ruleData['name']
            existingRule.description = ruleData['description']
            existingRule.enabled = ruleData.get('enabled', True)
            existingRule.conditions = [condition]
            existingRule.actions = [action]
            existingRule.cooldownPeriod = ruleData.get('cooldownPeriod', 0)
            existingRule.logicOperator = ruleData.get('logicOperator', 'and')

            return True

        except Exception as e:
            print(f"更新规则失败: {e}")
            return False

    def getAllRules(self) -> List[Dict]:
        """Return every rule as produced by its ``toDict()`` method."""
        return [rule.toDict() for rule in self.ruleEngine.getAllRules()]

    def getSystemStatus(self) -> Dict:
        """Return a snapshot of the control system state.

        Keys: ``initialized``, ``currentProject``, ``configPath``,
        ``totalRules``, ``enabledRules``, ``monitoredVariables`` and
        ``lastUpdate`` (local time, ISO 8601).
        """
        rules = self.ruleEngine.getAllRules()
        variables = self.variableMonitor.getAllVariables()
        currentProject = Globals.getValue('currentPro')

        return {
            'initialized': self.isInitialized,
            'currentProject': currentProject or '未选择工程',
            'configPath': self.configPath,
            'totalRules': len(rules),
            'enabledRules': sum(1 for r in rules if r.enabled),
            'monitoredVariables': len(variables),
            'lastUpdate': datetime.now().isoformat()
        }

    def testRule(self, ruleId: str) -> bool:
        """Evaluate a rule against the variable monitor.

        Returns:
            The result of ``rule.evaluate(variableMonitor)``, or False when
            the rule engine has no rule with this id.
        """
        rule = self.ruleEngine.getRule(ruleId)
        if rule:
            return rule.evaluate(self.variableMonitor)
        return False

    def shutdown(self):
        """Save the rule configuration and stop variable monitoring.

        Monitoring is stopped even when saving fails. Errors are printed
        and never raised.
        """
        try:
            try:
                self.saveConfiguration()
            finally:
                # A failed save must not leave the monitor running.
                self.variableMonitor.stopMonitoring()

            print("控制系统已关闭,规则配置已保存")
        except Exception as e:
            print(f"关闭控制系统时出错: {e}")
|
|
|
|
|
|
# Module-wide ControlManager singleton; stays None until getControlManager() creates it.
_globalControlManager = None
|
|
|
|
|
|
def getControlManager() -> ControlManager:
    """Return the module-wide ControlManager, creating it on first use."""
    global _globalControlManager
    if _globalControlManager is not None:
        return _globalControlManager
    _globalControlManager = ControlManager()
    return _globalControlManager
|
|
|
|
|
|
def resetControlManager():
    """Reset the module-wide control manager (intended for tests).

    Shuts down and discards the current manager, if any, then resets the
    event system and the rule engine as well.
    """
    global _globalControlManager
    if _globalControlManager:
        try:
            _globalControlManager.shutdown()
        except Exception as e:
            # Best effort: a failing shutdown must not block the reset, but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            print(f"重置控制管理器时关闭失败: {e}")
        _globalControlManager = None

    # Imported here, as in the original, rather than at module top level.
    from .EventSystem import resetEventSystem
    from .RuleEngine import resetRuleEngine
    resetEventSystem()
    resetRuleEngine()
|
|
|
|
|
|
def initializeControlSystem():
    """Initialise the module-wide control manager and return it."""
    manager = getControlManager()
    manager.initialize()
    return manager