You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

241 lines
9.3 KiB
Python

6 months ago
import modbus_tk
import modbus_tk.defines as cst
from modbus_tk import modbus_tcp, hooks
import logging
import struct
import threading
import time
# from protocol.ModBus.ByteOrder import *
import threading
class SearchSlaveThread(threading.Thread):
def __init__(self, callback, master):
super().__init__()
self.stopEvent = threading.Event()
self.callback = callback
self.master = master
def run(self):
for address in range(1, 126):
if not self.stopEvent.is_set():
self.callback(address, self.master.judgeSlave(address))
def stop(self):
self.stopEvent.set()
class DPV1Master():
resetData = struct.unpack('>hhhh', b'\x00\x00\x00\x00\x00\x00\x00\x00')
def __init__(self, host, port):
try:
self.master = modbus_tcp.TcpMaster(host = host, port = port)
self.master.set_timeout(5.0)
except Exception as e:
self.master = None
# hooks.install_hook("modbus_tcp.TcpMaster.after_recv", afterRecv)
# hooks.install_hook("modbus_tcp.TcpMaster.after_send", afterSend)
def writeMultipleRegister(self, slaveId, address, outputValue):
if not self.master:
return 'error'
try:
self.master.execute(slaveId, cst.WRITE_MULTIPLE_REGISTERS, starting_address = address, output_value=outputValue)
except Exception as e:
return 'error'
def readHoldingRegisters(self, slaveId, startAddress, varNums):
if not self.master:
return 'error'
try:
value = self.master.execute(slaveId, cst.READ_HOLDING_REGISTERS, startAddress, varNums)
return value
except:
return '读取错误'
def areAllZeros(self, tup):
return all(x == 0 for x in tup)
def searchSlave(self, callback = None):
# self.searchSlaveThread = SearchSlaveThread(callback = callback, master = self)
# self.searchSlaveThread.start()
data = self.readHoldingRegisters(1, 850, 64)
if data == '读取错误':
return data
dataHex = struct.pack('>64h', *data)
indices = [i for i, byte in enumerate(dataHex) if byte == 1]
return "".join(map(str, indices))
def closeThread(self):
self.searchSlaveThread.stop()
def readParm(self, address, slot, index, length, callback = None):
length = int(length)
hexAddress = address.to_bytes(1, byteorder='little')
slotByte = slot.to_bytes(1, byteorder='little')
indexByte = index.to_bytes(1, byteorder='little')
readByteStream = b'\x01' + hexAddress + slotByte + indexByte + b'\xF0\x00'
readDate = struct.unpack('>hhh', readByteStream)
# self.writeMultipleRegister(1, 750, self.resetData)
# time.sleep(0.1)
self.writeMultipleRegister(1, 750, readDate)
time.sleep(1.1)
value = self.readHoldingRegisters(1, 750, -(-length // 2)) # -(-length // 2)向上取整
# print(value, 222)
if value[0] == 57344:
self.writeMultipleRegister(1, 750, self.resetData)
time.sleep(0.1)
return '读取错误'
value = struct.pack('>{}H'.format(len(value)), *value)[:length]
# print(value)
# print(address, slot, index)
# print(value)
self.writeMultipleRegister(1, 750, self.resetData)
time.sleep(0.1)
# if callable(callback):
# callback(value)
# return
return value
def writeParm(self, address, slot, index, length, valueByte):
length = int(length)
hexAddress = address.to_bytes(1, byteorder='little')
slotByte = slot.to_bytes(1, byteorder='little')
indexByte = index.to_bytes(1, byteorder='little')
lengthByte = length.to_bytes(1, byteorder='little')
writeByteStream = b'\x02' + hexAddress + slotByte + indexByte + lengthByte + valueByte
writeByteStream = writeByteStream if len(writeByteStream) % 2 == 0 else writeByteStream + b'\x00'
self.writeMultipleRegister(1, 750, self.resetData)
# print(len(writeByteStream))
writeDate = struct.unpack('>{}h'.format(len(writeByteStream) // 2), writeByteStream)
self.writeMultipleRegister(1, 750, writeDate)
time.sleep(0.4)
value = self.readHoldingRegisters(1, 750, 2)
if value[0] == 57344:
result = '写入错误'
else:
result = '写入成功'
self.writeMultipleRegister(1, 750, self.resetData)
time.sleep(0.1)
return result
def judgeSlave(self, address):
# print(type(address))
hexAddress = address.to_bytes(1, byteorder='little')
searchByteStream = b'\x01' + hexAddress + b'\x01\x01\xF0\x00'
# print(searchByteStream)
searchDate = struct.unpack('>hhh', searchByteStream)
self.writeMultipleRegister(1, 750, searchDate)
time.sleep(1.1)
dir = self.readHoldingRegisters(1, 750, 6)
time.sleep(0.1)
# print(dir)
self.writeMultipleRegister(1, 750, self.resetData)
time.sleep(0.1)
# print(dir)
if not self.areAllZeros(dir) and dir != 'error':
return True
else:
return False
def editDevAddress(self, oldAddress, newAddress, identNumer):
oldAddressHex = oldAddress.to_bytes(length=1, byteorder='little')
newAddressHex = newAddress.to_bytes(1, byteorder='little')
closeProtocolHex = b'\x05\x00'
closeProtocolData = struct.unpack('>h', closeProtocolHex)
self.writeMultipleRegister(1, 750, closeProtocolData)
time.sleep(1.5)
# print(identNumer)
idHighHex = int(identNumer[:2], 16).to_bytes(1, byteorder='little')
idLowHex = int(identNumer[2:], 16).to_bytes(1, byteorder='little')
editAddressStream = b'\x03' + oldAddressHex + b'\x00'+ newAddressHex + b'\x00' + idHighHex + b'\x00' + idLowHex
# print(editAddressStream)
editAddressDate = struct.unpack('>hhhh', editAddressStream)
# print(editAddressDate)
self.writeMultipleRegister(1, 750, self.resetData)
time.sleep(0.7)
self.writeMultipleRegister(1, 750, editAddressDate)
time.sleep(0.4)
value = self.readHoldingRegisters(1, 750, 2)
if value[0] == 57344:
result = '修改错误'
else:
result = '修改成功'
openProtocolHex = b'\x05\x01'
openProtocolData = struct.unpack('>h', openProtocolHex)
self.writeMultipleRegister(1, 750, openProtocolData)
time.sleep(0.2)
self.writeMultipleRegister(1, 750, self.resetData)
time.sleep(0.1)
return result
# def readInputRegisters(self, slaveId, startAddress, varNums, order = 'ABCD'):
# try:
# if order == 'int':
# valueByte = self.master.execute(slaveId, cst.READ_INPUT_REGISTERS, startAddress, varNums)[0]
# else:
# value = self.master.execute(slaveId, cst.READ_INPUT_REGISTERS, startAddress, 2)
# if order == 'ABCD': # 大端模式
# valueByte = ABCDToFloat(value)
# elif order == 'DCBA': # 小端模式
# valueByte = DCBAToFloat(value)
# elif order == 'BADC':
# valueByte = BADCToFloat(value)
# elif order == 'CDAB':
# valueByte = CDABToFloat(value)
# return valueByte
# except Exception as e:
# return 'error'
# def readCoils(self, slaveId, startAddress, varNums, order = 'ABCD'):
# try:
# value = self.master.execute(slaveId, cst.READ_COILS, startAddress, varNums)
# return value[0]
# except Exception as e:
# return 'error'
# def readInputCoils(self, slaveId, startAddress, varNums):
# print(slaveId, startAddress, varNums)
# try:
# value = self.master.execute(slaveId, cst.READ_DISCRETE_INPUTS, startAddress, varNums)
# return value[0]
# except:
# return 'error'
#
# print(cst.READ_HOLDING_REGISTERS, cst.READ_COILS, cst.WRITE_SINGLE_REGISTER, cst.WRITE_SINGLE_COIL)
# self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, 0, output_value=[1, 2, 3, 4, 5])
# self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=5, output_value=[3.0], data_format='>f')
# self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=6, output_value=[4.0], data_format='>f')
# self.master.execute(2, cst.WRITE_SINGLE_COIL, 2, output_value=1)
# self.master.execute(2, cst.WRITE_MULTIPLE_COILS, 0, output_value=[1, 1, 0, 1, 1, 0, 1, 1])
# logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 2, data_format='f'))
# Read and write floats
# send some queries
# logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 100, 12))
if __name__ == "__main__":
tcpMaster = DPV1Master('192.168.3.10', 502)
tcpMaster.searchSlave()
# tcpMaster.readHoldingRegisters(1, 6, 2)
# tcpMaster.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=0, output_value=[1,2,3,4,5])
#
# master = modbus_tcp.TcpMaster(host="localhost", port=502)
# values = [6]
# master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=0, output_value=values)