From 6ca488d62ad3d391fd33927589ca39003b6d69f0 Mon Sep 17 00:00:00 2001 From: Priyanshu-u07 Date: Wed, 4 Feb 2026 00:04:31 +0530 Subject: [PATCH] docs: enhance API reference for SDK clients Signed-off-by: Priyanshu-u07 --- kubeflow/hub/api/model_registry_client.py | 227 ++++++++++++----- kubeflow/optimizer/api/optimizer_client.py | 242 ++++++++++++------ kubeflow/trainer/api/trainer_client.py | 273 ++++++++++++++------- 3 files changed, 509 insertions(+), 233 deletions(-) diff --git a/kubeflow/hub/api/model_registry_client.py b/kubeflow/hub/api/model_registry_client.py index ada533b34..0d60c650c 100644 --- a/kubeflow/hub/api/model_registry_client.py +++ b/kubeflow/hub/api/model_registry_client.py @@ -49,24 +49,29 @@ def __init__( Args: base_url: Base URL of the model registry server including scheme. - Examples: "https://registry.example.com", "http://localhost" - The scheme is used to infer is_secure and port if not explicitly provided. + Examples: "https://registry.example.com", "http://localhost". Keyword Args: - port: Server port. If not provided, inferred from base_url scheme: - - https:// defaults to 443 - - http:// defaults to 8080 - - no scheme defaults to 443 + port: Server port. If not provided, inferred from `base_url` scheme: + - https:// defaults to 443 + - http:// defaults to 8080 + - no scheme defaults to 443 author: Name of the author. - is_secure: Whether to use a secure connection. If not provided, inferred from base_url: - - https:// sets is_secure=True - - http:// sets is_secure=False - - no scheme defaults to True + is_secure: Whether to use a secure connection. If not provided, + inferred from `base_url`: + - https:// sets is_secure=True + - http:// sets is_secure=False + - no scheme defaults to True user_token: The PEM-encoded user token as a string. custom_ca: Path to the PEM-encoded root certificates as a string. Raises: - ImportError: If model-registry is not installed. + ImportError: If the `model-registry` package is not installed. 
+ + Example: + from kubeflow.hub import ModelRegistryClient + + client = ModelRegistryClient(base_url="http://localhost:8080") """ try: from model_registry import ModelRegistry @@ -104,33 +109,46 @@ def register_model( version_description: str | None = None, metadata: Mapping[str, SupportedTypes] | None = None, ) -> RegisteredModel: - """Register a model. + """Register a new model or a new version of an existing model. - This registers a model in the model registry. The model is not downloaded, - and has to be stored prior to registration. - - Most models can be registered using their URI, along with optional - connection-specific parameters, `storage_key` and `storage_path` or, - simply a `service_account_name`. URI builder utilities are recommended - when referring to specialized storage; for example `utils.s3_uri_from` - helper when using S3 object storage data connections. + The model must be stored in external storage (e.g., S3, GCS) before + registration. The URI should point to the model artifacts. Args: - name: Name of the model. - uri: URI of the model. + name: The name of the registered model. + uri: The URI where the model artifacts are stored. Keyword Args: - version: Version of the model. Has to be unique. - model_format_name: Name of the model format (e.g., "pytorch", "tensorflow", "onnx"). - Used by KServe to select the appropriate serving runtime. - model_format_version: Version of the model format (e.g., "2.0", "1.15"). - author: Author of the model. Defaults to the client author. - owner: Owner of the model. Defaults to the client author. - version_description: Description of the model version. - metadata: Additional version metadata. + version: The version string for this registration. Must be unique + for this model name. + model_format_name: The format of the model (e.g., "pytorch", + "tensorflow"). Used by KServe for inference. + model_format_version: The version of the model format (e.g., "2.0"). + author: The author of the model. 
Defaults to the client author. + owner: The owner of the model. Defaults to the client author. + version_description: A description of this specific model version. + metadata: A dictionary of additional metadata to store with the version. Returns: - Registered model. + model_registry.types.RegisteredModel: The registered model object. + + Raises: + model_registry.exceptions.StoreError: If the registry backend fails + to register the model. + + Example: + from kubeflow.hub import ModelRegistryClient + + client = ModelRegistryClient(base_url="http://localhost:8080") + + model = client.register_model( + name="mnist-classifier", + uri="s3://my-bucket/models/mnist/v1/", + version="v1.0.0", + model_format_name="pytorch", + version_description="Initial release of MNIST model" + ) + print(f"Registered model ID: {model.id}") """ return self._registry.register_model( name=name, @@ -145,17 +163,29 @@ def register_model( ) def update_model(self, model: RegisteredModel) -> RegisteredModel: - """Update a registered model. + """Update the metadata of an existing registered model. Args: - model: The registered model to update. Must have an ID. + model: The `RegisteredModel` instance to update. It must have + a valid ID. Returns: - Updated registered model. + model_registry.types.RegisteredModel: The updated registered model. Raises: - TypeError: If model is not a RegisteredModel instance. - model_registry.exceptions.StoreError: If model does not have an ID. + TypeError: If the input is not a `RegisteredModel` instance. + model_registry.exceptions.StoreError: If the registered model does + not have an ID. 
+ + Example: + from kubeflow.hub import ModelRegistryClient + + client = ModelRegistryClient(base_url="http://localhost:8080") + model = client.get_model(name="mnist-classifier") + + # Update description + model.description = "Updated description" + updated_model = client.update_model(model) """ from model_registry.types import RegisteredModel @@ -164,17 +194,28 @@ def update_model(self, model: RegisteredModel) -> RegisteredModel: return self._registry.update(model) def update_model_version(self, model_version: ModelVersion) -> ModelVersion: - """Update a model version. + """Update an existing model version's metadata. Args: - model_version: The model version to update. Must have an ID. + model_version: The `ModelVersion` instance to update. It must have + a valid ID. Returns: - Updated model version. + model_registry.types.ModelVersion: The updated model version. Raises: - TypeError: If model_version is not a ModelVersion instance. - model_registry.exceptions.StoreError: If model version does not have an ID. + TypeError: If the input is not a `ModelVersion` instance. + model_registry.exceptions.StoreError: If the version does not have an ID. + + Example: + from kubeflow.hub import ModelRegistryClient + + client = ModelRegistryClient(base_url="http://localhost:8080") + version = client.get_model_version(name="mnist", version="v1.0.0") + + # Update metadata + version.metadata["accuracy"] = 0.98 + client.update_model_version(version) """ from model_registry.types import ModelVersion @@ -183,17 +224,28 @@ def update_model_version(self, model_version: ModelVersion) -> ModelVersion: return self._registry.update(model_version) def update_model_artifact(self, model_artifact: ModelArtifact) -> ModelArtifact: - """Update a model artifact. + """Update an existing model artifact's metadata. Args: - model_artifact: The model artifact to update. Must have an ID. + model_artifact: The `ModelArtifact` instance to update. It must + have a valid ID. 
Returns: - Updated model artifact. + model_registry.types.ModelArtifact: The updated model artifact. Raises: - TypeError: If model_artifact is not a ModelArtifact instance. - model_registry.exceptions.StoreError: If model artifact does not have an ID. + TypeError: If the input is not a `ModelArtifact` instance. + model_registry.exceptions.StoreError: If the artifact does not have an ID. + + Example: + from kubeflow.hub import ModelRegistryClient + + client = ModelRegistryClient(base_url="http://localhost:8080") + artifact = client.get_model_artifact(name="mnist", version="v1.0.0") + + # Update artifact description + artifact.description = "Production-ready weights" + client.update_model_artifact(artifact) """ from model_registry.types import ModelArtifact @@ -202,16 +254,23 @@ def update_model_artifact(self, model_artifact: ModelArtifact) -> ModelArtifact: return self._registry.update(model_artifact) def get_model(self, name: str) -> RegisteredModel: - """Get a registered model. + """Get a specific registered model by name. Args: - name: Name of the model. + name: The name of the registered model. Returns: - Registered model. + model_registry.types.RegisteredModel: The registered model object. Raises: - ValueError: If the model does not exist. + ValueError: If a registered model with the given `name` is not found. + + Example: + from kubeflow.hub import ModelRegistryClient + + client = ModelRegistryClient(base_url="http://localhost:8080") + model = client.get_model(name="mnist-classifier") + print(f"Model ID: {model.id}") """ model = self._registry.get_registered_model(name) if model is None: @@ -219,18 +278,27 @@ def get_model(self, name: str) -> RegisteredModel: return model def get_model_version(self, name: str, version: str) -> ModelVersion: - """Get a model version. + """Get a specific model version. Args: - name: Name of the model. - version: Version of the model. + name: The name of the registered model. + version: The version string to retrieve. 
Returns: - Model version. + model_registry.types.ModelVersion: The model version object. Raises: - model_registry.exceptions.StoreError: If the model does not exist. - ValueError: If the version does not exist. + model_registry.exceptions.StoreError: If the registered model does + not exist. + ValueError: If the version string is not found for the given + registered model. + + Example: + from kubeflow.hub import ModelRegistryClient + + client = ModelRegistryClient(base_url="http://localhost:8080") + version = client.get_model_version(name="mnist", version="v1.0.0") + print(f"Version ID: {version.id}") """ model_version = self._registry.get_model_version(name, version) if model_version is None: @@ -238,18 +306,26 @@ def get_model_version(self, name: str, version: str) -> ModelVersion: return model_version def get_model_artifact(self, name: str, version: str) -> ModelArtifact: - """Get a model artifact. + """Get the artifact associated with a specific model version. Args: - name: Name of the model. - version: Version of the model. + name: The name of the registered model. + version: The version of the registered model. Returns: - Model artifact. + model_registry.types.ModelArtifact: The model artifact object. Raises: - model_registry.exceptions.StoreError: If either the model or the version don't exist. - ValueError: If the artifact does not exist. + model_registry.exceptions.StoreError: If either the registered + model or version does not exist. + ValueError: If the artifact is not found. 
+ + Example: + from kubeflow.hub import ModelRegistryClient + + client = ModelRegistryClient(base_url="http://localhost:8080") + artifact = client.get_model_artifact(name="mnist", version="v1.0.0") + print(f"Artifact URI: {artifact.uri}") """ artifact = self._registry.get_model_artifact(name, version) if artifact is None: @@ -257,23 +333,38 @@ def get_model_artifact(self, name: str, version: str) -> ModelArtifact: return artifact def list_models(self) -> Iterator[RegisteredModel]: - """Get an iterator for registered models. + """Get an iterator for all registered models. Yields: - Registered models. + model_registry.types.RegisteredModel: The next registered model. + + Example: + from kubeflow.hub import ModelRegistryClient + + client = ModelRegistryClient(base_url="http://localhost:8080") + for model in client.list_models(): + print(f"Model: {model.name}") """ yield from self._registry.get_registered_models() def list_model_versions(self, name: str) -> Iterator[ModelVersion]: - """Get an iterator for model versions. + """Get an iterator for all versions of a specific registered model. Args: - name: Name of the model. + name: The name of the registered model. Yields: - Model versions. + model_registry.types.ModelVersion: The next model version. Raises: - model_registry.exceptions.StoreError: If the model does not exist. + model_registry.exceptions.StoreError: If the registered model does + not exist. 
+ + Example: + from kubeflow.hub import ModelRegistryClient + + client = ModelRegistryClient(base_url="http://localhost:8080") + for version in client.list_model_versions(name="mnist"): + print(f"Version: {version.version}") """ yield from self._registry.get_model_versions(name) diff --git a/kubeflow/optimizer/api/optimizer_client.py b/kubeflow/optimizer/api/optimizer_client.py index f64986953..0e45d90a6 100644 --- a/kubeflow/optimizer/api/optimizer_client.py +++ b/kubeflow/optimizer/api/optimizer_client.py @@ -39,12 +39,17 @@ def __init__( """Initialize a Kubeflow Optimizer client. Args: - backend_config: Backend configuration. Either KubernetesBackendConfig or None to use - default config class. Defaults to KubernetesBackendConfig. + backend_config: Backend configuration. Either a `KubernetesBackendConfig` + or `None` to use the default config class. Defaults to + `KubernetesBackendConfig`. Raises: - ValueError: Invalid backend configuration. + ValueError: If the provided `backend_config` is invalid. + Example: + from kubeflow.optimizer import OptimizerClient + + client = OptimizerClient() """ # Set the default backend config. if not backend_config: @@ -64,23 +69,50 @@ def optimize( objectives: Optional[list[Objective]] = None, algorithm: Optional[BaseAlgorithm] = None, ) -> str: - """Create an OptimizationJob for hyperparameter tuning. + """Create and start an OptimizationJob for hyperparameter tuning. Args: - trial_template: The TrainJob template defining the training script. - trial_config: Optional configuration to run Trials. - objectives: List of objectives to optimize. - search_space: Dictionary mapping parameter names to Search specifications using - Search.uniform(), Search.loguniform(), Search.choice(), etc. - algorithm: The optimization algorithm to use. Defaults to RandomSearch. + trial_template: The TrainJob template defining the training code. 
+ + Keyword Args: + trial_config: Optional configuration for running Trials (e.g., resources, + parallelism). + search_space: A dictionary mapping parameter names to search + specifications. Use `Search.uniform()`, `Search.loguniform()`, + `Search.choice()`, etc. + objectives: A list of objectives to optimize (e.g., maximizing accuracy). + algorithm: The optimization algorithm to use (e.g., `RandomSearch`, + `BayesianOptimization`). Defaults to `RandomSearch`. Returns: - The unique name of the Experiment that has been generated. + str: The unique name of the generated OptimizationJob (Experiment). Raises: - ValueError: Input arguments are invalid. - TimeoutError: Timeout to create Experiment. - RuntimeError: Failed to create Experiment. + ValueError: If the input arguments are invalid. + TimeoutError: If the request to create the job times out. + RuntimeError: If the backend fails to create the job. + + Example: + from kubeflow.optimizer import OptimizerClient, Search, Objective + + client = OptimizerClient() + + # Define search space + search_space = { + "learning_rate": Search.loguniform(min=0.01, max=0.1), + "batch_size": Search.choice(values=[16, 32, 64]), + "optimizer": Search.choice(values=["sgd", "adam"]) + } + + # Define objective + objectives = [Objective(name="accuracy", goal=0.99, type="maximize")] + + job_name = client.optimize( + trial_template=tpl, + search_space=search_space, + objectives=objectives + ) + print(f"Started optimization job: {job_name}") """ return self.backend.optimize( trial_template=trial_template, @@ -91,31 +123,46 @@ def optimize( ) def list_jobs(self) -> list[OptimizationJob]: - """List of the created OptimizationJobs + """List the created OptimizationJobs. Returns: - List of created OptimizationJobs. If no OptimizationJob exist, - an empty list is returned. + list[kubeflow.optimizer.types.OptimizationJob]: A list of created + OptimizationJobs. If no jobs exist, an empty list is returned. 
Raises: - TimeoutError: Timeout to list OptimizationJobs. - RuntimeError: Failed to list OptimizationJobs. + TimeoutError: If the request to list jobs times out. + RuntimeError: If the backend fails to list jobs. + + Example: + from kubeflow.optimizer import OptimizerClient + + client = OptimizerClient() + jobs = client.list_jobs() + for job in jobs: + print(f"Job Name: {job.name}, Status: {job.status}") """ return self.backend.list_jobs() def get_job(self, name: str) -> OptimizationJob: - """Get the OptimizationJob object + """Get information about a specific OptimizationJob. Args: - name: Name of the OptimizationJob. + name: The unique name of the OptimizationJob. Returns: - A OptimizationJob object. + kubeflow.optimizer.types.OptimizationJob: The OptimizationJob object. Raises: - TimeoutError: Timeout to get a OptimizationJob. - RuntimeError: Failed to get a OptimizationJob. + TimeoutError: If the request to retrieve the job times out. + RuntimeError: If the backend fails to retrieve the job. + + Example: + from kubeflow.optimizer import OptimizerClient + + client = OptimizerClient() + job = client.get_job(name="my-opt-job-abc123") + print(f"Job Status: {job.status}") """ return self.backend.get_job(name=name) @@ -128,54 +175,66 @@ def get_job_logs( ) -> Iterator[str]: """Get logs from a specific trial of an OptimizationJob. - You can watch for the logs in realtime as follows: - ```python - from kubeflow.optimizer import OptimizerClient + Args: + name: The unique name of the OptimizationJob. + trial_name: Optional name of a specific Trial. If not provided, logs + from the current best trial are returned. If no best trial is + available, logs from the first trial are returned. + follow: Whether to stream logs in realtime as they are produced. + Defaults to False. - # Get logs from the best current trial - for logline in OptimizerClient().get_job_logs(name="n7fb28dbee94"): - print(logline) + Returns: + Iterator[str]: An iterator yielding log lines as strings. 
- # Get logs from a specific trial - for logline in OptimizerClient().get_job_logs( - name="n7fb28dbee94", trial_name="n7fb28dbee94-abc123", follow=True - ): - print(logline) - ``` + Raises: + TimeoutError: If the request to retrieve logs times out. + RuntimeError: If the backend fails to retrieve logs. - Args: - name: Name of the OptimizationJob. - trial_name: Optional name of a specific Trial. If not provided, logs from the - current best trial are returned. If no best trial is available yet, logs - from the first trial are returned. - follow: Whether to stream logs in realtime as they are produced. + Example: + from kubeflow.optimizer import OptimizerClient - Returns: - Iterator of log lines. + client = OptimizerClient() + # Get logs from the current best trial + for log_line in client.get_job_logs(name="my-opt-job"): + print(log_line) - Raises: - TimeoutError: Timeout to get an OptimizationJob. - RuntimeError: Failed to get an OptimizationJob. + # Stream logs from a specific trial + for log_line in client.get_job_logs( + name="my-opt-job", + trial_name="my-opt-job-trial-1", + follow=True + ): + print(log_line) """ return self.backend.get_job_logs(name=name, trial_name=trial_name, follow=follow) def get_best_results(self, name: str) -> Optional[Result]: """Get the best hyperparameters and metrics from an OptimizationJob. - This method retrieves the optimal hyperparameters and their corresponding metrics - from the best trial found during the optimization process. + This method retrieves the optimal hyperparameters and their corresponding + metrics from the best trial found during the optimization process. Args: - name: Name of the OptimizationJob. + name: The unique name of the OptimizationJob. Returns: - A Result object containing the best hyperparameters and metrics, or None if - no best trial is available yet. + Optional[kubeflow.optimizer.types.Result]: An object containing the + best hyperparameters and metrics, or `None` if no best trial is + available yet. 
Raises: - TimeoutError: Timeout to get an OptimizationJob. - RuntimeError: Failed to get an OptimizationJob. + TimeoutError: If the request to retrieve results times out. + RuntimeError: If the backend fails to retrieve results. + + Example: + from kubeflow.optimizer import OptimizerClient + + client = OptimizerClient() + best_result = client.get_best_results(name="my-opt-job") + if best_result: + print(f"Best parameters: {best_result.parameters}") + print(f"Best metrics: {best_result.metrics}") """ return self.backend.get_best_results(name=name) @@ -190,23 +249,34 @@ def wait_for_job_status( """Wait for an OptimizationJob to reach a desired status. Args: - name: Name of the OptimizationJob. - status: Expected statuses. Must be a subset of Created, Running, Complete, and - Failed statuses. - timeout: Maximum number of seconds to wait for the OptimizationJob to reach one of the - expected statuses. - polling_interval: The polling interval in seconds to check OptimizationJob status. - callbacks: Optional list of callback functions to be invoked after each polling - interval. Each callback should accept a single argument: the OptimizationJob object. + name: The unique name of the OptimizationJob. + status: A set of expected statuses to wait for (e.g., {"Complete"}). + Must be a subset of "Created", "Running", "Complete", and "Failed". + timeout: Maximum number of seconds to wait. Defaults to 3600. + polling_interval: Seconds to wait between status checks. Defaults to 2. + callbacks: Optional list of callback functions to invoke after each poll. + Each callback accepts the `OptimizationJob` object as an argument. Returns: - An OptimizationJob object that reaches the desired status. + kubeflow.optimizer.types.OptimizationJob: The OptimizationJob object + after reaching the desired status. Raises: - ValueError: The input values are incorrect. - RuntimeError: Failed to get OptimizationJob or OptimizationJob reaches unexpected - Failed status. 
- TimeoutError: Timeout to wait for OptimizationJob status. + ValueError: If the input values (e.g., status) are invalid. + RuntimeError: If the job reaches an unexpected "Failed" status or + the client fails to get the status. + TimeoutError: If the job does not reach the desired status within + the timeout period. + + Example: + from kubeflow.optimizer import OptimizerClient, constants + + client = OptimizerClient() + job = client.wait_for_job_status( + name="my-opt-job", + status={constants.OPTIMIZATION_JOB_COMPLETE} + ) + print(f"Optimization finished with status: {job.status}") """ return self.backend.wait_for_job_status( name=name, @@ -217,32 +287,46 @@ def wait_for_job_status( ) def delete_job(self, name: str): - """Delete the OptimizationJob. + """Delete an OptimizationJob. Args: - name: Name of the OptimizationJob. + name: The unique name of the OptimizationJob to delete. Raises: - TimeoutError: Timeout to delete OptimizationJob. - RuntimeError: Failed to delete OptimizationJob. + TimeoutError: If the request to delete the job times out. + RuntimeError: If the backend fails to delete the job. + + Example: + from kubeflow.optimizer import OptimizerClient + + client = OptimizerClient() + client.delete_job(name="my-opt-job") """ return self.backend.delete_job(name=name) def get_job_events(self, name: str) -> list[Event]: - """Get events for an OptimizationJob. + """Get Kubernetes events associated with an OptimizationJob. - This provides additional clarity about the state of the OptimizationJob - when logs alone are not sufficient. Events include information about - trial state changes, errors, and other significant occurrences. + Events provide additional clarity about the state of the OptimizationJob, + such as trial state changes, errors, and other significant occurrences. Args: - name: Name of the OptimizationJob. + name: The unique name of the OptimizationJob. Returns: - A list of Event objects associated with the OptimizationJob. 
+ list[kubeflow.trainer.types.Event]: A list of Event objects + associated with the OptimizationJob. Raises: - TimeoutError: Timeout to get an OptimizationJob events. - RuntimeError: Failed to get an OptimizationJob events. + TimeoutError: If the request to retrieve events times out. + RuntimeError: If the backend fails to retrieve events. + + Example: + from kubeflow.optimizer import OptimizerClient + + client = OptimizerClient() + events = client.get_job_events(name="my-opt-job") + for event in events: + print(f"Event: {event.message} ({event.reason})") """ return self.backend.get_job_events(name=name) diff --git a/kubeflow/trainer/api/trainer_client.py b/kubeflow/trainer/api/trainer_client.py index 08c6a1878..3aa7d6d9e 100644 --- a/kubeflow/trainer/api/trainer_client.py +++ b/kubeflow/trainer/api/trainer_client.py @@ -45,13 +45,25 @@ def __init__( Args: backend_config: Backend configuration. Either KubernetesBackendConfig, - LocalProcessBackendConfig, ContainerBackendConfig, - or None to use the backend's default config class. - Defaults to KubernetesBackendConfig. + LocalProcessBackendConfig, ContainerBackendConfig, or None to use + the backend's default config class. Defaults to + KubernetesBackendConfig. Raises: - ValueError: Invalid backend configuration. + ValueError: If the provided `backend_config` is invalid. + Example: + Initialize with the default Kubernetes backend: + + from kubeflow.trainer import TrainerClient + + client = TrainerClient() + + Initialize with a local process backend: + + from kubeflow.trainer import TrainerClient, LocalProcessBackendConfig + + client = TrainerClient(backend_config=LocalProcessBackendConfig()) """ # Set the default backend config. if not backend_config: @@ -67,38 +79,70 @@ def __init__( raise ValueError(f"Invalid backend config '{backend_config}'") def list_runtimes(self) -> list[types.Runtime]: - """List of the available runtimes. + """List the available Training Runtimes. 
Returns: - A list of available training runtimes. If no runtimes exist, an empty list is returned. + list[kubeflow.trainer.types.Runtime]: A list of available training + runtimes. If no runtimes exist, an empty list is returned. Raises: - TimeoutError: Timeout to list runtimes. - RuntimeError: Failed to list runtimes. + TimeoutError: If the request to list runtimes times out. + RuntimeError: If the client fails to list runtimes. + + Example: + from kubeflow.trainer import TrainerClient + + client = TrainerClient() + runtimes = client.list_runtimes() + + for runtime in runtimes: + print(f"Runtime: {runtime.name}") """ return self.backend.list_runtimes() def get_runtime(self, name: str) -> types.Runtime: - """Get the runtime object + """Get information about a specific Training Runtime. + Args: - name: Name of the runtime. + name: The name of the Training Runtime to retrieve. Returns: - A runtime object. + kubeflow.trainer.types.Runtime: The Training Runtime object. + + Raises: + ValueError: If the runtime `name` is invalid or not found. + TimeoutError: If checking for the Training Runtime times out. + RuntimeError: If the backend fails to retrieve the Training Runtime. + + Example: + from kubeflow.trainer import TrainerClient + + client = TrainerClient() + runtime = client.get_runtime(name="pytorch-distributed") """ return self.backend.get_runtime(name=name) def get_runtime_packages(self, runtime: types.Runtime): - """Print the installed Python packages for the given runtime. If a runtime has GPUs it also - prints available GPUs on the single training node. + """Print the installed Python packages for the given Training Runtime. + + If the Training Runtime supports GPUs, it also prints available GPU + information for the training node. Args: - runtime: Reference to one of existing runtimes. + runtime: A reference to an existing Training Runtime object, + typically obtained via `get_runtime` or `list_runtimes`. Raises: - ValueError: Input arguments are invalid. 
- RuntimeError: Failed to get Runtime. + ValueError: If the input arguments are invalid. + RuntimeError: If the client fails to get package information for + the Training Runtime. + + Example: + from kubeflow.trainer import TrainerClient + client = TrainerClient() + runtime = client.get_runtime(name="pytorch-distributed") + client.get_runtime_packages(runtime=runtime) """ return self.backend.get_runtime_packages(runtime=runtime) @@ -111,33 +155,45 @@ def train( ] = None, options: Optional[list] = None, ) -> str: - """Create a TrainJob. You can configure the TrainJob using one of these trainers: - - - CustomTrainer: Runs training with a user-defined function that fully encapsulates the - training process. - - CustomTrainerContainer: Runs training with a user-defined image that fully encapsulates - the training process. - - BuiltinTrainer: Uses a predefined trainer with built-in post-training logic, requiring - only parameter configuration. + """Create and start a TrainJob. + + You can configure the TrainJob using one of these trainer types: + - `CustomTrainer`: Runs training with a user-defined function. + - `CustomTrainerContainer`: Runs training with a user-defined container image. + - `BuiltinTrainer`: Uses a predefined trainer with built-in logic. Args: - runtime: Optional reference to one of the existing runtimes. It can accept the runtime - name or Runtime object from the `get_runtime()` API. - Defaults to the torch-distributed runtime if not provided. - initializer: Optional configuration for the dataset and model initializers. - trainer: Optional configuration for a CustomTrainer, CustomTrainerContainer, or - BuiltinTrainer. If not specified, the TrainJob will use the - runtime's default values. - options: Optional list of configuration options to apply to the TrainJob. - Options can be imported from kubeflow.trainer.options. + runtime: Reference to an existing Training Runtime. Accepts a + runtime name (str) or a Runtime object. 
Defaults to the + "torch-distributed" runtime if not provided. + initializer: Optional configuration for dataset and model + initializers. + trainer: Configuration for the trainer. If not specified, the + TrainJob uses the Training Runtime's default values. + options: Optional list of configuration options to apply to the + TrainJob. Import options from `kubeflow.trainer.options`. Returns: - The unique name of the TrainJob that has been generated. + str: The unique name of the generated TrainJob. Raises: - ValueError: Input arguments are invalid. - TimeoutError: Timeout to create TrainJobs. - RuntimeError: Failed to create TrainJobs. + ValueError: If the input arguments are invalid. + TimeoutError: If the request to create the TrainJob times out. + RuntimeError: If the backend fails to create the TrainJob. + + Example: + from kubeflow.trainer import TrainerClient, types + + def train_func(parameters): + # Your training logic here + print(f"Training with parameters: {parameters}") + + client = TrainerClient() + job_name = client.train( + runtime="torch-distributed", + trainer=types.CustomTrainer(func=train_func) + ) + print(f"Started job: {job_name}") """ return self.backend.train( runtime=runtime, @@ -147,33 +203,52 @@ def train( ) def list_jobs(self, runtime: Optional[types.Runtime] = None) -> list[types.TrainJob]: - """List of the created TrainJobs. If a runtime is specified, only TrainJobs associated with + """List the generated TrainJobs. + + If a Training Runtime is specified, only TrainJobs associated with that runtime are returned. Args: - runtime: Reference to one of the existing runtimes. + runtime: Optional reference to an existing Training Runtime object + to filter TrainJobs. Returns: - List of created TrainJobs. If no TrainJob exist, an empty list is returned. + list[kubeflow.trainer.types.TrainJob]: A list of generated + TrainJob objects. If no jobs exist, an empty list is returned. Raises: - TimeoutError: Timeout to list TrainJobs. 
- RuntimeError: Failed to list TrainJobs. + TimeoutError: If the request to list TrainJobs times out. + RuntimeError: If the backend fails to list TrainJobs. + + Example: + from kubeflow.trainer import TrainerClient + + client = TrainerClient() + jobs = client.list_jobs() + for job in jobs: + print(f"Job Name: {job.name}, Status: {job.status}") """ return self.backend.list_jobs(runtime=runtime) def get_job(self, name: str) -> types.TrainJob: - """Get the TrainJob object + """Get information about a specific TrainJob. Args: - name: Name of the TrainJob. + name: The unique name of the TrainJob. Returns: - A TrainJob object. + kubeflow.trainer.types.TrainJob: The TrainJob object. Raises: - TimeoutError: Timeout to get a TrainJob. - RuntimeError: Failed to get a TrainJob. + TimeoutError: If the request to retrieve the job times out. + RuntimeError: If the backend fails to retrieve the job. + + Example: + from kubeflow.trainer import TrainerClient + + client = TrainerClient() + job = client.get_job(name="my-training-job-abc123") + print(f"Job Status: {job.status}") """ return self.backend.get_job(name=name) @@ -186,45 +261,53 @@ def get_job_logs( ) -> Iterator[str]: """Get logs from a specific step of a TrainJob. - You can watch for the logs in realtime as follows: - ```python - from kubeflow.trainer import TrainerClient - - for logline in TrainerClient().get_job_logs(name="s8d44aa4fb6d", follow=True): - print(logline) - ``` - Args: - name: Name of the TrainJob. - step: Step of the TrainJob to collect logs from, like dataset-initializer or node-0. - follow: Whether to stream logs in realtime as they are produced. + name: The unique name of the TrainJob. + step: The step of the TrainJob to collect logs from (e.g., + "dataset-initializer" or "node-0"). Defaults to "node-0". + follow: Whether to stream logs in realtime as they are produced. + Defaults to False. Returns: - Iterator of log lines. - + Iterator[str]: An iterator yielding log lines as strings. 
Raises: - TimeoutError: Timeout to get a TrainJob. - RuntimeError: Failed to get a TrainJob. + TimeoutError: If the request to retrieve logs times out. + RuntimeError: If the backend fails to retrieve logs. + + Example: + from kubeflow.trainer import TrainerClient + + client = TrainerClient() + for log_line in client.get_job_logs(name="my-job", follow=True): + print(log_line) """ return self.backend.get_job_logs(name=name, follow=follow, step=step) def get_job_events(self, name: str) -> list[types.Event]: - """Get events for a TrainJob. + """Get Kubernetes events associated with a TrainJob. - This provides additional clarity about the state of the TrainJob - when logs alone are not sufficient. Events include information about - pod state changes, errors, and other significant occurrences. + Events provide additional clarity about the state of the TrainJob, + such as pod state changes, errors, and other significant occurrences. Args: - name: Name of the TrainJob. + name: The unique name of the TrainJob. Returns: - A list of Event objects associated with the TrainJob. + list[kubeflow.trainer.types.Event]: A list of Event objects + associated with the TrainJob. Raises: - TimeoutError: Timeout to get a TrainJob events. - RuntimeError: Failed to get a TrainJob events. + TimeoutError: If the request to retrieve events times out. + RuntimeError: If the backend fails to retrieve events. + + Example: + from kubeflow.trainer import TrainerClient + + client = TrainerClient() + events = client.get_job_events(name="my-job") + for event in events: + print(f"Event: {event.message} ({event.reason})") """ return self.backend.get_job_events(name=name) @@ -239,22 +322,34 @@ def wait_for_job_status( """Wait for a TrainJob to reach a desired status. Args: - name: Name of the TrainJob. - status: Expected statuses. Must be a subset of Created, Running, Complete, and - Failed statuses. - timeout: Maximum number of seconds to wait for the TrainJob to reach one of the - expected statuses. 
- polling_interval: The polling interval in seconds to check TrainJob status. - callbacks: Optional list of callback functions to be invoked after each polling - interval. Each callback should accept a single argument: the TrainJob object. + name: The unique name of the TrainJob. + status: A set of expected statuses to wait for (e.g., {"Complete"}). + Must be a subset of "Created", "Running", "Complete", and "Failed". + timeout: Maximum number of seconds to wait. Defaults to 600. + polling_interval: Seconds to wait between status checks. Defaults to 2. + callbacks: Optional list of callback functions to invoke after each + poll. Each callback accepts the `TrainJob` object as an argument. Returns: - A TrainJob object that reaches the desired status. + kubeflow.trainer.types.TrainJob: The TrainJob object after reaching + the desired status. Raises: - ValueError: The input values are incorrect. - RuntimeError: Failed to get TrainJob or TrainJob reaches unexpected Failed status. - TimeoutError: Timeout to wait for TrainJob status. + ValueError: If the input values (e.g., status) are invalid. + RuntimeError: If the job reaches an unexpected "Failed" status or + the client fails to get the status. + TimeoutError: If the job does not reach the desired status within + the timeout period. + + Example: + from kubeflow.trainer import TrainerClient, constants + + client = TrainerClient() + job = client.wait_for_job_status( + name="my-job", + status={constants.TRAINJOB_COMPLETE} + ) + print(f"Job finished with status: {job.status}") """ return self.backend.wait_for_job_status( name=name, @@ -265,13 +360,19 @@ def wait_for_job_status( ) def delete_job(self, name: str): - """Delete the TrainJob. + """Delete a TrainJob. Args: - name: Name of the TrainJob. + name: The unique name of the TrainJob to delete. Raises: - TimeoutError: Timeout to delete TrainJob. - RuntimeError: Failed to delete TrainJob. + TimeoutError: If the request to delete the job times out. 
+ RuntimeError: If the backend fails to delete the TrainJob. + + Example: + from kubeflow.trainer import TrainerClient + + client = TrainerClient() + client.delete_job(name="my-job") """ return self.backend.delete_job(name=name)