Source code for swimprotocol.udp.pack
from __future__ import annotations
import pickle # nosec
import struct
from typing import Final, Optional
from ..packet import Packet
from ..sign import Signatures
__all__ = ['UdpPack']
_prefix = struct.Struct('!BBI')
[docs]class UdpPack:
"""Packs and unpacks SWIM protocol :class:`~swimprotocol.packet.Packet`
objects from raw UDP packets or TCP connections. The :mod:`pickle` module
is used for serialization, so :class:`~swimprotocol.sign.Signatures` is
used to sign the payloads.
Args:
signatures: Generates and verifies cluster packet signatures.
pickle_protocol: The :mod:`pickle` protocol version number.
prefix_xor: A 4-byte string used to XOR the packet prefix, as a sanity
check to detect malformed or incomplete UDP packets.
"""
def __init__(self, signatures: Signatures, *,
pickle_protocol: int = pickle.HIGHEST_PROTOCOL,
prefix_xor: bytes = b'SWIM?!') -> None:
super().__init__()
if len(prefix_xor) != _prefix.size:
raise ValueError(f'{prefix_xor!r} must be {_prefix.size} bytes')
self.signatures: Final = signatures
self.pickle_protocol: Final = pickle_protocol
self.prefix_xor: Final = prefix_xor
def _xor_prefix(self, prefix: bytes) -> bytes:
zipped = zip(prefix, self.prefix_xor, strict=True)
return bytes([left ^ right for left, right in zipped])
[docs] def pack(self, packet: Packet) -> bytes:
"""Uses :mod:`pickle` to serialize *packet*, generates a digital
signature of the pickled data, and returns a byte-string that can be
sent as a raw UDP packet.
The resulting byte-string starts with a 4-byte :mod:`struct` prefix
(XOR'ed with *prefix_xor*) with the `struct format
<https://docs.python.org/3/library/struct.html#format-strings>`_
``!BBH``. The first byte is the length of the salt, the second byte is
the length of the signature, and the final two bytes are the length of
the pickled payload. After the prefix, the salt, digest, and pickled
payload byte-strings are concatenated.
Args:
packet: The SWIM protocol packet to serialize.
"""
pickled = pickle.dumps(packet, self.pickle_protocol)
salt, digest = self.signatures.sign(pickled)
salt_start = _prefix.size
digest_start = salt_start + len(salt)
data_start = digest_start + len(digest)
prefix = _prefix.pack(len(salt), len(digest), len(pickled))
packed = bytearray(data_start + len(pickled))
packed[0:salt_start] = self._xor_prefix(prefix)
packed[salt_start:digest_start] = salt
packed[digest_start:data_start] = digest
packed[data_start:] = pickled
return packed
[docs] def unpack(self, data: bytes) -> Optional[Packet]:
"""Deserializes a byte-string that was created using :meth:`.pack` into
a SWIM protocol packet. If any assumptions about the serialized data
are not met, including an invalid signature, ``None`` is returned to
indicate that *data* was malformed or incomplete.
Args:
data: The serialized byte-string of the SWIM protocol packet.
"""
data_view = memoryview(data)
salt_start = _prefix.size
prefix = self._xor_prefix(data_view[0:salt_start])
try:
salt_len, digest_len, data_len = _prefix.unpack(prefix)
except struct.error:
return None
digest_start = salt_start + salt_len
data_start = digest_start + digest_len
data_end = data_start + data_len
salt = data_view[salt_start:digest_start]
digest = data_view[digest_start:data_start]
pickled = data_view[data_start:data_end]
signatures = self.signatures
if len(digest) != signatures.digest_size or len(pickled) != data_len:
return None
if signatures.verify(pickled, (salt, digest)):
packet = pickle.loads(pickled) # noqa: S301
assert isinstance(packet, Packet)
return packet
else:
return None