import modbus_tk import modbus_tk.defines as cst from modbus_tk import modbus_tcp, hooks import serial import struct from .ByteOrder import * from .SetMessage import * # PORT = 'COM3' #PORT = '/dev/ptyp5' class TCPSlave(): def __init__(self, address = '127.0.0.1', port = 502): try: self.server = modbus_tcp.TcpServer(address=address, port=port) self.server.start() print(f"TCP Slave started on {address}:{port}") except Exception as e: print(f"Failed to start TCP Slave on {address}:{port}: {e}") raise e # hooks.install_hook('modbus.Server.before_handle_request', afterRecv) # hooks.install_hook("modbus.Server.after_handle_request", afterSend) # 创建从站 # 添加存储区 # supported block types # COILS = 1 # DISCRETE_INPUTS = 2 # HOLDING_REGISTERS = 3 # ANALOG_INPUTS = 4 def addSlave(self, slaveId): try: self.server.add_slave(slaveId) slave = self.server.get_slave(slaveId) # 添加存储区 - 使用正确的地址范围 slave.add_block('0', cst.COILS, 0, 9999) # 线圈 0-9999 slave.add_block('1', cst.DISCRETE_INPUTS, 10000, 19999) # 离散输入 10000-19999 slave.add_block('3', cst.ANALOG_INPUTS, 30000, 39999) # 输入寄存器 30000-39999 slave.add_block('4', cst.HOLDING_REGISTERS, 40000, 49999) # 保持寄存器 40000-49999 print(f"Added slave {slaveId} with storage blocks") return True except Exception as e: print(f"Failed to add slave {slaveId}: {e}") return False def setValue(self, slaveId, name, address, value, order='ABCD'): try: slave = self.server.get_slave(slaveId) # print(value) if '.' in str(value): # 浮点数处理 floatValue = float(value) if order == 'ABCD': # 大端模式 valueByte = floatToABCD(floatValue) elif order == 'DCBA': # 小端模式 valueByte = floatToDCBA(floatValue) elif order == 'BADC': valueByte = floatToBADC(floatValue) elif order == 'CDAB': valueByte = floatToCDAB(floatValue) else: valueByte = floatToABCD(floatValue) slave.set_values(name, address, valueByte) else: # 整数处理 intValue = int(value) slave.set_values(name, address, intValue) return True except Exception as e: print(f"TCP Slave setValue error: {e}") print(f"Error details - slaveId: {slaveId}, name: {name}, address: {address}, value: {value}") return False def readValue(self, slaveId, name, address, order = 'int'): try: slave = self.server.get_slave(slaveId) if order == 'int': value = slave.get_values(name, address, 1)[0] else: value = slave.get_values(name, address, 2) if order == 'ABCD': # 大端模式 value = ABCDToFloat(value) elif order == 'DCBA': # 小端模式 value = DCBAToFloat(value) elif order == 'BADC': value = BADCToFloat(value) elif order == 'CDAB': value = CDABToFloat(value) return value except Exception as e: return 'error' if __name__ == "__main__": a = TCPSlave() a.addSlave(1) a.setValue(1, '0', 0, 1) a.readValue(1, '0', 0) a.setValue(1, '1', 10000, 1) a.readValue(1, '1', 10000) a.setValue(1, '3', 30000, 2) a.readValue(1, '3', 30000) a.setValue(1, '3', 30002, 2.5121231) a.readValue(1, '3', 30002, order ='ABCD') a.setValue(1, '4', 40000, 3) a.readValue(1, '4', 40000) a.setValue(1, '4', 40002, 3.51231234564) a.readValue(1, '4', 40002, order ='ABCD')