diff --git a/pyproject.toml b/pyproject.toml index 7768298e..c8667da6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ description = "The Posit Connect command-line interface." authors = [{ name = "Posit, PBC", email = "rsconnect@posit.co" }] license = { file = "LICENSE.md" } readme = { file = "README.md", content-type = "text/markdown" } -requires-python = ">=3.8" +requires-python = ">=3.10" dependencies = [ "typing-extensions>=4.8.0", @@ -13,7 +13,8 @@ dependencies = [ "semver>=2.0.0,<4.0.0", "pyjwt>=2.4.0", "click>=8.0.0", - "toml>=0.10; python_version < '3.11'" + "toml>=0.10; python_version < '3.11'", + "fastmcp==2.12.4" ] dynamic = ["version"] diff --git a/rsconnect/main.py b/rsconnect/main.py index 357d2b9a..ccba739b 100644 --- a/rsconnect/main.py +++ b/rsconnect/main.py @@ -8,7 +8,17 @@ import traceback from functools import wraps from os.path import abspath, dirname, exists, isdir, join -from typing import Callable, ItemsView, Literal, Optional, Sequence, TypeVar, cast +from typing import ( + Any, + Callable, + Dict, + ItemsView, + Literal, + Optional, + Sequence, + TypeVar, + cast, +) import click @@ -392,6 +402,88 @@ def version(): click.echo(VERSION) +@cli.command(help="Start the MCP server") +def mcp_server(): + from fastmcp import FastMCP + from fastmcp.exceptions import ToolError + + mcp = FastMCP("Connect MCP") + + # Discover all commands at startup + from .mcp_deploy_context import discover_all_commands + all_commands_info = discover_all_commands(cli) + + @mcp.tool() + def get_command_info( + command_path: str, + ) -> Dict[str, Any]: + """ + Get the parameter schema for any rsconnect command. + + Returns information about the parameters needed to construct an rsconnect command + that can be executed in a bash shell. Supports nested command groups of arbitrary depth. + + :param command_path: space-separated command path (e.g., 'version', 'deploy notebook', 'content build add') + :return: dictionary with command parameter schema and execution metadata + """ + try: + # split the command path into parts + parts = command_path.strip().split() + if not parts: + available_commands = list(all_commands_info["commands"].keys()) + return { + "error": "Command path cannot be empty", + "available_commands": available_commands + } + + current_info = all_commands_info + current_path = [] + + for _, part in enumerate(parts): + # error if we find unexpected additional subcommands + if "commands" not in current_info: + return { + "error": f"'{' '.join(current_path)}' is not a command group. Unexpected part: '{part}'", + "type": "command", + "command_path": f"rsconnect {' '.join(current_path)}", + } + + # try to return useful messaging for invalid subcommands + if part not in current_info["commands"]: + available = list(current_info["commands"].keys()) + path_str = ' '.join(current_path) if current_path else "top level" + return { + "error": f"Command '{part}' not found in {path_str}", + "available_commands": available + } + + current_info = current_info["commands"][part] + current_path.append(part) + + # still return something useful if additional subcommands are needed + if "commands" in current_info: + return { + "type": "command_group", + "name": current_info.get("name", parts[-1]), + "description": current_info.get("description"), + "available_subcommands": list(current_info["commands"].keys()), + "message": f"The '{' '.join(parts)}' command requires a subcommand." + } + else: + return { + "type": "command", + "command_path": f"rsconnect {' '.join(parts)}", + "name": current_info.get("name", parts[-1]), + "description": current_info.get("description"), + "parameters": current_info.get("parameters", []), + "shell": "bash" + } + except Exception as e: + raise ToolError(f"Failed to retrieve command info: {str(e)}") + + mcp.run() + + def _test_server_and_api(server: str, api_key: str, insecure: bool, ca_cert: str | None): """ Test the specified server information to make sure it works. If so, a @@ -433,7 +525,7 @@ def _test_spcs_creds(server: SPCSConnectServer): @cli.command( short_help="Create an initial admin user to bootstrap a Connect instance.", - help="Creates an initial admin user to bootstrap a Connect instance. Returns the provisionend API key.", + help="Creates an initial admin user to bootstrap a Connect instance. Returns the provisioned API key.", no_args_is_help=True, ) @click.option( diff --git a/rsconnect/mcp_deploy_context.py b/rsconnect/mcp_deploy_context.py new file mode 100644 index 00000000..674b2273 --- /dev/null +++ b/rsconnect/mcp_deploy_context.py @@ -0,0 +1,123 @@ +""" +Programmatically discover all parameters for rsconnect commands. +This helps MCP tools understand how to use the cli. +""" + +import json +from typing import Any, Dict + +import click + + +def extract_parameter_info(param: click.Parameter) -> Dict[str, Any]: + """Extract detailed information from a Click parameter.""" + info: Dict[str, Any] = {} + + if isinstance(param, click.Option) and param.opts: + # Use the longest option name (usually the full form without dashes) + mcp_arg_name = max(param.opts, key=len).lstrip('-').replace('-', '_') + info["name"] = mcp_arg_name + info["cli_flags"] = param.opts + info["param_type"] = "option" + else: + info["name"] = param.name + if isinstance(param, click.Argument): + info["param_type"] = "argument" + + # extract help text for added context + help_text = getattr(param, 'help', None) + if help_text: + info["description"] = help_text + + if isinstance(param, click.Option): + # Boolean flags + if param.is_flag: + info["type"] = "boolean" + info["default"] = param.default or False + + # choices + elif param.type and hasattr(param.type, 'choices'): + info["type"] = "string" + info["choices"] = list(param.type.choices) + + # multiple + elif param.multiple: + info["type"] = "array" + info["items"] = {"type": "string"} + + # files + elif isinstance(param.type, click.Path): + info["type"] = "string" + info["format"] = "path" + if param.type.exists: + info["path_must_exist"] = True + if param.type.file_okay and not param.type.dir_okay: + info["path_type"] = "file" + elif param.type.dir_okay and not param.type.file_okay: + info["path_type"] = "directory" + + # default + else: + info["type"] = "string" + + # defaults (important to avoid noise in returned command) + if param.default is not None and not param.is_flag: + if isinstance(param.default, tuple): + info["default"] = list(param.default) + elif isinstance(param.default, (str, int, float, bool, list, dict)): + info["default"] = param.default + + # required params + info["required"] = param.required + + return info + + +def discover_single_command(cmd: click.Command) -> Dict[str, Any]: + """Discover a single command and its parameters.""" + cmd_info = { + "name": cmd.name, + "description": cmd.help, + "parameters": [] + } + + for param in cmd.params: + if param.name in ["verbose", "v"]: + continue + + param_info = extract_parameter_info(param) + cmd_info["parameters"].append(param_info) + + return cmd_info + + +def discover_command_group(group: click.Group) -> Dict[str, Any]: + """Discover all commands in a command group and their parameters.""" + result = { + "name": group.name, + "description": group.help, + "commands": {} + } + + for cmd_name, cmd in group.commands.items(): + if isinstance(cmd, click.Group): + # recursively discover nested command groups + result["commands"][cmd_name] = discover_command_group(cmd) + else: + result["commands"][cmd_name] = discover_single_command(cmd) + + return result + + +def discover_all_commands(cli: click.Group) -> Dict[str, Any]: + """Discover all commands in the CLI and their parameters.""" + return discover_command_group(cli) + + +if __name__ == "__main__": + from rsconnect.main import cli + + # Discover all commands in the CLI + # use this for testing/debugging + all_commands = discover_all_commands(cli) + print(json.dumps(all_commands, indent=2)) diff --git a/tests/test_mcp_deploy_context.py b/tests/test_mcp_deploy_context.py new file mode 100644 index 00000000..8ffe8ec1 --- /dev/null +++ b/tests/test_mcp_deploy_context.py @@ -0,0 +1,192 @@ +"""Tests for MCP deploy context.""" + +from unittest import TestCase + +from rsconnect.main import cli +from rsconnect.mcp_deploy_context import discover_all_commands + + +class TestDiscoverAllCommands(TestCase): + def test_discover_rsconnect_cli(self): + result = discover_all_commands(cli) + + self.assertIn("commands", result) + self.assertIsNotNone(result["description"]) + + def test_top_level_commands(self): + result = discover_all_commands(cli) + + expected = ["version", "mcp-server", "add", "list", "remove", "details", "info", "deploy", "write-manifest", "content", "system", "bootstrap"] + for cmd in expected: + self.assertIn(cmd, result["commands"]) + + def test_deploy_is_command_group(self): + result = discover_all_commands(cli) + self.assertIn("commands", result["commands"]["deploy"]) + + def test_deploy_subcommands(self): + result = discover_all_commands(cli) + + deploy = result["commands"]["deploy"] + expected = ["notebook", "voila", "manifest", "quarto", "tensorflow", "html", "api", "flask", "fastapi", "dash", "streamlit", "bokeh", "shiny", "gradio"] + for subcmd in expected: + self.assertIn(subcmd, deploy["commands"]) + + def test_content_is_command_group(self): + result = discover_all_commands(cli) + self.assertIn("commands", result["commands"]["content"]) + + def test_content_subcommands(self): + result = discover_all_commands(cli) + + content = result["commands"]["content"] + expected = ["search", "describe", "download-bundle", "build"] + for subcmd in expected: + self.assertIn(subcmd, content["commands"]) + + def test_content_build_nested_group(self): + result = discover_all_commands(cli) + + build = result["commands"]["content"]["commands"]["build"] + self.assertIn("commands", build) + + expected = ["add", "rm", "ls", "history", "logs", "run"] + for subcmd in expected: + self.assertIn(subcmd, build["commands"]) + + def test_system_caches_nested_group(self): + result = discover_all_commands(cli) + + caches = result["commands"]["system"]["commands"]["caches"] + self.assertIn("commands", caches) + + expected = ["list", "delete"] + for subcmd in expected: + self.assertIn(subcmd, caches["commands"]) + + def test_write_manifest_is_command_group(self): + result = discover_all_commands(cli) + self.assertIn("commands", result["commands"]["write-manifest"]) + + def test_version_is_simple_command(self): + result = discover_all_commands(cli) + + version = result["commands"]["version"] + self.assertNotIn("commands", version) + self.assertIn("parameters", version) + + def test_mcp_server_command_exists(self): + result = discover_all_commands(cli) + self.assertIn("mcp-server", result["commands"]) + self.assertIn("parameters", result["commands"]["mcp-server"]) + + def test_deploy_notebook_has_parameters(self): + result = discover_all_commands(cli) + + notebook = result["commands"]["deploy"]["commands"]["notebook"] + param_names = [p["name"] for p in notebook["parameters"]] + + self.assertIn("file", param_names) + self.assertIn("name", param_names) + self.assertIn("server", param_names) + self.assertIn("api_key", param_names) + + def test_add_command_has_parameters(self): + result = discover_all_commands(cli) + + add = result["commands"]["add"] + param_names = [p["name"] for p in add["parameters"]] + + self.assertIn("name", param_names) + self.assertIn("server", param_names) + self.assertIn("api_key", param_names) + self.assertIn("insecure", param_names) + + def test_parameter_has_required_fields(self): + result = discover_all_commands(cli) + + for param in result["commands"]["add"]["parameters"]: + self.assertIn("name", param) + self.assertIn("param_type", param) + self.assertIn("required", param) + + if param["param_type"] == "option": + self.assertIn("cli_flags", param) + self.assertGreater(len(param["cli_flags"]), 0) + + def test_boolean_flags_identified(self): + result = discover_all_commands(cli) + + add = result["commands"]["add"] + insecure = next((p for p in add["parameters"] if p["name"] == "insecure"), None) + + self.assertIsNotNone(insecure) + self.assertEqual(insecure["type"], "boolean") + + def test_parameters_have_descriptions(self): + result = discover_all_commands(cli) + + add = result["commands"]["add"] + server = next((p for p in add["parameters"] if p["name"] == "server"), None) + + self.assertIsNotNone(server) + self.assertIn("description", server) + self.assertGreater(len(server["description"]), 0) + + def test_verbose_parameters_excluded(self): + result = discover_all_commands(cli) + + param_names = [p["name"] for p in result["commands"]["add"]["parameters"]] + self.assertNotIn("verbose", param_names) + self.assertNotIn("v", param_names) + + def test_all_commands_have_valid_structure(self): + def validate_command(cmd_info, path=""): + self.assertIn("name", cmd_info) + + if "commands" in cmd_info: + self.assertIsInstance(cmd_info["commands"], dict) + for subcmd_name, subcmd_info in cmd_info["commands"].items(): + validate_command(subcmd_info, f"{path}/{subcmd_name}") + else: + self.assertIn("parameters", cmd_info) + self.assertIsInstance(cmd_info["parameters"], list) + + for param in cmd_info["parameters"]: + self.assertIn("name", param) + self.assertIn("param_type", param) + self.assertIn("required", param) + + result = discover_all_commands(cli) + validate_command(result, "cli") + + def test_multiple_value_parameters(self): + result = discover_all_commands(cli) + + quarto = result["commands"]["deploy"]["commands"]["quarto"] + exclude = next((p for p in quarto["parameters"] if p["name"] == "exclude"), None) + + self.assertIsNotNone(exclude) + self.assertEqual(exclude["type"], "array") + + def test_required_parameters_marked(self): + result = discover_all_commands(cli) + + describe = result["commands"]["content"]["commands"]["describe"] + guid = next((p for p in describe["parameters"] if p["name"] == "guid"), None) + + self.assertIsNotNone(guid) + self.assertTrue(guid["required"]) + + def test_cli_flags_format(self): + result = discover_all_commands(cli) + + add = result["commands"]["add"] + name = next((p for p in add["parameters"] if p["name"] == "name"), None) + + self.assertIsNotNone(name) + self.assertIn("cli_flags", name) + self.assertGreater(len(name["cli_flags"]), 0) + + for flag in name["cli_flags"]: + self.assertTrue(flag.startswith("-"))