diff --git a/server/mcp_server_vefaas_function/pyproject.toml b/server/mcp_server_vefaas_function/pyproject.toml index 289bdf59..d410be46 100644 --- a/server/mcp_server_vefaas_function/pyproject.toml +++ b/server/mcp_server_vefaas_function/pyproject.toml @@ -9,6 +9,7 @@ dependencies = [ "volcengine-python-sdk>=3.0.8", "requests==2.32.3", "pyzipper==0.3.6", + "pathspec>=0.12.0", ] [project.scripts] @@ -20,3 +21,6 @@ build-backend = "hatchling.build" [tool.hatch.metadata] allow-direct-references = true + +[tool.autopep8] +max_line_length = 240 diff --git a/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/sign.py b/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/sign.py index b031fb80..da193cdc 100644 --- a/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/sign.py +++ b/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/sign.py @@ -38,7 +38,7 @@ Service = "apig" Version = "2021-03-03" Region = "cn-beijing" -Host = "iam.volcengineapi.com" +Host = "open.volcengineapi.com" ContentType = "application/x-www-form-urlencoded" AK_KEY = "VOLCENGINE_ACCESS_KEY" @@ -62,7 +62,7 @@ def norm_query(params): if type(params[key]) == list: for k in params[key]: query = ( - query + quote(key, safe="-_.~") + "=" + quote(k, safe="-_.~") + "&" + query + quote(key, safe="-_.~") + "=" + quote(k, safe="-_.~") + "&" ) else: query = (query + quote(key, safe="-_.~") + "=" + quote(params[key], safe="-_.~") + "&") @@ -82,7 +82,7 @@ def hash_sha256(content: str): # 第二步:签名请求函数 -def request(method, date, query, header, ak, sk, token, action, body, region = None, timeout=None): +def request(method, date, query, header, ak, sk, token, action, body, region=None, timeout=None): # 第三步:创建身份证明。其中的 Service 和 Region 字段是固定的。ak 和 sk 分别代表 # AccessKeyID 和 SecretAccessKey。同时需要初始化签名结构体。一些签名计算时需要的属性也在这里处理。 # 初始化身份证明结构体 @@ -99,7 +99,8 @@ def request(method, date, query, header, ak, sk, token, action, body, region = N if action in ['CodeUploadCallback', 'CreateDependencyInstallTask', 'GetDependencyInstallTaskStatus', 'GetDependencyInstallTaskLogDownloadURI', "ListApplicationTemplates", "GetApplicationTemplateDetail", "GetRevision", - "CreateApplication", "GetApplication", "ReleaseApplication", "ListTriggers", "GetApplicationRevisionLog", "ListTemplates", "GetTemplateDetail"]: + "CreateApplication", "GetApplication", "ReleaseApplication", "ListTriggers", "GetApplicationRevisionLog", "ListTemplates", "GetTemplateDetail", + "GenTempTosObjectUrl", "ListApplications", "CreateFunction", "UpdateFunction", "GetFunction", "Release", "GetReleaseStatus"]: credential["service"] = "vefaas" content_type = ContentType @@ -150,11 +151,11 @@ def request(method, date, query, header, ak, sk, token, action, body, region = N "x-content-sha256:" + x_content_sha256, "x-date:" + x_date, ] - ), - "", - signed_headers_str, - x_content_sha256, - ] + ), + "", + signed_headers_str, + x_content_sha256, + ] ) # 打印正规化的请求用于调试比对 @@ -195,13 +196,13 @@ def request(method, date, query, header, ak, sk, token, action, body, region = N def get_authorization_credentials(ctx: Context = None) -> tuple[str, str, str]: """ Gets authorization credentials from either environment variables or request headers. 
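Reviewer note: a minimal sketch of how the signed request helper above is driven by the new SDK further down (VeFaaSClient.call builds exactly this call). Only VOLCENGINE_ACCESS_KEY is visible in this diff; the secret-key and session-token environment variable names below are assumptions for illustration.

import datetime
import json
import os

from mcp_server_vefaas_function.sign import request

ak = os.environ["VOLCENGINE_ACCESS_KEY"]
sk = os.environ["VOLCENGINE_SECRET_KEY"]                 # assumed variable name
token = os.environ.get("VOLCENGINE_SESSION_TOKEN", "")   # assumed variable name

# Sign and send a vefaas API call, the same way VeFaaSClient.call does below.
resp = request("POST", datetime.datetime.utcnow(), {}, {}, ak, sk, token,
               "GetFunction", json.dumps({"Id": "<function-id>"}), "cn-beijing")
print(resp.get("ResponseMetadata", {}).get("RequestId"))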
- + Args: ctx: The server context object - + Returns: tuple: (access_key, secret_key, session_token) - + Raises: ValueError: If authorization information is missing or invalid """ diff --git a/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/vefaas_cli_sdk/__init__.py b/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/vefaas_cli_sdk/__init__.py new file mode 100644 index 00000000..6a16cbbc --- /dev/null +++ b/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/vefaas_cli_sdk/__init__.py @@ -0,0 +1,52 @@ +# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates +# SPDX-License-Identifier: MIT + +"""veFaaS CLI SDK - Ported capabilities from vefaas-cli (TypeScript)""" + +from .detector import auto_detect, DetectionResult +from .deploy import ( + DeployConfig, + DeployResult, + VeFaaSClient, + deploy_application, + get_console_url, + get_application_console_url, + extract_access_url_from_cloud_resource, + wait_for_function_release, + wait_for_application_deploy, + wait_for_dependency_install, +) +from .config import ( + VefaasConfig, + FunctionConfig, + TriggerConfig, + read_config, + write_config, + get_linked_ids, + get_linked_region, +) + +__all__ = [ + # Detector + "auto_detect", + "DetectionResult", + # Deploy + "DeployConfig", + "DeployResult", + "VeFaaSClient", + "deploy_application", + "get_console_url", + "get_application_console_url", + "extract_access_url_from_cloud_resource", + "wait_for_function_release", + "wait_for_application_deploy", + "wait_for_dependency_install", + # Config + "VefaasConfig", + "FunctionConfig", + "TriggerConfig", + "read_config", + "write_config", + "get_linked_ids", + "get_linked_region", +] diff --git a/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/vefaas_cli_sdk/config.py b/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/vefaas_cli_sdk/config.py new file mode 100644 index 00000000..6491619a --- /dev/null +++ b/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/vefaas_cli_sdk/config.py @@ -0,0 +1,271 @@ +# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates +# SPDX-License-Identifier: MIT + +""" +Configuration Module + +This module handles reading and writing veFaaS configuration files. +Supports both `.vefaas/config.json` (vefaas-cli format) and `vefaas.yaml` (legacy format). + +Configuration priority: +1. User-provided parameters +2. .vefaas/config.json +3. vefaas.yaml (backward compatibility) + +Write strategy: Update both formats on success to prevent config drift. +""" + +import json +import os +import logging +from dataclasses import dataclass, field, asdict +from typing import Optional, Dict, Any + +logger = logging.getLogger(__name__) + +# Config file paths +VEFAAS_CONFIG_DIR = ".vefaas" +VEFAAS_CONFIG_FILE = "config.json" +VEFAAS_YAML_FILE = "vefaas.yaml" + + +@dataclass +class TriggerConfig: + """Trigger configuration (API gateway info)""" + type: str = "apig" + system_url: Optional[str] = None + inner_url: Optional[str] = None + id: Optional[str] = None + + +@dataclass +class FunctionConfig: + """Function configuration""" + id: str = "" + runtime: Optional[str] = None + region: str = "cn-beijing" + application_id: Optional[str] = None + + +@dataclass +class VefaasConfig: + """ + veFaaS configuration structure. + Compatible with vefaas-cli's .vefaas/config.json format. 
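Reviewer note: for orientation, the .vefaas/config.json that to_json_dict() below serialises has roughly this shape. All values are placeholders; runtime, application_id and the whole triggers block are optional.

import json

# Illustrative .vefaas/config.json payload (vefaas-cli format).
example_config = {
    "version": "1.0",
    "function": {
        "id": "<function-id>",
        "region": "cn-beijing",
        "runtime": "<runtime>",
        "application_id": "<application-id>",
    },
    "triggers": {
        "type": "apig",
        "system_url": "https://<app>.<gateway-domain>",
        "inner_url": "https://<app>.<inner-gateway-domain>",
        "id": "<trigger-id>",
    },
}
print(json.dumps(example_config, indent=2))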
+ """ + version: str = "1.0" + function: FunctionConfig = field(default_factory=FunctionConfig) + triggers: Optional[TriggerConfig] = None + + # Additional fields for MCP compatibility (not in vefaas-cli) + name: Optional[str] = None + command: Optional[str] = None + + def to_json_dict(self) -> Dict[str, Any]: + """Convert to JSON-serializable dict (vefaas-cli format)""" + result = { + "version": self.version, + "function": { + "id": self.function.id, + "region": self.function.region, + } + } + if self.function.runtime: + result["function"]["runtime"] = self.function.runtime + if self.function.application_id: + result["function"]["application_id"] = self.function.application_id + if self.triggers: + result["triggers"] = {} + result["triggers"]["type"] = self.triggers.type + if self.triggers.system_url: + result["triggers"]["system_url"] = self.triggers.system_url + if self.triggers.inner_url: + result["triggers"]["inner_url"] = self.triggers.inner_url + if self.triggers.id: + result["triggers"]["id"] = self.triggers.id + return result + + def to_yaml_dict(self) -> Dict[str, Any]: + """Convert to YAML-compatible dict (legacy format)""" + result = {} + if self.function.id: + result["function_id"] = self.function.id + if self.name: + result["name"] = self.name + if self.function.region: + result["region"] = self.function.region + if self.function.runtime: + result["runtime"] = self.function.runtime + if self.command: + result["command"] = self.command + if self.function.application_id: + result["application_id"] = self.function.application_id + return result + + +def get_config_paths(project_path: str) -> tuple: + """Get configuration file paths""" + config_dir = os.path.join(project_path, VEFAAS_CONFIG_DIR) + json_path = os.path.join(config_dir, VEFAAS_CONFIG_FILE) + yaml_path = os.path.join(project_path, VEFAAS_YAML_FILE) + return config_dir, json_path, yaml_path + + +def read_config(project_path: str) -> Optional[VefaasConfig]: + """ + Read veFaaS configuration from project directory. + + Priority: + 1. .vefaas/config.json (vefaas-cli format) + 2. 
vefaas.yaml (legacy MCP format) + + Returns: + VefaasConfig if found, None otherwise + """ + config_dir, json_path, yaml_path = get_config_paths(project_path) + + # Try .vefaas/config.json first + if os.path.exists(json_path): + try: + with open(json_path, "r", encoding="utf-8") as f: + data = json.load(f) + + func_data = data.get("function", {}) + triggers_data = data.get("triggers") + + config = VefaasConfig( + version=data.get("version", "1.0"), + function=FunctionConfig( + id=func_data.get("id", ""), + runtime=func_data.get("runtime"), + region=func_data.get("region", "cn-beijing"), + application_id=func_data.get("application_id"), + ), + ) + + if triggers_data: + config.triggers = TriggerConfig( + type=triggers_data.get("type", "apig"), + system_url=triggers_data.get("system_url"), + inner_url=triggers_data.get("inner_url"), + id=triggers_data.get("id"), + ) + + logger.info(f"[config] Loaded from .vefaas/config.json: function_id={config.function.id}") + return config + except Exception as e: + logger.warning(f"[config] Failed to read .vefaas/config.json: {e}") + + # Fallback to vefaas.yaml + if os.path.exists(yaml_path): + try: + # Simple YAML parsing (key: value format) + data = {} + with open(yaml_path, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line or line.startswith("#"): + continue + if ":" in line: + key, value = line.split(":", 1) + data[key.strip()] = value.strip() + + config = VefaasConfig( + function=FunctionConfig( + id=data.get("function_id", ""), + runtime=data.get("runtime"), + region=data.get("region", "cn-beijing"), + application_id=data.get("application_id"), + ), + name=data.get("name"), + command=data.get("command"), + ) + + logger.info(f"[config] Loaded from vefaas.yaml: function_id={config.function.id}") + return config + except Exception as e: + logger.warning(f"[config] Failed to read vefaas.yaml: {e}") + + return None + + +def write_config(project_path: str, config: VefaasConfig) -> None: + """ + Write veFaaS configuration to project directory. + + Writes to BOTH formats to prevent config drift: + 1. .vefaas/config.json (vefaas-cli compatible) + 2. 
vefaas.yaml (legacy MCP format) + """ + config_dir, json_path, yaml_path = get_config_paths(project_path) + + # Ensure .vefaas directory exists + os.makedirs(config_dir, exist_ok=True) + + # Write .vefaas/config.json + try: + json_data = config.to_json_dict() + with open(json_path, "w", encoding="utf-8") as f: + json.dump(json_data, f, indent=2, ensure_ascii=False) + logger.info(f"[config] Saved .vefaas/config.json") + except Exception as e: + logger.warning(f"[config] Failed to write .vefaas/config.json: {e}") + + # Write vefaas.yaml + try: + yaml_data = config.to_yaml_dict() + with open(yaml_path, "w", encoding="utf-8") as f: + for key, value in yaml_data.items(): + if value is not None: + f.write(f"{key}: {value}\n") + logger.info(f"[config] Saved vefaas.yaml") + except Exception as e: + logger.warning(f"[config] Failed to write vefaas.yaml: {e}") + + # Ensure .vefaas is in .gitignore + _ensure_gitignore(project_path) + + +def _ensure_gitignore(project_path: str) -> None: + """Ensure .vefaas/ is in .gitignore""" + gitignore_path = os.path.join(project_path, ".gitignore") + + try: + content = "" + if os.path.exists(gitignore_path): + with open(gitignore_path, "r", encoding="utf-8") as f: + content = f.read() + + # Check if .vefaas is already ignored + import re + if not re.search(r'(^|\n)\s*(?:/)?.vefaas(?:/)?', content): + with open(gitignore_path, "a", encoding="utf-8") as f: + if content and not content.endswith("\n"): + f.write("\n") + f.write(".vefaas/\n") + logger.info(f"[config] Added .vefaas/ to .gitignore") + except Exception as e: + logger.debug(f"[config] Failed to update .gitignore: {e}") + + +def get_linked_ids(project_path: str) -> tuple: + """ + Get linked function_id and application_id from config. + + Returns: + (function_id, application_id) tuple, None values if not found + """ + config = read_config(project_path) + if config: + func_id = config.function.id if config.function.id else None + app_id = config.function.application_id if config.function.application_id else None + return func_id, app_id + return None, None + + +def get_linked_region(project_path: str) -> Optional[str]: + """Get linked region from config""" + config = read_config(project_path) + if config and config.function.region: + return config.function.region + return None diff --git a/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/vefaas_cli_sdk/deploy.py b/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/vefaas_cli_sdk/deploy.py new file mode 100644 index 00000000..0350baa4 --- /dev/null +++ b/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/vefaas_cli_sdk/deploy.py @@ -0,0 +1,1067 @@ +# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates +# SPDX-License-Identifier: MIT + +""" +Deploy Flow Module + +This module encapsulates the core deployment logic for veFaaS applications, +ported from vefaas-cli. + +Correct flow: +1. Detect project configuration +2. Build project (if needed, for Node.js/static sites) +3. Package output directory +4. Upload to TOS (GenTempTosObjectUrl) +5. Create/Update function with Source pointing to TOS location +6. Wait for dependency installation (Python) +7. 
Create and release application (includes function release) +""" + +from dataclasses import dataclass, field +from typing import Optional, List +import time +import logging +import json +import os +import datetime +import subprocess +import io +import zipfile +import pathspec + +from .detector import auto_detect, DetectionResult +from .config import ( + VefaasConfig, + FunctionConfig, + TriggerConfig, + read_config, + write_config, + get_linked_ids, +) + +logger = logging.getLogger(__name__) + +# Default timeout and polling interval +DEFAULT_TIMEOUT_SECONDS = 240 # 4 minutes +DEPLOY_TIMEOUT_SECONDS = 360 # 6 minutes for full deployment +DEFAULT_POLL_INTERVAL_SECONDS = 3 + +# Default .vefaasignore file content (aligned with vefaas-cli) +DEFAULT_VEFAASIGNORE = """# veFaaS default ignore patterns +# Files and directories that will not be packaged and uploaded + +# Version control +.git/ +.svn/ +.hg/ + +# Python virtual environments and dependencies (installed by function runtime) +.venv/ +site-packages/ +__pycache__/ + +# IDE and editors +.idea/ +.vscode/ +*.swp +*.swo + +# System files +.DS_Store +Thumbs.db + +# veFaaS CLI config +.vefaas/ +""" + +# Default Caddyfile name for static sites +DEFAULT_CADDYFILE_NAME = "DefaultCaddyFile" + + +def read_gitignore_patterns(base_dir: str) -> List[str]: + """Read .gitignore file patterns. Ported from vefaas-cli.""" + gitignore_path = os.path.join(base_dir, ".gitignore") + try: + with open(gitignore_path, "r", encoding="utf-8") as f: + raw = f.read() + return [line.strip() for line in raw.splitlines() if line.strip() and not line.strip().startswith("#")] + except Exception: + return [] + + +def read_vefaasignore_patterns(base_dir: str) -> List[str]: + """Read .vefaasignore file patterns. Create default file if not exists. Ported from vefaas-cli.""" + vefaasignore_path = os.path.join(base_dir, ".vefaasignore") + try: + with open(vefaasignore_path, "r", encoding="utf-8") as f: + raw = f.read() + return [line.strip() for line in raw.splitlines() if line.strip() and not line.strip().startswith("#")] + except FileNotFoundError: + # File doesn't exist, create default .vefaasignore + try: + with open(vefaasignore_path, "w", encoding="utf-8") as f: + f.write(DEFAULT_VEFAASIGNORE) + logger.debug(f"[package] Created default .vefaasignore at {vefaasignore_path}") + except Exception as e: + logger.debug(f"[package] Failed to create .vefaasignore: {e}") + # Return default patterns + return [line.strip() for line in DEFAULT_VEFAASIGNORE.splitlines() if line.strip() and not line.strip().startswith("#")] + except Exception: + return [] + + +def create_ignore_filter( + gitignore_patterns: List[str], + vefaasignore_patterns: List[str], + additional_patterns: Optional[List[str]] = None +) -> pathspec.PathSpec: + """Create a pathspec filter from gitignore/vefaasignore patterns. Ported from vefaas-cli.""" + all_patterns = gitignore_patterns + vefaasignore_patterns + (additional_patterns or []) + return pathspec.PathSpec.from_lines("gitwildmatch", all_patterns) + + +def render_default_caddyfile_content() -> str: + """ + Generate default Caddyfile content for static site hosting. + Ported from vefaas-cli renderDefaultCaddyfileContent(). + """ + return """:8000 { + root * . 
+ # Block access to sensitive paths + @unsafePath { + path /.git/* /node_modules/* /vendor/* /.venv/* + } + respond @unsafePath 404 + # Configure cache policies for different file types + @staticAssets { + path *.css *.js *.png *.jpg *.jpeg *.gif *.svg *.ico *.webp + } + @htmlFiles { + path *.html + } + # Long cache for static assets (1 year), browser updates when filename changes + header @staticAssets Cache-Control "public, max-age=31536000, immutable" + # No cache for HTML + header @htmlFiles Cache-Control "no-cache, no-store, must-revalidate" + header @htmlFiles Pragma "no-cache" + header @htmlFiles Expires "0" + # Default cache header for unmatched files + header Cache-Control "public, max-age=3600" + file_server + try_files {path} {path}.html {path}/index.html 404.html index.html + log { + output stderr + } +}""" + + +def ensure_caddyfile_in_output( + dest_dir: str, + output_path: str, + filename: str = DEFAULT_CADDYFILE_NAME +) -> str: + """ + Create DefaultCaddyFile in the output directory for static site hosting. + Ported from vefaas-cli ensureCaddyfileInOutput(). + + Args: + dest_dir: Project root directory + output_path: Build output directory (relative to dest_dir) + filename: Caddyfile filename (default: DefaultCaddyFile) + + Returns: + Path to the created Caddyfile + """ + out_dir = os.path.join(dest_dir, output_path) if output_path and output_path != "./" else dest_dir + os.makedirs(out_dir, exist_ok=True) + + content = render_default_caddyfile_content() + target = os.path.join(out_dir, filename) + with open(target, "w", encoding="utf-8") as f: + f.write(content) + + logger.info(f"[deploy] Caddyfile generated: {target}") + return target + + +@dataclass +class DeployResult: + """Deployment result containing IDs and URLs""" + success: bool = False + application_id: Optional[str] = None + function_id: Optional[str] = None + function_name: Optional[str] = None + access_url: Optional[str] = None + console_url: Optional[str] = None + app_console_url: Optional[str] = None + error: Optional[str] = None + logs: List[str] = field(default_factory=list) # Status messages + + +@dataclass +class DeployConfig: + """Deployment configuration""" + project_path: str # Required: project root path + name: Optional[str] = None # App name (required for new apps) + application_id: Optional[str] = None # Existing app ID (for updates) + region: str = "cn-beijing" # Region + gateway_name: Optional[str] = None # Gateway name (auto-detect if not specified) + build_command: Optional[str] = None # Override build command + output_path: Optional[str] = None # Override output path + start_command: Optional[str] = None # Override start command + port: Optional[int] = None # Override port + skip_build: bool = False # Skip build step + + +class VeFaaSClient: + """ + veFaaS API client - provides all operations needed for deployment. 
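Reviewer note: a minimal end-to-end sketch of driving this client through deploy_application(). The secret-key and session-token environment variable names are assumptions; the project path and application name are placeholders.

import os

from mcp_server_vefaas_function.vefaas_cli_sdk import (
    DeployConfig,
    VeFaaSClient,
    deploy_application,
)

client = VeFaaSClient(
    ak=os.environ["VOLCENGINE_ACCESS_KEY"],
    sk=os.environ["VOLCENGINE_SECRET_KEY"],                 # assumed variable name
    token=os.environ.get("VOLCENGINE_SESSION_TOKEN", ""),   # assumed variable name
    region="cn-beijing",
)

# Detect, build, package, upload, create/update the function and release the app.
config = DeployConfig(project_path="/abs/path/to/project", name="my-app")
result = deploy_application(config, client)
if result.success:
    print("deployed:", result.access_url, result.app_console_url)
else:
    print("deploy failed:", result.error)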
+ + Usage: + client = VeFaaSClient(ak, sk, token, region) + result = deploy_application(config, client) + """ + + def __init__(self, ak: str, sk: str, token: str, region: str = "cn-beijing"): + self.ak = ak + self.sk = sk + self.token = token + self.region = region + + def call(self, action: str, body: dict, timeout: int = None) -> dict: + """Make a raw API call""" + from ..sign import request + now = datetime.datetime.utcnow() + return request("POST", now, {}, {}, self.ak, self.sk, self.token, action, json.dumps(body), self.region, timeout) + + def _check_api_error(self, result: dict, action: str) -> None: + """Check API response for errors and raise with detailed message.""" + error = result.get("ResponseMetadata", {}).get("Error", {}) + if error: + code = error.get("Code", "") + message = error.get("Message", "Unknown error") + request_id = result.get("ResponseMetadata", {}).get("RequestId", "") + + # Check for common error patterns and provide clear guidance + if "already exists" in message.lower() or "duplicate" in message.lower(): + raise ValueError( + f"[{action}] Name already exists: {message}\n" + "To update an existing application, get the application_id from `.vefaas/config.json` or console, " + "then call deploy_application with application_id parameter. " + "Do NOT use function_id directly - always use application_id for updates." + ) + elif "not found" in message.lower(): + raise ValueError(f"[{action}] Resource not found: {message}") + elif "permission" in message.lower() or "auth" in message.lower(): + raise ValueError( + f"[{action}] Permission denied: {message}\n" + "Please visit https://console.volcengine.com/iam/service/attach_role/?ServiceName=vefaas to authorize and retry" + ) + else: + raise ValueError(f"[{action}] API error ({code}): {message} (RequestId: {request_id})") + + def _call_with_check(self, action: str, body: dict, timeout: int = None) -> dict: + """Make API call and check for errors.""" + result = self.call(action, body, timeout) + self._check_api_error(result, action) + return result + + # ========== TOS Operations ========== + + def gen_temp_tos_url(self) -> dict: + """Generate temporary TOS upload URL""" + return self._call_with_check("GenTempTosObjectUrl", {}) + + def upload_to_tos(self, zip_bytes: bytes) -> str: + """ + Upload zip bytes to TOS and return inner source location. 
+ + Returns: + Inner source location string for use in CreateFunction/UpdateFunction + """ + import requests + + # Get temporary upload URL + result = self.gen_temp_tos_url() + temp = result.get("Result", {}) + outer_url = temp.get("OuterSourceLocation") + inner_location = temp.get("InnerSourceLocation") + + if not outer_url or not inner_location: + raise ValueError("Failed to get TOS upload URL") + + # Upload to TOS + response = requests.put(outer_url, data=zip_bytes, headers={ + "Content-Type": "application/octet-stream", + }) + + if response.status_code not in (200, 201): + raise ValueError(f"Failed to upload to TOS: {response.status_code}") + + logger.info(f"[deploy] Uploaded to TOS: {len(zip_bytes)} bytes") + return inner_location + + # ========== Function Operations ========== + + def create_function( + self, + name: str, + runtime: str, + command: str, + port: int = 8080, + source: Optional[str] = None, + build_command: Optional[str] = None, + output_path: Optional[str] = None, + ) -> dict: + """Create a new function with optional TOS source""" + body = { + "Name": name, + "Runtime": runtime, + "Command": command, + "Port": port, + "Description": "Created by veFaaS MCP", + "ExclusiveMode": False, + "RequestTimeout": 300, + "MaxConcurrency": 100, + "MemoryMB": 1024, + } + + if source: + body["SourceType"] = "tos" + body["Source"] = source + + # Always include BuildConfig for application source + # Use 'echo skip' as no-op command if build_command is empty (API requires non-empty) + body["BuildConfig"] = { + "Command": build_command or "echo skip", + "OutputPath": output_path or "./", + } + + return self._call_with_check("CreateFunction", body) + + def update_function( + self, + function_id: str, + source: Optional[str] = None, + command: Optional[str] = None, + port: Optional[int] = None, + runtime: Optional[str] = None, + ) -> dict: + """Update function with new source/config""" + body = {"Id": function_id} + + if source: + body["SourceType"] = "tos" + body["Source"] = source + if command: + body["Command"] = command + if port: + body["Port"] = port + if runtime: + body["Runtime"] = runtime + + return self._call_with_check("UpdateFunction", body) + + def get_function(self, function_id: str) -> dict: + """Get function details""" + return self.call("GetFunction", {"Id": function_id}) + + def release_function(self, function_id: str, revision_number: int = 0) -> dict: + """Release/deploy a function""" + return self._call_with_check("Release", { + "FunctionId": function_id, + "RevisionNumber": revision_number, + "Description": "Triggered by veFaaS MCP", + }) + + def get_release_status(self, function_id: str) -> dict: + """Get function release status""" + return self.call("GetReleaseStatus", {"FunctionId": function_id}) + + def get_dependency_install_status(self, function_id: str) -> dict: + """Get dependency installation task status""" + return self.call("GetDependencyInstallTaskStatus", {"FunctionId": function_id}) + + # ========== Application Operations ========== + + def get_application(self, app_id: str) -> dict: + """Get application details""" + return self.call("GetApplication", {"Id": app_id}) + + def create_application(self, name: str, function_id: str, gateway_name: str) -> dict: + """Create a new application""" + return self._call_with_check("CreateApplication", { + "Name": name, + "FunctionId": function_id, + "Description": "Created by veFaaS MCP", + "Reference": "mcp", + "EndpointConfig": {"GatewayName": gateway_name}, + }) + + def release_application(self, app_id: str) -> 
dict: + """Release/deploy an application""" + return self._call_with_check("ReleaseApplication", { + "Id": app_id, + "SkipPipeline": True, + "Description": "Triggered by veFaaS MCP", + }) + + def list_applications(self, page_number: int = 1, page_size: int = 100, filters: Optional[List[dict]] = None) -> dict: + """List applications""" + body = {"PageNumber": page_number, "PageSize": page_size} + if filters: + body["Filters"] = filters + return self.call("ListApplications", body) + + def find_application_by_name(self, name: str) -> Optional[str]: + """Find application ID by name (using API level filter)""" + try: + # Format: Filters: [{ Item: { Key: 'Name', Value: [name] } }] + filters = [{"Item": {"Key": "Name", "Value": [name]}}] + result = self.list_applications(page_size=50, filters=filters) + items = result.get("Result", {}).get("Items", []) + + # Client-side verification + target_name = name.lower() + for item in items: + if item.get("Name", "").lower() == target_name: + return item.get("Id") + return None + except: + return None + + # ========== Gateway Operations ========== + + def list_gateways(self) -> dict: + """List API gateways""" + return self.call("ListGateways", {"PageNumber": 1, "PageSize": 100}) + + def get_usable_gateway(self) -> Optional[str]: + """Get the name of an available (Running) gateway""" + try: + result = self.list_gateways() + gateways = result.get("Result", {}).get("Items", []) + for gw in gateways: + if gw.get("Status") == "Running": + return gw.get("Name") + return None + except: + return None + + +def get_console_url(function_id: str, region: str) -> str: + """Get function console URL""" + return f"https://console.volcengine.com/vefaas/region:vefaas+{region}/function/detail/{function_id}" + + +def get_application_console_url(application_id: str, region: str) -> str: + """Get application console URL""" + return f"https://console.volcengine.com/vefaas/region:vefaas+{region}/application/detail/{application_id}" + + +def extract_access_url_from_cloud_resource(cloud_resource: str) -> Optional[str]: + """ + Extract access URL from CloudResource JSON string. + Matches vefaas-cli's parseCloudResource + sanitizeUrl logic. 
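Reviewer note: the CloudResource string this helper parses looks roughly like the payload below (shape inferred from the parsing code that follows; key names and URLs are placeholders). The first top-level key carries the function binding and its URLs, and system_url is preferred over inner_url.

import json

from mcp_server_vefaas_function.vefaas_cli_sdk import extract_access_url_from_cloud_resource

cloud_resource = json.dumps({
    "framework": {
        "function_id": "<function-id>",
        "url": {
            "system_url": "https://<app>.<gateway-domain>",
            "inner_url": "",
        },
    }
})
# The helper returns the trimmed system_url when present.
assert extract_access_url_from_cloud_resource(cloud_resource) == "https://<app>.<gateway-domain>"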
+ """ + try: + if not cloud_resource: + return None + parsed = json.loads(cloud_resource) + # Get first key's value (e.g., 'framework' or other) + keys = list(parsed.keys()) + if not keys: + return None + data = parsed[keys[0]] + url_obj = data.get('url', {}) + # Prefer system_url, fallback to inner_url + system_url = url_obj.get('system_url', '') + inner_url = url_obj.get('inner_url', '') + if system_url and isinstance(system_url, str) and system_url.strip(): + return system_url.strip() + if inner_url and isinstance(inner_url, str) and inner_url.strip(): + return inner_url.strip() + return None + except: + return None + + +def wait_for_function_release( + client: VeFaaSClient, + function_id: str, + timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS, + poll_interval_seconds: int = DEFAULT_POLL_INTERVAL_SECONDS +) -> dict: + """Wait for function release to complete.""" + start_time = time.time() + last_status = "" + + while time.time() - start_time < timeout_seconds: + try: + result = client.get_release_status(function_id) + status = result.get("Result", {}).get("Status", "") + + if status != last_status: + logger.info(f"[release] Function release status: {status}") + last_status = status + + if status.lower() == "done": + return {"success": True, "status": status} + + if status.lower() == "failed": + msg = result.get("Result", {}).get("StatusMessage", "") + raise ValueError(f"Function release failed: {msg}") + + except ValueError: + raise + except Exception as e: + logger.warning(f"[release] Error checking status: {e}") + + time.sleep(poll_interval_seconds) + + raise ValueError(f"Function release timed out after {timeout_seconds} seconds") + + +def wait_for_application_deploy( + client: VeFaaSClient, + application_id: str, + timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS, + poll_interval_seconds: int = DEFAULT_POLL_INTERVAL_SECONDS +) -> dict: + """Wait for application deployment to complete.""" + start_time = time.time() + last_status = "" + + while time.time() - start_time < timeout_seconds: + try: + result = client.get_application(application_id) + app = result.get("Result", {}) + status = app.get("Status", "") + + if status != last_status: + logger.info(f"[deploy] Application status: {status}") + last_status = status + + if status.lower() == "deploy_success": + access_url = extract_access_url_from_cloud_resource(app.get("CloudResource")) + return {"success": True, "access_url": access_url} + + if status.lower() in ("deploy_fail", "deleted", "delete_fail"): + # Try to get detailed error from GetReleaseStatus (like vefaas-cli) + error_details = {} + function_id = None + try: + # Try to get function_id from CloudResource first (like get_application_detail) + cloud_resource_str = app.get("CloudResource", "") + if cloud_resource_str: + try: + cloud_resource = json.loads(cloud_resource_str) + keys = list(cloud_resource.keys()) + if keys: + first_key = keys[0] + function_id = cloud_resource.get(first_key, {}).get("function_id") + except: + pass + + # Fallback: try Config + if not function_id: + config_str = app.get("Config", "") + if config_str: + try: + config_data = json.loads(config_str) + function_id = config_data.get("function", {}).get("function_id") + except: + pass + + if function_id: + rel_result = client.get_release_status(function_id) + rel = rel_result.get("Result", {}) + status_msg = rel.get("StatusMessage", "").strip() + if status_msg: + error_details["error_message"] = status_msg + log_url = rel.get("FailedInstanceLogs", "").strip() + if log_url: + error_details["error_logs_uri"] = 
log_url + error_details["function_id"] = function_id + except Exception as ex: + logger.debug(f"Failed to get release status error details: {ex}") + + console_url = f"https://console.volcengine.com/vefaas/region:vefaas+{client.region}/application/detail/{application_id}" + + # Build detailed error message + error_parts = [f"Application deployment failed ({status})"] + if error_details.get("error_message"): + error_parts.append(f"Error: {error_details['error_message']}") + if error_details.get("error_logs_uri"): + error_parts.append(f"Logs: {error_details['error_logs_uri']}") + error_parts.append(f"Console: {console_url}") + + raise ValueError(". ".join(error_parts)) + + except ValueError: + raise + except Exception as e: + logger.warning(f"[deploy] Error checking status: {e}") + + time.sleep(poll_interval_seconds) + + raise ValueError(f"Deployment timed out after {timeout_seconds} seconds") + + +def wait_for_dependency_install( + client: VeFaaSClient, + function_id: str, + timeout_seconds: int = 300, + poll_interval_seconds: int = 5 +) -> dict: + """Wait for Python dependency installation to complete.""" + start_time = time.time() + last_status = "" + + while time.time() - start_time < timeout_seconds: + try: + result = client.get_dependency_install_status(function_id) + status = result.get("Result", {}).get("Status", "") + + if status != last_status: + logger.info(f"[dependency] Installation status: {status}") + last_status = status + + if status.lower() in ("succeeded", "success", "done"): + return {"success": True, "status": status} + + if status.lower() == "failed": + raise ValueError("Dependency installation failed") + + except ValueError: + raise + except Exception as e: + logger.warning(f"[dependency] Error checking status: {e}") + + time.sleep(poll_interval_seconds) + + raise ValueError(f"Dependency installation timed out after {timeout_seconds} seconds") + + +def _run_build_command(project_path: str, build_command: str): + """Run build command in project directory.""" + logger.info(f"[build] Running: {build_command}") + result = subprocess.run( + build_command, + shell=True, + cwd=project_path, + capture_output=True, + text=True, + ) + if result.returncode != 0: + raise ValueError(f"Build failed: {result.stderr or result.stdout}") + logger.info("[build] Build completed successfully") + + +def package_directory(directory: str, base_dir: Optional[str] = None, include_gitignore: bool = True) -> bytes: + """ + Package directory into a zip file using pathspec for gitignore-style filtering. 
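Reviewer note: a small sketch of the filtering behaviour package_directory() relies on. pathspec compiles .gitignore/.vefaasignore lines as gitwildmatch patterns, and anything match_file() accepts is skipped while zipping.

import pathspec

# Patterns taken from the default .vefaasignore above.
spec = pathspec.PathSpec.from_lines(
    "gitwildmatch", [".venv/", "__pycache__/", ".vefaas/"]
)
assert spec.match_file(".venv/lib/python3.10/site.py")
assert spec.match_file("app/__pycache__/main.cpython-310.pyc")
assert not spec.match_file("app/main.py")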
+ + Args: + directory: Directory to package + base_dir: Project root for reading ignore files (defaults to directory) + include_gitignore: Whether to include .gitignore patterns + - True: Python projects (source code deployment) + - False: Built output or function code upload (only .vefaasignore) + + Returns: + Zip file bytes + """ + if base_dir is None: + base_dir = directory + + # Load ignore patterns based on scenario + gitignore_patterns = read_gitignore_patterns(base_dir) if include_gitignore else [] + vefaasignore_patterns = read_vefaasignore_patterns(base_dir) + spec = create_ignore_filter(gitignore_patterns, vefaasignore_patterns) + + buffer = io.BytesIO() + with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf: + for root, dirs, files in os.walk(directory): + rel_root = os.path.relpath(root, directory) + if rel_root == ".": + rel_root = "" + + # Filter directories in-place to prevent descending into ignored dirs + dirs[:] = [ + d for d in dirs + if not spec.match_file(f"{rel_root}/{d}" if rel_root else d) + and not spec.match_file(f"{rel_root}/{d}/" if rel_root else f"{d}/") + ] + + for file in files: + arcname = f"{rel_root}/{file}" if rel_root else file + + # Skip files matching ignore patterns + if spec.match_file(arcname): + continue + + file_path = os.path.join(root, file) + zf.write(file_path, arcname) + + buffer.seek(0) + zip_bytes = buffer.read() + return zip_bytes + + +def deploy_application(config: DeployConfig, client: VeFaaSClient) -> DeployResult: + """ + Deploy an application to veFaaS. + + Correct flow (from vefaas-cli): + 1. Detect project configuration + 2. Build project (if build_command exists and not skip_build) + 3. Package output directory + 4. Upload to TOS + 5. Create/Update function with Source pointing to TOS + 6. Wait for dependency installation (Python) + 7. Release function + 8. Create application (if needed) + 9. Release application + + Args: + config: Deployment configuration + client: VeFaaSClient instance + + Returns: + DeployResult with IDs and URLs + """ + result = DeployResult() + + def log(msg: str): + """Add message to result logs""" + result.logs.append(msg) + logger.info(f"[deploy] {msg}") + + try: + # Validate + if not os.path.isabs(config.project_path): + raise ValueError(f"project_path must be absolute, got: {config.project_path}") + if not os.path.exists(config.project_path): + raise ValueError(f"project_path does not exist: {config.project_path}") + + # Read existing config if application_id not provided + # Only use config's application_id if regions match (cross-region deployment needs new app) + existing_config = read_config(config.project_path) + if existing_config: + config_region = existing_config.function.region or "cn-beijing" + if config_region == client.region: + if not config.application_id and existing_config.function.application_id: + config.application_id = existing_config.function.application_id + log(f"[config] Using application_id from config: {config.application_id}") + else: + log(f"[config] Config region ({config_region}) differs from target region ({client.region}), will create new application") + + if not config.name and not config.application_id: + raise ValueError("Must provide name or application_id") + + # 0. Early check for duplicate application name + if config.name and not config.application_id: + existing_app_id = client.find_application_by_name(config.name) + if existing_app_id: + raise ValueError( + f"Application name '{config.name}' already exists (ID: {existing_app_id}). 
" + f"To update this application, pass application_id='{existing_app_id}' parameter." + ) + + # 0.5 Early check: if updating existing app and deployment is in progress, return early + if config.application_id: + try: + app_status = client.get_application(config.application_id).get("Result", {}) + current_status = app_status.get("Status", "").lower() + if current_status in ("deploying", "releasing", "deploy_pendding"): + result.application_id = config.application_id + result.app_console_url = get_application_console_url(config.application_id, client.region) + result.error = ( + f"Deployment is already in progress (status: {current_status}). " + "Wait for completion, or call get_application_detail to check status." + ) + return result + + # Early check: validate Application SourceType (only 'function' type is supported) + source_type = app_status.get("SourceType", "") + if source_type and source_type != "function": + raise ValueError( + f"Application {config.application_id} has SourceType '{source_type}' which is not compatible with code deployment via MCP. " + "Only 'function' type applications are supported. " + "Please delete the .vefaas/config.json and vefaas.yaml files, then use 'name' parameter to create a new custom application." + ) + except ValueError: + raise + except Exception as e: + log(f"[warning] Could not check application status: {e}") + + # 1. Detect project + log("[1/7] Detecting project configuration...") + detection = auto_detect(config.project_path) + log(f" → Detected: framework={detection.framework}, runtime={detection.runtime}") + + # Apply user overrides + build_command = config.build_command or detection.build_command + output_path = config.output_path or detection.output_path or "." + start_command = config.start_command or detection.start_command + port = config.port or detection.port + is_python = "python" in detection.runtime.lower() + + # Validate start_command is required + if not start_command: + raise ValueError( + "start_command is required but not provided. " + "Please provide start_command parameter, e.g.:\n" + " - Python FastAPI: 'python -m uvicorn main:app --host 0.0.0.0 --port 8080'\n" + " - Python Flask: 'python -m flask run --host 0.0.0.0 --port 8080'\n" + " - Node.js: 'node server.js'\n" + " - Static site: 'npx serve . -l 8080' (serves current dir after build)\n" + "Or call detect_project first to auto-detect the configuration." + ) + + # Validate build_command is required for non-Python runtimes (unless skip_build) + if not is_python and not build_command and not config.skip_build: + raise ValueError( + "build_command is required for non-Python runtimes but not provided. " + ) + + # Validate port is required and remind about alignment with start_command + if not port: + raise ValueError( + "port is required but not provided. " + "Please provide port parameter.\n" + "IMPORTANT: The port must match the actual listening port in your start_command.\n" + ) + + # 2. Build project (if needed) + if not config.skip_build and build_command and not is_python: + log(f"[2/7] Building project: {build_command}") + _run_build_command(config.project_path, build_command) + log(" → Build completed") + + # For static sites, generate DefaultCaddyFile in output directory + if detection.is_static: + root_caddy = os.path.join(config.project_path, DEFAULT_CADDYFILE_NAME) + out_dir = os.path.join(config.project_path, output_path) if output_path and output_path != "./" and output_path != "." 
else config.project_path + target_caddy = os.path.join(out_dir, DEFAULT_CADDYFILE_NAME) + + if os.path.exists(root_caddy): + # Copy existing Caddyfile from project root to output + import shutil + os.makedirs(out_dir, exist_ok=True) + shutil.copy2(root_caddy, target_caddy) + log(f" → Existing {DEFAULT_CADDYFILE_NAME} copied to output") + else: + # Generate new Caddyfile + ensure_caddyfile_in_output(config.project_path, output_path) + log(f" → {DEFAULT_CADDYFILE_NAME} generated for static site") + else: + log("[2/7] Build skipped") + # Even if build is skipped, static sites still need Caddyfile + if detection.is_static: + out_dir = os.path.join(config.project_path, output_path) if output_path and output_path != "./" and output_path != "." else config.project_path + target_caddy = os.path.join(out_dir, DEFAULT_CADDYFILE_NAME) + if not os.path.exists(target_caddy): + ensure_caddyfile_in_output(config.project_path, output_path) + log(f" → {DEFAULT_CADDYFILE_NAME} generated for static site") + + # 3. Package output directory + # Python: include .gitignore (source code deployment) + # Non-Python: only .vefaasignore (built output doesn't need gitignore) + log("[3/7] Packaging code...") + package_path = os.path.join(config.project_path, output_path) if output_path != "." else config.project_path + if not os.path.exists(package_path): + package_path = config.project_path # Fallback to project root + + zip_bytes = package_directory(package_path, base_dir=config.project_path, include_gitignore=is_python) + log(f" → Packaged: {len(zip_bytes) / 1024:.1f} KB") + + # 4. Upload to TOS + log("[4/7] Uploading to cloud storage...") + source_location = client.upload_to_tos(zip_bytes) + log(" → Upload completed") + + # 5. Create or Update function + target_function_id = None + target_application_id = config.application_id + function_name = None + + # If application_id is provided, get the function_id from app details + if config.application_id: + log(f"[5/7] Getting function from application: {config.application_id}") + try: + app_detail = client.get_application(config.application_id) + app_data = app_detail.get("Result", {}) + + # Try to get function_id from CloudResource first (like vefaas-cli) + cloud_resource_str = app_data.get("CloudResource", "") + if cloud_resource_str: + try: + cloud_resource = json.loads(cloud_resource_str) + # CloudResource format: {"framework": {"function_id": "xxx", ...}} + keys = list(cloud_resource.keys()) + if keys: + first_key = keys[0] + target_function_id = cloud_resource.get(first_key, {}).get("function_id") + except json.JSONDecodeError: + pass + + # Fallback: try to get from Config + if not target_function_id: + config_str = app_data.get("Config", "") + if config_str: + try: + config_data = json.loads(config_str) + target_function_id = config_data.get("function", {}).get("function_id") + except json.JSONDecodeError: + pass + + if not target_function_id: + raise ValueError( + f"Could not find function_id in application {config.application_id}. " + "This application may not have a function associated with it, or it was created without a function. " + "Please use 'name' parameter to create a new application instead." 
+ ) + + log(f" → Found function: {target_function_id}") + except json.JSONDecodeError: + raise ValueError(f"Could not parse application data for {config.application_id}") + + if config.application_id: + # Update existing function + log(f"[5/7] Updating function: {target_function_id}") + client.update_function( + function_id=target_function_id, + source=source_location, + command=start_command, + port=port, + runtime=detection.runtime, + ) + log(" → Function updated") + elif config.name: + log(f"[5/7] Creating function: {config.name}") + create_resp = client.create_function( + name=config.name, + runtime=detection.runtime, + command=start_command, + port=port, + source=source_location, + build_command=build_command if not is_python else None, + output_path=output_path if not is_python else None, + ) + func_result = create_resp.get("Result", {}) + target_function_id = func_result.get("Id") + function_name = func_result.get("Name") + log(f" → Function created: {target_function_id}") + + if not target_function_id: + raise ValueError("Unable to determine target function ID") + + result.function_id = target_function_id + result.function_name = function_name + + # 6. Wait for dependency installation (Python) + if is_python: + log("[6/7] Waiting for dependency installation...") + try: + wait_for_dependency_install(client, target_function_id) + log(" → Dependencies installed") + except Exception as e: + log(f" → Dependency check: {e}") + else: + log("[6/7] Dependency installation skipped") + + # 7. Create application and deploy + access_url = None + + if config.name and not config.application_id: + # New application + log("[7/7] Creating and deploying application...") + gateway_name = config.gateway_name or client.get_usable_gateway() + if not gateway_name: + raise ValueError( + "No available API gateway found. " + "Please visit https://console.volcengine.com/veapig to create a running gateway, then retry." 
+ ) + + app_name = f"{config.name}".lower() + create_app_resp = client.create_application(app_name, target_function_id, gateway_name) + target_application_id = create_app_resp.get("Result", {}).get("Id") + log(f" → Application created: {target_application_id}") + + # Release application + if target_application_id: + log(" → Releasing application...") + client.release_application(target_application_id) + log(" → Waiting for deployment...") + deploy_status = wait_for_application_deploy(client, target_application_id, timeout_seconds=DEPLOY_TIMEOUT_SECONDS) + access_url = deploy_status.get("access_url") + + # If access_url not from deploy status, fetch from app details + if not access_url: + try: + app_detail = client.get_application(target_application_id) + cloud_resource = app_detail.get("Result", {}).get("CloudResource", "") + access_url = extract_access_url_from_cloud_resource(cloud_resource) + except: + pass + + log(" → Deployment completed!") + elif config.application_id: + # Update existing application - re-release it + log("[7/7] Re-deploying existing application...") + client.release_application(target_application_id) + log(" → Waiting for deployment...") + deploy_status = wait_for_application_deploy(client, target_application_id, timeout_seconds=DEPLOY_TIMEOUT_SECONDS) + access_url = deploy_status.get("access_url") + + if not access_url: + try: + app_detail = client.get_application(target_application_id) + cloud_resource = app_detail.get("Result", {}).get("CloudResource", "") + access_url = extract_access_url_from_cloud_resource(cloud_resource) + except: + pass + + log(" → Deployment completed!") + else: + log("[7/7] Application deployment skipped") + + result.application_id = target_application_id + result.function_id = target_function_id + result.access_url = access_url + if target_application_id: + result.app_console_url = get_application_console_url(target_application_id, client.region) + + result.success = True + + # Save config to .vefaas/config.json and vefaas.yaml + try: + save_config = VefaasConfig( + function=FunctionConfig( + id=target_function_id or "", + runtime=detection.runtime, + region=client.region, + application_id=target_application_id, + ), + name=config.name or function_name, + command=start_command, + ) + if access_url: + save_config.triggers = TriggerConfig( + type="apig", + system_url=access_url, + ) + write_config(config.project_path, save_config) + log("[config] Saved .vefaas/config.json and vefaas.yaml") + except Exception as e: + log(f"[config] Warning: Failed to save config: {e}") + + log("Deployed successfully!") + except Exception as e: + result.error = str(e) + result.success = False + log(f"Deployment failed: {e}") + + return result diff --git a/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/vefaas_cli_sdk/detector.py b/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/vefaas_cli_sdk/detector.py new file mode 100644 index 00000000..f1668994 --- /dev/null +++ b/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/vefaas_cli_sdk/detector.py @@ -0,0 +1,672 @@ +# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates +# SPDX-License-Identifier: MIT + +""" +Project Detection Module + +This module encapsulates project detection logic for veFaaS applications, +ported from vefaas-cli (detector.ts). 
+ +Supported frameworks: +- Node.js: Next.js, Nuxt, Vite, VitePress, Rspress, Astro, Express, SvelteKit, Remix, CRA, Angular, Gatsby +- Python: FastAPI, Flask, Streamlit, Django +- Static: HTML sites, Hugo, MkDocs, Zola, Hexo +""" + +import os +import json +import re +from dataclasses import dataclass, field +from typing import Optional, Dict, Any, List +import logging + +logger = logging.getLogger(__name__) + + +@dataclass +class DetectionResult: + """Project detection result""" + install_command: str = "" # Install command + build_command: str = "" # Build command + output_path: str = "./" # Build output directory + start_command: str = "" # Start command + port: int = 8000 # Port + runtime: str = "native-node20/v1" # Runtime + framework: str = "" # Framework name + is_static: bool = False # Is static site + + +def auto_detect(target_path: str) -> DetectionResult: + """ + Auto-detect project runtime, framework and related commands based on file system. + + Args: + target_path: Project root directory path + + Returns: + DetectionResult: Detection result + """ + root = os.path.abspath(target_path) + logger.debug(f"[detect] Starting framework detection for: {root}") + + # Detect build and run scripts + build_script = _find_script(root, ["build.sh"]) + run_script = _find_script(root, ["run.sh"]) + has_build = build_script is not None + has_run = run_script is not None + logger.debug(f"[detect] Found scripts: build={build_script or 'none'}, run={run_script or 'none'}") + + fallback = DetectionResult( + build_command=build_script or "./build.sh", + start_command=run_script or "./run.sh", + ) + + # Try detectors in priority order + detectors = [_detect_node, _detect_python, _detect_static] + + for detector in detectors: + try: + result = detector(root) + if result: + # Override with custom scripts if present + if has_build and build_script: + result.build_command = build_script + if has_run and run_script: + result.start_command = run_script + if has_build and has_run: + result.port = 8000 + result.output_path = "./" + logger.debug(f"[detect] Framework detected: {result.framework or 'none'}") + return result + except Exception as e: + logger.debug(f"[detect] Detector error: {e}") + continue + + logger.debug("[detect] No framework detected, using fallback") + return fallback + + +def _find_script(root: str, names: List[str]) -> Optional[str]: + """Find script file""" + for name in names: + script_path = os.path.join(root, name) + if os.path.exists(script_path): + return f"./{name}" + return None + + +def _exists(path: str) -> bool: + """Check if path exists""" + return os.path.exists(path) + + +def _read_text(path: str) -> Optional[str]: + """Read text file""" + try: + with open(path, "r", encoding="utf-8") as f: + return f.read() + except: + return None + + +def _read_json(path: str) -> Dict[str, Any]: + """Read JSON file""" + try: + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + except: + return {} + + +# ==================== Node.js Detection ==================== + +def _detect_node(root: str) -> Optional[DetectionResult]: + """Detect Node.js project""" + pkg_path = os.path.join(root, "package.json") + has_pkg = _exists(pkg_path) + + is_node = ( + has_pkg or + _exists(os.path.join(root, "package-lock.json")) or + _exists(os.path.join(root, "pnpm-lock.yaml")) or + _exists(os.path.join(root, "yarn.lock")) + ) + + if not is_node: + return None + + pkg = _read_json(pkg_path) if has_pkg else {} + scripts = pkg.get("scripts", {}) + pm = _get_node_package_manager(root, pkg) + 
framework = _detect_node_framework(pkg) + + build_cmd = _resolve_node_build_command(pm, scripts, framework) + output_path = _resolve_node_output_path(framework, scripts, root) + start_cmd = _resolve_node_start_command(pm, scripts, framework, root, pkg) + install_cmd = _resolve_node_install_command(pm, root) + port = _detect_node_port(root, scripts, framework, pkg) + is_static = _should_use_static_hosting(framework, root, pkg, scripts) + + return DetectionResult( + install_command=install_cmd, + build_command=build_cmd, + output_path=output_path, + start_command=start_cmd, + port=port or 8000, + runtime="native-node20/v1", + framework=framework, + is_static=is_static, + ) + + +def _get_node_package_manager(root: str, pkg: Dict) -> str: + """Get Node.js package manager""" + if _exists(os.path.join(root, "pnpm-lock.yaml")): + return "pnpm" + if _exists(os.path.join(root, "yarn.lock")): + return "yarn" + pm_field = pkg.get("packageManager", "").lower() + if pm_field.startswith("pnpm"): + return "pnpm" + if pm_field.startswith("yarn"): + return "yarn" + return "npm" + + +def _detect_node_framework(pkg: Dict) -> str: + """Detect Node.js framework""" + deps = {**pkg.get("dependencies", {}), **pkg.get("devDependencies", {})} + + if "next" in deps: + return "next" + if "vitepress" in deps: + return "vitepress" + if "rspress" in deps: + return "rspress" + if "vite" in deps: + return "vite" + if "nuxt" in deps or "nuxi" in deps: + return "nuxt" + if "astro" in deps: + return "astro" + if "react-scripts" in deps: + return "cra" + if "@angular/core" in deps or "@angular/cli" in deps: + return "angular" + if "@nestjs/core" in deps: + return "nest" + if "express" in deps: + return "express" + if "@sveltejs/kit" in deps: + return "sveltekit" + if "@remix-run/dev" in deps: + return "remix" + if "gatsby" in deps: + return "gatsby" + if "@gulux/gulux" in deps or "@gulux/cli" in deps or "gulux" in deps: + return "gulux" + return "" + + +def _pm_run(pm: str, script: str) -> str: + """Generate package manager run command""" + if pm == "pnpm": + return f"pnpm run {script}" + if pm == "yarn": + return f"yarn {script}" + return f"npm run {script}" + + +def _resolve_node_build_command(pm: str, scripts: Dict, framework: str) -> str: + """Resolve Node.js build command""" + if scripts.get("build"): + base = _pm_run(pm, "build") + if framework == "nuxt": + return f"NITRO_PRESET=node-server {base}" + return base + + framework_builds = { + "next": "npx next build", + "vite": "npx vite build", + "sveltekit": "npx vite build", + "astro": "npx vite build", + "vitepress": "npx vitepress build", + "rspress": "npx rspress build", + "nuxt": "NITRO_PRESET=node-server npx nuxi build", + "cra": "npx react-scripts build", + "angular": "npx ng build --configuration production", + "gatsby": "npx gatsby build", + } + return framework_builds.get(framework, "") + + +def _resolve_node_output_path(framework: str, scripts: Dict, root: str) -> str: + """Resolve Node.js output path""" + output_paths = { + "next": "./", + "vite": "dist", + "sveltekit": "dist", + "astro": "dist", + "vitepress": ".vitepress/dist", + "rspress": "doc_build", + "nuxt": ".output", + "cra": "build", + "angular": "dist", + "gatsby": "public", + "gulux": "output", + } + return output_paths.get(framework, "./") + + +def _resolve_node_install_command(pm: str, root: str) -> str: + """Resolve Node.js install command""" + if pm == "pnpm": + return "pnpm install" + if pm == "yarn": + return "yarn install" + # npm: prefer ci + if _exists(os.path.join(root, 
"package-lock.json")): + return "npm ci" + return "npm install" + + +def _resolve_node_start_command(pm: str, scripts: Dict, framework: str, root: str, pkg: Dict) -> str: + """Resolve Node.js start command""" + # Static sites use Caddy + if _should_use_static_hosting(framework, root, pkg, scripts): + return "caddy run --config DefaultCaddyFile --adapter caddyfile" + + if scripts.get("start"): + return _pm_run(pm, "start") + + framework_starts = { + "next": "npx next start -p ${PORT:-8080}", + "nuxt": "HOST=0.0.0.0 node ./server/index.mjs", + "nest": _pm_run(pm, "start:prod"), + "express": "node server.js", + } + return framework_starts.get(framework, "") + + +def _detect_node_port(root: str, scripts: Dict, framework: str, pkg: Dict) -> int: + """Detect Node.js port""" + if _should_use_static_hosting(framework, root, pkg, scripts): + return 8000 + + # Detect port from scripts + start_script = scripts.get("start", "") + port_match = re.search(r"--port[=\s]+(\d+)", start_script) + if port_match: + return int(port_match.group(1)) + + # Framework default ports + framework_ports = { + "next": 3000, + "nuxt": 3000, + "nest": 3000, + "cra": 3000, + "express": 3000, + "remix": 3000, + "gatsby": 3000, + "vite": 4173, + "vitepress": 4173, + "sveltekit": 4173, + "astro": 4321, + "angular": 8080, + } + return framework_ports.get(framework, 3000) + + +def _should_use_static_hosting(framework: str, root: str, pkg: Dict, scripts: Dict) -> bool: + """Determine if static hosting should be used (based on vefaas-cli shouldUseStaticHosting)""" + deps = {**pkg.get("dependencies", {}), **pkg.get("devDependencies", {})} + + if framework in ["vitepress", "rspress", "cra", "angular", "gatsby"]: + return True + + if framework == "astro": + ssr_adapters = ["@astrojs/node", "@astrojs/deno", "@astrojs/netlify", "@astrojs/vercel", "@astrojs/cloudflare"] + if any(adapter in deps for adapter in ssr_adapters): + return False + # Check if astro.config has output: 'server' + if _is_astro_ssr(root): + return False + return True + + if framework == "next": + # next export or out dir exists means static + build_script = scripts.get("build", "") + if "next export" in build_script: + return True + if _exists(os.path.join(root, "out")): + return True + return False + + if framework == "sveltekit": + if "@sveltejs/adapter-static" in deps: + return True + if "@sveltejs/adapter-node" in deps: + return False + return False + + if framework == "nuxt": + build_script = scripts.get("build", "") + if "nuxi generate" in build_script: + return True + if "NITRO_PRESET=static" in build_script: + return True + return False + + if framework == "vite": + # Vite: if SSR signals present, not static; otherwise default to static (SPA/MPA) + if _is_vite_ssr(root, pkg, scripts): + return False + return True # Default static + + return False + + +def _is_vite_ssr(root: str, pkg: Dict, scripts: Dict) -> bool: + """Detect if Vite is in SSR mode (based on vefaas-cli isViteSSR)""" + # Check if scripts have ssr related commands + for key, val in scripts.items(): + val_lower = (val or "").lower() + if "vite-ssr" in val_lower or "vite build --ssr" in val_lower: + return True + + # Check SSR dependencies + deps = {**pkg.get("dependencies", {}), **pkg.get("devDependencies", {})} + ssr_deps = ["vite-plugin-ssr", "vike", "@vitejs/plugin-react-ssr"] + if any(dep in deps for dep in ssr_deps): + return True + + # Check vite.config for ssr config + vite_config_files = ["vite.config.js", "vite.config.ts", "vite.config.mjs", "vite.config.cjs"] + for config_file in 
vite_config_files: + content = _read_text(os.path.join(root, config_file)) + if content: + if re.search(r"ssr\s*:\s*\{", content) or re.search(r"ssr\s*:\s*true", content): + return True + + return False + + +def _is_astro_ssr(root: str) -> bool: + """Detect if Astro is in SSR mode""" + config_files = ["astro.config.js", "astro.config.ts", "astro.config.mjs", "astro.config.cjs", "astro.config.cts"] + for config_file in config_files: + content = _read_text(os.path.join(root, config_file)) + if content: + if re.search(r"output\s*:\s*['\"]server['\"]", content): + return True + if re.search(r"adapter\s*:\s*", content): + return True + return False + + +# ==================== Python Detection ==================== + +def _detect_python(root: str) -> Optional[DetectionResult]: + """Detect Python project""" + has_py = ( + _exists(os.path.join(root, "requirements.txt")) or + _exists(os.path.join(root, "pyproject.toml")) or + _exists(os.path.join(root, "Pipfile")) or + any(f.endswith(".py") + for f in os.listdir(root) if os.path.isfile(os.path.join(root, f))) + ) + + if not has_py: + return None + + # Read dependency info + req_content = _read_text(os.path.join(root, "requirements.txt")) or "" + pyproj_content = _read_text(os.path.join(root, "pyproject.toml")) or "" + deps_blob = f"{req_content}\n{pyproj_content}".lower() + + is_fastapi = "fastapi" in deps_blob or "uvicorn" in deps_blob + is_flask = "flask" in deps_blob + is_django = "django" in deps_blob + is_streamlit = "streamlit" in deps_blob + + # Detect package manager + pm = _detect_python_package_manager(root) + run_prefix = _get_python_run_prefix(pm) + port = _detect_python_port(root) or 8000 + + # Detect entrypoint file + main_file = _find_python_entrypoint(root) + default_entry = os.path.basename(main_file) if main_file else "main.py" + + # Generate start command + has_uvicorn = "uvicorn" in deps_blob + has_gunicorn = "gunicorn" in deps_blob + + if is_streamlit: + start_cmd = f"{run_prefix} streamlit run {default_entry} --server.port ${{PORT:-{port}}} --server.address 0.0.0.0".strip() + elif is_fastapi: + if has_uvicorn: + wsgi = _determine_python_wsgi(root, main_file, "fastapi") + if wsgi: + start_cmd = f"{run_prefix} uvicorn {wsgi} --host 0.0.0.0 --port ${{PORT:-{port}}}".strip() + else: + start_cmd = f"{run_prefix} python {default_entry}".strip() + else: + start_cmd = f"{run_prefix} python {default_entry}".strip() + elif is_flask: + wsgi = _determine_python_wsgi(root, main_file, "flask") + if wsgi and has_gunicorn: + start_cmd = f"{run_prefix} gunicorn {wsgi} --bind :{port}".strip() + else: + start_cmd = f"{run_prefix} python {default_entry}".strip() + elif is_django: + if has_gunicorn: + start_cmd = f"{run_prefix} gunicorn project.wsgi:application --bind :{port}".strip() + else: + start_cmd = f"{run_prefix} python manage.py runserver 0.0.0.0:{port}".strip( + ) + else: + start_cmd = f"{run_prefix} python {default_entry}".strip() + + # Python version detection + py_version = _detect_python_version(root) + runtime = f"native-python{_ensure_supported_py_version(py_version)}/v1" + + # Install command + install_cmd = _resolve_python_install_command(root) + + return DetectionResult( + install_command=install_cmd, + build_command="", + output_path="./", + start_command=start_cmd, + port=port, + runtime=runtime, + framework="streamlit" if is_streamlit else ("fastapi" if is_fastapi else ( + "flask" if is_flask else ("django" if is_django else ""))), + is_static=False, + ) + + +def _detect_python_package_manager(root: str) -> str: + """Detect 
Python package manager""" + if _exists(os.path.join(root, "uv.lock")): + return "uv" + pyproj = _read_text(os.path.join(root, "pyproject.toml")) or "" + if "[tool.poetry]" in pyproj: + return "poetry" + if "[tool.pdm]" in pyproj: + return "pdm" + if _exists(os.path.join(root, "Pipfile")): + return "pipenv" + return "pip" + + +def _get_python_run_prefix(pm: str) -> str: + """Get Python run prefix""" + prefixes = { + "uv": "UV_PROJECT_ENVIRONMENT=/tmp/.venv uv run", + "poetry": "POETRY_VIRTUALENVS_PATH=/tmp/.venv poetry run", + "pipenv": "WORKON_HOME=/tmp/.venv pipenv run", + "pdm": "PDM_VENV_PATH=/tmp/.venv pdm run", + } + return prefixes.get(pm, "") + + +def _find_python_entrypoint(root: str) -> Optional[str]: + """Find Python entrypoint file""" + common_names = ["main.py", "app.py", "run.py", + "server.py", "wsgi.py", "asgi.py", "manage.py"] + + for name in common_names: + path = os.path.join(root, name) + if _exists(path): + content = _read_text(path) or "" + if "FastAPI(" in content or "Flask(" in content: + return path + + # Fallback to first found common entry + for name in common_names: + path = os.path.join(root, name) + if _exists(path): + return path + + return None + + +def _determine_python_wsgi(root: str, entry_file: Optional[str], framework: str) -> Optional[str]: + """Determine Python WSGI path""" + if not entry_file: + return None + + content = _read_text(entry_file) + if not content: + return None + + ctor = "FastAPI" if framework == "fastapi" else "Flask" + match = re.search(rf"(\w+)\s*=\s*{ctor}\(", content) + if not match: + return None + + var_name = match.group(1) + rel = os.path.relpath(entry_file, root) + module = rel.replace(".py", "").replace(os.sep, ".") + + return f"{module}:{var_name}" + + +def _detect_python_port(root: str) -> Optional[int]: + """Detect Python port""" + # Detect from entry files + entry_files = ["main.py", "app.py", "run.py", "server.py"] + for name in entry_files: + content = _read_text(os.path.join(root, name)) + if content: + match = re.search( + r"port[\"']?\s*[=:]\s*(\d{4,5})", content, re.IGNORECASE) + if match: + return int(match.group(1)) + return None + + +def _detect_python_version(root: str) -> str: + """Detect Python version""" + # Detect from pyproject.toml + pyproj = _read_text(os.path.join(root, "pyproject.toml")) or "" + match = re.search(r'python\s*[>=<~^]*\s*["\']?(\d+\.\d+)', pyproj) + if match: + return match.group(1) + + # Detect from .python-version + pv = _read_text(os.path.join(root, ".python-version")) + if pv: + match = re.search(r"(\d+\.\d+)", pv.strip()) + if match: + return match.group(1) + + return "3.12" # Default + + +def _ensure_supported_py_version(version: str) -> str: + """Ensure supported Python version""" + supported = ["3.9", "3.10", "3.11", "3.12"] + if version in supported: + return version + # Try to match major version + for v in reversed(supported): + if version.startswith(v.split(".")[0]): + return v + return "3.12" + + +def _resolve_python_install_command(root: str) -> str: + """Resolve Python install command""" + if _exists(os.path.join(root, "uv.lock")): + if _exists(os.path.join(root, "requirements.txt")): + return "uv pip install -r requirements.txt" + return "uv sync" + + pyproj = _read_text(os.path.join(root, "pyproject.toml")) or "" + if "[tool.poetry]" in pyproj: + return "poetry install" + if "[tool.pdm]" in pyproj: + return "pdm install" + if _exists(os.path.join(root, "Pipfile")): + return "pipenv install" + if _exists(os.path.join(root, "requirements.txt")): + return "pip install -r 
requirements.txt" + return "pip install ." + + +# ==================== Static Site Detection ==================== + +def _detect_static(root: str) -> Optional[DetectionResult]: + """Detect static site""" + root_index = os.path.join(root, "index.html") + has_root_index = _exists(root_index) + + # Hugo detection + hugo_files = ["hugo.toml", "hugo.json", "hugo.yaml"] + has_hugo = any(_exists(os.path.join(root, f)) for f in hugo_files) + + # MkDocs detection + has_mkdocs = _exists(os.path.join(root, "mkdocs.yml")) + + # Build output directory detection + dist_index = os.path.join(root, "dist", "index.html") + build_index = os.path.join(root, "build", "index.html") + public_index = os.path.join(root, "public", "index.html") + has_built_static = any(_exists(p) + for p in [dist_index, build_index, public_index]) + + matched = has_root_index or has_hugo or has_mkdocs or has_built_static + if not matched: + return None + + # Determine output directory + if _exists(dist_index): + output_path = "dist" + elif _exists(build_index): + output_path = "build" + elif _exists(public_index): + output_path = "public" + else: + output_path = "./" + + # Determine framework + framework = "" + if has_hugo: + framework = "hugo" + elif has_mkdocs: + framework = "mkdocs" + + return DetectionResult( + install_command="", + build_command="", + output_path=output_path, + start_command="caddy run --config DefaultCaddyFile --adapter caddyfile", + port=8000, + runtime="native-node20/v1", + framework=framework, + is_static=True, + ) diff --git a/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/vefaas_server.py b/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/vefaas_server.py index 8b734929..cf59858e 100644 --- a/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/vefaas_server.py +++ b/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/vefaas_server.py @@ -1,4 +1,4 @@ -import fnmatch +import pathspec import io from pdb import run from socket import timeout @@ -25,6 +25,8 @@ import requests import shutil +from .vefaas_cli_sdk.deploy import package_directory + logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) @@ -34,332 +36,41 @@ stateless_http=os.getenv("STATLESS_HTTP", "true").lower() == "true", streamable_http_path=os.getenv("STREAMABLE_HTTP_PATH", "/mcp")) -TemplateIdForRegion = { - "ap-southeast-1": "6943ba21735f270008330d1c", - "cn-beijing": "68d24592162cb40008217d6f", - "cn-shanghai": "6943b9de4fa45c0008ea04e1", - "cn-guangzhou": "6943b95bc69585000819d70f", -} def validate_and_set_region(region: str = None) -> str: """ - Validates the provided region and returns the default if none is provided. - - Args: - region: The region to validate + Validates the provided region and returns a valid region string. + If no region is provided, it tries to detect it from local configuration. - Returns: - A valid region string - - Raises: - ValueError: If the provided region is invalid + Priority: + 1. Provided region parameter + 2. Region from .vefaas/config.json or vefaas.yaml in current directory + 3. Default to "cn-beijing" """ valid_regions = ["ap-southeast-1", "cn-beijing", "cn-shanghai", "cn-guangzhou"] + if region: if region not in valid_regions: - raise ValueError(f"Invalid region. Must be one of: {', '.join(valid_regions)}") - else: - region = "cn-beijing" - return region - -@mcp.tool(description="""Create a veFaaS Application. - -Args: - - function_id: vefaas function id. - - function_name: vefaas function name. 
- - gateway_name: api gateway name (Name from tool `fetch_running_api_gateway`). - -Note: - - Applications bind the function to an API gateway as the top-level delivery unit. - - Creation automatically submits an application release; capture the returned `application_id`. - - On success, append `application_id` to `vefaas.yaml` immediately. - -**CRITICAL REQUIREMENT**: - - Invoke **ONLY AFTER** the current workflow has just run `create_function`; otherwise reuse the existing application. - - If need to create a new application, follow these steps: - - Step 1: List templates via `list_vefaas_application_templates`. - - Step 2: Pull details/code with `get_vefaas_application_template` if a template fits. - - Step 3: Create the veFaaS function and confirm its release succeeded. - - Step 4: Ensure a running API gateway is ready. - - Step 5: Create the application with `create_vefaas_application`. - - On success, immediately call `poll_vefaas_application_status` until deployment finishes (success or fail, max three polls). If creation fails or raises, stop and surface the error instead of polling. - -Error Handle Tips: - - If there is **any authentication** error about vefaas application(create/release/get), let user apply auth by link: https://console.volcengine.com/iam/service/attach_custom_role?ServiceName=vefaas&policy1_1=APIGFullAccess&policy1_2=VeFaaSFullAccess&role1=ServerlessApplicationRole, then retry. - -""") -def create_vefaas_application(function_id: str, function_name: str, gateway_name: str, region: Optional[str] = None): - now = datetime.datetime.utcnow() - try: - ak, sk, token = get_authorization_credentials(mcp.get_context()) - except ValueError as e: - raise ValueError(f"Authorization failed: {str(e)}") - - # check apig trigger whether exist - try: - triggers = list_function_triggers(function_id, region).get("Result", {}).get("Items", []) - if any(trigger.get("Type") == "apig" for trigger in triggers): - return f"APIGateway trigger already exists for function {function_name}, skip create application" - except Exception as e: - raise ValueError(f"Failed to list function triggers: {str(e)}") - - region = validate_and_set_region(region) - - applicationName = (append_random_suffix(function_name, 3) + "-app").lower() - - body = { - "Name": applicationName, - "Config": { - "FunctionName": function_name, - "GatewayName": gateway_name, - "Region": region, - }, - "TemplateId": TemplateIdForRegion.get(region, "68d24592162cb40008217d6f"), - } + # We allow it but log a warning, just in case a new region is added + logger.warning(f"Region '{region}' is not in the known valid regions list: {valid_regions}") + return region + # Try to detect from local config try: - response_body = request("POST", now, {}, {}, ak, sk, token, "CreateApplication", json.dumps(body), region) - except Exception as e: - raise ValueError(f"Failed to create application: {str(e)}") - - result = {} - if isinstance(response_body, dict): - result = response_body.get("Result") or {} - application_id = result.get("Id") - if not application_id: - raise ValueError(f"Failed to determine application ID from create response. 
response_body: {response_body}") - - release_body = {"Id": application_id} - try: - release_response = request("POST", datetime.datetime.utcnow(), {}, {}, ak, sk, token, "ReleaseApplication", json.dumps(release_body), region) - except Exception as e: - raise ValueError(f"Failed to release application: {str(e)}") - - payload = { - "application_id": application_id, - "create_application": response_body, - "release_application": release_response, - } - return json.dumps(payload, ensure_ascii=False, indent=2) - -@mcp.tool(description="""Get veFaaS Application deployment status. - -Args: - - application_id: vefaas application_id. - -Note: - - Application deployment status is independent of function releases. - - Call after `create_vefaas_application` (which auto-submits release) to monitor progress. - - When it finishes, **MUST** report application_id, region, status, access_url, and app_platform_url derived from the response. - -**CRITICAL REQUIREMENT**: - - Do not use alternative methods to check application deployment status—only this tool. - - Poll immediately after `create_vefaas_application` returns (release is auto-submitted) and stop once you see `deploy_success` or `deploy_fail`, with at most three attempts. - - If it is a **streamlit type** application (will contain "streamlit" in the function_name or application_name), **MUST** update the `access_url` to function envs and release function again after application deployment finished. - - Detail step: - - 1. Use tool `update_function`, add environment STREAMLIT_BROWSER_SERVER_ADDRESS=`access_url` to param envs. - - 2. Call tool `release_function` and `poll_function_release_status` to check if the streamlit application is redeployed successfully. - -""") -def poll_vefaas_application_status(application_id: str, region: Optional[str] = None): - region = validate_and_set_region(region) - now = datetime.datetime.utcnow() - try: - ak, sk, token = get_authorization_credentials(mcp.get_context()) - except ValueError as e: - raise ValueError(f"Authorization failed: {str(e)}") - - body = { - "Id": application_id, - } - timeout = 120 - polling_interval = 5 - start_time = time.time() - - while time.time() - start_time < timeout: - try: - response = request("POST", now, {}, {}, ak, sk, token, "GetApplication", json.dumps(body), region) - if response["Result"] is not None: - result = response["Result"] - status = result.get("Status") - except Exception as e: - status = None - - if status == "deploying" or status == None: - time.sleep(polling_interval) - continue - else: - break - - errLogs: list[str] = [] - hasAuthError = False - if status == "deploy_fail": - try: - revision_number = result.get("NewRevisionNumber") - if revision_number: - logQueryBody = { - "Id": application_id, - "Limit": 99999, - "RevisionNumber": revision_number, - } - logResponse = request("POST", now, {}, {}, ak, sk, token, "GetApplicationRevisionLog", json.dumps(logQueryBody), region) - log_result = logResponse.get("Result") - if log_result: - logLines = log_result.get("LogLines", []) - for logLine in logLines: - if "warn" in logLine.lower() or "error" in logLine.lower() or "fail" in logLine.lower(): - errLogs.append(logLine) - if "not authorized" in logLine.lower() or "cannot get sts token" in logLine.lower(): - errLogs.append(logLine) - hasAuthError = True - if hasAuthError: - errLogs.append("Failed to release application due to an authentication error. 
Please visit https://console.volcengine.com/iam/service/attach_custom_role?ServiceName=vefaas&policy1_1=APIGFullAccess&policy1_2=VeFaaSFullAccess&role1=ServerlessApplicationRole to grant the required permissions and then try again.") - except Exception as e: - logger.error(f"Failed to get application log: {str(e)}") - - # get system_url - system_url = "" - try: - cloud_resource = json.loads(result["CloudResource"]) - system_url = cloud_resource['framework']['url']['system_url'] - except Exception as e: - logger.error(f"Failed to get system_url: {str(e)}") - - responseInfo = { - "Id": result["Id"], - "Name": result["Name"], - "Status": result["Status"], - "Config": result["Config"], - "Region": result["Region"], - "AccessUrl": system_url, - "AppPlatformUrl": f"https://console.volcengine.com/vefaas/region:vefaas+{region}/application/detail/{application_id}?tab=detail", - "NewRevisionNumber": result.get("NewRevisionNumber"), - } - if len(errLogs) > 0: - responseInfo["DeployFailedLogs"] = errLogs - - return responseInfo - - -@mcp.tool(description="""Create a veFaaS function. - -Args: - - name: function name (unique, with a 6 length lowercase random string). - - runtime: function runtime. - - command: function startup script (./run.sh by default). - - region: function region. (`cn-beijing` by default) - -Note: - - runtime must be `native-python3.12/v1`, `native-node20/v1`, or `native/v1` (defaults to `native-python3.12/v1` if omitted). - - command must be a runnable script (default `./run.sh`). - - region defaults to `cn-beijing` and must be one of `cn-beijing`, `cn-shanghai`, `cn-guangzhou`, `ap-southeast-1`. - - `enable_vpc=true` requires `vpc_id`, `subnet_ids`, and `security_group_ids`. - - Startup scripts must invoke tooling via `python -m ` (e.g., `python -m uvicorn ...`); direct CLI binaries such as `uvicorn` or `gunicorn` are not on PATH—apply the same rule for any missing CLI: use `python -m ` or launch it from code. - - When bootstrapping a new workload (e.g., React service), review official templates via `list_vefaas_application_templates` and `get_vefaas_application_template` first so code and configs inherit veFaaS conventions. - - Review the `upload_code` checklist before preparing artifacts or startup scripts. - - After work completes, surface function_id, name, region, runtime, and platform link if available from context. - - veFaaS platform link template: https://console.volcengine.com/vefaas/region:vefaas+`region`/function/detail/`function_id`?tab=config - -Error Handle Tips: - - If there is **any authentication** error about vefaas function(like create/release/get), let user to apply auth by this link https://console.volcengine.com/iam/service/attach_role/?ServiceName=vefaas, then retry. - -**CRITICAL REQUIREMENT**: - - If `vefaas.yaml` already holds a valid `function_id`, reuse it and skip this tool. - - On success, write `function_id`, `name`, `region`, `runtime`, `command` to `vefaas.yaml`, vefaas.yaml should be created in the **project root directory**. - - Then execute in order: - - Step 0 (for new services without an existing template choice): call `list_vefaas_application_templates`, pick a template, and pull its source via `get_vefaas_application_template` to guide code changes before uploading. - - Step 1: Run `upload_code` (per its checklist; required for TOS sources). - - Step 2: Call `release_function` once upload completes when release is needed. - - Step 3: Fetch a running API gateway. - - Step 4: Create the veFaaS application (this tool auto-releases). 
Only if creation succeeds, poll deployment status via `poll_vefaas_application_status`; otherwise surface the failure. - -""") -def create_function(name: str = None, region: str = None, runtime: str = None, command: str = None, source: str = None, - image: str = None, envs: dict = None, description: str = None, enable_vpc = False, - vpc_id: str = None, subnet_ids: List[str] = None, security_group_ids: List[str] = None,) -> str: - # Validate region - region = validate_and_set_region(region) - - api_instance = init_client(region, mcp.get_context()) - if enable_vpc and (not vpc_id or not subnet_ids or not security_group_ids): - raise ValueError("vpc_id or subnet_ids and security_group_ids must be provided.") - - def build_create_request(current_name: str) -> volcenginesdkvefaas.CreateFunctionRequest: - request_obj = volcenginesdkvefaas.CreateFunctionRequest( - name=current_name, - runtime=runtime if runtime else "native-python3.12/v1", - ) - - if image: - request_obj.source = image - request_obj.source_type = "image" - - if command: - request_obj.command = command - - if source: - if ":" not in source: - source_type = "zip" - elif source.count(":") == 1 and "/" not in source: - source_type = "tos" - elif "/" in source and ":" in source: - source_type = "image" + from .vefaas_cli_sdk.config import read_config + config = read_config(os.getcwd()) + if config and config.function.region: + detected_region = config.function.region + if detected_region not in valid_regions: + logger.warning(f"Auto-detected region '{detected_region}' from config is not in the known list: {valid_regions}") else: - source_type = None - - request_obj.source = source - if source_type: - request_obj.source_type = source_type - - if envs: - env_list = [{"key": key, "value": value} for key, value in envs.items()] - request_obj.envs = env_list - - if enable_vpc: - vpc_config = volcenginesdkvefaas.VpcConfigForUpdateFunctionInput( - enable_vpc=True, vpc_id=vpc_id, subnet_ids=subnet_ids, security_group_ids=security_group_ids, - ) - request_obj.vpc_config = vpc_config - - if description: - request_obj.description = description - - return request_obj - - base_name = name if name else generate_random_name() - current_name = base_name - used_names = {current_name} - max_attempts = 5 - attempt = 0 - - while attempt < max_attempts: - request_obj = build_create_request(current_name) - try: - response = api_instance.create_function(request_obj) - return f"Successfully created veFaaS function with name {current_name} and id {response.id}" - except ApiException as e: - if "need to create a service-linked role for vefaas" in str(e).lower() or "no auth" in str(e).lower() or "not authorized" in str(e).lower(): - raise ValueError("You need to create a service-linked role for veFaaS. Please visit https://console.volcengine.com/iam/service/attach_role/?ServiceName=vefaas to grant the required permissions and then try again.") - if is_name_conflict_error(e): - attempt += 1 - next_name = append_random_suffix(base_name) - while next_name in used_names: - next_name = append_random_suffix(base_name) - used_names.add(next_name) - logger.info( - "Function name '%s' already exists. 
Retrying with '%s' (attempt %s/%s)", - current_name, - next_name, - attempt, - max_attempts, - ) - current_name = next_name - continue - - error_message = f"Failed to create veFaaS function: {str(e)}" - raise ValueError(error_message) + logger.info(f"Auto-detected region from config: {detected_region}") + return detected_region + except Exception as e: + logger.debug(f"Failed to auto-detect region from config: {e}") - raise ValueError("Failed to create veFaaS function: exhausted name retries due to conflicts.") + # Default + return "cn-beijing" def append_random_suffix(name: str, length: int = 6) -> str: @@ -383,144 +94,294 @@ def is_name_conflict_error(exception: ApiException) -> bool: return False -@mcp.tool(description="""Update a veFaaS function's referenced artifact or runtime settings. -Args: -- function_id: ID of the function to update. -- source: Optional new artifact to use (base64 zip, TOS object, container image). -- region: Optional region to update the function in (supports 'ap-southeast-1', 'cn-beijing', 'cn-shanghai', 'cn-guangzhou'). -- command: Optional new command to run. -- envs: Optional new environment variables as key-value pairs. -- enable_vpc: Optional flag to enable VPC networking. -- vpc_id: Optional VPC ID if VPC is enabled. -- subnet_ids: Optional list of subnet IDs if VPC is enabled. -- security_group_ids: Optional list of security group IDs if VPC is enabled. +@mcp.tool(description="""Update veFaaS function code and configuration. -Note: -- Use to swap in an existing artifact (base64 zip/TOS/image) or update command/env/VPC fields; for fresh local edits prefer `upload_code`. -- When passing `source`, ensure the artifact already exists and matches the inferred source_type (zip/tos/image). -- For VPC updates set `enable_vpc=true` and include `vpc_id`, `subnet_ids`, and `security_group_ids`. +**Use Cases**: +- Upload local code changes to online function +- Update function command, environment variables, etc. +- Sync code after local development + +**Parameters**: +- function_id: Function ID (required) +- region: Region (default cn-beijing) +- project_path: Local project path (required for code update, absolute path) +- command: Startup command (optional) +- envs: Environment variables dict (optional) + +**File Filtering**: +- Uses `.vefaasignore` file in project root (gitignore syntax) +- Auto-creates default `.vefaasignore` if not exists +**Workflow**: +1. If project_path provided, zip and upload code (respecting .vefaasignore) +2. If command/envs provided, update function config +3. 
Return upload/update result + +**Note**: +- After updating code, call release_function to publish changes +- release_function will auto-handle dependency installation and release """) -def update_function(function_id: str, source: str = None, region: str = None, command: str = None, - envs: dict = None, enable_vpc = False, vpc_id: str = None, subnet_ids: List[str] = None, - security_group_ids: List[str] = None,): +def update_function(function_id: str, region: Optional[str] = None, + project_path: Optional[str] = None, + command: Optional[str] = None, + envs: Optional[dict] = None): region = validate_and_set_region(region) - api_instance = init_client(region, mcp.get_context()) - update_request = volcenginesdkvefaas.UpdateFunctionRequest( - id=function_id, + result = {"function_id": function_id, "region": region} + + # Upload code if project_path is provided + if project_path: + if not os.path.isabs(project_path): + raise ValueError(f"project_path must be an absolute path, got: {project_path}") + if not os.path.exists(project_path): + raise ValueError(f"project_path does not exist: {project_path}") + + try: + ak, sk, token = get_authorization_credentials(mcp.get_context()) + except ValueError as e: + raise ValueError(f"Authorization failed: {str(e)}") + + # Zip code using .vefaasignore + data, size, error = zip_and_encode_folder(project_path) + if error: + raise ValueError(f"Error zipping folder: {error}") + if not data or size == 0: + raise ValueError("Zipped folder is empty, nothing to upload") + + # Upload code + upload_code_zip_for_function( + api_instance=api_instance, + function_id=function_id, + code_zip_size=size, + zip_bytes=data, + ak=ak, + sk=sk, + token=token, + region=region, ) + result["code_uploaded"] = True + # Use KB for small files, MB for larger files + if size < 1024 * 1024: + result["upload_size"] = f"{round(size / 1024, 1)} KB" + else: + result["upload_size"] = f"{round(size / 1024 / 1024, 2)} MB" + logger.info(f"Code uploaded successfully, size: {result['upload_size']}") - source_type = None - - if source: - # Determine source type based on the format - if ":" not in source: - # If no colon, assume it's a base64 encoded zip - source_type = "zip" - elif source.count(":") == 1 and "/" not in source: - # Format: bucket_name:object_key - source_type = "tos" - elif "/" in source and ":" in source: - # Format: host/namespace/repo:tag - source_type = "image" - # else: - # raise ValueError( - # "Invalid source format. 
Must be one of: base64 zip, bucket_name:object_key, or host/namespace/repo:tag" - # ) - - update_request.source = source - update_request.source_type = source_type - - if command != "": + # Update function config (command, envs) + update_request = volcenginesdkvefaas.UpdateFunctionRequest(id=function_id) + has_config_update = False + + if command is not None and command != "": update_request.command = command + has_config_update = True + result["command_updated"] = command if envs: - env_list = [] - for key, value in envs.items(): - env_list.append({ - "key": key, - "value": value - }) + env_list = [{"key": key, "value": value} for key, value in envs.items()] update_request.envs = env_list + has_config_update = True + result["envs_updated"] = list(envs.keys()) - if enable_vpc: - if not vpc_id or not subnet_ids or not security_group_ids: - raise ValueError("vpc_id or subnet_ids and security_group_ids must be provided.") - vpc_config = volcenginesdkvefaas.VpcConfigForUpdateFunctionInput( - enable_vpc=True, vpc_id=vpc_id, subnet_ids=subnet_ids, security_group_ids=security_group_ids, - ) - update_request.vpc_config = vpc_config + if has_config_update: + try: + api_instance.update_function(update_request) + result["config_updated"] = True + except ApiException as e: + raise ValueError(f"Failed to update function config: {str(e)}") - try: - response = api_instance.update_function(update_request) - return f"Successfully updated function {function_id} with source type {source_type}" - except ApiException as e: - error_message = f"Failed to update veFaaS function: {str(e)}" - raise ValueError(error_message) + result["next_step"] = "Call release_function to publish changes (it will handle dependency installation automatically)" + result["platform_url"] = f"https://console.volcengine.com/vefaas/region:vefaas+{region}/function/detail/{function_id}" -@mcp.tool(description="""Release(Deploy) the latest code/configs to a veFaaS Function. + return json.dumps(result, ensure_ascii=False, indent=2) -Args: -- function_id: ID of the function to release. -- region: The region of the veFaaS function. -Note: - - Submits the release job; the function is not live until polling reports success. +@mcp.tool(description="""Release (deploy) a veFaaS function to production. + +**Use Cases**: +- Publish code changes after update_function +- Deploy new version to production -**CRITICAL REQUIREMENT**: -- Use only when new code or config is ready to publish. If code changed, wait for `upload_code` (including dependency install tasks) to finish; config-only changes can proceed immediately. -- After submission, call `poll_function_release_status` until it returns Succeeded/Failed, and invoke that poll tool no more than three times. +**Parameters**: +- function_id: Function ID (required) +- region: Region (default cn-beijing) +- skip_dependency: Skip dependency installation step (default False). Use when dependencies are already installed. +**Workflow**: +1. Trigger dependency installation (if requirements.txt/package.json exists, unless skip_dependency=True) +2. Wait for dependency installation to complete +3. Submit release request +4. Poll release status until succeeded/failed +5. 
Return final status with access link and revision info + +**Returns**: +- release_status: succeeded/failed +- stable_revision_number: Current stable revision number +- new_revision_number: New revision number after release +- access_link: Function access URL +- platform_url: Console link +- error_message: Error details (if failed) """) -def release_function(function_id: str, region: str = None): +def release_function(function_id: str, region: Optional[str] = None, skip_dependency: bool = False) -> str: region = validate_and_set_region(region) - api_instance = init_client(region, mcp.get_context()) try: - logger.info("Release uses the last artifact uploaded via upload_code/update_function; ensure that step has completed successfully before calling release.") + ak, sk, token = get_authorization_credentials(mcp.get_context()) + except ValueError as e: + raise ValueError(f"Authorization failed: {str(e)}") + + result = {"function_id": function_id, "region": region} + + # Early check: if release is already in progress, return immediately with guidance + try: + req = volcenginesdkvefaas.GetReleaseStatusRequest(function_id=function_id) + current_status = api_instance.get_release_status(req) + if current_status.status == "inprogress": + return json.dumps({ + "function_id": function_id, + "region": region, + "release_status": "inprogress", + "next_action": "Release is already in progress. Wait for completion, or call get_function_detail to check status.", + "platform_url": f"https://console.volcengine.com/vefaas/region:vefaas+{region}/function/detail/{function_id}" + }, ensure_ascii=False, indent=2) + except Exception: + pass # If status check fails, proceed with release + + # Step 1: Trigger dependency installation (unless skipped) + if skip_dependency: + logger.info("Skipping dependency installation as requested.") + result["dependency_triggered"] = False + result["dependency_status"] = "skipped" + else: + logger.info("Checking if dependency installation is needed...") + try: + dep_body = {"FunctionId": function_id} + now = datetime.datetime.utcnow() + dep_resp = request( + "POST", now, {}, {}, ak, sk, token, "CreateDependencyInstallTask", json.dumps(dep_body), region + ) + logger.info("Dependency install task created, waiting for completion...") + result["dependency_triggered"] = True + except Exception as e: + # Dependency install may fail if no requirements.txt/package.json, that's OK + logger.info(f"Dependency install skipped or failed: {str(e)}") + result["dependency_triggered"] = False + + # Step 2: Wait for dependency installation to complete + if result.get("dependency_triggered"): + timeout_seconds = 120 + poll_interval_seconds = 5 + start_time = time.time() + dep_status = None + + while time.time() - start_time < timeout_seconds: + try: + now = datetime.datetime.utcnow() + status_resp = request( + "POST", now, {}, {}, ak, sk, token, "GetDependencyInstallTaskStatus", + json.dumps({"FunctionId": function_id}), region, 5 + ) + dep_status = status_resp.get("Result", {}).get("Status") + + if dep_status == "InProgress" or dep_status is None: + time.sleep(poll_interval_seconds) + continue + else: + break + except Exception as ex: + logger.warning(f"Failed to get dependency status: {ex}") + break + + if dep_status == "Failed": + # Try to get log for debugging + try: + now = datetime.datetime.utcnow() + log_resp = request( + "POST", now, {}, {}, ak, sk, token, + "GetDependencyInstallTaskLogDownloadURI", + json.dumps({"FunctionId": function_id}), region, 5 + ) + log_url = log_resp.get("Result", 
{}).get("DownloadURL", "") + result["dependency_status"] = "failed" + result["dependency_log_url"] = log_url + raise ValueError(f"Dependency installation failed. Check logs: {log_url}") + except ValueError: + raise + except Exception: + raise ValueError("Dependency installation failed") + + result["dependency_status"] = "succeeded" if dep_status == "Succeeded" else dep_status + + # Step 3: Submit release request + logger.info("Submitting release request...") + try: req = volcenginesdkvefaas.ReleaseRequest( function_id=function_id, revision_number=0 ) - response = api_instance.release(req) - return ( - "Release request submitted for function " - f"{function_id}. Poll 'poll_function_release_status' until it reports Succeeded/Failed." - ) + api_instance.release(req) + logger.info("Release request submitted, polling status...") except ApiException as e: - error_message = f"Failed to release veFaaS function: {str(e)}" - raise ValueError(error_message) + raise ValueError(f"Failed to submit release: {str(e)}") -@mcp.tool(description="""Delete a veFaaS function. + # Step 4: Poll release status + timeout = 120 + interval = 5 + start_time = time.time() + release_status = None + status_message = "" -Args: -- function_id: ID of the function to delete. -- region: The region of the veFaaS function. + while time.time() - start_time < timeout: + try: + req = volcenginesdkvefaas.GetReleaseStatusRequest(function_id=function_id) + response = api_instance.get_release_status(req) + release_status = response.status + status_message = response.status_message or "" -Note: - - Use this when asked to delete, remove, or uninstall a veFaaS function. + if release_status == "inprogress": + time.sleep(interval) + else: + break + except Exception as e: + logger.warning(f"Failed to get release status: {e}") + break -""") -def delete_function(function_id: str, region: str = None): - region = validate_and_set_region(region) + # Build final result + result["release_status"] = release_status or "unknown" + if status_message: + result["status_message"] = status_message - api_instance = init_client(region, mcp.get_context()) + result["platform_url"] = f"https://console.volcengine.com/vefaas/region:vefaas+{region}/function/detail/{function_id}" + # Get revision info from final status try: - req = volcenginesdkvefaas.DeleteFunctionRequest( - id=function_id - ) - response = api_instance.delete_function(req) - return f"Successfully deleted function {function_id}" - except ApiException as e: - error_message = f"Failed to delete veFaaS function: {str(e)}" - raise ValueError(error_message) + req = volcenginesdkvefaas.GetReleaseStatusRequest(function_id=function_id) + final_status = api_instance.get_release_status(req) + if getattr(final_status, 'stable_revision_number', None) is not None: + result["stable_revision_number"] = final_status.stable_revision_number + if getattr(final_status, 'new_revision_number', None) is not None: + result["new_revision_number"] = final_status.new_revision_number + except Exception: + pass + + # Get access link + try: + access_link = get_function_access_link(function_id, region) + if access_link: + result["access_link"] = access_link + except Exception: + pass + + if release_status == "failed": + result["error_message"] = status_message + raise ValueError(json.dumps(result, ensure_ascii=False, indent=2)) + + return json.dumps(result, ensure_ascii=False, indent=2) + -def get_function_release_status(function_id: str, region: str = None): +def get_function_release_status(function_id: str, region: 
Optional[str] = None): region = validate_and_set_region(region) api_instance = init_client(region, mcp.get_context()) @@ -533,21 +394,7 @@ def get_function_release_status(function_id: str, region: str = None): return response -@mcp.tool(description="""Check veFaaS function release status. - -Args: -- function_id: ID of the function to check release status. -- region: The region of the veFaaS function. - -Note: -- If failed: inspect status/errors, resolve, then rerun 'upload_code' -> 'release_function' procedure once fixes are in place. - - A frequent error is `bash: uvicorn: command not found`; switch startup scripts to `python -m uvicorn main:app --host 0.0.0.0 --port 8000` (or launch the server in code) per the `upload_code` guidance—apply the same rule for any missing CLI. - -**CRITICAL REQUIREMENT**: - - Can **only** use this tool to check vefaas function release status, **NEVER** try to get release status by other ways. - - When it finishes, **MUST** report: function_id, region, release_status, vefaas_function_access_link and vefaas_function_platform_url derived from the response. -""") -def poll_function_release_status(function_id: str, region: str = None): +def poll_function_release_status(function_id: str, region: Optional[str] = None): region = validate_and_set_region(region) api_instance = init_client(region, mcp.get_context()) @@ -574,12 +421,13 @@ def poll_function_release_status(function_id: str, region: str = None): "new_revision_number": response.new_revision_number or -1, "old_revision_number": response.old_revision_number or -1, "start_time": response.start_time or '', - "vefaas_function_platform_url": f"https://console.volcengine.com/vefaas/region:vefaas+{region}/function/detail/{function_id}?tab=config", + "vefaas_function_platform_url": f"https://console.volcengine.com/vefaas/region:vefaas+{region}/function/detail/{function_id}", "vefaas_function_access_link": get_function_access_link(function_id, region), } return responseInfo -def get_function_access_link(function_id: str, region: str = None): + +def get_function_access_link(function_id: str, region: Optional[str] = None): region = validate_and_set_region(region) triggers = list_function_triggers(function_id, region).get("Result", {}).get("Items", []) @@ -660,7 +508,7 @@ def init_client(region: str = None, ctx: Context = None): configuration.session_token = session_token # Set region with default if needed - region = region if region is not None else "cn-beijing" + region = validate_and_set_region(region) logger.info("Using region: %s", region) configuration.region = region @@ -668,6 +516,7 @@ def init_client(region: str = None, ctx: Context = None): volcenginesdkcore.Configuration.set_default(configuration) return volcenginesdkvefaas.VEFAASApi() + def list_existing_api_gateways(region: str = None): now = datetime.datetime.utcnow() @@ -692,7 +541,8 @@ def list_existing_api_gateways(region: str = None): except Exception as e: return f"Failed to list API Gateways: {str(e)}" -def create_api_gateway(name: str = None, region: str = "cn-beijing"): + +def create_api_gateway(name: str = None, region: Optional[str] = None): """ Creates a new VeApig gateway. @@ -730,55 +580,6 @@ def create_api_gateway(name: str = None, region: str = "cn-beijing"): except Exception as e: return f"Failed to create VeApig gateway with name {gateway_name}: {str(e)}" -@mcp.tool(description="""Fetch a running API Gateway ID. - -Args: - - region: The region to fetch the gateway for. 
- -Note: - - Returns a running API gateway to feed into `create_vefaas_application`; creates one and waits if none are ready. - - On failure, retry up to three times before surfacing the error. - - Use the returned gateway's `Name` directly when calling `create_vefaas_application`, and expect new gateways to take a few minutes to reach `Running`. -""") - -def fetch_running_api_gateway(region: str = None): - region = validate_and_set_region(region) - - try: - existing_gateways = list_existing_api_gateways(region) - running_gateways = [gw for gw in existing_gateways if gw["Status"] == "Running"] - if len(running_gateways) > 0: - return random.choice(running_gateways) - - timeout = 180 - interval = 5 - start_time = datetime.datetime.utcnow() - create_api_gateway_failed_times = 0 - while (datetime.datetime.utcnow() - start_time).total_seconds() < timeout: - existing_gateways = list_existing_api_gateways(region) - running_gateways = [gw for gw in existing_gateways if gw["Status"] == "Running"] - if len(running_gateways) > 0: - return random.choice(running_gateways) - - pending_gateways = [gw for gw in existing_gateways if gw["Status"] == "Creating"] - if len(pending_gateways) > 0: - logger.info(f"Waiting for gateway to be running: {pending_gateways}") - time.sleep(interval) - continue - - try: - create_api_gateway(region=region) - time.sleep(interval) - except Exception as e: - logger.error(f"Failed to create API Gateway: {str(e)}") - create_api_gateway_failed_times += 1 - if create_api_gateway_failed_times >= 3: - raise Exception(f"Failed to create API Gateway after {create_api_gateway_failed_times} times") - time.sleep(interval) - except Exception as e: - raise Exception(f"Failed to fetch an running API Gateway: {str(e)}") - - raise Exception(f"Failed to fetch an running API Gateway after {timeout} seconds") def ensure_executable_permissions(folder_path: str): for root, _, files in os.walk(folder_path): @@ -787,156 +588,27 @@ def ensure_executable_permissions(folder_path: str): if fname.endswith('.sh') or fname in ('run.sh',): os.chmod(full_path, 0o755) -def zip_and_encode_folder(folder_path: str, local_folder_exclude: List[str]) -> Tuple[bytes, int, Exception]: + +def zip_and_encode_folder(folder_path: str) -> Tuple[bytes, int, Exception]: """ - Zips a folder with system zip command (if available) or falls back to Python implementation. + Zips a folder using .vefaasignore patterns for filtering. + Delegates to cli_sdk.deploy.package_directory. + Returns (zip_data, size_in_bytes, error) tuple. 
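+    On failure, returns (None, 0, exception) rather than raising.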
""" - # Check for system zip first - if not shutil.which('zip'): - logger.info("System zip command not found, using Python implementation") - try: - data = python_zip_implementation(folder_path, local_folder_exclude) - return data, len(data), None - except Exception as e: - return None, 0, e - logger.info("Zipping folder: %s", folder_path) try: - ensure_executable_permissions(folder_path) - # Base zip command - cmd = ['zip', '-r', '-q', '-', '.', '-x', '*.git*', '-x', '*.venv*', '-x', '*__pycache__*', '-x', '*.pyc'] - - # Append user-specified exclude patterns - if local_folder_exclude: - for pattern in local_folder_exclude: - cmd.extend(['-x', pattern]) - logger.debug("Zip command: %s", cmd) - - # Create zip process with explicit arguments - proc = subprocess.Popen( - cmd, - cwd=folder_path, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - bufsize=1024 * 8 # 8KB buffer - ) - - # Collect output with proper error handling - try: - stdout, stderr = proc.communicate(timeout=30) - if proc.returncode != 0: - logger.error("Zip error: %s", stderr.decode()) - data = python_zip_implementation(folder_path, local_folder_exclude) - return data, len(data), None - - if stdout: - size = len(stdout) - logger.info("Zip finished, size: %.2f MB", size / 1024 / 1024) - return stdout, size, None - else: - logger.warning("zip produced no data; falling back to Python implementation") - data = python_zip_implementation(folder_path, local_folder_exclude) - return data, len(data), None - - except subprocess.TimeoutExpired: - proc.kill() - proc.wait(timeout=5) # Give it 5 seconds to cleanup - logger.warning("zip process timed out; falling back to Python implementation") - try: - data = python_zip_implementation(folder_path, local_folder_exclude) - return data, len(data), None - except Exception as e: - return None, 0, e - + # Use package_directory with include_gitignore=False (function code upload) + data = package_directory(folder_path, include_gitignore=False) + size = len(data) + logger.info("Zip finished, size: %.2f MB", size / 1024 / 1024) + return data, size, None except Exception as e: - logger.error("System zip error: %s", str(e)) - try: - data = python_zip_implementation(folder_path, local_folder_exclude) - return data, len(data), None - except Exception as e2: - return None, 0, e2 - -def python_zip_implementation(folder_path: str, local_folder_exclude: List[str] = None) -> bytes: - """Pure Python zip implementation with permissions support""" - buffer = BytesIO() - - with zipfile.ZipFile(buffer, 'w', compression=zipfile.ZIP_DEFLATED) as zipf: - for root, dirs, files in os.walk(folder_path): - for file in files: - file_path = os.path.join(root, file) - arcname = os.path.relpath(file_path, folder_path) - - # Skip excluded paths and binary/cache files - if any(excl in arcname for excl in ['.git', '.venv', '__pycache__', '.pyc']): - continue - if local_folder_exclude and any(fnmatch.fnmatch(arcname, pattern) for pattern in local_folder_exclude): - continue + logger.error("Zip error: %s", str(e)) + return None, 0, e - try: - - st = os.stat(file_path) - dt = datetime.datetime.fromtimestamp(st.st_mtime) - date_time = (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second) - - info = zipfile.ZipInfo(arcname) - info.external_attr = (0o755 << 16) # rwxr-xr-x - info.date_time = date_time - - with open(file_path, 'rb') as f: - zipf.writestr(info, f.read()) - except Exception as e: - logger.warning("Skipping file %s due to error: %s", arcname, str(e)) - - logger.info("Python zip finished, size: %.2f MB", 
buffer.tell() / 1024 / 1024) - return buffer.getvalue() - -def _get_upload_code_description() -> str: - """Generate a concise, dynamic description for the `upload_code` tool.""" - base_desc = ( - "Upload function code to TOS.\n\n" - "Args:\n" - " - function_id: The ID of the function to upload code for.\n" - " - region: The region of the function.\n" - " - local_folder_path: The path to the local folder containing the code to upload.\n" - " - local_folder_exclude: Optional list of patterns to exclude from the upload (e.g., ['.venv', 'node_modules', '.git', '*.pyc']).\n" - " - file_dict: {filename -> content}\n\n" - - "Returns:\n" - "- 'code_upload_callback'\n" - "- 'dependency': {dependency_task_created, should_check_dependency_status, skip_reason?}\n\n" - - "**Code & Runtime Checklist (follow before uploading):**\n" - " - Provide an executable startup script that launches the service; skip compile or dependency install commands.\n" - " - Pre-build Linux-compatible binaries for compiled languages and invoke them directly from the startup script.\n" - " - Python/Node dependencies belong in 'requirements.txt' or 'package.json'; never ship virtualenvs or 'node_modules'.\n" - " - HTTP servers must bind to 0.0.0.0:8000 and include required templates/static assets in the package.\n" - " - CLI tooling is not on PATH—call Python modules with 'python -m ' (e.g., 'python -m uvicorn main:app --host 0.0.0.0 --port 8000') or start the server directly in code; apply the same rule for any missing CLI.\n" - " - Exclude local build artifacts and dependency folders (e.g., '.venv', 'site-packages', 'node_modules', '.git') via 'local_folder_exclude'.\n\n" - ) - - # Detect run mode via FASTMCP_* environment variables. - is_network_transport = os.getenv("FASTMCP_STATELESS_HTTP") == "true" or os.getenv("FASTMCP_HOST") or os.getenv("FASTMCP_PORT") - if is_network_transport: - note = ( - "Note: Running over network transport; local file system is not accessible.\n" - " - Use 'file_dict'; 'local_folder_path' is ignored.\n\n" - ) - else: - note = ( - "Note: Running locally via STDIO; 'local_folder_path' is recommended.\n\n" - ) - - tail = ( - "After upload: dependency install (if any) runs asynchronously; if triggered, you MUST call 'poll_dependency_install_task_status' to poll until Succeeded/Failed." 
- ) - - return base_desc + note + tail - -@mcp.tool(description=_get_upload_code_description()) def upload_code(function_id: str, region: Optional[str] = None, local_folder_path: Optional[str] = None, - local_folder_exclude: Optional[List[str]] = None, file_dict: Optional[dict[str, Union[str, bytes]]] = None) -> str: region = validate_and_set_region(region) @@ -948,7 +620,7 @@ def upload_code(function_id: str, region: Optional[str] = None, local_folder_pat raise ValueError(f"Authorization failed: {str(e)}") if local_folder_path: - data, size, error = zip_and_encode_folder(local_folder_path, local_folder_exclude) + data, size, error = zip_and_encode_folder(local_folder_path) if error: raise ValueError(f"Error zipping folder: {error}") if not data or size == 0: @@ -988,6 +660,7 @@ def upload_code(function_id: str, region: Optional[str] = None, local_folder_pat } return json.dumps(result, ensure_ascii=False, indent=2) + def handle_dependency( api_instance: VEFAASApi, function_id: str, @@ -1014,18 +687,18 @@ def handle_dependency( is_nodejs = 'node' in runtime has_requirements = ( - (local_folder_path is not None and os.path.exists(os.path.join(local_folder_path, "requirements.txt"))) - or (file_dict is not None and "requirements.txt" in file_dict) + (local_folder_path is not None and os.path.exists(os.path.join(local_folder_path, "requirements.txt"))) + or (file_dict is not None and "requirements.txt" in file_dict) ) has_package_json = ( - (local_folder_path is not None and os.path.exists(os.path.join(local_folder_path, "package.json"))) - or (file_dict is not None and "package.json" in file_dict) + (local_folder_path is not None and os.path.exists(os.path.join(local_folder_path, "package.json"))) + or (file_dict is not None and "package.json" in file_dict) ) has_node_modules = ( - (local_folder_path is not None and os.path.exists(os.path.join(local_folder_path, "node_modules"))) - or (file_dict is not None and "node_modules" in file_dict) + (local_folder_path is not None and os.path.exists(os.path.join(local_folder_path, "node_modules"))) + or (file_dict is not None and "node_modules" in file_dict) ) # Minimal decision surface for the agent @@ -1058,20 +731,11 @@ def handle_dependency( # Keep behavior consistent with previous implementation: surface as an error raise ValueError(f"Error creating dependency install task: {str(e)}") -@mcp.tool(description="""Check dependency install task status (paired with 'upload_code'). -Args: -- function_id: ID of the veFaaS function whose dependency task you are checking. -- region: Region of the function (defaults to `cn-beijing` when omitted). - -Note: - - Call only after `upload_code` reports that a dependency install task was created. - - If status is `Failed`, download the provided log URL, fix issues (dependency specs, etc.), then rerun `upload_code`. 
-""") def poll_dependency_install_task_status( function_id: str, region: Optional[str] = None, - ): +): region = validate_and_set_region(region) try: ak, sk, token = get_authorization_credentials(mcp.get_context()) @@ -1109,7 +773,7 @@ def poll_dependency_install_task_status( result["log_download_url"] = url try: - resp = requests.get(url, timeout=30) + resp = requests.get(url, timeout=30) # noqa: security result["log_content"] = resp.text except Exception as ex: result["log_content_error"] = str(ex) @@ -1117,6 +781,7 @@ def poll_dependency_install_task_status( result["log_download_error"] = str(ex) return result + def upload_code_zip_for_function(api_instance: VEFAASApi(object), function_id: str, code_zip_size: int, zip_bytes, ak: str, sk: str, token: str, region: str,) -> bytes: req = volcenginesdkvefaas.GetCodeUploadAddressRequest( @@ -1131,7 +796,7 @@ def upload_code_zip_for_function(api_instance: VEFAASApi(object), function_id: s "Content-Type": "application/zip", } - response = requests.put(url=upload_url, data=zip_bytes, headers=headers) + response = requests.put(url=upload_url, data=zip_bytes, headers=headers) # noqa: security if 200 <= response.status_code < 300: logger.info("Upload successful. Size: %.2f MB", code_zip_size / 1024 / 1024) else: @@ -1154,6 +819,7 @@ def upload_code_zip_for_function(api_instance: VEFAASApi(object), function_id: s error_message = f"Error creating upstream: {str(e)}" raise ValueError(error_message) + def build_zip_bytes_for_file_dict(file_dict): zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as zip_file: @@ -1171,6 +837,8 @@ def build_zip_bytes_for_file_dict(file_dict): # - function_id (required): the ID of the function # - region (optional): deployment region, defaults to cn-beijing # - revision_number (optional): specific revision number to query. If not provided, defaults to version 0. + + def get_function_revision(function_id: str, region: Optional[str] = None, revision_number: Optional[int] = 0): region = validate_and_set_region(region) @@ -1190,12 +858,27 @@ def get_function_revision(function_id: str, region: Optional[str] = None, revisi raise ValueError(f"Failed to get function revision: {str(e)}") # Get function detail information from veFaaS. -# Use this to retrieve function detail information for a veFaaS function. This function returns the function details -# Params: -# - function_id (required): the ID of the function -# - region (optional): deployment region, defaults to cn-beijing + + +@mcp.tool(description="""Get veFaaS function details. 
+ +**Use Cases**: +- View function configuration (runtime, command, envs) +- Check function status before update +- Get function info for local development + +**Parameters**: +- function_id: Function ID (required) +- region: Region (default cn-beijing) + +**Returns**: +- id, name, runtime, command, status +- envs: Environment variables list +- source_type: Code source type +- platform_url: Console link +""") def get_function_detail(function_id: str, region: Optional[str] = None): - """Get function information to check if it exists.""" + """Get function information including configuration details.""" region = validate_and_set_region(region) api_instance = init_client(region, mcp.get_context()) @@ -1204,13 +887,52 @@ def get_function_detail(function_id: str, region: Optional[str] = None): try: response = api_instance.get_function(req) - return response + + # Build user-friendly result based on actual API response fields + result = { + "id": response.id, + "name": response.name, + "runtime": response.runtime, + "command": getattr(response, 'command', '') or '', + "port": getattr(response, 'port', None), + "source_type": getattr(response, 'source_type', '') or '', + "region": region, + "platform_url": f"https://console.volcengine.com/vefaas/region:vefaas+{region}/function/detail/{function_id}", + } + + # Add optional fields if present + if getattr(response, 'description', None): + result["description"] = response.description + + if getattr(response, 'envs', None): + result["envs"] = [{"key": env.key, "value": env.value} for env in response.envs] + + if getattr(response, 'build_config', None): + build_config = response.build_config + result["build_config"] = { + "command": getattr(build_config, 'command', '') or '', + "output_path": getattr(build_config, 'output_path', '') or '', + } + + # Check release status to detect in-progress deployments + try: + release_req = volcenginesdkvefaas.GetReleaseStatusRequest(function_id=function_id) + release_resp = api_instance.get_release_status(release_req) + if release_resp.status: + result["release_status"] = release_resp.status + if release_resp.status == "inprogress": + result["next_action"] = "Release is in progress. Wait for completion or check status again later." + except Exception: + pass # Release status check is optional, don't fail if unavailable + + return json.dumps(result, ensure_ascii=False, indent=2) except ApiException as e: if "not found" in str(e).lower() or "does not exist" in str(e).lower(): raise ValueError(f"Function {function_id} does not exist in region {region}") else: raise ValueError(f"Failed to get function: {str(e)}") + @mcp.tool(description="""Download function code for veFaaS function. 
Args: @@ -1268,7 +990,7 @@ def pull_function_code(function_id: str, region: Optional[str] = "", dest_dir: s logger.info(f"Source location: {source_location}") # Download the code zip file - response = requests.get(source_location) + response = requests.get(source_location) # noqa: security response.raise_for_status() if not dest_dir: @@ -1284,15 +1006,16 @@ def pull_function_code(function_id: str, region: Optional[str] = "", dest_dir: s # generate vefaas.yaml vefaas_yml_path = os.path.join(dest_dir, "vefaas.yaml") try: - function_detail = get_function_detail(function_id, region) + function_detail_str = get_function_detail(function_id, region) + function_detail = json.loads(function_detail_str) triggers = list_function_triggers(function_id, region).get("Result", {}).get("Items", []) with open(vefaas_yml_path, "w") as f: f.write(f"function_id: {function_id}\n") - f.write(f"name: {function_detail.name}\n") + f.write(f"name: {function_detail.get('name', '')}\n") f.write(f"region: {region}\n") - f.write(f"runtime: {function_detail.runtime}\n") - f.write(f"command: {function_detail.command}\n") - f.write(f"vefaas_function_platform: https://console.volcengine.com/vefaas/region:vefaas+{region}/function/detail/{function_id}?tab=config\n") + f.write(f"runtime: {function_detail.get('runtime', '')}\n") + f.write(f"command: {function_detail.get('command', '')}\n") + f.write(f"vefaas_function_platform: https://console.volcengine.com/vefaas/region:vefaas+{region}/function/detail/{function_id}\n") f.write(f"vefaas_access_link: {get_function_access_link(function_id, region)}\n") f.write(f"triggers:\n") for trigger in triggers: @@ -1301,17 +1024,22 @@ def pull_function_code(function_id: str, region: Optional[str] = "", dest_dir: s f.write(f" name: {trigger.get('Name', '')}\n") except Exception as e: logger.error(f"Failed to write vefaas.yaml for function {function_id}: {str(e)}") - return e + # Continue even if vefaas.yaml generation fails - return { + return json.dumps({ + "success": True, "function_id": function_id, - "revision": target_revision, - #"source_location": source_location, - } + "region": region, + "revision": "latest" if not target_revision else target_revision, + "dest_dir": dest_dir, + "message": f"Function code extracted to {dest_dir}", + "files_generated": ["vefaas.yaml"], + }, ensure_ascii=False, indent=2) except Exception as e: raise ValueError(f"Failed to download and extract function code: {str(e)}") + def list_function_triggers(function_id: str, region: Optional[str] = None): region = validate_and_set_region(region) @@ -1333,165 +1061,431 @@ def list_function_triggers(function_id: str, region: Optional[str] = None): except Exception as e: raise ValueError(f"Failed to list function triggers: {str(e)}") -@mcp.tool(description="""List veFaaS application templates. + +# ==================== Application Deployment Tools ==================== + +@mcp.tool(description="""Detect project configuration for veFaaS deployment. + +**RECOMMENDED**: Call this BEFORE `deploy_application` to ensure correct configuration! + +This tool analyzes the project structure and automatically detects: +- Framework (Next.js, Vite, FastAPI, Flask, Streamlit, etc.) +- Runtime and startup command +- Build command and output path +- Service port Args: - - page_number: Page index (default 1). - - page_size: Page size (default 100). 
+ - project_path: Absolute path to the project root directory + +Returns: + - framework: Detected framework + - runtime: veFaaS runtime (e.g., "native-python3.12/v1", "native-node20/v1") + - build_command: Build command (for Node.js projects) + - start_command: **Startup command** - use this value in `deploy_application`! + - port: Service port + - output_path: Build output directory + - is_static: Whether it's a static site + +**Workflow**: +1. Call `detect_project` with project_path +2. Review the detected configuration (especially `start_command`) +3. Call `deploy_application` with `start_command` from step 1 + +**Example**: +``` +detect_project("/path/to/fastapi-app") +→ {"framework": "fastapi", "start_command": "python -m uvicorn main:app --host 0.0.0.0 --port 8080", ...} + +deploy_application(project_path="/path/to/fastapi-app", name="my-app", start_command="python -m uvicorn main:app --host 0.0.0.0 --port 8080") +``` +""") +def detect_project(project_path: str): + from .vefaas_cli_sdk import auto_detect + + if not os.path.isabs(project_path): + raise ValueError(f"project_path must be an absolute path, got: {project_path}") + if not os.path.exists(project_path): + raise ValueError(f"project_path does not exist: {project_path}") + + result = auto_detect(project_path) + + return { + "framework": result.framework, + "runtime": result.runtime, + "build_command": result.build_command, + "start_command": result.start_command, + "port": result.port, + "install_command": result.install_command, + "output_path": result.output_path, + "is_static": result.is_static, + } -Note: - - Run before creating an application to discover available templates and read their descriptions. - - Returns only templates that are enabled. - - Capture the chosen template's `id` and call `get_vefaas_application_template` to download its source. + +@mcp.tool(description="""**PRIMARY DEPLOYMENT TOOL** - Deploy a project to veFaaS with one command. + +This is the **recommended tool** for deploying applications. It handles the entire workflow automatically: +1. Detect project configuration +2. Build project (if needed) +3. Package and upload code to cloud storage +4. Create/update function with code +5. Wait for dependencies (Python) +6. Create application with API gateway +7. Deploy and wait for completion + +**Configuration Files (Auto-handled)**: +- If `.vefaas/config.json` exists, the tool will use `function_id` and `application_id` from it automatically. +- **Cross-region deployment**: Config IDs are only used if the config's region matches the target region. Deploying to a different region creates a new application. +- On successful deployment, both `.vefaas/config.json` (vefaas-cli compatible) and `vefaas.yaml` are updated. +- This means subsequent deployments only need `project_path` - no need to specify IDs again. 
+ +**Scenarios**: +- **New deployment**: Provide `project_path` + `name` + `start_command` + `build_command` (non-Python) + `port` +- **Update existing app**: Just provide `project_path` (IDs read from config automatically) +- **Update by ID**: Use `application_id` to update an existing application +- **Deploy to different region**: Provide `project_path` + `name` + `region` (existing config for other regions is ignored) + +Args: + - project_path: Absolute path to the project root directory (required) + - name: Application name (required for NEW apps only) + - application_id: Application ID for updates (auto-read from config if exists) + - region: Region (cn-beijing, cn-shanghai, cn-guangzhou, ap-southeast-1) + - build_command: Build command (e.g., "npm run build"). Required for non-Python runtimes unless skip_build=True. + - start_command: Startup command. **REQUIRED**. Use detect_project to auto-detect. + - port: Service port. **IMPORTANT: Must match the actual listening port in start_command** (e.g., if start_command has --port 3000, then port must be 3000) + - skip_build: Skip build step (default False). Set to True if project is already built. + - gateway_name: API gateway name (optional, auto-selects first available gateway if not specified) + +Returns: + - application_id: Application ID + - function_id: Function ID + - access_url: **User can visit this URL to access the deployed app** + - console_url: veFaaS console link for management + +**Common Errors**: +- "start_command is required": Call `detect_project` first or provide start_command +- "build_command is required": Provide build_command or set skip_build=True if already built +- "Name already exists": Use the `application_id` from `.vefaas/config.json` or console, then retry with `application_id` parameter +- "deploy_fail": The returned error will include detailed error_message and error_logs_uri + +**Retry/Redeployment**: +- If deployment fails, fix the code and call `deploy_application` again. +- Do NOT use `create_function`, `update_function`, `upload_code`, or `release_function` as workarounds. 
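Illustrative calls for the scenarios above (all paths, names, and command values are placeholders):

```
# First deployment (start_command and port taken from detect_project)
deploy_application(
    project_path="/path/to/project",
    name="my-app",
    start_command="python -m uvicorn main:app --host 0.0.0.0 --port 8000",
    port=8000,  # must match the port used in start_command
)

# Subsequent update: function/application IDs are read from .vefaas/config.json
deploy_application(project_path="/path/to/project")
```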
""") -def list_vefaas_application_templates(page_number: int = 1, page_size: int = 100): +def deploy_application( + project_path: str, + name: Optional[str] = None, + application_id: Optional[str] = None, + region: Optional[str] = None, + build_command: Optional[str] = None, + start_command: Optional[str] = None, + port: Optional[int] = None, + skip_build: bool = False, + gateway_name: Optional[str] = None, +): + from .vefaas_cli_sdk import ( + DeployConfig, + VeFaaSClient, + deploy_application as sdk_deploy_application, + ) + + region = validate_and_set_region(region) + try: ak, sk, token = get_authorization_credentials(mcp.get_context()) except ValueError as e: - raise ValueError(f"Authorization failed: {str(e)}") + raise ValueError(f"Authentication failed: {str(e)}") - now = datetime.datetime.utcnow() - body = { - "PageNumber": page_number, - "PageSize": page_size, - } - try: - resp = request( - "POST", now, {}, {}, ak, sk, token, "ListApplicationTemplates", json.dumps(body), None, 5, - ) + # Initialize SDK client + client = VeFaaSClient(ak, sk, token, region) - except Exception as e: - raise ValueError(f"Failed to list application templates: {str(e)}") - - result = [] - for item in resp.get("Result", {}).get("Items", []): - if item.get("EnableTemplate", False): - result.append({ - "name": item.get("Name", ""), - "id": item.get("Id", ""), - "description": item.get("Description", ""), - }) - - # TODO: dirty code, remove this - # get function templates for streamlit - try: - func_body = { - "PageNumber": page_number, - "PageSize": page_size, - "Filters": [{ - "Item": { - "Key": "SourceType", - "Value": ["function"], - } - }] + # Build config + config = DeployConfig( + project_path=project_path, + name=name, + application_id=application_id, + region=region, + gateway_name=gateway_name, + build_command=build_command, + start_command=start_command, + port=port, + skip_build=skip_build, + ) + + # Call SDK deploy_application (only needs client!) + result = sdk_deploy_application(config, client) + + if not result.success: + # Build a clean, readable error message + error_msg = result.error or "Unknown error" + + # Format error for better readability with guidance for retry + error_response = { + "success": False, + "error": error_msg, + "logs": result.logs[-5:] if result.logs else [], # Last 5 log entries for context + "next_action": "Check the error message and recent logs, fix the issue, then call deploy_application again with the corrected parameters.", } - func_resp = request("POST", now, {}, {}, ak, sk, token, "ListTemplates", json.dumps(func_body), None, 5) - func_items = func_resp.get("Result", {}).get("Items", []) - for item in func_items: - if item.get("Name", "") == "vefaas-native-streamlit": - result.append({ - "name": item.get("Name", ""), - "id": item.get("Id", ""), - "description": item.get("Description", ""), - }) - except Exception as e: - logger.error(f"Failed to list function templates: {str(e)}") - return result + # Raise with a clean message + raise ValueError(json.dumps(error_response, ensure_ascii=False, indent=2)) + + return json.dumps({ + "success": True, + "application_id": result.application_id, + "function_id": result.function_id, + "access_url": result.access_url, + "console_url": result.app_console_url, + "logs": result.logs, # Deployment progress logs + }, ensure_ascii=False, indent=2) + -@mcp.tool(description="""Download a veFaaS application template. +@mcp.tool(description="""Get detailed information about a specific application. 
+ +**When to use**: +- Check deployment status after `deploy_application` +- Get access_url for a deployed application +- Debug deployment issues Args: - - template_id: Template ID from `list_vefaas_application_templates`. - - destination_dir: Directory to extract the template contents into. + - application_id: Application ID (from deploy_application result, `.vefaas/config.json`, or console) + - region: Region (default cn-beijing) -Note: - - Download the archive and extract files; do not persist the zip itself. - - Reuse or clean `destination_dir` before repeated downloads to avoid partial overwrite issues. +Returns: + - id, name, status, config, access_url, console_url """) -def get_vefaas_application_template(template_id: str, destination_dir: str): +def get_application_detail(application_id: str, region: Optional[str] = None): + region = validate_and_set_region(region) + try: ak, sk, token = get_authorization_credentials(mcp.get_context()) except ValueError as e: - raise ValueError(f"Authorization failed: {str(e)}") + raise ValueError(f"Authentication failed: {str(e)}") now = datetime.datetime.utcnow() - body = {"Id": template_id} - - # TODO: dirty code, remove this. Adapt streamlit template. - if template_id == "68f9cd2474c2090008469163": - try: - resp = request( - "POST", now, {}, {}, ak, sk, token, "GetTemplateDetail", json.dumps(body), None, 20, - ) - except Exception as e: - raise ValueError(f"Failed to get application template detail: {str(e)}") - else: - try: - resp = request( - "POST", now, {}, {}, ak, sk, token, "GetApplicationTemplateDetail", json.dumps(body), None, 20, - ) - except Exception as e: - raise ValueError(f"Failed to get application template detail: {str(e)}") + body = {"Id": application_id} try: - source_location = resp.get("Result", {}).get("SourceLocation") - if not source_location: - raise ValueError("SourceLocation not found in the template detail response.") + response = request("POST", now, {}, {}, ak, sk, token, "GetApplication", json.dumps(body), region) + result = response.get("Result", {}) - # Download the template zip file - response = requests.get(source_location) - response.raise_for_status() # Raise an exception for bad status codes + # Extract access URL (same logic as deploy.py's extract_access_url_from_cloud_resource) + access_url = None + try: + cloud_resource_str = result.get("CloudResource", "") + if cloud_resource_str: + cloud_resource = json.loads(cloud_resource_str) + keys = list(cloud_resource.keys()) + if keys: + # Get first key's value (e.g., 'framework', 'custom', etc.) + data = cloud_resource[keys[0]] + url_obj = data.get('url', {}) + # Prefer system_url, fallback to inner_url + access_url = url_obj.get('system_url') or url_obj.get('inner_url') + except: + pass + + # Build base response + app_detail = { + "id": result.get("Id"), + "name": result.get("Name"), + "status": result.get("Status"), + "config": result.get("Config"), + "region": result.get("Region"), + "access_url": access_url, + "console_url": f"https://console.volcengine.com/vefaas/region:vefaas+{region}/application/detail/{application_id}", + } - # Determine the destination directory + # When deployment failed, try to get detailed error info from function release status + status = result.get("Status", "").lower() - # Create destination directory if it doesn't exist - os.makedirs(destination_dir, exist_ok=True) + # Add guidance for in-progress deployments + if status in ("deploying", "releasing", "deploy_pendding"): + app_detail["next_action"] = "Deployment is in progress. 
Wait for completion or check status again later." - # Unzip the file - with zipfile.ZipFile(io.BytesIO(response.content)) as zip_ref: - zip_ref.extractall(destination_dir) + if status in ("deploy_fail", "deleted", "delete_fail"): + error_details = {} + try: + # Try to get function_id from CloudResource first + function_id = None + cloud_resource_str = result.get("CloudResource", "") + if cloud_resource_str: + try: + cloud_resource = json.loads(cloud_resource_str) + keys = list(cloud_resource.keys()) + if keys: + first_key = keys[0] + function_id = cloud_resource.get(first_key, {}).get("function_id") + except: + pass + + # Fallback: try Config + if not function_id: + config_str = result.get("Config", "") + if config_str: + try: + config_data = json.loads(config_str) + function_id = config_data.get("function", {}).get("function_id") + except: + pass + + if function_id: + # Call GetReleaseStatus to get detailed error info + rel_body = {"FunctionId": function_id} + rel_result = request("POST", now, {}, {}, ak, sk, token, "GetReleaseStatus", json.dumps(rel_body), region) + rel = rel_result.get("Result", {}) + + status_msg = rel.get("StatusMessage", "").strip() + if status_msg: + error_details["error_message"] = status_msg + + log_url = rel.get("FailedInstanceLogs", "").strip() + if log_url: + error_details["error_logs_url"] = log_url + + error_details["function_id"] = function_id + except Exception as ex: + logger.debug(f"Failed to get release status error details: {ex}") - return f"Template {template_id} downloaded and extracted to {destination_dir}" + if error_details: + app_detail["error_details"] = error_details + return app_detail except Exception as e: - raise ValueError(f"Failed to download and extract application template: {str(e)}") - -@mcp.prompt(name="deploy_vefaas", title="""deploy veFaaS function""") -def deploy_vefaas( - function_id: str, - region: str = "cn-beijing", - local_folder_path: Optional[str] = None, - local_folder_exclude: Optional[List[str]] = None, - code_source_hint: Optional[str] = None, -): - """ - Generate deployment instructions for a veFaaS function. - - Args: - function_id: Target veFaaS function ID. - region: Deployment region (defaults to cn-beijing). - local_folder_path: Local path to upload (if using filesystem upload). - local_folder_exclude: Patterns to exclude during upload. - code_source_hint: Free-form context about where updated code lives (optional). - """ - folder_hint = f"Use `local_folder_path={local_folder_path!r}`" if local_folder_path else "Provide `local_folder_path` pointing at the prepared project root" - exclude_hint = ( - f"`local_folder_exclude={local_folder_exclude!r}`" - if local_folder_exclude - else "Set `local_folder_exclude` to skip noise (e.g., ['.venv', 'node_modules', '.git', '*.pyc'])" - ) - extra_hint = f"Context: {code_source_hint}\n" if code_source_hint else "" - - instructions = "\n".join( - [ - f"{extra_hint}Deploy veFaaS function `{function_id}` in `{region}`.", - "Tool names might include prefixes (e.g., `vefaas__upload_code`, `vefaas__release_function`); invoke whichever variant ends with the base name shown below.", - f"1. Call `upload_code` ({folder_hint}; {exclude_hint}) and follow its checklist.", - "2. If the response sets `dependency.dependency_task_created = true`, poll `poll_dependency_install_task_status` until it finishes (stop after three tries, surface logs on failure).", - f"3. When upload (and dependency install) is done, call `release_function` for `{function_id}` / `{region}`.", - "4. 
Immediately poll `poll_function_release_status` until it returns Succeeded/Failed (max three polls); report the outcome, platform URL, or errors before retrying.",
-        ]
-    )
-
-    return [instructions]
+        raise ValueError(f"Failed to get application details: {str(e)}")
+
+
+# ==================== MCP Resources ====================
+
+@mcp.resource("vefaas://prompts", mime_type="application/json")
+def get_prompts_resource():
+    return json.dumps({
+        "prompts": [
+            {
+                "name": "vefaas_deploy_guide",
+                "description": "veFaaS application deployment guide"
+            },
+            {
+                "name": "vefaas_dev_guide",
+                "description": "veFaaS function local development guide"
+            }
+        ],
+        "usage": "Use prompts/get with the prompt name to get the full content"
+    }, ensure_ascii=False, indent=2)
+
+
+# ==================== MCP Prompts ====================
+
+@mcp.prompt(description="veFaaS application deployment guide")
+def vefaas_deploy_guide():
+    return """# veFaaS Application Deployment Guide
+
+## Applicable Scenarios
+- Deploy local project to veFaaS cloud
+- Update code for deployed applications
+- First-time deployment of new applications
+
+## Recommended Workflow
+
+### First Deployment
+1. **Detect project configuration**
+   ```
+   detect_project(project_path="/path/to/project")
+   ```
+   Get framework, runtime, start_command, port info
+
+2. **Execute deployment**
+   ```
+   deploy_application(
+       project_path="/path/to/project",
+       name="my-app",
+       start_command="<start_command from detect_project>",
+       port=<port from detect_project>,
+       build_command="<build_command, non-Python runtimes only>"
+   )
+   ```
+
+3. **View deployment result**
+   On success, access_url is returned for direct access
+
+### Redeployment/Update
+If application_id exists in .vefaas/config.json, simply call:
+```
+deploy_application(project_path="/path/to/project")
+```
+The tool will auto-read the config and update the application
+
+### Common Issues
+- **Name already exists**: Find the application_id in .vefaas/config.json or the console, then retry with the application_id parameter
+- **start_command error**: Use detect_project to get the correct command
+- **Deployment failed**: Check error_details for the error message and logs
+
+## Available Tools
+- detect_project: Detect project configuration
+- deploy_application: One-click deployment
+- get_application_detail: Get application details
+"""
+
+
+@mcp.prompt(description="veFaaS function local development guide")
+def vefaas_dev_guide():
+    return """# veFaaS Function Local Development Guide
+
+## Applicable Scenarios
+- Pull online function code to local for development
+- Sync code changes to online
+- Publish new function version
+
+## Recommended Workflow
+
+### Pull Code to Local
+1. **Get function info**
+   ```
+   get_function_detail(function_id="xxx", region="cn-beijing")
+   ```
+
+2. **Pull code**
+   ```
+   pull_function_code(
+       function_id="xxx",
+       dest_dir="/path/to/local/folder",
+       region="cn-beijing"
+   )
+   ```
+   Code will be downloaded to the specified directory with a vefaas.yaml config generated
+
+### Publish After Code Changes
+1. **Upload code to function**
+   ```
+   update_function(
+       function_id="xxx",
+       project_path="/path/to/local/folder",
+       region="cn-beijing"
+   )
+   ```
+
+2. **Release function**
+   ```
+   release_function(function_id="xxx", region="cn-beijing")
+   ```
+   Will auto-trigger dependency installation, wait for completion, and release. Returns access link.
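The waiting that `release_function` performs boils down to polling the release status until a terminal state; a generic sketch of that loop (helper name, timeout, and interval are illustrative, not the SDK's implementation):

```python
import time

def wait_until_terminal(get_status, timeout_s: int = 300, interval_s: int = 5) -> str:
    """Poll a status callable until it reports Succeeded/Failed or the timeout expires."""
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        status = get_status()  # e.g. a thin wrapper around the GetReleaseStatus call
        if status in ("Succeeded", "Failed"):
            return status
        time.sleep(interval_s)
    raise TimeoutError("release did not reach a terminal state within the timeout")
```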
+ +### Update Config Only +``` +update_function( + function_id="xxx", + command="python -m uvicorn main:app --host 0.0.0.0 --port 8000", + envs={"DEBUG": "true"} +) +release_function(function_id="xxx") +``` + +## Available Tools +- get_function_detail: Get function details +- pull_function_code: Pull function code +- update_function: Update function code/config +- release_function: Release function (auto dependency install + status polling) + +## Notes +- function_id can be found in vefaas.yaml or console +- To delete functions, use console: https://console.volcengine.com/vefaas +""" diff --git a/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/vefaas_server_test.py b/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/vefaas_server_test.py index dadf4fad..ba51672f 100644 --- a/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/vefaas_server_test.py +++ b/server/mcp_server_vefaas_function/src/mcp_server_vefaas_function/vefaas_server_test.py @@ -5,133 +5,234 @@ import zipfile from io import BytesIO -import pyzipper +from mcp_server_vefaas_function.vefaas_server import zip_and_encode_folder +from mcp_server_vefaas_function.vefaas_cli_sdk.deploy import ( + package_directory, + read_gitignore_patterns, + read_vefaasignore_patterns, + create_ignore_filter, + DEFAULT_VEFAASIGNORE, +) -from vefaas_server import python_zip_implementation, zip_and_encode_folder, \ - does_function_exist +class TestPackageDirectory(unittest.TestCase): + """Test directory packaging functionality - no network credentials required""" -class TestVeFaaSServerIntegration(unittest.TestCase): def setUp(self): - # Check if credentials are available - self.ak = os.environ.get("VOLCENGINE_ACCESS_KEY") - self.sk = os.environ.get("VOLCENGINE_SECRET_KEY") - self.alt_ak = os.environ.get("VOLC_ACCESSKEY") - self.alt_sk = os.environ.get("VOLC_SECRETKEY") - if (not self.ak or not self.sk) and (not self.alt_ak or not self.alt_sk): - self.assertFalse( - "VOLCENGINE_ACCESS_KEY or VOLCENGINE_SECRET_KEY or VOLC_ACCESSKEY or VOLC_SECRETKEY environment variables not set" - ) - - # 创建临时目录 + # Create temporary directory self.temp_dir = tempfile.mkdtemp() - # 创建一些测试文件和文件夹 + # Create test files and directories os.makedirs(os.path.join(self.temp_dir, "__pycache__")) os.makedirs(os.path.join(self.temp_dir, "subfolder")) + os.makedirs(os.path.join(self.temp_dir, ".git")) with open(os.path.join(self.temp_dir, "file1.py"), "w") as f: f.write("print('hello')") with open(os.path.join(self.temp_dir, "file2.pyc"), "w") as f: f.write("compiled") - with open(os.path.join(self.temp_dir, "__pycache__", "cached.pyc"), - "w") as f: + with open(os.path.join(self.temp_dir, "__pycache__", "cached.pyc"), "w") as f: f.write("cached") with open(os.path.join(self.temp_dir, "subfolder", "file3.txt"), "w") as f: f.write("text content") + with open(os.path.join(self.temp_dir, ".git", "config"), "w") as f: + f.write("git config") with open(os.path.join(self.temp_dir, ".gitignore"), "w") as f: - f.write("*") + f.write("*.log\n") def tearDown(self): - # 删除临时目录 shutil.rmtree(self.temp_dir) - def test_does_function_exist_with_real_credentials(self): - # Test with a known non-existent function ID - non_existent_id = "non-existent-function-123" - result = does_function_exist(non_existent_id, "cn-beijing") - self.assertFalse(result) + def test_zip_and_encode_folder_basic(self): + """Test zip_and_encode_folder basic functionality""" + zip_bytes, size, err = zip_and_encode_folder(self.temp_dir) - # Note: To test a positive case, you 
would need a real function ID - # that exists in your account. You could add something like: - # known_function_id = "your-real-function-id" - # result = does_function_exist(known_function_id, "cn-beijing") - # self.assertTrue(result) - - def test_python_zip_implementation(self): - with tempfile.TemporaryDirectory() as tmpdir: - file_path = os.path.join(tmpdir, "test.sh") - with open(file_path, "w") as f: - f.write("#!/bin/bash\necho hello\n") - os.chmod(file_path, 0o644) - - zip_bytes = python_zip_implementation(tmpdir) - - zip_path = os.path.join(tmpdir, "test.zip") - with open(zip_path, "wb") as fzip: - fzip.write(zip_bytes) - - with pyzipper.AESZipFile(zip_path, 'r') as zipf: - namelist = zipf.namelist() - assert "test.sh" in namelist - - info = zipf.getinfo("test.sh") - perm = (info.external_attr >> 16) & 0o777 - assert perm == 0o755, f"Expected 755 permission but got {oct(perm)}" - - content = zipf.read("test.sh").decode() - assert "echo hello" in content - - def test_zip_exclude_patterns_with_python_impl(self): - # 设置排除规则 - exclude_patterns = ["*.pyc", ".gitignore", "*/__pycache__/*"] - - zip_bytes = python_zip_implementation(self.temp_dir, exclude_patterns) self.assertIsInstance(zip_bytes, bytes) + self.assertIsInstance(size, int) + self.assertIsNone(err) + self.assertGreater(size, 0) - # 解压验证 + # Verify zip content with zipfile.ZipFile(BytesIO(zip_bytes)) as zipf: names = zipf.namelist() - print(names) - # 应该包含 file1.py 和 subfolder/file3.txt + # Should contain normal files self.assertIn("file1.py", names) self.assertIn("subfolder/file3.txt", names) - # 不包含排除的文件 - self.assertNotIn("file2.pyc", names) - self.assertNotIn(".gitignore", names) + # Default ignore patterns should exclude __pycache__ and .git self.assertNotIn("__pycache__/cached.pyc", names) + # .git directory should be excluded (default .vefaasignore rules) + git_dir_files = [n for n in names if n.startswith(".git/")] + self.assertEqual(len(git_dir_files), 0, f"Should not contain .git/ directory files, but found: {git_dir_files}") - def test_zip_with_exclude_patterns_with_system_impl(self): - exclude_patterns = ["*.pyc", ".gitignore", "*/__pycache__/*"] - zip_bytes, size, err = zip_and_encode_folder(self.temp_dir, exclude_patterns) + def test_package_directory_with_gitignore(self): + """Test package_directory with .gitignore rules applied""" + # Create a log file that should be ignored by .gitignore + with open(os.path.join(self.temp_dir, "test.log"), "w") as f: + f.write("log content") - self.assertIsInstance(zip_bytes, bytes) - self.assertIsInstance(size, int) - self.assertIsNone(err) + zip_bytes = package_directory(self.temp_dir, include_gitignore=True) - # 验证 zip 内容 with zipfile.ZipFile(BytesIO(zip_bytes)) as zipf: names = zipf.namelist() - print(names) - # 应该包含 file1.py 和 subfolder/file3.txt + # *.log should be excluded by .gitignore + self.assertNotIn("test.log", names) + # Normal files should be retained self.assertIn("file1.py", names) - self.assertIn("subfolder/file3.txt", names) - # 不应该包含排除文件 - self.assertNotIn("file2.pyc", names) - self.assertNotIn(".gitignore", names) - self.assertNotIn("__pycache__/cached.pyc", names) - def test_zip_empty_exclude_with_system_impl(self): - # 如果没有 exclude 规则,应该包含所有文件(除了默认规则) - zip_bytes, size, err = zip_and_encode_folder(self.temp_dir, []) - self.assertIsInstance(zip_bytes, bytes) - self.assertGreater(size, 0) - self.assertIsNone(err) + def test_package_directory_without_gitignore(self): + """Test package_directory without .gitignore rules (function code upload scenario)""" + 
# Create a log file that would normally be ignored by .gitignore + with open(os.path.join(self.temp_dir, "test.log"), "w") as f: + f.write("log content") + + zip_bytes = package_directory(self.temp_dir, include_gitignore=False) + with zipfile.ZipFile(BytesIO(zip_bytes)) as zipf: names = zipf.namelist() - print(names) + # .gitignore not applied, so *.log should be retained + self.assertIn("test.log", names) + # Normal files should be retained self.assertIn("file1.py", names) - self.assertNotIn("file2.pyc", names) + # But .vefaasignore default rules should still apply self.assertNotIn("__pycache__/cached.pyc", names) + def test_read_gitignore_patterns(self): + """Test reading .gitignore patterns""" + patterns = read_gitignore_patterns(self.temp_dir) + self.assertIn("*.log", patterns) + + def test_read_vefaasignore_patterns_creates_default(self): + """Test that .vefaasignore default file is auto-created when not exists""" + # Confirm .vefaasignore does not exist + vefaasignore_path = os.path.join(self.temp_dir, ".vefaasignore") + if os.path.exists(vefaasignore_path): + os.remove(vefaasignore_path) + + patterns = read_vefaasignore_patterns(self.temp_dir) + + # Should create default file + self.assertTrue(os.path.exists(vefaasignore_path)) + # Should contain default patterns + self.assertTrue(len(patterns) > 0) + + def test_create_ignore_filter(self): + """Test creating ignore filter""" + gitignore_patterns = ["*.log", "temp/"] + vefaasignore_patterns = [".git/", "__pycache__/"] + + spec = create_ignore_filter(gitignore_patterns, vefaasignore_patterns) + + # Verify filter works correctly + self.assertTrue(spec.match_file("test.log")) + self.assertTrue(spec.match_file(".git/config")) + self.assertTrue(spec.match_file("__pycache__/")) + self.assertFalse(spec.match_file("main.py")) + + +class TestCaddyfileGeneration(unittest.TestCase): + """Test cases for Caddyfile generation functionality""" + + def setUp(self): + # Create temporary directory + self.temp_dir = tempfile.mkdtemp() + + def tearDown(self): + # Delete temporary directory + shutil.rmtree(self.temp_dir) + + def test_render_default_caddyfile_content(self): + """Test that default Caddyfile content is generated correctly""" + from mcp_server_vefaas_function.vefaas_cli_sdk.deploy import render_default_caddyfile_content + + content = render_default_caddyfile_content() + + # Verify content contains key configurations + self.assertIn(":8000", content) # Listening port + self.assertIn("root * .", content) # Static file root directory + self.assertIn("file_server", content) # File server directive + self.assertIn("try_files", content) # SPA routing support + self.assertIn("@unsafePath", content) # Secure path configuration + self.assertIn("Cache-Control", content) # Cache strategy + + def test_ensure_caddyfile_in_output_creates_file(self): + """Test that ensure_caddyfile_in_output creates file in output directory""" + from mcp_server_vefaas_function.vefaas_cli_sdk.deploy import ensure_caddyfile_in_output, DEFAULT_CADDYFILE_NAME + + # Create output subdirectory + output_path = "dist" + os.makedirs(os.path.join(self.temp_dir, output_path), exist_ok=True) + + # Generate Caddyfile + result = ensure_caddyfile_in_output(self.temp_dir, output_path) + + # Verify file creation + expected_path = os.path.join(self.temp_dir, output_path, DEFAULT_CADDYFILE_NAME) + self.assertEqual(result, expected_path) + self.assertTrue(os.path.exists(expected_path)) + + # Verify content + with open(expected_path, "r") as f: + content = f.read() + self.assertIn(":8000", 
content) + self.assertIn("file_server", content) + + def test_ensure_caddyfile_in_output_root_directory(self): + """Test Caddyfile generation in project root (output_path = './')""" + from mcp_server_vefaas_function.vefaas_cli_sdk.deploy import ensure_caddyfile_in_output, DEFAULT_CADDYFILE_NAME + + # Generate to root directory + result = ensure_caddyfile_in_output(self.temp_dir, "./") + + # Verify file creation in root directory + expected_path = os.path.join(self.temp_dir, DEFAULT_CADDYFILE_NAME) + self.assertEqual(result, expected_path) + self.assertTrue(os.path.exists(expected_path)) + + def test_ensure_caddyfile_creates_output_dir_if_not_exists(self): + """Test that output directory is created if it doesn't exist""" + from mcp_server_vefaas_function.vefaas_cli_sdk.deploy import ensure_caddyfile_in_output, DEFAULT_CADDYFILE_NAME + + output_path = "new_output_dir" + + # Confirm directory does not exist + self.assertFalse(os.path.exists(os.path.join(self.temp_dir, output_path))) + + # Generate Caddyfile + result = ensure_caddyfile_in_output(self.temp_dir, output_path) + + # Verify directory and file are both created + self.assertTrue(os.path.exists(os.path.join(self.temp_dir, output_path))) + self.assertTrue(os.path.exists(result)) + + +class TestDetector(unittest.TestCase): + """Test cases for project detector""" + + def setUp(self): + self.temp_dir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.temp_dir) + + def test_detect_vite_project_is_static(self): + """Test that Vite project without SSR is detected as static""" + from mcp_server_vefaas_function.vefaas_cli_sdk.detector import auto_detect + + # Create Vite project structure + pkg = { + "name": "test-vite", + "devDependencies": {"vite": "^5.0.0"}, + "scripts": {"build": "vite build", "preview": "vite preview"} + } + with open(os.path.join(self.temp_dir, "package.json"), "w") as f: + import json + json.dump(pkg, f) + + result = auto_detect(self.temp_dir) + + self.assertEqual(result.framework, "vite") + self.assertEqual(result.runtime, "native-node20/v1") + self.assertTrue(result.is_static) + self.assertIn("caddy", result.start_command.lower()) + if __name__ == "__main__": unittest.main() diff --git a/server/mcp_server_vefaas_function/uv.lock b/server/mcp_server_vefaas_function/uv.lock index fa71ae20..f9859ce7 100644 --- a/server/mcp_server_vefaas_function/uv.lock +++ b/server/mcp_server_vefaas_function/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.12" [[package]] @@ -205,10 +205,11 @@ wheels = [ [[package]] name = "mcp-server-vefaas-function" -version = "0.0.1" +version = "0.0.5" source = { editable = "." 
} dependencies = [ { name = "mcp" }, + { name = "pathspec" }, { name = "pyzipper" }, { name = "requests" }, { name = "volcengine-python-sdk" }, @@ -217,11 +218,21 @@ dependencies = [ [package.metadata] requires-dist = [ { name = "mcp", specifier = ">=1.12.3" }, + { name = "pathspec", specifier = ">=0.12.0" }, { name = "pyzipper", specifier = "==0.3.6" }, { name = "requests", specifier = "==2.32.3" }, { name = "volcengine-python-sdk", specifier = ">=3.0.8" }, ] +[[package]] +name = "pathspec" +version = "1.0.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/28/2e/83722ece0f6ee24387d6cb830dd562ddbcd6ce0b9d76072c6849670c31b4/pathspec-1.0.1.tar.gz", hash = "sha256:e2769b508d0dd47b09af6ee2c75b2744a2cb1f474ae4b1494fd6a1b7a841613c", size = 129791, upload-time = "2026-01-06T13:02:55.15Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d2/fe/2257c71721aeab6a6e8aa1f00d01f2a20f58547d249a6c8fef5791f559fc/pathspec-1.0.1-py3-none-any.whl", hash = "sha256:8870061f22c58e6d83463cfce9a7dd6eca0512c772c1001fb09ac64091816721", size = 54584, upload-time = "2026-01-06T13:02:53.601Z" }, +] + [[package]] name = "pycryptodomex" version = "3.23.0"
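The pathspec dependency pinned above backs the ignore handling exercised by `create_ignore_filter` in the tests; a minimal sketch of that pattern, with example ignore lines (not the SDK's actual defaults):

```python
import pathspec

# .gitignore-style and .vefaasignore-style lines combined into one matcher.
gitignore_lines = ["*.log", "temp/"]
vefaasignore_lines = [".git/", "__pycache__/", "node_modules/"]

spec = pathspec.PathSpec.from_lines("gitwildmatch", gitignore_lines + vefaasignore_lines)

for path in ["main.py", "test.log", ".git/config", "__pycache__/cached.pyc"]:
    # Matched paths are excluded when the project directory is packaged for upload.
    print(path, "-> ignored" if spec.match_file(path) else "-> kept")
```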