"""Modbus-TCP helper for a gateway that exposes a register-based mailbox.

NOTE(review): the class name suggests the gateway bridges to PROFIBUS DP-V1
devices; nothing in this module confirms that, so verify against the device
manual.
"""

import logging
import struct
import threading
import time

import modbus_tk
import modbus_tk.defines as cst
from modbus_tk import modbus_tcp, hooks

_logger = logging.getLogger(__name__)


class SearchSlaveThread(threading.Thread):
    """Probe addresses 1..125 in the background, reporting each result.

    ``callback`` is invoked as ``callback(address, found)`` where ``found`` is
    the boolean returned by ``master.judgeSlave(address)``.
    """

    def __init__(self, callback, master):
        super().__init__()
        self.stopEvent = threading.Event()
        self.callback = callback
        self.master = master

    def run(self):
        for address in range(1, 126):
            # Leave the loop as soon as a stop is requested instead of
            # spinning through the remaining addresses.
            if self.stopEvent.is_set():
                break
            self.callback(address, self.master.judgeSlave(address))

    def stop(self):
        """Ask the thread to finish after the probe currently in flight."""
        self.stopEvent.set()


class DPV1Master():
    """Thin wrapper around ``modbus_tcp.TcpMaster`` for the gateway mailbox.

    Every command is written as a block of 16-bit registers starting at
    register 750 of unit 1, the answer is read back from the same register,
    and the mailbox is then cleared with ``resetData``.

    Error reporting keeps the original string-based convention so existing
    callers continue to work: ``'error'``, ``'读取错误'``, ``'写入错误'`` and
    ``'修改错误'`` are returned instead of raising.
    """

    # Four zero registers, written to clear the mailbox after each exchange.
    resetData = struct.unpack('>hhhh', b'\x00\x00\x00\x00\x00\x00\x00\x00')

    _UNIT_ID = 1               # Modbus unit id used for every request
    _COMMAND_REGISTER = 750    # first register of the command/response area
    _SLAVE_MAP_REGISTER = 850  # first register read by searchSlave
    _SLAVE_MAP_COUNT = 64      # number of registers read by searchSlave
    _ERROR_FLAG = 57344        # 0xE000 in the first response register = failure

    def __init__(self, host, port):
        # Created lazily elsewhere (currently nowhere); initialised here so
        # closeThread() never hits a missing attribute.
        self.searchSlaveThread = None
        try:
            self.master = modbus_tcp.TcpMaster(host=host, port=port)
            self.master.set_timeout(5.0)
        except Exception as exc:
            _logger.warning("could not create Modbus TCP master: %s", exc)
            self.master = None

    # ------------------------------------------------------------------
    # Raw Modbus access
    # ------------------------------------------------------------------
    def writeMultipleRegister(self, slaveId, address, outputValue):
        """Write ``outputValue`` starting at ``address``.

        Returns ``None`` on success and ``'error'`` on any failure.
        """
        if not self.master:
            return 'error'
        try:
            self.master.execute(
                slaveId,
                cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=address,
                output_value=outputValue,
            )
        except Exception as exc:
            _logger.warning("write to register %s failed: %s", address, exc)
            return 'error'

    def readHoldingRegisters(self, slaveId, startAddress, varNums):
        """Read ``varNums`` holding registers starting at ``startAddress``.

        Returns the value produced by ``TcpMaster.execute`` on success,
        ``'error'`` when no master exists, or ``'读取错误'`` when the request
        raised.
        """
        if not self.master:
            return 'error'
        try:
            return self.master.execute(
                slaveId, cst.READ_HOLDING_REGISTERS, startAddress, varNums
            )
        except Exception as exc:
            _logger.warning("read of register %s failed: %s", startAddress, exc)
            return '读取错误'

    def areAllZeros(self, tup):
        """Return True when every item of ``tup`` equals 0."""
        return all(x == 0 for x in tup)

    # ------------------------------------------------------------------
    # Mailbox helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _byte(value):
        """Encode ``value`` as one byte (raises OverflowError outside 0..255)."""
        return value.to_bytes(1, byteorder='little')

    def _exchange(self, request, delay, count):
        """Send ``request`` to the mailbox and read ``count`` registers back.

        Returns the register tuple, or ``None`` when the write failed, the
        read failed, or the read returned nothing.
        """
        status = self.writeMultipleRegister(
            self._UNIT_ID, self._COMMAND_REGISTER, request
        )
        if status == 'error':
            return None
        time.sleep(delay)  # give the gateway time to process the command
        response = self.readHoldingRegisters(
            self._UNIT_ID, self._COMMAND_REGISTER, count
        )
        # Failed reads come back as strings; they must not be indexed as if
        # they were register tuples.
        if isinstance(response, str) or not response:
            return None
        return response

    def _reset(self):
        """Clear the mailbox and wait briefly (best effort, result ignored)."""
        self.writeMultipleRegister(
            self._UNIT_ID, self._COMMAND_REGISTER, self.resetData
        )
        time.sleep(0.1)

    def _rejected(self, response):
        """Return True when the exchange failed or carries the error flag."""
        return response is None or response[0] == self._ERROR_FLAG

    # ------------------------------------------------------------------
    # Public operations
    # ------------------------------------------------------------------
    def searchSlave(self, callback=None):
        """Return the comma-joined positions of bytes equal to 1 in the slave map.

        64 registers are read from register 850 and serialised big-endian;
        the index of every byte whose value is 1 is reported.  An empty
        string is returned when the read fails.  ``callback`` is accepted for
        backward compatibility and is not used.
        """
        data = self.readHoldingRegisters(
            self._UNIT_ID, self._SLAVE_MAP_REGISTER, self._SLAVE_MAP_COUNT
        )
        if isinstance(data, str) or not data:
            return ''
        # Registers are read as unsigned 16-bit values, so pack them with 'H';
        # 'h' raises struct.error for anything above 32767.
        dataHex = struct.pack('>{}H'.format(len(data)), *data)
        indices = [i for i, byte in enumerate(dataHex) if byte == 1]
        return ",".join(map(str, indices))

    def closeThread(self):
        """Stop the background search thread if one was started."""
        if self.searchSlaveThread is not None:
            self.searchSlaveThread.stop()

    def readParm(self, address, slot, index, length, callback=None):
        """Read ``length`` bytes of a parameter identified by address/slot/index.

        The request is opcode 0x01 followed by the three identifiers and the
        fixed bytes F0 00.  Returns the raw bytes on success or ``'读取错误'``
        on failure.  ``callback`` is accepted for backward compatibility and
        is not used.
        """
        length = int(length)
        readByteStream = (
            b'\x01' + self._byte(address) + self._byte(slot)
            + self._byte(index) + b'\xF0\x00'
        )
        request = struct.unpack('>hhh', readByteStream)
        # -(-length // 2) rounds up: two bytes per register.
        response = self._exchange(request, 1.1, -(-length // 2))
        self._reset()
        if self._rejected(response):
            return '读取错误'
        return struct.pack('>{}H'.format(len(response)), *response)[:length]

    def writeParm(self, address, slot, index, length, valueByte):
        """Write ``valueByte`` to the parameter identified by address/slot/index.

        The request is opcode 0x02, the three identifiers, the length byte
        and the payload, padded with one zero byte to an even size.  Returns
        ``'写入成功'`` or ``'写入错误'``.
        """
        length = int(length)
        writeByteStream = (
            b'\x02' + self._byte(address) + self._byte(slot)
            + self._byte(index) + self._byte(length) + valueByte
        )
        if len(writeByteStream) % 2:
            writeByteStream += b'\x00'  # registers are 16 bits wide
        request = struct.unpack(
            '>{}h'.format(len(writeByteStream) // 2), writeByteStream
        )
        # Clear the mailbox first (no delay, as before).
        self.writeMultipleRegister(
            self._UNIT_ID, self._COMMAND_REGISTER, self.resetData
        )
        response = self._exchange(request, 0.4, 2)
        result = '写入错误' if self._rejected(response) else '写入成功'
        self._reset()
        return result

    def judgeSlave(self, address):
        """Return True when a probe of ``address`` yields a non-zero answer.

        A failed write or read counts as "no slave".
        """
        searchByteStream = b'\x01' + self._byte(address) + b'\x01\x00\xF0\x00'
        request = struct.unpack('>hhh', searchByteStream)
        response = self._exchange(request, 1.1, 6)
        time.sleep(0.1)
        self._reset()
        return response is not None and not self.areAllZeros(response)

    def editDevAddress(self, oldAddress, newAddress, identNumer):
        """Change a device address from ``oldAddress`` to ``newAddress``.

        ``identNumer`` is a hex string whose first two characters and
        remaining characters are each encoded as one byte.  Returns
        ``'修改成功'`` or ``'修改错误'``.
        """
        oldAddressHex = self._byte(oldAddress)
        newAddressHex = self._byte(newAddress)

        # Send opcode 0x05 as ONE register (0x0500), matching the
        # opcode-in-high-byte layout of every other command.  The raw bytes
        # used previously were iterated by modbus_tk as two registers.
        closeProtocolData = struct.unpack('>h', b'\x05\x00')
        self.writeMultipleRegister(
            self._UNIT_ID, self._COMMAND_REGISTER, closeProtocolData
        )
        time.sleep(0.4)

        idHighHex = self._byte(int(identNumer[:2], 16))
        idLowHex = self._byte(int(identNumer[2:], 16))
        editAddressStream = (
            b'\x03' + oldAddressHex + b'\x00' + newAddressHex
            + b'\x00' + idHighHex + b'\x00' + idLowHex
        )
        request = struct.unpack('>hhhh', editAddressStream)

        # Clear the mailbox first (no delay, as before).
        self.writeMultipleRegister(
            self._UNIT_ID, self._COMMAND_REGISTER, self.resetData
        )
        response = self._exchange(request, 0.4, 2)
        result = '修改错误' if self._rejected(response) else '修改成功'
        self._reset()
        return result


if __name__ == "__main__":
    tcpMaster = DPV1Master('192.168.3.10', 502)
    tcpMaster.searchSlave()