Top

yaqd_core module

# Names exported by ``from yaqd_core import *``; each one is bound by the
# imports that follow this list.
__all__ = [
    "exceptions",
    "logging",
    "__version__",
    "Base",
    "Hardware",
    "ContinuousHardware",
    "Sensor",
]

from . import exceptions
from . import logging
from .__version__ import __version__
from ._daemon import Base
from ._hardware import Hardware, ContinuousHardware
from ._sensor import Sensor

Module variables

var __version__

Classes

class Base

class Base:
    # Default config values; `_parse_config` merges these across the MRO.
    defaults: Dict[str, Any] = {}
    # Traits declared by this class; `get_traits` unions them over the MRO.
    traits: List[str] = ["is-daemon"]
    # Daemon kind; used in the default config path and in the state file path.
    _kind: str = "base"
    _version: Optional[str] = None  # this class should not be directly instantiated
    # Every daemon created by `_start_daemon`. The list is never rebound in
    # this class, so subclasses that do not override it share one list object.
    _daemons: List["Base"] = []

    def __init__(
        self, name: str, config: Dict[str, Any], config_filepath: pathlib.Path
    ):
        """Create a yaq daemon.

        Stores identifying information, configures the daemon logger, restores
        any previously saved state and schedules the ``save_state`` and
        ``update_state`` coroutines on the event loop.

        Parameters
        ----------
        name: str
            A name for this daemon
        config: dict
            Configuration parameters. Must contain ``"port"``, which is logged
            unconditionally.
        config_filepath: pathlib.Path
            The path of the configuration file (reported to clients and re-read
            when the daemon is restarted)
        """
        self.name = name
        self.kind = self.__class__._kind
        self.config = config
        self._config_filepath = config_filepath
        # State file: <user data dir>/<kind>/<name>-state.toml
        self._state_filepath = (
            pathlib.Path(appdirs.user_data_dir("yaqd-state", "yaq"))
            / self.kind
            / f"{self.name}-state.toml"
        )
        self.logger = logging.getLogger(self.name)
        if "log_level" in self.config:
            self.logger.setLevel(logging.name_to_level[self.config["log_level"]])
        if self.config.get("log_to_file"):
            # The log file sits next to the state file, with a timestamp in its name
            fh = logging_.FileHandler(
                self._state_filepath.with_name(
                    f"{self.name}-{time.strftime('%Y-%m-%dT%H:%M:%S%z')}.log"
                )
            )
            fh.setFormatter(logging.formatter)
            self.logger.addHandler(fh)
        self.logger.info(f"Config File Path = {self._config_filepath}")
        self.logger.info(f"State File Path = {self._state_filepath}")
        # Raises KeyError when the config has no "port" entry
        self.logger.info(f"TCP Port = {config['port']}")
        self._clients: List[str] = []

        self.serial = config.get("serial", None)
        self.make = config.get("make", None)
        self.model = config.get("model", None)

        # Paired events backing the `_busy` property
        self._busy_sig = asyncio.Event()
        self._not_busy_sig = asyncio.Event()

        self._loop = asyncio.get_event_loop()

        # A missing or undecodable state file is treated as an empty state
        try:
            self._state_filepath.parent.mkdir(parents=True, exist_ok=True)
            with self._state_filepath.open("rt") as f:
                state = toml.load(f)
        except (toml.TomlDecodeError, FileNotFoundError):
            state = {}

        self._load_state(state)
        # Background tasks owned by this daemon; `shutdown` cancels them
        self._tasks = [
            self._loop.create_task(self.save_state()),
            self._loop.create_task(self.update_state()),
        ]

    @classmethod
    def main(cls):
        """Run the event loop.

        Registers signal handlers (except on Windows), parses the command
        line, loads the TOML configuration file, schedules ``cls._main`` and
        runs the loop until it is stopped. ``--version`` prints version
        information and exits before any configuration is read.
        """
        loop = asyncio.get_event_loop()
        # No signal handlers are registered on Windows
        if sys.platform.startswith("win"):
            signals = ()
        else:
            signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
        for s in signals:
            # `s=s` binds the current signal to each handler (avoids late binding)
            loop.add_signal_handler(
                s, lambda s=s: asyncio.create_task(cls.shutdown_all(s, loop))
            )

        parser = argparse.ArgumentParser()
        # Default config location: <user config dir>/<kind>/config.toml
        parser.add_argument(
            "--config",
            "-c",
            default=(
                pathlib.Path(appdirs.user_config_dir("yaqd", "yaq"))
                / cls._kind
                / "config.toml"
            ),
            action="store",
            help="Path to the configuration toml file.",
        )
        # --verbose and --log-level write to the same destination (log_level)
        parser.add_argument(
            "--verbose",
            "-v",
            action="store_const",
            dest="log_level",
            const="debug",
            help="Alias for --log-level=debug",
        )
        parser.add_argument(
            "--log-level",
            "-l",
            action="store",
            dest="log_level",
            choices=[
                "debug",
                "info",
                "notice",
                "warning",
                "error",
                "critical",
                "alert",
                "emergency",
            ],
            help="Set the log level explicitly",
        )

        parser.add_argument("--version", action="store_true")

        args = parser.parse_args()

        if args.log_level:
            logging.setLevel(logging.name_to_level[args.log_level])

        if args.version:
            print(f"'{cls._kind}' version {cls._version}")
            print(f"yaq rpc version {__rpc_version__}")
            print(f"yaqd_core version {__version__}")
            print(f"Python {sys.version}")
            sys.exit(0)

        config_filepath = pathlib.Path(args.config)
        config_file = toml.load(config_filepath)

        # NOTE(review): `main_task` is not read again; the binding may be kept
        # on purpose so the task stays strongly referenced -- confirm before
        # removing it.
        main_task = loop.create_task(cls._main(config_filepath, config_file, args))
        try:
            loop.run_forever()
        except asyncio.exceptions.CancelledError:
            pass
        finally:
            loop.close()

    @classmethod
    async def _main(cls, config_filepath, config_file, args=None):
        """Parse command line arguments, start event loop tasks."""
        loop = asyncio.get_running_loop()
        cls.__servers = []
        for section in config_file:
            try:
                config = cls._parse_config(config_file, section, args)
            except ValueError:
                pass
            await cls._start_daemon(section, config, config_filepath)

        while cls.__servers:
            awaiting = cls.__servers
            cls.__servers = []
            await asyncio.wait(awaiting)
            await asyncio.sleep(1)
        loop.stop()

    @classmethod
    async def _start_daemon(cls, name, config, config_filepath):
        """Instantiate one daemon and open its TCP server.

        The new daemon is appended to ``cls._daemons``; its server is stored
        on the daemon as ``_server`` and the server's ``serve_forever()``
        coroutine is queued for ``_main`` to await.
        """
        running_loop = asyncio.get_running_loop()
        instance = cls(name, config, config_filepath)
        cls._daemons.append(instance)

        def protocol_factory():
            # `instance` is never rebound in this call, so every Protocol
            # created by the server is tied to this one daemon.
            return Protocol(instance)

        host = config.get("host", "")
        port = config.get("port", None)
        tcp_server = await running_loop.create_server(protocol_factory, host, port)
        instance._server = tcp_server
        cls.__servers.append(tcp_server.serve_forever())

    @classmethod
    def _parse_config(cls, config_file, section, args=None):
        if section == "shared-settings":
            raise ValueError(f"Section name '{section}' reserved")
        config = {}
        for c in reversed(cls.mro()):
            try:
                config.update(c.defaults)
            except AttributeError:
                continue
        config.update(config_file.get("shared-settings", {}).copy())
        config.update(config_file[section])
        if args:
            try:
                if args.log_level:
                    config.update(log_level=args.log_level)
            except AttributeError:
                pass

        if not config.get("enable", True):
            logger.info(f"Section '{section}' is disabled")
            raise ValueError(f"Section '{section}' is disabled")
        return config

    @classmethod
    async def shutdown_all(cls, sig, loop):
        """Gracefully shutdown the asyncio loop.

        Gathers all current tasks, and allows daemons to perform cleanup tasks.
        Every daemon's state is written to disk once the tasks have finished.
        On SIGHUP the configuration of the first daemon is re-read and
        ``_main`` is run again before the loop is stopped.

        Parameters
        ----------
        sig: signal.Signals
            The signal that triggered the shutdown.
        loop: asyncio event loop
            The loop to stop once shutdown has completed.

        Adapted from https://www.roguelynn.com/words/asyncio-graceful-shutdowns/
        Original code is licensed under the MIT license, and sublicensed here.
        """
        logger.info(f"Received signal {sig.name}...")
        # Everything except the task running this coroutine
        tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
        logger.info(f"Cancelling {len(tasks)} outstanding tasks")
        logger.debug(f"{[task.get_name() for task in tasks]}")
        [task.cancel() for task in tasks]
        # This is done after cancelling so that shutdown tasks which require the loop
        # are not themselves cancelled.
        [d.close() for d in cls._daemons]
        # Re-collect: `close()` may have scheduled new tasks that must finish too
        tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
        await asyncio.gather(*tasks, return_exceptions=True)
        [d._save_state() for d in cls._daemons]
        # `hasattr` guard: the code treats SIGHUP as possibly absent from `signal`
        if hasattr(signal, "SIGHUP") and sig == signal.SIGHUP:
            # NOTE(review): raises IndexError when no daemon was started, and
            # `cls._daemons` is not cleared before `_main` adds new daemons.
            config_filepath = [d._config_filepath for d in cls._daemons][0]
            config_file = toml.load(config_filepath)
            await cls._main(config_filepath, config_file)
        loop.stop()

    def shutdown(self, restart=False):
        self.logger.info(f"Shutting Down {self.name}")
        self.logger.info(f"Cancelling {len(self._tasks)} outstanding tasks")
        self.logger.debug(f"{[task.get_name() for task in self._tasks]}")
        [task.cancel() for task in self._tasks]
        self.close()
        self._server.close()
        if restart:
            config_filepath = self._config_filepath
            config_file = toml.load(config_filepath)
            try:
                config = type(self)._parse_config(config_file, self.name)
                self._loop.create_task(
                    type(self)._start_daemon(self.name, config, config_filepath)
                )
            except ValueError as e:
                self.logger.error(e.message)

    def _connection_made(self, peername: str) -> None:
        self._clients.append(peername)
        self.logger.debug(f"_connection_made {self._clients}")

    def _connection_lost(self, peername: str) -> None:
        self._clients.remove(peername)
        self.logger.debug(f"_connection_lost {self._clients}")

    def get_traits(self) -> List[str]:
        """Get list of yaq-daemon traits."""
        implemented_traits: Set[str] = set()
        for ty in type(self).mro():
            if issubclass(ty, Base):
                implemented_traits |= set(ty.traits)
        return list(implemented_traits)

    def get_version(self) -> Optional[str]:
        """Get version."""
        return self._version

    def _save_state(self) -> None:
        """Write the current state to disk as TOML.

        The state is serialized before the file is opened. Opening with
        ``"wt"`` truncates the file, so serializing first means a failure in
        ``get_state`` or in TOML encoding leaves the previous file intact.
        """
        serialized = toml.dumps(self.get_state())
        with open(self._state_filepath, "wt") as f:
            f.write(serialized)

    async def save_state(self):
        """Schedule writing the current state to disk.

        Runs forever: while the daemon is busy the state is written every
        0.1 s; once it is no longer busy the state is written one more time
        and the coroutine sleeps until the busy signal is set again.

        Note: Current implementation only writes while busy (and once after busy)
        """
        while True:
            while self._busy:
                self._save_state()
                await asyncio.sleep(0.1)
            # Final write after leaving the busy state (also the very first write)
            self._save_state()
            await self._busy_sig.wait()

    def get_config_filepath(self) -> str:
        """Retrieve the current filepath of the configuration."""
        return str(self._config_filepath.absolute())

    def get_config(self) -> Dict[str, Any]:
        """Retrieve the current configuration, including any defaults."""
        return self.config

    def list_methods(self) -> List[str]:
        """Return a list of all public methods."""
        filt = filter(lambda x: x[0] != "_", dir(self.__class__))
        # Use `isfunction` on the `__class__` to filter out classmethods
        filt = filter(lambda x: inspect.isfunction(getattr(self.__class__, x)), filt)
        filt = filter(lambda x: not asyncio.iscoroutinefunction(getattr(self, x)), filt)
        return list(filt)

    def set_state(self, **kwargs) -> None:
        """Set the daemon state.

        Input may be any portion of the entire state dictionary.
        Key value pairs that are not defined are propagated from the current daemon state.
        If input is not valid, daemon will raise exception.

        Parameters
        ----------
        state: dict
            New state
        """
        full = self.get_state()
        full.update(kwargs)
        self._load_state(full)
        self._save_state()

    def help(self, method: Optional[Union[str, Sequence[str]]] = None):
        """Return useful, human readable information about methods.

        Parameters
        ----------
        method: str or list of str (optional)
            The method or list of methods for which help is requested.
            Default is information on the daemon itself.

        Returns
        -------
        str or list of str: The requested documentation.
        """
        if method is None:
            return self.__doc__
        if isinstance(method, str):
            fun = getattr(self, method)
            return f"{method}{str(inspect.signature(fun))}\n{fun.__doc__}"
        return list(self.help(c) for c in method)

    def id(self) -> Dict[str, Optional[str]]:
        """Dictionary of identifying information for the daemon."""
        return {
            "name": self.name,
            "kind": self.kind,
            "make": self.make,
            "model": self.model,
            "serial": self.serial,
        }

    @property
    def _busy(self) -> bool:
        """Indicates the current 'busy' state for use in internal functions.

        Setting busy can be done with `self._busy = <True|False>`.
        Async tasks can wait for either sense using `await self._[not_]busy_sig.wait()`.
        """
        return self._busy_sig.is_set()

    @_busy.setter
    def _busy(self, value):
        if value:
            self._busy_sig.set()
            self._not_busy_sig.clear()
        else:
            self._not_busy_sig.set()
            self._busy_sig.clear()

    def busy(self) -> bool:
        """Boolean representing if the daemon is busy (state updated) or not."""
        return self._busy

    # The following functions (plus __init__) are what most daemon need to implement

    async def update_state(self):
        """Continually monitor and update the current daemon state."""
        pass

    def get_state(self) -> Dict[str, Any]:
        """Return the current daemon state."""
        return {}

    def _load_state(self, state):
        """Load an initial state from a dictionary (typically read from the state.toml file).

        Must be tolerant of missing fields, including entirely empty initial states.
        Raise an exception if state is invalid.

        Parameters
        ----------
        state: dict
            The saved state to load.
        """
        pass

    def close(self):
        """Perform necessary clean-up and stop running."""
        pass

Ancestors (in MRO)

  • Base
  • builtins.object

Class variables

var defaults

var traits

Static methods

def __init__(

self, name, config, config_filepath)

Create a yaq daemon.

Parameters

name: str — A name for this daemon. config: dict — Configuration parameters. config_filepath: str — The path for the configuration (not used internally, available to clients).

def __init__(
    self, name: str, config: Dict[str, Any], config_filepath: pathlib.Path
):
    """Create a yaq daemon.

    Stores identifying information, configures the daemon logger, restores
    any previously saved state and schedules the ``save_state`` and
    ``update_state`` coroutines on the event loop.

    Parameters
    ----------
    name: str
        A name for this daemon
    config: dict
        Configuration parameters. Must contain ``"port"``, which is logged
        unconditionally.
    config_filepath: pathlib.Path
        The path of the configuration file (stored on the daemon as
        ``_config_filepath``)
    """
    self.name = name
    self.kind = self.__class__._kind
    self.config = config
    self._config_filepath = config_filepath
    # State file: <user data dir>/<kind>/<name>-state.toml
    self._state_filepath = (
        pathlib.Path(appdirs.user_data_dir("yaqd-state", "yaq"))
        / self.kind
        / f"{self.name}-state.toml"
    )
    self.logger = logging.getLogger(self.name)
    if "log_level" in self.config:
        self.logger.setLevel(logging.name_to_level[self.config["log_level"]])
    if self.config.get("log_to_file"):
        # The log file sits next to the state file, with a timestamp in its name
        fh = logging_.FileHandler(
            self._state_filepath.with_name(
                f"{self.name}-{time.strftime('%Y-%m-%dT%H:%M:%S%z')}.log"
            )
        )
        fh.setFormatter(logging.formatter)
        self.logger.addHandler(fh)
    self.logger.info(f"Config File Path = {self._config_filepath}")
    self.logger.info(f"State File Path = {self._state_filepath}")
    # Raises KeyError when the config has no "port" entry
    self.logger.info(f"TCP Port = {config['port']}")
    self._clients: List[str] = []
    self.serial = config.get("serial", None)
    self.make = config.get("make", None)
    self.model = config.get("model", None)
    self._busy_sig = asyncio.Event()
    self._not_busy_sig = asyncio.Event()
    self._loop = asyncio.get_event_loop()
    # A missing or undecodable state file is treated as an empty state
    try:
        self._state_filepath.parent.mkdir(parents=True, exist_ok=True)
        with self._state_filepath.open("rt") as f:
            state = toml.load(f)
    except (toml.TomlDecodeError, FileNotFoundError):
        state = {}
    self._load_state(state)
    # Background tasks owned by this daemon
    self._tasks = [
        self._loop.create_task(self.save_state()),
        self._loop.create_task(self.update_state()),
    ]

def busy(

self)

Boolean representing if the daemon is busy (state updated) or not.

def busy(self) -> bool:
    """Return True while the daemon is busy (state updating), else False."""
    is_busy: bool = self._busy
    return is_busy

def close(

self)

Perform necessary clean-up and stop running.

def close(self):
    """Perform necessary clean-up and stop running.

    This implementation does nothing; daemons override it.
    """

def get_config(

self)

Retrieve the current configuration, including any defaults.

def get_config(self) -> Dict[str, Any]:
    """Return the daemon's configuration dictionary.

    The stored dictionary itself is returned, not a copy.
    """
    current: Dict[str, Any] = self.config
    return current

def get_config_filepath(

self)

Retrieve the current filepath of the configuration.

def get_config_filepath(self) -> str:
    """Return the absolute path of the configuration file as a string."""
    absolute_path = self._config_filepath.absolute()
    return str(absolute_path)

def get_state(

self)

Return the current daemon state.

def get_state(self) -> Dict[str, Any]:
    """Return the current daemon state.

    This implementation has no state and returns a new empty dictionary on
    every call.
    """
    return dict()

def get_traits(

self)

Get list of yaq-daemon traits.

def get_traits(self) -> List[str]:
    """Return the traits implemented by this daemon.

    The result is the union of ``traits`` over every ``Base`` subclass in the
    daemon's MRO; the order of the returned list is unspecified.
    """
    found: Set[str] = set()
    for klass in filter(lambda k: issubclass(k, Base), type(self).mro()):
        found.update(set(klass.traits))
    return list(found)

def get_version(

self)

Get version.

def get_version(self) -> Optional[str]:
    """Return the daemon's ``_version`` string (``None`` when unset)."""
    version: Optional[str] = self._version
    return version

def help(

self, method=None)

Return useful, human readable information about methods.

Parameters

method: str or list of str (optional) The method or list of methods for which help is requested. Default is information on the daemon itself.

Returns

str or list of str: The requested documentation.

def help(self, method: Optional[Union[str, Sequence[str]]] = None):
    """Return human readable documentation.

    Parameters
    ----------
    method: str or list of str (optional)
        Method name, or names, to document. When omitted, the docstring of
        the daemon class itself is returned.

    Returns
    -------
    str or list of str: ``"<name><signature>\\n<docstring>"`` for each
    requested method, or the daemon docstring.
    """
    if method is None:
        return self.__doc__
    if not isinstance(method, str):
        return [self.help(item) for item in method]
    target = getattr(self, method)
    signature = str(inspect.signature(target))
    return f"{method}{signature}\n{target.__doc__}"

def id(

self)

Dictionary of identifying information for the daemon.

def id(self) -> Dict[str, Optional[str]]:
    """Return the name, kind, make, model and serial of this daemon."""
    fields = ("name", "kind", "make", "model", "serial")
    return {field: getattr(self, field) for field in fields}

def list_methods(

self)

Return a list of all public methods.

def list_methods(self) -> List[str]:
    """Return the names of all public, non-coroutine instance methods.

    Names starting with an underscore, classmethods and ``async`` methods
    are excluded.
    """
    klass = self.__class__
    return [
        name
        for name in dir(klass)
        if name[0] != "_"
        # `isfunction` on the class attribute rejects classmethods, which
        # appear there as bound methods
        and inspect.isfunction(getattr(klass, name))
        and not asyncio.iscoroutinefunction(getattr(self, name))
    ]

def save_state(

self)

Schedule writing the current state to disk.

Note: Current implementation only writes while busy (and once after busy)

async def save_state(self):
    """Schedule writing the current state to disk.

    Runs forever: while the daemon is busy the state is written every 0.1 s;
    once it is no longer busy the state is written one more time and the
    coroutine sleeps until the busy signal is set again.

    Note: Current implementation only writes while busy (and once after busy)
    """
    while True:
        while self._busy:
            self._save_state()
            await asyncio.sleep(0.1)
        # Final write after leaving the busy state (also the very first write)
        self._save_state()
        await self._busy_sig.wait()

def set_state(

self, **kwargs)

Set the daemon state.

Input may be any portion of the entire state dictionary. Key value pairs that are not defined are propagated from the current daemon state. If input is not valid, daemon will raise exception.

Parameters

state: dict New state

def set_state(self, **kwargs) -> None:
    """Update part or all of the daemon state and persist it.

    The keyword arguments are laid over the dictionary returned by
    ``get_state``; the result is passed to ``_load_state`` and then written
    to disk. Whatever ``_load_state`` raises for invalid input propagates.

    Parameters
    ----------
    **kwargs
        State entries to change; entries not given keep their current value.
    """
    state = self.get_state()
    for key, value in kwargs.items():
        state[key] = value
    self._load_state(state)
    self._save_state()

def shutdown(

self, restart=False)

def shutdown(self, restart=False):
    """Stop this daemon and optionally start a replacement.

    Cancels the daemon's background tasks, calls ``close`` and closes the
    TCP server.

    Parameters
    ----------
    restart: bool (optional)
        When true, the configuration file is re-read and a new daemon with
        the same name is scheduled. If the section is now reserved or
        disabled, the reason is logged and no daemon is started.
    """
    self.logger.info(f"Shutting Down {self.name}")
    self.logger.info(f"Cancelling {len(self._tasks)} outstanding tasks")
    self.logger.debug(f"{[task.get_name() for task in self._tasks]}")
    for task in self._tasks:
        task.cancel()
    self.close()
    self._server.close()
    if restart:
        config_filepath = self._config_filepath
        config_file = toml.load(config_filepath)
        try:
            config = type(self)._parse_config(config_file, self.name)
        except ValueError as e:
            # Python 3 exceptions have no `.message`; str(e) is the text
            self.logger.error(str(e))
        else:
            self._loop.create_task(
                type(self)._start_daemon(self.name, config, config_filepath)
            )

def update_state(

self)

Continually monitor and update the current daemon state.

async def update_state(self):
    """Continually monitor and update the current daemon state.

    This implementation does nothing and returns immediately; daemons
    override it.
    """

Instance variables

var config

var kind

var logger

var make

var model

var name

var serial

Methods

def main(

cls)

Run the event loop.

@classmethod
def main(cls):
    """Run the event loop.

    Registers signal handlers (except on Windows), parses the command line,
    loads the TOML configuration file, schedules ``cls._main`` and runs the
    loop until it is stopped. ``--version`` prints version information and
    exits before any configuration is read.
    """
    loop = asyncio.get_event_loop()
    # No signal handlers are registered on Windows
    if sys.platform.startswith("win"):
        signals = ()
    else:
        signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        # `s=s` binds the current signal to each handler (avoids late binding)
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(cls.shutdown_all(s, loop))
        )
    parser = argparse.ArgumentParser()
    # Default config location: <user config dir>/<kind>/config.toml
    parser.add_argument(
        "--config",
        "-c",
        default=(
            pathlib.Path(appdirs.user_config_dir("yaqd", "yaq"))
            / cls._kind
            / "config.toml"
        ),
        action="store",
        help="Path to the configuration toml file.",
    )
    # --verbose and --log-level write to the same destination (log_level)
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_const",
        dest="log_level",
        const="debug",
        help="Alias for --log-level=debug",
    )
    parser.add_argument(
        "--log-level",
        "-l",
        action="store",
        dest="log_level",
        choices=[
            "debug",
            "info",
            "notice",
            "warning",
            "error",
            "critical",
            "alert",
            "emergency",
        ],
        help="Set the log level explicitly",
    )
    parser.add_argument("--version", action="store_true")
    args = parser.parse_args()
    if args.log_level:
        logging.setLevel(logging.name_to_level[args.log_level])
    if args.version:
        print(f"'{cls._kind}' version {cls._version}")
        print(f"yaq rpc version {__rpc_version__}")
        print(f"yaqd_core version {__version__}")
        print(f"Python {sys.version}")
        sys.exit(0)
    config_filepath = pathlib.Path(args.config)
    config_file = toml.load(config_filepath)
    # NOTE(review): `main_task` is not read again; the binding may be kept on
    # purpose so the task stays strongly referenced -- confirm before removing.
    main_task = loop.create_task(cls._main(config_filepath, config_file, args))
    try:
        loop.run_forever()
    except asyncio.exceptions.CancelledError:
        pass
    finally:
        loop.close()

def shutdown_all(

cls, sig, loop)

Gracefully shutdown the asyncio loop.

Gathers all current tasks, and allows daemons to perform cleanup tasks.

Adapted from https://www.roguelynn.com/words/asyncio-graceful-shutdowns/ Original code is licensed under the MIT license, and sublicensed here.

@classmethod
async def shutdown_all(cls, sig, loop):
    """Gracefully shutdown the asyncio loop.

    Gathers all current tasks, and allows daemons to perform cleanup tasks.
    Every daemon's state is written to disk once the tasks have finished.
    On SIGHUP the configuration of the first daemon is re-read and ``_main``
    is run again before the loop is stopped.

    Parameters
    ----------
    sig: signal.Signals
        The signal that triggered the shutdown.
    loop: asyncio event loop
        The loop to stop once shutdown has completed.

    Adapted from https://www.roguelynn.com/words/asyncio-graceful-shutdowns/
    Original code is licensed under the MIT license, and sublicensed here.
    """
    logger.info(f"Received signal {sig.name}...")
    # Everything except the task running this coroutine
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    logger.info(f"Cancelling {len(tasks)} outstanding tasks")
    logger.debug(f"{[task.get_name() for task in tasks]}")
    [task.cancel() for task in tasks]
    # This is done after cancelling so that shutdown tasks which require the loop
    # are not themselves cancelled.
    [d.close() for d in cls._daemons]
    # Re-collect: `close()` may have scheduled new tasks that must finish too
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    await asyncio.gather(*tasks, return_exceptions=True)
    [d._save_state() for d in cls._daemons]
    # `hasattr` guard: the code treats SIGHUP as possibly absent from `signal`
    if hasattr(signal, "SIGHUP") and sig == signal.SIGHUP:
        # NOTE(review): raises IndexError when no daemon was started
        config_filepath = [d._config_filepath for d in cls._daemons][0]
        config_file = toml.load(config_filepath)
        await cls._main(config_filepath, config_file)
    loop.stop()

class ContinuousHardware

class ContinuousHardware(Hardware):
    """Hardware whose position is restricted to a numeric range.

    The effective limits are the intersection of the hardware limits
    (``_hw_limits``, persisted in the daemon state) and the ``limits`` entry
    of the configuration. The ``out_of_limits`` config entry selects what
    ``set_position`` does with an out-of-range request: ``"closest"`` clamps
    it, ``"ignore"`` drops it, anything else raises ``ValueError``.
    """

    traits = ["has-limits"]
    _kind: str = "continuous-hardware"
    # Parenthesized so the conditional only governs the "+<branch>" suffix;
    # without the parentheses the whole version became "" when there is no branch.
    _version: Optional[str] = "1.0.0" + (f"+{__branch__}" if __branch__ else "")
    defaults = {"out_of_limits": "closest", "limits": (-float("inf"), float("inf"))}

    def __init__(
        self, name: str, config: Dict[str, Any], config_filepath: pathlib.Path
    ):
        """Create the daemon; ``config`` must contain ``"out_of_limits"``."""
        # Set the fallback BEFORE the base initializer runs. `_load_state`
        # assigns `_hw_limits` from the saved state; assigning the default
        # afterwards discarded the persisted limits.
        # NOTE(review): assumes the parent initializer is what invokes
        # `_load_state` -- confirm against Hardware.__init__.
        self._hw_limits = (-float("inf"), float("inf"))
        super().__init__(name, config, config_filepath)
        self._out_of_limits = config["out_of_limits"]

    def get_limits(self) -> Tuple[float, float]:
        """Return ``(low, high)``: hardware limits intersected with config limits.

        Each pair, and the result, is asserted to be strictly increasing.
        """
        assert self._hw_limits[0] < self._hw_limits[1]
        config_limits = self.config["limits"]
        assert config_limits[0] < config_limits[1]
        out = (
            max(self._hw_limits[0], config_limits[0]),
            min(self._hw_limits[1], config_limits[1]),
        )
        assert out[0] < out[1]
        return out

    def in_limits(self, position: float) -> bool:
        """Return True if ``position`` lies within the limits (inclusive)."""
        low, upp = self.get_limits()
        return bool(low <= position <= upp)

    def set_position(self, position: float) -> None:
        """Move to ``position``, applying the ``out_of_limits`` policy first.

        Raises
        ------
        ValueError
            If the position is out of range and the policy is neither
            ``"closest"`` nor ``"ignore"``.
        """
        if not self.in_limits(position):
            if self._out_of_limits == "closest":
                low, upp = self.get_limits()
                if position > upp:
                    position = upp
                elif position < low:
                    position = low
            elif self._out_of_limits == "ignore":
                return
            else:
                raise ValueError(f"{position} not in ranges {self.get_limits()}")
        super().set_position(position)

    def get_state(self) -> Dict[str, Any]:
        """Return the parent state extended with ``hw_limits``."""
        state = super().get_state()
        state["hw_limits"] = self._hw_limits
        return state

    def _load_state(self, state: Dict[str, Any]) -> None:
        """Load the parent state, then ``hw_limits`` (unbounded if absent)."""
        super()._load_state(state)
        self._hw_limits = state.get("hw_limits", (float("-inf"), float("inf")))

Ancestors (in MRO)

Class variables

var defaults

Inheritance: Base.defaults

var traits

Inheritance: Hardware.traits

Static methods

def __init__(

self, name, config, config_filepath)

Create a yaq daemon.

Parameters

name: str — A name for this daemon. config: dict — Configuration parameters. config_filepath: str — The path for the configuration (not used internally, available to clients).

def __init__(
    self, name: str, config: Dict[str, Any], config_filepath: pathlib.Path
):
    """Create the daemon; ``config`` must contain ``"out_of_limits"``."""
    # Set the fallback BEFORE the base initializer runs, so limits restored
    # from saved state during initialization are not overwritten afterwards.
    # NOTE(review): assumes the parent initializer is what loads the saved
    # state -- confirm against the parent classes.
    self._hw_limits = (-float("inf"), float("inf"))
    super().__init__(name, config, config_filepath)
    self._out_of_limits = config["out_of_limits"]

def busy(

self)

Boolean representing if the daemon is busy (state updated) or not.

def busy(self) -> bool:
    """Return True while the daemon is busy (state updating), else False."""
    is_busy: bool = self._busy
    return is_busy

def close(

self)

Perform necessary clean-up and stop running.

def close(self):
    """Perform necessary clean-up and stop running.

    This implementation does nothing; daemons override it.
    """

def get_config(

self)

Retrieve the current configuration, including any defaults.

def get_config(self) -> Dict[str, Any]:
    """Return the daemon's configuration dictionary.

    The stored dictionary itself is returned, not a copy.
    """
    current: Dict[str, Any] = self.config
    return current

def get_config_filepath(

self)

Retrieve the current filepath of the configuration.

def get_config_filepath(self) -> str:
    """Return the absolute path of the configuration file as a string."""
    absolute_path = self._config_filepath.absolute()
    return str(absolute_path)

def get_destination(

self)

def get_destination(self) -> float:
    """Return the stored destination (``self._destination``)."""
    destination: float = self._destination
    return destination

def get_limits(

self)

def get_limits(self) -> Tuple[float, float]:
    """Return ``(low, high)``: hardware limits intersected with config limits.

    Each pair, and the result, is asserted to be strictly increasing.
    """
    hw_low, hw_high = self._hw_limits[0], self._hw_limits[1]
    assert hw_low < hw_high
    configured = self.config["limits"]
    assert configured[0] < configured[1]
    low = max(hw_low, configured[0])
    high = min(hw_high, configured[1])
    assert low < high
    return (low, high)

def get_position(

self)

def get_position(self) -> float:
    """Return the stored position (``self._position``)."""
    position: float = self._position
    return position

def get_state(

self)

Return the current daemon state.

def get_state(self) -> Dict[str, Any]:
    """Return the parent state extended with the ``hw_limits`` entry."""
    state = super().get_state()
    state.update(hw_limits=self._hw_limits)
    return state

def get_traits(

self)

Get list of yaq-daemon traits.

def get_traits(self) -> List[str]:
    """Return the traits implemented by this daemon.

    The result is the union of ``traits`` over every ``Base`` subclass in the
    daemon's MRO; the order of the returned list is unspecified.
    """
    found: Set[str] = set()
    for klass in filter(lambda k: issubclass(k, Base), type(self).mro()):
        found.update(set(klass.traits))
    return list(found)

def get_units(

self)

def get_units(self) -> Union[str, None]:
    """Return the stored units (``self._units``), which may be None."""
    units: Union[str, None] = self._units
    return units

def get_version(

self)

Get version.

def get_version(self) -> Optional[str]:
    """Return the daemon's ``_version`` string (``None`` when unset)."""
    version: Optional[str] = self._version
    return version

def help(

self, method=None)

Return useful, human readable information about methods.

Parameters

method: str or list of str (optional) The method or list of methods for which help is requested. Default is information on the daemon itself.

Returns

str or list of str: The requested documentation.

def help(self, method: Optional[Union[str, Sequence[str]]] = None):
    """Return human readable documentation.

    Parameters
    ----------
    method: str or list of str (optional)
        Method name, or names, to document. When omitted, the docstring of
        the daemon class itself is returned.

    Returns
    -------
    str or list of str: ``"<name><signature>\\n<docstring>"`` for each
    requested method, or the daemon docstring.
    """
    if method is None:
        return self.__doc__
    if not isinstance(method, str):
        return [self.help(item) for item in method]
    target = getattr(self, method)
    signature = str(inspect.signature(target))
    return f"{method}{signature}\n{target.__doc__}"

def id(

self)

Dictionary of identifying information for the daemon.

def id(self) -> Dict[str, Optional[str]]:
    """Return the name, kind, make, model and serial of this daemon."""
    fields = ("name", "kind", "make", "model", "serial")
    return {field: getattr(self, field) for field in fields}

def in_limits(

self, position)

def in_limits(self, position: float) -> bool:
    """Return True if ``position`` lies within ``get_limits()`` (inclusive)."""
    lower, upper = self.get_limits()
    return True if lower <= position <= upper else False

def list_methods(

self)

Return a list of all public methods.

def list_methods(self) -> List[str]:
    """List the names of the public, non-coroutine methods of this daemon.

    A name is kept when it does not start with an underscore, resolves to a
    plain function on the class and is not a coroutine function on the
    instance.
    """
    klass = self.__class__
    return [
        name
        for name in dir(klass)
        if name[0] != "_"
        # `isfunction` on the class (not the instance) filters out classmethods
        and inspect.isfunction(getattr(klass, name))
        and not asyncio.iscoroutinefunction(getattr(self, name))
    ]

def save_state(

self)

Schedule writing the current state to disk.

Note: Current implementation only writes while busy (and once after busy)

async def save_state(self):
    """Schedule writing the current state to disk.

    Runs forever: while the daemon is busy the state is written roughly every
    0.1 seconds, then once more after it stops being busy, after which the
    coroutine waits on ``self._busy_sig`` before starting over.

    Note: Current implementation only writes while busy (and once after busy)
    """
    while True:
        while self._busy:
            self._save_state()
            await asyncio.sleep(0.1)
        # Final write once the busy period has ended.
        self._save_state()
        # Idle until the busy signal is set again.
        await self._busy_sig.wait()

def set_position(

self, position)

def set_position(self, position: float) -> None:
    """Move to ``position``, applying the configured out-of-limits policy.

    When ``position`` is outside the limits, ``self._out_of_limits`` decides
    what happens: ``"closest"`` clips the request to the nearest limit,
    ``"ignore"`` silently drops it, and any other value raises ``ValueError``.
    The (possibly clipped) position is then handed to the parent class.
    """
    if not self.in_limits(position):
        policy = self._out_of_limits
        if policy == "ignore":
            return
        if policy != "closest":
            raise ValueError(f"{position} not in ranges {self.get_limits()}")
        lower, upper = self.get_limits()
        if position > upper:
            position = upper
        elif position < lower:
            position = lower
    super().set_position(position)

def set_relative(

self, distance)

def set_relative(self, distance: float) -> float:
    """Move by ``distance`` relative to the current destination.

    Returns the new absolute destination that was passed to ``set_position``.
    """
    target = self._destination + distance
    self.set_position(target)
    return target

def set_state(

self, **kwargs)

Set the daemon state.

Input may be any portion of the entire state dictionary. Key value pairs that are not defined are propagated from the current daemon state. If input is not valid, daemon will raise exception.

Parameters

state: dict New state

def set_state(self, **kwargs) -> None:
    """Set the daemon state.

    The keyword arguments may cover any portion of the state dictionary.
    Keys that are not supplied keep the value reported by ``get_state``.
    The merged state is loaded through ``_load_state`` and then saved.

    Parameters
    ----------
    **kwargs
        State entries to overwrite.
    """
    merged = self.get_state()
    for key, value in kwargs.items():
        merged[key] = value
    self._load_state(merged)
    self._save_state()

def shutdown(

self, restart=False)

def shutdown(self, restart=False):
    """Stop this daemon and, optionally, start a replacement from its config file.

    Cancels every task in ``self._tasks``, calls ``self.close()`` and closes
    ``self._server``.

    Parameters
    ----------
    restart: bool (optional)
        When True, the configuration file is read again and a new daemon with
        the same name is scheduled on ``self._loop``. A ``ValueError`` raised
        while doing so is logged instead of propagated. Default is False.
    """
    self.logger.info(f"Shutting Down {self.name}")
    self.logger.info(f"Cancelling {len(self._tasks)} outstanding tasks")
    self.logger.debug(f"{[task.get_name() for task in self._tasks]}")
    for task in self._tasks:
        task.cancel()
    self.close()
    self._server.close()
    if restart:
        config_filepath = self._config_filepath
        config_file = toml.load(config_filepath)
        try:
            config = type(self)._parse_config(config_file, self.name)
            self._loop.create_task(
                type(self)._start_daemon(self.name, config, config_filepath)
            )
        except ValueError as e:
            # Python 3 exceptions have no ``message`` attribute; using it here
            # would raise AttributeError from inside the handler.
            self.logger.error(str(e))

def update_state(

self)

Continually monitor and update the current daemon state.

async def update_state(self):
    """Hook for continually monitoring and refreshing the daemon state.

    This implementation does nothing and completes immediately.
    """
    return None

Methods

def main(

cls)

Run the event loop.

@classmethod
def main(cls):
    """Run the event loop.

    Parses the command line, loads the toml configuration file, schedules
    ``cls._main`` on the event loop and runs the loop until it is stopped.
    With ``--version`` the version information is printed and the process
    exits with status 0 instead.
    """
    loop = asyncio.get_event_loop()
    # Signal handlers are only registered on non-Windows platforms.
    if sys.platform.startswith("win"):
        signals = ()
    else:
        signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        # ``s=s`` binds the current signal as a default argument so each
        # handler passes its own signal to ``shutdown_all``.
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(cls.shutdown_all(s, loop))
        )
    parser = argparse.ArgumentParser()
    # The default config path is derived from the daemon kind.
    parser.add_argument(
        "--config",
        "-c",
        default=(
            pathlib.Path(appdirs.user_config_dir("yaqd", "yaq"))
            / cls._kind
            / "config.toml"
        ),
        action="store",
        help="Path to the configuration toml file.",
    )
    # ``--verbose`` and ``--log-level`` both write to ``args.log_level``.
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_const",
        dest="log_level",
        const="debug",
        help="Alias for --log-level=debug",
    )
    parser.add_argument(
        "--log-level",
        "-l",
        action="store",
        dest="log_level",
        choices=[
            "debug",
            "info",
            "notice",
            "warning",
            "error",
            "critical",
            "alert",
            "emergency",
        ],
        help="Set the log level explicitly",
    )
    parser.add_argument("--version", action="store_true")
    args = parser.parse_args()
    if args.log_level:
        # NOTE(review): ``logging`` is presumably the package's own logging
        # module (it provides ``name_to_level``) - confirm.
        logging.setLevel(logging.name_to_level[args.log_level])
    if args.version:
        print(f"'{cls._kind}' version {cls._version}")
        print(f"yaq rpc version {__rpc_version__}")
        print(f"yaqd_core version {__version__}")
        print(f"Python {sys.version}")
        sys.exit(0)
    config_filepath = pathlib.Path(args.config)
    config_file = toml.load(config_filepath)
    # The task is bound to a local name but not used again in this function.
    main_task = loop.create_task(cls._main(config_filepath, config_file, args))
    try:
        loop.run_forever()
    except asyncio.exceptions.CancelledError:
        pass
    finally:
        loop.close()

def shutdown_all(

cls, sig, loop)

Gracefully shutdown the asyncio loop.

Gathers all current tasks, and allows daemons to perform cleanup tasks.

Adapted from https://www.roguelynn.com/words/asyncio-graceful-shutdowns/ Original code is licensed under the MIT license, and sublicensed here.

@classmethod
async def shutdown_all(cls, sig, loop):
    """Gracefully shutdown the asyncio loop.

    Cancels every task other than the current one, lets each daemon clean up
    through ``close()``, waits for the remaining tasks and saves every
    daemon's state before stopping ``loop``. When the signal is SIGHUP, the
    configuration file of the first daemon is reloaded and ``cls._main`` is
    awaited before the loop is stopped.

    Adapted from https://www.roguelynn.com/words/asyncio-graceful-shutdowns/
    Original code is licensed under the MIT license, and sublicensed here.
    """
    logger.info(f"Received signal {sig.name}...")
    current = asyncio.current_task()
    pending = [task for task in asyncio.all_tasks() if task is not current]
    logger.info(f"Cancelling {len(pending)} outstanding tasks")
    logger.debug(f"{[task.get_name() for task in pending]}")
    for task in pending:
        task.cancel()
    # Daemons are closed only after the cancellation above, so that shutdown
    # tasks which require the loop are not themselves cancelled.
    for daemon in cls._daemons:
        daemon.close()
    remaining = [task for task in asyncio.all_tasks() if task is not current]
    await asyncio.gather(*remaining, return_exceptions=True)
    for daemon in cls._daemons:
        daemon._save_state()
    received_sighup = hasattr(signal, "SIGHUP") and sig == signal.SIGHUP
    if received_sighup:
        config_filepath = [d._config_filepath for d in cls._daemons][0]
        config_file = toml.load(config_filepath)
        await cls._main(config_filepath, config_file)
    loop.stop()

class Hardware

class Hardware(Base):
    """Daemon class for hardware that has a position.

    Tracks the current position, the most recently requested destination and
    an optional units string.
    """

    traits = ["has-position"]
    _kind = "hardware"
    # The parentheses make only the branch suffix conditional. Without them
    # the conditional applied to the whole concatenation and the version
    # became "" whenever ``__branch__`` was falsy.
    _version: Optional[str] = "1.0.0" + (f"+{__branch__}" if __branch__ else "")

    def __init__(
        self, name: str, config: Dict[str, Any], config_filepath: pathlib.Path
    ):
        """Create the daemon; units start as ``None`` until set elsewhere."""
        self._units = None
        super().__init__(name, config, config_filepath)

    def get_position(self) -> float:
        """Return the current position."""
        return self._position

    def get_units(self) -> Union[str, None]:
        """Return the units string, or ``None`` when no units are set."""
        return self._units

    def get_destination(self) -> float:
        """Return the most recently requested destination."""
        return self._destination

    def set_position(self, position: float) -> None:
        """Mark the daemon busy, record the destination and start the move."""
        self._busy = True
        self._destination = position
        self._set_position(position)

    def _set_position(self, position: float) -> None:
        """Apply the move; this default arrives immediately and clears busy."""
        self._position = position
        self._busy = False

    def set_relative(self, distance: float) -> float:
        """Move by ``distance`` from the current destination; return the new one."""
        new = self._destination + distance
        self.set_position(new)
        return new

    def get_state(self) -> Dict[str, Any]:
        """Return the current daemon state (position and destination)."""
        return {"position": self._position, "destination": self._destination}

    def _load_state(self, state: Dict[str, Any]) -> None:
        """Restore position and destination; missing entries become NaN."""
        self._position = state.get("position", math.nan)
        self._destination = state.get("destination", math.nan)

Ancestors (in MRO)

Class variables

var defaults

var traits

Inheritance: Base.traits

Static methods

def __init__(

self, name, config, config_filepath)

Create a yaq daemon.

Parameters

name: str — A name for this daemon. config: dict — Configuration parameters. config_filepath: str — The path for the configuration (not used internally, available to clients).

def __init__(
    self, name: str, config: Dict[str, Any], config_filepath: pathlib.Path
):
    """Create the daemon, starting with no units.

    ``_units`` is assigned before the parent initializer runs; the remaining
    arguments are passed straight through to it.
    """
    self._units = None
    super().__init__(name, config, config_filepath)

def busy(

self)

Boolean representing if the daemon is busy (state updated) or not.

def busy(self) -> bool:
    """Tell whether the daemon is currently busy, as recorded in ``_busy``."""
    is_busy = self._busy
    return is_busy

def close(

self)

Perform necessary clean-up and stop running.

def close(self):
    """Clean up and stop running; this implementation does nothing."""
    return None

def get_config(

self)

Retrieve the current configuration, including any defaults.

def get_config(self) -> Dict[str, Any]:
    """Return the daemon's configuration dictionary (``self.config``)."""
    configuration = self.config
    return configuration

def get_config_filepath(

self)

Retrieve the current filepath of the configuration.

def get_config_filepath(self) -> str:
    """Return the absolute path of the configuration file as a string."""
    absolute_path = self._config_filepath.absolute()
    return str(absolute_path)

def get_destination(

self)

def get_destination(self) -> float:
    """Return the most recently requested destination (``_destination``)."""
    destination = self._destination
    return destination

def get_position(

self)

def get_position(self) -> float:
    """Return the current position (``_position``)."""
    position = self._position
    return position

def get_state(

self)

Return the current daemon state.

def get_state(self) -> Dict[str, Any]:
    """Return the current daemon state: its position and destination."""
    return dict(position=self._position, destination=self._destination)

def get_traits(

self)

Get list of yaq-daemon traits.

def get_traits(self) -> List[str]:
    """Collect the yaq traits declared along this daemon's class hierarchy.

    Every class in the MRO that derives from ``Base`` contributes the entries
    of its ``traits`` attribute. A set is used to merge them, so duplicates
    collapse and the order of the returned list carries no meaning.
    """
    collected: Set[str] = set()
    daemon_classes = [klass for klass in type(self).mro() if issubclass(klass, Base)]
    for klass in daemon_classes:
        collected.update(klass.traits)
    return list(collected)

def get_units(

self)

def get_units(self) -> Union[str, None]:
    """Return the value of the daemon's ``_units`` attribute."""
    units = self._units
    return units

def get_version(

self)

Get version.

def get_version(self) -> Optional[str]:
    """Return the version recorded in ``_version`` (may be ``None``)."""
    version = self._version
    return version

def help(

self, method=None)

Return useful, human readable information about methods.

Parameters

method: str or list of str (optional) The method or list of methods for which help is requested. Default is information on the daemon itself.

Returns

str or list of str: The requested documentation.

def help(self, method: Optional[Union[str, Sequence[str]]] = None):
    """Describe the daemon itself, or one or more of its methods.

    Parameters
    ----------
    method: str or list of str (optional)
        Name, or sequence of names, of the methods to describe.
        When omitted, the daemon's own docstring is returned.

    Returns
    -------
    str or list of str
        For a single name: the name followed by its signature, a newline and
        the method's docstring. For a sequence: a list with one such string
        per entry.
    """
    if method is None:
        return self.__doc__
    if not isinstance(method, str):
        # A sequence of names: describe each entry in turn.
        return [self.help(name) for name in method]
    target = getattr(self, method)
    signature_text = str(inspect.signature(target))
    return f"{method}{signature_text}\n{target.__doc__}"

def id(

self)

Dictionary of identifying information for the daemon.

def id(self) -> Dict[str, Optional[str]]:
    """Summarize the identifying information of the daemon.

    The returned dictionary has the keys ``name``, ``kind``, ``make``,
    ``model`` and ``serial``, each read from the attribute of the same name.
    """
    fields = ("name", "kind", "make", "model", "serial")
    return {field: getattr(self, field) for field in fields}

def list_methods(

self)

Return a list of all public methods.

def list_methods(self) -> List[str]:
    """List the names of the public, non-coroutine methods of this daemon.

    A name is kept when it does not start with an underscore, resolves to a
    plain function on the class and is not a coroutine function on the
    instance.
    """
    klass = self.__class__
    return [
        name
        for name in dir(klass)
        if name[0] != "_"
        # `isfunction` on the class (not the instance) filters out classmethods
        and inspect.isfunction(getattr(klass, name))
        and not asyncio.iscoroutinefunction(getattr(self, name))
    ]

def save_state(

self)

Schedule writing the current state to disk.

Note: Current implementation only writes while busy (and once after busy)

async def save_state(self):
    """Schedule writing the current state to disk.

    Runs forever: while the daemon is busy the state is written roughly every
    0.1 seconds, then once more after it stops being busy, after which the
    coroutine waits on ``self._busy_sig`` before starting over.

    Note: Current implementation only writes while busy (and once after busy)
    """
    while True:
        while self._busy:
            self._save_state()
            await asyncio.sleep(0.1)
        # Final write once the busy period has ended.
        self._save_state()
        # Idle until the busy signal is set again.
        await self._busy_sig.wait()

def set_position(

self, position)

def set_position(self, position: float) -> None:
    """Request a move to ``position``.

    The daemon is flagged busy and the destination recorded before the move
    itself is delegated to ``_set_position``.
    """
    self._busy = True
    self._destination = position
    # The actual move is delegated; it runs after busy/destination are set.
    self._set_position(position)

def set_relative(

self, distance)

def set_relative(self, distance: float) -> float:
    """Move by ``distance`` relative to the current destination.

    Returns the new absolute destination that was passed to ``set_position``.
    """
    target = self._destination + distance
    self.set_position(target)
    return target

def set_state(

self, **kwargs)

Set the daemon state.

Input may be any portion of the entire state dictionary. Key value pairs that are not defined are propagated from the current daemon state. If input is not valid, daemon will raise exception.

Parameters

state: dict New state

def set_state(self, **kwargs) -> None:
    """Set the daemon state.

    The keyword arguments may cover any portion of the state dictionary.
    Keys that are not supplied keep the value reported by ``get_state``.
    The merged state is loaded through ``_load_state`` and then saved.

    Parameters
    ----------
    **kwargs
        State entries to overwrite.
    """
    merged = self.get_state()
    for key, value in kwargs.items():
        merged[key] = value
    self._load_state(merged)
    self._save_state()

def shutdown(

self, restart=False)

def shutdown(self, restart=False):
    """Stop this daemon and, optionally, start a replacement from its config file.

    Cancels every task in ``self._tasks``, calls ``self.close()`` and closes
    ``self._server``.

    Parameters
    ----------
    restart: bool (optional)
        When True, the configuration file is read again and a new daemon with
        the same name is scheduled on ``self._loop``. A ``ValueError`` raised
        while doing so is logged instead of propagated. Default is False.
    """
    self.logger.info(f"Shutting Down {self.name}")
    self.logger.info(f"Cancelling {len(self._tasks)} outstanding tasks")
    self.logger.debug(f"{[task.get_name() for task in self._tasks]}")
    for task in self._tasks:
        task.cancel()
    self.close()
    self._server.close()
    if restart:
        config_filepath = self._config_filepath
        config_file = toml.load(config_filepath)
        try:
            config = type(self)._parse_config(config_file, self.name)
            self._loop.create_task(
                type(self)._start_daemon(self.name, config, config_filepath)
            )
        except ValueError as e:
            # Python 3 exceptions have no ``message`` attribute; using it here
            # would raise AttributeError from inside the handler.
            self.logger.error(str(e))

def update_state(

self)

Continually monitor and update the current daemon state.

async def update_state(self):
    """Hook for continually monitoring and refreshing the daemon state.

    This implementation does nothing and completes immediately.
    """
    return None

Methods

def main(

cls)

Run the event loop.

@classmethod
def main(cls):
    """Run the event loop.

    Parses the command line, loads the toml configuration file, schedules
    ``cls._main`` on the event loop and runs the loop until it is stopped.
    With ``--version`` the version information is printed and the process
    exits with status 0 instead.
    """
    loop = asyncio.get_event_loop()
    # Signal handlers are only registered on non-Windows platforms.
    if sys.platform.startswith("win"):
        signals = ()
    else:
        signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        # ``s=s`` binds the current signal as a default argument so each
        # handler passes its own signal to ``shutdown_all``.
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(cls.shutdown_all(s, loop))
        )
    parser = argparse.ArgumentParser()
    # The default config path is derived from the daemon kind.
    parser.add_argument(
        "--config",
        "-c",
        default=(
            pathlib.Path(appdirs.user_config_dir("yaqd", "yaq"))
            / cls._kind
            / "config.toml"
        ),
        action="store",
        help="Path to the configuration toml file.",
    )
    # ``--verbose`` and ``--log-level`` both write to ``args.log_level``.
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_const",
        dest="log_level",
        const="debug",
        help="Alias for --log-level=debug",
    )
    parser.add_argument(
        "--log-level",
        "-l",
        action="store",
        dest="log_level",
        choices=[
            "debug",
            "info",
            "notice",
            "warning",
            "error",
            "critical",
            "alert",
            "emergency",
        ],
        help="Set the log level explicitly",
    )
    parser.add_argument("--version", action="store_true")
    args = parser.parse_args()
    if args.log_level:
        # NOTE(review): ``logging`` is presumably the package's own logging
        # module (it provides ``name_to_level``) - confirm.
        logging.setLevel(logging.name_to_level[args.log_level])
    if args.version:
        print(f"'{cls._kind}' version {cls._version}")
        print(f"yaq rpc version {__rpc_version__}")
        print(f"yaqd_core version {__version__}")
        print(f"Python {sys.version}")
        sys.exit(0)
    config_filepath = pathlib.Path(args.config)
    config_file = toml.load(config_filepath)
    # The task is bound to a local name but not used again in this function.
    main_task = loop.create_task(cls._main(config_filepath, config_file, args))
    try:
        loop.run_forever()
    except asyncio.exceptions.CancelledError:
        pass
    finally:
        loop.close()

def shutdown_all(

cls, sig, loop)

Gracefully shutdown the asyncio loop.

Gathers all current tasks, and allows daemons to perform cleanup tasks.

Adapted from https://www.roguelynn.com/words/asyncio-graceful-shutdowns/ Original code is licensed under the MIT license, and sublicensed here.

@classmethod
async def shutdown_all(cls, sig, loop):
    """Gracefully shutdown the asyncio loop.

    Cancels every task other than the current one, lets each daemon clean up
    through ``close()``, waits for the remaining tasks and saves every
    daemon's state before stopping ``loop``. When the signal is SIGHUP, the
    configuration file of the first daemon is reloaded and ``cls._main`` is
    awaited before the loop is stopped.

    Adapted from https://www.roguelynn.com/words/asyncio-graceful-shutdowns/
    Original code is licensed under the MIT license, and sublicensed here.
    """
    logger.info(f"Received signal {sig.name}...")
    current = asyncio.current_task()
    pending = [task for task in asyncio.all_tasks() if task is not current]
    logger.info(f"Cancelling {len(pending)} outstanding tasks")
    logger.debug(f"{[task.get_name() for task in pending]}")
    for task in pending:
        task.cancel()
    # Daemons are closed only after the cancellation above, so that shutdown
    # tasks which require the loop are not themselves cancelled.
    for daemon in cls._daemons:
        daemon.close()
    remaining = [task for task in asyncio.all_tasks() if task is not current]
    await asyncio.gather(*remaining, return_exceptions=True)
    for daemon in cls._daemons:
        daemon._save_state()
    received_sighup = hasattr(signal, "SIGHUP") and sig == signal.SIGHUP
    if received_sighup:
        config_filepath = [d._config_filepath for d in cls._daemons][0]
        config_file = toml.load(config_filepath)
        await cls._main(config_filepath, config_file)
    loop.stop()

class Sensor

class Sensor(Base):
    """Daemon class for sensors that measure one or more named channels.

    The default ``_measure`` returns an empty dictionary; concrete sensors
    presumably override it and fill in ``channel_names`` (TODO confirm
    against subclasses).
    """

    traits = ["is-sensor"]
    _kind = "base-sensor"
    _version: Optional[str] = None  # this class should not be directly instantiated

    def __init__(
        self, name: str, config: Dict[str, Any], config_filepath: pathlib.Path
    ):
        """Create the sensor with no channels and no measurement recorded."""
        super().__init__(name, config, config_filepath)
        self._measured: MeasureType = dict()  # values must be numbers or arrays
        self.channel_names: List[str] = []
        self.channel_units: Dict[str, str] = dict()
        # Shapes may have any rank; scalar channels are represented by ``()``.
        self.channel_shapes: Dict[str, Tuple[int, ...]] = dict()
        self.measurement_id = 0
        # Defined up front so the attribute exists before the first call to
        # ``measure`` or ``stop_looping``.
        self.looping = False

    def measure(self, loop: bool = False) -> int:
        """Start a measurement, optionally looping.

        Sensor will remain busy until measurement completes. If the sensor is
        already busy no new runner is started, but the looping flag is still
        updated.

        Parameters
        ----------
        loop: bool, optional
            Toggle looping behavior. Default False.

        Returns
        -------
        int
            The current value of ``measurement_id``.

        See Also
        --------
        stop_looping
        """
        self.looping = loop
        if not self._busy:
            self._busy = True
            self._loop.create_task(self._runner(loop=loop))
        return self.measurement_id

    def get_channel_names(self):
        """Get current channel names."""
        return self.channel_names

    def get_channel_shapes(self):
        """Get channel shapes.

        When no shapes have been recorded, every channel is reported with the
        empty shape ``()``.
        """
        # as default behavior, assume all channels are scalars
        if self.channel_shapes:
            return self.channel_shapes
        else:
            return {k: () for k in self.channel_names}

    def get_channel_units(self):
        """Get channel units."""
        return self.channel_units

    def get_measured(self) -> MeasureType:
        """Get most recently measured values."""
        return self._measured

    async def _measure(self) -> MeasureType:
        """Do measurement, filling _measured dictionary.

        Returns dictionary with keys channel names, values numbers or arrays.
        """
        return {}

    async def _runner(self, loop: bool) -> None:
        """Handle execution of _measure, including looping and setting of measurement_id.

        The ``loop`` argument is not consulted here; ``self.looping`` is read
        after every measurement, so ``stop_looping`` ends a running loop.
        """
        while True:
            self._measured = await self._measure()
            assert set(self._measured.keys()) == set(self.channel_names)
            self._measured["measurement_id"] = self.measurement_id
            if not self.looping:
                self._busy = False
                self.measurement_id += 1
                break
            # Yield to the event loop between looped measurements.
            await asyncio.sleep(0)

    def stop_looping(self) -> None:
        """Stop looping."""
        self.looping = False

Ancestors (in MRO)

Class variables

var defaults

var traits

Static methods

def __init__(

self, name, config, config_filepath)

Create a yaq daemon.

Parameters

name: str — A name for this daemon. config: dict — Configuration parameters. config_filepath: str — The path for the configuration (not used internally, available to clients).

def __init__(
    self, name: str, config: Dict[str, Any], config_filepath: pathlib.Path
):
    """Create the sensor with no channels and no measurement recorded.

    The parent initializer runs first; the measurement bookkeeping
    attributes are created afterwards.
    """
    super().__init__(name, config, config_filepath)
    # values must be numbers or arrays
    self._measured: MeasureType = {}
    self.channel_names: List[str] = list()
    self.channel_units: Dict[str, str] = {}
    self.channel_shapes: Dict[str, Tuple[int]] = {}
    self.measurement_id = 0

def busy(

self)

Boolean representing if the daemon is busy (state updated) or not.

def busy(self) -> bool:
    """Tell whether the daemon is currently busy, as recorded in ``_busy``."""
    is_busy = self._busy
    return is_busy

def close(

self)

Perform necessary clean-up and stop running.

def close(self):
    """Clean up and stop running; this implementation does nothing."""
    return None

def get_channel_names(

self)

Get current channel names.

def get_channel_names(self):
    """Return the list of channel names currently held by the sensor."""
    names = self.channel_names
    return names

def get_channel_shapes(

self)

Get channel shapes.

def get_channel_shapes(self):
    """Return the shape of every channel.

    When no shapes have been recorded, every name in ``channel_names`` is
    reported with the empty shape ``()``.
    """
    shapes = self.channel_shapes
    if not shapes:
        # no explicit shapes recorded: treat every channel as a scalar
        return {name: () for name in self.channel_names}
    return shapes

def get_channel_units(

self)

Get channel units.

def get_channel_units(self):
    """Return the mapping of channel names to units held by the sensor."""
    units = self.channel_units
    return units

def get_config(

self)

Retrieve the current configuration, including any defaults.

def get_config(self) -> Dict[str, Any]:
    """Return the daemon's configuration dictionary (``self.config``)."""
    configuration = self.config
    return configuration

def get_config_filepath(

self)

Retrieve the current filepath of the configuration.

def get_config_filepath(self) -> str:
    """Return the absolute path of the configuration file as a string."""
    absolute_path = self._config_filepath.absolute()
    return str(absolute_path)

def get_measured(

self)

Get most recently measured values.

def get_measured(self) -> MeasureType:
    """Return the most recently stored measurement dictionary."""
    latest = self._measured
    return latest

def get_state(

self)

Return the current daemon state.

def get_state(self) -> Dict[str, Any]:
    """Return the current daemon state; this implementation has none."""
    return dict()

def get_traits(

self)

Get list of yaq-daemon traits.

def get_traits(self) -> List[str]:
    """Collect the yaq traits declared along this daemon's class hierarchy.

    Every class in the MRO that derives from ``Base`` contributes the entries
    of its ``traits`` attribute. A set is used to merge them, so duplicates
    collapse and the order of the returned list carries no meaning.
    """
    collected: Set[str] = set()
    daemon_classes = [klass for klass in type(self).mro() if issubclass(klass, Base)]
    for klass in daemon_classes:
        collected.update(klass.traits)
    return list(collected)

def get_version(

self)

Get version.

def get_version(self) -> Optional[str]:
    """Return the version recorded in ``_version`` (may be ``None``)."""
    version = self._version
    return version

def help(

self, method=None)

Return useful, human readable information about methods.

Parameters

method: str or list of str (optional) The method or list of methods for which help is requested. Default is information on the daemon itself.

Returns

str or list of str: The requested documentation.

def help(self, method: Optional[Union[str, Sequence[str]]] = None):
    """Describe the daemon itself, or one or more of its methods.

    Parameters
    ----------
    method: str or list of str (optional)
        Name, or sequence of names, of the methods to describe.
        When omitted, the daemon's own docstring is returned.

    Returns
    -------
    str or list of str
        For a single name: the name followed by its signature, a newline and
        the method's docstring. For a sequence: a list with one such string
        per entry.
    """
    if method is None:
        return self.__doc__
    if not isinstance(method, str):
        # A sequence of names: describe each entry in turn.
        return [self.help(name) for name in method]
    target = getattr(self, method)
    signature_text = str(inspect.signature(target))
    return f"{method}{signature_text}\n{target.__doc__}"

def id(

self)

Dictionary of identifying information for the daemon.

def id(self) -> Dict[str, Optional[str]]:
    """Summarize the identifying information of the daemon.

    The returned dictionary has the keys ``name``, ``kind``, ``make``,
    ``model`` and ``serial``, each read from the attribute of the same name.
    """
    fields = ("name", "kind", "make", "model", "serial")
    return {field: getattr(self, field) for field in fields}

def list_methods(

self)

Return a list of all public methods.

def list_methods(self) -> List[str]:
    """List the names of the public, non-coroutine methods of this daemon.

    A name is kept when it does not start with an underscore, resolves to a
    plain function on the class and is not a coroutine function on the
    instance.
    """
    klass = self.__class__
    return [
        name
        for name in dir(klass)
        if name[0] != "_"
        # `isfunction` on the class (not the instance) filters out classmethods
        and inspect.isfunction(getattr(klass, name))
        and not asyncio.iscoroutinefunction(getattr(self, name))
    ]

def measure(

self, loop=False)

Start a measurement, optionally looping.

Sensor will remain busy until measurement completes.

Parameters

loop: bool, optional Toggle looping behavior. Default False.

See Also

stop_looping

def measure(self, loop: bool = False) -> int:
    """Start a measurement, optionally looping.

    The looping flag is always updated. A new runner task is only scheduled
    when the sensor is not already busy; the sensor then stays busy until
    the measurement completes.

    Parameters
    ----------
    loop: bool, optional
        Toggle looping behavior. Default False.

    Returns
    -------
    int
        The current value of ``measurement_id``.

    See Also
    --------
    stop_looping
    """
    self.looping = loop
    if self._busy:
        # A runner is already active; it will pick up the new looping flag.
        return self.measurement_id
    self._busy = True
    self._loop.create_task(self._runner(loop=loop))
    return self.measurement_id

def save_state(

self)

Schedule writing the current state to disk.

Note: Current implementation only writes while busy (and once after busy)

async def save_state(self):
    """Schedule writing the current state to disk.

    Runs forever: while the daemon is busy the state is written roughly every
    0.1 seconds, then once more after it stops being busy, after which the
    coroutine waits on ``self._busy_sig`` before starting over.

    Note: Current implementation only writes while busy (and once after busy)
    """
    while True:
        while self._busy:
            self._save_state()
            await asyncio.sleep(0.1)
        # Final write once the busy period has ended.
        self._save_state()
        # Idle until the busy signal is set again.
        await self._busy_sig.wait()

def set_state(

self, **kwargs)

Set the daemon state.

Input may be any portion of the entire state dictionary. Key value pairs that are not defined are propagated from the current daemon state. If input is not valid, daemon will raise exception.

Parameters

state: dict New state

def set_state(self, **kwargs) -> None:
    """Set the daemon state.

    The keyword arguments may cover any portion of the state dictionary.
    Keys that are not supplied keep the value reported by ``get_state``.
    The merged state is loaded through ``_load_state`` and then saved.

    Parameters
    ----------
    **kwargs
        State entries to overwrite.
    """
    merged = self.get_state()
    for key, value in kwargs.items():
        merged[key] = value
    self._load_state(merged)
    self._save_state()

def shutdown(

self, restart=False)

def shutdown(self, restart=False):
    """Stop this daemon and, optionally, start a replacement from its config file.

    Cancels every task in ``self._tasks``, calls ``self.close()`` and closes
    ``self._server``.

    Parameters
    ----------
    restart: bool (optional)
        When True, the configuration file is read again and a new daemon with
        the same name is scheduled on ``self._loop``. A ``ValueError`` raised
        while doing so is logged instead of propagated. Default is False.
    """
    self.logger.info(f"Shutting Down {self.name}")
    self.logger.info(f"Cancelling {len(self._tasks)} outstanding tasks")
    self.logger.debug(f"{[task.get_name() for task in self._tasks]}")
    for task in self._tasks:
        task.cancel()
    self.close()
    self._server.close()
    if restart:
        config_filepath = self._config_filepath
        config_file = toml.load(config_filepath)
        try:
            config = type(self)._parse_config(config_file, self.name)
            self._loop.create_task(
                type(self)._start_daemon(self.name, config, config_filepath)
            )
        except ValueError as e:
            # Python 3 exceptions have no ``message`` attribute; using it here
            # would raise AttributeError from inside the handler.
            self.logger.error(str(e))

def stop_looping(

self)

Stop looping.

def stop_looping(self) -> None:
    """Clear the looping flag so the sensor stops repeating measurements."""
    setattr(self, "looping", False)

def update_state(

self)

Continually monitor and update the current daemon state.

async def update_state(self):
    """Hook for continually monitoring and refreshing the daemon state.

    This implementation does nothing and completes immediately.
    """
    return None

Instance variables

var measurement_id

Methods

def main(

cls)

Run the event loop.

@classmethod
def main(cls):
    """Run the event loop.

    Parses the command line, loads the toml configuration file, schedules
    ``cls._main`` on the event loop and runs the loop until it is stopped.
    With ``--version`` the version information is printed and the process
    exits with status 0 instead.
    """
    loop = asyncio.get_event_loop()
    # Signal handlers are only registered on non-Windows platforms.
    if sys.platform.startswith("win"):
        signals = ()
    else:
        signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        # ``s=s`` binds the current signal as a default argument so each
        # handler passes its own signal to ``shutdown_all``.
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(cls.shutdown_all(s, loop))
        )
    parser = argparse.ArgumentParser()
    # The default config path is derived from the daemon kind.
    parser.add_argument(
        "--config",
        "-c",
        default=(
            pathlib.Path(appdirs.user_config_dir("yaqd", "yaq"))
            / cls._kind
            / "config.toml"
        ),
        action="store",
        help="Path to the configuration toml file.",
    )
    # ``--verbose`` and ``--log-level`` both write to ``args.log_level``.
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_const",
        dest="log_level",
        const="debug",
        help="Alias for --log-level=debug",
    )
    parser.add_argument(
        "--log-level",
        "-l",
        action="store",
        dest="log_level",
        choices=[
            "debug",
            "info",
            "notice",
            "warning",
            "error",
            "critical",
            "alert",
            "emergency",
        ],
        help="Set the log level explicitly",
    )
    parser.add_argument("--version", action="store_true")
    args = parser.parse_args()
    if args.log_level:
        # NOTE(review): ``logging`` is presumably the package's own logging
        # module (it provides ``name_to_level``) - confirm.
        logging.setLevel(logging.name_to_level[args.log_level])
    if args.version:
        print(f"'{cls._kind}' version {cls._version}")
        print(f"yaq rpc version {__rpc_version__}")
        print(f"yaqd_core version {__version__}")
        print(f"Python {sys.version}")
        sys.exit(0)
    config_filepath = pathlib.Path(args.config)
    config_file = toml.load(config_filepath)
    # The task is bound to a local name but not used again in this function.
    main_task = loop.create_task(cls._main(config_filepath, config_file, args))
    try:
        loop.run_forever()
    except asyncio.exceptions.CancelledError:
        pass
    finally:
        loop.close()

def shutdown_all(

cls, sig, loop)

Gracefully shutdown the asyncio loop.

Gathers all current tasks, and allows daemons to perform cleanup tasks.

Adapted from https://www.roguelynn.com/words/asyncio-graceful-shutdowns/ Original code is licensed under the MIT license, and sublicensed here.

@classmethod
async def shutdown_all(cls, sig, loop):
    """Gracefully shutdown the asyncio loop.

    Cancels every task other than the current one, lets each daemon clean up
    through ``close()``, waits for the remaining tasks and saves every
    daemon's state before stopping ``loop``. When the signal is SIGHUP, the
    configuration file of the first daemon is reloaded and ``cls._main`` is
    awaited before the loop is stopped.

    Adapted from https://www.roguelynn.com/words/asyncio-graceful-shutdowns/
    Original code is licensed under the MIT license, and sublicensed here.
    """
    logger.info(f"Received signal {sig.name}...")
    current = asyncio.current_task()
    pending = [task for task in asyncio.all_tasks() if task is not current]
    logger.info(f"Cancelling {len(pending)} outstanding tasks")
    logger.debug(f"{[task.get_name() for task in pending]}")
    for task in pending:
        task.cancel()
    # Daemons are closed only after the cancellation above, so that shutdown
    # tasks which require the loop are not themselves cancelled.
    for daemon in cls._daemons:
        daemon.close()
    remaining = [task for task in asyncio.all_tasks() if task is not current]
    await asyncio.gather(*remaining, return_exceptions=True)
    for daemon in cls._daemons:
        daemon._save_state()
    received_sighup = hasattr(signal, "SIGHUP") and sig == signal.SIGHUP
    if received_sighup:
        config_filepath = [d._config_filepath for d in cls._daemons][0]
        config_file = toml.load(config_filepath)
        await cls._main(config_filepath, config_file)
    loop.stop()

Sub-modules