From f1e3636cdb68e156bbb5be243ac1991608cc99fe Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Wed, 2 Apr 2025 10:59:17 +0100 Subject: [PATCH 01/22] Add abstract base class for trainer clients Signed-off-by: Eoin Fennessy --- sdk/kubeflow/trainer/api/trainer_client.py | 3 +- .../trainer/api/trainer_client_abc.py | 62 +++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) create mode 100644 sdk/kubeflow/trainer/api/trainer_client_abc.py diff --git a/sdk/kubeflow/trainer/api/trainer_client.py b/sdk/kubeflow/trainer/api/trainer_client.py index 68262f61e5..31f5080720 100644 --- a/sdk/kubeflow/trainer/api/trainer_client.py +++ b/sdk/kubeflow/trainer/api/trainer_client.py @@ -20,6 +20,7 @@ import uuid from typing import Dict, List, Optional +from kubeflow.trainer.api.trainer_client_abc import TrainerClientABC import kubeflow.trainer.models as models from kubeflow.trainer.constants import constants from kubeflow.trainer.types import types @@ -29,7 +30,7 @@ logger = logging.getLogger(__name__) -class TrainerClient: +class TrainerClient(TrainerClientABC): def __init__( self, config_file: Optional[str] = None, diff --git a/sdk/kubeflow/trainer/api/trainer_client_abc.py b/sdk/kubeflow/trainer/api/trainer_client_abc.py new file mode 100644 index 0000000000..3c304919f5 --- /dev/null +++ b/sdk/kubeflow/trainer/api/trainer_client_abc.py @@ -0,0 +1,62 @@ +# Copyright 2024 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from abc import ABC, abstractmethod +from typing import Optional, Dict, List + +from kubeflow.trainer.constants import constants +from kubeflow.trainer.types import types + + +class TrainerClientABC(ABC): + @abstractmethod + def delete_job(self, name: str): + pass + + @abstractmethod + def get_job(self, name: str) -> types.TrainJob: + pass + + @abstractmethod + def get_job_logs( + self, + name: str, + follow: Optional[bool] = False, + step: str = constants.NODE, + node_rank: int = 0, + ) -> Dict[str, str]: + pass + + @abstractmethod + def get_runtime(self, name: str) -> types.Runtime: + pass + + @abstractmethod + def list_jobs( + self, runtime: Optional[types.Runtime] = None + ) -> List[types.TrainJob]: + pass + + @abstractmethod + def list_runtimes(self) -> List[types.Runtime]: + pass + + @abstractmethod + def train( + self, + runtime: types.Runtime = types.DEFAULT_RUNTIME, + initializer: Optional[types.Initializer] = None, + trainer: Optional[types.CustomTrainer] = None, + ) -> str: + pass From 4b67f1cb151dc8cf85feb504b08283ac687cc979 Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Wed, 2 Apr 2025 12:57:06 +0100 Subject: [PATCH 02/22] Add `LocalTrainerClient` class with unimplemented methods Signed-off-by: Eoin Fennessy --- .../trainer/api/local_trainer_client.py | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 sdk/kubeflow/trainer/api/local_trainer_client.py diff --git a/sdk/kubeflow/trainer/api/local_trainer_client.py b/sdk/kubeflow/trainer/api/local_trainer_client.py new file mode 100644 index 0000000000..60d6434777 --- /dev/null +++ b/sdk/kubeflow/trainer/api/local_trainer_client.py @@ -0,0 +1,55 @@ +# Copyright 2024 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List, Optional, Dict + +from kubeflow.trainer.api.trainer_client_abc import TrainerClientABC +from kubeflow.trainer.constants import constants +from kubeflow.trainer.types import types + + +class LocalTrainerClient(TrainerClientABC): + def list_runtimes(self) -> List[types.Runtime]: + raise NotImplementedError() + + def get_runtime(self, name: str) -> types.Runtime: + raise NotImplementedError() + + def train( + self, + runtime: types.Runtime = types.DEFAULT_RUNTIME, + initializer: Optional[types.Initializer] = None, + trainer: Optional[types.CustomTrainer] = None, + ) -> str: + raise NotImplementedError() + + def list_jobs( + self, runtime: Optional[types.Runtime] = None + ) -> List[types.TrainJob]: + raise NotImplementedError() + + def get_job(self, name: str) -> types.TrainJob: + raise NotImplementedError() + + def get_job_logs( + self, + name: str, + follow: Optional[bool] = False, + step: str = constants.NODE, + node_rank: int = 0, + ) -> Dict[str, str]: + raise NotImplementedError() + + def delete_job(self, name: str): + raise NotImplementedError() From 46a8ea74815d49fa194d4f45aad819d4dd533669 Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Thu, 3 Apr 2025 09:55:52 +0100 Subject: [PATCH 03/22] Implement 'list_runtimes' method Signed-off-by: Eoin Fennessy --- .../trainer/api/local_trainer_client.py | 20 +++++++++-- sdk/kubeflow/trainer/api/trainer_client.py | 28 ++------------- .../local_runtimes/torch_distributed.yaml | 34 +++++++++++++++++++ sdk/kubeflow/trainer/constants/constants.py | 3 ++ 
sdk/kubeflow/trainer/utils/utils.py | 22 ++++++++++++ 5 files changed, 79 insertions(+), 28 deletions(-) create mode 100644 sdk/kubeflow/trainer/config/local_runtimes/torch_distributed.yaml diff --git a/sdk/kubeflow/trainer/api/local_trainer_client.py b/sdk/kubeflow/trainer/api/local_trainer_client.py index 60d6434777..3e02b20d26 100644 --- a/sdk/kubeflow/trainer/api/local_trainer_client.py +++ b/sdk/kubeflow/trainer/api/local_trainer_client.py @@ -11,17 +11,33 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import os from typing import List, Optional, Dict +import yaml +from kubeflow.trainer import models from kubeflow.trainer.api.trainer_client_abc import TrainerClientABC from kubeflow.trainer.constants import constants from kubeflow.trainer.types import types +from kubeflow.trainer.utils import utils class LocalTrainerClient(TrainerClientABC): + def __init__( + self, + local_runtimes_path: str = constants.LOCAL_RUNTIMES_PATH, + ): + self.local_runtimes_path = local_runtimes_path + def list_runtimes(self) -> List[types.Runtime]: - raise NotImplementedError() + runtimes = [] + for filename in os.listdir(self.local_runtimes_path): + with open(os.path.join(self.local_runtimes_path, filename), "r") as f: + content_str = f.read() + content_dict = yaml.safe_load(content_str) + runtime_cr = models.TrainerV1alpha1ClusterTrainingRuntime.from_dict(content_dict) + runtimes.append(utils.get_runtime_from_crd(runtime_cr)) + return runtimes def get_runtime(self, name: str) -> types.Runtime: raise NotImplementedError() diff --git a/sdk/kubeflow/trainer/api/trainer_client.py b/sdk/kubeflow/trainer/api/trainer_client.py index 31f5080720..35fec09c5c 100644 --- a/sdk/kubeflow/trainer/api/trainer_client.py +++ b/sdk/kubeflow/trainer/api/trainer_client.py @@ -103,7 +103,7 @@ def list_runtimes(self) -> List[types.Runtime]: return result for runtime in 
runtime_list.items: - result.append(self.__get_runtime_from_crd(runtime)) + result.append(utils.get_runtime_from_crd(runtime)) except multiprocessing.TimeoutError: raise TimeoutError( @@ -145,7 +145,7 @@ def get_runtime(self, name: str) -> types.Runtime: f"{self.namespace}/{name}" ) - return self.__get_runtime_from_crd(runtime) # type: ignore + return utils.get_runtime_from_crd(runtime) # type: ignore def train( self, @@ -455,30 +455,6 @@ def delete_job(self, name: str): f"{constants.TRAINJOB_KIND} {self.namespace}/{name} has been deleted" ) - def __get_runtime_from_crd( - self, - runtime_crd: models.TrainerV1alpha1ClusterTrainingRuntime, - ) -> types.Runtime: - - if not ( - runtime_crd.metadata - and runtime_crd.metadata.name - and runtime_crd.spec - and runtime_crd.spec.ml_policy - and runtime_crd.spec.template.spec - and runtime_crd.spec.template.spec.replicated_jobs - ): - raise Exception(f"ClusterTrainingRuntime CRD is invalid: {runtime_crd}") - - return types.Runtime( - name=runtime_crd.metadata.name, - trainer=utils.get_runtime_trainer( - runtime_crd.spec.template.spec.replicated_jobs, - runtime_crd.spec.ml_policy, - runtime_crd.metadata, - ), - ) - def __get_trainjob_from_crd( self, trainjob_crd: models.TrainerV1alpha1TrainJob, diff --git a/sdk/kubeflow/trainer/config/local_runtimes/torch_distributed.yaml b/sdk/kubeflow/trainer/config/local_runtimes/torch_distributed.yaml new file mode 100644 index 0000000000..a8bb2a0ace --- /dev/null +++ b/sdk/kubeflow/trainer/config/local_runtimes/torch_distributed.yaml @@ -0,0 +1,34 @@ +apiVersion: trainer.kubeflow.org/v1alpha1 +kind: ClusterTrainingRuntime +metadata: + name: torch-distributed +spec: + mlPolicy: + numNodes: 1 + torch: + numProcPerNode: auto + template: + spec: + replicatedJobs: + - name: node + template: + metadata: + labels: + trainer.kubeflow.org/trainjob-ancestor-step: trainer + spec: + template: + spec: + containers: + - name: node + image: pytorch/pytorch:2.5.0-cuda12.4-cudnn9-runtime + command: + - 
/bin/bash + - -c + - | + echo "Torch Distributed Runtime" + + echo "--------------------------------------" + echo "Torch Default Runtime Env" + env | grep PET_ + + pip list \ No newline at end of file diff --git a/sdk/kubeflow/trainer/constants/constants.py b/sdk/kubeflow/trainer/constants/constants.py index 25d0999ed9..5fc934dbd9 100644 --- a/sdk/kubeflow/trainer/constants/constants.py +++ b/sdk/kubeflow/trainer/constants/constants.py @@ -116,3 +116,6 @@ # The default entrypoint for mpirun. MPI_ENTRYPOINT = "mpirun" + +# The default path to the directory containing local training runtime configs +LOCAL_RUNTIMES_PATH = "./kubeflow/trainer/config/local_runtimes" diff --git a/sdk/kubeflow/trainer/utils/utils.py b/sdk/kubeflow/trainer/utils/utils.py index 145579394e..6254a4b582 100644 --- a/sdk/kubeflow/trainer/utils/utils.py +++ b/sdk/kubeflow/trainer/utils/utils.py @@ -73,6 +73,28 @@ def get_container_devices( return device, str(device_count) +def get_runtime_from_crd( + runtime_crd: models.TrainerV1alpha1ClusterTrainingRuntime, +) -> types.Runtime: + + if not ( + runtime_crd.metadata + and runtime_crd.metadata.name + and runtime_crd.spec + and runtime_crd.spec.ml_policy + and runtime_crd.spec.template.spec + and runtime_crd.spec.template.spec.replicated_jobs + ): + raise Exception(f"ClusterTrainingRuntime CRD is invalid: {runtime_crd}") + + return types.Runtime( + name=runtime_crd.metadata.name, + trainer=get_runtime_trainer( + runtime_crd.spec.template.spec.replicated_jobs, + runtime_crd.spec.ml_policy, + runtime_crd.metadata, + ), + ) def get_runtime_trainer_container( replicated_jobs: List[models.JobsetV1alpha2ReplicatedJob], From 1243dd1d15395559a1a9eacb1ca9045f01d2e5e3 Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Thu, 3 Apr 2025 09:59:05 +0100 Subject: [PATCH 04/22] Implement 'get_runtime' method Signed-off-by: Eoin Fennessy --- sdk/kubeflow/trainer/api/local_trainer_client.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git 
a/sdk/kubeflow/trainer/api/local_trainer_client.py b/sdk/kubeflow/trainer/api/local_trainer_client.py index 3e02b20d26..9e4340a54d 100644 --- a/sdk/kubeflow/trainer/api/local_trainer_client.py +++ b/sdk/kubeflow/trainer/api/local_trainer_client.py @@ -39,8 +39,11 @@ def list_runtimes(self) -> List[types.Runtime]: runtimes.append(utils.get_runtime_from_crd(runtime_cr)) return runtimes - def get_runtime(self, name: str) -> types.Runtime: - raise NotImplementedError() + def get_runtime(self, name: str) -> types.Runtime | None: + for r in self.list_runtimes(): + if r.name == name: + return r + return None def train( self, From 9db65cb58326521ad9b4e5b07274cb69903cf3d4 Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Mon, 7 Apr 2025 15:36:13 +0100 Subject: [PATCH 05/22] Introduce `LocalJobClient` Signed-off-by: Eoin Fennessy --- .../trainer/api/local_trainer_client.py | 68 ++++++-- sdk/kubeflow/trainer/api/trainer_client.py | 5 +- sdk/kubeflow/trainer/constants/constants.py | 11 +- .../trainer/local_job_client/__init__.py | 1 + .../local_job_client/local_job_client.py | 146 ++++++++++++++++++ sdk/kubeflow/trainer/utils/utils.py | 6 + sdk/pyproject.toml | 2 +- 7 files changed, 222 insertions(+), 17 deletions(-) create mode 100644 sdk/kubeflow/trainer/local_job_client/__init__.py create mode 100644 sdk/kubeflow/trainer/local_job_client/local_job_client.py diff --git a/sdk/kubeflow/trainer/api/local_trainer_client.py b/sdk/kubeflow/trainer/api/local_trainer_client.py index 9e4340a54d..2ad28bf5d8 100644 --- a/sdk/kubeflow/trainer/api/local_trainer_client.py +++ b/sdk/kubeflow/trainer/api/local_trainer_client.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+ import os from typing import List, Optional, Dict import yaml @@ -18,6 +19,7 @@ from kubeflow.trainer import models from kubeflow.trainer.api.trainer_client_abc import TrainerClientABC from kubeflow.trainer.constants import constants +from kubeflow.trainer.local_job_client import LocalJobClient from kubeflow.trainer.types import types from kubeflow.trainer.utils import utils @@ -26,24 +28,22 @@ class LocalTrainerClient(TrainerClientABC): def __init__( self, local_runtimes_path: str = constants.LOCAL_RUNTIMES_PATH, + local_job_client: LocalJobClient = LocalJobClient(), ): self.local_runtimes_path = local_runtimes_path + self.local_job_client = local_job_client def list_runtimes(self) -> List[types.Runtime]: runtimes = [] - for filename in os.listdir(self.local_runtimes_path): - with open(os.path.join(self.local_runtimes_path, filename), "r") as f: - content_str = f.read() - content_dict = yaml.safe_load(content_str) - runtime_cr = models.TrainerV1alpha1ClusterTrainingRuntime.from_dict(content_dict) - runtimes.append(utils.get_runtime_from_crd(runtime_cr)) + for cr in self.__list_runtime_crs(): + runtimes.append(utils.get_runtime_from_crd(cr)) return runtimes - def get_runtime(self, name: str) -> types.Runtime | None: + def get_runtime(self, name: str) -> types.Runtime: for r in self.list_runtimes(): if r.name == name: return r - return None + raise RuntimeError(f"No runtime found with name '{name}'") def train( self, @@ -51,7 +51,20 @@ def train( initializer: Optional[types.Initializer] = None, trainer: Optional[types.CustomTrainer] = None, ) -> str: - raise NotImplementedError() + runtime_cr = self.__get_runtime_cr(runtime.name) + if runtime_cr is None: + raise RuntimeError(f"No runtime found with name '{runtime.name}'") + + if trainer and trainer.num_nodes: + num_nodes = trainer.num_nodes + else: + num_nodes = 1 + + train_job_name = self.local_job_client.create_job( + runtime_cr=runtime_cr, + num_nodes=num_nodes, + ) + return train_job_name def list_jobs( self, 
runtime: Optional[types.Runtime] = None @@ -68,7 +81,40 @@ def get_job_logs( step: str = constants.NODE, node_rank: int = 0, ) -> Dict[str, str]: - raise NotImplementedError() + """Gets logs for the specified training job + Args: + name (str): The name of the training job + follow (bool): If true, follows the job logs and prints them to standard out (default False) + step (int): The training job step to target (default "node") + node_rank (int): The node rank to retrieve logs from (default 0) + + Returns: + Dict[str, str]: The logs of the training job, where the key is the + step and node rank, and the value is the logs for that node. + """ + return self.local_job_client.get_job_logs( + job_name=name, + follow=follow, + step=step, + node_rank=node_rank + ) def delete_job(self, name: str): - raise NotImplementedError() + self.local_job_client.delete_job(job_name=name) + + def __list_runtime_crs(self) -> List[models.TrainerV1alpha1ClusterTrainingRuntime]: + runtime_crs = [] + for filename in os.listdir(self.local_runtimes_path): + with open(os.path.join(self.local_runtimes_path, filename), "r") as f: + cr_str = f.read() + cr_dict = yaml.safe_load(cr_str) + cr = models.TrainerV1alpha1ClusterTrainingRuntime.from_dict(cr_dict) + if cr is not None: + runtime_crs.append(cr) + return runtime_crs + + def __get_runtime_cr(self, name: str) -> models.TrainerV1alpha1ClusterTrainingRuntime | None: + for cr in self.__list_runtime_crs(): + if cr.metadata.name == name: + return cr + return None diff --git a/sdk/kubeflow/trainer/api/trainer_client.py b/sdk/kubeflow/trainer/api/trainer_client.py index 35fec09c5c..e654ac4c5d 100644 --- a/sdk/kubeflow/trainer/api/trainer_client.py +++ b/sdk/kubeflow/trainer/api/trainer_client.py @@ -15,9 +15,6 @@ import logging import multiprocessing import queue -import random -import string -import uuid from typing import Dict, List, Optional from kubeflow.trainer.api.trainer_client_abc import TrainerClientABC @@ -166,7 +163,7 @@ def train( # 
Generate unique name for the TrainJob. # TODO (andreyvelich): Discuss this TrainJob name generation. - train_job_name = random.choice(string.ascii_lowercase) + uuid.uuid4().hex[:11] + train_job_name = utils.generate_train_job_name() # Build the Trainer. trainer_crd = models.TrainerV1alpha1Trainer() diff --git a/sdk/kubeflow/trainer/constants/constants.py b/sdk/kubeflow/trainer/constants/constants.py index 5fc934dbd9..1dea2a29ca 100644 --- a/sdk/kubeflow/trainer/constants/constants.py +++ b/sdk/kubeflow/trainer/constants/constants.py @@ -117,5 +117,14 @@ # The default entrypoint for mpirun. MPI_ENTRYPOINT = "mpirun" -# The default path to the directory containing local training runtime configs +# The default path to the directory containing local training runtime configs. LOCAL_RUNTIMES_PATH = "./kubeflow/trainer/config/local_runtimes" + +# The label key used to associate docker resources with a train job name. +LOCAL_TRAIN_JOB_NAME_LABEL = "trainer.kubeflow.org/train-job-name" + +# The prefix given to names used for local train jobs. +LOCAL_TRAIN_JOB_NAME_PREFIX = "kubeflow-trainer-" + +# The key for the label used to indicate the rank of a local container node. +LOCAL_NODE_RANK_LABEL = "trainer.kubeflow.org/node-rank" diff --git a/sdk/kubeflow/trainer/local_job_client/__init__.py b/sdk/kubeflow/trainer/local_job_client/__init__.py new file mode 100644 index 0000000000..bf4245aef7 --- /dev/null +++ b/sdk/kubeflow/trainer/local_job_client/__init__.py @@ -0,0 +1 @@ +from kubeflow.trainer.local_job_client.local_job_client import LocalJobClient diff --git a/sdk/kubeflow/trainer/local_job_client/local_job_client.py b/sdk/kubeflow/trainer/local_job_client/local_job_client.py new file mode 100644 index 0000000000..0daab4c1aa --- /dev/null +++ b/sdk/kubeflow/trainer/local_job_client/local_job_client.py @@ -0,0 +1,146 @@ +# Copyright 2024 The Kubeflow Authors. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List, Dict + +import docker + +from kubeflow.trainer import models +from kubeflow.trainer.constants import constants +from kubeflow.trainer.utils import utils + + +class LocalJobClient: + def __init__(self, docker_client: docker.DockerClient | None = None): + if docker_client is None: + self.docker_client = docker.from_env() + else: + self.docker_client = docker_client + + def create_job( + self, + runtime_cr: models.TrainerV1alpha1ClusterTrainingRuntime, + num_nodes: int, + ) -> str: + train_job_name = f"{constants.LOCAL_TRAIN_JOB_NAME_PREFIX}{utils.generate_train_job_name()}" + + docker_network = self.docker_client.networks.create( + name=train_job_name, + driver="bridge", + labels={ + constants.LOCAL_TRAIN_JOB_NAME_LABEL: train_job_name, + }, + ) + + runtime_container = utils.get_runtime_trainer_container( + runtime_cr.spec.template.spec.replicated_jobs + ) + + for i in range(num_nodes): + c = self.docker_client.containers.run( + name=f"{train_job_name}-{i}", + image=runtime_container.image, + network=docker_network.id, + command=runtime_container.command, + labels={ + constants.LOCAL_TRAIN_JOB_NAME_LABEL: train_job_name, + constants.LOCAL_NODE_RANK_LABEL: str(i), + }, + detach=True, + ) + + return train_job_name + + def get_job(self, job_name: str): + raise NotImplementedError() + + def get_job_logs( + self, + job_name: str, + follow: bool = False, + step: str = constants.NODE, + node_rank: int = 
0, + ) -> Dict[str, str]: + """Gets container logs for the training job + + Args: + job_name (str): The name of the training job + follow (bool): If true, follows the job logs and prints them to standard out (default False) + step (int): The training job step to target (default "node") + node_rank (int): The node rank to retrieve logs from (default 0) + + Returns: + Dict[str, str]: The logs of the training job, where the key is the + step and node rank, and the value is the logs for that node. + """ + # TODO (eoinfennessy): use "step" in query. + containers = self.docker_client.containers.list( + all=True, + filters={ + "label": [ + f"{constants.LOCAL_TRAIN_JOB_NAME_LABEL}={job_name}", + f"{constants.LOCAL_NODE_RANK_LABEL}={node_rank}", + ] + }, + ) + if len(containers) == 0: + raise RuntimeError(f"Could not find job '{job_name}'") + + logs: Dict[str, str] = {} + if follow: + for l in containers[0].logs(stream=True): + decoded = l.decode("utf-8") + print(decoded) + logs[f"{step}-{node_rank}"] = ( + logs.get(f"{step}-{node_rank}", "") + + decoded + + "\n" + ) + else: + logs[f"{step}-{node_rank}"] = containers[0].logs().decode() + return logs + + def list_jobs(self) -> List[str]: + """Lists the names of all local training jobs. + + Returns: + List[str]: A list of local training job names. + """ + + # Because a network is created for each job, we use network names to list all jobs. + networks = self.docker_client.networks.list( + filters={"label": constants.LOCAL_TRAIN_JOB_NAME_LABEL}, + ) + + job_names = [] + for n in networks: + job_names.append(n.name) + return job_names + + def delete_job(self, job_name: str) -> None: + """Deletes all resources associated with a local training job. + Args: + job_name (str): The name of the local training job. 
+ """ + containers = self.docker_client.containers.list( + all=True, + filters={"label": f"{constants.LOCAL_TRAIN_JOB_NAME_LABEL}={job_name}"} + ) + for c in containers: + c.remove(force=True) + print(f"Removed container: {c.name}") + + network = self.docker_client.networks.get(job_name) + network.remove() + print(f"Removed network: {network.name}") diff --git a/sdk/kubeflow/trainer/utils/utils.py b/sdk/kubeflow/trainer/utils/utils.py index 6254a4b582..d997057c0b 100644 --- a/sdk/kubeflow/trainer/utils/utils.py +++ b/sdk/kubeflow/trainer/utils/utils.py @@ -15,8 +15,11 @@ import inspect import os import queue +import string import textwrap import threading +import uuid +import random from typing import Any, Callable, Dict, List, Optional, Tuple import kubeflow.trainer.models as models @@ -420,3 +423,6 @@ def get_log_queue_pool(log_streams: List[Any]) -> List[queue.Queue]: pool.append(q) threading.Thread(target=wrap_log_stream, args=(q, log_stream)).start() return pool + +def generate_train_job_name() -> str: + return random.choice(string.ascii_lowercase) + uuid.uuid4().hex[:11] diff --git a/sdk/pyproject.toml b/sdk/pyproject.toml index 8f701d981d..3c17d0d976 100644 --- a/sdk/pyproject.toml +++ b/sdk/pyproject.toml @@ -29,7 +29,7 @@ classifiers = [ "Topic :: Software Development :: Libraries", "Topic :: Software Development :: Libraries :: Python Modules", ] -dependencies = ["kubernetes>=27.2.0", "pydantic>=2.10.0"] +dependencies = ["kubernetes>=27.2.0", "pydantic>=2.10.0", "docker>=7.1.0"] [project.urls] Homepage = "https://github.com/kubeflow/trainer" From 540190d146725e54cf5857dfc0525bb6e84e037d Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Tue, 8 Apr 2025 13:24:30 +0100 Subject: [PATCH 06/22] Use trainer func for container entrypoint and command Signed-off-by: Eoin Fennessy --- .../trainer/api/local_trainer_client.py | 28 ++++++++++++++++++- .../local_job_client/local_job_client.py | 18 ++++++------ 2 files changed, 35 insertions(+), 11 deletions(-) diff 
--git a/sdk/kubeflow/trainer/api/local_trainer_client.py b/sdk/kubeflow/trainer/api/local_trainer_client.py index 2ad28bf5d8..2344eb6675 100644 --- a/sdk/kubeflow/trainer/api/local_trainer_client.py +++ b/sdk/kubeflow/trainer/api/local_trainer_client.py @@ -55,13 +55,39 @@ def train( if runtime_cr is None: raise RuntimeError(f"No runtime found with name '{runtime.name}'") + runtime_container = utils.get_runtime_trainer_container( + runtime_cr.spec.template.spec.replicated_jobs + ) + if runtime_container is None: + raise RuntimeError(f"No runtime container found") + + image = runtime_container.image + if image is None: + raise RuntimeError(f"No runtime container image specified") + + if trainer and trainer.func: + entrypoint, command = ( + utils.get_entrypoint_using_train_func( + runtime, + trainer.func, + trainer.func_args, + trainer.pip_index_url, + trainer.packages_to_install, + ) + ) + else: + entrypoint = runtime_container.command + command = runtime_container.args + if trainer and trainer.num_nodes: num_nodes = trainer.num_nodes else: num_nodes = 1 train_job_name = self.local_job_client.create_job( - runtime_cr=runtime_cr, + image=image, + entrypoint=entrypoint, + command=command, num_nodes=num_nodes, ) return train_job_name diff --git a/sdk/kubeflow/trainer/local_job_client/local_job_client.py b/sdk/kubeflow/trainer/local_job_client/local_job_client.py index 0daab4c1aa..4a32f84ef5 100644 --- a/sdk/kubeflow/trainer/local_job_client/local_job_client.py +++ b/sdk/kubeflow/trainer/local_job_client/local_job_client.py @@ -16,7 +16,6 @@ import docker -from kubeflow.trainer import models from kubeflow.trainer.constants import constants from kubeflow.trainer.utils import utils @@ -29,9 +28,11 @@ def __init__(self, docker_client: docker.DockerClient | None = None): self.docker_client = docker_client def create_job( - self, - runtime_cr: models.TrainerV1alpha1ClusterTrainingRuntime, - num_nodes: int, + self, + image: str, + entrypoint: List[str], + command: List[str], 
+ num_nodes: int, ) -> str: train_job_name = f"{constants.LOCAL_TRAIN_JOB_NAME_PREFIX}{utils.generate_train_job_name()}" @@ -43,16 +44,13 @@ def create_job( }, ) - runtime_container = utils.get_runtime_trainer_container( - runtime_cr.spec.template.spec.replicated_jobs - ) - for i in range(num_nodes): c = self.docker_client.containers.run( name=f"{train_job_name}-{i}", - image=runtime_container.image, network=docker_network.id, - command=runtime_container.command, + image=image, + entrypoint=entrypoint, + command=command, labels={ constants.LOCAL_TRAIN_JOB_NAME_LABEL: train_job_name, constants.LOCAL_NODE_RANK_LABEL: str(i), From 936c9bc21a78eb2929fcfedd8efa6017ba8b26ca Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Fri, 11 Apr 2025 10:24:40 +0100 Subject: [PATCH 07/22] Enable multi-node distributed torch jobs Signed-off-by: Eoin Fennessy --- .../trainer/api/local_trainer_client.py | 1 + sdk/kubeflow/trainer/constants/constants.py | 3 ++ .../local_job_client/local_job_client.py | 39 ++++++++++++++++--- 3 files changed, 38 insertions(+), 5 deletions(-) diff --git a/sdk/kubeflow/trainer/api/local_trainer_client.py b/sdk/kubeflow/trainer/api/local_trainer_client.py index 2344eb6675..75fd500fd5 100644 --- a/sdk/kubeflow/trainer/api/local_trainer_client.py +++ b/sdk/kubeflow/trainer/api/local_trainer_client.py @@ -89,6 +89,7 @@ def train( entrypoint=entrypoint, command=command, num_nodes=num_nodes, + framework=runtime.trainer.framework, ) return train_job_name diff --git a/sdk/kubeflow/trainer/constants/constants.py b/sdk/kubeflow/trainer/constants/constants.py index 1dea2a29ca..7cf5a48573 100644 --- a/sdk/kubeflow/trainer/constants/constants.py +++ b/sdk/kubeflow/trainer/constants/constants.py @@ -128,3 +128,6 @@ # The key for the label used to indicate the rank of a local container node. LOCAL_NODE_RANK_LABEL = "trainer.kubeflow.org/node-rank" + +# The port number exposed by torch master nodes. 
+TORCH_MASTER_NODE_PORT = 29500 diff --git a/sdk/kubeflow/trainer/local_job_client/local_job_client.py b/sdk/kubeflow/trainer/local_job_client/local_job_client.py index 4a32f84ef5..2e8a0d4bad 100644 --- a/sdk/kubeflow/trainer/local_job_client/local_job_client.py +++ b/sdk/kubeflow/trainer/local_job_client/local_job_client.py @@ -16,6 +16,7 @@ import docker +from kubeflow.trainer.types import types from kubeflow.trainer.constants import constants from kubeflow.trainer.utils import utils @@ -28,12 +29,16 @@ def __init__(self, docker_client: docker.DockerClient | None = None): self.docker_client = docker_client def create_job( - self, - image: str, - entrypoint: List[str], - command: List[str], - num_nodes: int, + self, + image: str, + entrypoint: List[str], + command: List[str], + num_nodes: int, + framework: types.Framework ) -> str: + if framework != types.Framework.TORCH: + raise RuntimeError(f"Framework '{framework}' is not currently supported.") + train_job_name = f"{constants.LOCAL_TRAIN_JOB_NAME_PREFIX}{utils.generate_train_job_name()}" docker_network = self.docker_client.networks.create( @@ -55,6 +60,12 @@ def create_job( constants.LOCAL_TRAIN_JOB_NAME_LABEL: train_job_name, constants.LOCAL_NODE_RANK_LABEL: str(i), }, + environment=self.__get_container_environment( + framework=framework, + master_node_address=f"{train_job_name}-0", + num_nodes=num_nodes, + node_rank=i, + ), detach=True, ) @@ -142,3 +153,21 @@ def delete_job(self, job_name: str) -> None: network = self.docker_client.networks.get(job_name) network.remove() print(f"Removed network: {network.name}") + + @staticmethod + def __get_container_environment( + framework: types.Framework, + master_node_address: str, + num_nodes: int, + node_rank: int, + ) -> Dict[str, str]: + if framework != types.Framework.TORCH: + raise RuntimeError(f"Framework '{framework}' is not currently supported.") + + return { + "PET_NNODES": str(num_nodes), + "PET_NPROC_PER_NODE": "1", + "PET_NODE_RANK": str(node_rank), + 
"PET_MASTER_ADDR": master_node_address, + "PET_MASTER_PORT": str(constants.TORCH_MASTER_NODE_PORT), + } From 7654b096567296f7b0972151963fc10186beb493 Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Fri, 11 Apr 2025 10:25:50 +0100 Subject: [PATCH 08/22] Add `LocalTrainerClient` to `trainer` package Signed-off-by: Eoin Fennessy --- sdk/kubeflow/trainer/__init__.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/kubeflow/trainer/__init__.py b/sdk/kubeflow/trainer/__init__.py index e718d3807f..b1e4ce704f 100644 --- a/sdk/kubeflow/trainer/__init__.py +++ b/sdk/kubeflow/trainer/__init__.py @@ -20,6 +20,9 @@ # Import the Kubeflow Trainer client. from kubeflow.trainer.api.trainer_client import TrainerClient +# Import the Kubeflow Local Trainer client. +from kubeflow.trainer.api.local_trainer_client import LocalTrainerClient + # Import the Kubeflow Trainer constants. from kubeflow.trainer.constants.constants import DATASET_PATH, MODEL_PATH From 3676ebed32e0e6cad3e281b901c770dff700d9a4 Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Fri, 11 Apr 2025 11:58:06 +0100 Subject: [PATCH 09/22] Move default `LocalJobClient` instantiation out of `__init__` signature Signed-off-by: Eoin Fennessy --- sdk/kubeflow/trainer/api/local_trainer_client.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sdk/kubeflow/trainer/api/local_trainer_client.py b/sdk/kubeflow/trainer/api/local_trainer_client.py index 75fd500fd5..46a111eea2 100644 --- a/sdk/kubeflow/trainer/api/local_trainer_client.py +++ b/sdk/kubeflow/trainer/api/local_trainer_client.py @@ -28,10 +28,14 @@ class LocalTrainerClient(TrainerClientABC): def __init__( self, local_runtimes_path: str = constants.LOCAL_RUNTIMES_PATH, - local_job_client: LocalJobClient = LocalJobClient(), + local_job_client: LocalJobClient | None = None, ): self.local_runtimes_path = local_runtimes_path - self.local_job_client = local_job_client + + if local_job_client is None: + self.local_job_client = LocalJobClient() 
+ else: + self.local_job_client = local_job_client def list_runtimes(self) -> List[types.Runtime]: runtimes = [] From e09152e9520753294158f15ae2ffbe0f30dfc8c8 Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Fri, 11 Apr 2025 13:33:58 +0100 Subject: [PATCH 10/22] Rename `LocalJobClient` to `DockerJobClient` Signed-off-by: Eoin Fennessy --- .../trainer/api/local_trainer_client.py | 16 +++++++-------- sdk/kubeflow/trainer/constants/constants.py | 2 +- .../trainer/docker_job_client/__init__.py | 1 + .../docker_job_client.py} | 20 +++++++++---------- .../trainer/local_job_client/__init__.py | 1 - 5 files changed, 20 insertions(+), 20 deletions(-) create mode 100644 sdk/kubeflow/trainer/docker_job_client/__init__.py rename sdk/kubeflow/trainer/{local_job_client/local_job_client.py => docker_job_client/docker_job_client.py} (89%) delete mode 100644 sdk/kubeflow/trainer/local_job_client/__init__.py diff --git a/sdk/kubeflow/trainer/api/local_trainer_client.py b/sdk/kubeflow/trainer/api/local_trainer_client.py index 46a111eea2..e7dab87af3 100644 --- a/sdk/kubeflow/trainer/api/local_trainer_client.py +++ b/sdk/kubeflow/trainer/api/local_trainer_client.py @@ -19,7 +19,7 @@ from kubeflow.trainer import models from kubeflow.trainer.api.trainer_client_abc import TrainerClientABC from kubeflow.trainer.constants import constants -from kubeflow.trainer.local_job_client import LocalJobClient +from kubeflow.trainer.docker_job_client import DockerJobClient from kubeflow.trainer.types import types from kubeflow.trainer.utils import utils @@ -28,14 +28,14 @@ class LocalTrainerClient(TrainerClientABC): def __init__( self, local_runtimes_path: str = constants.LOCAL_RUNTIMES_PATH, - local_job_client: LocalJobClient | None = None, + docker_job_client: DockerJobClient | None = None, ): self.local_runtimes_path = local_runtimes_path - if local_job_client is None: - self.local_job_client = LocalJobClient() + if docker_job_client is None: + self.docker_job_client = DockerJobClient() else: - 
self.local_job_client = local_job_client + self.docker_job_client = docker_job_client def list_runtimes(self) -> List[types.Runtime]: runtimes = [] @@ -88,7 +88,7 @@ def train( else: num_nodes = 1 - train_job_name = self.local_job_client.create_job( + train_job_name = self.docker_job_client.create_job( image=image, entrypoint=entrypoint, command=command, @@ -123,7 +123,7 @@ def get_job_logs( Dict[str, str]: The logs of the training job, where the key is the step and node rank, and the value is the logs for that node. """ - return self.local_job_client.get_job_logs( + return self.docker_job_client.get_job_logs( job_name=name, follow=follow, step=step, @@ -131,7 +131,7 @@ def get_job_logs( ) def delete_job(self, name: str): - self.local_job_client.delete_job(job_name=name) + self.docker_job_client.delete_job(job_name=name) def __list_runtime_crs(self) -> List[models.TrainerV1alpha1ClusterTrainingRuntime]: runtime_crs = [] diff --git a/sdk/kubeflow/trainer/constants/constants.py b/sdk/kubeflow/trainer/constants/constants.py index 7cf5a48573..10d9cea480 100644 --- a/sdk/kubeflow/trainer/constants/constants.py +++ b/sdk/kubeflow/trainer/constants/constants.py @@ -121,7 +121,7 @@ LOCAL_RUNTIMES_PATH = "./kubeflow/trainer/config/local_runtimes" # The label key used to associate docker resources with a train job name. -LOCAL_TRAIN_JOB_NAME_LABEL = "trainer.kubeflow.org/train-job-name" +DOCKER_TRAIN_JOB_NAME_LABEL = "trainer.kubeflow.org/train-job-name" # The prefix given to names used for local train jobs. 
LOCAL_TRAIN_JOB_NAME_PREFIX = "kubeflow-trainer-" diff --git a/sdk/kubeflow/trainer/docker_job_client/__init__.py b/sdk/kubeflow/trainer/docker_job_client/__init__.py new file mode 100644 index 0000000000..971e19f87b --- /dev/null +++ b/sdk/kubeflow/trainer/docker_job_client/__init__.py @@ -0,0 +1 @@ +from kubeflow.trainer.docker_job_client.docker_job_client import DockerJobClient diff --git a/sdk/kubeflow/trainer/local_job_client/local_job_client.py b/sdk/kubeflow/trainer/docker_job_client/docker_job_client.py similarity index 89% rename from sdk/kubeflow/trainer/local_job_client/local_job_client.py rename to sdk/kubeflow/trainer/docker_job_client/docker_job_client.py index 2e8a0d4bad..ecb82b6970 100644 --- a/sdk/kubeflow/trainer/local_job_client/local_job_client.py +++ b/sdk/kubeflow/trainer/docker_job_client/docker_job_client.py @@ -21,7 +21,7 @@ from kubeflow.trainer.utils import utils -class LocalJobClient: +class DockerJobClient: def __init__(self, docker_client: docker.DockerClient | None = None): if docker_client is None: self.docker_client = docker.from_env() @@ -45,7 +45,7 @@ def create_job( name=train_job_name, driver="bridge", labels={ - constants.LOCAL_TRAIN_JOB_NAME_LABEL: train_job_name, + constants.DOCKER_TRAIN_JOB_NAME_LABEL: train_job_name, }, ) @@ -57,7 +57,7 @@ def create_job( entrypoint=entrypoint, command=command, labels={ - constants.LOCAL_TRAIN_JOB_NAME_LABEL: train_job_name, + constants.DOCKER_TRAIN_JOB_NAME_LABEL: train_job_name, constants.LOCAL_NODE_RANK_LABEL: str(i), }, environment=self.__get_container_environment( @@ -98,7 +98,7 @@ def get_job_logs( all=True, filters={ "label": [ - f"{constants.LOCAL_TRAIN_JOB_NAME_LABEL}={job_name}", + f"{constants.DOCKER_TRAIN_JOB_NAME_LABEL}={job_name}", f"{constants.LOCAL_NODE_RANK_LABEL}={node_rank}", ] }, @@ -121,15 +121,15 @@ def get_job_logs( return logs def list_jobs(self) -> List[str]: - """Lists the names of all local training jobs. + """Lists the names of all Docker training jobs. 
Returns: - List[str]: A list of local training job names. + List[str]: A list of Docker training job names. """ # Because a network is created for each job, we use network names to list all jobs. networks = self.docker_client.networks.list( - filters={"label": constants.LOCAL_TRAIN_JOB_NAME_LABEL}, + filters={"label": constants.DOCKER_TRAIN_JOB_NAME_LABEL}, ) job_names = [] @@ -138,13 +138,13 @@ def list_jobs(self) -> List[str]: return job_names def delete_job(self, job_name: str) -> None: - """Deletes all resources associated with a local training job. + """Deletes all resources associated with a Docker training job. Args: - job_name (str): The name of the local training job. + job_name (str): The name of the Docker training job. """ containers = self.docker_client.containers.list( all=True, - filters={"label": f"{constants.LOCAL_TRAIN_JOB_NAME_LABEL}={job_name}"} + filters={"label": f"{constants.DOCKER_TRAIN_JOB_NAME_LABEL}={job_name}"} ) for c in containers: c.remove(force=True) diff --git a/sdk/kubeflow/trainer/local_job_client/__init__.py b/sdk/kubeflow/trainer/local_job_client/__init__.py deleted file mode 100644 index bf4245aef7..0000000000 --- a/sdk/kubeflow/trainer/local_job_client/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from kubeflow.trainer.local_job_client.local_job_client import LocalJobClient From a675acca53379d6172fa14208de378465db8b969 Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Fri, 11 Apr 2025 13:37:40 +0100 Subject: [PATCH 11/22] Rename `TrainerClientABC` to `AbstractTrainerClient` Signed-off-by: Eoin Fennessy --- .../api/{trainer_client_abc.py => abstract_trainer_client.py} | 2 +- sdk/kubeflow/trainer/api/local_trainer_client.py | 4 ++-- sdk/kubeflow/trainer/api/trainer_client.py | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) rename sdk/kubeflow/trainer/api/{trainer_client_abc.py => abstract_trainer_client.py} (98%) diff --git a/sdk/kubeflow/trainer/api/trainer_client_abc.py 
b/sdk/kubeflow/trainer/api/abstract_trainer_client.py similarity index 98% rename from sdk/kubeflow/trainer/api/trainer_client_abc.py rename to sdk/kubeflow/trainer/api/abstract_trainer_client.py index 3c304919f5..721b897617 100644 --- a/sdk/kubeflow/trainer/api/trainer_client_abc.py +++ b/sdk/kubeflow/trainer/api/abstract_trainer_client.py @@ -19,7 +19,7 @@ from kubeflow.trainer.types import types -class TrainerClientABC(ABC): +class AbstractTrainerClient(ABC): @abstractmethod def delete_job(self, name: str): pass diff --git a/sdk/kubeflow/trainer/api/local_trainer_client.py b/sdk/kubeflow/trainer/api/local_trainer_client.py index e7dab87af3..db5a88f21e 100644 --- a/sdk/kubeflow/trainer/api/local_trainer_client.py +++ b/sdk/kubeflow/trainer/api/local_trainer_client.py @@ -17,14 +17,14 @@ import yaml from kubeflow.trainer import models -from kubeflow.trainer.api.trainer_client_abc import TrainerClientABC +from kubeflow.trainer.api.abstract_trainer_client import AbstractTrainerClient from kubeflow.trainer.constants import constants from kubeflow.trainer.docker_job_client import DockerJobClient from kubeflow.trainer.types import types from kubeflow.trainer.utils import utils -class LocalTrainerClient(TrainerClientABC): +class LocalTrainerClient(AbstractTrainerClient): def __init__( self, local_runtimes_path: str = constants.LOCAL_RUNTIMES_PATH, diff --git a/sdk/kubeflow/trainer/api/trainer_client.py b/sdk/kubeflow/trainer/api/trainer_client.py index e654ac4c5d..ee0bb7122b 100644 --- a/sdk/kubeflow/trainer/api/trainer_client.py +++ b/sdk/kubeflow/trainer/api/trainer_client.py @@ -17,7 +17,7 @@ import queue from typing import Dict, List, Optional -from kubeflow.trainer.api.trainer_client_abc import TrainerClientABC +from kubeflow.trainer.api.abstract_trainer_client import AbstractTrainerClient import kubeflow.trainer.models as models from kubeflow.trainer.constants import constants from kubeflow.trainer.types import types @@ -27,7 +27,7 @@ logger = 
logging.getLogger(__name__) -class TrainerClient(TrainerClientABC): +class TrainerClient(AbstractTrainerClient): def __init__( self, config_file: Optional[str] = None, From 88ed2e5df064af47fec011c2d6a1c78c658d852b Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Thu, 17 Apr 2025 10:44:41 +0100 Subject: [PATCH 12/22] Use `importlib` for referencing runtime YAML files Signed-off-by: Eoin Fennessy --- sdk/kubeflow/trainer/api/local_trainer_client.py | 14 +++++++++----- sdk/kubeflow/trainer/constants/constants.py | 5 ++++- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/sdk/kubeflow/trainer/api/local_trainer_client.py b/sdk/kubeflow/trainer/api/local_trainer_client.py index db5a88f21e..02f96491dc 100644 --- a/sdk/kubeflow/trainer/api/local_trainer_client.py +++ b/sdk/kubeflow/trainer/api/local_trainer_client.py @@ -12,7 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os +from importlib import resources +from pathlib import Path from typing import List, Optional, Dict import yaml @@ -27,10 +28,13 @@ class LocalTrainerClient(AbstractTrainerClient): def __init__( self, - local_runtimes_path: str = constants.LOCAL_RUNTIMES_PATH, + local_runtimes_path: Path | None = None, docker_job_client: DockerJobClient | None = None, ): - self.local_runtimes_path = local_runtimes_path + if local_runtimes_path is None: + self.local_runtimes_path = resources.files(constants.PACKAGE_NAME) / constants.LOCAL_RUNTIMES_PATH + else: + self.local_runtimes_path = local_runtimes_path if docker_job_client is None: self.docker_job_client = DockerJobClient() @@ -135,8 +139,8 @@ def delete_job(self, name: str): def __list_runtime_crs(self) -> List[models.TrainerV1alpha1ClusterTrainingRuntime]: runtime_crs = [] - for filename in os.listdir(self.local_runtimes_path): - with open(os.path.join(self.local_runtimes_path, filename), "r") as f: + for filename in self.local_runtimes_path.iterdir(): + with open(filename, 
"r") as f: cr_str = f.read() cr_dict = yaml.safe_load(cr_str) cr = models.TrainerV1alpha1ClusterTrainingRuntime.from_dict(cr_dict) diff --git a/sdk/kubeflow/trainer/constants/constants.py b/sdk/kubeflow/trainer/constants/constants.py index 10d9cea480..6edd079d3d 100644 --- a/sdk/kubeflow/trainer/constants/constants.py +++ b/sdk/kubeflow/trainer/constants/constants.py @@ -118,7 +118,7 @@ MPI_ENTRYPOINT = "mpirun" # The default path to the directory containing local training runtime configs. -LOCAL_RUNTIMES_PATH = "./kubeflow/trainer/config/local_runtimes" +LOCAL_RUNTIMES_PATH = "trainer/config/local_runtimes" # The label key used to associate docker resources with a train job name. DOCKER_TRAIN_JOB_NAME_LABEL = "trainer.kubeflow.org/train-job-name" @@ -131,3 +131,6 @@ # The port number exposed by torch master nodes. TORCH_MASTER_NODE_PORT = 29500 + +# The name of the Kubeflow Trainer SDK package. +PACKAGE_NAME = "kubeflow" From f586ec3c1e4e626cc8d07864fd2299bee991c358 Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Thu, 17 Apr 2025 11:19:28 +0100 Subject: [PATCH 13/22] Change "master" to "head" Signed-off-by: Eoin Fennessy --- sdk/kubeflow/trainer/constants/constants.py | 4 ++-- .../trainer/docker_job_client/docker_job_client.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/kubeflow/trainer/constants/constants.py b/sdk/kubeflow/trainer/constants/constants.py index 6edd079d3d..d8aab58373 100644 --- a/sdk/kubeflow/trainer/constants/constants.py +++ b/sdk/kubeflow/trainer/constants/constants.py @@ -129,8 +129,8 @@ # The key for the label used to indicate the rank of a local container node. LOCAL_NODE_RANK_LABEL = "trainer.kubeflow.org/node-rank" -# The port number exposed by torch master nodes. -TORCH_MASTER_NODE_PORT = 29500 +# The port number exposed by torch head nodes. +TORCH_HEAD_NODE_PORT = 29500 # The name of the Kubeflow Trainer SDK package. 
PACKAGE_NAME = "kubeflow" diff --git a/sdk/kubeflow/trainer/docker_job_client/docker_job_client.py b/sdk/kubeflow/trainer/docker_job_client/docker_job_client.py index ecb82b6970..f3d496f44f 100644 --- a/sdk/kubeflow/trainer/docker_job_client/docker_job_client.py +++ b/sdk/kubeflow/trainer/docker_job_client/docker_job_client.py @@ -62,7 +62,7 @@ def create_job( }, environment=self.__get_container_environment( framework=framework, - master_node_address=f"{train_job_name}-0", + head_node_address=f"{train_job_name}-0", num_nodes=num_nodes, node_rank=i, ), @@ -157,7 +157,7 @@ def delete_job(self, job_name: str) -> None: @staticmethod def __get_container_environment( framework: types.Framework, - master_node_address: str, + head_node_address: str, num_nodes: int, node_rank: int, ) -> Dict[str, str]: @@ -168,6 +168,6 @@ def __get_container_environment( "PET_NNODES": str(num_nodes), "PET_NPROC_PER_NODE": "1", "PET_NODE_RANK": str(node_rank), - "PET_MASTER_ADDR": master_node_address, - "PET_MASTER_PORT": str(constants.TORCH_MASTER_NODE_PORT), + "PET_MASTER_ADDR": head_node_address, + "PET_MASTER_PORT": str(constants.TORCH_HEAD_NODE_PORT), } From 50adf30961311902668e6505d9cde84c40d42197 Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Thu, 17 Apr 2025 14:25:52 +0100 Subject: [PATCH 14/22] Add example notebook Signed-off-by: Eoin Fennessy --- .../mnist-pytorch-ddp.ipynb | 313 ++++++++++++++++++ 1 file changed, 313 insertions(+) create mode 100644 examples/local-trainer-client/image-classification/mnist-pytorch-ddp.ipynb diff --git a/examples/local-trainer-client/image-classification/mnist-pytorch-ddp.ipynb b/examples/local-trainer-client/image-classification/mnist-pytorch-ddp.ipynb new file mode 100644 index 0000000000..916ff4ea70 --- /dev/null +++ b/examples/local-trainer-client/image-classification/mnist-pytorch-ddp.ipynb @@ -0,0 +1,313 @@ +{ + "cells": [ + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "# Using `LocalTrainerClient` for MNIST image 
classification with PyTorch DDP\n", + "\n", + "This notebook uses the `LocalTrainerClient` to train an image classification model using the MNIST fashion dataset.\n", + "\n", + "The `LocalTrainerClient` runs training jobs locally in Docker containers. No Kubernetes cluster is required." + ], + "id": "3a425719bb33868e" + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "## Install the KubeFlow SDK", + "id": "52b257e345242c84" + }, + { + "metadata": {}, + "cell_type": "code", + "source": [ + "# TODO (eoinfennessy): Update repo and branch before merging.\n", + "!pip install git+https://github.com/eoinfennessy/trainer.git@add-local-trainer-client#subdirectory=sdk" + ], + "id": "5398886ad279253b", + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "## Define the training function\n", + "\n", + "This function trains a CNN model using the Fashion MNIST dataset.\n" + ], + "id": "aacfd746ab04a56f" + }, + { + "metadata": {}, + "cell_type": "code", + "source": [ + "def train_pytorch():\n", + " import os\n", + "\n", + " import torch\n", + " from torch import nn\n", + " import torch.nn.functional as F\n", + "\n", + " from torchvision import datasets, transforms\n", + " import torch.distributed as dist\n", + " from torch.utils.data import DataLoader, DistributedSampler\n", + "\n", + " # [1] Configure CPU/GPU device and distributed backend.\n", + " # Kubeflow Trainer will automatically configure the distributed environment.\n", + " device, backend = (\"cuda\", \"nccl\") if torch.cuda.is_available() else (\"cpu\", \"gloo\")\n", + " dist.init_process_group(backend=backend)\n", + "\n", + " local_rank = int(os.getenv(\"LOCAL_RANK\", 0))\n", + " print(\n", + " \"Distributed Training with WORLD_SIZE: {}, RANK: {}, LOCAL_RANK: {}.\".format(\n", + " dist.get_world_size(),\n", + " dist.get_rank(),\n", + " local_rank,\n", + " )\n", + " )\n", + "\n", + " # [2] Define PyTorch CNN Model to be trained.\n", + " class 
Net(nn.Module):\n", + "        def __init__(self):\n", + "            super(Net, self).__init__()\n", + "            self.conv1 = nn.Conv2d(1, 20, 5, 1)\n", + "            self.conv2 = nn.Conv2d(20, 50, 5, 1)\n", + "            self.fc1 = nn.Linear(4 * 4 * 50, 500)\n", + "            self.fc2 = nn.Linear(500, 10)\n", + "\n", + "        def forward(self, x):\n", + "            x = F.relu(self.conv1(x))\n", + "            x = F.max_pool2d(x, 2, 2)\n", + "            x = F.relu(self.conv2(x))\n", + "            x = F.max_pool2d(x, 2, 2)\n", + "            x = x.view(-1, 4 * 4 * 50)\n", + "            x = F.relu(self.fc1(x))\n", + "            x = self.fc2(x)\n", + "            return F.log_softmax(x, dim=1)\n", + "\n", + "    # [3] Attach model to the correct device.\n", + "    device = torch.device(f\"{device}:{local_rank}\")\n", + "    model = nn.parallel.DistributedDataParallel(Net().to(device))\n", + "    model.train()\n", + "    optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9)\n", + "\n", + "    # [4] Get the Fashion-MNIST dataset and distribute it across all available devices.\n", + "    dataset = datasets.FashionMNIST(\n", + "        \"./data\",\n", + "        train=True,\n", + "        download=True,\n", + "        transform=transforms.Compose([transforms.ToTensor()]),\n", + "    )\n", + "    train_loader = DataLoader(\n", + "        dataset,\n", + "        batch_size=100,\n", + "        sampler=DistributedSampler(dataset),\n", + "    )\n", + "\n", + "    # [5] Define the training loop.\n", + "    for epoch in range(3):\n", + "        for batch_idx, (inputs, labels) in enumerate(train_loader):\n", + "            # Attach tensors to the device.\n", + "            inputs, labels = inputs.to(device), labels.to(device)\n", + "\n", + "            # Forward pass\n", + "            outputs = model(inputs)\n", + "            loss = F.nll_loss(outputs, labels)\n", + "\n", + "            # Backward pass\n", + "            optimizer.zero_grad()\n", + "            loss.backward()\n", + "            optimizer.step()\n", + "            if batch_idx % 10 == 0 and dist.get_rank() == 0:\n", + "                print(\n", + "                    \"Train Epoch: {} [{}/{} ({:.0f}%)]\\tLoss: {:.6f}\".format(\n", + "                        epoch,\n", + "                        batch_idx * len(inputs),\n", + "                        len(train_loader.dataset),\n", + "                        100.0 * batch_idx / len(train_loader),\n", + "                        
loss.item(),\n", + "                    )\n", + "                )\n", + "\n", + "    # Wait for the training to complete and destroy the PyTorch distributed process group.\n", + "    dist.barrier()\n", + "    if dist.get_rank() == 0:\n", + "        print(\"Training is finished\")\n", + "    dist.destroy_process_group()" + ], + "id": "86f2438e1c249731", + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "## Create the trainer client", + "id": "65ce9b4ee3c2c64b" + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "The `LocalTrainerClient` exposes the same interface as the `TrainerClient`.\n", + "\n", + "When you wish to move your job to a Kubernetes cluster, simply change the\n", + "training client -- the rest of the notebook will work without modification." + ], + "id": "429ad749682d68cc" + }, + { + "metadata": {}, + "cell_type": "code", + "source": [ + "import os\n", + "\n", + "from kubeflow.trainer import LocalTrainerClient, TrainerClient, CustomTrainer\n", + "\n", + "exec_mode = os.getenv(\"KUBEFLOW_TRAINER_EXEC_MODE\", \"local\")\n", + "\n", + "if exec_mode == \"local\":\n", + "    client = LocalTrainerClient()\n", + "else:\n", + "    client = TrainerClient()" + ], + "id": "f459587c03f32d05", + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "## List runtimes\n", + "\n", + "Predefined training runtimes are included as part of the package. Currently only the `torch-distributed` runtime is included."
+ ], + "id": "735a674339f10ee1" + }, + { + "metadata": {}, + "cell_type": "code", + "source": "client.list_runtimes()", + "id": "5e3dc7c8ab7c9322", + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "## Get runtime", + "id": "8b08eb668205d5f4" + }, + { + "metadata": {}, + "cell_type": "code", + "source": "runtime = client.get_runtime(\"torch-distributed\")", + "id": "5cfaa7952b5fbcb1", + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "## Start the training job\n", + "\n", + "This job uses the `torch-distributed` runtime to run the `train_pytorch` training function. The job runs on four worker node containers." + ], + "id": "4c8aa9177a6ca0c1" + }, + { + "metadata": {}, + "cell_type": "code", + "source": [ + "job_name = client.train(\n", + "    runtime=runtime,\n", + "    trainer=CustomTrainer(\n", + "        func=train_pytorch,\n", + "        num_nodes=4,\n", + "    )\n", + "    )" + ], + "id": "9a3e893c987e6e9b", + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "## Follow job logs", + "id": "5f8e96a51263ffdb" + }, + { + "metadata": {}, + "cell_type": "code", + "source": "_ = client.get_job_logs(job_name, follow=True)", + "id": "23c0d5f0321961a5", + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "## Optional: Examine Docker resources\n", + "\n", + "In a terminal, run the following to list the containers running the training job:\n", + "\n", + "```shell\n", + "docker ps -a --filter label=trainer.kubeflow.org/train-job-name\n", + "```\n", + "\n", + "Example output:\n", + "\n", + "```\n", + "CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES\n", + "e116e42af00a pytorch/pytorch:2.5.0-cuda12.4-cudnn9-runtime \"bash -c '\\nread -r -…\" 11 minutes ago Exited (0) 7 minutes ago kubeflow-trainer-l20b00d08df5-3\n", + "0f9d891edd74
pytorch/pytorch:2.5.0-cuda12.4-cudnn9-runtime \"bash -c '\\nread -r -…\" 11 minutes ago Exited (0) 7 minutes ago kubeflow-trainer-l20b00d08df5-2\n", + "1672a222d360 pytorch/pytorch:2.5.0-cuda12.4-cudnn9-runtime \"bash -c '\\nread -r -…\" 11 minutes ago Exited (0) 7 minutes ago kubeflow-trainer-l20b00d08df5-1\n", + "e1f11b5fad3c pytorch/pytorch:2.5.0-cuda12.4-cudnn9-runtime \"bash -c '\\nread -r -…\" 11 minutes ago Exited (0) 7 minutes ago kubeflow-trainer-l20b00d08df5-0\n", + "```\n", + "\n", + "Run the following to see the Docker network:\n", + "\n", + "```shell\n", + "docker network ls --filter label=trainer.kubeflow.org/train-job-name\n", + "```\n", + "\n", + "Example output:\n", + "\n", + "```\n", + "NETWORK ID NAME DRIVER SCOPE\n", + "157a6ed5752f kubeflow-trainer-l20b00d08df5 bridge local\n", + "```" + ], + "id": "306ef11f6d48d176" + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "## Delete the job", + "id": "5a3478eb1567897c" + }, + { + "metadata": {}, + "cell_type": "code", + "source": "client.delete_job(job_name)", + "id": "e5e72bdf898d9129", + "outputs": [], + "execution_count": null + } + ], + "metadata": { + "kernelspec": { + "name": "python3", + "language": "python", + "display_name": "Python 3 (ipykernel)" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} From 80cd4bb26ebac694ecb3a28cddea423a08806910 Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Tue, 22 Apr 2025 09:40:17 +0100 Subject: [PATCH 15/22] Use `Optional` instead of union types Signed-off-by: Eoin Fennessy --- sdk/kubeflow/trainer/api/local_trainer_client.py | 6 +++--- sdk/kubeflow/trainer/docker_job_client/docker_job_client.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/kubeflow/trainer/api/local_trainer_client.py b/sdk/kubeflow/trainer/api/local_trainer_client.py index 02f96491dc..00ec230249 100644 --- a/sdk/kubeflow/trainer/api/local_trainer_client.py +++ b/sdk/kubeflow/trainer/api/local_trainer_client.py @@ -28,8 +28,8 @@ class
LocalTrainerClient(AbstractTrainerClient): def __init__( self, - local_runtimes_path: Path | None = None, - docker_job_client: DockerJobClient | None = None, + local_runtimes_path: Optional[Path] = None, + docker_job_client: Optional[DockerJobClient] = None, ): if local_runtimes_path is None: self.local_runtimes_path = resources.files(constants.PACKAGE_NAME) / constants.LOCAL_RUNTIMES_PATH @@ -148,7 +148,7 @@ def __list_runtime_crs(self) -> List[models.TrainerV1alpha1ClusterTrainingRuntim runtime_crs.append(cr) return runtime_crs - def __get_runtime_cr(self, name: str) -> models.TrainerV1alpha1ClusterTrainingRuntime | None: + def __get_runtime_cr(self, name: str) -> Optional[models.TrainerV1alpha1ClusterTrainingRuntime]: for cr in self.__list_runtime_crs(): if cr.metadata.name == name: return cr diff --git a/sdk/kubeflow/trainer/docker_job_client/docker_job_client.py b/sdk/kubeflow/trainer/docker_job_client/docker_job_client.py index f3d496f44f..48fd196ec5 100644 --- a/sdk/kubeflow/trainer/docker_job_client/docker_job_client.py +++ b/sdk/kubeflow/trainer/docker_job_client/docker_job_client.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from typing import List, Dict +from typing import List, Dict, Optional import docker @@ -22,7 +22,7 @@ class DockerJobClient: - def __init__(self, docker_client: docker.DockerClient | None = None): + def __init__(self, docker_client: Optional[docker.DockerClient] = None): if docker_client is None: self.docker_client = docker.from_env() else: From ba16065f0a89e3d37a52a1f691c90de2d10d06aa Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Tue, 22 Apr 2025 09:57:03 +0100 Subject: [PATCH 16/22] Add warning that `LocalTrainerClient` is an alpha feature Signed-off-by: Eoin Fennessy --- sdk/kubeflow/trainer/api/local_trainer_client.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdk/kubeflow/trainer/api/local_trainer_client.py b/sdk/kubeflow/trainer/api/local_trainer_client.py index 00ec230249..5cd3b44fe8 100644 --- a/sdk/kubeflow/trainer/api/local_trainer_client.py +++ b/sdk/kubeflow/trainer/api/local_trainer_client.py @@ -31,6 +31,11 @@ def __init__( local_runtimes_path: Optional[Path] = None, docker_job_client: Optional[DockerJobClient] = None, ): + print( + "Warning: LocalTrainerClient is an alpha feature for Kubeflow Trainer. " + "Some features may be unstable or unimplemented." 
+ ) + if local_runtimes_path is None: self.local_runtimes_path = resources.files(constants.PACKAGE_NAME) / constants.LOCAL_RUNTIMES_PATH else: From bc91f68d1a7b02470e8d39e83def6bed937b6e03 Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Thu, 24 Apr 2025 09:42:36 +0100 Subject: [PATCH 17/22] Fix pre-commit fails Signed-off-by: Eoin Fennessy --- .../trainer/api/abstract_trainer_client.py | 12 +-- .../trainer/api/local_trainer_client.py | 76 +++++++++---------- sdk/kubeflow/trainer/api/trainer_client.py | 2 +- .../local_runtimes/torch_distributed.yaml | 2 +- .../trainer/docker_job_client/__init__.py | 2 + .../docker_job_client/docker_job_client.py | 25 +++--- sdk/kubeflow/trainer/utils/utils.py | 5 +- 7 files changed, 64 insertions(+), 60 deletions(-) diff --git a/sdk/kubeflow/trainer/api/abstract_trainer_client.py b/sdk/kubeflow/trainer/api/abstract_trainer_client.py index 721b897617..ef7c380254 100644 --- a/sdk/kubeflow/trainer/api/abstract_trainer_client.py +++ b/sdk/kubeflow/trainer/api/abstract_trainer_client.py @@ -13,7 +13,7 @@ # limitations under the License. 
from abc import ABC, abstractmethod -from typing import Optional, Dict, List +from typing import Dict, List, Optional from kubeflow.trainer.constants import constants from kubeflow.trainer.types import types @@ -44,7 +44,7 @@ def get_runtime(self, name: str) -> types.Runtime: @abstractmethod def list_jobs( - self, runtime: Optional[types.Runtime] = None + self, runtime: Optional[types.Runtime] = None ) -> List[types.TrainJob]: pass @@ -54,9 +54,9 @@ def list_runtimes(self) -> List[types.Runtime]: @abstractmethod def train( - self, - runtime: types.Runtime = types.DEFAULT_RUNTIME, - initializer: Optional[types.Initializer] = None, - trainer: Optional[types.CustomTrainer] = None, + self, + runtime: types.Runtime = types.DEFAULT_RUNTIME, + initializer: Optional[types.Initializer] = None, + trainer: Optional[types.CustomTrainer] = None, ) -> str: pass diff --git a/sdk/kubeflow/trainer/api/local_trainer_client.py b/sdk/kubeflow/trainer/api/local_trainer_client.py index 5cd3b44fe8..f224c188e2 100644 --- a/sdk/kubeflow/trainer/api/local_trainer_client.py +++ b/sdk/kubeflow/trainer/api/local_trainer_client.py @@ -14,9 +14,9 @@ from importlib import resources from pathlib import Path -from typing import List, Optional, Dict -import yaml +from typing import Dict, List, Optional +import yaml from kubeflow.trainer import models from kubeflow.trainer.api.abstract_trainer_client import AbstractTrainerClient from kubeflow.trainer.constants import constants @@ -37,7 +37,9 @@ def __init__( ) if local_runtimes_path is None: - self.local_runtimes_path = resources.files(constants.PACKAGE_NAME) / constants.LOCAL_RUNTIMES_PATH + self.local_runtimes_path = ( + resources.files(constants.PACKAGE_NAME) / constants.LOCAL_RUNTIMES_PATH + ) else: self.local_runtimes_path = local_runtimes_path @@ -59,10 +61,10 @@ def get_runtime(self, name: str) -> types.Runtime: raise RuntimeError(f"No runtime found with name '{name}'") def train( - self, - runtime: types.Runtime = types.DEFAULT_RUNTIME, - 
initializer: Optional[types.Initializer] = None, - trainer: Optional[types.CustomTrainer] = None, + self, + runtime: types.Runtime = types.DEFAULT_RUNTIME, + initializer: Optional[types.Initializer] = None, + trainer: Optional[types.CustomTrainer] = None, ) -> str: runtime_cr = self.__get_runtime_cr(runtime.name) if runtime_cr is None: @@ -72,21 +74,19 @@ def train( runtime_cr.spec.template.spec.replicated_jobs ) if runtime_container is None: - raise RuntimeError(f"No runtime container found") + raise RuntimeError("No runtime container found") image = runtime_container.image if image is None: - raise RuntimeError(f"No runtime container image specified") + raise RuntimeError("No runtime container image specified") if trainer and trainer.func: - entrypoint, command = ( - utils.get_entrypoint_using_train_func( - runtime, - trainer.func, - trainer.func_args, - trainer.pip_index_url, - trainer.packages_to_install, - ) + entrypoint, command = utils.get_entrypoint_using_train_func( + runtime, + trainer.func, + trainer.func_args, + trainer.pip_index_url, + trainer.packages_to_install, ) else: entrypoint = runtime_container.command @@ -107,7 +107,7 @@ def train( return train_job_name def list_jobs( - self, runtime: Optional[types.Runtime] = None + self, runtime: Optional[types.Runtime] = None ) -> List[types.TrainJob]: raise NotImplementedError() @@ -115,28 +115,25 @@ def get_job(self, name: str) -> types.TrainJob: raise NotImplementedError() def get_job_logs( - self, - name: str, - follow: Optional[bool] = False, - step: str = constants.NODE, - node_rank: int = 0, + self, + name: str, + follow: Optional[bool] = False, + step: str = constants.NODE, + node_rank: int = 0, ) -> Dict[str, str]: """Gets logs for the specified training job - Args: - name (str): The name of the training job - follow (bool): If true, follows the job logs and prints them to standard out (default False) - step (int): The training job step to target (default "node") - node_rank (int): The node rank to 
retrieve logs from (default 0) - - Returns: - Dict[str, str]: The logs of the training job, where the key is the - step and node rank, and the value is the logs for that node. - """ + Args: + name (str): The name of the training job + follow (bool): If true, follows job logs and prints them to standard out (default False) + step (int): The training job step to target (default "node") + node_rank (int): The node rank to retrieve logs from (default 0) + + Returns: + Dict[str, str]: The logs of the training job, where the key is the + step and node rank, and the value is the logs for that node. + """ return self.docker_job_client.get_job_logs( - job_name=name, - follow=follow, - step=step, - node_rank=node_rank + job_name=name, follow=follow, step=step, node_rank=node_rank ) def delete_job(self, name: str): @@ -153,7 +150,10 @@ def __list_runtime_crs(self) -> List[models.TrainerV1alpha1ClusterTrainingRuntim runtime_crs.append(cr) return runtime_crs - def __get_runtime_cr(self, name: str) -> Optional[models.TrainerV1alpha1ClusterTrainingRuntime]: + def __get_runtime_cr( + self, + name: str, + ) -> Optional[models.TrainerV1alpha1ClusterTrainingRuntime]: for cr in self.__list_runtime_crs(): if cr.metadata.name == name: return cr diff --git a/sdk/kubeflow/trainer/api/trainer_client.py b/sdk/kubeflow/trainer/api/trainer_client.py index ee0bb7122b..21fb81928d 100644 --- a/sdk/kubeflow/trainer/api/trainer_client.py +++ b/sdk/kubeflow/trainer/api/trainer_client.py @@ -17,8 +17,8 @@ import queue from typing import Dict, List, Optional -from kubeflow.trainer.api.abstract_trainer_client import AbstractTrainerClient import kubeflow.trainer.models as models +from kubeflow.trainer.api.abstract_trainer_client import AbstractTrainerClient from kubeflow.trainer.constants import constants from kubeflow.trainer.types import types from kubeflow.trainer.utils import utils diff --git a/sdk/kubeflow/trainer/config/local_runtimes/torch_distributed.yaml 
b/sdk/kubeflow/trainer/config/local_runtimes/torch_distributed.yaml index a8bb2a0ace..95367e8786 100644 --- a/sdk/kubeflow/trainer/config/local_runtimes/torch_distributed.yaml +++ b/sdk/kubeflow/trainer/config/local_runtimes/torch_distributed.yaml @@ -31,4 +31,4 @@ spec: echo "Torch Default Runtime Env" env | grep PET_ - pip list \ No newline at end of file + pip list diff --git a/sdk/kubeflow/trainer/docker_job_client/__init__.py b/sdk/kubeflow/trainer/docker_job_client/__init__.py index 971e19f87b..03437c738d 100644 --- a/sdk/kubeflow/trainer/docker_job_client/__init__.py +++ b/sdk/kubeflow/trainer/docker_job_client/__init__.py @@ -1 +1,3 @@ +# flake8: noqa: F401 + from kubeflow.trainer.docker_job_client.docker_job_client import DockerJobClient diff --git a/sdk/kubeflow/trainer/docker_job_client/docker_job_client.py b/sdk/kubeflow/trainer/docker_job_client/docker_job_client.py index 48fd196ec5..6f93537bb0 100644 --- a/sdk/kubeflow/trainer/docker_job_client/docker_job_client.py +++ b/sdk/kubeflow/trainer/docker_job_client/docker_job_client.py @@ -12,12 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from typing import List, Dict, Optional +from typing import Dict, List, Optional import docker - -from kubeflow.trainer.types import types from kubeflow.trainer.constants import constants +from kubeflow.trainer.types import types from kubeflow.trainer.utils import utils @@ -34,12 +33,14 @@ def create_job( entrypoint: List[str], command: List[str], num_nodes: int, - framework: types.Framework + framework: types.Framework, ) -> str: if framework != types.Framework.TORCH: raise RuntimeError(f"Framework '{framework}' is not currently supported.") - train_job_name = f"{constants.LOCAL_TRAIN_JOB_NAME_PREFIX}{utils.generate_train_job_name()}" + train_job_name = ( + f"{constants.LOCAL_TRAIN_JOB_NAME_PREFIX}{utils.generate_train_job_name()}" + ) docker_network = self.docker_client.networks.create( name=train_job_name, @@ -50,7 +51,7 @@ def create_job( ) for i in range(num_nodes): - c = self.docker_client.containers.run( + self.docker_client.containers.run( name=f"{train_job_name}-{i}", network=docker_network.id, image=image, @@ -85,7 +86,7 @@ def get_job_logs( Args: job_name (str): The name of the training job - follow (bool): If true, follows the job logs and prints them to standard out (default False) + follow (bool): If true, follows job logs and prints them to standard out (default False) step (int): The training job step to target (default "node") node_rank (int): The node rank to retrieve logs from (default 0) @@ -108,13 +109,11 @@ def get_job_logs( logs: Dict[str, str] = {} if follow: - for l in containers[0].logs(stream=True): - decoded = l.decode("utf-8") + for line in containers[0].logs(stream=True): + decoded = line.decode("utf-8") print(decoded) logs[f"{step}-{node_rank}"] = ( - logs.get(f"{step}-{node_rank}", "") - + decoded - + "\n" + logs.get(f"{step}-{node_rank}", "") + decoded + "\n" ) else: logs[f"{step}-{node_rank}"] = containers[0].logs().decode() @@ -144,7 +143,7 @@ def delete_job(self, job_name: str) -> None: """ containers = 
self.docker_client.containers.list( all=True, - filters={"label": f"{constants.DOCKER_TRAIN_JOB_NAME_LABEL}={job_name}"} + filters={"label": f"{constants.DOCKER_TRAIN_JOB_NAME_LABEL}={job_name}"}, ) for c in containers: c.remove(force=True) diff --git a/sdk/kubeflow/trainer/utils/utils.py b/sdk/kubeflow/trainer/utils/utils.py index d997057c0b..edd6d526a1 100644 --- a/sdk/kubeflow/trainer/utils/utils.py +++ b/sdk/kubeflow/trainer/utils/utils.py @@ -15,11 +15,11 @@ import inspect import os import queue +import random import string import textwrap import threading import uuid -import random from typing import Any, Callable, Dict, List, Optional, Tuple import kubeflow.trainer.models as models @@ -76,6 +76,7 @@ def get_container_devices( return device, str(device_count) + def get_runtime_from_crd( runtime_crd: models.TrainerV1alpha1ClusterTrainingRuntime, ) -> types.Runtime: @@ -99,6 +100,7 @@ def get_runtime_from_crd( ), ) + def get_runtime_trainer_container( replicated_jobs: List[models.JobsetV1alpha2ReplicatedJob], ) -> Optional[models.IoK8sApiCoreV1Container]: @@ -424,5 +426,6 @@ def get_log_queue_pool(log_streams: List[Any]) -> List[queue.Queue]: threading.Thread(target=wrap_log_stream, args=(q, log_stream)).start() return pool + def generate_train_job_name() -> str: return random.choice(string.ascii_lowercase) + uuid.uuid4().hex[:11] From edebbf47489dfe4d9839128f5a269be22b83602d Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Thu, 24 Apr 2025 09:52:41 +0100 Subject: [PATCH 18/22] Update copyright year in new files Signed-off-by: Eoin Fennessy --- sdk/kubeflow/trainer/api/abstract_trainer_client.py | 2 +- sdk/kubeflow/trainer/api/local_trainer_client.py | 2 +- sdk/kubeflow/trainer/docker_job_client/docker_job_client.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/kubeflow/trainer/api/abstract_trainer_client.py b/sdk/kubeflow/trainer/api/abstract_trainer_client.py index ef7c380254..189f93a41c 100644 --- 
a/sdk/kubeflow/trainer/api/abstract_trainer_client.py +++ b/sdk/kubeflow/trainer/api/abstract_trainer_client.py @@ -1,4 +1,4 @@ -# Copyright 2024 The Kubeflow Authors. +# Copyright 2025 The Kubeflow Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/sdk/kubeflow/trainer/api/local_trainer_client.py b/sdk/kubeflow/trainer/api/local_trainer_client.py index f224c188e2..67a6d20614 100644 --- a/sdk/kubeflow/trainer/api/local_trainer_client.py +++ b/sdk/kubeflow/trainer/api/local_trainer_client.py @@ -1,4 +1,4 @@ -# Copyright 2024 The Kubeflow Authors. +# Copyright 2025 The Kubeflow Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/sdk/kubeflow/trainer/docker_job_client/docker_job_client.py b/sdk/kubeflow/trainer/docker_job_client/docker_job_client.py index 6f93537bb0..722fe2578e 100644 --- a/sdk/kubeflow/trainer/docker_job_client/docker_job_client.py +++ b/sdk/kubeflow/trainer/docker_job_client/docker_job_client.py @@ -1,4 +1,4 @@ -# Copyright 2024 The Kubeflow Authors. +# Copyright 2025 The Kubeflow Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
From fb7e3977ef112268b56550625bf246517f729e83 Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Thu, 24 Apr 2025 11:48:27 +0100 Subject: [PATCH 19/22] Rename `DockerJobClient` to `DockerJobRunner` ...and set groundwork for adding more job runners Signed-off-by: Eoin Fennessy --- .../trainer/api/local_trainer_client.py | 16 +++--- .../trainer/docker_job_client/__init__.py | 3 -- sdk/kubeflow/trainer/job_runners/__init__.py | 3 ++ .../docker_job_runner.py} | 3 +- .../trainer/job_runners/job_runner.py | 54 +++++++++++++++++++ 5 files changed, 67 insertions(+), 12 deletions(-) delete mode 100644 sdk/kubeflow/trainer/docker_job_client/__init__.py create mode 100644 sdk/kubeflow/trainer/job_runners/__init__.py rename sdk/kubeflow/trainer/{docker_job_client/docker_job_client.py => job_runners/docker_job_runner.py} (98%) create mode 100644 sdk/kubeflow/trainer/job_runners/job_runner.py diff --git a/sdk/kubeflow/trainer/api/local_trainer_client.py b/sdk/kubeflow/trainer/api/local_trainer_client.py index 67a6d20614..78d241dcb5 100644 --- a/sdk/kubeflow/trainer/api/local_trainer_client.py +++ b/sdk/kubeflow/trainer/api/local_trainer_client.py @@ -20,7 +20,7 @@ from kubeflow.trainer import models from kubeflow.trainer.api.abstract_trainer_client import AbstractTrainerClient from kubeflow.trainer.constants import constants -from kubeflow.trainer.docker_job_client import DockerJobClient +from kubeflow.trainer.job_runners import JobRunner, DockerJobRunner from kubeflow.trainer.types import types from kubeflow.trainer.utils import utils @@ -29,7 +29,7 @@ class LocalTrainerClient(AbstractTrainerClient): def __init__( self, local_runtimes_path: Optional[Path] = None, - docker_job_client: Optional[DockerJobClient] = None, + job_runner: Optional[JobRunner] = None, ): print( "Warning: LocalTrainerClient is an alpha feature for Kubeflow Trainer. 
" @@ -43,10 +43,10 @@ def __init__( else: self.local_runtimes_path = local_runtimes_path - if docker_job_client is None: - self.docker_job_client = DockerJobClient() + if job_runner is None: + self.job_runner = DockerJobRunner() else: - self.docker_job_client = docker_job_client + self.job_runner = job_runner def list_runtimes(self) -> List[types.Runtime]: runtimes = [] @@ -97,7 +97,7 @@ def train( else: num_nodes = 1 - train_job_name = self.docker_job_client.create_job( + train_job_name = self.job_runner.create_job( image=image, entrypoint=entrypoint, command=command, @@ -132,12 +132,12 @@ def get_job_logs( Dict[str, str]: The logs of the training job, where the key is the step and node rank, and the value is the logs for that node. """ - return self.docker_job_client.get_job_logs( + return self.job_runner.get_job_logs( job_name=name, follow=follow, step=step, node_rank=node_rank ) def delete_job(self, name: str): - self.docker_job_client.delete_job(job_name=name) + self.job_runner.delete_job(job_name=name) def __list_runtime_crs(self) -> List[models.TrainerV1alpha1ClusterTrainingRuntime]: runtime_crs = [] diff --git a/sdk/kubeflow/trainer/docker_job_client/__init__.py b/sdk/kubeflow/trainer/docker_job_client/__init__.py deleted file mode 100644 index 03437c738d..0000000000 --- a/sdk/kubeflow/trainer/docker_job_client/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# flake8: noqa: F401 - -from kubeflow.trainer.docker_job_client.docker_job_client import DockerJobClient diff --git a/sdk/kubeflow/trainer/job_runners/__init__.py b/sdk/kubeflow/trainer/job_runners/__init__.py new file mode 100644 index 0000000000..7961d7d9d5 --- /dev/null +++ b/sdk/kubeflow/trainer/job_runners/__init__.py @@ -0,0 +1,3 @@ +# flake8: noqa: F401 + +from kubeflow.trainer.job_runners.docker_job_runner import JobRunner, DockerJobRunner diff --git a/sdk/kubeflow/trainer/docker_job_client/docker_job_client.py b/sdk/kubeflow/trainer/job_runners/docker_job_runner.py similarity index 98% rename from 
sdk/kubeflow/trainer/docker_job_client/docker_job_client.py rename to sdk/kubeflow/trainer/job_runners/docker_job_runner.py index 722fe2578e..02eece62ca 100644 --- a/sdk/kubeflow/trainer/docker_job_client/docker_job_client.py +++ b/sdk/kubeflow/trainer/job_runners/docker_job_runner.py @@ -16,11 +16,12 @@ import docker from kubeflow.trainer.constants import constants +from kubeflow.trainer.job_runners.job_runner import JobRunner from kubeflow.trainer.types import types from kubeflow.trainer.utils import utils -class DockerJobClient: +class DockerJobRunner(JobRunner): def __init__(self, docker_client: Optional[docker.DockerClient] = None): if docker_client is None: self.docker_client = docker.from_env() diff --git a/sdk/kubeflow/trainer/job_runners/job_runner.py b/sdk/kubeflow/trainer/job_runners/job_runner.py new file mode 100644 index 0000000000..00a8824595 --- /dev/null +++ b/sdk/kubeflow/trainer/job_runners/job_runner.py @@ -0,0 +1,54 @@ +# Copyright 2025 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from abc import ABC, abstractmethod +from typing import List, Dict + +from kubeflow.trainer.constants import constants +from kubeflow.trainer.types import types + + +class JobRunner(ABC): + @abstractmethod + def create_job( + self, + image: str, + entrypoint: List[str], + command: List[str], + num_nodes: int, + framework: types.Framework, + ) -> str: + pass + + @abstractmethod + def get_job(self, job_name: str): + pass + + @abstractmethod + def get_job_logs( + self, + job_name: str, + follow: bool = False, + step: str = constants.NODE, + node_rank: int = 0, + ) -> Dict[str, str]: + pass + + @abstractmethod + def list_jobs(self) -> List[str]: + pass + + @abstractmethod + def delete_job(self, job_name: str) -> None: + pass From 946866b946bba3d104b69cba2961a9af8d5fe216 Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Thu, 24 Apr 2025 13:19:49 +0100 Subject: [PATCH 20/22] Implement `get_job` methods Signed-off-by: Eoin Fennessy --- .../trainer/api/local_trainer_client.py | 13 ++++- sdk/kubeflow/trainer/constants/constants.py | 7 ++- sdk/kubeflow/trainer/job_runners/__init__.py | 2 +- .../trainer/job_runners/docker_job_runner.py | 47 ++++++++++++++++--- .../trainer/job_runners/job_runner.py | 27 ++++++----- sdk/kubeflow/trainer/types/types.py | 20 ++++++++ 6 files changed, 91 insertions(+), 25 deletions(-) diff --git a/sdk/kubeflow/trainer/api/local_trainer_client.py b/sdk/kubeflow/trainer/api/local_trainer_client.py index 78d241dcb5..0ad64571da 100644 --- a/sdk/kubeflow/trainer/api/local_trainer_client.py +++ b/sdk/kubeflow/trainer/api/local_trainer_client.py @@ -20,7 +20,7 @@ from kubeflow.trainer import models from kubeflow.trainer.api.abstract_trainer_client import AbstractTrainerClient from kubeflow.trainer.constants import constants -from kubeflow.trainer.job_runners import JobRunner, DockerJobRunner +from kubeflow.trainer.job_runners import DockerJobRunner, JobRunner from kubeflow.trainer.types import types from kubeflow.trainer.utils import utils @@ -103,6 
+103,7 @@ def train( command=command, num_nodes=num_nodes, framework=runtime.trainer.framework, + runtime_name=runtime.name, ) return train_job_name @@ -112,7 +113,15 @@ def list_jobs( raise NotImplementedError() def get_job(self, name: str) -> types.TrainJob: - raise NotImplementedError() + container_job = self.job_runner.get_job(name) + + return types.TrainJob( + name=container_job.name, + creation_timestamp=container_job.creation_timestamp, + steps=[container.to_step() for container in container_job.containers], + runtime=self.get_runtime(container_job.runtime_name), + status=container_job.status, + ) def get_job_logs( self, diff --git a/sdk/kubeflow/trainer/constants/constants.py b/sdk/kubeflow/trainer/constants/constants.py index d8aab58373..492ac200f3 100644 --- a/sdk/kubeflow/trainer/constants/constants.py +++ b/sdk/kubeflow/trainer/constants/constants.py @@ -120,8 +120,11 @@ # The default path to the directory containing local training runtime configs. LOCAL_RUNTIMES_PATH = "trainer/config/local_runtimes" -# The label key used to associate docker resources with a train job name. -DOCKER_TRAIN_JOB_NAME_LABEL = "trainer.kubeflow.org/train-job-name" +# The label key used to associate container resources with a train job name. +CONTAINER_TRAIN_JOB_NAME_LABEL = "trainer.kubeflow.org/train-job-name" + +# The label key used to associate container resources with a runtime. +CONTAINER_RUNTIME_LABEL = "trainer.kubeflow.org/runtime" # The prefix given to names used for local train jobs. 
LOCAL_TRAIN_JOB_NAME_PREFIX = "kubeflow-trainer-" diff --git a/sdk/kubeflow/trainer/job_runners/__init__.py b/sdk/kubeflow/trainer/job_runners/__init__.py index 7961d7d9d5..0b468c3faa 100644 --- a/sdk/kubeflow/trainer/job_runners/__init__.py +++ b/sdk/kubeflow/trainer/job_runners/__init__.py @@ -1,3 +1,3 @@ # flake8: noqa: F401 -from kubeflow.trainer.job_runners.docker_job_runner import JobRunner, DockerJobRunner +from kubeflow.trainer.job_runners.docker_job_runner import DockerJobRunner, JobRunner diff --git a/sdk/kubeflow/trainer/job_runners/docker_job_runner.py b/sdk/kubeflow/trainer/job_runners/docker_job_runner.py index 02eece62ca..4fe32ee9c0 100644 --- a/sdk/kubeflow/trainer/job_runners/docker_job_runner.py +++ b/sdk/kubeflow/trainer/job_runners/docker_job_runner.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from datetime import datetime from typing import Dict, List, Optional import docker @@ -35,6 +36,7 @@ def create_job( command: List[str], num_nodes: int, framework: types.Framework, + runtime_name: str, ) -> str: if framework != types.Framework.TORCH: raise RuntimeError(f"Framework '{framework}' is not currently supported.") @@ -47,7 +49,8 @@ def create_job( name=train_job_name, driver="bridge", labels={ - constants.DOCKER_TRAIN_JOB_NAME_LABEL: train_job_name, + constants.CONTAINER_TRAIN_JOB_NAME_LABEL: train_job_name, + constants.CONTAINER_RUNTIME_LABEL: runtime_name, }, ) @@ -59,8 +62,9 @@ def create_job( entrypoint=entrypoint, command=command, labels={ - constants.DOCKER_TRAIN_JOB_NAME_LABEL: train_job_name, + constants.CONTAINER_TRAIN_JOB_NAME_LABEL: train_job_name, constants.LOCAL_NODE_RANK_LABEL: str(i), + constants.CONTAINER_RUNTIME_LABEL: runtime_name, }, environment=self.__get_container_environment( framework=framework, @@ -73,8 +77,32 @@ def create_job( return train_job_name - def get_job(self, job_name: str): - raise NotImplementedError() + def get_job(self, 
job_name: str) -> types.ContainerJob: + network = self.docker_client.networks.get(job_name) + + docker_containers = self.docker_client.containers.list( + filters={ + "label": [f"{constants.CONTAINER_TRAIN_JOB_NAME_LABEL}={job_name}"] + }, + all=True, + ) + + containers = [] + for container in docker_containers: + containers.append( + types.Container( + name=container.name, + status=container.status, + ), + ) + + return types.ContainerJob( + name=job_name, + creation_timestamp=datetime.fromisoformat(network.attrs["Created"]), + runtime_name=network.attrs["Labels"][constants.CONTAINER_RUNTIME_LABEL], + containers=containers, + status=self.__get_job_status(containers), + ) def get_job_logs( self, @@ -100,7 +128,7 @@ def get_job_logs( all=True, filters={ "label": [ - f"{constants.DOCKER_TRAIN_JOB_NAME_LABEL}={job_name}", + f"{constants.CONTAINER_TRAIN_JOB_NAME_LABEL}={job_name}", f"{constants.LOCAL_NODE_RANK_LABEL}={node_rank}", ] }, @@ -129,7 +157,7 @@ def list_jobs(self) -> List[str]: # Because a network is created for each job, we use network names to list all jobs. 
networks = self.docker_client.networks.list( - filters={"label": constants.DOCKER_TRAIN_JOB_NAME_LABEL}, + filters={"label": constants.CONTAINER_TRAIN_JOB_NAME_LABEL}, ) job_names = [] @@ -144,7 +172,7 @@ def delete_job(self, job_name: str) -> None: """ containers = self.docker_client.containers.list( all=True, - filters={"label": f"{constants.DOCKER_TRAIN_JOB_NAME_LABEL}={job_name}"}, + filters={"label": f"{constants.CONTAINER_TRAIN_JOB_NAME_LABEL}={job_name}"}, ) for c in containers: c.remove(force=True) @@ -171,3 +199,8 @@ def __get_container_environment( "PET_MASTER_ADDR": head_node_address, "PET_MASTER_PORT": str(constants.TORCH_HEAD_NODE_PORT), } + + @staticmethod + def __get_job_status(_: List[types.Container]) -> str: + # TODO (eoinfennessy): Discuss how to report status + return constants.UNKNOWN diff --git a/sdk/kubeflow/trainer/job_runners/job_runner.py b/sdk/kubeflow/trainer/job_runners/job_runner.py index 00a8824595..b8b8479d3e 100644 --- a/sdk/kubeflow/trainer/job_runners/job_runner.py +++ b/sdk/kubeflow/trainer/job_runners/job_runner.py @@ -13,7 +13,7 @@ # limitations under the License. 
from abc import ABC, abstractmethod -from typing import List, Dict +from typing import Dict, List from kubeflow.trainer.constants import constants from kubeflow.trainer.types import types @@ -22,26 +22,27 @@ class JobRunner(ABC): @abstractmethod def create_job( - self, - image: str, - entrypoint: List[str], - command: List[str], - num_nodes: int, - framework: types.Framework, + self, + image: str, + entrypoint: List[str], + command: List[str], + num_nodes: int, + framework: types.Framework, + runtime_name: str, ) -> str: pass @abstractmethod - def get_job(self, job_name: str): + def get_job(self, job_name: str) -> types.ContainerJob: pass @abstractmethod def get_job_logs( - self, - job_name: str, - follow: bool = False, - step: str = constants.NODE, - node_rank: int = 0, + self, + job_name: str, + follow: bool = False, + step: str = constants.NODE, + node_rank: int = 0, ) -> Dict[str, str]: pass diff --git a/sdk/kubeflow/trainer/types/types.py b/sdk/kubeflow/trainer/types/types.py index b7e1d8eb15..f2c8c6df48 100644 --- a/sdk/kubeflow/trainer/types/types.py +++ b/sdk/kubeflow/trainer/types/types.py @@ -102,6 +102,26 @@ class TrainJob: status: Optional[str] = constants.UNKNOWN +# Representation for a container used in a local job. +@dataclass +class Container: + name: str + status: str + + def to_step(self) -> Step: + return Step(name=self.name, status=self.status, pod_name=self.name) + + +# Representation for a local container job. +@dataclass +class ContainerJob: + name: str + creation_timestamp: datetime + runtime_name: str + containers: List[Container] + status: Optional[str] = constants.UNKNOWN + + # Configuration for the HuggingFace dataset initializer. 
# TODO (andreyvelich): Discuss how to keep these configurations is sync with pkg.initializers.types @dataclass From fcc6fec9c9b782791f4a8bef661f72b470c65611 Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Thu, 24 Apr 2025 14:26:35 +0100 Subject: [PATCH 21/22] Implement `list_jobs` methods Signed-off-by: Eoin Fennessy --- .../trainer/api/local_trainer_client.py | 28 ++++++---- .../trainer/job_runners/docker_job_runner.py | 51 +++++++++++++------ .../trainer/job_runners/job_runner.py | 7 ++- 3 files changed, 59 insertions(+), 27 deletions(-) diff --git a/sdk/kubeflow/trainer/api/local_trainer_client.py b/sdk/kubeflow/trainer/api/local_trainer_client.py index 0ad64571da..020c82b418 100644 --- a/sdk/kubeflow/trainer/api/local_trainer_client.py +++ b/sdk/kubeflow/trainer/api/local_trainer_client.py @@ -110,18 +110,17 @@ def train( def list_jobs( self, runtime: Optional[types.Runtime] = None ) -> List[types.TrainJob]: - raise NotImplementedError() + runtime_name = runtime.name if runtime else None + container_jobs = self.job_runner.list_jobs(runtime_name) + + train_jobs = [] + for container_job in container_jobs: + train_jobs.append(self.__container_job_to_train_job(container_job)) + return train_jobs def get_job(self, name: str) -> types.TrainJob: container_job = self.job_runner.get_job(name) - - return types.TrainJob( - name=container_job.name, - creation_timestamp=container_job.creation_timestamp, - steps=[container.to_step() for container in container_job.containers], - runtime=self.get_runtime(container_job.runtime_name), - status=container_job.status, - ) + return self.__container_job_to_train_job(container_job) def get_job_logs( self, @@ -167,3 +166,14 @@ def __get_runtime_cr( if cr.metadata.name == name: return cr return None + + def __container_job_to_train_job( + self, container_job: types.ContainerJob + ) -> types.TrainJob: + return types.TrainJob( + name=container_job.name, + creation_timestamp=container_job.creation_timestamp, + steps=[container.to_step() 
for container in container_job.containers], + runtime=self.get_runtime(container_job.runtime_name), + status=container_job.status, + ) diff --git a/sdk/kubeflow/trainer/job_runners/docker_job_runner.py b/sdk/kubeflow/trainer/job_runners/docker_job_runner.py index 4fe32ee9c0..c3d55a8970 100644 --- a/sdk/kubeflow/trainer/job_runners/docker_job_runner.py +++ b/sdk/kubeflow/trainer/job_runners/docker_job_runner.py @@ -148,22 +148,14 @@ def get_job_logs( logs[f"{step}-{node_rank}"] = containers[0].logs().decode() return logs - def list_jobs(self) -> List[str]: - """Lists the names of all Docker training jobs. - - Returns: - List[str]: A list of Docker training job names. - """ - - # Because a network is created for each job, we use network names to list all jobs. - networks = self.docker_client.networks.list( - filters={"label": constants.CONTAINER_TRAIN_JOB_NAME_LABEL}, - ) - - job_names = [] - for n in networks: - job_names.append(n.name) - return job_names + def list_jobs( + self, + runtime_name: Optional[str] = None, + ) -> List[types.ContainerJob]: + jobs = [] + for name in self.__list_job_names(runtime_name): + jobs.append(self.get_job(name)) + return jobs def delete_job(self, job_name: str) -> None: """Deletes all resources associated with a Docker training job. @@ -182,6 +174,33 @@ def delete_job(self, job_name: str) -> None: network.remove() print(f"Removed network: {network.name}") + def __list_job_names( + self, + runtime_name: Optional[str] = None, + ) -> List[str]: + """Lists the names of all Docker training jobs. + + Args: + runtime_name (Optional[str]): Filter by runtime name (default None) + + Returns: + List[str]: A list of Docker training job names. + """ + + filters = {"label": [constants.CONTAINER_TRAIN_JOB_NAME_LABEL]} + if runtime_name is not None: + filters["label"].append( + f"{constants.CONTAINER_RUNTIME_LABEL}={runtime_name}" + ) + + # Because a network is created for each job, we use network names to list all jobs. 
+ networks = self.docker_client.networks.list(filters=filters) + + job_names = [] + for n in networks: + job_names.append(n.name) + return job_names + @staticmethod def __get_container_environment( framework: types.Framework, diff --git a/sdk/kubeflow/trainer/job_runners/job_runner.py b/sdk/kubeflow/trainer/job_runners/job_runner.py index b8b8479d3e..e5d56b8eee 100644 --- a/sdk/kubeflow/trainer/job_runners/job_runner.py +++ b/sdk/kubeflow/trainer/job_runners/job_runner.py @@ -13,7 +13,7 @@ # limitations under the License. from abc import ABC, abstractmethod -from typing import Dict, List +from typing import Dict, List, Optional from kubeflow.trainer.constants import constants from kubeflow.trainer.types import types @@ -47,7 +47,10 @@ def get_job_logs( pass @abstractmethod - def list_jobs(self) -> List[str]: + def list_jobs( + self, + runtime_name: Optional[str] = None, + ) -> List[types.ContainerJob]: pass @abstractmethod From 12dea430ac11cc3239719f1adc8fc2b5c30bea8e Mon Sep 17 00:00:00 2001 From: Eoin Fennessy Date: Thu, 1 May 2025 11:12:38 +0100 Subject: [PATCH 22/22] Add docstrings Signed-off-by: Eoin Fennessy --- .../trainer/api/local_trainer_client.py | 65 +++++++++++++++++++ .../trainer/job_runners/docker_job_runner.py | 42 ++++++++++++ 2 files changed, 107 insertions(+) diff --git a/sdk/kubeflow/trainer/api/local_trainer_client.py b/sdk/kubeflow/trainer/api/local_trainer_client.py index 020c82b418..415d5121a8 100644 --- a/sdk/kubeflow/trainer/api/local_trainer_client.py +++ b/sdk/kubeflow/trainer/api/local_trainer_client.py @@ -26,6 +26,19 @@ class LocalTrainerClient(AbstractTrainerClient): + """LocalTrainerClient exposes functionality for running training jobs locally. + + A Kubernetes cluster is not required. + It exposes the same interface as the TrainerClient. + + Args: + local_runtimes_path: The path to the directory containing runtime YAML files. + Defaults to the runtimes included with the package. 
+ job_runner: The job runner to use for local training. + Options include the DockerJobRunner and PodmanJobRunner. + Defaults to the Docker job runner. + """ + def __init__( self, local_runtimes_path: Optional[Path] = None, @@ -49,12 +62,28 @@ def __init__( self.job_runner = job_runner def list_runtimes(self) -> List[types.Runtime]: + """Lists all runtimes. + + Returns: + A list of runtime objects. + """ runtimes = [] for cr in self.__list_runtime_crs(): runtimes.append(utils.get_runtime_from_crd(cr)) return runtimes def get_runtime(self, name: str) -> types.Runtime: + """Get a specific runtime by name. + + Args: + name: The name of the runtime. + + Returns: + A runtime object. + + Raises: + RuntimeError: if the specified runtime cannot be found. + """ for r in self.list_runtimes(): if r.name == name: return r @@ -66,6 +95,21 @@ def train( initializer: Optional[types.Initializer] = None, trainer: Optional[types.CustomTrainer] = None, ) -> str: + """Starts a training job. + + Args: + runtime: Config for the train job's runtime. + trainer: Config for the function that encapsulates the model training process. + initializer: Config for dataset and model initialization. + + Returns: + The generated name of the training job. + + Raises: + RuntimeError: if the specified runtime cannot be found, + or the runtime container cannot be found, + or the runtime container image is not specified. + """ runtime_cr = self.__get_runtime_cr(runtime.name) if runtime_cr is None: raise RuntimeError(f"No runtime found with name '{runtime.name}'") @@ -110,6 +154,14 @@ def train( def list_jobs( self, runtime: Optional[types.Runtime] = None ) -> List[types.TrainJob]: + """Lists all training jobs. + + Args: + runtime: If provided, only return jobs that use the given runtime. + + Returns: + A list of training jobs. 
+ """ runtime_name = runtime.name if runtime else None container_jobs = self.job_runner.list_jobs(runtime_name) @@ -119,6 +171,14 @@ def list_jobs( return train_jobs def get_job(self, name: str) -> types.TrainJob: + """Get a specific training job by name. + + Args: + name: The name of the training job to get. + + Returns: + A training job. + """ container_job = self.job_runner.get_job(name) return self.__container_job_to_train_job(container_job) @@ -145,6 +205,11 @@ def get_job_logs( ) def delete_job(self, name: str): + """Deletes a specific training job. + + Args: + name: The name of the training job to delete. + """ self.job_runner.delete_job(job_name=name) def __list_runtime_crs(self) -> List[models.TrainerV1alpha1ClusterTrainingRuntime]: diff --git a/sdk/kubeflow/trainer/job_runners/docker_job_runner.py b/sdk/kubeflow/trainer/job_runners/docker_job_runner.py index c3d55a8970..fd9f24f665 100644 --- a/sdk/kubeflow/trainer/job_runners/docker_job_runner.py +++ b/sdk/kubeflow/trainer/job_runners/docker_job_runner.py @@ -23,6 +23,13 @@ class DockerJobRunner(JobRunner): + """DockerJobRunner creates and manages training jobs using Docker. + + Args: + docker_client: If provided, this client is used for Docker API calls. + If not provided, a new client will be created from the user's environment. + """ + def __init__(self, docker_client: Optional[docker.DockerClient] = None): if docker_client is None: self.docker_client = docker.from_env() @@ -38,6 +45,22 @@ def create_job( framework: types.Framework, runtime_name: str, ) -> str: + """Creates a training job. + + Args: + image: The name of the container image to use for the job. + entrypoint: The entrypoint for the container. + command: The command to run in the container. + num_nodes: The number of nodes to run the job on. + framework: The framework being used. + runtime_name: The name of the runtime being used. + + Returns: + The name of the created job. 
+ + Raises: + RuntimeError: If the framework provided is not supported. + """ if framework != types.Framework.TORCH: raise RuntimeError(f"Framework '{framework}' is not currently supported.") @@ -78,6 +101,14 @@ def create_job( return train_job_name def get_job(self, job_name: str) -> types.ContainerJob: + """Get a specified container training job by its name. + + Args: + job_name: The name of the training job to get. + + Returns: + A container training job. + """ network = self.docker_client.networks.get(job_name) docker_containers = self.docker_client.containers.list( @@ -122,6 +153,9 @@ def get_job_logs( Returns: Dict[str, str]: The logs of the training job, where the key is the step and node rank, and the value is the logs for that node. + + Raises: + RuntimeError: If the job is not found. """ # TODO (eoinfennessy): use "step" in query. containers = self.docker_client.containers.list( @@ -152,6 +186,14 @@ def list_jobs( self, runtime_name: Optional[str] = None, ) -> List[types.ContainerJob]: + """Lists container training jobs. + + Args: + runtime_name: If provided, only return jobs that use the given runtime name. + + Returns: + A list of container training jobs. + """ jobs = [] for name in self.__list_job_names(runtime_name): jobs.append(self.get_job(name))