166 changes: 166 additions & 0 deletions docs/splunk.md

@@ -0,0 +1,166 @@
# Splunk HEC Integration

Lightspeed Core Stack can send inference telemetry events to Splunk via the HTTP Event Collector (HEC) protocol for monitoring and analytics.

## Overview

When enabled, the service sends telemetry events for:

- **Successful inference requests** (`infer_with_llm` sourcetype)
- **Failed inference requests** (`infer_error` sourcetype)

Events are sent asynchronously in the background and never block or affect the main request flow.

## Configuration

Add the `splunk` section to your `lightspeed-stack.yaml`:

```yaml
splunk:
  enabled: true
  url: "https://splunk.corp.example.com:8088/services/collector"
  token_path: "/var/secrets/splunk-hec-token"
  index: "rhel_lightspeed"
  source: "lightspeed-stack"
  timeout: 5
  verify_ssl: true

deployment_environment: "production"
```

### Configuration Options

| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| `enabled` | bool | No | `false` | Enable/disable Splunk integration |
| `url` | string | Yes* | - | Splunk HEC endpoint URL |
| `token_path` | string | Yes* | - | Path to file containing HEC token |
| `index` | string | Yes* | - | Target Splunk index |
| `source` | string | No | `lightspeed-stack` | Event source identifier |
| `timeout` | int | No | `5` | HTTP timeout in seconds |
| `verify_ssl` | bool | No | `true` | Verify SSL certificates |

*Required when `enabled: true`

### Token File

Store your HEC token in a file (not directly in the config):

```bash
echo "your-hec-token-here" > /var/secrets/splunk-hec-token
chmod 600 /var/secrets/splunk-hec-token
```

The token is read from the file on each request, so it can be rotated without restarting the service.
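To illustrate why rotation needs no restart, here is a minimal sketch of a per-request token read (illustrative only; the function name and error handling are assumptions, not the actual implementation):

```python
from pathlib import Path


def read_hec_token(token_path: str) -> str | None:
    """Read the HEC token from disk on every call, so a rotated
    token file takes effect on the next event without a restart."""
    try:
        return Path(token_path).read_text(encoding="utf-8").strip()
    except OSError:
        # Missing or unreadable token file: return None so the
        # caller can skip sending instead of raising.
        return None
```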

## Event Format

Events follow the rlsapi telemetry format for consistency with existing analytics.

### HEC Envelope

```json
{
  "time": 1737470400,
  "host": "pod-lcs-abc123",
  "source": "lightspeed-stack (v1.0.0)",
  "sourcetype": "infer_with_llm",
  "index": "rhel_lightspeed",
  "event": { ... }
}
```
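As a rough sketch, an envelope like this could be assembled as follows (illustrative only; the actual `build_inference_event` helper lives in `src/observability` and may differ):

```python
import socket
import time
from typing import Any


def build_hec_envelope(
    event: dict[str, Any], sourcetype: str, index: str, source: str
) -> dict[str, Any]:
    """Wrap an event payload in the HEC envelope shown above."""
    return {
        "time": int(time.time()),      # event time, epoch seconds
        "host": socket.gethostname(),  # e.g. the pod name
        "source": source,
        "sourcetype": sourcetype,      # "infer_with_llm" or "infer_error"
        "index": index,
        "event": event,
    }
```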

### Event Payload

```json
{
  "question": "How do I configure SSH?",
  "refined_questions": [],
  "context": "",
  "response": "To configure SSH, edit /etc/ssh/sshd_config...",
  "inference_time": 2.34,
  "model": "granite-3-8b-instruct",
  "deployment": "production",
  "org_id": "12345678",
  "system_id": "abc-def-123",
  "total_llm_tokens": 0,
  "request_id": "req_xyz789",
  "cla_version": "CLA/0.4.0",
  "system_os": "RHEL",
  "system_version": "9.3",
  "system_arch": "x86_64"
}
```

### Field Descriptions

| Field | Description |
|-------|-------------|
| `question` | User's original question |
| `refined_questions` | Reserved for RAG (empty array) |
| `context` | Reserved for RAG (empty string) |
| `response` | LLM-generated response text |
| `inference_time` | Time in seconds for LLM inference |
| `model` | Model identifier from configuration |
| `deployment` | Value of `deployment_environment` config |
| `org_id` | Organization ID from RH Identity, or `auth_disabled` |
| `system_id` | System CN from RH Identity, or `auth_disabled` |
| `total_llm_tokens` | Reserved for token counting (currently `0`) |
| `request_id` | Unique request identifier |
| `cla_version` | Client User-Agent header |
| `system_os` | Client operating system |
| `system_version` | Client OS version |
| `system_arch` | Client CPU architecture |

## Endpoints

Currently, Splunk telemetry is enabled for:

| Endpoint | Sourcetype (Success) | Sourcetype (Error) |
|----------|---------------------|-------------------|
| `/rlsapi/v1/infer` | `infer_with_llm` | `infer_error` |

## Graceful Degradation

The Splunk client is designed for resilience:

- **Disabled by default**: No impact when not configured
- **Non-blocking**: Events sent via FastAPI BackgroundTasks
- **Fail-safe**: HTTP errors are logged as warnings and never raised (see the sketch below)
- **Missing config**: Sending is silently skipped when required fields are missing
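A minimal sketch of that fail-safe send path (assuming an httpx-based client; the real `send_splunk_event` in `src/observability` may differ):

```python
import logging

import httpx

logger = logging.getLogger(__name__)


async def send_event_safe(
    url: str, token: str, envelope: dict, timeout: int = 5, verify_ssl: bool = True
) -> None:
    """POST an event to HEC; log failures as warnings, never raise."""
    try:
        async with httpx.AsyncClient(timeout=timeout, verify=verify_ssl) as client:
            response = await client.post(
                url,
                json=envelope,
                headers={"Authorization": f"Splunk {token}"},
            )
            response.raise_for_status()
    except httpx.HTTPStatusError as exc:
        logger.warning(
            "Splunk HEC request failed with status %s: %s",
            exc.response.status_code,
            exc.response.text,
        )
    except httpx.HTTPError as exc:
        logger.warning("Splunk HEC request failed: %s", exc)
```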

## Troubleshooting

### Events Not Appearing in Splunk

1. Verify `splunk.enabled: true` in config
2. Check token file exists and is readable
3. Verify HEC endpoint URL is correct
4. Check service logs for warning messages:

   ```
   Splunk HEC request failed with status 403: Invalid token
   ```
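To rule out network and token problems, you can also send a test event straight to the HEC endpoint (a sketch with illustrative values; adjust the URL and token path for your environment):

```python
# Quick HEC connectivity check; the URL and paths below are examples.
from pathlib import Path

import httpx

url = "https://splunk.corp.example.com:8088/services/collector"
token = Path("/var/secrets/splunk-hec-token").read_text(encoding="utf-8").strip()

resp = httpx.post(
    url,
    json={"event": "connectivity test", "index": "rhel_lightspeed"},
    headers={"Authorization": f"Splunk {token}"},
    timeout=5,
)
# A healthy endpoint typically answers 200 with {"text":"Success","code":0}.
print(resp.status_code, resp.text)
```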

### Connection Timeouts

Increase the timeout value:

```yaml
splunk:
  timeout: 10
```

### SSL Certificate Errors

For development/testing with self-signed certs:

```yaml
splunk:
  verify_ssl: false
```

**Warning**: Do not disable SSL verification in production.

## Extending to Other Endpoints

See [src/observability/README.md](../src/observability/README.md) for developer documentation on adding Splunk telemetry to additional endpoints.
120 changes: 118 additions & 2 deletions src/app/endpoints/rlsapi_v1.py

@@ -5,16 +5,18 @@
"""

import logging
import time
from typing import Annotated, Any, cast

from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request
from llama_stack.apis.agents.openai_responses import OpenAIResponseObject
from llama_stack_client import APIConnectionError, APIStatusError, RateLimitError

import constants
import metrics
from authentication import get_auth_dependency
from authentication.interface import AuthTuple
from authentication.rh_identity import RHIdentityData
from authorization.middleware import authorize
from client import AsyncLlamaStackClientHolder
from configuration import configuration
@@ -29,12 +31,41 @@
)
from models.rlsapi.requests import RlsapiV1InferRequest, RlsapiV1SystemInfo
from models.rlsapi.responses import RlsapiV1InferData, RlsapiV1InferResponse
from observability import InferenceEventData, build_inference_event, send_splunk_event
from utils.responses import extract_text_from_response_output_item
from utils.suid import get_suid

logger = logging.getLogger(__name__)
router = APIRouter(tags=["rlsapi-v1"])

# Default values when RH Identity auth is not configured
AUTH_DISABLED = "auth_disabled"


def _get_rh_identity_context(request: Request) -> tuple[str, str]:
"""Extract org_id and system_id from RH Identity request state.

When RH Identity authentication is configured, the auth dependency stores
the RHIdentityData object in request.state.rh_identity_data. This function
extracts the org_id and system_id for telemetry purposes.

Args:
request: The FastAPI request object.

Returns:
Tuple of (org_id, system_id). Returns ("auth_disabled", "auth_disabled")
when RH Identity auth is not configured or data is unavailable.
"""
rh_identity: RHIdentityData | None = getattr(
request.state, "rh_identity_data", None
)
if rh_identity is None:
return AUTH_DISABLED, AUTH_DISABLED

org_id = rh_identity.get_org_id() or AUTH_DISABLED
system_id = rh_identity.get_user_id() or AUTH_DISABLED
return org_id, system_id


infer_responses: dict[int | str, dict[str, Any]] = {
    200: RlsapiV1InferResponse.openapi_response(),
@@ -148,10 +179,52 @@ async def retrieve_simple_response(question: str, instructions: str) -> str:
    )


def _get_cla_version(request: Request) -> str:
"""Extract CLA version from User-Agent header."""
return request.headers.get("User-Agent", "")


def _queue_splunk_event(  # pylint: disable=too-many-arguments,too-many-positional-arguments
    background_tasks: BackgroundTasks,
    infer_request: RlsapiV1InferRequest,
    request: Request,
    request_id: str,
    response_text: str,
    inference_time: float,
    sourcetype: str,
) -> None:
    """Build and queue a Splunk telemetry event for background sending."""
    org_id, system_id = _get_rh_identity_context(request)
    systeminfo = infer_request.context.systeminfo

    event_data = InferenceEventData(
        question=infer_request.question,
        response=response_text,
        inference_time=inference_time,
        model=(
            (configuration.inference.default_model or "")
            if configuration.inference
            else ""
        ),
        org_id=org_id,
        system_id=system_id,
        request_id=request_id,
        cla_version=_get_cla_version(request),
        system_os=systeminfo.os,
        system_version=systeminfo.version,
        system_arch=systeminfo.arch,
    )

    event = build_inference_event(event_data)
    background_tasks.add_task(send_splunk_event, event, sourcetype)


@router.post("/infer", responses=infer_responses)
@authorize(Action.RLSAPI_V1_INFER)
async def infer_endpoint(
    infer_request: RlsapiV1InferRequest,
    request: Request,
    background_tasks: BackgroundTasks,
    auth: Annotated[AuthTuple, Depends(get_auth_dependency())],
) -> RlsapiV1InferResponse:
    """Handle rlsapi v1 /infer requests for stateless inference.
@@ -163,6 +236,8 @@

    Args:
        infer_request: The inference request containing question and context.
        request: The FastAPI request object for accessing headers and state.
        background_tasks: FastAPI background tasks for async Splunk event sending.
        auth: Authentication tuple from the configured auth provider.

    Returns:
@@ -174,7 +249,6 @@
    # Authentication enforced by get_auth_dependency(), authorization by @authorize decorator.
    _ = auth

    # Generate unique request ID
    request_id = get_suid()

    logger.info("Processing rlsapi v1 /infer request %s", request_id)
@@ -185,35 +259,77 @@
"Request %s: Combined input source length: %d", request_id, len(input_source)
)

    start_time = time.monotonic()
    try:
        response_text = await retrieve_simple_response(input_source, instructions)
        inference_time = time.monotonic() - start_time
    except APIConnectionError as e:
        inference_time = time.monotonic() - start_time
        metrics.llm_calls_failures_total.inc()
        logger.error(
            "Unable to connect to Llama Stack for request %s: %s", request_id, e
        )
        _queue_splunk_event(
            background_tasks,
            infer_request,
            request,
            request_id,
            str(e),
            inference_time,
            "infer_error",
        )
        response = ServiceUnavailableResponse(
            backend_name="Llama Stack",
            cause=str(e),
        )
        raise HTTPException(**response.model_dump()) from e
    except RateLimitError as e:
        inference_time = time.monotonic() - start_time
        metrics.llm_calls_failures_total.inc()
        logger.error("Rate limit exceeded for request %s: %s", request_id, e)
        _queue_splunk_event(
            background_tasks,
            infer_request,
            request,
            request_id,
            str(e),
            inference_time,
            "infer_error",
        )
        response = QuotaExceededResponse(
            response="The quota has been exceeded", cause=str(e)
        )
        raise HTTPException(**response.model_dump()) from e
    except APIStatusError as e:
        inference_time = time.monotonic() - start_time
        metrics.llm_calls_failures_total.inc()
        logger.exception("API error for request %s: %s", request_id, e)
        _queue_splunk_event(
            background_tasks,
            infer_request,
            request,
            request_id,
            str(e),
            inference_time,
            "infer_error",
        )
        response = InternalServerErrorResponse.generic()
        raise HTTPException(**response.model_dump()) from e

    if not response_text:
        logger.warning("Empty response from LLM for request %s", request_id)
        response_text = constants.UNABLE_TO_PROCESS_RESPONSE

    _queue_splunk_event(
        background_tasks,
        infer_request,
        request,
        request_id,
        response_text,
        inference_time,
        "infer_with_llm",
    )

    logger.info("Completed rlsapi v1 /infer request %s", request_id)

    return RlsapiV1InferResponse(