Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ jobs:
- ubuntu-latest
- windows-latest
python-version:
- "3.8"
- "3.9"
- "3.10"
- "3.11"
Expand Down
61 changes: 55 additions & 6 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,12 @@
Introduction
============

``telnetlib3`` is a full-featured Telnet Client and Server library for python3.8 and newer.
``telnetlib3`` is a feature-rich Telnet Server and Client Protocol library
for Python 3.9 and newer.

Modern asyncio_ and legacy blocking API's are provided.
This library supports both modern asyncio_ *and* legacy `Blocking API`_.

The python telnetlib.py_ module removed by Python 3.13 is also re-distributed as a backport.

Overview
========
The python telnetlib.py_ module removed by Python 3.13 is also re-distributed as-is, as a backport.

telnetlib3 provides multiple interfaces for working with the Telnet protocol:

Expand Down Expand Up @@ -74,6 +72,40 @@ program.
telnetlib3-server 0.0.0.0 1984 --shell=bin.server_wargame.shell
telnetlib3-server --pty-exec /bin/bash -- --login

Fingerprinting Server
---------------------

A built-in fingerprinting server shell is provided to uniquely identify telnet clients.

Install with optional dependencies for full fingerprinting support (prettytable_
and ucs-detect_)::

pip install telnetlib3[extras]

Usage::

export TELNETLIB3_DATA_DIR=./data
telnetlib3-server --shell telnetlib3.fingerprinting_server_shell

A public fingerprinting server you can try out yourself::

telnet 1984.ws 555

An optional post-fingerprint hook can process saved files. The hook is run as
``python -m <module> <filepath>``. The built-in post-script pretty-prints the JSON
and integrates with ucs-detect_ for terminal capability probing::

export TELNETLIB3_DATA_DIR=./fingerprints
export TELNETLIB3_FINGERPRINT_POST_SCRIPT=telnetlib3.fingerprinting
telnetlib3-server --shell telnetlib3.fingerprinting_server_shell

If ucs-detect_ is installed and available in PATH, the post-script automatically
runs it to probe terminal capabilities (colors, sixel, kitty graphics, etc.) and
adds the results to the fingerprint data as ``terminal-fingerprint-data``.

.. _ucs-detect: https://github.com/jquast/ucs-detect
.. _prettytable: https://pypi.org/project/prettytable/

Legacy telnetlib
----------------

Expand Down Expand Up @@ -113,6 +145,21 @@ or CHARSET to negotiate about it.
In this case, use ``--force-binary`` and ``--encoding`` when the encoding of
the remote end is known.

Go-Ahead (GA)
--------------

When a client does not negotiate Suppress Go-Ahead (SGA), the server sends
``IAC GA`` after output to signal that the client may transmit. This is
correct behavior for MUD clients like Mudlet that expect prompt detection
via GA.

If GA causes unwanted output for your use case, disable it::

telnetlib3-server --never-send-ga

For PTY shells, GA is sent after 500ms of output idle time to avoid
injecting GA in the middle of streaming output.

Quick Example
=============

Expand Down Expand Up @@ -156,6 +203,7 @@ The following RFC specifications are implemented:
* `rfc-859`_, "Telnet Status Option", May 1983.
* `rfc-860`_, "Telnet Timing mark Option", May 1983.
* `rfc-885`_, "Telnet End of Record Option", Dec 1983.
* `rfc-930`_, "Telnet Terminal Type Option", Jan 1984.
* `rfc-1073`_, "Telnet Window Size Option", Oct 1988.
* `rfc-1079`_, "Telnet Terminal Speed Option", Dec 1988.
* `rfc-1091`_, "Telnet Terminal-Type Option", Feb 1989.
Expand All @@ -178,6 +226,7 @@ The following RFC specifications are implemented:
.. _rfc-859: https://www.rfc-editor.org/rfc/rfc859.txt
.. _rfc-860: https://www.rfc-editor.org/rfc/rfc860.txt
.. _rfc-885: https://www.rfc-editor.org/rfc/rfc885.txt
.. _rfc-930: https://www.rfc-editor.org/rfc/rfc930.txt
.. _rfc-1073: https://www.rfc-editor.org/rfc/rfc1073.txt
.. _rfc-1079: https://www.rfc-editor.org/rfc/rfc1079.txt
.. _rfc-1091: https://www.rfc-editor.org/rfc/rfc1091.txt
Expand Down
284 changes: 284 additions & 0 deletions bin/moderate_fingerprints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
#!/usr/bin/env python
# pylint: disable=cyclic-import
"""Moderate fingerprint name suggestions."""

# std imports
import os
import sys
import json
import shutil
import subprocess
import collections
from pathlib import Path

_BAT = shutil.which("bat") or shutil.which("batcat")
_JQ = shutil.which("jq")
_UNKNOWN = "0" * 16
_PROBES = {
"telnet-probe": ("telnet-client", "telnet-client-revision"),
"terminal-probe": ("terminal-emulator", "terminal-emulator-revision"),
}


def _iter_files(data_dir):
"""Yield (path, data) for each client JSON file."""
client_base = data_dir / "client"
if client_base.is_dir():
for path in sorted(client_base.glob("*/*/*.json")):
try:
with open(path, encoding="utf-8") as f:
yield path, json.load(f)
except (OSError, json.JSONDecodeError):
continue


def _print_json(label, data):
"""Print labeled JSON, colorized through bat or jq when available."""
raw = json.dumps(data, indent=4, sort_keys=True)
if _BAT:
r = subprocess.run(
[_BAT, "-l", "json", "--style=plain", "--color=always"],
input=raw,
capture_output=True,
text=True,
check=False,
)
if r.returncode == 0:
raw = r.stdout.rstrip("\n")
elif _JQ:
r = subprocess.run([_JQ, "-C", "."], input=raw, capture_output=True, text=True, check=False)
if r.returncode == 0:
raw = r.stdout.rstrip("\n")
print(f"{label} {raw}")


def _print_telnet_context(session_data):
"""Print key telnet session fields for moderation context."""
ttype_cycle = session_data.get("ttype_cycle", [])
if ttype_cycle:
print(f" ttype cycle: {' -> '.join(ttype_cycle)}")

extra = session_data.get("extra", {})
if extra:
for key in sorted(extra):
print(f" {key}: {extra[key]}")


def _print_terminal_context(session_data):
"""Print key terminal session fields for moderation context."""
software = session_data.get("software_name")
version = session_data.get("software_version")
if software:
sw_str = software
if version:
sw_str += f" {version}"
print(f" software: {sw_str}")

aw = session_data.get("ambiguous_width")
if aw is not None:
print(f" ambiguous_width: {aw}")


def _print_paired(paired_hashes, label, names):
"""Print paired fingerprint hashes with names when known."""
if not paired_hashes:
return
other_label = "terminal" if label == "telnet" else "telnet"
parts = []
for ph in sorted(paired_hashes):
name = names.get(ph)
if name:
parts.append(f"{name} ({ph[:8]})")
else:
parts.append(ph[:12])
print(f" paired {other_label}: {', '.join(parts)}")


def _load_names(data_dir):
try:
with open(data_dir / "fingerprint_names.json", encoding="utf-8") as f:
return json.load(f)
except FileNotFoundError:
return {}


def _save_names(data_dir, names):
path = data_dir / "fingerprint_names.json"
tmp = path.with_suffix(".json.new")
with open(tmp, "w", encoding="utf-8") as f:
json.dump(names, f, indent=2, sort_keys=True)
os.rename(tmp, path)
print(f"\nSaved {path}")


def _scan(data_dir, names, revise=False):
"""Return entries for review.

Each entry is ``(label, hash, suggestions, fp_data, session, paired)``.
"""
suggestions = collections.defaultdict(list)
fp_data = {}
labels = {}
sessions = {}
paired = collections.defaultdict(set)

for _, data in _iter_files(data_dir):
file_sug = data.get("suggestions", {})
for probe_key, (sug_key, rev_key) in _PROBES.items():
h = data.get(probe_key, {}).get("fingerprint")
if not h or h == _UNKNOWN:
continue
labels.setdefault(h, probe_key.split("-", maxsplit=1)[0])
fp_data.setdefault(h, data.get(probe_key, {}).get("fingerprint-data", {}))
sessions.setdefault(h, data.get(probe_key, {}).get("session_data", {}))
other = "terminal-probe" if probe_key == "telnet-probe" else "telnet-probe"
other_h = data.get(other, {}).get("fingerprint")
if other_h and other_h != _UNKNOWN:
paired[h].add(other_h)
look = rev_key if revise else sug_key
if look in file_sug:
suggestions[h].append(file_sug[look])

return [
(
labels[h],
h,
suggestions.get(h, []),
fp_data[h],
sessions.get(h, {}),
paired.get(h, set()),
)
for h in sorted(fp_data)
if (h in names) == revise
]


def _review(entries, names):
"""Interactive review loop. Return True if any names were added."""
updated = False
for label, h, sug_list, fpd, session_data, paired_hashes in entries:
current = names.get(h)
print(f"\n{'=' * 60}\n {label}: {h}")
if current:
print(f" current name: {current}")

if fpd:
_print_json(" fingerprint-data:", fpd)

if label == "telnet" and session_data:
_print_telnet_context(session_data)
elif label == "terminal" and session_data:
_print_terminal_context(session_data)
_print_paired(paired_hashes, label, names)

default = ""
if sug_list:
counter = collections.Counter(sug_list)
default = counter.most_common(1)[0][0]
print(f" {sum(counter.values())} suggestion(s):")
for name, count in counter.most_common():
print(f" {count}x {name}")
else:
print(" (no client suggestions)")

suffix = f"for '{default}'" if default else "to skip"
try:
raw = input(f" Name (return {suffix}): ").strip()
except EOFError:
print()
continue
except KeyboardInterrupt:
print("\nAborted.")
return updated

chosen = raw or default
if chosen and chosen != current:
names[h] = chosen
updated = True
print(f" -> {h} = {chosen}")

return updated


def _relocate(data_dir):
"""Move misplaced JSON files to match their internal fingerprint hashes."""
client_base = data_dir / "client"
moved = 0
stale = set()
for path, data in _iter_files(data_dir):
th = data.get("telnet-probe", {}).get("fingerprint")
tmh = data.get("terminal-probe", {}).get("fingerprint", _UNKNOWN)
if not th:
continue
if path.parent.parent.name == th and path.parent.name == tmh:
continue
target = client_base / th / tmh / path.name
if target.exists():
continue
target.parent.mkdir(parents=True, exist_ok=True)
os.rename(path, target)
moved += 1
stale.add(path.parent)

for d in stale:
try:
d.rmdir()
d.parent.rmdir()
except OSError:
pass
return moved


def _prune(data_dir, names):
"""Remove named hashes that have no data files."""
hashes = set()
for path, _ in _iter_files(data_dir):
hashes.update({path.parent.parent.name, path.parent.name})
orphaned = {h: n for h, n in names.items() if h not in hashes}
if not orphaned:
return False

print(f"Found {len(orphaned)} orphaned hash(es):")
for h, name in sorted(orphaned.items(), key=lambda x: x[1]):
print(f" {h} {name}")
try:
if input("\nRemove? [y/N] ").strip().lower() != "y":
return False
except (EOFError, KeyboardInterrupt):
print()
return False

for h in orphaned:
del names[h]
return True


def main():
"""CLI entry point for moderating fingerprint name suggestions."""
data_dir_env = os.environ.get("TELNETLIB3_DATA_DIR")
if not data_dir_env:
print("Error: TELNETLIB3_DATA_DIR not set", file=sys.stderr)
sys.exit(1)
data_dir = Path(data_dir_env)
if not data_dir.exists():
print(f"Error: {data_dir} does not exist", file=sys.stderr)
sys.exit(1)

revise = "--check-revise" in sys.argv
relocated = _relocate(data_dir)
if relocated:
print(f"Relocated {relocated} file(s).\n")

names = _load_names(data_dir)
if "--no-prune" not in sys.argv and _prune(data_dir, names):
_save_names(data_dir, names)

entries = _scan(data_dir, names, revise)
if entries and _review(entries, names):
_save_names(data_dir, names)
elif not entries:
print("Nothing to review.")


if __name__ == "__main__":
main()
Loading
Loading