diff --git a/README.rst b/README.rst index 5f823eb4..3cc94841 100644 --- a/README.rst +++ b/README.rst @@ -29,29 +29,25 @@ Introduction ============ -``telnetlib3`` is a feature-rich Telnet Server and Client Protocol library +``telnetlib3`` is a feature-rich Telnet Server, Client, and Protocol library for Python 3.9 and newer. This library supports both modern asyncio_ *and* legacy `Blocking API`_. The python telnetlib.py_ module removed by Python 3.13 is also re-distributed as-is, as a backport. -telnetlib3 provides multiple interfaces for working with the Telnet protocol: +See the `Guidebook`_ for examples and the `API documentation`_. Asyncio Protocol ---------------- -Modern async/await interface for both client and server, supporting concurrent -connections. See the `Guidebook`_ for examples and the `API documentation`_. +The core protocol and CLI utilities are written using an `Asyncio Interface`_. Blocking API ------------ -A traditional synchronous interface modeled after telnetlib.py_ (client) and miniboa_ (server), -with various enhancements in protocol negotiation is provided. Blocking API calls for complex -arrangements of clients and servers typically require threads. - -See `sync API documentation`_ for more. +A Synchronous interface, modeled after telnetlib.py_ (client) and miniboa_ (server), with various +enhancements in protocol negotiation is also provided. See `sync API documentation`_ for more. Command-line Utilities ---------------------- @@ -66,16 +62,38 @@ program. :: + # utf8 roguelike server telnetlib3-client nethack.alt.org + # utf8 bbs telnetlib3-client xibalba.l33t.codes 44510 + # automatic communication with telnet server telnetlib3-client --shell bin.client_wargame.shell 1984.ws 666 + # run a server with default shell + telnetlib3-server + # or custom port and ip and shell telnetlib3-server 0.0.0.0 1984 --shell=bin.server_wargame.shell - telnetlib3-server --pty-exec /bin/bash -- --login + # run an external program with a pseudo-terminal + telnetlib3-server --pty-exec /bin/bash --pty-raw -- --login + # or a simple linemode program, bc (calculator) + telnetlib3-server --pty-exec /bin/bc + + +There are also fingerprinting CLIs, ``telnetlib3-fingerprint`` and +``telnetlib3-fingerprint-server`` + +:: + + # host a server, wait for clients to connect and fingerprint them, + telnetlib3-fingerprint-server + + # report fingerprint of telnet server on 1984.ws + telnetlib3-fingerprint 1984.ws + Legacy telnetlib ---------------- -This library contains an unadulterated copy of Python 3.12's telnetlib.py_, +This library contains an *unadulterated copy* of Python 3.12's telnetlib.py_, from the standard library before it was removed in Python 3.13. To migrate code, change import statements: @@ -206,6 +224,8 @@ The following RFC specifications are implemented: .. _rfc-2066: https://www.rfc-editor.org/rfc/rfc2066.txt .. _`bin/`: https://github.com/jquast/telnetlib3/tree/master/bin .. _telnetlib.py: https://docs.python.org/3.12/library/telnetlib.html +.. _Asyncio Interface: https://telnetlib3.readthedocs.io/en/latest/guidebook.html#asyncio-interface +.. _Blocking API: https://telnetlib3.readthedocs.io/en/latest/guidebook.html#blocking-interface .. _Guidebook: https://telnetlib3.readthedocs.io/en/latest/guidebook.html .. _API documentation: https://telnetlib3.readthedocs.io/en/latest/api.html .. _sync API documentation: https://telnetlib3.readthedocs.io/en/latest/api/sync.html diff --git a/bin/client_wargame.py b/bin/client_wargame.py index 21608f6f..304cd6ca 100755 --- a/bin/client_wargame.py +++ b/bin/client_wargame.py @@ -30,7 +30,7 @@ async def shell(reader, writer): break if "?" in outp: # Reply to all questions with 'y' - writer.write("y") + writer.write("y\r\n") # Display all server output print(outp, flush=True, end="") diff --git a/bin/moderate_fingerprints.py b/bin/moderate_fingerprints.py index 06729cc7..5abe01b2 100755 --- a/bin/moderate_fingerprints.py +++ b/bin/moderate_fingerprints.py @@ -7,21 +7,33 @@ import sys import json import shutil +import signal +import socket +import argparse import subprocess import collections from pathlib import Path +try: + # 3rd party + from wcwidth import iter_sequences, strip_sequences + + _HAS_WCWIDTH = True +except ImportError: + _HAS_WCWIDTH = False + _BAT = shutil.which("bat") or shutil.which("batcat") _JQ = shutil.which("jq") _UNKNOWN = "0" * 16 _PROBES = { "telnet-probe": ("telnet-client", "telnet-client-revision"), "terminal-probe": ("terminal-emulator", "terminal-emulator-revision"), + "server-probe": ("telnet-server", "telnet-server-revision"), } def _iter_files(data_dir): - """Yield (path, data) for each client JSON file.""" + """Yield (path, data) for each fingerprint JSON file.""" client_base = data_dir / "client" if client_base.is_dir(): for path in sorted(client_base.glob("*/*/*.json")): @@ -30,6 +42,14 @@ def _iter_files(data_dir): yield path, json.load(f) except (OSError, json.JSONDecodeError): continue + server_base = data_dir / "server" + if server_base.is_dir(): + for path in sorted(server_base.glob("*/*.json")): + try: + with open(path, encoding="utf-8") as f: + yield path, json.load(f) + except (OSError, json.JSONDecodeError): + continue def _print_json(label, data): @@ -79,6 +99,85 @@ def _print_terminal_context(session_data): print(f" ambiguous_width: {aw}") +def _resolve_dns(host, timeout=5): + """Resolve forward and reverse DNS for *host*, with timeout.""" + forward = [] + reverse = [] + + def _alarm_handler(signum, frame): + raise TimeoutError + + old_handler = signal.signal(signal.SIGALRM, _alarm_handler) + try: + signal.alarm(timeout) + try: + infos = socket.getaddrinfo(host, None, socket.AF_UNSPEC, socket.SOCK_STREAM) + forward = sorted({info[4][0] for info in infos}) + except (socket.gaierror, TimeoutError): + pass + for addr in forward: + try: + hostname, _, _ = socket.gethostbyaddr(addr) + reverse.append(hostname) + except (socket.herror, socket.gaierror, TimeoutError): + continue + finally: + signal.alarm(0) + signal.signal(signal.SIGALRM, old_handler) + return forward, sorted(set(reverse)) + + +def _format_banner(banner_data): + """Return (clean_text, raw_display) from a banner dict.""" + text = banner_data.get("text", "") + raw_hex = banner_data.get("raw_hex", "") + if _HAS_WCWIDTH and text: + clean = strip_sequences(text) + else: + clean = text + if _HAS_WCWIDTH and text: + parts = [] + for seq in iter_sequences(text): + parts.append(repr(seq)) + raw_display = " ".join(parts) + else: + raw_display = raw_hex + return clean, raw_display + + +def _print_server_context(session_data): + """Print server fingerprint details for moderation context.""" + for banner_key, banner_label in ( + ("banner_before_return", "pre-return"), + ("banner_after_return", "post-return"), + ): + banner = session_data.get(banner_key, {}) + if not banner: + continue + clean, raw_display = _format_banner(banner) + if clean: + print(f" banner ({banner_label}, clean):") + for line in clean.splitlines(): + print(f" {line}") + print() + if raw_display: + print(f" banner ({banner_label}, raw):") + for i in range(0, len(raw_display), 76): + print(f" {raw_display[i:i + 76]}") + print() + + host = session_data.get("host", "") + port = session_data.get("port", "") + if host: + host_str = f"{host}:{port}" if port else host + print(f" host: {host_str}") + forward, reverse = _resolve_dns(host) + if forward: + print(f" forward DNS: {', '.join(forward)}") + if reverse: + print(f" reverse DNS: {', '.join(reverse)}") + + def _print_paired(paired_hashes, label, names): """Print paired fingerprint hashes with names when known.""" if not paired_hashes: @@ -131,10 +230,11 @@ def _scan(data_dir, names, revise=False): labels.setdefault(h, probe_key.split("-", maxsplit=1)[0]) fp_data.setdefault(h, data.get(probe_key, {}).get("fingerprint-data", {})) sessions.setdefault(h, data.get(probe_key, {}).get("session_data", {})) - other = "terminal-probe" if probe_key == "telnet-probe" else "telnet-probe" - other_h = data.get(other, {}).get("fingerprint") - if other_h and other_h != _UNKNOWN: - paired[h].add(other_h) + if probe_key in ("telnet-probe", "terminal-probe"): + other = "terminal-probe" if probe_key == "telnet-probe" else "telnet-probe" + other_h = data.get(other, {}).get("fingerprint") + if other_h and other_h != _UNKNOWN: + paired[h].add(other_h) look = rev_key if revise else sug_key if look in file_sug: suggestions[h].append(file_sug[look]) @@ -169,6 +269,8 @@ def _review(entries, names): _print_telnet_context(session_data) elif label == "terminal" and session_data: _print_terminal_context(session_data) + elif label == "server" and session_data: + _print_server_context(session_data) _print_paired(paired_hashes, label, names) default = "" @@ -203,9 +305,22 @@ def _review(entries, names): def _relocate(data_dir): """Move misplaced JSON files to match their internal fingerprint hashes.""" client_base = data_dir / "client" + server_base = data_dir / "server" moved = 0 stale = set() for path, data in _iter_files(data_dir): + sh = data.get("server-probe", {}).get("fingerprint") + if sh: + if path.parent.name == sh: + continue + target = server_base / sh / path.name + if target.exists(): + continue + target.parent.mkdir(parents=True, exist_ok=True) + os.rename(path, target) + moved += 1 + stale.add(path.parent) + continue th = data.get("telnet-probe", {}).get("fingerprint") tmh = data.get("terminal-probe", {}).get("fingerprint", _UNKNOWN) if not th: @@ -232,8 +347,11 @@ def _relocate(data_dir): def _prune(data_dir, names): """Remove named hashes that have no data files.""" hashes = set() - for path, _ in _iter_files(data_dir): - hashes.update({path.parent.parent.name, path.parent.name}) + for _path, data in _iter_files(data_dir): + for probe_key in _PROBES: + h = data.get(probe_key, {}).get("fingerprint") + if h and h != _UNKNOWN: + hashes.add(h) orphaned = {h: n for h, n in names.items() if h not in hashes} if not orphaned: return False @@ -253,27 +371,49 @@ def _prune(data_dir, names): return True +def _get_argument_parser(): + """Build argument parser for ``moderate_fingerprints`` CLI.""" + parser = argparse.ArgumentParser( + description="Moderate fingerprint name suggestions", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument( + "--data-dir", + default=os.environ.get("TELNETLIB3_DATA_DIR"), + help="directory for fingerprint data (default: $TELNETLIB3_DATA_DIR)", + ) + parser.add_argument( + "--check-revise", action="store_true", help="review already-named fingerprints for revision" + ) + parser.add_argument( + "--no-prune", + action="store_true", + help="skip pruning orphaned hashes from fingerprint_names.json", + ) + return parser + + def main(): """CLI entry point for moderating fingerprint name suggestions.""" - data_dir_env = os.environ.get("TELNETLIB3_DATA_DIR") - if not data_dir_env: - print("Error: TELNETLIB3_DATA_DIR not set", file=sys.stderr) + args = _get_argument_parser().parse_args() + + if not args.data_dir: + print("Error: --data-dir or $TELNETLIB3_DATA_DIR required", file=sys.stderr) sys.exit(1) - data_dir = Path(data_dir_env) + data_dir = Path(args.data_dir) if not data_dir.exists(): print(f"Error: {data_dir} does not exist", file=sys.stderr) sys.exit(1) - revise = "--check-revise" in sys.argv relocated = _relocate(data_dir) if relocated: print(f"Relocated {relocated} file(s).\n") names = _load_names(data_dir) - if "--no-prune" not in sys.argv and _prune(data_dir, names): + if not args.no_prune and _prune(data_dir, names): _save_names(data_dir, names) - entries = _scan(data_dir, names, revise) + entries = _scan(data_dir, names, args.check_revise) if entries and _review(entries, names): _save_names(data_dir, names) elif not entries: diff --git a/docs/guidebook.rst b/docs/guidebook.rst index 8af044fe..f222c739 100644 --- a/docs/guidebook.rst +++ b/docs/guidebook.rst @@ -412,7 +412,6 @@ Key differences from miniboa: conn.send("Hello!\r\n") # OK -- also sends \r\n on the wire conn.write("Hello!\r\n") # OK -- write() sends as-is - Advanced Negotiation -------------------- @@ -443,7 +442,6 @@ The public telnetlib3 demonstration Fingerprinting Server is:: telnet 1984.ws 555 - The fingerprinting shell (:func:`telnetlib3.fingerprinting.fingerprinting_server_shell`) probes each connecting client's telnet capabilities, terminal emulator features, and unicode @@ -497,6 +495,40 @@ reviewing client-submitted name suggestions and assigning names to hashes:: python bin/moderate_fingerprints.py +Fingerprinting Client +===================== + +The ``telnetlib3-fingerprint`` CLI connects to a remote telnet server, +probes its supported telnet options, captures the login banner, and saves a +structured JSON fingerprint. This is the reverse of the fingerprinting +server -- it fingerprints *servers* instead of clients. + +Running +------- + +:: + + telnetlib3-fingerprint example.com 23 + +Options: + +- ``--data-dir `` -- directory for fingerprint data + (default: ``$TELNETLIB3_DATA_DIR``). +- ``--save-json `` -- write the JSON result to a specific file instead + of ``/server//``. +- ``--connect-timeout `` -- TCP connection timeout (default 10). +- ``--silent`` -- suppress fingerprint output to stdout. + +The fingerprint JSON records which options the server offered (WILL) and +requested (DO), which it refused, the pre-login banner text, and optional +DNS resolution results. Files are stored under:: + + /server//.json + +The ``bin/moderate_fingerprints.py`` script handles both client and server +fingerprints. + + MUD Server ========== diff --git a/docs/history.rst b/docs/history.rst index 7339134f..06bd97e3 100644 --- a/docs/history.rst +++ b/docs/history.rst @@ -1,18 +1,38 @@ History ======= 2.3.0 *unreleased* - * new: ``mud`` module with encode/decode functions for GMCP (option 201), - MSDP (option 69), and MSSP (option 70) MUD telnet protocols. - * new: ``TelnetWriter.send_gmcp()``, ``send_msdp()``, and ``send_mssp()`` - methods for sending MUD protocol data, with corresponding - ``handle_gmcp()``, ``handle_msdp()``, and ``handle_mssp()`` callbacks. + * bugfix: repeat "socket.send() raised exception." exceptions + * bugfix: server incorrectly accepted ``DO TSPEED`` and ``DO SNDLOC`` + with ``WILL`` responses. These are client-only options per :rfc:`1079` + and :rfc:`779`; the server now correctly rejects them. + * bugfix: ``LINEMODE DO FORWARDMASK`` subnegotiation no longer raises + ``NotImplementedError``; the mask is accepted (logged only). + * bugfix: echo doubling in ``--pty-exec`` without ``--pty-raw`` (linemode). + * bugfix: missing LICENSE.txt in sdist file. + * new: :mod:`telnetlib3.mud` module with encode/decode functions for + GMCP (option 201), MSDP (option 69), and MSSP (option 70) MUD telnet + protocols. + * new: :meth:`~telnetlib3.stream_writer.TelnetWriter.send_gmcp`, + :meth:`~telnetlib3.stream_writer.TelnetWriter.send_msdp`, and + :meth:`~telnetlib3.stream_writer.TelnetWriter.send_mssp` methods for sending MUD protocol + data, with corresponding :meth:`~telnetlib3.stream_writer.TelnetWriter.handle_gmcp`, + :meth:`~telnetlib3.stream_writer.TelnetWriter.handle_msdp`, and + :meth:`~telnetlib3.stream_writer.TelnetWriter.handle_mssp` callbacks. * new: ``connect_timeout`` arguments for client and ``--connect-timeout`` Client CLI argument, :ghissue:`30`. - * bugfix: missing LICENSE.txt in sdist file. * new: ``telnetlib3-fingerprint-server`` CLI with extended ``NEW_ENVIRON`` - for client fingerprinting (uses ``FingerprintingServer`` protocol factory). - * note: fingerprint hashes for MUD clients detected via GMCP/MSDP - may change due to improved client classification. + for fingerprinting of connected clients. + * new: ``telnetlib3-fingerprint`` CLI for fingerprinting the given remote + server, probing telnet option support and capturing banners. + * enhancement: reversed ``WILL``/``DO`` for directional options (e.g. ``WILL + NAWS`` from server, ``DO TTYPE`` from client) now gracefully refused with + ``DONT``/``WONT`` instead of raising ``ValueError``. + * enhancement: ``NEW_ENVIRON SEND`` and response logging improved -- + ``SEND (all)`` / ``env send: (empty)`` instead of raw byte dumps. + * bugfix: GMCP, MSDP, and MSSP decoding now uses ``--encoding`` when set, + falling back to latin-1 for non-UTF-8 bytes instead of lossy replacement. + * enhancement: ``telnetlib3-fingerprint`` now probes MSDP and MSSP options + and captures MSSP server status data in session output. 2.2.0 * bugfix: workaround for Microsoft Telnet client crash on @@ -28,20 +48,26 @@ History reduced from 4.0s to 1.5s. * performance: both client and server protocol data_received methods have approximately ~50x throughput improvement in bulk data transfers. - * new: ``Server`` class returned by ``create_server()`` with - ``wait_for_client()`` method and ``clients`` property for tracking - connected clients. - * new: ``TelnetWriter.wait_for()`` and ``wait_for_condition()`` - methods for waiting on telnet option negotiation state. - * new: ``telnetlib3.sync`` module with blocking (non-asyncio) APIs: - ``TelnetConnection`` for clients, ``BlockingTelnetServer`` for servers. - * new: ``pty_shell`` module and demonstrating ``telnetlib3-server --pty-exec`` CLI argument - and related ``--pty-raw`` server CLI option for raw PTY mode, used by most - programs that handle their own terminal I/O. - * new: ``guard_shells`` module with ``--robot-check`` and ``--pty-fork-limit`` - CLI arguments for connection limiting and bot detection. - * new: ``fingerprinting`` module for telnet client identification and - capability probing. + * new: :class:`~telnetlib3.server.Server` class returned by + :func:`~telnetlib3.server.create_server` with + :meth:`~telnetlib3.server.Server.wait_for_client` method and + :attr:`~telnetlib3.server.Server.clients` property for tracking connected + clients. + * new: :meth:`~telnetlib3.stream_writer.TelnetWriter.wait_for` and + :meth:`~telnetlib3.stream_writer.TelnetWriter.wait_for_condition` methods for waiting on + telnet option negotiation state. + * new: :mod:`telnetlib3.sync` module with blocking (non-asyncio) APIs: + :class:`~telnetlib3.sync.TelnetConnection` for clients, + :class:`~telnetlib3.sync.BlockingTelnetServer` for servers. + * new: :mod:`~telnetlib3.server_pty_shell` module and demonstrating + ``telnetlib3-server --pty-exec`` CLI argument and related ``--pty-raw`` + server CLI option for raw PTY mode, used by most programs that handle their + own terminal I/O. + * new: :mod:`~telnetlib3.guard_shells` module with ``--robot-check`` and + ``--pty-fork-limit`` CLI arguments for connection limiting and bot + detection. + * new: :mod:`~telnetlib3.fingerprinting` module for telnet client + identification and capability probing. * new: ``--send-environ`` client CLI option to control which environment variables are sent via NEW_ENVIRON. Default no longer includes HOME or SHELL. @@ -69,13 +95,13 @@ History 2.0.5 * feature: legacy `telnetlib.py` from Python 3.11 now redistributed, note change to project `LICENSE.txt` file. - * feature: Add `TelnetReader.readuntil_pattern` :ghissue:`92` by + * feature: Add :meth:`~telnetlib3.stream_reader.TelnetReader.readuntil_pattern` :ghissue:`92` by :ghuser:`agicy` - * feature: Add `TelnetWriter.wait_closed` async method in response to - :ghissue:`82`. + * feature: Add :meth:`~telnetlib3.stream_writer.TelnetWriter.wait_closed` + async method in response to :ghissue:`82`. * bugfix: README Examples do not work :ghissue:`81` * bugfix: `TypeError: buf expected bytes, got ` on client timeout - in `TelnetServer`, :ghissue:`87` + in :class:`~telnetlib3.server.TelnetServer`, :ghissue:`87` * bugfix: Performance issues with client protocol under heavy load, demonstrating server `telnet://1984.ws` now documented in README. * bugfix: annoying `socket.send() raised exception` repeating warning, @@ -98,25 +124,28 @@ History * bugfix: "write after close" is disregarded, caused many errors logged in socket.send() * bugfix: in accessories.repr_mapping() about using shlex.quote on non-str, `TypeError: expected string or bytes-like object, got 'int'` - * bugfix: about fn_encoding using repr() on TelnetReaderUnicode + * bugfix: about fn_encoding using repr() on :class:`~telnetlib3.stream_reader.TelnetReaderUnicode` * bugfix: TelnetReader.is_closing() raises AttributeError - * deprecation: `TelnetReader.close` and `TelnetReader.connection_closed` emit - warning, use `at_eof()` and `feed_eof()` instead. - * deprecation: the ``loop`` argument are is no longer accepted by TelnetReader. + * deprecation: ``TelnetReader.close`` and ``TelnetReader.connection_closed`` + emit warning, use :meth:`~telnetlib3.stream_reader.TelnetReader.at_eof` and + :meth:`~telnetlib3.stream_reader.TelnetReader.feed_eof` instead. + * deprecation: the ``loop`` argument is no longer accepted by + :class:`~telnetlib3.stream_reader.TelnetReader`. * enhancement: Add Generic Mud Communication Protocol support :ghissue:`63` by :ghuser:`gtaylor`! - * change: TelnetReader and TelnetWriter no longer derive from - `asyncio.StreamReader` and `asyncio.StreamWriter`, this fixes some TypeError - in signatures and runtime + * change: :class:`~telnetlib3.stream_reader.TelnetReader` and + :class:`~telnetlib3.stream_writer.TelnetWriter` no longer derive + from :class:`asyncio.StreamReader` and :class:`asyncio.StreamWriter`, this + fixes some TypeError in signatures and runtime 2.0.0 * change: Support Python 3.9, 3.10, 3.11. Drop Python 3.6 and earlier, All code and examples have been updated to the new-style PEP-492 syntax. - * change: the ``loop``, ``event_loop``, and ``log`` arguments are no longer accepted to + * change: the ``loop``, ``event_loop``, and ``log`` arguments are no longer accepted by any class initializers. * note: This release has a known memory leak when using the ``_waiter_connected`` and ``_waiter_closed`` arguments to Client or Shell class initializers, please do - not use them, A replacement "wait_for_negotiation" awaitable is planned for a + not use them. A replacement "wait_for_negotiation" awaitable is planned for a future release. * enhancement: Add COM-PORT-OPTION subnegotiation support :ghissue:`57` by :ghuser:`albireox` diff --git a/docs/rfcs.rst b/docs/rfcs.rst index 10cb676c..22188fc5 100644 --- a/docs/rfcs.rst +++ b/docs/rfcs.rst @@ -60,6 +60,7 @@ RFCs Not Implemented * :rfc:`1043`, "Telnet Data Entry Terminal Option", Feb 1988 * :rfc:`1097`, "Telnet Subliminal-Message Option", Apr 1989 * :rfc:`1143`, "The Q Method of Implementing .. Option Negotiation", Feb 1990 + Approximately only Rules 1, 2, and 3, but not 4, 5, and 6. * :rfc:`1205`, "5250 Telnet Interface", Feb 1991 * :rfc:`1411`, "Telnet Authentication: Kerberos_ Version 4", Jan 1993 * :rfc:`1412`, "Telnet Authentication: SPX" diff --git a/pyproject.toml b/pyproject.toml index cd5261e2..124f2bed 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,6 +59,7 @@ extras = [ telnetlib3-server = "telnetlib3.server:main" telnetlib3-client = "telnetlib3.client:main" telnetlib3-fingerprint-server = "telnetlib3.fingerprinting:fingerprint_server_main" +telnetlib3-fingerprint = "telnetlib3.client:fingerprint_main" [project.urls] Homepage = "https://github.com/jquast/telnetlib3" diff --git a/telnetlib3/__init__.py b/telnetlib3/__init__.py index ee88672b..4b530fd2 100644 --- a/telnetlib3/__init__.py +++ b/telnetlib3/__init__.py @@ -23,6 +23,7 @@ from . import telnetlib from . import guard_shells from . import fingerprinting +from . import server_fingerprinting if sys.platform != "win32": from . import fingerprinting_display # noqa: F401 from . import sync @@ -39,6 +40,7 @@ from .telnetlib import * # noqa from .guard_shells import * # noqa from .fingerprinting import * # noqa +from .server_fingerprinting import * # noqa if sys.platform != "win32": from .fingerprinting_display import * # noqa from .sync import * # noqa @@ -61,6 +63,7 @@ + server_shell.__all__ + guard_shells.__all__ + fingerprinting.__all__ + + server_fingerprinting.__all__ + (server_pty_shell.__all__ if PTY_SUPPORT else ()) # client, + client_base.__all__ diff --git a/telnetlib3/accessories.py b/telnetlib3/accessories.py index 532fbdc6..4de1db85 100644 --- a/telnetlib3/accessories.py +++ b/telnetlib3/accessories.py @@ -21,7 +21,6 @@ "repr_mapping", "function_lookup", "make_reader_task", - "PATIENCE_MESSAGES", ) PATIENCE_MESSAGES = [ diff --git a/telnetlib3/client.py b/telnetlib3/client.py index 966e17f9..66dbdc9f 100755 --- a/telnetlib3/client.py +++ b/telnetlib3/client.py @@ -10,6 +10,7 @@ import struct import asyncio import argparse +import functools from typing import Any, Dict, List, Tuple, Union, Callable, Optional, Sequence # local @@ -591,5 +592,158 @@ def main() -> None: asyncio.run(run_client()) +def _get_fingerprint_argument_parser() -> argparse.ArgumentParser: + """Build argument parser for ``telnetlib3-fingerprint`` CLI.""" + parser = argparse.ArgumentParser( + description="Fingerprint a remote telnet server", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument("host", help="remote hostname or IP") + parser.add_argument("port", nargs="?", default=23, type=int, help="port number") + parser.add_argument( + "--data-dir", + default=None, + help="directory for fingerprint data (default: $TELNETLIB3_DATA_DIR)", + ) + parser.add_argument( + "--save-json", default=None, metavar="PATH", help="write fingerprint JSON to this path" + ) + parser.add_argument( + "--connect-timeout", default=10, type=float, help="TCP connection timeout in seconds" + ) + parser.add_argument("--loglevel", default="warn", help="log level") + # pylint: disable=protected-access + parser.add_argument("--logfmt", default=accessories._DEFAULT_LOGFMT, help="log format") + parser.add_argument("--logfile", default=None, help="filepath") + parser.add_argument( + "--silent", action="store_true", help="suppress fingerprint output to stdout" + ) + parser.add_argument( + "--set-name", + default=None, + metavar="NAME", + help="store this name for the fingerprint in fingerprint_names.json", + ) + parser.add_argument( + "--encoding", + default="ascii", + metavar="CODEC", + dest="stream_encoding", + help="character encoding of the remote server (e.g. cp037 for EBCDIC)", + ) + parser.add_argument( + "--ttype", default="VT100", help="terminal type sent in response to TTYPE requests" + ) + parser.add_argument( + "--send-env", + action="append", + metavar="KEY=VALUE", + default=[], + help="environment variable to send (repeatable)", + ) + return parser + + +async def run_fingerprint_client() -> None: + """ + Connect to a remote telnet server and fingerprint it. + + Parses CLI arguments, binds them into + :func:`~telnetlib3.server_fingerprinting.fingerprinting_client_shell` + via :func:`functools.partial`, and runs the connection. + """ + # local + from . import fingerprinting # pylint: disable=import-outside-toplevel + from . import server_fingerprinting # pylint: disable=import-outside-toplevel + + args = _get_fingerprint_argument_parser().parse_args() + + if args.data_dir is not None: + fingerprinting.DATA_DIR = args.data_dir + + log = accessories.make_logger( + name=__name__, loglevel=args.loglevel, logfile=args.logfile, logfmt=args.logfmt + ) + log.debug("Fingerprint client: host=%s port=%d", args.host, args.port) + + shell = functools.partial( + server_fingerprinting.fingerprinting_client_shell, + host=args.host, + port=args.port, + save_path=args.save_json, + silent=args.silent, + set_name=args.set_name, + environ_encoding=args.stream_encoding, + ) + + # Parse --send-env KEY=VALUE pairs + extra_env: Dict[str, str] = {} + for item in args.send_env: + if "=" in item: + k, v = item.split("=", 1) + extra_env[k] = v + else: + extra_env[item] = "" + + # environ_encoding must be set on the writer BEFORE negotiation + # starts, so we wrap the client factory to inject it during + # connection_made (before begin_negotiation fires). + environ_encoding = args.stream_encoding + ttype = args.ttype + + def fingerprint_client_factory(**kwargs: Any) -> client_base.BaseClient: + # Ensure extra env keys are in the send list + if extra_env: + send = set(kwargs.get("send_environ") or TelnetClient.DEFAULT_SEND_ENVIRON) + send.update(extra_env.keys()) + kwargs["send_environ"] = list(send) + client = TelnetClient(**kwargs) + orig_connection_made = client.connection_made + orig_send_env = client.send_env + + def patched_connection_made(transport: asyncio.BaseTransport) -> None: + orig_connection_made(transport) + assert client.writer is not None + client.writer.environ_encoding = environ_encoding + + def patched_send_env(keys: Sequence[str]) -> Dict[str, Any]: + result = orig_send_env(keys) + result.update(extra_env) + return result + + client.connection_made = patched_connection_made # type: ignore[method-assign] + if extra_env: + client.send_env = patched_send_env # type: ignore[method-assign] + return client + + waiter_closed: asyncio.Future[None] = asyncio.get_event_loop().create_future() + + _, writer = await open_connection( + host=args.host, + port=args.port, + client_factory=fingerprint_client_factory, + shell=shell, + encoding=False, + term=ttype, + connect_minwait=2.0, + connect_maxwait=4.0, + connect_timeout=args.connect_timeout, + waiter_closed=waiter_closed, + ) + + assert writer.protocol is not None + assert isinstance(writer.protocol, client_base.BaseClient) + await writer.protocol.waiter_closed + + +def fingerprint_main() -> None: + """Entry point for ``telnetlib3-fingerprint`` command.""" + try: + asyncio.run(run_fingerprint_client()) + except ConnectionError as err: + print(f"Error: {err}", file=sys.stderr) + sys.exit(1) + + if __name__ == "__main__": # pragma: no cover main() diff --git a/telnetlib3/fingerprinting.py b/telnetlib3/fingerprinting.py index 74847aec..17959f0f 100644 --- a/telnetlib3/fingerprinting.py +++ b/telnetlib3/fingerprinting.py @@ -18,7 +18,7 @@ import logging import argparse import datetime -from typing import Any, Dict, List, Tuple, Union, Callable, Optional, cast +from typing import Any, Union, Optional, TypedDict, cast # local from . import slc @@ -36,6 +36,7 @@ ECHO, GMCP, MSDP, + MSSP, NAMS, NAOL, NAOP, @@ -81,6 +82,16 @@ from .stream_reader import TelnetReader, TelnetReaderUnicode from .stream_writer import TelnetWriter, TelnetWriterUnicode + +class ProbeResult(TypedDict, total=False): + """Result of probing a single telnet option.""" + + status: str + opt: bytes + description: str + already_negotiated: bool + + # Data directory for saving fingerprint data - None when unset (no saves) DATA_DIR: Optional[str] = ( os.environ["TELNETLIB3_DATA_DIR"] if os.environ.get("TELNETLIB3_DATA_DIR") else None @@ -124,6 +135,7 @@ "ENVIRON_EXTENDED", "FingerprintingServer", "FingerprintingTelnetServer", + "ProbeResult", "fingerprint_server_main", "fingerprinting_server_shell", "fingerprinting_post_script", @@ -178,7 +190,9 @@ class MyServer(FingerprintingTelnetServer, TelnetServer): def on_request_environ(self) -> list[Union[str, bytes]]: """Return base environ keys plus :data:`ENVIRON_EXTENDED`.""" - # pylint: disable=no-member + if not isinstance(self, TelnetServer): + raise TypeError("FingerprintingTelnetServer must be combined with TelnetServer") + # pylint: disable-next=no-member base: list[Union[str, bytes]] = super().on_request_environ() # type: ignore[misc] # Insert extended keys before the trailing VAR/USERVAR sentinels # local @@ -236,7 +250,11 @@ class FingerprintingServer(FingerprintingTelnetServer, TelnetServer): # icy_term (icy_net) only accepts option bytes 0-49, 138-140, and 255, # returning a hard error for anything else. GMCP-capable MUD clients # typically self-announce via IAC WILL GMCP, so probing is unnecessary. -EXTENDED_OPTIONS = [(GMCP, "GMCP", "Generic MUD Communication Protocol")] +EXTENDED_OPTIONS = [ + (GMCP, "GMCP", "Generic MUD Communication Protocol"), + (MSDP, "MSDP", "MUD Server Data Protocol"), + (MSSP, "MSSP", "MUD Server Status Protocol"), +] LEGACY_OPTIONS = [ (AUTHENTICATION, "AUTHENTICATION", "Telnet authentication"), @@ -284,10 +302,9 @@ class FingerprintingServer(FingerprintingTelnetServer, TelnetServer): async def probe_client_capabilities( writer: Union[TelnetWriter, TelnetWriterUnicode], - options: Optional[List[Tuple[bytes, str, str]]] = None, - progress_callback: Optional[Callable[[str, int, int, str], None]] = None, + options: Optional[list[tuple[bytes, str, str]]] = None, timeout: float = 0.5, -) -> Dict[str, Dict[str, Any]]: +) -> dict[str, ProbeResult]: """ Actively probe client for telnet capability support. @@ -296,33 +313,24 @@ async def probe_client_capabilities( :param writer: TelnetWriter instance. :param options: List of (opt_bytes, name, description) tuples to probe. Defaults to ALL_PROBE_OPTIONS. - :param progress_callback: Optional callback(name, idx, total, status) called during result - collection. :param timeout: Timeout in seconds to wait for all responses. - :returns: Dict mapping option name to {"status": "WILL"|"WONT"|"timeout", "opt": bytes, - "description": str}. + :returns: Dict mapping option name to :class:`ProbeResult`. """ if options is None: options = ALL_PROBE_OPTIONS - results = {} + results: dict[str, ProbeResult] = {} to_probe = [] for opt, name, description in options: if writer.remote_option.enabled(opt): - results[name] = { - "status": "WILL", - "opt": opt, - "description": description, - "already_negotiated": True, - } + results[name] = ProbeResult( + status="WILL", opt=opt, description=description, already_negotiated=True + ) elif writer.remote_option.get(opt) is False: - results[name] = { - "status": "WONT", - "opt": opt, - "description": description, - "already_negotiated": True, - } + results[name] = ProbeResult( + status="WONT", opt=opt, description=description, already_negotiated=True + ) else: to_probe.append((opt, name, description)) @@ -342,19 +350,16 @@ async def probe_client_capabilities( break await asyncio.sleep(0.05) - for idx, (opt, name, description) in enumerate(to_probe, 1): + for opt, name, description in to_probe: if name in results: continue - if progress_callback: - progress_callback(name, idx, len(to_probe), "") - if writer.remote_option.enabled(opt): - results[name] = {"status": "WILL", "opt": opt, "description": description} + results[name] = ProbeResult(status="WILL", opt=opt, description=description) elif writer.remote_option.get(opt) is False: - results[name] = {"status": "WONT", "opt": opt, "description": description} + results[name] = ProbeResult(status="WONT", opt=opt, description=description) else: - results[name] = {"status": "timeout", "opt": opt, "description": description} + results[name] = ProbeResult(status="timeout", opt=opt, description=description) return results @@ -379,7 +384,7 @@ async def probe_client_capabilities( ) + tuple(f"ttype{n}" for n in range(1, 9)) -def get_client_fingerprint(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> Dict[str, Any]: +def get_client_fingerprint(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> dict[str, Any]: """ Collect all available client information from writer. @@ -403,7 +408,7 @@ def get_client_fingerprint(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> async def _run_probe( writer: Union[TelnetWriter, TelnetWriterUnicode], verbose: bool = True -) -> Tuple[Dict[str, Dict[str, Any]], float]: +) -> tuple[dict[str, ProbeResult], float]: """Run active probe, optionally extending to MUD options.""" if _is_maybe_ms_telnet(writer): probe_options = [opt for opt in CORE_OPTIONS + MUD_OPTIONS if opt[0] != NEW_ENVIRON] @@ -453,7 +458,7 @@ def _opt_byte_to_name(opt: bytes) -> str: def _collect_option_states( writer: Union[TelnetWriter, TelnetWriterUnicode], -) -> Dict[str, Dict[str, Any]]: +) -> dict[str, dict[str, Any]]: """Collect all telnet option states from writer.""" options = {} for label, opt_dict in [("remote", writer.remote_option), ("local", writer.local_option)]: @@ -465,9 +470,9 @@ def _collect_option_states( def _collect_rejected_options( writer: Union[TelnetWriter, TelnetWriterUnicode], -) -> Dict[str, List[str]]: +) -> dict[str, list[str]]: """Collect rejected option offers from writer.""" - result: Dict[str, List[str]] = {} + result: dict[str, list[str]] = {} if getattr(writer, "rejected_will", None): result["will"] = sorted(_opt_byte_to_name(opt) for opt in writer.rejected_will) if getattr(writer, "rejected_do", None): @@ -475,9 +480,9 @@ def _collect_rejected_options( return result -def _collect_extra_info(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> Dict[str, Any]: +def _collect_extra_info(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> dict[str, Any]: """Collect all extra_info from writer, including private _extra dict.""" - extra: Dict[str, Any] = {} + extra: dict[str, Any] = {} protocol = _get_protocol(writer) if protocol and hasattr(protocol, "_extra"): @@ -510,7 +515,7 @@ def _collect_extra_info(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> Dic return extra -def _collect_ttype_cycle(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> List[str]: +def _collect_ttype_cycle(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> list[str]: """Collect the full TTYPE cycle responses.""" ttype_list = [] @@ -525,7 +530,7 @@ def _collect_ttype_cycle(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> Li return ttype_list -def _collect_protocol_timing(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> Dict[str, Any]: +def _collect_protocol_timing(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> dict[str, Any]: """Collect timing information from protocol.""" timing = {} protocol = _get_protocol(writer) @@ -539,7 +544,7 @@ def _collect_protocol_timing(writer: Union[TelnetWriter, TelnetWriterUnicode]) - return timing -def _collect_slc_tab(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> Dict[str, Any]: +def _collect_slc_tab(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> dict[str, Any]: """Collect non-default SLC entries when LINEMODE was negotiated.""" slctab = getattr(writer, "slctab", None) if not slctab: @@ -550,8 +555,8 @@ def _collect_slc_tab(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> Dict[s defaults = slc.generate_slctab(slc.BSD_SLC_TAB) - result: Dict[str, Any] = {} - slc_set: Dict[str, Any] = {} + result: dict[str, Any] = {} + slc_set: dict[str, Any] = {} slc_unset: list[str] = [] slc_nosupport: list[str] = [] @@ -583,8 +588,8 @@ def _collect_slc_tab(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> Dict[s def _create_protocol_fingerprint( - writer: Union[TelnetWriter, TelnetWriterUnicode], probe_results: Dict[str, Dict[str, Any]] -) -> Dict[str, Any]: + writer: Union[TelnetWriter, TelnetWriterUnicode], probe_results: dict[str, ProbeResult] +) -> dict[str, Any]: """ Create anonymized/summarized protocol fingerprint from session data. @@ -595,7 +600,7 @@ def _create_protocol_fingerprint( :param probe_results: Probe results from capability probing. :returns: Dict with anonymized protocol fingerprint data. """ - fingerprint: Dict[str, Any] = {"probed-protocol": "client"} + fingerprint: dict[str, Any] = {"probed-protocol": "client"} protocol = _get_protocol(writer) extra_dict = getattr(protocol, "_extra", {}) if protocol else {} @@ -654,7 +659,7 @@ def _create_protocol_fingerprint( return fingerprint -def _hash_fingerprint(data: Dict[str, Any]) -> str: +def _hash_fingerprint(data: dict[str, Any]) -> str: """Create deterministic 16-char SHA256 hash of a fingerprint dict.""" canonical = json.dumps(data, sort_keys=True, separators=(",", ":")) return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16] @@ -667,24 +672,103 @@ def _count_protocol_folder_files(protocol_dir: str) -> int: return sum(1 for f in os.listdir(protocol_dir) if f.endswith(".json")) -def _count_fingerprint_folders(data_dir: Optional[str] = None) -> int: - """Count unique telnet fingerprint folders in ``DATA_DIR/client/``.""" +def _count_fingerprint_folders(data_dir: Optional[str] = None, side: str = "client") -> int: + """Count unique fingerprint folders in ``DATA_DIR//``.""" _dir = data_dir if data_dir is not None else DATA_DIR if _dir is None: return 0 - client_dir = os.path.join(_dir, "client") - if not os.path.exists(client_dir): + side_dir = os.path.join(_dir, side) + if not os.path.exists(side_dir): return 0 - return sum(1 for f in os.listdir(client_dir) if os.path.isdir(os.path.join(client_dir, f))) + return sum(1 for f in os.listdir(side_dir) if os.path.isdir(os.path.join(side_dir, f))) + + +def _save_fingerprint_to_dir( + target_dir: str, + session_hash: str, + data: dict[str, Any], + *, + probe_key: str, + data_dir: str, + side: str, + protocol_hash: str, +) -> Optional[str]: + """ + Save fingerprint data to a directory with limit checks and session appending. + + Handles fingerprint-count and file-count limits, creates directories as + needed, and appends to existing session files when the session hash matches. + + :param target_dir: Directory path for this fingerprint's files. + :param session_hash: Hash used for the filename. + :param data: Complete fingerprint data dict to save. + :param probe_key: Top-level key in *data* (e.g. ``"telnet-probe"``). + :param data_dir: Base data directory for counting fingerprint folders. + :param side: ``"client"`` or ``"server"`` subdirectory name. + :param protocol_hash: Protocol fingerprint hash for logging. + :returns: Path to saved file, or ``None`` if saving was skipped. + """ + is_new_dir = not os.path.exists(target_dir) + + if is_new_dir: + if _count_fingerprint_folders(data_dir, side=side) >= FINGERPRINT_MAX_FINGERPRINTS: + logger.warning( + "max fingerprints (%d) exceeded, not saving %s", + FINGERPRINT_MAX_FINGERPRINTS, + protocol_hash, + ) + return None + try: + os.makedirs(target_dir, exist_ok=True) + except OSError as exc: + logger.warning("failed to create directory %s: %s", target_dir, exc) + return None + logger.info("new %s fingerprint %s", side, protocol_hash) + else: + if _count_protocol_folder_files(target_dir) >= FINGERPRINT_MAX_FILES: + logger.warning( + "fingerprint %s at file limit (%d), not saving", + protocol_hash, + FINGERPRINT_MAX_FILES, + ) + return None + logger.info("connection for %s fingerprint %s", side, protocol_hash) + + filepath = os.path.join(target_dir, f"{session_hash}.json") + + if os.path.exists(filepath): + try: + with open(filepath, encoding="utf-8") as f: + existing = json.load(f) + existing[probe_key]["session_data"] = data[probe_key]["session_data"] + existing["sessions"].append(data["sessions"][0]) + except (OSError, json.JSONDecodeError, KeyError) as exc: + logger.warning("failed to read existing %s: %s", filepath, exc) + existing = None + + if existing is not None: + try: + _atomic_json_write(filepath, existing) + return filepath + except OSError as exc: + logger.warning("failed to update fingerprint: %s", exc) + return None + + try: + _atomic_json_write(filepath, data) + return filepath + except OSError as exc: + logger.warning("failed to save fingerprint: %s", exc) + return None _UNKNOWN_TERMINAL_HASH = "0" * 16 AMBIGUOUS_WIDTH_UNKNOWN = -1 -def _create_session_fingerprint(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> Dict[str, Any]: +def _create_session_fingerprint(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> dict[str, Any]: """Create session identity fingerprint from stable client fields.""" - identity: Dict[str, Any] = {} + identity: dict[str, Any] = {} if peername := writer.get_extra_info("peername"): identity["client-ip"] = peername[0] @@ -699,7 +783,7 @@ def _create_session_fingerprint(writer: Union[TelnetWriter, TelnetWriterUnicode] return identity -def _load_fingerprint_names(data_dir: Optional[str] = None) -> Dict[str, str]: +def _load_fingerprint_names(data_dir: Optional[str] = None) -> dict[str, str]: """Load fingerprint hash-to-name mapping from ``fingerprint_names.json``.""" _dir = data_dir if data_dir is not None else DATA_DIR if _dir is None: @@ -708,11 +792,35 @@ def _load_fingerprint_names(data_dir: Optional[str] = None) -> Dict[str, str]: if not os.path.exists(names_file): return {} with open(names_file, encoding="utf-8") as f: - result: Dict[str, str] = json.load(f) + result: dict[str, str] = json.load(f) return result -def _resolve_hash_name(hash_val: str, names: Dict[str, str]) -> str: +def _save_fingerprint_name(hash_val: str, name: str, data_dir: Optional[str] = None) -> str: + """ + Save a fingerprint hash-to-name mapping in ``fingerprint_names.json``. + + Loads the existing names file, adds or updates the entry for *hash_val*, + and writes it back atomically. + + :param hash_val: 16-character hex fingerprint hash. + :param name: Human-readable name to associate. + :param data_dir: Override data directory. Falls back to :data:`DATA_DIR`. + :returns: Path to the saved names file. + :raises ValueError: If *data_dir* is ``None`` and :data:`DATA_DIR` is unset. + """ + _dir = data_dir if data_dir is not None else DATA_DIR + if _dir is None: + raise ValueError("no data directory configured") + os.makedirs(_dir, exist_ok=True) + names_file = os.path.join(_dir, "fingerprint_names.json") + names = _load_fingerprint_names(_dir) + names[hash_val] = name + _atomic_json_write(names_file, names) + return names_file + + +def _resolve_hash_name(hash_val: str, names: dict[str, str]) -> str: """Return human-readable name for a hash, falling back to the hash itself.""" return names.get(hash_val, hash_val) @@ -746,7 +854,7 @@ def _cooked_input(prompt: str) -> str: termios.tcsetattr(fd, termios.TCSANOW, old_attrs) -def _atomic_json_write(filepath: str, data: Dict[str, Any]) -> None: +def _atomic_json_write(filepath: str, data: dict[str, Any]) -> None: """Atomically write JSON data to file via write-to-new + rename.""" tmp_path = os.path.splitext(filepath)[0] + ".json.new" with open(tmp_path, "w", encoding="utf-8") as f: @@ -756,9 +864,9 @@ def _atomic_json_write(filepath: str, data: Dict[str, Any]) -> None: def _build_session_fingerprint( writer: Union[TelnetWriter, TelnetWriterUnicode], - probe_results: Dict[str, Dict[str, Any]], + probe_results: dict[str, ProbeResult], probe_time: float, -) -> Dict[str, Any]: +) -> dict[str, Any]: """Build the session fingerprint dict (raw detailed data).""" extra = _collect_extra_info(writer) extra.pop("peername", None) @@ -771,7 +879,7 @@ def _build_session_fingerprint( linemode_probed = probe_results.get("LINEMODE", {}).get("status") slc_tab = _collect_slc_tab(writer) if linemode_probed == "WILL" else {} - probe_by_status: Dict[str, Dict[str, int]] = {} + probe_by_status: dict[str, dict[str, int]] = {} for name, info in probe_results.items(): status = info["status"] opt_byte = info["opt"][0] if isinstance(info["opt"], bytes) else info["opt"] @@ -796,17 +904,17 @@ def _build_session_fingerprint( return result -def _save_fingerprint_data( # pylint: disable=too-many-locals,too-many-branches,too-complex +def _save_fingerprint_data( writer: Union[TelnetWriter, TelnetWriterUnicode], - probe_results: Dict[str, Dict[str, Any]], + probe_results: dict[str, ProbeResult], probe_time: float, - session_fp: Optional[Dict[str, Any]] = None, + session_fp: Optional[dict[str, Any]] = None, ) -> Optional[str]: """ Save comprehensive fingerprint data to a JSON file. - Creates directory structure: DATA_DIR//uuid4.json - Respects FINGERPRINT_MAX_FILES and FINGERPRINT_MAX_FINGERPRINTS limits. + Creates directory structure: + ``DATA_DIR/client///.json`` :param writer: TelnetWriter instance with full protocol access. :param probe_results: Probe results from capability probing. @@ -838,55 +946,11 @@ def _save_fingerprint_data( # pylint: disable=too-many-locals,too-many-branches break if probe_dir is None: probe_dir = os.path.join(telnet_dir, _UNKNOWN_TERMINAL_HASH) - is_new_dir = not os.path.exists(probe_dir) - - if is_new_dir: - if _count_fingerprint_folders() >= FINGERPRINT_MAX_FINGERPRINTS: - logger.warning( - "max fingerprints (%d) exceeded, not saving %s", - FINGERPRINT_MAX_FINGERPRINTS, - telnet_hash, - ) - return None - try: - os.makedirs(probe_dir, exist_ok=True) - except OSError as exc: - logger.warning("failed to create directory %s: %s", probe_dir, exc) - return None - logger.info("new fingerprint %s", telnet_hash) - else: - file_count = _count_protocol_folder_files(probe_dir) - if file_count >= FINGERPRINT_MAX_FILES: - logger.warning( - "fingerprint %s at file limit (%d), not saving", telnet_hash, FINGERPRINT_MAX_FILES - ) - return None - logger.info("connection for fingerprint %s", telnet_hash) - - filepath = os.path.join(probe_dir, f"{session_hash}.json") peername = writer.get_extra_info("peername") now = datetime.datetime.now(datetime.timezone.utc) session_entry = {"ip": str(peername[0]) if peername else None, "connected": now.isoformat()} - if os.path.exists(filepath): - try: - with open(filepath, encoding="utf-8") as f: - data = json.load(f) - data["telnet-probe"]["session_data"] = session_fp - data["sessions"].append(session_entry) - except (OSError, json.JSONDecodeError, KeyError) as exc: - logger.warning("failed to read existing %s: %s", filepath, exc) - data = None - - if data is not None: - try: - _atomic_json_write(filepath, data) - return filepath - except OSError as exc: - logger.warning("failed to update fingerprint: %s", exc) - return None - data = { "telnet-probe": { "fingerprint": telnet_hash, @@ -896,12 +960,15 @@ def _save_fingerprint_data( # pylint: disable=too-many-locals,too-many-branches "sessions": [session_entry], } - try: - _atomic_json_write(filepath, data) - return filepath - except OSError as exc: - logger.warning("failed to save fingerprint: %s", exc) - return None + return _save_fingerprint_to_dir( + target_dir=probe_dir, + session_hash=session_hash, + data=data, + probe_key="telnet-probe", + data_dir=DATA_DIR, + side="client", + protocol_hash=telnet_hash, + ) def _is_maybe_mud(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> bool: diff --git a/telnetlib3/mud.py b/telnetlib3/mud.py index 701ce987..2afde5dc 100644 --- a/telnetlib3/mud.py +++ b/telnetlib3/mud.py @@ -29,7 +29,29 @@ MSDP_TABLE_CLOSE, ) -__all__ = ("gmcp_encode", "gmcp_decode", "msdp_encode", "msdp_decode", "mssp_encode", "mssp_decode") +__all__ = ( + "gmcp_encode", + "gmcp_decode", + "msdp_encode", + "msdp_decode", + "mssp_encode", + "mssp_decode", + "MsdpParser", +) + + +def _decode_best_effort(buf: bytes, encoding: str = "utf-8") -> str: + """ + Decode bytes trying *encoding* first, falling back to latin-1. + + :param buf: Raw bytes to decode. + :param encoding: Primary encoding to attempt. + :returns: Decoded string. + """ + try: + return buf.decode(encoding) + except (UnicodeDecodeError, LookupError): + return buf.decode("latin-1") def gmcp_encode(package: str, data: Any = None) -> bytes: @@ -45,21 +67,22 @@ def gmcp_encode(package: str, data: Any = None) -> bytes: return package.encode("utf-8") + b" " + json.dumps(data, separators=(",", ":")).encode("utf-8") -def gmcp_decode(buf: bytes) -> tuple[str, Any]: +def gmcp_decode(buf: bytes, encoding: str = "utf-8") -> tuple[str, Any]: """ Decode a GMCP payload. :param buf: GMCP payload bytes + :param encoding: Character encoding to try first, falls back to latin-1. :returns: Tuple of (package, data), where data is None if no JSON present :raises ValueError: If JSON is malformed """ parts = buf.split(b" ", 1) if len(parts) == 1: - return (buf.decode("utf-8"), None) + return (_decode_best_effort(buf, encoding), None) - package = parts[0].decode("utf-8") + package = _decode_best_effort(parts[0], encoding) try: - data = json.loads(parts[1].decode("utf-8")) + data = json.loads(_decode_best_effort(parts[1], encoding)) except json.JSONDecodeError as exc: raise ValueError(f"Invalid JSON in GMCP payload: {exc}") from exc return (package, data) @@ -95,14 +118,16 @@ def encode_value(value: Any) -> bytes: return result -class _MsdpParser: +class MsdpParser: """State machine for parsing MSDP wire bytes.""" _DELIMITERS = (MSDP_VAR, MSDP_VAL, MSDP_TABLE_CLOSE, MSDP_ARRAY_CLOSE) - def __init__(self, buf: bytes) -> None: + def __init__(self, buf: bytes, encoding: str = "utf-8") -> None: + """Initialize parser with raw MSDP buffer.""" self.buf = buf self.idx = 0 + self.encoding = encoding def _read_string(self) -> str: start = self.idx @@ -110,7 +135,7 @@ def _read_string(self) -> str: self.idx < len(self.buf) and self.buf[self.idx : self.idx + 1] not in self._DELIMITERS ): self.idx += 1 - return self.buf[start : self.idx].decode("utf-8") + return _decode_best_effort(self.buf[start : self.idx], self.encoding) def _read_key(self) -> str: start = self.idx @@ -119,7 +144,7 @@ def _read_key(self) -> str: MSDP_VAR, ): self.idx += 1 - return self.buf[start : self.idx].decode("utf-8") + return _decode_best_effort(self.buf[start : self.idx], self.encoding) def _parse_table(self) -> dict[str, Any]: table: dict[str, Any] = {} @@ -172,14 +197,15 @@ def parse(self) -> dict[str, Any]: return result -def msdp_decode(buf: bytes) -> dict[str, Any]: +def msdp_decode(buf: bytes, encoding: str = "utf-8") -> dict[str, Any]: """ Decode MSDP wire bytes to dictionary. :param buf: MSDP payload bytes + :param encoding: Character encoding to try first, falls back to latin-1. :returns: Dictionary of variable names to values """ - return _MsdpParser(buf).parse() + return MsdpParser(buf, encoding=encoding).parse() def mssp_encode(variables: dict[str, str | list[str]]) -> bytes: @@ -200,11 +226,12 @@ def mssp_encode(variables: dict[str, str | list[str]]) -> bytes: return result -def mssp_decode(buf: bytes) -> dict[str, str | list[str]]: +def mssp_decode(buf: bytes, encoding: str = "utf-8") -> dict[str, str | list[str]]: """ Decode MSSP wire bytes to dictionary. :param buf: MSSP payload bytes + :param encoding: Character encoding to try first, falls back to latin-1. :returns: Dictionary with str values for single entries, list[str] for multiple """ result: dict[str, str | list[str]] = {} @@ -217,13 +244,13 @@ def mssp_decode(buf: bytes) -> dict[str, str | list[str]]: var_start = idx while idx < len(buf) and buf[idx : idx + 1] not in (MSSP_VAL, MSSP_VAR): idx += 1 - current_var = buf[var_start:idx].decode("utf-8") + current_var = _decode_best_effort(buf[var_start:idx], encoding) elif buf[idx : idx + 1] == MSSP_VAL: idx += 1 val_start = idx while idx < len(buf) and buf[idx : idx + 1] not in (MSSP_VAL, MSSP_VAR): idx += 1 - value = buf[val_start:idx].decode("utf-8") + value = _decode_best_effort(buf[val_start:idx], encoding) if current_var is not None: if current_var in result: diff --git a/telnetlib3/server_fingerprinting.py b/telnetlib3/server_fingerprinting.py new file mode 100644 index 00000000..29a9230d --- /dev/null +++ b/telnetlib3/server_fingerprinting.py @@ -0,0 +1,462 @@ +""" +Fingerprint shell for telnet server identification. + +This module probes remote telnet servers for protocol capabilities, +collects banner data and session information, and saves fingerprint +files. It mirrors :mod:`telnetlib3.fingerprinting` but operates as +a client connecting *to* a server. +""" + +from __future__ import annotations + +# std imports +import os +import sys +import json +import time +import shutil +import asyncio +import logging +import datetime +import subprocess +from typing import Any + +# local +from . import fingerprinting as _fps +from .telopt import ( + VAR, + MSSP, + NAWS, + LFLOW, + TTYPE, + VALUE, + SNDLOC, + TSPEED, + USERVAR, + LINEMODE, + XDISPLOC, + NEW_ENVIRON, +) +from .stream_reader import TelnetReader +from .stream_writer import TelnetWriter +from .fingerprinting import ( + ALL_PROBE_OPTIONS, + _hash_fingerprint, + _opt_byte_to_name, + _atomic_json_write, + _save_fingerprint_name, + _save_fingerprint_to_dir, + probe_client_capabilities, +) + +__all__ = ("fingerprinting_client_shell", "probe_server_capabilities") + +# Options where only the client sends WILL (in response to a server's DO). +# A server should never WILL these — they describe client-side properties. +# The probe must not send DO for these; their state is already captured +# in ``server_requested`` (what the server sent DO for). +_CLIENT_ONLY_WILL = frozenset({TTYPE, TSPEED, NAWS, XDISPLOC, NEW_ENVIRON, LFLOW, LINEMODE, SNDLOC}) + +_BANNER_MAX_BYTES = 1024 +_NEGOTIATION_SETTLE = 0.5 +_BANNER_WAIT = 3.0 +_POST_RETURN_WAIT = 3.0 +_JQ = shutil.which("jq") + +logger = logging.getLogger("telnetlib3.server_fingerprint") + + +def _is_display_worthy(v: Any) -> bool: + """Return True if *v* should be kept in culled display output.""" + # pylint: disable-next=use-implicit-booleaness-not-comparison-to-string + return v is not False and v != {} and v != [] and v != "" + + +def _cull_display(obj: Any) -> Any: + """Recursively remove empty, false-valued, and verbose entries for display.""" + if isinstance(obj, dict): + return {k: _cull_display(v) for k, v in obj.items() if _is_display_worthy(v)} + if isinstance(obj, list): + return [_cull_display(item) for item in obj] + return obj + + +def _print_json(data: dict[str, Any]) -> None: + """Print *data* as JSON to stdout, colorized through ``jq`` when available.""" + raw = json.dumps(_cull_display(data), indent=2, sort_keys=True) + if _JQ: + result = subprocess.run( + [_JQ, "-C", "."], input=raw, capture_output=True, text=True, check=False + ) + if result.returncode == 0: + raw = result.stdout.rstrip("\n") + print(raw, file=sys.stdout) + + +async def fingerprinting_client_shell( + reader: TelnetReader, + writer: TelnetWriter, + *, + host: str, + port: int, + save_path: str | None = None, + silent: bool = False, + set_name: str | None = None, + environ_encoding: str = "ascii", +) -> None: + """ + Client shell that fingerprints a remote telnet server. + + Designed to be used with :func:`functools.partial` to bind CLI + arguments, then passed as the ``shell`` callback to + :func:`~telnetlib3.client.open_connection` with ``encoding=False``. + + :param reader: Binary-mode :class:`~telnetlib3.stream_reader.TelnetReader`. + :param writer: Binary-mode :class:`~telnetlib3.stream_writer.TelnetWriter`. + :param host: Remote hostname or IP address. + :param port: Remote port number. + :param save_path: If set, write fingerprint JSON directly to this path. + :param silent: Suppress fingerprint output to stdout. + :param set_name: If set, store this name for the fingerprint hash in + ``fingerprint_names.json`` without requiring moderation. + :param environ_encoding: Encoding for NEW_ENVIRON data. Default + ``"ascii"`` per :rfc:`1572`; use ``"cp037"`` for EBCDIC hosts. + """ + writer.environ_encoding = environ_encoding + try: + await _fingerprint_session( + reader, + writer, + host=host, + port=port, + save_path=save_path, + silent=silent, + set_name=set_name, + ) + except (ConnectionError, EOFError) as exc: + logger.warning("%s:%d: %s", host, port, exc) + writer.close() + + +async def _fingerprint_session( + reader: TelnetReader, + writer: TelnetWriter, + *, + host: str, + port: int, + save_path: str | None, + silent: bool, + set_name: str | None, +) -> None: + """Run the fingerprint session (inner helper for error handling).""" + start_time = time.time() + + # 1. Let straggler negotiation settle + await asyncio.sleep(_NEGOTIATION_SETTLE) + + # 2. Read banner (pre-return) + banner_before = await _read_banner(reader, timeout=_BANNER_WAIT) + + # 3. Send return, read post-return data + writer.write(b"\r\n") + await writer.drain() + banner_after = await _read_banner(reader, timeout=_POST_RETURN_WAIT) + + # 4. Snapshot option states before probing + option_states = _collect_server_option_states(writer) + + # 5. Active probe + probe_start = time.time() + probe_results = await probe_server_capabilities(writer) + probe_time = time.time() - probe_start + + # 5b. If server acknowledged MSSP but data hasn't arrived yet, poll briefly + if writer.remote_option.enabled(MSSP) and writer.mssp_data is None: + for _ in range(10): + await asyncio.sleep(0.05) + if writer.mssp_data is not None: + break + + # 6. Build session dicts + session_data: dict[str, Any] = { + "encoding": writer.environ_encoding, + "option_states": option_states, + "banner_before_return": _format_banner(banner_before, encoding=writer.environ_encoding), + "banner_after_return": _format_banner(banner_after, encoding=writer.environ_encoding), + "timing": {"probe": probe_time, "total": time.time() - start_time}, + } + if writer.mssp_data is not None: + session_data["mssp"] = writer.mssp_data + session_entry: dict[str, Any] = { + "host": host, + "ip": (writer.get_extra_info("peername") or (host,))[0], + "port": port, + "connected": datetime.datetime.now(datetime.timezone.utc).isoformat(), + } + + # 7. Save + _save_server_fingerprint_data( + writer=writer, + probe_results=probe_results, + session_data=session_data, + session_entry=session_entry, + save_path=save_path, + ) + + # 8. Set name in fingerprint_names.json + if set_name is not None: + protocol_fp = _create_server_protocol_fingerprint(writer, probe_results) + protocol_hash = _hash_fingerprint(protocol_fp) + try: + _save_fingerprint_name(protocol_hash, set_name) + logger.info("set name %r for %s", set_name, protocol_hash) + except ValueError: + logger.warning("--set-name requires --data-dir or $TELNETLIB3_DATA_DIR") + + # 9. Display + if not silent: + protocol_fp = _create_server_protocol_fingerprint(writer, probe_results) + protocol_hash = _hash_fingerprint(protocol_fp) + display_data: dict[str, Any] = { + "server-probe": { + "fingerprint": protocol_hash, + "fingerprint-data": protocol_fp, + "session_data": session_data, + }, + "sessions": [session_entry], + } + _print_json(display_data) + + # 10. Close + writer.close() + + +async def probe_server_capabilities( + writer: TelnetWriter, options: list[tuple[bytes, str, str]] | None = None, timeout: float = 0.5 +) -> dict[str, _fps.ProbeResult]: + """ + Actively probe a remote server for telnet capability support. + + Sends ``IAC DO`` for all options not yet negotiated, then waits + for ``WILL``/``WONT`` responses. Delegates to + :func:`~telnetlib3.fingerprinting.probe_client_capabilities`. + + :param writer: :class:`~telnetlib3.stream_writer.TelnetWriter` instance. + :param options: List of ``(opt_bytes, name, description)`` tuples. + Defaults to :data:`~telnetlib3.fingerprinting.ALL_PROBE_OPTIONS` + minus client-only options. + :param timeout: Seconds to wait for all responses. + :returns: Dict mapping option name to status dict. + """ + if options is None: + options = [ + (opt, name, desc) + for opt, name, desc in ALL_PROBE_OPTIONS + if opt not in _CLIENT_ONLY_WILL + ] + return await probe_client_capabilities(writer, options=options, timeout=timeout) + + +def _parse_environ_send(raw: bytes) -> list[dict[str, Any]]: + """ + Parse a raw NEW_ENVIRON SEND payload into structured entries. + + :param raw: Bytes following ``SB NEW_ENVIRON SEND`` up to ``SE``. + :returns: List of dicts, each with ``type`` (``"VAR"`` or ``"USERVAR"``), + ``name`` (ASCII text portion), and optionally ``data_hex`` for + trailing binary bytes. + """ + entries: list[dict[str, Any]] = [] + delimiters = {VAR[0], USERVAR[0]} + value_byte = VALUE[0] + + # find positions of VAR/USERVAR delimiters + breaks = [i for i, b in enumerate(raw) if b in delimiters] + + for idx, ptr in enumerate(breaks): + kind = "VAR" if raw[ptr : ptr + 1] == VAR else "USERVAR" + start = ptr + 1 + end = breaks[idx + 1] if idx + 1 < len(breaks) else len(raw) + chunk = raw[start:end] + + if not chunk: + # bare VAR or USERVAR with no name = "send all" + entries.append({"type": kind, "name": "*"}) + continue + + # split on VALUE byte if present + if value_byte in chunk: + name_part, val_part = chunk.split(bytes([value_byte]), 1) + else: + name_part = chunk + val_part = b"" + + # contiguous ASCII-printable prefix only; trailing binary is ignored + ascii_end = 0 + for i, b in enumerate(name_part): + if 0x20 <= b < 0x7F: + ascii_end = i + 1 + else: + break + name_text = name_part[:ascii_end].decode("ascii") if ascii_end else "" + + entry: dict[str, Any] = {"type": kind, "name": name_text} + if val_part: + entry["value_hex"] = val_part.hex() + entries.append(entry) + + return entries + + +def _collect_server_option_states(writer: TelnetWriter) -> dict[str, dict[str, Any]]: + """ + Collect telnet option states from the server perspective. + + :param writer: :class:`~telnetlib3.stream_writer.TelnetWriter` instance. + :returns: Dict with ``server_offered`` (server WILL) and + ``server_requested`` (server DO) entries. + """ + server_offered: dict[str, Any] = {} + for opt, enabled in writer.remote_option.items(): + server_offered[_opt_byte_to_name(opt)] = enabled + + server_requested: dict[str, Any] = {} + for opt, enabled in writer.local_option.items(): + server_requested[_opt_byte_to_name(opt)] = enabled + + result: dict[str, Any] = { + "server_offered": server_offered, + "server_requested": server_requested, + } + + if writer.environ_send_raw is not None: + result["environ_requested"] = _parse_environ_send(writer.environ_send_raw) + + return result + + +def _create_server_protocol_fingerprint( + writer: TelnetWriter, probe_results: dict[str, _fps.ProbeResult] +) -> dict[str, Any]: + """ + Create anonymized protocol fingerprint for a remote server. + + :param writer: :class:`~telnetlib3.stream_writer.TelnetWriter` instance. + :param probe_results: Results from :func:`probe_server_capabilities`. + :returns: Deterministic fingerprint dict suitable for hashing. + """ + offered = sorted(name for name, info in probe_results.items() if info["status"] == "WILL") + refused = sorted( + name for name, info in probe_results.items() if info["status"] in ("WONT", "timeout") + ) + + requested = sorted( + _opt_byte_to_name(opt) for opt, enabled in writer.local_option.items() if enabled + ) + + return { + "probed-protocol": "server", + "offered-options": offered, + "requested-options": requested, + "refused-options": refused, + } + + +def _save_server_fingerprint_data( + writer: TelnetWriter, + probe_results: dict[str, _fps.ProbeResult], + session_data: dict[str, Any], + session_entry: dict[str, Any], + save_path: str | None = None, +) -> str | None: + """ + Save server fingerprint data to a JSON file. + + Directory structure: ``DATA_DIR/server//.json`` + + :param writer: :class:`~telnetlib3.stream_writer.TelnetWriter` instance. + :param probe_results: Results from :func:`probe_server_capabilities`. + :param session_data: Pre-built dict with ``option_states``, ``banner_before_return``, + ``banner_after_return``, and ``timing`` keys. + :param session_entry: Pre-built dict with ``host``, ``ip``, ``port``, + and ``connected`` keys. + :param save_path: If set, write directly to this path. + :returns: Path to saved file, or ``None`` if saving was skipped. + """ + protocol_fp = _create_server_protocol_fingerprint(writer, probe_results) + protocol_hash = _hash_fingerprint(protocol_fp) + + data: dict[str, Any] = { + "server-probe": { + "fingerprint": protocol_hash, + "fingerprint-data": protocol_fp, + "session_data": session_data, + }, + "sessions": [session_entry], + } + + # Direct save path + if save_path is not None: + save_dir = os.path.dirname(save_path) + if save_dir: + os.makedirs(save_dir, exist_ok=True) + _atomic_json_write(save_path, data) + logger.info("saved server fingerprint to %s", save_path) + return save_path + + # DATA_DIR-based save + data_dir = _fps.DATA_DIR + if data_dir is None: + return None + if not os.path.isdir(data_dir): + os.makedirs(data_dir, exist_ok=True) + + session_identity = { + "host": session_entry["host"], + "port": session_entry["port"], + "ip": session_entry["ip"], + } + session_hash = _hash_fingerprint(session_identity) + server_dir = os.path.join(data_dir, "server", protocol_hash) + + return _save_fingerprint_to_dir( + target_dir=server_dir, + session_hash=session_hash, + data=data, + probe_key="server-probe", + data_dir=data_dir, + side="server", + protocol_hash=protocol_hash, + ) + + +def _format_banner(data: bytes, encoding: str = "utf-8") -> str: + """ + Format raw banner bytes for JSON serialization. + + Default ``"utf-8"`` is intentional -- banners are typically UTF-8 + regardless of ``environ_encoding``; callers may override. + + :param data: Raw bytes from the server. + :param encoding: Character encoding to use for decoding. + :returns: Decoded text string (undecodable bytes replaced). + """ + return data.decode(encoding, errors="replace") + + +async def _read_banner(reader: TelnetReader, timeout: float = _BANNER_WAIT) -> bytes: + """ + Read up to :data:`_BANNER_MAX_BYTES` from *reader* with timeout. + + Returns whatever bytes were received before the timeout, which may + be empty if the server sends nothing. + + :param reader: :class:`~telnetlib3.stream_reader.TelnetReader` instance. + :param timeout: Seconds to wait for data. + :returns: Banner bytes (may be empty). + """ + try: + data = await asyncio.wait_for(reader.read(_BANNER_MAX_BYTES), timeout=timeout) + except (asyncio.TimeoutError, EOFError): + data = b"" + return data diff --git a/telnetlib3/server_pty_shell.py b/telnetlib3/server_pty_shell.py index 46992828..948f29c5 100644 --- a/telnetlib3/server_pty_shell.py +++ b/telnetlib3/server_pty_shell.py @@ -238,10 +238,10 @@ def _setup_child( # terminal responses from being echoed back through the PTY. attrs[3] &= ~(termios.ECHO | termios.ICANON) else: - # Normal mode: Keep ECHO and ICANON enabled for proper input() - # behavior. We sent WONT ECHO to the client, so the PTY handles echo - # with proper output translation (ONLCR: \n → \r\n). - pass + # Normal mode: keep ICANON for line editing but disable ECHO. + # We sent WONT ECHO so the client does local echo; if the PTY + # also echoed, every character would appear twice. + attrs[3] &= ~termios.ECHO # Set VERASE to ^H (0x08) since many telnet clients send ^H for backspace # (default PTY ERASE is often ^? which won't work for those clients). diff --git a/telnetlib3/server_shell.py b/telnetlib3/server_shell.py index 8de08cd9..086cea39 100644 --- a/telnetlib3/server_shell.py +++ b/telnetlib3/server_shell.py @@ -133,10 +133,10 @@ def _visible_width(text: str) -> int: class _LineEditor: # pylint: disable=too-few-public-methods """Shared line-editing state machine for readline and readline_async.""" - def __init__(self, maxvis: int = 0) -> None: + def __init__(self, max_visible_width: int = 0) -> None: self.command: str = "" self.last_char: str = "" - self.maxvis: int = maxvis + self.max_visible_width: int = max_visible_width def feed(self, char: str) -> tuple[str, Optional[str]]: """Feed one character, return (echo_str, command_or_none).""" @@ -160,15 +160,15 @@ def feed(self, char: str) -> tuple[str, Optional[str]]: return echo, None return "", None - # Regular character -- check maxvis + # Regular character -- check max_visible_width self.last_char = char - if self.maxvis and _visible_width(self.command + char) > self.maxvis: + if self.max_visible_width and _visible_width(self.command + char) > self.max_visible_width: return "", None self.command += char return char, None -__all__ = ("telnet_server_shell", "readline_async", "readline2", "readline") +__all__ = ("telnet_server_shell", "readline_async", "readline") async def telnet_server_shell( # pylint: disable=too-complex,too-many-branches,too-many-statements @@ -302,17 +302,17 @@ async def get_next_ascii( def readline( _reader: Union[TelnetReader, TelnetReaderUnicode], writer: Union[TelnetWriter, TelnetWriterUnicode], - maxvis: int = 0, + max_visible_width: int = 0, ) -> Generator[Optional[str], str, None]: """ Blocking readline using generator yield/send protocol. Characters are fed in via ``send()`` and complete lines are yielded. - Uses ``_LineEditor`` for grapheme-aware backspace and maxvis + Uses ``_LineEditor`` for grapheme-aware backspace and max_visible_width support. """ _writer = cast(TelnetWriterUnicode, writer) - editor = _LineEditor(maxvis=maxvis) + editor = _LineEditor(max_visible_width=max_visible_width) inp = yield None while True: echo, cmd = editor.feed(inp) @@ -324,17 +324,17 @@ def readline( async def readline_async( reader: Union[TelnetReader, TelnetReaderUnicode], writer: Union[TelnetWriter, TelnetWriterUnicode], - maxvis: int = 0, + max_visible_width: int = 0, ) -> Optional[str]: """ Async readline that filters ANSI escape sequences. Uses ``filter_ansi()`` to strip escape sequences and - ``_LineEditor`` for grapheme-aware backspace and maxvis support. + ``_LineEditor`` for grapheme-aware backspace and max_visible_width support. """ _reader = cast(TelnetReaderUnicode, reader) _writer = cast(TelnetWriterUnicode, writer) - editor = _LineEditor(maxvis=maxvis) + editor = _LineEditor(max_visible_width=max_visible_width) while True: next_char = await filter_ansi(_reader, _writer) if not next_char: diff --git a/telnetlib3/stream_writer.py b/telnetlib3/stream_writer.py index 925ae9d9..16ff6437 100644 --- a/telnetlib3/stream_writer.py +++ b/telnetlib3/stream_writer.py @@ -197,6 +197,11 @@ def __init__( #: indicating state of remote capabilities. self.remote_option = Option("remote_option", self.log, on_change=self._check_waiters) + #: Encoding used for NEW_ENVIRON variable names and values. + #: Default ``"ascii"`` per :rfc:`1572`; set to ``"cp037"`` for + #: EBCDIC hosts such as IBM OS/400. + self.environ_encoding: str = "ascii" + #: Set of option byte(s) for WILL received from remote end #: that were rejected with DONT (unhandled options). self.rejected_will: set[bytes] = set() @@ -205,6 +210,14 @@ def __init__( #: that were rejected with WONT (unsupported options). self.rejected_do: set[bytes] = set() + #: Raw bytes of the last NEW_ENVIRON SEND payload, captured + #: for fingerprinting. ``None`` if no SEND was received. + self.environ_send_raw: Optional[bytes] = None + + #: Decoded MSSP variables received via subnegotiation. + #: ``None`` until a ``SB MSSP`` payload is received and decoded. + self.mssp_data: Optional[dict[str, str | list[str]]] = None + #: Sub-negotiation buffer self._sb_buffer: collections.deque[bytes] = collections.deque() @@ -809,7 +822,7 @@ def send_iac(self, buf: bytes) -> None: """ assert isinstance(buf, (bytes, bytearray)), buf assert buf and buf.startswith(IAC), buf - if self._transport is not None: + if not self.is_closing(): self._transport.write(buf) if hasattr(self._protocol, "_tx_bytes"): self._protocol._tx_bytes += len(buf) @@ -900,51 +913,45 @@ def send_eor(self) -> bool: self.send_iac(IAC + CMD_EOR) return True - def send_gmcp(self, package: str, data: Any = None) -> bool: + def send_gmcp(self, package: str, data: Any = None) -> None: """ Transmit a GMCP message via subnegotiation. :param package: GMCP package name (e.g., ``"Char.Vitals"``) :param data: Optional data to encode as JSON - :returns: True if sent, False if GMCP is not negotiated """ if not (self.local_option.enabled(GMCP) or self.remote_option.enabled(GMCP)): self.log.debug("cannot send GMCP without negotiation") - return False + return payload = self._escape_iac(gmcp_encode(package, data)) self.log.debug("send IAC SB GMCP %s IAC SE", package) self.send_iac(IAC + SB + GMCP + payload + IAC + SE) - return True - def send_msdp(self, variables: dict[str, Any]) -> bool: + def send_msdp(self, variables: dict[str, Any]) -> None: """ Transmit MSDP variables via subnegotiation. :param variables: Dictionary of variable names to values - :returns: True if sent, False if MSDP is not negotiated """ if not (self.local_option.enabled(MSDP) or self.remote_option.enabled(MSDP)): self.log.debug("cannot send MSDP without negotiation") - return False + return payload = self._escape_iac(msdp_encode(variables)) self.log.debug("send IAC SB MSDP IAC SE") self.send_iac(IAC + SB + MSDP + payload + IAC + SE) - return True - def send_mssp(self, variables: dict[str, str | list[str]]) -> bool: + def send_mssp(self, variables: dict[str, str | list[str]]) -> None: """ Transmit MSSP variables via subnegotiation. :param variables: Dictionary of variable names to values - :returns: True if sent, False if MSSP is not negotiated """ if not (self.local_option.enabled(MSSP) or self.remote_option.enabled(MSSP)): self.log.debug("cannot send MSSP without negotiation") - return False + return payload = self._escape_iac(mssp_encode(variables)) self.log.debug("send IAC SB MSSP IAC SE") self.send_iac(IAC + SB + MSSP + payload + IAC + SE) - return True # Public methods for notifying about, or soliciting state options. # @@ -1059,7 +1066,7 @@ def request_environ(self) -> bool: response.append(env_key) else: response.extend([VAR]) - response.extend([_escape_environ(env_key.encode("ascii"))]) + response.extend([_escape_environ(env_key.encode(self.environ_encoding, "replace"))]) response.extend([IAC, SE]) self.log.debug("request_environ: %r", b"".join(response)) self.pending_option[SB + NEW_ENVIRON] = True @@ -1136,7 +1143,8 @@ def request_forwardmask(self, fmask: Optional[slc.Forwardmask] = None) -> bool: self.log.debug("send IAC SE") self.send_iac(IAC + SB + LINEMODE + DO + slc.LMODE_FORWARDMASK) - self._transport.write(fmask.value) + if not self.is_closing(): + self._transport.write(fmask.value) self.send_iac(IAC + SE) return True @@ -1182,7 +1190,8 @@ def send_linemode(self, linemode: Optional[slc.Linemode] = None) -> None: self.log.debug("send IAC SB LINEMODE LINEMODE-MODE %r IAC SE", self._linemode) self.send_iac(IAC + SB + LINEMODE + slc.LMODE_MODE) - self._transport.write(self._linemode.mask) + if not self.is_closing(): + self._transport.write(self._linemode.mask) self.send_iac(IAC + SE) # Public is-a-command (IAC) callbacks @@ -1544,16 +1553,30 @@ def handle_charset(self, charset: str) -> None: self.log.debug("Character set: %s", charset) def handle_gmcp(self, package: str, data: Any) -> None: - """Receive GMCP message with ``package`` name and ``data``.""" + """ + Receive GMCP message with ``package`` name and ``data``. + + :param package: GMCP package name (e.g., ``"Char.Vitals"``). + :param data: Decoded JSON value -- may be any JSON type + (``str``, ``int``, ``float``, ``bool``, ``None``, + ``list``, or ``dict``). + """ self.log.debug("GMCP: %s %r", package, data) def handle_msdp(self, variables: dict[str, Any]) -> None: - """Receive MSDP variables as dict.""" + """ + Receive MSDP variables as dict. + + :param variables: Mapping of variable names to values. Values + may be ``str``, ``dict[str, Any]`` (MSDP table), or + ``list[Any]`` (MSDP array) per the MSDP wire format. + """ self.log.debug("MSDP: %r", variables) def handle_mssp(self, variables: dict[str, str | list[str]]) -> None: """Receive MSSP variables as dict.""" self.log.debug("MSSP: %r", variables) + self.mssp_data = variables def handle_send_client_charset(self, _charsets: list[str]) -> str: """ @@ -1633,8 +1656,18 @@ def handle_do(self, opt: bytes) -> bool: # servers, such as dgamelaunch (nethack.alt.org) freeze up # unless we answer IAC-WONT-ECHO. self.iac(WONT, ECHO) - elif self.server and opt in (LINEMODE, TTYPE, NAWS, NEW_ENVIRON, XDISPLOC, LFLOW): - raise ValueError(f"cannot recv DO {name_command(opt)} on server end (ignored).") + elif self.server and opt in ( + LINEMODE, + TTYPE, + NAWS, + NEW_ENVIRON, + XDISPLOC, + LFLOW, + TSPEED, + SNDLOC, + ): + self.log.debug("recv DO %s on server end, refusing.", name_command(opt)) + self.iac(WONT, opt) elif self.client and opt in (LOGOUT,): raise ValueError(f"cannot recv DO {name_command(opt)} on client end (ignored).") elif opt == TM: @@ -1702,7 +1735,7 @@ def handle_dont(self, opt: bytes) -> None: """ self.log.debug("handle_dont(%s)", name_command(opt)) if opt == LOGOUT: - assert self.server, "cannot recv DONT LOGOUT on server end" + assert self.server, "cannot recv DONT LOGOUT on client end" self._ext_callback[LOGOUT](DONT) # many implementations (wrongly!) sent a WONT in reply to DONT. It # sounds reasonable, but it can and will cause telnet loops. (ruby?) @@ -1731,8 +1764,11 @@ def handle_will(self, opt: bytes) -> None: unsupported capabilities, RFC specifies a response of (IAC, DONT, opt). Similarly, set ``self.remote_option[opt]`` to ``False``. - :raises ValueError: When an invalid WILL command is received for the - current endpoint role (server/client). + Options received in the wrong direction (e.g. WILL NAWS on client + end) are gracefully refused with DONT per Postel's law (RFC 1123). + + :raises ValueError: When WILL ECHO is received on server end, or + when WILL TM is received without prior DO TM. """ self.log.debug("handle_will(%s)", name_command(opt)) @@ -1740,7 +1776,9 @@ def handle_will(self, opt: bytes) -> None: if opt == ECHO and self.server: raise ValueError("cannot recv WILL ECHO on server end") if opt in (NAWS, LINEMODE, SNDLOC) and self.client: - raise ValueError(f"cannot recv WILL {name_command(opt)} on client end") + self.log.debug("recv WILL %s on client end, refusing.", name_command(opt)) + self.iac(DONT, opt) + return if not self.remote_option.enabled(opt): self.iac(DO, opt) self.remote_option[opt] = True @@ -1759,7 +1797,7 @@ def handle_will(self, opt: bytes) -> None: elif opt == LOGOUT: if self.client: - raise ValueError("cannot recv WILL LOGOUT on server end") + raise ValueError("cannot recv WILL LOGOUT on client end") self._ext_callback[LOGOUT](WILL) elif opt == STATUS: @@ -1778,7 +1816,9 @@ def handle_will(self, opt: bytes) -> None: # # Though Others -- XDISPLOC, TTYPE, TSPEED, are 1-directional. if not self.server and opt not in (CHARSET,): - raise ValueError(f"cannot recv WILL {name_command(opt)} on client end.") + self.log.debug("recv WILL %s on client end, refusing.", name_command(opt)) + self.iac(DONT, opt) + return # First, we need to acknowledge WILL with DO for all options # This was missing for CHARSET when received by client @@ -1948,9 +1988,11 @@ def _handle_sb_charset(self, buf: collections.deque[bytes]) -> None: response.extend([IAC, SE]) self.log.debug("send IAC SB CHARSET ACCEPTED %s IAC SE", selected) self.send_iac(b"".join(response)) + self.environ_encoding = selected elif opt == ACCEPTED: charset = b"".join(buf).decode("ascii") self.log.debug("recv IAC SB CHARSET ACCEPTED %s IAC SE", charset) + self.environ_encoding = charset self._ext_callback[CHARSET](charset) elif opt == REJECTED: self.log.warning("recv IAC SB CHARSET REJECTED IAC SE") @@ -2072,9 +2114,17 @@ def _handle_sb_environ(self, buf: collections.deque[bytes]) -> None: assert cmd == NEW_ENVIRON, (cmd, name_command(cmd)) assert opt in (IS, SEND, INFO), opt opt_kind = {IS: "IS", INFO: "INFO", SEND: "SEND"}.get(opt) - self.log.debug("recv %s %s: %r", name_command(cmd), opt_kind, b"".join(buf)) + raw = b"".join(buf) - env = _decode_env_buf(b"".join(buf)) + if opt == SEND: + self.environ_send_raw = raw + + env = _decode_env_buf(raw, encoding=self.environ_encoding) + env_keys = [k for k in env if k] + if env_keys: + self.log.debug("recv %s %s: %s", name_command(cmd), opt_kind, ", ".join(env_keys)) + else: + self.log.debug("recv %s %s (all)", name_command(cmd), opt_kind) if opt in (IS, INFO): assert self.server, f"SE: cannot recv from server: {name_command(cmd)} {opt_kind}" @@ -2093,9 +2143,15 @@ def _handle_sb_environ(self, buf: collections.deque[bytes]) -> None: assert self.client, f"SE: cannot recv from client: {name_command(cmd)} {opt_kind}" # client-side, we do _not_ honor the 'send all VAR' or 'send all # USERVAR' requests -- it is a small bit of a security issue. - send_env = _encode_env_buf(self._ext_send_callback[NEW_ENVIRON](env.keys())) + reply_env = self._ext_send_callback[NEW_ENVIRON](env.keys()) + send_env = _encode_env_buf(reply_env, encoding=self.environ_encoding) response = [IAC, SB, NEW_ENVIRON, IS, send_env, IAC, SE] - self.log.debug("env send: %r", response) + if reply_env: + self.log.debug( + "env send: %s", ", ".join(f"{k}={v!r}" for k, v in reply_env.items()) + ) + else: + self.log.debug("env send: (empty)") self.send_iac(b"".join(response)) if self.pending_option.enabled(WILL + TTYPE): self.pending_option[WILL + TTYPE] = False @@ -2190,72 +2246,71 @@ def _receive_status(self, buf: collections.deque[bytes]) -> None: """ Callback responds to IAC SB STATUS IS, :rfc:`859`. - :param buf: sub-negotiation byte buffer containing status data. This implementation does its - best to analyze our perspective's state to the state options given. Any discrepancies - are reported to the error log, but no action is taken. This implementation handles - malformed STATUS data gracefully by skipping invalid command bytes and continuing to - process the remaining data. + :param buf: sub-negotiation byte buffer containing status data. + Parses ``WILL/WONT/DO/DONT `` pairs and ``SB SE`` + blocks. Compares the remote peer's reported option state against + our own and logs a summary of agreed, disagreed, and subnegotiation + parameters. """ - # Convert deque to list for processing buf_list = list(buf) + agreed = [] + disagreed = [] + sb_info = [] - # Process command-option pairs, handling malformed data gracefully i = 0 while i < len(buf_list): if i + 1 >= len(buf_list): - # Odd number of bytes remaining, log and skip - self.log.warning("STATUS: incomplete pair at end, skipping byte: %s", buf_list[i]) + self.log.debug("STATUS: trailing byte: %s", buf_list[i]) break cmd = buf_list[i] - opt = buf_list[i + 1] - # Skip invalid command bytes with a warning + # SB SE block + if cmd == SB: + opt = buf_list[i + 1] + # find matching SE + se_idx = None + for j in range(i + 2, len(buf_list)): + if buf_list[j] == SE: + se_idx = j + break + if se_idx is None: + sb_data = b"".join(buf_list[i + 2 :]) + sb_info.append(f"{name_command(opt)} {sb_data.hex()}") + break + sb_data = b"".join(buf_list[i + 2 : se_idx]) + sb_info.append(_format_sb_status(opt, sb_data)) + i = se_idx + 1 + continue + if cmd not in (DO, DONT, WILL, WONT): - self.log.warning( - "STATUS: invalid cmd at pos %s: %s, skipping. Expected DO DONT WILL WONT.", - i, - cmd, - ) - # Try to resync by looking for the next valid command + self.log.debug("STATUS: unknown byte at pos %d: %s", i, cmd) i += 1 continue - matching = False + opt = buf_list[i + 1] + opt_name = name_command(opt) if cmd in (DO, DONT): - _side = "local" enabled = self.local_option.enabled(opt) matching = (cmd == DO and enabled) or (cmd == DONT and not enabled) - else: # (WILL, WONT) - _side = "remote" + else: enabled = self.remote_option.enabled(opt) matching = (cmd == WILL and enabled) or (cmd == WONT and not enabled) - _mode = "enabled" if enabled else "not enabled" - if not matching: - self.log.error( - "STATUS %s %s: disagreed, %s option is %s.", - name_command(cmd), - name_command(opt), - _side, - _mode, - ) - self.log.error( - "remote %r is %s", - [(name_commands(_opt), _val) for _opt, _val in self.remote_option.items()], - self.remote_option.enabled(opt), - ) - self.log.error( - " local %r is %s", - [(name_commands(_opt), _val) for _opt, _val in self.local_option.items()], - self.local_option.enabled(opt), - ) + if matching: + agreed.append(opt_name) else: - self.log.debug("STATUS %s %s (agreed).", name_command(cmd), name_command(opt)) + disagreed.append(opt_name) - # Move to next pair i += 2 + if agreed: + self.log.debug("STATUS agreed: %s", ", ".join(agreed)) + if disagreed: + self.log.debug("STATUS disagreed: %s", ", ".join(disagreed)) + if sb_info: + self.log.debug("STATUS subneg: %s", "; ".join(sb_info)) + def _send_status(self) -> None: """Callback responds to IAC SB STATUS SEND, :rfc:`859`.""" if not (self.pending_option.enabled(WILL + STATUS) or self.local_option.enabled(STATUS)): @@ -2398,7 +2453,8 @@ def _slc_end(self) -> None: if len(self._slc_buffer): self.log.debug("send (slc_end): %r", b"".join(self._slc_buffer)) buf = b"".join(self._slc_buffer) - self._transport.write(self._escape_iac(buf)) + if not self.is_closing(): + self._transport.write(self._escape_iac(buf)) self._slc_buffer.clear() self.log.debug("slc_end: [..] IAC SE") @@ -2626,7 +2682,8 @@ def _handle_sb_gmcp(self, buf: collections.deque[bytes]) -> None: """ buf.popleft() payload = b"".join(buf) - package, data = gmcp_decode(payload) + encoding = self.environ_encoding or "utf-8" + package, data = gmcp_decode(payload, encoding=encoding) self._ext_callback[GMCP](package, data) def _handle_sb_msdp(self, buf: collections.deque[bytes]) -> None: @@ -2637,7 +2694,8 @@ def _handle_sb_msdp(self, buf: collections.deque[bytes]) -> None: """ buf.popleft() payload = b"".join(buf) - variables = msdp_decode(payload) + encoding = self.environ_encoding or "utf-8" + variables = msdp_decode(payload, encoding=encoding) self._ext_callback[MSDP](variables) def _handle_sb_mssp(self, buf: collections.deque[bytes]) -> None: @@ -2648,7 +2706,8 @@ def _handle_sb_mssp(self, buf: collections.deque[bytes]) -> None: """ buf.popleft() payload = b"".join(buf) - variables = mssp_decode(payload) + encoding = self.environ_encoding or "utf-8" + variables = mssp_decode(payload, encoding=encoding) self._ext_callback[MSSP](variables) def _handle_do_forwardmask(self, buf: collections.deque[bytes]) -> None: @@ -2656,12 +2715,12 @@ def _handle_do_forwardmask(self, buf: collections.deque[bytes]) -> None: Callback handles request for LINEMODE DO FORWARDMASK. :param buf: bytes following IAC SB LINEMODE DO FORWARDMASK. - :raises NotImplementedError: """ - raise NotImplementedError + mask = b"".join(buf) + self.log.debug("FORWARDMASK received (%d bytes), not applied", len(mask)) -class TelnetWriterUnicode(TelnetWriter): # pylint: disable=abstract-method +class TelnetWriterUnicode(TelnetWriter): """ A Unicode StreamWriter interface for Telnet protocol. @@ -2823,11 +2882,12 @@ def _unescape_environ(buf: bytes) -> bytes: return buf.replace(ESC + VAR, VAR).replace(ESC + USERVAR, USERVAR) -def _encode_env_buf(env: dict[str, str]) -> bytes: +def _encode_env_buf(env: dict[str, str], encoding: str = "ascii") -> bytes: """ Encode dictionary for transmission as environment variables, :rfc:`1572`. :param env: dictionary of environment values. + :param encoding: Character encoding for names and values. :returns: buffer meant to follow sequence IAC SB NEW_ENVIRON IS. It is not terminated by IAC SE. @@ -2837,13 +2897,35 @@ def _encode_env_buf(env: dict[str, str]) -> bytes: buf: collections.deque[bytes] = collections.deque() for key, value in env.items(): buf.append(VAR) - buf.extend([_escape_environ(key.encode("ascii"))]) + buf.extend([_escape_environ(key.encode(encoding, "replace"))]) buf.append(VALUE) - buf.extend([_escape_environ(f"{value}".encode("ascii"))]) + buf.extend([_escape_environ(f"{value}".encode(encoding, "replace"))]) return b"".join(buf) -def _decode_env_buf(buf: bytes) -> dict[str, str]: +def _format_sb_status(opt: bytes, data: bytes) -> str: + """ + Format a STATUS IS subnegotiation block as a human-readable string. + + :param opt: Option byte (e.g. NAWS, TTYPE). + :param data: Subnegotiation payload bytes. + :returns: Descriptive string like ``"NAWS 80x25"`` or ``"TTYPE IS VT100"``. + """ + opt_name = name_command(opt) + if opt == NAWS and len(data) == 4: + w = (data[0] << 8) | data[1] + h = (data[2] << 8) | data[3] + return f"{opt_name} {w}x{h}" + if opt in (TTYPE, XDISPLOC, SNDLOC) and len(data) >= 2: + kind = {IS: "IS", SEND: "SEND"}.get(data[0:1], data[0:1].hex()) + text = data[1:].decode("ascii", errors="replace") + return f"{opt_name} {kind} {text}" + if data: + return f"{opt_name} {data.hex()}" + return opt_name + + +def _decode_env_buf(buf: bytes, encoding: str = "ascii") -> dict[str, str]: """ Decode environment values to dictionary, :rfc:`1572`. @@ -2872,11 +2954,11 @@ def _decode_env_buf(buf: bytes) -> dict[str, str]: end = breaks[idx + 1] pair = buf[start:end].split(VALUE, 1) - key = _unescape_environ(pair[0]).decode("ascii", "strict") + key = _unescape_environ(pair[0]).decode(encoding, "replace") if len(pair) == 1: value = "" else: - value = _unescape_environ(pair[1]).decode("ascii", "strict") + value = _unescape_environ(pair[1]).decode(encoding, "replace") env[key] = value return env diff --git a/telnetlib3/sync.py b/telnetlib3/sync.py index 2cece1fc..f2bf0b0a 100644 --- a/telnetlib3/sync.py +++ b/telnetlib3/sync.py @@ -733,8 +733,11 @@ def close(self) -> None: if self._closed: return self._closed = True - self._loop.call_soon_threadsafe(self._writer.close) - self._loop.call_soon_threadsafe(self._close_event.set) + try: + self._loop.call_soon_threadsafe(self._writer.close) + self._loop.call_soon_threadsafe(self._close_event.set) + except RuntimeError: + pass # Event loop already closed during shutdown def get_extra_info(self, name: str, default: Any = None) -> Any: """ diff --git a/telnetlib3/telopt.py b/telnetlib3/telopt.py index 16b3660a..7a3f9aa4 100644 --- a/telnetlib3/telopt.py +++ b/telnetlib3/telopt.py @@ -75,6 +75,7 @@ NAOLFD = b"\x10" __all__ = ( + "AARDWOLF", "ABORT", "ACCEPTED", "AO", @@ -201,6 +202,7 @@ MSP = bytes([90]) MXP = bytes([91]) ZMP = bytes([93]) +AARDWOLF = bytes([102]) ATCP = bytes([200]) # MSDP sub-command bytes (used within SB MSDP payloads) @@ -269,6 +271,7 @@ "MSP", "MXP", "ZMP", + "AARDWOLF", "ATCP", "ENCRYPT", "AUTHENTICATION", diff --git a/telnetlib3/tests/test_benchmarks.py b/telnetlib3/tests/test_benchmarks.py index c0bd38df..ca58ec27 100644 --- a/telnetlib3/tests/test_benchmarks.py +++ b/telnetlib3/tests/test_benchmarks.py @@ -23,6 +23,9 @@ def write(self, data): def get_write_buffer_size(self): return 0 + def is_closing(self): + return False + class MockProtocol: """Minimal protocol mock for benchmarking.""" diff --git a/telnetlib3/tests/test_fingerprinting.py b/telnetlib3/tests/test_fingerprinting.py index ce4803dc..75403263 100644 --- a/telnetlib3/tests/test_fingerprinting.py +++ b/telnetlib3/tests/test_fingerprinting.py @@ -565,19 +565,6 @@ async def test_probe_already_negotiated(opt, value, name, expected_status): assert results[name]["already_negotiated"] is True -@pytest.mark.asyncio -async def test_probe_with_progress_callback(): - called = [] - writer = MockWriter(will_options=[fps.BINARY]) - await fps.probe_client_capabilities( - writer, - options=[(fps.BINARY, "BINARY", "test")], - timeout=0.01, - progress_callback=lambda name, idx, total, status: called.append((name, idx)), - ) - assert called == [("BINARY", 1)] - - def test_get_client_fingerprint(): writer = MockWriter( extra={ diff --git a/telnetlib3/tests/test_mud.py b/telnetlib3/tests/test_mud.py index f9e929e7..299a3da1 100644 --- a/telnetlib3/tests/test_mud.py +++ b/telnetlib3/tests/test_mud.py @@ -58,6 +58,20 @@ def test_gmcp_nested_json() -> None: assert decoded_data == data +def test_gmcp_decode_encoding_param() -> None: + """gmcp_decode uses the encoding parameter for decoding.""" + decoded_pkg, decoded_data = gmcp_decode(b"Caf\xe9", encoding="latin-1") + assert decoded_pkg == "Caf\xe9" + assert decoded_data is None + + +def test_gmcp_decode_latin1_fallback() -> None: + """gmcp_decode falls back to latin-1 when utf-8 fails.""" + decoded_pkg, decoded_data = gmcp_decode(b"Caf\xe9") + assert decoded_pkg == "Caf\xe9" + assert decoded_data is None + + def test_gmcp_decode_invalid_json() -> None: """Decode GMCP with invalid JSON raises ValueError.""" with pytest.raises(ValueError): @@ -122,6 +136,20 @@ def test_msdp_empty_value() -> None: assert decoded == variables +def test_msdp_decode_encoding_param() -> None: + """msdp_decode uses the encoding parameter for decoding.""" + encoded = MSDP_VAR + b"KEY" + MSDP_VAL + b"Caf\xe9" + decoded = msdp_decode(encoded, encoding="latin-1") + assert decoded == {"KEY": "Caf\xe9"} + + +def test_msdp_decode_latin1_fallback() -> None: + """msdp_decode falls back to latin-1 when utf-8 fails.""" + encoded = MSDP_VAR + b"KEY" + MSDP_VAL + b"Caf\xe9" + decoded = msdp_decode(encoded) + assert decoded == {"KEY": "Caf\xe9"} + + def test_mssp_single_value() -> None: """Encode and decode MSSP with single values.""" variables = {"NAME": "TestMUD", "UPTIME": "12345"} @@ -168,3 +196,17 @@ def test_mssp_decode_multi_returns_list() -> None: decoded = mssp_decode(encoded) assert decoded["SINGLE"] == "one" assert decoded["MULTI"] == ["first", "second"] + + +def test_mssp_decode_encoding_param() -> None: + """mssp_decode uses the encoding parameter for decoding.""" + encoded = MSSP_VAR + b"NAME" + MSSP_VAL + b"\xc9toile" + decoded = mssp_decode(encoded, encoding="latin-1") + assert decoded == {"NAME": "\xc9toile"} + + +def test_mssp_decode_latin1_fallback() -> None: + """mssp_decode falls back to latin-1 when utf-8 fails.""" + encoded = MSSP_VAR + b"NAME" + MSSP_VAL + b"\xc9toile" + decoded = mssp_decode(encoded) + assert decoded == {"NAME": "\xc9toile"} diff --git a/telnetlib3/tests/test_mud_negotiation.py b/telnetlib3/tests/test_mud_negotiation.py index 96cd4c04..2f1c96fa 100644 --- a/telnetlib3/tests/test_mud_negotiation.py +++ b/telnetlib3/tests/test_mud_negotiation.py @@ -165,19 +165,80 @@ def callback(variables): assert received_args[0] == {"NAME": "TestMUD"} +def test_handle_mssp_stores_data(): + w, t, p = new_writer(server=True) + assert w.mssp_data is None + w.handle_mssp({"NAME": "TestMUD", "PLAYERS": "42"}) + assert w.mssp_data == {"NAME": "TestMUD", "PLAYERS": "42"} + + +def test_sb_mssp_dispatch_stores_data(): + w, t, p = new_writer(server=True) + w.pending_option[SB + MSSP] = True + + # local + from telnetlib3.telopt import MSSP_VAL, MSSP_VAR + + payload = MSSP_VAR + b"NAME" + MSSP_VAL + b"TestMUD" + MSSP_VAR + b"PLAYERS" + MSSP_VAL + b"5" + buf = collections.deque([bytes([MSSP[0]])] + [bytes([b]) for b in payload]) + w.handle_subnegotiation(buf) + assert w.mssp_data == {"NAME": "TestMUD", "PLAYERS": "5"} + + +def test_sb_mssp_latin1_fallback(): + """MSSP with non-UTF-8 bytes falls back to latin-1 decoding.""" + w, t, p = new_writer(server=True) + w.pending_option[SB + MSSP] = True + + # local + from telnetlib3.telopt import MSSP_VAL, MSSP_VAR + + # 0xC9 is 'É' in latin-1 but invalid as a lone UTF-8 lead byte + payload = MSSP_VAR + b"NAME" + MSSP_VAL + b"\xc9toile" + buf = collections.deque([bytes([MSSP[0]])] + [bytes([b]) for b in payload]) + w.handle_subnegotiation(buf) + assert w.mssp_data == {"NAME": "\xc9toile"} + + +def test_sb_gmcp_latin1_fallback(): + """GMCP with non-UTF-8 bytes falls back to latin-1 decoding.""" + w, t, p = new_writer(server=True) + w.pending_option[SB + GMCP] = True + received_args: list[tuple[object, ...]] = [] + w.set_ext_callback(GMCP, lambda pkg, data: received_args.append((pkg, data))) + payload = b"Caf\xe9" + buf = collections.deque([bytes([GMCP[0]])] + [bytes([b]) for b in payload]) + w.handle_subnegotiation(buf) + assert received_args[0] == ("Caf\xe9", None) + + +def test_sb_msdp_latin1_fallback(): + """MSDP with non-UTF-8 bytes falls back to latin-1 decoding.""" + w, t, p = new_writer(server=True) + w.pending_option[SB + MSDP] = True + + # local + from telnetlib3.telopt import MSDP_VAL, MSDP_VAR + + received_args: list[object] = [] + w.set_ext_callback(MSDP, received_args.append) + payload = MSDP_VAR + b"KEY" + MSDP_VAL + b"Caf\xe9" + buf = collections.deque([bytes([MSDP[0]])] + [bytes([b]) for b in payload]) + w.handle_subnegotiation(buf) + assert received_args[0] == {"KEY": "Caf\xe9"} + + def test_send_gmcp(): w, t, p = new_writer(server=True) w.local_option[GMCP] = True - result = w.send_gmcp("Char.Vitals", {"hp": 100}) - assert result is True + w.send_gmcp("Char.Vitals", {"hp": 100}) expected = IAC + SB + GMCP + b'Char.Vitals {"hp":100}' + IAC + SE assert expected in t.writes def test_send_gmcp_not_negotiated(): w, t, p = new_writer(server=True) - result = w.send_gmcp("Char.Vitals", {"hp": 100}) - assert result is False + w.send_gmcp("Char.Vitals", {"hp": 100}) assert len(t.writes) == 0 @@ -188,8 +249,7 @@ def test_send_msdp(): # local from telnetlib3.telopt import MSDP_VAL, MSDP_VAR - result = w.send_msdp({"HEALTH": "100"}) - assert result is True + w.send_msdp({"HEALTH": "100"}) expected = IAC + SB + MSDP + MSDP_VAR + b"HEALTH" + MSDP_VAL + b"100" + IAC + SE assert expected in t.writes @@ -201,7 +261,6 @@ def test_send_mssp(): # local from telnetlib3.telopt import MSSP_VAL, MSSP_VAR - result = w.send_mssp({"NAME": "TestMUD"}) - assert result is True + w.send_mssp({"NAME": "TestMUD"}) expected = IAC + SB + MSSP + MSSP_VAR + b"NAME" + MSSP_VAL + b"TestMUD" + IAC + SE assert expected in t.writes diff --git a/telnetlib3/tests/test_server_fingerprinting.py b/telnetlib3/tests/test_server_fingerprinting.py new file mode 100644 index 00000000..55bef9c5 --- /dev/null +++ b/telnetlib3/tests/test_server_fingerprinting.py @@ -0,0 +1,547 @@ +# std imports +import json +import asyncio + +# 3rd party +import pytest + +# local +from telnetlib3 import fingerprinting as fps +from telnetlib3 import server_fingerprinting as sfp +from telnetlib3.telopt import VAR, USERVAR + + +class MockOption(dict): + def __init__(self, values=None): + super().__init__(values or {}) + + def enabled(self, opt): + return self.get(opt) is True + + +class MockWriter: + def __init__(self, extra=None, will_options=None, wont_options=None): + self._extra = extra or {"peername": ("127.0.0.1", 12345)} + self._will_options = set(will_options or []) + self._wont_options = set(wont_options or []) + self._iac_calls = [] + self.remote_option = MockOption() + self.local_option = MockOption() + self.environ_encoding = "ascii" + self.environ_send_raw = None + self.mssp_data = None + self._closing = False + + def get_extra_info(self, key, default=None): + return self._extra.get(key, default) + + def iac(self, cmd, opt): + self._iac_calls.append((cmd, opt)) + if opt in self._will_options: + self.remote_option[opt] = True + elif opt in self._wont_options: + self.remote_option[opt] = False + + def write(self, data): + pass + + async def drain(self): + pass + + def close(self): + self._closing = True + + +class MockReader: + def __init__(self, chunks=None): + self._chunks = list(chunks or []) + self._idx = 0 + + async def read(self, n): + if self._idx >= len(self._chunks): + await asyncio.sleep(10) + return b"" + chunk = self._chunks[self._idx] + self._idx += 1 + return chunk[:n] + + +_BINARY_PROBE = {"BINARY": {"status": "WILL", "opt": fps.BINARY}} + + +def _save(writer=None, save_path=None, **overrides): + session_data = { + "option_states": overrides.pop("option_states", {}), + "banner_before_return": sfp._format_banner(overrides.pop("banner_before", b"")), + "banner_after_return": sfp._format_banner(overrides.pop("banner_after", b"")), + "timing": { + "probe": overrides.pop("probe_time", 0.1), + "total": overrides.pop("total_time", 1.0), + }, + } + session_entry = { + "host": overrides.pop("host", "example.com"), + "port": overrides.pop("port", 23), + "ip": overrides.pop("ip", "10.0.0.1"), + "connected": "2026-01-01T00:00:00+00:00", + } + defaults = { + "writer": writer or MockWriter(extra={"peername": ("10.0.0.1", 23)}), + "probe_results": overrides.pop("probe_results", _BINARY_PROBE), + "session_data": session_data, + "session_entry": session_entry, + } + defaults.update(overrides) + if save_path is not None: + defaults["save_path"] = save_path + return sfp._save_server_fingerprint_data(**defaults) + + +@pytest.mark.asyncio +async def test_probe_server_capabilities(): + options = [(fps.BINARY, "BINARY", ""), (fps.SGA, "SGA", "")] + writer = MockWriter(will_options=[fps.BINARY], wont_options=[fps.SGA]) + results = await sfp.probe_server_capabilities(writer, options=options, timeout=0.01) + assert results["BINARY"]["status"] == "WILL" + assert results["SGA"]["status"] == "WONT" + + +@pytest.mark.parametrize( + "opt,value,name,expected_status", + [ + pytest.param(fps.SGA, False, "SGA", "WONT", id="already_wont"), + pytest.param(fps.BINARY, True, "BINARY", "WILL", id="already_will"), + ], +) +@pytest.mark.asyncio +async def test_probe_already_negotiated(opt, value, name, expected_status): + writer = MockWriter() + writer.remote_option[opt] = value + results = await sfp.probe_server_capabilities( + writer, options=[(opt, name, "test")], timeout=0.01 + ) + assert results[name]["status"] == expected_status + assert results[name]["already_negotiated"] is True + + +@pytest.mark.asyncio +async def test_probe_timeout_and_defaults(): + writer = MockWriter() + results = await sfp.probe_server_capabilities( + writer, options=[(fps.BINARY, "BINARY", "")], timeout=0.01 + ) + assert results["BINARY"]["status"] == "timeout" + + writer2 = MockWriter(wont_options=[fps.BINARY]) + results2 = await sfp.probe_server_capabilities(writer2, timeout=0.01) + assert "BINARY" in results2 + expected = len(fps.ALL_PROBE_OPTIONS) - len(sfp._CLIENT_ONLY_WILL) + assert len(results2) == expected + + +def test_collect_server_option_states(): + writer = MockWriter() + states = sfp._collect_server_option_states(writer) + assert not states["server_offered"] + assert not states["server_requested"] + + writer.remote_option[fps.SGA] = True + writer.remote_option[fps.ECHO] = True + writer.local_option[fps.NAWS] = True + states = sfp._collect_server_option_states(writer) + assert "SGA" in states["server_offered"] + assert "ECHO" in states["server_offered"] + assert "NAWS" in states["server_requested"] + + +def test_create_server_protocol_fingerprint(): + writer = MockWriter() + fp = sfp._create_server_protocol_fingerprint(writer, {}) + assert fp["probed-protocol"] == "server" + assert fp["offered-options"] == [] + assert fp["refused-options"] == [] + assert fp["requested-options"] == [] + + writer.local_option[fps.NAWS] = True + probe = { + "BINARY": {"status": "WILL", "opt": fps.BINARY}, + "SGA": {"status": "WONT", "opt": fps.SGA}, + "ECHO": {"status": "timeout", "opt": fps.ECHO}, + "TTYPE": {"status": "WILL", "opt": fps.TTYPE}, + } + fp = sfp._create_server_protocol_fingerprint(writer, probe) + assert fp["offered-options"] == ["BINARY", "TTYPE"] + assert fp["refused-options"] == ["ECHO", "SGA"] + assert fp["requested-options"] == ["NAWS"] + + +def test_server_fingerprint_hash_consistency(): + probe = { + "BINARY": {"status": "WILL", "opt": fps.BINARY}, + "SGA": {"status": "WONT", "opt": fps.SGA}, + } + fp1 = sfp._create_server_protocol_fingerprint( + MockWriter(extra={"peername": ("10.0.0.1", 23)}), probe + ) + fp2 = sfp._create_server_protocol_fingerprint( + MockWriter(extra={"peername": ("10.0.0.2", 2323)}), probe + ) + h1 = fps._hash_fingerprint(fp1) + h2 = fps._hash_fingerprint(fp2) + assert h1 == h2 and len(h1) == 16 + + +def test_format_banner(): + assert sfp._format_banner(b"Hello\r\nWorld") == "Hello\r\nWorld" + assert not sfp._format_banner(b"") + assert sfp._format_banner(b"\xff\xfe\xfd") == "\ufffd\ufffd\ufffd" + + +@pytest.mark.asyncio +async def test_read_banner(): + reader = MockReader([b"Welcome to BBS\r\n"]) + assert await sfp._read_banner(reader, timeout=0.1) == b"Welcome to BBS\r\n" + + assert await sfp._read_banner(MockReader([]), timeout=0.01) == b"" + + +def test_save_server_fingerprint_data(tmp_path, monkeypatch): + monkeypatch.setattr(fps, "DATA_DIR", str(tmp_path)) + filepath = _save( + probe_results={ + "BINARY": {"status": "WILL", "opt": fps.BINARY}, + "SGA": {"status": "WONT", "opt": fps.SGA}, + }, + banner_before=b"Welcome", + banner_after=b"Login:", + total_time=3.0, + ) + assert filepath is not None + with open(filepath, encoding="utf-8") as f: + data = json.load(f) + assert data["server-probe"]["fingerprint-data"]["probed-protocol"] == "server" + assert data["sessions"][0]["host"] == "example.com" + assert "server" in filepath + + +def test_save_server_fingerprint_explicit_path(tmp_path): + save_path = str(tmp_path / "result.json") + assert _save(writer=MockWriter(), save_path=save_path) == save_path + with open(save_path, encoding="utf-8") as f: + data = json.load(f) + assert "server-probe" in data + + nested = str(tmp_path / "nested" / "dir" / "result.json") + assert _save(writer=MockWriter(), save_path=nested) == nested + + +def test_save_server_fingerprint_max_files(tmp_path, monkeypatch): + monkeypatch.setattr(fps, "DATA_DIR", str(tmp_path)) + assert _save() is not None + + monkeypatch.setattr(fps, "FINGERPRINT_MAX_FILES", 0) + w2 = MockWriter(extra={"peername": ("10.0.0.2", 23)}) + assert _save(writer=w2, ip="10.0.0.2") is None + + +def test_save_server_fingerprint_max_fingerprints(tmp_path, monkeypatch): + monkeypatch.setattr(fps, "DATA_DIR", str(tmp_path)) + monkeypatch.setattr(fps, "FINGERPRINT_MAX_FINGERPRINTS", 0) + assert _save() is None + + +def test_save_server_fingerprint_data_dir_none(monkeypatch): + monkeypatch.setattr(fps, "DATA_DIR", None) + assert _save() is None + + +def test_count_server_fingerprint_folders(tmp_path): + assert fps._count_fingerprint_folders(data_dir=str(tmp_path), side="server") == 0 + assert fps._count_fingerprint_folders(data_dir=None, side="server") == 0 + server_dir = tmp_path / "server" + server_dir.mkdir() + (server_dir / "hash1").mkdir() + (server_dir / "hash2").mkdir() + (server_dir / "not_a_dir.txt").write_text("") + assert fps._count_fingerprint_folders(data_dir=str(tmp_path), side="server") == 2 + + +def test_save_appends_session(tmp_path, monkeypatch): + monkeypatch.setattr(fps, "DATA_DIR", str(tmp_path)) + fp1 = _save() + fp2 = _save() + assert fp1 == fp2 + with open(fp2, encoding="utf-8") as f: + assert len(json.load(f)["sessions"]) == 2 + + +def test_banner_data_in_saved_fingerprint(tmp_path): + save_path = str(tmp_path / "result.json") + _save( + writer=MockWriter(), + save_path=save_path, + banner_before=b"Hello\r\n", + banner_after=b"Login: ", + ) + with open(save_path, encoding="utf-8") as f: + session = json.load(f)["server-probe"]["session_data"] + assert "Hello" in session["banner_before_return"] + assert session["banner_after_return"] == "Login: " + assert "probe" in session["timing"] + + +@pytest.mark.asyncio +async def test_fingerprinting_client_shell(tmp_path, monkeypatch): + monkeypatch.setattr(sfp, "_NEGOTIATION_SETTLE", 0.0) + monkeypatch.setattr(sfp, "_BANNER_WAIT", 0.01) + monkeypatch.setattr(sfp, "_POST_RETURN_WAIT", 0.01) + + save_path = str(tmp_path / "result.json") + reader = MockReader([b"Welcome to BBS\r\nLogin: "]) + writer = MockWriter(will_options=[fps.SGA, fps.ECHO]) + + await sfp.fingerprinting_client_shell( + reader, writer, host="localhost", port=23, save_path=save_path, silent=True + ) + + assert writer._closing + with open(save_path, encoding="utf-8") as f: + data = json.load(f) + assert data["server-probe"]["fingerprint-data"]["probed-protocol"] == "server" + assert data["sessions"][0]["host"] == "localhost" + + +@pytest.mark.asyncio +async def test_fingerprinting_client_shell_no_save(monkeypatch): + monkeypatch.setattr(sfp, "_NEGOTIATION_SETTLE", 0.0) + monkeypatch.setattr(sfp, "_BANNER_WAIT", 0.01) + monkeypatch.setattr(sfp, "_POST_RETURN_WAIT", 0.01) + monkeypatch.setattr(fps, "DATA_DIR", None) + + writer = MockWriter() + await sfp.fingerprinting_client_shell( + MockReader([]), writer, host="localhost", port=23, silent=True + ) + assert writer._closing + + +@pytest.mark.asyncio +async def test_fingerprinting_client_shell_display(tmp_path, monkeypatch, capsys): + monkeypatch.setattr(sfp, "_NEGOTIATION_SETTLE", 0.0) + monkeypatch.setattr(sfp, "_BANNER_WAIT", 0.01) + monkeypatch.setattr(sfp, "_POST_RETURN_WAIT", 0.01) + monkeypatch.setattr(sfp, "_JQ", None) + + save_path = str(tmp_path / "result.json") + reader = MockReader([b"Hello"]) + writer = MockWriter(will_options=[fps.SGA]) + + await sfp.fingerprinting_client_shell( + reader, writer, host="localhost", port=23, save_path=save_path + ) + + captured = capsys.readouterr() + output = json.loads(captured.out) + assert output["server-probe"]["fingerprint-data"]["probed-protocol"] == "server" + assert output["sessions"][0]["host"] == "localhost" + session = output["server-probe"]["session_data"] + assert isinstance(session.get("banner_before_return", ""), str) + assert "server_requested" not in session.get("option_states", {}) + + +def test_save_fingerprint_name(tmp_path): + names_path = fps._save_fingerprint_name("abcd1234abcd1234", "my-server", str(tmp_path)) + with open(names_path, encoding="utf-8") as f: + names = json.load(f) + assert names["abcd1234abcd1234"] == "my-server" + + fps._save_fingerprint_name("ffff0000ffff0000", "other-server", str(tmp_path)) + with open(names_path, encoding="utf-8") as f: + names = json.load(f) + assert names["abcd1234abcd1234"] == "my-server" + assert names["ffff0000ffff0000"] == "other-server" + + fps._save_fingerprint_name("abcd1234abcd1234", "renamed", str(tmp_path)) + with open(names_path, encoding="utf-8") as f: + names = json.load(f) + assert names["abcd1234abcd1234"] == "renamed" + + +def test_parse_environ_send_ibm_os400(): + """Parse an OS/400-style SEND with IBMRSEED + binary seed data.""" + raw = USERVAR + b"IBMRSEED\xb6\xd7>\xd5 bytes: + raise self._exc + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "exc", + [ + ConnectionResetError(104, "Connection reset by peer"), + ConnectionAbortedError("Connection aborted"), + EOFError("EOF"), + ], +) +async def test_fingerprinting_client_shell_connection_error(monkeypatch, exc): + """Connection errors produce a warning, not an unhandled exception.""" + monkeypatch.setattr(sfp, "_NEGOTIATION_SETTLE", 0.0) + monkeypatch.setattr(sfp, "_BANNER_WAIT", 0.01) + monkeypatch.setattr(sfp, "_POST_RETURN_WAIT", 0.01) + + writer = MockWriter() + await sfp.fingerprinting_client_shell( + ErrorReader(exc), writer, host="192.0.2.1", port=23, silent=True + ) + assert writer._closing diff --git a/telnetlib3/tests/test_server_shell_unit.py b/telnetlib3/tests/test_server_shell_unit.py index d0053bd8..a20552d3 100644 --- a/telnetlib3/tests/test_server_shell_unit.py +++ b/telnetlib3/tests/test_server_shell_unit.py @@ -27,9 +27,9 @@ def echo(self, data): self.echos.append(data) -def _run_readline(sequence, maxvis=0): +def _run_readline(sequence, max_visible_width=0): w = DummyWriter() - gen = ss.readline(None, w, maxvis=maxvis) + gen = ss.readline(None, w, max_visible_width=max_visible_width) gen.send(None) cmds = [] for ch in sequence: @@ -521,7 +521,7 @@ def test_line_editor_backspace_on_empty(): def test_line_editor_maxvis_ascii(): - editor = ss._LineEditor(maxvis=3) + editor = ss._LineEditor(max_visible_width=3) for ch in "abc": echo, _ = editor.feed(ch) assert echo == ch @@ -626,7 +626,7 @@ def test_readline_backspace_emoji(): def test_readline_maxvis_ascii(): - cmds, echos = _run_readline("abcdefg\r", maxvis=5) + cmds, echos = _run_readline("abcdefg\r", max_visible_width=5) assert cmds == ["abcde"] echo_str = "".join(echos) assert "f" not in echo_str @@ -635,7 +635,7 @@ def test_readline_maxvis_ascii(): @wcwidth_available def test_readline_maxvis_wide(): - cmds, _ = _run_readline("a\u30b3x\r", maxvis=3) + cmds, _ = _run_readline("a\u30b3x\r", max_visible_width=3) assert cmds == ["a\u30b3"] @@ -663,14 +663,18 @@ async def test_readline_async_backspace_emoji(): @pytest.mark.asyncio async def test_readline_async_maxvis_ascii(): - result = await ss.readline_async(MockReader(list("abcdef\r")), MockWriter(), maxvis=4) + result = await ss.readline_async( + MockReader(list("abcdef\r")), MockWriter(), max_visible_width=4 + ) assert result == "abcd" @wcwidth_available @pytest.mark.asyncio async def test_readline_async_maxvis_wide(): - result = await ss.readline_async(MockReader(list("a\u30b3\u30b3x\r")), MockWriter(), maxvis=5) + result = await ss.readline_async( + MockReader(list("a\u30b3\u30b3x\r")), MockWriter(), max_visible_width=5 + ) assert result == "a\u30b3\u30b3" diff --git a/telnetlib3/tests/test_stream_writer_extra.py b/telnetlib3/tests/test_stream_writer_extra.py index fcd52857..e3aaac49 100644 --- a/telnetlib3/tests/test_stream_writer_extra.py +++ b/telnetlib3/tests/test_stream_writer_extra.py @@ -1,5 +1,6 @@ # std imports import struct +import logging import collections # 3rd party @@ -19,9 +20,11 @@ NAWS, SEND, WILL, + WONT, LFLOW, TTYPE, BINARY, + SNDLOC, STATUS, TSPEED, CHARSET, @@ -37,7 +40,7 @@ ) from telnetlib3.client_base import BaseClient from telnetlib3.server_base import BaseServer -from telnetlib3.stream_writer import TelnetWriter, _encode_env_buf +from telnetlib3.stream_writer import TelnetWriter, _encode_env_buf, _format_sb_status class MockTransport: @@ -466,11 +469,12 @@ def test_handle_sb_status_send_and_is(): ws2._handle_sb_status(buf2) -def test_handle_sb_forwardmask_do_raises_notimplemented(): +def test_handle_sb_forwardmask_do_accepted(): wc, _, _ = new_writer(server=False, client=True) wc.local_option[LINEMODE] = True - with pytest.raises(NotImplementedError): - wc._handle_sb_forwardmask(DO, collections.deque([b"x", b"y"])) + wc._handle_sb_forwardmask(DO, collections.deque([b"x", b"y"])) + opt = SB + LINEMODE + slc.LMODE_FORWARDMASK + assert wc.local_option[opt] is True def test_handle_sb_linemode_mode_empty_buffer(): @@ -572,3 +576,74 @@ async def test_client_process_chunk_split_sb_linemode(): response = b"".join(transport.writes) assert IAC + SB + LINEMODE + slc.LMODE_MODE in response + + +@pytest.mark.parametrize( + "opt, data, expected", + [ + (NAWS, b"\x00\x50\x00\x19", "NAWS 80x25"), + (NAWS, b"\x01\x00\x00\xc8", "NAWS 256x200"), + (TTYPE, IS + b"VT100", "TTYPE IS VT100"), + (TTYPE, SEND + b"xterm", "TTYPE SEND xterm"), + (XDISPLOC, IS + b"host:0.0", "XDISPLOC IS host:0.0"), + (SNDLOC, IS + b"Building4", "SNDLOC IS Building4"), + (TTYPE, b"\x99" + b"data", "TTYPE 99 data"), + (STATUS, b"\xab\xcd", "STATUS abcd"), + (NAWS, b"\x00\x50\x00", "NAWS 005000"), + (STATUS, b"", "STATUS"), + (BINARY, b"", "BINARY"), + ], +) +def test_format_sb_status(opt, data, expected): + """Test _format_sb_status output for each branch.""" + assert _format_sb_status(opt, data) == expected + + +def _make_status_is_buf(*parts): + """Build a deque for _handle_sb_status from raw byte sequences.""" + buf = collections.deque() + buf.append(STATUS) + buf.append(IS) + for part in parts: + for byte_val in part: + buf.append(bytes([byte_val])) + return buf + + +def test_receive_status_sb_naws(caplog): + """STATUS IS with embedded SB NAWS data SE.""" + ws, _, _ = new_writer(server=True) + ws.local_option[NAWS] = True + naws_payload = struct.pack("!HH", 80, 25) + buf = _make_status_is_buf(SB + NAWS + naws_payload + SE) + with caplog.at_level(logging.DEBUG): + ws._handle_sb_status(buf) + assert any("NAWS 80x25" in msg for msg in caplog.messages) + + +def test_receive_status_sb_missing_se(caplog): + """STATUS IS with SB block missing SE consumes rest of buffer.""" + ws, _, _ = new_writer(server=True) + naws_payload = struct.pack("!HH", 80, 25) + buf = _make_status_is_buf(SB + NAWS + naws_payload) + with caplog.at_level(logging.DEBUG): + ws._handle_sb_status(buf) + assert any("subneg" in msg for msg in caplog.messages) + + +def test_receive_status_mixed_do_will_and_sb(caplog): + """STATUS IS with DO/WILL pairs intermixed with SB blocks.""" + ws, _, _ = new_writer(server=True) + ws.local_option[BINARY] = True + ws.remote_option[SGA] = True + ws.remote_option[ECHO] = True + ws.local_option[NAWS] = True + naws_payload = struct.pack("!HH", 132, 43) + buf = _make_status_is_buf( + DO + BINARY + WILL + SGA + SB + NAWS + naws_payload + SE + WONT + ECHO + ) + with caplog.at_level(logging.DEBUG): + ws._handle_sb_status(buf) + assert any("agreed" in msg.lower() for msg in caplog.messages) + assert any("NAWS 132x43" in msg for msg in caplog.messages) + assert any("disagree" in msg.lower() for msg in caplog.messages) diff --git a/telnetlib3/tests/test_stream_writer_full.py b/telnetlib3/tests/test_stream_writer_full.py index a253e224..9a8f74d3 100644 --- a/telnetlib3/tests/test_stream_writer_full.py +++ b/telnetlib3/tests/test_stream_writer_full.py @@ -29,6 +29,7 @@ WONT, LFLOW, TTYPE, + VALUE, BINARY, LOGOUT, SNDLOC, @@ -131,6 +132,50 @@ def test_close_idempotent_and_cleanup(): assert not t2.writes +def test_send_iac_skipped_when_closing(): + """send_iac() drops writes when transport is closing.""" + w, t, _ = new_writer(server=True) + t._closing = True + w.send_iac(IAC + NOP) + assert not t.writes + + +def test_send_iac_skipped_when_closed(): + """send_iac() drops writes after close().""" + w, t, _ = new_writer(server=True) + w.close() + w.send_iac(IAC + NOP) + assert not t.writes + + +def test_forwardmask_skipped_when_closing(): + """request_forwardmask() drops writes when transport is closing.""" + w, t, _ = new_writer(server=True) + w.remote_option[LINEMODE] = True + t._closing = True + w.request_forwardmask() + assert not t.writes + + +def test_send_linemode_skipped_when_closing(): + """send_linemode() drops writes when transport is closing.""" + w, t, _ = new_writer(server=True) + w.remote_option[LINEMODE] = True + t._closing = True + w.send_linemode() + assert not t.writes + + +def test_slc_end_skipped_when_closing(): + """_slc_end() drops writes when closing, buffer still cleared.""" + w, t, _ = new_writer(server=True) + w._slc_buffer = [b"\x03\x03\x04"] + t._closing = True + w._slc_end() + assert not t.writes + assert not w._slc_buffer + + def test_get_extra_info_merges_protocol_and_transport(): w, t, p = new_writer(server=True) p.info["proto_key"] = "P" @@ -216,10 +261,10 @@ def test_handle_logout_paths(): def test_handle_do_variants_and_tm_and_logout(): - # server receiving forbidden DO -> ValueError - ws, *_ = new_writer(server=True) - with pytest.raises(ValueError, match="cannot recv DO LINEMODE"): - ws.handle_do(LINEMODE) + # server receiving reversed DO LINEMODE -> WONT refusal + ws, ts, _ = new_writer(server=True) + ws.handle_do(LINEMODE) + assert ts.writes[-1] == IAC + WONT + LINEMODE # client receiving DO LOGOUT -> ValueError wc, *_ = new_writer(server=False, client=True) with pytest.raises(ValueError, match="cannot recv DO LOGOUT"): @@ -256,10 +301,10 @@ def test_handle_will_invalid_cases_and_else_unhandled(): ws, *_ = new_writer(server=True) with pytest.raises(ValueError, match="cannot recv WILL ECHO"): ws.handle_will(ECHO) - # client WILL NAWS invalid - wc, *_ = new_writer(server=False, client=True) - with pytest.raises(ValueError, match="cannot recv WILL NAWS on client end"): - wc.handle_will(NAWS) + # client receiving reversed WILL NAWS -> DONT refusal + wc, tc, _ = new_writer(server=False, client=True) + wc.handle_will(NAWS) + assert tc.writes[-1] == IAC + DONT + NAWS # WILL TM requires pending DO TM wtm, *_ = new_writer(server=True) with pytest.raises(ValueError, match="cannot recv WILL TM"): @@ -522,6 +567,24 @@ def test_escape_unescape_and_env_encode_decode_roundtrip(): assert dec == {"USER": "root", "LANG": "C.UTF-8"} +def test_decode_env_buf_ebcdic(): + """EBCDIC-encoded env data decoded when encoding=cp037.""" + ebcdic_user = "USER".encode("cp037") + ebcdic_root = "root".encode("cp037") + payload = VAR + ebcdic_user + VALUE + ebcdic_root + result = _decode_env_buf(payload, encoding="cp037") + assert result == {"USER": "root"} + + +def test_decode_env_buf_non_ascii_replace(): + """Non-ASCII bytes with default ascii encoding use replacement chars.""" + payload = VAR + b"\x93\x96\x87\x89\x95" + VALUE + b"\xff\xfe" + result = _decode_env_buf(payload) + assert len(result) == 1 + key = list(result.keys())[0] + assert "\ufffd" in key + + def test_transport_property_write_eof_can_write_eof_and_is_closing(): class MT2(MockTransport): def __init__(self): @@ -762,11 +825,11 @@ def test_handle_sb_forwardmask_server_will_and_client_do(): opt = SB + LINEMODE + slc.LMODE_FORWARDMASK assert ws.remote_option[opt] is True - # client DO path -> _handle_do_forwardmask -> NotImplementedError + # client DO path -> forwardmask logged, local_option set wc, tc, pc = new_writer(server=False, client=True) wc.local_option[LINEMODE] = True - with pytest.raises(NotImplementedError): - wc._handle_sb_forwardmask(DO, collections.deque([b"x"])) + wc._handle_sb_forwardmask(DO, collections.deque([b"x"])) + assert wc.local_option[opt] is True def test_handle_sb_forwardmask_server_without_linemode(): @@ -889,6 +952,27 @@ def test_handle_send_server_and_client_charset_returns(): assert not wc.handle_send_client_charset(["UTF-8", "ASCII"]) +def test_charset_accepted_updates_environ_encoding(): + """CHARSET ACCEPTED updates environ_encoding for NEW_ENVIRON decoding.""" + ws, ts, ps = new_writer(server=True) + assert ws.environ_encoding == "ascii" + ws.set_ext_callback(CHARSET, lambda c: None) + buf = collections.deque([CHARSET, ACCEPTED, b"UTF-8"]) + ws._handle_sb_charset(buf) + assert ws.environ_encoding == "UTF-8" + + +def test_charset_request_accepted_updates_environ_encoding(): + """Client accepting CHARSET REQUEST updates environ_encoding.""" + wc, tc, pc = new_writer(server=False, client=True) + assert wc.environ_encoding == "ascii" + wc.set_ext_send_callback(CHARSET, lambda offers: "UTF-8") + sep = b";" + buf = collections.deque([CHARSET, REQUEST, sep, b"UTF-8;ASCII"]) + wc._handle_sb_charset(buf) + assert wc.environ_encoding == "UTF-8" + + def test_iac_wont_and_dont_suppressed_when_remote_false(): w, t, p = new_writer(server=True) # WONT sets local option False and writes frame diff --git a/telnetlib3/tests/test_timeout.py b/telnetlib3/tests/test_timeout.py index 3748d990..ab61bd99 100644 --- a/telnetlib3/tests/test_timeout.py +++ b/telnetlib3/tests/test_timeout.py @@ -60,7 +60,7 @@ async def test_telnet_server_waitfor_timeout(bind_host, unused_tcp_port): stime = time.time() output = await asyncio.wait_for(reader.read(), 0.5) elapsed = time.time() - stime - assert 0.040 <= round(elapsed, 3) <= 0.150 + assert 0.035 <= round(elapsed, 3) <= 0.150 assert output == expected_output