diff --git a/docs/source/fault_tolerance/usage_guide.rst b/docs/source/fault_tolerance/usage_guide.rst
index 4c7a1efa..e93ae6a2 100644
--- a/docs/source/fault_tolerance/usage_guide.rst
+++ b/docs/source/fault_tolerance/usage_guide.rst
@@ -117,6 +117,34 @@ Validation behavior:
 - Other existing types (e.g., devices/symlinks): performs ``stat`` access
 
+Attribution service integration
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Enable artifact analysis (e.g., log analysis) during rendezvous health checks by pointing the launcher at a running attribution service.
+The feature is enabled only when both the host and the port are specified.
+
+* CLI:
+
+  - ``--ft-attrsvc-host`` (alias: ``--ft_attrsvc_host``)
+  - ``--ft-attrsvc-port`` (alias: ``--ft_attrsvc_port``)
+
+  Example:
+
+  .. code-block:: bash
+
+     ft_launcher \
+       --ft-attrsvc-host 127.0.0.1 \
+       --ft-attrsvc-port 8000 \
+       train.py
+
+* YAML: under the ``fault_tolerance`` section
+
+  .. code-block:: yaml
+
+     fault_tolerance:
+       attrsvc_host: "127.0.0.1"
+       attrsvc_port: 8000
+
 GPU Memory Reclaim
 ^^^^^^^^^^^^^^^^^^
 
diff --git a/examples/attribution/build_enroot_image.sh b/examples/attribution/build_enroot_image.sh
new file mode 100755
index 00000000..8fdb4804
--- /dev/null
+++ b/examples/attribution/build_enroot_image.sh
@@ -0,0 +1,107 @@
+#!/bin/bash
+# Build an enroot squash image for the NVRX Attribution Service
+#
+# Usage:
+#   ./build_enroot_image.sh [output_path]
+#
+# Prerequisites:
+#   - enroot installed
+#   - Run from repo root: ./examples/attribution/build_enroot_image.sh
+#
+# The resulting .sqsh file can be used with Slurm + pyxis:
+#   srun --container-image=/path/to/nvrx-attrsvc.sqsh \
+#        --container-env=NVRX_ATTRSVC_ALLOWED_ROOT=/data \
+#        --container-mounts=/path/to/logs:/data:ro \
+#        nvrx-attrsvc
+
+set -euo pipefail
+
+# Configuration
+BASE_IMAGE="${BASE_IMAGE:-python:3.12-slim}"
+CONTAINER_NAME="${CONTAINER_NAME:-nvrx-attrsvc-build}"
+OUTPUT_PATH="${1:-nvrx-attrsvc.sqsh}"
+
+echo "=== NVRX Attribution Service - Enroot Image Builder ==="
+echo "Base image: ${BASE_IMAGE}"
+echo "Output: ${OUTPUT_PATH}"
+echo ""
+
+# Check we're in the repo root
+if [[ ! -f "pyproject.toml" ]]; then
+    echo "ERROR: Run this script from the repo root directory"
+    echo "       cd /path/to/nvidia-resiliency-ext"
+    echo "       ./examples/attribution/build_enroot_image.sh"
+    exit 1
+fi
+
+# Clean up any existing container with the same name
+enroot remove -f "${CONTAINER_NAME}" 2>/dev/null || true
+
+# Step 1: Import base image
+echo "=== Step 1: Importing base image ==="
+enroot import -o "${CONTAINER_NAME}.sqsh" "docker://${BASE_IMAGE}"
+
+# Step 2: Create container from image
+echo "=== Step 2: Creating container ==="
+enroot create --name "${CONTAINER_NAME}" "${CONTAINER_NAME}.sqsh"
+rm -f "${CONTAINER_NAME}.sqsh"  # Clean up intermediate file
+
+# Step 3: Install system dependencies
+echo "=== Step 3: Installing system dependencies ==="
+enroot start --rw --root "${CONTAINER_NAME}" bash -c '
+    set -e
+    apt-get update
+    apt-get install -y --no-install-recommends build-essential git
+    rm -rf /var/lib/apt/lists/*
+    pip install --no-cache-dir --upgrade pip
+'
+
+# Step 4: Copy source code and install packages
+echo "=== Step 4: Installing nvidia_resiliency_ext (no-deps) and nvrx-attrsvc ==="
+REPO_ROOT="$(pwd)"
+
+# Echo the command for debugging
+echo "Running: enroot start --rw --root --mount ${REPO_ROOT}:/tmp/repo ${CONTAINER_NAME} bash -c '...'"
+
+enroot start --rw --root --mount "${REPO_ROOT}:/tmp/repo" "${CONTAINER_NAME}" bash -c '
+    set -e
+    cd /tmp/repo
+
+    # Install main library without CUDA extensions (skip CUPTI build)
+    STRAGGLER_DET_SKIP_CUPTI_EXT_BUILD=1 pip install --no-cache-dir --no-deps .
+
+    # Install attribution service (installs fastapi, uvicorn, mcp, logsage, etc.)
+    pip install --no-cache-dir ./examples/attribution
+'
+
+# Step 5: Set environment variables
+echo "=== Step 5: Setting environment variables ==="
+enroot start --rw --root "${CONTAINER_NAME}" bash -c '
+    cat >> /etc/environment << EOF
+NVRX_ATTRSVC_HOST=0.0.0.0
+NVRX_ATTRSVC_PORT=8000
+NVRX_ATTRSVC_LOG_LEVEL_NAME=INFO
+NVRX_ATTRSVC_ALLOWED_ROOT=/data
+EOF
+'
+
+# Step 6: Export as squash file
+echo "=== Step 6: Exporting squash image ==="
+enroot export -o "${OUTPUT_PATH}" "${CONTAINER_NAME}"
+
+# Cleanup
+echo "=== Cleanup ==="
+enroot remove -f "${CONTAINER_NAME}"
+
+echo ""
+echo "=== Build complete ==="
+echo "Squash image: ${OUTPUT_PATH}"
+echo ""
+echo "Usage with Slurm + pyxis:"
+echo "  srun --container-image=${OUTPUT_PATH} \\"
+echo "       --container-env=NVRX_ATTRSVC_ALLOWED_ROOT=/data \\"
+echo "       --container-mounts=/path/to/logs:/data:ro \\"
+echo "       nvrx-attrsvc"
+echo ""
+echo "Or submit as a batch job:"
+echo "  sbatch --container-image=${OUTPUT_PATH} job_script.sh"
diff --git a/examples/attribution/nvrx_attrsvc.py b/examples/attribution/nvrx_attrsvc.py
new file mode 100644
index 00000000..11caab56
--- /dev/null
+++ b/examples/attribution/nvrx_attrsvc.py
@@ -0,0 +1,457 @@
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+#
+# NVIDIA CORPORATION and its licensors retain all intellectual property
+# and proprietary rights in and to this software, related documentation
+# and any modifications thereto. Any use, reproduction, disclosure or
+# distribution of this software and related documentation without an express
+# license agreement from NVIDIA CORPORATION is strictly prohibited.
+ +import logging +import os +import stat +import sys +import re +import time +from importlib.resources import files as pkg_files +from typing import Any +from datetime import datetime + +import uvicorn +from fastapi import FastAPI, HTTPException, Query +from fastapi.responses import JSONResponse +from pydantic import BaseModel, Field +from pydantic_settings import BaseSettings, SettingsConfigDict + +from nvdataflow import post + +from nvidia_resiliency_ext.attribution.mcp_integration.mcp_client import NVRxMCPClient + +# Setup logging (configurable via NVRX_ATTRSVC_LOG_LEVEL_NAME env: DEBUG|INFO|WARNING|ERROR|CRITICAL) +logger = logging.getLogger(__name__) + + +class Settings(BaseSettings): + """Typed configuration loaded from environment/.env (pydantic-settings v2).""" + + FAST_API_ROOT_PATH: str = Field(default="", description="FastAPI root path") + DEBUG: bool = Field(default=False, description="Enable debug mode") + LLM_MODEL: str = Field(default="nvdev/nvidia/llama-3.3-nemotron-super-49b-v1") + ALLOWED_ROOT: str = Field( + ..., description="Absolute base directory allowed for input paths (required)" + ) + HOST: str = Field(default="0.0.0.0") + PORT: int = Field(default=8000) + LOG_LEVEL_NAME: str = Field(default="INFO") + + model_config = SettingsConfigDict( + env_file=".env", + env_file_encoding="utf-8", + extra="ignore", + case_sensitive=False, + env_prefix="NVRX_ATTRSVC_", + ) + + +def setup() -> "Settings": + """ + Group environment configuration and logging setup for nvrx_attrsvc. + Returns a configured Settings instance. + """ + try: + cfg = Settings() # type: ignore[call-arg] + except Exception as e: + # Fail fast if required settings are missing or invalid + raise SystemExit(f"nvrx_attrsvc configuration error: {e}") + logging.basicConfig( + level=getattr(logging, cfg.LOG_LEVEL_NAME, logging.INFO), + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + ) + # Validate ALLOWED_ROOT + allowed_root = os.path.realpath(cfg.ALLOWED_ROOT) + if not os.path.isabs(allowed_root): + raise SystemExit("ALLOWED_ROOT must be an absolute path") + if not os.path.isdir(allowed_root): + raise SystemExit(f"ALLOWED_ROOT is not a directory: {allowed_root}") + # Ensure we can traverse/read the root (execute bit for directories) + if not os.access(allowed_root, os.X_OK | os.R_OK): + raise SystemExit(f"ALLOWED_ROOT is not accessible: {allowed_root}") + return cfg + + +# Response models +class AttrSvcResult(BaseModel): + """Response model for log analysis.""" + + result: Any + status: str = "completed" + + +class ErrorResponse(BaseModel): + """Standard error body for nvrx_attrsvc.""" + + error_code: str + message: str + details: Any | None = None + + +class SubmitRequest(BaseModel): + """Submission model for analysis requests.""" + + log_path: str + + +class SubmitResponse(BaseModel): + """Response model for submit endpoint.""" + + submitted: bool + + +def create_app(cfg: Settings) -> FastAPI: + """ + Construct and return the FastAPI app for the NVRX Attribution Service (nvrx_attrsvc). 
+ """ + app = FastAPI( + title="NVRX Attribution Service", + summary="nvrx_attrsvc - NVRX attribution service for artifact/log analysis", + contact={ + "name": "NVRX Attribution Service", + "email": "nvrx@nvidia.com", + }, + root_path=cfg.FAST_API_ROOT_PATH, + debug=cfg.DEBUG, + ) + + # Global exception handlers to standardize error bodies + @app.exception_handler(HTTPException) + async def http_exception_handler(_, exc: HTTPException): + # Support structured detail; otherwise fall back to generic mapping + detail = exc.detail + if isinstance(detail, dict): + error_code = str(detail.get("error_code", exc.status_code)).lower() + message = str(detail.get("message", "error")).lower().rstrip(".") + body = ErrorResponse(error_code=error_code, message=message).model_dump() + else: + body = ErrorResponse( + error_code=str(exc.status_code).lower(), + message=str(detail).lower().rstrip("."), + ).model_dump() + return JSONResponse(status_code=exc.status_code, content=body) + + @app.exception_handler(Exception) + async def unhandled_exception_handler(_, exc: Exception): + logger.exception("Unhandled exception in nvrx_attrsvc", exc_info=exc) + return JSONResponse( + status_code=500, + content=ErrorResponse( + error_code="internal_error", message="internal server error" + ).model_dump(), + ) + + @app.get("/healthz") + async def healthcheck() -> dict[str, str]: + """ + nvrx_attrsvc health check endpoint. + """ + return {"status": "OK"} + + @app.post( + "/logs", + response_model=SubmitResponse, + responses={ + 400: {"model": ErrorResponse}, + 500: {"model": ErrorResponse}, + }, + ) + async def submit_analysis(req: SubmitRequest) -> SubmitResponse: + """ + Submit a new analysis job to nvrx_attrsvc. For this example server the POST is a no-op and + analysis is performed synchronously via GET. We still accept the request + to align with the client flow. + + Possible errors: + - 400: missing or invalid log_path + """ + if not req.log_path: + raise HTTPException( + status_code=400, + detail={"error_code": "invalid_request", "message": "log_path is required"}, + ) + # Log the request (file may not exist yet - validation happens at GET time) + logger.info(f"POST /logs - received: {req.log_path}") + # Validate the path exists and is accessible + _normalize_and_validate_path(req.log_path, cfg, require_regular_file=True) + return SubmitResponse(submitted=True) + + @app.get( + "/print", + responses={ + 200: {"content": {"text/plain": {}}}, + 400: {"model": ErrorResponse}, + 404: {"model": ErrorResponse}, + 500: {"model": ErrorResponse}, + }, + ) + async def print_log_path( + log_path: str = Query( + ..., description="Absolute path to a file or directory under allowed root" + ) + ) -> str: + """ + Return the first 4KB of a file for preview. 
+ """ + max_bytes = 4096 + try: + normalized = _normalize_and_validate_path(log_path, cfg, require_regular_file=False) + with open(normalized, "r", encoding="utf-8", errors="ignore") as f: + content = f.read(max_bytes) + return content + except HTTPException: + raise + except Exception as e: + raise HTTPException( + status_code=500, + detail={ + "error_code": "file_error", + "message": f"file error: {str(e)}".lower().rstrip("."), + }, + ) + + @app.get( + "/logs", + response_model=AttrSvcResult, + responses={ + 400: {"model": ErrorResponse}, + 404: {"model": ErrorResponse}, + 500: {"model": ErrorResponse}, + 422: { + "content": { + "application/json": { + "examples": { + "validation_error": { + "summary": "Validation error", + "value": { + "detail": [ + { + "loc": ["query", "log_path"], + "msg": "field required", + "type": "value_error.missing", + } + ] + }, + } + } + } + } + }, + }, + ) + async def attribution_log_path( + log_path: str = Query( + ..., min_length=1, description="Absolute path to a log file under allowed root" + ) + ) -> AttrSvcResult: + """ + nvrx_attrsvc: Analyze logs from a specific path. + + Args: + log_path: Path to the log file to analyze (must be under ALLOWED_ROOT and not a symlink) + + Returns: + AttrSvcResult with analysis results + + Errors: + - 400 invalid path (relative, outside ALLOWED_ROOT, symlink, unreadable, not regular file) + - 404 path not found + """ + # Log the request first (before validation, so we see what was requested) + logger.info(f"GET /logs - received: {log_path}") + try: + normalized = _normalize_and_validate_path(log_path, cfg, require_regular_file=True) + logger.info(f"Analyzing log: {normalized}") + + # Connect to the MCP server and run analysis + try: + client = _create_mcp_client() + except Exception as e: + raise HTTPException( + status_code=500, + detail={ + "error_code": "mcp_init_failed", + "message": f"failed to initialize mcp client: {str(e)}".lower().rstrip("."), + }, + ) + async with client: + s_time = time.time() + log_result = await client.run_module( + module_name="log_analyzer", + log_path=normalized, + model=cfg.LLM_MODEL, + temperature=0.0, + top_p=1, + exclude_nvrx_logs=False, + is_per_cycle=True, + max_tokens=8192, + ) + logger.info(f"Result preview: {str(log_result)}...") + e_time = time.time() + # 1. Access the main text blob inside the nested list + # data['result'] is a list, the first item is a list, and the text is the first item of that. + if 'result' in log_result and len(log_result['result'])>0: + for item in log_result['result']: + raw_text = item[0] + + # 2. Extract the First Line + # Split by newlines and take the first element + auto_resume = raw_text.split('\n')[0] + try: + auto_resume_explanation = raw_text.split('\n')[1] + except Exception as e: + auto_resume_explanation = "" + logger.info(f"Failed to extract auto resume explanation: {e}") + + # 3. 
Extract text after 'Attribution:' + # Split the text by the specific key "Attribution:" and take the second part + # We use .strip() to remove the leading newline character + attribution_text = raw_text.split('Attribution:') + if len(attribution_text)>1: + attribution_text = attribution_text[1].strip() + attribution_text = attribution_text.replace('"\\',"").replace('\"',"").split("\n\n")[0] + else: + attribution_text = "" + try: + match = re.search(r"_(\d+)_date_", normalized) + if not match: + raise ValueError("Job ID not found in path") + jobid = match.group(1) + except Exception as e: + jobid = "" + logger.info(f"Failed to extract job ID: {e}") + + logger.info("jobid: %s", jobid) + logger.info("log_path: %s",normalized) + logger.info("auto_resume: %s", auto_resume) + logger.info("auto_resume_explanation: %s", auto_resume_explanation) + logger.info("attribution_text: %s", attribution_text) + data = { + "s_cluster": "oci-hsg", + "s_user": "nvrx_attr", + "s_attribution": attribution_text, + "s_auto_resume": auto_resume, + "s_auto_resume_explanation": auto_resume_explanation, + "s_jobid": jobid, + "s_logpath": normalized, + "d_processing_time":round(e_time-s_time,2), + "ts_current_time": round(datetime.now().timestamp() * 1000), + } + result = post(data=data, project="aidot-fact-logsage") + + return AttrSvcResult(result=log_result, status="completed") + except HTTPException: + raise + except Exception as e: + logger.error(f"NVRX Attribution Service error: {e}") + raise HTTPException( + status_code=500, + detail={"error_code": "internal_error", "message": str(e).lower().rstrip(".")}, + ) + + return app + + +def _get_server_command() -> list[str]: + """ + Resolve and return the server launcher command for the MCP client. + """ + pkg = "nvidia_resiliency_ext.attribution.mcp_integration" + try: + resource = pkg_files(pkg).joinpath("server_launcher.py") + except Exception as e: + raise FileNotFoundError(f"failed to locate server_launcher.py in package {pkg}: {e}") + if not resource.exists(): + raise FileNotFoundError(f"server launcher not found in package: {pkg}/server_launcher.py") + return [sys.executable, str(resource)] + + +def _create_mcp_client() -> NVRxMCPClient: + """ + Create and return an initialized NVRxMCPClient. + """ + return NVRxMCPClient(_get_server_command()) + + +def _normalize_and_validate_path( + user_path: str, cfg: Settings, *, require_regular_file: bool +) -> str: + """ + Normalize and validate an input path: + - Must be absolute + - Resolve realpath under allowed root (no traversal outside) + - Must not be a symlink + - If require_regular_file=True, must be a regular readable file; otherwise allow directories too + Returns the normalized absolute path or raises HTTPException(400/404). 
+ """ + if not os.path.isabs(user_path): + raise HTTPException( + status_code=400, + detail={"error_code": "invalid_path", "message": "log_path must be absolute"}, + ) + real = os.path.realpath(user_path) + allowed_root = os.path.realpath(cfg.ALLOWED_ROOT) + try: + common = os.path.commonpath([real, allowed_root]) + except Exception: + raise HTTPException( + status_code=400, detail={"error_code": "invalid_path", "message": "invalid path"} + ) + if common != allowed_root: + raise HTTPException( + status_code=400, + detail={ + "error_code": "outside_root", + "message": "access outside allowed root is not permitted", + }, + ) + try: + st = os.lstat(real) + except FileNotFoundError: + raise HTTPException( + status_code=404, detail={"error_code": "not_found", "message": "path not found"} + ) + except Exception as e: + raise HTTPException( + status_code=400, + detail={ + "error_code": "stat_failed", + "message": f"stat failed: {str(e)}".lower().rstrip("."), + }, + ) + if stat.S_ISLNK(st.st_mode): + raise HTTPException( + status_code=400, + detail={"error_code": "symlink_not_allowed", "message": "symlinks are not allowed"}, + ) + if require_regular_file: + if not stat.S_ISREG(st.st_mode): + raise HTTPException( + status_code=400, + detail={"error_code": "not_regular", "message": "path must be a regular file"}, + ) + # Basic readability + if not os.access(real, os.R_OK): + raise HTTPException( + status_code=400, + detail={"error_code": "not_readable", "message": "path is not readable"}, + ) + return real + + +def main(): + """Entry point for the NVRX Attribution Service.""" + cfg = setup() + logger.info(f"Starting NVRX Attribution Service (nvrx_attrsvc) on {cfg.HOST}:{cfg.PORT}") + logger.info(f"nvrx_attrsvc API Documentation: http://{cfg.HOST}:{cfg.PORT}/docs") + uvicorn.run(create_app(cfg), host=cfg.HOST, port=cfg.PORT) + + +if __name__ == "__main__": + main() diff --git a/examples/attribution/pyproject.toml b/examples/attribution/pyproject.toml new file mode 100644 index 00000000..0b7a6694 --- /dev/null +++ b/examples/attribution/pyproject.toml @@ -0,0 +1,56 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "nvrx-attrsvc" +version = "0.1.0" +description = "NVRX Attribution Service - FastAPI server for log analysis and attribution" +readme = "README.md" +license = {text = "Apache-2.0"} +requires-python = ">=3.11" +authors = [ + {name = "NVIDIA Corporation"} +] +classifiers = [ + "Development Status :: 4 - Beta", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Framework :: FastAPI", + "Operating System :: OS Independent", +] + +dependencies = [ + # Web framework + "fastapi>=0.100.0", + "uvicorn[standard]>=0.20.0", + "pydantic>=2.0.0", + "pydantic-settings>=2.0.0", + "httpx>=0.24.0", + # Attribution library (MCP client, analyzers) + "langchain-nvidia-ai-endpoints>=0.3.15", + "mcp>=1.15.0", + "logsage==0.1.4", +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.0.0", + "pytest-asyncio>=0.21.0", + "httpx>=0.24.0", +] + +[project.scripts] +nvrx-attrsvc = "nvrx_attrsvc:main" + +[project.urls] +Homepage = "https://github.com/NVIDIA/nvidia-resiliency-ext" +Repository = "https://github.com/NVIDIA/nvidia-resiliency-ext" + +[tool.setuptools] +py-modules = ["nvrx_attrsvc"] diff --git a/examples/attribution/requirements.txt b/examples/attribution/requirements.txt new file mode 100644 index 00000000..94af83d2 --- /dev/null +++ b/examples/attribution/requirements.txt @@ -0,0 +1,6 @@ +# 1. Point to the internal NVIDIA Artifactory +--extra-index-url https://sc-hw-artf.nvidia.com/artifactory/api/pypi/hwinf-gpuwa-pypi/simple + +# 2. Install this component in editable mode +# This tells pip to read the local pyproject.toml dependencies +-e . \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index cfacccd3..5864af28 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,7 +49,7 @@ nvidia-ml-py = ">=12.570.86" defusedxml = "*" langchain-nvidia-ai-endpoints = ">=0.3.15" mcp = ">=1.15.0" -logsage = "=0.1.2" +logsage = "=0.1.4" grpcio = "^1.76.0" grpcio-tools = "^1.76.0" diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 00000000..909c26c1 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +# 1. Point to the internal NVIDIA Artifactory +# We must repeat this here so the root installation knows where to look +--extra-index-url https://sc-hw-artf.nvidia.com/artifactory/api/pypi/hwinf-gpuwa-pypi/simple + +# 2. Install the internal component +# Adjust the path below to match the actual folder name of your component +-e ./nvrx-attrsvc \ No newline at end of file diff --git a/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py b/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py index fa035b0e..28acbe30 100644 --- a/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py +++ b/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py @@ -15,12 +15,10 @@ def chunk_logs_strict(lines): - """ - Chunks logs strictly between: + """Chunks logs strictly between: - START: The LAST occurrence of Cycle N - - END: The LAST occurrence of Cycle N+1 + - END: The LAST occurrence of Cycle N+1 OR End of File (for the last cycle). - Lines after the highest Cycle number are ignored. If no 'Cycle' markers are found, returns all lines as Cycle 0. 
""" # Regex to match the profiling line @@ -39,29 +37,35 @@ def chunk_logs_strict(lines): final_chunks = {} - # --- NEW LOGIC START --- # If no cycles were found, return all lines as Cycle 0 if not sorted_cycles: final_chunks[0] = lines return final_chunks - # --- NEW LOGIC END --- - # Step 2: Create chunks ONLY when we have both a Start (N) and an End (N+1) - # We iterate up to len() - 1 because the last cycle in the list - # serves only as the end boundary for the previous one. - for i in range(len(sorted_cycles) - 1): + # Step 2: Iterate through cycles to capture chunks + # We iterate through ALL sorted cycles now (not len - 1) + for i in range(len(sorted_cycles)): curr_cycle = sorted_cycles[i] - next_cycle = sorted_cycles[i + 1] # This is N+1 - start_index = last_cycle_indices[curr_cycle] - end_index = last_cycle_indices[next_cycle] - # Extract lines between LAST Cycle N and LAST Cycle N+1 - raw_chunk = lines[start_index:end_index] - - # Step 3: Remove marker lines + # Determine the End Index + if i < len(sorted_cycles) - 1: + # If there is a next cycle, stop there + next_cycle = sorted_cycles[i + 1] + end_index = last_cycle_indices[next_cycle] + raw_chunk = lines[start_index:end_index] + else: + # --- FIX: Handling the Last Cycle --- + # If this is the last cycle in the list, go to the end of the lines + raw_chunk = lines[start_index:] + + # Step 3: Remove marker lines using the existing logic clean_chunk = [line for line in raw_chunk if not cycle_pattern.search(line)] + # Apply the external 'lines_after' filter + # (Assuming lines_after is defined in your scope) + clean_chunk = lines_after(clean_chunk, "FT: initialized") + final_chunks[curr_cycle] = clean_chunk return final_chunks @@ -85,7 +89,8 @@ def __init__(self, args: argparse.Namespace): top_p=self.args.top_p, max_tokens=self.args.max_tokens, ) - self.exclude_nvrx_logs = args.exclude_nvrx_logs + self.exclude_nvrx_logs = getattr(args, 'exclude_nvrx_logs', False) + self.is_per_cycle = getattr(args, 'is_per_cycle', False) super().__init__( preprocess_input=self.analyze_logs, attribution=self.llm_analyze, @@ -111,18 +116,24 @@ async def analyze_logs(self) -> list[ApplicationData]: """ path = self.args.log_path - with open(path, 'r') as f: + with open(path, 'r', encoding="latin-1") as f: input_data = f.readlines() - if self.exclude_nvrx_logs: - input_data = [line for line in input_data if "nvidia_resiliency_ext" not in line] - input_data = [ - line for line in input_data if "[workload:" not in line or 'Cycle:' in line - ] - logger.info(f"Excluded {len(input_data)} lines from the input data") - with open(os.path.join(os.path.dirname(path), "nvrx_logs_edited.txt"), 'w') as f: - f.writelines(input_data) - chunks = chunk_logs_strict(input_data) # Splitting the app log to cycles + # If is_per_cycle is set, skip filtering and chunking (data is already single-cycle) + if self.is_per_cycle: + logger.info("is_per_cycle=True: skipping nvrx log filtering and cycle chunking") + chunks = {0: input_data} + else: + if self.exclude_nvrx_logs: + input_data = [line for line in input_data if "nvidia_resiliency_ext" not in line] + input_data = [ + line for line in input_data if "[workload:" not in line or 'Cycle:' in line + ] + logger.info(f"Excluded {len(input_data)} lines from the input data") + with open(os.path.join(os.path.dirname(path), "nvrx_logs_edited.txt"), 'w') as f: + f.writelines(input_data) + chunks = chunk_logs_strict(input_data) # Splitting the app log to cycles + output_list = [ return_application_errors(self.llm, lines, 
self.lru_cache) for cycle, lines in chunks.items() @@ -130,21 +141,28 @@ async def analyze_logs(self) -> list[ApplicationData]: return output_list async def llm_analyze(self, output_list: list[ApplicationData]) -> list[str]: - return [ - ( - get_proposed_solution_cat(self.llm, output) - if len(output.application_errors_list_full) > 0 - else "No error found from application logs" - ) - for output in output_list - ] + + result = [] + logger.info("output_list_size: %s", str(len(output_list))) + for output in output_list: + if len(output.application_errors_list_full): + result.append(get_proposed_solution_cat(self.llm, output)) + else: + if output.finished == "LLM_FAILURE": + result.append(("LLM FAILURE","LLM FAILURE","LLM FAILURE","LLM FAILURE","")) + elif output.finished != "SLURM_CANCELLED": + result.append(("ERRORS NOT FOUND","ERRORS NOT FOUND","ERRORS NOT FOUND","ERRORS NOT FOUND","")) + else: + result.append(("RESTART IMMEDIATE","","""Attribution: Primary issues: ["SLURM STEP CANCELLED"], Secondary issues: []""","","")) + + return result async def print_output( self, attribution_results: list[str] ) -> list[tuple[str, AttributionState]]: output_list = [] for attribution_result in attribution_results: - if attribution_result != "No error found from application logs": + if attribution_result: # Concatenate all strings in attribution_result if it's a list/tuple logger.info(f"attribution_result: {attribution_result}") attr_state = ( @@ -157,10 +175,6 @@ async def print_output( else: concatenated_result = str(attribution_result) output_list.append((concatenated_result, attr_state)) - else: - output_list.append( - ("No error found from application logs", AttributionState.CONTINUE) - ) return output_list @@ -181,6 +195,11 @@ def main(): parser.add_argument( '--exclude_nvrx_logs', action='store_true', help='Exclude nvrx logs from the input data' ) + parser.add_argument( + '--is_per_cycle', + action='store_true', + help='Input is already per-cycle data (skip filtering and chunking)', + ) args = parser.parse_args() diff --git a/src/nvidia_resiliency_ext/attribution/mcp_integration/module_definitions.py b/src/nvidia_resiliency_ext/attribution/mcp_integration/module_definitions.py index 896ea057..22e4a56e 100644 --- a/src/nvidia_resiliency_ext/attribution/mcp_integration/module_definitions.py +++ b/src/nvidia_resiliency_ext/attribution/mcp_integration/module_definitions.py @@ -48,6 +48,11 @@ def register_all_modules(): "description": "Exclude NVRX internal logs", "default": False, }, + "is_per_cycle": { + "type": "boolean", + "description": "Input is already per-cycle data (skip filtering and chunking)", + "default": False, + }, }, "required": ["log_path"], }, diff --git a/src/nvidia_resiliency_ext/fault_tolerance/_ft_rendezvous.py b/src/nvidia_resiliency_ext/fault_tolerance/_ft_rendezvous.py index 4ae3bd06..39d2df04 100644 --- a/src/nvidia_resiliency_ext/fault_tolerance/_ft_rendezvous.py +++ b/src/nvidia_resiliency_ext/fault_tolerance/_ft_rendezvous.py @@ -60,6 +60,7 @@ from nvidia_resiliency_ext.shared_utils.log_manager import LogConfig from ..shared_utils.health_check import ( + AttributionService, DistributedStorageHealthCheck, GPUHealthCheck, NicLinkStateHealthCheck, @@ -1233,6 +1234,8 @@ def from_backend( enable_dist_storage_healthcheck: bool = False, link_state_path_template: Optional[str] = None, storage_healthcheck_paths: Optional[list] = None, + attrsvc_host: Optional[str] = None, + attrsvc_port: Optional[int] = None, ): """Create a new :py:class:`FtRendezvousHandler`. 
@@ -1286,6 +1289,8 @@ def from_backend( enable_dist_storage_healthcheck=enable_dist_storage_healthcheck, link_state_path_template=link_state_path_template, storage_healthcheck_paths=storage_healthcheck_paths, + attrsvc_host=attrsvc_host, + attrsvc_port=attrsvc_port, ) def __init__( @@ -1299,6 +1304,8 @@ def __init__( enable_dist_storage_healthcheck: bool = False, link_state_path_template: Optional[str] = None, storage_healthcheck_paths: Optional[list] = None, + attrsvc_host: Optional[str] = None, + attrsvc_port: Optional[int] = None, ) -> None: if not settings.run_id: raise ValueError("The run id must be a non-empty string.") @@ -1357,6 +1364,16 @@ def __init__( StoragePathHealthCheck(storage_healthcheck_paths) if storage_healthcheck_paths else None ) + # Attribution service client (optional) + if attrsvc_host and attrsvc_port is not None: + self._attr_service = AttributionService( + log_path=None, + host=attrsvc_host, + port=int(attrsvc_port), + ) + else: + self._attr_service = None + def _record( self, message: str, @@ -1444,6 +1461,14 @@ def ensure_node_is_healthy(self) -> None: f"Node {self._this_node} has invalid or unreadable paths.", ) + # Perform optional log analysis (non-fatal); rely on service to log errors internally + if self._attr_service is not None: + # Use cycle logfile pre-exposed on handler by launcher + cycle_log_file = getattr(self, "_current_cycle_log_file", None) + if cycle_log_file: + self._attr_service(cycle_log_file) + log.debug(f"Scheduled AttributionService for path: {cycle_log_file}") + # Perform Node health check _nodehealth_checker = get_node_health_check() if _nodehealth_checker is not None: @@ -1848,6 +1873,8 @@ def create_handler( ) storage_healthcheck_paths = params.config.get('storage_healthcheck_paths', None) link_state_path_template = params.config.get('link_state_path_template', None) + attrsvc_host = params.config.get('attrsvc_host', None) + attrsvc_port = params.config.get('attrsvc_port', None) return FtRendezvousHandler.from_backend( params.run_id, @@ -1863,6 +1890,8 @@ def create_handler( enable_dist_storage_healthcheck=enable_dist_storage_healthcheck, link_state_path_template=link_state_path_template, storage_healthcheck_paths=storage_healthcheck_paths, + attrsvc_host=attrsvc_host, + attrsvc_port=attrsvc_port, ) except Exception as e: construct_and_record_rdzv_event( diff --git a/src/nvidia_resiliency_ext/fault_tolerance/config.py b/src/nvidia_resiliency_ext/fault_tolerance/config.py index 6152c66a..d013cddb 100644 --- a/src/nvidia_resiliency_ext/fault_tolerance/config.py +++ b/src/nvidia_resiliency_ext/fault_tolerance/config.py @@ -85,6 +85,9 @@ class FaultToleranceConfig: * `install_exception_hook` [bool] if True, installs sys.excepthook to capture uncaught exceptions in training worker processes, format and log the traceback, and use os._exit() to exit the process reliably. Default: False. + * Attribution service (optional): + - `attrsvc_host` [str] hostname/IP of the attribution service + - `attrsvc_port` [int] port of the attribution service If any timeout is None, it has no effect (as if it was +INF). All timeouts can be deduced and set during runtime. 
@@ -119,6 +122,9 @@ class FaultToleranceConfig: min_progress_iterations: int = 200 progress_update_interval: float = 30.0 # Seconds between sending progress updates to launcher install_exception_hook: bool = False + # Attribution service configuration (optional) + attrsvc_host: Optional[str] = None + attrsvc_port: Optional[int] = None @property def is_progress_tracking_enabled(self) -> bool: diff --git a/src/nvidia_resiliency_ext/fault_tolerance/ft_rendezvous_barrier.py b/src/nvidia_resiliency_ext/fault_tolerance/ft_rendezvous_barrier.py index 9b3cc069..70ddc858 100644 --- a/src/nvidia_resiliency_ext/fault_tolerance/ft_rendezvous_barrier.py +++ b/src/nvidia_resiliency_ext/fault_tolerance/ft_rendezvous_barrier.py @@ -45,6 +45,7 @@ from nvidia_resiliency_ext.shared_utils.log_manager import LogConfig from ..shared_utils.health_check import ( + AttributionService, DistributedStorageHealthCheck, GPUHealthCheck, NicLinkStateHealthCheck, @@ -1061,6 +1062,8 @@ def from_backend( enable_dist_storage_healthcheck: bool = False, link_state_path_template: Optional[str] = None, storage_healthcheck_paths: Optional[list] = None, + attrsvc_host: Optional[str] = None, + attrsvc_port: Optional[int] = None, ): """Create a new :py:class:`FtRendezvousBarrierHandler`. @@ -1111,6 +1114,8 @@ def from_backend( enable_dist_storage_healthcheck=enable_dist_storage_healthcheck, link_state_path_template=link_state_path_template, storage_healthcheck_paths=storage_healthcheck_paths, + attrsvc_host=attrsvc_host, + attrsvc_port=attrsvc_port, ) def __init__( @@ -1124,6 +1129,8 @@ def __init__( enable_dist_storage_healthcheck: bool = False, link_state_path_template: Optional[str] = None, storage_healthcheck_paths: Optional[list] = None, + attrsvc_host: Optional[str] = None, + attrsvc_port: Optional[int] = None, ) -> None: if not settings.run_id: raise ValueError("The run id must be a non-empty string.") @@ -1176,6 +1183,16 @@ def __init__( StoragePathHealthCheck(storage_healthcheck_paths) if storage_healthcheck_paths else None ) + # Attribution client (optional) + if attrsvc_host and attrsvc_port is not None: + self._attr_service = AttributionService( + log_path=None, + host=attrsvc_host, + port=int(attrsvc_port), + ) + else: + self._attr_service = None + def set_worker_group(self, worker_group: Any) -> None: """Set the worker group reference for this handler.""" self._worker_group = worker_group @@ -1264,6 +1281,14 @@ def ensure_node_is_healthy(self) -> None: f"Node {self._this_node} has invalid or unreadable paths.", ) + # Perform optional log analysis (non-fatal); rely on service to log errors internally + if self._attr_service is not None: + # Use cycle logfile pre-exposed on handler by launcher + cycle_log_file = getattr(self, "_current_cycle_log_file", None) + if cycle_log_file: + self._attr_service(cycle_log_file) + log.debug(f"Scheduled AttributionService for path: {cycle_log_file}") + # Perform Node health check (external service if available) _nodehealth_checker = get_node_health_check() if _nodehealth_checker is not None: @@ -1546,6 +1571,9 @@ def create_handler( ) storage_healthcheck_paths = params.config.get('storage_healthcheck_paths', None) link_state_path_template = params.config.get('link_state_path_template', None) + logs_analysis_log_path = params.config.get('logs_analysis_log_path', None) + attrsvc_host = params.config.get('attrsvc_host', None) + attrsvc_port = params.config.get('attrsvc_port', None) return FtRendezvousBarrierHandler.from_backend( params.run_id, @@ -1561,6 +1589,8 @@ def create_handler( 
enable_dist_storage_healthcheck=enable_dist_storage_healthcheck, link_state_path_template=link_state_path_template, storage_healthcheck_paths=storage_healthcheck_paths, + attrsvc_host=attrsvc_host, + attrsvc_port=attrsvc_port, ) except Exception as e: construct_and_record_rdzv_event( diff --git a/src/nvidia_resiliency_ext/fault_tolerance/launcher.py b/src/nvidia_resiliency_ext/fault_tolerance/launcher.py index d315a0f5..8d806072 100644 --- a/src/nvidia_resiliency_ext/fault_tolerance/launcher.py +++ b/src/nvidia_resiliency_ext/fault_tolerance/launcher.py @@ -1230,6 +1230,22 @@ def _rendezvous(self, worker_group: WorkerGroup) -> None: # this will always be FtRendezvousBarrierHandler or FtRendezvousHandler (legacy) spec.rdzv_handler.set_worker_group(worker_group) + # Precompute and expose the per-cycle consolidated logfile on the LogsSpecs so that + # rendezvous handlers can consume it during ensure_node_is_healthy() (which runs before + # workers are started and before LogsSpecs.reify()). + try: + from .per_cycle_logs import PerCycleLogsSpecs as _PCS + + if isinstance(self._logs_specs, _PCS): + # Compute same restart_count used for naming in _start_workers + restart_count = spec.max_restarts - self._remaining_restarts + cycle_log_file = self._logs_specs.get_cycle_log_file(restart_count) + # Expose directly on the rendezvous handler + setattr(spec.rdzv_handler, "_current_cycle_log_file", cycle_log_file) + except Exception: + # Best-effort; do not disrupt rendezvous if any issue occurs + pass + # Call the parent class _rendezvous method super()._rendezvous(worker_group) @@ -2506,6 +2522,24 @@ def get_args_parser() -> ArgumentParser: "format and log the traceback, and use os._exit() to exit the process reliably. Default: False.", ) + # Attribution service configuration (optional) + parser.add_argument( + "--ft-attrsvc-host", + "--ft_attrsvc_host", + type=str, + default=None, + dest="ft_attrsvc_host", + help="Hostname or IP for the attribution service (e.g., 127.0.0.1).", + ) + parser.add_argument( + "--ft-attrsvc-port", + "--ft_attrsvc_port", + type=int, + default=None, + dest="ft_attrsvc_port", + help="Port for the attribution service (e.g., 8000).", + ) + parser.add_argument( action='store_true', dest="ft_ignore_missing_cfg", @@ -2714,6 +2748,12 @@ def config_from_args(args) -> Tuple[LaunchConfig, Union[Callable, str], List[str # Pass enable_nic_healthcheck and link_state_path_template from fault tolerance config to rendezvous config rdzv_configs['enable_nic_healthcheck'] = fault_tol_cfg.enable_nic_healthcheck rdzv_configs['link_state_path_template'] = fault_tol_cfg.link_state_path_template + # Pass attribution service configuration if provided + if getattr(fault_tol_cfg, 'attrsvc_host', None): + rdzv_configs['attrsvc_host'] = fault_tol_cfg.attrsvc_host + if getattr(fault_tol_cfg, 'attrsvc_port', None) is not None: + rdzv_configs['attrsvc_port'] = int(fault_tol_cfg.attrsvc_port) + # Removed legacy logs_fastapi_enabled - presence of host/port auto-enables # Pass distributed storage health check configuration cli_dist_storage = getattr(args, 'ft_enable_dist_storage_healthcheck', None) if cli_dist_storage is not None: diff --git a/src/nvidia_resiliency_ext/fault_tolerance/per_cycle_logs.py b/src/nvidia_resiliency_ext/fault_tolerance/per_cycle_logs.py index 2cc349e1..68253a93 100644 --- a/src/nvidia_resiliency_ext/fault_tolerance/per_cycle_logs.py +++ b/src/nvidia_resiliency_ext/fault_tolerance/per_cycle_logs.py @@ -137,11 +137,11 @@ def reify( global_env = envs[0] run_id = 
global_env.get("TORCHELASTIC_RUN_ID", "test_run_id") restart_count = global_env.get("TORCHELASTIC_RESTART_COUNT", "0") - - # Create per-cycle log file - base_without_ext = os.path.splitext(self._base_log_file)[0] - ext = os.path.splitext(self._base_log_file)[1] or ".log" - cycle_log_file = f"{base_without_ext}_cycle{restart_count}{ext}" + try: + cycle_idx = int(restart_count) + except (TypeError, ValueError): + cycle_idx = 0 + cycle_log_file = self.get_cycle_log_file(cycle_idx) # Create the consolidated log file if it doesn't exist # This serves two purposes: @@ -238,3 +238,11 @@ def __eq__(self, other: object) -> bool: if not isinstance(other, PerCycleLogsSpecs): return False return self._base_log_file == other._base_log_file + + def get_cycle_log_file(self, cycle_index: int) -> str: + """ + Instance helper to build cycle logfile for this spec's base path. + """ + base_without_ext = os.path.splitext(self._base_log_file)[0] + ext = os.path.splitext(self._base_log_file)[1] or ".log" + return f"{base_without_ext}_cycle{cycle_index}{ext}" diff --git a/src/nvidia_resiliency_ext/shared_utils/health_check.py b/src/nvidia_resiliency_ext/shared_utils/health_check.py index 301cd3c1..3c602e54 100644 --- a/src/nvidia_resiliency_ext/shared_utils/health_check.py +++ b/src/nvidia_resiliency_ext/shared_utils/health_check.py @@ -24,9 +24,12 @@ import traceback from collections import defaultdict from functools import wraps -from typing import Callable, Dict, Optional, Union +from typing import Any, Callable, Dict, Optional, Union +from urllib.parse import quote_plus import defusedxml.ElementTree as ET +import httpx +from pydantic import BaseModel from nvidia_resiliency_ext.shared_utils.log_manager import LogConfig @@ -1297,3 +1300,99 @@ def _perform_health_check(self) -> bool: if self.paths: logger.debug("all storage paths accessible:\n" + "\n".join(self.paths)) return True + + +class AttrSvcResult(BaseModel): + result: Any + status: str = "completed" + + +class AttributionService: + """ + Client that queries an external attribution service to analyze artifacts (e.g., logs). + Behavior: + - POSTs to submit for log analysis + - GETs results by the last submitted log_path + """ + + def __init__( + self, + log_path: Optional[str], + host: str, + port: int, + ): + self.host = host + self.port = port + self.log_path = log_path + # Track the most recent log_path we submitted + self._last_submitted: Optional[str] = None + + def __call__(self, log_path: Optional[str] = None) -> None: + """ + Fire-and-forget entrypoint. + - If an event loop is running, schedules the request with create_task and returns the Task. + - If no event loop is running, runs in a background thread and returns None. + """ + effective_path = log_path if log_path else self.log_path + if effective_path is None: + raise ValueError("log_path is required (provide at init or call time)") + try: + loop = asyncio.get_running_loop() + loop.create_task(self.get_attrsvc_result_async(effective_path)) + except RuntimeError: + # No running event loop; run in a background thread without blocking + threading.Thread( + target=lambda: asyncio.run(self.get_attrsvc_result_async(effective_path)), + daemon=True, + ).start() + return None + + async def get_attrsvc_result_async(self, log_path: str) -> None: + """ + Async caller for _get_attrsvc_result. 
+ """ + await self._get_attrsvc_result(log_path) + + async def _get_attrsvc_result(self, log_path: str) -> None: + """ + Internal async method that interacts with the external attribution service: + - If a prior submission exists, GET results for the last submitted path + - Then, POST the new log_path to submit for analysis + """ + try: + async with httpx.AsyncClient(timeout=60.0) as client: + create_url = f"http://{self.host}:{self.port}/logs" + + # 1) If we previously submitted a path, GET its results + if self._last_submitted: + q_last = quote_plus(self._last_submitted) + get_url = f"{create_url}?log_path={q_last}" + try: + resp = await client.get(get_url, headers={"accept": "application/json"}) + if resp.status_code == 200: + payload = resp.json() if resp.text else {} + result = payload.get("result", payload) + status = payload.get("status", "completed") + attrsvc_result = AttrSvcResult(result=result, status=status) + logger.info("AttrSvcResult status=%s", attrsvc_result.status) + logger.info( + "AttrSvcResult result preview: %s", str(attrsvc_result.result)[:200] + ) + else: + logger.warning(f"AttributionService GET returned {resp.status_code}") + except Exception as e: + logger.warning(f"AttributionService GET failed: {e}") + + # 2) Submit the new path for analysis + await client.post( + create_url, + json={"log_path": log_path}, + headers={"accept": "application/json"}, + ) + self._last_submitted = log_path + logger.debug(f"AttributionService submitted: {log_path}") + + except Exception as e: + # Logging is sufficient; do not propagate exceptions + logger.warning(f"AttributionService request failed: {e}") + return None