from __future__ import annotations
import socket
from ipaddress import IPv4Address, IPv6Address
from socket import AddressFamily, SocketKind
from ssl import SSLObject, SSLSocket
from typing import Optional, Union
from .result import ProxyResult, ProxyResultUnknown, ProxyResultIPv4, \
ProxyResultIPv6, ProxyResultUnix
from .tlv import ProxyProtocolTLV, ProxyProtocolSSLTLV, ProxyProtocolExtTLV
from .typing import PeerCert, SockAddr, TransportProtocol
__all__ = ['build_socket_result', 'build_transport_result']
[docs]
def build_socket_result(sock: socket.socket, *,
unique_id: Optional[bytes] = None,
dnsbl: Optional[str] = None) -> ProxyResult:
"""Build a :class:`~proxyprotocol.result.ProxyResult` from the given
*sock* object.
Args:
sock: A connected socket object.
unique_id: A unique ID to associate with the connection.
dnsbl: The DNSBL lookup result for the connection.
"""
peername: SockAddr = sock.getpeername()
sockname: SockAddr = sock.getsockname()
ssl_sock: Optional[SSLSocket] = None
if isinstance(sock, SSLSocket):
ssl_sock = sock
return _build_result(sock, peername, sockname, ssl_sock, unique_id, dnsbl)
[docs]
def build_transport_result(transport: TransportProtocol, *,
unique_id: Optional[bytes] = None,
dnsbl: Optional[str] = None) -> ProxyResult:
"""Build a :class:`~proxyprotocol.result.ProxyResult` from the given
*transport* object.
Args:
transport: A connected transport object.
unique_id: A unique ID to associate with the connection.
dnsbl: The DNSBL lookup result for the connection.
"""
sock: socket.socket = transport.get_extra_info('socket')
peername: SockAddr = transport.get_extra_info('peername')
sockname: SockAddr = transport.get_extra_info('sockname')
ssl_obj: Optional[SSLObject] = transport.get_extra_info('ssl_object')
return _build_result(sock, peername, sockname, ssl_obj, unique_id, dnsbl)
def _build_result(sock: socket.socket, peername: SockAddr, sockname: SockAddr,
ssl_obj: Union[None, SSLObject, SSLSocket],
unique_id: Optional[bytes], dnsbl: Optional[str]) \
-> ProxyResult:
family = AddressFamily(sock.family)
protocol = SocketKind(sock.type)
if family == socket.AF_INET:
assert isinstance(peername, tuple)
assert isinstance(sockname, tuple)
tlv = _build_tlv(ssl_obj, unique_id, dnsbl)
return ProxyResultIPv4((IPv4Address(peername[0]), peername[1]),
(IPv4Address(sockname[0]), sockname[1]),
protocol=protocol, tlv=tlv)
elif family == socket.AF_INET6:
assert isinstance(peername, tuple)
assert isinstance(sockname, tuple)
tlv = _build_tlv(ssl_obj, unique_id, dnsbl)
return ProxyResultIPv6((IPv6Address(peername[0]), peername[1]),
(IPv6Address(sockname[0]), sockname[1]),
protocol=protocol, tlv=tlv)
elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
assert isinstance(peername, str)
assert isinstance(sockname, str)
tlv = _build_tlv(ssl_obj, unique_id, dnsbl)
return ProxyResultUnix(peername, sockname, protocol=protocol, tlv=tlv)
else:
return ProxyResultUnknown()
def _build_tlv(ssl_obj: Union[None, SSLObject, SSLSocket],
unique_id: Optional[bytes],
dnsbl: Optional[str]) -> ProxyProtocolTLV:
ssl_tlv: Optional[ProxyProtocolSSLTLV] = None
ext_tlv: Optional[ProxyProtocolExtTLV] = None
if dnsbl is not None:
ext_tlv = ProxyProtocolExtTLV(init=ext_tlv, dnsbl=dnsbl)
if ssl_obj is not None:
cipher, version, secret_bits = ssl_obj.cipher() or (None, None, None)
peercert: Optional[PeerCert] = ssl_obj.getpeercert()
ssl_tlv = ProxyProtocolSSLTLV(
has_ssl=True, verified=True,
has_cert_conn=(peercert is not None),
cipher=cipher, version=version)
ext_tlv = ProxyProtocolExtTLV(
init=ext_tlv, compression=ssl_obj.compression(),
secret_bits=secret_bits, peercert=peercert)
return ProxyProtocolTLV(unique_id=unique_id, ssl=ssl_tlv, ext=ext_tlv)