You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

241 lines
9.3 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import modbus_tk
import modbus_tk.defines as cst
from modbus_tk import modbus_tcp, hooks
import logging
import struct
import threading
import time
# from protocol.ModBus.ByteOrder import *
import threading
class SearchSlaveThread(threading.Thread):
    """Worker thread that probes slave addresses 1 through 125 in order.

    For every address that is visited, ``callback(address, found)`` is
    invoked, where ``found`` is whatever ``master.judgeSlave(address)``
    returned.  Once :meth:`stop` has been called, the remaining addresses
    are skipped without being probed.
    """

    def __init__(self, callback, master):
        """Store the result callback and the master used for probing."""
        super().__init__()
        self.master = master
        self.callback = callback
        self.stopEvent = threading.Event()

    def run(self):
        """Probe each address and hand the outcome to the callback."""
        for candidate in range(1, 126):
            if self.stopEvent.is_set():
                # Stop requested: skip this address without probing it.
                continue
            found = self.master.judgeSlave(candidate)
            self.callback(candidate, found)

    def stop(self):
        """Ask the scan to skip all addresses it has not visited yet."""
        self.stopEvent.set()
class DPV1Master():
    """Modbus-TCP master that exchanges requests through a register mailbox.

    Each request is encoded as a short byte frame and written to the holding
    registers starting at address 750 of Modbus slave 1; the answer is read
    back from the same registers after a fixed delay, and the registers are
    then cleared again.  A first answer register equal to 57344 (0xE000) is
    treated as an error status.

    NOTE(review): the class name suggests the remote end is a PROFIBUS DP-V1
    gateway -- confirm against the device documentation.

    Failures are reported with string sentinels instead of exceptions:
    ``'error'`` / ``'读取错误'`` from the low-level register helpers and
    ``'读取错误'``, ``'写入错误'``, ``'修改错误'`` from the high-level calls.
    """

    # Four zeroed registers, written to clear the mailbox between requests.
    resetData = struct.unpack('>hhhh', b'\x00\x00\x00\x00\x00\x00\x00\x00')

    _SLAVE_ID = 1              # Modbus unit id used for every request
    _MAILBOX_ADDRESS = 750     # first holding register of the mailbox
    _SLAVE_LIST_ADDRESS = 850  # first holding register read by searchSlave
    _SLAVE_LIST_LENGTH = 64    # number of registers read by searchSlave
    _ERROR_STATUS = 57344      # 0xE000 in the first answer register
    _log = logging.getLogger(__name__)

    def __init__(self, host, port):
        """Create the underlying TCP master with a 5 second timeout.

        If the master cannot be created, ``self.master`` is set to ``None``
        and every later call reports its error sentinel.
        """
        # No search thread is started by this class at the moment; keep the
        # attribute defined so closeThread() is always safe to call.
        self.searchSlaveThread = None
        try:
            self.master = modbus_tcp.TcpMaster(host = host, port = port)
            self.master.set_timeout(5.0)
        except Exception as e:
            self._log.warning('could not create Modbus TCP master: %s', e)
            self.master = None
            # hooks.install_hook("modbus_tcp.TcpMaster.after_recv", afterRecv)
            # hooks.install_hook("modbus_tcp.TcpMaster.after_send", afterSend)

    @staticmethod
    def _toRegisters(frame):
        """Split a byte frame into big-endian 16-bit register values.

        Odd-length frames are padded with one trailing zero byte.  Unsigned
        values are produced; they encode to the same bytes on the wire as
        the signed interpretation of the same frame.
        """
        if len(frame) % 2:
            frame += b'\x00'
        return struct.unpack('>{}H'.format(len(frame) // 2), frame)

    @staticmethod
    def _readFailed(value):
        """Return True when *value* is a failure sentinel or an empty answer."""
        return isinstance(value, str) or not value

    def _sendFrame(self, frame):
        """Write a byte frame to the mailbox; returns 'error' on failure."""
        return self.writeMultipleRegister(
            self._SLAVE_ID, self._MAILBOX_ADDRESS, self._toRegisters(frame))

    def _readMailbox(self, count):
        """Read *count* registers from the start of the mailbox."""
        return self.readHoldingRegisters(
            self._SLAVE_ID, self._MAILBOX_ADDRESS, count)

    def _resetMailbox(self, delay = 0.1):
        """Clear the mailbox registers, then wait *delay* seconds."""
        self.writeMultipleRegister(
            self._SLAVE_ID, self._MAILBOX_ADDRESS, self.resetData)
        if delay:
            time.sleep(delay)

    def writeMultipleRegister(self, slaveId, address, outputValue):
        """Write *outputValue* to consecutive holding registers.

        Returns ``None`` on success and ``'error'`` when there is no master
        or the request raised.
        """
        if not self.master:
            return 'error'
        try:
            self.master.execute(slaveId, cst.WRITE_MULTIPLE_REGISTERS, starting_address = address, output_value=outputValue)
        except Exception as e:
            self._log.warning('register write at %s failed: %s', address, e)
            return 'error'

    def readHoldingRegisters(self, slaveId, startAddress, varNums):
        """Read *varNums* holding registers starting at *startAddress*.

        Returns the values produced by the master on success, ``'error'``
        when there is no master, and ``'读取错误'`` when the request raised.
        """
        if not self.master:
            return 'error'
        try:
            return self.master.execute(slaveId, cst.READ_HOLDING_REGISTERS, startAddress, varNums)
        except Exception as e:
            # Was a bare ``except:``; KeyboardInterrupt/SystemExit now propagate.
            self._log.warning('register read at %s failed: %s', startAddress, e)
            return '读取错误'

    def areAllZeros(self, tup):
        """Return True when every element of *tup* equals zero."""
        return all(x == 0 for x in tup)

    def searchSlave(self, callback = None):
        """Read the slave list registers and report which bytes equal 1.

        The 64 registers starting at 850 are packed into big-endian bytes;
        the positions of all bytes whose value is 1 are concatenated into a
        single string.  Returns ``'读取错误'`` when the registers cannot be
        read.  *callback* is currently unused.

        NOTE(review): the positions are joined without a separator, so the
        result is ambiguous once a position has more than one digit (for
        example "112").  Kept as-is because callers may depend on it.
        """
        data = self.readHoldingRegisters(
            self._SLAVE_ID, self._SLAVE_LIST_ADDRESS, self._SLAVE_LIST_LENGTH)
        if self._readFailed(data):
            return '读取错误'
        # Registers are unsigned 16-bit values; a signed format would raise
        # for any register above 32767.
        dataHex = struct.pack('>{}H'.format(len(data)), *data)
        indices = [i for i, byte in enumerate(dataHex) if byte == 1]
        return "".join(map(str, indices))

    def closeThread(self):
        """Stop the slave search thread if one has been created."""
        thread = getattr(self, 'searchSlaveThread', None)
        if thread is not None:
            thread.stop()

    def readParm(self, address, slot, index, length, callback = None):
        """Read *length* bytes of a parameter identified by address/slot/index.

        Returns the answer as ``bytes`` truncated to *length*, or
        ``'读取错误'`` when there is no connection, the answer cannot be
        read, or its first register carries the error status.  *callback*
        is currently unused.
        """
        length = int(length)
        hexAddress = address.to_bytes(1, byteorder='little')
        slotByte = slot.to_bytes(1, byteorder='little')
        indexByte = index.to_bytes(1, byteorder='little')
        readByteStream = b'\x01' + hexAddress + slotByte + indexByte + b'\xF0\x00'
        if not self.master:
            # Nothing to talk to: fail fast instead of sleeping first.
            return '读取错误'
        self._sendFrame(readByteStream)
        time.sleep(1.1)
        value = self._readMailbox(-(-length // 2))  # ceiling of length / 2
        failed = self._readFailed(value) or value[0] == self._ERROR_STATUS
        self._resetMailbox()
        if failed:
            return '读取错误'
        return struct.pack('>{}H'.format(len(value)), *value)[:length]

    def writeParm(self, address, slot, index, length, valueByte):
        """Write *valueByte* to a parameter identified by address/slot/index.

        Returns ``'写入成功'`` when the request was sent and the answer does
        not carry the error status, otherwise ``'写入错误'``.
        """
        length = int(length)
        hexAddress = address.to_bytes(1, byteorder='little')
        slotByte = slot.to_bytes(1, byteorder='little')
        indexByte = index.to_bytes(1, byteorder='little')
        lengthByte = length.to_bytes(1, byteorder='little')
        writeByteStream = b'\x02' + hexAddress + slotByte + indexByte + lengthByte + valueByte
        if not self.master:
            return '写入错误'
        self._resetMailbox(delay = 0)
        writeStatus = self._sendFrame(writeByteStream)
        time.sleep(0.4)
        value = self._readMailbox(2)
        if (writeStatus == 'error' or self._readFailed(value)
                or value[0] == self._ERROR_STATUS):
            result = '写入错误'
        else:
            result = '写入成功'
        self._resetMailbox()
        return result

    def judgeSlave(self, address):
        """Probe *address* and return True when a non-zero answer comes back.

        Returns False when there is no connection, the answer cannot be
        read, or all six answer registers are zero.
        """
        hexAddress = address.to_bytes(1, byteorder='little')
        searchByteStream = b'\x01' + hexAddress + b'\x01\x01\xF0\x00'
        if not self.master:
            return False
        self._sendFrame(searchByteStream)
        time.sleep(1.1)
        answer = self._readMailbox(6)
        time.sleep(0.1)
        self._resetMailbox()
        if self._readFailed(answer):
            return False
        return not self.areAllZeros(answer)

    def editDevAddress(self, oldAddress, newAddress, identNumer):
        """Change a device address from *oldAddress* to *newAddress*.

        *identNumer* is a hex string; its first two characters and the rest
        are each sent as one byte.  Returns ``'修改成功'`` when the request
        was sent and the answer does not carry the error status, otherwise
        ``'修改错误'``.
        """
        # Build and validate the whole frame before touching the device, so
        # that bad input cannot leave the protocol switched off.
        oldAddressHex = oldAddress.to_bytes(length=1, byteorder='little')
        newAddressHex = newAddress.to_bytes(1, byteorder='little')
        idHighHex = int(identNumer[:2], 16).to_bytes(1, byteorder='little')
        idLowHex = int(identNumer[2:], 16).to_bytes(1, byteorder='little')
        editAddressStream = b'\x03' + oldAddressHex + b'\x00'+ newAddressHex + b'\x00' + idHighHex + b'\x00' + idLowHex
        if not self.master:
            return '修改错误'
        self._sendFrame(b'\x05\x00')  # "close protocol" request
        time.sleep(1.5)
        self._resetMailbox(delay = 0.7)
        writeStatus = self._sendFrame(editAddressStream)
        time.sleep(0.4)
        value = self._readMailbox(2)
        if (writeStatus == 'error' or self._readFailed(value)
                or value[0] == self._ERROR_STATUS):
            result = '修改错误'
        else:
            result = '修改成功'
        self._sendFrame(b'\x05\x01')  # "open protocol" request
        time.sleep(0.2)
        self._resetMailbox()
        return result
# def readInputRegisters(self, slaveId, startAddress, varNums, order = 'ABCD'):
# try:
# if order == 'int':
# valueByte = self.master.execute(slaveId, cst.READ_INPUT_REGISTERS, startAddress, varNums)[0]
# else:
# value = self.master.execute(slaveId, cst.READ_INPUT_REGISTERS, startAddress, 2)
# if order == 'ABCD': # 大端模式
# valueByte = ABCDToFloat(value)
# elif order == 'DCBA': # 小端模式
# valueByte = DCBAToFloat(value)
# elif order == 'BADC':
# valueByte = BADCToFloat(value)
# elif order == 'CDAB':
# valueByte = CDABToFloat(value)
# return valueByte
# except Exception as e:
# return 'error'
# def readCoils(self, slaveId, startAddress, varNums, order = 'ABCD'):
# try:
# value = self.master.execute(slaveId, cst.READ_COILS, startAddress, varNums)
# return value[0]
# except Exception as e:
# return 'error'
# def readInputCoils(self, slaveId, startAddress, varNums):
# print(slaveId, startAddress, varNums)
# try:
# value = self.master.execute(slaveId, cst.READ_DISCRETE_INPUTS, startAddress, varNums)
# return value[0]
# except:
# return 'error'
#
# print(cst.READ_HOLDING_REGISTERS, cst.READ_COILS, cst.WRITE_SINGLE_REGISTER, cst.WRITE_SINGLE_COIL)
# self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, 0, output_value=[1, 2, 3, 4, 5])
# self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=5, output_value=[3.0], data_format='>f')
# self.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=6, output_value=[4.0], data_format='>f')
# self.master.execute(2, cst.WRITE_SINGLE_COIL, 2, output_value=1)
# self.master.execute(2, cst.WRITE_MULTIPLE_COILS, 0, output_value=[1, 1, 0, 1, 1, 0, 1, 1])
# logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 2, data_format='f'))
# Read and write floats
# send some queries
# logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 100, 12))
if __name__ == "__main__":
    # Manual smoke test: connect to the gateway and query the slave list once.
    tcpMaster = DPV1Master(host='192.168.3.10', port=502)
    tcpMaster.searchSlave()
# tcpMaster.readHoldingRegisters(1, 6, 2)
# tcpMaster.master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=0, output_value=[1,2,3,4,5])
#
# master = modbus_tcp.TcpMaster(host="localhost", port=502)
# values = [6]
# master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=0, output_value=values)