From 8de6946bbbd7ac5035f9a40dc1a38ef34f307c79 Mon Sep 17 00:00:00 2001 From: Jeff Quast Date: Sun, 8 Feb 2026 12:03:16 -0500 Subject: [PATCH] Mud Improvements (round 2) (#120) - bugfix: GMCP, MSDP, and MSSP decoding now uses --encoding when set, falling back to latin-1 - bugfix: NEW_ENVIRON SEND with empty payload now correctly interpreted as "send all" per RFC 1572 - new: --always-will, --always-do, --scan-type, --mssp-wait, --banner-quiet-time, --banner-max-wait options for telnetlib3-fingerprint --- docs/history.rst | 8 +- telnetlib3/client.py | 144 ++++++++++-- telnetlib3/client_base.py | 9 +- telnetlib3/fingerprinting.py | 3 +- telnetlib3/server_fingerprinting.py | 175 ++++++++++---- telnetlib3/stream_writer.py | 30 ++- telnetlib3/telopt.py | 26 +++ telnetlib3/tests/test_core.py | 1 - telnetlib3/tests/test_fingerprinting.py | 30 +-- telnetlib3/tests/test_mud_negotiation.py | 1 + telnetlib3/tests/test_pty_shell.py | 22 +- .../tests/test_server_fingerprinting.py | 216 ++++++++++++++---- telnetlib3/tests/test_stream_writer_full.py | 66 ++++++ 13 files changed, 601 insertions(+), 130 deletions(-) diff --git a/docs/history.rst b/docs/history.rst index 06bd97e3..14ee6420 100644 --- a/docs/history.rst +++ b/docs/history.rst @@ -9,6 +9,10 @@ History ``NotImplementedError``; the mask is accepted (logged only). * bugfix: echo doubling in ``--pty-exec`` without ``--pty-raw`` (linemode). * bugfix: missing LICENSE.txt in sdist file. + * bugfix: GMCP, MSDP, and MSSP decoding now uses ``--encoding`` when set, + falling back to latin-1 for non-UTF-8 bytes instead of lossy replacement. + * bugfix: ``NEW_ENVIRON SEND`` with empty payload now correctly + interpreted as "send all" per :rfc:`1572`. * new: :mod:`telnetlib3.mud` module with encode/decode functions for GMCP (option 201), MSDP (option 69), and MSSP (option 70) MUD telnet protocols. @@ -29,10 +33,10 @@ History ``DONT``/``WONT`` instead of raising ``ValueError``. * enhancement: ``NEW_ENVIRON SEND`` and response logging improved -- ``SEND (all)`` / ``env send: (empty)`` instead of raw byte dumps. - * bugfix: GMCP, MSDP, and MSSP decoding now uses ``--encoding`` when set, - falling back to latin-1 for non-UTF-8 bytes instead of lossy replacement. * enhancement: ``telnetlib3-fingerprint`` now probes MSDP and MSSP options and captures MSSP server status data in session output. + * new: ``--always-will``, ``--always-do``, ``--scan-type``, ``--mssp-wait``, + ``--banner-quiet-time``, ``--banner-max-wait`` options for ``telnetlib3-fingerprint``. 2.2.0 * bugfix: workaround for Microsoft Telnet client crash on diff --git a/telnetlib3/client.py b/telnetlib3/client.py index 66dbdc9f..cd574667 100755 --- a/telnetlib3/client.py +++ b/telnetlib3/client.py @@ -502,8 +502,34 @@ async def run_client() -> None: ) log.debug(config_msg) + always_will: set[bytes] = args["always_will"] + always_do: set[bytes] = args["always_do"] + + # Wrap client factory to inject always_will/always_do before negotiation + client_factory: Optional[Callable[..., client_base.BaseClient]] = None + if always_will or always_do: + + def _client_factory(**kwargs: Any) -> client_base.BaseClient: + client: TelnetClient + if sys.platform != "win32" and sys.stdin.isatty(): + client = TelnetTerminalClient(**kwargs) + else: + client = TelnetClient(**kwargs) + orig_connection_made = client.connection_made + + def _patched_connection_made(transport: asyncio.BaseTransport) -> None: + orig_connection_made(transport) + assert client.writer is not None + client.writer.always_will = always_will + client.writer.always_do = always_do + + client.connection_made = _patched_connection_made # type: ignore[method-assign] + return client + + client_factory = _client_factory + # Build connection kwargs explicitly to avoid pylint false positive - connection_kwargs = { + connection_kwargs: Dict[str, Any] = { "encoding": args["encoding"], "tspeed": args["tspeed"], "shell": args["shell"], @@ -514,6 +540,8 @@ async def run_client() -> None: "connect_timeout": args["connect_timeout"], "send_environ": args["send_environ"], } + if client_factory is not None: + connection_kwargs["client_factory"] = client_factory # connect _, writer = await open_connection(args["host"], args["port"], **connection_kwargs) @@ -565,9 +593,40 @@ def _get_argument_parser() -> argparse.ArgumentParser: default="TERM,LANG,COLUMNS,LINES,COLORTERM", help="comma-separated environment variables to send (NEW_ENVIRON)", ) + parser.add_argument( + "--always-will", + action="append", + default=[], + metavar="OPT", + help="always send WILL for this option (name like MXP or number, repeatable)", + ) + parser.add_argument( + "--always-do", + action="append", + default=[], + metavar="OPT", + help="always send DO for this option (name like GMCP or number, repeatable)", + ) return parser +def _parse_option_arg(value: str) -> bytes: + """ + Resolve a telnet option name or integer to option bytes. + + :param value: Option name (e.g. ``"MXP"``) or decimal byte value (e.g. ``"91"``). + :returns: Single-byte option value. + :raises ValueError: When *value* is not a known name or valid integer. + """ + # local + from .telopt import option_from_name # pylint: disable=import-outside-toplevel + + try: + return option_from_name(value) + except KeyError: + return bytes([int(value)]) + + def _transform_args(args: argparse.Namespace) -> Dict[str, Any]: return { "host": args.host, @@ -584,12 +643,18 @@ def _transform_args(args: argparse.Namespace) -> Dict[str, Any]: "connect_minwait": args.connect_minwait, "connect_timeout": args.connect_timeout, "send_environ": tuple(v.strip() for v in args.send_environ.split(",") if v.strip()), + "always_will": {_parse_option_arg(v) for v in args.always_will}, + "always_do": {_parse_option_arg(v) for v in args.always_do}, } def main() -> None: """Entry point for telnetlib3-client command.""" - asyncio.run(run_client()) + try: + asyncio.run(run_client()) + except OSError as err: + print(f"Error: {err}", file=sys.stderr) + sys.exit(1) def _get_fingerprint_argument_parser() -> argparse.ArgumentParser: @@ -634,6 +699,12 @@ def _get_fingerprint_argument_parser() -> argparse.ArgumentParser: parser.add_argument( "--ttype", default="VT100", help="terminal type sent in response to TTYPE requests" ) + parser.add_argument( + "--scan-type", + choices=["quick", "full"], + default="quick", + help="probe depth: 'quick' probes core options only, " "'full' includes legacy options", + ) parser.add_argument( "--send-env", action="append", @@ -641,6 +712,35 @@ def _get_fingerprint_argument_parser() -> argparse.ArgumentParser: default=[], help="environment variable to send (repeatable)", ) + parser.add_argument( + "--always-will", + action="append", + default=[], + metavar="OPT", + help="always send WILL for this option (name like MXP or number, repeatable)", + ) + parser.add_argument( + "--always-do", + action="append", + default=[], + metavar="OPT", + help="always send DO for this option (name like GMCP or number, repeatable)", + ) + parser.add_argument( + "--mssp-wait", + default=5.0, + type=float, + help="max seconds since connect to wait for MSSP data", + ) + parser.add_argument( + "--banner-quiet-time", + default=2.0, + type=float, + help="seconds of silence before considering banner complete", + ) + parser.add_argument( + "--banner-max-wait", default=8.0, type=float, help="max seconds to wait for banner data" + ) return parser @@ -674,8 +774,16 @@ async def run_fingerprint_client() -> None: silent=args.silent, set_name=args.set_name, environ_encoding=args.stream_encoding, + scan_type=args.scan_type, + mssp_wait=args.mssp_wait, + banner_quiet_time=args.banner_quiet_time, + banner_max_wait=args.banner_max_wait, ) + # Parse --always-will/--always-do option names/numbers + fp_always_will = {_parse_option_arg(v) for v in args.always_will} + fp_always_do = {_parse_option_arg(v) for v in args.always_do} + # Parse --send-env KEY=VALUE pairs extra_env: Dict[str, str] = {} for item in args.send_env: @@ -705,6 +813,8 @@ def patched_connection_made(transport: asyncio.BaseTransport) -> None: orig_connection_made(transport) assert client.writer is not None client.writer.environ_encoding = environ_encoding + client.writer.always_will = fp_always_will + client.writer.always_do = fp_always_do def patched_send_env(keys: Sequence[str]) -> Dict[str, Any]: result = orig_send_env(keys) @@ -718,18 +828,22 @@ def patched_send_env(keys: Sequence[str]) -> Dict[str, Any]: waiter_closed: asyncio.Future[None] = asyncio.get_event_loop().create_future() - _, writer = await open_connection( - host=args.host, - port=args.port, - client_factory=fingerprint_client_factory, - shell=shell, - encoding=False, - term=ttype, - connect_minwait=2.0, - connect_maxwait=4.0, - connect_timeout=args.connect_timeout, - waiter_closed=waiter_closed, - ) + try: + _, writer = await open_connection( + host=args.host, + port=args.port, + client_factory=fingerprint_client_factory, + shell=shell, + encoding=False, + term=ttype, + connect_minwait=2.0, + connect_maxwait=4.0, + connect_timeout=args.connect_timeout, + waiter_closed=waiter_closed, + ) + except OSError as err: + log.error("%s:%d: %s", args.host, args.port, err) + raise assert writer.protocol is not None assert isinstance(writer.protocol, client_base.BaseClient) @@ -740,7 +854,7 @@ def fingerprint_main() -> None: """Entry point for ``telnetlib3-fingerprint`` command.""" try: asyncio.run(run_fingerprint_client()) - except ConnectionError as err: + except OSError as err: print(f"Error: {err}", file=sys.stderr) sys.exit(1) diff --git a/telnetlib3/client_base.py b/telnetlib3/client_base.py index b9b0f96e..d62e6760 100644 --- a/telnetlib3/client_base.py +++ b/telnetlib3/client_base.py @@ -15,7 +15,7 @@ # local from ._types import ShellCallback -from .telopt import theNULL, name_commands +from .telopt import DO, WILL, theNULL, name_commands from .stream_reader import TelnetReader, TelnetReaderUnicode from .stream_writer import TelnetWriter, TelnetWriterUnicode @@ -272,6 +272,13 @@ def begin_negotiation(self) -> None: self._check_later = asyncio.get_event_loop().call_soon(self._check_negotiation_timer) self._tasks.append(self._check_later) + # Send proactive WILL/DO for any "always" options + if self.writer is not None: + for opt in self.writer.always_will: + self.writer.iac(WILL, opt) + for opt in self.writer.always_do: + self.writer.iac(DO, opt) + def encoding(self, outgoing: bool = False, incoming: bool = False) -> Union[str, bool]: """ Encoding that should be used for the direction indicated. diff --git a/telnetlib3/fingerprinting.py b/telnetlib3/fingerprinting.py index 17959f0f..b2069f9d 100644 --- a/telnetlib3/fingerprinting.py +++ b/telnetlib3/fingerprinting.py @@ -98,7 +98,7 @@ class ProbeResult(TypedDict, total=False): ) # Maximum files per protocol-fingerprint folder -FINGERPRINT_MAX_FILES = int(os.environ.get("TELNETLIB3_FINGERPRINT_MAX_FILES", "200")) +FINGERPRINT_MAX_FILES = int(os.environ.get("TELNETLIB3_FINGERPRINT_MAX_FILES", "1000")) # Maximum number of unique fingerprint folders FINGERPRINT_MAX_FINGERPRINTS = int( @@ -292,6 +292,7 @@ class FingerprintingServer(FingerprintingTelnetServer, TelnetServer): ] ALL_PROBE_OPTIONS = CORE_OPTIONS + MUD_OPTIONS + LEGACY_OPTIONS +QUICK_PROBE_OPTIONS = CORE_OPTIONS + MUD_OPTIONS # All known options including extended, for display/name lookup only _ALL_KNOWN_OPTIONS = ALL_PROBE_OPTIONS + EXTENDED_OPTIONS diff --git a/telnetlib3/server_fingerprinting.py b/telnetlib3/server_fingerprinting.py index 29a9230d..b04b5257 100644 --- a/telnetlib3/server_fingerprinting.py +++ b/telnetlib3/server_fingerprinting.py @@ -41,6 +41,7 @@ from .stream_writer import TelnetWriter from .fingerprinting import ( ALL_PROBE_OPTIONS, + QUICK_PROBE_OPTIONS, _hash_fingerprint, _opt_byte_to_name, _atomic_json_write, @@ -61,6 +62,7 @@ _NEGOTIATION_SETTLE = 0.5 _BANNER_WAIT = 3.0 _POST_RETURN_WAIT = 3.0 +_PROBE_TIMEOUT = 0.5 _JQ = shutil.which("jq") logger = logging.getLogger("telnetlib3.server_fingerprint") @@ -103,6 +105,10 @@ async def fingerprinting_client_shell( silent: bool = False, set_name: str | None = None, environ_encoding: str = "ascii", + scan_type: str = "quick", + mssp_wait: float = 5.0, + banner_quiet_time: float = 2.0, + banner_max_wait: float = 8.0, ) -> None: """ Client shell that fingerprints a remote telnet server. @@ -121,6 +127,12 @@ async def fingerprinting_client_shell( ``fingerprint_names.json`` without requiring moderation. :param environ_encoding: Encoding for NEW_ENVIRON data. Default ``"ascii"`` per :rfc:`1572`; use ``"cp037"`` for EBCDIC hosts. + :param scan_type: ``"quick"`` probes CORE + MUD options only (default); + ``"full"`` includes all LEGACY options. + :param mssp_wait: Max seconds since connect to wait for MSSP data. + :param banner_quiet_time: Seconds of silence before considering the + pre-return banner complete. + :param banner_max_wait: Max seconds to wait for pre-return banner data. """ writer.environ_encoding = environ_encoding try: @@ -132,6 +144,10 @@ async def fingerprinting_client_shell( save_path=save_path, silent=silent, set_name=set_name, + scan_type=scan_type, + mssp_wait=mssp_wait, + banner_quiet_time=banner_quiet_time, + banner_max_wait=banner_max_wait, ) except (ConnectionError, EOFError) as exc: logger.warning("%s:%d: %s", host, port, exc) @@ -147,6 +163,10 @@ async def _fingerprint_session( save_path: str | None, silent: bool, set_name: str | None, + scan_type: str = "quick", + mssp_wait: float = 5.0, + banner_quiet_time: float = 2.0, + banner_max_wait: float = 8.0, ) -> None: """Run the fingerprint session (inner helper for error handling).""" start_time = time.time() @@ -154,8 +174,10 @@ async def _fingerprint_session( # 1. Let straggler negotiation settle await asyncio.sleep(_NEGOTIATION_SETTLE) - # 2. Read banner (pre-return) - banner_before = await _read_banner(reader, timeout=_BANNER_WAIT) + # 2. Read banner (pre-return) — wait until output stops + banner_before = await _read_banner_until_quiet( + reader, quiet_time=banner_quiet_time, max_wait=banner_max_wait + ) # 3. Send return, read post-return data writer.write(b"\r\n") @@ -163,28 +185,32 @@ async def _fingerprint_session( banner_after = await _read_banner(reader, timeout=_POST_RETURN_WAIT) # 4. Snapshot option states before probing - option_states = _collect_server_option_states(writer) - - # 5. Active probe - probe_start = time.time() - probe_results = await probe_server_capabilities(writer) - probe_time = time.time() - probe_start - - # 5b. If server acknowledged MSSP but data hasn't arrived yet, poll briefly - if writer.remote_option.enabled(MSSP) and writer.mssp_data is None: - for _ in range(10): - await asyncio.sleep(0.05) - if writer.mssp_data is not None: - break - - # 6. Build session dicts - session_data: dict[str, Any] = { - "encoding": writer.environ_encoding, - "option_states": option_states, - "banner_before_return": _format_banner(banner_before, encoding=writer.environ_encoding), - "banner_after_return": _format_banner(banner_after, encoding=writer.environ_encoding), - "timing": {"probe": probe_time, "total": time.time() - start_time}, - } + session_data: dict[str, Any] = {"option_states": _collect_server_option_states(writer)} + + # 5. Active probe (skip if connection already lost) + if writer.is_closing(): + probe_results: dict[str, Any] = {} + probe_time = 0.0 + else: + probe_time = time.time() + probe_results = await probe_server_capabilities( + writer, scan_type=scan_type, timeout=_PROBE_TIMEOUT + ) + probe_time = time.time() - probe_time + + # 5b. If server acknowledged MSSP but data hasn't arrived yet, wait. + await _await_mssp_data(writer, start_time + mssp_wait) + + # 6. Complete session dicts + session_data.update( + { + "scan_type": scan_type, + "encoding": writer.environ_encoding, + "banner_before_return": _format_banner(banner_before, encoding=writer.environ_encoding), + "banner_after_return": _format_banner(banner_after, encoding=writer.environ_encoding), + "timing": {"probe": probe_time, "total": time.time() - start_time}, + } + ) if writer.mssp_data is not None: session_data["mssp"] = writer.mssp_data session_entry: dict[str, Any] = { @@ -201,11 +227,14 @@ async def _fingerprint_session( session_data=session_data, session_entry=session_entry, save_path=save_path, + scan_type=scan_type, ) # 8. Set name in fingerprint_names.json if set_name is not None: - protocol_fp = _create_server_protocol_fingerprint(writer, probe_results) + protocol_fp = _create_server_protocol_fingerprint( + writer, probe_results, scan_type=scan_type + ) protocol_hash = _hash_fingerprint(protocol_fp) try: _save_fingerprint_name(protocol_hash, set_name) @@ -215,24 +244,30 @@ async def _fingerprint_session( # 9. Display if not silent: - protocol_fp = _create_server_protocol_fingerprint(writer, probe_results) + protocol_fp = _create_server_protocol_fingerprint( + writer, probe_results, scan_type=scan_type + ) protocol_hash = _hash_fingerprint(protocol_fp) - display_data: dict[str, Any] = { - "server-probe": { - "fingerprint": protocol_hash, - "fingerprint-data": protocol_fp, - "session_data": session_data, - }, - "sessions": [session_entry], - } - _print_json(display_data) + _print_json( + { + "server-probe": { + "fingerprint": protocol_hash, + "fingerprint-data": protocol_fp, + "session_data": session_data, + }, + "sessions": [session_entry], + } + ) # 10. Close writer.close() async def probe_server_capabilities( - writer: TelnetWriter, options: list[tuple[bytes, str, str]] | None = None, timeout: float = 0.5 + writer: TelnetWriter, + options: list[tuple[bytes, str, str]] | None = None, + timeout: float = 0.5, + scan_type: str = "quick", ) -> dict[str, _fps.ProbeResult]: """ Actively probe a remote server for telnet capability support. @@ -243,17 +278,16 @@ async def probe_server_capabilities( :param writer: :class:`~telnetlib3.stream_writer.TelnetWriter` instance. :param options: List of ``(opt_bytes, name, description)`` tuples. - Defaults to :data:`~telnetlib3.fingerprinting.ALL_PROBE_OPTIONS` - minus client-only options. + Defaults to option list based on *scan_type*, minus client-only + options. :param timeout: Seconds to wait for all responses. + :param scan_type: ``"quick"`` probes CORE + MUD options only; + ``"full"`` includes LEGACY options. Default ``"quick"``. :returns: Dict mapping option name to status dict. """ if options is None: - options = [ - (opt, name, desc) - for opt, name, desc in ALL_PROBE_OPTIONS - if opt not in _CLIENT_ONLY_WILL - ] + base = ALL_PROBE_OPTIONS if scan_type == "full" else QUICK_PROBE_OPTIONS + options = [(opt, name, desc) for opt, name, desc in base if opt not in _CLIENT_ONLY_WILL] return await probe_client_capabilities(writer, options=options, timeout=timeout) @@ -270,6 +304,10 @@ def _parse_environ_send(raw: bytes) -> list[dict[str, Any]]: delimiters = {VAR[0], USERVAR[0]} value_byte = VALUE[0] + # Per RFC 1572: bare SEND with no VAR/USERVAR list means "send all" + if not raw: + return [{"type": "VAR", "name": "*"}, {"type": "USERVAR", "name": "*"}] + # find positions of VAR/USERVAR delimiters breaks = [i for i, b in enumerate(raw) if b in delimiters] @@ -336,13 +374,14 @@ def _collect_server_option_states(writer: TelnetWriter) -> dict[str, dict[str, A def _create_server_protocol_fingerprint( - writer: TelnetWriter, probe_results: dict[str, _fps.ProbeResult] + writer: TelnetWriter, probe_results: dict[str, _fps.ProbeResult], scan_type: str = "quick" ) -> dict[str, Any]: """ Create anonymized protocol fingerprint for a remote server. :param writer: :class:`~telnetlib3.stream_writer.TelnetWriter` instance. :param probe_results: Results from :func:`probe_server_capabilities`. + :param scan_type: ``"quick"`` or ``"full"`` probe depth used. :returns: Deterministic fingerprint dict suitable for hashing. """ offered = sorted(name for name, info in probe_results.items() if info["status"] == "WILL") @@ -356,6 +395,7 @@ def _create_server_protocol_fingerprint( return { "probed-protocol": "server", + "scan-type": scan_type, "offered-options": offered, "requested-options": requested, "refused-options": refused, @@ -367,7 +407,9 @@ def _save_server_fingerprint_data( probe_results: dict[str, _fps.ProbeResult], session_data: dict[str, Any], session_entry: dict[str, Any], + *, save_path: str | None = None, + scan_type: str = "quick", ) -> str | None: """ Save server fingerprint data to a JSON file. @@ -381,9 +423,10 @@ def _save_server_fingerprint_data( :param session_entry: Pre-built dict with ``host``, ``ip``, ``port``, and ``connected`` keys. :param save_path: If set, write directly to this path. + :param scan_type: ``"quick"`` or ``"full"`` probe depth used. :returns: Path to saved file, or ``None`` if saving was skipped. """ - protocol_fp = _create_server_protocol_fingerprint(writer, probe_results) + protocol_fp = _create_server_protocol_fingerprint(writer, probe_results, scan_type=scan_type) protocol_hash = _hash_fingerprint(protocol_fp) data: dict[str, Any] = { @@ -444,6 +487,16 @@ def _format_banner(data: bytes, encoding: str = "utf-8") -> str: return data.decode(encoding, errors="replace") +async def _await_mssp_data(writer: TelnetWriter, deadline: float) -> None: + """Wait for MSSP data until *deadline* if server acknowledged MSSP.""" + if not writer.remote_option.enabled(MSSP) or writer.mssp_data is not None: + return + remaining = deadline - time.time() + while remaining > 0 and writer.mssp_data is None: + await asyncio.sleep(min(0.05, remaining)) + remaining = deadline - time.time() + + async def _read_banner(reader: TelnetReader, timeout: float = _BANNER_WAIT) -> bytes: """ Read up to :data:`_BANNER_MAX_BYTES` from *reader* with timeout. @@ -460,3 +513,35 @@ async def _read_banner(reader: TelnetReader, timeout: float = _BANNER_WAIT) -> b except (asyncio.TimeoutError, EOFError): data = b"" return data + + +async def _read_banner_until_quiet( + reader: TelnetReader, quiet_time: float = 2.0, max_wait: float = 8.0 +) -> bytes: + """ + Read banner data until output stops for *quiet_time* seconds. + + Keeps reading chunks as they arrive. If no new data appears within + *quiet_time* seconds (or *max_wait* total elapses), returns everything + collected so far. + + :param reader: :class:`~telnetlib3.stream_reader.TelnetReader` instance. + :param quiet_time: Seconds of silence before considering banner complete. + :param max_wait: Maximum total seconds to wait for banner data. + :returns: Banner bytes (may be empty). + """ + chunks: list[bytes] = [] + loop = asyncio.get_event_loop() + deadline = loop.time() + max_wait + while loop.time() < deadline: + remaining = min(quiet_time, deadline - loop.time()) + if remaining <= 0: + break + try: + chunk = await asyncio.wait_for(reader.read(_BANNER_MAX_BYTES), timeout=remaining) + if not chunk: + break + chunks.append(chunk) + except (asyncio.TimeoutError, EOFError): + break + return b"".join(chunks) diff --git a/telnetlib3/stream_writer.py b/telnetlib3/stream_writer.py index 16ff6437..13e514c9 100644 --- a/telnetlib3/stream_writer.py +++ b/telnetlib3/stream_writer.py @@ -79,6 +79,7 @@ LFLOW_RESTART_ANY, LFLOW_RESTART_XON, theNULL, + name_option, name_command, name_commands, option_from_name, @@ -202,6 +203,16 @@ def __init__( #: EBCDIC hosts such as IBM OS/400. self.environ_encoding: str = "ascii" + #: Set of option byte(s) for which the client always sends WILL + #: (even when not natively supported). Overrides the default + #: WONT rejection in :meth:`handle_do`. + self.always_will: set[bytes] = set() + + #: Set of option byte(s) for which the client always sends DO + #: (even when not natively supported). Overrides the default + #: DONT rejection in :meth:`handle_will`. + self.always_do: set[bytes] = set() + #: Set of option byte(s) for WILL received from remote end #: that were rejected with DONT (unhandled options). self.rejected_will: set[bytes] = set() @@ -612,7 +623,11 @@ def feed_byte(self, byte: bytes) -> bool: # IAC SB sub-negotiation buffer, assert command is SE. self.cmd_received = cmd = byte if cmd != SE: - self.log.error("sub-negotiation buffer interrupted by IAC %s", name_command(cmd)) + sb_opt = name_command(self._sb_buffer[0]) if self._sb_buffer else "?" + self.log.warning( + "sub-negotiation SB %s (%d bytes) interrupted by IAC %s", + sb_opt, len(self._sb_buffer), name_command(cmd), + ) self._sb_buffer.clear() else: # sub-negotiation end (SE), fire handle_subnegotiation @@ -628,6 +643,8 @@ def feed_byte(self, byte: bytes) -> bool: elif self.cmd_received == SB: # continue buffering of sub-negotiation command. + if not self._sb_buffer: + self.log.debug("begin sub-negotiation SB %s", name_command(byte)) self._sb_buffer.append(byte) assert len(self._sb_buffer) < (1 << 15) # 32k SB buffer @@ -635,7 +652,7 @@ def feed_byte(self, byte: bytes) -> bool: # parse 3rd and final byte of IAC DO, DONT, WILL, WONT. assert isinstance(self.cmd_received, bytes) cmd, opt = self.cmd_received, byte - self.log.debug("recv IAC %s %s", name_command(cmd), name_command(opt)) + self.log.debug("recv IAC %s %s", name_command(cmd), name_option(opt)) try: if cmd == DO: try: @@ -1717,6 +1734,9 @@ def handle_do(self, opt: bytes) -> bool: # WILL and received DO may initiate SB at any time. self.pending_option[SB + opt] = True + elif opt in self.always_will: + if not self.local_option.enabled(opt): + self.iac(WILL, opt) else: self.log.debug("DO %s not supported.", name_command(opt)) self.rejected_do.add(opt) @@ -1852,6 +1872,10 @@ def handle_will(self, opt: bytes) -> None: LFLOW: self.send_lineflow_mode, }[opt]() + elif opt in self.always_do: + if not self.remote_option.enabled(opt): + self.iac(DO, opt) + self.remote_option[opt] = True else: self.iac(DONT, opt) self.rejected_will.add(opt) @@ -2143,7 +2167,7 @@ def _handle_sb_environ(self, buf: collections.deque[bytes]) -> None: assert self.client, f"SE: cannot recv from client: {name_command(cmd)} {opt_kind}" # client-side, we do _not_ honor the 'send all VAR' or 'send all # USERVAR' requests -- it is a small bit of a security issue. - reply_env = self._ext_send_callback[NEW_ENVIRON](env.keys()) + reply_env = self._ext_send_callback[NEW_ENVIRON](list(env.keys())) send_env = _encode_env_buf(reply_env, encoding=self.environ_encoding) response = [IAC, SB, NEW_ENVIRON, IS, send_env, IAC, SE] if reply_env: diff --git a/telnetlib3/telopt.py b/telnetlib3/telopt.py index 7a3f9aa4..538d1d2d 100644 --- a/telnetlib3/telopt.py +++ b/telnetlib3/telopt.py @@ -159,6 +159,7 @@ "STATUS", "SUPDUP", "SUPDUPOUTPUT", + "TELOPT_92", "SUPPRESS_LOCAL_ECHO", "SUSP", "TLS", @@ -185,6 +186,7 @@ "theNULL", "name_command", "name_commands", + "name_option", "option_from_name", ) @@ -201,6 +203,7 @@ MSSP = bytes([70]) MSP = bytes([90]) MXP = bytes([91]) +TELOPT_92 = bytes([92]) ZMP = bytes([93]) AARDWOLF = bytes([102]) ATCP = bytes([200]) @@ -270,6 +273,7 @@ "MSSP", "MSP", "MXP", + "TELOPT_92", "ZMP", "AARDWOLF", "ATCP", @@ -331,6 +335,28 @@ def name_command(byte: bytes) -> str: return _DEBUG_OPTS.get(byte, repr(byte)) +#: IAC command bytes that should display as hex when used as option codes. +#: Servers with output-filter bugs can send e.g. ``IAC WONT 0xFC`` where +#: 0xFC is the WONT command byte itself. Displaying "WONT WONT" is +#: confusing, so :func:`name_option` renders these as ``b'\\xfc'``. +_IAC_CMD_BYTES: frozenset[bytes] = frozenset( + {IAC, DO, DONT, WILL, WONT, SB, SE, NOP, DM, BRK, IP, AO, AYT, EC, EL, GA} +) + + +def name_option(byte: bytes) -> str: + """ + Return string description for a telnet option byte. + + Unlike :func:`name_command`, IAC command bytes (DO, DONT, WILL, WONT, + etc.) are displayed as ``repr(byte)`` rather than their command names + when they appear in the option-byte position. + """ + if byte in _IAC_CMD_BYTES: + return repr(byte) + return _DEBUG_OPTS.get(byte, repr(byte)) + + def name_commands(cmds: bytes, sep: str = " ") -> str: """Return string description for array of (maybe) telnet command bytes.""" return sep.join([name_command(bytes([byte])) for byte in cmds]) diff --git a/telnetlib3/tests/test_core.py b/telnetlib3/tests/test_core.py index bede92c8..375649d5 100644 --- a/telnetlib3/tests/test_core.py +++ b/telnetlib3/tests/test_core.py @@ -500,7 +500,6 @@ async def shell(reader, writer): b"Press Return to continue:\r\ngoodbye.\n" b"\x1b[m\nConnection closed by foreign host.\n" ) - assert len(logfile_output) in (2, 3), logfile assert "Connected to bytes: EOFError("EOF"), ], ) -async def test_fingerprinting_client_shell_connection_error(monkeypatch, exc): +async def test_fingerprinting_client_shell_connection_error(exc): """Connection errors produce a warning, not an unhandled exception.""" - monkeypatch.setattr(sfp, "_NEGOTIATION_SETTLE", 0.0) - monkeypatch.setattr(sfp, "_BANNER_WAIT", 0.01) - monkeypatch.setattr(sfp, "_POST_RETURN_WAIT", 0.01) writer = MockWriter() await sfp.fingerprinting_client_shell( - ErrorReader(exc), writer, host="192.0.2.1", port=23, silent=True + ErrorReader(exc), + writer, + host="192.0.2.1", + port=23, + silent=True, + banner_quiet_time=0.01, + banner_max_wait=0.01, + mssp_wait=0.01, ) assert writer._closing + + +@pytest.mark.asyncio +async def test_probe_server_capabilities_quick_default(): + """Default scan_type='quick' excludes legacy options.""" + writer = MockWriter(wont_options=[fps.BINARY]) + results = await sfp.probe_server_capabilities(writer, timeout=0.01) + probed_names = set(results.keys()) + legacy_names = {name for _, name, _ in fps.LEGACY_OPTIONS} + assert not probed_names.intersection(legacy_names) + + +@pytest.mark.asyncio +async def test_probe_server_capabilities_full(): + """scan_type='full' includes legacy options.""" + writer = MockWriter(wont_options=[fps.BINARY]) + results = await sfp.probe_server_capabilities(writer, timeout=0.01, scan_type="full") + probed_names = set(results.keys()) + legacy_names = {name for _, name, _ in fps.LEGACY_OPTIONS} + assert probed_names.issuperset(legacy_names) + expected = len(fps.ALL_PROBE_OPTIONS) - len( + [o for o in fps.ALL_PROBE_OPTIONS if o[0] in sfp._CLIENT_ONLY_WILL] + ) + assert len(results) == expected + + +@pytest.mark.asyncio +async def test_scan_type_recorded_in_fingerprint(tmp_path): + """scan_type appears in both session_data and fingerprint-data.""" + + for scan_type in ("quick", "full"): + save_path = str(tmp_path / f"{scan_type}.json") + reader = MockReader([b"Welcome"]) + writer = MockWriter(will_options=[fps.SGA]) + + await sfp.fingerprinting_client_shell( + reader, + writer, + host="localhost", + port=23, + save_path=save_path, + silent=True, + scan_type=scan_type, + banner_quiet_time=0.01, + banner_max_wait=0.01, + mssp_wait=0.01, + ) + + with open(save_path, encoding="utf-8") as f: + data = json.load(f) + assert data["server-probe"]["session_data"]["scan_type"] == scan_type + assert data["server-probe"]["fingerprint-data"]["scan-type"] == scan_type + + +def test_parse_environ_send_empty_payload(): + """Bare SB NEW_ENVIRON SEND SE (empty payload) means 'send all' per RFC 1572.""" + entries = sfp._parse_environ_send(b"") + assert len(entries) == 2 + assert entries[0] == {"type": "VAR", "name": "*"} + assert entries[1] == {"type": "USERVAR", "name": "*"} + + +@pytest.mark.asyncio +async def test_probe_skipped_when_closing(tmp_path): + """Probe burst is skipped when the connection is already closed.""" + + save_path = str(tmp_path / "result.json") + writer = MockWriter(will_options=[fps.SGA]) + writer._closing = True + + await sfp.fingerprinting_client_shell( + MockReader([]), + writer, + host="localhost", + port=23, + save_path=save_path, + silent=True, + banner_quiet_time=0.01, + banner_max_wait=0.01, + mssp_wait=0.01, + ) + + assert not writer._iac_calls + with open(save_path, encoding="utf-8") as f: + data = json.load(f) + assert data["server-probe"]["fingerprint-data"]["offered-options"] == [] + assert data["server-probe"]["fingerprint-data"]["refused-options"] == [] diff --git a/telnetlib3/tests/test_stream_writer_full.py b/telnetlib3/tests/test_stream_writer_full.py index 9a8f74d3..8db69664 100644 --- a/telnetlib3/tests/test_stream_writer_full.py +++ b/telnetlib3/tests/test_stream_writer_full.py @@ -1,4 +1,5 @@ # std imports +import logging import collections # 3rd party @@ -567,6 +568,23 @@ def test_escape_unescape_and_env_encode_decode_roundtrip(): assert dec == {"USER": "root", "LANG": "C.UTF-8"} +def test_decode_env_buf_bare_delimiters(): + """Bare VAR/USERVAR delimiters produce empty-string keys.""" + payload = VAR + USERVAR + result = _decode_env_buf(payload) + assert result == {"": ""} + + +def test_handle_sb_environ_bare_var_uservar_sends_empty(): + """SEND with bare VAR/USERVAR passes [''] to callback; security policy returns {}.""" + wc, tc, _ = new_writer(server=False, client=True) + received_keys = [] + wc.set_ext_send_callback(NEW_ENVIRON, lambda keys: (received_keys.extend(keys), {})[1]) + payload = VAR + USERVAR + wc._handle_sb_environ(collections.deque([NEW_ENVIRON, SEND, payload])) + assert received_keys == [""] + + def test_decode_env_buf_ebcdic(): """EBCDIC-encoded env data decoded when encoding=cp037.""" ebcdic_user = "USER".encode("cp037") @@ -1055,3 +1073,51 @@ def test_miscellaneous_handle_logs_cover_remaining_handlers(): ws.handle_ew(b"\x00") ws.handle_xon(b"\x00") ws.handle_xoff(b"\x00") + + +def test_sb_interrupted_logs_warning_with_context(caplog): + """SB interruption logs WARNING (not ERROR) with option name and byte count.""" + w, t, _ = new_writer(server=True) + # Enter SB mode: IAC SB CHARSET + w.feed_byte(IAC) + w.feed_byte(SB) + w.feed_byte(CHARSET) + w.feed_byte(b"\x01") + w.feed_byte(b"\x02") + # Interrupt with IAC WONT (instead of IAC SE) + with caplog.at_level(logging.WARNING): + w.feed_byte(IAC) + w.feed_byte(WONT) + assert any("SB CHARSET (3 bytes) interrupted by IAC WONT" in r.message for r in caplog.records) + assert all(r.levelno != logging.ERROR for r in caplog.records) + # The WONT command is still parsed: next byte is its option + w.feed_byte(ECHO) + + +def test_sb_begin_logged(caplog): + """Entering SB mode logs the option name at DEBUG level.""" + w, t, _ = new_writer(server=True) + with caplog.at_level(logging.DEBUG): + w.feed_byte(IAC) + w.feed_byte(SB) + w.feed_byte(TTYPE) + assert any("begin sub-negotiation SB TTYPE" in r.message for r in caplog.records) + + +def test_name_option_distinguishes_commands_from_options(): + """name_option renders IAC command bytes as repr, not their command names.""" + # local + from telnetlib3.telopt import name_option, name_command + + assert name_option(WONT) == repr(WONT) + assert name_option(DO) == repr(DO) + assert name_option(DONT) == repr(DONT) + assert name_option(WILL) == repr(WILL) + assert name_option(IAC) == repr(IAC) + assert name_option(SB) == repr(SB) + assert name_option(SE) == repr(SE) + assert name_option(SGA) == "SGA" + assert name_option(TTYPE) == "TTYPE" + assert name_option(NAWS) == "NAWS" + assert name_command(WONT) == "WONT" + assert name_command(SGA) == "SGA"