Source code for swimprotocol.worker


from __future__ import annotations

import asyncio
from asyncio import Event, Queue, Task, TimeoutError
from collections.abc import Mapping, Sequence
from contextlib import suppress
from typing import final, Final, Optional, NoReturn
from weakref import WeakSet, WeakKeyDictionary

from .config import BaseConfig
from .members import Member, Members
from .packet import Packet, Ping, PingReq, Ack, Gossip, GossipAck
from .status import Status
from .tasks import DaemonTask, TaskOwner

__all__ = ['Worker']


[docs]class Worker(DaemonTask, TaskOwner): """Manages the failure detection and dissemination components of the SWIM protocol. See Also: :ref:`Failure Detection`, :ref:`Dissemination` Args: config: The cluster configuration object. members: Tracks the state of the cluster members. """ def __init__(self, config: BaseConfig, members: Members) -> None: super().__init__() self.config: Final = config self.members: Final = members self._recv_queue: Queue[Packet] = Queue() self._send_queue: Queue[tuple[Member, Packet]] = Queue() self._waiting: WeakKeyDictionary[Member, WeakSet[Event]] = \ WeakKeyDictionary() self._listening: WeakKeyDictionary[Member, WeakSet[Member]] = \ WeakKeyDictionary() self._suspect: WeakKeyDictionary[Member, Task[None]] = \ WeakKeyDictionary() @property def recv_queue(self) -> Queue[Packet]: """The queue of packets received.""" return self._recv_queue @property def send_queue(self) -> Queue[tuple[Member, Packet]]: """The queue of packets to be sent.""" return self._send_queue async def _send(self, target: Member, packet: Packet) -> None: await self._send_queue.put((target, packet)) def _add_waiting(self, member: Member, event: Event) -> None: waiting = self._waiting.get(member) if waiting is None: self._waiting[member] = waiting = WeakSet() waiting.add(event) def _add_listening(self, member: Member, target: Member) -> None: listening = self._listening.get(target) if listening is None: self._listening[target] = listening = WeakSet() listening.add(member) def _notify_waiting(self, member: Member) -> None: waiting = self._waiting.get(member) if waiting is not None: for event in waiting: event.set() def _get_listening(self, member: Member) -> Sequence[Member]: listening = self._listening.pop(member, None) if listening is not None: return list(listening) else: return [] async def _run_handler(self) -> NoReturn: local = self.members.local while True: packet = await self.recv_queue.get() source = self.members.get(packet.source.name, packet.source.validity) self._notify_waiting(source) for target in self._get_listening(source): await self._send(target, Ack(source=source.source)) if isinstance(packet, Ping): await self._send(source, Ack(source=local.source)) elif isinstance(packet, PingReq): target = self.members.get(packet.target) await self._send(target, Ping(source=local.source)) self._add_listening(source, target) elif isinstance(packet, Gossip): member = self.members.get(packet.name) ack = self._apply_gossip(local, source, member, packet) await self._send(source, ack) elif isinstance(packet, GossipAck): member = self.members.get(packet.name) self.members.ack_gossip(member, source, packet.clock) def _build_gossip(self, local: Member, member: Member) -> Gossip: if member.metadata is Member.METADATA_UNKNOWN: metadata: Optional[Mapping[str, bytes]] = None else: metadata = member.metadata return Gossip(source=local.source, name=member.name, clock=member.clock, status=member.status, metadata=metadata) def _apply_gossip(self, local: Member, source: Member, member: Member, gossip: Gossip) -> GossipAck: self._handle_status(member, gossip.status) self.members.apply(member, source, gossip.clock, status=gossip.status, metadata=gossip.metadata) return GossipAck(source=local.source, name=gossip.name, clock=gossip.clock) async def _wait(self, target: Member, timeout: float) -> bool: event = Event() self._add_waiting(target, event) with suppress(TimeoutError): await asyncio.wait_for(event.wait(), timeout) return event.is_set() def _handle_status(self, target: Member, status: Status) -> None: if status == Status.SUSPECT: suspect_task = self._suspect.get(target) if suspect_task is None: self._suspect[target] = asyncio.create_task( self._suspect_wait(target)) else: suspect_task = self._suspect.pop(target, None) if suspect_task is not None: suspect_task.cancel() async def _suspect_wait(self, target: Member) -> None: await asyncio.sleep(self.config.suspect_timeout) self.members.update(target, new_status=Status.OFFLINE) _ = self._suspect.pop(target, None)
[docs] @final async def check(self, target: Member) -> None: """Attempts to determine if *target* is responding, setting it to :term:`suspect` if it does not respond with an :term:`ack`. See Also: :ref:`Failure Detection` Args: target: The cluster member to check. """ local = self.members.local await self._send(target, Ping(source=local.source)) online = await self._wait(target, self.config.ping_timeout) if not online: count = self.config.ping_req_count indirects = self.members.find( count, status=Status.AVAILABLE, exclude={target}) if indirects: ping_req = PingReq(source=local.source, target=target.name) await asyncio.wait([ asyncio.create_task(self._send(indirect, ping_req)) for indirect in indirects]) online = await self._wait(target, self.config.ping_req_timeout) new_status = Status.ONLINE if online else Status.SUSPECT self._handle_status(target, new_status) self.members.update(target, new_status=new_status)
[docs] @final async def disseminate(self, target: Member) -> None: """Sends any :term:`gossip` that might be needed by *target*. See Also: :ref:`Dissemination` Args: target: The cluster member to disseminate to updates to. """ local = self.members.local for member in self.members.get_gossip(target): packet = self._build_gossip(local, member) await self._send(target, packet)
[docs] async def run_failure_detection(self) -> NoReturn: """Indefinitely send failure detection packets to other cluster members. .. note:: Override this method to control when and how :meth:`.check` is called. By default, one random cluster member is chosen every :class:`ping_interval <swimprotocol.config.Config>` seconds. """ while True: targets = self.members.find(1) assert targets for target in targets: self.run_subtask(self.check(target)) await asyncio.sleep(self.config.ping_interval)
[docs] async def run_dissemination(self) -> NoReturn: """Indefinitely send dissemination packets to other cluster members. .. note:: Override this method to control when and how :meth:`.disseminate` is called. By default, one random cluster member is chosen every :class:`sync_interval <swimprotocol.config.Config>` seconds. """ while True: targets = self.members.find(1, status=Status.AVAILABLE) for target in targets: self.run_subtask(self.disseminate(target)) await asyncio.sleep(self.config.sync_interval)
[docs] @final async def run(self) -> NoReturn: """Indefinitely handle received SWIM protocol packets and, at configurable intervals, send failure detection and dissemination packets. This method calls :meth:`.run_failure_detection` and :meth:`.run_dissemination`. """ await asyncio.gather( self._run_handler(), self.run_failure_detection(), self.run_dissemination()) raise RuntimeError()