22 commits
f1e3636
Add abstract base class for trainer clients
eoinfennessy Apr 2, 2025
4b67f1c
Add `LocalTrainerClient` class with unimplemented methods
eoinfennessy Apr 2, 2025
46a8ea7
Implement 'list_runtimes' method
eoinfennessy Apr 3, 2025
1243dd1
Implement 'get_runtime' method
eoinfennessy Apr 3, 2025
9db65cb
Introduce `LocalJobClient`
eoinfennessy Apr 7, 2025
540190d
Use trainer func for container entrypoint and command
eoinfennessy Apr 8, 2025
936c9bc
Enable multi-node distributed torch jobs
eoinfennessy Apr 11, 2025
7654b09
Add `LocalTrainerClient` to `trainer` package
eoinfennessy Apr 11, 2025
3676ebe
Move default `LocalJobClient` instantiation out of `__init__` signature
eoinfennessy Apr 11, 2025
e09152e
Rename `LocalJobClient` to `DockerJobClient`
eoinfennessy Apr 11, 2025
a675acc
Rename `TrainerClientABC` to `AbstractTrainerClient`
eoinfennessy Apr 11, 2025
88ed2e5
Use `importlib` for referencing runtime YAML files
eoinfennessy Apr 17, 2025
f586ec3
Change "master" to "head"
eoinfennessy Apr 17, 2025
50adf30
Add example notebook
eoinfennessy Apr 17, 2025
80cd4bb
Use `Optional` instead of union types
eoinfennessy Apr 22, 2025
ba16065
Add warning that `LocalTrainerClient` is an alpha feature
eoinfennessy Apr 22, 2025
bc91f68
Fix pre-commit fails
eoinfennessy Apr 24, 2025
edebbf4
Update copyright year in new files
eoinfennessy Apr 24, 2025
fb7e397
Rename `DockerJobClient` to `DockerJobRunner`
eoinfennessy Apr 24, 2025
946866b
Implement `get_job` methods
eoinfennessy Apr 24, 2025
fcc6fec
Implement `list_jobs` methods
eoinfennessy Apr 24, 2025
12dea43
Add docstrings
eoinfennessy May 1, 2025
@@ -0,0 +1,313 @@
{
"cells": [
{
"metadata": {},
"cell_type": "markdown",
"source": [
"# Using `LocalTrainerClient` for Fashion-MNIST image classification with PyTorch DDP\n",
Contributor

My understanding of the main value of the local mode is portability, so I'd rather demonstrate how to run the existing notebook example rather than duplicating it.

Member Author

The main purpose of this notebook is to make readers aware of what LocalTrainerClient is and give a high level idea of how it works by showing the Docker resources created. There is some duplication with one of the other notebooks, but maybe it would detract from the focus of the other notebook if we were to update it to include information about what LocalTrainerClient is and ask users to examine the Docker resources created. WDYT?

Agreed regarding highlighting portability -- maybe we could eventually update the getting started guide on the Kubeflow website to make users aware of the LocalTrainerClient by giving them options to use it?

Contributor

This would be good for a "quickstart"

"\n",
"This notebook uses the `LocalTrainerClient` to train an image classification model using the Fashion-MNIST dataset.\n",
"\n",
"The `LocalTrainerClient` runs training jobs locally in Docker containers. No Kubernetes cluster is required."
],
"id": "3a425719bb33868e"
},
{
"metadata": {},
"cell_type": "markdown",
"source": "## Install the Kubeflow SDK",
"id": "52b257e345242c84"
},
{
"metadata": {},
"cell_type": "code",
"source": [
"# TODO (eoinfennessy): Update repo and branch before merging.\n",
"!pip install git+https://github.com/eoinfennessy/trainer.git@add-local-trainer-client#subdirectory=sdk"
],
"id": "5398886ad279253b",
"outputs": [],
"execution_count": null
},
{
"metadata": {},
"cell_type": "markdown",
"source": [
"## Define the training function\n",
"\n",
"This function trains a CNN model using the Fashion MNIST dataset.\n"
],
"id": "aacfd746ab04a56f"
},
{
"metadata": {},
"cell_type": "code",
"source": [
"def train_pytorch():\n",
" import os\n",
"\n",
" import torch\n",
" from torch import nn\n",
" import torch.nn.functional as F\n",
"\n",
" from torchvision import datasets, transforms\n",
" import torch.distributed as dist\n",
" from torch.utils.data import DataLoader, DistributedSampler\n",
"\n",
" # [1] Configure CPU/GPU device and distributed backend.\n",
" # Kubeflow Trainer will automatically configure the distributed environment.\n",
" device, backend = (\"cuda\", \"nccl\") if torch.cuda.is_available() else (\"cpu\", \"gloo\")\n",
" dist.init_process_group(backend=backend)\n",
"\n",
" local_rank = int(os.getenv(\"LOCAL_RANK\", 0))\n",
" print(\n",
" \"Distributed Training with WORLD_SIZE: {}, RANK: {}, LOCAL_RANK: {}.\".format(\n",
" dist.get_world_size(),\n",
" dist.get_rank(),\n",
" local_rank,\n",
" )\n",
" )\n",
"\n",
" # [2] Define PyTorch CNN Model to be trained.\n",
" class Net(nn.Module):\n",
" def __init__(self):\n",
" super(Net, self).__init__()\n",
" self.conv1 = nn.Conv2d(1, 20, 5, 1)\n",
" self.conv2 = nn.Conv2d(20, 50, 5, 1)\n",
" self.fc1 = nn.Linear(4 * 4 * 50, 500)\n",
" self.fc2 = nn.Linear(500, 10)\n",
"\n",
" def forward(self, x):\n",
" x = F.relu(self.conv1(x))\n",
" x = F.max_pool2d(x, 2, 2)\n",
" x = F.relu(self.conv2(x))\n",
" x = F.max_pool2d(x, 2, 2)\n",
" x = x.view(-1, 4 * 4 * 50)\n",
" x = F.relu(self.fc1(x))\n",
" x = self.fc2(x)\n",
" return F.log_softmax(x, dim=1)\n",
"\n",
" # [3] Attach model to the correct device.\n",
" device = torch.device(f\"{device}:{local_rank}\")\n",
" model = nn.parallel.DistributedDataParallel(Net().to(device))\n",
" model.train()\n",
" optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9)\n",
"\n",
" # [4] Get the Fashion-MNIST dataset and distribute it across all available devices.\n",
" dataset = datasets.FashionMNIST(\n",
" \"./data\",\n",
" train=True,\n",
" download=True,\n",
" transform=transforms.Compose([transforms.ToTensor()]),\n",
" )\n",
" train_loader = DataLoader(\n",
" dataset,\n",
" batch_size=100,\n",
" sampler=DistributedSampler(dataset),\n",
" )\n",
"\n",
" # [5] Define the training loop.\n",
" for epoch in range(3):\n",
" for batch_idx, (inputs, labels) in enumerate(train_loader):\n",
" # Attach tensors to the device.\n",
" inputs, labels = inputs.to(device), labels.to(device)\n",
"\n",
" # Forward pass\n",
" outputs = model(inputs)\n",
" loss = F.nll_loss(outputs, labels)\n",
"\n",
" # Backward pass\n",
" optimizer.zero_grad()\n",
" loss.backward()\n",
" optimizer.step()\n",
" if batch_idx % 10 == 0 and dist.get_rank() == 0:\n",
" print(\n",
" \"Train Epoch: {} [{}/{} ({:.0f}%)]\\tLoss: {:.6f}\".format(\n",
" epoch,\n",
" batch_idx * len(inputs),\n",
" len(train_loader.dataset),\n",
" 100.0 * batch_idx / len(train_loader),\n",
" loss.item(),\n",
" )\n",
" )\n",
"\n",
" # Wait for the training to complete and destroy the PyTorch distributed process group.\n",
" dist.barrier()\n",
" if dist.get_rank() == 0:\n",
" print(\"Training is finished\")\n",
" dist.destroy_process_group()"
],
"id": "86f2438e1c249731",
"outputs": [],
"execution_count": null
},
{
"metadata": {},
"cell_type": "markdown",
"source": "## Create the trainer client",
"id": "65ce9b4ee3c2c64b"
},
{
"metadata": {},
"cell_type": "markdown",
"source": [
"The `LocalTrainerClient` exposes the same interface as the `TrainerClient`.\n",
"\n",
"When you wish to move your job to a Kubernetes cluster, simply change the\n",
"training client -- the rest of the notebook will work without modification."
],
"id": "429ad749682d68cc"
},
{
"metadata": {},
"cell_type": "code",
"source": [
"import os\n",
"\n",
"from kubeflow.trainer import LocalTrainerClient, TrainerClient, CustomTrainer\n",
"\n",
"exec_mode = os.getenv(\"KUBEFLOW_TRAINER_EXEC_MODE\", \"local\")\n",
"\n",
"if exec_mode == \"local\":\n",
" client = LocalTrainerClient()\n",
"else:\n",
" client = TrainerClient()"
],
"id": "f459587c03f32d05",
"outputs": [],
"execution_count": null
},
{
"metadata": {},
"cell_type": "markdown",
"source": [
"## List runtimes\n",
"\n",
"Predefined training runtimes are included as part of the package. Currently only the `torch-distributed` runtime is included."
],
"id": "735a674339f10ee1"
},
{
"metadata": {},
"cell_type": "code",
"source": "client.list_runtimes()",
"id": "5e3dc7c8ab7c9322",
"outputs": [],
"execution_count": null
},
{
"metadata": {},
"cell_type": "markdown",
"source": "## Get runtime",
"id": "8b08eb668205d5f4"
},
{
"metadata": {},
"cell_type": "code",
"source": "runtime = client.get_runtime(\"torch-distributed\")",
"id": "5cfaa7952b5fbcb1",
"outputs": [],
"execution_count": null
},
{
"metadata": {},
"cell_type": "markdown",
"source": [
"## Start the training job\n",
"\n",
"This job uses the `torch-distributed` runtime to run the `train_pytorch` training function. The job runs on four worker node containers."
],
"id": "4c8aa9177a6ca0c1"
},
{
"metadata": {},
"cell_type": "code",
"source": [
"job_name = client.train(\n",
" runtime=runtime,\n",
" trainer=CustomTrainer(\n",
" func=train_pytorch,\n",
" num_nodes=4,\n",
" )\n",
" )"
],
"id": "9a3e893c987e6e9b",
"outputs": [],
"execution_count": null
},
{
"metadata": {},
"cell_type": "markdown",
"source": "## Follow job logs",
"id": "5f8e96a51263ffdb"
},
{
"metadata": {},
"cell_type": "code",
"source": "_ = client.get_job_logs(job_name, follow=True)",
"id": "23c0d5f0321961a5",
"outputs": [],
"execution_count": null
},
{
"metadata": {},
"cell_type": "markdown",
"source": [
"## Optional: Examine Docker resources\n",
"\n",
"In a terminal, run the following to list the containers running the training job:\n",
"\n",
"```shell\n",
"docker ps -a --filter label=trainer.kubeflow.org/train-job-name\n",
"```\n",
"\n",
"Example output:\n",
"\n",
"```\n",
"CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES\n",
"e116e42af00a pytorch/pytorch:2.5.0-cuda12.4-cudnn9-runtime \"bash -c '\\nread -r -…\" 11 minutes ago Exited (0) 7 minutes ago kubeflow-trainer-l20b00d08df5-3\n",
"0f9d891edd74 pytorch/pytorch:2.5.0-cuda12.4-cudnn9-runtime \"bash -c '\\nread -r -…\" 11 minutes ago Exited (0) 7 minutes ago kubeflow-trainer-l20b00d08df5-2\n",
"1672a222d360 pytorch/pytorch:2.5.0-cuda12.4-cudnn9-runtime \"bash -c '\\nread -r -…\" 11 minutes ago Exited (0) 7 minutes ago kubeflow-trainer-l20b00d08df5-1\n",
"e1f11b5fad3c pytorch/pytorch:2.5.0-cuda12.4-cudnn9-runtime \"bash -c '\\nread -r -…\" 11 minutes ago Exited (0) 7 minutes ago kubeflow-trainer-l20b00d08df5-0\n",
"```\n",
"\n",
"Run the following to see the Docker network:\n",
"\n",
"```shell\n",
"docker network ls --filter label=trainer.kubeflow.org/train-job-name\n",
"```\n",
"\n",
"Example output:\n",
"\n",
"```\n",
"NETWORK ID NAME DRIVER SCOPE\n",
"157a6ed5752f kubeflow-trainer-l20b00d08df5 bridge local\n",
"```"
],
"id": "306ef11f6d48d176"
},
{
"metadata": {},
"cell_type": "markdown",
"source": "## Delete the job",
"id": "5a3478eb1567897c"
},
{
"metadata": {},
"cell_type": "code",
"source": "client.delete_job(job_name)",
"id": "e5e72bdf898d9129",
"outputs": [],
"execution_count": null
}
],
"metadata": {
"kernelspec": {
"name": "python3",
"language": "python",
"display_name": "Python 3 (ipykernel)"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
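The multi-node cells above rely on `dist.init_process_group()` picking up its configuration from the environment of each worker container. The sketch below lists the standard torchrun-style variables that PyTorch's env rendezvous reads; whether `DockerJobRunner` injects exactly these names is an assumption not shown in this diff, and the defaults chosen here are only the usual single-process fallbacks.

```python
import os

# Standard torch.distributed env-rendezvous variables (torchrun convention).
# Assumption: the Docker-based runner sets these per container; the exact
# mechanism is not visible in this PR diff.
DIST_ENV_DEFAULTS = {
    "RANK": "0",                 # global rank of this process across all nodes
    "LOCAL_RANK": "0",           # rank of this process on its own node
    "WORLD_SIZE": "1",           # total number of processes in the job
    "MASTER_ADDR": "localhost",  # address of the rank-0 ("head") container
    "MASTER_PORT": "29500",      # rendezvous port on the head container
}


def read_dist_env() -> dict:
    """Return the distributed-training environment, falling back to
    single-process defaults so the training function also runs standalone."""
    return {name: os.getenv(name, default) for name, default in DIST_ENV_DEFAULTS.items()}
```

With these defaults, the same `train_pytorch` function runs unchanged in a single local process, across several Docker containers, or on a Kubernetes cluster, which is the portability the reviewers discuss above.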
3 changes: 3 additions & 0 deletions sdk/kubeflow/trainer/__init__.py
@@ -20,6 +20,9 @@
# Import the Kubeflow Trainer client.
from kubeflow.trainer.api.trainer_client import TrainerClient

# Import the Kubeflow Local Trainer client.
from kubeflow.trainer.api.local_trainer_client import LocalTrainerClient

# Import the Kubeflow Trainer constants.
from kubeflow.trainer.constants.constants import DATASET_PATH, MODEL_PATH

62 changes: 62 additions & 0 deletions sdk/kubeflow/trainer/api/abstract_trainer_client.py
@@ -0,0 +1,62 @@
# Copyright 2025 The Kubeflow Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from typing import Dict, List, Optional

from kubeflow.trainer.constants import constants
from kubeflow.trainer.types import types


class AbstractTrainerClient(ABC):
Member

I think BaseTrainer name would make more sense here? or AbstractTrainer

Member Author

I think this could cause some confusion because a Trainer type already exists, which is different to the TrainerClient type that we are defining an interface for here.

@abstractmethod
def delete_job(self, name: str):
pass

@abstractmethod
def get_job(self, name: str) -> types.TrainJob:
pass

@abstractmethod
def get_job_logs(
self,
name: str,
follow: Optional[bool] = False,
step: str = constants.NODE,
node_rank: int = 0,
) -> Dict[str, str]:
pass

@abstractmethod
def get_runtime(self, name: str) -> types.Runtime:
pass

@abstractmethod
def list_jobs(
self, runtime: Optional[types.Runtime] = None
) -> List[types.TrainJob]:
pass

@abstractmethod
def list_runtimes(self) -> List[types.Runtime]:
pass

@abstractmethod
def train(
self,
runtime: types.Runtime = types.DEFAULT_RUNTIME,
initializer: Optional[types.Initializer] = None,
trainer: Optional[types.CustomTrainer] = None,
) -> str:
pass
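The point of defining `AbstractTrainerClient` with `ABC` and `@abstractmethod` is that Python refuses to instantiate any client subclass until every declared method is implemented, so `TrainerClient` and `LocalTrainerClient` are guaranteed to expose the same interface. A minimal, self-contained sketch of this mechanism with toy class names (not the actual SDK types):

```python
from abc import ABC, abstractmethod


# Toy stand-in mirroring the AbstractTrainerClient pattern above.
class AbstractClient(ABC):
    @abstractmethod
    def get_job(self, name: str) -> str: ...

    @abstractmethod
    def delete_job(self, name: str) -> None: ...


class CompleteClient(AbstractClient):
    # Implements every abstract method, so instantiation succeeds.
    def get_job(self, name: str) -> str:
        return f"job:{name}"

    def delete_job(self, name: str) -> None:
        pass


class IncompleteClient(AbstractClient):
    # delete_job is missing, so instantiating this class raises TypeError.
    def get_job(self, name: str) -> str:
        return f"job:{name}"
```

Attempting `IncompleteClient()` fails with `TypeError` at construction time rather than with an `AttributeError` deep inside a training run, which is what makes swapping `LocalTrainerClient` for `TrainerClient` in the notebook a safe one-line change.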