swimprotocol.udp

class swimprotocol.udp.UdpTransport(config, worker)[source]

Bases: Transport[UdpConfig]

Implements Transport using UDP, without broadcast.

This transport assumes that the name of each cluster member is in host:port format, and that any cluster member can receive UDP packets from any other cluster member.

Parameters:
  • config (Final) – The cluster configuration object.

  • worker (Final) –

config_type

alias of UdpConfig

property bind_host: str

The local bind address used to open the UDP socket to receive packets.

See also

asyncio.loop.create_datagram_endpoint()

property bind_port: int

The local bind port used to open the UDP socket to receive packets.

See also

asyncio.loop.create_datagram_endpoint()
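As a rough illustration of how a bind host and port feed into asyncio.loop.create_datagram_endpoint(), the sketch below binds a UDP socket and reports the address it ended up on. The helper name open_udp_socket is made up for this example and is not part of swimprotocol; the loopback address and port 0 in the usage note are placeholders.

```python
import asyncio


async def open_udp_socket(bind_host: str, bind_port: int) -> tuple[str, int]:
    """Bind a UDP socket and return the (host, port) it is listening on."""
    loop = asyncio.get_running_loop()
    # local_addr is the (host, port) pair used to bind the socket locally.
    transport, _protocol = await loop.create_datagram_endpoint(
        asyncio.DatagramProtocol, local_addr=(bind_host, bind_port))
    try:
        # 'sockname' is the bound address; the first two items are host, port.
        host, port = transport.get_extra_info('sockname')[:2]
        return host, port
    finally:
        transport.close()
```

Calling asyncio.run(open_udp_socket('127.0.0.1', 0)) asks the operating system for any free port on the loopback interface and returns the address that was actually bound.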

swimprotocol.udp.config

class swimprotocol.udp.config.UdpConfig(*, bind_host=None, bind_port=None, default_host=None, default_port=None, discovery=False, mtu_size=1500, **kwargs)[source]

Bases: BaseConfig

Implements BaseConfig, adding additional configuration required for UdpTransport.

Parameters:
  • bind_host (Optional[str]) – The hostname or IP address to bind the UDP socket. The hostname from the local_name address is used by default.

  • bind_port (Optional[int]) – The port number to bind the UDP socket. The port from the local_name address is used by default.

  • default_host (Optional[str]) – The hostname or IP address to connect to if an address string does not specify a hostname, e.g. ':1234'.

  • default_port (Optional[int]) – The port number to connect to if an address string does not specify a port number, e.g. 'myhost'.

  • discovery (bool) – Resolve the local address as a DNS A/AAAA record containing peers. The local IP address will also be auto-discovered by attempting to connect() to the hostname.

  • kwargs (Any) – Additional keyword arguments passed to the BaseConfig constructor.

  • mtu_size (int) –
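The default_host and default_port fallbacks can be pictured with a small parser. This is a minimal sketch, assuming plain host:port strings without IPv6 literals; split_address is a hypothetical helper written for this example, not the function the library uses.

```python
from typing import Optional


def split_address(address: str, default_host: Optional[str] = None,
                  default_port: Optional[int] = None) -> tuple[str, int]:
    """Split a 'host:port' string, filling missing parts from the defaults."""
    host, sep, port = address.rpartition(':')
    if not sep:
        # No colon at all, e.g. 'myhost': the whole string is the hostname.
        host, port = address, ''
    resolved_host = host or default_host
    resolved_port = int(port) if port else default_port
    if resolved_host is None or resolved_port is None:
        raise ValueError(f'incomplete address: {address!r}')
    return resolved_host, resolved_port
```

With this sketch, ':1234' takes its hostname from default_host and 'myhost' takes its port from default_port, mirroring the two examples in the parameter descriptions.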

classmethod add_arguments(parser, *, prefix='--')[source]

Implementations (such as the demo) may use this method to add command-line based configuration for the transport.

Note

Arguments added should use prefix and explicitly provide a unique name, e.g.:

parser.add_argument(f'{prefix}arg', dest='swim_arg', ...)

This prevents collision with other argument names and allows custom prefix values without affecting the Namespace.

Parameters:
  • parser (ArgumentParser) – The argument parser.

  • prefix (str) – The prefix for added arguments, which should start with -- and end with -, e.g. '--' or '--foo-'.

Return type:

None
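The naming convention from the note above can be sketched with a standalone argparse example. The option names bind-host and bind-port and the dest values are illustrative assumptions; the arguments that UdpConfig actually registers may differ.

```python
from argparse import ArgumentParser


def add_example_arguments(parser: ArgumentParser, *,
                          prefix: str = '--') -> None:
    """Register options under a configurable prefix with fixed dest names."""
    group = parser.add_argument_group('example transport options')
    # The flag changes with the prefix, but dest stays the same, so code
    # reading the Namespace does not need to know which prefix was used.
    group.add_argument(f'{prefix}bind-host', dest='swim_bind_host',
                       metavar='HOST', help='address to bind the socket')
    group.add_argument(f'{prefix}bind-port', dest='swim_bind_port',
                       metavar='PORT', type=int,
                       help='port to bind the socket')


parser = ArgumentParser()
add_example_arguments(parser, prefix='--foo-')
args = parser.parse_args(['--foo-bind-host', '0.0.0.0',
                          '--foo-bind-port', '9999'])
```

Here the flags are spelled --foo-bind-host and --foo-bind-port on the command line, while the parsed values are still read as args.swim_bind_host and args.swim_bind_port.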

classmethod parse_args(args, *, env_prefix='SWIM')[source]

Parse the given Namespace into a dictionary of keyword arguments for the BaseConfig constructor. Sub-classes should override this method to add additional keyword arguments as needed.

Some keywords will default to environment variables if not given in args:

SWIM_SECRET, SWIM_SECRET_FILE [*]

The secret keyword argument.

SWIM_NAME

The local_name keyword argument.

SWIM_PEERS

The comma-separated peers keyword argument.

Parameters:
  • args (Namespace) – The command-line arguments.

  • env_prefix (str) – Prefix for the environment variables.

Return type:

dict[str, Any]
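The environment-variable fallback for the name and peers can be sketched as follows. This is not the library's implementation: the Namespace attribute names swim_name and swim_peers and the environ parameter are assumptions made for the example, and the secret handling is left out.

```python
import os
from argparse import Namespace
from collections.abc import Mapping
from typing import Any


def example_parse_args(args: Namespace, *, env_prefix: str = 'SWIM',
                       environ: Mapping[str, str] = os.environ
                       ) -> dict[str, Any]:
    """Build keyword arguments, preferring args over environment values."""
    name = getattr(args, 'swim_name', None)
    if not name:
        name = environ.get(f'{env_prefix}_NAME')
    peers = getattr(args, 'swim_peers', None)
    if not peers:
        # The peers variable is a comma-separated list of addresses.
        raw = environ.get(f'{env_prefix}_PEERS', '')
        peers = [peer.strip() for peer in raw.split(',') if peer.strip()]
    return {'local_name': name, 'peers': peers}
```

Passing a different env_prefix, such as 'MYAPP', makes the sketch read MYAPP_NAME and MYAPP_PEERS instead.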

Docker Services

If your application is deployed as a Docker Service, the UdpConfig discovery=True keyword argument can be used to discover configuration based on the service name. For example:

config = UdpConfig(local_name='tasks.my-service:9999', discovery=True, ...)

Docker provides a tasks DNS lookup that resolves to the IP addresses of all replicas of the service. In this example, tasks.my-service is resolved to these IP addresses. The IP address local to the container is chosen as the local member and the rest are peer members.
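The selection step described above, where one resolved address becomes the local member and the others become peers, might look like the sketch below. It covers only the partitioning, not the DNS lookup or the local address detection, and split_local_and_peers is a hypothetical name rather than a swimprotocol function.

```python
def split_local_and_peers(resolved_ips: list[str], local_ip: str,
                          port: int) -> tuple[str, list[str]]:
    """Pick the local member from the resolved IPs; the rest are peers."""
    if local_ip not in resolved_ips:
        raise LookupError(f'{local_ip} is not among the resolved addresses')
    local_name = f'{local_ip}:{port}'
    # Every other replica is addressed on the same port as the local member.
    peers = [f'{ip}:{port}' for ip in resolved_ips if ip != local_ip]
    return local_name, peers
```

For three replicas at 10.0.0.2, 10.0.0.3, and 10.0.0.4 with 10.0.0.3 local to the container, this yields '10.0.0.3:9999' as the local name and the other two addresses as peers.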

In practice, this DNS lookup is often not immediately successful when the replicas start up. A service may also be scaled down to a single replica, which has no need of a cluster. These scenarios will raise a TransientConfigError with a wait_hint value. This exception can be caught to continuously retry the cluster configuration until successful:

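import asyncio

from swimprotocol.config import TransientConfigError
from swimprotocol.udp.config import UdpConfig

async def start() -> None:
    # Retry until DNS discovery yields a usable cluster configuration.
    while True:
        try:
            config = UdpConfig(local_name='tasks.my-service:9999',
                               discovery=True, ...)
        except TransientConfigError as exc:
            # Sleep for the interval suggested by the exception.
            await asyncio.sleep(exc.wait_hint)
        else:
            break
    # ...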

swimprotocol.udp.pack

class swimprotocol.udp.pack.UdpPack(signatures, *, pickle_protocol=5, prefix_xor=b'SWIM?!')[source]

Bases: object

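Packs SWIM protocol Packet objects into raw bytes, and unpacks them again, for transmission as UDP packets or over TCP connections. The pickle module is used for serialization, and because unpickling untrusted data is unsafe, Signatures is used to sign and verify the payloads.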

Parameters:
  • signatures (Signatures) – Generates and verifies cluster packet signatures.

  • pickle_protocol (int) – The pickle protocol version number.

  • prefix_xor (bytes) – A 4-byte string used to XOR the packet prefix, as a sanity check to detect malformed or incomplete UDP packets.

pack(packet)[source]

Uses pickle to serialize packet, generates a digital signature of the pickled data, and returns a byte-string that can be sent as a raw UDP packet.

The resulting byte-string starts with a 4-byte struct prefix (XOR’ed with prefix_xor) with the struct format !BBH. The first byte is the length of the salt, the second byte is the length of the signature, and the final two bytes are the length of the pickled payload. After the prefix, the salt, signature, and pickled payload byte-strings are concatenated.

Parameters:

packet (Packet) – The SWIM protocol packet to serialize.

Return type:

bytes

unpack(data)[source]

Deserializes a byte-string that was created using pack() into a SWIM protocol packet. If any assumptions about the serialized data are not met, including an invalid signature, None is returned to indicate that data was malformed or incomplete.

Parameters:

data (bytes) – The serialized byte-string of the SWIM protocol packet.

Return type:

Packet | None
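The prefix layout described for pack(), and the way unpack() returns None for malformed input, can be sketched with the struct module. This sketch covers only the framing: pickling and signature verification are omitted, frame and unframe are hypothetical helpers, and EXAMPLE_XOR is a made-up 4-byte key chosen to match the 4-byte prefix described above.

```python
import struct
from typing import Optional

PREFIX = struct.Struct('!BBH')  # salt length, signature length, payload length
EXAMPLE_XOR = b'SWIM'  # hypothetical 4-byte key, for illustration only


def _xor(data: bytes, key: bytes) -> bytes:
    """XOR two equal-length byte-strings."""
    return bytes(a ^ b for a, b in zip(data, key))


def frame(salt: bytes, signature: bytes, payload: bytes,
          prefix_xor: bytes = EXAMPLE_XOR) -> bytes:
    """Prepend the XOR'ed length prefix to salt + signature + payload."""
    prefix = PREFIX.pack(len(salt), len(signature), len(payload))
    return _xor(prefix, prefix_xor) + salt + signature + payload


def unframe(data: bytes, prefix_xor: bytes = EXAMPLE_XOR
            ) -> Optional[tuple[bytes, bytes, bytes]]:
    """Split framed data into its parts, or return None if malformed."""
    if len(data) < PREFIX.size:
        return None
    salt_len, sig_len, payload_len = PREFIX.unpack(
        _xor(data[:PREFIX.size], prefix_xor))
    body = data[PREFIX.size:]
    # The lengths in the prefix must account for every remaining byte.
    if len(body) != salt_len + sig_len + payload_len:
        return None
    salt = body[:salt_len]
    signature = body[salt_len:salt_len + sig_len]
    payload = body[salt_len + sig_len:]
    return salt, signature, payload
```

Because the prefix is XOR'ed with a shared key, a truncated datagram or unrelated traffic is unlikely to decode to lengths that match the remaining bytes, so it is rejected before any unpickling would be attempted.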

swimprotocol.udp.protocol

class swimprotocol.udp.protocol.BaseProtocol(thread_pool, udp_pack, recv_queue)[source]

Bases: TaskOwner

Base class of UdpProtocol and TcpProtocol. Each will call handle_packet() upon receipt of a full packet.

Parameters:
  • thread_pool (ThreadPoolExecutor) – A thread pool for CPU-heavy operations.

  • udp_pack (UdpPack) –

  • recv_queue (Queue[Packet]) –

async handle_packet(data)[source]

Parse the data into a packet and push it onto the worker recv_queue.

Parameters:

data (bytes) – The bytes representing a packet to be parsed by UdpPack.

Return type:

None
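One way to picture handle_packet() is a coroutine that offloads parsing to the thread pool and queues the result. This is a sketch under assumptions: handle_data and its unpack callable are stand-ins written for this example, and dropping data that fails to parse is inferred from unpack() returning None for malformed input.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional


async def handle_data(data: bytes,
                      unpack: Callable[[bytes], Optional[object]],
                      thread_pool: ThreadPoolExecutor,
                      recv_queue: asyncio.Queue) -> None:
    """Parse data off the event loop and queue the resulting packet."""
    loop = asyncio.get_running_loop()
    # Run the CPU-heavy parsing in the thread pool, not on the event loop.
    packet = await loop.run_in_executor(thread_pool, unpack, data)
    if packet is not None:
        await recv_queue.put(packet)
```

Keeping deserialization and signature checks in a thread pool means a burst of incoming packets does not block the event loop that is receiving them.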

class swimprotocol.udp.protocol.UdpProtocol(thread_pool, udp_pack, recv_queue)[source]

Bases: BaseProtocol, DatagramProtocol

Implements DatagramProtocol to receive SWIM protocol packets by UDP.

Each packet received is passed directly to handle_packet().

Parameters:
  • thread_pool (ThreadPoolExecutor) –

  • udp_pack (UdpPack) –

  • recv_queue (Queue[Packet]) –

datagram_received(data, addr)[source]

Called when some datagram is received.

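Parameters:
  • data (bytes) – The bytes of the received datagram.

  • addr – The address of the peer that sent the datagram; the exact format depends on the transport.

Return type: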

None

class swimprotocol.udp.protocol.TcpProtocol(thread_pool, udp_pack, recv_queue)[source]

Bases: BaseProtocol, Protocol

Implements Protocol to receive SWIM protocol packets by TCP.

All data received is accumulated until the connection is closed, with the result treated as a complete packet and sent to handle_packet().

Parameters:
  • thread_pool (ThreadPoolExecutor) –

  • udp_pack (UdpPack) –

  • recv_queue (Queue[Packet]) –

data_received(data)[source]

Called when some data is received.

The argument is a bytes object.

Parameters:

data (bytes) –

Return type:

None

connection_lost(exc)[source]

Called when the connection is lost or closed.

The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).

Parameters:

exc (Exception | None) –

Return type:

None
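The accumulate-until-closed behavior described for TcpProtocol can be sketched with a plain asyncio.Protocol subclass. AccumulatingProtocol and its on_complete callback are hypothetical names for this example; the real class hands the result to handle_packet() instead.

```python
import asyncio
from typing import Callable, Optional


class AccumulatingProtocol(asyncio.Protocol):
    """Buffer all received bytes and hand them off when the peer closes."""

    def __init__(self, on_complete: Callable[[bytes], None]) -> None:
        self._buffer = bytearray()
        self._on_complete = on_complete

    def data_received(self, data: bytes) -> None:
        # TCP is a byte stream, so one packet may arrive in several chunks.
        self._buffer += data

    def connection_lost(self, exc: Optional[Exception]) -> None:
        # The closed connection marks the end of the packet.
        self._on_complete(bytes(self._buffer))
        self._buffer.clear()
```

Using one connection per packet avoids the need for a length-delimited stream format: the end of the connection is the end of the message.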

swimprotocol.udp.send

class swimprotocol.udp.send.UdpSend(config, udp_pack, thread_pool, send_queue, udp_transport)[source]

Bases: DaemonTask

Daemon task that waits for packets on send_queue and sends them either by UDP or, for oversized packets, by establishing a TCP connection.

Parameters:
  • config (UdpConfig) –

  • udp_pack (UdpPack) –

  • thread_pool (ThreadPoolExecutor) –

  • send_queue (Queue[tuple[Member, Packet]]) –

  • udp_transport (DatagramTransport) –

async run()[source]

The method to run while the context is entered. The task is cancelled when the context exits.

Return type:

NoReturn
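The choice between UDP and TCP might be sketched as a size check on each queued item. This example assumes that mtu_size is the threshold for "oversized", which the reference above does not state explicitly, and it works on already-packed bytes with string addresses rather than Member and Packet objects. The names send_one, send_udp, and send_tcp are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable

Sender = Callable[[str, bytes], Awaitable[None]]


async def send_one(send_queue: asyncio.Queue, mtu_size: int,
                   send_udp: Sender, send_tcp: Sender) -> None:
    """Take one (address, data) item off the queue and send it."""
    address, data = await send_queue.get()
    if len(data) <= mtu_size:
        # Small enough to fit in a single datagram.
        await send_udp(address, data)
    else:
        # Too large for one datagram: fall back to a TCP connection.
        await send_tcp(address, data)
```

A daemon task in this style would call send_one() in an endless loop until it is cancelled.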