Top

yaqd_core module

Core Python package for implementing yaq daemons, and associated utilities.

"""Core Python package for implementing yaq daemons, and associated utilities."""

__all__ = [
    "exceptions",
    "logging",
    "__version__",
    "Base",
    "Hardware",
    "ContinuousHardware",
    "Sensor",
    "DiscreteHardware",
]

from . import exceptions
from . import logging
from .__version__ import __version__
from ._daemon import Base
from ._hardware import Hardware, ContinuousHardware, DiscreteHardware
from ._sensor import Sensor

Module variables

var __version__

Classes

class Base

class Base:
    """Base class for yaq daemons.

    An instance holds the configuration and state for one daemon, persists its
    state to a TOML file, and runs two background tasks (``save_state`` and
    ``update_state``).  The classmethods provide the command line entry point
    (``main``), start one server per enabled config section, and handle
    shutdown signals.
    """

    # Every instance created by ``_start_daemon`` is appended to this list.
    _daemons: List["Base"] = []
    # Used to build the default config/state paths and the ``.avpr`` file name.
    _kind: str = "base"
    _branch: Optional[str] = None

    def __init__(self, name: str, config: Dict[str, Any], config_filepath: pathlib.Path):
        """Create a yaq daemon.

        Parameters
        ----------
        name: str
            A name for this daemon
        config: dict
            Configuration parameters
        config_filepath: pathlib.Path
            The path for the configuration (not used internally, available to clients)
        """
        self.name = name
        self._config = config
        self._config_filepath = config_filepath
        self._state_filepath = (
            pathlib.Path(appdirs.user_data_dir("yaqd-state", "yaq"))
            / self._kind
            / f"{self.name}-state.toml"
        )
        self.logger = logging.getLogger(self.name)
        if "log_level" in self._config:
            self.logger.setLevel(logging.name_to_level[self._config["log_level"]])
        if self._config.get("log_to_file"):
            # The log file is placed next to the state file, with a timestamped name.
            fh = logging_.FileHandler(
                self._state_filepath.with_name(
                    f"{self.name}-{time.strftime('%Y-%m-%dT%H:%M:%S%z')}.log"
                )
            )
            fh.setFormatter(logging.formatter)
            self.logger.addHandler(fh)
        self.logger.info(f"Config File Path = {self._config_filepath}")
        self.logger.info(f"State File Path = {self._state_filepath}")
        self.logger.info(f"TCP Port = {config['port']}")
        self._clients: List[str] = []

        self.serial = config.get("serial", None)
        self.make = config.get("make", None)
        self.model = config.get("model", None)

        self._busy_sig = asyncio.Event()
        self._not_busy_sig = asyncio.Event()

        self._loop = asyncio.get_event_loop()

        # A missing or undecodable state file is treated as an empty state.
        try:
            self._state_filepath.parent.mkdir(parents=True, exist_ok=True)
            with self._state_filepath.open("rt") as f:
                state = toml.load(f)
        except (toml.TomlDecodeError, FileNotFoundError):
            state = {}

        self._load_state(state)
        self._tasks = [
            self._loop.create_task(self.save_state()),
            self._loop.create_task(self.update_state()),
        ]

    @classproperty
    def _avro_protocol(cls):
        """Parsed ``<_kind>.avpr`` JSON found next to the class's source file.

        Returns an empty dict when the file does not exist.
        """
        try:
            with open(pathlib.Path(inspect.getfile(cls)).parent / f"{cls._kind}.avpr") as f:
                return json.load(f)
        except FileNotFoundError:
            return {}

    @classproperty
    def _version(cls) -> str:
        """``__version__`` of the top-level package defining ``cls``.

        Falls back to ``"UNKNOWN VERSION"`` when the package has no such attribute.
        """
        import importlib

        return getattr(
            importlib.import_module(cls.__module__.split(".")[0]),
            "__version__",
            "UNKNOWN VERSION",
        )

    @classmethod
    def main(cls):
        """Run the event loop.

        Installs signal handlers (skipped on Windows), parses the command line,
        serves the informational ``--version`` / ``--protocol`` flags, then loads
        the config file and runs the loop until it is stopped.
        """
        loop = asyncio.get_event_loop()
        if sys.platform.startswith("win"):
            signals = ()
        else:
            signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
        for s in signals:
            # ``s=s`` binds the current signal; a plain closure would see the last one.
            loop.add_signal_handler(s, lambda s=s: asyncio.create_task(cls.shutdown_all(s, loop)))

        parser = argparse.ArgumentParser()
        parser.add_argument(
            "--config",
            "-c",
            default=(
                pathlib.Path(appdirs.user_config_dir("yaqd", "yaq")) / cls._kind / "config.toml"
            ),
            action="store",
            help="Path to the configuration toml file.",
        )
        parser.add_argument(
            "--verbose",
            "-v",
            action="store_const",
            dest="log_level",
            const="debug",
            help="Alias for --log-level=debug",
        )
        parser.add_argument(
            "--log-level",
            "-l",
            action="store",
            dest="log_level",
            choices=[
                "debug",
                "info",
                "notice",
                "warning",
                "error",
                "critical",
                "alert",
                "emergency",
            ],
            help="Set the log level explicitly",
        )

        parser.add_argument("--version", action="store_true")
        parser.add_argument("--protocol", action="store_true")

        args = parser.parse_args()

        if args.log_level:
            logging.setLevel(logging.name_to_level[args.log_level])

        if args.version:
            print(f"module {cls.__module__} version {cls._version}")
            print(f"avro version {__avro_version__}")
            print(f"yaqd_core version {__version__}")
            print(f"Python {sys.version}")
            sys.exit(0)

        if args.protocol:
            # Echo the protocol file verbatim.
            with open(pathlib.Path(inspect.getfile(cls)).parent / f"{cls._kind}.avpr", "r") as f:
                for line in f:
                    print(line, end="")
            sys.exit(0)

        config_filepath = pathlib.Path(args.config)
        config_file = toml.load(config_filepath)

        main_task = loop.create_task(cls._main(config_filepath, config_file, args))
        try:
            loop.run_forever()
        except asyncio.exceptions.CancelledError:
            pass
        finally:
            loop.close()

    @classmethod
    async def _main(cls, config_filepath, config_file, args=None):
        """Start a daemon for each usable config section, then wait on the servers.

        Sections for which ``_parse_config`` raises ``ValueError`` (reserved or
        disabled sections) are skipped.  The loop is stopped once no servers
        remain to be awaited.
        """
        loop = asyncio.get_running_loop()
        cls.__servers = []
        for section in config_file:
            try:
                config = cls._parse_config(config_file, section, args)
            except ValueError:
                continue
            logger.debug(f"Starting {section} with {config}")
            await cls._start_daemon(section, config, config_filepath)

        # ``_start_daemon`` may append new servers while we wait (e.g. on restart),
        # so keep draining the list until it stays empty.
        while cls.__servers:
            awaiting = cls.__servers
            cls.__servers = []
            await asyncio.wait(awaiting)
            await asyncio.sleep(1)
        loop.stop()

    @classmethod
    async def _start_daemon(cls, name, config, config_filepath):
        """Instantiate one daemon and create the server that serves it."""
        loop = asyncio.get_running_loop()
        daemon = cls(name, config, config_filepath)
        cls._daemons.append(daemon)

        # This function is here to namespace `daemon` so it doesn't
        # get overridden for the lambda
        def server(daemon):
            return lambda: Protocol(daemon)

        ser = await loop.create_server(
            server(daemon), config.get("host", ""), config.get("port", None)
        )
        daemon._server = ser
        cls.__servers.append(ser.serve_forever())

    @classmethod
    def _parse_config(cls, config_file, section, args=None):
        """Build the effective configuration dict for one config section.

        Values are layered in this order, later layers overriding earlier ones:
        protocol defaults, the ``shared-settings`` section, the section itself,
        and finally ``args.log_level`` when given.

        Raises
        ------
        ValueError
            If ``section`` is the reserved name ``shared-settings`` or the
            resulting config has a falsy ``enable`` entry.
        """
        if section == "shared-settings":
            raise ValueError(f"Section name '{section}' reserved")
        config = {}
        for name, type_ in cls._avro_protocol.get("config", {}).items():
            if "default" in type_:
                config[name] = type_["default"]
        config.update(config_file.get("shared-settings", {}).copy())
        config.update(config_file[section])
        if args:
            try:
                if args.log_level:
                    config.update(log_level=args.log_level)
            except AttributeError:
                # ``args`` without a ``log_level`` attribute is tolerated.
                pass

        if not config.get("enable", True):
            logger.info(f"Section '{section}' is disabled")
            raise ValueError(f"Section '{section}' is disabled")
        return config

    @classmethod
    async def shutdown_all(cls, sig, loop):
        """Gracefully shutdown the asyncio loop.

        Gathers all current tasks, and allows daemons to perform cleanup tasks.
        On ``SIGHUP`` the daemons are started again from the config file of the
        first known daemon before the loop is stopped.

        Adapted from https://www.roguelynn.com/words/asyncio-graceful-shutdowns/
        Original code is licensed under the MIT license, and sublicensed here.

        Parameters
        ----------
        sig:
            The received signal; its ``name`` is logged and it is compared
            against ``signal.SIGHUP``.
        loop:
            The event loop to stop once shutdown has completed.
        """
        logger.info(f"Received signal {sig.name}...")
        tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
        logger.info(f"Cancelling {len(tasks)} outstanding tasks")
        for task in tasks:
            task.cancel()
        # This is done after cancelling so that shutdown tasks which require the loop
        # are not themselves cancelled.
        for daemon in cls._daemons:
            daemon.close()
        tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
        await asyncio.gather(*tasks, return_exceptions=True)
        for daemon in cls._daemons:
            daemon._save_state()
        # Only restart when at least one daemon exists: indexing an empty list
        # would raise here and ``loop.stop()`` below would never be reached.
        if hasattr(signal, "SIGHUP") and sig == signal.SIGHUP and cls._daemons:
            config_filepath = cls._daemons[0]._config_filepath
            config_file = toml.load(config_filepath)
            await cls._main(config_filepath, config_file)
        loop.stop()

    def shutdown(self, restart=False):
        """Cancel this daemon's tasks, close it and its server, optionally restart.

        Parameters
        ----------
        restart: bool
            When true, the config file is re-read and a new daemon with the same
            name is scheduled on the loop.  If the section can no longer be
            used (``_parse_config`` raises ``ValueError``) the reason is logged
            and no restart happens.
        """
        self.logger.info(f"Shutting Down {self.name}")
        self.logger.info(f"Cancelling {len(self._tasks)} outstanding tasks")
        for task in self._tasks:
            task.cancel()
        self.close()
        self._server.close()
        if restart:
            config_filepath = self._config_filepath
            config_file = toml.load(config_filepath)
            try:
                config = type(self)._parse_config(config_file, self.name)
                self._loop.create_task(
                    type(self)._start_daemon(self.name, config, config_filepath)
                )
            except ValueError as e:
                # Exceptions have no ``message`` attribute on Python 3; log the text.
                self.logger.error(str(e))

    def _connection_made(self, peername: str) -> None:
        """Record a newly connected client."""
        self._clients.append(peername)
        self.logger.debug(f"_connection_made {self._clients}")

    def _connection_lost(self, peername: str) -> None:
        """Forget a disconnected client."""
        self._clients.remove(peername)
        self.logger.debug(f"_connection_lost {self._clients}")

    def _save_state(self) -> None:
        """Write the current state to disk."""
        with open(self._state_filepath, "wt") as f:
            f.write(self.get_state())

    async def save_state(self):
        """Schedule writing the current state to disk.

        Note: Current implementation only writes while busy (and once after busy)
        """
        while True:
            while self._busy:
                self._save_state()
                await asyncio.sleep(0.1)
            self._save_state()
            await self._busy_sig.wait()

    def get_config_filepath(self) -> str:
        """Retrieve the current filepath of the configuration."""
        return str(self._config_filepath.absolute())

    def get_config(self) -> str:
        """Retrieve the current configuration, including any defaults."""
        return toml.dumps(self._config)

    def id(self) -> Dict[str, Optional[str]]:
        """Dictionary of identifying information for the daemon."""
        return {
            "name": self.name,
            "kind": self._kind,
            "make": self.make,
            "model": self.model,
            "serial": self.serial,
        }

    @property
    def _busy(self) -> bool:
        """Indicates the current 'busy' state for use in internal functions.

        Setting busy can be done with `self._busy = <True|False>`.
        Async tasks can wait for either sense using `await self._[not_]busy_sig.wait()`.
        """
        return self._busy_sig.is_set()

    @_busy.setter
    def _busy(self, value):
        # Keep the two events complementary: exactly one of them ends up set.
        if value:
            self._busy_sig.set()
            self._not_busy_sig.clear()
        else:
            self._not_busy_sig.set()
            self._busy_sig.clear()

    def busy(self) -> bool:
        """Boolean representing if the daemon is busy (state updated) or not."""
        return self._busy

    # The following functions (plus __init__) are what most daemon need to implement

    async def update_state(self):
        """Continually monitor and update the current daemon state."""
        pass

    def get_state(self) -> str:
        """Return the current daemon state."""
        return toml.dumps(self._state)

    def _load_state(self, state):
        """Load an initial state from a dictionary (typically read from the state.toml file).

        Must be tolerant of missing fields, including entirely empty initial states.
        Raise an exception if state is invalid.

        Parameters
        ----------
        state: dict
            The saved state to load.
        """
        self._state = state
        # Fill in any field the protocol declares but the saved state lacks.
        for name, type_ in self._avro_protocol.get("state", {}).items():
            self._state.setdefault(name, type_.get("default", None))

    def close(self):
        """Do nothing; called from ``shutdown`` and ``shutdown_all``."""
        pass

Ancestors (in MRO)

  • Base
  • builtins.object

Static methods

def __init__(

self, name, config, config_filepath)

Create a yaq daemon.

Parameters

name: str — A name for this daemon. config: dict — Configuration parameters. config_filepath: str — The path for the configuration (not used internally, available to clients).

def __init__(self, name: str, config: Dict[str, Any], config_filepath: pathlib.Path):
    """Create a yaq daemon.

    Sets up the logger, loads any previously saved state from the state
    file, and schedules the ``save_state`` and ``update_state`` tasks.

    Parameters
    ----------
    name: str
        A name for this daemon
    config: dict
        Configuration parameters
    config_filepath: pathlib.Path
        The path for the configuration (not used internally, available to clients)
    """
    self.name = name
    self._config = config
    self._config_filepath = config_filepath
    state_dir = pathlib.Path(appdirs.user_data_dir("yaqd-state", "yaq")) / self._kind
    self._state_filepath = state_dir / f"{self.name}-state.toml"

    # Logging: optional level override and optional per-run log file.
    self.logger = logging.getLogger(self.name)
    if "log_level" in self._config:
        level = logging.name_to_level[self._config["log_level"]]
        self.logger.setLevel(level)
    if self._config.get("log_to_file"):
        # The log file sits next to the state file.
        log_path = self._state_filepath.with_name(
            f"{self.name}-{time.strftime('%Y-%m-%dT%H:%M:%S%z')}.log"
        )
        file_handler = logging_.FileHandler(log_path)
        file_handler.setFormatter(logging.formatter)
        self.logger.addHandler(file_handler)
    self.logger.info(f"Config File Path = {self._config_filepath}")
    self.logger.info(f"State File Path = {self._state_filepath}")
    self.logger.info(f"TCP Port = {config['port']}")

    self._clients: List[str] = []
    self.serial = config.get("serial")
    self.make = config.get("make")
    self.model = config.get("model")

    self._busy_sig = asyncio.Event()
    self._not_busy_sig = asyncio.Event()
    self._loop = asyncio.get_event_loop()

    # A missing or undecodable state file yields an empty starting state.
    try:
        self._state_filepath.parent.mkdir(parents=True, exist_ok=True)
        with self._state_filepath.open("rt") as state_file:
            saved_state = toml.load(state_file)
    except (toml.TomlDecodeError, FileNotFoundError):
        saved_state = {}
    self._load_state(saved_state)

    schedule = self._loop.create_task
    self._tasks = [schedule(self.save_state()), schedule(self.update_state())]

def busy(

self)

Boolean representing if the daemon is busy (state updated) or not.

def busy(self) -> bool:
    """Return the daemon's current busy flag (the value of ``self._busy``)."""
    is_busy = self._busy
    return is_busy

def close(

self)

def close(self):
    """Do nothing; this implementation has nothing to clean up."""
    return None

def get_config(

self)

Retrieve the current configuration, including any defaults.

def get_config(self) -> str:
    """Return the current configuration, including any defaults, as TOML text."""
    serialized = toml.dumps(self._config)
    return serialized

def get_config_filepath(

self)

Retrieve the current filepath of the configuration.

def get_config_filepath(self) -> str:
    """Return the absolute path of the configuration file as a string."""
    absolute_path = self._config_filepath.absolute()
    return str(absolute_path)

def get_state(

self)

Return the current daemon state.

def get_state(self) -> str:
    """Return the current daemon state serialized as TOML text."""
    serialized = toml.dumps(self._state)
    return serialized

def id(

self)

Dictionary of identifying information for the daemon.

def id(self) -> Dict[str, Optional[str]]:
    """Return the daemon's identifying fields: name, kind, make, model, serial."""
    identity = {"name": self.name, "kind": self._kind}
    identity.update(make=self.make, model=self.model, serial=self.serial)
    return identity

def save_state(

self)

Schedule writing the current state to disk.

Note: Current implementation only writes while busy (and once after busy)

async def save_state(self):
    """Write the state to disk repeatedly while busy, and once more afterwards.

    Runs forever: while ``self._busy`` is true the state is written and the
    task sleeps 0.1 s between writes; once no longer busy it writes one final
    time and then waits on ``self._busy_sig`` before starting over.
    """
    poll_interval = 0.1
    while True:
        still_busy = self._busy
        while still_busy:
            self._save_state()
            await asyncio.sleep(poll_interval)
            still_busy = self._busy
        self._save_state()
        await self._busy_sig.wait()

def shutdown(

self, restart=False)

def shutdown(self, restart=False):
    """Cancel this daemon's tasks, close it and its server, optionally restart.

    Parameters
    ----------
    restart: bool
        When true, the config file is re-read and a new daemon with the same
        name is scheduled on the loop.  If ``_parse_config`` raises
        ``ValueError`` the reason is logged and no restart happens.
    """
    self.logger.info(f"Shutting Down {self.name}")
    self.logger.info(f"Cancelling {len(self._tasks)} outstanding tasks")
    for task in self._tasks:
        task.cancel()
    self.close()
    self._server.close()
    if restart:
        config_filepath = self._config_filepath
        config_file = toml.load(config_filepath)
        try:
            config = type(self)._parse_config(config_file, self.name)
            self._loop.create_task(
                type(self)._start_daemon(self.name, config, config_filepath)
            )
        except ValueError as e:
            # Exceptions have no ``message`` attribute on Python 3; log the text.
            self.logger.error(str(e))

def update_state(

self)

Continually monitor and update the current daemon state.

async def update_state(self):
    """Continually monitor and update the daemon state (no-op in this implementation)."""
    return None

Instance variables

var logger

var make

var model

var name

var serial

Methods

def main(

cls)

Run the event loop.

@classmethod
def main(cls):
    """Run the event loop.

    Installs signal handlers (skipped on Windows), parses the command line,
    serves the informational ``--version`` / ``--protocol`` flags, then loads
    the config file and runs the loop until it is stopped.
    """
    loop = asyncio.get_event_loop()
    handled_signals = (
        () if sys.platform.startswith("win") else (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    )
    for sig in handled_signals:
        # Bind ``sig`` as a default so every handler keeps its own signal.
        loop.add_signal_handler(
            sig, lambda sig=sig: asyncio.create_task(cls.shutdown_all(sig, loop))
        )

    default_config = (
        pathlib.Path(appdirs.user_config_dir("yaqd", "yaq")) / cls._kind / "config.toml"
    )
    level_names = [
        "debug",
        "info",
        "notice",
        "warning",
        "error",
        "critical",
        "alert",
        "emergency",
    ]
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--config",
        "-c",
        default=default_config,
        action="store",
        help="Path to the configuration toml file.",
    )
    # ``--verbose`` and ``--log-level`` share the ``log_level`` destination.
    cli.add_argument(
        "--verbose",
        "-v",
        action="store_const",
        dest="log_level",
        const="debug",
        help="Alias for --log-level=debug",
    )
    cli.add_argument(
        "--log-level",
        "-l",
        action="store",
        dest="log_level",
        choices=level_names,
        help="Set the log level explicitly",
    )
    cli.add_argument("--version", action="store_true")
    cli.add_argument("--protocol", action="store_true")
    args = cli.parse_args()

    if args.log_level:
        logging.setLevel(logging.name_to_level[args.log_level])

    if args.version:
        print(f"module {cls.__module__} version {cls._version}")
        print(f"avro version {__avro_version__}")
        print(f"yaqd_core version {__version__}")
        print(f"Python {sys.version}")
        sys.exit(0)

    if args.protocol:
        # Echo the protocol file verbatim.
        protocol_path = pathlib.Path(inspect.getfile(cls)).parent / f"{cls._kind}.avpr"
        with open(protocol_path, "r") as protocol_file:
            for line in protocol_file:
                print(line, end="")
        sys.exit(0)

    config_filepath = pathlib.Path(args.config)
    config_file = toml.load(config_filepath)
    main_task = loop.create_task(cls._main(config_filepath, config_file, args))
    try:
        loop.run_forever()
    except asyncio.exceptions.CancelledError:
        pass
    finally:
        loop.close()

def shutdown_all(

cls, sig, loop)

Gracefully shutdown the asyncio loop.

Gathers all current tasks, and allows daemons to perform cleanup tasks.

Adapted from https://www.roguelynn.com/words/asyncio-graceful-shutdowns/ Original code is licensed under the MIT license, and sublicensed here.

@classmethod
async def shutdown_all(cls, sig, loop):
    """Gracefully shutdown the asyncio loop.

    Gathers all current tasks, and allows daemons to perform cleanup tasks.
    On ``SIGHUP`` the daemons are started again from the config file of the
    first known daemon before the loop is stopped.

    Adapted from https://www.roguelynn.com/words/asyncio-graceful-shutdowns/
    Original code is licensed under the MIT license, and sublicensed here.

    Parameters
    ----------
    sig:
        The received signal; its ``name`` is logged and it is compared
        against ``signal.SIGHUP``.
    loop:
        The event loop to stop once shutdown has completed.
    """
    logger.info(f"Received signal {sig.name}...")
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    logger.info(f"Cancelling {len(tasks)} outstanding tasks")
    for task in tasks:
        task.cancel()
    # This is done after cancelling so that shutdown tasks which require the loop
    # are not themselves cancelled.
    for daemon in cls._daemons:
        daemon.close()
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    await asyncio.gather(*tasks, return_exceptions=True)
    for daemon in cls._daemons:
        daemon._save_state()
    # Only restart when at least one daemon exists: indexing an empty list
    # would raise here and ``loop.stop()`` below would never be reached.
    if hasattr(signal, "SIGHUP") and sig == signal.SIGHUP and cls._daemons:
        config_filepath = cls._daemons[0]._config_filepath
        config_file = toml.load(config_filepath)
        await cls._main(config_filepath, config_file)
    loop.stop()

class ContinuousHardware

class ContinuousHardware(Hardware):
    """Hardware whose requested positions are checked against numeric limits.

    The effective limits are the overlap of the ``hw_limits`` state entry and
    the ``limits`` config entry.  The ``out_of_limits`` config entry selects
    what ``set_position`` does with a request outside those limits.
    """

    _kind: str = "continuous-hardware"

    def __init__(self, name: str, config: Dict[str, Any], config_filepath: pathlib.Path):
        super().__init__(name, config, config_filepath)
        # Policy consulted by ``set_position``: "closest", "ignore", or anything else.
        self._out_of_limits = config["out_of_limits"]

    def get_limits(self) -> List[float]:
        """Return ``[low, high]``, the overlap of hardware and config limits."""
        hw = self._state["hw_limits"]
        assert hw[0] < hw[1]
        cfg = self._config["limits"]
        assert cfg[0] < cfg[1]
        low = max(hw[0], cfg[0])
        upp = min(hw[1], cfg[1])
        assert low < upp
        return [low, upp]

    def in_limits(self, position: float) -> bool:
        """Return True when ``position`` lies within the limits (inclusive)."""
        low, upp = self.get_limits()
        return bool(low <= position <= upp)

    def set_position(self, position: float) -> None:
        """Set the position, applying the out-of-limits policy when needed.

        "closest" clamps to the nearest limit, "ignore" silently drops the
        request, and any other policy raises ``ValueError``.
        """
        if not self.in_limits(position):
            policy = self._out_of_limits
            if policy == "ignore":
                return
            if policy != "closest":
                raise ValueError(f"{position} not in ranges {self.get_limits()}")
            low, upp = self.get_limits()
            if position > upp:
                position = upp
            elif position < low:
                position = low
        super().set_position(position)

Ancestors (in MRO)

Class variables

var traits

Static methods

def __init__(

self, name, config, config_filepath)

Inheritance: Hardware.__init__

Create a yaq daemon.

Parameters

name: str — A name for this daemon. config: dict — Configuration parameters. config_filepath: str — The path for the configuration (not used internally, available to clients).

def __init__(self, name: str, config: Dict[str, Any], config_filepath: pathlib.Path):
    """Initialise the parent, then record the ``out_of_limits`` config entry."""
    super().__init__(name, config, config_filepath)
    # Looked up with [] so a missing key raises KeyError.
    policy = config["out_of_limits"]
    self._out_of_limits = policy

def busy(

self)

Inheritance: Base.busy

Boolean representing if the daemon is busy (state updated) or not.

def busy(self) -> bool:
    """Return the daemon's current busy flag (the value of ``self._busy``)."""
    is_busy = self._busy
    return is_busy

def close(

self)

def close(self):
    """Do nothing; this implementation has nothing to clean up."""
    return None

def get_config(

self)

Retrieve the current configuration, including any defaults.

def get_config(self) -> str:
    """Return the current configuration, including any defaults, as TOML text."""
    serialized = toml.dumps(self._config)
    return serialized

def get_config_filepath(

self)

Retrieve the current filepath of the configuration.

def get_config_filepath(self) -> str:
    """Return the absolute path of the configuration file as a string."""
    absolute_path = self._config_filepath.absolute()
    return str(absolute_path)

def get_destination(

self)

def get_destination(self) -> float:
    """Return the ``destination`` entry of the daemon state."""
    state = self._state
    return state["destination"]

def get_limits(

self)

def get_limits(self) -> List[float]:
    """Return ``[low, high]``, the overlap of hardware and config limits.

    Each pair, and the resulting overlap, is asserted to be strictly
    increasing.
    """
    hw = self._state["hw_limits"]
    assert hw[0] < hw[1]
    cfg = self._config["limits"]
    assert cfg[0] < cfg[1]
    low = max(hw[0], cfg[0])
    upp = min(hw[1], cfg[1])
    assert low < upp
    return [low, upp]

def get_position(

self)

def get_position(self) -> float:
    """Return the ``position`` entry of the daemon state."""
    state = self._state
    return state["position"]

def get_state(

self)

Return the current daemon state.

def get_state(self) -> str:
    """Return the current daemon state serialized as TOML text."""
    serialized = toml.dumps(self._state)
    return serialized

def get_units(

self)

def get_units(self) -> Union[str, None]:
    """Return the value stored in ``self._units``."""
    units = self._units
    return units

def id(

self)

Dictionary of identifying information for the daemon.

def id(self) -> Dict[str, Optional[str]]:
    """Return the daemon's identifying fields: name, kind, make, model, serial."""
    identity = {"name": self.name, "kind": self._kind}
    identity.update(make=self.make, model=self.model, serial=self.serial)
    return identity

def in_limits(

self, position)

def in_limits(self, position: float) -> bool:
    """Return True when ``position`` lies within ``get_limits()`` (inclusive)."""
    low, upp = self.get_limits()
    return bool(low <= position <= upp)

def save_state(

self)

Schedule writing the current state to disk.

Note: Current implementation only writes while busy (and once after busy)

async def save_state(self):
    """Write the state to disk repeatedly while busy, and once more afterwards.

    Runs forever: while ``self._busy`` is true the state is written and the
    task sleeps 0.1 s between writes; once no longer busy it writes one final
    time and then waits on ``self._busy_sig`` before starting over.
    """
    poll_interval = 0.1
    while True:
        still_busy = self._busy
        while still_busy:
            self._save_state()
            await asyncio.sleep(poll_interval)
            still_busy = self._busy
        self._save_state()
        await self._busy_sig.wait()

def set_position(

self, position)

def set_position(self, position: float) -> None:
    """Set the position, applying the out-of-limits policy when needed.

    For a position outside the limits: "closest" clamps it to the nearest
    limit, "ignore" returns without setting anything, and any other policy
    raises ``ValueError``.
    """
    if not self.in_limits(position):
        policy = self._out_of_limits
        if policy == "ignore":
            return
        if policy != "closest":
            raise ValueError(f"{position} not in ranges {self.get_limits()}")
        low, upp = self.get_limits()
        if position > upp:
            position = upp
        elif position < low:
            position = low
    super().set_position(position)

def set_relative(

self, distance)

def set_relative(self, distance: float) -> float:
    """Request a move of ``distance`` from the current destination.

    Returns the requested target (``destination + distance``) as computed
    here, whatever ``set_position`` then does with it.
    """
    target = self._state["destination"] + distance
    self.set_position(target)
    return target

def shutdown(

self, restart=False)

def shutdown(self, restart=False):
    """Cancel this daemon's tasks, close it and its server, optionally restart.

    Parameters
    ----------
    restart: bool
        When true, the config file is re-read and a new daemon with the same
        name is scheduled on the loop.  If ``_parse_config`` raises
        ``ValueError`` the reason is logged and no restart happens.
    """
    self.logger.info(f"Shutting Down {self.name}")
    self.logger.info(f"Cancelling {len(self._tasks)} outstanding tasks")
    for task in self._tasks:
        task.cancel()
    self.close()
    self._server.close()
    if restart:
        config_filepath = self._config_filepath
        config_file = toml.load(config_filepath)
        try:
            config = type(self)._parse_config(config_file, self.name)
            self._loop.create_task(
                type(self)._start_daemon(self.name, config, config_filepath)
            )
        except ValueError as e:
            # Exceptions have no ``message`` attribute on Python 3; log the text.
            self.logger.error(str(e))

def update_state(

self)

Continually monitor and update the current daemon state.

async def update_state(self):
    """Continually monitor and update the daemon state (no-op in this implementation)."""
    return None

Methods

def main(

cls)

Run the event loop.

@classmethod
def main(cls):
    """Run the event loop.

    Installs signal handlers (skipped on Windows), parses the command line,
    serves the informational ``--version`` / ``--protocol`` flags, then loads
    the config file and runs the loop until it is stopped.
    """
    loop = asyncio.get_event_loop()
    handled_signals = (
        () if sys.platform.startswith("win") else (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    )
    for sig in handled_signals:
        # Bind ``sig`` as a default so every handler keeps its own signal.
        loop.add_signal_handler(
            sig, lambda sig=sig: asyncio.create_task(cls.shutdown_all(sig, loop))
        )

    default_config = (
        pathlib.Path(appdirs.user_config_dir("yaqd", "yaq")) / cls._kind / "config.toml"
    )
    level_names = [
        "debug",
        "info",
        "notice",
        "warning",
        "error",
        "critical",
        "alert",
        "emergency",
    ]
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--config",
        "-c",
        default=default_config,
        action="store",
        help="Path to the configuration toml file.",
    )
    # ``--verbose`` and ``--log-level`` share the ``log_level`` destination.
    cli.add_argument(
        "--verbose",
        "-v",
        action="store_const",
        dest="log_level",
        const="debug",
        help="Alias for --log-level=debug",
    )
    cli.add_argument(
        "--log-level",
        "-l",
        action="store",
        dest="log_level",
        choices=level_names,
        help="Set the log level explicitly",
    )
    cli.add_argument("--version", action="store_true")
    cli.add_argument("--protocol", action="store_true")
    args = cli.parse_args()

    if args.log_level:
        logging.setLevel(logging.name_to_level[args.log_level])

    if args.version:
        print(f"module {cls.__module__} version {cls._version}")
        print(f"avro version {__avro_version__}")
        print(f"yaqd_core version {__version__}")
        print(f"Python {sys.version}")
        sys.exit(0)

    if args.protocol:
        # Echo the protocol file verbatim.
        protocol_path = pathlib.Path(inspect.getfile(cls)).parent / f"{cls._kind}.avpr"
        with open(protocol_path, "r") as protocol_file:
            for line in protocol_file:
                print(line, end="")
        sys.exit(0)

    config_filepath = pathlib.Path(args.config)
    config_file = toml.load(config_filepath)
    main_task = loop.create_task(cls._main(config_filepath, config_file, args))
    try:
        loop.run_forever()
    except asyncio.exceptions.CancelledError:
        pass
    finally:
        loop.close()

def shutdown_all(

cls, sig, loop)

Gracefully shutdown the asyncio loop.

Gathers all current tasks, and allows daemons to perform cleanup tasks.

Adapted from https://www.roguelynn.com/words/asyncio-graceful-shutdowns/ Original code is licensed under the MIT license, and sublicensed here.

@classmethod
async def shutdown_all(cls, sig, loop):
    """Gracefully shutdown the asyncio loop.

    Gathers all current tasks, and allows daemons to perform cleanup tasks.
    On ``SIGHUP`` the daemons are started again from the config file of the
    first known daemon before the loop is stopped.

    Adapted from https://www.roguelynn.com/words/asyncio-graceful-shutdowns/
    Original code is licensed under the MIT license, and sublicensed here.

    Parameters
    ----------
    sig:
        The received signal; its ``name`` is logged and it is compared
        against ``signal.SIGHUP``.
    loop:
        The event loop to stop once shutdown has completed.
    """
    logger.info(f"Received signal {sig.name}...")
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    logger.info(f"Cancelling {len(tasks)} outstanding tasks")
    for task in tasks:
        task.cancel()
    # This is done after cancelling so that shutdown tasks which require the loop
    # are not themselves cancelled.
    for daemon in cls._daemons:
        daemon.close()
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    await asyncio.gather(*tasks, return_exceptions=True)
    for daemon in cls._daemons:
        daemon._save_state()
    # Only restart when at least one daemon exists: indexing an empty list
    # would raise here and ``loop.stop()`` below would never be reached.
    if hasattr(signal, "SIGHUP") and sig == signal.SIGHUP and cls._daemons:
        config_filepath = cls._daemons[0]._config_filepath
        config_file = toml.load(config_filepath)
        await cls._main(config_filepath, config_file)
    loop.stop()

class DiscreteHardware

class DiscreteHardware(Hardware):
    """Hardware whose positions can be selected by identifier.

    The identifier-to-position mapping is read from the ``identifiers``
    config entry (empty when absent).
    """

    _kind: str = "discrete-hardware"

    def __init__(self, name: str, config: Dict[str, Any], config_filepath: pathlib.Path):
        # Assigned before the parent constructor runs.
        # NOTE(review): presumably so it is available to code the parent calls — confirm.
        identifiers: Dict[str, Any] = config.get("identifiers", {})
        self._position_identifiers = identifiers
        super().__init__(name, config, config_filepath)

    def get_position_identifiers(self):
        """Return the identifier-to-position mapping."""
        mapping = self._position_identifiers
        return mapping

    def set_identifier(self, identifier):
        """Move to the position mapped to ``identifier`` and return that position.

        Raises ``KeyError`` for an unknown identifier.
        """
        position = self._position_identifiers[identifier]
        self.set_position(position)
        return position

    def get_identifier(self):
        """Return the ``position_identifier`` entry of the daemon state."""
        state = self._state
        return state["position_identifier"]

Ancestors (in MRO)

Class variables

var traits

Static methods

def __init__(

self, name, config, config_filepath)

Inheritance: Hardware.__init__

Create a yaq daemon.

Parameters

name: str — A name for this daemon. config: dict — Configuration parameters. config_filepath: str — The path for the configuration (not used internally, available to clients).

def __init__(
    self, name: str, config: Dict[str, Any], config_filepath: pathlib.Path
):
    """Create a discrete-hardware daemon.

    Parameters
    ----------
    name: str
        A name for this daemon
    config: dict
        Configuration parameters; the optional ``identifiers`` entry maps
        identifier names to positions.
    config_filepath: pathlib.Path
        The path for the configuration
    """
    # The mapping is stored before the parent initializer runs.
    identifiers: Dict[str, Any] = config.get("identifiers", {})
    self._position_identifiers = identifiers
    super().__init__(name, config, config_filepath)

def busy(

self)

Inheritance: Base.busy

Boolean representing if the daemon is busy (state updated) or not.

def busy(self) -> bool:
    """Report whether the daemon is currently busy.

    Returns
    -------
    bool
        The current value of the daemon's ``_busy`` flag.
    """
    is_busy = self._busy
    return is_busy

def close(

self)

def close(self):
    """Hook invoked when the daemon shuts down; the default does nothing.

    Both ``shutdown`` and ``shutdown_all`` call this method.
    """
    return None

def get_config(

self)

Retrieve the current configuration, including any defaults.

def get_config(self) -> str:
    """Retrieve the current configuration, including any defaults.

    Returns
    -------
    str
        The daemon's configuration dictionary serialized as toml.
    """
    serialized = toml.dumps(self._config)
    return serialized

def get_config_filepath(

self)

Retrieve the current filepath of the configuration.

def get_config_filepath(self) -> str:
    """Retrieve the current filepath of the configuration.

    Returns
    -------
    str
        The absolute form of the stored configuration path.
    """
    # The constructor documents this value as a str but annotates it as a
    # pathlib.Path; coerce so that either form is accepted.
    return str(pathlib.Path(self._config_filepath).absolute())

def get_destination(

self)

def get_destination(self) -> float:
    """Return the ``destination`` entry of the daemon state."""
    state = self._state
    return state["destination"]

def get_identifier(

self)

def get_identifier(self):
    """Return the ``position_identifier`` entry of the daemon state."""
    state = self._state
    return state["position_identifier"]

def get_position(

self)

def get_position(self) -> float:
    """Return the ``position`` entry of the daemon state."""
    state = self._state
    return state["position"]

def get_position_identifiers(

self)

def get_position_identifiers(self):
    """Return the mapping of identifier names to positions."""
    identifiers = self._position_identifiers
    return identifiers

def get_state(

self)

Return the current daemon state.

def get_state(self) -> str:
    """Return the current daemon state.

    Returns
    -------
    str
        The daemon's state dictionary serialized as toml.
    """
    serialized = toml.dumps(self._state)
    return serialized

def get_units(

self)

def get_units(self) -> Union[str, None]:
    """Return the units of the hardware position, or None when unset."""
    units = self._units
    return units

def id(

self)

Dictionary of identifying information for the daemon.

def id(self) -> Dict[str, Optional[str]]:
    """Dictionary of identifying information for the daemon.

    Returns
    -------
    dict
        The keys ``name``, ``kind``, ``make``, ``model`` and ``serial``, in
        that order, taken from the attributes of the daemon.
    """
    return dict(
        name=self.name,
        kind=self._kind,
        make=self.make,
        model=self.model,
        serial=self.serial,
    )

def save_state(

self)

Schedule writing the current state to disk.

Note: Current implementation only writes while busy (and once after busy)

async def save_state(self):
    """Schedule writing the current state to disk.

    Runs forever.  While the daemon is busy the state is written and the
    coroutine sleeps 0.1 seconds between writes; once the daemon is no longer
    busy the state is written once more and the coroutine waits on
    ``_busy_sig`` before starting over.

    Note: Current implementation only writes while busy (and once after busy)
    """
    while True:
        if self._busy:
            self._save_state()
            await asyncio.sleep(0.1)
            continue
        # Not busy: write once, then wait for the busy signal.
        self._save_state()
        await self._busy_sig.wait()

def set_identifier(

self, identifier)

def set_identifier(self, identifier):
    """Move to the position stored under ``identifier`` and return it.

    An identifier missing from the mapping propagates the lookup error.
    """
    position = self._position_identifiers[identifier]
    self.set_position(position)
    return position

def set_position(

self, position)

def set_position(self, position: float) -> None:
    """Start a move to ``position``.

    Marks the daemon busy, records ``position`` as the ``destination`` entry
    of the state and delegates the actual move to ``_set_position``.
    """
    self._busy = True
    state = self._state
    state["destination"] = position
    self._set_position(position)

def set_relative(

self, distance)

def set_relative(self, distance: float) -> float:
    """Move by ``distance`` relative to the current destination.

    Returns
    -------
    float
        The new destination, i.e. the previous ``destination`` entry of the
        state plus ``distance``.
    """
    target = self._state["destination"] + distance
    self.set_position(target)
    return target

def shutdown(

self, restart=False)

def shutdown(self, restart=False):
    """Shut this daemon down, optionally restarting it.

    Cancels the daemon's outstanding tasks, calls ``close`` and closes the
    server.  With ``restart`` true the configuration file is read again and a
    replacement daemon with the same name is scheduled on the loop.

    Parameters
    ----------
    restart: bool, optional
        Re-parse the configuration and start a replacement daemon after
        shutting down. Default False.
    """
    self.logger.info(f"Shutting Down {self.name}")
    self.logger.info(f"Cancelling {len(self._tasks)} outstanding tasks")
    for task in self._tasks:
        task.cancel()
    self.close()
    self._server.close()
    if restart:
        config_filepath = self._config_filepath
        config_file = toml.load(config_filepath)
        try:
            config = type(self)._parse_config(config_file, self.name)
            self._loop.create_task(
                type(self)._start_daemon(self.name, config, config_filepath)
            )
        except ValueError as e:
            # Built-in exceptions have no ``message`` attribute in Python 3;
            # fall back to the string form so that reporting the error can
            # not itself raise AttributeError.
            self.logger.error(getattr(e, "message", str(e)))

def update_state(

self)

Continually monitor and update the current daemon state.

async def update_state(self):
    """Continually monitor and update the current daemon state.

    This default implementation performs no work and returns immediately.
    """
    return None

Methods

def main(

cls)

Run the event loop.

@classmethod
def main(cls):
    """Run the event loop.

    Registers signal handlers (non-Windows platforms only), parses the
    command line, loads the toml configuration file, schedules ``cls._main``
    on the event loop and runs the loop until it is stopped.

    The ``--version`` and ``--protocol`` options print their information and
    exit with status 0 before any configuration file is read.
    """
    loop = asyncio.get_event_loop()
    # No signal handlers are registered on Windows platforms.
    if sys.platform.startswith("win"):
        signals = ()
    else:
        signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        # ``s=s`` binds the current signal as a default argument, so each
        # handler passes its own signal to shutdown_all.
        loop.add_signal_handler(s, lambda s=s: asyncio.create_task(cls.shutdown_all(s, loop)))
    parser = argparse.ArgumentParser()
    # Default: <user config dir for "yaqd">/<daemon kind>/config.toml
    parser.add_argument(
        "--config",
        "-c",
        default=(
            pathlib.Path(appdirs.user_config_dir("yaqd", "yaq")) / cls._kind / "config.toml"
        ),
        action="store",
        help="Path to the configuration toml file.",
    )
    # --verbose and --log-level both store into ``args.log_level``.
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_const",
        dest="log_level",
        const="debug",
        help="Alias for --log-level=debug",
    )
    parser.add_argument(
        "--log-level",
        "-l",
        action="store",
        dest="log_level",
        choices=[
            "debug",
            "info",
            "notice",
            "warning",
            "error",
            "critical",
            "alert",
            "emergency",
        ],
        help="Set the log level explicitly",
    )
    parser.add_argument("--version", action="store_true")
    parser.add_argument("--protocol", action="store_true")
    args = parser.parse_args()
    if args.log_level:
        logging.setLevel(logging.name_to_level[args.log_level])
    if args.version:
        # Print version information and exit without starting any daemon.
        print(f"module {cls.__module__} version {cls._version}")
        print(f"avro version {__avro_version__}")
        print(f"yaqd_core version {__version__}")
        print(f"Python {sys.version}")
        sys.exit(0)
    if args.protocol:
        # Echo the ``<kind>.avpr`` file found next to the file defining ``cls``.
        with open(pathlib.Path(inspect.getfile(cls)).parent / f"{cls._kind}.avpr", "r") as f:
            for line in f:
                print(line, end="")
        sys.exit(0)
    config_filepath = pathlib.Path(args.config)
    config_file = toml.load(config_filepath)
    main_task = loop.create_task(cls._main(config_filepath, config_file, args))
    try:
        loop.run_forever()
    except asyncio.exceptions.CancelledError:
        # A CancelledError escaping run_forever is deliberately ignored.
        pass
    finally:
        loop.close()

def shutdown_all(

cls, sig, loop)

Gracefully shutdown the asyncio loop.

Gathers all current tasks, and allows daemons to perform cleanup tasks.

Adapted from https://www.roguelynn.com/words/asyncio-graceful-shutdowns/ (the original code is licensed under the MIT license, and is sublicensed here).

@classmethod
async def shutdown_all(cls, sig, loop):
    """Gracefully shut down the asyncio loop in response to a signal.

    Requests cancellation of every outstanding task, closes each registered
    daemon, waits for the remaining tasks to finish and then saves the state
    of every daemon.  When the signal is SIGHUP, the configuration file of the
    first daemon is reloaded and ``cls._main`` is awaited before the loop is
    stopped.

    Adapted from https://www.roguelynn.com/words/asyncio-graceful-shutdowns/
    Original code is licensed under the MIT license, and sublicensed here.

    Parameters
    ----------
    sig
        The received signal; its ``name`` is logged.
    loop
        The event loop, stopped as the final step.
    """
    logger.info(f"Received signal {sig.name}...")
    this_task = asyncio.current_task()
    outstanding = [task for task in asyncio.all_tasks() if task is not this_task]
    logger.info(f"Cancelling {len(outstanding)} outstanding tasks")
    for task in outstanding:
        task.cancel()
    # This is done after cancelling so that shutdown tasks which require the loop
    # are not themselves cancelled.
    for daemon in cls._daemons:
        daemon.close()
    # Collect again: closing the daemons may have scheduled new tasks.
    remaining = [task for task in asyncio.all_tasks() if task is not this_task]
    await asyncio.gather(*remaining, return_exceptions=True)
    for daemon in cls._daemons:
        daemon._save_state()
    if hasattr(signal, "SIGHUP") and sig == signal.SIGHUP:
        config_filepath = [daemon._config_filepath for daemon in cls._daemons][0]
        config_file = toml.load(config_filepath)
        await cls._main(config_filepath, config_file)
    loop.stop()

class Hardware

class Hardware(Base):
    """Daemon base class for hardware that has a position.

    Subclasses implement ``_set_position`` to perform the actual move.
    """

    traits = ["has-position"]
    _kind = "hardware"

    def __init__(
        self, name: str, config: Dict[str, Any], config_filepath: pathlib.Path
    ):
        # Units default to None and are assigned before the parent initializer runs.
        self._units = None
        super().__init__(name, config, config_filepath)

    def get_position(self) -> float:
        """Return the ``position`` entry of the daemon state."""
        state = self._state
        return state["position"]

    def get_units(self) -> Union[str, None]:
        """Return the units of the hardware position, or None when unset."""
        units = self._units
        return units

    def get_destination(self) -> float:
        """Return the ``destination`` entry of the daemon state."""
        state = self._state
        return state["destination"]

    def set_position(self, position: float) -> None:
        """Mark the daemon busy, record the destination and start the move."""
        self._busy = True
        state = self._state
        state["destination"] = position
        self._set_position(position)

    def _set_position(self, position: float) -> None:
        """Perform the move to ``position``; must be provided by subclasses."""
        raise NotImplementedError

    def set_relative(self, distance: float) -> float:
        """Move by ``distance`` from the current destination; return the new destination."""
        target = self._state["destination"] + distance
        self.set_position(target)
        return target

Ancestors (in MRO)

Class variables

var traits

Static methods

def __init__(

self, name, config, config_filepath)

Create a yaq daemon.

Parameters

name: str — A name for this daemon. config: dict — Configuration parameters. config_filepath: str — The path for the configuration (not used internally, available to clients).

def __init__(
    self, name: str, config: Dict[str, Any], config_filepath: pathlib.Path
):
    """Create a hardware daemon.

    Parameters
    ----------
    name: str
        A name for this daemon
    config: dict
        Configuration parameters
    config_filepath: pathlib.Path
        The path for the configuration
    """
    # Units default to None and are assigned before the parent initializer runs.
    self._units = None
    super().__init__(name, config, config_filepath)

def busy(

self)

Boolean representing if the daemon is busy (state updated) or not.

def busy(self) -> bool:
    """Report whether the daemon is currently busy.

    Returns
    -------
    bool
        The current value of the daemon's ``_busy`` flag.
    """
    is_busy = self._busy
    return is_busy

def close(

self)

def close(self):
    """Hook invoked when the daemon shuts down; the default does nothing.

    Both ``shutdown`` and ``shutdown_all`` call this method.
    """
    return None

def get_config(

self)

Retrieve the current configuration, including any defaults.

def get_config(self) -> str:
    """Retrieve the current configuration, including any defaults.

    Returns
    -------
    str
        The daemon's configuration dictionary serialized as toml.
    """
    serialized = toml.dumps(self._config)
    return serialized

def get_config_filepath(

self)

Retrieve the current filepath of the configuration.

def get_config_filepath(self) -> str:
    """Retrieve the current filepath of the configuration.

    Returns
    -------
    str
        The absolute form of the stored configuration path.
    """
    # The constructor documents this value as a str but annotates it as a
    # pathlib.Path; coerce so that either form is accepted.
    return str(pathlib.Path(self._config_filepath).absolute())

def get_destination(

self)

def get_destination(self) -> float:
    """Return the ``destination`` entry of the daemon state."""
    state = self._state
    return state["destination"]

def get_position(

self)

def get_position(self) -> float:
    """Return the ``position`` entry of the daemon state."""
    state = self._state
    return state["position"]

def get_state(

self)

Return the current daemon state.

def get_state(self) -> str:
    """Return the current daemon state.

    Returns
    -------
    str
        The daemon's state dictionary serialized as toml.
    """
    serialized = toml.dumps(self._state)
    return serialized

def get_units(

self)

def get_units(self) -> Union[str, None]:
    """Return the units of the hardware position, or None when unset."""
    units = self._units
    return units

def id(

self)

Dictionary of identifying information for the daemon.

def id(self) -> Dict[str, Optional[str]]:
    """Dictionary of identifying information for the daemon.

    Returns
    -------
    dict
        The keys ``name``, ``kind``, ``make``, ``model`` and ``serial``, in
        that order, taken from the attributes of the daemon.
    """
    return dict(
        name=self.name,
        kind=self._kind,
        make=self.make,
        model=self.model,
        serial=self.serial,
    )

def save_state(

self)

Schedule writing the current state to disk.

Note: Current implementation only writes while busy (and once after busy)

async def save_state(self):
    """Schedule writing the current state to disk.

    Runs forever.  While the daemon is busy the state is written and the
    coroutine sleeps 0.1 seconds between writes; once the daemon is no longer
    busy the state is written once more and the coroutine waits on
    ``_busy_sig`` before starting over.

    Note: Current implementation only writes while busy (and once after busy)
    """
    while True:
        if self._busy:
            self._save_state()
            await asyncio.sleep(0.1)
            continue
        # Not busy: write once, then wait for the busy signal.
        self._save_state()
        await self._busy_sig.wait()

def set_position(

self, position)

def set_position(self, position: float) -> None:
    """Start a move to ``position``.

    Marks the daemon busy, records ``position`` as the ``destination`` entry
    of the state and delegates the actual move to ``_set_position``.
    """
    self._busy = True
    state = self._state
    state["destination"] = position
    self._set_position(position)

def set_relative(

self, distance)

def set_relative(self, distance: float) -> float:
    """Move by ``distance`` relative to the current destination.

    Returns
    -------
    float
        The new destination, i.e. the previous ``destination`` entry of the
        state plus ``distance``.
    """
    target = self._state["destination"] + distance
    self.set_position(target)
    return target

def shutdown(

self, restart=False)

def shutdown(self, restart=False):
    """Shut this daemon down, optionally restarting it.

    Cancels the daemon's outstanding tasks, calls ``close`` and closes the
    server.  With ``restart`` true the configuration file is read again and a
    replacement daemon with the same name is scheduled on the loop.

    Parameters
    ----------
    restart: bool, optional
        Re-parse the configuration and start a replacement daemon after
        shutting down. Default False.
    """
    self.logger.info(f"Shutting Down {self.name}")
    self.logger.info(f"Cancelling {len(self._tasks)} outstanding tasks")
    for task in self._tasks:
        task.cancel()
    self.close()
    self._server.close()
    if restart:
        config_filepath = self._config_filepath
        config_file = toml.load(config_filepath)
        try:
            config = type(self)._parse_config(config_file, self.name)
            self._loop.create_task(
                type(self)._start_daemon(self.name, config, config_filepath)
            )
        except ValueError as e:
            # Built-in exceptions have no ``message`` attribute in Python 3;
            # fall back to the string form so that reporting the error can
            # not itself raise AttributeError.
            self.logger.error(getattr(e, "message", str(e)))

def update_state(

self)

Continually monitor and update the current daemon state.

async def update_state(self):
    """Continually monitor and update the current daemon state.

    This default implementation performs no work and returns immediately.
    """
    return None

Methods

def main(

cls)

Run the event loop.

@classmethod
def main(cls):
    """Run the event loop.

    Registers signal handlers (non-Windows platforms only), parses the
    command line, loads the toml configuration file, schedules ``cls._main``
    on the event loop and runs the loop until it is stopped.

    The ``--version`` and ``--protocol`` options print their information and
    exit with status 0 before any configuration file is read.
    """
    loop = asyncio.get_event_loop()
    # No signal handlers are registered on Windows platforms.
    if sys.platform.startswith("win"):
        signals = ()
    else:
        signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        # ``s=s`` binds the current signal as a default argument, so each
        # handler passes its own signal to shutdown_all.
        loop.add_signal_handler(s, lambda s=s: asyncio.create_task(cls.shutdown_all(s, loop)))
    parser = argparse.ArgumentParser()
    # Default: <user config dir for "yaqd">/<daemon kind>/config.toml
    parser.add_argument(
        "--config",
        "-c",
        default=(
            pathlib.Path(appdirs.user_config_dir("yaqd", "yaq")) / cls._kind / "config.toml"
        ),
        action="store",
        help="Path to the configuration toml file.",
    )
    # --verbose and --log-level both store into ``args.log_level``.
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_const",
        dest="log_level",
        const="debug",
        help="Alias for --log-level=debug",
    )
    parser.add_argument(
        "--log-level",
        "-l",
        action="store",
        dest="log_level",
        choices=[
            "debug",
            "info",
            "notice",
            "warning",
            "error",
            "critical",
            "alert",
            "emergency",
        ],
        help="Set the log level explicitly",
    )
    parser.add_argument("--version", action="store_true")
    parser.add_argument("--protocol", action="store_true")
    args = parser.parse_args()
    if args.log_level:
        logging.setLevel(logging.name_to_level[args.log_level])
    if args.version:
        # Print version information and exit without starting any daemon.
        print(f"module {cls.__module__} version {cls._version}")
        print(f"avro version {__avro_version__}")
        print(f"yaqd_core version {__version__}")
        print(f"Python {sys.version}")
        sys.exit(0)
    if args.protocol:
        # Echo the ``<kind>.avpr`` file found next to the file defining ``cls``.
        with open(pathlib.Path(inspect.getfile(cls)).parent / f"{cls._kind}.avpr", "r") as f:
            for line in f:
                print(line, end="")
        sys.exit(0)
    config_filepath = pathlib.Path(args.config)
    config_file = toml.load(config_filepath)
    main_task = loop.create_task(cls._main(config_filepath, config_file, args))
    try:
        loop.run_forever()
    except asyncio.exceptions.CancelledError:
        # A CancelledError escaping run_forever is deliberately ignored.
        pass
    finally:
        loop.close()

def shutdown_all(

cls, sig, loop)

Gracefully shutdown the asyncio loop.

Gathers all current tasks, and allows daemons to perform cleanup tasks.

Adapted from https://www.roguelynn.com/words/asyncio-graceful-shutdowns/ (the original code is licensed under the MIT license, and is sublicensed here).

@classmethod
async def shutdown_all(cls, sig, loop):
    """Gracefully shut down the asyncio loop in response to a signal.

    Requests cancellation of every outstanding task, closes each registered
    daemon, waits for the remaining tasks to finish and then saves the state
    of every daemon.  When the signal is SIGHUP, the configuration file of the
    first daemon is reloaded and ``cls._main`` is awaited before the loop is
    stopped.

    Adapted from https://www.roguelynn.com/words/asyncio-graceful-shutdowns/
    Original code is licensed under the MIT license, and sublicensed here.

    Parameters
    ----------
    sig
        The received signal; its ``name`` is logged.
    loop
        The event loop, stopped as the final step.
    """
    logger.info(f"Received signal {sig.name}...")
    this_task = asyncio.current_task()
    outstanding = [task for task in asyncio.all_tasks() if task is not this_task]
    logger.info(f"Cancelling {len(outstanding)} outstanding tasks")
    for task in outstanding:
        task.cancel()
    # This is done after cancelling so that shutdown tasks which require the loop
    # are not themselves cancelled.
    for daemon in cls._daemons:
        daemon.close()
    # Collect again: closing the daemons may have scheduled new tasks.
    remaining = [task for task in asyncio.all_tasks() if task is not this_task]
    await asyncio.gather(*remaining, return_exceptions=True)
    for daemon in cls._daemons:
        daemon._save_state()
    if hasattr(signal, "SIGHUP") and sig == signal.SIGHUP:
        config_filepath = [daemon._config_filepath for daemon in cls._daemons][0]
        config_file = toml.load(config_filepath)
        await cls._main(config_filepath, config_file)
    loop.stop()

class Sensor

class Sensor(Base):
    """Base daemon for sensors that measure one or more named channels.

    Subclasses implement ``_measure`` and fill in ``_channel_names`` (and,
    where relevant, ``_channel_units`` and ``_channel_shapes``).
    """

    _kind = "base-sensor"

    def __init__(
        self, name: str, config: Dict[str, Any], config_filepath: pathlib.Path
    ):
        super().__init__(name, config, config_filepath)
        self._measured: MeasureType = dict()  # values must be numbers or arrays
        self._channel_names: List[str] = []
        self._channel_units: Dict[str, str] = dict()
        # Shapes are tuples of any length; ``()`` denotes a scalar channel.
        self._channel_shapes: Dict[str, Tuple[int, ...]] = dict()
        self._measurement_id = 0
        # Defined up front so the flag exists before the first call to measure.
        self._looping = False

    def measure(self, loop: bool = False) -> int:
        """Start a measurement, optionally looping.

        Sensor will remain busy until measurement completes.

        Parameters
        ----------
        loop: bool, optional
            Toggle looping behavior. Default False.

        Returns
        -------
        int
            The current measurement id.

        See Also
        --------
        stop_looping
        """
        self._looping = loop
        # A runner is only started when no measurement is already in progress.
        if not self._busy:
            self._busy = True
            self._loop.create_task(self._runner(loop=loop))
        return self._measurement_id

    def get_channel_names(self):
        """Get current channel names."""
        return self._channel_names

    def get_channel_shapes(self):
        """Get channel shapes."""
        # as default behavior, assume all channels are scalars
        if self._channel_shapes:
            return self._channel_shapes
        else:
            return {k: () for k in self._channel_names}

    def get_channel_units(self):
        """Get channel units."""
        return self._channel_units

    def get_measured(self) -> MeasureType:
        """Get most recently measured values."""
        return self._measured

    async def _measure(self) -> MeasureType:
        """Do measurement, filling _measured dictionary.

        Returns dictionary with keys channel names, values numbers or arrays.
        """
        raise NotImplementedError

    async def _runner(self, loop: bool) -> None:
        """Handle execution of _measure, including looping and setting of _measurement_id."""
        while True:
            self._measured = await self._measure()
            # The measured keys must match the declared channel names exactly.
            assert set(self._measured.keys()) == set(self._channel_names)
            self._measured["measurement_id"] = self._measurement_id
            # ``_looping`` is re-read every pass so stop_looping takes effect.
            if not self._looping:
                self._busy = False
                self._measurement_id += 1
                break
            # Yield to the event loop between consecutive measurements.
            await asyncio.sleep(0)

    def stop_looping(self) -> None:
        """Stop looping."""
        self._looping = False

Ancestors (in MRO)

Static methods

def __init__(

self, name, config, config_filepath)

Inheritance: Base.__init__

Create a yaq daemon.

Parameters

name: str — A name for this daemon. config: dict — Configuration parameters. config_filepath: str — The path for the configuration (not used internally, available to clients).

def __init__(
    self, name: str, config: Dict[str, Any], config_filepath: pathlib.Path
):
    """Create a sensor daemon.

    Parameters
    ----------
    name: str
        A name for this daemon
    config: dict
        Configuration parameters
    config_filepath: pathlib.Path
        The path for the configuration
    """
    super().__init__(name, config, config_filepath)
    self._measured: MeasureType = dict()  # values must be numbers or arrays
    self._channel_names: List[str] = []
    self._channel_units: Dict[str, str] = dict()
    # Shapes are tuples of any length; ``()`` denotes a scalar channel.
    self._channel_shapes: Dict[str, Tuple[int, ...]] = dict()
    self._measurement_id = 0
    # Defined up front so the flag exists before the first call to measure.
    self._looping = False

def busy(

self)

Boolean representing if the daemon is busy (state updated) or not.

def busy(self) -> bool:
    """Report whether the daemon is currently busy.

    Returns
    -------
    bool
        The current value of the daemon's ``_busy`` flag.
    """
    is_busy = self._busy
    return is_busy

def close(

self)

def close(self):
    """Hook invoked when the daemon shuts down; the default does nothing.

    Both ``shutdown`` and ``shutdown_all`` call this method.
    """
    return None

def get_channel_names(

self)

Get current channel names.

def get_channel_names(self):
    """Return the list of channel names currently defined on the sensor."""
    names = self._channel_names
    return names

def get_channel_shapes(

self)

Get channel shapes.

def get_channel_shapes(self):
    """Return the shape of every channel.

    When no shapes have been declared, every named channel is reported with
    the empty shape ``()``.
    """
    declared = self._channel_shapes
    if not declared:
        # as default behavior, assume all channels are scalars
        return {name: () for name in self._channel_names}
    return declared

def get_channel_units(

self)

Get channel units.

def get_channel_units(self):
    """Return the mapping of channel names to their units."""
    units = self._channel_units
    return units

def get_config(

self)

Retrieve the current configuration, including any defaults.

def get_config(self) -> str:
    """Retrieve the current configuration, including any defaults.

    Returns
    -------
    str
        The daemon's configuration dictionary serialized as toml.
    """
    serialized = toml.dumps(self._config)
    return serialized

def get_config_filepath(

self)

Retrieve the current filepath of the configuration.

def get_config_filepath(self) -> str:
    """Retrieve the current filepath of the configuration.

    Returns
    -------
    str
        The absolute form of the stored configuration path.
    """
    # The constructor documents this value as a str but annotates it as a
    # pathlib.Path; coerce so that either form is accepted.
    return str(pathlib.Path(self._config_filepath).absolute())

def get_measured(

self)

Get most recently measured values.

def get_measured(self) -> MeasureType:
    """Return the dictionary holding the most recently measured values."""
    latest = self._measured
    return latest

def get_state(

self)

Return the current daemon state.

def get_state(self) -> str:
    """Return the current daemon state.

    Returns
    -------
    str
        The daemon's state dictionary serialized as toml.
    """
    serialized = toml.dumps(self._state)
    return serialized

def id(

self)

Dictionary of identifying information for the daemon.

def id(self) -> Dict[str, Optional[str]]:
    """Dictionary of identifying information for the daemon.

    Returns
    -------
    dict
        The keys ``name``, ``kind``, ``make``, ``model`` and ``serial``, in
        that order, taken from the attributes of the daemon.
    """
    return dict(
        name=self.name,
        kind=self._kind,
        make=self.make,
        model=self.model,
        serial=self.serial,
    )

def measure(

self, loop=False)

Start a measurement, optionally looping.

Sensor will remain busy until measurement completes.

Parameters

loop: bool, optional Toggle looping behavior. Default False.

See Also

stop_looping

def measure(self, loop: bool = False) -> int:
    """Start a measurement, optionally looping.

    Sensor will remain busy until measurement completes.  When a measurement
    is already in progress only the looping flag is updated.

    Parameters
    ----------
    loop: bool, optional
        Toggle looping behavior. Default False.

    Returns
    -------
    int
        The current measurement id.

    See Also
    --------
    stop_looping
    """
    self._looping = loop
    if self._busy:
        # Already measuring: do not start a second runner.
        return self._measurement_id
    self._busy = True
    self._loop.create_task(self._runner(loop=loop))
    return self._measurement_id

def save_state(

self)

Schedule writing the current state to disk.

Note: Current implementation only writes while busy (and once after busy)

async def save_state(self):
    """Schedule writing the current state to disk.

    Runs forever.  While the daemon is busy the state is written and the
    coroutine sleeps 0.1 seconds between writes; once the daemon is no longer
    busy the state is written once more and the coroutine waits on
    ``_busy_sig`` before starting over.

    Note: Current implementation only writes while busy (and once after busy)
    """
    while True:
        if self._busy:
            self._save_state()
            await asyncio.sleep(0.1)
            continue
        # Not busy: write once, then wait for the busy signal.
        self._save_state()
        await self._busy_sig.wait()

def shutdown(

self, restart=False)

def shutdown(self, restart=False):
    """Shut this daemon down, optionally restarting it.

    Cancels the daemon's outstanding tasks, calls ``close`` and closes the
    server.  With ``restart`` true the configuration file is read again and a
    replacement daemon with the same name is scheduled on the loop.

    Parameters
    ----------
    restart: bool, optional
        Re-parse the configuration and start a replacement daemon after
        shutting down. Default False.
    """
    self.logger.info(f"Shutting Down {self.name}")
    self.logger.info(f"Cancelling {len(self._tasks)} outstanding tasks")
    for task in self._tasks:
        task.cancel()
    self.close()
    self._server.close()
    if restart:
        config_filepath = self._config_filepath
        config_file = toml.load(config_filepath)
        try:
            config = type(self)._parse_config(config_file, self.name)
            self._loop.create_task(
                type(self)._start_daemon(self.name, config, config_filepath)
            )
        except ValueError as e:
            # Built-in exceptions have no ``message`` attribute in Python 3;
            # fall back to the string form so that reporting the error can
            # not itself raise AttributeError.
            self.logger.error(getattr(e, "message", str(e)))

def stop_looping(

self)

Stop looping.

def stop_looping(self) -> None:
    """Clear the looping flag.

    A running measurement loop reads this flag after each measurement and
    finishes once it is false.
    """
    self._looping = False

def update_state(

self)

Continually monitor and update the current daemon state.

async def update_state(self):
    """Continually monitor and update the current daemon state.

    This default implementation performs no work and returns immediately.
    """
    return None

Methods

def main(

cls)

Run the event loop.

@classmethod
def main(cls):
    """Run the event loop.

    Registers signal handlers (non-Windows platforms only), parses the
    command line, loads the toml configuration file, schedules ``cls._main``
    on the event loop and runs the loop until it is stopped.

    The ``--version`` and ``--protocol`` options print their information and
    exit with status 0 before any configuration file is read.
    """
    loop = asyncio.get_event_loop()
    # No signal handlers are registered on Windows platforms.
    if sys.platform.startswith("win"):
        signals = ()
    else:
        signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        # ``s=s`` binds the current signal as a default argument, so each
        # handler passes its own signal to shutdown_all.
        loop.add_signal_handler(s, lambda s=s: asyncio.create_task(cls.shutdown_all(s, loop)))
    parser = argparse.ArgumentParser()
    # Default: <user config dir for "yaqd">/<daemon kind>/config.toml
    parser.add_argument(
        "--config",
        "-c",
        default=(
            pathlib.Path(appdirs.user_config_dir("yaqd", "yaq")) / cls._kind / "config.toml"
        ),
        action="store",
        help="Path to the configuration toml file.",
    )
    # --verbose and --log-level both store into ``args.log_level``.
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_const",
        dest="log_level",
        const="debug",
        help="Alias for --log-level=debug",
    )
    parser.add_argument(
        "--log-level",
        "-l",
        action="store",
        dest="log_level",
        choices=[
            "debug",
            "info",
            "notice",
            "warning",
            "error",
            "critical",
            "alert",
            "emergency",
        ],
        help="Set the log level explicitly",
    )
    parser.add_argument("--version", action="store_true")
    parser.add_argument("--protocol", action="store_true")
    args = parser.parse_args()
    if args.log_level:
        logging.setLevel(logging.name_to_level[args.log_level])
    if args.version:
        # Print version information and exit without starting any daemon.
        print(f"module {cls.__module__} version {cls._version}")
        print(f"avro version {__avro_version__}")
        print(f"yaqd_core version {__version__}")
        print(f"Python {sys.version}")
        sys.exit(0)
    if args.protocol:
        # Echo the ``<kind>.avpr`` file found next to the file defining ``cls``.
        with open(pathlib.Path(inspect.getfile(cls)).parent / f"{cls._kind}.avpr", "r") as f:
            for line in f:
                print(line, end="")
        sys.exit(0)
    config_filepath = pathlib.Path(args.config)
    config_file = toml.load(config_filepath)
    main_task = loop.create_task(cls._main(config_filepath, config_file, args))
    try:
        loop.run_forever()
    except asyncio.exceptions.CancelledError:
        # A CancelledError escaping run_forever is deliberately ignored.
        pass
    finally:
        loop.close()

def shutdown_all(

cls, sig, loop)

Gracefully shutdown the asyncio loop.

Gathers all current tasks, and allows daemons to perform cleanup tasks.

Adapted from https://www.roguelynn.com/words/asyncio-graceful-shutdowns/ (the original code is licensed under the MIT license, and is sublicensed here).

@classmethod
async def shutdown_all(cls, sig, loop):
    """Gracefully shut down the asyncio loop in response to a signal.

    Requests cancellation of every outstanding task, closes each registered
    daemon, waits for the remaining tasks to finish and then saves the state
    of every daemon.  When the signal is SIGHUP, the configuration file of the
    first daemon is reloaded and ``cls._main`` is awaited before the loop is
    stopped.

    Adapted from https://www.roguelynn.com/words/asyncio-graceful-shutdowns/
    Original code is licensed under the MIT license, and sublicensed here.

    Parameters
    ----------
    sig
        The received signal; its ``name`` is logged.
    loop
        The event loop, stopped as the final step.
    """
    logger.info(f"Received signal {sig.name}...")
    this_task = asyncio.current_task()
    outstanding = [task for task in asyncio.all_tasks() if task is not this_task]
    logger.info(f"Cancelling {len(outstanding)} outstanding tasks")
    for task in outstanding:
        task.cancel()
    # This is done after cancelling so that shutdown tasks which require the loop
    # are not themselves cancelled.
    for daemon in cls._daemons:
        daemon.close()
    # Collect again: closing the daemons may have scheduled new tasks.
    remaining = [task for task in asyncio.all_tasks() if task is not this_task]
    await asyncio.gather(*remaining, return_exceptions=True)
    for daemon in cls._daemons:
        daemon._save_state()
    if hasattr(signal, "SIGHUP") and sig == signal.SIGHUP:
        config_filepath = [daemon._config_filepath for daemon in cls._daemons][0]
        config_file = toml.load(config_filepath)
        await cls._main(config_filepath, config_file)
    loop.stop()

Sub-modules