swimprotocol¶
swimprotocol.address¶
- class swimprotocol.address.Address(host, port)[source]¶
Bases:
objectManages an address for socket connections.
swimprotocol.config¶
- class swimprotocol.config.ConfigT_co¶
Covariant type variable for
BaseConfigsub-classes.alias of TypeVar(‘ConfigT_co’, bound=
BaseConfig, covariant=True)
- exception swimprotocol.config.ConfigError[source]¶
Bases:
ExceptionRaised when the configuration is insufficient or invalid for running a cluster, along with a human-readable message about what was wrong.
- exception swimprotocol.config.TransientConfigError(msg=None, *, wait_hint=60.0)[source]¶
Bases:
ConfigErrorRaised when a possibly-temporary failure has prevented configuration of the cluster. This exception is often chained with the cause, e.g.
OSError. Importantly, this exception indicates that configuration of the cluster may succeed eventually if retried.
- class swimprotocol.config.BaseConfig(*, secret, local_name, peers, local_metadata={}, ping_interval=1.0, ping_timeout=0.3, ping_req_count=1, ping_req_timeout=0.9, suspect_timeout=5.0, sync_interval=0.5)[source]¶
Bases:
objectConfigure the cluster behavior and characteristics.
Transportimplementations should sub-class to add additional configuration.- Parameters:
secret (Union[None, str, bytes]) – The shared secret for cluster packet signing, see
Signatures.local_name (str) – The unique name of the local cluster member.
peers (Sequence[str]) – At least one name of another known node in the cluster.
local_metadata (Mapping[str, bytes]) – The initial local cluster member metadata.
ping_interval (float) – Time between ping attempts to random cluster members.
ping_timeout (float) – Time to wait for an ack after sending a ping.
ping_req_count (int) – Number of nodes to send a ping-req when a ping fails.
ping_req_timeout (float) – Time to wait for an ack after sending a ping-req.
suspect_timeout (float) – Time to wait after losing connectivity with a cluster member before marking it offline.
sync_interval (float) – Time between sync attempts to disseminate cluster changes.
- Raises:
ConfigError – The given configuration was invalid.
TransientConfigError – The configuration failed due to a failure that may not be permanent.
- property signatures: Signatures¶
Generates and verifies cluster packet signatures.
- classmethod add_arguments(parser, *, prefix='--')[source]¶
Implementations (such as the demo) may use this method to add command-line based configuration for the transport.
Note
Arguments added should use prefix and explicitly provide a unique name, e.g.:
parser.add_argument(f'{prefix}arg', dest='swim_arg', ...)
This prevents collision with other argument names and allows custom prefix values without affecting the
Namespace.- Parameters:
parser (ArgumentParser) – The argument parser.
prefix (str) – The prefix for added arguments, which should start with
--and end with-, e.g.'--'or'--foo-'.
- Return type:
None
- classmethod parse_args(args, *, env_prefix='SWIM')[source]¶
Parse the given
Namespaceinto a dictionary of keyword arguments for theBaseConfigconstructor. Sub-classes should override this method to add additional keyword arguments as needed.Some keywords will default to environment variables if not given in args:
SWIM_SECRET,SWIM_SECRET_FILE[*]The secret keyword argument.
SWIM_NAMEThe local_name keyword argument.
SWIM_PEERSThe comma-separated peers keyword argument.
- final classmethod from_args(args, **overrides)[source]¶
Build and return a new cluster config object. This first calls
parse_args()and then passes the results as keyword arguments to the constructor.- Parameters:
- Return type:
swimprotocol.listener¶
- swimprotocol.listener.ListenerCallback¶
A callable that takes the notified item.
alias of
Callable[[Concatenate[ListenT_contra,ListenP]],Any]
- class swimprotocol.listener.CallbackPoll(listener, callback, *args, **kwargs)[source]¶
Bases:
Generic[ListenT,ListenP],DaemonTask,TaskOwnerListens for items and running the callback.
- Parameters:
listener (Listener[ListenT]) –
callback (ListenerCallback[ListenT, ListenP]) –
args (ListenP.args) –
kwargs (ListenP.kwargs) –
- class swimprotocol.listener.Listener[source]¶
Bases:
Generic[ListenT]Implements basic listener and callback functionality. Producers can call
notify()with an item, and consumers can wait for those items withpoll()or register a callback withon_notify().- on_notify(callback, *args, **kwargs)[source]¶
Provides a context manager that causes callback to be called when a producer calls
notify().- Parameters:
callback (Callable[[Concatenate[ListenT, ~ListenP]], Any]) – The callback function, which will be passed the item argument from
notify().args (~ListenP) –
kwargs (~ListenP) –
- Return type:
CallbackPoll[ListenT, ~ListenP]
- async poll()[source]¶
Wait until
notify()is called and return all item objects. More than one item may be returned ifnotify()is called more than once before theasyncioevent loop is re-entered.- Return type:
Sequence[ListenT]
- notify(item)[source]¶
Triggers a notification with item, waking any
poll()calls and running anyon_notify()callbacks.- Parameters:
item (ListenT) – The object to be sent to the consumers.
- Return type:
None
swimprotocol.members¶
- class swimprotocol.members.MemberSnapshot(name, clock, status, status_time, metadata)[source]¶
Bases:
objectRepresents a member at a previous moment in time.
- class swimprotocol.members.Member(name, local)[source]¶
Bases:
objectRepresents a member node of the cluster.
- METADATA_UNKNOWN: Mapping[str, bytes] = {}¶
Before a non-local cluster member metadata has been initialized with a known value, it is assigned this empty
dictfor identity comparisons.
- property clock: int¶
The sequence clock tracking changes distributed across the cluster. This value is always increasing, as changes are made to the member status or metadata.
- property previous: MemberSnapshot¶
A snapshot of the member before the most recent change.
- class swimprotocol.members.Members(config)[source]¶
-
Manages the members of the cluster.
- Parameters:
config (BaseConfig) – The cluster config object.
- property local: Member¶
The local member for the process.
- find(count, *, status=Status.ALL, exclude=frozenset({}))[source]¶
Return a randomly-chosen subset of non-local cluster members that meet the given criteria.
- get(name, validity=None)[source]¶
Return the cluster member with the given name, creating it if does not exist.
If validity is given and name matches an existing non-local member, it is compared to the previous validity value, causing a full resynchronize if they are different.
- update(member, *, new_status=None, new_metadata=None)[source]¶
Update the cluster member status or metadata.
- apply(member, source, clock, *, status, metadata)[source]¶
Apply a disseminated update from source to member.
- Parameters:
- Return type:
None
- get_gossip(target)[source]¶
Iterates through cluster members looking for gossip that should be sent to target.
See also
swimprotocol.packet¶
- class swimprotocol.packet.Source(name, validity)[source]¶
Bases:
objectUniquely identifies the local cluster member that created the packet.
- class swimprotocol.packet.Packet(source)[source]¶
Bases:
objectBase class for a packet sent between cluster members.
Transportimplementations may use these directly, e.g.UdpPack, or adapt their contents into another protocol.- Parameters:
source (Source) – The name of the local cluster member that created the packet.
- class swimprotocol.packet.Ping(source)[source]¶
Bases:
PacketPackets used for the SWIM protocol ping operation, which do not explicitly contain any other information other than the source.
- Parameters:
source (Source) –
- class swimprotocol.packet.PingReq(source, target)[source]¶
Bases:
PacketPackets used for the SWIM protocol ping-req operation, which contain a target member in addition to source.
- class swimprotocol.packet.Ack(source)[source]¶
Bases:
PacketPackets used for the SWIM protocol ack response, which indicates that source is online.
- Parameters:
source (Source) –
- class swimprotocol.packet.Gossip(source, name, clock, status, metadata)[source]¶
Bases:
PacketPackets used for SWIM protocol gossip, which alert other members when a cluster member has changed status or metadata. This information is intended to travel around the cluster until all members are aware of the change.
- Parameters:
name (str) – The name of the cluster member whose state has changed.
clock (int) – The sequence clock value associated with the change.
status (Status) – The current perceived status of the cluster member.
metadata (Mapping[str, bytes] | None) – The current metadata associated with the cluster member.
source (Source) –
swimprotocol.shuffle¶
- class swimprotocol.shuffle.Shuffle[source]¶
Bases:
Set[ShuffleT_co]A set of objects that can be accessed in a “shuffled” manner, similar to a deck of cards. All operations including
choice()are O(1) time complexity.
- class swimprotocol.shuffle.WeakShuffle(init=())[source]¶
Bases:
Shuffle[ShuffleT],MutableSet[ShuffleT]An implementation of
Shufflethat holds only weak references to the set elements.- Parameters:
init (Iterable[ShuffleT]) – Initial objects to add.
- discard(val)[source]¶
Remove an element. Do not raise an exception if absent.
- Parameters:
val (ShuffleT) –
- Return type:
None
swimprotocol.sign¶
- class swimprotocol.sign.Signature(salt, digest)[source]¶
Bases:
NamedTupleA salted digest, including the salt value.
- class swimprotocol.sign.Signatures(secret, *, hash_name='sha256', salt_len=16, check_version=True)[source]¶
Bases:
objectProvides a hash signature for inclusion alongside
Packetobjects when implementing aTransportprotocol.Note
Passing
secret=Noneusesuuid.getnode()as the secret, which effectively allows you to experiment locally but will not suffice for connections over a network.- Parameters:
swimprotocol.status¶
- class swimprotocol.status.Status(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
Bases:
FlagPossible cluster member status values, as well as aggregate values that can be used with bitwise operations but must not be assigned.
- ONLINE = 1¶
The member is responding as expected.
- OFFLINE = 2¶
The member has stopped responding for long enough to avoid false positives.
- SUSPECT = 4¶
The member has failed to respond, but is not yet declared fully offline.
- AVAILABLE = 5¶
Aggregate status for statuses that are considered responding,
ONLINEandSUSPECT, for use with bitwise operations.
- UNAVAILABLE = 6¶
Aggregate status for statuses that are not considered responding,
OFFLINEandSUSPECT, for use with bitwise operations.
- ALL = 7¶
Aggregate status for all statuses, for use with bitwise operations.
- transition(to)[source]¶
Prevents impossible status transitions, returning a new status to be used instead of to.
- Parameters:
to (Status) – The desired transition status.
- Raises:
ValueError – to was an aggregate status, which cannot be transitioned to directly.
- Return type:
swimprotocol.tasks¶
- class swimprotocol.tasks.TaskT¶
The type of task result.
alias of TypeVar(‘TaskT’)
- class swimprotocol.tasks.DaemonTask[source]¶
Bases:
AbstractAsyncContextManager[Task[NoReturn]]Base class for a task that is run for the duration of an
async withcontext.
- class swimprotocol.tasks.TaskOwner[source]¶
Bases:
objectBase class for any class that needs to run sub-tasks.
Because
asynciocan be garbage-collected while running, the purpose of this base class is to keep a strong reference to all running tasks. The task removes its own reference when it is complete, effectively allowing it to “daemonize”.
swimprotocol.transport¶
- class swimprotocol.transport.TransportT¶
Type variable for
Transportimplementations.alias of TypeVar(‘TransportT’, bound=
Transport[BaseConfig])
- swimprotocol.transport.load_transport(name='udp', *, group='swimprotocol.transport')[source]¶
Load and return the
Transportimplementation by name.- Parameters:
- Raises:
DistributionNotFound – A dependency of the transport entry point was not able to be satisfied.
KeyError – The given name did not exist in the entry point group.
- Return type:
type[swimprotocol.transport.Transport[swimprotocol.config.BaseConfig]]
- class swimprotocol.transport.Transport(config, worker)[source]¶
Bases:
Generic[ConfigT_co],AbstractAsyncContextManager[None]Interface of the basic functionality needed to act as the transport layer for the SWIM protocol. The transport layer is responsible for sending and receiving ping, ping-req, and ack packets for failure detection, and transmitting gossip for dissemination. The transport must be entered with
async withto be activated.- Parameters:
config (ConfigT_co) – The cluster config object.
worker (Worker) –
- config_type: ClassVar[type[swimprotocol.config.BaseConfig]]¶
The
BaseConfigsub-class used by this transport.
swimprotocol.worker¶
- class swimprotocol.worker.Worker(config, members)[source]¶
Bases:
DaemonTask,TaskOwnerManages the failure detection and dissemination components of the SWIM protocol.
See also
- Parameters:
config (BaseConfig) – The cluster configuration object.
members (Members) – Tracks the state of the cluster members.
- property send_queue: Queue[tuple[swimprotocol.members.Member, swimprotocol.packet.Packet]]¶
The queue of packets to be sent.
- final async check(target)[source]¶
Attempts to determine if target is responding, setting it to suspect if it does not respond with an ack.
See also
- Parameters:
target (Member) – The cluster member to check.
- Return type:
None
- final async disseminate(target)[source]¶
Sends any gossip that might be needed by target.
See also
- Parameters:
target (Member) – The cluster member to disseminate to updates to.
- Return type:
None
- async run_failure_detection()[source]¶
Indefinitely send failure detection packets to other cluster members.
Note
Override this method to control when and how
check()is called. By default, one random cluster member is chosen everyping_intervalseconds.- Return type:
- async run_dissemination()[source]¶
Indefinitely send dissemination packets to other cluster members.
Note
Override this method to control when and how
disseminate()is called. By default, one random cluster member is chosen everysync_intervalseconds.- Return type:
- final async run()[source]¶
Indefinitely handle received SWIM protocol packets and, at configurable intervals, send failure detection and dissemination packets. This method calls
run_failure_detection()andrun_dissemination().- Return type: