swimprotocol
swimprotocol.address
- class swimprotocol.address.Address(host, port)[source]¶
Bases:
object
Manages an address for socket connections.
swimprotocol.config
- class swimprotocol.config.ConfigT_co¶
Covariant type variable for BaseConfig sub-classes.
alias of TypeVar(‘ConfigT_co’, bound=BaseConfig, covariant=True)
- exception swimprotocol.config.ConfigError[source]¶
Bases:
Exception
Raised when the configuration is insufficient or invalid for running a cluster, along with a human-readable message about what was wrong.
- exception swimprotocol.config.TransientConfigError(msg=None, *, wait_hint=60.0)[source]¶
Bases:
ConfigError
Raised when a possibly-temporary failure has prevented configuration of the cluster. This exception is often chained with the cause, e.g. OSError. Importantly, this exception indicates that configuration of the cluster may succeed eventually if retried.
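The retry behavior this exception invites can be sketched as follows. This is a minimal illustration, not the library's code: the two exception classes are local stand-ins so the example runs without the package, `configure_with_retry` is a hypothetical helper, and the `wait_hint` attribute name is assumed from the constructor keyword.

```python
import time


class ConfigError(Exception):
    """Local stand-in for swimprotocol.config.ConfigError."""


class TransientConfigError(ConfigError):
    """Local stand-in mirroring the documented constructor signature."""

    def __init__(self, msg=None, *, wait_hint=60.0):
        super().__init__(msg)
        self.wait_hint = wait_hint  # assumed attribute name


def configure_with_retry(build, *, max_attempts=3, sleep=time.sleep):
    """Call build() until it succeeds, retrying only transient failures.

    A plain ConfigError is not caught, because retrying an invalid
    configuration cannot help.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return build()
        except TransientConfigError as exc:
            if attempt == max_attempts:
                raise
            sleep(exc.wait_hint)  # wait as long as the error suggests
```

Passing `sleep` as a parameter keeps the helper testable without real delays.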
- class swimprotocol.config.BaseConfig(*, secret, local_name, peers, local_metadata={}, ping_interval=1.0, ping_timeout=0.3, ping_req_count=1, ping_req_timeout=0.9, suspect_timeout=5.0, sync_interval=0.5)[source]¶
Bases:
object
Configure the cluster behavior and characteristics. Transport implementations should sub-class to add additional configuration.
- Parameters:
secret (Union[None, str, bytes]) – The shared secret for cluster packet signing, see Signatures.
local_name (str) – The unique name of the local cluster member.
peers (Sequence[str]) – At least one name of another known node in the cluster.
local_metadata (Mapping[str, bytes]) – The initial local cluster member metadata.
ping_interval (float) – Time between ping attempts to random cluster members.
ping_timeout (float) – Time to wait for an ack after sending a ping.
ping_req_count (int) – Number of nodes to send a ping-req when a ping fails.
ping_req_timeout (float) – Time to wait for an ack after sending a ping-req.
suspect_timeout (float) – Time to wait after losing connectivity with a cluster member before marking it offline.
sync_interval (float) – Time between sync attempts to disseminate cluster changes.
- Raises:
ConfigError – The given configuration was invalid.
TransientConfigError – The configuration failed due to a failure that may not be permanent.
- property signatures: Signatures¶
Generates and verifies cluster packet signatures.
- classmethod add_arguments(parser, *, prefix='--')[source]¶
Implementations (such as the demo) may use this method to add command-line based configuration for the transport.
Note
Arguments added should use prefix and explicitly provide a unique name, e.g.:
parser.add_argument(f'{prefix}arg', dest='swim_arg', ...)
This prevents collision with other argument names and allows custom prefix values without affecting the Namespace.
- Parameters:
parser (ArgumentParser) – The argument parser.
prefix (str) – The prefix for added arguments, which should start with -- and end with -, e.g. '--' or '--foo-'.
- Return type:
None
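The prefix and dest convention from the note above can be tried with plain argparse. In this sketch the function is a free-standing stand-in for a sub-class override, and the `retries` option is hypothetical, not an option defined by the library.

```python
from argparse import ArgumentParser


def add_arguments(parser: ArgumentParser, *, prefix: str = '--') -> None:
    # Hypothetical option: the flag name follows the prefix, while dest
    # stays fixed so the Namespace attribute does not depend on the prefix.
    parser.add_argument(f'{prefix}retries', dest='swim_retries',
                        type=int, default=3)


parser = ArgumentParser()
add_arguments(parser, prefix='--foo-')
args = parser.parse_args(['--foo-retries', '5'])
print(args.swim_retries)  # prints 5
```

Because dest is explicit, code reading the Namespace does not need to know which prefix was used on the command line.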
- classmethod parse_args(args, *, env_prefix='SWIM')[source]¶
Parse the given Namespace into a dictionary of keyword arguments for the BaseConfig constructor. Sub-classes should override this method to add additional keyword arguments as needed.
Some keywords will default to environment variables if not given in args:
SWIM_SECRET, SWIM_SECRET_FILE
The secret keyword argument.
SWIM_NAME
The local_name keyword argument.
SWIM_PEERS
The comma-separated peers keyword argument.
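The environment fallback can be pictured with a small stand-alone function. This is a sketch under assumptions: `env_defaults` is a hypothetical helper, not the library's parse_args(), and it leaves out SWIM_SECRET and SWIM_SECRET_FILE because their exact handling is not described here.

```python
import os
from collections.abc import Mapping
from typing import Any


def env_defaults(given: Mapping[str, Any], *, env_prefix: str = 'SWIM',
                 environ: Mapping[str, str] = os.environ) -> dict[str, Any]:
    """Fill missing keywords from <env_prefix>_NAME and <env_prefix>_PEERS."""
    kwargs = dict(given)
    name_var = f'{env_prefix}_NAME'
    peers_var = f'{env_prefix}_PEERS'
    if kwargs.get('local_name') is None and name_var in environ:
        kwargs['local_name'] = environ[name_var]
    if kwargs.get('peers') is None and peers_var in environ:
        # The peers variable is documented as comma-separated.
        kwargs['peers'] = environ[peers_var].split(',')
    return kwargs
```

Values given explicitly always win; the environment is consulted only for keywords that are missing or None.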
- final classmethod from_args(args, **overrides)[source]¶
Build and return a new cluster config object. This first calls parse_args() and then passes the results as keyword arguments to the constructor.
- Parameters:
- Return type:
swimprotocol.listener
- swimprotocol.listener.ListenerCallback¶
A callable that takes the notified item.
alias of Callable[[Concatenate[ListenT_contra, ListenP]], Any]
- class swimprotocol.listener.CallbackPoll(listener, callback, *args, **kwargs)[source]¶
Bases:
Generic[ListenT, ListenP], DaemonTask, TaskOwner
Listens for items and runs the callback.
- Parameters:
listener (Listener[ListenT]) –
callback (ListenerCallback[ListenT, ListenP]) –
args (ListenP.args) –
kwargs (ListenP.kwargs) –
- class swimprotocol.listener.Listener[source]¶
Bases:
Generic[ListenT]
Implements basic listener and callback functionality. Producers can call notify() with an item, and consumers can wait for those items with poll() or register a callback with on_notify().
- on_notify(callback, *args, **kwargs)[source]¶
Provides a context manager that causes callback to be called when a producer calls notify().
- Parameters:
callback (Callable[[Concatenate[ListenT, ~ListenP]], Any]) – The callback function, which will be passed the item argument from notify().
args (~ListenP) –
kwargs (~ListenP) –
- Return type:
CallbackPoll[ListenT, ~ListenP]
- async poll()[source]¶
Wait until notify() is called and return all item objects. More than one item may be returned if notify() is called more than once before the asyncio event loop is re-entered.
- Return type:
Sequence[ListenT]
- notify(item)[source]¶
Triggers a notification with item, waking any poll() calls and running any on_notify() callbacks.
- Parameters:
item (ListenT) – The object to be sent to the consumers.
- Return type:
None
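The batching behavior described for poll() can be reproduced with a small stand-in. This sketch is not the library's Listener: `SketchListener` is a hypothetical class built on asyncio.Event that supports a single waiting consumer, and it only illustrates why two notify() calls made before the event loop is re-entered arrive together.

```python
import asyncio
from collections.abc import Sequence
from typing import Generic, TypeVar

T = TypeVar('T')


class SketchListener(Generic[T]):
    """Illustrative stand-in, not the library's Listener."""

    def __init__(self) -> None:
        self._event = asyncio.Event()
        self._items: list[T] = []

    def notify(self, item: T) -> None:
        # Queue the item and wake the waiting poll() call.
        self._items.append(item)
        self._event.set()

    async def poll(self) -> Sequence[T]:
        await self._event.wait()
        self._event.clear()
        # Hand over everything queued since the last poll().
        items, self._items = self._items, []
        return items


async def listener_demo() -> Sequence[str]:
    listener: SketchListener[str] = SketchListener()
    waiter = asyncio.create_task(listener.poll())
    await asyncio.sleep(0)  # let the poll() task start waiting
    listener.notify('a')
    listener.notify('b')  # second call before the event loop is re-entered
    return await waiter
```

Running `asyncio.run(listener_demo())` returns both items in one sequence, because the waiting task cannot resume between the two synchronous notify() calls.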
swimprotocol.members
- class swimprotocol.members.MemberSnapshot(name, clock, status, status_time, metadata)[source]¶
Bases:
object
Represents a member at a previous moment in time.
- class swimprotocol.members.Member(name, local)[source]¶
Bases:
object
Represents a member node of the cluster.
- METADATA_UNKNOWN: Mapping[str, bytes] = {}¶
Before the metadata of a non-local cluster member has been initialized with a known value, it is assigned this empty dict for identity comparisons.
- property clock: int¶
The sequence clock tracking changes distributed across the cluster. This value is always increasing, as changes are made to the member status or metadata.
- property previous: MemberSnapshot¶
A snapshot of the member before the most recent change.
- class swimprotocol.members.Members(config)[source]¶
Manages the members of the cluster.
- Parameters:
config (BaseConfig) – The cluster config object.
- property local: Member¶
The local member for the process.
- find(count, *, status=Status.ALL, exclude=frozenset({}))[source]¶
Return a randomly-chosen subset of non-local cluster members that meet the given criteria.
- get(name, validity=None)[source]¶
Return the cluster member with the given name, creating it if it does not exist.
If validity is given and name matches an existing non-local member, it is compared to the previous validity value, causing a full resynchronization if they differ.
- update(member, *, new_status=None, new_metadata=None)[source]¶
Update the cluster member status or metadata.
- apply(member, source, clock, *, status, metadata)[source]¶
Apply a disseminated update from source to member.
- Parameters:
- Return type:
None
- get_gossip(target)[source]¶
Iterates through cluster members looking for gossip that should be sent to target.
See also
swimprotocol.packet
- class swimprotocol.packet.Source(name, validity)[source]¶
Bases:
object
Uniquely identifies the local cluster member that created the packet.
- class swimprotocol.packet.Packet(source)[source]¶
Bases:
object
Base class for a packet sent between cluster members. Transport implementations may use these directly, e.g. UdpPack, or adapt their contents into another protocol.
- Parameters:
source (Source) – The name of the local cluster member that created the packet.
- class swimprotocol.packet.Ping(source)[source]¶
Bases:
Packet
Packets used for the SWIM protocol ping operation, which do not explicitly contain any information other than the source.
- Parameters:
source (Source) –
- class swimprotocol.packet.PingReq(source, target)[source]¶
Bases:
Packet
Packets used for the SWIM protocol ping-req operation, which contain a target member in addition to source.
- class swimprotocol.packet.Ack(source)[source]¶
Bases:
Packet
Packets used for the SWIM protocol ack response, which indicates that source is online.
- Parameters:
source (Source) –
- class swimprotocol.packet.Gossip(source, name, clock, status, metadata)[source]¶
Bases:
Packet
Packets used for SWIM protocol gossip, which alert other members when a cluster member has changed status or metadata. This information is intended to travel around the cluster until all members are aware of the change.
- Parameters:
name (str) – The name of the cluster member whose state has changed.
clock (int) – The sequence clock value associated with the change.
status (Status) – The current perceived status of the cluster member.
metadata (Mapping[str, bytes] | None) – The current metadata associated with the cluster member.
source (Source) –
swimprotocol.shuffle
- class swimprotocol.shuffle.Shuffle[source]¶
Bases:
Set[ShuffleT_co]
A set of objects that can be accessed in a “shuffled” manner, similar to a deck of cards. All operations, including choice(), have O(1) time complexity.
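One well-known way to get O(1) add, discard, and random choice is to pair a list with a dict of positions and fill removed slots with the last element. The sketch below shows that technique only; `ListShuffle` is a hypothetical class, and nothing here claims the library's Shuffle is implemented this way.

```python
import random
from collections.abc import Hashable
from typing import Generic, TypeVar

T = TypeVar('T', bound=Hashable)


class ListShuffle(Generic[T]):
    """Illustrative O(1) set with random choice, not the library's class."""

    def __init__(self) -> None:
        self._values: list[T] = []
        self._index: dict[T, int] = {}  # value -> position in _values

    def add(self, val: T) -> None:
        if val not in self._index:
            self._index[val] = len(self._values)
            self._values.append(val)

    def discard(self, val: T) -> None:
        idx = self._index.pop(val, None)
        if idx is None:
            return  # absent values are ignored
        last = self._values.pop()
        if idx < len(self._values):
            # Move the former last element into the vacated slot.
            self._values[idx] = last
            self._index[last] = idx

    def choice(self) -> T:
        # Raises IndexError when the set is empty.
        return random.choice(self._values)

    def __contains__(self, val: object) -> bool:
        return val in self._index

    def __len__(self) -> int:
        return len(self._values)
```

The swap-with-last step is what keeps removal constant-time, at the cost of not preserving insertion order.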
- class swimprotocol.shuffle.WeakShuffle(init=())[source]¶
Bases:
Shuffle[ShuffleT], MutableSet[ShuffleT]
An implementation of Shuffle that holds only weak references to the set elements.
- Parameters:
init (Iterable[ShuffleT]) – Initial objects to add.
- discard(val)[source]¶
Remove an element. Do not raise an exception if absent.
- Parameters:
val (ShuffleT) –
- Return type:
None
swimprotocol.sign
- class swimprotocol.sign.Signature(salt, digest)[source]¶
Bases:
NamedTuple
A salted digest, including the salt value.
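A salted digest of this shape can be produced with the standard library. The sketch below is an assumption about the general technique, an HMAC over the salt followed by the payload, and not a description of the library's exact construction; `sign` and `verify` are hypothetical helper names, and the local `Signature` tuple only mirrors the documented fields.

```python
import hashlib
import hmac
import secrets
from typing import NamedTuple


class Signature(NamedTuple):
    """Local stand-in with the documented salt and digest fields."""
    salt: bytes
    digest: bytes


def sign(secret: bytes, data: bytes, *, hash_name: str = 'sha256',
         salt_len: int = 16) -> Signature:
    # A fresh random salt makes each signature of the same data unique.
    salt = secrets.token_bytes(salt_len)
    digest = hmac.new(secret, salt + data, hash_name).digest()
    return Signature(salt, digest)


def verify(secret: bytes, data: bytes, sig: Signature, *,
           hash_name: str = 'sha256') -> bool:
    expected = hmac.new(secret, sig.salt + data, hash_name).digest()
    # compare_digest avoids leaking information through timing.
    return hmac.compare_digest(expected, sig.digest)
```

The salt travels with the digest so the receiver can recompute it; only the shared secret has to stay private.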
- class swimprotocol.sign.Signatures(secret, *, hash_name='sha256', salt_len=16, check_version=True)[source]¶
Bases:
object
Provides a hash signature for inclusion alongside Packet objects when implementing a Transport protocol.
Note
Passing secret=None uses uuid.getnode() as the secret, which effectively allows you to experiment locally but will not suffice for connections over a network.
- Parameters:
swimprotocol.status
- class swimprotocol.status.Status(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
Bases:
Flag
Possible cluster member status values, as well as aggregate values that can be used with bitwise operations but must not be assigned.
- ONLINE = 1¶
The member is responding as expected.
- OFFLINE = 2¶
The member has stopped responding for long enough to avoid false positives.
- SUSPECT = 4¶
The member has failed to respond, but is not yet declared fully offline.
- AVAILABLE = 5¶
Aggregate status for statuses that are considered responding, ONLINE and SUSPECT, for use with bitwise operations.
- UNAVAILABLE = 6¶
Aggregate status for statuses that are not considered responding, OFFLINE and SUSPECT, for use with bitwise operations.
- ALL = 7¶
Aggregate status for all statuses, for use with bitwise operations.
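The aggregate values follow directly from the bit values listed above, which a local copy of the enum makes easy to check. This sketch redefines `Status` as a stand-in so it runs without the package, and `matches` is a hypothetical helper showing the kind of bitwise test the aggregates are meant for.

```python
from enum import Flag


class Status(Flag):
    """Local stand-in using the documented values."""
    ONLINE = 1
    OFFLINE = 2
    SUSPECT = 4
    AVAILABLE = ONLINE | SUSPECT      # 1 | 4 == 5
    UNAVAILABLE = OFFLINE | SUSPECT   # 2 | 4 == 6
    ALL = ONLINE | OFFLINE | SUSPECT  # 7


def matches(status: Status, criteria: Status) -> bool:
    """Return True if status is one of the statuses in criteria."""
    return bool(status & criteria)
```

Note that SUSPECT belongs to both AVAILABLE and UNAVAILABLE, so `matches(Status.SUSPECT, ...)` is true for either aggregate.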
- transition(to)[source]¶
Prevents impossible status transitions, returning a new status to be used instead of to.
- Parameters:
to (Status) – The desired transition status.
- Raises:
ValueError – to was an aggregate status, which cannot be transitioned to directly.
- Return type:
swimprotocol.tasks
- class swimprotocol.tasks.TaskT¶
The type of task result.
alias of TypeVar(‘TaskT’)
- class swimprotocol.tasks.DaemonTask[source]¶
Bases:
AbstractAsyncContextManager[Task[NoReturn]]
Base class for a task that is run for the duration of an async with context.
- class swimprotocol.tasks.TaskOwner[source]¶
Bases:
object
Base class for any class that needs to run sub-tasks.
Because asyncio tasks can be garbage-collected while running, the purpose of this base class is to keep a strong reference to all running tasks. The task removes its own reference when it is complete, effectively allowing it to “daemonize”.
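The strong-reference pattern described here is commonly written with a set and a done callback. The sketch below shows that general asyncio idiom; `SketchTaskOwner` and `run_subtask` are hypothetical names, not the library's TaskOwner API.

```python
import asyncio
from collections.abc import Coroutine
from typing import Any


class SketchTaskOwner:
    """Illustrative stand-in, not the library's TaskOwner."""

    def __init__(self) -> None:
        self._running: set[asyncio.Task[Any]] = set()

    def run_subtask(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task[Any]:
        task = asyncio.create_task(coro)
        # The event loop keeps only a weak reference, so hold a strong one.
        self._running.add(task)
        # The task drops its own reference once it completes.
        task.add_done_callback(self._running.discard)
        return task


async def owner_demo() -> tuple[int, int]:
    owner = SketchTaskOwner()
    task = owner.run_subtask(asyncio.sleep(0.01))
    during = len(owner._running)
    await task
    await asyncio.sleep(0)  # give the done callback a chance to run
    return during, len(owner._running)
```

Running `asyncio.run(owner_demo())` shows one tracked task while it runs and none after it finishes.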
swimprotocol.transport
- class swimprotocol.transport.TransportT¶
Type variable for Transport implementations.
alias of TypeVar(‘TransportT’, bound=Transport[BaseConfig])
- swimprotocol.transport.load_transport(name='udp', *, group='swimprotocol.transport')[source]¶
Load and return the Transport implementation by name.
- Parameters:
- Raises:
DistributionNotFound – A dependency of the transport entry point was not able to be satisfied.
KeyError – The given name did not exist in the entry point group.
- Return type:
type[swimprotocol.transport.Transport[swimprotocol.config.BaseConfig]]
- class swimprotocol.transport.Transport(config, worker)[source]¶
Bases:
Generic[ConfigT_co], AbstractAsyncContextManager[None]
Interface of the basic functionality needed to act as the transport layer for the SWIM protocol. The transport layer is responsible for sending and receiving ping, ping-req, and ack packets for failure detection, and transmitting gossip for dissemination. The transport must be entered with async with to be activated.
- Parameters:
config (ConfigT_co) – The cluster config object.
worker (Worker) –
- config_type: ClassVar[type[swimprotocol.config.BaseConfig]]¶
The BaseConfig sub-class used by this transport.
swimprotocol.worker
- class swimprotocol.worker.Worker(config, members)[source]¶
Bases:
DaemonTask, TaskOwner
Manages the failure detection and dissemination components of the SWIM protocol.
See also
- Parameters:
config (BaseConfig) – The cluster configuration object.
members (Members) – Tracks the state of the cluster members.
- property send_queue: Queue[tuple[swimprotocol.members.Member, swimprotocol.packet.Packet]]¶
The queue of packets to be sent.
- final async check(target)[source]¶
Attempts to determine if target is responding, setting it to suspect if it does not respond with an ack.
See also
- Parameters:
target (Member) – The cluster member to check.
- Return type:
None
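The general shape of a SWIM check, a direct ping followed by indirect ping-req attempts, can be sketched with asyncio timeouts. This is an assumption-laden illustration rather than the library's method: `check_sketch` is a hypothetical free function, `ping` and `ping_req` are caller-supplied stand-ins that complete when an ack arrives, and only the timeout and count names and defaults are taken from the BaseConfig parameters.

```python
import asyncio
from collections.abc import Awaitable, Callable, Sequence


async def check_sketch(target: str, helpers: Sequence[str],
                       ping: Callable[[str], Awaitable[None]],
                       ping_req: Callable[[str, str], Awaitable[None]], *,
                       ping_timeout: float = 0.3, ping_req_count: int = 1,
                       ping_req_timeout: float = 0.9) -> bool:
    """Return True if an ack arrived, False if target should be suspected."""
    try:
        # Step 1: ping the target directly and wait for its ack.
        await asyncio.wait_for(ping(target), ping_timeout)
        return True
    except asyncio.TimeoutError:
        pass
    # Step 2: ask up to ping_req_count other members to ping the target.
    tasks = [asyncio.ensure_future(ping_req(helper, target))
             for helper in helpers[:ping_req_count]]
    if not tasks:
        return False
    done, pending = await asyncio.wait(
        tasks, timeout=ping_req_timeout,
        return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Any successful indirect ack counts as a response.
    return any(not t.cancelled() and t.exception() is None for t in done)
```

A caller would mark the target as suspect when this returns False.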
- final async disseminate(target)[source]¶
Sends any gossip that might be needed by target.
See also
- Parameters:
target (Member) – The cluster member to disseminate updates to.
- Return type:
None
- async run_failure_detection()[source]¶
Indefinitely send failure detection packets to other cluster members.
Note
Override this method to control when and how check() is called. By default, one random cluster member is chosen every ping_interval seconds.
- Return type:
- async run_dissemination()[source]¶
Indefinitely send dissemination packets to other cluster members.
Note
Override this method to control when and how disseminate() is called. By default, one random cluster member is chosen every sync_interval seconds.
- Return type:
- final async run()[source]¶
Indefinitely handle received SWIM protocol packets and, at configurable intervals, send failure detection and dissemination packets. This method calls run_failure_detection() and run_dissemination().
- Return type: