Source code for swimprotocol.udp
from __future__ import annotations
import asyncio
from concurrent.futures import ThreadPoolExecutor
from contextlib import closing, AsyncExitStack
from typing import Any, Final, Optional
from .config import UdpConfig
from .pack import UdpPack
from .protocol import UdpProtocol, TcpProtocol
from .send import UdpSend
from ..transport import Transport
from ..worker import Worker
__all__ = ['UdpTransport']
class UdpTransport(Transport[UdpConfig]):
    """Implements :class:`~swimprotocol.transport.Transport` using UDP, without
    `broadcast <https://en.wikipedia.org/wiki/Broadcast_address>`_.

    This transport assumes that the name of each cluster member is in
    ``host:port`` format, and that any cluster member can receive UDP packets
    from any other cluster member.

    Args:
        config: The cluster configuration object.
        worker: The :class:`~swimprotocol.worker.Worker` whose ``recv_queue``
            and ``send_queue`` are wired to the sockets opened by this
            transport.

    """

    config_type = UdpConfig

    def __init__(self, config: UdpConfig, worker: Worker) -> None:
        super().__init__(config, worker)
        self.address_parser: Final = config.address_parser
        self.udp_pack: Final = UdpPack(config.signatures)
        # The local member name is parsed once; its host and port are the
        # fallbacks for the bind_host and bind_port properties.
        self._local_address = self.address_parser.parse(config.local_name)
        # Holds every resource opened by __aenter__ until __aexit__ runs.
        self._stack = AsyncExitStack()

    @property
    def bind_host(self) -> str:
        """The local bind address used to open the UDP socket to receive
        packets.

        The configured ``bind_host`` is used when it is set to a non-empty
        value, otherwise the host parsed from the configured local name.

        See Also:
            :func:`asyncio.loop.create_datagram_endpoint`

        """
        bind_host: Optional[str] = self.config.bind_host
        return bind_host or self._local_address.host

    @property
    def bind_port(self) -> int:
        """The local bind port used to open the UDP socket to receive packets.

        The configured ``bind_port`` is used when it is set to a non-zero
        value, otherwise the port parsed from the configured local name.

        See Also:
            :func:`asyncio.loop.create_datagram_endpoint`

        """
        bind_port: Optional[int] = self.config.bind_port
        return bind_port or self._local_address.port

    async def __aenter__(self) -> None:
        """Open the thread pool, the TCP server, the UDP endpoint and the
        packet sender, registering each one for cleanup in ``__aexit__``.

        Both the TCP server and the UDP endpoint are bound to
        :attr:`bind_host` and :attr:`bind_port`.

        If any step fails, everything that was already opened is released
        before the exception propagates, because ``__aexit__`` is never called
        when ``__aenter__`` raises.

        """
        loop = asyncio.get_running_loop()
        stack = self._stack
        recv_queue = self.worker.recv_queue
        send_queue = self.worker.send_queue
        try:
            thread_pool = stack.enter_context(ThreadPoolExecutor())
            # The server and the endpoint are created before they can be
            # registered on ``stack`` (registration order fixes the teardown
            # order), so a temporary guard closes them if a later step raises.
            async with AsyncExitStack() as guard:
                tcp_server = await loop.create_server(
                    lambda: TcpProtocol(thread_pool, self.udp_pack,
                                        recv_queue),
                    self.bind_host, self.bind_port, reuse_port=True)
                guard.callback(tcp_server.close)
                udp_transport, _ = await loop.create_datagram_endpoint(
                    lambda: UdpProtocol(thread_pool, self.udp_pack,
                                        recv_queue),
                    reuse_port=True,
                    local_addr=(self.bind_host, self.bind_port))
                guard.callback(udp_transport.close)
                await stack.enter_async_context(UdpSend(
                    self.config, self.udp_pack, thread_pool, send_queue,
                    udp_transport))
                stack.enter_context(closing(udp_transport))
                await stack.enter_async_context(tcp_server)
                # Everything is now owned by ``stack``; disarm the guard so
                # that leaving this block does not close the live sockets.
                guard.pop_all()
        except BaseException:
            # Release whatever was already registered (thread pool, sender,
            # ...) so that a failed start does not leak threads or sockets.
            await stack.aclose()
            raise

    def __aexit__(self, *exc_details: Any) -> Any:
        """Release everything opened by ``__aenter__``.

        Returns the awaitable produced by the underlying
        :class:`~contextlib.AsyncExitStack`, which unwinds the registered
        resources in reverse order of registration.

        """
        return self._stack.__aexit__(*exc_details)