You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

462 lines
16 KiB
Python

4 months ago
import socket
import struct
import threading
import time
import logging
from collections import deque
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class Box1Communicator:
START_FLAG = b'\xF3\x12'
END_FLAG = b'\xF3\x14'
HEADER_SIZE = 10
# 命令类型
CMD_READ_AI = 1
CMD_WRITE_AO = 2
CMD_READ_DELTAT = 3
RESP_READ_AI = 11
RESP_WRITE_AO = 12
RESP_READ_DELTAT = 13
def __init__(self, host, port, reconnectInterval=3):
self.host = host
self.port = port
self.reconnectInterval = reconnectInterval
self.sock = None
self.connected = False
self.packetCounter = 0
self.lock = threading.Lock()
self.stopEvent = threading.Event()
# 回调函数
self.aiCallback = None
self.deltatCallback = None
self.aoCallback = None
# 响应等待机制
self.pending_responses = {}
self.response_lock = threading.Lock()
4 months ago
# 新增就绪状态标志
self.isReady = False # 服务器是否完全就绪
4 months ago
self.connect()
# 启动连接和接收线程
threading.Thread(target=self.connectThread, daemon=True).start()
threading.Thread(target=self.receiveThread, daemon=True).start()
def registerCallbacks(self, aiCallback=None, deltatCallback=None, aoCallback=None):
self.aiCallback = aiCallback
self.deltatCallback = deltatCallback
self.aoCallback = aoCallback
def connectThread(self):
while not self.stopEvent.is_set():
if not self.connected:
self.connect()
4 months ago
# 重连后执行就绪检查
if self.connected:
self.checkServerReady() # 新增:检查服务器就绪状态
time.sleep(0.5)
4 months ago
def connect(self):
try:
if self.sock:
self.sock.close()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, self.port))
self.connected = True
self.packetCounter = 0
4 months ago
self.isReady = False # 重置就绪状态
# 连接后执行就绪检
4 months ago
logging.info(f"Connected to {self.host}:{self.port}")
return True
except Exception as e:
logging.error(f"Connection failed: {e}")
self.connected = False
time.sleep(self.reconnectInterval)
return False
4 months ago
def sendCommand(self, cmdType, data=None, skipReadyCheck=False):
# 增加就绪状态检查
# 增加连接状态检查,确保完全连接后再发送
if not self.connected or not self.sock:
print("Not connected")
4 months ago
if not self.connect():
4 months ago
return None
4 months ago
try:
with self.lock:
body = b''
if cmdType == self.CMD_WRITE_AO and data:
if len(data) != 36:
raise ValueError("AO data requires 36 elements")
4 months ago
body = struct.pack('<' + 'd'*36, *data)
4 months ago
elif cmdType == self.CMD_READ_AI or cmdType == self.CMD_READ_DELTAT:
4 months ago
body = struct.pack('<d', 0) # 占位数据
4 months ago
dataSize = len(body)
current_counter = self.packetCounter
header = struct.pack('>H I I', cmdType, dataSize, current_counter)
packet = self.START_FLAG + header + body + self.END_FLAG
# 创建等待事件
response_event = threading.Event()
with self.response_lock:
self.pending_responses[current_counter] = (cmdType, response_event, deque())
self.sock.sendall(packet)
self.packetCounter += 1
return current_counter
except Exception as e:
logging.error(f"Send error: {e}")
self.connected = False
return None
def waitForResponse(self, packetNum, expectedType, timeout=5):
"""等待指定packetNum的响应"""
try:
with self.response_lock:
if packetNum not in self.pending_responses:
logging.error(f"Invalid packetNum: {packetNum}")
return None
_, event, result_queue = self.pending_responses[packetNum]
# 等待响应或超时
if not event.wait(timeout):
logging.warning(f"Response timeout for packet {packetNum}")
return None
with self.response_lock:
# 检查结果是否有效
if not result_queue:
return None
result = result_queue.popleft()
# 验证响应类型
if result[0] != expectedType + 10: # 响应类型 = 命令类型 + 10
logging.error(f"Response type mismatch. Expected:{expectedType+10}, Got:{result[0]}")
return None
return result[1]
finally:
with self.response_lock:
if packetNum in self.pending_responses:
del self.pending_responses[packetNum]
def receiveThread(self):
buffer = b''
while not self.stopEvent.is_set():
try:
if not self.connected:
4 months ago
time.sleep(0.5)
4 months ago
continue
data = self.sock.recv(4096)
if not data:
self.connected = False
continue
buffer += data
while True:
startPos = buffer.find(self.START_FLAG)
if startPos == -1:
break
# 移除起始标志前的无效数据
if startPos > 0:
buffer = buffer[startPos:]
# 检查是否收到完整头部
if len(buffer) < len(self.START_FLAG) + self.HEADER_SIZE:
break
# 解析头部
header = buffer[len(self.START_FLAG):len(self.START_FLAG) + self.HEADER_SIZE]
dataType, dataSize, packetNum = struct.unpack('>H I I', header)
# 检查是否收到完整数据包
frameLen = len(self.START_FLAG) + self.HEADER_SIZE + dataSize + len(self.END_FLAG)
if len(buffer) < frameLen:
break
# 提取完整数据帧
fullFrame = buffer[:frameLen]
buffer = buffer[frameLen:]
# 验证结束标志
if fullFrame[-len(self.END_FLAG):] != self.END_FLAG:
logging.warning("Invalid end flag. Discarding packet.")
continue
# 处理数据包
body = fullFrame[len(self.START_FLAG) + self.HEADER_SIZE:-len(self.END_FLAG)]
self.processData(dataType, body, packetNum)
except Exception as e:
logging.error(f"Receive error: {e}")
self.connected = False
def processData(self, dataType, body, packetNum):
try:
result = None
4 months ago
# print(body, dataType)
4 months ago
# AI响应处理
if dataType == self.RESP_READ_AI:
4 months ago
# print(body)
if len(body) != 36000:
4 months ago
logging.error(f"Invalid AI response length: {len(body)}")
return
# print(1)
4 months ago
for i in range(500):
start_idx = i * 72
end_idx_ai = start_idx + 64
end_idx_di = start_idx + 72
# 解析8个AI值 (64字节)
ai_chunk = body[start_idx:end_idx_ai]
aiValues = struct.unpack('<8d', ai_chunk)
# 解析16个DI状态 (8字节)
di_chunk = body[end_idx_ai:end_idx_di]
di_double = struct.unpack('<d', di_chunk)[0]
# print(di_double)
diBools = [(int(di_double) >> j) & 1 for j in range(16)]
# all_ai_values.append(ai_values)
# all_di_states.append(di_bools)
4 months ago
result = (aiValues, diBools)
# print(result)
4 months ago
# print(result)
4 months ago
if self.aiCallback:
self.aiCallback(aiValues, diBools, packetNum)
# DeltaT响应处理
elif dataType == self.RESP_READ_DELTAT:
if len(body) != 128:
logging.error(f"Invalid DeltaT response length: {len(body)}")
return
4 months ago
#(body)
deltatValues = struct.unpack('<16Q', body)
4 months ago
result = deltatValues
if self.deltatCallback:
self.deltatCallback(deltatValues, packetNum)
# AO响应处理
elif dataType == self.RESP_WRITE_AO:
4 months ago
print(len(body))
success = struct.unpack('<d', body)[0] == 0.0 if len(body)==8 else False
4 months ago
result = success
if self.aoCallback:
self.aoCallback(success, packetNum)
else:
logging.warning(f"Unknown response type: {dataType}")
return
# 唤醒等待线程
with self.response_lock:
if packetNum in self.pending_responses:
_, event, result_queue = self.pending_responses[packetNum]
result_queue.append((dataType, result))
event.set()
except Exception as e:
logging.error(f"Data processing error: {e}")
def readAIDI(self):
"""同步读取AI数据"""
packetNum = self.sendCommand(self.CMD_READ_AI)
if packetNum is None:
return None
return self.waitForResponse(packetNum, self.CMD_READ_AI) # 返回16通道AI和8通道DI
def readDeltaT(self):
"""同步读取DeltaT数据"""
packetNum = self.sendCommand(self.CMD_READ_DELTAT)
if packetNum is None:
return None
return self.waitForResponse(packetNum, self.CMD_READ_DELTAT)
def writeAo(self, aoData):
"""同步写入AO数据并返回结果"""
packetNum = self.sendCommand(self.CMD_WRITE_AO, aoData)
if packetNum is None:
return False
return self.waitForResponse(packetNum, self.CMD_WRITE_AO) or False
def shutdown(self):
self.stopEvent.set()
if self.sock:
self.sock.close()
# 唤醒所有等待线程
with self.response_lock:
for packetNum, (_, event, _) in list(self.pending_responses.items()):
event.set()
del self.pending_responses[packetNum]
logging.info("Communicator stopped")
import time
import logging
import threading
import random
# from procli import Box1Communicator # 导入前面修改的客户端类
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("box1_client_test.log"),
logging.StreamHandler()
]
)
# 全局状态
class TestState:
running = True
read_count = 0
read_count_1 = 0
write_count = 0
last_ai = None
last_deltat = None
# 回调函数定义
def ai_callback(ai_values, di_states, packet_num):
TestState.last_ai = (ai_values, di_states)
TestState.read_count += 1
# if TestState.read_count % 10 == 0:
# logging.info(f"AI Callback - Packet {packet_num}: Received {TestState.read_count}th AI update")
logging.info(f" 读取到的AI: {ai_values}...")
logging.info(f" 读取到的DI: {di_states}...")
def deltat_callback(deltat_values, packet_num):
TestState.last_deltat = deltat_values
TestState.read_count_1 += 1
# if TestState.read_count_1 % 10 == 0:
# logging.info(f"DeltaT Callback - Packet {packet_num}: Received {TestState.read_count}th DeltaT update")
logging.info(f"读取到的delat: {deltat_values}...")
def ao_callback(success, packet_num):
TestState.write_count += 1
status = "SUCCESS" if success else "FAILED"
# if TestState.write_count % 10 == 0:
logging.info(f"写入AO成功 {packet_num}: Write {status} ({TestState.write_count}th write)")
def read_worker(communicator, interval):
"""循环读取数据的线程"""
while TestState.running:
try:
# 读取AI数据
communicator.readAIDI()
# 读取DeltaT数据
communicator.readDeltaT()
# 等待指定间隔
time.sleep(interval)
except Exception as e:
logging.error(f"Read worker error: {e}")
time.sleep(1)
def write_worker(communicator, interval):
"""循环写入数据的线程"""
ao_data = [0.0] * 36 # 初始AO数据
while TestState.running:
try:
# 随机修改一些AO值
for i in range(5):
idx = random.randint(0, 35)
ao_data[idx] = random.uniform(0.0, 5.0)
# 写入AO数据
# print(ao_data)
communicator.writeAo(ao_data)
# 等待指定间隔
time.sleep(interval)
except Exception as e:
logging.error(f"Write worker error: {e}")
time.sleep(1)
def main():
# 创建通信器实例
4 months ago
communicator = Box1Communicator("192.168.1.5", 5055)
4 months ago
# 注册回调函数
communicator.registerCallbacks(
aiCallback=ai_callback,
deltatCallback=deltat_callback,
aoCallback=ao_callback
)
4 months ago
#for i in range(1000):
4 months ago
4 months ago
#communicator.readAIDI()
#time.sleep(0.5)
# communicator.readDeltaT()
#communicator.writeAo([x for x in range(36)])
4 months ago
# 单步调试
# logging.info("Starting Box1 client test with continuous read/write...")
4 months ago
try:
4 months ago
# # 等待初始连接
4 months ago
time.sleep(1)
4 months ago
# # 启动读取线程 (每200ms读取一次)
4 months ago
read_thread = threading.Thread(target=read_worker, args=(communicator, 0.5), daemon=True)
read_thread.start()
4 months ago
# # 启动写入线程 (每500ms写入一次)
4 months ago
write_thread = threading.Thread(target=write_worker, args=(communicator, 0.5), daemon=True)
write_thread.start()
4 months ago
# # # 启动状态监控线程
4 months ago
# monitor_thread = threading.Thread(target=status_monitor, daemon=True)
4 months ago
# # monitor_thread.start()
# # 主线程等待用户输入退出
4 months ago
logging.info("Continuous read/write test running. Press Enter to stop...")
input()
4 months ago
4 months ago
except KeyboardInterrupt:
logging.info("Test interrupted by user")
finally:
# 设置停止标志
TestState.running = False
4 months ago
# # 等待工作线程结束
4 months ago
time.sleep(1)
4 months ago
# # 清理资源
communicator.shutdown()
# logging.info("Test completed. Final stats:")
# logging.info(f"Total reads: {TestState.read_count}")
# logging.info(f"Total writes: {TestState.write_count}")
if __name__ == "__main__":
main()