You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

117 lines
3.0 KiB
Python

7 months ago
#!/usr/bin/env python
# -*- coding: utf_8 -*-
"""
Modbus TestKit: example of a custom simulator
(C)2009 - Luc Jean - luc.jean@gmail.com
(C)2009 - Apidev - http://www.apidev.fr
This is distributed under GNU LGPL license, see license.txt
"""
from __future__ import print_function
import sys
import struct
from modbus_tk.simulator import Simulator, LOGGER
from modbus_tk.defines import HOLDING_REGISTERS
from modbus_tk.modbus_tcp import TcpServer
from modbus_tk.utils import PY2
try:
import serial
from modbus_tk.modbus_rtu import RtuServer
except ImportError:
pass
class MySimulator(Simulator):
"""A custom simulator"""
def __init__(self, *args, **kwargs):
"""Constructor"""
Simulator.__init__(self, *args, **kwargs)
# add a new command: cv will make possible to change a value
self.add_command("cv", self.change_value)
self.add_command("set_pi", self.set_pi)
# create a slave and block
slave = self.server.add_slave(1)
slave.add_block("foo", HOLDING_REGISTERS, 0, 100)
def change_value(self, args):
"""change the value of some registers"""
address = int(args[1])
# get the list of values and cast it to integers
values = []
for val in args[2:]:
values.append(int(val))
# custom rules: if the value of reg0 is greater than 30 then reg1 is set to 1
if address == 0 and values[0] > 30:
try:
values[1] = 1
except IndexError:
values.append(1)
# operates on slave 1 and block foo
slave = self.server.get_slave(1)
slave.set_values("foo", address, values)
return self._tuple_to_str(values)
def set_pi(self, args):
"""change the value of some registers"""
address = int(args[1])
# operates on slave 1 and block foo
slave = self.server.get_slave(1)
if PY2:
pi_bytes = [ord(a_byte) for a_byte in struct.pack("f", 3.14)]
else:
pi_bytes = [int(a_byte) for a_byte in struct.pack("f", 3.14)]
pi_register1 = pi_bytes[0] * 256 + pi_bytes[1]
pi_register2 = pi_bytes[2] * 256 + pi_bytes[3]
slave.set_values("foo", address, [pi_register1, pi_register2])
values = slave.get_values("foo", address, 2)
return self._tuple_to_str(values)
def main():
"""main"""
#Connect to the slave
if 'rtu' in sys.argv:
server = RtuServer(serial.Serial(port=sys.argv[-1]))
else:
server = TcpServer(error_on_missing_slave=True)
simu = MySimulator(server)
try:
LOGGER.info("'quit' for closing the server")
simu.start()
except Exception as excpt:
print(excpt)
finally:
simu.close()
if __name__ == "__main__":
help_text = """
Usage:
python mysimu.py -> Run in TCP mode
python mysimu.py rtu /dev/ptyp5 -> Run in RTU mode and open the port given as last argument
"""
if '-h' in sys.argv:
print(help_text)
else:
main()