From 2ed9eef88db7dd26a52a7f6276b2af5255ddb64f Mon Sep 17 00:00:00 2001 From: khushiiagrawal Date: Thu, 29 Jan 2026 22:59:47 +0530 Subject: [PATCH 1/2] simplify local-exec command building and fix job status Signed-off-by: khushiiagrawal --- .../trainer/backends/localprocess/backend.py | 2 +- .../backends/localprocess/constants.py | 91 +++++++++------ .../trainer/backends/localprocess/utils.py | 108 +++++------------- .../backends/localprocess/utils_test.py | 86 ++++++++++++++ 4 files changed, 172 insertions(+), 115 deletions(-) create mode 100644 kubeflow/trainer/backends/localprocess/utils_test.py diff --git a/kubeflow/trainer/backends/localprocess/backend.py b/kubeflow/trainer/backends/localprocess/backend.py index e52e58e9e..a63997cf2 100644 --- a/kubeflow/trainer/backends/localprocess/backend.py +++ b/kubeflow/trainer/backends/localprocess/backend.py @@ -267,7 +267,7 @@ def __get_job_status(self, job: LocalBackendJobs) -> str: elif constants.TRAINJOB_CREATED in statuses: status = constants.TRAINJOB_CREATED else: - status = constants.TRAINJOB_CREATED + status = constants.TRAINJOB_COMPLETE return status diff --git a/kubeflow/trainer/backends/localprocess/constants.py b/kubeflow/trainer/backends/localprocess/constants.py index a7164030a..94488090e 100644 --- a/kubeflow/trainer/backends/localprocess/constants.py +++ b/kubeflow/trainer/backends/localprocess/constants.py @@ -41,48 +41,69 @@ ] -# Create venv script - - -# The exec script to embed training function into container command. -DEPENDENCIES_SCRIPT = textwrap.dedent( - """ - PIP_DISABLE_PIP_VERSION_CHECK=1 pip install $QUIET \ - --no-warn-script-location $PIP_INDEX $PACKAGE_STR - """ -) - -# activate virtualenv, then run the entrypoint from the virtualenv bin -LOCAL_EXEC_ENTRYPOINT = textwrap.dedent( - """ - $ENTRYPOINT "$FUNC_FILE" "$PARAMETERS" - """ -) TORCH_COMMAND = "torchrun" - -# default command, will run from within the virtualenv DEFAULT_COMMAND = "python" -# remove virtualenv after training is completed. -LOCAL_EXEC_JOB_CLEANUP_SCRIPT = textwrap.dedent( - """ - rm -rf $PYENV_LOCATION - """ -) +# Create venv script -LOCAL_EXEC_JOB_TEMPLATE = textwrap.dedent( - """ - set -e - $OS_PYTHON_BIN -m venv --without-pip $PYENV_LOCATION - echo "Operating inside $PYENV_LOCATION" - source $PYENV_LOCATION/bin/activate - $PYENV_LOCATION/bin/python -m ensurepip --upgrade --default-pip - $DEPENDENCIES_SCRIPT - $ENTRYPOINT - $CLEANUP_SCRIPT +RUNNER_TEMPLATE = textwrap.dedent( """ +import os +import shutil +import subprocess +import sys + +def main(): + venv_dir = r"${pyenv_location}" + requirements = ${packages_list} + pip_index_urls = ${pip_index_urls} + command = ${command} + cleanup_venv = ${cleanup_venv} + + # 1. Create venv + print(f"Creating venv at {venv_dir}") + # Use sys.executable to ensure we use the same python interpreter kind + subprocess.run([sys.executable, "-m", "venv", venv_dir], check=True) + + venv_python = os.path.join(venv_dir, "bin", "python") + # Upgrade pip + subprocess.run([venv_python, "-m", "ensurepip", "--upgrade", "--default-pip"], check=True) + + # 2. Install dependencies + if requirements: + pip_cmd = [venv_python, "-m", "pip", "install"] + if pip_index_urls: + pip_cmd.extend(["--index-url", pip_index_urls[0]]) + for url in pip_index_urls[1:]: + pip_cmd.extend(["--extra-index-url", url]) + pip_cmd.extend(requirements) + print(f"Installing dependencies: {requirements}") + subprocess.run(pip_cmd, check=True) + + # 3. Run Training Command + print(f"Running command: {command}") + try: + subprocess.run(command, check=True) + except subprocess.CalledProcessError as e: + print(f"Command failed with exit code {e.returncode}") + sys.exit(e.returncode) + finally: + # 4. Cleanup + if cleanup_venv: + print(f"Cleaning up venv at {venv_dir}") + try: + # We are running inside venv_dir, so we can't delete it fully on some OSs. + # But typically on Unix it's allowed. + # If we encounter errors, we ignore them to avoid failing the job status. + shutil.rmtree(venv_dir, ignore_errors=True) + except Exception as e: + print(f"Warning: Failed to cleanup venv: {e}") + +if __name__ == "__main__": + main() +""" ) LOCAL_EXEC_FILENAME = "train_{}.py" diff --git a/kubeflow/trainer/backends/localprocess/utils.py b/kubeflow/trainer/backends/localprocess/utils.py index 4afcb1b07..0cfeff32d 100644 --- a/kubeflow/trainer/backends/localprocess/utils.py +++ b/kubeflow/trainer/backends/localprocess/utils.py @@ -148,35 +148,6 @@ def get_local_runtime_trainer( return trainer -def get_dependencies_command( - runtime_packages: list[str], - pip_index_urls: list[str], - trainer_packages: list[str], - quiet: bool = True, -) -> str: - # resolve runtime dependencies and trainer dependencies. - packages = get_install_packages( - runtime_packages=runtime_packages, - trainer_packages=trainer_packages, - ) - - options = [f"--index-url {pip_index_urls[0]}"] - options.extend(f"--extra-index-url {extra_index_url}" for extra_index_url in pip_index_urls[1:]) - - """ - PIP_DISABLE_PIP_VERSION_CHECK=1 pip install $QUIET $AS_USER \ - --no-warn-script-location $PIP_INDEX $PACKAGE_STR - """ - mapping = { - "QUIET": "--quiet" if quiet else "", - "PIP_INDEX": " ".join(options), - "PACKAGE_STR": '"{}"'.format('" "'.join(packages)), # quote deps - } - t = Template(local_exec_constants.DEPENDENCIES_SCRIPT) - result = t.substitute(**mapping) - return result - - def get_command_using_train_func( runtime: types.Runtime, train_func: Callable, @@ -185,7 +156,7 @@ def get_command_using_train_func( train_job_name: str, ) -> str: """ - Get the Trainer container command from the given training function and parameters. + Get the file path of the training function script. """ # Check if the runtime has a Trainer. if not runtime.trainer: @@ -207,11 +178,7 @@ def get_command_using_train_func( # We need to dedent the function code. func_code = textwrap.dedent(func_code) - # Wrap function code to execute it from the file. For example: - # TODO (andreyvelich): Find a better way to run users' scripts. - # def train(parameters): - # print('Start Training...') - # train({'lr': 0.01}) + # Wrap function code to execute it from the file. if train_func_parameters is None: func_code = f"{func_code}\n{train_func.__name__}()\n" else: @@ -219,30 +186,9 @@ def get_command_using_train_func( with open(func_file, "w") as f: f.write(func_code) - f.close() - - t = Template(local_exec_constants.LOCAL_EXEC_ENTRYPOINT) - mapping = { - "PARAMETERS": "", ## Torch Parameters if any - "PYENV_LOCATION": venv_dir, - "ENTRYPOINT": " ".join(runtime.trainer.command), - "FUNC_FILE": func_file, - } - entrypoint = t.safe_substitute(**mapping) - - return entrypoint + # File is closed automatically by with block - -def get_cleanup_venv_script(venv_dir: str, cleanup_venv: bool = True) -> str: - script = "\n" - if not cleanup_venv: - return script - - t = Template(local_exec_constants.LOCAL_EXEC_JOB_CLEANUP_SCRIPT) - mapping = { - "PYENV_LOCATION": venv_dir, - } - return t.substitute(**mapping) + return str(func_file) def get_local_train_job_script( @@ -251,9 +197,7 @@ def get_local_train_job_script( trainer: types.CustomTrainer, runtime: types.Runtime, cleanup_venv: bool = True, -) -> tuple: - # use local-exec train job template - t = Template(local_exec_constants.LOCAL_EXEC_JOB_TEMPLATE) +) -> list[str]: # find os python binary to create venv python_bin = shutil.which("python") if not python_bin: @@ -266,20 +210,19 @@ def get_local_train_job_script( runtime_trainer: LocalRuntimeTrainer = runtime.trainer else: raise ValueError("Invalid Runtime Trainer type: {type(runtime.trainer)}") - dependency_script = "\n" + + packages_list = [] if trainer.packages_to_install: - dependency_script = get_dependencies_command( - pip_index_urls=( - trainer.pip_index_urls - if trainer.pip_index_urls - else constants.DEFAULT_PIP_INDEX_URLS - ), + packages_list = get_install_packages( runtime_packages=runtime_trainer.packages, trainer_packages=trainer.packages_to_install, - quiet=False, ) + # Default runtime packages if no trainer packages (though get_install_packages handles merge) + elif runtime_trainer.packages: + packages_list = runtime_trainer.packages - entrypoint = get_command_using_train_func( + + func_file = get_command_using_train_func( venv_dir=venv_dir, runtime=runtime, train_func=trainer.func, @@ -287,16 +230,23 @@ def get_local_train_job_script( train_job_name=train_job_name, ) - cleanup_script = get_cleanup_venv_script(cleanup_venv=cleanup_venv, venv_dir=venv_dir) + # Build the full command to be executed inside the runner + # runtime.trainer.command should point to venv's python/torchrun + full_command = list(runtime.trainer.command) + [func_file] + # Create the runner script + runner_file = Path(venv_dir) / "runner.py" + t = Template(local_exec_constants.RUNNER_TEMPLATE) + mapping = { - "OS_PYTHON_BIN": python_bin, - "PYENV_LOCATION": venv_dir, - "DEPENDENCIES_SCRIPT": dependency_script, - "ENTRYPOINT": entrypoint, - "CLEANUP_SCRIPT": cleanup_script, + "pyenv_location": venv_dir, + "packages_list": str(packages_list), + "pip_index_urls": str(trainer.pip_index_urls if trainer.pip_index_urls else constants.DEFAULT_PIP_INDEX_URLS), + "command": str(full_command), + "cleanup_venv": str(cleanup_venv), } + + with open(runner_file, "w") as f: + f.write(t.substitute(**mapping)) - command = t.safe_substitute(**mapping) - - return "bash", "-c", command + return [python_bin, str(runner_file)] diff --git a/kubeflow/trainer/backends/localprocess/utils_test.py b/kubeflow/trainer/backends/localprocess/utils_test.py new file mode 100644 index 000000000..7a36b09b4 --- /dev/null +++ b/kubeflow/trainer/backends/localprocess/utils_test.py @@ -0,0 +1,86 @@ +# Copyright 2025 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from pathlib import Path +import shutil +import tempfile +from unittest.mock import Mock, patch + +import pytest + +from kubeflow.trainer.backends.localprocess import constants as local_exec_constants +from kubeflow.trainer.backends.localprocess import utils +from kubeflow.trainer.backends.localprocess.types import LocalRuntimeTrainer +from kubeflow.trainer.types import types + + +def dummy_func(a: int = 1): + print(a) + + +def test_get_local_train_job_script(): + with tempfile.TemporaryDirectory() as venv_dir: + train_job_name = "test-job" + trainer = types.CustomTrainer( + func=dummy_func, + packages_to_install=["numpy"], + pip_index_urls=["https://pypi.org/simple"], + ) + runtime = types.Runtime( + name="test-runtime", + trainer=LocalRuntimeTrainer( + trainer_type=types.TrainerType.CUSTOM_TRAINER, + framework="torch", + num_nodes=1, + packages=["torch"], + image="local", + ), + ) + # Mock command to be just python + runtime.trainer.set_command(("python",)) + + command = utils.get_local_train_job_script( + train_job_name=train_job_name, + venv_dir=venv_dir, + trainer=trainer, + runtime=runtime, + cleanup_venv=True, + ) + + # Check command structure + assert isinstance(command, list) + assert len(command) == 2 + assert "python" in command[0].lower() # OS specific, but should contain python + assert command[1].endswith("runner.py") + assert Path(command[1]).exists() + + # Check runner.py content + with open(command[1]) as f: + content = f.read() + assert "import subprocess" in content + assert f'venv_dir = r"{venv_dir}"' in content + assert "numpy" in content + assert "torch" in content + assert "https://pypi.org/simple" in content + assert "python" in content # command part + + # Check train function file creation + func_file = Path(venv_dir) / local_exec_constants.LOCAL_EXEC_FILENAME.format(train_job_name) + assert func_file.exists() + with open(func_file) as f: + func_content = f.read() + assert "def dummy_func" in func_content + assert "dummy_func()" in func_content # parameters match + From a2594add8cd2f6783a9588179b5a7d89d4a94984 Mon Sep 17 00:00:00 2001 From: khushiiagrawal Date: Fri, 30 Jan 2026 15:44:02 +0530 Subject: [PATCH 2/2] refactor: explicitly cast runner script template variables to string and remove unused test imports Signed-off-by: khushiiagrawal --- .../trainer/backends/localprocess/constants.py | 1 - kubeflow/trainer/backends/localprocess/utils.py | 11 ++++++----- .../trainer/backends/localprocess/utils_test.py | 15 ++++----------- 3 files changed, 10 insertions(+), 17 deletions(-) diff --git a/kubeflow/trainer/backends/localprocess/constants.py b/kubeflow/trainer/backends/localprocess/constants.py index 94488090e..ba7574d76 100644 --- a/kubeflow/trainer/backends/localprocess/constants.py +++ b/kubeflow/trainer/backends/localprocess/constants.py @@ -41,7 +41,6 @@ ] - TORCH_COMMAND = "torchrun" DEFAULT_COMMAND = "python" diff --git a/kubeflow/trainer/backends/localprocess/utils.py b/kubeflow/trainer/backends/localprocess/utils.py index 0cfeff32d..2776242f3 100644 --- a/kubeflow/trainer/backends/localprocess/utils.py +++ b/kubeflow/trainer/backends/localprocess/utils.py @@ -219,8 +219,7 @@ def get_local_train_job_script( ) # Default runtime packages if no trainer packages (though get_install_packages handles merge) elif runtime_trainer.packages: - packages_list = runtime_trainer.packages - + packages_list = runtime_trainer.packages func_file = get_command_using_train_func( venv_dir=venv_dir, @@ -237,15 +236,17 @@ def get_local_train_job_script( # Create the runner script runner_file = Path(venv_dir) / "runner.py" t = Template(local_exec_constants.RUNNER_TEMPLATE) - + mapping = { "pyenv_location": venv_dir, "packages_list": str(packages_list), - "pip_index_urls": str(trainer.pip_index_urls if trainer.pip_index_urls else constants.DEFAULT_PIP_INDEX_URLS), + "pip_index_urls": str( + trainer.pip_index_urls if trainer.pip_index_urls else constants.DEFAULT_PIP_INDEX_URLS + ), "command": str(full_command), "cleanup_venv": str(cleanup_venv), } - + with open(runner_file, "w") as f: f.write(t.substitute(**mapping)) diff --git a/kubeflow/trainer/backends/localprocess/utils_test.py b/kubeflow/trainer/backends/localprocess/utils_test.py index 7a36b09b4..07ccf3e8d 100644 --- a/kubeflow/trainer/backends/localprocess/utils_test.py +++ b/kubeflow/trainer/backends/localprocess/utils_test.py @@ -12,16 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os from pathlib import Path -import shutil import tempfile -from unittest.mock import Mock, patch -import pytest - -from kubeflow.trainer.backends.localprocess import constants as local_exec_constants -from kubeflow.trainer.backends.localprocess import utils +from kubeflow.trainer.backends.localprocess import constants as local_exec_constants, utils from kubeflow.trainer.backends.localprocess.types import LocalRuntimeTrainer from kubeflow.trainer.types import types @@ -62,7 +56,7 @@ def test_get_local_train_job_script(): # Check command structure assert isinstance(command, list) assert len(command) == 2 - assert "python" in command[0].lower() # OS specific, but should contain python + assert "python" in command[0].lower() # OS specific, but should contain python assert command[1].endswith("runner.py") assert Path(command[1]).exists() @@ -74,7 +68,7 @@ def test_get_local_train_job_script(): assert "numpy" in content assert "torch" in content assert "https://pypi.org/simple" in content - assert "python" in content # command part + assert "python" in content # command part # Check train function file creation func_file = Path(venv_dir) / local_exec_constants.LOCAL_EXEC_FILENAME.format(train_job_name) @@ -82,5 +76,4 @@ def test_get_local_train_job_script(): with open(func_file) as f: func_content = f.read() assert "def dummy_func" in func_content - assert "dummy_func()" in func_content # parameters match - + assert "dummy_func()" in func_content # parameters match