# Source code for swimprotocol.udp.send
from __future__ import annotations
import asyncio
from asyncio import Queue, Protocol, DatagramTransport
from concurrent.futures import ThreadPoolExecutor
from contextlib import closing
from typing import NoReturn
from .config import UdpConfig
from .pack import UdpPack
from ..address import Address
from ..members import Member
from ..packet import Packet
from ..tasks import DaemonTask
__all__ = ['UdpSend']
class UdpSend(DaemonTask):
    """Daemon task that waits for packets on *send_queue* and sends them
    either by UDP or -- for oversized packets -- by establishing a TCP
    connection.

    Args:
        config: Supplies the address parser and the MTU size used to decide
            between UDP and TCP.
        udp_pack: Serializes packets into bytes.
        thread_pool: Executor in which packets are serialized.
        send_queue: Queue of ``(member, packet)`` pairs waiting to be sent.
        udp_transport: Transport used for packets that fit within the MTU
            size.

    """

    def __init__(self, config: UdpConfig, udp_pack: UdpPack,
                 thread_pool: ThreadPoolExecutor,
                 send_queue: Queue[tuple[Member, Packet]],
                 udp_transport: DatagramTransport) -> None:
        super().__init__()
        self._address_parser = config.address_parser
        self._mtu_size = config.mtu_size
        self._udp_pack = udp_pack
        self._thread_pool = thread_pool
        self._send_queue = send_queue
        self._udp_transport = udp_transport
        # The event loop only holds weak references to tasks, so in-flight
        # sends are kept here until they finish to prevent them from being
        # garbage-collected mid-execution.
        self._send_tasks: set[asyncio.Task[None]] = set()

    async def run(self) -> NoReturn:
        """Wait for queued packets forever, sending each in its own task so
        that a slow send does not delay the packets queued behind it.

        """
        send_queue = self._send_queue
        send_tasks = self._send_tasks
        while True:
            member, packet = await send_queue.get()
            task = asyncio.create_task(self._do_send(member, packet))
            # Hold a strong reference until the task is done, then drop it.
            send_tasks.add(task)
            task.add_done_callback(send_tasks.discard)

    async def _do_send(self, member: Member, packet: Packet) -> None:
        """Serialize *packet* and send it to the address parsed from the
        name of *member*.

        Packet data no larger than the configured MTU size is sent as a
        single UDP datagram, anything larger is sent over a TCP connection.

        """
        loop = asyncio.get_running_loop()
        # Serialization runs in the thread pool rather than on the event
        # loop thread.
        packet_data = await loop.run_in_executor(
            self._thread_pool, self._udp_pack.pack, packet)
        address = self._address_parser.parse(member.name)
        if len(packet_data) <= self._mtu_size:
            self._udp_transport.sendto(
                packet_data, (address.host, address.port))
        else:
            # This coroutine already runs in its own task, so the TCP send
            # is awaited directly instead of spawning another task that
            # nothing would hold a reference to.
            await self._tcp_send(packet_data, address)

    async def _tcp_send(self, packet_data: bytes, address: Address) -> None:
        """Open a TCP connection to *address*, write *packet_data*, and close
        the connection.

        Any exception raised while connecting or writing is suppressed.

        """
        loop = asyncio.get_running_loop()
        try:
            tcp_transport, _ = await loop.create_connection(
                Protocol, address.host, address.port)
            with closing(tcp_transport):
                tcp_transport.write(packet_data)
        except Exception:  # noqa: S110
            # Sending is deliberately best-effort: a peer that cannot be
            # reached must not take down the sender.
            pass