0618更新
parent
750ae65fb9
commit
68961ec9cc
@ -1,69 +0,0 @@
|
||||
import struct
|
||||
import socket
|
||||
import time
|
||||
# commond
|
||||
# 1、Start
|
||||
# 2、Stop
|
||||
# 3、Quit
|
||||
# 4、Data
|
||||
def getRealAO(x,highValue, lowValue):
|
||||
if highValue:
|
||||
lowValue = float(lowValue)
|
||||
highValue = float(highValue)
|
||||
return (16 * (x - lowValue) + 4 * (highValue-lowValue))/(1000 * (highValue - lowValue))
|
||||
else:
|
||||
return x/1000
|
||||
|
||||
class AnalogDataPacket(object):
|
||||
def __init__(self, commond: int, PacketSerialNo: int, lis: list):
|
||||
self.PacketSerialNo = PacketSerialNo
|
||||
# print(struct.pack('d', struct.pack('B' * 8, *lis[16:])))
|
||||
DO = ''
|
||||
for x in reversed(lis[16:]):
|
||||
DO += str(int(x))
|
||||
# print(DO)s
|
||||
DO = int(DO, 2)
|
||||
# print(lis)
|
||||
valueList = lis[:16] + [DO]
|
||||
self.value = struct.pack('d' * 17, *valueList)
|
||||
self.commond = commond
|
||||
self.buf = [
|
||||
0xF312, self.commond, self.PacketSerialNo, self.value, 0xF314
|
||||
]
|
||||
# print(self.buf)
|
||||
def packBytes(self):
|
||||
packstyle = '>HHL{}sH'.format(str(len(self.value)))
|
||||
req = struct.pack(packstyle, *self.buf)
|
||||
return req
|
||||
|
||||
|
||||
|
||||
class AnalogClient(object):
|
||||
def __init__(self, url):
|
||||
# print(url)
|
||||
host, port = url.split(':')
|
||||
self._host = host
|
||||
self._port = int(port)
|
||||
self.socket = socket.socket()
|
||||
self.packNo = 0X000001
|
||||
|
||||
def connect(self):
|
||||
self.socket.connect((self._host, self._port))
|
||||
|
||||
def close(self):
|
||||
self.socket.close()
|
||||
|
||||
def writeData(self, lis: list):
|
||||
l = [x for x in lis]
|
||||
for i in range(8):
|
||||
l[i] = l[i]
|
||||
pack = AnalogDataPacket(4, self.packNo, l)
|
||||
self.socket.send(pack.packBytes())
|
||||
self.packNo += 1
|
||||
res = self.socket.recv(8)
|
||||
data = struct.unpack('>HLH', res)
|
||||
# print(data)
|
||||
return data
|
||||
|
||||
|
||||
|
@ -0,0 +1,427 @@
|
||||
import socket
|
||||
import struct
|
||||
import threading
|
||||
import time
|
||||
import logging
|
||||
from collections import deque
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
class Box1Communicator:
|
||||
START_FLAG = b'\xF3\x12'
|
||||
END_FLAG = b'\xF3\x14'
|
||||
HEADER_SIZE = 10
|
||||
|
||||
# 命令类型
|
||||
CMD_READ_AI = 1
|
||||
CMD_WRITE_AO = 2
|
||||
CMD_READ_DELTAT = 3
|
||||
RESP_READ_AI = 11
|
||||
RESP_WRITE_AO = 12
|
||||
RESP_READ_DELTAT = 13
|
||||
|
||||
def __init__(self, host, port, reconnectInterval=3):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.reconnectInterval = reconnectInterval
|
||||
|
||||
self.sock = None
|
||||
self.connected = False
|
||||
self.packetCounter = 0
|
||||
self.lock = threading.Lock()
|
||||
self.stopEvent = threading.Event()
|
||||
|
||||
# 回调函数
|
||||
self.aiCallback = None
|
||||
self.deltatCallback = None
|
||||
self.aoCallback = None
|
||||
|
||||
# 响应等待机制
|
||||
self.pending_responses = {}
|
||||
self.response_lock = threading.Lock()
|
||||
self.connect()
|
||||
|
||||
# 启动连接和接收线程
|
||||
threading.Thread(target=self.connectThread, daemon=True).start()
|
||||
threading.Thread(target=self.receiveThread, daemon=True).start()
|
||||
|
||||
def registerCallbacks(self, aiCallback=None, deltatCallback=None, aoCallback=None):
|
||||
self.aiCallback = aiCallback
|
||||
self.deltatCallback = deltatCallback
|
||||
self.aoCallback = aoCallback
|
||||
|
||||
def connectThread(self):
|
||||
while not self.stopEvent.is_set():
|
||||
if not self.connected:
|
||||
self.connect()
|
||||
time.sleep(1)
|
||||
|
||||
def connect(self):
|
||||
try:
|
||||
if self.sock:
|
||||
self.sock.close()
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.sock.bind((self.host, 6323))
|
||||
self.sock.connect((self.host, self.port))
|
||||
self.connected = True
|
||||
self.packetCounter = 0
|
||||
logging.info(f"Connected to {self.host}:{self.port}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logging.error(f"Connection failed: {e}")
|
||||
self.connected = False
|
||||
time.sleep(self.reconnectInterval)
|
||||
return False
|
||||
|
||||
def sendCommand(self, cmdType, data=None):
|
||||
if not self.connected:
|
||||
if not self.connect():
|
||||
return None # 连接失败
|
||||
|
||||
try:
|
||||
with self.lock:
|
||||
body = b''
|
||||
if cmdType == self.CMD_WRITE_AO and data:
|
||||
if len(data) != 36:
|
||||
raise ValueError("AO data requires 36 elements")
|
||||
body = struct.pack('>' + 'd'*36, *data)
|
||||
elif cmdType == self.CMD_READ_AI or cmdType == self.CMD_READ_DELTAT:
|
||||
body = struct.pack('>d', 0) # 占位数据
|
||||
|
||||
dataSize = len(body)
|
||||
current_counter = self.packetCounter
|
||||
header = struct.pack('>H I I', cmdType, dataSize, current_counter)
|
||||
packet = self.START_FLAG + header + body + self.END_FLAG
|
||||
|
||||
# 创建等待事件
|
||||
response_event = threading.Event()
|
||||
with self.response_lock:
|
||||
self.pending_responses[current_counter] = (cmdType, response_event, deque())
|
||||
|
||||
self.sock.sendall(packet)
|
||||
self.packetCounter += 1
|
||||
return current_counter
|
||||
except Exception as e:
|
||||
logging.error(f"Send error: {e}")
|
||||
self.connected = False
|
||||
return None
|
||||
|
||||
def waitForResponse(self, packetNum, expectedType, timeout=5):
|
||||
"""等待指定packetNum的响应"""
|
||||
try:
|
||||
with self.response_lock:
|
||||
if packetNum not in self.pending_responses:
|
||||
logging.error(f"Invalid packetNum: {packetNum}")
|
||||
return None
|
||||
|
||||
_, event, result_queue = self.pending_responses[packetNum]
|
||||
|
||||
# 等待响应或超时
|
||||
if not event.wait(timeout):
|
||||
logging.warning(f"Response timeout for packet {packetNum}")
|
||||
return None
|
||||
|
||||
with self.response_lock:
|
||||
# 检查结果是否有效
|
||||
if not result_queue:
|
||||
return None
|
||||
|
||||
result = result_queue.popleft()
|
||||
# 验证响应类型
|
||||
if result[0] != expectedType + 10: # 响应类型 = 命令类型 + 10
|
||||
logging.error(f"Response type mismatch. Expected:{expectedType+10}, Got:{result[0]}")
|
||||
return None
|
||||
|
||||
return result[1]
|
||||
finally:
|
||||
with self.response_lock:
|
||||
if packetNum in self.pending_responses:
|
||||
del self.pending_responses[packetNum]
|
||||
|
||||
def receiveThread(self):
|
||||
buffer = b''
|
||||
while not self.stopEvent.is_set():
|
||||
try:
|
||||
if not self.connected:
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
data = self.sock.recv(4096)
|
||||
if not data:
|
||||
self.connected = False
|
||||
continue
|
||||
|
||||
buffer += data
|
||||
|
||||
while True:
|
||||
startPos = buffer.find(self.START_FLAG)
|
||||
if startPos == -1:
|
||||
break
|
||||
|
||||
# 移除起始标志前的无效数据
|
||||
if startPos > 0:
|
||||
buffer = buffer[startPos:]
|
||||
|
||||
# 检查是否收到完整头部
|
||||
if len(buffer) < len(self.START_FLAG) + self.HEADER_SIZE:
|
||||
break
|
||||
|
||||
# 解析头部
|
||||
header = buffer[len(self.START_FLAG):len(self.START_FLAG) + self.HEADER_SIZE]
|
||||
dataType, dataSize, packetNum = struct.unpack('>H I I', header)
|
||||
|
||||
# 检查是否收到完整数据包
|
||||
frameLen = len(self.START_FLAG) + self.HEADER_SIZE + dataSize + len(self.END_FLAG)
|
||||
if len(buffer) < frameLen:
|
||||
break
|
||||
|
||||
# 提取完整数据帧
|
||||
fullFrame = buffer[:frameLen]
|
||||
buffer = buffer[frameLen:]
|
||||
|
||||
# 验证结束标志
|
||||
if fullFrame[-len(self.END_FLAG):] != self.END_FLAG:
|
||||
logging.warning("Invalid end flag. Discarding packet.")
|
||||
continue
|
||||
|
||||
# 处理数据包
|
||||
body = fullFrame[len(self.START_FLAG) + self.HEADER_SIZE:-len(self.END_FLAG)]
|
||||
self.processData(dataType, body, packetNum)
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Receive error: {e}")
|
||||
self.connected = False
|
||||
|
||||
def processData(self, dataType, body, packetNum):
|
||||
try:
|
||||
result = None
|
||||
|
||||
# AI响应处理
|
||||
if dataType == self.RESP_READ_AI:
|
||||
if len(body) != 72:
|
||||
logging.error(f"Invalid AI response length: {len(body)}")
|
||||
return
|
||||
# print(1)
|
||||
aiValues = struct.unpack('>8d', body[:64])
|
||||
diBools = [(struct.unpack('>Q', body[64:72])[0] >> i) & 1 for i in range(16)]
|
||||
result = (aiValues, diBools)
|
||||
# print(result)
|
||||
if self.aiCallback:
|
||||
self.aiCallback(aiValues, diBools, packetNum)
|
||||
|
||||
# DeltaT响应处理
|
||||
elif dataType == self.RESP_READ_DELTAT:
|
||||
if len(body) != 128:
|
||||
logging.error(f"Invalid DeltaT response length: {len(body)}")
|
||||
return
|
||||
|
||||
deltatValues = struct.unpack('>16d', body)
|
||||
result = deltatValues
|
||||
if self.deltatCallback:
|
||||
self.deltatCallback(deltatValues, packetNum)
|
||||
|
||||
# AO响应处理
|
||||
elif dataType == self.RESP_WRITE_AO:
|
||||
success = struct.unpack('>d', body)[0] == 0.0 if len(body)==8 else False
|
||||
result = success
|
||||
if self.aoCallback:
|
||||
self.aoCallback(success, packetNum)
|
||||
else:
|
||||
logging.warning(f"Unknown response type: {dataType}")
|
||||
return
|
||||
|
||||
# 唤醒等待线程
|
||||
with self.response_lock:
|
||||
if packetNum in self.pending_responses:
|
||||
_, event, result_queue = self.pending_responses[packetNum]
|
||||
result_queue.append((dataType, result))
|
||||
event.set()
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Data processing error: {e}")
|
||||
|
||||
def readAIDI(self):
|
||||
"""同步读取AI数据"""
|
||||
packetNum = self.sendCommand(self.CMD_READ_AI)
|
||||
if packetNum is None:
|
||||
return None
|
||||
return self.waitForResponse(packetNum, self.CMD_READ_AI) # 返回16通道AI和8通道DI
|
||||
|
||||
def readDeltaT(self):
|
||||
"""同步读取DeltaT数据"""
|
||||
packetNum = self.sendCommand(self.CMD_READ_DELTAT)
|
||||
if packetNum is None:
|
||||
return None
|
||||
return self.waitForResponse(packetNum, self.CMD_READ_DELTAT)
|
||||
|
||||
def writeAo(self, aoData):
|
||||
"""同步写入AO数据并返回结果"""
|
||||
packetNum = self.sendCommand(self.CMD_WRITE_AO, aoData)
|
||||
if packetNum is None:
|
||||
return False
|
||||
return self.waitForResponse(packetNum, self.CMD_WRITE_AO) or False
|
||||
|
||||
def shutdown(self):
|
||||
self.stopEvent.set()
|
||||
if self.sock:
|
||||
self.sock.close()
|
||||
|
||||
# 唤醒所有等待线程
|
||||
with self.response_lock:
|
||||
for packetNum, (_, event, _) in list(self.pending_responses.items()):
|
||||
event.set()
|
||||
del self.pending_responses[packetNum]
|
||||
|
||||
logging.info("Communicator stopped")
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
import time
|
||||
import logging
|
||||
import threading
|
||||
import random
|
||||
# from procli import Box1Communicator # 导入前面修改的客户端类
|
||||
|
||||
# 配置日志
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.FileHandler("box1_client_test.log"),
|
||||
logging.StreamHandler()
|
||||
]
|
||||
)
|
||||
|
||||
# 全局状态
|
||||
class TestState:
|
||||
running = True
|
||||
read_count = 0
|
||||
read_count_1 = 0
|
||||
write_count = 0
|
||||
last_ai = None
|
||||
last_deltat = None
|
||||
|
||||
# 回调函数定义
|
||||
def ai_callback(ai_values, di_states, packet_num):
|
||||
TestState.last_ai = (ai_values, di_states)
|
||||
TestState.read_count += 1
|
||||
# if TestState.read_count % 10 == 0:
|
||||
# logging.info(f"AI Callback - Packet {packet_num}: Received {TestState.read_count}th AI update")
|
||||
logging.info(f" 读取到的AI: {ai_values}...")
|
||||
logging.info(f" 读取到的DI: {di_states}...")
|
||||
|
||||
def deltat_callback(deltat_values, packet_num):
|
||||
TestState.last_deltat = deltat_values
|
||||
TestState.read_count_1 += 1
|
||||
# if TestState.read_count_1 % 10 == 0:
|
||||
# logging.info(f"DeltaT Callback - Packet {packet_num}: Received {TestState.read_count}th DeltaT update")
|
||||
logging.info(f"读取到的delat: {deltat_values}...")
|
||||
|
||||
def ao_callback(success, packet_num):
|
||||
TestState.write_count += 1
|
||||
status = "SUCCESS" if success else "FAILED"
|
||||
# if TestState.write_count % 10 == 0:
|
||||
logging.info(f"写入AO成功 {packet_num}: Write {status} ({TestState.write_count}th write)")
|
||||
|
||||
def read_worker(communicator, interval):
|
||||
"""循环读取数据的线程"""
|
||||
while TestState.running:
|
||||
try:
|
||||
# 读取AI数据
|
||||
communicator.readAIDI()
|
||||
|
||||
# 读取DeltaT数据
|
||||
communicator.readDeltaT()
|
||||
|
||||
# 等待指定间隔
|
||||
time.sleep(interval)
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Read worker error: {e}")
|
||||
time.sleep(1)
|
||||
|
||||
def write_worker(communicator, interval):
|
||||
"""循环写入数据的线程"""
|
||||
ao_data = [0.0] * 36 # 初始AO数据
|
||||
|
||||
while TestState.running:
|
||||
try:
|
||||
# 随机修改一些AO值
|
||||
for i in range(5):
|
||||
idx = random.randint(0, 35)
|
||||
ao_data[idx] = random.uniform(0.0, 5.0)
|
||||
|
||||
# 写入AO数据
|
||||
# print(ao_data)
|
||||
communicator.writeAo(ao_data)
|
||||
|
||||
# 等待指定间隔
|
||||
time.sleep(interval)
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Write worker error: {e}")
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
def main():
|
||||
# 创建通信器实例
|
||||
communicator = Box1Communicator("localhost", 8000)
|
||||
|
||||
|
||||
|
||||
# 注册回调函数
|
||||
communicator.registerCallbacks(
|
||||
aiCallback=ai_callback,
|
||||
deltatCallback=deltat_callback,
|
||||
aoCallback=ao_callback
|
||||
)
|
||||
|
||||
communicator.readAIDI()
|
||||
communicator.readDeltaT()
|
||||
communicator.writeAo([1.0] * 36)
|
||||
|
||||
# 单步调试
|
||||
|
||||
# logging.info("Starting Box1 client test with continuous read/write...")
|
||||
|
||||
# try:
|
||||
# # 等待初始连接
|
||||
# time.sleep(1)
|
||||
|
||||
# # 启动读取线程 (每200ms读取一次)
|
||||
# read_thread = threading.Thread(target=read_worker, args=(communicator, 0.2), daemon=True)
|
||||
# read_thread.start()
|
||||
|
||||
# # 启动写入线程 (每500ms写入一次)
|
||||
# write_thread = threading.Thread(target=write_worker, args=(communicator, 0.2), daemon=True)
|
||||
# write_thread.start()
|
||||
|
||||
# # # 启动状态监控线程
|
||||
# # monitor_thread = threading.Thread(target=status_monitor, daemon=True)
|
||||
# # monitor_thread.start()
|
||||
|
||||
# # 主线程等待用户输入退出
|
||||
# logging.info("Continuous read/write test running. Press Enter to stop...")
|
||||
# input()
|
||||
|
||||
# except KeyboardInterrupt:
|
||||
# logging.info("Test interrupted by user")
|
||||
# finally:
|
||||
# # 设置停止标志
|
||||
# TestState.running = False
|
||||
|
||||
# # 等待工作线程结束
|
||||
# time.sleep(1)
|
||||
|
||||
# # 清理资源
|
||||
communicator.shutdown()
|
||||
# logging.info("Test completed. Final stats:")
|
||||
# logging.info(f"Total reads: {TestState.read_count}")
|
||||
# logging.info(f"Total writes: {TestState.write_count}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
@ -1,86 +0,0 @@
|
||||
import struct
|
||||
import socket
|
||||
import time
|
||||
# commond
|
||||
# 1、Start
|
||||
# 2、Stop
|
||||
# 3、Quit
|
||||
# 4、Data
|
||||
|
||||
class RTDTCdataPacket(object):
|
||||
def __init__(self, commond: int, PacketSerialNo: int, lis: list):
|
||||
self.PacketSerialNo = PacketSerialNo
|
||||
self.value = struct.pack('d' * len(lis), *lis)
|
||||
self.commond = commond
|
||||
# print(lis)
|
||||
self.buf = [
|
||||
0xF312, self.commond, self.PacketSerialNo, self.value, 0xF314
|
||||
]
|
||||
# print(self.buf)
|
||||
def packBytes(self):
|
||||
packstyle = '>HHL{}sH'.format(str(len(self.value)))
|
||||
# print(packstyle)
|
||||
req = struct.pack(packstyle, *self.buf)
|
||||
return req
|
||||
|
||||
|
||||
class RTDTCClient(object):
|
||||
def __init__(self, url):
|
||||
# print(url)
|
||||
host, port = url.split(':')
|
||||
self._host = host
|
||||
self._port = int(port)
|
||||
self.socket = socket.socket()
|
||||
self.packNo = 0X000001
|
||||
|
||||
def connect(self):
|
||||
try:
|
||||
self.socket.connect((self._host, self._port))
|
||||
except:
|
||||
pass
|
||||
|
||||
def start(self):
|
||||
self.connect()
|
||||
self.packNo = 0X000001
|
||||
pack = RTDTCdataPacket(1, self.packNo, [0] * 16)
|
||||
self.socket.send(pack.packBytes())
|
||||
self.packNo += 1
|
||||
res = self.socket.recv(8)
|
||||
data = struct.unpack('>HLH', res)
|
||||
return data
|
||||
|
||||
def stop(self):
|
||||
pack = RTDTCdataPacket(2, self.packNo, [0] * 16)
|
||||
self.socket.send(pack.packBytes())
|
||||
self.packNo += 1
|
||||
res = self.socket.recv(8)
|
||||
data = struct.unpack('>HLH', res)
|
||||
return data
|
||||
|
||||
def quit(self):
|
||||
pack = RTDTCdataPacket(3, self.packNo, [0] * 16)
|
||||
self.socket.send(pack.packBytes())
|
||||
self.packNo += 1
|
||||
res = self.socket.recv(8)
|
||||
data = struct.unpack('>HLH', res)
|
||||
self.socket.close()
|
||||
return data
|
||||
|
||||
def writeData(self, lis: list):
|
||||
pack = RTDTCdataPacket(4, self.packNo, lis)
|
||||
self.socket.send(pack.packBytes())
|
||||
self.packNo += 1
|
||||
res = self.socket.recv(8)
|
||||
data = struct.unpack('>HLH', res)
|
||||
return data
|
||||
|
||||
if __name__ == '__main__':
|
||||
a = RTDTCClient('127.0.0.1:6350')
|
||||
# a.connect()
|
||||
a.start()
|
||||
# print(1)
|
||||
for x in range(50):
|
||||
time.sleep(1)
|
||||
a.writeData([x + 0.001] * 16)
|
||||
# print(2)
|
||||
a.stop()
|
Loading…
Reference in New Issue