Source code for swimprotocol.shuffle


from __future__ import annotations

import random
from abc import abstractmethod, ABCMeta
from collections.abc import Iterable, Iterator, MutableSet, Set
from typing import TypeVar
from weakref import ref, WeakKeyDictionary, WeakValueDictionary

__all__ = ['ShuffleT', 'ShuffleT_co', 'Shuffle', 'WeakShuffle']

ShuffleT = TypeVar('ShuffleT')
ShuffleT_co = TypeVar('ShuffleT_co', covariant=True)


[docs]class Shuffle(Set[ShuffleT_co], metaclass=ABCMeta): """A set of objects that can be accessed in a "shuffled" manner, similar to a deck of cards. All operations including :meth:`.choice` are *O(1)* time complexity. """
[docs] @abstractmethod def choice(self) -> ShuffleT_co: """Choose an object from the set at random and return it. This object is not removed from the set. See Also: :func:`random.choice` Raises: KeyError: The set was empty. """ ...
[docs]class WeakShuffle(Shuffle[ShuffleT], MutableSet[ShuffleT]): """An implementation of :class:`Shuffle` that holds only weak references to the set elements. Args: init: Initial objects to add. """ def __init__(self, /, init: Iterable[ShuffleT] = ()) -> None: super().__init__() self._weak_vals: WeakKeyDictionary[ShuffleT, ref[ShuffleT]] = \ WeakKeyDictionary() self._weak_vals_rev: WeakValueDictionary[ref[ShuffleT], ShuffleT] = \ WeakValueDictionary() self._indexes: dict[ref[ShuffleT], int] = {} self._values: list[ref[ShuffleT]] = [] for val in init: self.add(val) def _finalize(self, weak_val: ref[ShuffleT]) -> None: index = self._indexes.pop(weak_val, None) if index is not None: values = self._values end_index = len(values) - 1 if index < end_index: values[index] = values[end_index] del self._values[end_index]
[docs] def add(self, val: ShuffleT) -> None: if val not in self._weak_vals: weak_val = ref(val, self._finalize) self._weak_vals[val] = weak_val self._weak_vals_rev[weak_val] = val self._indexes[weak_val] = len(self._values) self._values.append(weak_val)
[docs] def discard(self, val: ShuffleT) -> None: weak_val = self._weak_vals.get(val) if weak_val is not None: del self._weak_vals[val] del self._weak_vals_rev[weak_val] self._finalize(weak_val)
[docs] def choice(self) -> ShuffleT: """Choose an object from the set at random and return it. This object is not removed from the set. See Also: :func:`random.choice` Raises: KeyError: The set was empty. """ try: weak_val = random.choice(self._values) # noqa: S311 except IndexError as exc: raise KeyError('choice from an empty set') from exc val = weak_val() assert val is not None return val
def __contains__(self, val: object) -> bool: return val in self._weak_vals def __iter__(self) -> Iterator[ShuffleT]: return self._weak_vals.keys() def __len__(self) -> int: return len(self._weak_vals)