diff --git a/src/uipath/_cli/_evals/_eval_tracing.py b/src/uipath/_cli/_evals/_eval_tracing.py
new file mode 100644
index 000000000..6a0c2462f
--- /dev/null
+++ b/src/uipath/_cli/_evals/_eval_tracing.py
@@ -0,0 +1,340 @@
+"""Tracing utilities for evaluation reporting."""
+
+import json
+import logging
+import uuid
+from datetime import datetime, timezone
+from typing import Any
+
+from opentelemetry import trace
+from opentelemetry.trace import SpanContext, SpanKind, Status, StatusCode, TraceFlags
+from pydantic import BaseModel
+
+from uipath.eval.models import EvalItemResult, ScoreType
+from uipath.tracing import LlmOpsHttpExporter
+
+logger = logging.getLogger(__name__)
+
+
+class EvalTracingManager:
+    """Manages OpenTelemetry tracing for evaluation runs."""
+
+    def __init__(
+        self,
+        spans_exporter: LlmOpsHttpExporter,
+        evaluators: dict[str, Any],
+    ):
+        """Initialize the tracing manager.
+
+        Args:
+            spans_exporter: The LlmOps HTTP exporter for sending spans.
+            evaluators: Dict of evaluator ID to evaluator instance.
+        """
+        self._spans_exporter = spans_exporter
+        self._evaluators = evaluators
+        self._tracer = trace.get_tracer(__name__)
+
+    def set_trace_id(self, trace_id: str | None) -> None:
+        """Set the trace ID on the spans exporter.
+
+        Args:
+            trace_id: The trace ID to set.
+        """
+        self._spans_exporter.trace_id = trace_id
+
+    def export_spans(self, spans: list[Any]) -> None:
+        """Export spans via the spans exporter.
+
+        Args:
+            spans: List of spans to export.
+        """
+        self._spans_exporter.export(spans)
+
+    async def send_parent_trace(self, eval_set_run_id: str, eval_set_name: str) -> None:
+        """Send the parent trace span for the evaluation set run.
+
+        Args:
+            eval_set_run_id: The ID of the evaluation set run.
+            eval_set_name: The name of the evaluation set.
+        """
+        try:
+            trace_id_int = int(uuid.UUID(eval_set_run_id))
+
+            span_context = SpanContext(
+                trace_id=trace_id_int,
+                span_id=trace_id_int,
+                is_remote=False,
+                trace_flags=TraceFlags(0x01),
+            )
+
+            ctx = trace.set_span_in_context(trace.NonRecordingSpan(span_context))
+
+            with self._tracer.start_as_current_span(
+                eval_set_name,
+                context=ctx,
+                kind=SpanKind.INTERNAL,
+                start_time=int(datetime.now(timezone.utc).timestamp() * 1_000_000_000),
+            ) as span:
+                span.set_attribute("openinference.span.kind", "CHAIN")
+                span.set_attribute("span.type", "evaluationSet")
+                span.set_attribute("eval_set_run_id", eval_set_run_id)
+
+            logger.debug(f"Created parent trace for eval set run: {eval_set_run_id}")
+
+        except Exception as e:
+            logger.warning(f"Failed to create parent trace: {e}")
+
+    async def send_eval_run_trace(
+        self, eval_run_id: str, eval_set_run_id: str, eval_name: str
+    ) -> None:
+        """Send the child trace span for an evaluation run.
+
+        Args:
+            eval_run_id: The ID of the evaluation run.
+            eval_set_run_id: The ID of the parent evaluation set run.
+            eval_name: The name of the evaluation.
+        """
+        try:
+            trace_id_int = int(uuid.UUID(eval_run_id))
+            parent_span_id_int = int(uuid.UUID(eval_set_run_id))
+
+            parent_context = SpanContext(
+                trace_id=trace_id_int,
+                span_id=parent_span_id_int,
+                is_remote=False,
+                trace_flags=TraceFlags(0x01),
+            )
+
+            ctx = trace.set_span_in_context(trace.NonRecordingSpan(parent_context))
+
+            with self._tracer.start_as_current_span(
+                eval_name,
+                context=ctx,
+                kind=SpanKind.INTERNAL,
+                start_time=int(datetime.now(timezone.utc).timestamp() * 1_000_000_000),
+            ) as span:
+                span.set_attribute("openinference.span.kind", "CHAIN")
+                span.set_attribute("span.type", "evaluation")
+                span.set_attribute("eval_run_id", eval_run_id)
+                span.set_attribute("eval_set_run_id", eval_set_run_id)
+
+            logger.debug(
+                f"Created trace for eval run: {eval_run_id} (parent: {eval_set_run_id})"
+            )
+
+        except Exception as e:
+            logger.warning(f"Failed to create eval run trace: {e}")
+
+    async def send_evaluator_traces(
+        self, eval_run_id: str, eval_results: list[EvalItemResult], spans: list[Any]
+    ) -> None:
+        """Send trace spans for all evaluators.
+
+        Args:
+            eval_run_id: The ID of the evaluation run.
+            eval_results: List of evaluator results.
+            spans: List of spans that may contain evaluator LLM calls.
+        """
+        try:
+            if not eval_results:
+                logger.debug(
+                    f"No evaluator results to trace for eval run: {eval_run_id}"
+                )
+                return
+
+            # Export agent execution spans
+            self._export_agent_spans(spans, eval_run_id)
+
+            # Calculate timing
+            now = datetime.now(timezone.utc)
+            total_eval_time = sum(
+                (
+                    r.result.evaluation_time
+                    for r in eval_results
+                    if r.result.evaluation_time
+                ),
+                0.0,
+            )
+
+            parent_end_time = now
+            parent_start_time = (
+                datetime.fromtimestamp(
+                    now.timestamp() - total_eval_time, tz=timezone.utc
+                )
+                if total_eval_time > 0
+                else now
+            )
+
+            # Find root span and create context
+            ctx = self._create_evaluators_context(eval_run_id, spans)
+
+            # Create parent span
+            parent_start_ns = int(parent_start_time.timestamp() * 1_000_000_000)
+            parent_end_ns = int(parent_end_time.timestamp() * 1_000_000_000)
+
+            parent_span = self._tracer.start_span(
+                "Evaluators",
+                context=ctx,
+                kind=SpanKind.INTERNAL,
+                start_time=parent_start_ns,
+            )
+            parent_span.set_attribute("openinference.span.kind", "CHAIN")
+            parent_span.set_attribute("span.type", "evaluators")
+            parent_span.set_attribute("eval_run_id", eval_run_id)
+
+            parent_ctx = trace.set_span_in_context(parent_span, ctx)
+
+            # Create individual evaluator spans
+            readable_spans = []
+            current_time = parent_start_time
+
+            for eval_result in eval_results:
+                evaluator_span, eval_end = self._create_evaluator_span(
+                    eval_result, eval_run_id, current_time, parent_ctx
+                )
+                current_time = eval_end
+
+                if hasattr(evaluator_span, "_readable_span"):
+                    readable_spans.append(evaluator_span._readable_span())
+
+            # End parent span
+            parent_span.end(end_time=parent_end_ns)
+            if hasattr(parent_span, "_readable_span"):
+                readable_spans.insert(0, parent_span._readable_span())
+
+            # Export all spans
+            if readable_spans:
+                self._spans_exporter.export(readable_spans)
+
+            logger.debug(
+                f"Created evaluator traces for eval run: {eval_run_id} ({len(eval_results)} evaluators)"
+            )
+
+        except Exception as e:
+            logger.warning(f"Failed to create evaluator traces: {e}")
+
+    def _export_agent_spans(self, spans: list[Any], eval_run_id: str) -> None:
+        """Export agent execution spans.
+
+        Args:
+            spans: List of agent execution spans.
+            eval_run_id: The evaluation run ID for logging.
+        """
+        agent_readable_spans = []
+        if spans:
+            for span in spans:
+                if hasattr(span, "_readable_span"):
+                    agent_readable_spans.append(span._readable_span())
+
+        if agent_readable_spans:
+            self._spans_exporter.export(agent_readable_spans)
+            logger.debug(
+                f"Exported {len(agent_readable_spans)} agent execution spans for eval run: {eval_run_id}"
+            )
+
+    def _create_evaluators_context(self, eval_run_id: str, spans: list[Any]) -> Any:
+        """Create the context for evaluator spans.
+
+        Args:
+            eval_run_id: The evaluation run ID.
+            spans: List of agent spans to find root span from.
+
+        Returns:
+            OpenTelemetry context for creating child spans.
+        """
+        trace_id_int = int(uuid.UUID(eval_run_id))
+
+        # Find root span from agent spans
+        root_span_uuid = None
+        if spans:
+            from uipath.tracing._utils import _SpanUtils
+
+            for span in spans:
+                if span.parent is None:
+                    span_context = span.get_span_context()
+                    root_span_uuid = _SpanUtils.span_id_to_uuid4(span_context.span_id)
+                    break
+
+        if root_span_uuid:
+            root_span_id_int = int(root_span_uuid)
+            parent_context = SpanContext(
+                trace_id=trace_id_int,
+                span_id=root_span_id_int,
+                is_remote=False,
+                trace_flags=TraceFlags(0x01),
+            )
+        else:
+            parent_context = SpanContext(
+                trace_id=trace_id_int,
+                span_id=trace_id_int,
+                is_remote=False,
+                trace_flags=TraceFlags(0x01),
+            )
+
+        return trace.set_span_in_context(trace.NonRecordingSpan(parent_context))
+
+    def _create_evaluator_span(
+        self,
+        eval_result: EvalItemResult,
+        eval_run_id: str,
+        start_time: datetime,
+        parent_ctx: Any,
+    ) -> tuple[Any, datetime]:
+        """Create a single evaluator span.
+
+        Args:
+            eval_result: The evaluator result.
+            eval_run_id: The evaluation run ID.
+            start_time: Start time for this evaluator.
+            parent_ctx: Parent context for the span.
+
+        Returns:
+            Tuple of (span, end_time).
+        """
+        evaluator = self._evaluators.get(eval_result.evaluator_id)
+        evaluator_name = evaluator.id if evaluator else eval_result.evaluator_id
+
+        eval_time = eval_result.result.evaluation_time or 0
+        eval_end = datetime.fromtimestamp(
+            start_time.timestamp() + eval_time, tz=timezone.utc
+        )
+
+        eval_start_ns = int(start_time.timestamp() * 1_000_000_000)
+        eval_end_ns = int(eval_end.timestamp() * 1_000_000_000)
+
+        evaluator_span = self._tracer.start_span(
+            evaluator_name,
+            context=parent_ctx,
+            kind=SpanKind.INTERNAL,
+            start_time=eval_start_ns,
+        )
+
+        evaluator_span.set_attribute("openinference.span.kind", "EVALUATOR")
+        evaluator_span.set_attribute("span.type", "evaluator")
+        evaluator_span.set_attribute("evaluator_id", eval_result.evaluator_id)
+        evaluator_span.set_attribute("evaluator_name", evaluator_name)
+        evaluator_span.set_attribute("eval_run_id", eval_run_id)
+        evaluator_span.set_attribute("score", eval_result.result.score)
+        evaluator_span.set_attribute("score_type", eval_result.result.score_type.name)
+
+        if eval_result.result.details:
+            if isinstance(eval_result.result.details, BaseModel):
+                evaluator_span.set_attribute(
+                    "details", json.dumps(eval_result.result.details.model_dump())
+                )
+            else:
+                evaluator_span.set_attribute("details", str(eval_result.result.details))
+
+        if eval_result.result.evaluation_time:
+            evaluator_span.set_attribute(
+                "evaluation_time", eval_result.result.evaluation_time
+            )
+
+        if eval_result.result.score_type == ScoreType.ERROR:
+            evaluator_span.set_status(Status(StatusCode.ERROR, "Evaluation failed"))
+        else:
+            evaluator_span.set_status(Status(StatusCode.OK))
+
+        evaluator_span.end(end_time=eval_end_ns)
+
+        return evaluator_span, eval_end
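A minimal sketch of how `EvalTracingManager` might be driven end to end, assuming `LlmOpsHttpExporter()` accepts default construction (not shown in this diff) and that run IDs are UUID strings, since the methods above parse them with `uuid.UUID`:

```python
import asyncio
import uuid

from uipath._cli._evals._eval_tracing import EvalTracingManager
from uipath.tracing import LlmOpsHttpExporter


async def trace_one_eval_run() -> None:
    # Exporter construction is an assumption; the eval runtime normally provides it.
    manager = EvalTracingManager(spans_exporter=LlmOpsHttpExporter(), evaluators={})

    eval_set_run_id = str(uuid.uuid4())  # run IDs must be UUID strings
    eval_run_id = str(uuid.uuid4())

    manager.set_trace_id(eval_set_run_id)
    await manager.send_parent_trace(eval_set_run_id, "My eval set")
    await manager.send_eval_run_trace(eval_run_id, eval_set_run_id, "My eval")
    # After the agent has run, evaluator results and captured spans are traced;
    # with no results this returns early and only logs a debug message.
    await manager.send_evaluator_traces(eval_run_id, eval_results=[], spans=[])


asyncio.run(trace_one_eval_run())
```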
diff --git a/src/uipath/_cli/_evals/_payload_builders/__init__.py b/src/uipath/_cli/_evals/_payload_builders/__init__.py
new file mode 100644
index 000000000..44947905b
--- /dev/null
+++ b/src/uipath/_cli/_evals/_payload_builders/__init__.py
@@ -0,0 +1,11 @@
+"""Payload builders for evaluation reporting to StudioWeb."""
+
+from uipath._cli._evals._payload_builders._base import BasePayloadBuilder
+from uipath._cli._evals._payload_builders._coded import CodedPayloadBuilder
+from uipath._cli._evals._payload_builders._legacy import LegacyPayloadBuilder
+
+__all__ = [
+    "BasePayloadBuilder",
+    "CodedPayloadBuilder",
+    "LegacyPayloadBuilder",
+]
diff --git a/src/uipath/_cli/_evals/_payload_builders/_base.py b/src/uipath/_cli/_evals/_payload_builders/_base.py
new file mode 100644
index 000000000..16c8589f4
--- /dev/null
+++ b/src/uipath/_cli/_evals/_payload_builders/_base.py
@@ -0,0 +1,375 @@
+"""Base payload builder with shared utilities for evaluation reporting."""
+
+import json
+import logging
+import uuid
+from abc import ABC, abstractmethod
+from typing import Any
+
+from pydantic import BaseModel
+
+from uipath._cli._evals._models._evaluation_set import (
+    EvaluationItem,
+    EvaluationStatus,
+)
+from uipath._cli._evals._models._sw_reporting import StudioWebAgentSnapshot
+from uipath._utils import Endpoint, RequestSpec
+from uipath.eval.models import EvalItemResult
+
+logger = logging.getLogger(__name__)
+
+
+class BasePayloadBuilder(ABC):
+    """Abstract base class for payload builders.
+
+    Provides shared utilities for both coded and legacy payload building.
+    """
+
+    def __init__(
+        self,
+        project_id: str | None,
+        endpoint_prefix: str,
+        tenant_header: dict[str, str | None],
+    ):
+        self._project_id = project_id
+        self._endpoint_prefix = endpoint_prefix
+        self._tenant_header = tenant_header
+
+    @property
+    @abstractmethod
+    def endpoint_suffix(self) -> str:
+        """Return the endpoint suffix for this builder type.
+
+        Returns:
+            "coded/" for coded evaluations, "" for legacy.
+        """
+        pass
+
+    @abstractmethod
+    def format_id(self, id_value: str) -> str:
+        """Format an ID for the backend API.
+
+        Args:
+            id_value: The ID to format.
+
+        Returns:
+            Formatted ID (GUID for legacy, string for coded).
+        """
+        pass
+
+    @abstractmethod
+    def build_eval_snapshot(self, eval_item: EvaluationItem) -> dict[str, Any]:
+        """Build the eval snapshot portion of the payload.
+
+        Args:
+            eval_item: The evaluation item.
+
+        Returns:
+            Dict containing the eval snapshot.
+        """
+        pass
+
+    @abstractmethod
+    def collect_results(
+        self,
+        eval_results: list[EvalItemResult],
+        evaluators: dict[str, Any],
+        usage_metrics: dict[str, int | float | None],
+    ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
+        """Collect and format evaluation results.
+
+        Args:
+            eval_results: List of evaluation results.
+            evaluators: Dict of evaluator ID to evaluator instance.
+            usage_metrics: Token usage and cost metrics.
+
+        Returns:
+            Tuple of (runs_list, scores_list).
+        """
+        pass
+
+    @abstractmethod
+    def build_update_eval_run_payload(
+        self,
+        eval_run_id: str,
+        runs: list[dict[str, Any]],
+        scores: list[dict[str, Any]],
+        actual_output: dict[str, Any],
+        execution_time: float,
+        success: bool,
+    ) -> dict[str, Any]:
+        """Build the payload for updating an eval run.
+
+        Args:
+            eval_run_id: The evaluation run ID.
+            runs: List of evaluator/assertion runs.
+            scores: List of evaluator scores.
+            actual_output: The agent's actual output.
+            execution_time: Total execution time.
+            success: Whether the evaluation succeeded.
+
+        Returns:
+            The payload dict.
+        """
+        pass
+
+    # Shared utility methods
+
+    @staticmethod
+    def string_to_deterministic_guid(value: str) -> str:
+        """Convert a string to a deterministic GUID using UUID5.
+
+        Args:
+            value: The string to convert.
+
+        Returns:
+            A deterministic GUID string.
+        """
+        return str(uuid.uuid5(uuid.NAMESPACE_DNS, value))
+
+    @staticmethod
+    def try_parse_or_convert_guid(value: str) -> str:
+        """Try to parse as GUID, or convert string to deterministic GUID.
+
+        Args:
+            value: The string to parse or convert.
+
+        Returns:
+            A valid GUID string.
+        """
+        try:
+            uuid.UUID(value)
+            return value
+        except ValueError:
+            return BasePayloadBuilder.string_to_deterministic_guid(value)
+
+    @staticmethod
+    def serialize_justification(justification: BaseModel | str | None) -> str | None:
+        """Serialize justification to JSON string for API compatibility.
+
+        Args:
+            justification: The justification object.
+
+        Returns:
+            JSON string representation or None.
+        """
+        if isinstance(justification, BaseModel):
+            return json.dumps(justification.model_dump())
+        return justification
+
+    @staticmethod
+    def extract_usage_from_spans(spans: list[Any]) -> dict[str, int | float | None]:
+        """Extract token usage and cost from OpenTelemetry spans.
+
+        Args:
+            spans: List of ReadableSpan objects from agent execution.
+
+        Returns:
+            Dictionary with tokens, completionTokens, promptTokens, and cost.
+        """
+        total_tokens = 0
+        completion_tokens = 0
+        prompt_tokens = 0
+        total_cost = 0.0
+
+        for span in spans:
+            try:
+                attrs = None
+                if hasattr(span, "attributes") and span.attributes:
+                    if isinstance(span.attributes, dict):
+                        attrs = span.attributes
+                    elif isinstance(span.attributes, str):
+                        attrs = json.loads(span.attributes)
+
+                if not attrs and hasattr(span, "Attributes") and span.Attributes:
+                    if isinstance(span.Attributes, str):
+                        attrs = json.loads(span.Attributes)
+                    elif isinstance(span.Attributes, dict):
+                        attrs = span.Attributes
+
+                if attrs:
+                    if "usage" in attrs and isinstance(attrs["usage"], dict):
+                        usage = attrs["usage"]
+                        prompt_tokens += usage.get("promptTokens", 0)
+                        completion_tokens += usage.get("completionTokens", 0)
+                        total_tokens += usage.get("totalTokens", 0)
+                        total_cost += usage.get("cost", 0.0)
+
+                    prompt_tokens += attrs.get("gen_ai.usage.prompt_tokens", 0)
+                    completion_tokens += attrs.get("gen_ai.usage.completion_tokens", 0)
+                    total_tokens += attrs.get("gen_ai.usage.total_tokens", 0)
+                    total_cost += attrs.get("gen_ai.usage.cost", 0.0)
+                    total_cost += attrs.get("llm.usage.cost", 0.0)
+
+            except (json.JSONDecodeError, AttributeError, TypeError) as e:
+                logger.debug(f"Failed to parse span attributes: {e}")
+                continue
+
+        return {
+            "tokens": total_tokens if total_tokens > 0 else None,
+            "completionTokens": completion_tokens if completion_tokens > 0 else None,
+            "promptTokens": prompt_tokens if prompt_tokens > 0 else None,
+            "cost": total_cost if total_cost > 0 else None,
+        }
+
+    @staticmethod
+    def build_completion_metrics(
+        duration: float | None,
+        usage_metrics: dict[str, int | float | None],
+    ) -> dict[str, Any]:
+        """Build completion metrics dict.
+
+        Args:
+            duration: Execution duration in seconds.
+            usage_metrics: Token usage and cost metrics.
+
+        Returns:
+            Completion metrics dict.
+        """
+        return {
+            "duration": int(duration) if duration else 0,
+            "cost": usage_metrics["cost"],
+            "tokens": usage_metrics["tokens"] or 0,
+            "completionTokens": usage_metrics["completionTokens"] or 0,
+            "promptTokens": usage_metrics["promptTokens"] or 0,
+        }
+
+    # Request spec builders (shared structure, use abstract methods for differences)
+
+    def build_create_eval_set_run_spec(
+        self,
+        eval_set_id: str,
+        agent_snapshot: StudioWebAgentSnapshot,
+        no_of_evals: int,
+    ) -> RequestSpec:
+        """Build request spec for creating an eval set run.
+
+        Args:
+            eval_set_id: The evaluation set ID.
+            agent_snapshot: The agent snapshot.
+            no_of_evals: Number of evaluations.
+
+        Returns:
+            RequestSpec for the API call.
+        """
+        payload = {
+            "agentId": self._project_id,
+            "evalSetId": self.format_id(eval_set_id),
+            "agentSnapshot": agent_snapshot.model_dump(by_alias=True),
+            "status": EvaluationStatus.IN_PROGRESS.value,
+            "numberOfEvalsExecuted": no_of_evals,
+            "source": 0,
+        }
+
+        return RequestSpec(
+            method="POST",
+            endpoint=Endpoint(
+                f"{self._endpoint_prefix}execution/agents/{self._project_id}/{self.endpoint_suffix}evalSetRun"
+            ),
+            json=payload,
+            headers=self._tenant_header,
+        )
+
+    def build_create_eval_run_spec(
+        self,
+        eval_item: EvaluationItem,
+        eval_set_run_id: str,
+    ) -> RequestSpec:
+        """Build request spec for creating an eval run.
+
+        Args:
+            eval_item: The evaluation item.
+            eval_set_run_id: The eval set run ID.
+
+        Returns:
+            RequestSpec for the API call.
+        """
+        eval_snapshot = self.build_eval_snapshot(eval_item)
+
+        payload = {
+            "evalSetRunId": eval_set_run_id,
+            "evalSnapshot": eval_snapshot,
+            "status": EvaluationStatus.IN_PROGRESS.value,
+        }
+
+        return RequestSpec(
+            method="POST",
+            endpoint=Endpoint(
+                f"{self._endpoint_prefix}execution/agents/{self._project_id}/{self.endpoint_suffix}evalRun"
+            ),
+            json=payload,
+            headers=self._tenant_header,
+        )
+
+    def build_update_eval_run_spec(
+        self,
+        eval_run_id: str,
+        runs: list[dict[str, Any]],
+        scores: list[dict[str, Any]],
+        actual_output: dict[str, Any],
+        execution_time: float,
+        success: bool,
+    ) -> RequestSpec:
+        """Build request spec for updating an eval run.
+
+        Args:
+            eval_run_id: The evaluation run ID.
+            runs: List of evaluator/assertion runs.
+            scores: List of evaluator scores.
+            actual_output: The agent's actual output.
+            execution_time: Total execution time.
+            success: Whether the evaluation succeeded.
+
+        Returns:
+            RequestSpec for the API call.
+        """
+        payload = self.build_update_eval_run_payload(
+            eval_run_id, runs, scores, actual_output, execution_time, success
+        )
+
+        return RequestSpec(
+            method="PUT",
+            endpoint=Endpoint(
+                f"{self._endpoint_prefix}execution/agents/{self._project_id}/{self.endpoint_suffix}evalRun"
+            ),
+            json=payload,
+            headers=self._tenant_header,
+        )
+
+    def build_update_eval_set_run_spec(
+        self,
+        eval_set_run_id: str,
+        evaluator_scores: dict[str, float],
+        success: bool = True,
+    ) -> RequestSpec:
+        """Build request spec for updating an eval set run.
+
+        Args:
+            eval_set_run_id: The eval set run ID.
+            evaluator_scores: Dict of evaluator ID to average score.
+            success: Whether the evaluation set succeeded.
+
+        Returns:
+            RequestSpec for the API call.
+        """
+        scores_list = [
+            {"value": avg_score, "evaluatorId": self.format_id(evaluator_id)}
+            for evaluator_id, avg_score in evaluator_scores.items()
+        ]
+
+        status = EvaluationStatus.COMPLETED if success else EvaluationStatus.FAILED
+
+        payload = {
+            "evalSetRunId": eval_set_run_id,
+            "status": status.value,
+            "evaluatorScores": scores_list,
+        }
+
+        return RequestSpec(
+            method="PUT",
+            endpoint=Endpoint(
+                f"{self._endpoint_prefix}execution/agents/{self._project_id}/{self.endpoint_suffix}evalSetRun"
+            ),
+            json=payload,
+            headers=self._tenant_header,
+        )
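The ID helpers above are plain stdlib `uuid` calls, so their behavior can be sketched in isolation; the evaluator ID below is a made-up example:

```python
from uipath._cli._evals._payload_builders import BasePayloadBuilder

# A value that is already a GUID is returned unchanged.
print(BasePayloadBuilder.try_parse_or_convert_guid("8c5b1e08-3f1d-4a8e-9d2a-1c2b3d4e5f60"))

# A non-GUID ID (hypothetical evaluator name) maps to a stable UUID5, so the
# backend sees the same evaluator ID on every run.
print(BasePayloadBuilder.string_to_deterministic_guid("exact-match-evaluator"))
print(BasePayloadBuilder.string_to_deterministic_guid("exact-match-evaluator"))  # identical
```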
diff --git a/src/uipath/_cli/_evals/_payload_builders/_coded.py b/src/uipath/_cli/_evals/_payload_builders/_coded.py
new file mode 100644
index 000000000..eb280097b
--- /dev/null
+++ b/src/uipath/_cli/_evals/_payload_builders/_coded.py
@@ -0,0 +1,136 @@
+"""Coded agent payload builder for evaluation reporting."""
+
+from typing import Any
+
+from uipath._cli._evals._models._evaluation_set import (
+    EvaluationItem,
+    EvaluationStatus,
+)
+from uipath._cli._evals._payload_builders._base import BasePayloadBuilder
+from uipath.eval.evaluators import BaseEvaluator
+from uipath.eval.models import EvalItemResult
+
+
+class CodedPayloadBuilder(BasePayloadBuilder):
+    """Payload builder for coded agent evaluations.
+
+    Coded agents use string IDs and the /coded/ endpoint suffix.
+    The payload format includes evaluatorRuns with nested result objects.
+    """
+
+    @property
+    def endpoint_suffix(self) -> str:
+        """Coded evaluations use the /coded/ endpoint suffix."""
+        return "coded/"
+
+    def format_id(self, id_value: str) -> str:
+        """Coded evaluations use string IDs directly."""
+        return id_value
+
+    def build_eval_snapshot(self, eval_item: EvaluationItem) -> dict[str, Any]:
+        """Build eval snapshot with evaluationCriterias for coded agents.
+
+        Args:
+            eval_item: The evaluation item.
+
+        Returns:
+            Dict containing the eval snapshot with evaluationCriterias.
+        """
+        return {
+            "id": eval_item.id,
+            "name": eval_item.name,
+            "inputs": eval_item.inputs,
+            "evaluationCriterias": eval_item.evaluation_criterias,
+        }
+
+    def collect_results(
+        self,
+        eval_results: list[EvalItemResult],
+        evaluators: dict[str, BaseEvaluator[Any, Any, Any]],
+        usage_metrics: dict[str, int | float | None],
+    ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
+        """Collect results for coded evaluators.
+
+        Returns evaluatorRuns with nested result objects and scores list.
+
+        Args:
+            eval_results: List of evaluation results.
+            evaluators: Dict of evaluator ID to BaseEvaluator instance.
+            usage_metrics: Token usage and cost metrics.
+
+        Returns:
+            Tuple of (evaluator_runs, evaluator_scores).
+        """
+        evaluator_runs: list[dict[str, Any]] = []
+        evaluator_scores: list[dict[str, Any]] = []
+
+        for eval_result in eval_results:
+            if eval_result.evaluator_id not in evaluators:
+                continue
+
+            justification = self.serialize_justification(eval_result.result.details)
+
+            evaluator_scores.append(
+                {
+                    "type": eval_result.result.score_type.value,
+                    "value": eval_result.result.score,
+                    "justification": justification,
+                    "evaluatorId": eval_result.evaluator_id,
+                }
+            )
+
+            evaluator_runs.append(
+                {
+                    "status": EvaluationStatus.COMPLETED.value,
+                    "evaluatorId": eval_result.evaluator_id,
+                    "result": {
+                        "score": {
+                            "type": eval_result.result.score_type.value,
+                            "value": eval_result.result.score,
+                        },
+                        "justification": justification,
+                    },
+                    "completionMetrics": self.build_completion_metrics(
+                        eval_result.result.evaluation_time, usage_metrics
+                    ),
+                }
+            )
+
+        return evaluator_runs, evaluator_scores
+
+    def build_update_eval_run_payload(
+        self,
+        eval_run_id: str,
+        runs: list[dict[str, Any]],
+        scores: list[dict[str, Any]],
+        actual_output: dict[str, Any],
+        execution_time: float,
+        success: bool,
+    ) -> dict[str, Any]:
+        """Build update payload for coded evaluations.
+
+        Coded format uses 'scores' and 'evaluatorRuns' keys.
+
+        Args:
+            eval_run_id: The evaluation run ID.
+            runs: List of evaluator runs.
+            scores: List of evaluator scores.
+            actual_output: The agent's actual output.
+            execution_time: Total execution time.
+            success: Whether the evaluation succeeded.
+
+        Returns:
+            The payload dict.
+        """
+        status = EvaluationStatus.COMPLETED if success else EvaluationStatus.FAILED
+
+        return {
+            "evalRunId": eval_run_id,
+            "status": status.value,
+            "result": {
+                "output": dict(actual_output),
+                "scores": scores,
+            },
+            "completionMetrics": {"duration": int(execution_time)},
+            "evaluatorRuns": runs,
+        }
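A rough sketch of the coded update payload shape; the constructor values and IDs below are placeholders (they are not consulted by `build_update_eval_run_payload`):

```python
from uipath._cli._evals._payload_builders import CodedPayloadBuilder

# Placeholder constructor arguments; the CLI supplies the real project/tenant values.
builder = CodedPayloadBuilder(project_id=None, endpoint_prefix="", tenant_header={})

payload = builder.build_update_eval_run_payload(
    eval_run_id="coded-eval-run-1",      # hypothetical ID
    runs=[],                             # evaluatorRuns from collect_results()
    scores=[],                           # evaluator scores from collect_results()
    actual_output={"answer": "42"},      # hypothetical agent output
    execution_time=1.7,
    success=True,
)
# Coded runs nest scores under result["scores"] and carry a top-level "evaluatorRuns".
print(payload["result"]["scores"], payload["evaluatorRuns"])
```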
diff --git a/src/uipath/_cli/_evals/_payload_builders/_legacy.py b/src/uipath/_cli/_evals/_payload_builders/_legacy.py
new file mode 100644
index 000000000..ad4c8d0df
--- /dev/null
+++ b/src/uipath/_cli/_evals/_payload_builders/_legacy.py
@@ -0,0 +1,150 @@
+"""Legacy (low-code) agent payload builder for evaluation reporting."""
+
+from typing import Any
+
+from uipath._cli._evals._models._evaluation_set import (
+    EvaluationItem,
+    EvaluationStatus,
+)
+from uipath._cli._evals._payload_builders._base import BasePayloadBuilder
+from uipath.eval.evaluators import LegacyBaseEvaluator
+from uipath.eval.models import EvalItemResult
+
+
+class LegacyPayloadBuilder(BasePayloadBuilder):
+    """Payload builder for legacy (low-code) agent evaluations.
+
+    Legacy agents require GUIDs for IDs and use the base endpoint (no suffix).
+    The payload format includes assertionRuns with assertionSnapshot objects.
+    """
+
+    @property
+    def endpoint_suffix(self) -> str:
+        """Legacy evaluations use no endpoint suffix."""
+        return ""
+
+    def format_id(self, id_value: str) -> str:
+        """Legacy evaluations require GUID format.
+
+        Converts string IDs to deterministic GUIDs using UUID5.
+        """
+        return self.try_parse_or_convert_guid(id_value)
+
+    def build_eval_snapshot(self, eval_item: EvaluationItem) -> dict[str, Any]:
+        """Build eval snapshot with expectedOutput for legacy agents.
+
+        Legacy agents expect expectedOutput directly in the snapshot.
+        Since eval items are migrated to the EvaluationItem format, expectedOutput
+        is extracted from the first evaluation criterion.
+
+        Args:
+            eval_item: The evaluation item.
+
+        Returns:
+            Dict containing the eval snapshot with expectedOutput.
+        """
+        # Extract expectedOutput from migrated evaluationCriterias.
+        # All criteria share the same expectedOutput, so the first entry is used.
+        expected_output: dict[str, Any] = {}
+        if eval_item.evaluation_criterias:
+            first_criteria = next(iter(eval_item.evaluation_criterias.values()), None)
+            if first_criteria and isinstance(first_criteria, dict):
+                expected_output = first_criteria.get("expectedOutput", {})
+
+        return {
+            "id": self.format_id(eval_item.id),
+            "name": eval_item.name,
+            "inputs": eval_item.inputs,
+            "expectedOutput": expected_output,
+        }
+
+    def collect_results(
+        self,
+        eval_results: list[EvalItemResult],
+        evaluators: dict[str, LegacyBaseEvaluator[Any]],
+        usage_metrics: dict[str, int | float | None],
+    ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
+        """Collect results for legacy evaluators.
+
+        Returns assertionRuns with assertionSnapshot objects and scores list.
+
+        Args:
+            eval_results: List of evaluation results.
+            evaluators: Dict of evaluator ID to LegacyBaseEvaluator instance.
+            usage_metrics: Token usage and cost metrics.
+
+        Returns:
+            Tuple of (assertion_runs, evaluator_scores).
+        """
+        assertion_runs: list[dict[str, Any]] = []
+        evaluator_scores: list[dict[str, Any]] = []
+
+        for eval_result in eval_results:
+            if eval_result.evaluator_id not in evaluators:
+                continue
+
+            evaluator_id_guid = self.format_id(eval_result.evaluator_id)
+            justification = self.serialize_justification(eval_result.result.details)
+            evaluator = evaluators[eval_result.evaluator_id]
+
+            evaluator_scores.append(
+                {
+                    "type": eval_result.result.score_type.value,
+                    "value": eval_result.result.score,
+                    "justification": justification,
+                    "evaluatorId": evaluator_id_guid,
+                }
+            )
+
+            assertion_runs.append(
+                {
+                    "status": EvaluationStatus.COMPLETED.value,
+                    "evaluatorId": evaluator_id_guid,
+                    "completionMetrics": self.build_completion_metrics(
+                        eval_result.result.evaluation_time, usage_metrics
+                    ),
+                    "assertionSnapshot": {
+                        "assertionType": evaluator.evaluator_type.name,
+                        "outputKey": evaluator.target_output_key,
+                    },
+                }
+            )
+
+        return assertion_runs, evaluator_scores
+
+    def build_update_eval_run_payload(
+        self,
+        eval_run_id: str,
+        runs: list[dict[str, Any]],
+        scores: list[dict[str, Any]],
+        actual_output: dict[str, Any],
+        execution_time: float,
+        success: bool,
+    ) -> dict[str, Any]:
+        """Build update payload for legacy evaluations.
+
+        Legacy format uses 'evaluatorScores' and 'assertionRuns' keys.
+
+        Args:
+            eval_run_id: The evaluation run ID.
+            runs: List of assertion runs.
+            scores: List of evaluator scores.
+            actual_output: The agent's actual output.
+            execution_time: Total execution time.
+            success: Whether the evaluation succeeded.
+
+        Returns:
+            The payload dict.
+        """
+        status = EvaluationStatus.COMPLETED if success else EvaluationStatus.FAILED
+
+        return {
+            "evalRunId": eval_run_id,
+            "status": status.value,
+            "result": {
+                "output": dict(actual_output),
+                "evaluatorScores": scores,
+            },
+            "completionMetrics": {"duration": int(execution_time)},
+            "assertionRuns": runs,
+        }
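For comparison, a companion sketch of the legacy payload under the same placeholder assumptions, showing the `assertionRuns`/`evaluatorScores` keys and the GUID conversion applied by `format_id`:

```python
from uipath._cli._evals._payload_builders import LegacyPayloadBuilder

# Placeholder constructor arguments, as above.
builder = LegacyPayloadBuilder(project_id=None, endpoint_prefix="", tenant_header={})

# Non-GUID IDs are converted to deterministic GUIDs before being reported.
print(builder.format_id("exact-match-evaluator"))  # hypothetical evaluator ID

payload = builder.build_update_eval_run_payload(
    eval_run_id="legacy-eval-run-1",     # hypothetical ID
    runs=[],                             # assertionRuns from collect_results()
    scores=[],                           # evaluator scores from collect_results()
    actual_output={"answer": "42"},
    execution_time=1.7,
    success=False,
)
# Legacy runs use result["evaluatorScores"] plus a top-level "assertionRuns",
# and a failed run is reported with EvaluationStatus.FAILED.
print(payload["result"]["evaluatorScores"], payload["assertionRuns"], payload["status"])
```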