You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

141 lines
2.4 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import enum
import logging
import typing
from dataclasses import dataclass
# from skio.worker.state import SlotInfo
from typing import Any
class ProtocolType(enum.Enum):
# beijing & 301
RCV = 5
# SMNPC
SMPXI = 10
SMHSL = 11
modbus = 12
# TXS
TXS = 12
TXS_PXI_Dev = 13
class SigType(enum.Enum):
ALIAS = 1
STO = 10
SAI, SAO = 11, 12
SDI, SDO = 13, 14
DPO = 15
SCTR = 16
HSL_BEAT = 17
HSLO = 18
NIS_F, NIS_A = 19, 20
AI, AO, AIO = 1, 2, 3
DI, DO, DIO = 4, 5, 6
SHSL = 7
TXS, FREG = 21, 22
FAI, FAO, FDI, FDO = 23, 24, 25, 26
FAV, FPT100 = 27, 28
class ValType(enum.Enum):
B1 = 0
U8, U16, U32, U64 = 1, 2, 3, 4
I8, I16, I32, I64 = 5, 6, 7, 8
F32, D64 = 9, 10
BOOL = 11
s32 = 12
@dataclass
class BlockInfo(object):
idx: int
off: int
bit: int
vt: ValType
length: int
@dataclass
class FVar(object):
id: int
name: str
sig_type: SigType
val_type: ValType
length: int = 1
uri: str = ''
slot: str = ''
si: typing.Optional['SlotInfo'] = None
eu: str = None
rlo: float = None
rhi: float = None
elo: float = None
ehi: float = None
channel: int = 1
trigger: bool = False
group: list = None
trans_value: typing.Union[str, int, bool, float] = None
# 是否使用福清port回读当硬件置值过来的时候使用
txs_port: bool = False
@dataclass
class IVar(object):
id: int
name: str
sig_type: SigType
chr: ValType
reg: int
length: int = 1
slot: str = ''
si: typing.Optional['SlotInfo'] = None
engineering_unit: str = None
rlo: float = None
rhi: float = None
elo: float = None
ehi: float = None
T_Val = typing.Union[float, int, bool]
LOGGER = logging.getLogger('SkIO')
class IDev(object):
def setup(self, uri):
raise NotImplementedError
def read(self, var: IVar) -> T_Val:
raise NotImplementedError
def write(self, var: IVar, value: T_Val) -> T_Val:
raise NotImplementedError
class CmdType(enum.Enum):
SETUP = 0
PING, PONG = 1, 2
READ, WRITE = 3, 4
FQREAD, FQWRITE = 6, 7
# 执行 txs 脚本
TXSPY = 5
SUCCESS, ERROR = 10, 11
@dataclass
class KCommand(object):
type: CmdType
body: Any = None
@dataclass
class KReply(object):
type: CmdType
body: Any = None