Source code for swimprotocol.listener
from __future__ import annotations
from asyncio import Event
from collections.abc import Sequence
from typing import Callable, TypeAlias, TypeVar, Generic, Any, NoReturn
from typing_extensions import Concatenate, ParamSpec
from weakref import WeakKeyDictionary
from .tasks import DaemonTask, TaskOwner
__all__ = ['ListenerCallback', 'CallbackPoll', 'Listener']
ListenT = TypeVar('ListenT')
ListenT_contra = TypeVar('ListenT_contra', contravariant=True)
ListenP = ParamSpec('ListenP')
#: A callable that takes the notified item.
ListenerCallback: TypeAlias = Callable[
Concatenate[ListenT_contra, ListenP],
Any]
[docs]class CallbackPoll(Generic[ListenT, ListenP], DaemonTask, TaskOwner):
"""Listens for items and running the callback.
"""
def __init__(self, listener: Listener[ListenT],
callback: ListenerCallback[ListenT, ListenP],
*args: ListenP.args, **kwargs: ListenP.kwargs) -> None:
super().__init__()
self._listener = listener
self._callback = callback
self._args = args
self._kwargs = kwargs
[docs] async def run(self) -> NoReturn:
listener = self._listener
callback = self._callback
args = self._args
kwargs = self._kwargs
while True:
items = await listener.poll()
for item in items:
self.run_subtask(callback(item, *args, **kwargs))
[docs]class Listener(Generic[ListenT]):
"""Implements basic listener and callback functionality. Producers can
call :meth:`.notify` with an item, and consumers can wait for those items
with :meth:`.poll` or register a callback with :meth:`.on_notify`.
"""
def __init__(self) -> None:
super().__init__()
self.event = Event()
self._waiting: WeakKeyDictionary[Event, list[ListenT]] = \
WeakKeyDictionary()
[docs] def on_notify(self, callback: ListenerCallback[ListenT, ListenP],
*args: ListenP.args, **kwargs: ListenP.kwargs) \
-> CallbackPoll[ListenT, ListenP]:
"""Provides a context manager that causes *callback* to be called when
a producer calls :meth:`.notify`.
Args:
callback: The callback function, which will be passed the *item*
argument from :meth:`.notify`.
"""
return CallbackPoll(self, callback, *args, **kwargs)
[docs] async def poll(self) -> Sequence[ListenT]:
"""Wait until :meth:`.notify` is called and return all *item* objects.
More than one item may be returned if :meth:`.notify` is called more
than once before the :mod:`asyncio` event loop is re-entered.
"""
event = Event()
self._waiting[event] = []
await event.wait()
return self._waiting[event]
[docs] def notify(self, item: ListenT) -> None:
"""Triggers a notification with *item*, waking any :meth:`.poll` calls
and running any :meth:`.on_notify` callbacks.
Args:
item: The object to be sent to the consumers.
"""
for event, args in self._waiting.items():
args.append(item)
event.set()