diff --git a/industries/asset_lifecycle_management_agent/README.md b/industries/asset_lifecycle_management_agent/README.md
index c2acdd906..14a6d2740 100644
--- a/industries/asset_lifecycle_management_agent/README.md
+++ b/industries/asset_lifecycle_management_agent/README.md
@@ -22,7 +22,7 @@ Multi-agent architecture designed for Asset Lifecycle Management with specialize
 - **ReAct Agent Workflow**: Main orchestration using ReAct pattern for intelligent decision-making
 - **SQL Retriever Tool**: Generates SQL queries using NIM LLM for asset data retrieval
 - **RUL Prediction Tool**: XGBoost model for remaining useful life prediction to optimize maintenance scheduling
-- **Anomaly Detection Tool**: Detects anomalies in sensor data using time series foundational model for early failure detection
+- **Anomaly Detection Tool**: Detects anomalies in sensor data using the NV-Tesseract foundation model (NVIDIA NIM) for early failure detection, with MOMENT as an alternative
 - **Plotting Agents**: Multi-tool agent for data visualization and asset performance reporting
 - **Vector Database**: ChromaDB for storing table schema, Vanna training queries, and asset documentation
 
@@ -554,13 +554,45 @@ Retrieve real RUL of each unit in the FD001 test dataset. Then plot a distributi
 
 ![Visualization Example](imgs/test_prompt_2.png)
 
-**Anomaly Detection**
+**Anomaly Detection with NV-Tesseract**
+
+The workflow uses [NV-Tesseract](https://developer.nvidia.com/blog/advancing-anomaly-detection-for-industry-applications-with-nvidia-nv-tesseract-ad/), NVIDIA's foundation model for time-series anomaly detection, as the default anomaly detection engine. As with any foundation model, out-of-the-box accuracy is highly data-dependent; the goal of this example is to demonstrate the integration rather than tuned accuracy. Accuracy can be improved significantly by fine-tuning on domain-specific data, a capability currently on NVIDIA's NV-Tesseract roadmap.
+
 ```
 Retrieve and detect anomalies in sensor 4 measurements for engine number 78 in train FD001 dataset.
 ```
 
+**Sample Output:**
+```
+NV TESSERACT NIM ANOMALY DETECTION COMPLETED SUCCESSFULLY
+
+Analysis Details:
+  • Engine Unit: 78
+  • Sensor Analyzed: sensor_measurement_4
+  • Model: NV Tesseract (NVIDIA Foundation Model)
+
+Anomaly Detection Results:
+  • Total Timesteps Analyzed: 231
+  • Anomalous Timesteps Detected: 12
+  • Anomaly Rate: 5.19%
+
+Output Files Generated:
+  • Enhanced Data with is_anomaly Column: retrieve_sensor_measurement_4__results.json
+  • Interactive HTML plot: anomaly_plot_sensor_measurement_4_engine78.html
+  • Static PNG image: anomaly_plot_sensor_measurement_4_engine78.png
+```
+
 ![Anomaly Detection Example](imgs/test_prompt_4.png)
 
+**Switching to MOMENT Foundation Model:**
+
+To use the MOMENT foundation model instead of NV-Tesseract, edit `configs/config-reasoning.yaml`:
+1. Comment out the `nv_tesseract_anomaly_detection_tool` configuration
+2. Uncomment the `moment_anomaly_detection_tool` configuration
+3. Restart the workflow server
+
+Both models provide state-of-the-art anomaly detection for time-series data.
+
 **Workspace Utilities Demo**
 ```
 Retrieve RUL values and time in cycles for engine unit 24 from FD001 train dataset. Use the piece wise RUL transformation code utility to perform piecewise RUL transformation on the ground truth RUL values with MAXLIFE=100.Finally, Plot a comparison line chart with RUL values and its transformed values across time.
@@ -581,6 +613,46 @@ Perform the following steps: *Note: This example automatically uses the workspace `apply_piecewise_rul_transformation` utility to create realistic knee-pattern RUL data for comparison, resulting in much cleaner and more meaningful visualizations.* +## Deploying NV-Tesseract NIM (Required for Anomaly Detection) + +Since NV-Tesseract is the default anomaly detection engine, you'll need to deploy the NV-Tesseract NIM container for anomaly detection capabilities. + +**Note:** Access to the NV-Tesseract NIM container requires approval. Contact your NVIDIA representative or request access through the [NVIDIA NGC Catalog](https://catalog.ngc.nvidia.com/). + +### Prerequisites +- NVIDIA GPU (A100, H100, or L40S recommended) +- Docker with NVIDIA Container Runtime +- NGC API key with NV-Tesseract access + +### Deploy NV-Tesseract NIM + +Set your NGC API key: +```bash +export NGC_API_KEY='your-ngc-api-key' +``` + +Deploy the NV-Tesseract NIM container: +```bash +docker run -d \ + --name nv-tesseract-nim \ + --gpus '"device=1"' \ + -p 8001:8000 \ + -e NGC_API_KEY=$NGC_API_KEY \ + --restart unless-stopped \ + nvcr.io/nim/nvidia/nv-tesseract:2.0.0 +``` + +Verify the deployment: +```bash +# Check container logs +docker logs -f nv-tesseract-nim + +# Health check +curl http://localhost:8001/v1/health/ready +``` + +**Note:** If you prefer to use the MOMENT foundation model instead (which doesn't require a NIM deployment), follow the instructions in the "Switching to MOMENT Foundation Model" section under Anomaly Detection above. + ## Observability (Optional) ### Monitor Your System with Phoenix diff --git a/industries/asset_lifecycle_management_agent/configs/config-reasoning.yaml b/industries/asset_lifecycle_management_agent/configs/config-reasoning.yaml index 6a061ae41..bff342d57 100644 --- a/industries/asset_lifecycle_management_agent/configs/config-reasoning.yaml +++ b/industries/asset_lifecycle_management_agent/configs/config-reasoning.yaml @@ -44,7 +44,7 @@ llms: # Data analysis and tool calling model analyst_llm: _type: nim - model_name: "qwen/qwen2.5-coder-32b-instruct" + model_name: "qwen/qwen3-coder-480b-a35b-instruct" # Python code generation model coding_llm: @@ -88,9 +88,19 @@ functions: scaler_path: "models/scaler_model.pkl" model_path: "models/xgb_model_fd001.pkl" + # Anomaly Detection Tool Configuration + # Default: NV-Tesseract (NVIDIA Foundation Model via NIM) anomaly_detection: - _type: moment_anomaly_detection_tool + _type: nv_tesseract_anomaly_detection_tool + nim_endpoint: "http://localhost:8001" + timeout: 120 output_folder: "output_data" + # custom_threshold: 3.0 # Optional: Lower threshold to catch gradual degradation (default: None for NIM auto-threshold) + + # Alternative: MOMENT Foundation Model (Comment out NV-Tesseract above and uncomment below to use MOMENT) + # anomaly_detection: + # _type: moment_anomaly_detection_tool + # output_folder: "output_data" plot_distribution: _type: plot_distribution_tool @@ -158,7 +168,7 @@ functions: Executing step: the step you are currently executing from the plan along with any instructions provided Thought: describe how you are going to execute the step Final Answer: the final answer to the original input question including the absolute file paths of the generated files with - `/Users/vikalluru/Documents/GenerativeAIExamples/industries/asset_lifecycle_management_agent/output_data/` prepended to the filename. + `./output_data/` prepended to the filename. 
**FORMAT 3 (when using a tool)** Input plan: Summarize all the steps in the plan. @@ -170,6 +180,7 @@ functions: ### HOW TO CHOOSE THE RIGHT TOOL ### Follow these guidelines while deciding the right tool to use: + **CRITICAL: When writing Action: tool_name, use PLAIN TEXT ONLY. Do NOT use markdown formatting like **tool_name**. Just write the tool name directly.** **Ensure that tool calls do not use single quotes or double quotes within the key-value pairs.** 1. **SQL Retrieval Tool** @@ -186,7 +197,7 @@ functions: - plot_comparison: to compare two columns of a dataset by plotting both of them on the same chart. 4. **Anomaly Detection Tools** - - Use anomaly_detection tool for state-of-the-art foundation model-based anomaly detection using MOMENT-1-Large. + - Use anomaly_detection tool for production-grade anomaly detection using NV Tesseract foundation model via NVIDIA NIM. - **REQUIRES JSON DATA**: First use sql_retriever to get sensor data, then pass the JSON file path to anomaly_detection. - **OUTPUT**: Creates enhanced sensor data with added 'is_anomaly' boolean column. - Use plot_anomaly to create interactive visualizations of anomaly detection results. diff --git a/industries/asset_lifecycle_management_agent/pyproject.toml b/industries/asset_lifecycle_management_agent/pyproject.toml index 757cc391d..404c79cb2 100644 --- a/industries/asset_lifecycle_management_agent/pyproject.toml +++ b/industries/asset_lifecycle_management_agent/pyproject.toml @@ -6,7 +6,7 @@ requires = ["setuptools >= 64"] name = "asset_lifecycle_management_agent" dynamic = ["version"] dependencies = [ - "nvidia-nat[profiling, langchain, telemetry]==1.3.0", + "nvidia-nat[profiling, langchain, telemetry]==1.2.1", "momentfm", "vanna==0.7.9", "chromadb", diff --git a/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/predictors/nv_tesseract_anomaly_detection_tool.py b/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/predictors/nv_tesseract_anomaly_detection_tool.py new file mode 100644 index 000000000..53b85f2d0 --- /dev/null +++ b/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/predictors/nv_tesseract_anomaly_detection_tool.py @@ -0,0 +1,280 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
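+
+"""
+Anomaly detection tool backed by the NV Tesseract NIM.
+
+Sensor readings are sent as a JSON list of {"ts": <int>, "value": <float>} points to the
+configured NIM endpoint at /v2/detect-anomalies. Each returned result carries an "Anomaly"
+boolean and an "MAE" score, which are written back onto the input data as the 'is_anomaly'
+and 'anomaly_score' columns consumed by the downstream plotting tools.
+"""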
+ +import json +import logging +import os +import pandas as pd +import numpy as np +import requests +from typing import List, Dict, Any +from pydantic import Field, BaseModel + +from nat.builder.builder import Builder +from nat.builder.function_info import FunctionInfo +from nat.cli.register_workflow import register_function +from nat.data_models.function import FunctionBaseConfig + +logger = logging.getLogger(__name__) + + +class NVTesseractAnomalyDetectionToolConfig(FunctionBaseConfig, name="nv_tesseract_anomaly_detection_tool"): + """ + NeMo Agent Toolkit function to perform anomaly detection using NV Tesseract NIM. + """ + nim_endpoint: str = Field( + description="NV Tesseract NIM endpoint URL", + default="http://localhost:8001" + ) + timeout: int = Field( + description="Request timeout in seconds", + default=120 + ) + output_folder: str = Field( + description="The path to the output folder to save results.", + default="./output_data" + ) + + +@register_function(config_type=NVTesseractAnomalyDetectionToolConfig) +async def nv_tesseract_anomaly_detection_tool( + config: NVTesseractAnomalyDetectionToolConfig, builder: Builder +): + class NVTesseractAnomalyDetectionInputSchema(BaseModel): + sensor_data_json_path: str = Field( + description="Path to JSON file containing sensor data (from sql_retriever tool)" + ) + engine_unit: int = Field( + description="Engine unit number to analyze", + default=5 + ) + sensor_name: str = Field( + description="Name of the sensor to analyze (e.g., 'sensor_measurement_1', 'sensor_measurement_4')", + default="sensor_measurement_1" + ) + + def call_nv_tesseract_nim(data_points: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Call NV Tesseract NIM API for anomaly detection. + + Args: + data_points: List of {"ts": timestamp_or_index, "value": sensor_value} + + Returns: + List of results with added anomaly detection fields + """ + endpoint = f"{config.nim_endpoint}/v2/detect-anomalies" + + try: + logger.info(f"Calling NV Tesseract NIM at {endpoint}") + logger.info(f"Sending {len(data_points)} data points") + + response = requests.post( + endpoint, + json=data_points, + timeout=config.timeout, + headers={"Content-Type": "application/json"} + ) + + response.raise_for_status() + results = response.json() + + logger.info(f"Received {len(results)} results from NV Tesseract NIM") + return results + + except requests.exceptions.RequestException as e: + logger.error(f"Error calling NV Tesseract NIM: {e}") + raise RuntimeError(f"Failed to call NV Tesseract NIM: {e}") + + def prepare_data_for_nim(df: pd.DataFrame, sensor_name: str) -> List[Dict[str, Any]]: + """Convert DataFrame to NV Tesseract NIM input format. + + Args: + df: DataFrame with time series data + sensor_name: Name of sensor column to process + + Returns: + List of {"ts": index, "value": sensor_value} + """ + if sensor_name not in df.columns: + raise ValueError(f"Sensor '{sensor_name}' not found in data. Available: {df.columns.tolist()}") + + data_points = [] + for idx, row in df.iterrows(): + data_points.append({ + "ts": int(idx), # Use index as timestamp + "value": float(row[sensor_name]) + }) + + logger.info(f"Prepared {len(data_points)} data points for NIM") + return data_points + + def process_nim_results(df: pd.DataFrame, nim_results: List[Dict[str, Any]]) -> pd.DataFrame: + """Add NIM anomaly detection results to DataFrame. 
+ + Args: + df: Original DataFrame + nim_results: Results from NIM v2 with Anomaly field (boolean) and MAE metric + + Returns: + DataFrame with added is_anomaly boolean column + """ + # Extract anomaly labels (boolean in v2.0.0) + anomalies = [result["Anomaly"] for result in nim_results] + + # Add to DataFrame + df_result = df.copy() + df_result['is_anomaly'] = [bool(a) for a in anomalies] + + # Add MAE metric from NIM v2 + df_result['anomaly_score'] = [result.get("MAE", 0.0) for result in nim_results] + + logger.info(f"Processed {len(anomalies)} anomaly results") + logger.info(f"Detected {sum(anomalies)} anomalies") + + return df_result + + async def _response_fn( + sensor_data_json_path: str, + engine_unit: int = 5, + sensor_name: str = "sensor_measurement_1" + ) -> str: + """ + Perform anomaly detection using NV Tesseract NIM on JSON data from sql_retriever. + """ + try: + # Validate inputs + if not sensor_data_json_path.lower().endswith('.json'): + return "sensor_data_json_path must be a path to a JSON file (ending with .json)" + + if not os.path.exists(sensor_data_json_path): + return f"JSON file not found at path: {sensor_data_json_path}" + + # Load data from JSON file + from ..plotting.plot_utils import load_data_from_json + combined_df = load_data_from_json(sensor_data_json_path, config.output_folder) + + if combined_df is None or combined_df.empty: + return f"Could not load data or data is empty from JSON file: {sensor_data_json_path}" + + # Filter for specific engine unit + if 'unit_number' in combined_df.columns: + engine_data = combined_df[combined_df['unit_number'] == engine_unit] + if engine_data.empty: + available_units = sorted(combined_df['unit_number'].unique()) + return f"No data found for engine unit {engine_unit}. Available units: {available_units}" + else: + engine_data = combined_df + + # Sort by cycle for proper time series analysis + if 'time_in_cycles' in engine_data.columns: + engine_data = engine_data.sort_values('time_in_cycles').reset_index(drop=True) + else: + engine_data = engine_data.reset_index(drop=True) + + logger.info(f"Engine data shape: {engine_data.shape}") + logger.info(f"Analyzing sensor: {sensor_name}") + + # Prepare data for NIM + data_points = prepare_data_for_nim(engine_data, sensor_name) + + # Call NV Tesseract NIM + logger.info("Calling NV Tesseract NIM for anomaly detection...") + nim_results = call_nv_tesseract_nim(data_points) + + # Process results and add to DataFrame + result_df = process_nim_results(engine_data, nim_results) + + # Calculate summary statistics + total_anomalies = result_df['is_anomaly'].sum() + anomaly_rate = (total_anomalies / len(result_df)) * 100 + + # Save results + os.makedirs(config.output_folder, exist_ok=True) + + if not os.path.isabs(sensor_data_json_path): + save_path = os.path.join(config.output_folder, os.path.basename(sensor_data_json_path)) + else: + results_filename = f"nv_tesseract_anomaly_results_engine{engine_unit}.json" + save_path = os.path.join(config.output_folder, results_filename) + + result_df.to_json(save_path, orient='records', indent=2) + + # Build response + response_parts = [ + "NV TESSERACT NIM ANOMALY DETECTION COMPLETED SUCCESSFULLY", + "", + f"Analysis Details:", + f" • Engine Unit: {engine_unit}", + f" • Source Data: {os.path.basename(sensor_data_json_path)}", + f" • Sensor Analyzed: {sensor_name}", + f" • Model: NV Tesseract (NVIDIA Foundation Model)", + f" • NIM Endpoint: {config.nim_endpoint}", + "", + f"Anomaly Detection Results:", + f" • Total Timesteps Analyzed: {len(result_df)}", 
+                f"  • Anomalous Timesteps Detected: {total_anomalies}",
+                f"  • Anomaly Rate: {anomaly_rate:.2f}%",
+                "",
+                f"Output Files Generated:",
+                f"  • Enhanced Data with is_anomaly Column: {os.path.relpath(save_path, config.output_folder)}",
+                "",
+                f"Key Insights:",
+                f"  • NV Tesseract provides production-grade anomaly detection via NIM",
+                f"  • Scalable inference with GPU acceleration",
+                f"  • {total_anomalies} anomalous time periods identified",
+                "",
+                f"Output Format:",
+                f"  • Original sensor data with added 'is_anomaly' boolean column",
+                f"  • Additional metric: anomaly_score (MAE reported by NIM v2)",
+                f"  • Use the enhanced JSON file with plot_anomaly_tool for visualization",
+                "",
+                "NV TESSERACT ANOMALY DETECTION COMPLETE"
+            ]
+
+            return "\n".join(response_parts)
+
+        except Exception as e:
+            error_msg = f"Error performing NV Tesseract anomaly detection: {e}"
+            logger.error(error_msg)
+            return error_msg
+
+    description = """
+    Perform production-grade anomaly detection using NV Tesseract foundation model via NVIDIA NIM.
+    Outputs detailed anomaly detection results. Use plot_anomaly_tool afterward for visualization.
+
+    Input:
+    - sensor_data_json_path: File path to JSON containing sensor data with timestamp and engine unit columns
+    - engine_unit: Engine unit number to analyze (default: 5)
+    - sensor_name: Name of sensor to analyze (e.g., 'sensor_measurement_1', 'sensor_measurement_4')
+
+    Output:
+    - JSON file with original data plus 'is_anomaly' boolean column
+    - Additional NIM v2 metrics (MAE - Mean Absolute Error)
+    - Comprehensive analysis summary
+    """
+
+    try:
+        yield FunctionInfo.from_fn(
+            _response_fn,
+            input_schema=NVTesseractAnomalyDetectionInputSchema,
+            description=description
+        )
+    except GeneratorExit:
+        logger.info("NV Tesseract anomaly detection function exited early!")
+    finally:
+        logger.info("Cleaning up NV Tesseract anomaly detection workflow.")
+
diff --git a/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/register.py b/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/register.py
index cd6a71a20..2e0ee0ff4 100644
--- a/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/register.py
+++ b/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/register.py
@@ -26,5 +26,6 @@ from .plotting import plot_anomaly_tool
 from .plotting import code_generation_assistant
 from .predictors import moment_anomaly_detection_tool
+from .predictors import nv_tesseract_anomaly_detection_tool
 from .evaluators import llm_judge_evaluator_register
 from .evaluators import multimodal_llm_judge_evaluator_register
diff --git a/industries/asset_lifecycle_management_agent/vanna_training_data.yaml b/industries/asset_lifecycle_management_agent/vanna_training_data.yaml
index 95f6ebcd5..26090f688 100644
--- a/industries/asset_lifecycle_management_agent/vanna_training_data.yaml
+++ b/industries/asset_lifecycle_management_agent/vanna_training_data.yaml
@@ -98,6 +98,12 @@ documentation: |
   - When asked "How many units" → Use COUNT(DISTINCT unit_number) to count unique engines
   - When asked "How many records/data points/measurements/entries/rows" → Use COUNT(*) to count all records
+  Sensor Data Retrieval Pattern (CRITICAL FOR ANOMALY DETECTION):
+  - When retrieving sensor measurements, ALWAYS include: unit_number, time_in_cycles, and the requested sensor column(s)
+  - Example: For "Retrieve sensor_measurement_4 for unit 78" → SELECT unit_number, time_in_cycles, sensor_measurement_4 FROM train_FD001 WHERE unit_number = 78 ORDER BY time_in_cycles
+  - These columns are required for downstream time-series analysis and visualization tools
+  - NEVER retrieve only the sensor measurement column without unit_number and time_in_cycles
+
   RUL Handling (CRITICAL - YOU MUST DISTINGUISH):
 
   1. GROUND TRUTH RUL (for test data):
@@ -186,4 +192,7 @@ question_sql_pairs:
     sql: "SELECT unit_number, time_in_cycles, (MAX(time_in_cycles) OVER (PARTITION BY unit_number) - time_in_cycles + 1) AS predicted_RUL FROM train_FD003 ORDER BY unit_number, time_in_cycles"
 
   - question: "Get ground truth RUL values for all units in test FD002"
-    sql: "SELECT unit_number, RUL FROM RUL_FD002 ORDER BY unit_number"
\ No newline at end of file
+    sql: "SELECT unit_number, RUL FROM RUL_FD002 ORDER BY unit_number"
+
+  - question: "Retrieve sensor_measurement_4 data for engine unit 78 from FD001 training dataset"
+    sql: "SELECT unit_number, time_in_cycles, sensor_measurement_4 FROM train_FD001 WHERE unit_number = 78 ORDER BY time_in_cycles"
\ No newline at end of file
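
For quick verification of a NIM deployment outside the workflow, the request/response contract assumed by `nv_tesseract_anomaly_detection_tool.py` can be exercised directly. The sketch below mirrors the tool's `prepare_data_for_nim` and `process_nim_results` helpers; the endpoint (`http://localhost:8001`, as deployed in the README) and the three sample sensor values are assumptions for illustration only.

```python
# Minimal sketch (not part of the workflow): exercise the NV Tesseract NIM contract
# assumed by nv_tesseract_anomaly_detection_tool.py. The endpoint URL and the sample
# readings below are placeholders; substitute real sensor values from the dataset.
import requests

NIM_ENDPOINT = "http://localhost:8001"  # assumption: NIM deployed as in the README

# One {"ts": index, "value": reading} object per timestep, as prepare_data_for_nim() builds
data_points = [
    {"ts": 0, "value": 1400.6},
    {"ts": 1, "value": 1401.2},
    {"ts": 2, "value": 1429.8},
]

response = requests.post(
    f"{NIM_ENDPOINT}/v2/detect-anomalies",
    json=data_points,
    headers={"Content-Type": "application/json"},
    timeout=120,
)
response.raise_for_status()

# Each result is expected to carry an "Anomaly" boolean and an "MAE" score (NIM v2),
# which the tool maps to the 'is_anomaly' and 'anomaly_score' columns.
for point, result in zip(data_points, response.json()):
    print(point["ts"], result["Anomaly"], result.get("MAE"))
```

If this call returns one result per submitted point, the `anomaly_detection` function configured in `configs/config-reasoning.yaml` should work against the same `nim_endpoint`.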