diff --git a/openml/runs/functions.py b/openml/runs/functions.py
index 666b75c37..7d225785d 100644
--- a/openml/runs/functions.py
+++ b/openml/runs/functions.py
@@ -7,7 +7,7 @@
 from collections import OrderedDict
 from functools import partial
 from pathlib import Path
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, cast
 
 import numpy as np
 import pandas as pd
@@ -54,6 +54,225 @@
 ERROR_CODE = 512
 
 
+def _validate_flow_and_task_inputs(
+    flow: OpenMLFlow | OpenMLTask,
+    task: OpenMLTask | OpenMLFlow,
+    flow_tags: list[str] | None,
+) -> tuple[OpenMLFlow, OpenMLTask]:
+    """Validate and normalize inputs for flow and task execution.
+
+    Parameters
+    ----------
+    flow : OpenMLFlow or OpenMLTask
+        The flow object (may be swapped with task for backward compatibility).
+    task : OpenMLTask or OpenMLFlow
+        The task object (may be swapped with flow for backward compatibility).
+    flow_tags : List[str] or None
+        A list of tags that the flow should have at creation.
+
+    Returns
+    -------
+    Tuple[OpenMLFlow, OpenMLTask]
+        The validated flow and task.
+
+    Raises
+    ------
+    ValueError
+        If flow_tags is not a list or task is not published.
+    """
+    if flow_tags is not None and not isinstance(flow_tags, list):
+        raise ValueError("flow_tags should be a list")
+
+    # TODO: At some point in the future do not allow for arguments in old order (changed 6-2018).
+    # Flexibility currently still allowed due to code-snippet in OpenML100 paper (3-2019).
+    if isinstance(flow, OpenMLTask) and isinstance(task, OpenMLFlow):
+        # We want to allow either order of argument (to avoid confusion).
+        warnings.warn(
+            "The old argument order (Flow, model) is deprecated and "
+            "will not be supported in the future. Please use the "
+            "order (model, Flow).",
+            DeprecationWarning,
+            stacklevel=3,
+        )
+        task, flow = flow, task
+
+    if not isinstance(flow, OpenMLFlow):
+        raise TypeError("Flow must be OpenMLFlow after validation")
+
+    if not isinstance(task, OpenMLTask):
+        raise TypeError("Task must be OpenMLTask after validation")
+
+    if task.task_id is None:
+        raise ValueError("The task should be published at OpenML")
+
+    return flow, task
+
+
+def _sync_flow_with_server(
+    flow: OpenMLFlow,
+    task: OpenMLTask,
+    *,
+    upload_flow: bool,
+    avoid_duplicate_runs: bool,
+) -> int | None:
+    """Synchronize flow with server and check if setup/task combination is already present.
+
+    Parameters
+    ----------
+    flow : OpenMLFlow
+        The flow to synchronize.
+    task : OpenMLTask
+        The task to check for duplicate runs.
+    upload_flow : bool
+        Whether to upload the flow if it doesn't exist.
+    avoid_duplicate_runs : bool
+        Whether to check for duplicate runs.
+
+    Returns
+    -------
+    int or None
+        The flow_id if synced with server, None otherwise.
+
+    Raises
+    ------
+    PyOpenMLError
+        If flow_id mismatch or flow doesn't exist when expected.
+    OpenMLRunsExistError
+        If duplicate runs exist and avoid_duplicate_runs is True.
+    """
+    # We only need to sync with the server right now if we want to upload the flow,
+    # or ensure no duplicate runs exist. Otherwise it can be synced at upload time.
+    flow_id = None
+    if upload_flow or avoid_duplicate_runs:
+        flow_id = flow_exists(flow.name, flow.external_version)
+        if isinstance(flow.flow_id, int) and flow_id != flow.flow_id:
+            if flow_id is not False:
+                raise PyOpenMLError(
+                    f"Local flow_id does not match server flow_id: '{flow.flow_id}' vs '{flow_id}'",
+                )
+            raise PyOpenMLError(
+                "Flow does not exist on the server, but 'flow.flow_id' is not None."
+            )
+        if upload_flow and flow_id is False:
+            flow.publish()
+            flow_id = flow.flow_id
+        elif flow_id:
+            flow_from_server = get_flow(flow_id)
+            _copy_server_fields(flow_from_server, flow)
+            if avoid_duplicate_runs:
+                flow_from_server.model = flow.model
+                setup_id = setup_exists(flow_from_server)
+                task_id = task.task_id
+                ids = run_exists(cast(int, task_id), setup_id)
+                if ids:
+                    error_message = (
+                        "One or more runs of this setup were already performed on the task."
+                    )
+                    raise OpenMLRunsExistError(ids, error_message)
+        else:
+            # Flow does not exist on server and we do not want to upload it.
+            # No sync with the server happens.
+            flow_id = None
+
+    return flow_id
+
+
+def _prepare_run_environment(flow: OpenMLFlow) -> tuple[list[str], list[str]]:
+    """Prepare run environment information and tags.
+
+    Parameters
+    ----------
+    flow : OpenMLFlow
+        The flow to get version information from.
+
+    Returns
+    -------
+    Tuple[List[str], List[str]]
+        A tuple of (tags, run_environment).
+    """
+    run_environment = flow.extension.get_version_information()
+    tags = ["openml-python", run_environment[1]]
+    return tags, run_environment
+
+
+def _create_run_from_results(  # noqa: PLR0913
+    task: OpenMLTask,
+    flow: OpenMLFlow,
+    flow_id: int | None,
+    data_content: list[list],
+    trace: OpenMLRunTrace | None,
+    fold_evaluations: OrderedDict[str, OrderedDict],
+    sample_evaluations: OrderedDict[str, OrderedDict],
+    tags: list[str],
+    run_environment: list[str],
+    upload_flow: bool,  # noqa: FBT001
+    avoid_duplicate_runs: bool,  # noqa: FBT001
+) -> OpenMLRun:
+    """Create an OpenMLRun object from execution results.
+
+    Parameters
+    ----------
+    task : OpenMLTask
+        The task that was executed.
+    flow : OpenMLFlow
+        The flow that was executed.
+    flow_id : int or None
+        The flow ID if synced with server.
+    data_content : List[List]
+        The prediction data content.
+    trace : OpenMLRunTrace or None
+        The execution trace if available.
+    fold_evaluations : OrderedDict
+        The fold-based evaluation measures.
+    sample_evaluations : OrderedDict
+        The sample-based evaluation measures.
+    tags : List[str]
+        Tags to attach to the run.
+    run_environment : List[str]
+        Environment information.
+    upload_flow : bool
+        Whether the flow was uploaded.
+    avoid_duplicate_runs : bool
+        Whether duplicate runs were checked.
+
+    Returns
+    -------
+    OpenMLRun
+        The created run object.
+    """
+    dataset = task.get_dataset()
+    fields = [*run_environment, time.strftime("%c"), "Created by run_flow_on_task"]
+    generated_description = "\n".join(fields)
+
+    run = OpenMLRun(
+        task_id=cast(int, task.task_id),
+        flow_id=flow_id,
+        dataset_id=dataset.dataset_id,
+        model=flow.model,
+        flow_name=flow.name,
+        tags=tags,
+        trace=trace,
+        data_content=data_content,
+        flow=flow,
+        setup_string=flow.extension.create_setup_string(flow.model),
+        description_text=generated_description,
+    )
+
+    if (upload_flow or avoid_duplicate_runs) and flow.flow_id is not None:
+        # We only extract the parameter settings if a sync happened with the server.
+        # I.e. when the flow was uploaded or we found it in the avoid_duplicate check.
+        # Otherwise, we will do this at upload time.
+        run.parameter_settings = flow.extension.obtain_parameter_values(flow)
+
+    # now we need to attach the detailed evaluations
+    if task.task_type_id == TaskType.LEARNING_CURVE:
+        run.sample_evaluations = sample_evaluations
+    else:
+        run.fold_evaluations = fold_evaluations
+
+    return run
+
+
 # TODO(eddiebergman): Could potentially overload this but
 # it seems very big to do so
 def run_model_on_task(  # noqa: PLR0913
@@ -175,7 +394,7 @@ def get_task_and_type_conversion(_task: int | str | OpenMLTask) -> OpenMLTask:
     return run
 
 
-def run_flow_on_task(  # noqa: C901, PLR0912, PLR0915, PLR0913
+def run_flow_on_task(  # noqa: PLR0913
    flow: OpenMLFlow,
    task: OpenMLTask,
    avoid_duplicate_runs: bool | None = None,
@@ -222,71 +441,29 @@
     run : OpenMLRun
         Result of the run.
     """
-    if flow_tags is not None and not isinstance(flow_tags, list):
-        raise ValueError("flow_tags should be a list")
-
     if avoid_duplicate_runs is None:
         avoid_duplicate_runs = openml.config.avoid_duplicate_runs
 
-    # TODO: At some point in the future do not allow for arguments in old order (changed 6-2018).
-    # Flexibility currently still allowed due to code-snippet in OpenML100 paper (3-2019).
-    if isinstance(flow, OpenMLTask) and isinstance(task, OpenMLFlow):
-        # We want to allow either order of argument (to avoid confusion).
-        warnings.warn(
-            "The old argument order (Flow, model) is deprecated and "
-            "will not be supported in the future. Please use the "
-            "order (model, Flow).",
-            DeprecationWarning,
-            stacklevel=2,
-        )
-        task, flow = flow, task
-
-    if task.task_id is None:
-        raise ValueError("The task should be published at OpenML")
+    # 1. Validate inputs
+    flow, task = _validate_flow_and_task_inputs(flow, task, flow_tags)
 
+    # 2. Prepare the model
     if flow.model is None:
         flow.model = flow.extension.flow_to_model(flow)
-
     flow.model = flow.extension.seed_model(flow.model, seed=seed)
 
-    # We only need to sync with the server right now if we want to upload the flow,
-    # or ensure no duplicate runs exist. Otherwise it can be synced at upload time.
-    flow_id = None
-    if upload_flow or avoid_duplicate_runs:
-        flow_id = flow_exists(flow.name, flow.external_version)
-        if isinstance(flow.flow_id, int) and flow_id != flow.flow_id:
-            if flow_id is not False:
-                raise PyOpenMLError(
-                    f"Local flow_id does not match server flow_id: '{flow.flow_id}' vs '{flow_id}'",
-                )
-            raise PyOpenMLError(
-                "Flow does not exist on the server, but 'flow.flow_id' is not None."
-            )
-        if upload_flow and flow_id is False:
-            flow.publish()
-            flow_id = flow.flow_id
-        elif flow_id:
-            flow_from_server = get_flow(flow_id)
-            _copy_server_fields(flow_from_server, flow)
-            if avoid_duplicate_runs:
-                flow_from_server.model = flow.model
-                setup_id = setup_exists(flow_from_server)
-                ids = run_exists(task.task_id, setup_id)
-                if ids:
-                    error_message = (
-                        "One or more runs of this setup were already performed on the task."
-                    )
-                    raise OpenMLRunsExistError(ids, error_message)
-        else:
-            # Flow does not exist on server and we do not want to upload it.
-            # No sync with the server happens.
-            flow_id = None
-
-    dataset = task.get_dataset()
+    # 3. Sync with server and check for duplicates
+    flow_id = _sync_flow_with_server(
+        flow,
+        task,
+        upload_flow=upload_flow,
+        avoid_duplicate_runs=avoid_duplicate_runs,
+    )
 
-    run_environment = flow.extension.get_version_information()
-    tags = ["openml-python", run_environment[1]]
+    # 4. Prepare run environment
+    tags, run_environment = _prepare_run_environment(flow)
 
+    # 5. Check if model is already fitted
     if flow.extension.check_if_model_fitted(flow.model):
         warnings.warn(
             "The model is already fitted! This might cause inconsistency in comparison of results.",
@@ -294,8 +471,8 @@
             stacklevel=2,
         )
 
-    # execute the run
-    res = _run_task_get_arffcontent(
+    # 6. Execute the run (parallel processing happens here)
+    data_content, trace, fold_evaluations, sample_evaluations = _run_task_get_arffcontent(
         model=flow.model,
         task=task,
         extension=flow.extension,
@@ -303,35 +480,22 @@
         n_jobs=n_jobs,
     )
 
-    data_content, trace, fold_evaluations, sample_evaluations = res
-    fields = [*run_environment, time.strftime("%c"), "Created by run_flow_on_task"]
-    generated_description = "\n".join(fields)
-    run = OpenMLRun(
-        task_id=task.task_id,
+    # 7. Create run from results
+    run = _create_run_from_results(
+        task=task,
+        flow=flow,
         flow_id=flow_id,
-        dataset_id=dataset.dataset_id,
-        model=flow.model,
-        flow_name=flow.name,
-        tags=tags,
-        trace=trace,
         data_content=data_content,
-        flow=flow,
-        setup_string=flow.extension.create_setup_string(flow.model),
-        description_text=generated_description,
+        trace=trace,
+        fold_evaluations=fold_evaluations,
+        sample_evaluations=sample_evaluations,
+        tags=tags,
+        run_environment=run_environment,
+        upload_flow=upload_flow,
+        avoid_duplicate_runs=avoid_duplicate_runs,
    )
 
-    if (upload_flow or avoid_duplicate_runs) and flow.flow_id is not None:
-        # We only extract the parameter settings if a sync happened with the server.
-        # I.e. when the flow was uploaded or we found it in the avoid_duplicate check.
-        # Otherwise, we will do this at upload time.
-        run.parameter_settings = flow.extension.obtain_parameter_values(flow)
-
-    # now we need to attach the detailed evaluations
-    if task.task_type_id == TaskType.LEARNING_CURVE:
-        run.sample_evaluations = sample_evaluations
-    else:
-        run.fold_evaluations = fold_evaluations
-
+    # 8. Log completion message
     if flow_id:
         message = f"Executed Task {task.task_id} with Flow id:{run.flow_id}"
     else: