diff --git a/src/uipath/tracing/_otel_exporters.py b/src/uipath/tracing/_otel_exporters.py index 22fb20553..99ad76728 100644 --- a/src/uipath/tracing/_otel_exporters.py +++ b/src/uipath/tracing/_otel_exporters.py @@ -2,7 +2,7 @@ import logging import os import time -from typing import Any, Dict, List, Optional, Sequence +from typing import Any, Callable, Dict, List, Optional, Sequence import httpx from opentelemetry.sdk.trace import ReadableSpan @@ -97,9 +97,19 @@ class Status: def __init__( self, trace_id: Optional[str] = None, + span_filter: Optional[Callable[[Dict[str, Any]], bool]] = None, **kwargs, ): - """Initialize the exporter with the base URL and authentication token.""" + """Initialize the exporter with the base URL and authentication token. + + Args: + trace_id: Optional custom trace ID to use for all spans + span_filter: Optional filter function that takes a span dict and returns True + if the span should be filtered out (dropped). Children of filtered + spans will be reparented to the filtered span's parent. + If not provided but UIPATH_FILTER_SPAN_NAMES is set, a default + filter will be created to filter spans with those names. + """ super().__init__(**kwargs) self.base_url = self._get_base_url() self.auth_token = os.environ.get("UIPATH_ACCESS_TOKEN") @@ -113,14 +123,42 @@ def __init__( self.http_client = httpx.Client(**client_kwargs, headers=self.headers) self.trace_id = trace_id + # Set up span filter - use provided filter or create from env var + if span_filter: + self.span_filter = span_filter + else: + # Check for default filter names via environment variable + # UIPATH_FILTER_SPAN_NAMES can be comma-separated list, e.g. "LangGraph,OtherName" + filter_names = os.environ.get("UIPATH_FILTER_SPAN_NAMES", "") + if filter_names: + names_set = frozenset( + n.strip() for n in filter_names.split(",") if n.strip() + ) + # Use default arg to capture names_set by value + self.span_filter = lambda span, ns=names_set: span.get("Name") in ns + logger.info( + f"[Init] Created default span filter for names: {names_set}" + ) + else: + self.span_filter = None + + # Track filtered span IDs across batches: filtered_id -> new_parent_id + self._reparent_mapping: Dict[str, str] = {} + logger.info( + f"[Init] LlmOpsHttpExporter initialized, " + f"span_filter={'set' if self.span_filter else 'not set'}, " + f"UIPATH_FILTER_PARENT_SPAN={os.environ.get('UIPATH_FILTER_PARENT_SPAN', 'not set')}, " + f"UIPATH_PARENT_SPAN_ID={os.environ.get('UIPATH_PARENT_SPAN_ID', 'not set')}" + ) + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: """Export spans to UiPath LLM Ops.""" if len(spans) == 0: logger.warning("No spans to export") return SpanExportResult.SUCCESS - logger.debug( - f"Exporting {len(spans)} spans to {self.base_url}/llmopstenant_/api/Traces/spans" + logger.info( + f"[Export] Exporting {len(spans)} spans to {self.base_url}/llmopstenant_/api/Traces/spans" ) # Use optimized path: keep attributes as dict for processing @@ -132,6 +170,23 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: for span in spans ] + # Log span names for debugging + span_names = [s.get("Name") for s in span_list] + logger.info(f"[Export] Span names in batch: {span_names}") + + # Apply filtering and reparenting if filter is configured + filter_enabled = os.environ.get("UIPATH_FILTER_PARENT_SPAN") + if filter_enabled: + span_list = self._filter_and_reparent_spans(span_list) + else: + logger.info( + "[Export] Filtering DISABLED (UIPATH_FILTER_PARENT_SPAN not set)" + ) + + if len(span_list) == 0: + logger.debug("No spans to export after filtering") + return SpanExportResult.SUCCESS + url = self._build_url(span_list) # Process spans in-place - work directly with dict @@ -149,6 +204,165 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: return self._send_with_retries(url, span_list) + def _filter_and_reparent_spans( + self, span_list: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + """Filter out spans and reparent their children. + + Rules: + 1. Root spans (uipath.is_root=True) are DROPPED, children reparented to UIPATH_PARENT_SPAN_ID + 2. Spans matching span_filter are DROPPED, children reparented to filtered span's parent + + Args: + span_list: List of span dicts to filter + + Returns: + Filtered list of spans with updated ParentIds + """ + new_parent_id = os.environ.get("UIPATH_PARENT_SPAN_ID") + if not new_parent_id: + logger.info("[Filter] UIPATH_PARENT_SPAN_ID not set, skipping filtering") + return span_list + + logger.info( + f"[Filter] Starting filter with {len(span_list)} spans, " + f"UIPATH_PARENT_SPAN_ID={new_parent_id}, " + f"span_filter={'set' if self.span_filter else 'not set'}" + ) + + # First pass: identify spans to filter and build reparent mapping + logger.info("[Filter] === FIRST PASS: Identifying spans to filter ===") + for span in span_list: + span_id = span.get("Id") + span_name = span.get("Name") + span_parent_id = span.get("ParentId") + attributes = span.get("Attributes", {}) + + logger.info( + f"[Filter] Checking span: Id={span_id}, Name={span_name}, " + f"ParentId={span_parent_id}, attributes_type={type(attributes).__name__}" + ) + + if not isinstance(attributes, dict): + logger.info("[Filter] -> Skipping (attributes not a dict)") + continue + + is_root = attributes.get("uipath.is_root", False) + original_parent_id = attributes.get("uipath.original_parent_id") + + logger.info( + f"[Filter] -> is_root={is_root}, original_parent_id={original_parent_id}" + ) + + # Rule 1: Root spans are dropped, children go to UIPATH_PARENT_SPAN_ID + if is_root: + self._reparent_mapping[span_id] = new_parent_id + logger.info( + f"[Filter] Root span marked for filtering: " + f"Id={span_id}, Name={span.get('Name')}, " + f"children will be reparented to {new_parent_id}" + ) + continue + + # Rule 2: Check custom filter function + if not self.span_filter: + logger.info("[Filter] -> KEEP (no custom filter set)") + continue + + filter_result = self.span_filter(span) + logger.info(f"[Filter] -> Custom filter result: {filter_result}") + + if not filter_result: + logger.info("[Filter] -> KEEP (custom filter returned False)") + continue + + # Filtered span's children go to this span's parent + # Use original_parent_id if available, otherwise use current ParentId + parent = original_parent_id or span.get("ParentId") + if parent: + # Check if parent itself was filtered (transitive reparenting) + while parent in self._reparent_mapping: + parent = self._reparent_mapping[parent] + self._reparent_mapping[span_id] = parent + else: + self._reparent_mapping[span_id] = new_parent_id + logger.info( + f"[Filter] -> WILL FILTER (custom filter matched), " + f"children will be reparented to {self._reparent_mapping[span_id]}" + ) + + logger.info( + f"[Filter] After first pass, reparent_mapping has {len(self._reparent_mapping)} entries: " + f"{self._reparent_mapping}" + ) + + # Build set of span IDs in current batch for orphan detection + batch_span_ids = {span.get("Id") for span in span_list} + logger.info(f"[Filter] Batch span IDs: {batch_span_ids}") + + # Second pass: filter spans and reparent children + logger.info( + f"[Filter] === SECOND PASS: Filtering and reparenting === " + f"(mapping has {len(self._reparent_mapping)} entries)" + ) + filtered_spans = [] + for span in span_list: + span_id = span.get("Id") + span_name = span.get("Name") + parent_id = span.get("ParentId") + + # Skip filtered spans + if span_id in self._reparent_mapping: + logger.info( + f"[Filter] DROPPING span: Id={span_id}, Name={span_name}" + ) + continue + + # Reparent if parent was filtered + parent_in_mapping = parent_id and parent_id in self._reparent_mapping + parent_in_batch = parent_id and parent_id in batch_span_ids + parent_is_orphan = parent_id and not parent_in_mapping and not parent_in_batch + + logger.info( + f"[Filter] Checking span: Id={span_id}, Name={span_name}, " + f"ParentId={parent_id}, parent_in_mapping={parent_in_mapping}, " + f"parent_in_batch={parent_in_batch}, parent_is_orphan={parent_is_orphan}" + ) + + if parent_in_mapping: + old_parent = parent_id + # Follow the chain for transitive reparenting + while parent_id in self._reparent_mapping: + parent_id = self._reparent_mapping[parent_id] + span["ParentId"] = parent_id + logger.info( + f"[Filter] REPARENTING span: Id={span_id}, Name={span_name}, " + f"ParentId: {old_parent} -> {parent_id}" + ) + elif parent_is_orphan: + # Parent span was never seen - it was likely filtered externally or never exported + # Reparent to UIPATH_PARENT_SPAN_ID and add to mapping for future children + old_parent = parent_id + self._reparent_mapping[parent_id] = new_parent_id + span["ParentId"] = new_parent_id + logger.info( + f"[Filter] REPARENTING ORPHAN span: Id={span_id}, Name={span_name}, " + f"ParentId: {old_parent} -> {new_parent_id} (parent not in batch or mapping)" + ) + else: + logger.info( + f"[Filter] KEEPING span unchanged: Id={span_id}, Name={span_name}, " + f"ParentId={parent_id}" + ) + + filtered_spans.append(span) + + logger.info( + f"[Filter] Complete: {len(span_list)} input -> {len(filtered_spans)} output spans, " + f"mapping size: {len(self._reparent_mapping)}" + ) + return filtered_spans + def force_flush(self, timeout_millis: int = 30000) -> bool: """Force flush the exporter.""" return True diff --git a/src/uipath/tracing/_utils.py b/src/uipath/tracing/_utils.py index 1675d4f01..75bb65001 100644 --- a/src/uipath/tracing/_utils.py +++ b/src/uipath/tracing/_utils.py @@ -212,8 +212,13 @@ def otel_span_to_uipath_span( # Get parent span ID if it exists parent_id = None + is_root = otel_span.parent is None + original_parent_id: Optional[str] = None + if otel_span.parent is not None: parent_id = _SpanUtils.span_id_to_uuid4(otel_span.parent.span_id) + # Store original parent ID for potential reparenting later + original_parent_id = str(parent_id) else: # Only set UIPATH_PARENT_SPAN_ID for root spans (spans without a parent) parent_span_id_str = env.get("UIPATH_PARENT_SPAN_ID") @@ -226,6 +231,12 @@ def otel_span_to_uipath_span( # Only copy if we need to modify - we'll build attributes_dict lazily attributes_dict: dict[str, Any] = dict(otel_attrs) if otel_attrs else {} + # Add markers for filtering/reparenting in the exporter + if is_root: + attributes_dict["uipath.is_root"] = True + if original_parent_id: + attributes_dict["uipath.original_parent_id"] = original_parent_id + # Map status status = 1 # Default to OK if otel_span.status.status_code == StatusCode.ERROR: