Source code for swimprotocol.tasks
from __future__ import annotations
import asyncio
from abc import abstractmethod, ABCMeta
from asyncio import Task
from collections.abc import Coroutine, MutableSet
from contextlib import AbstractAsyncContextManager
from typing import Any, NoReturn, Optional, TypeVar
__all__ = ['TaskT', 'DaemonTask', 'TaskOwner']
#: The type of task result.
TaskT = TypeVar('TaskT')
[docs]class TaskOwner:
"""Base class for any class that needs to run sub-tasks.
Because :mod:`asyncio` can be garbage-collected while running, the purpose
of this base class is to keep a strong reference to all running tasks. The
task removes its own reference when it is complete, effectively allowing it
to "daemonize".
"""
def __init__(self) -> None:
super().__init__()
self._running: MutableSet[Task[Any]] = set()
[docs] def run_subtask(self, coro: Coroutine[Any, Any, TaskT]) -> Task[TaskT]:
"""Run the *coro* sub-task.
Args:
coro: The coroutine to run.
"""
running = self._running
task = asyncio.create_task(coro)
running.add(task)
task.add_done_callback(running.discard)
return task
[docs]class DaemonTask(AbstractAsyncContextManager[Task[NoReturn]],
metaclass=ABCMeta):
"""Base class for a task that is run for the duration of an ``async with``
context.
"""
def __init__(self) -> None:
super().__init__()
self._task: Optional[Task[NoReturn]] = None
async def __aenter__(self) -> Task[TaskT]:
self._task = task = asyncio.create_task(self.run())
return task
async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) \
-> Any:
task = self._task
if task is not None:
task.cancel()
[docs] @abstractmethod
async def run(self) -> NoReturn:
"""The method to run while the context is entered. The task is
cancelled when the context exits.
"""
...