diff --git a/imap_client/README.md b/imap_client/README.md new file mode 100644 index 0000000..55dc959 --- /dev/null +++ b/imap_client/README.md @@ -0,0 +1,50 @@ +IMAP Mail Client (CLI) + +Simple Python IMAP CLI client using the standard library. Supports listing mailboxes, searching messages, showing headers/body, flag operations, moving and deleting messages. + +Quick start + +1) Ensure Python 3.9+ is installed. +2) No third-party dependencies required. + +Environment variables (optional) + +- IMAP_HOST +- IMAP_PORT +- IMAP_SSL ("1" or "0") +- IMAP_STARTTLS ("1" or "0") +- IMAP_USER +- IMAP_PASSWORD + +Usage + +List available commands: + +```bash +python -m imap_client --help +``` + +Examples + +```bash +# List mailboxes +python -m imap_client --host imap.example.com --ssl 1 --user alice --password secret mailboxes + +# List unseen messages in INBOX, limiting to 20 +python -m imap_client --host imap.example.com --ssl 1 --user alice --password secret list --mailbox INBOX --unseen --limit 20 + +# Show a message by UID with headers and text body +python -m imap_client --host imap.example.com --ssl 1 --user alice --password secret show --mailbox INBOX --uid 12345 --headers --body + +# Move messages to Archive +python -m imap_client --host imap.example.com --ssl 1 --user alice --password secret move --mailbox INBOX --uids 12345,12346 --to Archive + +# Delete and expunge +python -m imap_client --host imap.example.com --ssl 1 --user alice --password secret delete --mailbox INBOX --uids 12345,12346 +python -m imap_client --host imap.example.com --ssl 1 --user alice --password secret expunge --mailbox INBOX +``` + +Security + +- Prefer using app-specific passwords or environment variables. Avoid passing credentials via shell history when possible. + diff --git a/imap_client/imap_client/__init__.py b/imap_client/imap_client/__init__.py new file mode 100644 index 0000000..84b6217 --- /dev/null +++ b/imap_client/imap_client/__init__.py @@ -0,0 +1,4 @@ +__all__ = ["__version__"] + +__version__ = "0.1.0" + diff --git a/imap_client/imap_client/__main__.py b/imap_client/imap_client/__main__.py new file mode 100644 index 0000000..ad4c43d --- /dev/null +++ b/imap_client/imap_client/__main__.py @@ -0,0 +1,153 @@ +from __future__ import annotations + +import argparse +import os +import sys +from typing import List + +from .imap_client import ImapClient, ImapConfig, ImapError, load_config_from_env + + +def parse_uids(uids_csv: str) -> List[int]: + if not uids_csv: + return [] + return [int(x) for x in uids_csv.split(',') if x.strip()] + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(prog="imap-client", description="IMAP mail client CLI") + parser.add_argument("--host", default=os.environ.get("IMAP_HOST"), help="IMAP host") + parser.add_argument("--port", type=int, default=int(os.environ.get("IMAP_PORT", "993")), help="IMAP port") + parser.add_argument("--ssl", type=int, choices=[0, 1], default=int(os.environ.get("IMAP_SSL", "1")), help="Use SSL (default 1)") + parser.add_argument("--starttls", type=int, choices=[0, 1], default=int(os.environ.get("IMAP_STARTTLS", "0")), help="Use STARTTLS") + parser.add_argument("--user", default=os.environ.get("IMAP_USER"), help="Username") + parser.add_argument("--password", default=os.environ.get("IMAP_PASSWORD"), help="Password") + parser.add_argument("--timeout", type=int, default=int(os.environ.get("IMAP_TIMEOUT", "30")), help="Socket timeout seconds") + + sub = parser.add_subparsers(dest="command", required=True) + + sub.add_parser("mailboxes", help="List mailboxes") + + list_p = sub.add_parser("list", help="List message UIDs in a mailbox") + list_p.add_argument("--mailbox", default="INBOX") + list_p.add_argument("--limit", type=int, default=50) + list_p.add_argument("--unseen", action="store_true") + list_p.add_argument("--since", help="Since date in format 01-Jan-2024") + list_p.add_argument("--before", help="Before date in format 01-Jan-2024") + + show_p = sub.add_parser("show", help="Show message headers/body") + show_p.add_argument("--mailbox", default="INBOX") + show_p.add_argument("--uid", type=int, required=True) + show_p.add_argument("--headers", action="store_true") + show_p.add_argument("--body", action="store_true") + + flags_add = sub.add_parser("flag-add", help="Add flags to messages") + flags_add.add_argument("--mailbox", default="INBOX") + flags_add.add_argument("--uids", required=True, help="CSV of UIDs") + flags_add.add_argument("--flags", required=True, help="Space-separated flags, e.g. \\Seen \\Flagged") + + flags_rm = sub.add_parser("flag-remove", help="Remove flags from messages") + flags_rm.add_argument("--mailbox", default="INBOX") + flags_rm.add_argument("--uids", required=True) + flags_rm.add_argument("--flags", required=True) + + move_p = sub.add_parser("move", help="Move messages to another mailbox") + move_p.add_argument("--mailbox", default="INBOX") + move_p.add_argument("--uids", required=True) + move_p.add_argument("--to", required=True, dest="destination") + + del_p = sub.add_parser("delete", help="Mark messages as deleted") + del_p.add_argument("--mailbox", default="INBOX") + del_p.add_argument("--uids", required=True) + + expunge_p = sub.add_parser("expunge", help="Permanently remove messages marked as deleted") + expunge_p.add_argument("--mailbox", default="INBOX") + + return parser + + +def create_client_from_args(args: argparse.Namespace) -> ImapClient: + overrides = { + "host": args.host, + "port": args.port, + "use_ssl": bool(args.ssl), + "use_starttls": bool(args.starttls), + "username": args.user, + "password": args.password, + "timeout_seconds": args.timeout, + } + cfg = load_config_from_env(overrides) + if not cfg.host: + raise SystemExit("--host is required (or set IMAP_HOST)") + return ImapClient(cfg) + + +def main(argv: List[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + + client = create_client_from_args(args) + try: + client.connect() + + if args.command == "mailboxes": + for m in client.list_mailboxes(): + print(m) + return 0 + + if args.command == "list": + uids = client.list_messages( + mailbox=args.mailbox, + limit=args.limit, + unseen=args.unseen, + since=args.since, + before=args.before, + ) + for u in uids: + print(u) + return 0 + + if args.command == "show": + if args.headers: + print(client.fetch_headers(args.mailbox, args.uid)) + if args.body: + print(client.fetch_body_text(args.mailbox, args.uid)) + if not args.headers and not args.body: + print(client.fetch_headers(args.mailbox, args.uid)) + print() + print(client.fetch_body_text(args.mailbox, args.uid)) + return 0 + + if args.command == "flag-add": + client.add_flags(args.mailbox, parse_uids(args.uids), args.flags.split()) + return 0 + + if args.command == "flag-remove": + client.remove_flags(args.mailbox, parse_uids(args.uids), args.flags.split()) + return 0 + + if args.command == "move": + client.move(args.mailbox, parse_uids(args.uids), args.destination) + return 0 + + if args.command == "delete": + client.delete(args.mailbox, parse_uids(args.uids)) + return 0 + + if args.command == "expunge": + client.expunge(args.mailbox) + return 0 + + parser.error("Unknown command") + return 2 + + except ImapError as exc: + print(f"Error: {exc}", file=sys.stderr) + return 1 + finally: + client.logout() + + +if __name__ == "__main__": + raise SystemExit(main()) + diff --git a/imap_client/imap_client/__pycache__/__init__.cpython-313.pyc b/imap_client/imap_client/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..82dc699 Binary files /dev/null and b/imap_client/imap_client/__pycache__/__init__.cpython-313.pyc differ diff --git a/imap_client/imap_client/__pycache__/__main__.cpython-313.pyc b/imap_client/imap_client/__pycache__/__main__.cpython-313.pyc new file mode 100644 index 0000000..86234e6 Binary files /dev/null and b/imap_client/imap_client/__pycache__/__main__.cpython-313.pyc differ diff --git a/imap_client/imap_client/__pycache__/imap_client.cpython-313.pyc b/imap_client/imap_client/__pycache__/imap_client.cpython-313.pyc new file mode 100644 index 0000000..5ee046e Binary files /dev/null and b/imap_client/imap_client/__pycache__/imap_client.cpython-313.pyc differ diff --git a/imap_client/imap_client/imap_client.py b/imap_client/imap_client/imap_client.py new file mode 100644 index 0000000..7090b14 --- /dev/null +++ b/imap_client/imap_client/imap_client.py @@ -0,0 +1,235 @@ +from __future__ import annotations + +import contextlib +import email +import email.policy +import imaplib +import os +import re +import socket +import ssl +from dataclasses import dataclass +from typing import Iterable, List, Optional, Sequence, Tuple + + +@dataclass +class ImapConfig: + host: str + port: int = 993 + use_ssl: bool = True + use_starttls: bool = False + username: Optional[str] = None + password: Optional[str] = None + timeout_seconds: int = 30 + + +class ImapError(RuntimeError): + pass + + +class ImapClient: + def __init__(self, config: ImapConfig) -> None: + self.config = config + self._conn: Optional[imaplib.IMAP4] = None + + # ----- Connection management ----- + def connect(self) -> None: + try: + socket.setdefaulttimeout(self.config.timeout_seconds) + if self.config.use_ssl: + self._conn = imaplib.IMAP4_SSL(self.config.host, self.config.port) + else: + self._conn = imaplib.IMAP4(self.config.host, self.config.port) + if self.config.use_starttls: + self._conn.starttls(ssl_context=ssl.create_default_context()) + + if self.config.username: + self.login(self.config.username, self.config.password or "") + except (imaplib.IMAP4.error, socket.timeout, OSError) as exc: + raise ImapError(f"Failed to connect/login: {exc}") from exc + + def login(self, username: str, password: str) -> None: + self._ensure_conn() + try: + assert self._conn is not None + typ, _ = self._conn.login(username, password) + if typ != "OK": + raise ImapError("Login failed") + except imaplib.IMAP4.error as exc: + raise ImapError(f"Login failed: {exc}") from exc + + def logout(self) -> None: + if self._conn is not None: + with contextlib.suppress(Exception): + self._conn.logout() + self._conn = None + + def _ensure_conn(self) -> None: + if self._conn is None: + raise ImapError("Not connected") + + # ----- Mailboxes ----- + def list_mailboxes(self) -> List[str]: + self._ensure_conn() + assert self._conn is not None + typ, data = self._conn.list() + if typ != "OK": + raise ImapError("LIST failed") + mailboxes: List[str] = [] + for line in data or []: + if not line: + continue + decoded = line.decode(errors="ignore") + # Format: (* FLAGS) "/" "INBOX" + m = re.search(r"\"([^\"]+)\"\s*$", decoded) + if m: + mailboxes.append(m.group(1)) + else: + # Fallback to last token + mailboxes.append(decoded.split()[-1].strip('"')) + return mailboxes + + def ensure_selected(self, mailbox: str) -> Tuple[str, List[bytes]]: + self._ensure_conn() + assert self._conn is not None + typ, data = self._conn.select(mailbox, readonly=False) + if typ != "OK": + raise ImapError(f"SELECT {mailbox} failed") + return typ, data + + # ----- Search / List ----- + def search(self, mailbox: str, criteria: Sequence[str]) -> List[int]: + self.ensure_selected(mailbox) + assert self._conn is not None + typ, data = self._conn.search(None, *criteria) + if typ != "OK": + raise ImapError(f"SEARCH failed: {' '.join(criteria)}") + if not data or not data[0]: + return [] + uids = [int(x) for x in data[0].split()] + return uids + + def list_messages( + self, + mailbox: str, + limit: Optional[int] = None, + unseen: bool = False, + since: Optional[str] = None, + before: Optional[str] = None, + ) -> List[int]: + criteria: List[str] = ["UID", "1:*"] + if unseen: + criteria.append("UNSEEN") + if since: + criteria.extend(["SINCE", since]) + if before: + criteria.extend(["BEFORE", before]) + uids = self.search(mailbox, criteria) + uids.sort(reverse=True) + if limit is not None: + uids = uids[:limit] + uids.sort() + return uids + + # ----- Fetch ----- + def fetch_headers(self, mailbox: str, uid: int) -> str: + self.ensure_selected(mailbox) + assert self._conn is not None + typ, data = self._conn.uid("FETCH", str(uid), "(BODY.PEEK[HEADER])") + if typ != "OK" or not data or not isinstance(data[0], tuple): + raise ImapError("FETCH headers failed") + raw = data[0][1] + msg = email.message_from_bytes(raw, policy=email.policy.default) + return msg.as_string() + + def fetch_body_text(self, mailbox: str, uid: int) -> str: + self.ensure_selected(mailbox) + assert self._conn is not None + typ, data = self._conn.uid("FETCH", str(uid), "(BODY.PEEK[])") + if typ != "OK" or not data or not isinstance(data[0], tuple): + raise ImapError("FETCH body failed") + raw = data[0][1] + msg = email.message_from_bytes(raw, policy=email.policy.default) + + if msg.is_multipart(): + for part in msg.walk(): + content_type = part.get_content_type() + disposition = part.get_content_disposition() + if disposition == "attachment": + continue + if content_type == "text/plain": + return part.get_content() + # Fallback to first non-attachment part + for part in msg.walk(): + if part.get_content_disposition() == "attachment": + continue + try: + return part.get_content() + except Exception: + continue + return "" + else: + return msg.get_content() + + # ----- Flags / Move / Delete ----- + def add_flags(self, mailbox: str, uids: Iterable[int], flags: Sequence[str]) -> None: + self.ensure_selected(mailbox) + assert self._conn is not None + uid_set = ",".join(str(u) for u in uids) + flag_list = "(" + " ".join(flags) + ")" + typ, _ = self._conn.uid("STORE", uid_set, "+FLAGS.SILENT", flag_list) + if typ != "OK": + raise ImapError("ADD flags failed") + + def remove_flags(self, mailbox: str, uids: Iterable[int], flags: Sequence[str]) -> None: + self.ensure_selected(mailbox) + assert self._conn is not None + uid_set = ",".join(str(u) for u in uids) + flag_list = "(" + " ".join(flags) + ")" + typ, _ = self._conn.uid("STORE", uid_set, "-FLAGS.SILENT", flag_list) + if typ != "OK": + raise ImapError("REMOVE flags failed") + + def move(self, mailbox: str, uids: Iterable[int], destination_mailbox: str) -> None: + self.ensure_selected(mailbox) + assert self._conn is not None + uid_set = ",".join(str(u) for u in uids) + # Try MOVE extension + typ, _ = self._conn.uid("MOVE", uid_set, destination_mailbox) + if typ == "OK": + return + # Fallback: COPY + delete + expunge + typ, _ = self._conn.uid("COPY", uid_set, destination_mailbox) + if typ != "OK": + raise ImapError("COPY failed during move") + self.add_flags(mailbox, [int(u) for u in uid_set.split(",")], ["\\Deleted"]) + self.expunge(mailbox) + + def delete(self, mailbox: str, uids: Iterable[int]) -> None: + self.ensure_selected(mailbox) + self.add_flags(mailbox, uids, ["\\Deleted"]) + + def expunge(self, mailbox: str) -> None: + self.ensure_selected(mailbox) + assert self._conn is not None + typ, _ = self._conn.expunge() + if typ != "OK": + raise ImapError("EXPUNGE failed") + + +def load_config_from_env(overrides: Optional[dict] = None) -> ImapConfig: + cfg = ImapConfig( + host=os.environ.get("IMAP_HOST", ""), + port=int(os.environ.get("IMAP_PORT", "993")), + use_ssl=os.environ.get("IMAP_SSL", "1") == "1", + use_starttls=os.environ.get("IMAP_STARTTLS", "0") == "1", + username=os.environ.get("IMAP_USER"), + password=os.environ.get("IMAP_PASSWORD"), + timeout_seconds=int(os.environ.get("IMAP_TIMEOUT", "30")), + ) + if overrides: + for key, value in overrides.items(): + if hasattr(cfg, key): + setattr(cfg, key, value) + return cfg + diff --git a/imap_client/pyproject.toml b/imap_client/pyproject.toml new file mode 100644 index 0000000..4a19004 --- /dev/null +++ b/imap_client/pyproject.toml @@ -0,0 +1,18 @@ +[project] +name = "imap-client-cli" +version = "0.1.0" +description = "Simple IMAP mail client CLI using Python stdlib" +readme = "README.md" +requires-python = ">=3.9" +authors = [ + { name = "Your Name" } +] +dependencies = [] + +[project.scripts] +imap-client = "imap_client.__main__:main" + +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" +