You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

536 lines
19 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
控制系统管理器
整合事件系统和规则引擎提供统一的控制接口
"""
import os
import threading
import time
from typing import Dict, List, Any, Optional
from datetime import datetime
from .EventSystem import getEventBus, getVariableMonitor, VariableEvent, EventType
from .RuleEngine import getRuleEngine, Rule, Condition, Action, OperatorType, ActionType
from utils import Globals
class ControlManager:
"""控制系统管理器"""
def __init__(self):
self.eventBus = getEventBus()
self.variableMonitor = getVariableMonitor()
self.ruleEngine = getRuleEngine()
self.isInitialized = False
self._lock = threading.RLock()
# 动态获取配置路径(基于当前工程)
self.configPath = self._getConfigPath()
# 确保配置目录存在
os.makedirs(self.configPath, exist_ok=True)
def _getConfigPath(self):
"""获取当前工程的配置路径"""
currentProject = Globals.getValue('currentPro')
if currentProject:
# 确保工程名是字符串
projectName = str(currentProject)
# 基于工程的配置路径
projectPath = os.path.join('project', projectName, 'control')
return projectPath
else:
# 默认配置路径(兼容性)
return "control/config"
def initialize(self):
"""初始化控制系统"""
if self.isInitialized:
return
try:
# 集成现有的协议管理器
self._integrateWithProtocolManager()
# 加载规则配置
self._loadConfiguration()
# 启动变量监控
self.variableMonitor.startMonitoring()
self.isInitialized = True
print("控制系统初始化成功")
except Exception as e:
print(f"控制系统初始化失败: {e}")
def _integrateWithProtocolManager(self):
"""与现有协议管理器集成"""
protocolManager = Globals.getValue("protocolManage")
if protocolManager:
# 扩展协议管理器,添加变量变化通知
originalWriteMethod = protocolManager.writeVariableValue
originalReadMethod = getattr(protocolManager, 'readVariableValue', None)
def enhancedWrite(varName, value, **kwargs):
result = originalWriteMethod(varName, value, **kwargs)
if result:
# 通知变量监控器
self.variableMonitor.updateVariable(varName, value)
return result
# 替换写入方法
protocolManager.writeVariableValue = enhancedWrite
# 同步现有的变量值到变量监控器
self._syncExistingVariables(protocolManager)
print("已集成协议管理器")
def _syncExistingVariables(self, protocolManager):
"""同步现有变量值到变量监控器"""
try:
# 获取所有变量名
allVarNames = list(protocolManager.getAllVariableNames())
print(f"发现 {len(allVarNames)} 个变量,开始同步...")
# 同步变量值
syncCount = 0
for varName in allVarNames:
try:
value = protocolManager.readVariableValue(varName)
if value is not None:
# 添加到监控器(不触发事件)
with self.variableMonitor._lock:
self.variableMonitor.variableCache[varName] = value
syncCount += 1
except Exception as e:
print(f"同步变量 {varName} 失败: {e}")
print(f"成功同步 {syncCount} 个变量值")
except Exception as e:
print(f"同步变量失败: {e}")
def _loadConfiguration(self):
"""加载配置"""
# 动态更新配置路径
self.configPath = self._getConfigPath()
os.makedirs(self.configPath, exist_ok=True)
rulesFile = os.path.join(self.configPath, "rules.json")
if os.path.exists(rulesFile):
self.ruleEngine.loadRulesFromFile(rulesFile)
print(f"从工程配置加载规则: {rulesFile}")
# 自动启动规则
self._autoStartRules()
else:
print(f"工程规则文件不存在,将创建新文件: {rulesFile}")
def _autoStartRules(self):
"""自动启动所有启用的规则"""
try:
rules = self.ruleEngine.getAllRules()
if not rules:
print("没有找到规则,跳过自动启动")
return
# 自动添加所有规则中涉及的变量到监控器
monitoredVars = set()
enabledRulesCount = 0
for rule in rules:
if rule.enabled:
enabledRulesCount += 1
# 添加规则条件中的变量到监控
for condition in rule.conditions:
if condition.variable not in monitoredVars:
self.addMonitorVariable(condition.variable)
monitoredVars.add(condition.variable)
print(f"自动启动完成: {enabledRulesCount} 个启用规则, {len(monitoredVars)} 个监控变量")
# 确保变量监控器正在运行
if not self.variableMonitor.isRunning:
self.variableMonitor.startMonitoring()
print("已启动变量监控器")
except Exception as e:
print(f"自动启动规则失败: {e}")
def saveConfiguration(self):
"""保存配置"""
# 动态更新配置路径
self.configPath = self._getConfigPath()
os.makedirs(self.configPath, exist_ok=True)
rulesFile = os.path.join(self.configPath, "rules.json")
self.ruleEngine.saveRulesToFile(rulesFile)
print(f"规则已保存到工程配置: {rulesFile}")
def switchProject(self, projectName: str = None):
"""切换工程时重新加载规则配置"""
try:
# 保存当前工程的规则(如果有的话)
if self.isInitialized and hasattr(self, 'configPath'):
oldConfigPath = self.configPath
# 只有当有规则时才保存到旧工程路径
if len(self.ruleEngine.rules) > 0:
# 确保旧工程目录存在
os.makedirs(oldConfigPath, exist_ok=True)
oldRulesFile = os.path.join(oldConfigPath, "rules.json")
self.ruleEngine.saveRulesToFile(oldRulesFile)
print(f"已保存旧工程规则: {oldConfigPath}")
# 停止当前的变量监控
if self.variableMonitor.isRunning:
self.variableMonitor.stopMonitoring()
# 清空当前规则和变量监控
with self._lock:
self.ruleEngine.rules.clear()
self.variableMonitor.monitorConfig.clear()
self.variableMonitor.variableCache.clear()
# 更新配置路径(基于新的当前工程)
self.configPath = self._getConfigPath()
# 加载新工程的规则(包含自动启动)
self._loadConfiguration()
print(f"已切换到工程规则配置: {self.configPath}")
except Exception as e:
print(f"切换工程规则配置失败: {e}")
def delayedAutoStart(self, delay: int = 2000):
"""延迟自动启动规则(给协议管理器时间初始化)"""
def autoStartAfterDelay():
time.sleep(delay / 1000.0) # 转换为秒
try:
# 重新集成协议管理器
self._integrateWithProtocolManager()
# 自动启动规则
self._autoStartRules()
print("延迟自动启动规则完成")
except Exception as e:
print(f"延迟自动启动规则失败: {e}")
# 在后台线程中执行延迟启动
import threading
threading.Thread(target=autoStartAfterDelay, daemon=True).start()
def addMonitorVariable(self, varName: str, threshold: float = 0.1):
"""添加监控变量"""
self.variableMonitor.addVariable(varName, threshold)
print(f"已添加监控变量: {varName}")
def removeMonitorVariable(self, varName: str):
"""移除监控变量"""
self.variableMonitor.removeVariable(varName)
print(f"已移除监控变量: {varName}")
def createSimpleRule(self, name: str, varName: str, operator: str,
threshold: Any, targetVar: str, targetValue: Any) -> str:
"""创建简单规则"""
ruleId = f"rule_{int(time.time())}"
# 创建条件
condition = Condition(
variable=varName,
operator=OperatorType(operator),
value=threshold
)
# 创建动作
action = Action(
actionType=ActionType.SET_VARIABLE,
target=targetVar,
value=targetValue
)
# 创建规则
rule = Rule(
ruleId=ruleId,
name=name,
description=f"{varName} {operator} {threshold} 时,设置 {targetVar} = {targetValue}",
enabled=True,
conditions=[condition],
actions=[action]
)
self.ruleEngine.addRule(rule)
# 自动添加变量到监控器
self.addMonitorVariable(varName)
return ruleId
def createAlarmRule(self, name: str, varName: str, operator: str,
threshold: Any, alarmMessage: str) -> str:
"""创建报警规则"""
ruleId = f"alarm_{int(time.time())}"
condition = Condition(
variable=varName,
operator=OperatorType(operator),
value=threshold
)
action = Action(
actionType=ActionType.SEND_ALARM,
target=varName,
value=alarmMessage
)
rule = Rule(
ruleId=ruleId,
name=name,
description=f"{varName} {operator} {threshold} 时,发送报警",
enabled=True,
conditions=[condition],
actions=[action],
cooldownPeriod=5000 # 5秒冷却期
)
self.ruleEngine.addRule(rule)
# 自动添加变量到监控器
self.addMonitorVariable(varName)
return ruleId
def createMultiConditionRule(self, name: str, description: str,
conditions: List[Dict], actions: List[Dict],
logicOperator: str = "and", cooldownPeriod: int = 0) -> str:
"""创建多条件规则
Args:
name: 规则名称
description: 规则描述
conditions: 条件列表格式: [{"variable": "var1", "operator": ">", "value": 10}, ...]
actions: 动作列表格式: [{"actionType": "set_variable", "target": "var2", "value": 1}, ...]
logicOperator: 逻辑操作符 ("and" "or")
cooldownPeriod: 冷却期毫秒
Returns:
规则ID
"""
ruleId = f"multi_{int(time.time())}"
# 创建条件对象列表
conditionObjs = []
for condData in conditions:
condition = Condition(
variable=condData['variable'],
operator=OperatorType(condData['operator']),
value=condData['value']
)
conditionObjs.append(condition)
# 自动添加变量到监控器
self.addMonitorVariable(condData['variable'])
# 创建动作对象列表
actionObjs = []
for actionData in actions:
action = Action(
actionType=ActionType(actionData['actionType']),
target=actionData['target'],
value=actionData['value'],
delay=actionData.get('delay', 0)
)
actionObjs.append(action)
# 创建规则
rule = Rule(
ruleId=ruleId,
name=name,
description=description,
enabled=True,
conditions=conditionObjs,
actions=actionObjs,
logicOperator=logicOperator,
cooldownPeriod=cooldownPeriod
)
self.ruleEngine.addRule(rule)
return ruleId
def createComplexRule(self, ruleData: Dict) -> str:
"""创建复杂规则(通用方法)
Args:
ruleData: 完整的规则数据字典
Returns:
规则ID
"""
ruleId = ruleData.get('ruleId', f"complex_{int(time.time())}")
# 解析条件
conditions = []
for condData in ruleData.get('conditions', []):
condition = Condition(
variable=condData['variable'],
operator=OperatorType(condData['operator']),
value=condData['value']
)
conditions.append(condition)
# 自动添加变量到监控器
self.addMonitorVariable(condData['variable'])
# 解析动作
actions = []
for actionData in ruleData.get('actions', []):
action = Action(
actionType=ActionType(actionData['actionType']),
target=actionData['target'],
value=actionData['value'],
delay=actionData.get('delay', 0)
)
actions.append(action)
# 创建规则
rule = Rule(
ruleId=ruleId,
name=ruleData['name'],
description=ruleData.get('description', ''),
enabled=ruleData.get('enabled', True),
conditions=conditions,
actions=actions,
logicOperator=ruleData.get('logicOperator', 'and'),
cooldownPeriod=ruleData.get('cooldownPeriod', 0)
)
self.ruleEngine.addRule(rule)
return ruleId
def enableRule(self, ruleId: str):
"""启用规则"""
self.ruleEngine.enableRule(ruleId)
def disableRule(self, ruleId: str):
"""禁用规则"""
self.ruleEngine.disableRule(ruleId)
def deleteRule(self, ruleId: str):
"""删除规则"""
self.ruleEngine.removeRule(ruleId)
def updateRule(self, ruleId: str, ruleData: Dict) -> bool:
"""更新规则"""
try:
# 获取现有规则
existingRule = self.ruleEngine.getRule(ruleId)
if not existingRule:
return False
# 创建新的条件
condition = Condition(
variable=ruleData['variable'],
operator=OperatorType(ruleData['operator']),
value=ruleData['threshold']
)
# 创建新的动作
action = Action(
actionType=ActionType(ruleData['actionType']),
target=ruleData['target'],
value=ruleData['value'],
delay=ruleData.get('delay', 0)
)
# 更新规则属性
existingRule.name = ruleData['name']
existingRule.description = ruleData['description']
existingRule.enabled = ruleData.get('enabled', True)
existingRule.conditions = [condition]
existingRule.actions = [action]
existingRule.cooldownPeriod = ruleData.get('cooldownPeriod', 0)
existingRule.logicOperator = ruleData.get('logicOperator', 'and')
return True
except Exception as e:
print(f"更新规则失败: {e}")
return False
def getAllRules(self) -> List[Dict]:
"""获取所有规则"""
return [rule.toDict() for rule in self.ruleEngine.getAllRules()]
def getSystemStatus(self) -> Dict:
"""获取系统状态"""
rules = self.ruleEngine.getAllRules()
variables = self.variableMonitor.getAllVariables()
currentProject = Globals.getValue('currentPro')
return {
'initialized': self.isInitialized,
'currentProject': currentProject or '未选择工程',
'configPath': self.configPath,
'totalRules': len(rules),
'enabledRules': len([r for r in rules if r.enabled]),
'monitoredVariables': len(variables),
'lastUpdate': datetime.now().isoformat()
}
def testRule(self, ruleId: str) -> bool:
"""测试规则"""
rule = self.ruleEngine.getRule(ruleId)
if rule:
return rule.evaluate(self.variableMonitor)
return False
def shutdown(self):
"""关闭控制系统"""
try:
# 保存当前工程的规则配置
self.saveConfiguration()
# 停止变量监控
self.variableMonitor.stopMonitoring()
print("控制系统已关闭,规则配置已保存")
except Exception as e:
print(f"关闭控制系统时出错: {e}")
# 全局控制管理器实例
_globalControlManager = None
def getControlManager() -> ControlManager:
"""获取全局控制管理器"""
global _globalControlManager
if _globalControlManager is None:
_globalControlManager = ControlManager()
return _globalControlManager
def resetControlManager():
"""重置全局控制管理器(用于测试)"""
global _globalControlManager
if _globalControlManager:
try:
_globalControlManager.shutdown()
except:
pass
_globalControlManager = None
# 同时重置事件系统和规则引擎
from .EventSystem import resetEventSystem
from .RuleEngine import resetRuleEngine
resetEventSystem()
resetRuleEngine()
def initializeControlSystem():
"""初始化控制系统"""
controlManager = getControlManager()
controlManager.initialize()
return controlManager