Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 31 additions & 11 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,25 @@
Introduction
============

``telnetlib3`` is a feature-rich Telnet Server and Client Protocol library
``telnetlib3`` is a feature-rich Telnet Server, Client, and Protocol library
for Python 3.9 and newer.

This library supports both modern asyncio_ *and* legacy `Blocking API`_.

The python telnetlib.py_ module removed by Python 3.13 is also re-distributed as-is, as a backport.

telnetlib3 provides multiple interfaces for working with the Telnet protocol:
See the `Guidebook`_ for examples and the `API documentation`_.

Asyncio Protocol
----------------

Modern async/await interface for both client and server, supporting concurrent
connections. See the `Guidebook`_ for examples and the `API documentation`_.
The core protocol and CLI utilities are written using an `Asyncio Interface`_.

Blocking API
------------

A traditional synchronous interface modeled after telnetlib.py_ (client) and miniboa_ (server),
with various enhancements in protocol negotiation is provided. Blocking API calls for complex
arrangements of clients and servers typically require threads.

See `sync API documentation`_ for more.
A Synchronous interface, modeled after telnetlib.py_ (client) and miniboa_ (server), with various
enhancements in protocol negotiation is also provided. See `sync API documentation`_ for more.

Command-line Utilities
----------------------
Expand All @@ -66,16 +62,38 @@ program.

::

# utf8 roguelike server
telnetlib3-client nethack.alt.org
# utf8 bbs
telnetlib3-client xibalba.l33t.codes 44510
# automatic communication with telnet server
telnetlib3-client --shell bin.client_wargame.shell 1984.ws 666
# run a server with default shell
telnetlib3-server
# or custom port and ip and shell
telnetlib3-server 0.0.0.0 1984 --shell=bin.server_wargame.shell
telnetlib3-server --pty-exec /bin/bash -- --login
# run an external program with a pseudo-terminal
telnetlib3-server --pty-exec /bin/bash --pty-raw -- --login
# or a simple linemode program, bc (calculator)
telnetlib3-server --pty-exec /bin/bc


There are also fingerprinting CLIs, ``telnetlib3-fingerprint`` and
``telnetlib3-fingerprint-server``

::

# host a server, wait for clients to connect and fingerprint them,
telnetlib3-fingerprint-server

# report fingerprint of telnet server on 1984.ws
telnetlib3-fingerprint 1984.ws


Legacy telnetlib
----------------

This library contains an unadulterated copy of Python 3.12's telnetlib.py_,
This library contains an *unadulterated copy* of Python 3.12's telnetlib.py_,
from the standard library before it was removed in Python 3.13.

To migrate code, change import statements:
Expand Down Expand Up @@ -206,6 +224,8 @@ The following RFC specifications are implemented:
.. _rfc-2066: https://www.rfc-editor.org/rfc/rfc2066.txt
.. _`bin/`: https://github.com/jquast/telnetlib3/tree/master/bin
.. _telnetlib.py: https://docs.python.org/3.12/library/telnetlib.html
.. _Asyncio Interface: https://telnetlib3.readthedocs.io/en/latest/guidebook.html#asyncio-interface
.. _Blocking API: https://telnetlib3.readthedocs.io/en/latest/guidebook.html#blocking-interface
.. _Guidebook: https://telnetlib3.readthedocs.io/en/latest/guidebook.html
.. _API documentation: https://telnetlib3.readthedocs.io/en/latest/api.html
.. _sync API documentation: https://telnetlib3.readthedocs.io/en/latest/api/sync.html
Expand Down
2 changes: 1 addition & 1 deletion bin/client_wargame.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async def shell(reader, writer):
break
if "?" in outp:
# Reply to all questions with 'y'
writer.write("y")
writer.write("y\r\n")

# Display all server output
print(outp, flush=True, end="")
Expand Down
168 changes: 154 additions & 14 deletions bin/moderate_fingerprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,33 @@
import sys
import json
import shutil
import signal
import socket
import argparse
import subprocess
import collections
from pathlib import Path

try:
# 3rd party
from wcwidth import iter_sequences, strip_sequences

_HAS_WCWIDTH = True
except ImportError:
_HAS_WCWIDTH = False

_BAT = shutil.which("bat") or shutil.which("batcat")
_JQ = shutil.which("jq")
_UNKNOWN = "0" * 16
_PROBES = {
"telnet-probe": ("telnet-client", "telnet-client-revision"),
"terminal-probe": ("terminal-emulator", "terminal-emulator-revision"),
"server-probe": ("telnet-server", "telnet-server-revision"),
}


def _iter_files(data_dir):
"""Yield (path, data) for each client JSON file."""
"""Yield (path, data) for each fingerprint JSON file."""
client_base = data_dir / "client"
if client_base.is_dir():
for path in sorted(client_base.glob("*/*/*.json")):
Expand All @@ -30,6 +42,14 @@ def _iter_files(data_dir):
yield path, json.load(f)
except (OSError, json.JSONDecodeError):
continue
server_base = data_dir / "server"
if server_base.is_dir():
for path in sorted(server_base.glob("*/*.json")):
try:
with open(path, encoding="utf-8") as f:
yield path, json.load(f)
except (OSError, json.JSONDecodeError):
continue


def _print_json(label, data):
Expand Down Expand Up @@ -79,6 +99,85 @@ def _print_terminal_context(session_data):
print(f" ambiguous_width: {aw}")


def _resolve_dns(host, timeout=5):
"""Resolve forward and reverse DNS for *host*, with timeout."""
forward = []
reverse = []

def _alarm_handler(signum, frame):
raise TimeoutError

old_handler = signal.signal(signal.SIGALRM, _alarm_handler)
try:
signal.alarm(timeout)
try:
infos = socket.getaddrinfo(host, None, socket.AF_UNSPEC, socket.SOCK_STREAM)
forward = sorted({info[4][0] for info in infos})
except (socket.gaierror, TimeoutError):
pass
for addr in forward:
try:
hostname, _, _ = socket.gethostbyaddr(addr)
reverse.append(hostname)
except (socket.herror, socket.gaierror, TimeoutError):
continue
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
return forward, sorted(set(reverse))


def _format_banner(banner_data):
"""Return (clean_text, raw_display) from a banner dict."""
text = banner_data.get("text", "")
raw_hex = banner_data.get("raw_hex", "")
if _HAS_WCWIDTH and text:
clean = strip_sequences(text)
else:
clean = text
if _HAS_WCWIDTH and text:
parts = []
for seq in iter_sequences(text):
parts.append(repr(seq))
raw_display = " ".join(parts)
else:
raw_display = raw_hex
return clean, raw_display


def _print_server_context(session_data):
"""Print server fingerprint details for moderation context."""
for banner_key, banner_label in (
("banner_before_return", "pre-return"),
("banner_after_return", "post-return"),
):
banner = session_data.get(banner_key, {})
if not banner:
continue
clean, raw_display = _format_banner(banner)
if clean:
print(f" banner ({banner_label}, clean):")
for line in clean.splitlines():
print(f" {line}")
print()
if raw_display:
print(f" banner ({banner_label}, raw):")
for i in range(0, len(raw_display), 76):
print(f" {raw_display[i:i + 76]}")
print()

host = session_data.get("host", "")
port = session_data.get("port", "")
if host:
host_str = f"{host}:{port}" if port else host
print(f" host: {host_str}")
forward, reverse = _resolve_dns(host)
if forward:
print(f" forward DNS: {', '.join(forward)}")
if reverse:
print(f" reverse DNS: {', '.join(reverse)}")


def _print_paired(paired_hashes, label, names):
"""Print paired fingerprint hashes with names when known."""
if not paired_hashes:
Expand Down Expand Up @@ -131,10 +230,11 @@ def _scan(data_dir, names, revise=False):
labels.setdefault(h, probe_key.split("-", maxsplit=1)[0])
fp_data.setdefault(h, data.get(probe_key, {}).get("fingerprint-data", {}))
sessions.setdefault(h, data.get(probe_key, {}).get("session_data", {}))
other = "terminal-probe" if probe_key == "telnet-probe" else "telnet-probe"
other_h = data.get(other, {}).get("fingerprint")
if other_h and other_h != _UNKNOWN:
paired[h].add(other_h)
if probe_key in ("telnet-probe", "terminal-probe"):
other = "terminal-probe" if probe_key == "telnet-probe" else "telnet-probe"
other_h = data.get(other, {}).get("fingerprint")
if other_h and other_h != _UNKNOWN:
paired[h].add(other_h)
look = rev_key if revise else sug_key
if look in file_sug:
suggestions[h].append(file_sug[look])
Expand Down Expand Up @@ -169,6 +269,8 @@ def _review(entries, names):
_print_telnet_context(session_data)
elif label == "terminal" and session_data:
_print_terminal_context(session_data)
elif label == "server" and session_data:
_print_server_context(session_data)
_print_paired(paired_hashes, label, names)

default = ""
Expand Down Expand Up @@ -203,9 +305,22 @@ def _review(entries, names):
def _relocate(data_dir):
"""Move misplaced JSON files to match their internal fingerprint hashes."""
client_base = data_dir / "client"
server_base = data_dir / "server"
moved = 0
stale = set()
for path, data in _iter_files(data_dir):
sh = data.get("server-probe", {}).get("fingerprint")
if sh:
if path.parent.name == sh:
continue
target = server_base / sh / path.name
if target.exists():
continue
target.parent.mkdir(parents=True, exist_ok=True)
os.rename(path, target)
moved += 1
stale.add(path.parent)
continue
th = data.get("telnet-probe", {}).get("fingerprint")
tmh = data.get("terminal-probe", {}).get("fingerprint", _UNKNOWN)
if not th:
Expand All @@ -232,8 +347,11 @@ def _relocate(data_dir):
def _prune(data_dir, names):
"""Remove named hashes that have no data files."""
hashes = set()
for path, _ in _iter_files(data_dir):
hashes.update({path.parent.parent.name, path.parent.name})
for _path, data in _iter_files(data_dir):
for probe_key in _PROBES:
h = data.get(probe_key, {}).get("fingerprint")
if h and h != _UNKNOWN:
hashes.add(h)
orphaned = {h: n for h, n in names.items() if h not in hashes}
if not orphaned:
return False
Expand All @@ -253,27 +371,49 @@ def _prune(data_dir, names):
return True


def _get_argument_parser():
"""Build argument parser for ``moderate_fingerprints`` CLI."""
parser = argparse.ArgumentParser(
description="Moderate fingerprint name suggestions",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--data-dir",
default=os.environ.get("TELNETLIB3_DATA_DIR"),
help="directory for fingerprint data (default: $TELNETLIB3_DATA_DIR)",
)
parser.add_argument(
"--check-revise", action="store_true", help="review already-named fingerprints for revision"
)
parser.add_argument(
"--no-prune",
action="store_true",
help="skip pruning orphaned hashes from fingerprint_names.json",
)
return parser


def main():
"""CLI entry point for moderating fingerprint name suggestions."""
data_dir_env = os.environ.get("TELNETLIB3_DATA_DIR")
if not data_dir_env:
print("Error: TELNETLIB3_DATA_DIR not set", file=sys.stderr)
args = _get_argument_parser().parse_args()

if not args.data_dir:
print("Error: --data-dir or $TELNETLIB3_DATA_DIR required", file=sys.stderr)
sys.exit(1)
data_dir = Path(data_dir_env)
data_dir = Path(args.data_dir)
if not data_dir.exists():
print(f"Error: {data_dir} does not exist", file=sys.stderr)
sys.exit(1)

revise = "--check-revise" in sys.argv
relocated = _relocate(data_dir)
if relocated:
print(f"Relocated {relocated} file(s).\n")

names = _load_names(data_dir)
if "--no-prune" not in sys.argv and _prune(data_dir, names):
if not args.no_prune and _prune(data_dir, names):
_save_names(data_dir, names)

entries = _scan(data_dir, names, revise)
entries = _scan(data_dir, names, args.check_revise)
if entries and _review(entries, names):
_save_names(data_dir, names)
elif not entries:
Expand Down
Loading
Loading