from __future__ import annotations
from collections.abc import Awaitable, Callable, Iterable
from contextlib import suppress
from typing import TypeAlias, NoReturn
from pymap.bytes import MaybeBytes
from pymap.concurrent import Event
from pymap.config import IMAPConfig
from pymap.context import socket_info, connection_exit
from pymap.exceptions import NotAllowedError, NotSupportedError, \
CloseConnection
from pymap.fetch import MessageAttributes
from pymap.interfaces.login import LoginInterface
from pymap.interfaces.session import SessionInterface
from pymap.parsing.command import CommandAuth, CommandNonAuth, CommandSelect, \
Command
from pymap.parsing.command.any import CapabilityCommand, LogoutCommand, \
NoOpCommand, IdCommand
from pymap.parsing.command.nonauth import AuthenticateCommand, LoginCommand, \
StartTLSCommand
from pymap.parsing.command.auth import AppendCommand, CreateCommand, \
DeleteCommand, ListCommand, RenameCommand, SelectCommand, StatusCommand, \
SubscribeCommand, UnsubscribeCommand
from pymap.parsing.command.select import CheckCommand, CloseCommand, \
IdleCommand, ExpungeCommand, CopyCommand, MoveCommand, FetchCommand, \
StoreCommand, SearchCommand
from pymap.parsing.commands import InvalidCommand
from pymap.parsing.primitives import List, Number
from pymap.parsing.response import ResponseOk, ResponseNo, ResponseBad, \
ResponseCode, ResponsePreAuth, CommandResponse, UntaggedResponse
from pymap.parsing.response.code import Capability, PermanentFlags, UidNext, \
UidValidity, Unseen, MailboxId
from pymap.parsing.response.specials import FlagsResponse, ExistsResponse, \
RecentResponse, FetchResponse, ListResponse, LSubResponse, IdResponse, \
SearchResponse, StatusResponse
from pymap.parsing.specials import StatusAttribute, FetchAttribute, FetchValue
from pymap.selected import SelectedMailbox
from pysasl.creds.plain import PlainCredentials
from pysasl.creds.server import ServerCredentials
__all__ = ['ConnectionState']
_AuthCommands: TypeAlias = AuthenticateCommand | LoginCommand
_CommandRet: TypeAlias = tuple[CommandResponse, SelectedMailbox | None]
_CommandFunc: TypeAlias = Callable[[Command], Awaitable[_CommandRet]]
_flags_attr = FetchAttribute(b'FLAGS')
_uid_attr = FetchAttribute(b'UID')
[docs]
class ConnectionState:
"""Defines the state flow of the IMAP connection. Determines if a command
can be processed at that point in the connection, and interacts with the
backend plugin.
"""
def __init__(self, login: LoginInterface, config: IMAPConfig) -> None:
super().__init__()
self.config = config
self.auth = config.initial_auth
self.login = login
self._session: SessionInterface | None = None
self._selected: SelectedMailbox | None = None
self._capability = list(config.initial_capability)
@property
def session(self) -> SessionInterface:
if self._session is None:
# Commands using this attribute should be state-bound to only be
# allowed once a session has begun, so this should not throw.
raise AttributeError('Connection has not begun a session.',
name='session', obj=self)
return self._session
@property
def selected(self) -> SelectedMailbox:
if self._selected is None:
# Commands using this attribute should be state-bound to only be
# allowed once mailbox has been selected, so this should not throw.
raise AttributeError('Connection has not selected a mailbox.',
name='selected', obj=self)
return self._selected
@property
def capability(self) -> Capability:
if self._session:
return Capability(self._capability)
else:
if self.auth.get_server(b'PLAIN') is None:
logindisabled = [b'LOGINDISABLED']
else:
logindisabled = []
return Capability(self._capability + logindisabled +
[b'AUTH=%b' % mech.name for mech in
self.auth.server_mechanisms])
async def do_cleanup(self) -> None:
with suppress(Exception):
await self.session.cleanup()
async def _login(self, creds: ServerCredentials) \
-> SessionInterface:
stack = connection_exit.get()
authenticated = await self.login.authenticate(creds)
authorized = await self.login.authorize(authenticated, creds.authzid)
return await stack.enter_async_context(authorized.new_session())
async def do_greeting(self) -> CommandResponse:
sock_info = socket_info.get()
if self.config.reject_dnsbl and sock_info.dnsbl is not None:
raise NotAllowedError(f'Connection rejected: {sock_info.dnsbl}')
preauth_creds = self.config.preauth_credentials
if preauth_creds:
self._session = await self._login(preauth_creds)
elif sock_info.from_localhost:
self.auth = self.config.tls_auth
resp_cls: type[CommandResponse]
if preauth_creds:
resp_cls = ResponsePreAuth
else:
resp_cls = ResponseOk
return resp_cls(b'*', self.config.greeting, self.capability)
async def do_authenticate(self, cmd: _AuthCommands,
creds: ServerCredentials | None) \
-> CommandResponse:
if not creds:
return ResponseNo(cmd.tag, b'Invalid authentication mechanism.')
self._session = await self._login(creds)
self._capability.extend(self.config.login_capability)
return ResponseOk(cmd.tag, b'Authentication successful.',
self.capability)
async def do_login(self, cmd: LoginCommand) -> _CommandRet:
if b'LOGINDISABLED' in self.capability:
raise NotSupportedError('LOGIN is disabled.')
creds = PlainCredentials(
cmd.userid.decode('utf-8', 'surrogateescape'),
cmd.password.decode('utf-8', 'surrogateescape'))
return await self.do_authenticate(cmd, creds), None
async def do_starttls(self, cmd: StartTLSCommand) -> _CommandRet:
try:
self._capability.remove(b'STARTTLS')
except ValueError:
raise NotSupportedError('STARTTLS not available.') from None
self.auth = self.config.tls_auth
return ResponseOk(cmd.tag, b'Ready to handshake.'), None
async def do_capability(self, cmd: CapabilityCommand) -> _CommandRet:
response = ResponseOk(cmd.tag, b'Capabilities listed.')
response.add_untagged(UntaggedResponse(self.capability.string))
return response, None
async def do_noop(self, cmd: NoOpCommand) -> _CommandRet:
updates = None
if self._selected and self._session:
updates = await self.session.check_mailbox(self.selected)
return ResponseOk(cmd.tag, cmd.command + b' completed.'), updates
async def do_id(self, cmd: IdCommand) -> _CommandRet:
response = ResponseOk(cmd.tag, cmd.command + b' completed.')
response.add_untagged(IdResponse(self.config.id_response))
return response, None
async def do_select(self, cmd: SelectCommand) -> _CommandRet:
self._selected = None
mailbox, updates = await self.session.select_mailbox(
cmd.mailbox, cmd.readonly)
if updates.readonly:
num_recent = mailbox.recent
resp = ResponseOk(cmd.tag, b'Selected mailbox.',
ResponseCode.of(b'READ-ONLY'))
resp.add_untagged_ok(b'Read-only mailbox.', PermanentFlags([]))
else:
num_recent = updates.session_flags.recent
resp = ResponseOk(cmd.tag, b'Selected mailbox.',
ResponseCode.of(b'READ-WRITE'))
resp.add_untagged_ok(b'Flags permitted.',
PermanentFlags(mailbox.permanent_flags))
messages = updates.messages
resp.add_untagged(FlagsResponse(mailbox.flags))
resp.add_untagged(ExistsResponse(messages.exists))
resp.add_untagged(RecentResponse(num_recent))
resp.add_untagged_ok(b'Predicted next UID.',
UidNext(mailbox.next_uid))
resp.add_untagged_ok(b'UIDs valid.',
UidValidity(mailbox.uid_validity))
if mailbox.first_unseen:
resp.add_untagged_ok(b'First unseen message.',
Unseen(mailbox.first_unseen))
resp.add_untagged_ok(b'Object ID.', MailboxId(mailbox.mailbox_id))
return resp, updates
async def do_create(self, cmd: CreateCommand) -> _CommandRet:
if cmd.mailbox == 'INBOX':
return ResponseNo(cmd.tag, b'Cannot create INBOX.'), None
mailbox_id, updates = await self.session.create_mailbox(
cmd.mailbox, selected=self._selected)
return ResponseOk(cmd.tag, cmd.command + b' completed.',
MailboxId(mailbox_id)), updates
async def do_delete(self, cmd: DeleteCommand) -> _CommandRet:
if cmd.mailbox == 'INBOX':
return ResponseNo(cmd.tag, b'Cannot delete INBOX.'), None
updates = await self.session.delete_mailbox(
cmd.mailbox, selected=self._selected)
return ResponseOk(cmd.tag, cmd.command + b' completed.'), updates
async def do_rename(self, cmd: RenameCommand) -> _CommandRet:
if cmd.to_mailbox == 'INBOX':
return ResponseNo(cmd.tag, b'Cannot rename to INBOX.'), None
updates = await self.session.rename_mailbox(
cmd.from_mailbox, cmd.to_mailbox, selected=self._selected)
return ResponseOk(cmd.tag, cmd.command + b' completed.'), updates
async def do_status(self, cmd: StatusCommand) -> _CommandRet:
mailbox, updates = await self.session.get_mailbox(
cmd.mailbox, selected=self._selected)
data: dict[StatusAttribute, MaybeBytes] = {}
for attr in cmd.status_list:
if attr == b'MESSAGES':
data[attr] = Number(mailbox.exists)
elif attr == b'RECENT':
if updates and updates.mailbox_id == mailbox.mailbox_id:
data[attr] = Number(updates.session_flags.recent)
else:
data[attr] = Number(mailbox.recent)
elif attr == b'UNSEEN':
data[attr] = Number(mailbox.unseen)
elif attr == b'UIDNEXT':
data[attr] = Number(mailbox.next_uid)
elif attr == b'UIDVALIDITY':
data[attr] = Number(mailbox.uid_validity)
elif attr == b'MAILBOXID':
data[attr] = mailbox.mailbox_id.parens
resp = ResponseOk(cmd.tag, cmd.command + b' completed.')
resp.add_untagged(StatusResponse(cmd.mailbox, data))
return resp, updates
async def do_append(self, cmd: AppendCommand) -> _CommandRet:
if len(cmd.messages) > 1 and b'MULTIAPPEND' not in self.capability:
raise NotSupportedError('MULTIAPPEND is disabled.')
if cmd.cancelled:
return ResponseNo(cmd.tag, b'APPEND cancelled.'), None
if cmd.error:
raise cmd.error
append_uid, updates = await self.session.append_messages(
cmd.mailbox, cmd.messages, selected=self._selected)
resp = ResponseOk(cmd.tag, cmd.command + b' completed.', append_uid)
return resp, updates
async def do_subscribe(self, cmd: SubscribeCommand) -> _CommandRet:
updates = await self.session.subscribe(
cmd.mailbox, selected=self._selected)
return ResponseOk(cmd.tag, cmd.command + b' completed.'), updates
async def do_unsubscribe(self, cmd: UnsubscribeCommand) -> _CommandRet:
updates = await self.session.unsubscribe(
cmd.mailbox, selected=self._selected)
return ResponseOk(cmd.tag, cmd.command + b' completed.'), updates
async def do_list(self, cmd: ListCommand) -> _CommandRet:
mailboxes, updates = await self.session.list_mailboxes(
cmd.ref_name, cmd.filter, subscribed=cmd.only_subscribed,
selected=self._selected)
resp = ResponseOk(cmd.tag, cmd.command + b' completed.')
resp_type = LSubResponse if cmd.only_subscribed else ListResponse
for name, sep, attrs in mailboxes:
resp.add_untagged(resp_type(name, sep, attrs))
return resp, updates
async def do_check(self, cmd: CheckCommand) -> _CommandRet:
updates = await self.session.check_mailbox(
self.selected, housekeeping=True)
return ResponseOk(cmd.tag, cmd.command + b' completed.'), updates
async def do_close(self, cmd: CloseCommand) -> _CommandRet:
await self.session.expunge_mailbox(self.selected)
self._selected = None
return ResponseOk(cmd.tag, cmd.command + b' completed.'), None
async def do_expunge(self, cmd: ExpungeCommand) -> _CommandRet:
updates = await self.session.expunge_mailbox(
self.selected, cmd.uid_set)
resp = ResponseOk(cmd.tag, cmd.command + b' completed.')
return resp, updates
async def do_copy(self, cmd: CopyCommand) -> _CommandRet:
copy_uid, updates = await self.session.copy_messages(
self.selected, cmd.sequence_set, cmd.mailbox)
resp = ResponseOk(cmd.tag, cmd.command + b' completed.', copy_uid)
return resp, updates
async def do_move(self, cmd: MoveCommand) -> _CommandRet:
copy_uid, updates = await self.session.move_messages(
self.selected, cmd.sequence_set, cmd.mailbox)
resp = ResponseOk(cmd.tag, cmd.command + b' completed.')
resp.add_untagged_ok(b'Moved.', copy_uid)
return resp, updates
async def do_fetch(self, cmd: FetchCommand) -> _CommandRet:
if not cmd.uid:
self.selected.hide_expunged = True
set_seen = not self.selected.readonly and \
any(attr.set_seen for attr in cmd.attributes)
messages, updates = await self.session.fetch_messages(
self.selected, cmd.sequence_set, set_seen)
resp = ResponseOk(cmd.tag, cmd.command + b' completed.')
for msg_seq, msg in messages:
if msg.expunged:
resp.code = ResponseCode.of(b'EXPUNGEISSUED')
msg_attrs = MessageAttributes(msg, self.selected, cmd.attributes)
fetch_resp = FetchResponse(msg_seq, msg_attrs,
writing_hook=msg_attrs.load_hook())
resp.add_untagged(fetch_resp)
return resp, updates
async def do_search(self, cmd: SearchCommand) -> _CommandRet:
if not cmd.uid:
self.selected.hide_expunged = True
messages, updates = await self.session.search_mailbox(
self.selected, cmd.keys)
resp = ResponseOk(cmd.tag, cmd.command + b' completed.')
msg_ids: list[int] = []
for msg_seq, msg in messages:
if msg.expunged:
resp.code = ResponseCode.of(b'EXPUNGEISSUED')
if cmd.uid:
msg_ids.append(msg.uid)
else:
msg_ids.append(msg_seq)
resp.add_untagged(SearchResponse(msg_ids))
return resp, updates
async def do_store(self, cmd: StoreCommand) -> _CommandRet:
if not cmd.uid:
self.selected.hide_expunged = True
if cmd.silent:
self.selected.silence(cmd.sequence_set, cmd.flag_set, cmd.mode)
messages, updates = await self.session.update_flags(
self.selected, cmd.sequence_set, cmd.flag_set, cmd.mode)
resp = ResponseOk(cmd.tag, cmd.command + b' completed.')
session_flags = self.selected.session_flags
for msg_seq, msg in messages:
if msg.expunged:
resp.code = ResponseCode.of(b'EXPUNGEISSUED')
elif cmd.silent:
continue
flags = msg.get_flags(session_flags)
fetch_data: list[FetchValue] = [
FetchValue.of(_flags_attr, List(flags, sort=True))]
if cmd.uid:
fetch_data.append(
FetchValue.of(_uid_attr, Number(msg.uid)))
resp.add_untagged(FetchResponse(msg_seq, fetch_data))
return resp, updates
async def do_idle(self, cmd: IdleCommand) -> _CommandRet:
if b'IDLE' not in self.capability:
raise NotSupportedError('IDLE is disabled.')
return ResponseOk(cmd.tag, cmd.command + b' completed.'), None
@classmethod
async def do_logout(cls, cmd: LogoutCommand) -> NoReturn:
raise CloseConnection()
async def receive_updates(self, cmd: IdleCommand, done: Event) \
-> Iterable[UntaggedResponse]:
selected = await self.session.check_mailbox(
self.selected, wait_on=done)
self._selected, untagged = selected.fork(cmd)
return untagged
@classmethod
def _get_func_name(cls, cmd: Command) -> str:
cmd_type = type(cmd)
while cmd_type.delegate:
cmd_type = cmd_type.delegate
cmd_str = str(cmd_type.command, 'ascii').lower()
return 'do_' + cmd_str
async def do_command(self, cmd: Command) -> CommandResponse:
if isinstance(cmd, InvalidCommand):
return ResponseBad(cmd.tag, cmd.message)
elif self._session and isinstance(cmd, CommandNonAuth):
msg = cmd.command + b': Already authenticated.'
return ResponseBad(cmd.tag, msg)
elif not self._session and isinstance(cmd, CommandAuth):
msg = cmd.command + b': Must authenticate first.'
return ResponseBad(cmd.tag, msg)
elif not self._selected and isinstance(cmd, CommandSelect):
msg = cmd.command + b': Must select a mailbox first.'
return ResponseBad(cmd.tag, msg)
func_name = self._get_func_name(cmd)
try:
func: _CommandFunc = getattr(self, func_name)
except AttributeError:
return ResponseNo(cmd.tag, cmd.command + b': Not Implemented')
response, selected = await func(cmd)
if selected is not None:
self._selected, untagged = selected.fork(cmd)
response.add_untagged(*untagged)
return response