import serial
import modbus_tk
import modbus_tk.defines as cst
from modbus_tk import modbus_rtu
import sys
from .ByteOrder import *
from .SetMessage import *

# Example of building a master by hand:
# master = modbus_rtu.RtuMaster(serial.Serial(port='COM4', baudrate=9600, bytesize=8, parity='N', stopbits=1, xonxoff=0))
# execute(slave=1, function_code=READ_HOLDING_REGISTERS, starting_address=0, quantity_of_x=0, output_value=0, data_format="", expected_length=-1)
# Parameter notes:
# @slave=1: identifier of the slave, from 1 to 247.
# @function_code=READ_HOLDING_REGISTERS: function code
# @starting_address=100: address of the first register
# @quantity_of_x=3: number of registers/coils
# @output_value: an integer or an iterable of values: 1/[1,1,1,0,0,1]/xrange(12)
# @data_format: format applied to the received data


class RTUMaster():
    """Convenience wrapper around a modbus_tk RTU master on a serial port.

    Every public read/write method catches all exceptions and reports a
    failure by returning the string ``'error'`` instead of raising.
    Write methods return ``None`` on success.
    """

    def __init__(self, port='COM4', baudrate=9600, bytesize=8, parity='N', stopbits=1, xonxoff=0):
        """Open the serial port and create the RTU master.

        The arguments are forwarded unchanged to ``serial.Serial``. If the
        port cannot be opened the error is printed and ``self.master`` stays
        ``None``; every later read/write call then returns ``'error'``.
        """
        # Define the attribute up front so a failed open leaves the object in
        # a well-defined state instead of without a ``master`` attribute.
        self.master = None
        try:
            self.master = modbus_rtu.RtuMaster(
                serial.Serial(port=port, baudrate=baudrate, bytesize=bytesize,
                              parity=parity, stopbits=stopbits, xonxoff=xonxoff))
            self.master.set_timeout(5.0)
            self.master.set_verbose(True)
        except Exception as e:
            print(f'RTUMaster init error: {e}')

    @staticmethod
    def _isIntegerValue(value):
        """Return True when ``value`` should be written as a plain integer."""
        if isinstance(value, int):
            # Covers bool as well; str(True) would otherwise trip the 'e' test.
            return True
        text = str(value).lower()
        # A float's repr may use exponent notation without a dot ('1e-07'),
        # so look for an exponent marker in addition to the decimal point.
        return '.' not in text and 'e' not in text

    @staticmethod
    def _floatEncoder(order):
        """Return the float-to-registers converter for ``order``, or None."""
        if order == 'ABCD':  # big-endian
            return floatToABCD
        if order == 'DCBA':  # little-endian
            return floatToDCBA
        if order == 'BADC':
            return floatToBADC
        if order == 'CDAB':
            return floatToCDAB
        return None

    @staticmethod
    def _decodeFloat(registers, order):
        """Convert the registers read from the device to a float using ``order``.

        Raises ValueError when ``order`` is not a supported byte order.
        """
        if order == 'ABCD':  # big-endian
            return ABCDToFloat(registers)
        if order == 'DCBA':  # little-endian
            return DCBAToFloat(registers)
        if order == 'BADC':
            return BADCToFloat(registers)
        if order == 'CDAB':
            return CDABToFloat(registers)
        raise ValueError(f'unsupported byte order: {order!r}')

    def writeSingleRegister(self, slaveId, address, outputValue, order='ABCD'):
        """Write one value starting at ``address``.

        Integer values use WRITE_SINGLE_REGISTER. Float values are encoded
        with the ``order`` byte order (unknown orders fall back to 'ABCD')
        and written with WRITE_MULTIPLE_REGISTERS.

        Returns None on success, 'error' on failure.
        """
        try:
            if self._isIntegerValue(outputValue):
                self.master.execute(slaveId, cst.WRITE_SINGLE_REGISTER,
                                    starting_address=address,
                                    output_value=int(outputValue))
                return None

            floatValue = float(outputValue)
            # Unknown byte orders default to ABCD.
            encoder = self._floatEncoder(order) or floatToABCD
            registers = encoder(floatValue)
            if registers is None:
                return 'error'
            # The encoded float is a sequence of registers, so it has to go
            # out as a multi-register write.
            self.master.execute(slaveId, cst.WRITE_MULTIPLE_REGISTERS,
                                starting_address=address,
                                output_value=registers)
            return None
        except Exception as e:
            print(f'writeSingleRegister error: {e}')
            return 'error'

    def writeSingleCoil(self, slaveId, address, outputValue):
        """Write a single coil; ``outputValue`` must convert to 0 or 1.

        Returns None on success, 'error' on failure or when the value is
        neither 0 nor 1 (no request is sent in that case).
        """
        try:
            coilValue = int(outputValue)
            if coilValue not in (0, 1):
                # Nothing is written, so report it instead of looking like success.
                return 'error'
            self.master.execute(slaveId, cst.WRITE_SINGLE_COIL, address,
                                output_value=coilValue)
            return None
        except Exception:
            return 'error'

    def writeMultipleRegisters(self, slaveId, startAddress, outputValues, order='ABCD'):
        """Write several values to consecutive registers.

        Integer values are appended as one register each; float values are
        encoded with the ``order`` byte order and appended as the registers
        the converter returns. An unsupported ``order`` combined with a float
        value is a failure.

        Returns None on success, 'error' on failure.
        """
        try:
            registers = []
            for value in outputValues:
                if self._isIntegerValue(value):
                    registers.append(int(value))
                    continue
                floatValue = float(value)
                encoder = self._floatEncoder(order)
                if encoder is None:
                    raise ValueError(f'unsupported byte order: {order!r}')
                registers.extend(encoder(floatValue))
            self.master.execute(slaveId, cst.WRITE_MULTIPLE_REGISTERS,
                                starting_address=startAddress,
                                output_value=registers)
            return None
        except Exception:
            return 'error'

    def writeMultipleCoils(self, slaveId, startAddress, outputValues):
        """Write several coils from a list of booleans or integers.

        Each value is converted with ``int``; any non-zero value is written
        as 1.

        Returns None on success, 'error' on failure.
        """
        try:
            coilValues = [1 if int(value) else 0 for value in outputValues]
            self.master.execute(slaveId, cst.WRITE_MULTIPLE_COILS,
                                starting_address=startAddress,
                                output_value=coilValues)
            return None
        except Exception:
            return 'error'

    def _readRegisters(self, functionCode, slaveId, startAddress, varNums, order):
        """Shared body of the holding/input register reads."""
        try:
            if order == 'int':
                # Only the first register of the response is returned.
                return self.master.execute(slaveId, functionCode, startAddress, varNums)[0]
            # Float reads always request exactly 2 registers; varNums is ignored.
            registers = self.master.execute(slaveId, functionCode, startAddress, 2)
            return self._decodeFloat(registers, order)
        except Exception:
            return 'error'

    def readHoldingRegisters(self, slaveId, startAddress, varNums, order='ABCD'):
        """Read one value from the holding registers.

        With ``order='int'`` it reads ``varNums`` registers and returns the
        first one. With a byte order ('ABCD', 'DCBA', 'BADC', 'CDAB') it reads
        2 registers and returns the decoded float. Returns 'error' on failure
        or for any other ``order``.
        """
        return self._readRegisters(cst.READ_HOLDING_REGISTERS, slaveId, startAddress, varNums, order)

    def readInputRegisters(self, slaveId, startAddress, varNums, order='ABCD'):
        """Read one value from the input registers.

        With ``order='int'`` it reads ``varNums`` registers and returns the
        first one. With a byte order ('ABCD', 'DCBA', 'BADC', 'CDAB') it reads
        2 registers and returns the decoded float. Returns 'error' on failure
        or for any other ``order``.
        """
        return self._readRegisters(cst.READ_INPUT_REGISTERS, slaveId, startAddress, varNums, order)

    def _readFirstBit(self, functionCode, slaveId, startAddress, varNums):
        """Shared body of the coil/discrete-input reads."""
        try:
            bits = self.master.execute(slaveId, functionCode, startAddress, varNums)
            # Only the first bit is returned, even when varNums > 1.
            return bits[0]
        except Exception:
            return 'error'

    def readCoils(self, slaveId, startAddress, varNums, order='ABCD'):
        """Read ``varNums`` coils and return the first one.

        ``order`` is accepted but not used. Returns 'error' on failure.
        """
        return self._readFirstBit(cst.READ_COILS, slaveId, startAddress, varNums)

    def readInputCoils(self, slaveId, startAddress, varNums):
        """Read ``varNums`` discrete inputs and return the first one.

        Returns 'error' on failure.
        """
        return self._readFirstBit(cst.READ_DISCRETE_INPUTS, slaveId, startAddress, varNums)


# Legacy stand-alone example, kept for reference:
# -*- coding: utf_8 -*-
# import serial
# import modbus_tk
# import modbus_tk.defines as cst
# from modbus_tk import modbus_rtu
# def mod(PORT="com2"):
#     red = []
#     alarm = ""
#     try:
#         # Configure the serial port
#         master = modbus_rtu.RtuMaster(serial.Serial(port=PORT,
#             baudrate=9600, bytesize=8, parity='N', stopbits=1))
#         master.set_timeout(5.0)
#         master.set_verbose(True)
#         # Read holding registers
#         red = master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 9)  # change the function code here as needed
#         print(red)
#         alarm = "正常"
#         return list(red), alarm
#     except Exception as exc:
#         print(str(exc))
#         alarm = (str(exc))
#         return red, alarm  # on failure return [] and the error message
# if __name__ == "__main__":
#     mod()