From 85d427ab1989995b04d8626b2e54fc61e9999138 Mon Sep 17 00:00:00 2001
From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com>
Date: Mon, 12 Jan 2026 12:03:58 +0200
Subject: [PATCH] fix(experiments): move evaluations to root experiment span

---
 langfuse/_client/client.py | 198 +++++++++++++++++++------------------
 tests/test_prompt.py       |   2 +-
 2 files changed, 103 insertions(+), 97 deletions(-)

diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py
index a3f653ada..39679c79b 100644
--- a/langfuse/_client/client.py
+++ b/langfuse/_client/client.py
@@ -2866,17 +2866,17 @@ async def _process_experiment_item(
                         }
                     )
 
-                    with _propagate_attributes(
-                        experiment=PropagatedExperimentAttributes(
-                            experiment_id=experiment_id,
-                            experiment_name=experiment_run_name,
-                            experiment_metadata=_serialize(experiment_metadata),
-                            experiment_dataset_id=dataset_id,
-                            experiment_item_id=experiment_item_id,
-                            experiment_item_metadata=_serialize(item_metadata),
-                            experiment_item_root_observation_id=span.id,
-                        )
-                    ):
+                    propagated_experiment_attributes = PropagatedExperimentAttributes(
+                        experiment_id=experiment_id,
+                        experiment_name=experiment_run_name,
+                        experiment_metadata=_serialize(experiment_metadata),
+                        experiment_dataset_id=dataset_id,
+                        experiment_item_id=experiment_item_id,
+                        experiment_item_metadata=_serialize(item_metadata),
+                        experiment_item_root_observation_id=span.id,
+                    )
+
+                    with _propagate_attributes(experiment=propagated_experiment_attributes):
                         output = await _run_task(task, item)
 
                         span.update(
@@ -2891,95 +2891,101 @@
                     )
                     raise e
 
-                # Run evaluators
-                evaluations = []
-
-                for evaluator in evaluators:
-                    try:
-                        eval_metadata: Optional[Dict[str, Any]] = None
+        # Run evaluators
+        evaluations = []
 
-                        if isinstance(item, dict):
-                            eval_metadata = item.get("metadata")
-                        elif hasattr(item, "metadata"):
-                            eval_metadata = item.metadata
+        for evaluator in evaluators:
+            try:
+                eval_metadata: Optional[Dict[str, Any]] = None
 
-                        eval_results = await _run_evaluator(
-                            evaluator,
-                            input=input_data,
-                            output=output,
-                            expected_output=expected_output,
-                            metadata=eval_metadata,
-                        )
-                        evaluations.extend(eval_results)
-
-                        # Store evaluations as scores
-                        for evaluation in eval_results:
-                            self.create_score(
-                                trace_id=trace_id,
-                                observation_id=span.id,
-                                name=evaluation.name,
-                                value=evaluation.value,  # type: ignore
-                                comment=evaluation.comment,
-                                metadata=evaluation.metadata,
-                                config_id=evaluation.config_id,
-                                data_type=evaluation.data_type,  # type: ignore
-                            )
-
-                    except Exception as e:
-                        langfuse_logger.error(f"Evaluator failed: {e}")
-
-                # Run composite evaluator if provided and we have evaluations
-                if composite_evaluator and evaluations:
-                    try:
-                        composite_eval_metadata: Optional[Dict[str, Any]] = None
-                        if isinstance(item, dict):
-                            composite_eval_metadata = item.get("metadata")
-                        elif hasattr(item, "metadata"):
-                            composite_eval_metadata = item.metadata
+                if isinstance(item, dict):
+                    eval_metadata = item.get("metadata")
+                elif hasattr(item, "metadata"):
+                    eval_metadata = item.metadata
 
-                        result = composite_evaluator(
-                            input=input_data,
-                            output=output,
-                            expected_output=expected_output,
-                            metadata=composite_eval_metadata,
-                            evaluations=evaluations,
-                        )
-
-                        # Handle async composite evaluators
-                        if asyncio.iscoroutine(result):
-                            result = await result
-
-                        # Normalize to list
-                        composite_evals: List[Evaluation] = []
-                        if isinstance(result, (dict, Evaluation)):
-                            composite_evals = [result]  # type: ignore
-                        elif isinstance(result, list):
-                            composite_evals = result  # type: ignore
-
-                        # Store composite evaluations as scores and add to evaluations list
-                        for composite_evaluation in composite_evals:
-                            self.create_score(
-                                trace_id=trace_id,
-                                observation_id=span.id,
-                                name=composite_evaluation.name,
-                                value=composite_evaluation.value,  # type: ignore
-                                comment=composite_evaluation.comment,
-                                metadata=composite_evaluation.metadata,
-                                config_id=composite_evaluation.config_id,
-                                data_type=composite_evaluation.data_type,  # type: ignore
-                            )
-                            evaluations.append(composite_evaluation)
-
-                    except Exception as e:
-                        langfuse_logger.error(f"Composite evaluator failed: {e}")
+                with _propagate_attributes(
+                    experiment=propagated_experiment_attributes
+                ):
+                    eval_results = await _run_evaluator(
+                        evaluator,
+                        input=input_data,
+                        output=output,
+                        expected_output=expected_output,
+                        metadata=eval_metadata,
+                    )
+                    evaluations.extend(eval_results)
+
+                    # Store evaluations as scores
+                    for evaluation in eval_results:
+                        self.create_score(
+                            trace_id=trace_id,
+                            observation_id=span.id,
+                            name=evaluation.name,
+                            value=evaluation.value,  # type: ignore
+                            comment=evaluation.comment,
+                            metadata=evaluation.metadata,
+                            config_id=evaluation.config_id,
+                            data_type=evaluation.data_type,  # type: ignore
+                        )
+
+            except Exception as e:
+                langfuse_logger.error(f"Evaluator failed: {e}")
+
+        # Run composite evaluator if provided and we have evaluations
+        if composite_evaluator and evaluations:
+            try:
+                composite_eval_metadata: Optional[Dict[str, Any]] = None
+                if isinstance(item, dict):
+                    composite_eval_metadata = item.get("metadata")
+                elif hasattr(item, "metadata"):
+                    composite_eval_metadata = item.metadata
+
+                with _propagate_attributes(
+                    experiment=propagated_experiment_attributes
+                ):
+                    result = composite_evaluator(
+                        input=input_data,
+                        output=output,
+                        expected_output=expected_output,
+                        metadata=composite_eval_metadata,
+                        evaluations=evaluations,
+                    )
 
-                return ExperimentItemResult(
-                    item=item,
-                    output=output,
-                    evaluations=evaluations,
-                    trace_id=trace_id,
-                    dataset_run_id=dataset_run_id,
-                )
+                # Handle async composite evaluators
+                if asyncio.iscoroutine(result):
+                    result = await result
+
+                # Normalize to list
+                composite_evals: List[Evaluation] = []
+                if isinstance(result, (dict, Evaluation)):
+                    composite_evals = [result]  # type: ignore
+                elif isinstance(result, list):
+                    composite_evals = result  # type: ignore
+
+                # Store composite evaluations as scores and add to evaluations list
+                for composite_evaluation in composite_evals:
+                    self.create_score(
+                        trace_id=trace_id,
+                        observation_id=span.id,
+                        name=composite_evaluation.name,
+                        value=composite_evaluation.value,  # type: ignore
+                        comment=composite_evaluation.comment,
+                        metadata=composite_evaluation.metadata,
+                        config_id=composite_evaluation.config_id,
+                        data_type=composite_evaluation.data_type,  # type: ignore
+                    )
+                    evaluations.append(composite_evaluation)
+
+            except Exception as e:
+                langfuse_logger.error(f"Composite evaluator failed: {e}")
+
+        return ExperimentItemResult(
+            item=item,
+            output=output,
+            evaluations=evaluations,
+            trace_id=trace_id,
+            dataset_run_id=dataset_run_id,
+        )
 
     def _create_experiment_run_name(
         self, *, name: Optional[str] = None, run_name: Optional[str] = None
diff --git a/tests/test_prompt.py b/tests/test_prompt.py
index bc3a5b7eb..6ba5ab85a 100644
--- a/tests/test_prompt.py
+++ b/tests/test_prompt.py
@@ -682,7 +682,7 @@ def test_prompt_end_to_end():
 @pytest.fixture
 def langfuse():
     from langfuse._client.resource_manager import LangfuseResourceManager
-    
+
     langfuse_instance = Langfuse()
    langfuse_instance.api = Mock()
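
Reviewer note (not part of the patch): the sketch below illustrates, end to end, why evaluator execution is now wrapped in _propagate_attributes(experiment=...). It is a minimal, hypothetical example assuming the public v3 experiment API that _process_experiment_item() serves: Langfuse.run_experiment() with plain dict items, a task callable that receives item as a keyword argument, evaluator callables receiving input/output/expected_output/metadata, and Evaluation importable from the langfuse package. The experiment name, data, and function names are made up. The point: any observation an evaluator opens (here a span standing in for an LLM-as-a-judge call) is created while the experiment attributes are active, so it attaches to the experiment item's root observation instead of starting a detached trace.

    # Hypothetical usage sketch -- not part of the patch. Names, data, and the
    # exact keyword signatures are assumptions based on the documented
    # experiment API; only the propagation behaviour from the commit message
    # is being illustrated.
    from langfuse import Evaluation, Langfuse

    langfuse = Langfuse()  # assumes LANGFUSE_* credentials in the environment


    def uppercase_task(*, item, **kwargs):
        # Stand-in for the real task (e.g. an LLM call); `item` is one entry
        # from `data` below.
        return item["input"].upper()


    def exact_match(*, input, output, expected_output, metadata=None, **kwargs):
        # Evaluators may open their own observations, e.g. for an
        # LLM-as-a-judge call. With this patch they run inside
        # _propagate_attributes(experiment=propagated_experiment_attributes),
        # so this span is attached to the experiment item's root observation
        # rather than to a fresh, unrelated trace.
        with langfuse.start_as_current_span(name="exact-match-evaluator"):
            return Evaluation(
                name="exact_match",
                value=1.0 if output == expected_output else 0.0,
                comment=f"expected {expected_output!r}, got {output!r}",
            )


    result = langfuse.run_experiment(
        name="uppercase-experiment",
        data=[{"input": "hello", "expected_output": "HELLO"}],
        task=uppercase_task,
        evaluators=[exact_match],
    )
    langfuse.flush()

The composite evaluator path changes in the same way in this patch, so the same reasoning applies to composite_evaluator callables.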