"""TCP simulator for a "Box1" I/O device speaking a simple framed binary protocol."""

import logging
import random
import socket
import struct
import threading
import time
from typing import List, Optional, Tuple

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


class Box1Simulator:
    """Simulated Box1 device that answers framed requests over TCP.

    Frame layout, as parsed by ``handle_client``::

        START_FLAG | header (HEADER_SIZE bytes) | body | END_FLAG

    The header is big-endian: a 16-bit command type, a 32-bit body size and
    a 32-bit packet number.  Constructing an instance immediately starts the
    listening server in a daemon thread.
    """

    START_FLAG = b'\xF3\x12'
    END_FLAG = b'\xF3\x14'
    HEADER_SIZE = 10

    # Command types sent by the client and the matching response types.
    CMD_READ_AI = 1
    CMD_WRITE_AO = 2
    CMD_READ_DELTAT = 3
    RESP_READ_AI = 11
    RESP_WRITE_AO = 12
    RESP_READ_DELTAT = 13

    # Big-endian header: command type (uint16), body size (uint32),
    # packet number (uint32).  struct.calcsize gives HEADER_SIZE (10).
    HEADER_FORMAT = '>HII'

    # Upper bound on the body size a header may declare.  Without it, one
    # corrupt header could make the parser wait for up to 4 GiB of data.
    MAX_BODY_SIZE = 1 << 20

    # How often (seconds) the accept loop wakes up to re-check stop_event.
    ACCEPT_TIMEOUT = 1.0

    # Number of identical AI/DI records placed in one READ_AI response.
    AI_SAMPLES_PER_RESPONSE = 500

    # NOTE(review): the original text of create_ao_response and
    # create_deltat_response was lost from the source this file was restored
    # from.  The two body layouts below are assumptions -- confirm them
    # against the real device protocol before relying on them.
    AO_RESPONSE_FORMAT = '<B'    # assumed: one status byte, 1 = success
    DELTAT_FORMAT = '<16I'       # assumed: 16 little-endian uint32 values

    def __init__(self, host: str, port: int) -> None:
        """Initialise the simulated state and start the TCP server thread.

        Args:
            host: Interface to bind the listening socket to.
            port: TCP port to listen on.
        """
        self.host = host
        self.port = port
        self.server_socket: Optional[socket.socket] = None
        self.stop_event = threading.Event()

        # Populates ai_values, di_states, ao_values and deltat_values.
        self.reset_simulated_values()

        threading.Thread(target=self.run_server, daemon=True).start()
        logging.info(f"Box1 simulator started on {host}:{port}")

    def reset_simulated_values(self) -> None:
        """Reset every simulated value to a fresh initial state."""
        # 8 analog inputs, random in the 0-10 range.
        self.ai_values = [random.uniform(0, 10) for _ in range(8)]
        # 16 digital inputs, each randomly 0 or 1.
        self.di_states = [random.randint(0, 1) for _ in range(16)]
        # 36 analog outputs, all zero until a client writes them.
        self.ao_values = [0.0] * 36
        # 16 DeltaT counters, random in 1000-5000.
        self.deltat_values = [random.randint(1000, 5000) for _ in range(16)]
        logging.info("Simulated values reset")

    def run_server(self) -> None:
        """Accept client connections until ``shutdown`` is called.

        Each accepted connection is served by ``handle_client`` in its own
        daemon thread.
        """
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket = server
        try:
            try:
                server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                server.bind((self.host, self.port))
                server.listen(5)
                # Without a timeout accept() blocks forever and stop_event
                # would only be re-checked after the next connection.
                server.settimeout(self.ACCEPT_TIMEOUT)
            except OSError as e:
                logging.error(f"Server error: {e}")
                return

            while not self.stop_event.is_set():
                try:
                    client_sock, addr = server.accept()
                except socket.timeout:
                    continue
                except OSError as e:
                    # shutdown() closes the socket under us; that is the
                    # normal way out and not worth an error log.
                    if not self.stop_event.is_set():
                        logging.error(f"Server error: {e}")
                    break
                logging.info(f"New connection from {addr}")
                threading.Thread(
                    target=self.handle_client, args=(client_sock,), daemon=True
                ).start()
        finally:
            server.close()

    def _extract_frames(self, buffer: bytes) -> Tuple[List[Tuple[int, int, bytes]], bytes]:
        """Split every complete frame off the front of ``buffer``.

        Args:
            buffer: Bytes received so far and not yet consumed.

        Returns:
            ``(frames, remaining)``.  ``frames`` is a list of
            ``(cmd_type, packet_num, body)`` tuples in arrival order;
            ``remaining`` is the unconsumed tail to prepend to the next
            chunk of received data.
        """
        frames = []
        flag_len = len(self.START_FLAG)
        prefix_len = flag_len + self.HEADER_SIZE

        while True:
            start_pos = buffer.find(self.START_FLAG)
            if start_pos == -1:
                # No frame start in sight: drop the junk, keeping only the
                # tail bytes that could be the first part of a START_FLAG
                # split across two recv() calls.
                keep = flag_len - 1
                buffer = buffer[-keep:] if keep else b''
                break
            if start_pos > 0:
                buffer = buffer[start_pos:]
            if len(buffer) < prefix_len:
                break

            cmd_type, data_size, packet_num = struct.unpack_from(
                self.HEADER_FORMAT, buffer, flag_len
            )
            if data_size > self.MAX_BODY_SIZE:
                # Almost certainly a false START_FLAG match or a corrupt
                # header; skip this flag and look for the next one.
                logging.warning(f"Declared body size {data_size} exceeds limit, resynchronising")
                buffer = buffer[flag_len:]
                continue

            body_end = prefix_len + data_size
            frame_len = body_end + len(self.END_FLAG)
            if len(buffer) < frame_len:
                break

            frame, buffer = buffer[:frame_len], buffer[frame_len:]
            if frame[body_end:] != self.END_FLAG:
                logging.warning("Invalid end flag")
                continue
            frames.append((cmd_type, packet_num, frame[prefix_len:body_end]))

        return frames, buffer

    def handle_client(self, client_sock: socket.socket) -> None:
        """Serve one client connection until it closes or the server stops.

        Args:
            client_sock: Connected socket; it is always closed on return.
        """
        buffer = b''
        try:
            while not self.stop_event.is_set():
                data = client_sock.recv(4096)
                if not data:
                    break
                frames, buffer = self._extract_frames(buffer + data)
                for cmd_type, packet_num, body in frames:
                    response = self.process_command(cmd_type, body, packet_num)
                    if response:
                        client_sock.sendall(response)
        except Exception as e:
            logging.error(f"Client handling error: {e}")
        finally:
            client_sock.close()
            logging.info("Client disconnected")

    def process_command(self, cmd_type: int, body: bytes, packet_num: int) -> Optional[bytes]:
        """Execute one client command and build its response frame.

        Args:
            cmd_type: Command type taken from the frame header.
            body: Frame body (may be empty).
            packet_num: Packet number from the header, echoed in the response.

        Returns:
            The encoded response frame, or ``None`` when the command is
            unknown, its body is malformed, or processing raised.
        """
        try:
            if cmd_type == self.CMD_READ_AI:
                return self.create_ai_response(packet_num)

            if cmd_type == self.CMD_WRITE_AO:
                if len(body) != 288:  # 36 doubles * 8 bytes
                    logging.error(f"Invalid AO data length: {len(body)}")
                    return None
                # AO values arrive as 36 little-endian doubles.
                ao_data = struct.unpack('<' + 'd' * 36, body)
                self.ao_values = list(ao_data)
                logging.info(f"接收到的AO: {ao_data}... (total {len(ao_data)} values)")
                return self.create_ao_response(True, packet_num)

            if cmd_type == self.CMD_READ_DELTAT:
                # Nudge the first four counters so successive reads differ.
                for i in range(4):
                    self.deltat_values[i] += random.randint(1, 10)
                print("deltatT为", self.deltat_values)
                return self.create_deltat_response(packet_num)

            logging.warning(f"Unknown command type: {cmd_type}")
            return None
        except Exception as e:
            logging.error(f"Command processing error: {e}")
            return None

    def create_ai_response(self, packet_num: int) -> bytes:
        """Build the READ_AI response frame.

        Every call first drifts the AI values slightly (clamped to 0-10) and
        flips each of the first four DI states with 20% probability.

        Args:
            packet_num: Packet number to echo in the response header.

        Returns:
            The encoded response frame.
        """
        for i in range(8):
            drifted = self.ai_values[i] + random.uniform(-0.1, 0.1)
            self.ai_values[i] = max(0.0, min(10.0, drifted))

        for i in range(4):
            if random.random() < 0.2:
                self.di_states[i] = 1 - self.di_states[i]

        # One record = 8 little-endian doubles followed by the DI bit mask
        # (bit i = di_states[i]).  The values do not change between
        # records, so the record is packed once and repeated.
        ai_bytes = struct.pack('<' + 'd' * 8, *self.ai_values)
        di_state_int = 0
        for i, state in enumerate(self.di_states):
            di_state_int |= state << i
        # NOTE(review): the source text was lost from the format string of
        # this pack call onward.  '<Q' follows the surviving comment ("pack
        # the DI states into one 64-bit integer") and the little-endian '<'
        # that starts the gap; the record concatenation and the use of
        # RESP_READ_AI are reconstructed -- confirm against the protocol.
        di_bytes = struct.pack('<Q', di_state_int)
        full_body = (ai_bytes + di_bytes) * self.AI_SAMPLES_PER_RESPONSE
        return self.create_packet(self.RESP_READ_AI, full_body, packet_num)

    def create_ao_response(self, success: bool, packet_num: int) -> bytes:
        """Build the WRITE_AO acknowledgement frame.

        NOTE(review): reconstructed -- the original body layout is unknown;
        see AO_RESPONSE_FORMAT.

        Args:
            success: Whether the write was accepted.
            packet_num: Packet number to echo in the response header.

        Returns:
            The encoded response frame.
        """
        body = struct.pack(self.AO_RESPONSE_FORMAT, 1 if success else 0)
        return self.create_packet(self.RESP_WRITE_AO, body, packet_num)

    def create_deltat_response(self, packet_num: int) -> bytes:
        """Build the READ_DELTAT response frame from ``deltat_values``.

        NOTE(review): reconstructed -- the original body layout is unknown;
        see DELTAT_FORMAT.

        Args:
            packet_num: Packet number to echo in the response header.

        Returns:
            The encoded response frame.
        """
        body = struct.pack(self.DELTAT_FORMAT, *self.deltat_values)
        return self.create_packet(self.RESP_READ_DELTAT, body, packet_num)

    def create_packet(self, cmd_type: int, body: bytes, packet_num: int) -> bytes:
        """Wrap ``body`` in the start flag, header and end flag.

        NOTE(review): the method name and parameter order are reconstructed;
        only the header pack arguments and the return expression survived
        in the source.

        Args:
            cmd_type: Command/response type for the header.
            body: Payload bytes.
            packet_num: Packet number for the header.

        Returns:
            The complete frame.
        """
        data_size = len(body)
        header = struct.pack('>H I I', cmd_type, data_size, packet_num)
        return self.START_FLAG + header + body + self.END_FLAG

    def shutdown(self) -> None:
        """Stop the server: signal every loop to exit and close the listener."""
        self.stop_event.set()
        if self.server_socket:
            self.server_socket.close()
        logging.info("Box1 simulator stopped")


if __name__ == "__main__":
    # Create the simulator; the server starts in the background.
    simulator = Box1Simulator("localhost", 8000)
    try:
        # Keep the main thread alive until interrupted.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nStopping simulator...")
        simulator.shutdown()