Source code for proxyprotocol.dnsbl


from __future__ import annotations

import asyncio
from abc import abstractmethod, ABCMeta
from asyncio import AbstractEventLoop
from ipaddress import IPv4Address, IPv4Network
from socket import AF_INET, SOCK_STREAM
from typing import Optional, Sequence
from typing_extensions import Final

from .sock import SocketInfo

__all__ = ['Dnsbl', 'NoopDnsbl', 'BasicDnsbl', 'SpamhausDnsbl']


[docs] class Dnsbl(metaclass=ABCMeta): """Manages the optional lookup of the connecting IP address against a trusted `DNSBL <https://en.wikipedia.org/wiki/Domain_Name_System-based_blackhole_list>`_. """ __slots__: Sequence[str] = []
[docs] @abstractmethod async def lookup(self, sock_info: SocketInfo, *, loop: Optional[AbstractEventLoop] = None) \ -> Optional[str]: """Looks up the connecting IP address and returns the DNSBL hostname and the lookup result. Any timeout or misconfiguration is treated as an empty result. Args: sock_info: The connection socket info. """ ...
[docs] @classmethod def load(cls, host: Optional[str], *, timeout: Optional[float] = None) -> Dnsbl: """Given a DNSBL hostname, returns a :class:`Dnsbl` implementation that best suits the given *host*. Args: host: The DNSBL hostname, if any. timeout: The time to wait for a response, in seconds, or None for indefinite. Raises: ValueError: The *host* is invalid for this :class:`Dnsbl`. """ if host is None: return NoopDnsbl() elif host.endswith('.spamhaus.org'): return SpamhausDnsbl(host, timeout=timeout) else: return BasicDnsbl(host, timeout=timeout)
[docs] class NoopDnsbl(Dnsbl): """Disables DNSBL lookup altogether, :meth:`.lookup` always returns ``None``. """ __slots__: Sequence[str] = []
[docs] async def lookup(self, sock_info: SocketInfo, *, loop: Optional[AbstractEventLoop] = None) \ -> Optional[str]: return None
[docs] class BasicDnsbl(Dnsbl): """A basic :class:`Dnsbl` implementation that simply returns the DNSBL hostname if the DNS lookup returns any IP addresses. """ __slots__ = ['host', 'timeout'] def __init__(self, host: str, timeout: Optional[float]) -> None: super().__init__() self.host: Final = host self.timeout: Final = timeout
[docs] def map_results(self, addresses: Sequence[IPv4Address]) -> Optional[str]: """Given a list of IP address results from a DNSBL lookup, return a single string categorizing the results or ``None`` to discard them. Args: addresses: The list of IP address results. """ if addresses: return self.host else: return None
[docs] async def lookup(self, sock_info: SocketInfo, *, loop: Optional[AbstractEventLoop] = None) \ -> Optional[str]: host = self.host peername_ip = sock_info.peername_ip if not isinstance(peername_ip, IPv4Address): return self.map_results([]) loop = loop or asyncio.get_running_loop() lookup = '.'.join(peername_ip.reverse_pointer.split('.')[0:4] + [host]) try: addrinfo = await asyncio.wait_for( loop.getaddrinfo(lookup, 0, family=AF_INET, type=SOCK_STREAM), self.timeout) except OSError: pass else: if addrinfo: addresses = [IPv4Address(res[4][0]) for res in addrinfo] return self.map_results(addresses) return self.map_results([])
[docs] class SpamhausDnsbl(BasicDnsbl): """A :class:`Dnsbl` designed for querying `Spamhaus <https://www.spamhaus.org/>`_ DNSBLs. """ __slots__: Sequence[str] = [] _mapping = [(IPv4Network('127.0.0.2/32'), 'https://www.spamhaus.org/sbl/'), (IPv4Network('127.0.0.3/32'), 'https://www.spamhaus.org/css/'), (IPv4Network('127.0.0.4/30'), 'https://www.spamhaus.org/xbl/'), (IPv4Network('127.0.0.10/31'), 'https://www.spamhaus.org/pbl/')]
[docs] def map_results(self, addresses: Sequence[IPv4Address]) -> Optional[str]: if not addresses: return None result = addresses[0] for network, host in self._mapping: if result in network: return host return self.host