main
zcwBit 4 months ago
parent 640eb919c2
commit 88d82f5257

@ -0,0 +1,69 @@
import struct
import socket
import time
# commond
# 1、Start
# 2、Stop
# 3、Quit
# 4、Data
def getRealAO(x,highValue, lowValue):
if highValue:
lowValue = float(lowValue)
highValue = float(highValue)
return (16 * (x - lowValue) + 4 * (highValue-lowValue))/(1000 * (highValue - lowValue))
else:
return x/1000
class AnalogDataPacket(object):
def __init__(self, commond: int, PacketSerialNo: int, lis: list):
self.PacketSerialNo = PacketSerialNo
# print(struct.pack('d', struct.pack('B' * 8, *lis[16:])))
DO = ''
for x in reversed(lis[16:]):
DO += str(int(x))
# print(DO)s
DO = int(DO, 2)
print(lis)
valueList = lis[:16] + [DO]
self.value = struct.pack('d' * 17, *valueList)
self.commond = commond
self.buf = [
0xF312, self.commond, self.PacketSerialNo, self.value, 0xF314
]
# print(self.buf)
def packBytes(self):
packstyle = '>HHL{}sH'.format(str(len(self.value)))
req = struct.pack(packstyle, *self.buf)
return req
class AnalogClient(object):
def __init__(self, url):
# print(url)
host, port = url.split(':')
self._host = host
self._port = int(port)
self.socket = socket.socket()
self.packNo = 0X000001
def connect(self):
self.socket.connect((self._host, self._port))
def close(self):
self.socket.close()
def writeData(self, lis: list):
l = [x for x in lis]
for i in range(8):
l[i] = l[i]
pack = AnalogDataPacket(4, self.packNo, l)
self.socket.send(pack.packBytes())
self.packNo += 1
res = self.socket.recv(8)
data = struct.unpack('>HLH', res)
# print(data)
return data

@ -57,8 +57,6 @@ class Box1Communicator:
if not self.connected: if not self.connected:
self.connect() self.connect()
# 重连后执行就绪检查 # 重连后执行就绪检查
if self.connected:
self.checkServerReady() # 新增:检查服务器就绪状态
time.sleep(0.5) time.sleep(0.5)
def connect(self): def connect(self):
@ -217,28 +215,39 @@ class Box1Communicator:
logging.error(f"Invalid AI response length: {len(body)}") logging.error(f"Invalid AI response length: {len(body)}")
return return
# print(1) # print(1)
for i in range(500): l = []
start_idx = i * 72 for i in range(9):
end_idx_ai = start_idx + 64 # print(i)
end_idx_di = start_idx + 72 start_idx = i * 4000
#end_idx_ai = start_idx + 64
#end_idx_di = start_idx + 72
# 解析8个AI值 (64字节)
ai_chunk = body[start_idx:end_idx_ai]
aiValues = struct.unpack('<8d', ai_chunk)
# 解析8个AI值 (64字节)
ai_chunk = body[start_idx:start_idx + 8]
#print(ai_chunk)
if i == 8:
diValues = struct.unpack('<d', ai_chunk)[0]
diBools = [(int(diValues) >> j) & 1 for j in range(16)]
l.append(diBools)
else:
aiValues = struct.unpack('<d', ai_chunk)
l.append(aiValues)
# 解析16个DI状态 (8字节) # 解析16个DI状态 (8字节)
di_chunk = body[end_idx_ai:end_idx_di] #di_chunk = body[end_idx_ai:end_idx_di]
di_double = struct.unpack('<d', di_chunk)[0] #di_double = struct.unpack('<d', di_chunk)[0]
# print(di_double) # print(di_double)
diBools = [(int(di_double) >> j) & 1 for j in range(16)] #diBools = [(int(di_double) >> j) & 1 for j in range(16)]
# all_ai_values.append(ai_values) # all_ai_values.append(ai_values)
# all_di_states.append(di_bools) # all_di_states.append(di_bools)
result = (aiValues, diBools) result = (l[0:8], l[8])
#print(result) #print(result)
# print(result) # print(result)
if self.aiCallback: if self.aiCallback:
self.aiCallback(aiValues, diBools, packetNum) self.aiCallback(result[0], result[1], packetNum)
# DeltaT响应处理 # DeltaT响应处理
elif dataType == self.RESP_READ_DELTAT: elif dataType == self.RESP_READ_DELTAT:
@ -383,12 +392,11 @@ def write_worker(communicator, interval):
while TestState.running: while TestState.running:
try: try:
# 随机修改一些AO值 # 随机修改一些AO值
for i in range(5): #for i in range(5):
idx = random.randint(0, 35) ao_data = [0.004,0.005,0.006,0.007,0.008,0.009,0.010,0.011,0.012,0.013,0.014,0.015,0.016,0.017,0.018,0.019,0,1,100,255,1,2,3,4,5,6,7,8,100,200,300,400,500,600,700,800]
ao_data[idx] = random.uniform(0.0, 5.0)
# 写入AO数据 # 写入AO数据
# print(ao_data) print(ao_data)
communicator.writeAo(ao_data) communicator.writeAo(ao_data)
# 等待指定间隔 # 等待指定间隔
@ -401,7 +409,7 @@ def write_worker(communicator, interval):
def main(): def main():
# 创建通信器实例 # 创建通信器实例
communicator = Box1Communicator("192.168.1.5", 5055) communicator = Box1Communicator("192.168.1.50", 5055)
@ -416,8 +424,14 @@ def main():
#communicator.readAIDI() #communicator.readAIDI()
#time.sleep(0.5) #time.sleep(0.5)
#ao_data = [0.004,0.005,0.006,0.007,0.008,0.009,0.010,0.011,0.012,0.013,0.014,0.015,0.016,0.017,0.018,0.019,2,1,200,255,1,2,3,4,5,6,7,8,100,200,300,400,500,600,700,800]
#communicator.writeAo(ao_data)
#time.sleep(2)
#communicator.readDeltaT() #communicator.readDeltaT()
#communicator.writeAo([x for x in range(36)]) #time.sleep(1)
#communicator.readDeltaT()
#
# 单步调试 # 单步调试
@ -437,7 +451,7 @@ def main():
# # # 启动状态监控线程 # # # 启动状态监控线程
# monitor_thread = threading.Thread(target=status_monitor, daemon=True) # monitor_thread = threading.Thread(target=status_monitor, daemon=True)
# # monitor_thread.start() # monitor_thread.start()
# # 主线程等待用户输入退出 # # 主线程等待用户输入退出
logging.info("Continuous read/write test running. Press Enter to stop...") logging.info("Continuous read/write test running. Press Enter to stop...")
@ -450,10 +464,10 @@ def main():
TestState.running = False TestState.running = False
# # 等待工作线程结束 # # 等待工作线程结束
time.sleep(1) # time.sleep(1)
# # 清理资源 # # 清理资源
communicator.shutdown() #communicator.shutdown()
# logging.info("Test completed. Final stats:") # logging.info("Test completed. Final stats:")
# logging.info(f"Total reads: {TestState.read_count}") # logging.info(f"Total reads: {TestState.read_count}")
# logging.info(f"Total writes: {TestState.write_count}") # logging.info(f"Total writes: {TestState.write_count}")

@ -0,0 +1,86 @@
import struct
import socket
import time
# commond
# 1、Start
# 2、Stop
# 3、Quit
# 4、Data
class RTDTCdataPacket(object):
def __init__(self, commond: int, PacketSerialNo: int, lis: list):
self.PacketSerialNo = PacketSerialNo
self.value = struct.pack('d' * len(lis), *lis)
self.commond = commond
print(lis)
self.buf = [
0xF312, self.commond, self.PacketSerialNo, self.value, 0xF314
]
# print(self.buf)
def packBytes(self):
packstyle = '>HHL{}sH'.format(str(len(self.value)))
# print(packstyle)
req = struct.pack(packstyle, *self.buf)
return req
class RTDTCClient(object):
def __init__(self, url):
# print(url)
host, port = url.split(':')
self._host = host
self._port = int(port)
self.socket = socket.socket()
self.packNo = 0X000001
def connect(self):
try:
self.socket.connect((self._host, self._port))
except:
pass
def start(self):
self.connect()
self.packNo = 0X000001
pack = RTDTCdataPacket(1, self.packNo, [0] * 16)
self.socket.send(pack.packBytes())
self.packNo += 1
res = self.socket.recv(8)
data = struct.unpack('>HLH', res)
return data
def stop(self):
pack = RTDTCdataPacket(2, self.packNo, [0] * 16)
self.socket.send(pack.packBytes())
self.packNo += 1
res = self.socket.recv(8)
data = struct.unpack('>HLH', res)
return data
def quit(self):
pack = RTDTCdataPacket(3, self.packNo, [0] * 16)
self.socket.send(pack.packBytes())
self.packNo += 1
res = self.socket.recv(8)
data = struct.unpack('>HLH', res)
self.socket.close()
return data
def writeData(self, lis: list):
pack = RTDTCdataPacket(4, self.packNo, lis)
self.socket.send(pack.packBytes())
self.packNo += 1
res = self.socket.recv(8)
data = struct.unpack('>HLH', res)
return data
if __name__ == '__main__':
a = RTDTCClient('127.0.0.1:6350')
# a.connect()
a.start()
# print(1)
for x in range(50):
time.sleep(1)
a.writeData([x + 0.001] * 16)
# print(2)
a.stop()
Loading…
Cancel
Save