diff --git a/README.md b/README.md new file mode 100644 index 0000000..d9bf464 --- /dev/null +++ b/README.md @@ -0,0 +1,49 @@ +# IMAP 邮件客户端 + +一个基于 Python 标准库的 IMAP 邮件客户端,提供命令行接口: + +- 列出邮箱(mailboxes) +- 列出邮件(按邮箱、查询条件) +- 显示邮件头/正文 +- 下载附件 +- 标记已读/未读、加星/去星 + +## 安装 + +Python 3.9+。无需额外依赖。 + +## 使用 + +```bash +python -m imap_client --help +``` + +常用示例: + +```bash +# 登录并列出邮箱 +python -m imap_client --host imap.example.com --user alice list-mailboxes + +# 列出 INBOX 最近 20 封 +python -m imap_client --host imap.example.com --user alice list --mailbox INBOX --limit 20 + +# 显示某封邮件 +python -m imap_client --host imap.example.com --user alice show --mailbox INBOX --uid 123 + +# 下载附件到当前目录 +python -m imap_client --host imap.example.com --user alice download --mailbox INBOX --uid 123 --out . + +# 标记已读 +python -m imap_client --host imap.example.com --user alice mark --mailbox INBOX --uid 123 --seen true +``` + +可以通过环境变量传递密码:`IMAP_PASSWORD`。也支持交互式提示输入密码。 + +## 安全性 + +- 优先使用 IMAPS (SSL/TLS) 993 端口;如需明文 143,可设置 `--port 143 --starttls`。 +- 密码仅用于会话;不写入磁盘。 + +## 许可证 + +MIT \ No newline at end of file diff --git a/imap_client/__init__.py b/imap_client/__init__.py new file mode 100644 index 0000000..76f3cd9 --- /dev/null +++ b/imap_client/__init__.py @@ -0,0 +1,5 @@ +__all__ = ["IMAPClient", "__version__"] + +__version__ = "0.1.0" + +from .client import IMAPClient # noqa: E402 \ No newline at end of file diff --git a/imap_client/__main__.py b/imap_client/__main__.py new file mode 100644 index 0000000..f5e7aa8 --- /dev/null +++ b/imap_client/__main__.py @@ -0,0 +1,5 @@ +from .cli import main + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/imap_client/__pycache__/__init__.cpython-313.pyc b/imap_client/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..f1d8bef Binary files /dev/null and b/imap_client/__pycache__/__init__.cpython-313.pyc differ diff --git a/imap_client/__pycache__/__main__.cpython-313.pyc b/imap_client/__pycache__/__main__.cpython-313.pyc new file mode 100644 index 0000000..d592a1c Binary files /dev/null and b/imap_client/__pycache__/__main__.cpython-313.pyc differ diff --git a/imap_client/__pycache__/cli.cpython-313.pyc b/imap_client/__pycache__/cli.cpython-313.pyc new file mode 100644 index 0000000..c421f9d Binary files /dev/null and b/imap_client/__pycache__/cli.cpython-313.pyc differ diff --git a/imap_client/__pycache__/client.cpython-313.pyc b/imap_client/__pycache__/client.cpython-313.pyc new file mode 100644 index 0000000..f41690b Binary files /dev/null and b/imap_client/__pycache__/client.cpython-313.pyc differ diff --git a/imap_client/cli.py b/imap_client/cli.py new file mode 100644 index 0000000..732426d --- /dev/null +++ b/imap_client/cli.py @@ -0,0 +1,202 @@ +import argparse +import getpass +import os +import sys +from typing import List, Optional + +from .client import IMAPClient + + +def positive_int(value: str) -> int: + try: + ivalue = int(value) + except Exception: + raise argparse.ArgumentTypeError("must be an integer") + if ivalue <= 0: + raise argparse.ArgumentTypeError("must be > 0") + return ivalue + + +def add_common_args(parser: argparse.ArgumentParser) -> None: + parser.add_argument("--host", required=True, help="IMAP server host") + parser.add_argument("--port", type=int, default=993, help="IMAP server port (default 993)") + parser.add_argument("--user", required=True, help="Username") + parser.add_argument("--password", help="Password (use IMAP_PASSWORD env or prompt if omitted)") + parser.add_argument("--no-ssl", action="store_true", help="Disable SSL (use IMAP on 143)") + parser.add_argument("--starttls", action="store_true", help="Use STARTTLS after connecting (when --no-ssl)") + parser.add_argument("--timeout", type=int, default=None, help="Socket timeout seconds") + parser.add_argument("--debug", action="store_true", help="Enable imaplib debug output") + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="imap_client", + description="Simple IMAP client CLI", + ) + subparsers = parser.add_subparsers(dest="command", required=True) + + # list-mailboxes + p_list_mbx = subparsers.add_parser("list-mailboxes", help="List mailboxes") + add_common_args(p_list_mbx) + + # list + p_list = subparsers.add_parser("list", help="List messages in a mailbox") + add_common_args(p_list) + p_list.add_argument("--mailbox", default="INBOX", help="Mailbox name (default INBOX)") + p_list.add_argument("--criteria", default="ALL", help="IMAP SEARCH criteria (default ALL)") + p_list.add_argument("--limit", type=positive_int, default=50, help="Max messages to show (default 50)") + + # show + p_show = subparsers.add_parser("show", help="Show one message headers") + add_common_args(p_show) + p_show.add_argument("--mailbox", default="INBOX") + p_show.add_argument("--uid", type=int, required=True, help="Message UID") + + # download + p_dl = subparsers.add_parser("download", help="Download attachments") + add_common_args(p_dl) + p_dl.add_argument("--mailbox", default="INBOX") + p_dl.add_argument("--uid", type=int, required=True) + p_dl.add_argument("--out", default=".", help="Destination directory") + p_dl.add_argument("--name-contains", default=None, help="Only save attachments whose name contains substring") + + # mark + p_mark = subparsers.add_parser("mark", help="Set flags on a message") + add_common_args(p_mark) + p_mark.add_argument("--mailbox", default="INBOX") + p_mark.add_argument("--uid", type=int, required=True) + p_mark.add_argument("--seen", choices=["true", "false"], help="Mark seen/unseen") + p_mark.add_argument("--flagged", choices=["true", "false"], help="Mark flagged/unflagged") + + return parser + + +def get_password(args_password: Optional[str]) -> str: + if args_password: + return args_password + env = os.getenv("IMAP_PASSWORD") + if env: + return env + return getpass.getpass("IMAP password: ") + + +def make_client(args: argparse.Namespace) -> IMAPClient: + password = get_password(args.password) + return IMAPClient( + host=args.host, + port=args.port, + username=args.user, + password=password, + use_ssl=not args.no_ssl, + starttls=args.starttls, + timeout=args.timeout, + debug=args.debug, + ) + + +def cmd_list_mailboxes(args: argparse.Namespace) -> int: + with make_client(args) as client: + names = client.list_mailboxes() + for name in names: + print(name) + return 0 + + +def cmd_list(args: argparse.Namespace) -> int: + with make_client(args) as client: + uids = client.search_uids(args.mailbox, args.criteria) + if not uids: + return 0 + # Show latest first + uids_sorted = sorted(uids, reverse=True)[: args.limit] + items = client.fetch_overview(args.mailbox, uids_sorted) + # Sort by the same order as uids_sorted + order = {uid: i for i, uid in enumerate(uids_sorted)} + items.sort(key=lambda x: order.get(x.get("uid", 0), 0)) + for it in items: + uid = it.get("uid") + flags = " ".join(it.get("flags", [])) + subj = it.get("subject", "") + frm = it.get("from", "") + date = it.get("date", "") + print(f"{uid}\t{date}\t{frm}\t{subj}\t{flags}") + return 0 + + +def cmd_show(args: argparse.Namespace) -> int: + from email import policy + from email.parser import BytesParser + + with make_client(args) as client: + msg = client.fetch_message(args.mailbox, args.uid) + # Headers summary + print(f"UID: {args.uid}") + for key in ["From", "To", "Cc", "Date", "Subject"]: + val = msg.get(key) + if val: + print(f"{key}: {val}") + # Show text/plain part (if any) + text = None + if msg.is_multipart(): + for part in msg.walk(): + if part.get_content_type() == "text/plain": + text = part.get_payload(decode=True) + charset = part.get_content_charset() or "utf-8" + try: + print() + print(text.decode(charset, errors="replace")) + except Exception: + print() + print(text.decode("utf-8", errors="replace")) + break + else: + if msg.get_content_type() == "text/plain": + text = msg.get_payload(decode=True) + charset = msg.get_content_charset() or "utf-8" + print() + print(text.decode(charset, errors="replace")) + return 0 + + +def cmd_download(args: argparse.Namespace) -> int: + with make_client(args) as client: + saved = client.download_attachments(args.mailbox, args.uid, args.out, args.name_contains) + for path in saved: + print(path) + return 0 + + +def cmd_mark(args: argparse.Namespace) -> int: + seen = None if args.seen is None else (args.seen == "true") + flagged = None if args.flagged is None else (args.flagged == "true") + with make_client(args) as client: + client.set_flags(args.mailbox, args.uid, seen=seen, flagged=flagged) + return 0 + + +def main(argv: Optional[List[str]] = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + try: + if args.command == "list-mailboxes": + return cmd_list_mailboxes(args) + if args.command == "list": + return cmd_list(args) + if args.command == "show": + return cmd_show(args) + if args.command == "download": + return cmd_download(args) + if args.command == "mark": + return cmd_mark(args) + parser.error("unknown command") + return 2 + except KeyboardInterrupt: + print("Interrupted", file=sys.stderr) + return 130 + except Exception as exc: + print(f"Error: {exc}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + raise SystemExit(main()) \ No newline at end of file diff --git a/imap_client/client.py b/imap_client/client.py new file mode 100644 index 0000000..9d7fb17 --- /dev/null +++ b/imap_client/client.py @@ -0,0 +1,263 @@ +import imaplib +import ssl +import os +import re +from typing import Iterable, List, Optional, Tuple, Dict, Any +from email import message_from_bytes +from email.header import decode_header +from email.message import Message + + +def decode_mime_words(value: Optional[str]) -> str: + if not value: + return "" + decoded_parts: List[str] = [] + for bytes_or_str, encoding in decode_header(value): + if isinstance(bytes_or_str, bytes): + try: + decoded_parts.append(bytes_or_str.decode(encoding or "utf-8", errors="replace")) + except LookupError: + decoded_parts.append(bytes_or_str.decode("utf-8", errors="replace")) + else: + decoded_parts.append(bytes_or_str) + return "".join(decoded_parts) + + +def sanitize_filename(filename: str) -> str: + # Remove directory separators and control chars + filename = re.sub(r"[\\/\0\r\n\t]", "_", filename) + filename = filename.strip() or "attachment.bin" + return filename + + +class IMAPClient: + """Thin wrapper around imaplib for common IMAP operations.""" + + def __init__( + self, + host: str, + port: int, + username: str, + password: str, + use_ssl: bool = True, + starttls: bool = False, + timeout: Optional[int] = None, + debug: bool = False, + ) -> None: + self.host = host + self.port = port + self.username = username + self.password = password + self.use_ssl = use_ssl + self.starttls = starttls + self.timeout = timeout + self.debug = debug + self._conn: Optional[imaplib.IMAP4] = None + + # Context manager support + def __enter__(self) -> "IMAPClient": + return self.connect() + + def __exit__(self, exc_type, exc, tb) -> None: + self.close() + + # Connection management + def connect(self) -> "IMAPClient": + if self._conn is not None: + return self + if self.use_ssl: + context = ssl.create_default_context() + conn = imaplib.IMAP4_SSL(self.host, self.port, ssl_context=context, timeout=self.timeout) + else: + conn = imaplib.IMAP4(self.host, self.port, timeout=self.timeout) + if self.starttls: + context = ssl.create_default_context() + conn.starttls(ssl_context=context) + if self.debug: + conn.debug = 4 + conn.login(self.username, self.password) + self._conn = conn + return self + + def close(self) -> None: + if self._conn is None: + return + try: + try: + self._conn.close() + except Exception: + # Might already be closed or no mailbox selected + pass + self._conn.logout() + finally: + self._conn = None + + # Helpers + def _ensure_conn(self) -> imaplib.IMAP4: + if self._conn is None: + raise RuntimeError("Not connected. Call connect() first.") + return self._conn + + def select_mailbox(self, mailbox: str, readonly: bool = True) -> int: + conn = self._ensure_conn() + status, data = conn.select(mailbox, readonly=readonly) + if status != "OK": + raise RuntimeError(f"Failed to select mailbox {mailbox}: {status}") + try: + return int(data[0]) + except Exception: + return 0 + + def list_mailboxes(self) -> List[str]: + conn = self._ensure_conn() + status, data = conn.list() + if status != "OK": + raise RuntimeError("Failed to list mailboxes") + names: List[str] = [] + if data is None: + return names + for raw in data: + if not raw: + continue + # raw example: b'(\\HasNoChildren) "/" "INBOX"' + line = raw.decode("utf-8", errors="replace") + # Extract last quoted string + match = re.search(r'"((?:\\.|[^"\\])*)"\s*$', line) + if match: + mailbox = match.group(1).encode("utf-8").decode("unicode_escape") + names.append(mailbox) + else: + # Fallback to last token + parts = line.split(" ") + names.append(parts[-1].strip('"')) + return names + + def search_uids(self, mailbox: str, criteria: str = "ALL") -> List[int]: + conn = self._ensure_conn() + self.select_mailbox(mailbox, readonly=True) + status, data = conn.uid("search", None, criteria) + if status != "OK" or not data: + return [] + ids_str = data[0].decode("utf-8", errors="replace").strip() + if not ids_str: + return [] + return [int(x) for x in ids_str.split()] + + def fetch_overview(self, mailbox: str, uids: Iterable[int]) -> List[Dict[str, Any]]: + uids_list = list(uids) + if not uids_list: + return [] + conn = self._ensure_conn() + self.select_mailbox(mailbox, readonly=True) + set_str = ",".join(str(u) for u in uids_list) + status, data = conn.uid( + "fetch", + set_str, + "(FLAGS BODY.PEEK[HEADER.FIELDS (SUBJECT FROM DATE MESSAGE-ID)])", + ) + if status != "OK" or not data: + return [] + # Parse alternating tuples in data + results: List[Dict[str, Any]] = [] + current: Dict[str, Any] = {} + for item in data: + if item is None: + continue + if isinstance(item, tuple) and len(item) == 2: + meta_bytes, payload = item + meta = meta_bytes.decode("utf-8", errors="replace") + # Extract UID + m = re.search(r"UID (\d+)", meta) + if m: + uid = int(m.group(1)) + current = {"uid": uid, "flags": [], "subject": "", "from": "", "date": ""} + # Parse header + msg = message_from_bytes(payload) + current["subject"] = decode_mime_words(msg.get("Subject")) + current["from"] = decode_mime_words(msg.get("From")) + current["date"] = msg.get("Date", "") + results.append(current) + else: + # FLAGS-only or unexpected chunk + if "FLAGS" in meta and results: + flags_match = re.search(r"FLAGS \(([^)]*)\)", meta) + if flags_match: + flags_raw = flags_match.group(1).strip() + results[-1]["flags"] = flags_raw.split() if flags_raw else [] + elif isinstance(item, bytes): + # Some servers interleave FLAGS bytes lines. Attach to last. + meta = item.decode("utf-8", errors="replace") + if "FLAGS" in meta and results: + flags_match = re.search(r"FLAGS \(([^)]*)\)", meta) + if flags_match: + flags_raw = flags_match.group(1).strip() + results[-1]["flags"] = flags_raw.split() if flags_raw else [] + return results + + def fetch_message(self, mailbox: str, uid: int) -> Message: + conn = self._ensure_conn() + self.select_mailbox(mailbox, readonly=True) + status, data = conn.uid("fetch", str(uid), "(RFC822)") + if status != "OK" or not data or not isinstance(data[0], tuple): + raise RuntimeError(f"Failed to fetch message UID {uid}") + raw = data[0][1] + return message_from_bytes(raw) + + def download_attachments( + self, mailbox: str, uid: int, dest_dir: str, only_substr: Optional[str] = None + ) -> List[str]: + os.makedirs(dest_dir, exist_ok=True) + msg = self.fetch_message(mailbox, uid) + saved_paths: List[str] = [] + for part in msg.walk(): + content_disposition = part.get_content_disposition() + if content_disposition not in ("attachment", "inline"): + continue + filename = part.get_filename() + if not filename: + # Derive from content type + maintype, subtype = (part.get_content_type() or "application/octet-stream").split("/", 1) + filename = f"attachment.{subtype}" + filename = decode_mime_words(filename) + filename = sanitize_filename(filename) + if only_substr and only_substr.lower() not in filename.lower(): + continue + payload = part.get_payload(decode=True) + if payload is None: + continue + full_path = os.path.join(dest_dir, filename) + # Avoid overwrite by appending (n) + base, ext = os.path.splitext(full_path) + counter = 1 + candidate = full_path + while os.path.exists(candidate): + candidate = f"{base} ({counter}){ext}" + counter += 1 + with open(candidate, "wb") as f: + f.write(payload) + saved_paths.append(candidate) + return saved_paths + + def set_flags( + self, + mailbox: str, + uid: int, + seen: Optional[bool] = None, + flagged: Optional[bool] = None, + ) -> None: + if seen is None and flagged is None: + return + conn = self._ensure_conn() + self.select_mailbox(mailbox, readonly=False) + if seen is not None: + flag = r"(\\Seen)" + if seen: + conn.uid("store", str(uid), "+FLAGS.SILENT", flag) + else: + conn.uid("store", str(uid), "-FLAGS.SILENT", flag) + if flagged is not None: + flag = r"(\\Flagged)" + if flagged: + conn.uid("store", str(uid), "+FLAGS.SILENT", flag) + else: + conn.uid("store", str(uid), "-FLAGS.SILENT", flag) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c05be6f --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +# No external dependencies required. Uses Python standard library only. \ No newline at end of file