swimprotocol.udp
- class swimprotocol.udp.UdpTransport(config, worker)
Implements Transport using UDP, without broadcast.
This transport assumes that the name of each cluster member is in host:port format, and that any cluster member can receive UDP packets from any other cluster member.
swimprotocol.udp.config
- class swimprotocol.udp.config.UdpConfig(*, bind_host=None, bind_port=None, default_host=None, default_port=None, discovery=False, mtu_size=1500, **kwargs)
Bases: BaseConfig
Implements BaseConfig, adding the configuration required for UdpTransport.
- Parameters:
bind_host (Optional[str]) – The hostname or IP address to bind the UDP socket. The hostname from the local_name address is used by default.
bind_port (Optional[int]) – The port number to bind the UDP socket. The port from the local_name address is used by default.
default_host (Optional[str]) – The hostname or IP address to connect to if an address string does not specify a hostname, e.g. ':1234'.
default_port (Optional[int]) – The port number to connect to if an address string does not specify a port number, e.g. 'myhost'.
discovery (bool) – Resolve the local address as a DNS A/AAAA record containing peers. The local IP address will also be auto-discovered by attempting to connect() to the hostname.
kwargs (Any) – Additional keyword arguments passed to the BaseConfig constructor.
mtu_size (int) –
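The default_host and default_port fallbacks described above can be illustrated with a small sketch. The helper name split_address is hypothetical and is not part of swimprotocol; it only shows how an address such as ':1234' or 'myhost' might be completed from defaults, and it does not handle IPv6 literals.

```python
from typing import Optional, Tuple


def split_address(address: str, default_host: Optional[str] = None,
                  default_port: Optional[int] = None) -> Tuple[str, int]:
    """Hypothetical helper: split 'host:port', filling in missing parts."""
    host, sep, port = address.rpartition(':')
    if not sep:
        # No colon at all, e.g. 'myhost': the whole string is the host.
        host, port = address, ''
    resolved_host = host or default_host
    resolved_port = int(port) if port else default_port
    if resolved_host is None or resolved_port is None:
        raise ValueError(f'incomplete address: {address!r}')
    return resolved_host, resolved_port


# ':1234' has no hostname, so default_host is used.
print(split_address(':1234', default_host='10.0.0.1'))
# 'myhost' has no port, so default_port is used.
print(split_address('myhost', default_port=9999))
```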
- classmethod add_arguments(parser, *, prefix='--')
Implementations (such as the demo) may use this method to add command-line based configuration for the transport.
Note
Arguments added should use prefix and explicitly provide a unique name, e.g.:
    parser.add_argument(f'{prefix}arg', dest='swim_arg', ...)
This prevents collision with other argument names and allows custom prefix values without affecting the Namespace.
- Parameters:
parser (ArgumentParser) – The argument parser.
prefix (str) – The prefix for added arguments, which should start with -- and end with -, e.g. '--' or '--foo-'.
- Return type:
None
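As a rough illustration of the note above, the following sketch uses only the standard argparse module. The function add_example_arguments and the option name bind-port are made up for this example; the point is that dest stays fixed while prefix changes the flag seen on the command line.

```python
from argparse import ArgumentParser


def add_example_arguments(parser: ArgumentParser, *,
                          prefix: str = '--') -> None:
    """Hypothetical stand-in for an add_arguments() implementation."""
    group = parser.add_argument_group('example transport options')
    # The flag name depends on prefix, but dest is always 'swim_bind_port',
    # so the resulting Namespace looks the same for any prefix.
    group.add_argument(f'{prefix}bind-port', dest='swim_bind_port',
                       type=int, metavar='NUM',
                       help='port number to bind (illustrative only)')


parser = ArgumentParser()
add_example_arguments(parser, prefix='--swim-')
args = parser.parse_args(['--swim-bind-port', '9999'])
print(args.swim_bind_port)
```

With the default prefix the same option would be spelled --bind-port, and the parsed value would still be stored as swim_bind_port.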
- classmethod parse_args(args, *, env_prefix='SWIM')
Parse the given Namespace into a dictionary of keyword arguments for the BaseConfig constructor. Sub-classes should override this method to add additional keyword arguments as needed.
Some keywords will default to environment variables if not given in args:
SWIM_SECRET, SWIM_SECRET_FILE
The secret keyword argument.
SWIM_NAME
The local_name keyword argument.
SWIM_PEERS
The comma-separated peers keyword argument.
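The fallback from parsed arguments to environment variables might look roughly like the sketch below. It is not the library's implementation: the function example_parse_args, the Namespace attribute names (swim_secret, swim_name, swim_peers), and the environ parameter are assumptions made for illustration, and SWIM_SECRET_FILE handling is left out.

```python
import os
from argparse import Namespace
from typing import Any, Dict, Mapping, Optional


def example_parse_args(args: Namespace, *, env_prefix: str = 'SWIM',
                       environ: Optional[Mapping[str, str]] = None
                       ) -> Dict[str, Any]:
    """Hypothetical sketch: prefer values in args, else the environment."""
    env = os.environ if environ is None else environ

    def pick(attr: str, env_name: str) -> Optional[Any]:
        value = getattr(args, attr, None)
        if value is None:
            value = env.get(f'{env_prefix}_{env_name}')
        return value

    peers = pick('swim_peers', 'PEERS')
    if isinstance(peers, str):
        # The environment variable holds a comma-separated list.
        peers = [peer for peer in peers.split(',') if peer]
    return {'secret': pick('swim_secret', 'SECRET'),
            'local_name': pick('swim_name', 'NAME'),
            'peers': peers}


kwargs = example_parse_args(
    Namespace(swim_name='a:1'),
    environ={'SWIM_NAME': 'b:2', 'SWIM_PEERS': 'c:3,d:4'})
print(kwargs)
```

Here local_name comes from args because it was given explicitly, while peers falls back to the environment.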
Docker Services
If your application is deployed as a Docker Service, the UdpConfig discovery=True keyword argument can be used to discover configuration based on the service name. For example:
    config = UdpConfig(local_name='tasks.my-service:9999', discovery=True, ...)
Docker provides a tasks DNS lookup that resolves to the IP addresses of all replicas of the service. In this example, tasks.my-service is resolved to these IP addresses. The IP address local to the container is chosen as the local member and the rest are peer members.
In practice, this DNS lookup is often not immediately successful when the replicas start up. A service may also be scaled down to a single replica, which has no need of a cluster. These scenarios will raise a TransientConfigError with a wait_hint value. This exception can be caught to continuously retry the cluster configuration until successful:
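    import asyncio

    # UdpConfig and TransientConfigError must also be imported
    # from the swimprotocol package.

    async def start() -> None:
        while True:
            try:
                config = UdpConfig(local_name='tasks.my-service:9999',
                                   discovery=True, ...)
            except TransientConfigError as exc:
                # Wait for the suggested interval before retrying.
                await asyncio.sleep(exc.wait_hint)
            else:
                break
        # ...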
swimprotocol.udp.pack
- class swimprotocol.udp.pack.UdpPack(signatures, *, pickle_protocol=5, prefix_xor=b'SWIM?!')
Bases: object
Packs and unpacks SWIM protocol Packet objects from raw UDP packets or TCP connections. The pickle module is used for serialization, so Signatures is used to sign the payloads.
- Parameters:
signatures (Signatures) – Generates and verifies cluster packet signatures.
prefix_xor (bytes) – A 4-byte string used to XOR the packet prefix, as a sanity check to detect malformed or incomplete UDP packets.
- pack(packet)
Uses pickle to serialize packet, generates a digital signature of the pickled data, and returns a byte-string that can be sent as a raw UDP packet.
The resulting byte-string starts with a 4-byte struct prefix (XOR’ed with prefix_xor) with the struct format !BBH. The first byte is the length of the salt, the second byte is the length of the signature, and the final two bytes are the length of the pickled payload. After the prefix, the salt, digest, and pickled payload byte-strings are concatenated.
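The framing described above can be sketched with the standard struct module. This is only an illustration of the layout, not the UdpPack code: build_frame, split_frame, and xor_bytes are hypothetical names, the 4-byte mask b'SWIM' is an arbitrary example value, and signing and pickling are omitted, so the salt, digest, and payload are passed in as ready-made byte-strings.

```python
import struct
from typing import Tuple

# Network byte order: two unsigned bytes, then one unsigned 16-bit integer.
PREFIX = struct.Struct('!BBH')


def xor_bytes(data: bytes, mask: bytes) -> bytes:
    """XOR data with mask, byte by byte, up to the shorter length."""
    return bytes(a ^ b for a, b in zip(data, mask))


def build_frame(salt: bytes, digest: bytes, payload: bytes,
                prefix_xor: bytes = b'SWIM') -> bytes:
    # struct raises struct.error if a length does not fit its field.
    prefix = PREFIX.pack(len(salt), len(digest), len(payload))
    return xor_bytes(prefix, prefix_xor) + salt + digest + payload


def split_frame(data: bytes, prefix_xor: bytes = b'SWIM'
                ) -> Tuple[bytes, bytes, bytes]:
    if len(data) < PREFIX.size:
        raise ValueError('frame is shorter than the prefix')
    prefix = xor_bytes(data[:PREFIX.size], prefix_xor)
    salt_len, digest_len, payload_len = PREFIX.unpack(prefix)
    if len(data) != PREFIX.size + salt_len + digest_len + payload_len:
        # The sanity check: the lengths do not match the data received.
        raise ValueError('malformed or incomplete frame')
    salt_end = PREFIX.size + salt_len
    digest_end = salt_end + digest_len
    return data[PREFIX.size:salt_end], data[salt_end:digest_end], data[digest_end:]


frame = build_frame(b'ab', b'xyz', b'payload')
print(split_frame(frame))
```

Because the payload length occupies two bytes, this layout cannot describe a payload longer than 65535 bytes, and the salt and signature are each limited to 255 bytes.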
swimprotocol.udp.protocol
- class swimprotocol.udp.protocol.BaseProtocol(thread_pool, udp_pack, recv_queue)
Bases: TaskOwner
Base class of UdpProtocol and TcpProtocol. Each will call handle_packet() upon receipt of a full packet.
- Parameters:
- async handle_packet(data)
Parse the data into a packet and push it onto the worker recv_queue.
- class swimprotocol.udp.protocol.UdpProtocol(thread_pool, udp_pack, recv_queue)
Bases: BaseProtocol, DatagramProtocol
Implements DatagramProtocol to receive SWIM protocol packets by UDP.
Each packet received is passed directly to handle_packet().
- class swimprotocol.udp.protocol.TcpProtocol(thread_pool, udp_pack, recv_queue)
Bases: BaseProtocol, Protocol
Implements Protocol to receive SWIM protocol packets by TCP.
All data received is accumulated until the connection is closed, with the result treated as a complete packet and sent to handle_packet().
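The accumulate-until-closed behaviour can be sketched with asyncio.Protocol alone. The class AccumulatingProtocol and its on_packet callback are hypothetical and stand in for TcpProtocol and handle_packet(). Discarding the data when the connection ends with an error is a choice made for this sketch, not something the documentation above states.

```python
import asyncio
from typing import Callable, List, Optional


class AccumulatingProtocol(asyncio.Protocol):
    """Hypothetical sketch: buffer all TCP data until the peer closes."""

    def __init__(self, on_packet: Callable[[bytes], None]) -> None:
        self._on_packet = on_packet
        self._chunks: List[bytes] = []

    def data_received(self, data: bytes) -> None:
        # TCP is a byte stream, so one packet may arrive in several chunks.
        self._chunks.append(data)

    def connection_lost(self, exc: Optional[Exception]) -> None:
        if exc is None:
            # Clean close: everything received forms one complete packet.
            self._on_packet(b''.join(self._chunks))
        self._chunks.clear()


received: List[bytes] = []
protocol = AccumulatingProtocol(received.append)
protocol.data_received(b'ab')
protocol.data_received(b'cd')
protocol.connection_lost(None)
print(received)
```

In a running event loop, a class like this would typically be supplied as the protocol factory to loop.create_server().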
swimprotocol.udp.send
- class swimprotocol.udp.send.UdpSend(config, udp_pack, thread_pool, send_queue, udp_transport)
Bases: DaemonTask
Daemon task that waits for packets on send_queue and sends them either by UDP or, for oversized packets, by establishing a TCP connection.
- Parameters: