swimprotocol

swimprotocol.address

class swimprotocol.address.Address(host, port)[source]

Bases: object

Manages an address for socket connections.

Parameters:
  • host (str) – The address hostname string.

  • port (int) – The address port number.

classmethod get(addr)[source]

Return an Address from a (host, port) tuple.

Parameters:

addr (tuple[str, int]) – The address tuple from socket functions.

Return type:

Address

class swimprotocol.address.AddressParser(address_type=<class 'swimprotocol.address.Address'>, *, default_host=None, default_port=None)[source]

Bases: object

Manages the defaults to use when parsing an address string.

Parameters:
  • address_type (_AddressType) – Override the Address implementation.

  • default_host (Optional[str]) – The default hostname, if missing from the address string (e.g. :1234:).

  • default_port (Optional[int]) – The default port number, if missing from the address string (e.g. example.tld).

swimprotocol.config

class swimprotocol.config.ConfigT_co

Covariant type variable for BaseConfig sub-classes.

alias of TypeVar(‘ConfigT_co’, bound=BaseConfig, covariant=True)

exception swimprotocol.config.ConfigError[source]

Bases: Exception

Raised when the configuration is insufficient or invalid for running a cluster, along with a human-readable message about what was wrong.

exception swimprotocol.config.TransientConfigError(msg=None, *, wait_hint=60.0)[source]

Bases: ConfigError

Raised when a possibly-temporary failure has prevented configuration of the cluster. This exception is often chained with the cause, e.g. OSError. Importantly, this exception indicates that configuration of the cluster may succeed eventually if retried.

Parameters:
  • msg (Optional[str]) – The exception message.

  • wait_hint (float) – A suggested sleep() time before trying again.

Return type:

None

class swimprotocol.config.BaseConfig(*, secret, local_name, peers, local_metadata={}, ping_interval=1.0, ping_timeout=0.3, ping_req_count=1, ping_req_timeout=0.9, suspect_timeout=5.0, sync_interval=0.5)[source]

Bases: object

Configure the cluster behavior and characteristics. Transport implementations should sub-class to add additional configuration.

Parameters:
  • secret (Union[None, str, bytes]) – The shared secret for cluster packet signing, see Signatures.

  • local_name (str) – The unique name of the local cluster member.

  • peers (Sequence[str]) – At least one name of another known node in the cluster.

  • local_metadata (Mapping[str, bytes]) – The initial local cluster member metadata.

  • ping_interval (float) – Time between ping attempts to random cluster members.

  • ping_timeout (float) – Time to wait for an ack after sending a ping.

  • ping_req_count (int) – Number of nodes to send a ping-req when a ping fails.

  • ping_req_timeout (float) – Time to wait for an ack after sending a ping-req.

  • suspect_timeout (float) – Time to wait after losing connectivity with a cluster member before marking it offline.

  • sync_interval (float) – Time between sync attempts to disseminate cluster changes.

Raises:
property signatures: Signatures

Generates and verifies cluster packet signatures.

classmethod add_arguments(parser, *, prefix='--')[source]

Implementations (such as the demo) may use this method to add command-line based configuration for the transport.

Note

Arguments added should use prefix and explicitly provide a unique name, e.g.:

parser.add_argument(f'{prefix}arg', dest='swim_arg', ...)

This prevents collision with other argument names and allows custom prefix values without affecting the Namespace.

Parameters:
  • parser (ArgumentParser) – The argument parser.

  • prefix (str) – The prefix for added arguments, which should start with -- and end with -, e.g. '--' or '--foo-'.

Return type:

None

classmethod parse_args(args, *, env_prefix='SWIM')[source]

Parse the given Namespace into a dictionary of keyword arguments for the BaseConfig constructor. Sub-classes should override this method to add additional keyword arguments as needed.

Some keywords will default to environment variables if not given in args:

SWIM_SECRET, SWIM_SECRET_FILE [*]

The secret keyword argument.

SWIM_NAME

The local_name keyword argument.

SWIM_PEERS

The comma-separated peers keyword argument.

Parameters:
  • args (Namespace) – The command-line arguments.

  • env_prefix (str) – Prefix for the environment variables.

Return type:

dict[str, Any]

final classmethod from_args(args, **overrides)[source]

Build and return a new cluster config object. This first calls parse_args() and then passes the results as keyword arguments to the constructor.

Parameters:
  • args (Namespace) – The command-line arguments.

  • overrides (Any) – Keyword arguments to override.

Return type:

ConfigT_co

swimprotocol.listener

swimprotocol.listener.ListenerCallback

A callable that takes the notified item.

alias of Callable[[Concatenate[ListenT_contra, ListenP]], Any]

class swimprotocol.listener.CallbackPoll(listener, callback, *args, **kwargs)[source]

Bases: Generic[ListenT, ListenP], DaemonTask, TaskOwner

Listens for items and running the callback.

Parameters:
  • listener (Listener[ListenT]) –

  • callback (ListenerCallback[ListenT, ListenP]) –

  • args (ListenP.args) –

  • kwargs (ListenP.kwargs) –

async run()[source]

The method to run while the context is entered. The task is cancelled when the context exits.

Return type:

NoReturn

class swimprotocol.listener.Listener[source]

Bases: Generic[ListenT]

Implements basic listener and callback functionality. Producers can call notify() with an item, and consumers can wait for those items with poll() or register a callback with on_notify().

on_notify(callback, *args, **kwargs)[source]

Provides a context manager that causes callback to be called when a producer calls notify().

Parameters:
  • callback (Callable[[Concatenate[ListenT, ~ListenP]], Any]) – The callback function, which will be passed the item argument from notify().

  • args (~ListenP) –

  • kwargs (~ListenP) –

Return type:

CallbackPoll[ListenT, ~ListenP]

async poll()[source]

Wait until notify() is called and return all item objects. More than one item may be returned if notify() is called more than once before the asyncio event loop is re-entered.

Return type:

Sequence[ListenT]

notify(item)[source]

Triggers a notification with item, waking any poll() calls and running any on_notify() callbacks.

Parameters:

item (ListenT) – The object to be sent to the consumers.

Return type:

None

swimprotocol.members

class swimprotocol.members.MemberSnapshot(name, clock, status, status_time, metadata)[source]

Bases: object

Represents a member at a previous moment in time.

Parameters:
class swimprotocol.members.Member(name, local)[source]

Bases: object

Represents a member node of the cluster.

Parameters:
METADATA_UNKNOWN: Mapping[str, bytes] = {}

Before a non-local cluster member metadata has been initialized with a known value, it is assigned this empty dict for identity comparisons.

property clock: int

The sequence clock tracking changes distributed across the cluster. This value is always increasing, as changes are made to the member status or metadata.

property status: Status

The last known status of the cluster member.

property status_time: float

The local system time when status last changed.

property metadata: Mapping[str, bytes]

The last known metadata of the cluster member.

property previous: MemberSnapshot

A snapshot of the member before the most recent change.

class swimprotocol.members.Members(config)[source]

Bases: Set[Member]

Manages the members of the cluster.

Parameters:

config (BaseConfig) – The cluster config object.

property local: Member

The local member for the process.

property non_local: Set[Member]

All of the non-local cluster members.

find(count, *, status=Status.ALL, exclude=frozenset({}))[source]

Return a randomly-chosen subset of non-local cluster members that meet the given criteria.

Parameters:
  • count (int) – At most this many members will be returned.

  • status (Status) – The real or aggregate status of the members.

  • exclude (Set[Member]) – Members that must not be included in the resulting list.

Return type:

frozenset[swimprotocol.members.Member]

get_status(status)[source]

Return all of the non-local cluster members with the given status.

Parameters:

status (Status) – A real status like ONLINE or an aggregate status like AVAILABLE.

Return type:

Shuffle[Member]

get(name, validity=None)[source]

Return the cluster member with the given name, creating it if does not exist.

If validity is given and name matches an existing non-local member, it is compared to the previous validity value, causing a full resynchronize if they are different.

Parameters:
  • name (str) – The unique name of the cluster member.

  • validity (bytes | None) – Random bytes used to check existing member validity.

Return type:

Member

update(member, *, new_status=None, new_metadata=None)[source]

Update the cluster member status or metadata.

Parameters:
  • member (Member) – The cluster member to update.

  • new_status (Status | None) – A new status for the member, if any.

  • new_metadata (Mapping[str, bytes] | None) – New metadata dictionary for the member, if any.

Return type:

None

apply(member, source, clock, *, status, metadata)[source]

Apply a disseminated update from source to member.

Parameters:
  • member (Member) – The cluster member to update.

  • source (Member) – The cluster member that disseminated the update.

  • clock (int) – The sequence clock of the update.

  • status (Status) – The status to apply to member.

  • metadata (Mapping[str, bytes] | None) – The metadata to apply to member, if known.

Return type:

None

get_gossip(target)[source]

Iterates through cluster members looking for gossip that should be sent to target.

See also

Dissemination

Parameters:

target (Member) – The recipient of the cluster gossip.

Return type:

Generator[Member, None, None]

ack_gossip(member, source, clock)[source]

Marks the source cluster member as having received updates about member up to the given sequence clock. This prevents repeated transfer of known gossip.

Parameters:
  • member (Member) – The cluster member that was updated.

  • source (Member) – The cluster member that received the update.

  • clock (int) – The sequence clock of the update.

Return type:

None

swimprotocol.packet

class swimprotocol.packet.Source(name, validity)[source]

Bases: object

Uniquely identifies the local cluster member that created the packet.

Parameters:
  • name (str) – The name of the local cluster member.

  • validity (bytes) – Random bytestring used to detect non-unique name values.

class swimprotocol.packet.Packet(source)[source]

Bases: object

Base class for a packet sent between cluster members.

Transport implementations may use these directly, e.g. UdpPack, or adapt their contents into another protocol.

Parameters:

source (Source) – The name of the local cluster member that created the packet.

class swimprotocol.packet.Ping(source)[source]

Bases: Packet

Packets used for the SWIM protocol ping operation, which do not explicitly contain any other information other than the source.

Parameters:

source (Source) –

class swimprotocol.packet.PingReq(source, target)[source]

Bases: Packet

Packets used for the SWIM protocol ping-req operation, which contain a target member in addition to source.

Parameters:
  • target (str) – The name of the target cluster member.

  • source (Source) –

class swimprotocol.packet.Ack(source)[source]

Bases: Packet

Packets used for the SWIM protocol ack response, which indicates that source is online.

Parameters:

source (Source) –

class swimprotocol.packet.Gossip(source, name, clock, status, metadata)[source]

Bases: Packet

Packets used for SWIM protocol gossip, which alert other members when a cluster member has changed status or metadata. This information is intended to travel around the cluster until all members are aware of the change.

Parameters:
  • name (str) – The name of the cluster member whose state has changed.

  • clock (int) – The sequence clock value associated with the change.

  • status (Status) – The current perceived status of the cluster member.

  • metadata (Mapping[str, bytes] | None) – The current metadata associated with the cluster member.

  • source (Source) –

class swimprotocol.packet.GossipAck(source, name, clock)[source]

Bases: Packet

Packets used to acknowledge receipt of a Gossip packet.

Parameters:
  • name (str) – The name of the cluster member from the Gossip packet.

  • clock (int) – The sequence clock from the Gossip packet.

  • source (Source) –

swimprotocol.shuffle

class swimprotocol.shuffle.Shuffle[source]

Bases: Set[ShuffleT_co]

A set of objects that can be accessed in a “shuffled” manner, similar to a deck of cards. All operations including choice() are O(1) time complexity.

abstract choice()[source]

Choose an object from the set at random and return it. This object is not removed from the set.

See also

random.choice()

Raises:

KeyError – The set was empty.

Return type:

ShuffleT_co

class swimprotocol.shuffle.WeakShuffle(init=())[source]

Bases: Shuffle[ShuffleT], MutableSet[ShuffleT]

An implementation of Shuffle that holds only weak references to the set elements.

Parameters:

init (Iterable[ShuffleT]) – Initial objects to add.

add(val)[source]

Add an element.

Parameters:

val (ShuffleT) –

Return type:

None

discard(val)[source]

Remove an element. Do not raise an exception if absent.

Parameters:

val (ShuffleT) –

Return type:

None

choice()[source]

Choose an object from the set at random and return it. This object is not removed from the set.

See also

random.choice()

Raises:

KeyError – The set was empty.

Return type:

ShuffleT

swimprotocol.sign

class swimprotocol.sign.Signature(salt, digest)[source]

Bases: NamedTuple

A salted digest, including the salt value.

Parameters:
  • salt (bytes) – Salt used to sign digest.

  • digest (bytes) – The salted digest.

salt: bytes

Alias for field number 0

digest: bytes

Alias for field number 1

class swimprotocol.sign.Signatures(secret, *, hash_name='sha256', salt_len=16, check_version=True)[source]

Bases: object

Provides a hash signature for inclusion alongside Packet objects when implementing a Transport protocol.

Note

Passing secret=None uses uuid.getnode() as the secret, which effectively allows you to experiment locally but will not suffice for connections over a network.

Parameters:
  • secret (Union[None, str, bytes]) – A shared secret among all cluster members.

  • hash_name (str) – The hashlib hash name to use.

  • salt_len (int) – The length of the salt to use when hashing.

  • check_version (bool) – True if the swimprotocol.__version__ should be included in the signature and verification.

sign(data)[source]

Sign the data using a new random salt, returning the salt and the resulting digest as a tuple.

Parameters:

data (bytes) – The bytes to be signed.

Return type:

Signature

verify(data, sig)[source]

Verify that a signature is valid for the given salt and data.

Parameters:
  • data (bytes) – The bytes to be verified.

  • sig (tuple[bytes, bytes]) – A tuple of the salt used during signing and the resulting digest.

Return type:

bool

swimprotocol.status

class swimprotocol.status.Status(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Flag

Possible cluster member status values, as well as aggregate values that can be used with bitwise operations but must not be assigned.

ONLINE = 1

The member is responding as expected.

OFFLINE = 2

The member has stopped responding for long enough to avoid false positives.

SUSPECT = 4

The member has failed to respond, but is not yet declared fully offline.

AVAILABLE = 5

Aggregate status for statuses that are considered responding, ONLINE and SUSPECT, for use with bitwise operations.

UNAVAILABLE = 6

Aggregate status for statuses that are not considered responding, OFFLINE and SUSPECT, for use with bitwise operations.

ALL = 7

Aggregate status for all statuses, for use with bitwise operations.

property name: str

The name of the status, e.g. 'ONLINE'.

transition(to)[source]

Prevents impossible status transitions, returning a new status to be used instead of to.

Parameters:

to (Status) – The desired transition status.

Raises:

ValueErrorto was an aggregate status, which cannot be transitioned to directly.

Return type:

Status

classmethod all_statuses()[source]

A collection of all the statuses, including aggregate statuses.

Return type:

Collection[Status]

swimprotocol.tasks

class swimprotocol.tasks.TaskT

The type of task result.

alias of TypeVar(‘TaskT’)

class swimprotocol.tasks.DaemonTask[source]

Bases: AbstractAsyncContextManager[Task[NoReturn]]

Base class for a task that is run for the duration of an async with context.

abstract async run()[source]

The method to run while the context is entered. The task is cancelled when the context exits.

Return type:

NoReturn

class swimprotocol.tasks.TaskOwner[source]

Bases: object

Base class for any class that needs to run sub-tasks.

Because asyncio can be garbage-collected while running, the purpose of this base class is to keep a strong reference to all running tasks. The task removes its own reference when it is complete, effectively allowing it to “daemonize”.

run_subtask(coro)[source]

Run the coro sub-task.

Parameters:

coro (Coroutine[Any, Any, TaskT]) – The coroutine to run.

Return type:

Task[TaskT]

swimprotocol.transport

class swimprotocol.transport.TransportT

Type variable for Transport implementations.

alias of TypeVar(‘TransportT’, bound=Transport[BaseConfig])

swimprotocol.transport.load_transport(name='udp', *, group='swimprotocol.transport')[source]

Load and return the Transport implementation by name.

Parameters:
  • name (str) – The name of the transport entry point.

  • group (str) – The entry point group.

Raises:
  • DistributionNotFound – A dependency of the transport entry point was not able to be satisfied.

  • KeyError – The given name did not exist in the entry point group.

Return type:

type[swimprotocol.transport.Transport[swimprotocol.config.BaseConfig]]

class swimprotocol.transport.Transport(config, worker)[source]

Bases: Generic[ConfigT_co], AbstractAsyncContextManager[None]

Interface of the basic functionality needed to act as the transport layer for the SWIM protocol. The transport layer is responsible for sending and receiving ping, ping-req, and ack packets for failure detection, and transmitting gossip for dissemination. The transport must be entered with async with to be activated.

Parameters:
config_type: ClassVar[type[swimprotocol.config.BaseConfig]]

The BaseConfig sub-class used by this transport.

swimprotocol.worker

class swimprotocol.worker.Worker(config, members)[source]

Bases: DaemonTask, TaskOwner

Manages the failure detection and dissemination components of the SWIM protocol.

Parameters:
  • config (BaseConfig) – The cluster configuration object.

  • members (Members) – Tracks the state of the cluster members.

property recv_queue: Queue[Packet]

The queue of packets received.

property send_queue: Queue[tuple[swimprotocol.members.Member, swimprotocol.packet.Packet]]

The queue of packets to be sent.

final async check(target)[source]

Attempts to determine if target is responding, setting it to suspect if it does not respond with an ack.

Parameters:

target (Member) – The cluster member to check.

Return type:

None

final async disseminate(target)[source]

Sends any gossip that might be needed by target.

See also

Dissemination

Parameters:

target (Member) – The cluster member to disseminate to updates to.

Return type:

None

async run_failure_detection()[source]

Indefinitely send failure detection packets to other cluster members.

Note

Override this method to control when and how check() is called. By default, one random cluster member is chosen every ping_interval seconds.

Return type:

NoReturn

async run_dissemination()[source]

Indefinitely send dissemination packets to other cluster members.

Note

Override this method to control when and how disseminate() is called. By default, one random cluster member is chosen every sync_interval seconds.

Return type:

NoReturn

final async run()[source]

Indefinitely handle received SWIM protocol packets and, at configurable intervals, send failure detection and dissemination packets. This method calls run_failure_detection() and run_dissemination().

Return type:

NoReturn