diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 00000000..e398eead --- /dev/null +++ b/.editorconfig @@ -0,0 +1,10 @@ +root = true + +[*.py] +charset = utf-8 +tab_width = 4 +indent_size = tab +indent_space = space +trim_trailing_whitespace = true +insert_final_newline = true +max_line_length = 100 diff --git a/README.rst b/README.rst index d8b326cd..5f823eb4 100644 --- a/README.rst +++ b/README.rst @@ -72,40 +72,6 @@ program. telnetlib3-server 0.0.0.0 1984 --shell=bin.server_wargame.shell telnetlib3-server --pty-exec /bin/bash -- --login -Fingerprinting Server ---------------------- - -A built-in fingerprinting server shell is provided to uniquely identify telnet clients. - -Install with optional dependencies for full fingerprinting support (prettytable_ -and ucs-detect_):: - - pip install telnetlib3[extras] - -Usage:: - - export TELNETLIB3_DATA_DIR=./data - telnetlib3-server --shell telnetlib3.fingerprinting_server_shell - -A public fingerprinting server you can try out yourself:: - - telnet 1984.ws 555 - -An optional post-fingerprint hook can process saved files. The hook is run as -``python -m ``. The built-in post-script pretty-prints the JSON -and integrates with ucs-detect_ for terminal capability probing:: - - export TELNETLIB3_DATA_DIR=./fingerprints - export TELNETLIB3_FINGERPRINT_POST_SCRIPT=telnetlib3.fingerprinting - telnetlib3-server --shell telnetlib3.fingerprinting_server_shell - -If ucs-detect_ is installed and available in PATH, the post-script automatically -runs it to probe terminal capabilities (colors, sixel, kitty graphics, etc.) and -adds the results to the fingerprint data as ``terminal-fingerprint-data``. - -.. _ucs-detect: https://github.com/jquast/ucs-detect -.. _prettytable: https://pypi.org/project/prettytable/ - Legacy telnetlib ---------------- diff --git a/bin/server_mud.py b/bin/server_mud.py new file mode 100755 index 00000000..e79478aa --- /dev/null +++ b/bin/server_mud.py @@ -0,0 +1,868 @@ +#!/usr/bin/env python +"""Mini-MUD server demo with combat, GMCP/MSDP/MSSP. + +Usage:: + + $ python bin/server_mud.py + $ telnet localhost 6023 +""" + +from __future__ import annotations + +# std imports +import json +import time +import random +import asyncio +import logging +import argparse +import unicodedata +from typing import Any + +# local +import telnetlib3 +from telnetlib3.telopt import GMCP, MSDP, MSSP, WILL +from telnetlib3.server_shell import readline2 + +log = logging.getLogger("mud") + + +SERVER_NAME = "Mini-MUD Demo" +START_ROOM = "tavern" +IDLE_TIMEOUT = 120 +FIST_DAMAGE = (3, 7) +MAX_HEALTH = 100 +MAX_MANA = 50 +HEAL_AMOUNT = 5 +HEAL_INTERVAL = 2 +ATTACK_COOLDOWN = 2 +DODGE_DURATION = 2 +DODGE_CHANCE = 0.9 +BANNED_WORDS = ["cisco", "admin", "root"] +MSSP_DATA: dict[str, Any] = { + "NAME": SERVER_NAME, + "CODEBASE": "telnetlib3", + "LANGUAGE": ["English"], + "GAMEPLAY": ["Adventure", "Combat"], +} + + +ROOM_IDS: dict[str, int] = {"tavern": 1, "market": 2, "smithy": 3, "temple": 4, "forest": 5} +EXIT_SHORT: dict[str, str] = {"north": "n", "south": "s", "east": "e", "west": "w"} +MSDP_COMMANDS: list[str] = ["LIST", "SEND", "REPORT", "UNREPORT", "RESET"] +MSDP_REPORTABLE: list[str] = [ + "CHARACTER_NAME", + "SERVER_ID", + "HEALTH", + "HEALTH_MAX", + "MANA", + "MANA_MAX", + "ROOM", + "WEAPON", + "OPPONENT_NAME", + "OPPONENT_HEALTH", + "OPPONENT_HEALTH_MAX", +] +MSDP_CONFIGURABLE: list[str] = [] + +ROOMS: dict[str, dict[str, Any]] = { + "tavern": { + "name": "The Rusty Tavern", + "sanctuary": True, + "environment": "Indoor", + "desc": "A cozy tavern with wooden tables and a roaring fireplace." + " A warm glow surrounds you -- no violence is permitted here.", + "exits": {"north": "market", "east": "smithy"}, + }, + "market": { + "name": "Dirty Cobble Path", + "environment": "City", + "desc": "Uneven cobblestones stretch ahead, slick with grime and puddles.", + "exits": {"south": "tavern", "west": "temple"}, + }, + "smithy": { + "name": "The Smithy", + "environment": "Indoor", + "desc": "Hot coals glow red as the blacksmith hammers away at an anvil.", + "exits": {"west": "tavern"}, + }, + "temple": { + "name": "Muddy Bend", + "environment": "Field", + "desc": "The path curves around a steep hill, thick mud sucking at every step.", + "exits": {"east": "market", "north": "forest"}, + }, + "forest": { + "name": "Dark Forest", + "environment": "Forest", + "desc": "Twisted trees block out the sun in this mysterious woodland.", + "exits": {"south": "temple"}, + }, +} + + +class Weapon: # pylint: disable=too-few-public-methods + """A weapon that can be held or placed in a room.""" + + def __init__(self, name: str, damage: tuple[int, int], start_room: str) -> None: + self.name = name + self.damage = damage + self.start_room = start_room + self.location: str | None = start_room + self.holder: "Player | None" = None + + @property + def damage_display(self) -> str: + """Format damage range for display.""" + return f"{self.damage[0]}-{self.damage[1]}" + + +class Player: # pylint: disable=too-few-public-methods + """A connected player.""" + + def __init__(self, name: str = "Adventurer") -> None: + self.name = name + self.health = self.max_health = MAX_HEALTH + self.mana = self.max_mana = MAX_MANA + self.room = START_ROOM + self.weapon: Weapon | None = None + self.is_dodging = False + self.debug_mode = False + self.last_activity = time.monotonic() + self.msdp_reported: set[str] = set() + + +WEAPONS = [ + Weapon("Rusty Sword", (10, 15), "smithy"), + Weapon("Battle Axe", (15, 20), "forest"), + Weapon("Magic Staff", (12, 18), "temple"), + Weapon("Dagger", (5, 10), "market"), +] + +sessions: dict[Any, Player] = {} + + +def strip_control_chars(text: str) -> str: + """Remove control characters from *text*.""" + return "".join(c for c in text if unicodedata.category(c) != "Cc") + + +def tell(writer: Any, *lines: str) -> None: + """Write *lines* to *writer* with CRLF wrapping.""" + writer.write("\r\n" + "\r\n".join(lines) + "\r\n") + + +def send_message(writer: Any, text: str) -> None: + """Send a feedback message with a trailing blank line.""" + tell(writer, text, "") + + +def find_writer(player: Player) -> Any: + """Return the writer for *player*, or ``None``.""" + return next((w for w, p in sessions.items() if p is player), None) + + +def players_in_room(room_key: str, exclude: Player | None = None) -> list[Player]: + """Return active players in *room_key*.""" + return [p for p in sessions.values() if p.room == room_key and p is not exclude] + + +def weapons_in_room(room_key: str) -> list[Weapon]: + """Return weapons lying in *room_key*.""" + return [w for w in WEAPONS if w.location == room_key] + + +def drop_weapon(player: Player) -> str | None: + """Drop player's weapon into their room; return name.""" + if wp := player.weapon: + wp.location = player.room + wp.holder = None + player.weapon = None + return wp.name + return None + + +def broadcast_room(source: Any, room_key: str, text: str, exclude: list[Any] | None = None) -> None: + """Send *text* to all writers in *room_key* except *source*.""" + log.info("[%s] %s", ROOMS[room_key]["name"], text) + msg = f"\r\n{text}\r\n> " + skip = set(exclude or ()) + for w, p in sessions.items(): + if w is not source and w not in skip and p.room == room_key: + w.write(msg) + + +def update_room_all(room_key: str) -> None: + """Push GMCP Room.Info to everyone in *room_key*.""" + for w, p in sessions.items(): + if p.room == room_key: + send_room_gmcp(w, p) + + +def announce_all(writer: Any, msg: str) -> None: + """Send *msg* to all connected players.""" + for w in sessions: + if w is not writer: + w.write(f"\r\n{msg}\r\n> ") + send_message(writer, msg) + + +def resolve_target(writer: Any, prefix: str, candidates: list[Any], err: str) -> Any: + """Resolve an abbreviated target name.""" + matches = [c for c in candidates if c.name.lower().startswith(prefix.lower())] + if len(matches) == 1: + return matches[0] + if matches: + send_message(writer, f"Did you mean: {', '.join(m.name for m in matches)}?") + else: + send_message(writer, err) + return None + + +def send_vitals(writer: Any, player: Player) -> None: + """Push Char.Vitals GMCP and MSDP reported variables.""" + wn = player.weapon.name if player.weapon else "Fists" + st = "dodging" if player.is_dodging else "ready" + writer.send_gmcp( + "Char.Vitals", + { + "hp": player.health, + "maxhp": player.max_health, + "mp": player.mana, + "maxmp": player.max_mana, + "weapon": wn, + "status": st, + }, + ) + push_msdp_reported(writer, player) + + +def send_room_gmcp(writer: Any, player: Player) -> None: + """Push Room.Info GMCP.""" + room = ROOMS[player.room] + people = [ + {"name": p.name, "hp": p.health, "maxhp": p.max_health} + for p in players_in_room(player.room, exclude=player) + ] + writer.send_gmcp( + "Room.Info", + { + "num": ROOM_IDS[player.room], + "name": room["name"], + "area": "town", + "environment": room.get("environment", "Urban"), + "exits": {d: ROOM_IDS[r] for d, r in room["exits"].items()}, + "players": people, + "items": [w.name for w in weapons_in_room(player.room)], + }, + ) + + +def send_status(writer: Any, msg: str) -> None: + """Push Char.Status GMCP message.""" + if writer: + writer.send_gmcp("Char.Status", {"message": msg}) + + +def on_gmcp(writer: Any, package: str, data: Any) -> None: + """Handle incoming GMCP from a client.""" + player = sessions.get(writer) + if not player or not player.debug_mode: + return + writer.write(f"[DEBUG GMCP] {package}: {json.dumps(data)}\r\n") + + +def get_msdp_var( # pylint: disable=too-many-return-statements + player: Player, var: str +) -> dict[str, Any] | None: + """Return MSDP value dict for *var*, or ``None`` if unknown.""" + if var == "CHARACTER_NAME": + return {"CHARACTER_NAME": player.name} + if var == "SERVER_ID": + return {"SERVER_ID": SERVER_NAME} + if var == "HEALTH": + return {"HEALTH": str(player.health)} + if var == "HEALTH_MAX": + return {"HEALTH_MAX": str(player.max_health)} + if var == "MANA": + return {"MANA": str(player.mana)} + if var == "MANA_MAX": + return {"MANA_MAX": str(player.max_mana)} + if var == "ROOM": + room = ROOMS[player.room] + exits = {EXIT_SHORT.get(d, d): str(ROOM_IDS[dest]) for d, dest in room["exits"].items()} + return { + "ROOM": { + "VNUM": str(ROOM_IDS[player.room]), + "NAME": room["name"], + "AREA": "town", + "TERRAIN": room.get("environment", "Urban"), + "EXITS": exits, + } + } + if var == "WEAPON": + return {"WEAPON": player.weapon.name if player.weapon else "Fists"} + if var == "OPPONENT_NAME": + return {"OPPONENT_NAME": ""} + if var == "OPPONENT_HEALTH": + return {"OPPONENT_HEALTH": "0"} + if var == "OPPONENT_HEALTH_MAX": + return {"OPPONENT_HEALTH_MAX": "0"} + return None + + +def push_msdp_reported(writer: Any, player: Player) -> None: + """Push all MSDP variables in *player*'s report set.""" + if not player.msdp_reported: + return + merged: dict[str, Any] = {} + for var in player.msdp_reported: + val = get_msdp_var(player, var) + if val is not None: + merged.update(val) + if merged: + writer.send_msdp(merged) + + +def on_msdp(writer: Any, variables: dict[str, Any]) -> None: + """Handle incoming MSDP from a client.""" + player = sessions.get(writer) + if not player: + return + if player.debug_mode: + writer.write(f"[DEBUG MSDP] {variables!r}\r\n") + for cmd, val in variables.items(): + if cmd == "LIST": + _msdp_list(writer, val) + elif cmd == "SEND": + _msdp_send(writer, player, val) + elif cmd == "REPORT": + _msdp_report(writer, player, val) + elif cmd == "UNREPORT": + _msdp_unreport(player, val) + elif cmd == "RESET": + player.msdp_reported.clear() + + +def _msdp_list(writer: Any, what: str | list[str]) -> None: + """Handle MSDP LIST command.""" + if isinstance(what, list): + for item in what: + _msdp_list(writer, item) + return + if what == "COMMANDS": + writer.send_msdp({"COMMANDS": MSDP_COMMANDS}) + elif what == "LISTS": + writer.send_msdp( + {"LISTS": ["COMMANDS", "LISTS", "REPORTABLE_VARIABLES", "CONFIGURABLE_VARIABLES"]} + ) + elif what == "REPORTABLE_VARIABLES": + writer.send_msdp({"REPORTABLE_VARIABLES": MSDP_REPORTABLE}) + elif what == "CONFIGURABLE_VARIABLES": + writer.send_msdp({"CONFIGURABLE_VARIABLES": MSDP_CONFIGURABLE}) + + +def _msdp_send(writer: Any, player: Player, what: str | list[str]) -> None: + """Handle MSDP SEND -- one-time variable fetch.""" + names = [what] if isinstance(what, str) else what + merged: dict[str, Any] = {} + for var in names: + val = get_msdp_var(player, var) + if val is not None: + merged.update(val) + if merged: + writer.send_msdp(merged) + + +def _msdp_report(writer: Any, player: Player, what: str | list[str]) -> None: + """Handle MSDP REPORT -- subscribe to variable updates.""" + names = [what] if isinstance(what, str) else what + merged: dict[str, Any] = {} + for var in names: + if var in MSDP_REPORTABLE: + player.msdp_reported.add(var) + val = get_msdp_var(player, var) + if val is not None: + merged.update(val) + if merged: + writer.send_msdp(merged) + + +def _msdp_unreport(player: Player, what: str | list[str]) -> None: + """Handle MSDP UNREPORT -- unsubscribe from variable updates.""" + names = [what] if isinstance(what, str) else what + for var in names: + player.msdp_reported.discard(var) + + +def show_room(writer: Any, player: Player) -> None: + """Display the current room to *writer*.""" + room = ROOMS[player.room] + lines = [room["name"], room["desc"]] + if item_names := [w.name for w in weapons_in_room(player.room)]: + lines.append(f"Items here: {', '.join(item_names)}") + others = [p.name for p in sessions.values() if p.room == player.room and p is not player] + if others: + lines.append(f"Players here: {', '.join(others)}") + lines.append(f"Exits: {', '.join(room['exits'])}") + tell(writer, *lines, "") + + +def process_death(writer: Any, player: Player, killer: Player) -> None: + """Handle a player dying -- respawn in tavern.""" + death_room = player.room + kw = find_writer(killer) + dropped = drop_weapon(player) + tell(writer, "", "You have been slain!") + send_status(writer, "You have died") + if dropped: + writer.send_gmcp("Char.Items.Remove", {"name": dropped}) + if kw: + send_message(kw, f"{player.name} has been slain!") + send_status(kw, f"{player.name} has been slain") + broadcast_room(writer, death_room, f"{player.name} has been slain!", exclude=[kw] if kw else []) + player.room = START_ROOM + player.health = player.max_health + player.is_dodging = False + tell(writer, "You materialize in a burst of light.") + send_vitals(writer, player) + show_room(writer, player) + send_room_gmcp(writer, player) + writer.write("> ") + broadcast_room(writer, player.room, f"{player.name} materializes" " in a burst of light.") + update_room_all(death_room) + update_room_all(START_ROOM) + + +class Commands: + """Command dispatcher -- ``do_*`` methods are discovered dynamically.""" + + ALIASES: dict[str, str] = { + "n": "north", + "s": "south", + "e": "east", + "w": "west", + "l": "look", + "i": "inventory", + } + + def __init__(self, writer: Any, player: Player) -> None: + self.writer = writer + self.player = player + self._cmd_map: dict[str, Any] = { + name[3:]: getattr(self, name) + for name in sorted(dir(self)) + if name.startswith("do_") and callable(getattr(self, name)) + } + + def _resolve(self, word: str) -> str | list[str]: + """Resolve an abbreviated command word.""" + word = self.ALIASES.get(word, word) + if word in self._cmd_map: + return word + matches = [c for c in self._cmd_map if c.startswith(word)] + return matches[0] if len(matches) == 1 else matches + + async def dispatch(self, text: str) -> bool: + """Parse and dispatch a command. + + :returns: ``False`` to disconnect, ``True`` to continue. + """ + if not text: + return True + + w = self.writer + p = self.player + + if any(b in text for b in BANNED_WORDS[:2]): + announce_all(w, f"*DING!* {p.name}" " is laughed out of the tavern!") + return False + if BANNED_WORDS[-1] in text: + announce_all( + w, + f"{p.name} rings the bell and yells," + f" '{BANNED_WORDS[-1]}!', and everybody" + " in the tavern breaks out in laughter!", + ) + return True + + p.last_activity = time.monotonic() + + if text.startswith("pick up "): + verb, argument = "get", text[8:].strip() + elif text.startswith("pick "): + verb, argument = "get", text[5:].strip() + else: + parts = text.split(maxsplit=1) + argument = parts[1].strip() if len(parts) > 1 else "" + resolved = self._resolve(parts[0]) + if isinstance(resolved, list): + if resolved: + send_message(w, f"Did you mean: {', '.join(resolved)}?") + else: + send_message(w, "Unknown command. Type 'help' for commands.") + return True + verb = resolved + + method = self._cmd_map.get(verb) + if method is None: + send_message(w, "Unknown command. Type 'help' for commands.") + return True + return await method(argument) + + # -- commands ------------------------------------------------------- + + async def do_help(self, argument: str) -> bool: + """Show available commands.""" + if argument: + resolved = self._resolve(argument) + if isinstance(resolved, str): + doc = self._cmd_map[resolved].__doc__ + send_message(self.writer, f"{resolved}: {doc}") + else: + send_message(self.writer, "Unknown command.") + else: + lines = ["Commands:"] + for name, method in self._cmd_map.items(): + lines.append(f" {name:15s} - {method.__doc__}") + lines += ["", "Aliases: n/s/e/w, l=look, i=inventory", "Commands can be abbreviated."] + tell(self.writer, *lines) + return True + + async def do_look(self, *_args: str) -> bool: + """Look around the current room.""" + show_room(self.writer, self.player) + send_room_gmcp(self.writer, self.player) + return True + + async def do_north(self, *_args: str) -> bool: + """Go north.""" + return await self._move("north") + + async def do_south(self, *_args: str) -> bool: + """Go south.""" + return await self._move("south") + + async def do_east(self, *_args: str) -> bool: + """Go east.""" + return await self._move("east") + + async def do_west(self, *_args: str) -> bool: + """Go west.""" + return await self._move("west") + + async def do_stats(self, *_args: str) -> bool: + """View your stats.""" + p = self.player + wn = p.weapon.name if p.weapon else "Fists" + room_name = ROOMS[p.room]["name"] + tell( + self.writer, + f"Name: {p.name}" f" HP: {p.health}/{p.max_health}" f" MP: {p.mana}/{p.max_mana}", + f"Weapon: {wn} Room: {room_name}", + "", + ) + send_vitals(self.writer, p) + return True + + async def do_inventory(self, *_args: str) -> bool: + """Check held items.""" + if wp := self.player.weapon: + tell(self.writer, f"Holding: {wp.name}" f" ({wp.damage_display} damage)", "") + self.writer.send_gmcp( + "Char.Items.List", + [{"name": wp.name, "type": "weapon", "damage": wp.damage_display}], + ) + else: + tell(self.writer, "You are empty-handed.", "") + self.writer.send_gmcp("Char.Items.List", []) + return True + + async def do_say(self, argument: str) -> bool: + """Speak to the room.""" + if not argument: + send_message(self.writer, "Say what?") + else: + send_message(self.writer, f'You say, "{argument}"') + broadcast_room(self.writer, self.player.room, f'{self.player.name} says, "{argument}"') + return True + + async def do_debug(self, argument: str) -> bool: + """Toggle debug mode.""" + self.player.debug_mode = {"on": True, "off": False}.get( + argument, not self.player.debug_mode + ) + state = "on" if self.player.debug_mode else "off" + send_message(self.writer, f"Debug mode {state}.") + return True + + async def do_get(self, argument: str) -> bool: + """Pick up a weapon.""" + w, p = self.writer, self.player + if not argument: + send_message(w, "Pick up what?") + return True + pfx = argument.lower() + found = [wp for wp in WEAPONS if wp.location == p.room and wp.name.lower().startswith(pfx)] + if not found: + send_message(w, "Nothing like that here.") + elif len(found) > 1: + names = ", ".join(wp.name for wp in found) + send_message(w, f"Did you mean: {names}?") + elif p.weapon: + send_message(w, "Drop your weapon first.") + else: + weapon = found[0] + weapon.location = None + weapon.holder = p + p.weapon = weapon + send_message(w, f"You pick up the {weapon.name}.") + w.send_gmcp( + "Char.Items.Add", + {"name": weapon.name, "type": "weapon", "damage": weapon.damage_display}, + ) + send_vitals(w, p) + broadcast_room(w, p.room, f"{p.name} picks up" f" the {weapon.name}.") + update_room_all(p.room) + return True + + async def do_drop(self, *_args: str) -> bool: + """Drop your weapon.""" + w, p = self.writer, self.player + if not (weapon := p.weapon): + send_message(w, "You're not holding anything.") + return True + weapon.location = p.room + weapon.holder = None + p.weapon = None + send_message(w, f"You drop the {weapon.name}.") + w.send_gmcp("Char.Items.Remove", {"name": weapon.name}) + send_vitals(w, p) + broadcast_room(w, p.room, f"{p.name} drops the {weapon.name}.") + update_room_all(p.room) + return True + + async def do_attack(self, argument: str) -> bool: + """Attack another player (2s cooldown).""" + w, p = self.writer, self.player + if not argument: + send_message(w, "Attack whom?") + return True + if ROOMS[p.room].get("sanctuary"): + send_message(w, "You cannot attack in this sanctuary!") + return True + candidates = [o for o in sessions.values() if o.room == p.room and o is not p] + target = resolve_target(w, argument, candidates, "No such player here.") + if target: + await self._attack(target) + return True + + async def do_dodge(self, *_args: str) -> bool: + """Dodge attacks for 2 seconds.""" + w, p = self.writer, self.player + if p.is_dodging: + send_message(w, "You're already dodging!") + return True + p.is_dodging = True + tell(w, "You enter a defensive stance!") + send_vitals(w, p) + broadcast_room(w, p.room, f"{p.name} assumes a defensive stance.") + await asyncio.sleep(DODGE_DURATION) + p.is_dodging = False + tell(w, "Your dodge ends.") + send_vitals(w, p) + return True + + async def do_quit(self, *_args: str) -> bool: + """Leave the game.""" + self.writer.write("Farewell, adventurer!\r\n") + broadcast_room(self.writer, self.player.room, f"{self.player.name} has left.") + return False + + # -- helpers -------------------------------------------------------- + + async def _move(self, direction: str) -> bool: + """Move player in *direction*.""" + w, p = self.writer, self.player + if p.is_dodging: + send_message(w, "You can't move while dodging!") + return True + room = ROOMS[p.room] + if direction not in room["exits"]: + send_message(w, "You can't go that way.") + return True + old = p.room + broadcast_room(w, p.room, f"{p.name} leaves {direction}.") + p.room = room["exits"][direction] + tell(w, f"You go {direction}.") + show_room(w, p) + send_room_gmcp(w, p) + send_vitals(w, p) + broadcast_room(w, p.room, f"{p.name} arrives.") + update_room_all(old) + return True + + async def _attack(self, target: Player) -> None: + """Execute an attack against *target*.""" + w, p = self.writer, self.player + tw = find_writer(target) + wp = p.weapon + dmg = random.randint(*(wp.damage if wp else FIST_DAMAGE)) + wn = wp.name if wp else "fists" + if target.is_dodging and random.random() < DODGE_CHANCE: + tell(w, f"{target.name} dodges your attack!") + send_status(w, f"Attack dodged by {target.name}") + if tw: + tw.write(f"\r\n{p.name} swings" " but you dodge!\r\n> ") + send_status(tw, f"Dodged {p.name}'s attack") + else: + target.health -= dmg + tell(w, f"You hit {target.name}" f" with {wn} for {dmg} damage!") + send_status(w, f"Hit {target.name} for {dmg}") + if tw: + tw.write( + f"\r\n{p.name} hits you for {dmg}!" + f" (HP: {target.health}" + f"/{target.max_health})\r\n> " + ) + send_status(tw, f"Hit by {p.name} for {dmg}") + send_vitals(tw, target) + broadcast_room( + w, p.room, f"{p.name} attacks {target.name}!", exclude=[tw] if tw else [] + ) + if target.health <= 0 and tw: + process_death(tw, target, p) + tell(w, "You recover your stance...") + send_vitals(w, p) + await asyncio.sleep(ATTACK_COOLDOWN) + w.write("\r\n") + + +async def background_tick(writer: Any, player: Player) -> None: + """Periodic healing and idle timeout.""" + while True: + await asyncio.sleep(HEAL_INTERVAL) + if player.room == START_ROOM and player.health < player.max_health: + player.health = min(player.health + HEAL_AMOUNT, player.max_health) + writer.write( + "\r\nThe tavern's warmth heals you." + f" (HP: {player.health}" + f"/{player.max_health})\r\n> " + ) + send_vitals(writer, player) + if time.monotonic() - player.last_activity >= IDLE_TIMEOUT: + writer.write("\r\nIdle timeout.\r\n") + writer.close() + return + + +async def shell(reader: Any, writer: Any) -> None: + """Main MUD session for one connected client.""" + writer.iac(WILL, GMCP) + writer.iac(WILL, MSDP) + writer.iac(WILL, MSSP) + writer.set_ext_callback(GMCP, lambda pkg, data: on_gmcp(writer, pkg, data)) + writer.set_ext_callback(MSDP, lambda variables: on_msdp(writer, variables)) + writer.write("Welcome to the Mini-MUD!\r\n") + + env = writer.get_extra_info("USER") or writer.get_extra_info("LOGNAME") or "" + if env: + writer.write(f'What is your name? (return for "{env}") ') + else: + writer.write("What is your name? ") + + if (raw := await readline2(reader, writer)) is None: + writer.close() + return + name = strip_control_chars(raw).strip() or env + + if not name: + tell(writer, "A name is required.") + writer.close() + return + + if any(w in name.lower() for w in BANNED_WORDS): + tell(writer, "That name is not allowed.") + writer.close() + return + + if any(p.name.lower() == name.lower() for p in sessions.values()): + tell(writer, f"{name} is already playing!") + writer.close() + return + + player = Player(name) + + sessions[writer] = player + player.last_activity = time.monotonic() + log.info("connect: %s (%d online)", player.name, len(sessions)) + + tell(writer, f"Hello, {name}!") + broadcast_room(writer, player.room, f"{player.name} arrives.") + + mssp = dict(MSSP_DATA) + mssp["PLAYERS"] = str(len(sessions)) + mssp["UPTIME"] = "999" + mssp.setdefault("CREATED", "2026") + mssp.setdefault("CONTACT", "admin@example.com") + writer.send_mssp(mssp) + + send_room_gmcp(writer, player) + send_vitals(writer, player) + show_room(writer, player) + + commands = Commands(writer, player) + bg_task = asyncio.create_task(background_tick(writer, player)) + try: + while True: + writer.write("> ") + inp = await readline2(reader, writer) + if inp is None: + break + text = strip_control_chars(inp).strip() + writer.write("\r\n") + cmd = text.lower() + log.debug("%s: %s", player.name, cmd) + if not await commands.dispatch(cmd): + break + finally: + bg_task.cancel() + await asyncio.gather(bg_task, return_exceptions=True) + sessions.pop(writer, None) + drop_weapon(player) + log.info("disconnect: %s (%d online)", player.name, len(sessions)) + broadcast_room(None, player.room, f"{player.name} has left.") + update_room_all(player.room) + writer.close() + + +def parse_args(argv: list[str] | None = None) -> argparse.Namespace: + """Parse command-line arguments.""" + ap = argparse.ArgumentParser(description="Mini-MUD demo server with telnetlib3") + ap.add_argument("--host", default="127.0.0.1", help="bind address (default: 127.0.0.1)") + ap.add_argument("--port", type=int, default=6023, help="bind port (default: 6023)") + ap.add_argument("--log-level", default="INFO", help="log level (default: INFO)") + return ap.parse_args(argv) + + +async def main(argv: list[str] | None = None) -> None: + """Start the MUD server.""" + args = parse_args(argv) + logging.basicConfig( + level=getattr(logging, args.log_level.upper()), + format="%(asctime)s %(message)s", + datefmt="%H:%M:%S", + ) + server = await telnetlib3.create_server(host=args.host, port=args.port, shell=shell) + log.info("%s running on %s:%d", SERVER_NAME, args.host, args.port) + print( + f"{SERVER_NAME} running on" + f" {args.host}:{args.port}\n" + f"Connect with: telnet {args.host} {args.port}\n" + "Press Ctrl+C to stop" + ) + await server.wait_closed() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/api/fingerprinting.rst b/docs/api/fingerprinting.rst new file mode 100644 index 00000000..4b49845d --- /dev/null +++ b/docs/api/fingerprinting.rst @@ -0,0 +1,5 @@ +fingerprinting +-------------- + +.. automodule:: telnetlib3.fingerprinting + :members: diff --git a/docs/api/mud.rst b/docs/api/mud.rst new file mode 100644 index 00000000..c6f06fcd --- /dev/null +++ b/docs/api/mud.rst @@ -0,0 +1,5 @@ +mud +--- + +.. automodule:: telnetlib3.mud + :members: diff --git a/docs/conf.py b/docs/conf.py index 1c73938f..e0e2f610 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -67,10 +67,10 @@ # built documents. # # The short X.Y version. -version = "2.2" +version = "2.3" # The full version, including alpha/beta/rc tags. -release = "2.2.0" # keep in sync with pyproject.toml and telnetlib3/accessories.py !! +release = "2.3.0" # keep in sync with pyproject.toml and telnetlib3/accessories.py !! # The language for content auto-generated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/docs/guidebook.rst b/docs/guidebook.rst index 5a150ac0..8af044fe 100644 --- a/docs/guidebook.rst +++ b/docs/guidebook.rst @@ -205,6 +205,31 @@ and utf-8 support. The same applies to clients -- ``open_connection(..., encoding=False)`` returns a ``(TelnetReader, TelnetWriter)`` pair that works with ``bytes``. +Line Endings +~~~~~~~~~~~~ + +The telnet protocol (RFC 854) requires ``\r\n`` (CR LF) as the line ending +for all NVT (Network Virtual Terminal) output. This applies in all standard +modes: + +- **NVT ASCII mode** (default): ``\r\n`` is required. +- **Kludge mode** (SGA negotiated, no LINEMODE): input is character-at-a-time, + but server output is still NVT -- ``\r\n`` is expected. +- **Binary mode** (TRANSMIT-BINARY): raw bytes, no NVT transformation -- + ``\n`` is acceptable if both sides agree. + +The ``write()`` method on both the asyncio and blocking interfaces sends data +as-is -- it does **not** convert ``\n`` to ``\r\n``:: + + # Correct: + writer.write("Hello!\r\n") + + # Wrong -- most clients will not display a proper line break: + writer.write("Hello!\n") + +For maximum compatibility with MUD clients, legacy terminals, and standard +telnet implementations, always use ``\r\n`` with ``write()``. + server_binary.py ~~~~~~~~~~~~~~~~ @@ -355,30 +380,37 @@ migration:: server = BlockingTelnetServer('0.0.0.0', 6023, handler=handler) server.serve_forever() -Property and method mapping: - -========================= ==================================== -miniboa :mod:`telnetlib3.sync` -========================= ==================================== -``client.active`` ``conn.active`` -``client.address`` ``conn.address`` -``client.port`` ``conn.port`` -``client.terminal_type`` ``conn.terminal_type`` -``client.columns`` ``conn.columns`` -``client.rows`` ``conn.rows`` -``client.send()`` ``conn.send()`` -``client.addrport()`` ``conn.addrport()`` -``client.idle()`` ``conn.idle()`` -``client.duration()`` ``conn.duration()`` -``client.deactivate()`` ``conn.deactivate()`` -========================= ==================================== +Properties and methods with equal mapping: + +:attr:`~telnetlib3.sync.ServerConnection.active`, +:attr:`~telnetlib3.sync.ServerConnection.address`, +:attr:`~telnetlib3.sync.ServerConnection.port`, +:attr:`~telnetlib3.sync.ServerConnection.terminal_type`, +:attr:`~telnetlib3.sync.ServerConnection.columns`, +:attr:`~telnetlib3.sync.ServerConnection.rows`, +:meth:`~telnetlib3.sync.ServerConnection.send`, +:meth:`~telnetlib3.sync.ServerConnection.addrport`, +:meth:`~telnetlib3.sync.ServerConnection.idle`, +:meth:`~telnetlib3.sync.ServerConnection.duration`, +:meth:`~telnetlib3.sync.ServerConnection.deactivate` Key differences from miniboa: -- telnetlib3 uses a thread-per-connection model (blocking I/O) -- miniboa uses a poll-based model (non-blocking with ``server.poll()``) -- telnetlib3 has ``readline()``/``read()`` blocking methods -- miniboa uses ``get_command()`` (non-blocking, check ``cmd_ready``) +- telnetlib3 uses a thread-per-connection model instead of miniboa's + poll-based ``server.poll()`` loop +- miniboa's ``get_command()`` and ``cmd_ready`` are replaced by blocking + :meth:`~telnetlib3.sync.ServerConnection.readline` and + :meth:`~telnetlib3.sync.ServerConnection.read` + +.. note:: + + The ``send()`` method normalizes newlines to ``\r\n`` for miniboa + compatibility. Both ``\n`` and ``\r\n`` in the input produce a single + ``\r\n`` on the wire:: + + conn.send("Hello!\n") # OK -- sends \r\n on the wire + conn.send("Hello!\r\n") # OK -- also sends \r\n on the wire + conn.write("Hello!\r\n") # OK -- write() sends as-is Advanced Negotiation @@ -404,6 +436,103 @@ property:: print(f"ECHO enabled: {writer.remote_option.enabled(ECHO)}") +Fingerprinting Server +===================== + +The public telnetlib3 demonstration Fingerprinting Server is:: + + telnet 1984.ws 555 + + +The fingerprinting shell +(:func:`telnetlib3.fingerprinting.fingerprinting_server_shell`) probes each +connecting client's telnet capabilities, terminal emulator features, and unicode +support. This useful for uniquely identify clients across sessions by the +capabilities of the software used. The fingerprinting shell runs in two phases: + +1. **Telnet probe** -- negotiates all standard telnet options (TTYPE, NAWS, + BINARY, SGA, ECHO, NEW_ENVIRON, CHARSET, LINEMODE, SLC) and records which + options the client supports, the TTYPE cycle, environment variables, and SLC + table. A deterministic hash is computed from the protocol-level fingerprint. + +2. **Terminal probe** -- if `ucs-detect `_ + is installed, the shell spawns it through a PTY to probe the terminal + emulator's software and version, color depth, graphics protocols (Kitty, + iTerm2, Sixel), device attributes, DEC private modes, unicode version + support, and emoji rendering. A second hash is computed from the terminal + fingerprint. + +Running +------- + +Install with optional dependencies for full fingerprinting support +(`prettytable `_ and +`ucs-detect `_):: + + pip install telnetlib3[extras] + +A dedicated CLI entry point is provided:: + + telnetlib3-fingerprint-server --data-dir data + +This uses :class:`~telnetlib3.fingerprinting.FingerprintingServer` as the +protocol factory and :func:`~telnetlib3.fingerprinting.fingerprinting_server_shell` +as the default shell. All ``telnetlib3-server`` options (``--host``, ``--port``, +etc.) are accepted. + +Storage +------- + +Results are saved as JSON files organized by fingerprint hash:: + + /client/// + +Moderating +---------- + +The ``bin/moderate_fingerprints.py`` script provides an interactive CLI for +reviewing client-submitted name suggestions and assigning names to hashes:: + + export TELNETLIB3_DATA_DIR=./data + python bin/moderate_fingerprints.py + + +MUD Server +========== + +The public telnetlib3 demonstration MUD Server is:: + + telnet 1984.ws 6066 + +telnetlib3 supports the common MUD (Multi-User Dungeon) protocols used by +MUD clients like Mudlet, TinTin++, and BlowTorch: + +- **GMCP** (Generic MUD Communication Protocol) -- JSON-based structured data + for room info, character vitals, inventory, and more. +- **MSDP** (MUD Server Data Protocol) -- binary-encoded variable/value pairs for + real-time game state. +- **MSSP** (MUD Server Status Protocol) -- server metadata for MUD + crawlers and directories. + +The :mod:`telnetlib3.mud` module provides encode/decode functions for all three +protocols using :class:`~telnetlib3.stream_writer.TelnetWriter` methods +``send_gmcp()``, ``send_msdp()``, and ``send_mssp()``. + +Running +------- + +The repository includes a "mini-MUD" example at `bin/server_mud.py +`_ with +rooms, combat, weapons, GMCP/MSDP/MSSP support, and basic persistence. + +:: + + telnetlib3-server --shell bin.server_mud.shell + +Then, connect with any telnet or MUD client:: + + telnet localhost 6023 + Legacy telnetlib Compatibility ============================== diff --git a/docs/history.rst b/docs/history.rst index 15793de5..7339134f 100644 --- a/docs/history.rst +++ b/docs/history.rst @@ -1,7 +1,18 @@ History ======= -*unreleased* - * new: ``connect_timeout`` arguments for client and ``--connect-timeout`` Client CLI argument. +2.3.0 *unreleased* + * new: ``mud`` module with encode/decode functions for GMCP (option 201), + MSDP (option 69), and MSSP (option 70) MUD telnet protocols. + * new: ``TelnetWriter.send_gmcp()``, ``send_msdp()``, and ``send_mssp()`` + methods for sending MUD protocol data, with corresponding + ``handle_gmcp()``, ``handle_msdp()``, and ``handle_mssp()`` callbacks. + * new: ``connect_timeout`` arguments for client and ``--connect-timeout`` + Client CLI argument, :ghissue:`30`. + * bugfix: missing LICENSE.txt in sdist file. + * new: ``telnetlib3-fingerprint-server`` CLI with extended ``NEW_ENVIRON`` + for client fingerprinting (uses ``FingerprintingServer`` protocol factory). + * note: fingerprint hashes for MUD clients detected via GMCP/MSDP + may change due to improved client classification. 2.2.0 * bugfix: workaround for Microsoft Telnet client crash on diff --git a/docs/rfcs.rst b/docs/rfcs.rst index 03c3537d..10cb676c 100644 --- a/docs/rfcs.rst +++ b/docs/rfcs.rst @@ -1,8 +1,8 @@ -RFCs -==== +Specifications +============== -Implemented ------------ +RFCs Implemented +---------------- * :rfc:`727`, "Telnet Logout Option," Apr 1977. * :rfc:`779`, "Telnet Send-Location Option", Apr 1981. @@ -26,8 +26,8 @@ Implemented * :rfc:`1572`, "Telnet Environment Option", Jan 1994. * :rfc:`2066`, "Telnet Charset Option", Jan 1997. -Not Implemented ---------------- +RFCs Not Implemented +-------------------- * :rfc:`861`, "Telnet Extended Options List", May 1983. describes a method of negotiating options after all possible 255 option bytes are exhausted by @@ -66,6 +66,32 @@ Not Implemented * :rfc:`1416`, "Telnet Authentication Option" * :rfc:`2217`, "Telnet Com Port Control Option", Oct 1997 +MUDs Implemented +---------------- + +These are community-standard telnet extensions widely used by MUD (Multi-User +Dungeon) servers and clients. + +* `GMCP`_ (Generic MUD Communication Protocol, option 201). JSON-based + bidirectional messaging for game data such as room info, character vitals, + and client metadata. +* `MSDP`_ (MUD Server Data Protocol, option 69). Structured key-value protocol + for game variables with support for nested tables and arrays. +* `MSSP`_ (MUD Server Status Protocol, option 70). Server metadata protocol + for MUD crawlers and directories, providing server name, player count, + codebase, and other listing information. + +.. _GMCP: https://www.gammon.com.au/gmcp +.. _MSDP: https://tintin.mudhalla.net/protocols/msdp/ +.. _MSSP: https://tintin.mudhalla.net/protocols/mssp/ + +MUDs Not Implemented +-------------------- + +Constants are also defined for the following MUD options, though their handlers +are not implemented: MCCP/MCCP2 (85/86, compression), MXP (91, markup), ZMP +(93, messaging), MSP (90, sound), and ATCP (200, Achaea-specific). + Additional Resources -------------------- diff --git a/pyproject.toml b/pyproject.toml index 80604732..cd5261e2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,11 +4,11 @@ build-backend = "hatchling.build" [project] name = "telnetlib3" -version = "2.2.0" +version = "2.3.0" description = " Python Telnet server and client CLI and Protocol library" readme = "README.rst" license = "ISC" -license-files = ["LICENSE"] +license-files = ["LICENSE.txt"] authors = [ {name = "Jeff Quast", email = "contact@jeffquast.com"}, ] @@ -58,6 +58,7 @@ extras = [ [project.scripts] telnetlib3-server = "telnetlib3.server:main" telnetlib3-client = "telnetlib3.client:main" +telnetlib3-fingerprint-server = "telnetlib3.fingerprinting:fingerprint_server_main" [project.urls] Homepage = "https://github.com/jquast/telnetlib3" @@ -147,4 +148,5 @@ ignore_errors = true [tool.black] line-length = 100 +skip-magic-trailing-comma = true target-version = ["py39", "py310", "py311", "py312", "py313"] diff --git a/telnetlib3/__init__.py b/telnetlib3/__init__.py index e759b876..ee88672b 100644 --- a/telnetlib3/__init__.py +++ b/telnetlib3/__init__.py @@ -18,6 +18,7 @@ from . import client_shell from . import client from . import telopt +from . import mud from . import slc from . import telnetlib from . import guard_shells @@ -33,6 +34,7 @@ from .client_shell import * # noqa from .client import * # noqa from .telopt import * # noqa +from .mud import * # noqa from .slc import * # noqa from .telnetlib import * # noqa from .guard_shells import * # noqa @@ -71,6 +73,7 @@ + sync.__all__ # protocol bits, bytes, and names + telopt.__all__ + + mud.__all__ + slc.__all__ # python's legacy stdlib api + telnetlib.__all__ diff --git a/telnetlib3/accessories.py b/telnetlib3/accessories.py index 7b42f05b..532fbdc6 100644 --- a/telnetlib3/accessories.py +++ b/telnetlib3/accessories.py @@ -21,12 +21,24 @@ "repr_mapping", "function_lookup", "make_reader_task", + "PATIENCE_MESSAGES", ) +PATIENCE_MESSAGES = [ + "Contemplate the virtue of patience", + "Endure delays with fortitude", + "To wait calmly requires discipline", + "Suspend expectations of imminence", + "The tide hastens for no man", + "Cultivate a stoic calmness", + "The tranquil mind eschews impatience", + "Deliberation is preferable to haste", +] + def get_version() -> str: """Return the current version of telnetlib3.""" - return "2.2.0" # keep in sync with pyproject.toml and docs/conf.py !! + return "2.3.0" # keep in sync with pyproject.toml and docs/conf.py !! def encoding_from_lang(lang: str) -> Optional[str]: @@ -91,10 +103,7 @@ def eightbits(number: int) -> str: def make_logger( - name: str, - loglevel: str = "info", - logfile: Optional[str] = None, - logfmt: str = _DEFAULT_LOGFMT, + name: str, loglevel: str = "info", logfile: Optional[str] = None, logfmt: str = _DEFAULT_LOGFMT ) -> logging.Logger: """Create and return simple logger for given arguments.""" lvl = getattr(logging, loglevel.upper()) @@ -123,8 +132,7 @@ def function_lookup(pymod_path: str) -> Callable[..., Any]: def make_reader_task( - reader: "Union[TelnetReader, TelnetReaderUnicode, asyncio.StreamReader]", - size: int = 2**12, + reader: "Union[TelnetReader, TelnetReaderUnicode, asyncio.StreamReader]", size: int = 2**12 ) -> "asyncio.Task[Any]": """Return asyncio task wrapping coroutine of reader.read(size).""" return asyncio.ensure_future(reader.read(size)) diff --git a/telnetlib3/client.py b/telnetlib3/client.py index 83b1895e..966e17f9 100755 --- a/telnetlib3/client.py +++ b/telnetlib3/client.py @@ -10,16 +10,7 @@ import struct import asyncio import argparse -from typing import ( - Any, - Dict, - List, - Tuple, - Union, - Callable, - Optional, - Sequence, -) +from typing import Any, Dict, List, Tuple, Union, Callable, Optional, Sequence # local from telnetlib3 import accessories, client_base @@ -286,11 +277,7 @@ def send_naws(self) -> Tuple[int, int]: """ return (self._extra["rows"], self._extra["cols"]) - def encoding( - self, - outgoing: Optional[bool] = None, - incoming: Optional[bool] = None, - ) -> str: + def encoding(self, outgoing: Optional[bool] = None, incoming: Optional[bool] = None) -> str: """ Return encoding for the given stream direction. @@ -365,10 +352,7 @@ def _winsize() -> Tuple[int, int]: return rows, cols except (ImportError, IOError): # TODO: mock import error, or test on windows or other non-posix. - return ( - int(os.environ.get("LINES", 25)), - int(os.environ.get("COLUMNS", 80)), - ) + return (int(os.environ.get("LINES", 25)), int(os.environ.get("COLUMNS", 80))) async def open_connection( # pylint: disable=too-many-locals @@ -395,10 +379,7 @@ async def open_connection( # pylint: disable=too-many-locals _waiter_connected: Optional[asyncio.Future[None]] = None, limit: Optional[int] = None, send_environ: Optional[Sequence[str]] = None, -) -> Tuple[ - Union[TelnetReader, TelnetReaderUnicode], - Union[TelnetWriter, TelnetWriterUnicode], -]: +) -> Tuple[Union[TelnetReader, TelnetReaderUnicode], Union[TelnetWriter, TelnetWriterUnicode]]: """ Connect to a TCP Telnet server as a Telnet client. @@ -516,10 +497,7 @@ async def run_client() -> None: config_msg = f"Client configuration: {accessories.repr_mapping(args)}" log = accessories.make_logger( - name=__name__, - loglevel=args["loglevel"], - logfile=args["logfile"], - logfmt=args["logfmt"], + name=__name__, loglevel=args["loglevel"], logfile=args["logfile"], logfmt=args["logfmt"] ) log.debug(config_msg) @@ -547,8 +525,7 @@ async def run_client() -> None: def _get_argument_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( - description="Telnet protocol client", - formatter_class=argparse.ArgumentDefaultsHelpFormatter, + description="Telnet protocol client", formatter_class=argparse.ArgumentDefaultsHelpFormatter ) parser.add_argument("host", action="store", help="hostname") parser.add_argument("port", nargs="?", default=23, type=int, help="port number") @@ -574,10 +551,7 @@ def _get_argument_parser() -> argparse.ArgumentParser: "--connect-minwait", default=1.0, type=float, help="shell delay for negotiation" ) parser.add_argument( - "--connect-maxwait", - default=4.0, - type=float, - help="timeout for pending negotiation", + "--connect-maxwait", default=4.0, type=float, help="timeout for pending negotiation" ) parser.add_argument( "--connect-timeout", diff --git a/telnetlib3/client_base.py b/telnetlib3/client_base.py index a523bffb..b9b0f96e 100644 --- a/telnetlib3/client_base.py +++ b/telnetlib3/client_base.py @@ -168,11 +168,7 @@ def connection_made(self, transport: asyncio.BaseTransport) -> None: pass self.writer = writer_factory( - transport=_transport, - protocol=self, - reader=self.reader, - client=True, - **writer_kwds, + transport=_transport, protocol=self, reader=self.reader, client=True, **writer_kwds ) self.log.info("Connected to %s", self) diff --git a/telnetlib3/client_shell.py b/telnetlib3/client_shell.py index 663cd17d..39532fe8 100644 --- a/telnetlib3/client_shell.py +++ b/telnetlib3/client_shell.py @@ -125,9 +125,7 @@ def determine_mode(self, mode: "Terminal.ModeDef") -> "Terminal.ModeDef": cc=cc, ) - async def make_stdio( - self, - ) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]: + async def make_stdio(self) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]: """Return (reader, writer) pair for sys.stdin, sys.stdout.""" reader = asyncio.StreamReader() reader_protocol = asyncio.StreamReaderProtocol(reader) diff --git a/telnetlib3/fingerprinting.py b/telnetlib3/fingerprinting.py index 454c655d..74847aec 100644 --- a/telnetlib3/fingerprinting.py +++ b/telnetlib3/fingerprinting.py @@ -3,7 +3,7 @@ This module probes telnet protocol capabilities, collects session data, and saves fingerprint files. Display, REPL, and post-script code live -in :mod:`telnetlib3.fingerprinting_display`. +in ``telnetlib3.fingerprinting_display``. """ from __future__ import annotations @@ -16,11 +16,13 @@ import asyncio import hashlib import logging +import argparse import datetime from typing import Any, Dict, List, Tuple, Union, Callable, Optional, cast # local from . import slc +from .server import TelnetServer # pylint: disable=cyclic-import from .telopt import ( BM, DO, @@ -33,6 +35,7 @@ DONT, ECHO, GMCP, + MSDP, NAMS, NAOL, NAOP, @@ -97,9 +100,7 @@ # Terminal types that uniquely identify specific telnet clients -PROTOCOL_MATCHED_TERMINALS = { - "syncterm", # SyncTERM BBS client -} +PROTOCOL_MATCHED_TERMINALS = {"syncterm"} # SyncTERM BBS client # Terminal types associated with MUD clients, matched case-insensitively. # These clients are likely to support extended options like GMCP. @@ -120,14 +121,92 @@ } __all__ = ( + "ENVIRON_EXTENDED", + "FingerprintingServer", + "FingerprintingTelnetServer", + "fingerprint_server_main", "fingerprinting_server_shell", "fingerprinting_post_script", "get_client_fingerprint", "probe_client_capabilities", ) +#: Extended NEW_ENVIRON variable list used during client fingerprinting. +#: The base :class:`~telnetlib3.server.TelnetServer` requests only common +#: variables (USER, LOGNAME, LANG, TERM, etc.). This extended set collects +#: additional information useful for identifying and classifying clients. +ENVIRON_EXTENDED: list[str] = [ + "HOME", + "SHELL", + "SSH_CLIENT", + "SSH_TTY", + "HOSTNAME", + "HOSTTYPE", + "OSTYPE", + "PWD", + "VISUAL", + "TMUX", + "STY", + "LC_ALL", + "LC_CTYPE", + "LC_MESSAGES", + "LC_COLLATE", + "LC_TIME", + "DOCKER_HOST", + "HISTFILE", + "AWS_PROFILE", + "AWS_REGION", +] + logger = logging.getLogger("telnetlib3.fingerprint") + +class FingerprintingTelnetServer: # pylint: disable=too-few-public-methods + """ + Mixin that extends ``on_request_environ`` with :data:`ENVIRON_EXTENDED`. + + Usage with :func:`~telnetlib3.server.create_server`:: + + from telnetlib3.server import TelnetServer + from telnetlib3.fingerprinting import FingerprintingTelnetServer + + class MyServer(FingerprintingTelnetServer, TelnetServer): + pass + + server = await create_server(protocol_factory=MyServer, ...) + """ + + def on_request_environ(self) -> list[Union[str, bytes]]: + """Return base environ keys plus :data:`ENVIRON_EXTENDED`.""" + # pylint: disable=no-member + base: list[Union[str, bytes]] = super().on_request_environ() # type: ignore[misc] + # Insert extended keys before the trailing VAR/USERVAR sentinels + # local + from .telopt import VAR, USERVAR # pylint: disable=import-outside-toplevel + + extra = [k for k in ENVIRON_EXTENDED if k not in base] + # Find where VAR/USERVAR sentinels start and insert before them + insert_at = len(base) + for i, item in enumerate(base): + if item in (VAR, USERVAR): + insert_at = i + break + return base[:insert_at] + extra + base[insert_at:] + + +class FingerprintingServer(FingerprintingTelnetServer, TelnetServer): + """ + :class:`~telnetlib3.server.TelnetServer` with extended ``NEW_ENVIRON``. + + Combines :class:`FingerprintingTelnetServer` with :class:`~telnetlib3.server.TelnetServer` + so that :func:`fingerprinting_server_shell` receives the full set of + environment variables needed for stable fingerprint hashes. + + Used as the default ``protocol_factory`` by + :func:`fingerprint_server_main` / ``telnetlib3-fingerprint-server`` CLI. + """ + + # Timeout for probe_client_capabilities in _run_probe (seconds) _PROBE_TIMEOUT = 0.5 @@ -151,17 +230,13 @@ (SNDLOC, "SNDLOC", "Send location"), ] -MUD_OPTIONS = [ - (COM_PORT_OPTION, "COM_PORT", "Serial port control (RFC 2217)"), -] +MUD_OPTIONS = [(COM_PORT_OPTION, "COM_PORT", "Serial port control (RFC 2217)")] # Options with non-standard byte values (> 140) that crash some clients. # icy_term (icy_net) only accepts option bytes 0-49, 138-140, and 255, # returning a hard error for anything else. GMCP-capable MUD clients # typically self-announce via IAC WILL GMCP, so probing is unnecessary. -EXTENDED_OPTIONS = [ - (GMCP, "GMCP", "Generic MUD Communication Protocol"), -] +EXTENDED_OPTIONS = [(GMCP, "GMCP", "Generic MUD Communication Protocol")] LEGACY_OPTIONS = [ (AUTHENTICATION, "AUTHENTICATION", "Telnet authentication"), @@ -275,23 +350,11 @@ async def probe_client_capabilities( progress_callback(name, idx, len(to_probe), "") if writer.remote_option.enabled(opt): - results[name] = { - "status": "WILL", - "opt": opt, - "description": description, - } + results[name] = {"status": "WILL", "opt": opt, "description": description} elif writer.remote_option.get(opt) is False: - results[name] = { - "status": "WONT", - "opt": opt, - "description": description, - } + results[name] = {"status": "WONT", "opt": opt, "description": description} else: - results[name] = { - "status": "timeout", - "opt": opt, - "description": description, - } + results[name] = {"status": "timeout", "opt": opt, "description": description} return results @@ -316,9 +379,7 @@ async def probe_client_capabilities( ) + tuple(f"ttype{n}" for n in range(1, 9)) -def get_client_fingerprint( - writer: Union[TelnetWriter, TelnetWriterUnicode], -) -> Dict[str, Any]: +def get_client_fingerprint(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> Dict[str, Any]: """ Collect all available client information from writer. @@ -377,9 +438,7 @@ async def _run_probe( return results, elapsed -def _get_protocol( - writer: Union[TelnetWriter, TelnetWriterUnicode], -) -> Any: +def _get_protocol(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> Any: """Return the protocol object from a writer.""" return getattr(writer, "_protocol", None) or getattr(writer, "protocol", None) @@ -416,9 +475,7 @@ def _collect_rejected_options( return result -def _collect_extra_info( - writer: Union[TelnetWriter, TelnetWriterUnicode], -) -> Dict[str, Any]: +def _collect_extra_info(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> Dict[str, Any]: """Collect all extra_info from writer, including private _extra dict.""" extra: Dict[str, Any] = {} @@ -453,9 +510,7 @@ def _collect_extra_info( return extra -def _collect_ttype_cycle( - writer: Union[TelnetWriter, TelnetWriterUnicode], -) -> List[str]: +def _collect_ttype_cycle(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> List[str]: """Collect the full TTYPE cycle responses.""" ttype_list = [] @@ -470,9 +525,7 @@ def _collect_ttype_cycle( return ttype_list -def _collect_protocol_timing( - writer: Union[TelnetWriter, TelnetWriterUnicode], -) -> Dict[str, Any]: +def _collect_protocol_timing(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> Dict[str, Any]: """Collect timing information from protocol.""" timing = {} protocol = _get_protocol(writer) @@ -486,9 +539,7 @@ def _collect_protocol_timing( return timing -def _collect_slc_tab( - writer: Union[TelnetWriter, TelnetWriterUnicode], -) -> Dict[str, Any]: +def _collect_slc_tab(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> Dict[str, Any]: """Collect non-default SLC entries when LINEMODE was negotiated.""" slctab = getattr(writer, "slctab", None) if not slctab: @@ -532,8 +583,7 @@ def _collect_slc_tab( def _create_protocol_fingerprint( - writer: Union[TelnetWriter, TelnetWriterUnicode], - probe_results: Dict[str, Dict[str, Any]], + writer: Union[TelnetWriter, TelnetWriterUnicode], probe_results: Dict[str, Dict[str, Any]] ) -> Dict[str, Any]: """ Create anonymized/summarized protocol fingerprint from session data. @@ -545,9 +595,7 @@ def _create_protocol_fingerprint( :param probe_results: Probe results from capability probing. :returns: Dict with anonymized protocol fingerprint data. """ - fingerprint: Dict[str, Any] = { - "probed-protocol": "client", - } + fingerprint: Dict[str, Any] = {"probed-protocol": "client"} protocol = _get_protocol(writer) extra_dict = getattr(protocol, "_extra", {}) if protocol else {} @@ -634,9 +682,7 @@ def _count_fingerprint_folders(data_dir: Optional[str] = None) -> int: AMBIGUOUS_WIDTH_UNKNOWN = -1 -def _create_session_fingerprint( - writer: Union[TelnetWriter, TelnetWriterUnicode], -) -> Dict[str, Any]: +def _create_session_fingerprint(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> Dict[str, Any]: """Create session identity fingerprint from stable client fields.""" identity: Dict[str, Any] = {} @@ -812,9 +858,7 @@ def _save_fingerprint_data( # pylint: disable=too-many-locals,too-many-branches file_count = _count_protocol_folder_files(probe_dir) if file_count >= FINGERPRINT_MAX_FILES: logger.warning( - "fingerprint %s at file limit (%d), not saving", - telnet_hash, - FINGERPRINT_MAX_FILES, + "fingerprint %s at file limit (%d), not saving", telnet_hash, FINGERPRINT_MAX_FILES ) return None logger.info("connection for fingerprint %s", telnet_hash) @@ -823,10 +867,7 @@ def _save_fingerprint_data( # pylint: disable=too-many-locals,too-many-branches peername = writer.get_extra_info("peername") now = datetime.datetime.now(datetime.timezone.utc) - session_entry = { - "ip": str(peername[0]) if peername else None, - "connected": now.isoformat(), - } + session_entry = {"ip": str(peername[0]) if peername else None, "connected": now.isoformat()} if os.path.exists(filepath): try: @@ -871,6 +912,8 @@ def _is_maybe_mud(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> bool: for key in ("ttype1", "ttype2", "ttype3"): if (writer.get_extra_info(key) or "").lower() in MUD_TERMINALS: return True + if writer.remote_option.enabled(GMCP) or writer.remote_option.enabled(MSDP): + return True return False @@ -967,6 +1010,44 @@ def fingerprinting_post_script(filepath: str) -> None: _fps(filepath) +def fingerprint_server_main() -> None: + """ + Entry point for ``telnetlib3-fingerprint-server`` CLI. + + Reuses :func:`~telnetlib3.server.parse_server_args` and + :func:`~telnetlib3.server.run_server` with + :class:`FingerprintingServer` as the default protocol factory + and :func:`fingerprinting_server_shell` as the default shell. + + Accepts ``--data-dir`` to set the fingerprint data directory. + Falls back to the ``TELNETLIB3_DATA_DIR`` environment variable. + """ + # pylint: disable=import-outside-toplevel,global-statement + # local import is required to prevent circular imports + # local + from .server import _config, run_server, parse_server_args # noqa: PLC0415 + + global DATA_DIR + # Extract --data-dir before parse_server_args() sees argv. + pre = argparse.ArgumentParser(add_help=False) + pre.add_argument( + "--data-dir", + default=None, + help="directory for fingerprint data" " (default: $TELNETLIB3_DATA_DIR)", + ) + pre_args, remaining = pre.parse_known_args() + sys.argv[1:] = remaining + + if pre_args.data_dir is not None: + DATA_DIR = pre_args.data_dir + + args = parse_server_args() + if args["shell"] is _config.shell: + args["shell"] = fingerprinting_server_shell + args["protocol_factory"] = FingerprintingServer + asyncio.run(run_server(**args)) + + def main() -> None: """CLI entry point for fingerprinting post-processing.""" if len(sys.argv) != 2: diff --git a/telnetlib3/fingerprinting_display.py b/telnetlib3/fingerprinting_display.py index 5f273cd1..11d3051f 100644 --- a/telnetlib3/fingerprinting_display.py +++ b/telnetlib3/fingerprinting_display.py @@ -23,6 +23,7 @@ from typing import Any, Dict, List, Tuple, Optional, Generator # local +from .accessories import PATIENCE_MESSAGES from .fingerprinting import ( DATA_DIR, _UNKNOWN_TERMINAL_HASH, @@ -51,18 +52,7 @@ def _run_ucs_detect() -> Optional[Dict[str, Any]]: if not ucs_detect: return None - patience_msg = random.choice( - [ - "Contemplate the virtue of patience", - "Endure delays with fortitude", - "To wait calmly requires discipline", - "Suspend expectations of imminence", - "The tide hastens for no man", - "Cultivate a stoic calmness", - "The tranquil mind eschews impatience", - "Deliberation is preferable to haste", - ] - ) + patience_msg = random.choice(PATIENCE_MESSAGES) echo(f"{patience_msg}...\r\n") with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as tmp: @@ -218,16 +208,11 @@ def _format_ttype( def _is_utf8_charset(value: str) -> bool: """Test whether a charset or encoding string refers to UTF-8.""" - return value.lower().replace("-", "").replace("_", "") in ( - "utf8", - "unicode11utf8", - ) + return value.lower().replace("-", "").replace("_", "") in ("utf8", "unicode11utf8") def _format_encoding( - extra: Dict[str, Any], - proto_data: Dict[str, Any], - ambiguous_width: Optional[int] = None, + extra: Dict[str, Any], proto_data: Dict[str, Any], ambiguous_width: Optional[int] = None ) -> Optional[Tuple[str, str]]: """Consolidate LANG, charset, and encoding into a single key-value pair.""" lang_val = extra.get("LANG") @@ -398,12 +383,7 @@ def _build_telnet_rows( # pylint: disable=too-many-locals,unused-argument pairs.append(("Options", _wrap_options(supported, wrap_width))) if rejected_will := proto_data.get("rejected-will"): - pairs.append( - ( - "Rejected", - _wrap_options(rejected_will, wrap_width), - ) - ) + pairs.append(("Rejected", _wrap_options(rejected_will, wrap_width))) slc_tab = session_data.get("slc_tab", {}) if slc_tab: @@ -701,9 +681,7 @@ def _load_known_fingerprints( # pylint: disable=too-complex def _find_nearest_match( - fp_data: Dict[str, Any], - probe_type: str, - names: Dict[str, str], + fp_data: Dict[str, Any], probe_type: str, names: Dict[str, str] ) -> Optional[Tuple[str, float]]: """ Find the most similar named fingerprint. @@ -726,9 +704,7 @@ def _find_nearest_match( def _build_seen_counts( # pylint: disable=too-many-locals - data: Dict[str, Any], - names: Optional[Dict[str, str]] = None, - term: Any = None, + data: Dict[str, Any], names: Optional[Dict[str, str]] = None, term: Any = None ) -> str: """Build friendly "seen before" text from folder and session counts.""" if DATA_DIR is None or not os.path.exists(DATA_DIR): @@ -787,11 +763,7 @@ def _build_seen_counts( # pylint: disable=too-many-locals terminal_unknown = terminal_known and terminal_hash not in _names if (telnet_unknown or terminal_unknown) and _names: match_lines = _nearest_match_lines( - data, - _names, - term, - telnet_unknown=telnet_unknown, - terminal_unknown=terminal_unknown, + data, _names, term, telnet_unknown=telnet_unknown, terminal_unknown=terminal_unknown ) if match_lines: lines.extend(match_lines) @@ -993,9 +965,7 @@ def _filter_terminal_detail( # pylint: disable=too-complex,too-many-branches return result -def _filter_telnet_detail( - detail: Optional[Dict[str, Any]], -) -> Optional[Dict[str, Any]]: +def _filter_telnet_detail(detail: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]: """Filter telnet probe data for display.""" if not detail: return detail @@ -1098,9 +1068,7 @@ def _build_database_entries( # pylint: disable=too-many-locals def _show_database( - term: Any, - data: Dict[str, Any], - entries: List[Tuple[str, str, int, int]], + term: Any, data: Dict[str, Any], entries: List[Tuple[str, str, int, int]] ) -> None: """Display scrollable database of all known fingerprints.""" try: @@ -1127,14 +1095,7 @@ def _show_database( tbl.align["Calls"] = "r" tbl.max_table_width = max(40, (term.width or 80) - 1) for kind, display_name, files, sessions in entries: - tbl.add_row( - [ - kind, - term.forestgreen(display_name), - str(files), - str(sessions), - ] - ) + tbl.add_row([kind, term.forestgreen(display_name), str(files), str(sessions)]) _paginate(term, str(tbl)) diff --git a/telnetlib3/guard_shells.py b/telnetlib3/guard_shells.py index c3edcf94..1cfe693a 100644 --- a/telnetlib3/guard_shells.py +++ b/telnetlib3/guard_shells.py @@ -18,7 +18,8 @@ import re import asyncio import logging -from typing import Tuple, Union, Optional, cast +from typing import Tuple, Union, Optional, Generator, cast +from contextlib import contextmanager # local from .server_shell import readline2 @@ -39,6 +40,30 @@ _CPR_PATTERN = re.compile(rb"\x1b\[(\d+);(\d+)R") +@contextmanager +def _latin1_reading( + reader: Union[TelnetReader, TelnetReaderUnicode], +) -> Generator[None, None, None]: + """ + Temporarily switch reader to latin-1 for byte-transparent decoding. + + Latin-1 maps bytes 0x00-0xFF one-to-one, so every byte from a scanner + or bot is preserved exactly rather than raising ``UnicodeDecodeError`` + or producing replacement characters. + """ + if not isinstance(reader, TelnetReaderUnicode): + yield + return + orig_fn = reader.fn_encoding + reader.fn_encoding = lambda **kw: "latin-1" + reader._decoder = None # pylint: disable=protected-access + try: + yield + finally: + reader.fn_encoding = orig_fn + reader._decoder = None # pylint: disable=protected-access + + class ConnectionCounter: """Simple shared counter for limiting concurrent connections.""" @@ -73,10 +98,7 @@ def count(self) -> int: return self._count -async def _read_line_inner( - reader: Union[TelnetReader, TelnetReaderUnicode], - max_len: int, -) -> str: +async def _read_line_inner(reader: Union[TelnetReader, TelnetReaderUnicode], max_len: int) -> str: """Inner loop for _read_line, separated for wait_for compatibility.""" _reader = cast(TelnetReaderUnicode, reader) buf = "" @@ -91,9 +113,7 @@ async def _read_line_inner( async def _read_line( - reader: Union[TelnetReader, TelnetReaderUnicode], - timeout: float, - max_len: int = _MAX_INPUT, + reader: Union[TelnetReader, TelnetReaderUnicode], timeout: float, max_len: int = _MAX_INPUT ) -> Optional[str]: """Read a line with timeout and length limit.""" try: @@ -123,7 +143,6 @@ async def _read_cpr_response( try: data = await reader.read(1) except UnicodeDecodeError: - # Bot sent garbage bytes that can't be decoded return None if not data: return None @@ -199,7 +218,8 @@ async def robot_check( :returns: True if client passes (renders wide char as width 2). """ - width = await _measure_width(reader, writer, _WIDE_TEST_CHAR, timeout) + with _latin1_reading(reader): + width = await _measure_width(reader, writer, _WIDE_TEST_CHAR, timeout) return bool(width == 2) @@ -239,26 +259,27 @@ async def robot_shell( logger.info("robot_shell: connection from %s", peername) answers = [] - try: - line1 = await _ask_question(reader, writer, "Do robots dream of electric sheep? [yn] ") - if line1 is None: - logger.info("robot_shell: timeout waiting for response") - return - answers.append(line1) - - line2 = await _ask_question( - reader, writer, "\r\nHave you ever wondered, who are the windowmakers? " - ) - if line2 is None: - logger.info("robot_shell: timeout on second question") - return - answers.append(line2) - - writer.write("\r\n") - await writer.drain() - finally: - if answers: - logger.info("robot denied, answers=%r", answers) + with _latin1_reading(reader): + try: + line1 = await _ask_question(reader, writer, "Do robots dream of electric sheep? [yn] ") + if line1 is None: + logger.info("robot_shell: timeout waiting for response") + return + answers.append(line1) + + line2 = await _ask_question( + reader, writer, "\r\nHave you ever wondered, who are the windowmakers? " + ) + if line2 is None: + logger.info("robot_shell: timeout on second question") + return + answers.append(line2) + + writer.write("\r\n") + await writer.drain() + finally: + if answers: + logger.info("robot denied, answers=%r", answers) async def busy_shell( @@ -271,24 +292,22 @@ async def busy_shell( Displays busy message, logs any input, and disconnects. """ writer = cast(TelnetWriterUnicode, writer) - logger.info( - "busy_shell: connection from %s (limit reached)", - writer.get_extra_info("peername"), - ) + logger.info("busy_shell: connection from %s (limit reached)", writer.get_extra_info("peername")) writer.write("Machine is busy, do not touch! ") await writer.drain() - line1 = await _read_line(reader, timeout=30.0) - if line1 is not None: - logger.info("busy_shell: input1=%r", line1) + with _latin1_reading(reader): + line1 = await _read_line(reader, timeout=30.0) + if line1 is not None: + logger.info("busy_shell: input1=%r", line1) - writer.write("\r\nYou hear a distant explosion... ") - await writer.drain() + writer.write("\r\nYou hear a distant explosion... ") + await writer.drain() - line2 = await _read_line(reader, timeout=30.0) - if line2 is not None: - logger.info("busy_shell: input2=%r", line2) + line2 = await _read_line(reader, timeout=30.0) + if line2 is not None: + logger.info("busy_shell: input2=%r", line2) writer.write("\r\n") await writer.drain() diff --git a/telnetlib3/mud.py b/telnetlib3/mud.py new file mode 100644 index 00000000..701ce987 --- /dev/null +++ b/telnetlib3/mud.py @@ -0,0 +1,240 @@ +""" +MUD telnet protocol encoding and decoding utilities. + +Provides encode/decode functions for: +- GMCP (Generic MUD Communication Protocol, option 201) +- MSDP (MUD Server Data Protocol, option 69) +- MSSP (MUD Server Status Protocol, option 70) + +All encode functions return the payload bytes only (the content between +``IAC SB