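# NOTE: The following is residue from the repository web viewer, not program code.
# Viewer hint: "You cannot select more than 25 topics. Topics must start with a
# letter or number, can include dashes ('-') and can be up to 35 characters long."
# Viewer file stats: 275 lines, 8.9 KiB, Python (last change shown: 4 months ago).
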
import logging
import time
import threading
from protocol.TCP.IOTCPClinet import TCPCommunicator
# Trigger sources accepted by TCPVarManager.startTimeTest (frame slot 17).
# Per the frame layout: 0 = no trigger, 1 = FPGA, 2 = thermocouple (TC), 3 = RTD.
FPGATrigger = 1
TCTrigger = 2
RTDTrigger = 3

# Data-source selectors for writeValue/readValue. Only SimModel is handled
# specially in this module; any other value uses the live buffers.
localModel = 1
NetModel = 2
SimModel = 3


class TCPVarManager:
    """Cache I/O channel values and exchange them with a device over TCP.

    Outgoing frames are lists of 36 values laid out as follows:

    * 0-15:  16 AO channel currents (A)
    * 16:    output mode (0 = no output, 1 = normal output, 2 = triggered output)
    * 17:    trigger type (0 = none, 1 = FPGA, 2 = TC, 3 = RTD)
    * 18:    experiment time in ms
    * 19:    DO states packed into a bit mask, sent as a float
    * 20-27: 8 TC channel values (mV)
    * 28-35: 8 RTD channel values (ohm)
    """

    _FRAME_LENGTH = 36
    _DO_SLOT = 19
    _DO_CHANNELS = 16
    _OUTPUT_NORMAL = 1
    _OUTPUT_TRIGGERED = 2
    _NO_TRIGGER = 0
    # Delay (s) before the periodic reader retries after a failed read.
    _READ_RETRY_DELAY = 1.0
    # Variable types stored in the thermocouple buffers.
    _TC_TYPES = ('R', 'S', 'B', 'J', 'T', 'E', 'K', 'N', 'C', 'A')

    def __init__(self, host, port):
        """Create the TCP communicator, initialise buffers and start polling.

        :param host: server host address
        :param port: server port
        """
        self.communicator = TCPCommunicator(host, port)
        logging.info(f"TCPVarManager initialized for {host}:{port}")

        # Output buffers sent to the device.
        self.AODATA = [0.004] * 16
        self.DODATA = [0] * 16
        self.TCDATA = [0.0] * 8  # mV
        self.RTDDATA = [0.0] * 8  # ohm

        # Input buffers; AIDATA/DIDATA are refreshed by the periodic reader.
        self.AIDATA = [0.0] * 8
        self.DIDATA = [0] * 16
        # Not updated by any method in this class.
        self.DELTATDATA = [0] * 16

        # Simulation buffers, used when model == SimModel.
        self.simAOData = [0.004] * 16
        self.simDOData = [0] * 16
        self.simTCData = [0.0] * 8  # mV
        self.simRTDData = [0.0] * 8  # ohm
        # Attribute names kept exactly as spelled in the original
        # (simAIata / simDIata); renaming them would change the class's
        # public attributes.
        self.simAIata = [0.0] * 8
        self.simDIata = [0] * 16

        self.startPeriodicRead()

    def _build_frame(self, output_mode, trigger_type, duration_ms):
        """Assemble a 36-slot frame from the current output buffers."""
        return [
            *self.AODATA,       # 0-15: AO currents
            output_mode,        # 16: output mode
            trigger_type,       # 17: trigger type
            duration_ms,        # 18: experiment time (ms)
            0,                  # 19: DO placeholder, filled in by writeData
            *self.TCDATA,       # 20-27: TC values
            *self.RTDDATA,      # 28-35: RTD values
        ]

    def startTimeTest(self, triggerType, time = 2000):
        """Send a triggered-output frame built from the current buffers.

        :param triggerType: trigger source (FPGATrigger, TCTrigger or RTDTrigger)
        :param time: experiment time in ms (the name shadows the ``time``
            module inside this method only; kept for caller compatibility)
        """
        self.writeData(
            self._build_frame(self._OUTPUT_TRIGGERED, triggerType, time)
        )

    def readAiDi(self):
        """Read AI and DI data from the communicator.

        :return: whatever ``communicator.readAIDI()`` returns (the periodic
            reader treats it as an ``(AI values, DI states)`` pair), or None
            if the read failed or raised
        """
        try:
            result = self.communicator.readAIDI()
            if result is None:
                logging.warning("Failed to read AI/DI data")
            return result
        except Exception as e:
            logging.error(f"Error reading AI/DI: {e}")
            return None

    def startPeriodicRead(self, interval=0.5, callback=None):
        """Start a daemon thread that polls AI/DI data.

        A reader started earlier (``__init__`` starts one) is told to stop
        first, so repeated calls do not leave extra pollers running. The old
        thread is not joined; it exits at its next loop check.

        :param interval: polling interval in seconds (default 0.5)
        :param callback: optional ``callback(ai_values, di_states)`` invoked
            after each successful read
        """
        previous_event = getattr(self, 'stop_event', None)
        if previous_event is not None:
            previous_event.set()

        # Each worker gets its own event so replacing self.stop_event can
        # never revive or orphan an older worker.
        stop_event = threading.Event()
        self.stop_event = stop_event
        self.readThread = threading.Thread(
            target=self._read_worker,
            args=(interval, callback, stop_event),
            daemon=True
        )
        self.readThread.start()
        logging.info(f"Started periodic read every {interval*1000}ms")

    def _read_worker(self, interval, callback, stop_event=None):
        """Poll AI/DI data until ``stop_event`` is set.

        :param interval: seconds to wait after a successful read
        :param callback: optional ``callback(ai_values, di_states)``
        :param stop_event: event that ends the loop; defaults to
            ``self.stop_event`` for callers using the old two-argument form
        """
        if stop_event is None:
            stop_event = self.stop_event

        while not stop_event.is_set():
            delay = interval
            try:
                result = self.readAiDi()
                if result is None:
                    # readAiDi already logged the failure; keep the previous
                    # values and back off instead of indexing into None.
                    delay = self._READ_RETRY_DELAY
                else:
                    self.AIDATA = result[0]
                    self.DIDATA = result[1]
                    if callback:
                        callback(result[0], result[1])
            except Exception as e:
                logging.error(f"Periodic read error: {e}")
                delay = self._READ_RETRY_DELAY
            # Waiting on the event (rather than sleeping) lets a stop request
            # end the wait immediately.
            stop_event.wait(delay)

    def stopPeriodicRead(self):
        """Signal the periodic reader to stop, if one was started."""
        stop_event = getattr(self, 'stop_event', None)
        if stop_event is not None:
            stop_event.set()
            logging.info("Stopped periodic read")

    def writeData(self, data):
        """Send a 36-slot frame, with slot 19 replaced by the DO bit mask.

        The caller's sequence is not modified; a copy is transmitted.

        :param data: sequence of exactly 36 values (see the class docstring)
        :return: the result of ``communicator.writeAo`` on success paths,
            False if validation fails or an exception is raised
        """
        if len(data) != self._FRAME_LENGTH:
            logging.error("AO data must contain exactly 36 values")
            return False
        if len(self.DODATA) != self._DO_CHANNELS:
            logging.error("DO states must contain exactly 16 values")
            return False

        try:
            # Pack the DO states into one integer, channel i -> bit i. Any
            # truthy state sets exactly its own bit, so a non-binary value
            # cannot spill into neighbouring channels.
            do_mask = 0
            for bit, state in enumerate(self.DODATA):
                if state:
                    do_mask |= 1 << bit

            frame = list(data)
            frame[self._DO_SLOT] = float(do_mask)

            success = self.communicator.writeAo(frame)
            if not success:
                logging.warning("AO/DO write operation failed")
            return success
        except Exception as e:
            logging.error(f"Error writing AO/DO: {e}")
            return False

    def readDeltaT(self):
        """Read DeltaT data from the communicator.

        :return: whatever ``communicator.readDeltaT()`` returns, or None if
            the read failed or raised
        """
        try:
            result = self.communicator.readDeltaT()
            if result is None:
                logging.warning("Failed to read DeltaT data")
            return result
        except Exception as e:
            logging.error(f"Error reading DeltaT: {e}")
            return None

    def writeValue(self, variableType, channel, value, trigger=None, model = localModel):
        """Store one channel value and, unless suppressed, send a frame.

        With ``model == SimModel`` the value goes into the matching
        simulation buffer and nothing is transmitted. Otherwise AO, DO,
        thermocouple and PT100 values update the live output buffers.

        NOTE(review): non-simulated "AI"/"DI" and unrecognised types change
        no buffer but still transmit a frame when ``trigger`` is falsy; this
        matches the original behaviour. Confirm it is intended.

        :param variableType: "AO", "DO", "PT100", "AI", "DI" or a
            thermocouple type letter
        :param channel: zero-based channel index (IndexError if out of range)
        :param value: new value; converted with ``float`` (``int`` for DO/DI)
        :param trigger: when truthy, the buffer is updated without sending
        :param model: data-source selector; only SimModel is special-cased
        """
        # Resolve (simulation buffer, live buffer, converter) for the type.
        if variableType == "AO":
            sim_buffer, live_buffer, convert = self.simAOData, self.AODATA, float
        elif variableType == "DO":
            sim_buffer, live_buffer, convert = self.simDOData, self.DODATA, int
        elif variableType in self._TC_TYPES:
            sim_buffer, live_buffer, convert = self.simTCData, self.TCDATA, float
        elif variableType == "PT100":
            sim_buffer, live_buffer, convert = self.simRTDData, self.RTDDATA, float
        elif variableType == "AI":
            sim_buffer, live_buffer, convert = self.simAIata, None, float
        elif variableType == "DI":
            sim_buffer, live_buffer, convert = self.simDIata, None, int
        else:
            sim_buffer = live_buffer = convert = None

        if model == SimModel and sim_buffer is not None:
            sim_buffer[channel] = convert(value)
            return

        if live_buffer is not None:
            live_buffer[channel] = convert(value)

        if not trigger:
            self.writeData(
                self._build_frame(self._OUTPUT_NORMAL, self._NO_TRIGGER, 0)
            )

    def readValue(self, variableType, channel, model = localModel):
        """Return the cached value of one channel.

        NOTE(review): thermocouple and PT100 types are writable through
        writeValue but are not readable here; they return None like any
        other unsupported type. Confirm whether that is intended.

        :param variableType: "AI", "DI", "AO" or "DO"
        :param channel: zero-based channel index (IndexError if out of range)
        :param model: SimModel reads the simulation buffers, anything else
            reads the live buffers
        :return: the cached value, or None for an unsupported variableType
        """
        simulated = model == SimModel
        if variableType == "AI":
            source = self.simAIata if simulated else self.AIDATA
        elif variableType == "DI":
            source = self.simDIata if simulated else self.DIDATA
        elif variableType == "AO":
            source = self.simAOData if simulated else self.AODATA
        elif variableType == "DO":
            source = self.simDOData if simulated else self.DODATA
        else:
            return None
        return source[channel]

    def shutdown(self):
        """Stop the periodic reader and shut down the communicator."""
        try:
            self.stopPeriodicRead()
            self.communicator.shutdown()
            logging.info("TCPVarManager shutdown complete")
        except Exception as e:
            logging.error(f"Error during shutdown: {e}")
if __name__ == "__main__":
    # Bind the instance to its own name; reusing "TCPVarManager" would rebind
    # the class name to an instance for the rest of the script.
    manager = TCPVarManager("127.0.0.1", 8000)
    try:
        manager.startTimeTest(TCTrigger)
        # Keep the process alive until it is stopped manually.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nStopping simulator...")
    finally:
        # Release the reader thread and the connection however the loop ends.
        manager.shutdown()