import struct import socket import time # commond # 1、Start # 2、Stop # 3、Quit # 4、Data def getRealAO(x,highValue, lowValue): if highValue: lowValue = float(lowValue) highValue = float(highValue) return (16 * (x - lowValue) + 4 * (highValue-lowValue))/(1000 * (highValue - lowValue)) else: return x/1000 class AnalogDataPacket(object): def __init__(self, commond: int, PacketSerialNo: int, lis: list): self.PacketSerialNo = PacketSerialNo # print(struct.pack('d', struct.pack('B' * 8, *lis[16:]))) DO = '' for x in reversed(lis[16:]): DO += str(int(x)) # print(DO)s DO = int(DO, 2) # print(lis) valueList = lis[:16] + [DO] self.value = struct.pack('d' * 17, *valueList) self.commond = commond self.buf = [ 0xF312, self.commond, self.PacketSerialNo, self.value, 0xF314 ] # print(self.buf) def packBytes(self): packstyle = '>HHL{}sH'.format(str(len(self.value))) req = struct.pack(packstyle, *self.buf) return req class AnalogClient(object): def __init__(self, url): # print(url) host, port = url.split(':') self._host = host self._port = int(port) self.socket = socket.socket() self.packNo = 0X000001 def connect(self): self.socket.connect((self._host, self._port)) def close(self): self.socket.close() def writeData(self, lis: list): l = [x for x in lis] for i in range(8): l[i] = l[i] pack = AnalogDataPacket(4, self.packNo, l) self.socket.send(pack.packBytes()) self.packNo += 1 res = self.socket.recv(8) data = struct.unpack('>HLH', res) # print(data) return data