import logging import time import threading from protocol.TCP.IOTCPClinet import TCPCommunicator FPGATrigger = 1 TCTrigger = 2 RTDTrigger = 3 localModel = 1 NetModel = 2 SimModel = 3 class TCPVarManager: def __init__(self, host, port): """ 初始化TCP变量管理器 :param host: 服务器主机地址 :param port: 服务器端口 """ self.communicator = TCPCommunicator(host, port) logging.info(f"TCPVarManager initialized for {host}:{port}") self.AODATA = [0.004] * 16 self.DODATA = [0] * 16 self.TCDATA = [0.0] * 8 # mv self.RTDDATA = [0.0] * 8 # Ω self.AIDATA = [0.0] * 8 self.DIDATA = [0] * 16 self.DELTATDATA = [0] * 16 self.simAOData = [0.004] * 16 self.simDOData = [0] * 16 self.simTCData = [0.0] * 8 # mv self.simRTDData = [0.0] * 8 # Ω self.simAIata = [0.0] * 8 self.simDIata = [0] * 16 self.startPeriodicRead() def startTimeTest(self, triggerType, time = 2000): data = [ # 0-15: 16通道AO电流数据(A) *self.AODATA, # 16: 输出模式(0:无输出/1:正常输出/2:触发输出) 2, # 触发输出 # 17: 触发类型(0:无触发 1:fpga触发/2:TC触发/3:RTD触发) triggerType, # 触发类型 time, # 实验时间(ms) # 18: 实验时间(ms)和 DO状态(U16转双精度) 0, # do补位 # 19: DO状态已包含在上面的do_value中 # 20-27: 8通道TC数据(mV) *self.TCDATA, # 28-35: 8通道RTD数据(Ω) *self.RTDDATA ] self.writeData(data) # print(1) def readAiDi(self): """ 读取AI和DI数据 :return: 包含(AI值, DI状态)的元组,或发生错误时返回None """ try: result = self.communicator.readAIDI() # print(result) if result is None: logging.warning("Failed to read AI/DI data") return result except Exception as e: logging.error(f"Error reading AI/DI: {e}") return None # 新增周期性读取方法 def startPeriodicRead(self, interval=0.5, callback=None): """ 启动周期性读取AI/DI数据 :param interval: 读取间隔(秒),默认500ms :param callback: 数据回调函数 """ self.stop_event = threading.Event() self.readThread = threading.Thread( target=self._read_worker, args=(interval, callback), daemon=True ) self.readThread.start() logging.info(f"Started periodic read every {interval*1000}ms") def _read_worker(self, interval, callback): """工作线程实现周期性读取""" while not self.stop_event.is_set(): try: result = self.readAiDi() self.AIDATA = result[0] self.DIDATA = result[1] # print(self.AIDATA, self.DIDATA) if callback and result: callback(result[0], result[1]) time.sleep(interval) except Exception as e: logging.error(f"Periodic read error: {e}") time.sleep(1) def stopPeriodicRead(self): """停止周期性读取""" if hasattr(self, 'stop_event'): self.stop_event.set() logging.info("Stopped periodic read") def writeData(self, data): """ 写入AO和DO数据 :param data: 36个AO值的列表 :param self.DODATA: 16个DO状态的列表 (0或1) :return: 写入成功返回True,否则返回False """ if len(data) != 36: logging.error("AO data must contain exactly 36 values") return False if len(self.DODATA) != 16: logging.error("DO states must contain exactly 16 values") return False try: # 将DO状态转换为单个浮点数(协议要求) doValue = 0 for i, state in enumerate(self.DODATA): doValue |= (state << i) # 替换data中第20个元素为DO状态值 data[19] = float(doValue) success = self.communicator.writeAo(data) if not success: logging.warning("AO/DO write operation failed") return success except Exception as e: logging.error(f"Error writing AO/DO: {e}") return False def readDeltaT(self): """ 读取DeltaT数据 :return: 16个DeltaT值的元组,或发生错误时返回None """ try: result = self.communicator.readDeltaT() if result is None: logging.warning("Failed to read DeltaT data") return result except Exception as e: logging.error(f"Error reading DeltaT: {e}") return None def writeValue(self, variableType, channel, value, trigger=None, model = localModel): if variableType == "AO": if model == SimModel: self.simAOData[channel] = float(value) return else: self.AODATA[channel] = float(value) elif variableType == "DO": if model == SimModel: self.simDOData[channel] = int(value) return else: self.DODATA[channel] = int(value) elif variableType in ['R', 'S', 'B', 'J', 'T', 'E', 'K', 'N', 'C', 'A']: if model == SimModel: self.simTCData[channel] = float(value) return else: self.TCDATA[channel] = float(value) elif variableType == "PT100": if model == SimModel: self.simRTDData[channel] = float(value) return else: self.RTDDATA[channel] = float(value) elif variableType == "AI": if model == SimModel: self.simAIata[channel] = float(value) return elif variableType == "DI": if model == SimModel: self.simDIata[channel] = int(value) return if not trigger: data = [ # 0-15: 16通道AO电流数据(A) *self.AODATA, # 16: 输出模式(0:无输出/1:正常输出/2:触发输出) 2, # 正常输出 # 17: 触发类型(0:无触发 1:fpga触发/2:TC触发/3:RTD触发) 0, # 正常写入 0, # 实验时间(ms) # 18: 实验时间(ms)和 DO状态(U16转双精度) 0, # do补位 # 19: DO状态已包含在上面的do_value中 # 20-27: 8通道TC数据(mV) *self.TCDATA, # 28-35: 8通道RTD数据(Ω) *self.RTDDATA ] # print(data) self.writeData(data) def readValue(self, variableType, channel, model = localModel): # channel = channel if variableType == "AI": # print(self.AIDATA) if model == SimModel: return self.simAIata[channel] else: return self.AIDATA[channel] elif variableType == "DI": if model == SimModel: return self.simDIata[channel] else: return self.DIDATA[channel] elif variableType == "AO": if model == SimModel: return self.simAOData[channel] else: return self.AODATA[channel] elif variableType == "DO": if model == SimModel: return self.simDOData[channel] else: return self.DODATA[channel] def shutdown(self): """关闭连接并清理资源""" try: self.stopPeriodicRead() self.communicator.shutdown() logging.info("TCPVarManager shutdown complete") except Exception as e: logging.error(f"Error during shutdown: {e}") if __name__ == "__main__": TCPVarManager = TCPVarManager("127.0.0.1", 8000) TCPVarManager.startTimeTest(TCTrigger) try: # 运行模拟器直到手动停止 while True: time.sleep(1) except KeyboardInterrupt: print("\nStopping simulator...") TCPVarManager.shutdown() # TCPVarManager.shutdown()