"""Framed TCP client (``Box1Communicator``) plus a small manual test script.

Wire format used by this file, for both requests and responses::

    START_FLAG (2 bytes) | header (10 bytes) | body | END_FLAG (2 bytes)

The header is three big-endian unsigned integers packed as ``>HII``:
message type, body size in bytes, and packet counter.
"""
import logging
import random
import socket
import struct
import threading
import time
from collections import deque

LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'

logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)


class Box1Communicator:
    """Threaded request/response client for the framed Box1 TCP protocol.

    Two daemon threads are started by the constructor: one re-establishes
    the connection whenever it is down, the other reads frames from the
    socket, decodes them, invokes the registered callbacks and wakes any
    caller blocked in :meth:`waitForResponse`.
    """

    START_FLAG = b'\xF3\x12'
    END_FLAG = b'\xF3\x14'
    HEADER_SIZE = 10
    # type (uint16), body size (uint32), packet counter (uint32); big-endian.
    HEADER_FORMAT = '>HII'
    # A response type is the command type plus this offset (1 -> 11, ...).
    RESPONSE_TYPE_OFFSET = 10

    # Command types
    CMD_READ_AI = 1
    CMD_WRITE_AO = 2
    CMD_READ_DELTAT = 3
    RESP_READ_AI = 11
    RESP_WRITE_AO = 12
    RESP_READ_DELTAT = 13

    def __init__(self, host, port, reconnectInterval=3, localPort=6323):
        """Connect to ``host:port`` and start the background threads.

        Args:
            host: Remote host name or address.
            port: Remote TCP port.
            reconnectInterval: Seconds to back off after a failed connect.
            localPort: Local source port to bind before connecting, or
                ``None`` to let the OS choose one.
        """
        self.host = host
        self.port = port
        self.reconnectInterval = reconnectInterval
        self.localPort = localPort
        self.sock = None
        self.connected = False
        self.packetCounter = 0
        self.lock = threading.Lock()
        self.stopEvent = threading.Event()
        # connect() is reachable from both the reconnect thread and
        # sendCommand(); this lock keeps the attempts from overlapping.
        self._connectLock = threading.Lock()

        # Callbacks
        self.aiCallback = None
        self.deltatCallback = None
        self.aoCallback = None

        # Response wait mechanism:
        # packet number -> (command type, Event, deque of (respType, result))
        self.pending_responses = {}
        self.response_lock = threading.Lock()

        self.connect()

        # Start the connection and receive threads
        threading.Thread(target=self.connectThread, daemon=True).start()
        threading.Thread(target=self.receiveThread, daemon=True).start()

    def registerCallbacks(self, aiCallback=None, deltatCallback=None, aoCallback=None):
        """Install (or clear, with ``None``) the three response callbacks.

        ``aiCallback(aiValues, diBools, packetNum)``,
        ``deltatCallback(deltatValues, packetNum)`` and
        ``aoCallback(success, packetNum)`` are invoked from the receive thread.
        """
        self.aiCallback = aiCallback
        self.deltatCallback = deltatCallback
        self.aoCallback = aoCallback

    def connectThread(self):
        """Reconnect loop: retry about once per second while disconnected."""
        while not self.stopEvent.is_set():
            if not self.connected:
                self.connect()
            # Event.wait instead of sleep so shutdown() ends the loop promptly.
            self.stopEvent.wait(1)

    def connect(self):
        """Open a fresh connection to the remote endpoint.

        Returns:
            True when connected (including when another thread connected
            while this call waited for the lock), False on failure or after
            shutdown. A failed attempt backs off for ``reconnectInterval``
            seconds before returning.
        """
        with self._connectLock:
            if self.stopEvent.is_set():
                return False
            if self.connected:
                return True

            if self.sock:
                try:
                    self.sock.close()
                except OSError:
                    pass

            sock = None
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                if self.localPort is not None:
                    # A fixed source port may still be lingering from the
                    # previous connection; allow rebinding it.
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                    # Bind to the wildcard address: binding to the *remote*
                    # host's address only works when that host is local.
                    sock.bind(('', self.localPort))
                sock.connect((self.host, self.port))
            except Exception as e:
                # Broad on purpose: the reconnect thread must survive any error.
                logging.error(f"Connection failed: {e}")
                if sock is not None:
                    sock.close()
                self.sock = None
                self.connected = False
            else:
                self.sock = sock
                self.packetCounter = 0
                self.connected = True
                logging.info(f"Connected to {self.host}:{self.port}")
                return True

        # Back off outside the lock so a concurrent caller is not held up.
        self.stopEvent.wait(self.reconnectInterval)
        return False

    def _buildBody(self, cmdType, data):
        """Return the request body bytes for ``cmdType``.

        Raises:
            ValueError: AO data was supplied but does not have 36 elements.
        """
        if cmdType == self.CMD_WRITE_AO and data:
            if len(data) != 36:
                raise ValueError("AO data requires 36 elements")
            return struct.pack('>' + 'd' * 36, *data)
        if cmdType in (self.CMD_READ_AI, self.CMD_READ_DELTAT):
            return struct.pack('>d', 0)  # placeholder payload
        return b''

    def sendCommand(self, cmdType, data=None):
        """Send one command frame and register a slot for its response.

        Args:
            cmdType: One of the ``CMD_*`` constants.
            data: For ``CMD_WRITE_AO``, a sequence of 36 numbers.

        Returns:
            The packet number to pass to :meth:`waitForResponse`, or ``None``
            when the connection is down, the payload is invalid or the send
            failed.
        """
        if not self.connected and not self.connect():
            return None  # connection failed

        try:
            body = self._buildBody(cmdType, data)
        except (ValueError, TypeError, struct.error) as e:
            # A bad payload is the caller's problem, not a link failure:
            # report it but keep the connection marked as up.
            logging.error(f"Send error: {e}")
            return None

        counter = None
        try:
            with self.lock:
                counter = self.packetCounter
                header = struct.pack(self.HEADER_FORMAT, cmdType, len(body), counter)
                packet = self.START_FLAG + header + body + self.END_FLAG

                # Register the waiter before sending so a fast response
                # cannot arrive before its slot exists.
                with self.response_lock:
                    self.pending_responses[counter] = (cmdType, threading.Event(), deque())

                self.sock.sendall(packet)
                # The counter travels as uint32; wrap instead of overflowing.
                self.packetCounter = (counter + 1) & 0xFFFFFFFF
                return counter
        except Exception as e:
            logging.error(f"Send error: {e}")
            self.connected = False
            if counter is not None:
                # Nothing was sent, so nobody will ever answer this slot.
                with self.response_lock:
                    self.pending_responses.pop(counter, None)
            return None

    def waitForResponse(self, packetNum, expectedType, timeout=5):
        """Block until the response for ``packetNum`` arrives or times out.

        Args:
            packetNum: Value returned by :meth:`sendCommand`.
            expectedType: The command type that was sent; the response must
                carry ``expectedType + 10``.
            timeout: Maximum seconds to wait.

        Returns:
            The decoded response payload, or ``None`` on unknown packet
            number, timeout, shutdown or response-type mismatch. The pending
            slot is always released.
        """
        try:
            with self.response_lock:
                entry = self.pending_responses.get(packetNum)
            if entry is None:
                logging.error(f"Invalid packetNum: {packetNum}")
                return None
            _, event, resultQueue = entry

            # Wait for the response or the timeout
            if not event.wait(timeout):
                logging.warning(f"Response timeout for packet {packetNum}")
                return None

            with self.response_lock:
                # shutdown() sets the event without queuing a result.
                if not resultQueue:
                    return None
                respType, payload = resultQueue.popleft()

            # Validate the response type
            wantedType = expectedType + self.RESPONSE_TYPE_OFFSET
            if respType != wantedType:
                logging.error(f"Response type mismatch. Expected:{wantedType}, Got:{respType}")
                return None
            return payload
        finally:
            with self.response_lock:
                self.pending_responses.pop(packetNum, None)

    @classmethod
    def _extractFrames(cls, buffer):
        """Split ``buffer`` into complete frames.

        Returns:
            ``(frames, remaining)`` where ``frames`` is a list of
            ``(dataType, body, packetNum)`` tuples and ``remaining`` holds
            the bytes of a not-yet-complete frame. Frames whose end flag is
            wrong are dropped with a warning.
        """
        frames = []
        startLen = len(cls.START_FLAG)
        endLen = len(cls.END_FLAG)
        prefixLen = startLen + cls.HEADER_SIZE

        while True:
            startPos = buffer.find(cls.START_FLAG)
            if startPos == -1:
                # No frame start: drop the garbage but keep the tail bytes
                # that could be the first part of a split start flag.
                keep = startLen - 1
                buffer = buffer[max(0, len(buffer) - keep):] if keep else b''
                break

            # Drop invalid data in front of the start flag
            if startPos > 0:
                buffer = buffer[startPos:]

            # Has the full header arrived?
            if len(buffer) < prefixLen:
                break

            dataType, dataSize, packetNum = struct.unpack(
                cls.HEADER_FORMAT, buffer[startLen:prefixLen])

            # Has the full frame arrived?
            frameLen = prefixLen + dataSize + endLen
            if len(buffer) < frameLen:
                break

            fullFrame = buffer[:frameLen]
            buffer = buffer[frameLen:]

            # Validate the end flag
            if fullFrame[frameLen - endLen:] != cls.END_FLAG:
                logging.warning("Invalid end flag. Discarding packet.")
                continue

            frames.append((dataType, fullFrame[prefixLen:frameLen - endLen], packetNum))

        return frames, buffer

    def receiveThread(self):
        """Receive loop: read from the socket and dispatch complete frames."""
        buffer = b''
        while not self.stopEvent.is_set():
            try:
                if not self.connected:
                    # A partial frame from a dead connection must not be
                    # glued onto the next connection's stream.
                    buffer = b''
                    self.stopEvent.wait(1)
                    continue

                data = self.sock.recv(4096)
                if not data:  # peer closed the connection
                    self.connected = False
                    buffer = b''
                    continue

                frames, buffer = self._extractFrames(buffer + data)
                for dataType, body, packetNum in frames:
                    self.processData(dataType, body, packetNum)
            except Exception as e:
                if self.stopEvent.is_set():
                    break  # socket closed by shutdown(); not an error
                logging.error(f"Receive error: {e}")
                self.connected = False
                buffer = b''

    def processData(self, dataType, body, packetNum):
        """Decode one response body, run its callback and wake the waiter.

        Decoded payloads:
            * ``RESP_READ_AI`` (72-byte body): ``(aiValues, diBools)`` with
              ``aiValues`` a tuple of 8 floats and ``diBools`` a list of 16
              ints (0/1) taken from the low bits of the trailing uint64.
            * ``RESP_READ_DELTAT`` (128-byte body): tuple of 16 floats.
            * ``RESP_WRITE_AO``: ``True`` when the body is a single double
              equal to 0.0, otherwise ``False``.

        Bodies of the wrong length and unknown types are logged and dropped
        without waking the waiter.
        """
        try:
            if dataType == self.RESP_READ_AI:
                if len(body) != 72:
                    logging.error(f"Invalid AI response length: {len(body)}")
                    return
                aiValues = struct.unpack('>8d', body[:64])
                diWord = struct.unpack('>Q', body[64:72])[0]
                diBools = [(diWord >> bit) & 1 for bit in range(16)]
                result = (aiValues, diBools)
                callback = self.aiCallback
                callbackArgs = (aiValues, diBools, packetNum)
            elif dataType == self.RESP_READ_DELTAT:
                if len(body) != 128:
                    logging.error(f"Invalid DeltaT response length: {len(body)}")
                    return
                result = struct.unpack('>16d', body)
                callback = self.deltatCallback
                callbackArgs = (result, packetNum)
            elif dataType == self.RESP_WRITE_AO:
                result = len(body) == 8 and struct.unpack('>d', body)[0] == 0.0
                callback = self.aoCallback
                callbackArgs = (result, packetNum)
            else:
                logging.warning(f"Unknown response type: {dataType}")
                return
        except Exception as e:
            logging.error(f"Data processing error: {e}")
            return

        if callback:
            try:
                callback(*callbackArgs)
            except Exception as e:
                # A faulty user callback must not starve the caller that is
                # blocked in waitForResponse().
                logging.error(f"Data processing error: {e}")

        # Wake the waiting thread
        with self.response_lock:
            entry = self.pending_responses.get(packetNum)
            if entry is not None:
                _, event, resultQueue = entry
                resultQueue.append((dataType, result))
                event.set()

    def readAIDI(self):
        """Synchronously read AI/DI data.

        Returns:
            ``(aiValues, diBools)`` - 8 floats and 16 bit values as decoded
            by :meth:`processData` - or ``None`` on failure.
        """
        packetNum = self.sendCommand(self.CMD_READ_AI)
        if packetNum is None:
            return None
        return self.waitForResponse(packetNum, self.CMD_READ_AI)

    def readDeltaT(self):
        """Synchronously read DeltaT data: a tuple of 16 floats, or ``None``."""
        packetNum = self.sendCommand(self.CMD_READ_DELTAT)
        if packetNum is None:
            return None
        return self.waitForResponse(packetNum, self.CMD_READ_DELTAT)

    def writeAo(self, aoData):
        """Synchronously write 36 AO values; return True only on confirmed success."""
        packetNum = self.sendCommand(self.CMD_WRITE_AO, aoData)
        if packetNum is None:
            return False
        return self.waitForResponse(packetNum, self.CMD_WRITE_AO) or False

    def shutdown(self):
        """Stop the background threads, close the socket and release waiters."""
        self.stopEvent.set()
        self.connected = False
        if self.sock:
            try:
                # shutdown() before close() so a recv() blocked in the
                # receive thread is released in a timely fashion.
                self.sock.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass  # never connected, or already closed
            try:
                self.sock.close()
            except OSError:
                pass

        # Wake every waiting thread; they find an empty queue and return None.
        with self.response_lock:
            for packetNum, (_, event, _) in list(self.pending_responses.items()):
                event.set()
                del self.pending_responses[packetNum]

        logging.info("Communicator stopped")


# ---------------------------------------------------------------------------
# Manual test client
# ---------------------------------------------------------------------------
# from procli import Box1Communicator  # import the client class defined above


def _enable_file_logging(path="box1_client_test.log"):
    """Also write log records to ``path``.

    The root logger is already configured at the top of this module, and
    ``logging.basicConfig`` does nothing once the root logger has handlers,
    so the file handler has to be attached explicitly.
    """
    root = logging.getLogger()
    if any(isinstance(handler, logging.FileHandler) for handler in root.handlers):
        return
    file_handler = logging.FileHandler(path)
    file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
    root.addHandler(file_handler)


# Global state
class TestState:
    """Counters and last-seen values shared by the callbacks and workers."""

    running = True
    read_count = 0
    read_count_1 = 0
    write_count = 0
    last_ai = None
    last_deltat = None


# Callback definitions
def ai_callback(ai_values, di_states, packet_num):
    """Record and log an AI/DI response."""
    TestState.last_ai = (ai_values, di_states)
    TestState.read_count += 1
    # if TestState.read_count % 10 == 0:
    #     logging.info(f"AI Callback - Packet {packet_num}: Received {TestState.read_count}th AI update")
    logging.info(f" 读取到的AI: {ai_values}...")
    logging.info(f" 读取到的DI: {di_states}...")


def deltat_callback(deltat_values, packet_num):
    """Record and log a DeltaT response."""
    TestState.last_deltat = deltat_values
    TestState.read_count_1 += 1
    # if TestState.read_count_1 % 10 == 0:
    #     logging.info(f"DeltaT Callback - Packet {packet_num}: Received {TestState.read_count}th DeltaT update")
    logging.info(f"读取到的delat: {deltat_values}...")


def ao_callback(success, packet_num):
    """Count and log an AO write acknowledgement."""
    TestState.write_count += 1
    status = "SUCCESS" if success else "FAILED"
    # if TestState.write_count % 10 == 0:
    logging.info(f"写入AO成功 {packet_num}: Write {status} ({TestState.write_count}th write)")


def read_worker(communicator, interval):
    """Thread body: read AI/DI and DeltaT every ``interval`` seconds."""
    while TestState.running:
        try:
            communicator.readAIDI()
            communicator.readDeltaT()
            time.sleep(interval)
        except Exception as e:
            logging.error(f"Read worker error: {e}")
            time.sleep(1)


def write_worker(communicator, interval):
    """Thread body: randomise a few AO values and write them every ``interval`` seconds."""
    ao_data = [0.0] * 36  # initial AO data
    while TestState.running:
        try:
            # Randomly change some AO values
            for _ in range(5):
                idx = random.randint(0, 35)
                ao_data[idx] = random.uniform(0.0, 5.0)

            # print(ao_data)
            communicator.writeAo(ao_data)
            time.sleep(interval)
        except Exception as e:
            logging.error(f"Write worker error: {e}")
            time.sleep(1)


def main():
    """Run one read-AI / read-DeltaT / write-AO round trip against localhost:8000."""
    _enable_file_logging()

    # Create the communicator instance
    communicator = Box1Communicator("localhost", 8000)
    try:
        # Register the callbacks
        communicator.registerCallbacks(
            aiCallback=ai_callback,
            deltatCallback=deltat_callback,
            aoCallback=ao_callback
        )

        # Single-step debugging
        communicator.readAIDI()
        communicator.readDeltaT()
        communicator.writeAo([1.0] * 36)

        # Disabled continuous read/write test, kept for reference:
        # logging.info("Starting Box1 client test with continuous read/write...")
        # try:
        #     # Wait for the initial connection
        #     time.sleep(1)
        #     # Start the read thread (interval 0.2 s)
        #     read_thread = threading.Thread(target=read_worker, args=(communicator, 0.2), daemon=True)
        #     read_thread.start()
        #     # Start the write thread (interval 0.2 s)
        #     write_thread = threading.Thread(target=write_worker, args=(communicator, 0.2), daemon=True)
        #     write_thread.start()
        #     # # Start the status monitor thread
        #     # monitor_thread = threading.Thread(target=status_monitor, daemon=True)
        #     # monitor_thread.start()
        #     # Main thread waits for user input before exiting
        #     logging.info("Continuous read/write test running. Press Enter to stop...")
        #     input()
        # except KeyboardInterrupt:
        #     logging.info("Test interrupted by user")
        # finally:
        #     # Set the stop flag
        #     TestState.running = False
        #     # Wait for the worker threads to finish
        #     time.sleep(1)
    finally:
        # Clean up even when one of the calls above raises
        communicator.shutdown()
    # logging.info("Test completed. Final stats:")
    # logging.info(f"Total reads: {TestState.read_count}")
    # logging.info(f"Total writes: {TestState.write_count}")


if __name__ == "__main__":
    main()