Source code for swimprotocol.transport
from __future__ import annotations
from contextlib import AbstractAsyncContextManager
from importlib.metadata import entry_points
from typing import Generic, TypeVar, Final, ClassVar
from .config import ConfigT_co, BaseConfig
from .worker import Worker
__all__ = ['TransportT', 'load_transport', 'Transport']
#: Type variable for :class:`Transport` implementations.
TransportT = TypeVar('TransportT', bound='Transport[BaseConfig]')
[docs]def load_transport(name: str = 'udp', *, group: str = __name__) \
-> type[Transport[BaseConfig]]:
"""Load and return the :class:`Transport` implementation by *name*.
Args:
name: The name of the transport entry point.
group: The entry point group.
Raises:
DistributionNotFound: A dependency of the transport entry point was not
able to be satisfied.
KeyError: The given name did not exist in the entry point group.
"""
for entry_point in entry_points(group=group, name=name):
transport_type: type[Transport[BaseConfig]] = entry_point.load()
return transport_type
raise KeyError(f'{name!r} entry point not found in {group!r}')
[docs]class Transport(Generic[ConfigT_co], AbstractAsyncContextManager[None]):
"""Interface of the basic functionality needed to act as the
:term:`transport` layer for the SWIM protocol. The transport layer is
responsible for sending and receiving :term:`ping`, :term:`ping-req`, and
:term:`ack` packets for failure detection, and transmitting :term:`gossip`
for dissemination. The transport must be entered with ``async with`` to be
activated.
Args:
config: The cluster config object.
"""
#: The :class:`~swimprotocol.config.BaseConfig` sub-class used by this
#: transport.
config_type: ClassVar[type[BaseConfig]]
def __init__(self, config: ConfigT_co, worker: Worker) -> None:
super().__init__()
self.config: Final = config
self.worker: Final = worker