Source code for swimprotocol.config


from __future__ import annotations

import os
from argparse import ArgumentParser, Namespace
from collections.abc import Mapping, Sequence
from pathlib import Path
from typing import final, TypeVar, Final, Any, Union, Optional

from .sign import Signatures

__all__ = ['ConfigT_co', 'ConfigError', 'TransientConfigError', 'BaseConfig']

#: Covariant type variable for :class:`BaseConfig` sub-classes.
ConfigT_co = TypeVar('ConfigT_co', bound='BaseConfig', covariant=True)


[docs]class ConfigError(Exception): """Raised when the configuration is insufficient or invalid for running a cluster, along with a human-readable message about what was wrong. """ pass
[docs]class TransientConfigError(ConfigError): """Raised when a possibly-temporary failure has prevented configuration of the cluster. This exception is often chained with the cause, e.g. :exc:`OSError`. Importantly, this exception indicates that configuration of the cluster may succeed eventually if retried. Args: msg: The exception message. wait_hint: A suggested :func:`~asyncio.sleep` time before trying again. """ def __init__(self, msg: Optional[str] = None, *, wait_hint: float = 60.0) -> None: super().__init__(msg) self.wait_hint: Final = wait_hint
[docs]class BaseConfig: """Configure the cluster behavior and characteristics. :class:`~swimprotocol.transport.Transport` implementations should sub-class to add additional configuration. Args: secret: The shared secret for cluster packet signing, see :class:`Signatures`. local_name: The unique name of the local cluster member. peers: At least one name of another known node in the cluster. local_metadata: The initial local cluster member metadata. ping_interval: Time between :term:`ping` attempts to random cluster members. ping_timeout: Time to wait for an :term:`ack` after sending a :term:`ping`. ping_req_count: Number of nodes to send a :term:`ping-req` when a :term:`ping` fails. ping_req_timeout: Time to wait for an *ack* after sending a :term:`ping-req`. suspect_timeout: Time to wait after losing connectivity with a cluster member before marking it offline. sync_interval: Time between sync attempts to disseminate cluster changes. Raises: ConfigError: The given configuration was invalid. TransientConfigError: The configuration failed due to a failure that may not be permanent. """ _empty: dict[str, bytes] = {} def __init__(self, *, secret: Union[None, str, bytes], local_name: str, peers: Sequence[str], local_metadata: Mapping[str, bytes] = _empty, ping_interval: float = 1.0, ping_timeout: float = 0.3, ping_req_count: int = 1, ping_req_timeout: float = 0.9, suspect_timeout: float = 5.0, sync_interval: float = 0.5) -> None: super().__init__() self._signatures = Signatures(secret) self.local_name: Final = local_name self.peers: Final = peers self.local_metadata: Final = local_metadata self.ping_interval: Final = ping_interval self.ping_timeout: Final = ping_timeout self.ping_req_count: Final = ping_req_count self.ping_req_timeout: Final = ping_req_timeout self.suspect_timeout: Final = suspect_timeout self.sync_interval: Final = sync_interval self._validate() def _validate(self) -> None: if not self.local_name: raise ConfigError('This cluster instance needs a local name.') @property def signatures(self) -> Signatures: """Generates and verifies cluster packet signatures.""" return self._signatures
[docs] @classmethod def add_arguments(cls, parser: ArgumentParser, *, prefix: str = '--') -> None: """Implementations (such as the :term:`demo`) may use this method to add command-line based configuration for the transport. Note: Arguments added should use *prefix* and explicitly provide a unique name, e.g.:: parser.add_argument(f'{prefix}arg', dest='swim_arg', ...) This prevents collision with other argument names and allows custom *prefix* values without affecting the :class:`~argparse.Namespace`. Args: parser: The argument parser. prefix: The prefix for added arguments, which should start with ``--`` and end with ``-``, e.g. ``'--'`` or ``'--foo-'``. """ group = parser.add_argument_group('swim options') group.add_argument(f'{prefix}secret', dest='swim_secret', metavar='STRING', help='The secret string used to verify messages.') group.add_argument(f'{prefix}name', dest='swim_name', metavar='localname', help='External name or address for this node.') group.add_argument(f'{prefix}peer', dest='swim_peers', metavar='peername', action='append', default=[], help='At least one name or address of ' 'a known peer.')
@classmethod def _get_env(cls, env_prefix: str, env: str) -> Optional[str]: env_file_val = os.getenv(f'{env_prefix}_{env}_FILE') if env_file_val is not None: env_path = Path(env_file_val).expanduser() try: with open(env_path, 'r') as env_file: return env_file.read().rstrip('\r\n') except OSError: pass env_val = os.getenv(f'{env_prefix}_{env}') if env_val is not None: return env_val return None @classmethod def _get_env_list(cls, env_prefix: str, env: str) -> Sequence[str]: env_val = cls._get_env(env_prefix, env) if env_val: return env_val.split(',') else: return []
[docs] @classmethod def parse_args(cls, args: Namespace, *, env_prefix: str = 'SWIM') \ -> dict[str, Any]: """Parse the given :class:`~argparse.Namespace` into a dictionary of keyword arguments for the :class:`BaseConfig` constructor. Sub-classes should override this method to add additional keyword arguments as needed. Some keywords will default to environment variables if not given in *args*: ``SWIM_SECRET``, ``SWIM_SECRET_FILE`` [*]_ The *secret* keyword argument. ``SWIM_NAME`` The *local_name* keyword argument. ``SWIM_PEERS`` The comma-separated *peers* keyword argument. .. [*] The value is read from the given file path. Args: args: The command-line arguments. env_prefix: Prefix for the environment variables. """ secret = args.swim_secret or cls._get_env(env_prefix, 'SECRET') local_name = args.swim_name or cls._get_env(env_prefix, 'NAME') peers = args.swim_peers or cls._get_env_list(env_prefix, 'PEERS') return {'secret': secret, 'local_name': local_name, 'peers': peers}
[docs] @final @classmethod def from_args(cls: type[ConfigT_co], args: Namespace, **overrides: Any) -> ConfigT_co: """Build and return a new cluster config object. This first calls :meth:`.parse_args` and then passes the results as keyword arguments to the constructor. Args: args: The command-line arguments. overrides: Keyword arguments to override. """ kwargs = cls.parse_args(args) kwargs |= overrides return cls(**kwargs)