diff --git a/BlockServer/component_switcher/component_switcher.py b/BlockServer/component_switcher/component_switcher.py index cfefed1a..e68dd845 100644 --- a/BlockServer/component_switcher/component_switcher.py +++ b/BlockServer/component_switcher/component_switcher.py @@ -2,18 +2,22 @@ import json import os -import types from queue import Queue -from typing import Any, Dict, Iterable, List, Set, Type +from typing import Any, Callable, Dict, Iterable, List, Optional, Set -from BlockServer.core.config_list_manager import ConfigListManager +import numpy.typing as npt from server_common.channel_access import ChannelAccess from server_common.helpers import MACROS, PVPREFIX_MACRO from server_common.utilities import SEVERITY from server_common.utilities import print_and_log as _common_print_and_log +from BlockServer.core.config_list_manager import ConfigListManager + +type PVBaseValue = bool | int | float | str +type PVValue = PVBaseValue | list[PVBaseValue] | npt.NDArray | None + -def print_and_log(message: str, *args, **kwargs) -> None: +def print_and_log(message: str, *args: str, **kwargs: str) -> None: _common_print_and_log(f"ComponentSwitcher: {message}", *args, **kwargs) @@ -43,9 +47,9 @@ def __init__( self, config_list: ConfigListManager, blockserver_write_queue: Queue, - reload_current_config_func: types.FunctionType, - file_manager: ComponentSwitcherConfigFileManager = None, - channel_access_class: Type[ChannelAccess] = None, + reload_current_config_func: Callable[[], None], + file_manager: Optional[ComponentSwitcherConfigFileManager] = None, + channel_access_class: Optional[ChannelAccess] = None, ) -> None: self._config_list = config_list self._blockserver_write_queue = blockserver_write_queue @@ -89,7 +93,7 @@ def create_monitors(self) -> None: print_and_log("Adding monitor to PV {}".format(pv)) - def callback(val: Any, stat: int, sevr: int) -> None: + def callback(val: PVValue, stat: int, sevr: int) -> None: """ Callback function called when the monitored PV changes. diff --git a/BlockServer/config/block.py b/BlockServer/config/block.py index d138ec69..0e5de957 100644 --- a/BlockServer/config/block.py +++ b/BlockServer/config/block.py @@ -13,7 +13,7 @@ # along with this program; if not, you can obtain a copy from # https://www.eclipse.org/org/documents/epl-v10.php or # http://opensource.org/licenses/eclipse-1.0.php -from typing import Dict, Union +from typing import Dict, TypedDict, Union from server_common.helpers import PVPREFIX_MACRO @@ -41,17 +41,17 @@ def __init__( pv: str, local: bool = True, visible: bool = True, - component: str = None, + component: str | None = None, runcontrol: bool = False, - lowlimit: float = None, - highlimit: float = None, + lowlimit: float | None = None, + highlimit: float | None = None, suspend_on_invalid: bool = False, log_periodic: bool = False, log_rate: float = 5, log_deadband: float = 0, set_block: bool = False, - set_block_val: str = None, - ): + set_block_val: str | None = None, + ) -> None: """Constructor. Args: @@ -92,7 +92,7 @@ def _get_pv(self) -> str: pv_name = PVPREFIX_MACRO + self.pv return pv_name - def set_visibility(self, visible: bool): + def set_visibility(self, visible: bool) -> None: """Toggle the visibility of the block. Args: @@ -100,16 +100,18 @@ def set_visibility(self, visible: bool): """ self.visible = visible - def __str__(self): + def __str__(self) -> str: set_block_str = "" if self.set_block: set_block_str = f", SetBlockVal: {self.set_block_val}" return ( - f"Name: {self.name}, PV: {self.pv}, Local: {self.local}, Visible: {self.visible}, Component: {self.component}" - f", RCEnabled: {self.rc_enabled}, RCLow: {self.rc_lowlimit}, RCHigh: {self.rc_highlimit}{set_block_str}" + f"Name: {self.name}, PV: {self.pv}, Local: {self.local}, " + f"Visible: {self.visible}, Component: {self.component}" + f", RCEnabled: {self.rc_enabled}, RCLow: {self.rc_lowlimit}, " + f"RCHigh: {self.rc_highlimit}{set_block_str}" ) - def to_dict(self) -> Dict[str, Union[str, float, bool]]: + def to_dict(self) -> Dict[str, Union[str, float, bool, None]]: """Puts the block's details into a dictionary. Returns: @@ -131,3 +133,17 @@ def to_dict(self) -> Dict[str, Union[str, float, bool]]: "set_block": self.set_block, "set_block_val": self.set_block_val, } + + +class BlockKwargs(TypedDict, total=False): + visible: bool + component: str | None + runcontrol: bool + lowlimit: float | None + highlimit: float | None + suspend_on_invalid: bool + log_periodic: bool + log_rate: float + log_deadband: float + set_block: bool + set_block_val: str | None diff --git a/BlockServer/config/configuration.py b/BlockServer/config/configuration.py index 93fa3055..14a5c1eb 100644 --- a/BlockServer/config/configuration.py +++ b/BlockServer/config/configuration.py @@ -17,15 +17,16 @@ """Contains all the code for defining a configuration or component""" from collections import OrderedDict -from typing import Dict +from typing import Dict, Unpack -from BlockServer.config.block import Block +from server_common.helpers import PVPREFIX_MACRO +from server_common.utilities import print_and_log + +from BlockServer.config.block import Block, BlockKwargs from BlockServer.config.group import Group from BlockServer.config.ioc import IOC from BlockServer.config.metadata import MetaData from BlockServer.core.constants import GRP_NONE -from server_common.helpers import PVPREFIX_MACRO -from server_common.utilities import print_and_log class Configuration: @@ -41,7 +42,7 @@ class Configuration: is_component (bool): Whether it is actually a component """ - def __init__(self, macros: Dict): + def __init__(self, macros: Dict) -> None: """Constructor. Args: @@ -56,7 +57,14 @@ def __init__(self, macros: Dict): self.components = OrderedDict() self.is_component = False - def add_block(self, name: str, pv: str, group: str = GRP_NONE, local: bool = True, **kwargs): + def add_block( + self, + name: str, + pv: str, + group: str = GRP_NONE, + local: bool = True, + **kwargs: Unpack[BlockKwargs], + ) -> None: """Add a block to the configuration. Args: @@ -85,15 +93,15 @@ def add_block(self, name: str, pv: str, group: str = GRP_NONE, local: bool = Tru def add_ioc( self, name: str, - component: str = None, - autostart: bool = None, - restart: bool = None, - macros: Dict = None, - pvs: Dict = None, - pvsets: Dict = None, - simlevel: str = None, - remotePvPrefix: str = None, - ): + component: str | None = None, + autostart: bool | None = None, + restart: bool | None = None, + macros: Dict | None = None, + pvs: Dict | None = None, + pvsets: Dict | None = None, + simlevel: str | None = None, + remote_pv_prefix: str | None = None, + ) -> None: """Add an IOC to the configuration. Args: @@ -115,7 +123,15 @@ def add_ioc( ) else: self.iocs[name.upper()] = IOC( - name, autostart, restart, component, macros, pvs, pvsets, simlevel, remotePvPrefix + name, + autostart if autostart is not None else True, + restart if restart is not None else True, + component, + macros, + pvs, + pvsets, + simlevel, + remote_pv_prefix, ) def get_name(self) -> str: @@ -128,7 +144,7 @@ def get_name(self) -> str: self.meta.name.decode("utf-8") if isinstance(self.meta.name, bytes) else self.meta.name ) - def set_name(self, name: str): + def set_name(self, name: str) -> None: """Sets the configuration's name. Args: diff --git a/BlockServer/config/group.py b/BlockServer/config/group.py index 25d56467..9be46f47 100644 --- a/BlockServer/config/group.py +++ b/BlockServer/config/group.py @@ -25,7 +25,7 @@ class Group: component (string): The component the group belongs to """ - def __init__(self, name: str, component: str = None): + def __init__(self, name: str, component: str | None = None) -> None: """Constructor. Args: @@ -39,7 +39,7 @@ def __init__(self, name: str, component: str = None): def __str__(self) -> str: return f"Name: {self.name}, COMPONENT: {self.component}, Blocks: {self.blocks}" - def to_dict(self) -> Dict[str, Union[str, List]]: + def to_dict(self) -> Dict[str, Union[str, List, None]]: """Puts the group's details into a dictionary. Returns: diff --git a/BlockServer/config/ioc.py b/BlockServer/config/ioc.py index 8ca9675b..a5274c2c 100644 --- a/BlockServer/config/ioc.py +++ b/BlockServer/config/ioc.py @@ -23,8 +23,10 @@ class IOC: Attributes: name (string): The name of the IOC - autostart (bool): Whether the IOC should automatically start/restart when the configuration is loaded/changed - restart (bool): If auto start is true, then proc serv will restart the IOC if it terminates unexpectedly + autostart (bool): Whether the IOC should automatically + start/restart when the configuration is loaded/changed + restart (bool): If auto start is true, then proc serv will + restart the IOC if it terminates unexpectedly component (string): The component the IOC belongs to macros (dict): The IOC's macros pvs (dict): The IOC's PVs @@ -37,43 +39,50 @@ def __init__( name: str, autostart: bool = True, restart: bool = True, - component: str = None, - macros: Dict = None, - pvs: Dict = None, - pvsets: Dict = None, - simlevel: str = None, - remotePvPrefix: str = None, - ): + component: str | None = None, + macros: Dict | None = None, + pvs: Dict | None = None, + pvsets: Dict | None = None, + simlevel: str | None = None, + remotePvPrefix: str | None = None, # noqa: N803 + ) -> None: """Constructor. Args: name: The name of the IOC - autostart: Whether the IOC should automatically start/restart when the configuration is + autostart: Whether the IOC should automatically + start/restart when the configuration is loaded/changed - restart: If auto start is true, then proc serv will restart the IOC if it terminates unexpectedly + restart: If auto start is true, then proc serv will + restart the IOC if it terminates unexpectedly component: The component the IOC belongs to macros: The IOC's macros pvs: The IOC's PVs pvsets: The IOC's PV sets simlevel: The level of simulation - remotePvPrefix: The remote pv prefix + remotePvPrefix: The remote pv prefix, + has to be formatted like this to be read in properly. """ self.name = name self.autostart = autostart self.restart = restart self.component = component - self.remotePvPrefix = remotePvPrefix + self.remote_pv_prefix = remotePvPrefix if simlevel is None: self.simlevel = "none" else: self.simlevel = simlevel.lower() - - if macros is None: - self.macros = {} - else: - self.macros = macros - + self.macros = {} + if macros is not None: + for name, data in macros.items(): + if "useDefault" in data and ( + (not data["useDefault"]) or data["useDefault"] == "False" + ): + self.macros.update({name: data}) + self.macros[name].pop("useDefault") + elif "useDefault" not in data: + self.macros.update({name: data}) if pvs is None: self.pvs = {} else: @@ -107,7 +116,7 @@ def _dict_to_list(in_dict: Dict[str, Any]) -> List[Any]: def __str__(self) -> str: return f"{self.__class__.__name__}(name={self.name}, component={self.component})" - def to_dict(self) -> Dict[str, Union[str, bool, List[Any]]]: + def to_dict(self) -> Dict[str, Union[str, bool, List[Any], None]]: """Puts the IOC's details into a dictionary. Returns: @@ -122,11 +131,11 @@ def to_dict(self) -> Dict[str, Union[str, bool, List[Any]]]: "pvsets": self._dict_to_list(self.pvsets), "macros": self._dict_to_list(self.macros), "component": self.component, - "remotePvPrefix": self.remotePvPrefix, + "remotePvPrefix": self.remote_pv_prefix, } - def get(self, name): + def get(self, name: str) -> bool | str | Dict | None: return self.__getattribute__(name) - def __getitem__(self, name): + def __getitem__(self, name: str) -> bool | str | Dict | None: return self.__getattribute__(name) diff --git a/BlockServer/config/json_converter.py b/BlockServer/config/json_converter.py index f7fff874..76378bc5 100644 --- a/BlockServer/config/json_converter.py +++ b/BlockServer/config/json_converter.py @@ -15,11 +15,13 @@ # http://opensource.org/licenses/eclipse-1.0.php import json -from collections import OrderedDict -from typing import Any, Dict, List +from typing import TYPE_CHECKING, Any, Dict, List from BlockServer.core.constants import GRP_NONE +if TYPE_CHECKING: + from BlockServer.config.group import Group + class ConfigurationJsonConverter: """Helper class for converting configuration data to and from JSON. @@ -29,7 +31,7 @@ class ConfigurationJsonConverter: """ @staticmethod - def _groups_to_list(groups: OrderedDict) -> List[Dict[str, Any]]: + def _groups_to_list(groups: Dict[str, "Group"]) -> List[Dict[str, Any]]: grps = [] if groups is not None: for group in groups.values(): @@ -46,7 +48,7 @@ def _groups_to_list(groups: OrderedDict) -> List[Dict[str, Any]]: return grps @staticmethod - def groups_to_json(groups: OrderedDict) -> str: + def groups_to_json(groups: Dict[str, "Group"]) -> str: """Converts the groups dictionary to a JSON list Args: diff --git a/BlockServer/config/xml_converter.py b/BlockServer/config/xml_converter.py index feb329a9..70f861d7 100644 --- a/BlockServer/config/xml_converter.py +++ b/BlockServer/config/xml_converter.py @@ -13,16 +13,55 @@ # along with this program; if not, you can obtain a copy from # https://www.eclipse.org/org/documents/epl-v10.php or # http://opensource.org/licenses/eclipse-1.0.php -from typing import Dict, OrderedDict +from typing import Dict, List, OrderedDict from xml.dom import minidom +from xml.etree import ElementTree + +from server_common.helpers import PVPREFIX_MACRO +from server_common.utilities import parse_boolean, value_list_to_xml from BlockServer.config.block import Block from BlockServer.config.group import Group from BlockServer.config.ioc import IOC from BlockServer.config.metadata import MetaData -from BlockServer.core.constants import * -from server_common.helpers import PVPREFIX_MACRO -from server_common.utilities import * +from BlockServer.core.constants import ( + GRP_NONE, + SIMLEVELS, + TAG_AUTOSTART, + TAG_BLOCK, + TAG_BLOCKS, + TAG_COMPONENT, + TAG_COMPONENTS, + TAG_EDIT, + TAG_EDITS, + TAG_GROUP, + TAG_GROUPS, + TAG_IOC, + TAG_IOCS, + TAG_LOCAL, + TAG_LOG_DEADBAND, + TAG_LOG_PERIODIC, + TAG_LOG_RATE, + TAG_MACRO, + TAG_MACROS, + TAG_NAME, + TAG_PV, + TAG_PVS, + TAG_PVSET, + TAG_PVSETS, + TAG_READ_PV, + TAG_REMOTE_PREFIX, + TAG_RESTART, + TAG_RUNCONTROL_ENABLED, + TAG_RUNCONTROL_HIGH, + TAG_RUNCONTROL_LOW, + TAG_RUNCONTROL_SUSPEND_ON_INVALID, + TAG_SET_BLOCK, + TAG_SET_BLOCK_VAL, + TAG_SIMLEVEL, + TAG_VALUE, + TAG_VISIBLE, +) KEY_NONE = GRP_NONE.lower() TAG_ENABLED = "enabled" @@ -63,7 +102,7 @@ class ConfigurationXmlConverter: """ @staticmethod - def blocks_to_xml(blocks: OrderedDict, macros: Dict): + def blocks_to_xml(blocks: OrderedDict, macros: Dict) -> str: """Generates an XML representation for a supplied dictionary of blocks. Args: @@ -110,7 +149,7 @@ def groups_to_xml(groups: OrderedDict, include_none: bool = False) -> str: return minidom.parseString(ElementTree.tostring(root)).toprettyxml() @staticmethod - def iocs_to_xml(iocs: OrderedDict): + def iocs_to_xml(iocs: OrderedDict) -> str: """Generates an XML representation for a supplied list of iocs. Args: @@ -130,7 +169,7 @@ def iocs_to_xml(iocs: OrderedDict): return minidom.parseString(ElementTree.tostring(root)).toprettyxml() @staticmethod - def components_to_xml(comps: OrderedDict): + def components_to_xml(comps: OrderedDict) -> str: """Generates an XML representation for a supplied dictionary of components. Args: @@ -148,7 +187,7 @@ def components_to_xml(comps: OrderedDict): return minidom.parseString(ElementTree.tostring(root)).toprettyxml() @staticmethod - def meta_to_xml(data: MetaData): + def meta_to_xml(data: MetaData) -> str: """Generates an XML representation of the meta data for each configuration. Args: @@ -184,7 +223,7 @@ def meta_to_xml(data: MetaData): return minidom.parseString(ElementTree.tostring(root)).toprettyxml() @staticmethod - def _block_to_xml(root_xml: ElementTree.Element, block: Block, macros: Dict): + def _block_to_xml(root_xml: ElementTree.Element, block: Block, macros: Dict) -> None: """Generates the XML for a block""" name = block.name read_pv = block.pv @@ -238,7 +277,7 @@ def _block_to_xml(root_xml: ElementTree.Element, block: Block, macros: Dict): set_block_val.text = str(block.set_block_val) @staticmethod - def _group_to_xml(root_xml: ElementTree, group: Group): + def _group_to_xml(root_xml: ElementTree.Element, group: Group) -> None: """Generates the XML for a group""" grp = ElementTree.SubElement(root_xml, TAG_GROUP) grp.set(TAG_NAME, group.name) @@ -249,7 +288,7 @@ def _group_to_xml(root_xml: ElementTree, group: Group): b.set(TAG_NAME, blk) @staticmethod - def _ioc_to_xml(root_xml: ElementTree.Element, ioc: IOC): + def _ioc_to_xml(root_xml: ElementTree.Element, ioc: IOC) -> None: """Generates the XML for an ioc""" grp = ElementTree.SubElement(root_xml, TAG_IOC) grp.set(TAG_NAME, ioc.name) @@ -257,8 +296,8 @@ def _ioc_to_xml(root_xml: ElementTree.Element, ioc: IOC): grp.set(TAG_AUTOSTART, str(ioc.autostart).lower()) if ioc.restart is not None: grp.set(TAG_RESTART, str(ioc.restart).lower()) - if ioc.remotePvPrefix is not None: - grp.set(TAG_REMOTE_PREFIX, str(ioc.remotePvPrefix)) + if ioc.remote_pv_prefix is not None: + grp.set(TAG_REMOTE_PREFIX, str(ioc.remote_pv_prefix)) grp.set(TAG_SIMLEVEL, str(ioc.simlevel)) @@ -272,13 +311,15 @@ def _ioc_to_xml(root_xml: ElementTree.Element, ioc: IOC): value_list_to_xml(ioc.pvsets, grp, TAG_PVSETS, TAG_PVSET) @staticmethod - def _component_to_xml(root_xml: ElementTree.Element, name: str): + def _component_to_xml(root_xml: ElementTree.Element, name: str) -> None: """Generates the XML for a component""" grp = ElementTree.SubElement(root_xml, TAG_COMPONENT) grp.set(TAG_NAME, name) @staticmethod - def blocks_from_xml(root_xml: ElementTree.Element, blocks: OrderedDict, groups: OrderedDict): + def blocks_from_xml( + root_xml: ElementTree.Element, blocks: OrderedDict, groups: OrderedDict + ) -> None: """Populates the supplied dictionary of blocks and groups based on an XML tree. Args: @@ -293,7 +334,13 @@ def blocks_from_xml(root_xml: ElementTree.Element, blocks: OrderedDict, groups: for b in blks: n = ConfigurationXmlConverter._find_single_node(b, NS_TAG_BLOCK, TAG_NAME) read = ConfigurationXmlConverter._find_single_node(b, NS_TAG_BLOCK, TAG_READ_PV) - if n is not None and n.text != "" and read is not None and read.text is not None: + if ( + n is not None + and n.text is not None + and n.text != "" + and read is not None + and read.text is not None + ): name = n.text # Blocks automatically get assigned to the NONE group @@ -314,18 +361,18 @@ def blocks_from_xml(root_xml: ElementTree.Element, blocks: OrderedDict, groups: rc_enabled = ConfigurationXmlConverter._find_single_node( b, NS_TAG_BLOCK, TAG_RUNCONTROL_ENABLED ) - if rc_enabled is not None: + if rc_enabled is not None and rc_enabled.text is not None: blocks[name.lower()].rc_enabled = rc_enabled.text == "True" rc_low = ConfigurationXmlConverter._find_single_node( b, NS_TAG_BLOCK, TAG_RUNCONTROL_LOW ) - if rc_low is not None: + if rc_low is not None and rc_low.text is not None: blocks[name.lower()].rc_lowlimit = float(rc_low.text) rc_high = ConfigurationXmlConverter._find_single_node( b, NS_TAG_BLOCK, TAG_RUNCONTROL_HIGH ) - if rc_high is not None: + if rc_high is not None and rc_high.text is not None: blocks[name.lower()].rc_highlimit = float(rc_high.text) rc_suspend_on_invalid = ConfigurationXmlConverter._find_single_node( @@ -340,25 +387,25 @@ def blocks_from_xml(root_xml: ElementTree.Element, blocks: OrderedDict, groups: log_periodic = ConfigurationXmlConverter._find_single_node( b, NS_TAG_BLOCK, TAG_LOG_PERIODIC ) - if log_periodic is not None: + if log_periodic is not None and log_periodic.text is not None: blocks[name.lower()].log_periodic = log_periodic.text == "True" log_rate = ConfigurationXmlConverter._find_single_node( b, NS_TAG_BLOCK, TAG_LOG_RATE ) - if log_rate is not None: + if log_rate is not None and log_rate.text is not None: blocks[name.lower()].log_rate = float(log_rate.text) log_deadband = ConfigurationXmlConverter._find_single_node( b, NS_TAG_BLOCK, TAG_LOG_DEADBAND ) - if log_deadband is not None: + if log_deadband is not None and log_deadband.text is not None: blocks[name.lower()].log_deadband = float(log_deadband.text) set_block = ConfigurationXmlConverter._find_single_node( b, NS_TAG_BLOCK, TAG_SET_BLOCK ) - if set_block is not None: + if set_block is not None and set_block.text is not None: blocks[name.lower()].set_block = eval(set_block.text) set_block_val = ConfigurationXmlConverter._find_single_node( @@ -368,7 +415,9 @@ def blocks_from_xml(root_xml: ElementTree.Element, blocks: OrderedDict, groups: blocks[name.lower()].set_block_val = set_block_val.text @staticmethod - def groups_from_xml(root_xml: ElementTree.Element, groups: OrderedDict, blocks: OrderedDict): + def groups_from_xml( + root_xml: ElementTree.Element, groups: OrderedDict, blocks: OrderedDict + ) -> None: """Populates the supplied dictionary of groups and assign blocks based on an XML tree Args: @@ -395,7 +444,8 @@ def groups_from_xml(root_xml: ElementTree.Element, groups: OrderedDict, blocks: for b in blks: name = b.attrib[TAG_NAME] - # Check block is not already in the group. Unlikely, but may be a config was edited by hand... + # Check block is not already in the group. + # Unlikely, but may be a config was edited by hand... if name not in groups[gname_low].blocks: groups[gname_low].blocks.append(name) if name.lower() in blocks.keys(): @@ -406,7 +456,7 @@ def groups_from_xml(root_xml: ElementTree.Element, groups: OrderedDict, blocks: groups[KEY_NONE].blocks.remove(name) @staticmethod - def ioc_from_xml(root_xml: ElementTree.Element, iocs: OrderedDict): + def ioc_from_xml(root_xml: ElementTree.Element, iocs: OrderedDict) -> None: """Populates the supplied dictionary of IOCs based on an XML tree. Args: @@ -429,11 +479,11 @@ def ioc_from_xml(root_xml: ElementTree.Element, iocs: OrderedDict): if level in SIMLEVELS: iocs[n.upper()].simlevel = level if TAG_REMOTE_PREFIX in options: - iocs[n.upper()].remotePvPrefix = i.attrib[TAG_REMOTE_PREFIX] + iocs[n.upper()].remote_pv_prefix = i.attrib[TAG_REMOTE_PREFIX] try: # Get any macros - macros_xml = ConfigurationXmlConverter._find_single_node( + macros_xml = ConfigurationXmlConverter._find_single_node_with_none_check( i, NS_TAG_IOC, TAG_MACROS ) for m in macros_xml: @@ -441,13 +491,15 @@ def ioc_from_xml(root_xml: ElementTree.Element, iocs: OrderedDict): TAG_VALUE: str(m.attrib[TAG_VALUE]) } # Get any pvs - pvs_xml = ConfigurationXmlConverter._find_single_node(i, NS_TAG_IOC, TAG_PVS) + pvs_xml = ConfigurationXmlConverter._find_single_node_with_none_check( + i, NS_TAG_IOC, TAG_PVS + ) for p in pvs_xml: iocs[n.upper()].pvs[p.attrib[TAG_NAME]] = { TAG_VALUE: str(p.attrib[TAG_VALUE]) } # Get any pvsets - pvsets_xml = ConfigurationXmlConverter._find_single_node( + pvsets_xml = ConfigurationXmlConverter._find_single_node_with_none_check( i, NS_TAG_IOC, TAG_PVSETS ) for ps in pvsets_xml: @@ -458,7 +510,7 @@ def ioc_from_xml(root_xml: ElementTree.Element, iocs: OrderedDict): raise Exception("Tag not found in ioc.xml (" + str(err) + ")") @staticmethod - def components_from_xml(root_xml: ElementTree.Element, components: OrderedDict): + def components_from_xml(root_xml: ElementTree.Element, components: OrderedDict) -> None: """Populates the supplied dictionary of components based on an XML tree. Args: @@ -474,18 +526,20 @@ def components_from_xml(root_xml: ElementTree.Element, components: OrderedDict): components[n.lower()] = n @staticmethod - def get_configuresBlockGWAndArchiver_from_xml(root_xml: ElementTree.Element) -> bool: - configuresBlockGWAndArchiver = root_xml.find("./" + TAG_CONFIGURES_BLOCK_GW_AND_ARCHIVER) + def get_configures_block_g_w_and_archiver(root_xml: ElementTree.Element) -> bool: + configures_block_g_w_and_archiver = root_xml.find( + "./" + TAG_CONFIGURES_BLOCK_GW_AND_ARCHIVER + ) if ( - configuresBlockGWAndArchiver is not None - and configuresBlockGWAndArchiver.text is not None + configures_block_g_w_and_archiver is not None + and configures_block_g_w_and_archiver.text is not None ): - return configuresBlockGWAndArchiver.text.lower() == "true" + return configures_block_g_w_and_archiver.text.lower() == "true" else: return False @staticmethod - def meta_from_xml(root_xml: ElementTree.Element, data: MetaData): + def meta_from_xml(root_xml: ElementTree.Element, data: MetaData) -> None: """Populates the supplied MetaData object based on an XML tree. Args: @@ -500,21 +554,21 @@ def meta_from_xml(root_xml: ElementTree.Element, data: MetaData): if synoptic is not None: data.synoptic = synoptic.text if synoptic.text is not None else "" - isProtected = root_xml.find("./" + TAG_PROTECTED) - if isProtected is not None: - if isProtected.text is not None: - data.isProtected = isProtected.text.lower() == "true" + is_protected = root_xml.find("./" + TAG_PROTECTED) + if is_protected is not None: + if is_protected.text is not None: + data.isProtected = is_protected.text.lower() == "true" else: data.isProtected = False data.configuresBlockGWAndArchiver = ( - ConfigurationXmlConverter.get_configuresBlockGWAndArchiver_from_xml(root_xml) + ConfigurationXmlConverter.get_configures_block_g_w_and_archiver(root_xml) ) - isDynamic = root_xml.find("./" + TAG_DYNAMIC) - if isDynamic is not None: - if isDynamic.text is not None: - data.isDynamic = isDynamic.text.lower() == "true" + is_dynamic = root_xml.find("./" + TAG_DYNAMIC) + if is_dynamic is not None: + if is_dynamic.text is not None: + data.isDynamic = is_dynamic.text.lower() == "true" else: data.isDynamic = False @@ -522,7 +576,9 @@ def meta_from_xml(root_xml: ElementTree.Element, data: MetaData): data.history = [e.text for e in edits] @staticmethod - def _find_all_nodes(root: ElementTree.Element, tag: str, name: str): + def _find_all_nodes( + root: ElementTree.Element, tag: str, name: str + ) -> List[ElementTree.Element]: """Finds all the nodes regardless of whether it has a namespace or not. For example the name space for IOCs is xmlns:ioc="http://epics.isis.rl.ac.uk/schema/iocs/1.0" @@ -542,7 +598,9 @@ def _find_all_nodes(root: ElementTree.Element, tag: str, name: str): return nodes @staticmethod - def _find_single_node(root: ElementTree.Element, tag: str, name: str) -> ElementTree.Element: + def _find_single_node( + root: ElementTree.Element, tag: str, name: str + ) -> ElementTree.Element | None: """Finds a single node regardless of whether it has a namespace or not. For example the name space for IOCs is xmlns:ioc="http://epics.isis.rl.ac.uk/schema/iocs/1.0" @@ -559,41 +617,84 @@ def _find_single_node(root: ElementTree.Element, tag: str, name: str) -> Element if node is None: # Try without namespace node = root.find(name) + return node @staticmethod - def _display(child, index): + def _find_single_node_with_none_check( + root: ElementTree.Element, tag: str, name: str + ) -> ElementTree.Element: + """Finds a single node regardless of whether it has a namespace or not. + + For example the name space for IOCs is xmlns:ioc="http://epics.isis.rl.ac.uk/schema/iocs/1.0" + + Args: + root: The XML tree object + tag: The namespace tag + name: The item we are looking for + + Returns: The found node + """ + node = ConfigurationXmlConverter._find_single_node(root, tag, name) + if node is None: + raise NodeNotPresentError(name, root) + return node + + @staticmethod + def _display(child: ElementTree.Element, index: int) -> Dict[str, str | int | None]: return { "index": index, - "name": ConfigurationXmlConverter._find_single_node(child, "banner", "name").text, - "pv": ConfigurationXmlConverter._find_single_node(child, "banner", "pv").text, - "local": ConfigurationXmlConverter._find_single_node(child, "banner", "local").text, - "width": ConfigurationXmlConverter._find_single_node(child, "banner", "width").text, + "name": ConfigurationXmlConverter._find_single_node_with_none_check( + child, "banner", "name" + ).text, + "pv": ConfigurationXmlConverter._find_single_node_with_none_check( + child, "banner", "pv" + ).text, + "local": ConfigurationXmlConverter._find_single_node_with_none_check( + child, "banner", "local" + ).text, + "width": ConfigurationXmlConverter._find_single_node_with_none_check( + child, "banner", "width" + ).text, } @staticmethod - def _button(child, index): + def _button(child: ElementTree.Element, index: int) -> Dict[str, str | int | None]: return { "index": index, - "name": ConfigurationXmlConverter._find_single_node(child, "banner", "name").text, - "pv": ConfigurationXmlConverter._find_single_node(child, "banner", "pv").text, - "local": ConfigurationXmlConverter._find_single_node(child, "banner", "local").text, - "pvValue": ConfigurationXmlConverter._find_single_node(child, "banner", "pvValue").text, - "textColour": ConfigurationXmlConverter._find_single_node( + "name": ConfigurationXmlConverter._find_single_node_with_none_check( + child, "banner", "name" + ).text, + "pv": ConfigurationXmlConverter._find_single_node_with_none_check( + child, "banner", "pv" + ).text, + "local": ConfigurationXmlConverter._find_single_node_with_none_check( + child, "banner", "local" + ).text, + "pvValue": ConfigurationXmlConverter._find_single_node_with_none_check( + child, "banner", "pvValue" + ).text, + "textColour": ConfigurationXmlConverter._find_single_node_with_none_check( child, "banner", "textColour" ).text, - "buttonColour": ConfigurationXmlConverter._find_single_node( + "buttonColour": ConfigurationXmlConverter._find_single_node_with_none_check( child, "banner", "buttonColour" ).text, - "fontSize": ConfigurationXmlConverter._find_single_node( + "fontSize": ConfigurationXmlConverter._find_single_node_with_none_check( child, "banner", "fontSize" ).text, - "width": ConfigurationXmlConverter._find_single_node(child, "banner", "width").text, - "height": ConfigurationXmlConverter._find_single_node(child, "banner", "height").text, + "width": ConfigurationXmlConverter._find_single_node_with_none_check( + child, "banner", "width" + ).text, + "height": ConfigurationXmlConverter._find_single_node_with_none_check( + child, "banner", "height" + ).text, } @staticmethod - def banner_config_from_xml(root): + def banner_config_from_xml( + root: ElementTree.Element, + ) -> Dict[str, List[Dict[str, str | int | None]]]: """ Parses the banner config XML to produce a banner config dictionary @@ -603,26 +704,37 @@ def banner_config_from_xml(root): Returns: A dictionary with two entries, the banner items and the banner buttons. The items have the properties name, pv, local. - The buttons have the properties name, pv, local, pvValue, textColour, buttonColour, width, height. + The buttons have the properties name, pv, local, pvValue, + textColour, buttonColour, width, height. """ if root is None: - return [] + return {} banner_displays = [] banner_buttons = [] - items = ConfigurationXmlConverter._find_single_node(root, "banner", "items") + items = ConfigurationXmlConverter._find_single_node_with_none_check(root, "banner", "items") index = 0 for item in items: child = item.find("./") - if "display" in child.tag: - banner_displays.append(ConfigurationXmlConverter._display(child, index)) - else: - banner_buttons.append(ConfigurationXmlConverter._button(child, index)) + if child is not None: + if "display" in child.tag: + banner_displays.append(ConfigurationXmlConverter._display(child, index)) + else: + banner_buttons.append(ConfigurationXmlConverter._button(child, index)) index += 1 return { "items": banner_displays, "buttons": banner_buttons, } + + +class NodeNotPresentError(LookupError): + def __init__(self, name: str, root: ElementTree.Element) -> None: + self.name = name + self.root = root + + def __str__(self) -> str: + return f"Node {self.name} not found in {self.root}." diff --git a/BlockServer/core/config_holder.py b/BlockServer/core/config_holder.py index 9ec2651b..e779de01 100644 --- a/BlockServer/core/config_holder.py +++ b/BlockServer/core/config_holder.py @@ -43,7 +43,7 @@ def __init__( macros: Dict, file_manager: ConfigurationFileManager, is_component: bool = False, - test_config: Configuration = None, + test_config: Configuration | None = None, ) -> None: """Constructor. @@ -127,7 +127,7 @@ def get_blocknames(self) -> List[str]: names.append(block.name) return names - def get_block_details(self): + def get_block_details(self) -> Dict: """Get the configuration details for all the blocks including any in components. Returns: @@ -183,7 +183,7 @@ def get_group_details(self) -> Dict[str, Group]: return groups - def _set_group_details(self, redefinition) -> None: + def _set_group_details(self, redefinition: List[Dict]) -> None: # Any redefinition only affects the main configuration homeless_blocks = self.get_blocknames() for grp in redefinition: @@ -245,7 +245,7 @@ def get_ioc_names(self, include_base: bool = False) -> List[str]: iocs.extend(cv.iocs) return iocs - def get_ioc_details(self): + def get_ioc_details(self) -> Dict: """Get the details of the IOCs in the configuration. Returns: @@ -253,7 +253,7 @@ def get_ioc_details(self): """ return copy.deepcopy(self._config.iocs) - def get_component_ioc_details(self): + def get_component_ioc_details(self) -> Dict: """Get the details of the IOCs in any components. Returns: @@ -266,7 +266,7 @@ def get_component_ioc_details(self): iocs[ioc_name] = ioc return iocs - def get_all_ioc_details(self): + def get_all_ioc_details(self) -> Dict: """Get the details of the IOCs in the configuration and any components. Returns: @@ -364,7 +364,7 @@ def is_dynamic(self) -> bool: """ return self._config.meta.isDynamic - def configures_block_gateway_and_archiver(self): + def configures_block_gateway_and_archiver(self) -> bool: """ Returns: (bool): Whether this config has a gwblock.pvlist and block_config.xml to configure the @@ -372,14 +372,14 @@ def configures_block_gateway_and_archiver(self): """ return self._config.meta.configuresBlockGWAndArchiver - def _comps_to_list(self): + def _comps_to_list(self) -> List: comps = [] for component_name, component_value in self._components.items(): if component_name.lower() != DEFAULT_COMPONENT.lower(): comps.append({"name": component_value.get_name()}) return comps - def _blocks_to_list(self, expand_macro: bool = False): + def _blocks_to_list(self, expand_macro: bool = False) -> List: blocks = self.get_block_details() blks = [] if blocks is not None: @@ -391,7 +391,7 @@ def _blocks_to_list(self, expand_macro: bool = False): blks.append(b) return blks - def _groups_to_list(self): + def _groups_to_list(self) -> List: groups = self.get_group_details() grps = [] if groups is not None: @@ -404,10 +404,10 @@ def _groups_to_list(self): grps.append(groups[GRP_NONE.lower()].to_dict()) return grps - def _iocs_to_list(self): + def _iocs_to_list(self) -> List[Dict]: return [ioc.to_dict() for ioc in self._config.iocs.values()] - def _iocs_to_list_with_components(self): + def _iocs_to_list_with_components(self) -> List[Dict]: ioc_list = self._iocs_to_list() for component in self._components.values(): @@ -415,7 +415,7 @@ def _iocs_to_list_with_components(self): ioc_list.append(ioc.to_dict()) return ioc_list - def _to_dict(self, data_list): + def _to_dict(self, data_list: List) -> None | Dict: return None if data_list is None else {item["name"]: item for item in data_list} def set_config(self, config: Configuration, is_component: bool = False) -> None: diff --git a/BlockServer/core/inactive_config_holder.py b/BlockServer/core/inactive_config_holder.py index 50498327..a86046a8 100644 --- a/BlockServer/core/inactive_config_holder.py +++ b/BlockServer/core/inactive_config_holder.py @@ -13,7 +13,11 @@ # along with this program; if not, you can obtain a copy from # https://www.eclipse.org/org/documents/epl-v10.php or # http://opensource.org/licenses/eclipse-1.0.php +from typing import Dict + +from BlockServer.config.configuration import Configuration from BlockServer.core.config_holder import ConfigHolder +from BlockServer.fileIO.file_manager import ConfigurationFileManager class InactiveConfigHolder(ConfigHolder): @@ -21,7 +25,12 @@ class InactiveConfigHolder(ConfigHolder): Class to hold a individual inactive configuration or component. """ - def __init__(self, macros, file_manager, test_config=None): + def __init__( + self, + macros: Dict, + file_manager: ConfigurationFileManager, + test_config: Configuration | None = None, + ) -> None: """ Constructor. @@ -31,7 +40,7 @@ def __init__(self, macros, file_manager, test_config=None): """ super(InactiveConfigHolder, self).__init__(macros, file_manager, test_config=test_config) - def save_inactive(self, name=None, as_comp=False): + def save_inactive(self, name: str | None = None, as_comp: bool = False) -> None: """Saves a configuration or component that is not currently in use. Args: @@ -43,7 +52,7 @@ def save_inactive(self, name=None, as_comp=False): self.save_configuration(name, as_comp) - def load_inactive(self, name, is_component=False): + def load_inactive(self, name: str, is_component: bool = False) -> None: """ Loads a configuration or component into memory for editing only. @@ -54,7 +63,7 @@ def load_inactive(self, name, is_component=False): config = self.load_configuration(name, is_component, False) self.set_config(config, is_component) - def set_config_details(self, details): + def set_config_details(self, details: Dict) -> None: """Set the details of the configuration from a dictionary. Args: @@ -75,7 +84,7 @@ def set_config_details(self, details): if ioc.get("component") is not None: raise ValueError("Cannot override iocs from components") - remotePvPrefix = ioc.get("remotePvPrefix") + remote_pv_prefix = ioc.get("remote_pv_prefix") self._add_ioc( ioc["name"], @@ -85,7 +94,7 @@ def set_config_details(self, details): pvs=pvs, pvsets=pvsets, simlevel=ioc.get("simlevel"), - remotePvPrefix=remotePvPrefix, + remotePvPrefix=remote_pv_prefix, ) if "blocks" in details: diff --git a/BlockServer/devices/devices_manager.py b/BlockServer/devices/devices_manager.py index 01aad928..628650d3 100644 --- a/BlockServer/devices/devices_manager.py +++ b/BlockServer/devices/devices_manager.py @@ -18,7 +18,8 @@ # http://opensource.org/licenses/eclipse-1.0.php import os -from typing import TYPE_CHECKING +from pathlib import Path +from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: from block_server import BlockServer @@ -47,7 +48,7 @@ class DevicesManager(OnTheFlyPvInterface): def __init__( self, block_server: "BlockServer", - schema_folder: str, + schema_folder: Path, file_io: DevicesFileIO = DevicesFileIO(), ) -> None: """Constructor. @@ -92,7 +93,7 @@ def handle_pv_write(self, pv: str, data: str) -> None: "MINOR", ) - def handle_pv_read(self, _: str) -> None: + def handle_pv_read(self, pv: str) -> None: """ Nothing to do as it is all handled by monitors """ @@ -151,7 +152,7 @@ def get_devices_filename(self) -> str: return os.path.join(FILEPATH_MANAGER.devices_dir, SCREENS_FILE) - def update(self, xml_data: bytes, message: str = None) -> None: + def update(self, xml_data: bytes, message: Optional[str] = None) -> None: """Updates the current device screens with new data. Args: @@ -171,7 +172,7 @@ def save_devices_xml(self, xml_data: str) -> None: xml_data_as_bytes = bytes(xml_data, "utf-8") try: ConfigurationSchemaChecker.check_xml_data_matches_schema( - self._schema_folder / SCREENS_SCHEMA, xml_data_as_bytes + str(self._schema_folder / SCREENS_SCHEMA), xml_data_as_bytes ) except ConfigurationInvalidUnderSchema as err: print_and_log(err) diff --git a/BlockServer/synoptic/synoptic_manager.py b/BlockServer/synoptic/synoptic_manager.py index c69b77b8..db80d4c1 100644 --- a/BlockServer/synoptic/synoptic_manager.py +++ b/BlockServer/synoptic/synoptic_manager.py @@ -15,19 +15,14 @@ # http://opensource.org/licenses/eclipse-1.0.php import os +from pathlib import Path from typing import TYPE_CHECKING, List if TYPE_CHECKING: from block_server import BlockServer from lxml import etree - -from BlockServer.core.active_config_holder import ActiveConfigHolder -from BlockServer.core.config_list_manager import InvalidDeleteException -from server_common.file_path_manager import FILEPATH_MANAGER -from BlockServer.core.on_the_fly_pv_interface import OnTheFlyPvInterface -from BlockServer.fileIO.schema_checker import ConfigurationSchemaChecker -from BlockServer.synoptic.synoptic_file_io import SynopticFileIO from server_common.common_exceptions import MaxAttemptsExceededException +from server_common.file_path_manager import FILEPATH_MANAGER from server_common.utilities import ( compress_and_hex, convert_from_json, @@ -36,6 +31,12 @@ print_and_log, ) +from BlockServer.core.active_config_holder import ActiveConfigHolder +from BlockServer.core.config_list_manager import InvalidDeleteException +from BlockServer.core.on_the_fly_pv_interface import OnTheFlyPvInterface +from BlockServer.fileIO.schema_checker import ConfigurationSchemaChecker +from BlockServer.synoptic.synoptic_file_io import SynopticFileIO + # Synoptics PVs are of the form IN:DEMO:SYNOPTICS:XXXXX (no BLOCKSERVER in the name) # This is to allow longer synoptic names without exceeded the maximum allowed length for PVs SYNOPTIC_PRE = "SYNOPTICS:" @@ -56,10 +57,10 @@ class SynopticManager(OnTheFlyPvInterface): def __init__( self, block_server: "BlockServer", - schema_folder: str, + schema_folder: Path, active_configholder: ActiveConfigHolder, file_io: SynopticFileIO = SynopticFileIO(), - ): + ) -> None: """Constructor. Args: @@ -82,7 +83,7 @@ def __init__( self._create_standard_pvs() self._load_initial() - def handle_pv_write(self, pv: str, data: str): + def handle_pv_write(self, pv: str, data: str) -> None: try: if pv == SYNOPTIC_PRE + SYNOPTIC_DELETE: self.delete(convert_from_json(data)) @@ -95,11 +96,11 @@ def handle_pv_write(self, pv: str, data: str): except Exception as err: print_and_log(f"Error writing to PV {pv}: {err}", "MAJOR") - def handle_pv_read(self, pv): + def handle_pv_read(self, pv: str) -> None: # Nothing to do as it is all handled by monitors pass - def update_monitors(self): + def update_monitors(self) -> None: with self._bs.monitor_lock: print_and_log("Updating synoptic monitors") self._bs.setParam( @@ -111,13 +112,13 @@ def update_monitors(self): self._bs.updatePVs() print_and_log("Finished updating synoptic monitors") - def on_config_change(self, full_init=False): + def on_config_change(self, full_init: bool = False) -> None: # If the config has a default synoptic then set the PV to that default = self._activech.get_config_meta().synoptic self.set_default_synoptic(default) self.update_monitors() - def _create_standard_pvs(self): + def _create_standard_pvs(self) -> None: self._bs.add_string_pv_to_db(SYNOPTIC_PRE + SYNOPTIC_NAMES, 16000) self._bs.add_string_pv_to_db(SYNOPTIC_PRE + SYNOPTIC_GET_DEFAULT, 16000) self._bs.add_string_pv_to_db(SYNOPTIC_PRE + SYNOPTIC_BLANK + SYNOPTIC_GET, 16000) @@ -134,7 +135,7 @@ def _create_standard_pvs(self): SYNOPTIC_PRE + SYNOPTIC_SCHEMA, compress_and_hex(self.get_synoptic_schema()) ) - def _load_initial(self): + def _load_initial(self) -> None: """Create the PVs for all the synoptics found in the synoptics directory.""" for f in self._file_io.get_list_synoptic_files(self._directory): # Load the data, checking the schema @@ -154,11 +155,13 @@ def _load_initial(self): except Exception as err: print_and_log(f"Error creating synoptic PV: {err}", "MAJOR") - def _create_pv(self, data: bytes): - """Creates a single PV based on a name and data. Adds this PV to the dictionary returned on get_synoptic_list + def _create_pv(self, data: bytes) -> None: + """Creates a single PV based on a name and data. + Adds this PV to the dictionary returned on get_synoptic_list Args: - data (bytes): Starting data for the pv, the pv name is derived from the name tag of this + data (bytes): Starting data for the pv, + the pv name is derived from the name tag of this """ name = self._get_synoptic_name_from_xml(data) if name not in self._synoptic_pvs: @@ -177,7 +180,7 @@ def _create_pv(self, data: bytes): compress_and_hex(str(data, encoding="utf-8")), ) - def update_pv_value(self, name, data): + def update_pv_value(self, name: str, data: bytes) -> None: """Updates value of a PV holding synoptic information with new data Args: @@ -187,11 +190,12 @@ def update_pv_value(self, name, data): self._bs.setParam(name, data) self._bs.updatePVs() - def get_synoptic_list(self): + def get_synoptic_list(self) -> List[str]: """Gets the names and associated pvs of the synoptic files in the synoptics directory. Returns: - list : Alphabetical list of synoptics files on the server, along with their associated pvs + list : Alphabetical list of synoptics files on the server, + along with their associated pvs """ syn_list = [] default_is_none_synoptic = True @@ -208,7 +212,7 @@ def get_synoptic_list(self): ) return ans - def set_default_synoptic(self, name): + def set_default_synoptic(self, name: str) -> None: """Sets the default synoptic. Args: @@ -240,7 +244,7 @@ def get_default_synoptic_xml(self) -> bytes: """ return self._default_syn_xml - def _get_synoptic_name_from_xml(self, xml_data: bytes): + def _get_synoptic_name_from_xml(self, xml_data: bytes) -> str: name = None root = etree.fromstring(xml_data) for child in root: @@ -250,7 +254,7 @@ def _get_synoptic_name_from_xml(self, xml_data: bytes): raise Exception("Synoptic contains no name tag") return name - def save_synoptic_xml(self, xml_data: bytes): + def save_synoptic_xml(self, xml_data: bytes) -> None: """Saves the xml under the filename taken from the xml name tag. Args: @@ -278,20 +282,20 @@ def save_synoptic_xml(self, xml_data: bytes): ) print_and_log("Synoptic saved: " + name) - def delete(self, delete_list: List[str]): + def delete(self, delete_list: List[str]) -> None: """Takes a list of synoptics and removes them from the file system and any relevant PVs. Args: delete_list (list): The synoptics to delete """ print_and_log("Deleting: " + ", ".join(list(delete_list)), "INFO") - delete_list = set(delete_list) - if not delete_list.issubset(self._synoptic_pvs.keys()): + delete_set = set(delete_list) + if not delete_set.issubset(self._synoptic_pvs.keys()): raise InvalidDeleteException("Delete list contains unknown configurations") for synoptic in delete_list: self._delete_synoptic(synoptic) - def _delete_synoptic(self, synoptic: str): + def _delete_synoptic(self, synoptic: str) -> None: fullname = synoptic + ".xml" try: self._file_io.delete_synoptic(self._directory, fullname) @@ -305,7 +309,7 @@ def _delete_synoptic(self, synoptic: str): self._bs.delete_pv_from_db(SYNOPTIC_PRE + self._synoptic_pvs[synoptic] + SYNOPTIC_GET) del self._synoptic_pvs[synoptic] - def update(self, xml_data: str): + def update(self, xml_data: str) -> None: """Updates the synoptic list when modifications are made via the filesystem. Args: @@ -325,7 +329,7 @@ def update(self, xml_data: str): self.update_monitors() - def get_synoptic_schema(self): + def get_synoptic_schema(self) -> str: """Gets the XSD data for the synoptic. Returns: @@ -336,7 +340,7 @@ def get_synoptic_schema(self): schema = schemafile.read() return schema - def get_blank_synoptic(self): + def get_blank_synoptic(self) -> str: """Gets a blank synoptic. Returns: diff --git a/BlockServer/test_modules/test_active_config_holder.py b/BlockServer/test_modules/test_active_config_holder.py index d5010acd..7f8bbadf 100644 --- a/BlockServer/test_modules/test_active_config_holder.py +++ b/BlockServer/test_modules/test_active_config_holder.py @@ -20,6 +20,8 @@ import mock.mock from mock import Mock from parameterized import parameterized +from server_common.constants import IS_LINUX +from server_common.helpers import MACROS from BlockServer.config.block import Block from BlockServer.config.configuration import Configuration @@ -33,8 +35,6 @@ from BlockServer.mocks.mock_file_manager import MockConfigurationFileManager from BlockServer.mocks.mock_ioc_control import MockIocControl from BlockServer.test_modules.helpers import modify_active -from server_common.constants import IS_LINUX -from server_common.helpers import MACROS CONFIG_PATH = "./test_configs/" BASE_PATH = "./example_base/" @@ -698,9 +698,9 @@ def test_WHEN_compare_ioc_properties_called_with_the_same_ioc_then_returns_empty @parameterized.expand( [ - ({"a": IOC("a", macros=True)}, {"a": IOC("a", macros=False)}), - ({"a": IOC("a", pvs=True)}, {"a": IOC("a", pvs=False)}), - ({"a": IOC("a", pvsets=True)}, {"a": IOC("a", pvsets=False)}), + ({"a": IOC("a", macros={"A_MACRO": "VALUE1"})}, {"a": IOC("a", macros=None)}), + ({"a": IOC("a", pvs={"A_PV": "VALUE1"})}, {"a": IOC("a", pvs={})}), + ({"a": IOC("a", pvsets={"A_PV": "VALUE1"})}, {"a": IOC("a", pvsets={})}), ({"a": IOC("a", simlevel="recsim")}, {"a": IOC("a", simlevel="devsim")}), ({"a": IOC("a", restart=True)}, {"a": IOC("a", restart=False)}), ({"a": IOC("a", autostart=True)}, {"a": IOC("a", autostart=False)}), @@ -715,14 +715,14 @@ def test_WHEN_compare_ioc_properties_called_with_different_then_restarts_ioc( def test_WHEN_compare_ioc_properties_called_with_new_ioc_then_starts_new_ioc(self): old_iocs = {} - new_iocs = {"a": IOC("a", macros=True)} + new_iocs = {"a": IOC("a", macros={})} start, restart, stop = _compare_ioc_properties(old_iocs, new_iocs) self.assertEqual(len(start), 1) self.assertEqual(len(restart), 0) def test_WHEN_compare_ioc_properties_called_with_new_ioc_then_stops_old_ioc(self): - old_iocs = {"a": IOC("a", macros=True)} + old_iocs = {"a": IOC("a", macros={})} new_iocs = {} start, restart, stop = _compare_ioc_properties(old_iocs, new_iocs) diff --git a/block_server.py b/block_server.py index 7456c09a..a179dc77 100644 --- a/block_server.py +++ b/block_server.py @@ -20,7 +20,7 @@ import os import sys import traceback -from typing import Any, Dict +from typing import TYPE_CHECKING, Any, Dict from server_common.channel_access import ManagerModeRequiredError, verify_manager_mode from server_common.channel_access_server import CAServer @@ -76,6 +76,9 @@ ) from WebServer.simple_webserver import Server +if TYPE_CHECKING: + from BlockServer.config.ioc import IOC + CURR_CONFIG_NAME_SEVR_VALUE = 0 CONFIG_PUSH_TIME = 300 # 5 minutes INST_SCRIPT_PUSH_TIME = 604800 # 7 days @@ -211,7 +214,6 @@ def __init__(self, ca_server: CAServer) -> None: write_thread.start() self.write_queue.put((self.initialise_configserver, (FACILITY,), "INITIALISING")) - # Starts the Web Server self.server = Server() self.server.start() @@ -247,15 +249,16 @@ def initialise_configserver(self, facility: str) -> None: # Import all the synoptic data and create PVs print_and_log("Creating synoptic manager...") - self._syn = SynopticManager(self, SCHEMA_DIR, self._active_configserver) - self.on_the_fly_handlers.append(self._syn) - print_and_log("Finished creating synoptic manager") + if SCHEMA_DIR is not None: + self._syn = SynopticManager(self, SCHEMA_DIR, self._active_configserver) + self.on_the_fly_handlers.append(self._syn) + print_and_log("Finished creating synoptic manager") - # Import all the devices data and create PVs - print_and_log("Creating devices manager...") - self._devices = DevicesManager(self, SCHEMA_DIR) - self.on_the_fly_handlers.append(self._devices) - print_and_log("Finished creating devices manager") + # Import all the devices data and create PVs + print_and_log("Creating devices manager...") + self._devices = DevicesManager(self, SCHEMA_DIR) + self.on_the_fly_handlers.append(self._devices) + print_and_log("Finished creating devices manager") try: if self._gateway.exists(): @@ -282,54 +285,57 @@ def read(self, reason: str) -> str: If an Exception is thrown in the reading of the information this is returned in compressed and hexed JSON. """ - try: - if reason == BlockserverPVNames.GROUPS: - grps = ConfigurationJsonConverter.groups_to_json( - self._active_configserver.get_group_details() - ) - value = compress_and_hex(grps) - elif reason == BlockserverPVNames.CONFIGS: - value = compress_and_hex(convert_to_json(self._config_list.get_configs())) - elif reason == BlockserverPVNames.COMPS: - value = compress_and_hex(convert_to_json(self._config_list.get_components())) - elif reason == BlockserverPVNames.BLANK_CONFIG: - js = convert_to_json(self.get_blank_config()) - value = compress_and_hex(js) - elif reason == BlockserverPVNames.BANNER_DESCRIPTION: - value = compress_and_hex(self.spangle_banner) - elif reason == BlockserverPVNames.ALL_COMPONENT_DETAILS: - value = compress_and_hex( - convert_to_json(list(self._config_list.all_components.values())) - ) - elif reason == BlockserverPVNames.CURR_CONFIG_NAME: - value = self._active_configserver.get_config_name() - elif reason == BlockserverPVNames.CURR_CONFIG_NAME_SEVR: - value = CURR_CONFIG_NAME_SEVR_VALUE - elif reason == BlockserverPVNames.HEARTBEAT: - value = 0 - else: - # Check to see if it is a on-the-fly PV - for handler in self.on_the_fly_handlers: - if handler.read_pv_exists(reason): - return handler.handle_pv_read(reason) + if self._active_configserver is not None: + try: + if reason == BlockserverPVNames.GROUPS: + grps = ConfigurationJsonConverter.groups_to_json( + self._active_configserver.get_group_details() + ) + value = compress_and_hex(grps) + elif reason == BlockserverPVNames.CONFIGS: + value = compress_and_hex(convert_to_json(self._config_list.get_configs())) + elif reason == BlockserverPVNames.COMPS: + value = compress_and_hex(convert_to_json(self._config_list.get_components())) + elif reason == BlockserverPVNames.BLANK_CONFIG: + js = convert_to_json(self.get_blank_config()) + value = compress_and_hex(js) + elif reason == BlockserverPVNames.BANNER_DESCRIPTION: + value = compress_and_hex(self.spangle_banner) + elif reason == BlockserverPVNames.ALL_COMPONENT_DETAILS: + value = compress_and_hex( + convert_to_json(list(self._config_list.all_components.values())) + ) + elif reason == BlockserverPVNames.CURR_CONFIG_NAME: + value = self._active_configserver.get_config_name() + elif reason == BlockserverPVNames.CURR_CONFIG_NAME_SEVR: + value = CURR_CONFIG_NAME_SEVR_VALUE + elif reason == BlockserverPVNames.HEARTBEAT: + value = 0 + else: + # Check to see if it is a on-the-fly PV + for handler in self.on_the_fly_handlers: + if handler.read_pv_exists(reason): + return handler.handle_pv_read(reason) - value = self.getParam(reason) - except Exception as err: - value = compress_and_hex(convert_to_json("Error: " + str(err))) - print_and_log(str(err), "MAJOR") - return value + value = self.getParam(reason) + except Exception as err: + value = compress_and_hex(convert_to_json("Error: " + str(err))) + print_and_log(str(err), "MAJOR") + return str(value) + else: + return "" - def write(self, reason: str, value: str) -> str: + def write(self, reason: str, value: str) -> bool: # type: ignore """A method called by SimpleServer when a PV is written to the BlockServer over Channel Access. The write commands are queued as Channel Access is single-threaded. + Stores the result of the write or any exception caused in a parameter. Args: reason (string): The PV that is being requested (without the PV prefix) value (string): The data being written to the 'reason' PV Returns: - string : "OK" in compressed and hexed JSON if function succeeds. - Otherwise returns the Exception in compressed and hexed JSON. + bool : True if the new value is accepted, False if rejected """ status = True try: @@ -382,11 +388,11 @@ def write(self, reason: str, value: str) -> str: break except Exception as err: - value = compress_and_hex(convert_to_json("Error: " + str(err))) + value = str(compress_and_hex(convert_to_json("Error: " + str(err)))) print_and_log(str(err), "MAJOR") else: if status: - value = compress_and_hex(convert_to_json("OK")) + value = str(compress_and_hex(convert_to_json("OK"))) # store the values if status: @@ -398,13 +404,16 @@ def load_last_config(self) -> None: The information is saved in a text file. """ - last = self._active_configserver.load_last_config() - if last is None: - print_and_log("Could not retrieve last configuration - starting blank configuration") - self._active_configserver.clear_config() - else: - print_and_log("Loaded last configuration: %s" % last) - self._initialise_config() + if self._active_configserver is not None: + last = self._active_configserver.load_last_config() + if last is None: + print_and_log( + "Could not retrieve last configuration - starting blank configuration" + ) + self._active_configserver.clear_config() + else: + print_and_log("Loaded last configuration: %s" % last) + self._initialise_config() def _set_curr_config(self, details: str) -> None: """Sets the current configuration details to that defined in the JSON, saves to disk, @@ -413,23 +422,23 @@ def _set_curr_config(self, details: str) -> None: Args: details (string): the configuration JSON """ + if self._active_configserver is not None: + current_name = self._active_configserver.get_config_name() + details_name = convert_from_json(details)["name"] + + # This method saves the given details and then reloads the current config. + # Sending the details of a new config to this method, as was being done incorrectly + # (see #4606) + # will save the details as a new config, but not load it. + # A warning is sent in case this happens again. + if current_name != details_name: + print_and_log( + f"Config details to be set ({details_name}) did " + f"not match current config ({current_name})", + "MINOR", + ) - current_name = self._active_configserver.get_config_name() - details_name = convert_from_json(details)["name"] - - # This method saves the given details and then reloads the current config. - # Sending the details of a new config to this method, as was being done incorrectly - # (see #4606) - # will save the details as a new config, but not load it. - # A warning is sent in case this happens again. - if current_name != details_name: - print_and_log( - f"Config details to be set ({details_name}) did " - f"not match current config ({current_name})", - "MINOR", - ) - - self.save_config(details) + self.save_config(details) def _initialise_config(self, full_init: bool = False) -> None: """Responsible for initialising the configuration. @@ -439,65 +448,67 @@ def _initialise_config(self, full_init: bool = False) -> None: full_init (bool, optional): whether this requires a full initialisation, e.g. on loading a new configuration """ - new_iocs, changed_iocs, removed_iocs = self._active_configserver.iocs_changed() - - self._ioc_control.stop_iocs(removed_iocs) - self._start_config_iocs(new_iocs, changed_iocs) - - if ( - CAEN_DISCRIMINATOR_IOC_NAME in self._active_configserver.get_ioc_names() - and CAEN_DISCRIMINATOR_IOC_NAME not in new_iocs - ): - # See https://github.com/ISISComputingGroup/IBEX/issues/5590 for justification of why - # this ioc gets special treatment. - ioc = self._active_configserver.get_all_ioc_details()[CAEN_DISCRIMINATOR_IOC_NAME] - if ioc.autostart: - print_and_log( - f"{CAEN_DISCRIMINATOR_IOC_NAME} present in configuration and set " - f"to autostart - restarting it" + if self._active_configserver is not None: + new_iocs, changed_iocs, removed_iocs = self._active_configserver.iocs_changed() + self._ioc_control.stop_iocs(list(removed_iocs)) + self._start_config_iocs(list(new_iocs), list(changed_iocs)) + if ( + CAEN_DISCRIMINATOR_IOC_NAME in self._active_configserver.get_ioc_names() + and CAEN_DISCRIMINATOR_IOC_NAME not in new_iocs + ): + # See https://github.com/ISISComputingGroup/IBEX/issues/5590 for + # justification of why this ioc gets special treatment. + ioc = self._active_configserver.get_all_ioc_details()[CAEN_DISCRIMINATOR_IOC_NAME] + if ioc.autostart: + print_and_log( + f"{CAEN_DISCRIMINATOR_IOC_NAME} present in configuration and set " + f"to autostart - restarting it" + ) + if self._ioc_control.get_ioc_status(CAEN_DISCRIMINATOR_IOC_NAME) == "RUNNING": + self._ioc_control.restart_iocs( + [CAEN_DISCRIMINATOR_IOC_NAME], reapply_auto=True + ) + else: + self.start_iocs([CAEN_DISCRIMINATOR_IOC_NAME]) + + # Set up the gateway + if self._active_configserver.blocks_changed() or full_init: + self._gateway.set_new_aliases( + self._active_configserver.get_block_details(), + self._active_configserver.configures_block_gateway_and_archiver(), + os.path.join( + CONFIG_DIR, "configurations", self._active_configserver.get_config_name() + ), ) - if self._ioc_control.get_ioc_status(CAEN_DISCRIMINATOR_IOC_NAME) == "RUNNING": - self._ioc_control.restart_iocs([CAEN_DISCRIMINATOR_IOC_NAME], reapply_auto=True) - else: - self.start_iocs([CAEN_DISCRIMINATOR_IOC_NAME]) - - # Set up the gateway - if self._active_configserver.blocks_changed() or full_init: - self._gateway.set_new_aliases( - self._active_configserver.get_block_details(), - self._active_configserver.configures_block_gateway_and_archiver(), - os.path.join( - CONFIG_DIR, "configurations", self._active_configserver.get_config_name() - ), - ) - self._config_list.active_config_name = self._active_configserver.get_config_name() - self._config_list.active_components = self._active_configserver.get_component_names() - self._config_list.update_monitors() + self._config_list.active_config_name = self._active_configserver.get_config_name() + self._config_list.active_components = self._active_configserver.get_component_names() + self._config_list.update_monitors() - self.update_blocks_monitors() + self.update_blocks_monitors() - self.update_get_details_monitors() - self.update_wd_details_monitors() - self.update_curr_config_name_monitors() - self._active_configserver.update_archiver(full_init) - for handler in self.on_the_fly_handlers: - handler.on_config_change(full_init=full_init) + self.update_get_details_monitors() + self.update_wd_details_monitors() + self.update_curr_config_name_monitors() + self._active_configserver.update_archiver(full_init) + for handler in self.on_the_fly_handlers: + handler.on_config_change(full_init=full_init) - # Update Web Server text - self.server.set_config(convert_to_json(self._active_configserver.get_config_details())) - self.write_queue.put((self.set_config_block_values, (), "LOADING_BLOCK_SETS")) + # Update Web Server text + self.server.set_config(convert_to_json(self._active_configserver.get_config_details())) + self.write_queue.put((self.set_config_block_values, (), "LOADING_BLOCK_SETS")) def _start_config_iocs(self, iocs_to_start: list[str], iocs_to_restart: list[str]) -> None: # Start the IOCs, if they are available and if they are flagged for autostart # Note: autostart means the IOC is started when the config is loaded, # restart means the IOC should automatically restart if it stops for some reason # (e.g. it crashes) - def _ioc_from_name(ioc_name: str) -> str: + def _ioc_from_name(ioc_name: str) -> "IOC": + assert self._active_configserver is not None return self._active_configserver.get_all_ioc_details()[ioc_name] def _is_remote(ioc_name: str) -> bool: - return _ioc_from_name(ioc_name).remotePvPrefix not in (None, "") + return _ioc_from_name(ioc_name).remote_pv_prefix not in (None, "") def _should_start(ioc_name: str) -> bool: ioc = _ioc_from_name(ioc_name) @@ -532,26 +543,28 @@ def load_config(self, config: str, full_init: bool = True) -> None: full_init (bool): True to restart all IOCs/services or False to restart only those required """ - print_and_log(f"Loading configuration '{config}'") - try: - self._active_configserver.load_active(config) - # If we get this far then assume the config is okay - self._initialise_config(full_init=full_init) - except Exception as err: - print_and_log(f"Exception while loading configuration '{config}': {err}", "MAJOR") - traceback.print_exc() + if self._active_configserver is not None: + print_and_log(f"Loading configuration '{config}'") + try: + self._active_configserver.load_active(config) + # If we get this far then assume the config is okay + self._initialise_config(full_init=full_init) + except Exception as err: + print_and_log(f"Exception while loading configuration '{config}': {err}", "MAJOR") + traceback.print_exc() def reload_current_config(self) -> None: """Reload the current configuration.""" - try: - print_and_log("Reloading current configuration") - self._active_configserver.reload_current_config() - self._initialise_config(full_init=True) - except Exception as err: - print_and_log( - "Exception while reloading current configuration: {}".format(err), "MAJOR" - ) - traceback.print_exc() + if self._active_configserver is not None: + try: + print_and_log("Reloading current configuration") + self._active_configserver.reload_current_config() + self._initialise_config(full_init=True) + except Exception as err: + print_and_log( + "Exception while reloading current configuration: {}".format(err), "MAJOR" + ) + traceback.print_exc() def save_config(self, json_data: str, as_comp: bool = False) -> None: """Save a configuration. @@ -614,14 +627,16 @@ def save_config(self, json_data: str, as_comp: bool = False) -> None: f"Problem occurred saving configuration: {traceback.format_exc()}", "MAJOR" ) - # Reload configuration if a component has changed - if as_comp and new_details["name"] in self._active_configserver.get_component_names(): - self.load_last_config() + if self._active_configserver is not None: + # Reload configuration if a component has changed + if as_comp and new_details["name"] in self._active_configserver.get_component_names(): + self.load_last_config() + + # If the configuration we are trying to save is the currently active one, + # we need to reload it. - # If the configuration we are trying to save is the currently active one, - # we need to reload it. - if config_name == self._active_configserver.get_config_name(): - self.load_config(config_name, full_init=False) + if config_name == self._active_configserver.get_config_name(): + self.load_config(config_name, full_init=False) def _get_inactive_history(self, name: str, is_component: bool = False) -> list[str | None]: # If it already exists load it @@ -640,16 +655,17 @@ def _get_timestamp(self) -> str: def update_blocks_monitors(self) -> None: """Updates the PV monitors for the blocks and groups, so the clients can see any changes.""" - with self.monitor_lock: - block_names = convert_to_json(self._active_configserver.get_blocknames()) - self.setParam(BlockserverPVNames.BLOCKNAMES, compress_and_hex(block_names)) + if self._active_configserver is not None: + with self.monitor_lock: + block_names = convert_to_json(self._active_configserver.get_blocknames()) + self.setParam(BlockserverPVNames.BLOCKNAMES, compress_and_hex(block_names)) - groups = ConfigurationJsonConverter.groups_to_json( - self._active_configserver.get_group_details() - ) - self.setParam(BlockserverPVNames.GROUPS, compress_and_hex(groups)) + groups = ConfigurationJsonConverter.groups_to_json( + self._active_configserver.get_group_details() + ) + self.setParam(BlockserverPVNames.GROUPS, compress_and_hex(groups)) - self.updatePVs() + self.updatePVs() def update_server_status(self, status: str = "") -> None: """Updates the monitor for the server status, so the clients can see any changes. @@ -667,36 +683,44 @@ def update_server_status(self, status: str = "") -> None: def update_get_details_monitors(self) -> None: """Updates the monitor for the active configuration, so the clients can see any changes.""" - with self.monitor_lock: - config_details_json = convert_to_json(self._active_configserver.get_config_details()) - self.setParam( - BlockserverPVNames.GET_CURR_CONFIG_DETAILS, compress_and_hex(config_details_json) - ) - self.updatePVs() + if self._active_configserver is not None: + with self.monitor_lock: + config_details_json = convert_to_json( + self._active_configserver.get_config_details() + ) + self.setParam( + BlockserverPVNames.GET_CURR_CONFIG_DETAILS, + compress_and_hex(config_details_json), + ) + self.updatePVs() def update_wd_details_monitors(self) -> None: """Updates the monitor for the active configuration, so the web dashboard can see any changes.""" - with self.monitor_lock: - config_details_json = convert_to_json( - { - k: v - for k, v in self._active_configserver.get_config_details().items() - if k not in ["component_iocs", "iocs"] - } - ) - self.setParam(BlockserverPVNames.WD_CONF_DETAILS, compress_and_hex(config_details_json)) - self.updatePVs() + if self._active_configserver is not None: + with self.monitor_lock: + config_details_json = convert_to_json( + { + k: v + for k, v in self._active_configserver.get_config_details().items() + if k not in ["component_iocs", "iocs"] + } + ) + self.setParam( + BlockserverPVNames.WD_CONF_DETAILS, compress_and_hex(config_details_json) + ) + self.updatePVs() def update_curr_config_name_monitors(self) -> None: """Updates the monitor for the active configuration name, so the clients can see any changes.""" - with self.monitor_lock: - self.setParam( - BlockserverPVNames.CURR_CONFIG_NAME, self._active_configserver.get_config_name() - ) - self.setParam(BlockserverPVNames.CURR_CONFIG_NAME_SEVR, CURR_CONFIG_NAME_SEVR_VALUE) - self.updatePVs() + if self._active_configserver is not None: + with self.monitor_lock: + self.setParam( + BlockserverPVNames.CURR_CONFIG_NAME, self._active_configserver.get_config_name() + ) + self.setParam(BlockserverPVNames.CURR_CONFIG_NAME_SEVR, CURR_CONFIG_NAME_SEVR_VALUE) + self.updatePVs() def consume_write_queue(self) -> None: """Actions any requests on the write queue. @@ -736,22 +760,26 @@ def start_iocs(self, iocs: list[str]) -> None: # If the IOC is in the config and auto-restart is set to true then # reapply the auto-restart setting after starting. # This is because stopping an IOC via procServ turns auto-restart off. - conf_iocs = self._active_configserver.get_all_ioc_details() - - # Request IOCs to start - for i in iocs: - self._ioc_control.start_ioc(i) - - # Once all IOC start requests issued, wait for running and apply auto restart as needed - for i in iocs: - if i in conf_iocs and conf_iocs[i].restart: - if conf_iocs[i].remotePvPrefix not in (None, ""): - print_and_log(f"IOC '{i}' is set to run remotely - not applying auto-restart.") - continue - # Give it time to start as IOC has to be running to be able to set restart property - print(f"Re-applying auto-restart setting to {i}") - self._ioc_control.waitfor_running(i) - self._ioc_control.set_autorestart(i, True) + if self._active_configserver is not None: + conf_iocs = self._active_configserver.get_all_ioc_details() + + # Request IOCs to start + for i in iocs: + self._ioc_control.start_ioc(i) + + # Once all IOC start requests issued, wait for running and apply auto restart as needed + for i in iocs: + if i in conf_iocs and conf_iocs[i].restart: + if conf_iocs[i].remote_pv_prefix not in (None, ""): + print_and_log( + f"IOC '{i}' is set to run remotely - not applying auto-restart." + ) + continue + # Give it time to start as IOC has to be running to + # be able to set restart property + print(f"Re-applying auto-restart setting to {i}") + self._ioc_control.waitfor_running(i) + self._ioc_control.set_autorestart(i, True) # Code for handling on-the-fly PVs def does_pv_exist(self, name: str) -> bool: @@ -759,31 +787,32 @@ def does_pv_exist(self, name: str) -> bool: # Code for handling block-sets def set_config_block_values(self) -> None: - blocks = { - block_details - for block_details in self._active_configserver.get_block_details().values() - if block_details.set_block - } - start = time() - timeout = 30 - prefix = MACROS[PVPREFIX_MACRO] - for block_details in blocks: - pv = f"{prefix}{block_details.pv}" - while not ChannelAccess.pv_exists(pv): - sleep(0.5) - if time() - start >= timeout: - print_and_log( - f"Gave up waiting for block {block_details.name}," - f" {block_details.pv} to exist", - "MAJOR", - ) - break - # check for existence of set-point pv - pv_with_setpoint = f"{pv}:SP" - if ChannelAccess.pv_exists(pv_with_setpoint): - ChannelAccess.caput(pv_with_setpoint, block_details.set_block_val) - else: - ChannelAccess.caput(pv, block_details.set_block_val) + if self._active_configserver is not None: + blocks = { + block_details + for block_details in self._active_configserver.get_block_details().values() + if block_details.set_block + } + start = time() + timeout = 30 + prefix = MACROS[PVPREFIX_MACRO] + for block_details in blocks: + pv = f"{prefix}{block_details.pv}" + while not ChannelAccess.pv_exists(pv): + sleep(0.5) + if time() - start >= timeout: + print_and_log( + f"Gave up waiting for block {block_details.name}," + f" {block_details.pv} to exist", + "MAJOR", + ) + break + # check for existence of set-point pv + pv_with_setpoint = f"{pv}:SP" + if ChannelAccess.pv_exists(pv_with_setpoint): + ChannelAccess.caput(pv_with_setpoint, block_details.set_block_val) + else: + ChannelAccess.caput(pv, block_details.set_block_val) def delete_pv_from_db(self, name: str) -> None: if name in manager.pvs[self.port]: