From f963a08487a0ba8fc6a486aeec43fa5f0d238114 Mon Sep 17 00:00:00 2001 From: Chibi Vikram Date: Thu, 18 Dec 2025 12:28:59 -0800 Subject: [PATCH] refactor: extract payload builders and tracing into reusable modules MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract evaluation reporting logic into dedicated modules for better code organization, reusability, and separation of concerns: - Add _eval_tracing.py: EvalTracingManager class that encapsulates all OpenTelemetry tracing logic for evaluation runs including parent trace creation, eval run traces, and evaluator span management - Add _payload_builders package: - BasePayloadBuilder: Abstract base class with shared utilities for GUID conversion, usage extraction from spans, completion metrics, and request spec building - CodedPayloadBuilder: Handles coded agent evaluation payloads with string IDs and /coded/ endpoint suffix - LegacyPayloadBuilder: Handles legacy (low-code) agent payloads with GUID conversion and assertionRuns format These modules provide reusable abstractions that can be used to simplify the StudioWebProgressReporter and enable easier testing and maintenance of the evaluation reporting functionality. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- src/uipath/_cli/_evals/_eval_tracing.py | 340 ++++++++++++++++ .../_cli/_evals/_payload_builders/__init__.py | 11 + .../_cli/_evals/_payload_builders/_base.py | 375 ++++++++++++++++++ .../_cli/_evals/_payload_builders/_coded.py | 136 +++++++ .../_cli/_evals/_payload_builders/_legacy.py | 150 +++++++ 5 files changed, 1012 insertions(+) create mode 100644 src/uipath/_cli/_evals/_eval_tracing.py create mode 100644 src/uipath/_cli/_evals/_payload_builders/__init__.py create mode 100644 src/uipath/_cli/_evals/_payload_builders/_base.py create mode 100644 src/uipath/_cli/_evals/_payload_builders/_coded.py create mode 100644 src/uipath/_cli/_evals/_payload_builders/_legacy.py diff --git a/src/uipath/_cli/_evals/_eval_tracing.py b/src/uipath/_cli/_evals/_eval_tracing.py new file mode 100644 index 000000000..6a0c2462f --- /dev/null +++ b/src/uipath/_cli/_evals/_eval_tracing.py @@ -0,0 +1,340 @@ +"""Tracing utilities for evaluation reporting.""" + +import json +import logging +import uuid +from datetime import datetime, timezone +from typing import Any + +from opentelemetry import trace +from opentelemetry.trace import SpanContext, SpanKind, Status, StatusCode, TraceFlags +from pydantic import BaseModel + +from uipath.eval.models import EvalItemResult, ScoreType +from uipath.tracing import LlmOpsHttpExporter + +logger = logging.getLogger(__name__) + + +class EvalTracingManager: + """Manages OpenTelemetry tracing for evaluation runs.""" + + def __init__( + self, + spans_exporter: LlmOpsHttpExporter, + evaluators: dict[str, Any], + ): + """Initialize the tracing manager. + + Args: + spans_exporter: The LlmOps HTTP exporter for sending spans. + evaluators: Dict of evaluator ID to evaluator instance. + """ + self._spans_exporter = spans_exporter + self._evaluators = evaluators + self._tracer = trace.get_tracer(__name__) + + def set_trace_id(self, trace_id: str | None) -> None: + """Set the trace ID on the spans exporter. + + Args: + trace_id: The trace ID to set. + """ + self._spans_exporter.trace_id = trace_id + + def export_spans(self, spans: list[Any]) -> None: + """Export spans via the spans exporter. + + Args: + spans: List of spans to export. 
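+
+        Example (illustrative; ``readable_spans`` is a hypothetical list of
+        ``ReadableSpan`` objects collected during agent execution):
+
+            manager = EvalTracingManager(spans_exporter, evaluators)
+            manager.export_spans(readable_spans)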
+ """ + self._spans_exporter.export(spans) + + async def send_parent_trace(self, eval_set_run_id: str, eval_set_name: str) -> None: + """Send the parent trace span for the evaluation set run. + + Args: + eval_set_run_id: The ID of the evaluation set run. + eval_set_name: The name of the evaluation set. + """ + try: + trace_id_int = int(uuid.UUID(eval_set_run_id)) + + span_context = SpanContext( + trace_id=trace_id_int, + span_id=trace_id_int, + is_remote=False, + trace_flags=TraceFlags(0x01), + ) + + ctx = trace.set_span_in_context(trace.NonRecordingSpan(span_context)) + + with self._tracer.start_as_current_span( + eval_set_name, + context=ctx, + kind=SpanKind.INTERNAL, + start_time=int(datetime.now(timezone.utc).timestamp() * 1_000_000_000), + ) as span: + span.set_attribute("openinference.span.kind", "CHAIN") + span.set_attribute("span.type", "evaluationSet") + span.set_attribute("eval_set_run_id", eval_set_run_id) + + logger.debug(f"Created parent trace for eval set run: {eval_set_run_id}") + + except Exception as e: + logger.warning(f"Failed to create parent trace: {e}") + + async def send_eval_run_trace( + self, eval_run_id: str, eval_set_run_id: str, eval_name: str + ) -> None: + """Send the child trace span for an evaluation run. + + Args: + eval_run_id: The ID of the evaluation run. + eval_set_run_id: The ID of the parent evaluation set run. + eval_name: The name of the evaluation. + """ + try: + trace_id_int = int(uuid.UUID(eval_run_id)) + parent_span_id_int = int(uuid.UUID(eval_set_run_id)) + + parent_context = SpanContext( + trace_id=trace_id_int, + span_id=parent_span_id_int, + is_remote=False, + trace_flags=TraceFlags(0x01), + ) + + ctx = trace.set_span_in_context(trace.NonRecordingSpan(parent_context)) + + with self._tracer.start_as_current_span( + eval_name, + context=ctx, + kind=SpanKind.INTERNAL, + start_time=int(datetime.now(timezone.utc).timestamp() * 1_000_000_000), + ) as span: + span.set_attribute("openinference.span.kind", "CHAIN") + span.set_attribute("span.type", "evaluation") + span.set_attribute("eval_run_id", eval_run_id) + span.set_attribute("eval_set_run_id", eval_set_run_id) + + logger.debug( + f"Created trace for eval run: {eval_run_id} (parent: {eval_set_run_id})" + ) + + except Exception as e: + logger.warning(f"Failed to create eval run trace: {e}") + + async def send_evaluator_traces( + self, eval_run_id: str, eval_results: list[EvalItemResult], spans: list[Any] + ) -> None: + """Send trace spans for all evaluators. + + Args: + eval_run_id: The ID of the evaluation run. + eval_results: List of evaluator results. + spans: List of spans that may contain evaluator LLM calls. 
+ """ + try: + if not eval_results: + logger.debug( + f"No evaluator results to trace for eval run: {eval_run_id}" + ) + return + + # Export agent execution spans + self._export_agent_spans(spans, eval_run_id) + + # Calculate timing + now = datetime.now(timezone.utc) + total_eval_time = sum( + ( + r.result.evaluation_time + for r in eval_results + if r.result.evaluation_time + ), + 0.0, + ) + + parent_end_time = now + parent_start_time = ( + datetime.fromtimestamp( + now.timestamp() - total_eval_time, tz=timezone.utc + ) + if total_eval_time > 0 + else now + ) + + # Find root span and create context + ctx = self._create_evaluators_context(eval_run_id, spans) + + # Create parent span + parent_start_ns = int(parent_start_time.timestamp() * 1_000_000_000) + parent_end_ns = int(parent_end_time.timestamp() * 1_000_000_000) + + parent_span = self._tracer.start_span( + "Evaluators", + context=ctx, + kind=SpanKind.INTERNAL, + start_time=parent_start_ns, + ) + parent_span.set_attribute("openinference.span.kind", "CHAIN") + parent_span.set_attribute("span.type", "evaluators") + parent_span.set_attribute("eval_run_id", eval_run_id) + + parent_ctx = trace.set_span_in_context(parent_span, ctx) + + # Create individual evaluator spans + readable_spans = [] + current_time = parent_start_time + + for eval_result in eval_results: + evaluator_span, eval_end = self._create_evaluator_span( + eval_result, eval_run_id, current_time, parent_ctx + ) + current_time = eval_end + + if hasattr(evaluator_span, "_readable_span"): + readable_spans.append(evaluator_span._readable_span()) + + # End parent span + parent_span.end(end_time=parent_end_ns) + if hasattr(parent_span, "_readable_span"): + readable_spans.insert(0, parent_span._readable_span()) + + # Export all spans + if readable_spans: + self._spans_exporter.export(readable_spans) + + logger.debug( + f"Created evaluator traces for eval run: {eval_run_id} ({len(eval_results)} evaluators)" + ) + + except Exception as e: + logger.warning(f"Failed to create evaluator traces: {e}") + + def _export_agent_spans(self, spans: list[Any], eval_run_id: str) -> None: + """Export agent execution spans. + + Args: + spans: List of agent execution spans. + eval_run_id: The evaluation run ID for logging. + """ + agent_readable_spans = [] + if spans: + for span in spans: + if hasattr(span, "_readable_span"): + agent_readable_spans.append(span._readable_span()) + + if agent_readable_spans: + self._spans_exporter.export(agent_readable_spans) + logger.debug( + f"Exported {len(agent_readable_spans)} agent execution spans for eval run: {eval_run_id}" + ) + + def _create_evaluators_context(self, eval_run_id: str, spans: list[Any]) -> Any: + """Create the context for evaluator spans. + + Args: + eval_run_id: The evaluation run ID. + spans: List of agent spans to find root span from. + + Returns: + OpenTelemetry context for creating child spans. 
+ """ + trace_id_int = int(uuid.UUID(eval_run_id)) + + # Find root span from agent spans + root_span_uuid = None + if spans: + from uipath.tracing._utils import _SpanUtils + + for span in spans: + if span.parent is None: + span_context = span.get_span_context() + root_span_uuid = _SpanUtils.span_id_to_uuid4(span_context.span_id) + break + + if root_span_uuid: + root_span_id_int = int(root_span_uuid) + parent_context = SpanContext( + trace_id=trace_id_int, + span_id=root_span_id_int, + is_remote=False, + trace_flags=TraceFlags(0x01), + ) + else: + parent_context = SpanContext( + trace_id=trace_id_int, + span_id=trace_id_int, + is_remote=False, + trace_flags=TraceFlags(0x01), + ) + + return trace.set_span_in_context(trace.NonRecordingSpan(parent_context)) + + def _create_evaluator_span( + self, + eval_result: EvalItemResult, + eval_run_id: str, + start_time: datetime, + parent_ctx: Any, + ) -> tuple[Any, datetime]: + """Create a single evaluator span. + + Args: + eval_result: The evaluator result. + eval_run_id: The evaluation run ID. + start_time: Start time for this evaluator. + parent_ctx: Parent context for the span. + + Returns: + Tuple of (span, end_time). + """ + evaluator = self._evaluators.get(eval_result.evaluator_id) + evaluator_name = evaluator.id if evaluator else eval_result.evaluator_id + + eval_time = eval_result.result.evaluation_time or 0 + eval_end = datetime.fromtimestamp( + start_time.timestamp() + eval_time, tz=timezone.utc + ) + + eval_start_ns = int(start_time.timestamp() * 1_000_000_000) + eval_end_ns = int(eval_end.timestamp() * 1_000_000_000) + + evaluator_span = self._tracer.start_span( + evaluator_name, + context=parent_ctx, + kind=SpanKind.INTERNAL, + start_time=eval_start_ns, + ) + + evaluator_span.set_attribute("openinference.span.kind", "EVALUATOR") + evaluator_span.set_attribute("span.type", "evaluator") + evaluator_span.set_attribute("evaluator_id", eval_result.evaluator_id) + evaluator_span.set_attribute("evaluator_name", evaluator_name) + evaluator_span.set_attribute("eval_run_id", eval_run_id) + evaluator_span.set_attribute("score", eval_result.result.score) + evaluator_span.set_attribute("score_type", eval_result.result.score_type.name) + + if eval_result.result.details: + if isinstance(eval_result.result.details, BaseModel): + evaluator_span.set_attribute( + "details", json.dumps(eval_result.result.details.model_dump()) + ) + else: + evaluator_span.set_attribute("details", str(eval_result.result.details)) + + if eval_result.result.evaluation_time: + evaluator_span.set_attribute( + "evaluation_time", eval_result.result.evaluation_time + ) + + if eval_result.result.score_type == ScoreType.ERROR: + evaluator_span.set_status(Status(StatusCode.ERROR, "Evaluation failed")) + else: + evaluator_span.set_status(Status(StatusCode.OK)) + + evaluator_span.end(end_time=eval_end_ns) + + return evaluator_span, eval_end diff --git a/src/uipath/_cli/_evals/_payload_builders/__init__.py b/src/uipath/_cli/_evals/_payload_builders/__init__.py new file mode 100644 index 000000000..44947905b --- /dev/null +++ b/src/uipath/_cli/_evals/_payload_builders/__init__.py @@ -0,0 +1,11 @@ +"""Payload builders for evaluation reporting to StudioWeb.""" + +from uipath._cli._evals._payload_builders._base import BasePayloadBuilder +from uipath._cli._evals._payload_builders._coded import CodedPayloadBuilder +from uipath._cli._evals._payload_builders._legacy import LegacyPayloadBuilder + +__all__ = [ + "BasePayloadBuilder", + "CodedPayloadBuilder", + "LegacyPayloadBuilder", +] diff --git 
a/src/uipath/_cli/_evals/_payload_builders/_base.py b/src/uipath/_cli/_evals/_payload_builders/_base.py new file mode 100644 index 000000000..16c8589f4 --- /dev/null +++ b/src/uipath/_cli/_evals/_payload_builders/_base.py @@ -0,0 +1,375 @@ +"""Base payload builder with shared utilities for evaluation reporting.""" + +import json +import logging +import uuid +from abc import ABC, abstractmethod +from typing import Any + +from pydantic import BaseModel + +from uipath._cli._evals._models._evaluation_set import ( + EvaluationItem, + EvaluationStatus, +) +from uipath._cli._evals._models._sw_reporting import StudioWebAgentSnapshot +from uipath._utils import Endpoint, RequestSpec +from uipath.eval.models import EvalItemResult + +logger = logging.getLogger(__name__) + + +class BasePayloadBuilder(ABC): + """Abstract base class for payload builders. + + Provides shared utilities for both coded and legacy payload building. + """ + + def __init__( + self, + project_id: str | None, + endpoint_prefix: str, + tenant_header: dict[str, str | None], + ): + self._project_id = project_id + self._endpoint_prefix = endpoint_prefix + self._tenant_header = tenant_header + + @property + @abstractmethod + def endpoint_suffix(self) -> str: + """Return the endpoint suffix for this builder type. + + Returns: + "coded/" for coded evaluations, "" for legacy. + """ + pass + + @abstractmethod + def format_id(self, id_value: str) -> str: + """Format an ID for the backend API. + + Args: + id_value: The ID to format. + + Returns: + Formatted ID (GUID for legacy, string for coded). + """ + pass + + @abstractmethod + def build_eval_snapshot(self, eval_item: EvaluationItem) -> dict[str, Any]: + """Build the eval snapshot portion of the payload. + + Args: + eval_item: The evaluation item. + + Returns: + Dict containing the eval snapshot. + """ + pass + + @abstractmethod + def collect_results( + self, + eval_results: list[EvalItemResult], + evaluators: dict[str, Any], + usage_metrics: dict[str, int | float | None], + ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: + """Collect and format evaluation results. + + Args: + eval_results: List of evaluation results. + evaluators: Dict of evaluator ID to evaluator instance. + usage_metrics: Token usage and cost metrics. + + Returns: + Tuple of (runs_list, scores_list). + """ + pass + + @abstractmethod + def build_update_eval_run_payload( + self, + eval_run_id: str, + runs: list[dict[str, Any]], + scores: list[dict[str, Any]], + actual_output: dict[str, Any], + execution_time: float, + success: bool, + ) -> dict[str, Any]: + """Build the payload for updating an eval run. + + Args: + eval_run_id: The evaluation run ID. + runs: List of evaluator/assertion runs. + scores: List of evaluator scores. + actual_output: The agent's actual output. + execution_time: Total execution time. + success: Whether the evaluation succeeded. + + Returns: + The payload dict. + """ + pass + + # Shared utility methods + + @staticmethod + def string_to_deterministic_guid(value: str) -> str: + """Convert a string to a deterministic GUID using UUID5. + + Args: + value: The string to convert. + + Returns: + A deterministic GUID string. + """ + return str(uuid.uuid5(uuid.NAMESPACE_DNS, value)) + + @staticmethod + def try_parse_or_convert_guid(value: str) -> str: + """Try to parse as GUID, or convert string to deterministic GUID. + + Args: + value: The string to parse or convert. + + Returns: + A valid GUID string. 
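+
+        Example (illustrative; the GUID literal is just a sample value):
+
+            # Already a valid GUID -> returned unchanged
+            try_parse_or_convert_guid("6f9619ff-8b86-d011-b42d-00cf4fc964ff")
+            # Arbitrary string -> deterministic UUID5 derived from it
+            try_parse_or_convert_guid("my-evaluator-id")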
+ """ + try: + uuid.UUID(value) + return value + except ValueError: + return BasePayloadBuilder.string_to_deterministic_guid(value) + + @staticmethod + def serialize_justification(justification: BaseModel | str | None) -> str | None: + """Serialize justification to JSON string for API compatibility. + + Args: + justification: The justification object. + + Returns: + JSON string representation or None. + """ + if isinstance(justification, BaseModel): + return json.dumps(justification.model_dump()) + return justification + + @staticmethod + def extract_usage_from_spans(spans: list[Any]) -> dict[str, int | float | None]: + """Extract token usage and cost from OpenTelemetry spans. + + Args: + spans: List of ReadableSpan objects from agent execution. + + Returns: + Dictionary with tokens, completionTokens, promptTokens, and cost. + """ + total_tokens = 0 + completion_tokens = 0 + prompt_tokens = 0 + total_cost = 0.0 + + for span in spans: + try: + attrs = None + if hasattr(span, "attributes") and span.attributes: + if isinstance(span.attributes, dict): + attrs = span.attributes + elif isinstance(span.attributes, str): + attrs = json.loads(span.attributes) + + if not attrs and hasattr(span, "Attributes") and span.Attributes: + if isinstance(span.Attributes, str): + attrs = json.loads(span.Attributes) + elif isinstance(span.Attributes, dict): + attrs = span.Attributes + + if attrs: + if "usage" in attrs and isinstance(attrs["usage"], dict): + usage = attrs["usage"] + prompt_tokens += usage.get("promptTokens", 0) + completion_tokens += usage.get("completionTokens", 0) + total_tokens += usage.get("totalTokens", 0) + total_cost += usage.get("cost", 0.0) + + prompt_tokens += attrs.get("gen_ai.usage.prompt_tokens", 0) + completion_tokens += attrs.get("gen_ai.usage.completion_tokens", 0) + total_tokens += attrs.get("gen_ai.usage.total_tokens", 0) + total_cost += attrs.get("gen_ai.usage.cost", 0.0) + total_cost += attrs.get("llm.usage.cost", 0.0) + + except (json.JSONDecodeError, AttributeError, TypeError) as e: + logger.debug(f"Failed to parse span attributes: {e}") + continue + + return { + "tokens": total_tokens if total_tokens > 0 else None, + "completionTokens": completion_tokens if completion_tokens > 0 else None, + "promptTokens": prompt_tokens if prompt_tokens > 0 else None, + "cost": total_cost if total_cost > 0 else None, + } + + @staticmethod + def build_completion_metrics( + duration: float | None, + usage_metrics: dict[str, int | float | None], + ) -> dict[str, Any]: + """Build completion metrics dict. + + Args: + duration: Execution duration in seconds. + usage_metrics: Token usage and cost metrics. + + Returns: + Completion metrics dict. + """ + return { + "duration": int(duration) if duration else 0, + "cost": usage_metrics["cost"], + "tokens": usage_metrics["tokens"] or 0, + "completionTokens": usage_metrics["completionTokens"] or 0, + "promptTokens": usage_metrics["promptTokens"] or 0, + } + + # Request spec builders (shared structure, use abstract methods for differences) + + def build_create_eval_set_run_spec( + self, + eval_set_id: str, + agent_snapshot: StudioWebAgentSnapshot, + no_of_evals: int, + ) -> RequestSpec: + """Build request spec for creating an eval set run. + + Args: + eval_set_id: The evaluation set ID. + agent_snapshot: The agent snapshot. + no_of_evals: Number of evaluations. + + Returns: + RequestSpec for the API call. 
+ """ + payload = { + "agentId": self._project_id, + "evalSetId": self.format_id(eval_set_id), + "agentSnapshot": agent_snapshot.model_dump(by_alias=True), + "status": EvaluationStatus.IN_PROGRESS.value, + "numberOfEvalsExecuted": no_of_evals, + "source": 0, + } + + return RequestSpec( + method="POST", + endpoint=Endpoint( + f"{self._endpoint_prefix}execution/agents/{self._project_id}/{self.endpoint_suffix}evalSetRun" + ), + json=payload, + headers=self._tenant_header, + ) + + def build_create_eval_run_spec( + self, + eval_item: EvaluationItem, + eval_set_run_id: str, + ) -> RequestSpec: + """Build request spec for creating an eval run. + + Args: + eval_item: The evaluation item. + eval_set_run_id: The eval set run ID. + + Returns: + RequestSpec for the API call. + """ + eval_snapshot = self.build_eval_snapshot(eval_item) + + payload = { + "evalSetRunId": eval_set_run_id, + "evalSnapshot": eval_snapshot, + "status": EvaluationStatus.IN_PROGRESS.value, + } + + return RequestSpec( + method="POST", + endpoint=Endpoint( + f"{self._endpoint_prefix}execution/agents/{self._project_id}/{self.endpoint_suffix}evalRun" + ), + json=payload, + headers=self._tenant_header, + ) + + def build_update_eval_run_spec( + self, + eval_run_id: str, + runs: list[dict[str, Any]], + scores: list[dict[str, Any]], + actual_output: dict[str, Any], + execution_time: float, + success: bool, + ) -> RequestSpec: + """Build request spec for updating an eval run. + + Args: + eval_run_id: The evaluation run ID. + runs: List of evaluator/assertion runs. + scores: List of evaluator scores. + actual_output: The agent's actual output. + execution_time: Total execution time. + success: Whether the evaluation succeeded. + + Returns: + RequestSpec for the API call. + """ + payload = self.build_update_eval_run_payload( + eval_run_id, runs, scores, actual_output, execution_time, success + ) + + return RequestSpec( + method="PUT", + endpoint=Endpoint( + f"{self._endpoint_prefix}execution/agents/{self._project_id}/{self.endpoint_suffix}evalRun" + ), + json=payload, + headers=self._tenant_header, + ) + + def build_update_eval_set_run_spec( + self, + eval_set_run_id: str, + evaluator_scores: dict[str, float], + success: bool = True, + ) -> RequestSpec: + """Build request spec for updating an eval set run. + + Args: + eval_set_run_id: The eval set run ID. + evaluator_scores: Dict of evaluator ID to average score. + success: Whether the evaluation set succeeded. + + Returns: + RequestSpec for the API call. 
+ """ + scores_list = [ + {"value": avg_score, "evaluatorId": self.format_id(evaluator_id)} + for evaluator_id, avg_score in evaluator_scores.items() + ] + + status = EvaluationStatus.COMPLETED if success else EvaluationStatus.FAILED + + payload = { + "evalSetRunId": eval_set_run_id, + "status": status.value, + "evaluatorScores": scores_list, + } + + return RequestSpec( + method="PUT", + endpoint=Endpoint( + f"{self._endpoint_prefix}execution/agents/{self._project_id}/{self.endpoint_suffix}evalSetRun" + ), + json=payload, + headers=self._tenant_header, + ) diff --git a/src/uipath/_cli/_evals/_payload_builders/_coded.py b/src/uipath/_cli/_evals/_payload_builders/_coded.py new file mode 100644 index 000000000..eb280097b --- /dev/null +++ b/src/uipath/_cli/_evals/_payload_builders/_coded.py @@ -0,0 +1,136 @@ +"""Coded agent payload builder for evaluation reporting.""" + +from typing import Any + +from uipath._cli._evals._models._evaluation_set import ( + EvaluationItem, + EvaluationStatus, +) +from uipath._cli._evals._payload_builders._base import BasePayloadBuilder +from uipath.eval.evaluators import BaseEvaluator +from uipath.eval.models import EvalItemResult + + +class CodedPayloadBuilder(BasePayloadBuilder): + """Payload builder for coded agent evaluations. + + Coded agents use string IDs and the /coded/ endpoint suffix. + The payload format includes evaluatorRuns with nested result objects. + """ + + @property + def endpoint_suffix(self) -> str: + """Coded evaluations use the /coded/ endpoint suffix.""" + return "coded/" + + def format_id(self, id_value: str) -> str: + """Coded evaluations use string IDs directly.""" + return id_value + + def build_eval_snapshot(self, eval_item: EvaluationItem) -> dict[str, Any]: + """Build eval snapshot with evaluationCriterias for coded agents. + + Args: + eval_item: The evaluation item. + + Returns: + Dict containing the eval snapshot with evaluationCriterias. + """ + return { + "id": eval_item.id, + "name": eval_item.name, + "inputs": eval_item.inputs, + "evaluationCriterias": eval_item.evaluation_criterias, + } + + def collect_results( + self, + eval_results: list[EvalItemResult], + evaluators: dict[str, BaseEvaluator[Any, Any, Any]], + usage_metrics: dict[str, int | float | None], + ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: + """Collect results for coded evaluators. + + Returns evaluatorRuns with nested result objects and scores list. + + Args: + eval_results: List of evaluation results. + evaluators: Dict of evaluator ID to BaseEvaluator instance. + usage_metrics: Token usage and cost metrics. + + Returns: + Tuple of (evaluator_runs, evaluator_scores). 
+ """ + evaluator_runs: list[dict[str, Any]] = [] + evaluator_scores: list[dict[str, Any]] = [] + + for eval_result in eval_results: + if eval_result.evaluator_id not in evaluators: + continue + + justification = self.serialize_justification(eval_result.result.details) + + evaluator_scores.append( + { + "type": eval_result.result.score_type.value, + "value": eval_result.result.score, + "justification": justification, + "evaluatorId": eval_result.evaluator_id, + } + ) + + evaluator_runs.append( + { + "status": EvaluationStatus.COMPLETED.value, + "evaluatorId": eval_result.evaluator_id, + "result": { + "score": { + "type": eval_result.result.score_type.value, + "value": eval_result.result.score, + }, + "justification": justification, + }, + "completionMetrics": self.build_completion_metrics( + eval_result.result.evaluation_time, usage_metrics + ), + } + ) + + return evaluator_runs, evaluator_scores + + def build_update_eval_run_payload( + self, + eval_run_id: str, + runs: list[dict[str, Any]], + scores: list[dict[str, Any]], + actual_output: dict[str, Any], + execution_time: float, + success: bool, + ) -> dict[str, Any]: + """Build update payload for coded evaluations. + + Coded format uses 'scores' and 'evaluatorRuns' keys. + + Args: + eval_run_id: The evaluation run ID. + runs: List of evaluator runs. + scores: List of evaluator scores. + actual_output: The agent's actual output. + execution_time: Total execution time. + success: Whether the evaluation succeeded. + + Returns: + The payload dict. + """ + status = EvaluationStatus.COMPLETED if success else EvaluationStatus.FAILED + + return { + "evalRunId": eval_run_id, + "status": status.value, + "result": { + "output": dict(actual_output), + "scores": scores, + }, + "completionMetrics": {"duration": int(execution_time)}, + "evaluatorRuns": runs, + } diff --git a/src/uipath/_cli/_evals/_payload_builders/_legacy.py b/src/uipath/_cli/_evals/_payload_builders/_legacy.py new file mode 100644 index 000000000..ad4c8d0df --- /dev/null +++ b/src/uipath/_cli/_evals/_payload_builders/_legacy.py @@ -0,0 +1,150 @@ +"""Legacy (low-code) agent payload builder for evaluation reporting.""" + +from typing import Any + +from uipath._cli._evals._models._evaluation_set import ( + EvaluationItem, + EvaluationStatus, +) +from uipath._cli._evals._payload_builders._base import BasePayloadBuilder +from uipath.eval.evaluators import LegacyBaseEvaluator +from uipath.eval.models import EvalItemResult + + +class LegacyPayloadBuilder(BasePayloadBuilder): + """Payload builder for legacy (low-code) agent evaluations. + + Legacy agents require GUIDs for IDs and use the base endpoint (no suffix). + The payload format includes assertionRuns with assertionSnapshot objects. + """ + + @property + def endpoint_suffix(self) -> str: + """Legacy evaluations use no endpoint suffix.""" + return "" + + def format_id(self, id_value: str) -> str: + """Legacy evaluations require GUID format. + + Converts string IDs to deterministic GUIDs using UUID5. + """ + return self.try_parse_or_convert_guid(id_value) + + def build_eval_snapshot(self, eval_item: EvaluationItem) -> dict[str, Any]: + """Build eval snapshot with expectedOutput for legacy agents. + + Legacy agents expect expectedOutput directly in the snapshot. + Since eval items are migrated to EvaluationItem format, we extract + expectedOutput from the first evaluator criteria. + + Args: + eval_item: The evaluation item. + + Returns: + Dict containing the eval snapshot with expectedOutput. 
+ """ + # Extract expectedOutput from migrated evaluationCriterias + # All criteria have the same expectedOutput, so we take the first + expected_output: dict[str, Any] = {} + if eval_item.evaluation_criterias: + first_criteria = next(iter(eval_item.evaluation_criterias.values()), None) + if first_criteria and isinstance(first_criteria, dict): + expected_output = first_criteria.get("expectedOutput", {}) + + return { + "id": self.format_id(eval_item.id), + "name": eval_item.name, + "inputs": eval_item.inputs, + "expectedOutput": expected_output, + } + + def collect_results( + self, + eval_results: list[EvalItemResult], + evaluators: dict[str, LegacyBaseEvaluator[Any]], + usage_metrics: dict[str, int | float | None], + ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: + """Collect results for legacy evaluators. + + Returns assertionRuns with assertionSnapshot objects and scores list. + + Args: + eval_results: List of evaluation results. + evaluators: Dict of evaluator ID to LegacyBaseEvaluator instance. + usage_metrics: Token usage and cost metrics. + + Returns: + Tuple of (assertion_runs, evaluator_scores). + """ + assertion_runs: list[dict[str, Any]] = [] + evaluator_scores: list[dict[str, Any]] = [] + + for eval_result in eval_results: + if eval_result.evaluator_id not in evaluators: + continue + + evaluator_id_guid = self.format_id(eval_result.evaluator_id) + justification = self.serialize_justification(eval_result.result.details) + evaluator = evaluators[eval_result.evaluator_id] + + evaluator_scores.append( + { + "type": eval_result.result.score_type.value, + "value": eval_result.result.score, + "justification": justification, + "evaluatorId": evaluator_id_guid, + } + ) + + assertion_runs.append( + { + "status": EvaluationStatus.COMPLETED.value, + "evaluatorId": evaluator_id_guid, + "completionMetrics": self.build_completion_metrics( + eval_result.result.evaluation_time, usage_metrics + ), + "assertionSnapshot": { + "assertionType": evaluator.evaluator_type.name, + "outputKey": evaluator.target_output_key, + }, + } + ) + + return assertion_runs, evaluator_scores + + def build_update_eval_run_payload( + self, + eval_run_id: str, + runs: list[dict[str, Any]], + scores: list[dict[str, Any]], + actual_output: dict[str, Any], + execution_time: float, + success: bool, + ) -> dict[str, Any]: + """Build update payload for legacy evaluations. + + Legacy format uses 'evaluatorScores' and 'assertionRuns' keys. + + Args: + eval_run_id: The evaluation run ID. + runs: List of assertion runs. + scores: List of evaluator scores. + actual_output: The agent's actual output. + execution_time: Total execution time. + success: Whether the evaluation succeeded. + + Returns: + The payload dict. + """ + status = EvaluationStatus.COMPLETED if success else EvaluationStatus.FAILED + + return { + "evalRunId": eval_run_id, + "status": status.value, + "result": { + "output": dict(actual_output), + "evaluatorScores": scores, + }, + "completionMetrics": {"duration": int(execution_time)}, + "assertionRuns": runs, + }