You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
117 lines
3.0 KiB
Python
117 lines
3.0 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf_8 -*-
|
|
"""
|
|
Modbus TestKit: example of a custom simulator
|
|
|
|
(C)2009 - Luc Jean - luc.jean@gmail.com
|
|
(C)2009 - Apidev - http://www.apidev.fr
|
|
|
|
This is distributed under GNU LGPL license, see license.txt
|
|
|
|
"""
|
|
from __future__ import print_function
|
|
|
|
import sys
|
|
import struct
|
|
|
|
from modbus_tk.simulator import Simulator, LOGGER
|
|
from modbus_tk.defines import HOLDING_REGISTERS
|
|
from modbus_tk.modbus_tcp import TcpServer
|
|
from modbus_tk.utils import PY2
|
|
|
|
try:
|
|
import serial
|
|
from modbus_tk.modbus_rtu import RtuServer
|
|
except ImportError:
|
|
pass
|
|
|
|
|
|
class MySimulator(Simulator):
|
|
"""A custom simulator"""
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
"""Constructor"""
|
|
Simulator.__init__(self, *args, **kwargs)
|
|
# add a new command: cv will make possible to change a value
|
|
self.add_command("cv", self.change_value)
|
|
self.add_command("set_pi", self.set_pi)
|
|
|
|
# create a slave and block
|
|
slave = self.server.add_slave(1)
|
|
slave.add_block("foo", HOLDING_REGISTERS, 0, 100)
|
|
|
|
def change_value(self, args):
|
|
"""change the value of some registers"""
|
|
address = int(args[1])
|
|
|
|
# get the list of values and cast it to integers
|
|
values = []
|
|
for val in args[2:]:
|
|
values.append(int(val))
|
|
|
|
# custom rules: if the value of reg0 is greater than 30 then reg1 is set to 1
|
|
if address == 0 and values[0] > 30:
|
|
try:
|
|
values[1] = 1
|
|
except IndexError:
|
|
values.append(1)
|
|
|
|
# operates on slave 1 and block foo
|
|
slave = self.server.get_slave(1)
|
|
slave.set_values("foo", address, values)
|
|
return self._tuple_to_str(values)
|
|
|
|
def set_pi(self, args):
|
|
"""change the value of some registers"""
|
|
address = int(args[1])
|
|
|
|
# operates on slave 1 and block foo
|
|
slave = self.server.get_slave(1)
|
|
|
|
if PY2:
|
|
pi_bytes = [ord(a_byte) for a_byte in struct.pack("f", 3.14)]
|
|
else:
|
|
pi_bytes = [int(a_byte) for a_byte in struct.pack("f", 3.14)]
|
|
|
|
pi_register1 = pi_bytes[0] * 256 + pi_bytes[1]
|
|
pi_register2 = pi_bytes[2] * 256 + pi_bytes[3]
|
|
|
|
slave.set_values("foo", address, [pi_register1, pi_register2])
|
|
|
|
values = slave.get_values("foo", address, 2)
|
|
return self._tuple_to_str(values)
|
|
|
|
|
|
def main():
|
|
"""main"""
|
|
|
|
#Connect to the slave
|
|
if 'rtu' in sys.argv:
|
|
server = RtuServer(serial.Serial(port=sys.argv[-1]))
|
|
else:
|
|
server = TcpServer(error_on_missing_slave=True)
|
|
|
|
simu = MySimulator(server)
|
|
|
|
try:
|
|
LOGGER.info("'quit' for closing the server")
|
|
simu.start()
|
|
|
|
except Exception as excpt:
|
|
print(excpt)
|
|
|
|
finally:
|
|
simu.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
help_text = """
|
|
Usage:
|
|
python mysimu.py -> Run in TCP mode
|
|
python mysimu.py rtu /dev/ptyp5 -> Run in RTU mode and open the port given as last argument
|
|
"""
|
|
if '-h' in sys.argv:
|
|
print(help_text)
|
|
else:
|
|
main()
|