import serial
import modbus_tk
import modbus_tk.defines as cst
from modbus_tk import modbus_rtu, hooks
import sys
from .ByteOrder import *
from .SetMessage import *

# master = modbus_rtu.RtuMaster(serial.Serial(port = 'COM4', baudrate = 9600, bytesize = 8, parity = 'N', stopbits = 1, xonxoff=0))
# exec(slave=1, function_code=READ_HOLDING_REGISTERS, starting_address=0, quantity_of_x=0, output_value=0, data_format="", expected_length=-1)
# Parameter description:
# @slave=1 : identifier of the slave. from 1 to 247.
# @function_code=READ_HOLDING_REGISTERS: function code
# @starting_address=100: starting address of the registers
# @quantity_of_x=3: number of registers/coils
# @output_value: an integer or an iterable of values: 1/[1,1,1,0,0,1]/xrange(12)
# @data_format: format applied to the received data


def _floatToRegisters(value, order):
    """Encode a float with the ByteOrder helper selected by ``order``.

    ``order`` is one of 'ABCD' (big-endian), 'DCBA' (little-endian),
    'BADC' or 'CDAB'. The encoders come from the ``.ByteOrder`` star
    import; presumably each returns the register words for one 32-bit
    float -- confirm against ByteOrder.

    Raises:
        ValueError: if ``order`` is not one of the four supported layouts.
    """
    if order == 'ABCD':  # big-endian
        return floatToABCD(value)
    if order == 'DCBA':  # little-endian
        return floatToDCBA(value)
    if order == 'BADC':
        return floatToBADC(value)
    if order == 'CDAB':
        return floatToCDAB(value)
    raise ValueError('unsupported byte order: %r' % (order,))


def _registersToFloat(registers, order):
    """Decode register values with the ByteOrder helper selected by ``order``.

    Mirror image of ``_floatToRegisters``; ``registers`` is passed straight
    through to the matching ``*ToFloat`` function from ``.ByteOrder``.

    Raises:
        ValueError: if ``order`` is not one of the four supported layouts.
    """
    if order == 'ABCD':  # big-endian
        return ABCDToFloat(registers)
    if order == 'DCBA':  # little-endian
        return DCBAToFloat(registers)
    if order == 'BADC':
        return BADCToFloat(registers)
    if order == 'CDAB':
        return CDABToFloat(registers)
    raise ValueError('unsupported byte order: %r' % (order,))


class RTUMaster():
    """Thin convenience wrapper around ``modbus_tk``'s RTU master.

    Read methods return the string 'error' instead of raising when the
    request fails; callers must check for that sentinel.
    """

    def __init__(self, port='COM4', baudrate=9600, bytesize=8, parity='N',
                 stopbits=1, xonxoff=0):
        """Open the serial port and create the underlying RTU master.

        Construction is best-effort: if the port cannot be opened the
        failure is printed and ``self.master`` stays ``None`` (every later
        request then fails through the normal error path) rather than the
        attribute being left undefined.
        """
        self.master = None
        try:
            self.master = modbus_rtu.RtuMaster(
                serial.Serial(port=port, baudrate=baudrate, bytesize=bytesize,
                              parity=parity, stopbits=stopbits,
                              xonxoff=xonxoff))
            self.master.set_timeout(5.0)
            self.master.set_verbose(True)
        except Exception as e:
            # Previously swallowed silently; report it so a bad port name or
            # a busy port can be diagnosed.
            print('RTUMaster: failed to open %s: %s' % (port, e))
        # afterRecv / afterSend are expected to come from the star imports
        # above (presumably .SetMessage -- confirm).
        # NOTE(review): hooks are installed on every construction; confirm
        # that creating several RTUMaster objects does not duplicate them.
        hooks.install_hook("modbus_rtu.RtuMaster.after_recv", afterRecv)
        hooks.install_hook("modbus_rtu.RtuMaster.before_send", afterSend)

    def writeSingleRegister(self, slaveId, address, outputValue, order='ABCD'):
        """Write an integer to one register, or a float to consecutive registers.

        The value is treated as a float when its string form contains a
        '.'; otherwise it is converted with ``int()`` and written with
        WRITE_SINGLE_REGISTER. Floats are encoded according to ``order``
        and written with WRITE_MULTIPLE_REGISTERS starting at ``address``.

        Any failure (conversion, unsupported ``order``, communication) is
        caught and reported by printing 'error'; nothing is returned.
        """
        try:
            # NOTE(review): a float whose text form has no '.' (e.g. 1e-07)
            # takes the integer path -- confirm this is acceptable.
            if '.' not in str(outputValue):
                self.master.execute(slaveId, cst.WRITE_SINGLE_REGISTER,
                                    starting_address=address,
                                    output_value=int(outputValue))
            else:
                registers = _floatToRegisters(float(outputValue), order)
                self.master.execute(slaveId, cst.WRITE_MULTIPLE_REGISTERS,
                                    starting_address=address,
                                    output_value=registers)
        except Exception:
            print('error')

    def writeSingleCoil(self, slaveId, address, outputValue):
        """Write a single coil; values other than 0 or 1 are ignored."""
        if outputValue in [0, 1]:
            self.master.execute(slaveId, cst.WRITE_SINGLE_COIL, address,
                                output_value=outputValue)

    def _readRegisters(self, functionCode, slaveId, startAddress, varNums,
                       order):
        """Shared implementation of the holding/input register reads.

        With ``order == 'int'`` it requests ``varNums`` registers and
        returns the first value of the reply. Otherwise it always requests
        2 registers (``varNums`` is ignored) and decodes them as a float
        using ``order``. Returns 'error' on any failure.
        """
        try:
            if order == 'int':
                return self.master.execute(slaveId, functionCode,
                                           startAddress, varNums)[0]
            registers = self.master.execute(slaveId, functionCode,
                                            startAddress, 2)
            return _registersToFloat(registers, order)
        except Exception:
            return 'error'

    def readHoldingRegisters(self, slaveId, startAddress, varNums,
                             order='ABCD'):
        """Read holding registers as an int ('int') or a float (byte order).

        Returns the decoded value, or the string 'error' on failure.
        """
        return self._readRegisters(cst.READ_HOLDING_REGISTERS, slaveId,
                                   startAddress, varNums, order)

    def readInputRegisters(self, slaveId, startAddress, varNums,
                           order='ABCD'):
        """Read input registers as an int ('int') or a float (byte order).

        Returns the decoded value, or the string 'error' on failure.
        """
        return self._readRegisters(cst.READ_INPUT_REGISTERS, slaveId,
                                   startAddress, varNums, order)

    def readCoils(self, slaveId, startAddress, varNums, order='ABCD'):
        """Read ``varNums`` coils and return the first one, or 'error'.

        ``order`` is unused; it is kept so existing callers keep working.
        """
        try:
            value = self.master.execute(slaveId, cst.READ_COILS,
                                        startAddress, varNums)
            return value[0]
        except Exception:
            return 'error'

    def readInputCoils(self, slaveId, startAddress, varNums):
        """Read ``varNums`` discrete inputs and return the first, or 'error'."""
        try:
            value = self.master.execute(slaveId, cst.READ_DISCRETE_INPUTS,
                                        startAddress, varNums)
            return value[0]
        except Exception:
            return 'error'


# Legacy standalone example, kept commented out for reference.
# -*- coding: utf_8 -*-
# import serial
# import modbus_tk
# import modbus_tk.defines as cst
# from modbus_tk import modbus_rtu
# def mod(PORT="com2"):
#     red = []
#     alarm = ""
#     try:
#         # Set up the serial port (the original note called it the slave side)
#         master = modbus_rtu.RtuMaster(serial.Serial(port=PORT,
#             baudrate=9600, bytesize=8, parity='N', stopbits=1))
#         master.set_timeout(5.0)
#         master.set_verbose(True)
#         # Read holding registers
#         red = master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 9)  # change the function code to read here
#         print(red)
#         alarm = "正常"
#         return list(red), alarm
#     except Exception as exc:
#         print(str(exc))
#         alarm = (str(exc))
#         return red, alarm  ## on failure return [] and the error message
# if __name__ == "__main__":
#     mod()