diff --git a/examples/local-trainer-client/image-classification/mnist-pytorch-ddp.ipynb b/examples/local-trainer-client/image-classification/mnist-pytorch-ddp.ipynb new file mode 100644 index 0000000000..916ff4ea70 --- /dev/null +++ b/examples/local-trainer-client/image-classification/mnist-pytorch-ddp.ipynb @@ -0,0 +1,313 @@ +{ + "cells": [ + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "# Using `LocalTrainerClient` for MNIST image classification with PyTorch DDP\n", + "\n", + "This notebook uses the `LocalTrainerClient` to train an image classification model using the Fashion MNIST dataset.\n", + "\n", + "The `LocalTrainerClient` runs training jobs locally in Docker containers. No Kubernetes cluster is required." + ], + "id": "3a425719bb33868e" + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "## Install the Kubeflow SDK", + "id": "52b257e345242c84" + }, + { + "metadata": {}, + "cell_type": "code", + "source": [ + "# TODO (eoinfennessy): Update repo and branch before merging.\n", + "!pip install git+https://github.com/eoinfennessy/trainer.git@add-local-trainer-client#subdirectory=sdk" + ], + "id": "5398886ad279253b", + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "## Define the training function\n", + "\n", + "This function trains a CNN model using the Fashion MNIST dataset.\n" + ], + "id": "aacfd746ab04a56f" + }, + { + "metadata": {}, + "cell_type": "code", + "source": [ + "def train_pytorch():\n", + " import os\n", + "\n", + " import torch\n", + " from torch import nn\n", + " import torch.nn.functional as F\n", + "\n", + " from torchvision import datasets, transforms\n", + " import torch.distributed as dist\n", + " from torch.utils.data import DataLoader, DistributedSampler\n", + "\n", + " # [1] Configure CPU/GPU device and distributed backend.\n", + " # Kubeflow Trainer will automatically configure the distributed environment.\n", + " device, backend = 
(\"cuda\", \"nccl\") if torch.cuda.is_available() else (\"cpu\", \"gloo\")\n", + " dist.init_process_group(backend=backend)\n", + "\n", + " local_rank = int(os.getenv(\"LOCAL_RANK\", 0))\n", + " print(\n", + " \"Distributed Training with WORLD_SIZE: {}, RANK: {}, LOCAL_RANK: {}.\".format(\n", + " dist.get_world_size(),\n", + " dist.get_rank(),\n", + " local_rank,\n", + " )\n", + " )\n", + "\n", + " # [2] Define PyTorch CNN Model to be trained.\n", + " class Net(nn.Module):\n", + " def __init__(self):\n", + " super(Net, self).__init__()\n", + " self.conv1 = nn.Conv2d(1, 20, 5, 1)\n", + " self.conv2 = nn.Conv2d(20, 50, 5, 1)\n", + " self.fc1 = nn.Linear(4 * 4 * 50, 500)\n", + " self.fc2 = nn.Linear(500, 10)\n", + "\n", + " def forward(self, x):\n", + " x = F.relu(self.conv1(x))\n", + " x = F.max_pool2d(x, 2, 2)\n", + " x = F.relu(self.conv2(x))\n", + " x = F.max_pool2d(x, 2, 2)\n", + " x = x.view(-1, 4 * 4 * 50)\n", + " x = F.relu(self.fc1(x))\n", + " x = self.fc2(x)\n", + " return F.log_softmax(x, dim=1)\n", + "\n", + " # [3] Attach model to the correct device.\n", + " device = torch.device(f\"{device}:{local_rank}\")\n", + " model = nn.parallel.DistributedDataParallel(Net().to(device))\n", + " model.train()\n", + " optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9)\n", + "\n", + " # [4] Get the Fashion-MNIST dataset and distributed it across all available devices.\n", + " dataset = datasets.FashionMNIST(\n", + " \"./data\",\n", + " train=True,\n", + " download=True,\n", + " transform=transforms.Compose([transforms.ToTensor()]),\n", + " )\n", + " train_loader = DataLoader(\n", + " dataset,\n", + " batch_size=100,\n", + " sampler=DistributedSampler(dataset),\n", + " )\n", + "\n", + " # [5] Define the training loop.\n", + " for epoch in range(3):\n", + " for batch_idx, (inputs, labels) in enumerate(train_loader):\n", + " # Attach tensors to the device.\n", + " inputs, labels = inputs.to(device), labels.to(device)\n", + "\n", + " # Forward pass\n", + 
" outputs = model(inputs)\n", + " loss = F.nll_loss(outputs, labels)\n", + "\n", + " # Backward pass\n", + " optimizer.zero_grad()\n", + " loss.backward()\n", + " optimizer.step()\n", + " if batch_idx % 10 == 0 and dist.get_rank() == 0:\n", + " print(\n", + " \"Train Epoch: {} [{}/{} ({:.0f}%)]\\tLoss: {:.6f}\".format(\n", + " epoch,\n", + " batch_idx * len(inputs),\n", + " len(train_loader.dataset),\n", + " 100.0 * batch_idx / len(train_loader),\n", + " loss.item(),\n", + " )\n", + " )\n", + "\n", + " # Wait for the training to complete and destroy to PyTorch distributed process group.\n", + " dist.barrier()\n", + " if dist.get_rank() == 0:\n", + " print(\"Training is finished\")\n", + " dist.destroy_process_group()" + ], + "id": "86f2438e1c249731", + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "## Create the trainer client", + "id": "65ce9b4ee3c2c64b" + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "The `LocalTrainerClient` exposes the same interface as the `TrainerClient`.\n", + "\n", + "When you wish to move your job to a Kubernetes cluster, simply change the\n", + "training client -- the rest of the notebook will work without modification." + ], + "id": "429ad749682d68cc" + }, + { + "metadata": {}, + "cell_type": "code", + "source": [ + "import os\n", + "\n", + "from kubeflow.trainer import LocalTrainerClient, TrainerClient, CustomTrainer\n", + "\n", + "exec_mode = os.getenv(\"KUBEFLOW_TRAINER_EXEC_MODE\", \"local\")\n", + "\n", + "if exec_mode == \"local\":\n", + " client = LocalTrainerClient()\n", + "else:\n", + " client = TrainerClient()" + ], + "id": "f459587c03f32d05", + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "## List runtimes\n", + "\n", + "Predefined training runtimes are included as part of the package. Currently only the `torch-distributed` runtime is included." 
+ ], + "id": "735a674339f10ee1" + }, + { + "metadata": {}, + "cell_type": "code", + "source": "client.list_runtimes()", + "id": "5e3dc7c8ab7c9322", + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "## Get runtime", + "id": "8b08eb668205d5f4" + }, + { + "metadata": {}, + "cell_type": "code", + "source": "runtime = client.get_runtime(\"torch-distributed\")", + "id": "5cfaa7952b5fbcb1", + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "## Start the training job\n", + "\n", + "This job uses the `torch-distributed` runtime to run the `train_pytorch` training function. The job runs on four worker node containers." + ], + "id": "4c8aa9177a6ca0c1" + }, + { + "metadata": {}, + "cell_type": "code", + "source": [ + "job_name = client.train(\n", + " runtime=runtime,\n", + " trainer=CustomTrainer(\n", + " func=train_pytorch,\n", + " num_nodes=4,\n", + " )\n", + " )" + ], + "id": "9a3e893c987e6e9b", + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "## Follow job logs", + "id": "5f8e96a51263ffdb" + }, + { + "metadata": {}, + "cell_type": "code", + "source": "_ = client.get_job_logs(job_name, follow=True)", + "id": "23c0d5f0321961a5", + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "## Optional: Examine Docker resources\n", + "\n", + "In a terminal, run the following to list the containers running the training job:\n", + "\n", + "```shell\n", + "docker ps -a --filter label=trainer.kubeflow.org/train-job-name\n", + "```\n", + "\n", + "Example output:\n", + "\n", + "```\n", + "CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES\n", + "e116e42af00a pytorch/pytorch:2.5.0-cuda12.4-cudnn9-runtime \"bash -c '\\nread -r -…\" 11 minutes ago Exited (0) 7 minutes ago kubeflow-trainer-l20b00d08df5-3\n", + "0f9d891edd74 
pytorch/pytorch:2.5.0-cuda12.4-cudnn9-runtime \"bash -c '\\nread -r -…\" 11 minutes ago Exited (0) 7 minutes ago kubeflow-trainer-l20b00d08df5-2\n", + "1672a222d360 pytorch/pytorch:2.5.0-cuda12.4-cudnn9-runtime \"bash -c '\\nread -r -…\" 11 minutes ago Exited (0) 7 minutes ago kubeflow-trainer-l20b00d08df5-1\n", + "e1f11b5fad3c pytorch/pytorch:2.5.0-cuda12.4-cudnn9-runtime \"bash -c '\\nread -r -…\" 11 minutes ago Exited (0) 7 minutes ago kubeflow-trainer-l20b00d08df5-0\n", + "```\n", + "\n", + "Run the following to see the Docker network:\n", + "\n", + "```shell\n", + "docker network ls --filter label=trainer.kubeflow.org/train-job-name\n", + "```\n", + "\n", + "Example output:\n", + "\n", + "```\n", + "NETWORK ID NAME DRIVER SCOPE\n", + "157a6ed5752f kubeflow-trainer-l20b00d08df5 bridge local\n", + "```" + ], + "id": "306ef11f6d48d176" + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "## Delete the job", + "id": "5a3478eb1567897c" + }, + { + "metadata": {}, + "cell_type": "code", + "source": "client.delete_job(job_name)", + "id": "e5e72bdf898d9129", + "outputs": [], + "execution_count": null + } + ], + "metadata": { + "kernelspec": { + "name": "python3", + "language": "python", + "display_name": "Python 3 (ipykernel)" + } + }, + "nbformat": 5, + "nbformat_minor": 9 +} diff --git a/sdk/kubeflow/trainer/__init__.py b/sdk/kubeflow/trainer/__init__.py index e718d3807f..b1e4ce704f 100644 --- a/sdk/kubeflow/trainer/__init__.py +++ b/sdk/kubeflow/trainer/__init__.py @@ -20,6 +20,9 @@ # Import the Kubeflow Trainer client. from kubeflow.trainer.api.trainer_client import TrainerClient +# Import the Kubeflow Local Trainer client. +from kubeflow.trainer.api.local_trainer_client import LocalTrainerClient + # Import the Kubeflow Trainer constants. 
from kubeflow.trainer.constants.constants import DATASET_PATH, MODEL_PATH diff --git a/sdk/kubeflow/trainer/api/abstract_trainer_client.py b/sdk/kubeflow/trainer/api/abstract_trainer_client.py new file mode 100644 index 0000000000..189f93a41c --- /dev/null +++ b/sdk/kubeflow/trainer/api/abstract_trainer_client.py @@ -0,0 +1,62 @@ +# Copyright 2025 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod +from typing import Dict, List, Optional + +from kubeflow.trainer.constants import constants +from kubeflow.trainer.types import types + + +class AbstractTrainerClient(ABC): + @abstractmethod + def delete_job(self, name: str): + pass + + @abstractmethod + def get_job(self, name: str) -> types.TrainJob: + pass + + @abstractmethod + def get_job_logs( + self, + name: str, + follow: Optional[bool] = False, + step: str = constants.NODE, + node_rank: int = 0, + ) -> Dict[str, str]: + pass + + @abstractmethod + def get_runtime(self, name: str) -> types.Runtime: + pass + + @abstractmethod + def list_jobs( + self, runtime: Optional[types.Runtime] = None + ) -> List[types.TrainJob]: + pass + + @abstractmethod + def list_runtimes(self) -> List[types.Runtime]: + pass + + @abstractmethod + def train( + self, + runtime: types.Runtime = types.DEFAULT_RUNTIME, + initializer: Optional[types.Initializer] = None, + trainer: Optional[types.CustomTrainer] = None, + ) -> str: + pass diff --git 
a/sdk/kubeflow/trainer/api/local_trainer_client.py b/sdk/kubeflow/trainer/api/local_trainer_client.py new file mode 100644 index 0000000000..415d5121a8 --- /dev/null +++ b/sdk/kubeflow/trainer/api/local_trainer_client.py @@ -0,0 +1,244 @@ +# Copyright 2025 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from importlib import resources +from pathlib import Path +from typing import Dict, List, Optional + +import yaml +from kubeflow.trainer import models +from kubeflow.trainer.api.abstract_trainer_client import AbstractTrainerClient +from kubeflow.trainer.constants import constants +from kubeflow.trainer.job_runners import DockerJobRunner, JobRunner +from kubeflow.trainer.types import types +from kubeflow.trainer.utils import utils + + +class LocalTrainerClient(AbstractTrainerClient): + """LocalTrainerClient exposes functionality for running training jobs locally. + + A Kubernetes cluster is not required. + It exposes the same interface as the TrainerClient. + + Args: + local_runtimes_path: The path to the directory containing runtime YAML files. + Defaults to the runtimes included with the package. + job_runner: The job runner to use for local training. + Options include the DockerJobRunner and PodmanJobRunner. + Defaults to the Docker job runner. 
+ """ + + def __init__( + self, + local_runtimes_path: Optional[Path] = None, + job_runner: Optional[JobRunner] = None, + ): + print( + "Warning: LocalTrainerClient is an alpha feature for Kubeflow Trainer. " + "Some features may be unstable or unimplemented." + ) + + if local_runtimes_path is None: + self.local_runtimes_path = ( + resources.files(constants.PACKAGE_NAME) / constants.LOCAL_RUNTIMES_PATH + ) + else: + self.local_runtimes_path = local_runtimes_path + + if job_runner is None: + self.job_runner = DockerJobRunner() + else: + self.job_runner = job_runner + + def list_runtimes(self) -> List[types.Runtime]: + """Lists all runtimes. + + Returns: + A list of runtime objects. + """ + runtimes = [] + for cr in self.__list_runtime_crs(): + runtimes.append(utils.get_runtime_from_crd(cr)) + return runtimes + + def get_runtime(self, name: str) -> types.Runtime: + """Get a specific runtime by name. + + Args: + name: The name of the runtime. + + Returns: + A runtime object. + + Raises: + RuntimeError: if the specified runtime cannot be found. + """ + for r in self.list_runtimes(): + if r.name == name: + return r + raise RuntimeError(f"No runtime found with name '{name}'") + + def train( + self, + runtime: types.Runtime = types.DEFAULT_RUNTIME, + initializer: Optional[types.Initializer] = None, + trainer: Optional[types.CustomTrainer] = None, + ) -> str: + """Starts a training job. + + Args: + runtime: Config for the train job's runtime. + trainer: Config for the function that encapsulates the model training process. + initializer: Config for dataset and model initialization. + + Returns: + The generated name of the training job. + + Raises: + RuntimeError: if the specified runtime cannot be found, + or the runtime container cannot be found, + or the runtime container image is not specified. 
+ """ + runtime_cr = self.__get_runtime_cr(runtime.name) + if runtime_cr is None: + raise RuntimeError(f"No runtime found with name '{runtime.name}'") + + runtime_container = utils.get_runtime_trainer_container( + runtime_cr.spec.template.spec.replicated_jobs + ) + if runtime_container is None: + raise RuntimeError("No runtime container found") + + image = runtime_container.image + if image is None: + raise RuntimeError("No runtime container image specified") + + if trainer and trainer.func: + entrypoint, command = utils.get_entrypoint_using_train_func( + runtime, + trainer.func, + trainer.func_args, + trainer.pip_index_url, + trainer.packages_to_install, + ) + else: + entrypoint = runtime_container.command + command = runtime_container.args + + if trainer and trainer.num_nodes: + num_nodes = trainer.num_nodes + else: + num_nodes = 1 + + train_job_name = self.job_runner.create_job( + image=image, + entrypoint=entrypoint, + command=command, + num_nodes=num_nodes, + framework=runtime.trainer.framework, + runtime_name=runtime.name, + ) + return train_job_name + + def list_jobs( + self, runtime: Optional[types.Runtime] = None + ) -> List[types.TrainJob]: + """Lists all training jobs. + + Args: + runtime: If provided, only return jobs that use the given runtime. + + Returns: + A list of training jobs. + """ + runtime_name = runtime.name if runtime else None + container_jobs = self.job_runner.list_jobs(runtime_name) + + train_jobs = [] + for container_job in container_jobs: + train_jobs.append(self.__container_job_to_train_job(container_job)) + return train_jobs + + def get_job(self, name: str) -> types.TrainJob: + """Get a specific training job by name. + + Args: + name: The name of the training job to get. + + Returns: + A training job. 
+ """ + container_job = self.job_runner.get_job(name) + return self.__container_job_to_train_job(container_job) + + def get_job_logs( + self, + name: str, + follow: Optional[bool] = False, + step: str = constants.NODE, + node_rank: int = 0, + ) -> Dict[str, str]: + """Gets logs for the specified training job. + Args: + name (str): The name of the training job + follow (bool): If true, follows job logs and prints them to standard out (default False) + step (str): The training job step to target (default "node") + node_rank (int): The node rank to retrieve logs from (default 0) + + Returns: + Dict[str, str]: The logs of the training job, where the key is the + step and node rank, and the value is the logs for that node. + """ + return self.job_runner.get_job_logs( + job_name=name, follow=follow, step=step, node_rank=node_rank + ) + + def delete_job(self, name: str): + """Deletes a specific training job. + + Args: + name: The name of the training job to delete. + """ + self.job_runner.delete_job(job_name=name) + + def __list_runtime_crs(self) -> List[models.TrainerV1alpha1ClusterTrainingRuntime]: + runtime_crs = [] + for filename in self.local_runtimes_path.iterdir(): + with open(filename, "r") as f: + cr_str = f.read() + cr_dict = yaml.safe_load(cr_str) + cr = models.TrainerV1alpha1ClusterTrainingRuntime.from_dict(cr_dict) + if cr is not None: + runtime_crs.append(cr) + return runtime_crs + + def __get_runtime_cr( + self, + name: str, + ) -> Optional[models.TrainerV1alpha1ClusterTrainingRuntime]: + for cr in self.__list_runtime_crs(): + if cr.metadata.name == name: + return cr + return None + + def __container_job_to_train_job( + self, container_job: types.ContainerJob + ) -> types.TrainJob: + return types.TrainJob( + name=container_job.name, + creation_timestamp=container_job.creation_timestamp, + steps=[container.to_step() for container in container_job.containers], + runtime=self.get_runtime(container_job.runtime_name), + status=container_job.status, + ) diff --git 
a/sdk/kubeflow/trainer/api/trainer_client.py b/sdk/kubeflow/trainer/api/trainer_client.py index 68262f61e5..21fb81928d 100644 --- a/sdk/kubeflow/trainer/api/trainer_client.py +++ b/sdk/kubeflow/trainer/api/trainer_client.py @@ -15,12 +15,10 @@ import logging import multiprocessing import queue -import random -import string -import uuid from typing import Dict, List, Optional import kubeflow.trainer.models as models +from kubeflow.trainer.api.abstract_trainer_client import AbstractTrainerClient from kubeflow.trainer.constants import constants from kubeflow.trainer.types import types from kubeflow.trainer.utils import utils @@ -29,7 +27,7 @@ logger = logging.getLogger(__name__) -class TrainerClient: +class TrainerClient(AbstractTrainerClient): def __init__( self, config_file: Optional[str] = None, @@ -102,7 +100,7 @@ def list_runtimes(self) -> List[types.Runtime]: return result for runtime in runtime_list.items: - result.append(self.__get_runtime_from_crd(runtime)) + result.append(utils.get_runtime_from_crd(runtime)) except multiprocessing.TimeoutError: raise TimeoutError( @@ -144,7 +142,7 @@ def get_runtime(self, name: str) -> types.Runtime: f"{self.namespace}/{name}" ) - return self.__get_runtime_from_crd(runtime) # type: ignore + return utils.get_runtime_from_crd(runtime) # type: ignore def train( self, @@ -165,7 +163,7 @@ def train( # Generate unique name for the TrainJob. # TODO (andreyvelich): Discuss this TrainJob name generation. - train_job_name = random.choice(string.ascii_lowercase) + uuid.uuid4().hex[:11] + train_job_name = utils.generate_train_job_name() # Build the Trainer. 
trainer_crd = models.TrainerV1alpha1Trainer() @@ -454,30 +452,6 @@ def delete_job(self, name: str): f"{constants.TRAINJOB_KIND} {self.namespace}/{name} has been deleted" ) - def __get_runtime_from_crd( - self, - runtime_crd: models.TrainerV1alpha1ClusterTrainingRuntime, - ) -> types.Runtime: - - if not ( - runtime_crd.metadata - and runtime_crd.metadata.name - and runtime_crd.spec - and runtime_crd.spec.ml_policy - and runtime_crd.spec.template.spec - and runtime_crd.spec.template.spec.replicated_jobs - ): - raise Exception(f"ClusterTrainingRuntime CRD is invalid: {runtime_crd}") - - return types.Runtime( - name=runtime_crd.metadata.name, - trainer=utils.get_runtime_trainer( - runtime_crd.spec.template.spec.replicated_jobs, - runtime_crd.spec.ml_policy, - runtime_crd.metadata, - ), - ) - def __get_trainjob_from_crd( self, trainjob_crd: models.TrainerV1alpha1TrainJob, diff --git a/sdk/kubeflow/trainer/config/local_runtimes/torch_distributed.yaml b/sdk/kubeflow/trainer/config/local_runtimes/torch_distributed.yaml new file mode 100644 index 0000000000..95367e8786 --- /dev/null +++ b/sdk/kubeflow/trainer/config/local_runtimes/torch_distributed.yaml @@ -0,0 +1,34 @@ +apiVersion: trainer.kubeflow.org/v1alpha1 +kind: ClusterTrainingRuntime +metadata: + name: torch-distributed +spec: + mlPolicy: + numNodes: 1 + torch: + numProcPerNode: auto + template: + spec: + replicatedJobs: + - name: node + template: + metadata: + labels: + trainer.kubeflow.org/trainjob-ancestor-step: trainer + spec: + template: + spec: + containers: + - name: node + image: pytorch/pytorch:2.5.0-cuda12.4-cudnn9-runtime + command: + - /bin/bash + - -c + - | + echo "Torch Distributed Runtime" + + echo "--------------------------------------" + echo "Torch Default Runtime Env" + env | grep PET_ + + pip list diff --git a/sdk/kubeflow/trainer/constants/constants.py b/sdk/kubeflow/trainer/constants/constants.py index 25d0999ed9..492ac200f3 100644 --- a/sdk/kubeflow/trainer/constants/constants.py +++ 
b/sdk/kubeflow/trainer/constants/constants.py @@ -116,3 +116,24 @@ # The default entrypoint for mpirun. MPI_ENTRYPOINT = "mpirun" + +# The default path to the directory containing local training runtime configs. +LOCAL_RUNTIMES_PATH = "trainer/config/local_runtimes" + +# The label key used to associate container resources with a train job name. +CONTAINER_TRAIN_JOB_NAME_LABEL = "trainer.kubeflow.org/train-job-name" + +# The label key used to associate container resources with a runtime. +CONTAINER_RUNTIME_LABEL = "trainer.kubeflow.org/runtime" + +# The prefix given to names used for local train jobs. +LOCAL_TRAIN_JOB_NAME_PREFIX = "kubeflow-trainer-" + +# The key for the label used to indicate the rank of a local container node. +LOCAL_NODE_RANK_LABEL = "trainer.kubeflow.org/node-rank" + +# The port number exposed by torch head nodes. +TORCH_HEAD_NODE_PORT = 29500 + +# The name of the Kubeflow Trainer SDK package. +PACKAGE_NAME = "kubeflow" diff --git a/sdk/kubeflow/trainer/job_runners/__init__.py b/sdk/kubeflow/trainer/job_runners/__init__.py new file mode 100644 index 0000000000..0b468c3faa --- /dev/null +++ b/sdk/kubeflow/trainer/job_runners/__init__.py @@ -0,0 +1,3 @@ +# flake8: noqa: F401 + +from kubeflow.trainer.job_runners.docker_job_runner import DockerJobRunner, JobRunner diff --git a/sdk/kubeflow/trainer/job_runners/docker_job_runner.py b/sdk/kubeflow/trainer/job_runners/docker_job_runner.py new file mode 100644 index 0000000000..fd9f24f665 --- /dev/null +++ b/sdk/kubeflow/trainer/job_runners/docker_job_runner.py @@ -0,0 +1,267 @@ +# Copyright 2025 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime +from typing import Dict, List, Optional + +import docker +from kubeflow.trainer.constants import constants +from kubeflow.trainer.job_runners.job_runner import JobRunner +from kubeflow.trainer.types import types +from kubeflow.trainer.utils import utils + + +class DockerJobRunner(JobRunner): + """DockerJobRunner creates and manages training jobs using Docker. + + Args: + docker_client: If provided, this client is used for Docker API calls. + If not provided, a new client will be created from the user's environment. + """ + + def __init__(self, docker_client: Optional[docker.DockerClient] = None): + if docker_client is None: + self.docker_client = docker.from_env() + else: + self.docker_client = docker_client + + def create_job( + self, + image: str, + entrypoint: List[str], + command: List[str], + num_nodes: int, + framework: types.Framework, + runtime_name: str, + ) -> str: + """Creates a training job. + + Args: + image: The name of the container image to use for the job. + entrypoint: The entrypoint for the container. + command: The command to run in the container. + num_nodes: The number of nodes to run the job on. + framework: The framework being used. + runtime_name: The name of the runtime being used. + + Returns: + The name of the created job. + + Raises: + RuntimeError: If the framework provided is not supported. 
+ """ + if framework != types.Framework.TORCH: + raise RuntimeError(f"Framework '{framework}' is not currently supported.") + + train_job_name = ( + f"{constants.LOCAL_TRAIN_JOB_NAME_PREFIX}{utils.generate_train_job_name()}" + ) + + docker_network = self.docker_client.networks.create( + name=train_job_name, + driver="bridge", + labels={ + constants.CONTAINER_TRAIN_JOB_NAME_LABEL: train_job_name, + constants.CONTAINER_RUNTIME_LABEL: runtime_name, + }, + ) + + for i in range(num_nodes): + self.docker_client.containers.run( + name=f"{train_job_name}-{i}", + network=docker_network.id, + image=image, + entrypoint=entrypoint, + command=command, + labels={ + constants.CONTAINER_TRAIN_JOB_NAME_LABEL: train_job_name, + constants.LOCAL_NODE_RANK_LABEL: str(i), + constants.CONTAINER_RUNTIME_LABEL: runtime_name, + }, + environment=self.__get_container_environment( + framework=framework, + head_node_address=f"{train_job_name}-0", + num_nodes=num_nodes, + node_rank=i, + ), + detach=True, + ) + + return train_job_name + + def get_job(self, job_name: str) -> types.ContainerJob: + """Get a specified container training job by its name. + + Args: + job_name: The name of the training job to get. + + Returns: + A container training job. 
+ """ + network = self.docker_client.networks.get(job_name) + + docker_containers = self.docker_client.containers.list( + filters={ + "label": [f"{constants.CONTAINER_TRAIN_JOB_NAME_LABEL}={job_name}"] + }, + all=True, + ) + + containers = [] + for container in docker_containers: + containers.append( + types.Container( + name=container.name, + status=container.status, + ), + ) + + return types.ContainerJob( + name=job_name, + creation_timestamp=datetime.fromisoformat(network.attrs["Created"]), + runtime_name=network.attrs["Labels"][constants.CONTAINER_RUNTIME_LABEL], + containers=containers, + status=self.__get_job_status(containers), + ) + + def get_job_logs( + self, + job_name: str, + follow: bool = False, + step: str = constants.NODE, + node_rank: int = 0, + ) -> Dict[str, str]: + """Gets container logs for the training job. + + Args: + job_name (str): The name of the training job + follow (bool): If true, follows job logs and prints them to standard out (default False) + step (str): The training job step to target (default "node") + node_rank (int): The node rank to retrieve logs from (default 0) + + Returns: + Dict[str, str]: The logs of the training job, where the key is the + step and node rank, and the value is the logs for that node. + + Raises: + RuntimeError: If the job is not found. + """ + # TODO (eoinfennessy): use "step" in query. 
+ containers = self.docker_client.containers.list( + all=True, + filters={ + "label": [ + f"{constants.CONTAINER_TRAIN_JOB_NAME_LABEL}={job_name}", + f"{constants.LOCAL_NODE_RANK_LABEL}={node_rank}", + ] + }, + ) + if len(containers) == 0: + raise RuntimeError(f"Could not find job '{job_name}'") + + logs: Dict[str, str] = {} + if follow: + for line in containers[0].logs(stream=True): + decoded = line.decode("utf-8") + print(decoded) + logs[f"{step}-{node_rank}"] = ( + logs.get(f"{step}-{node_rank}", "") + decoded + "\n" + ) + else: + logs[f"{step}-{node_rank}"] = containers[0].logs().decode() + return logs + + def list_jobs( + self, + runtime_name: Optional[str] = None, + ) -> List[types.ContainerJob]: + """Lists container training jobs. + + Args: + runtime_name: If provided, only return jobs that use the given runtime name. + + Returns: + A list of container training jobs. + """ + jobs = [] + for name in self.__list_job_names(runtime_name): + jobs.append(self.get_job(name)) + return jobs + + def delete_job(self, job_name: str) -> None: + """Deletes all resources associated with a Docker training job. + Args: + job_name (str): The name of the Docker training job. + """ + containers = self.docker_client.containers.list( + all=True, + filters={"label": f"{constants.CONTAINER_TRAIN_JOB_NAME_LABEL}={job_name}"}, + ) + for c in containers: + c.remove(force=True) + print(f"Removed container: {c.name}") + + network = self.docker_client.networks.get(job_name) + network.remove() + print(f"Removed network: {network.name}") + + def __list_job_names( + self, + runtime_name: Optional[str] = None, + ) -> List[str]: + """Lists the names of all Docker training jobs. + + Args: + runtime_name (Optional[str]): Filter by runtime name (default None) + + Returns: + List[str]: A list of Docker training job names. 
+ """ + + filters = {"label": [constants.CONTAINER_TRAIN_JOB_NAME_LABEL]} + if runtime_name is not None: + filters["label"].append( + f"{constants.CONTAINER_RUNTIME_LABEL}={runtime_name}" + ) + + # Because a network is created for each job, we use network names to list all jobs. + networks = self.docker_client.networks.list(filters=filters) + + job_names = [] + for n in networks: + job_names.append(n.name) + return job_names + + @staticmethod + def __get_container_environment( + framework: types.Framework, + head_node_address: str, + num_nodes: int, + node_rank: int, + ) -> Dict[str, str]: + if framework != types.Framework.TORCH: + raise RuntimeError(f"Framework '{framework}' is not currently supported.") + + return { + "PET_NNODES": str(num_nodes), + "PET_NPROC_PER_NODE": "1", + "PET_NODE_RANK": str(node_rank), + "PET_MASTER_ADDR": head_node_address, + "PET_MASTER_PORT": str(constants.TORCH_HEAD_NODE_PORT), + } + + @staticmethod + def __get_job_status(_: List[types.Container]) -> str: + # TODO (eoinfennessy): Discuss how to report status + return constants.UNKNOWN diff --git a/sdk/kubeflow/trainer/job_runners/job_runner.py b/sdk/kubeflow/trainer/job_runners/job_runner.py new file mode 100644 index 0000000000..e5d56b8eee --- /dev/null +++ b/sdk/kubeflow/trainer/job_runners/job_runner.py @@ -0,0 +1,58 @@ +# Copyright 2025 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
class JobRunner(ABC):
    """Abstract interface for backends that run container training jobs.

    Concrete runners must implement every method below.
    """

    @abstractmethod
    def create_job(
        self,
        image: str,
        entrypoint: List[str],
        command: List[str],
        num_nodes: int,
        framework: types.Framework,
        runtime_name: str,
    ) -> str:
        """Create a training job.

        Args:
            image: Container image to run.
            entrypoint: Entrypoint for the job's containers.
            command: Command for the job's containers.
            num_nodes: Number of nodes the job should use.
            framework: Framework the job trains with.
            runtime_name: Name of the runtime the job uses.

        Returns:
            A string — presumably the name of the created job; confirm against
            the concrete implementations.
        """

    @abstractmethod
    def get_job(self, job_name: str) -> types.ContainerJob:
        """Return the container job identified by ``job_name``."""

    @abstractmethod
    def get_job_logs(
        self,
        job_name: str,
        follow: bool = False,
        step: str = constants.NODE,
        node_rank: int = 0,
    ) -> Dict[str, str]:
        """Return logs of a job.

        Args:
            job_name: Name of the job whose logs are requested.
            follow: Whether to follow the logs (default False).
            step: Step to read logs for (default ``constants.NODE``).
            node_rank: Rank of the node to read logs from (default 0).

        Returns:
            A mapping of string keys to log text.
            NOTE(review): the key format is chosen by the implementation — confirm.
        """

    @abstractmethod
    def list_jobs(
        self,
        runtime_name: Optional[str] = None,
    ) -> List[types.ContainerJob]:
        """Return container jobs; ``runtime_name`` is an optional filter (default None)."""

    @abstractmethod
    def delete_job(self, job_name: str) -> None:
        """Delete the job identified by ``job_name``."""


@dataclass
class Container:
    """Representation for a container used in a local job."""

    name: str
    status: str

    def to_step(self) -> Step:
        """Convert this container to a ``Step``; its name doubles as the pod name."""
        container_name = self.name
        return Step(
            name=container_name,
            status=self.status,
            pod_name=container_name,
        )


@dataclass
class ContainerJob:
    """Representation for a local container job."""

    name: str
    creation_timestamp: datetime
    runtime_name: str
    containers: List[Container]
    # Falls back to the UNKNOWN constant when no status is supplied.
    status: Optional[str] = constants.UNKNOWN
def get_runtime_from_crd(
    runtime_crd: models.TrainerV1alpha1ClusterTrainingRuntime,
) -> types.Runtime:
    """Convert a ClusterTrainingRuntime CRD object into a ``types.Runtime``.

    Args:
        runtime_crd: The CRD object to convert. It must carry a metadata name,
            an ML policy, and a non-empty list of replicated jobs under
            ``spec.template.spec``.

    Returns:
        A ``types.Runtime`` named after the CRD, with its trainer built by
        ``get_runtime_trainer``.

    Raises:
        Exception: If any required part of the CRD is missing or empty.
    """
    metadata = runtime_crd.metadata
    spec = runtime_crd.spec

    # Walk spec.template.spec one link at a time so that a missing `template`
    # is reported as an invalid CRD instead of surfacing as an AttributeError.
    template = spec.template if spec else None
    job_set_spec = template.spec if template else None

    if not (
        metadata
        and metadata.name
        and spec
        and spec.ml_policy
        and job_set_spec
        and job_set_spec.replicated_jobs
    ):
        raise Exception(f"ClusterTrainingRuntime CRD is invalid: {runtime_crd}")

    return types.Runtime(
        name=metadata.name,
        trainer=get_runtime_trainer(
            job_set_spec.replicated_jobs,
            spec.ml_policy,
            metadata,
        ),
    )


def generate_train_job_name() -> str:
    """Return a random 12-character job name.

    The name is one lowercase ASCII letter followed by the first 11 hex digits
    of a freshly generated UUID4.
    """
    # NOTE(review): the leading letter presumably guarantees the name never
    # starts with a digit — confirm the naming rule this is meant to satisfy.
    leading_letter = random.choice(string.ascii_lowercase)
    return leading_letter + uuid.uuid4().hex[:11]
["kubernetes>=27.2.0", "pydantic>=2.10.0"] +dependencies = ["kubernetes>=27.2.0", "pydantic>=2.10.0", "docker>=7.1.0"] [project.urls] Homepage = "https://github.com/kubeflow/trainer"