Source code for pymap.cluster
from __future__ import annotations
from abc import abstractmethod
from collections.abc import Collection, Iterator, Mapping, MutableMapping, \
MutableSet, Set
from typing import TypeAlias, TypeVar, Protocol, Any
from weakref import WeakKeyDictionary, WeakSet, WeakValueDictionary
# Names exported as the public API of this module.
__all__ = ['MemberInterface', 'ListenCallback', 'ClusterMetadata']
# Type of the argument registered together with a callback in
# ClusterMetadata.listen().
ArgT = TypeVar('ArgT')
# Contravariant type variable parametrizing the first argument of the
# ListenCallback protocol.
ArgT_contra = TypeVar('ArgT_contra', contravariant=True)
[docs]
class MemberInterface(Protocol):
    """Structural type describing one node that participates in the cluster.

    Implementations must be hashable: elsewhere in this module members are
    stored in a weak set and used as keys of weak dictionaries.

    """

    @abstractmethod
    def __hash__(self) -> int:
        """Return the hash value of this cluster member node."""

    @property
    @abstractmethod
    def metadata(self) -> _Metadata:
        """The metadata mapping published by this cluster member node."""

    @property
    @abstractmethod
    def name(self) -> str:
        """The name identifying this cluster member node."""
# Metadata published by one cluster member: string keys to raw byte values.
_Metadata: TypeAlias = Mapping[str, bytes]
# Values of a single metadata key, weakly keyed by the member that holds them.
_MemberValues: TypeAlias = WeakKeyDictionary[MemberInterface, bytes]
# Read-only typing view of _MemberValues, used in the annotation of
# ClusterMetadata.remote.
_MemberValuesView: TypeAlias = Mapping[MemberInterface, bytes]
[docs]
class ListenCallback(Protocol[ArgT_contra]):
    """Signature of the callables accepted by
    :meth:`~ClusterMetadata.listen`.

    """

    @abstractmethod
    def __call__(self, arg: ArgT_contra, /, metadata: _Metadata) -> None:
        """Invoked with the local metadata: once when the callback is
        registered, and again each time a local metadata key is assigned a
        different value or is removed.

        Args:
            arg: The object that was supplied alongside the callback when it
                was registered with :meth:`~ClusterMetadata.listen`.
            metadata: The local metadata mapping, reflecting the update.

        """
class _LocalMetadata(MutableMapping[str, bytes]):
    """Mutable mapping holding the metadata of the local instance.

    Registered callbacks are invoked whenever a key is assigned a value that
    differs from the current one, or is deleted. The registry maps each
    callback to the argument it was registered with; the argument is the
    weakly-referenced value, so an entry vanishes once its argument has been
    garbage-collected.

    """

    def __init__(self) -> None:
        super().__init__()
        self._map: dict[str, bytes] = {}
        self._callbacks: WeakValueDictionary[ListenCallback[Any], object] = \
            WeakValueDictionary()

    def _notify(self) -> None:
        """Invoke every registered callback with the current metadata."""
        local = self._map
        # Iterate over a snapshot: a callback may register further listeners
        # while it runs, and growing the weak dictionary in the middle of a
        # live iteration raises RuntimeError.
        for callback, arg in list(self._callbacks.items()):
            callback(arg, local)

    def __getitem__(self, key: str) -> bytes:
        return self._map[key]

    def __setitem__(self, key: str, value: bytes) -> None:
        local = self._map
        # Re-assigning an identical value is not a change, so nobody is
        # notified in that case.
        if local.get(key) != value:
            local[key] = value
            self._notify()

    def __delitem__(self, key: str) -> None:
        # Raises KeyError for a missing key before any callback is invoked.
        del self._map[key]
        self._notify()

    def __iter__(self) -> Iterator[str]:
        return iter(self._map)

    def __len__(self) -> int:
        return len(self._map)

    def __repr__(self) -> str:  # pragma: no cover
        return repr(self._map)
class _RemoteMetadata(Mapping[str, _MemberValues]):
    """Read-only mapping of remote metadata, grouped by metadata key.

    Every key maps to a weak-keyed dictionary associating each member that
    holds the key with its value. The private ``_add_member`` and
    ``_del_member`` methods are the only mutators.

    """

    def __init__(self) -> None:
        super().__init__()
        self._map: dict[str, _MemberValues] = {}

    def __getitem__(self, key: str) -> _MemberValues:
        return self._map[key]

    def __iter__(self) -> Iterator[str]:
        return iter(self._map)

    def __len__(self) -> int:
        return len(self._map)

    def _add_member(self, member: MemberInterface) -> None:
        """Record each of the member's metadata values under its key."""
        by_key = self._map
        for key, value in member.metadata.items():
            try:
                bucket = by_key[key]
            except KeyError:
                # First member seen for this key: start a new weak bucket.
                bucket = by_key[key] = WeakKeyDictionary()
            bucket[member] = value

    def _del_member(self, member: MemberInterface) -> None:
        """Remove the member's value from every key; absent keys are
        ignored.

        """
        for bucket in self._map.values():
            bucket.pop(member, None)

    def __repr__(self) -> str:  # pragma: no cover
        snapshot = {}
        for key, bucket in self._map.items():
            snapshot[key] = dict(bucket)
        return repr(snapshot)
[docs]
class ClusterMetadata(MutableSet[MemberInterface]):
    """Keeps a mapping of metadata pertaining to the current instance and
    possibly other clustered instances, if the current instance is part of a
    cluster.

    The object also acts as a set of :class:`MemberInterface` objects. The
    cluster service should add new or updated members and discard them as they
    go offline. Members are held in weak containers, so the set does not keep
    them alive by itself.

    Args:
        init: Initial remote cluster members.

    """

    def __init__(self, init: Collection[MemberInterface] | None = None, /) \
            -> None:
        super().__init__()
        self._members: WeakSet[MemberInterface] = WeakSet()
        self._local = _LocalMetadata()
        self._remote = _RemoteMetadata()
        if init is not None:
            for member in init:
                self.add(member)

    def __contains__(self, member: object) -> bool:
        return member in self._members

    def __iter__(self) -> Iterator[MemberInterface]:
        return iter(self._members)

    def __len__(self) -> int:
        return len(self._members)

    def add(self, member: MemberInterface) -> None:
        """Add a new member, or refresh one that is already known.

        Args:
            member: The new or updated cluster member.

        """
        # Forget any earlier registration first. Otherwise metadata keys the
        # member no longer publishes would linger in the remote mapping, and
        # for an equal-but-distinct object the weak containers would keep
        # the weak reference to the old object instead of the new one.
        self._members.discard(member)
        self._remote._del_member(member)
        self._members.add(member)
        self._remote._add_member(member)

    def discard(self, member: MemberInterface) -> None:
        """Remove a member and its remote metadata values, if present.

        Args:
            member: The cluster member to remove.

        """
        self._members.discard(member)
        self._remote._del_member(member)

    @property
    def local(self) -> MutableMapping[str, bytes]:
        """The local cluster instance metadata. Keys added, removed, and
        modified in this mapping should be disseminated to the rest of the
        cluster.

        """
        return self._local

    @property
    def remote(self) -> Mapping[str, _MemberValuesView]:
        """The remote cluster instance metadata, organized by metadata key."""
        return self._remote

    def get_all(self, key: str) -> Set[bytes]:
        """Returns the set of all known values for the metadata key, local and
        remote. The set is empty if the key is not known at all.

        Args:
            key: The metadata key.

        """
        results: set[bytes] = set()
        # The key may exist only locally (or nowhere), so the remote mapping
        # must not be indexed directly.
        remote_values = self._remote.get(key)
        if remote_values is not None:
            results.update(remote_values.values())
        if key in self._local:
            results.add(self._local[key])
        return results

    def listen(self, callback: ListenCallback[ArgT], arg: ArgT) -> None:
        """Adds a callback to be run whenever the local metadata has been
        modified. Cluster services can use this to disseminate metadata
        changes. The callback is also run once immediately, with the current
        local metadata.

        Note:
            The *arg* object is weakly referenced, and the callback will be
            automatically removed once *arg* is :term:`garbage-collected
            <garbage collection>`.

        Args:
            callback: Function called with the updated local metadata mapping.
            arg: The first argument passed to *callback*.

        """
        self._local._callbacks[callback] = arg
        callback(arg, self._local)