Source code for pymap.concurrent

"""Implements some concurrency utilities used by pymap. Each has
an implementation using :mod:`asyncio` and :mod:`threading` concurrency
primitives.

"""

from __future__ import annotations

import asyncio
import os.path
import time
from abc import abstractmethod, ABCMeta
from asyncio import Event as _asyncio_Event, Lock as _asyncio_Lock, \
    TimeoutError
from collections.abc import Awaitable, MutableSet, Sequence, AsyncIterator
from concurrent.futures import Executor, ThreadPoolExecutor
from contextlib import asynccontextmanager, AbstractAsyncContextManager
from contextvars import copy_context, Context
from functools import partial
from threading import local, Event as _threading_Event, Lock as _threading_Lock
from typing import TypeVar, TypeAlias
from weakref import finalize, WeakSet

__all__ = ['Subsystem', 'Event', 'ReadWriteLock', 'FileLock', 'EventT', 'RetT']

#: Type variable with an upper bound of :class:`Event`.
EventT = TypeVar('EventT', bound='Event')

#: Type variable for any return type.
RetT = TypeVar('RetT')

_Delay: TypeAlias = Sequence[float]


[docs] class Subsystem(metaclass=ABCMeta): """Utility for creating concurrency primitives for a subsystem, either :mod:`asyncio` or :mod:`threading`. """
[docs] @classmethod def for_executor(cls, executor: Executor | None) -> Subsystem: """Return a subsystem based on the given executor. If ``executor`` is None, use :mod:`asyncio`. If ``executor`` is a :class:`concurrent.futures.ThreadPoolExecutor`, use :mod:`threading`. Args: executor: The executor in use, if any. """ if isinstance(executor, ThreadPoolExecutor): return cls.for_threading(executor) elif executor is None: return cls.for_asyncio() else: raise TypeError(executor)
[docs] @classmethod def for_asyncio(cls) -> Subsystem: """Return a subsystem for :mod:`asyncio`.""" return _AsyncioSubsystem()
[docs] @classmethod def for_threading(cls, executor: ThreadPoolExecutor) -> Subsystem: """Return a subsystem for :mod:`threading`.""" return _ThreadingSubsystem(executor)
@property @abstractmethod def subsystem(self) -> str: """The subsystem name, ``'asyncio'`` or ``'threading'``.""" ...
[docs] @abstractmethod def execute(self, future: Awaitable[RetT]) -> Awaitable[RetT]: """Executes the future and returns its result in the subsystem. For :mod:`asyncio`, this simply means ``return await future``. For :mod:`threading`, it uses :meth:`~asyncio.AbstractEventLoop.run_in_executor` to run the future in another thread. Args: future: An awaitable result to execute by the subsystem. """ ...
[docs] @abstractmethod def new_rwlock(self) -> ReadWriteLock: """Return a new read-write lock.""" ...
[docs] @abstractmethod def new_event(self) -> Event: """Return a new concurrent event.""" ...
class _AsyncioSubsystem(Subsystem): @property def subsystem(self) -> str: return 'asyncio' def execute(self, future: Awaitable[RetT]) -> Awaitable[RetT]: return future def new_rwlock(self) -> _AsyncioReadWriteLock: return _AsyncioReadWriteLock() def new_event(self) -> _AsyncioEvent: return _AsyncioEvent() class _ThreadingSubsystem(Subsystem): # pragma: no cover class _EventLoopLocal(local): def __init__(self) -> None: self.event_loop = loop = asyncio.new_event_loop() finalize(self, loop.close) def __init__(self, executor: ThreadPoolExecutor) -> None: super().__init__() self._local = self._EventLoopLocal() self._executor = executor @property def subsystem(self) -> str: return 'threading' async def execute(self, future: Awaitable[RetT]) -> RetT: loop = asyncio.get_event_loop() ctx = copy_context() return await loop.run_in_executor( self._executor, self._run_in_thread, future, ctx) def _run_in_thread(self, future: Awaitable[RetT], ctx: Context) -> RetT: loop = self._local.event_loop foo: partial[RetT] = partial(loop.run_until_complete, future) return ctx.run(foo) def new_rwlock(self) -> _ThreadingReadWriteLock: return _ThreadingReadWriteLock() def new_event(self) -> _ThreadingEvent: return _ThreadingEvent()
[docs] class ReadWriteLock(metaclass=ABCMeta): """Read-write lock."""
[docs] @classmethod def for_asyncio(cls) -> ReadWriteLock: """Return a read-write lock for asyncio.""" return _AsyncioReadWriteLock()
[docs] @classmethod def for_threading(cls) -> ReadWriteLock: """Return a read-write lock for threading.""" return _ThreadingReadWriteLock()
@property @abstractmethod def subsystem(self) -> str: """The subsystem the read-write lock was created for, ``'asyncio'`` or ``'threading'``. """ ...
[docs] @abstractmethod def read_lock(self) -> AbstractAsyncContextManager[None]: """Acquires a read-lock, blocking until any write-locks are released. """ ...
[docs] @abstractmethod def write_lock(self) -> AbstractAsyncContextManager[None]: """Acquires a write-lock, blocking until all read- or write-locks are released. """ ...
[docs] class Event(metaclass=ABCMeta): """Concurrent event, one thread signals and any waiting event is released. """
[docs] @classmethod def for_asyncio(cls) -> Event: """Return an event for asyncio.""" return _AsyncioEvent()
[docs] @classmethod def for_threading(cls) -> Event: """Return an event for threading.""" return _ThreadingEvent()
@property @abstractmethod def subsystem(self) -> str: """The subsystem the event was created for, ``'asyncio'`` or ``'threading'``. """ ...
[docs] @abstractmethod def or_event(self: EventT, *events: EventT) -> EventT: """Return a new event that is signalled when either the current event or any of the provided events are signalled. """ ...
[docs] @abstractmethod def is_set(self) -> bool: """Return True if the event is set.""" ...
[docs] @abstractmethod def set(self) -> None: """Signal the waiting threads to release.""" ...
[docs] @abstractmethod def clear(self) -> None: """Clear the signal, allowing threads to wait again.""" ...
[docs] @abstractmethod async def wait(self, *, timeout: float | None = None) -> None: """Wait until another thread signals the event. Args: timeout: Maximum time to wait, in seconds. """ ...
class _AsyncioReadWriteLock(ReadWriteLock): def __init__(self) -> None: super().__init__() self._read_lock = _asyncio_Lock() self._write_lock = _asyncio_Lock() self._counter = 0 @property def subsystem(self) -> str: return 'asyncio' async def _acquire_read(self) -> bool: async with self._read_lock: self._counter += 1 return self._counter == 1 async def _release_read(self) -> bool: async with self._read_lock: self._counter -= 1 return self._counter == 0 @asynccontextmanager async def read_lock(self) -> AsyncIterator[None]: if await self._acquire_read(): await self._write_lock.acquire() try: yield finally: if await self._release_read(): self._write_lock.release() @asynccontextmanager async def write_lock(self) -> AsyncIterator[None]: async with self._write_lock: yield class _ThreadingReadWriteLock(ReadWriteLock): # pragma: no cover def __init__(self) -> None: super().__init__() self._read_lock = _threading_Lock() self._write_lock = _threading_Lock() self._counter = 0 @property def subsystem(self) -> str: return 'threading' def _acquire_read(self) -> bool: with self._read_lock: self._counter += 1 return self._counter == 1 def _release_read(self) -> bool: with self._read_lock: self._counter -= 1 return self._counter == 0 @asynccontextmanager async def read_lock(self) -> AsyncIterator[None]: if self._acquire_read(): self._write_lock.acquire() try: yield finally: if self._release_read(): self._write_lock.release() @asynccontextmanager async def write_lock(self) -> AsyncIterator[None]: with self._write_lock: yield
[docs] class FileLock(ReadWriteLock): # pragma: no cover """Uses the presence or absence of a file on the filesystem to simulate a read-write lock. If the file is present, other read- and write-locks will block until the file is gone. If the file is absent, read-locks will not block. Write-locks will create the file on acquire and remove it on release. The delay arguments are a sequence of floats used as the duration of successive :func:`~asyncio.sleep` calls. If this sequence is exhausted before a lock is established, :class:`~asyncio.TimeoutError` is thrown. Args: path: The path of the lock file. expiration: Lock files older than this age will be deleted. read_retry_delay: The delay sequence between read-lock attempts. write_retry_delay: The delay sequence between write-lock attempts. """ _DEFAULT_DELAY = (0.01, 0.03, 0.06, 0.1, 0.15, 0.25, 0.4, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0) def __init__(self, path: str, expiration: float = 600.0, read_retry_delay: _Delay = _DEFAULT_DELAY, write_retry_delay: _Delay = _DEFAULT_DELAY) \ -> None: super().__init__() self._path = path self._expiration = expiration self._read_retry_delay = read_retry_delay self._write_retry_delay = write_retry_delay @property def subsystem(self) -> str: """The subsystem the read-write lock was created for, ``'file'``.""" return 'file' def _check_lock(self) -> bool: try: statinfo = os.stat(self._path) except FileNotFoundError: return True else: if time.time() - statinfo.st_mtime >= self._expiration: try: os.unlink(self._path) except OSError: pass return True return False def _try_lock(self) -> bool: try: with open(self._path, 'x'): pass except FileExistsError: return False else: return True def _unlock(self) -> None: try: os.unlink(self._path) except OSError: pass
[docs] @asynccontextmanager async def read_lock(self) -> AsyncIterator[None]: if self._check_lock(): yield return for delay in self._read_retry_delay: await asyncio.sleep(delay) if not os.path.exists(self._path): yield break else: raise TimeoutError()
[docs] @asynccontextmanager async def write_lock(self) -> AsyncIterator[None]: if self._check_lock() and self._try_lock(): try: yield finally: self._unlock() return for delay in self._write_retry_delay: await asyncio.sleep(delay) if self._try_lock(): try: yield finally: self._unlock() break else: raise TimeoutError()
class _AsyncioEvent(Event): def __init__(self) -> None: super().__init__() self._event = _asyncio_Event() self._listeners: MutableSet[_AsyncioEvent] = WeakSet() @property def subsystem(self) -> str: return 'asyncio' def or_event(self, *events: _AsyncioEvent) -> _AsyncioEvent: or_event = _AsyncioEvent() self._listeners.add(or_event) for event in events: event._listeners.add(or_event) return or_event def is_set(self) -> bool: return self._event.is_set() def set(self) -> None: self._event.set() for listener in self._listeners: listener.set() def clear(self) -> None: self._event.clear() async def wait(self, *, timeout: float | None = None) -> None: task = asyncio.create_task(self._event.wait()) try: await asyncio.wait_for(task, timeout) except TimeoutError: pass class _ThreadingEvent(Event): # pragma: no cover def __init__(self) -> None: super().__init__() self._event = _threading_Event() self._listeners: MutableSet[_ThreadingEvent] = WeakSet() @property def subsystem(self) -> str: return 'threading' def or_event(self, *events: _ThreadingEvent) -> _ThreadingEvent: or_event = _ThreadingEvent() self._listeners.add(or_event) for event in events: event._listeners.add(or_event) return or_event def is_set(self) -> bool: return self._event.is_set() def set(self) -> None: self._event.set() for listener in self._listeners: listener.set() def clear(self) -> None: self._event.clear() async def wait(self, *, timeout: float | None = None) -> None: self._event.wait(timeout=timeout)