You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
	
	
		
			189 lines
		
	
	
		
			6.2 KiB
		
	
	
	
		
			Python
		
	
		
		
			
		
	
	
			189 lines
		
	
	
		
			6.2 KiB
		
	
	
	
		
			Python
		
	
| 
											2 years ago
										 | import logging | ||
|  | import sched | ||
|  | import socket | ||
|  | import threading | ||
|  | import time | ||
|  | 
 | ||
|  | import modbus_tk.defines as cst | ||
|  | from modbus_tk import LOGGER | ||
|  | from modbus_tk.exceptions import ModbusError | ||
|  | from modbus_tk.modbus_tcp import TcpMaster | ||
|  | 
 | ||
|  | logging.basicConfig( | ||
|  |     level=logging.DEBUG, | ||
|  |     format="[%(asctime)s] %(name)s:%(levelname)s: %(message)s" | ||
|  | ) | ||
|  | 
 | ||
|  | 
 | ||
|  | class Block(object): | ||
|  |     __slots__ = ['address', 'reg_type', '_data'] | ||
|  | 
 | ||
|  |     def __init__(self, address, size, reg_type): | ||
|  |         assert reg_type in {cst.COILS, cst.DISCRETE_INPUTS, cst.HOLDING_REGISTERS, cst.ANALOG_INPUTS} | ||
|  |         self.address = address | ||
|  |         self.reg_type = reg_type | ||
|  |         self._data = [0] * size | ||
|  | 
 | ||
|  |     def __repr__(self): | ||
|  |         return '<{cls} {reg_type}-{start}:{stop} {data}>'.format( | ||
|  |             cls=self.__class__.__name__, | ||
|  |             reg_type=self.reg_type, | ||
|  |             start=self.address, | ||
|  |             stop=self.address + len(self) - 1, | ||
|  |             data=self._data | ||
|  |         ) | ||
|  | 
 | ||
|  |     def __len__(self): | ||
|  |         return len(self._data) | ||
|  | 
 | ||
|  |     @property | ||
|  |     def size(self): | ||
|  |         return len(self._data) | ||
|  | 
 | ||
|  |     @property | ||
|  |     def readonly(self): | ||
|  |         return self.reg_type in {cst.DISCRETE_INPUTS, cst.ANALOG_INPUTS} | ||
|  | 
 | ||
|  |     @property | ||
|  |     def minimum(self): | ||
|  |         return 0 | ||
|  | 
 | ||
|  |     @property | ||
|  |     def maximum(self): | ||
|  |         return 1 if self.reg_type in {cst.COILS, cst.DISCRETE_INPUTS} else 0xffff | ||
|  | 
 | ||
|  |     def offset(self, address): | ||
|  |         if self.address <= address <= self.address + len(self): | ||
|  |             return address - self.address | ||
|  |         raise IndexError('Block address out of range') | ||
|  | 
 | ||
|  |     def memset(self, address, values): | ||
|  |         for value in values: | ||
|  |             if not (self.minimum <= value <= self.maximum): | ||
|  |                 raise ValueError('value should between {min} and {max}'.format(min=self.minimum, max=self.maximum)) | ||
|  |         start = self.offset(address) | ||
|  |         stop = self.offset(address + len(values)) | ||
|  |         self._data[start:stop] = values | ||
|  | 
 | ||
|  |     def memcpy(self, address, size): | ||
|  |         start = self.offset(address) | ||
|  |         stop = self.offset(address + size) | ||
|  |         return self._data[start:stop] | ||
|  | 
 | ||
|  | 
 | ||
|  | class Slave(object): | ||
|  |     def __init__(self, host='127.0.0.1', port=502, slave_id=1, timeout=300): | ||
|  |         self.slave_id = slave_id | ||
|  |         self._master = TcpMaster(host, port, timeout_in_sec=timeout / 1000.0) | ||
|  |         self._sample_thread = threading.Thread(name='sampleLoop', target=self._sample_loop) | ||
|  |         self._is_looping = False | ||
|  |         self._memory = { | ||
|  |             cst.COILS: [], | ||
|  |             cst.DISCRETE_INPUTS: [], | ||
|  |             cst.HOLDING_REGISTERS: [], | ||
|  |             cst.ANALOG_INPUTS: [], | ||
|  |         } | ||
|  | 
 | ||
|  |     def add_block(self, block): | ||
|  |         assert isinstance(block, Block) | ||
|  |         self._memory[block.reg_type].append(block) | ||
|  | 
 | ||
|  |     def merge(self): | ||
|  |         _memory = { | ||
|  |             cst.COILS: [], | ||
|  |             cst.DISCRETE_INPUTS: [], | ||
|  |             cst.HOLDING_REGISTERS: [], | ||
|  |             cst.ANALOG_INPUTS: [] | ||
|  |         } | ||
|  |         for reg_type, blocks in self._memory.items(): | ||
|  |             sorted_blocks = sorted(blocks, key=lambda x: x.address) | ||
|  |             for block in sorted_blocks: | ||
|  |                 if _memory[reg_type]: | ||
|  |                     _block = _memory[reg_type].pop() | ||
|  |                     if block.address + block.size < _block.address or _block.address + _block.size < block.address: | ||
|  |                         _memory[reg_type].append(_block) | ||
|  |                         _memory[reg_type].append(block) | ||
|  |                     else: | ||
|  |                         address = min(block.address, _block.address) | ||
|  |                         size = max(block.address + block.size, _block.address + _block.size) - address | ||
|  |                         _memory[reg_type].append( | ||
|  |                             Block(address=address, size=size, reg_type=reg_type) | ||
|  |                         ) | ||
|  |                 else: | ||
|  |                     _memory[reg_type].append(block) | ||
|  |         self._memory = _memory | ||
|  | 
 | ||
|  |     @property | ||
|  |     def blocks(self): | ||
|  |         for reg_type, blocks in self._memory.items(): | ||
|  |             for block in blocks: | ||
|  |                 yield block | ||
|  | 
 | ||
|  |     def memset(self, reg_type, address, values): | ||
|  |         if reg_type in {cst.COILS, cst.HOLDING_REGISTERS}: | ||
|  |             for block in self._memory.get(reg_type): | ||
|  |                 try: | ||
|  |                     block.memset(address, values) | ||
|  |                     return block | ||
|  |                 except IndexError: | ||
|  |                     pass | ||
|  |         raise IndexError('Block address out of range') | ||
|  | 
 | ||
|  |     def memcpy(self, reg_type, address, size): | ||
|  |         for block in self._memory.get(reg_type, []): | ||
|  |             try: | ||
|  |                 return block.memcpy(address, size) | ||
|  |             except IndexError: | ||
|  |                 pass | ||
|  |         raise IndexError('Block address out of range') | ||
|  | 
 | ||
|  |     def fetch(self): | ||
|  |         for block in self.blocks: | ||
|  |             try: | ||
|  |                 result = self._master.execute( | ||
|  |                     slave=self.slave_id, | ||
|  |                     function_code=block.reg_type, | ||
|  |                     starting_address=block.address, | ||
|  |                     quantity_of_x=len(block) | ||
|  |                 ) | ||
|  |                 block.memset(block.address, result) | ||
|  |             except (ModbusError, socket.error) as e: | ||
|  |                 LOGGER.error(e) | ||
|  | 
 | ||
|  |     def push(self, block): | ||
|  |         function_code = cst.WRITE_MULTIPLE_COILS if block.reg_type is cst.COILS else cst.WRITE_MULTIPLE_REGISTERS | ||
|  |         try: | ||
|  |             result = self._master.execute( | ||
|  |                 slave=self.slave_id, | ||
|  |                 function_code=function_code, | ||
|  |                 starting_address=block.address, | ||
|  |                 quantity_of_x=block.size, | ||
|  |                 output_value=block._data | ||
|  |             ) | ||
|  |             # print result | ||
|  |         except (ModbusError, socket.error) as e: | ||
|  |             LOGGER.error(e) | ||
|  | 
 | ||
|  |     def _sample_loop(self, delay=50): | ||
|  |         scheduler = sched.scheduler(timefunc=time.time, delayfunc=time.sleep) | ||
|  | 
 | ||
|  |         def _task(): | ||
|  |             if self._is_looping: | ||
|  |                 scheduler.enter(delay=delay / 1000.0, priority=1, action=_task, argument=()) | ||
|  |             # print time.time() | ||
|  |             self.fetch() | ||
|  |             self.ts = time.time() | ||
|  | 
 | ||
|  |         _task() | ||
|  |         scheduler.run() | ||
|  | 
 | ||
|  |     def start_loop(self): | ||
|  |         self._is_looping = True | ||
|  |         self._sample_thread.setDaemon(True) | ||
|  |         self._sample_thread.start() | ||
|  | 
 | ||
|  |     def stop_loop(self): | ||
|  |         self._is_looping = False | ||
|  |         if self._sample_thread.isAlive(): | ||
|  |             self._sample_thread.join(.1)  # waiting a sample loop |