diff --git a/Doc/library/poplib.rst b/Doc/library/poplib.rst index 23f20b00e6dc6d..e8b3e838a68c55 100644 --- a/Doc/library/poplib.rst +++ b/Doc/library/poplib.rst @@ -245,6 +245,18 @@ A :class:`POP3` instance has the following methods: .. versionadded:: 3.4 +.. method:: POP3.auth(mechanism, authobject=None, initial_response=None) + + Authenticate using the POP3 ``AUTH`` command as specified in :rfc:`5034`. + + If *initial_response* is provided (``bytes`` or ``str``), it is + base64-encoded and appended to the command after a single space. + + If *authobject* is provided, it is called with the server’s ``bytes`` + challenge (already base64-decoded) and must return the client response + (``bytes`` or ``str``). Return ``b'*'`` to abort the exchange. + + Instances of :class:`POP3_SSL` have no additional methods. The interface of this subclass is identical to its parent. diff --git a/Lib/poplib.py b/Lib/poplib.py index 4469bff44b4c45..9f917e1a5cc04e 100644 --- a/Lib/poplib.py +++ b/Lib/poplib.py @@ -17,6 +17,7 @@ import re import socket import sys +import base64 try: import ssl @@ -217,6 +218,60 @@ def pass_(self, pswd): """ return self._shortcmd('PASS %s' % pswd) + def auth(self, mechanism, authobject=None, initial_response=None): + """Authenticate to the POP3 server using the AUTH command (RFC 5034). + + Result is 'response'. + """ + if authobject is not None and initial_response is not None: + raise ValueError('authobject and initial_response are mutually exclusive') + + if initial_response is not None: + if isinstance(initial_response, str): + initial_response = initial_response.encode(self.encoding) + b64 = base64.b64encode(initial_response).decode('ascii') + return self._shortcmd(f'AUTH {mechanism} {b64}'.rstrip()) + + if authobject is None: + return self._shortcmd(f'AUTH {mechanism}') + + self._putcmd(f'AUTH {mechanism}') + while True: + line, _ = self._getline() + if line.startswith(b'+OK'): + return line + if line.startswith(b'-ERR'): + while self._getline() != b'.\r\n': + pass + raise error_proto(line.decode('ascii', 'replace')) + + if not line.startswith(b'+ '): + raise error_proto(f'malformed challenge line: {line!r}') + + challenge_b64 = line[2:] + challenge_b64 = challenge_b64.rstrip(b'\r\n') + + if challenge_b64: + try: + challenge = base64.b64decode(challenge_b64) + except Exception: + padded = challenge_b64 + b'=' * (-len(challenge_b64) % 4) + challenge = base64.b64decode(padded) + else: + challenge = b'' + + response = authobject(challenge) + if response is None: + response = b'' + if isinstance(response, str): + response = response.encode(self.encoding) + + if response == b'*': + self._putcmd('*') + err_line, _ = self._getline() + raise error_proto(err_line.decode('ascii', 'replace')) + + self._putcmd(base64.b64encode(response).decode('ascii')) def stat(self): """Get mailbox status. diff --git a/Lib/test/test_poplib.py b/Lib/test/test_poplib.py index ef2da97f86734a..9b9e872765f3d6 100644 --- a/Lib/test/test_poplib.py +++ b/Lib/test/test_poplib.py @@ -2,7 +2,7 @@ # Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL # a real test suite - +import base64 import poplib import socket import os @@ -49,7 +49,7 @@ class DummyPOP3Handler(asynchat.async_chat): - CAPAS = {'UIDL': [], 'IMPLEMENTATION': ['python-testlib-pop-server']} + CAPAS = {'UIDL': [], 'SASL': ['PLAIN'], 'IMPLEMENTATION': ['python-testlib-pop-server']} enable_UTF8 = False def __init__(self, conn): @@ -59,6 +59,8 @@ def __init__(self, conn): self.push('+OK dummy pop3 server ready. ') self.tls_active = False self.tls_starting = False + self._auth_pending = False + self._auth_mech = None def collect_incoming_data(self, data): self.in_buffer.append(data) @@ -67,6 +69,20 @@ def found_terminator(self): line = b''.join(self.in_buffer) line = str(line, 'ISO-8859-1') self.in_buffer = [] + + if self._auth_pending: + self._auth_pending = False + if line == '*': + self.push('-ERR authentication cancelled') + return + try: + base64.b64decode(line.encode('ascii')) + except Exception: + self.push('-ERR invalid base64') + return + self.push('+OK Logged in.') + return + cmd = line.split(' ')[0].lower() space = line.find(' ') if space != -1: @@ -85,6 +101,28 @@ def handle_error(self): def push(self, data): asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n') + def cmd_auth(self, arg): + parts = arg.split() + if not parts: + self.push('-ERR missing mechanism') + return + mech = parts[0].upper() + if mech != 'PLAIN': + self.push('-ERR unsupported mechanism') + return + if len(parts) >= 2: + try: + base64.b64decode(parts[1].encode('ascii')) + except Exception: + self.push('-ERR invalid base64') + return + self.push('+OK Logged in.') + else: + self._auth_pending = True + self._auth_mech = mech + self.in_buffer.clear() + self.push('+ ') + def cmd_echo(self, arg): # sends back the received string (used by the test suite) self.push(arg) @@ -286,6 +324,49 @@ def test_pass_(self): self.assertOK(self.client.pass_('python')) self.assertRaises(poplib.error_proto, self.client.user, 'invalid') + def test_auth_plain_initial_response(self): + secret = b"user\x00adminuser\x00password" + resp = self.client.auth("PLAIN", initial_response=secret) + self.assertStartsWith(resp, b"+OK") + + def test_auth_plain_challenge_response(self): + secret = b"user\x00adminuser\x00password" + def authobject(challenge): + return secret + resp = self.client.auth("PLAIN", authobject=authobject) + self.assertStartsWith(resp, b"+OK") + + def test_auth_rejects_conflicting_args(self): + with self.assertRaises(ValueError): + self.client.auth("PLAIN", authobject=lambda c: b"x", initial_response=b"y") + + def test_auth_unsupported_mechanism(self): + with self.assertRaises(poplib.error_proto): + self.client.auth("FOO") + + def test_auth_cancel(self): + def authobject(_challenge): + return b"*" + with self.assertRaises(poplib.error_proto): + self.client.auth("PLAIN", authobject=authobject) + + def test_auth_mechanism_case_insensitive(self): + secret = b"user\x00adminuser\x00password" + # use lowercase mechanism name to ensure server accepts + resp = self.client.auth("plain", initial_response=secret) + self.assertStartsWith(resp, b"+OK") + + def test_auth_initial_response_str(self): + secret = "user\x00adminuser\x00password" # str, not bytes + resp = self.client.auth("PLAIN", initial_response=secret) + self.assertStartsWith(resp, b"+OK") + + def test_auth_authobject_returns_str(self): + def authobject(challenge): + return "user\x00adminuser\x00password" + resp = self.client.auth("PLAIN", authobject=authobject) + self.assertStartsWith(resp, b"+OK") + def test_stat(self): self.assertEqual(self.client.stat(), (10, 100)) @@ -434,6 +515,9 @@ def __init__(self, conn): self.push('+OK dummy pop3 server ready. ') self.tls_active = True self.tls_starting = False + # Initialize AUTH state like DummyPOP3Handler to avoid AttributeError + self._auth_pending = False + self._auth_mech = None @requires_ssl diff --git a/Misc/NEWS.d/next/Library/2025-12-12-15-09-34.gh-issue-64551.NfWlQX.rst b/Misc/NEWS.d/next/Library/2025-12-12-15-09-34.gh-issue-64551.NfWlQX.rst new file mode 100644 index 00000000000000..3ad94a43d1bd7b --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-12-12-15-09-34.gh-issue-64551.NfWlQX.rst @@ -0,0 +1 @@ +Add RFC 5034 AUTH support to poplib