You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

70 lines
1.8 KiB
Python

4 months ago
import struct
import socket
import time
# Command codes
# 1. Start
# 2. Stop
# 3. Quit
# 4. Data
def getRealAO(x, highValue, lowValue):
    """Convert a raw analog reading ``x`` into its scaled output value.

    When ``highValue`` is falsy (``None``, ``0``, ``''`` ...) no range is
    applied and the result is simply ``x / 1000``.

    Otherwise both bounds are converted to ``float`` and ``x`` is mapped
    linearly so that ``x == lowValue`` yields ``0.004`` and
    ``x == highValue`` yields ``0.020``.
    NOTE(review): this looks like a 4-20 mA current-loop scaling expressed
    in amperes -- confirm against the device documentation.

    Raises:
        ZeroDivisionError: if ``highValue`` equals ``lowValue``.
        ValueError: if a bound cannot be converted to ``float``.
    """
    # Guard clause: without an upper bound there is no range to scale over.
    if not highValue:
        return x/1000

    lower = float(lowValue)
    upper = float(highValue)
    span = upper - lower
    return (16 * (x - lower) + 4 * span)/(1000 * span)
class AnalogDataPacket(object):
    """A single framed packet carrying 16 analog values plus one digital word.

    ``packBytes`` produces, in big-endian order: the start marker ``0xF312``
    (2 bytes), the command code (2 bytes), the packet serial number
    (4 bytes), the payload, and the end marker ``0xF314`` (2 bytes).

    Attributes:
        PacketSerialNo: serial number placed in the frame header.
        commond: command code placed in the frame header.
        value: payload bytes -- 17 packed doubles (16 analog values followed
            by the digital word).
        buf: the ordered field list handed to ``struct.pack`` by
            ``packBytes``.
    """

    def __init__(self, commond: int, PacketSerialNo: int, lis: list):
        """Build the payload from ``lis``.

        Args:
            commond: command code for the frame header.
            PacketSerialNo: serial number for the frame header.
            lis: sequence whose first 16 items are the analog values and
                whose remaining items are 0/1 digital flags. The first flag
                becomes the least significant bit of the digital word. With
                no flags present the digital word is 0.

        Raises:
            struct.error: if ``lis`` holds fewer than 16 analog values or a
                value cannot be packed as a double.
            ValueError: if a flag is not 0/1 after ``int()`` conversion.
        """
        self.PacketSerialNo = PacketSerialNo

        # Copy to a list so tuples and other sequences can be sliced and
        # concatenated uniformly.
        samples = list(lis)

        # Flags are reversed so that flag 0 ends up as the lowest bit.
        bits = ''.join(str(int(flag)) for flag in reversed(samples[16:]))
        # int('', 2) would raise; no flags simply means an all-zero word.
        DO = int(bits, 2) if bits else 0

        valueList = samples[:16] + [DO]
        # NOTE(review): the payload is packed with the platform's native
        # byte order while the frame header uses big-endian -- confirm this
        # is what the receiving side expects.
        self.value = struct.pack('d' * 17, *valueList)
        self.commond = commond
        self.buf = [
            0xF312, self.commond, self.PacketSerialNo, self.value, 0xF314
        ]

    def packBytes(self):
        """Return the complete frame as ``bytes``."""
        packstyle = '>HHL{}sH'.format(len(self.value))
        return struct.pack(packstyle, *self.buf)
class AnalogClient(object):
    """TCP client that sends ``AnalogDataPacket`` frames and reads the reply.

    Attributes:
        socket: the underlying (initially unconnected) socket object.
        packNo: serial number used for the next outgoing packet; starts at 1
            and is incremented after every send.
    """

    def __init__(self, url):
        """Parse ``url`` of the form ``"host:port"`` and create the socket.

        Raises:
            ValueError: if ``url`` does not contain exactly one ``':'`` or
                the port is not an integer.
        """
        host, port = url.split(':')
        self._host = host
        self._port = int(port)
        self.socket = socket.socket()
        self.packNo = 0X000001

    def connect(self):
        """Connect the socket to the configured host and port."""
        self.socket.connect((self._host, self._port))

    def close(self):
        """Close the underlying socket."""
        self.socket.close()

    def _recvExact(self, size):
        """Read exactly ``size`` bytes from the socket.

        A single ``recv`` call may legitimately return fewer bytes than
        requested, so keep reading until the full amount has arrived.

        Raises:
            ConnectionError: if the peer closes the connection first.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.socket.recv(remaining)
            if not chunk:
                raise ConnectionError(
                    'connection closed with {} of {} bytes outstanding'.format(
                        remaining, size))
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def writeData(self, lis: list):
        """Send ``lis`` as a data packet (command 4) and return the reply.

        Args:
            lis: values forwarded to ``AnalogDataPacket``; the caller's
                sequence is copied, never modified.

        Returns:
            The 8-byte reply unpacked with ``'>HLH'`` as a 3-tuple of ints.
            NOTE(review): presumably (start marker, serial number, end
            marker) -- confirm against the protocol description.

        Raises:
            ConnectionError: if the peer closes before the full reply arrives.
        """
        pack = AnalogDataPacket(4, self.packNo, list(lis))
        # sendall retries until the whole frame is written; plain send may
        # transmit only part of it.
        self.socket.sendall(pack.packBytes())
        self.packNo += 1
        res = self._recvExact(8)
        return struct.unpack('>HLH', res)